Browse Source

HPCC-16369 Fix missing child query progress issue

Child queries which did not require meta data initialization,
did not flag master that they were initialized and as a
consequence their progress data was not updated, so missing from
the graph control.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 8 years ago
parent
commit
5d9606c32b

+ 2 - 1
thorlcr/graph/thgraph.cpp

@@ -1030,7 +1030,7 @@ bool isLoopActivity(CGraphElementBase &container)
 }
 /////
 
-CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob())
+CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob()), progressUpdated(false)
 {
     xgmml = NULL;
     parent = owner = NULL;
@@ -1099,6 +1099,7 @@ void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
 void CGraphBase::reset()
 {
     setCompleteEx(false);
+    clearProgressUpdated();
     graphCancelHandler.reset();
     if (0 == containers.count())
     {

+ 6 - 3
thorlcr/graph/thgraph.hpp

@@ -434,7 +434,6 @@ class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, im
     CGraphTable childGraphsTable;
     CGraphArrayCopy childGraphs;
     Owned<IGraphTempHandler> tmpHandler;
-    bool initialized = false;
 
     void clean();
 
@@ -549,6 +548,8 @@ protected:
     IBarrier *startBarrier, *waitBarrier, *doneBarrier;
     mptag_t mpTag, startBarrierTag, waitBarrierTag, doneBarrierTag;
     bool connected, started, aborted, graphDone, sequential;
+    bool initialized = false;
+    std::atomic_bool progressUpdated;
     CJobBase &job;
     CJobChannel &jobChannel;
     graph_id graphId;
@@ -583,9 +584,11 @@ public:
     CGraphBase *queryOwner() { return owner; }
     CGraphBase *queryParent() { return parent?parent:this; }
     IMPServer &queryMPServer() const;
+    void clearProgressUpdated() { progressUpdated.store(false); }
+    void setProgressUpdated() { progressUpdated.store(true); }
+    bool hasProgress() const { return progressUpdated; }
+    bool checkProgressUpdatedAndClear() { return progressUpdated.exchange(false); }
     bool syncInitData();
-    inline void setInitialized() { initialized = true; }
-    inline bool isInitialized() const { return initialized; }
     bool isComplete() const { return complete; }
     bool isGlobal() const { return global; }
     bool isStarted() const { return started; }

+ 2 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -249,7 +249,6 @@ void CSlaveMessageHandler::main()
                             element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
                         toSerialize.append(*LINK(element));
                     }
-                    graph->setInitialized();
                     msg.clear();
                     mptag_t replyTag = job.queryJobChannel(0).queryMPServer().createReplyTag();
                     msg.append(replyTag); // second reply
@@ -2661,6 +2660,8 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
     CriticalBlock b(createdCrit);
     unsigned count, _count;
     mb.read(count);
+    if (count)
+        setProgressUpdated();
     _count = count;
     while (count--)
     {

+ 12 - 24
thorlcr/graph/thgraphslave.cpp

@@ -739,7 +739,7 @@ void CSlaveLateStartActivity::reset()
 
 // CSlaveGraph
 
-CSlaveGraph::CSlaveGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
+CSlaveGraph::CSlaveGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel), progressActive(false)
 {
     jobS = (CJobSlave *)&jobChannel.queryJob();
 }
@@ -754,8 +754,6 @@ void CSlaveGraph::init(MemoryBuffer &mb)
     waitBarrier = queryJobChannel().createBarrier(waitBarrierTag);
     if (doneBarrierTag != TAG_NULL)
         doneBarrier = queryJobChannel().createBarrier(doneBarrierTag);
-    initialized = false;
-    progressActive = progressToCollect = false;
     unsigned subCount;
     mb.read(subCount);
     while (subCount--)
@@ -767,6 +765,12 @@ void CSlaveGraph::init(MemoryBuffer &mb)
     }
 }
 
+void CSlaveGraph::reset()
+{
+    CGraphBase::reset();
+    progressActive = false;
+}
+
 void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
 {
     activity_id id;
@@ -895,7 +899,6 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
             MemoryBuffer actInitData;
             actInitData.append(len, msg.readDirect(len));
             CriticalBlock b(progressCrit);
-            initialized = true;
             initWithActData(actInitData, actInitRtnData);
         }
         catch (IException *e)
@@ -944,11 +947,8 @@ bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
 
 void CSlaveGraph::start()
 {
-    {
-        SpinBlock b(progressActiveLock);
-        progressActive = true;
-        progressToCollect = true;
-    }
+    progressActive.store(true); // remains true whilst graph is running
+    setProgressUpdated(); // may remain true after graph is running
     bool forceAsync = !queryOwner() || isGlobal();
     Owned<IThorActivityIterator> iter = getSinkIterator();
     unsigned sinks = 0;
@@ -1071,11 +1071,8 @@ void CSlaveGraph::abort(IException *e)
 void CSlaveGraph::done()
 {
     GraphPrintLog("End of sub-graph");
-    {
-        SpinBlock b(progressActiveLock);
-        progressActive = false;
-        progressToCollect = true; // NB: ensure collected after end of graph
-    }
+    progressActive.store(false);
+    setProgressUpdated(); // NB: ensure collected after end of graph
     if (!aborted && graphDone && (!queryOwner() || isGlobal()))
         getDoneSem.wait(); // must wait on master
     if (!queryOwner())
@@ -1109,16 +1106,7 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
     // until started and activities initialized, activities are not ready to serlialize stats.
     if ((started&&initialized) || 0 == activityCount())
     {
-        bool collect=false;
-        {
-            SpinBlock b(progressActiveLock);
-            if (progressActive || progressToCollect)
-            {
-                progressToCollect = false;
-                collect = true;
-            }
-        }
-        if (collect)
+        if (checkProgressUpdatedAndClear() || progressActive)
         {
             unsigned sPos = mb.length();
             Owned<IThorActivityIterator> iter = getConnectedIterator();

+ 2 - 2
thorlcr/graph/thgraphslave.hpp

@@ -359,10 +359,9 @@ class graphslave_decl CSlaveGraph : public CGraphBase
 {
     CJobSlave *jobS;
     Semaphore getDoneSem;
-    bool initialized, progressActive, progressToCollect;
     CriticalSection progressCrit;
-    SpinLock progressActiveLock; // MORE use atomic variables (and progressToCollect.exchange()) to remove this spin lock
     bool doneInit = false;
+    std::atomic_bool progressActive;
 
 public:
 
@@ -383,6 +382,7 @@ public:
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract) override;
     virtual void start() override;
     virtual void abort(IException *e) override;
+    virtual void reset() override;
     virtual void done() override;
     virtual IThorGraphResults *createThorGraphResults(unsigned num);
 

+ 1 - 1
thorlcr/master/thdemonserver.cpp

@@ -72,7 +72,7 @@ private:
         try
         {
             StatsSubgraphScope subgraph(stats, graph->queryGraphId());
-            if (graph->isInitialized())
+            if (graph->hasProgress()) // if there have ever been any progress, ensure they are republished
                 doReportGraph(stats, graph, finished);
             Owned<IThorGraphIterator> graphIter = graph->getChildGraphs();
             ForEach (*graphIter)