Browse Source

HPCC-16051 A couple of problems in prev. commit spotted/fixed

NB: This is mostly tidying up, removing a couple of unused methods
and unecssary member variables (involve onStart())

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 8 years ago
parent
commit
badc5b7de8

+ 19 - 67
thorlcr/graph/thgraph.cpp

@@ -361,7 +361,7 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
     isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
     isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
     resultsGraph = NULL;
     resultsGraph = NULL;
     ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
     ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
-    onCreateCalled = onStartCalled = prepared = haveCreateCtx = haveStartCtx = nullAct = false;
+    onCreateCalled = prepared = haveCreateCtx = nullAct = false;
     onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
     onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
 
 
     StringBuffer helperName("fAc");
     StringBuffer helperName("fAc");
@@ -467,7 +467,6 @@ IThorGraphDependencyIterator *CGraphElementBase::getDependsIterator() const
 void CGraphElementBase::reset()
 void CGraphElementBase::reset()
 {
 {
     alreadyUpdated = false;
     alreadyUpdated = false;
-    onStartCalled = false;
     if (activity)
     if (activity)
         activity->reset();
         activity->reset();
 }
 }
@@ -556,14 +555,6 @@ void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
         mb.append(alreadyUpdated);
         mb.append(alreadyUpdated);
 }
 }
 
 
-void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
-{
-    if (!onStartCalled) return;
-    DelayedSizeMarker sizeMark(mb);
-    queryHelper()->serializeStartContext(mb);
-    sizeMark.write();
-}
-
 void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
 void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
 {
 {
     size32_t createCtxLen;
     size32_t createCtxLen;
@@ -574,13 +565,11 @@ void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
         mb.read(alreadyUpdated);
         mb.read(alreadyUpdated);
 }
 }
 
 
-void CGraphElementBase::deserializeStartContext(MemoryBuffer &mb)
+void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
 {
 {
-    size32_t startCtxLen;
-    mb.read(startCtxLen);
-    startCtxMb.clear().append(startCtxLen, mb.readDirect(startCtxLen));
-    haveStartCtx = true;
-    onStartCalled = false; // allow to be called again
+    DelayedSizeMarker sizeMark(mb);
+    queryHelper()->serializeStartContext(mb);
+    sizeMark.write();
 }
 }
 
 
 void CGraphElementBase::onCreate()
 void CGraphElementBase::onCreate()
@@ -609,22 +598,12 @@ void CGraphElementBase::onCreate()
     }
     }
 }
 }
 
 
-void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract)
+void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
 {
 {
-    if (onStartCalled)
+    if (nullAct)
         return;
         return;
-    onStartCalled = true;
-    if (!nullAct)
-    {
-        if (haveStartCtx)
-        {
-            baseHelper->onStart(parentExtract, &startCtxMb);
-            startCtxMb.reset();
-            haveStartCtx = false;
-        }
-        else
-            baseHelper->onStart(parentExtract, NULL);
-    }
+    CriticalBlock b(crit);
+    baseHelper->onStart(parentExtract, startCtx);
 }
 }
 
 
 bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
 bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
@@ -647,7 +626,7 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
         bool create = true;
         bool create = true;
         if (connectOnly)
         if (connectOnly)
         {
         {
-            if (activity)
+            if (prepared)
                 return true;
                 return true;
             ForEachItemIn(i, inputs)
             ForEachItemIn(i, inputs)
             {
             {
@@ -799,10 +778,7 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
                 CIOConnection *inputIO = inputs.item(i2);
                 CIOConnection *inputIO = inputs.item(i2);
                 connectInput(i2, inputIO->activity, inputIO->index);
                 connectInput(i2, inputIO->activity, inputIO->index);
             }
             }
-            if (isSink())
-                owner->addActiveSink(*this);
-            assertex(!activity);
-            activity.setown(factory());
+            createActivity();
         }
         }
         return true;
         return true;
     }
     }
@@ -828,11 +804,14 @@ void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExt
     activity->preStart(parentExtractSz, parentExtract);
     activity->preStart(parentExtractSz, parentExtract);
 }
 }
 
 
-void CGraphElementBase::initActivity()
+void CGraphElementBase::createActivity()
 {
 {
+    CriticalBlock b(crit);
     if (activity)
     if (activity)
         return;
         return;
     activity.setown(factory());
     activity.setown(factory());
+    if (isSink())
+        owner->addActiveSink(*this);
 }
 }
 
 
 ICodeContext *CGraphElementBase::queryCodeContext()
 ICodeContext *CGraphElementBase::queryCodeContext()
@@ -1104,23 +1083,6 @@ void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
     sizeMark.write();
     sizeMark.write();
 }
 }
 
 
-void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
-{
-    DelayedSizeMarker sizeMark(mb);
-    Owned<IThorActivityIterator> iter = getIterator();
-    ForEach (*iter)
-    {
-        CGraphElementBase &element = iter->query();
-        if (element.isOnStarted())
-        {
-            mb.append(element.queryId());
-            element.serializeStartContext(mb);
-        }
-    }
-    mb.append((activity_id)0);
-    sizeMark.write();
-}
-
 void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
 void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
 {
 {
     activity_id id;
     activity_id id;
@@ -1134,19 +1096,6 @@ void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
     }
     }
 }
 }
 
 
-void CGraphBase::deserializeStartContexts(MemoryBuffer &mb)
-{
-    activity_id id;
-    loop
-    {
-        mb.read(id);
-        if (0 == id) break;
-        CGraphElementBase *element = queryElement(id);
-        assertex(element);
-        element->deserializeStartContext(mb);
-    }
-}
-
 void CGraphBase::reset()
 void CGraphBase::reset()
 {
 {
     setCompleteEx(false);
     setCompleteEx(false);
@@ -1194,7 +1143,6 @@ bool CGraphBase::fireException(IException *e)
 
 
 bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
 bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
 {
 {
-    started = true; // causes reset() to be called on all subsequent executions of this subgraph.
     Owned<IThorActivityIterator> iter = getConnectedIterator();
     Owned<IThorActivityIterator> iter = getConnectedIterator();
     ForEach(*iter)
     ForEach(*iter)
     {
     {
@@ -1274,11 +1222,14 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
             throw abortException.getLink();
             throw abortException.getLink();
         throw MakeGraphException(this, 0, "subgraph aborted");
         throw MakeGraphException(this, 0, "subgraph aborted");
     }
     }
+    GraphPrintLog("Processing graph");
     Owned<IException> exception;
     Owned<IException> exception;
     try
     try
     {
     {
         if (started)
         if (started)
             reset();
             reset();
+        else
+            started = true;
         Owned<IThorActivityIterator> iter = getConnectedIterator();
         Owned<IThorActivityIterator> iter = getConnectedIterator();
         ForEach(*iter)
         ForEach(*iter)
         {
         {
@@ -1286,6 +1237,7 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
             element.onStart(parentExtractSz, parentExtract);
             element.onStart(parentExtractSz, parentExtract);
             element.initActivity();
             element.initActivity();
         }
         }
+        initialized = true;
         if (!preStart(parentExtractSz, parentExtract)) return;
         if (!preStart(parentExtractSz, parentExtract)) return;
         start();
         start();
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.

+ 5 - 8
thorlcr/graph/thgraph.hpp

@@ -249,13 +249,13 @@ protected:
     activity_id id, ownerId;
     activity_id id, ownerId;
     StringAttr eclText;
     StringAttr eclText;
     Owned<IPropertyTree> xgmml;
     Owned<IPropertyTree> xgmml;
-    bool isLocal, isLocalData, isGrouped, sink, prepared, onCreateCalled, onStartCalled, onlyUpdateIfChanged, nullAct, log;
+    bool isLocal, isLocalData, isGrouped, sink, prepared, onCreateCalled, onlyUpdateIfChanged, nullAct, log;
     Owned<CActivityBase> activity;
     Owned<CActivityBase> activity;
     CGraphBase *resultsGraph, *owner;
     CGraphBase *resultsGraph, *owner;
     CGraphDependencyArray dependsOn;
     CGraphDependencyArray dependsOn;
     Owned<IThorBoundLoopGraph> loopGraph; // really only here as master and slave derivatives set/use
     Owned<IThorBoundLoopGraph> loopGraph; // really only here as master and slave derivatives set/use
     MemoryBuffer createCtxMb, startCtxMb;
     MemoryBuffer createCtxMb, startCtxMb;
-    bool haveCreateCtx, haveStartCtx;
+    bool haveCreateCtx;
     unsigned maxCores;
     unsigned maxCores;
 
 
 public:
 public:
@@ -297,17 +297,15 @@ public:
     IThorBoundLoopGraph *queryLoopGraph() { return loopGraph; }
     IThorBoundLoopGraph *queryLoopGraph() { return loopGraph; }
     bool executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async);
     bool executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async);
     virtual void deserializeCreateContext(MemoryBuffer &mb);
     virtual void deserializeCreateContext(MemoryBuffer &mb);
-    virtual void deserializeStartContext(MemoryBuffer &mb);
     virtual void serializeCreateContext(MemoryBuffer &mb); // called after onCreate and create() (of activity)
     virtual void serializeCreateContext(MemoryBuffer &mb); // called after onCreate and create() (of activity)
     virtual void serializeStartContext(MemoryBuffer &mb);
     virtual void serializeStartContext(MemoryBuffer &mb);
     virtual bool checkUpdate() { return alreadyUpdated; }
     virtual bool checkUpdate() { return alreadyUpdated; }
     virtual void reset();
     virtual void reset();
-    void onStart(size32_t parentExtractSz, const byte *parentExtract);
+    void onStart(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx=nullptr);
     void onCreate();
     void onCreate();
     void abort(IException *e);
     void abort(IException *e);
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
     bool isOnCreated() const { return onCreateCalled; }
     bool isOnCreated() const { return onCreateCalled; }
-    bool isOnStarted() const { return onStartCalled; }
     bool isPrepared() const { return prepared; }
     bool isPrepared() const { return prepared; }
 
 
     CGraphBase &queryOwner() const { return *owner; }
     CGraphBase &queryOwner() const { return *owner; }
@@ -350,9 +348,10 @@ public:
         return dst;
         return dst;
     }
     }
     virtual bool prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async, bool connectOnly);
     virtual bool prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async, bool connectOnly);
+    void createActivity();
     CActivityBase *queryActivity() { return activity; }
     CActivityBase *queryActivity() { return activity; }
 //
 //
-    virtual void initActivity();
+    virtual void initActivity() { }
     virtual CActivityBase *factory(ThorActivityKind kind) { assertex(false); return NULL; }
     virtual CActivityBase *factory(ThorActivityKind kind) { assertex(false); return NULL; }
     virtual CActivityBase *factory() { return factory(getKind()); }
     virtual CActivityBase *factory() { return factory(getKind()); }
     virtual CActivityBase *factorySet(ThorActivityKind kind) { CActivityBase *_activity = factory(kind); activity.setown(_activity); return _activity; }
     virtual CActivityBase *factorySet(ThorActivityKind kind) { CActivityBase *_activity = factory(kind); activity.setown(_activity); return _activity; }
@@ -607,9 +606,7 @@ public:
     unsigned queryLoopCounter() const { return counter; }
     unsigned queryLoopCounter() const { return counter; }
     virtual void setComplete(bool tf=true) { complete=tf; }
     virtual void setComplete(bool tf=true) { complete=tf; }
     virtual void deserializeCreateContexts(MemoryBuffer &mb);
     virtual void deserializeCreateContexts(MemoryBuffer &mb);
-    virtual void deserializeStartContexts(MemoryBuffer &mb);
     virtual void serializeCreateContexts(MemoryBuffer &mb);
     virtual void serializeCreateContexts(MemoryBuffer &mb);
-    virtual void serializeStartContexts(MemoryBuffer &mb);
     virtual void reset();
     virtual void reset();
     void disconnectActivities()
     void disconnectActivities()
     {
     {

+ 17 - 11
thorlcr/graph/thgraphmaster.cpp

@@ -234,7 +234,9 @@ void CSlaveMessageHandler::main()
                         try
                         try
                         {
                         {
                             element->reset();
                             element->reset();
-                            element->doCreateActivity(parentExtractSz, parentExtract, &msg);
+                            size32_t startCtxLen;
+                            msg.read(startCtxLen);
+                            element->doCreateActivity(parentExtractSz, parentExtract, startCtxLen ? &msg : nullptr);
                         }
                         }
                         catch (IException *e)
                         catch (IException *e)
                         {
                         {
@@ -247,6 +249,7 @@ void CSlaveMessageHandler::main()
                             element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
                             element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
                         toSerialize.append(*LINK(element));
                         toSerialize.append(*LINK(element));
                     }
                     }
+                    graph->setInitialized();
                     msg.clear();
                     msg.clear();
                     mptag_t replyTag = job.queryJobChannel(0).queryMPServer().createReplyTag();
                     mptag_t replyTag = job.queryJobChannel(0).queryMPServer().createReplyTag();
                     msg.append(replyTag); // second reply
                     msg.append(replyTag); // second reply
@@ -608,21 +611,25 @@ bool CMasterGraphElement::checkUpdate()
 
 
 void CMasterGraphElement::initActivity()
 void CMasterGraphElement::initActivity()
 {
 {
-    CGraphElementBase::initActivity();
-    if (!initialized || queryActivity()->needReInit())
+    CriticalBlock b(crit);
+    if (!initialized)
     {
     {
-        ((CMasterActivity *)queryActivity())->init();
         initialized = true;
         initialized = true;
+        ((CMasterActivity *)queryActivity())->init();
     }
     }
-    owner->setInitialized();
+}
+
+void CMasterGraphElement::reset()
+{
+    CGraphElementBase::reset();
+    if (activity && activity->needReInit())
+        initialized = false;
 }
 }
 
 
 void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
 void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
 {
 {
-    onCreate();
-    if (startCtx)
-        deserializeStartContext(*startCtx);
-    onStart(parentExtractSz, parentExtract);
+    createActivity();
+    onStart(parentExtractSz, parentExtract, startCtx);
     initActivity();
     initActivity();
 }
 }
 
 
@@ -2672,8 +2679,7 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
                 Owned<IException> e;
                 Owned<IException> e;
                 try
                 try
                 {
                 {
-                    element->onCreate();
-                    element->initActivity();
+                    element->createActivity();
                     activity = (CMasterActivity *)element->queryActivity();
                     activity = (CMasterActivity *)element->queryActivity();
                 }
                 }
                 catch (IException *_e)
                 catch (IException *_e)

+ 2 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -327,10 +327,10 @@ class graphmaster_decl CMasterGraphElement : public CGraphElementBase
     bool initialized = false;
     bool initialized = false;
 public:
 public:
     CMasterGraphElement(CGraphBase &owner, IPropertyTree &xgmml);
     CMasterGraphElement(CGraphBase &owner, IPropertyTree &xgmml);
-    void doCreateActivity(size32_t parentExtractSz=0, const byte *parentExtract=NULL, MemoryBuffer *startCtx=nullptr);
+    void doCreateActivity(size32_t parentExtractSz=0, const byte *parentExtract=nullptr, MemoryBuffer *startCtx=nullptr);
     virtual bool checkUpdate();
     virtual bool checkUpdate();
-
     virtual void initActivity() override;
     virtual void initActivity() override;
+    virtual void reset() override;
     virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb);
     virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb);
 };
 };
 
 

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -855,7 +855,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
             msg.append(parentExtractSz);
             msg.append(parentExtractSz);
             msg.append(parentExtractSz, parentExtract);
             msg.append(parentExtractSz, parentExtract);
 
 
-            // NB: will only request activities that need initializaton data (those that override CSlaveActivity::init())
+            // NB: will only request activities that need initializaton data
             ForEach(*iter)
             ForEach(*iter)
             {
             {
                 CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
                 CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -161,7 +161,7 @@ public:
     ~CSlaveActivity();
     ~CSlaveActivity();
     void setRequireInitData(bool tf)
     void setRequireInitData(bool tf)
     {
     {
-        // If not required sets sentActInitdata to true, to prevent it being request at graph initialization time.
+        // If not required, sets sentActInitdata to true, to prevent it being requested at graph initialization time.
         container.sentActInitData->set(0, !tf);
         container.sentActInitData->set(0, !tf);
     }
     }
     virtual void clearConnections();
     virtual void clearConnections();