فهرست منبع

Merge pull request #8023 from ghalliday/issue14603

HPCC-14603 Compress serialized stats format

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 سال پیش
والد
کامیت
b84f3be7af
4فایلهای تغییر یافته به همراه57 افزوده شده و 11 حذف شده
  1. 2 2
      rtl/eclrtl/eclrtl.cpp
  2. 42 0
      system/jlib/jbuff.cpp
  3. 5 1
      system/jlib/jbuff.hpp
  4. 8 8
      system/jlib/jstats.cpp

+ 2 - 2
rtl/eclrtl/eclrtl.cpp

@@ -5804,8 +5804,8 @@ inline unsigned numExtraBytesFromFirst(byte first)
                 return 0;
 }
 
-static byte leadingValueMask[9] = { 0x7f, 0x3f, 0x1f, 0x0f, 0x07, 0x03, 0x01, 0x00, 0x00 };
-static byte leadingLengthMask[9] = { 0x00, 0x80, 0xC0, 0xE0, 0xF0, 0xF8, 0xFC, 0xFE, 0xFF };
+const static byte leadingValueMask[9] = { 0x7f, 0x3f, 0x1f, 0x0f, 0x07, 0x03, 0x01, 0x00, 0x00 };
+const static byte leadingLengthMask[9] = { 0x00, 0x80, 0xC0, 0xE0, 0xF0, 0xF8, 0xFC, 0xFE, 0xFF };
 
 //maximum number of bytes for a packed value is size+1 bytes for size <=8 and last byte being fully used.
 unsigned __int64 rtlGetPackedUnsigned(const void * _ptr)

+ 42 - 0
system/jlib/jbuff.cpp

@@ -554,6 +554,18 @@ MemoryBuffer & MemoryBuffer::append(unsigned __int64 value)
     return appendEndian(sizeof(value), &value);
 }
 
+MemoryBuffer & MemoryBuffer::appendPacked(unsigned __int64 value)
+{
+    //Append bytes with the top bit set until the value is less than 0x80
+    while (value >= 0x80)
+    {
+        byte next = ((byte)value) | 0x80;
+        append(next);
+        value >>= 7;
+    }
+    return append((byte)value);
+}
+
 MemoryBuffer & MemoryBuffer::append(const MemoryBuffer & value)
 {
     size32_t SourceLen = value.length();
@@ -736,6 +748,36 @@ const byte * MemoryBuffer::readDirect(size32_t len)
     return ret;
 }
 
+unsigned __int64 MemoryBuffer::readPacked()
+{
+    unsigned __int64 value = 0;
+    unsigned shift = 0;
+    loop
+    {
+        byte next;
+        read(next);
+        value = value | (((unsigned __int64)(next & 0x7f)) << shift);
+        if (!(next & 0x80))
+            break;
+        shift += 7;
+    }
+    return value;
+}
+
+MemoryBuffer &  MemoryBuffer::readPacked(unsigned & value)
+{
+    unsigned __int64 serializedValue = readPacked();
+    dbgassertex((unsigned)serializedValue == serializedValue);
+    value = (unsigned)serializedValue;
+    return *this;
+}
+
+MemoryBuffer &  MemoryBuffer::readPacked(unsigned __int64 & value)
+{
+    value = readPacked();
+    return *this;
+}
+
 MemoryBuffer & MemoryBuffer::skip(unsigned len)
 {
     CHECKREADPOS(len);

+ 5 - 1
system/jlib/jbuff.hpp

@@ -56,7 +56,6 @@ private:
     size_t len;
 };
 
-
 //--------------------------------------------------------------------------------------------------------------------
 
 interface IMemoryBlock
@@ -159,6 +158,7 @@ public:
     MemoryBuffer &  appendEndian(size32_t len, const void * value);
     MemoryBuffer &  appendFile(const char *fileName);
     MemoryBuffer &  appendSwap(size32_t len, const void * value);
+    MemoryBuffer &  appendPacked(unsigned __int64 value); // compatible with any unsigned size
     inline MemoryBuffer &  appendMemSize(memsize_t  value) { __int64 val=(__int64)value; append(val); return *this; }
     
     
@@ -182,6 +182,9 @@ public:
     MemoryBuffer &  readFile(StringAttr &fileName);
     MemoryBuffer &  readSwap(size32_t len, void * value);
     const byte *    readDirect(size32_t len);                                       // for efficiency
+    MemoryBuffer &  readPacked(unsigned & value);
+    MemoryBuffer &  readPacked(unsigned __int64 & value);
+
     inline MemoryBuffer &  readMemSize(memsize_t & value) { __int64 val; read(val); value = (memsize_t)val; assertex(val == (__int64) value); return *this; }
     MemoryBuffer &  skip(unsigned len);
     void            writeDirect(size32_t pos,size32_t len,const void *buf);         // NB does not extend buffer
@@ -229,6 +232,7 @@ private:
     void _insert(unsigned offset, size32_t len);
     void init();
     void kill();
+    unsigned __int64 readPacked();
     void _realloc(size32_t max);
     void _reallocExact(size32_t max);
     MemoryBuffer & _remove(unsigned start, unsigned len);

+ 8 - 8
system/jlib/jstats.cpp

@@ -1474,12 +1474,12 @@ StringBuffer & CRuntimeStatisticCollection::toStr(StringBuffer &str) const
 void CRuntimeStatisticCollection::deserialize(MemoryBuffer& in)
 {
     unsigned numValid;
-    in.read(numValid);
+    in.readPacked(numValid);
     for (unsigned i=0; i < numValid; i++)
     {
         unsigned kindVal;
         unsigned __int64 value;
-        in.read(kindVal).read(value);
+        in.readPacked(kindVal).readPacked(value);
         StatisticKind kind = (StatisticKind)kindVal;
         setStatistic(kind, value);
     }
@@ -1488,12 +1488,12 @@ void CRuntimeStatisticCollection::deserialize(MemoryBuffer& in)
 void CRuntimeStatisticCollection::deserializeMerge(MemoryBuffer& in)
 {
     unsigned numValid;
-    in.read(numValid);
+    in.readPacked(numValid);
     for (unsigned i=0; i < numValid; i++)
     {
         unsigned kindVal;
         unsigned __int64 value;
-        in.read(kindVal).read(value);
+        in.readPacked(kindVal).readPacked(value);
         StatisticKind kind = (StatisticKind)kindVal;
         StatsMergeAction mergeAction = queryMergeMode(kind);
         mergeStatistic(kind, value, mergeAction);
@@ -1508,15 +1508,15 @@ bool CRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
         if (values[i1].get())
             numValid++;
     }
-    //out.ensure(sizeof(unsigned)+numValid*(sizeof(unsigned)+sizeof(unsigned __int64)));
-    out.append(numValid);
+
+    out.appendPacked(numValid);
     ForEachItem(i2)
     {
         unsigned __int64 value = values[i2].get();
         if (value)
         {
-            out.append((unsigned)mapping.getKind(i2));
-            out.append(value);
+            out.appendPacked((unsigned)mapping.getKind(i2));
+            out.appendPacked(value);
         }
     }
     return numValid != 0;