소스 검색

Fix gh-2828 - Clean up code using DelayedSizeMarker

Use recently introduced DelayedSizeMarker utility class to tidyup and
clarify some size back patching code

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 년 전
부모
커밋
6daa1892f2

+ 2 - 5
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -108,14 +108,11 @@ protected:
     void sendResult(const void *row, IOutputRowSerializer *serializer, rank_t dst)
     {
         CMessageBuffer mb;
-        size32_t start = mb.length();
-        size32_t sz = 0;
-        mb.append(sz);
+        DelayedSizeMarker sizeMark(mb);
         if (row&&hadElement) {
             CMemoryRowSerializer mbs(mb);
             serializer->serialize(mbs,(const byte *)row);
-            sz = mb.length()-start-sizeof(size32_t);
-            mb.writeDirect(start,sizeof(size32_t),&sz);
+            sizeMark.write();
         }
         container.queryJob().queryJobComm().send(mb, dst, mpTag);
     }

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1904,7 +1904,7 @@ public:
             {
                 unsigned numDataParts;
                 data.read(numDataParts);
-                unsigned offsetMapSz = 0;
+                size32_t offsetMapSz = 0;
                 if (numDataParts)
                 {
                     deserializePartFileDescriptors(data, dataParts);

+ 4 - 5
thorlcr/activities/result/thresultslave.cpp

@@ -48,13 +48,12 @@ public:
 
         OwnedConstThorRow row = input->ungroupedNextRow();
         CMessageBuffer mb;
-        size32_t lenpos = mb.length(); // its 0 really
-        mb.append((size32_t)0);
-        if (row) {
+        DelayedSizeMarker sizeMark(mb);
+        if (row)
+        {
             CMemoryRowSerializer msz(mb);
             ::queryRowSerializer(input)->serialize(msz,(const byte *)row.get());
-            size32_t sz = mb.length()-lenpos-sizeof(size32_t);
-            mb.writeDirect(lenpos,sizeof(size32_t),&sz);
+            sizeMark.write();
             processed++;
         }
         container.queryJob().queryJobComm().send(mb, 0, masterMpTag);

+ 5 - 10
thorlcr/activities/rollup/throllupslave.cpp

@@ -252,29 +252,24 @@ public:
             return false;
         CMessageBuffer msg;
         msg.append(numKept);
-        unsigned msgPos = msg.length();
-        msg.append((size32_t)0);
+        DelayedSizeMarker sizeMark(msg);
         if (kept.get())
         {
             CMemoryRowSerializer msz(msg);
             rowif->queryRowSerializer()->serialize(msz,(const byte *)kept.get());
-            size32_t sz = msg.length()-(msgPos+sizeof(sz));
-            msg.writeDirect(msgPos, sizeof(sz), &sz);
+            sizeMark.write();
             if (rollup)
             {
-                msgPos = msg.length();
-                msg.append((size32_t)0);
+                sizeMark.restart();
                 if (kept.get()!=keptTransformed.get())
                 {
-                    sz = msg.length();
                     rowif->queryRowSerializer()->serialize(msz,(const byte *)keptTransformed.get());
-                    sz = msg.length()-(msgPos+sizeof(sz));
-                    msg.writeDirect(msgPos, sizeof(sz), &sz);
+                    sizeMark.write();
                 }
             }
         }
         else if (rollup)
-            msg.append((size32_t)0);
+            sizeMark.restart(); // write (0 size) for keptTransformed row
         container.queryJob().queryJobComm().send(msg, container.queryJob().queryMyRank()+1, mpTag); // to next node
         return true;
     }

+ 8 - 16
thorlcr/graph/thgraph.cpp

@@ -513,21 +513,17 @@ void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
 {
     if (!onCreateCalled) return;
     mb.append(queryId());
-    unsigned pos = mb.length();
-    mb.append((size32_t)0);
+    DelayedSizeMarker sizeMark(mb);
     queryHelper()->serializeCreateContext(mb);
-    size32_t sz = (mb.length()-pos)-sizeof(size32_t);
-    mb.writeDirect(pos, sizeof(sz), &sz);
+    sizeMark.write();
 }
 
 void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
 {
     assertex(onStartCalled);
-    unsigned pos = mb.length();
-    mb.append((size32_t)0);
+    DelayedSizeMarker sizeMark(mb);
     queryHelper()->serializeStartContext(mb);
-    size32_t sz = (mb.length()-pos)-sizeof(size32_t);
-    mb.writeDirect(pos, sizeof(sz), &sz);
+    sizeMark.write();
 }
 
 void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
@@ -1052,8 +1048,7 @@ void CGraphBase::clean()
 
 void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
 {
-    unsigned pos = mb.length();
-    mb.append((unsigned)0);
+    DelayedSizeMarker sizeMark(mb);
     Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getIterator() : getTraverseIterator(true); // all if non-global-child, or graph with conditionals
     ForEach (*iter)
     {
@@ -1061,14 +1056,12 @@ void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
         element.serializeCreateContext(mb);
     }
     mb.append((activity_id)0);
-    unsigned len=mb.length()-pos-sizeof(unsigned);
-    mb.writeDirect(pos, sizeof(len), &len);
+    sizeMark.write();
 }
 
 void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
 {
-    unsigned pos = mb.length();
-    mb.append((unsigned)0);
+    DelayedSizeMarker sizeMark(mb);
     Owned<IThorActivityIterator> iter = getTraverseIterator();
     ForEach (*iter)
     {
@@ -1077,8 +1070,7 @@ void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
         element.serializeStartContext(mb);
     }
     mb.append((activity_id)0);
-    unsigned len=mb.length()-pos-sizeof(unsigned);
-    mb.writeDirect(pos, sizeof(len), &len);
+    sizeMark.write();
 }
 
 void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)

+ 5 - 9
thorlcr/graph/thgraphmaster.cpp

@@ -2032,8 +2032,7 @@ void CMasterGraph::serializeCreateContexts(MemoryBuffer &mb)
 bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
 {
     CriticalBlock b(createdCrit);
-    unsigned pos=mb.length();
-    mb.append((unsigned)0);
+    DelayedSizeMarker sizeMark1(mb);
     ForEach (iter)
     {
         CMasterGraphElement &element = (CMasterGraphElement &)iter.query();
@@ -2043,19 +2042,16 @@ bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, I
             if (activity)
             {
                 mb.append(element.queryId());
-                unsigned pos = mb.length();
-                mb.append((size32_t)0);
+                DelayedSizeMarker sizeMark2(mb);
                 activity->serializeSlaveData(mb, slave);
-                size32_t sz = (mb.length()-pos)-sizeof(size32_t);
-                mb.writeDirect(pos, sizeof(sz), &sz);
+                sizeMark2.write();
             }
         }
     }
-    if (pos == (mb.length()-sizeof(unsigned)))
+    if (0 == sizeMark1.size())
         return false;
     mb.append((activity_id)0); // terminator
-    unsigned len=mb.length()-pos-sizeof(unsigned);
-    mb.writeDirect(pos, sizeof(len), &len);
+    sizeMark1.write();
     return true;
 }
 

+ 3 - 3
thorlcr/graph/thgraphslave.cpp

@@ -382,7 +382,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
     if (needActInit)
     {
         mptag_t replyTag = TAG_NULL;
-        unsigned len;
+        size32_t len;
         CMessageBuffer actInitRtnData;
         actInitRtnData.append(false);
         CMessageBuffer msg;
@@ -555,7 +555,7 @@ void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
                 throw MakeStringException(0, "Error receiving createctx data for graph: %"GIDPF"d", graphId);
             try
             {
-                unsigned len;
+                size32_t len;
                 msg.read(len);
                 if (len)
                 {
@@ -595,7 +595,7 @@ void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
                 msg.append(graphId);
                 if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
                     throwUnexpected();
-                unsigned len;
+                size32_t len;
                 msg.read(len);
                 if (len)
                     deserializeCreateContexts(msg);

+ 2 - 4
thorlcr/master/thactivitymaster.cpp

@@ -554,8 +554,7 @@ CSlavePartMapping::CSlavePartMapping(const char *_logicalName, IFileDescriptor &
 void CSlavePartMapping::serializeFileOffsetMap(MemoryBuffer &mb)
 {
     mb.append(fileWidth);
-    unsigned pos = mb.length();
-    mb.append((unsigned)0);
+    DelayedSizeMarker sizeMark(mb);
     ForEachItemIn(sm, maps)
     {
         CSlaveMap &map = maps.item(sm);
@@ -570,8 +569,7 @@ void CSlavePartMapping::serializeFileOffsetMap(MemoryBuffer &mb)
             mb.append(sizeof(FPosTableEntry), &entry);
         }
     }
-    unsigned l = mb.length()-pos-sizeof(unsigned);
-    mb.writeDirect(pos, sizeof(unsigned), &l);
+    sizeMark.write();
 }
 
 CSlavePartMapping *getFileSlaveMaps(const char *logicalName, IFileDescriptor &fileDesc, IUserDescriptor *userDesc, IGroup &localGroup, bool local, bool index, IHash *hash, IDistributedSuperFile *super)

+ 2 - 4
thorlcr/msort/tsorta.cpp

@@ -176,14 +176,12 @@ void CThorKeyArray::serialize(MemoryBuffer &mb)
     mb.append(totalserialsize);
     bool haskeyserializer = keyserializer!=NULL;
     mb.append(haskeyserializer);
-    size32_t pos = mb.length();
-    mb.append((size32_t)0);
+    DelayedSizeMarker sizeMark(mb);
     IOutputRowSerializer *serializer = haskeyserializer?keyif->queryRowSerializer():rowif->queryRowSerializer();
     CMemoryRowSerializer msz(mb);
     for (i=0;i<n;i++) 
         serializer->serialize(msz,(const byte *)keys.query(i));
-    size32_t l = mb.length()-pos-sizeof(size32_t);
-    mb.writeDirect(pos,sizeof(l),&l);
+    sizeMark.write();
 }
 
 void CThorKeyArray::deserialize(MemoryBuffer &mb,bool append)

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -274,7 +274,7 @@ public:
                         subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
                         PROGLOG("GraphInit: %s, graphId=%"GIDPF"d", jobKey.get(), subGraph->queryGraphId());
                         subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
-                        unsigned len;
+                        size32_t len;
                         msg.read(len);
                         MemoryBuffer initData;
                         initData.append(len, msg.readDirect(len));

+ 2 - 4
thorlcr/slave/slwatchdog.cpp

@@ -123,11 +123,9 @@ public:
             LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
             if (mb)
             {
-                unsigned pos=mb->length();
-                mb->append((size32_t)0); // placeholder
+                DelayedSizeMarker sizeMark(*mb);
                 gatherData(*mb);
-                size32_t len=(mb->length()-pos)-sizeof(size32_t);
-                mb->writeDirect(pos, sizeof(len), &len);
+                sizeMark.write();
             }
             activeGraphs.zap(graph);
         }