Browse Source

Fix #2315 - child query varfilename handling

Child queries with activties based on variable/dynamic filenames, need to
be re-initialized with new meta data from the master each iteration.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 năm trước cách đây
mục cha
commit
0972ea3df1

+ 5 - 1
thorlcr/activities/fetch/thfetch.cpp

@@ -33,6 +33,8 @@ class CFetchActivityMaster : public CMasterActivity
     SocketEndpoint *endpoints;
 
 protected:
+    IHThorFetchArg *helper;
+    IHThorFetchContext *fetchContext;
     Owned<IDistributedFile> fetchFile;
 public:
     CFetchActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
@@ -40,6 +42,9 @@ public:
         endpoints = NULL;
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJob().allocateMPTag();
+        helper = (IHThorFetchArg *)queryHelper();
+        fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
+        reInit = 0 != (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
     }
     ~CFetchActivityMaster()
     {
@@ -47,7 +52,6 @@ public:
     }
     virtual void init()
     {
-        IHThorFetchArg *helper = (IHThorFetchArg *)queryHelper();
         fetchFile.setown(queryThorFileManager().lookup(container.queryJob(), helper->getFileName(), false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true));
         if (fetchFile)
         {

+ 3 - 2
thorlcr/activities/fetch/thfetchslave.cpp

@@ -291,6 +291,9 @@ public:
         fetchStream = NULL;
         keyIn = NULL;
         fetchStreamOut = NULL;
+        fetchBaseHelper = (IHThorFetchBaseArg *)queryHelper();
+        fetchContext = static_cast<IHThorFetchContext *>(fetchBaseHelper->selectInterface(TAIfetchcontext_1));
+        reInit = 0 != (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
     }
     ~CFetchSlaveBase()
     {
@@ -300,8 +303,6 @@ public:
 
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        fetchBaseHelper = (IHThorFetchBaseArg *)queryHelper();
-        fetchContext = static_cast<IHThorFetchContext *>(queryHelper()->selectInterface(TAIfetchcontext_1));
         unsigned numParts;
         data.read(numParts);
         offsetCount = 0;

+ 12 - 11
thorlcr/activities/indexread/thindexread.cpp

@@ -32,7 +32,7 @@ protected:
     Linked<IDistributedFile> index;
     Owned<IFileDescriptor> fileDesc;
     rowcount_t limit;
-    IHThorIndexReadBaseArg *helper;
+    IHThorIndexReadBaseArg *indexBaseHelper;
     Owned<CSlavePartMapping> mapping;
     bool nofilter;
     ProgressInfoArray progressInfoArr;
@@ -69,7 +69,7 @@ protected:
         unsigned nparts = f->numParts(); // includes tlks if any, but unused in array
         performPartLookup.ensure(nparts);
 
-        bool checkTLKConsistency = NULL != super && 0 != (TIRsorted & helper->getFlags());
+        bool checkTLKConsistency = NULL != super && 0 != (TIRsorted & indexBaseHelper->getFlags());
         if (nofilter)
         {
             while (nparts--) performPartLookup.append(true);
@@ -154,11 +154,11 @@ protected:
                     }
                 }
                 if (!keyIndex)
-                    throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", helper->getFileName());
+                    throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", indexBaseHelper->getFileName());
             
-                unsigned maxSize = helper->queryDiskRecordSize()->querySerializedMeta()->getRecordSize(NULL); // used only if fixed
+                unsigned maxSize = indexBaseHelper->queryDiskRecordSize()->querySerializedMeta()->getRecordSize(NULL); // used only if fixed
                 Owned <IKeyManager> tlk = createKeyManager(keyIndex, maxSize, NULL);
-                helper->createSegmentMonitors(tlk);
+                indexBaseHelper->createSegmentMonitors(tlk);
                 tlk->finishSegmentMonitors();
                 tlk->reset();
                 while (tlk->lookup(false))
@@ -179,6 +179,7 @@ protected:
 public:
     CIndexReadBase(CMasterGraphElement *info) : CMasterActivity(info)
     {
+        indexBaseHelper = (IHThorIndexReadBaseArg *)queryHelper();
         limit = RCMAX;
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJob().allocateMPTag();
@@ -187,16 +188,16 @@ public:
         ForEachItemIn(l, progressLabels)
             progressInfoArr.append(*new ProgressInfo);
         inputProgress.setown(new ProgressInfo);
+        reInit = 0 != (indexBaseHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     void init()
     {
-        helper = (IHThorIndexReadArg *)queryHelper();
         nofilter = false;
 
-        index.setown(queryThorFileManager().lookup(container.queryJob(), helper->getFileName(), false, 0 != (TIRoptional & helper->getFlags()), true));
+        index.setown(queryThorFileManager().lookup(container.queryJob(), indexBaseHelper->getFileName(), false, 0 != (TIRoptional & indexBaseHelper->getFlags()), true));
         if (index)
         {
-            nofilter = 0 != (TIRnofilter & helper->getFlags());
+            nofilter = 0 != (TIRnofilter & indexBaseHelper->getFlags());
             if (index->queryAttributes().getPropBool("@local"))
                 nofilter = true;
             else
@@ -206,8 +207,8 @@ public:
                 if (sub && 1 == sub->numParts())
                     nofilter = true;
             }   
-            checkFormatCrc(this, index, helper->getFormatCrc(), true);
-            if ((container.queryLocalOrGrouped() || helper->canMatchAny()) && index->numParts())
+            checkFormatCrc(this, index, indexBaseHelper->getFormatCrc(), true);
+            if ((container.queryLocalOrGrouped() || indexBaseHelper->canMatchAny()) && index->numParts())
             {
                 fileDesc.setown(getConfiguredFileDescriptor(*index));
                 if (container.queryLocalOrGrouped())
@@ -220,7 +221,7 @@ public:
     }
     void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
-        dst.append(helper->getFileName());
+        dst.append(indexBaseHelper->getFileName());
         if (!container.queryLocalOrGrouped())
             dst.append(mpTag);
         IArrayOf<IPartDescriptor> parts;

+ 1 - 0
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -170,6 +170,7 @@ public:
         localKey = false;
         fixedDiskRecordSize = helper->queryDiskRecordSize()->querySerializedMeta()->getFixedSize(); // 0 if variable and unused
         progress = 0;
+        reInit = 0 != (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     rowcount_t sendGetCount(rowcount_t count)
     {

+ 11 - 12
thorlcr/activities/indexwrite/thindexwrite.cpp

@@ -38,24 +38,26 @@ class IndexWriteActivityMaster : public CMasterActivity
     Owned<ProgressInfo> replicateProgress;
     bool publishReplicatedDone;
     CDfsLogicalFileName dlfn;
+    IHThorIndexWriteArg *helper;
 
 public:
     IndexWriteActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
+        helper = (IHThorIndexWriteArg *)queryHelper();
         replicateProgress.setown(new ProgressInfo);
         publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
         recordsProcessed = 0;
         refactor = singlePartKey = isLocal = false;
         mpTag2 = TAG_NULL;
+        reInit = (0 != (TIWvarfilename & helper->getFlags()));
     }
     ~IndexWriteActivityMaster()
     {
         if (TAG_NULL != mpTag2)
             container.queryJob().freeMPTag(mpTag2);
     }
-    void init()
+    virtual void init()
     {
-        IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
         dlfn.set(helper->getFileName());
         isLocal = 0 != (TIWlocal & helper->getFlags());
         unsigned maxSize = helper->queryDiskRecordSize()->getMinRecordSize();
@@ -63,6 +65,7 @@ public:
             throw MakeActivityException(this, 0, "Index minimum record length (%d) exceeds 32767 internal limit", maxSize);
 
         singlePartKey = 0 != (helper->getFlags() & TIWsmall) || dlfn.isExternal();
+        clusters.kill();
         unsigned idx=0;
         while (helper->queryCluster(idx))
             clusters.append(helper->queryCluster(idx++));
@@ -170,7 +173,7 @@ public:
         mpTag = container.queryJob().allocateMPTag();
         mpTag2 = container.queryJob().allocateMPTag();
     }
-    void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
         IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper(); 
         dst.append(mpTag);  // used to build TLK on node1
@@ -220,7 +223,7 @@ public:
             }
         }
     }
-    void done()
+    virtual void done()
     {
         IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
         StringBuffer scopedName;
@@ -287,16 +290,12 @@ public:
             }
         }
     }
-    void process()
-    {
-        CMasterActivity::process();
-    }
-    void abort()
+    virtual void abort()
     {
         CMasterActivity::abort();
         cancelReceiveMsg(RANK_ALL, mpTag2);
     }
-    void preStart(size32_t parentExtractSz, const byte *parentExtract)
+    virtual void preStart(size32_t parentExtractSz, const byte *parentExtract)
     {
         CMasterActivity::preStart(parentExtractSz, parentExtract);
         IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *) queryHelper();
@@ -311,14 +310,14 @@ public:
             }
         }
     }
-    void deserializeStats(unsigned node, MemoryBuffer &mb)
+    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {
         CMasterActivity::deserializeStats(node, mb);
         unsigned repPerc;
         mb.read(repPerc);
         replicateProgress->set(node, repPerc);
     }
-    void getXGMML(IWUGraphProgress *progress, IPropertyTree *node)
+    virtual void getXGMML(IWUGraphProgress *progress, IPropertyTree *node)
     {
         CMasterActivity::getXGMML(progress, node);
         if (publishReplicatedDone)

+ 2 - 3
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -102,7 +102,7 @@ public:
 
     IndexWriteSlaveActivity(CGraphElementBase *_container) : ProcessSlaveActivity(_container)
     {
-        helper = NULL;
+        helper = static_cast <IHThorIndexWriteArg *> (queryHelper());
         sizeSignalled = false;
         initTotalCount = totalCount = 0;
         maxDiskRecordSize = lastRowSize = firstRowSize = 0;
@@ -117,12 +117,11 @@ public:
         needFirstRow = true;
         receivingTag2 = false;
         enableTlkPart0 = (0 != container.queryJob().getWorkUnitValueInt("enableTlkPart0", globals->getPropBool("@enableTlkPart0", true)));
+        reInit = (0 != (TIWvarfilename & helper->getFlags()));
     }
 
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        helper = static_cast <IHThorIndexWriteArg *> (queryHelper());
-
         isLocal = 0 != (TIWlocal & helper->getFlags());
 
         mpTag = container.queryJob().deserializeMPTag(data);

+ 6 - 5
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -26,6 +26,7 @@
 
 class CKeyedJoinMaster : public CMasterActivity
 {
+    IHThorKeyedJoinArg *helper;
     Owned<CSlavePartMapping> dataFileMapping;
     Owned<IDistributedFile> indexFile, dataFile;
     MemoryBuffer offsetMapMb, initMb;
@@ -39,13 +40,13 @@ class CKeyedJoinMaster : public CMasterActivity
 public:
     CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
+        helper = (IHThorKeyedJoinArg *) queryHelper();
         progressLabels.append("seeks");
         progressLabels.append("scans");
         progressLabels.append("accepted");
         progressLabels.append("postfiltered");
         progressLabels.append("prefiltered");
 
-        IHThorKeyedJoinArg *helper = (IHThorKeyedJoinArg *) queryHelper();
         if (helper->diskAccessRequired())
         {
             progressLabels.append("diskSeeks");
@@ -57,6 +58,7 @@ public:
         localKey = false;
         numTags = 0;
         tags[0] = tags[1] = tags[2] = tags[3] = TAG_NULL;
+        reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
     }
     ~CKeyedJoinMaster()
     {
@@ -67,14 +69,14 @@ public:
     }
     void init()
     {
-        IHThorKeyedJoinArg *helper = (IHThorKeyedJoinArg *)queryHelper();
-        
         indexFile.setown(queryThorFileManager().lookup(container.queryJob(), helper->getIndexFileName(), false, 0 != (helper->getJoinFlags() & JFindexoptional), true));
 
         unsigned keyReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJKRR", 0);
         if (!keyReadWidth || keyReadWidth>container.queryJob().querySlaves())
             keyReadWidth = container.queryJob().querySlaves();
         
+
+        initMb.clear();
         initMb.append(helper->getIndexFileName());
         if (helper->diskAccessRequired())
             numTags += 2;
@@ -222,7 +224,7 @@ public:
                                 dataReadWidth = container.queryJob().querySlaves();
                             Owned<IGroup> grp = container.queryJob().querySlaveGroup().subset((unsigned)0, dataReadWidth);
                             dataFileMapping.setown(getFileSlaveMaps(dataFile->queryLogicalName(), *dataFileDesc, container.queryJob().queryUserDescriptor(), *grp, false, false, NULL));
-                            dataFileMapping->serializeFileOffsetMap(offsetMapMb);
+                            dataFileMapping->serializeFileOffsetMap(offsetMapMb.clear());
                             queryThorFileManager().noteFileRead(container.queryJob(), dataFile);
                         }
                         else
@@ -240,7 +242,6 @@ public:
     }
     void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
-        IHThorKeyedJoinArg *helper = (IHThorKeyedJoinArg *)queryHelper();
         dst.append(initMb);
         if (indexFile && helper->diskAccessRequired())
         {

+ 5 - 0
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1587,6 +1587,7 @@ public:
         lastTick = 0;
 #endif
         helper = (IHThorKeyedJoinArg *)queryHelper();
+        reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
     }
     ~CKeyedJoinSlave()
     {
@@ -1832,6 +1833,10 @@ public:
             tags.append(tag);
             container.queryJob().queryJobComm().flush(tag);
         }
+        indexParts.kill();
+        dataParts.kill();
+        tlkKeySet.setown(createKeyIndexSet());
+        partKeySet.setown(createKeyIndexSet());
         unsigned numIndexParts;
         data.read(numIndexParts);
         if (numIndexParts)

+ 1 - 0
thorlcr/activities/thdiskbase.cpp

@@ -47,6 +47,7 @@ void CDiskReadMasterBase::init()
             fileDesc.setown(getConfiguredFileDescriptor(*file));
         else
             fileDesc.setown(file->getFileDescriptor());
+        reInit = 0 != (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename));
         if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
         {
             bool local;

+ 3 - 0
thorlcr/activities/thdiskbaseslave.cpp

@@ -201,6 +201,7 @@ const char * CDiskPartHandlerBase::queryLogicalFilename(const void * row)
 CDiskReadSlaveActivityBase::CDiskReadSlaveActivityBase(CGraphElementBase *_container) : CSlaveActivity(_container)
 {
     helper = (IHThorDiskReadBaseArg *)queryHelper();
+    reInit = 0 != (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename));
     crcCheckCompressed = 0 != container.queryJob().getWorkUnitValueInt("crcCheckCompressed", 0);
     markStart = gotMeta = false;
     checkFileCrc = !globals->getPropBool("Debug/@fileCrcDisabled");
@@ -209,6 +210,8 @@ CDiskReadSlaveActivityBase::CDiskReadSlaveActivityBase(CGraphElementBase *_conta
 // IThorSlaveActivity
 void CDiskReadSlaveActivityBase::init(MemoryBuffer &data, MemoryBuffer &slaveData)
 {
+    subfileLogicalFilenames.kill();
+    partDescs.kill();
     data.read(logicalFilename);
     unsigned subfiles;
     data.read(subfiles);

+ 4 - 3
thorlcr/graph/thgraph.cpp

@@ -522,8 +522,7 @@ void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
 
 void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
 {
-    if (!onStartCalled) return;
-    mb.append(queryId());
+    assertex(onStartCalled);
     unsigned pos = mb.length();
     mb.append((size32_t)0);
     queryHelper()->serializeStartContext(mb);
@@ -545,6 +544,7 @@ void CGraphElementBase::deserializeStartContext(MemoryBuffer &mb)
     mb.read(startCtxLen);
     startCtxMb.append(startCtxLen, mb.readDirect(startCtxLen));
     haveStartCtx = true;
+    onStartCalled = false; // allow to be called again
 }
 
 void CGraphElementBase::onCreate()
@@ -1068,6 +1068,7 @@ void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
     ForEach (*iter)
     {
         CGraphElementBase &element = iter->query();
+        mb.append(element.queryId());
         element.serializeStartContext(mb);
     }
     mb.append((activity_id)0);
@@ -2631,7 +2632,7 @@ IThorResource &queryThor()
 CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
 {
     mpTag = TAG_NULL;
-    abortSoon = cancelledReceive = false;
+    abortSoon = cancelledReceive = reInit = false;
     baseHelper.set(container.queryHelper());
     parentExtractSz = 0;
     parentExtract = NULL;

+ 2 - 1
thorlcr/graph/thgraph.hpp

@@ -950,7 +950,7 @@ protected:
     const bool &timeActivities; // purely for access efficiency
     size32_t parentExtractSz;
     const byte *parentExtract;
-    bool receiving, cancelledReceive;
+    bool receiving, cancelledReceive, reInit;
     unsigned maxCores; // NB: only used by acts that sort at the moment
 
 public:
@@ -963,6 +963,7 @@ public:
     inline const mptag_t queryMpTag() const { return mpTag; }
     inline const bool &queryAbortSoon() const { return abortSoon; }
     inline IHThorArg *queryHelper() const { return baseHelper; }
+    inline bool needReInit() const { return reInit; }
     inline const bool &queryTimeActivities() const { return timeActivities; } 
     void onStart(size32_t _parentExtractSz, const byte *_parentExtract) { parentExtractSz = _parentExtractSz; parentExtract = _parentExtract; }
     bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);

+ 36 - 46
thorlcr/graph/thgraphmaster.cpp

@@ -183,17 +183,6 @@ void CSlaveMessageHandler::main()
                     msg.read(gid);
                     Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
                     assertex(graph);
-                    size32_t parentExtractSz;
-                    msg.read(parentExtractSz);
-                    const byte *parentExtract = NULL;
-                    if (parentExtractSz)
-                    {
-                        parentExtract = msg.readDirect(parentExtractSz);
-                        StringBuffer msg("Graph(");
-                        msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
-                        DBGLOG("%s", msg.str());
-                        parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
-                    }
                     {
                         CriticalBlock b(graph->queryCreateLock());
                         Owned<IThorActivityIterator> iter = graph->getIterator();
@@ -202,8 +191,6 @@ void CSlaveMessageHandler::main()
                         {
                             CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
                             element.onCreate();
-                            if (isDiskInput(element.getKind()))
-                                element.onStart(parentExtractSz, parentExtract);
                         }
                     }
                     msg.clear();
@@ -218,19 +205,32 @@ void CSlaveMessageHandler::main()
                     Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
                     assertex(graph);
                     CGraphElementArray toSerialize;
+                    CriticalBlock b(graph->queryCreateLock());
+                    size32_t parentExtractSz;
+                    msg.read(parentExtractSz);
+                    const byte *parentExtract = NULL;
+                    if (parentExtractSz)
                     {
-                        CriticalBlock b(graph->queryCreateLock());
-                        loop
-                        {
-                            activity_id id;
-                            msg.read(id);
-                            if (!id)
-                                break;
-                            CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
-                            assertex(element);
-                            element->doCreateActivity();
-                            toSerialize.append(*LINK(element));
-                        }
+                        parentExtract = msg.readDirect(parentExtractSz);
+                        StringBuffer msg("Graph(");
+                        msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
+                        DBGLOG("%s", msg.str());
+                        parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
+                    }
+                    loop
+                    {
+                        activity_id id;
+                        msg.read(id);
+                        if (!id)
+                            break;
+                        CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
+                        assertex(element);
+                        element->deserializeStartContext(msg);
+                        element->doCreateActivity(parentExtractSz, parentExtract);
+                        CActivityBase *activity = element->queryActivity();
+                        if (activity && activity->needReInit())
+                            element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
+                        toSerialize.append(*LINK(element));
                     }
                     msg.clear();
                     CMessageBuffer replyMsg;
@@ -529,13 +529,14 @@ bool CMasterGraphElement::checkUpdate()
 
 void CMasterGraphElement::initActivity()
 {
-    if (activity)
-        return;
+    CriticalBlock b(crit);
+    bool first = (NULL == activity);
     CGraphElementBase::initActivity();
-    ((CMasterActivity *)activity.get())->init();
+    if (first || activity->needReInit())
+        ((CMasterActivity *)activity.get())->init();
 }
 
-void CMasterGraphElement::doCreateActivity()
+void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract)
 {
     bool ok=false;
     switch (getKind())
@@ -560,6 +561,8 @@ void CMasterGraphElement::doCreateActivity()
     if (!ok)
         return;
     onCreate();
+    if (isDiskInput(getKind()))
+       onStart(parentExtractSz, parentExtract);
     initActivity();
 }
 
@@ -2017,22 +2020,6 @@ void CMasterGraph::serializeCreateContexts(MemoryBuffer &mb)
     }
 }
 
-void CMasterGraph::serializeStartCtxs(MemoryBuffer &mb)
-{
-    Owned<IThorActivityIterator> iter = getTraverseIterator();
-    ForEach (*iter)
-    {
-        CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
-        mb.append(element.queryId());
-        unsigned pos = mb.length();
-        mb.append((size32_t)0);
-        element.queryHelper()->serializeStartContext(mb);
-        size32_t sz = (mb.length()-pos)-sizeof(size32_t);
-        mb.writeDirect(pos, sizeof(sz), &sz);
-    }
-    mb.append((activity_id)0);
-}
-
 bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
 {
     CriticalBlock b(createdCrit);
@@ -2162,6 +2149,9 @@ void CMasterGraph::sendActivityInitData()
         ForEach(*iter)
         {
             CGraphElementBase &element = iter->query();
+            CActivityBase *activity = element.queryActivity();
+            if (activity && activity->needReInit())
+                element.sentActInitData->set(w, false); // force act init to be resent
             if (!element.sentActInitData->test(w)) // has it been sent
                 ++needActInit;
         }
@@ -2320,7 +2310,7 @@ bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
     {
         sentStartCtx = true;
         CMessageBuffer msg;
-        serializeStartCtxs(msg);
+        serializeStartContexts(msg);
         try
         {
             jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "startCtx", &bcastTag, true);

+ 1 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -66,7 +66,6 @@ public:
     CriticalSection &queryCreateLock() { return createdCrit; }
     void handleSlaveDone(unsigned node, MemoryBuffer &mb);
     void serializeCreateContexts(MemoryBuffer &mb);
-    void serializeStartCtxs(MemoryBuffer &mb);
     bool serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter);
     void readActivityInitData(MemoryBuffer &mb, unsigned slave);
     bool deserializeStats(unsigned node, MemoryBuffer &mb);
@@ -277,7 +276,7 @@ public:
     bool sentCreateCtx;
 
     CMasterGraphElement(CGraphBase &owner, IPropertyTree &xgmml);
-    void doCreateActivity();
+    void doCreateActivity(size32_t parentExtractSz=0, const byte *parentExtract=NULL);
     virtual bool checkUpdate();
 
     virtual void initActivity();

+ 13 - 15
thorlcr/graph/thgraphslave.cpp

@@ -353,26 +353,21 @@ void CSlaveGraph::recvStartCtx()
         CMessageBuffer msg;
         if (!job.queryJobComm().recv(msg, 0, mpTag, NULL, LONGTIMEOUT))
             throw MakeStringException(0, "Error receiving startCtx data for graph: %"GIDPF"d", graphId);
-        activity_id id;
-        loop
-        {
-            msg.read(id);
-            if (0 == id) break;
-            CSlaveGraphElement *element = (CSlaveGraphElement *)queryElement(id);
-            assertex(element);
-            element->deserializeStartContext(msg);
-        }
+        deserializeStartContexts(msg);
     }
 }
 
-bool CSlaveGraph::recvActivityInitData()
+bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract)
 {
     bool ret = true;
     unsigned needActInit = 0;
     Owned<IThorActivityIterator> iter = getTraverseIterator();
     ForEach(*iter)
     {
-        CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
+        CGraphElementBase &element = (CGraphElementBase &)iter->query();
+        CActivityBase *activity = element.queryActivity();
+        if (activity && activity->needReInit())
+            element.sentActInitData->set(0, false); // force act init to be resent
         if (!element.sentActInitData->test(0))
             ++needActInit;
     }
@@ -395,12 +390,18 @@ bool CSlaveGraph::recvActivityInitData()
             // initialize any for which no data was sent
             msg.append(smt_initActDataReq); // may cause graph to be created at master
             msg.append(queryGraphId());
+            assertex(!parentExtractSz || NULL!=parentExtract);
+            msg.append(parentExtractSz);
+            msg.append(parentExtractSz, parentExtract);
             Owned<IThorActivityIterator> iter = getTraverseIterator();
             ForEach(*iter)
             {
                 CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
                 if (!element.sentActInitData->test(0))
+                {
                     msg.append(element.queryId());
+                    element.serializeStartContext(msg);
+                }
             }
             msg.append((activity_id)0);
             if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
@@ -457,7 +458,7 @@ bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
     recvStartCtx();
     CGraphBase::preStart(parentExtractSz, parentExtract);
 
-    if (!recvActivityInitData())
+    if (!recvActivityInitData(parentExtractSz, parentExtract))
         return false;
     connect(); // only now do slave acts. have all their outputs prepared.
     if (isGlobal())
@@ -578,9 +579,6 @@ void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
                 CMessageBuffer msg;
                 msg.append(smt_initGraphReq);
                 msg.append(graphId);
-                assertex(!parentExtractSz || NULL!=parentExtract);
-                msg.append(parentExtractSz);
-                msg.append(parentExtractSz, parentExtract);
                 if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
                     throwUnexpected();
                 unsigned len;

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -101,7 +101,7 @@ public:
     void connect();
     void init(MemoryBuffer &mb);
     void recvStartCtx();
-    bool recvActivityInitData();
+    bool recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract);
     void setExecuteReplyTag(mptag_t _executeReplyTag) { executeReplyTag = _executeReplyTag; }
     void initWithActData(MemoryBuffer &in, MemoryBuffer &out);
     void getDone(MemoryBuffer &doneInfoMb);