Bläddra i källkod

HPCC-22386 Fix problems with wfids and embedded libraries

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 6 år sedan
förälder
incheckning
b6aa176406

+ 33 - 19
common/workunit/workunit.cpp

@@ -178,16 +178,9 @@ CWuGraphStats::CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _cre
     StatsScopeId graphScopeId;
     verifyex(graphScopeId.setScopeText(_rootScope));
 
-    if (wfid)
-    {
-        StatsScopeId rootScopeId(SSTworkflow,wfid);
-        collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
-        collector->beginScope(graphScopeId);
-    }
-    else
-    {
-        collector.setown(createStatisticsGatherer(_creatorType, _creator, graphScopeId));
-    }
+    StatsScopeId rootScopeId(SSTworkflow,wfid);
+    collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
+    collector->beginScope(graphScopeId);
 }
 
 void CWuGraphStats::beforeDispose()
@@ -208,7 +201,6 @@ void CWuGraphStats::beforeDispose()
     StringBuffer tag;
     tag.append("sg").append(id);
 
-
     //Replace the particular subgraph statistics added by this creator
     StringBuffer qualified(tag);
     qualified.append("[@creator='").append(creator).append("']");
@@ -474,6 +466,11 @@ static int compareGraphNode(IInterface * const *ll, IInterface * const *rr)
 {
     IPropertyTree *l = (IPropertyTree *) *ll;
     IPropertyTree *r = (IPropertyTree *) *rr;
+    unsigned lwfid = l->getPropInt("@wfid");
+    unsigned rwfid = r->getPropInt("@wfid");
+    if (lwfid != rwfid)
+        return lwfid > rwfid ? +1 : -1;
+
     const char * lname = l->queryName();
     const char * rname = r->queryName();
     return compareScopeName(lname, rname);
@@ -3714,9 +3711,9 @@ public:
             }
         }
     }
-    virtual void setGraphState(const char *graphName, WUGraphState state) const
+    virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
     {
-        Owned<IRemoteConnection> conn = getWritableProgressConnection(graphName);
+        Owned<IRemoteConnection> conn = getWritableProgressConnection(graphName, wfid);
         conn->queryRoot()->setPropInt("@_state", state);
     }
     virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
@@ -3753,7 +3750,7 @@ public:
     }
     virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override
     {
-        return new CDaliWuGraphStats(getWritableProgressConnection(graphName), creatorType, creator, _wfid, graphName, subgraph);
+        return new CDaliWuGraphStats(getWritableProgressConnection(graphName, _wfid), creatorType, creator, _wfid, graphName, subgraph);
     }
 
 protected:
@@ -3767,12 +3764,29 @@ protected:
         }
         return progressConnection.getLink();
     }
-    IRemoteConnection *getWritableProgressConnection(const char *graphName) const
+    IRemoteConnection *getWritableProgressConnection(const char *graphName, unsigned wfid) const
     {
         CriticalBlock block(crit);
         progressConnection.clear(); // Make sure subsequent reads from this workunit get the changes I am making
         VStringBuffer path("/GraphProgress/%s/%s", queryWuid(), graphName);
-        return querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        IPropertyTree * root = conn->queryRoot();
+        assertex(wfid);
+
+        if (!root->hasProp("@wfid"))
+        {
+            root->setPropInt("@wfid", wfid);
+        }
+        else
+        {
+            //Ideally the following code would check that the wfids are passed consistently.
+            //However there is an obscure problem with out of line functions being called from multiple workflow
+            //ids, and possibly library graphs.
+            //Stats for library graphs should be nested below the library call activity
+            //assertex(root->getPropInt("@wfid", 0) == wfid); // check that wfid is passed consistently
+        }
+
+        return conn.getClear();
     }
     IPropertyTree *getGraphProgressTree() const
     {
@@ -4013,8 +4027,8 @@ public:
             { return c->queryGraphState(graphName); }
     virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
             { return c->queryNodeState(graphName, nodeId); }
-    virtual void setGraphState(const char *graphName, WUGraphState state) const
-            { c->setGraphState(graphName, state); }
+    virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
+            { c->setGraphState(graphName, wfid, state); }
     virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
             { c->setNodeState(graphName, nodeId, state); }
     virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override
@@ -9985,7 +9999,7 @@ WUGraphState CLocalWorkUnit::queryNodeState(const char *graphName, WUGraphIDType
 {
     throwUnexpected();   // Should only be used for persisted workunits
 }
-void CLocalWorkUnit::setGraphState(const char *graphName, WUGraphState state) const
+void CLocalWorkUnit::setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
 {
     throwUnexpected();   // Should only be used for persisted workunits
 }

+ 1 - 1
common/workunit/workunit.hpp

@@ -1280,7 +1280,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
 
     virtual WUGraphState queryGraphState(const char *graphName) const = 0;
     virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
-    virtual void setGraphState(const char *graphName, WUGraphState state) const = 0;
+    virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
     virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
     virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const = 0;
     virtual void clearGraphProgress() const = 0;

+ 1 - 1
common/workunit/workunit.ipp

@@ -272,7 +272,7 @@ public:
     virtual IConstWUGraph * getGraph(const char *name) const;
     virtual IConstWUGraphProgress * getGraphProgress(const char * name) const;
     virtual WUGraphState queryGraphState(const char *graphName) const;
-    virtual void setGraphState(const char *graphName, WUGraphState state) const;
+    virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
     virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
     virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
     virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override;

+ 5 - 3
ecl/eclagent/eclagent.ipp

@@ -1115,8 +1115,8 @@ class EclGraph : public CInterface
     } graphCodeContext;
 
 public:
-    EclGraph(IAgentContext & _agent, const char *_graphName, IConstWorkUnit * _wu, bool _isLibrary, CHThorDebugContext * _debugContext, IProbeManager * _probeManager) :
-                            graphName(_graphName), wu(_wu), debugContext(_debugContext), probeManager(_probeManager)
+    EclGraph(IAgentContext & _agent, const char *_graphName, IConstWorkUnit * _wu, bool _isLibrary, CHThorDebugContext * _debugContext, IProbeManager * _probeManager, unsigned _wfid) :
+                            graphName(_graphName), wu(_wu), debugContext(_debugContext), probeManager(_probeManager), wfid(_wfid)
     {
         isLibrary = _isLibrary;
         graphCodeContext.set(_agent.queryCodeContext());
@@ -1145,7 +1145,8 @@ public:
 
     void associateSubGraph(EclSubGraph * subgraph);
 
-    inline bool queryLibrary() { return isLibrary; }
+    inline bool queryLibrary() const { return isLibrary; }
+    inline unsigned queryWfid() const { return wfid; }
 
 protected:
     IAgentContext * agent;
@@ -1157,6 +1158,7 @@ protected:
     bool isLibrary;
     CHThorDebugContext * debugContext;
     IProbeManager * probeManager;
+    unsigned wfid;
     bool aborted;
 };
 

+ 8 - 7
ecl/eclagent/eclgraph.cpp

@@ -1211,7 +1211,7 @@ void EclGraph::createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml, boo
 void EclGraph::execute(const byte * parentExtract)
 {
     if (agent->queryRemoteWorkunit())
-        wu->setGraphState(queryGraphName(), WUGraphRunning);
+        wu->setGraphState(queryGraphName(), wfid, WUGraphRunning);
 
     {
         Owned<IWorkUnit> wu(agent->updateWorkUnit());
@@ -1242,12 +1242,12 @@ void EclGraph::execute(const byte * parentExtract)
         }
 
         if (agent->queryRemoteWorkunit())
-            wu->setGraphState(queryGraphName(), WUGraphComplete);
+            wu->setGraphState(queryGraphName(), wfid, WUGraphComplete);
     }
     catch (...)
     {
         if (agent->queryRemoteWorkunit())
-            wu->setGraphState(queryGraphName(), WUGraphFailed);
+            wu->setGraphState(queryGraphName(), wfid, WUGraphFailed);
         throw;
     }
 }
@@ -1306,7 +1306,8 @@ void EclGraph::updateLibraryProgress()
     ForEachItemIn(idx, graphs)
     {
         EclSubGraph & cur = graphs.item(idx);
-        Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), agent->getWorkflowId(), cur.id);
+        unsigned wfid = cur.parent.queryWfid();
+        Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id);
         cur.updateProgress(progress->queryStatsBuilder());
     }
 }
@@ -1447,9 +1448,9 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)
 
 //---------------------------------------------------------------------------
 
-IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph)
+IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
 {
-    return wu->updateStats (queryGraphName(), creatorType, creator, wfid, subgraph);
+    return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph);
 }
 
 void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
@@ -1501,7 +1502,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa
 
     bool probeEnabled = wuRead->getDebugValueBool("_Probe", false);
 
-    Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager);
+    Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
     eclGraph->createFromXGMML(dll, xgmml, probeEnabled);
     return eclGraph.getClear();
 }

+ 1 - 1
ecl/hqlcpp/hqlcpp.cpp

@@ -140,7 +140,7 @@ void SubStringInfo::bindToFrom(HqlCppTranslator & translator, BuildCtx & ctx)
 
 //---------------------------------------------------------------------------
 
-WorkflowItem::WorkflowItem(IHqlExpression * _function) : wfid(0), function(_function), workflowOp(no_funcdef)
+WorkflowItem::WorkflowItem(IHqlExpression * _function) : wfid(999999999), function(_function), workflowOp(no_funcdef)
 {
     IHqlExpression * body = function->queryChild(0);
     assertex(body->getOperator() == no_outofline);

+ 3 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -2016,6 +2016,8 @@ public:
     inline unsigned curGraphSequence() const { return activeGraph ? graphSeqNumber : 0; }
     UniqueSequenceCounter & querySpillSequence() { return spillSequence; }
     unsigned nextLibrarySequence() { return librarySequence++; }
+    unsigned queryMaxWfid() { return maxWfid; }
+    void setMaxWfid(unsigned wfid) { maxWfid = wfid; }
 
 public:
     void traceExpression(const char * title, IHqlExpression * expr, unsigned level=500);
@@ -2046,6 +2048,7 @@ protected:
     HqlCppDerived       derived;
     unsigned            activitiesThisCpp;
     unsigned            curCppFile;
+    unsigned            maxWfid = 0;
     Linked<ICodegenContextCallback> ctxCallback;
     ClusterType         targetClusterType;
     bool contextAvailable;

+ 5 - 3
ecl/hqlcpp/hqlhtcpp.cpp

@@ -5963,9 +5963,11 @@ bool HqlCppTranslator::buildCode(HqlQueryContext & query, const char * embeddedL
                 WorkflowItem & cur = workflow.item(i);
                 if (!cur.isFunction())
                 {
-                    assertex(!graph);
                     HqlExprArray & exprs = cur.queryExprs();
+                    assertex(!graph);
                     assertex(exprs.ordinality() == 1);
+
+                    curWfid = cur.queryWfid();
                     graph.set(&exprs.item(0));
                     assertex(graph->getOperator() == no_thor);
                 }
@@ -18541,6 +18543,7 @@ void HqlCppTranslator::buildWorkflow(WorkflowArray & workflow)
 
         if (!isEmpty)
         {
+            curWfid = wfid;
             if (action.isFunction())
             {
                 OwnedHqlExpr function = action.getFunction();
@@ -18551,7 +18554,6 @@ void HqlCppTranslator::buildWorkflow(WorkflowArray & workflow)
                 OwnedHqlExpr expr = createActionList(action.queryExprs());
 
                 IHqlExpression * persistAttr = expr->queryAttribute(_workflowPersist_Atom);
-                curWfid = wfid;
                 if (persistAttr)
                 {
                     if (!options.freezePersists)
@@ -18564,8 +18566,8 @@ void HqlCppTranslator::buildWorkflow(WorkflowArray & workflow)
                 }
                 else
                     buildWorkflowItem(switchctx, switchStmt, wfid, expr);
-                curWfid = 0;
             }
+            curWfid = 0;
         }
     }
 

+ 2 - 1
ecl/hqlcpp/hqlttcpp.cpp

@@ -7252,7 +7252,7 @@ void WorkflowTransformer::analyseAll(const HqlExprArray & in)
 
 void WorkflowTransformer::transformRoot(const HqlExprArray & in, WorkflowArray & out)
 {
-    wfidCount = 0;
+    wfidCount = translator.queryMaxWfid();
     HqlExprArray transformed;
     WorkflowTransformInfo globalInfo(NULL);
     ForEachItemIn(idx, in)
@@ -7316,6 +7316,7 @@ void WorkflowTransformer::transformRoot(const HqlExprArray & in, WorkflowArray &
 
     appendArray(out, workflow);
     appendArray(out, functions);
+    translator.setMaxWfid(wfidCount);
 }
 
 void extractWorkflow(HqlCppTranslator & translator, HqlExprArray & exprs, WorkflowArray & out)

+ 2 - 2
plugins/cassandra/cassandrawu.cpp

@@ -2623,7 +2623,7 @@ public:
         else
             return WUGraphUnknown;
     }
-    void setGraphState(const char *graphName, WUGraphState state) const
+    void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
     {
         setNodeState(graphName, 0, state);
     }
@@ -3324,7 +3324,7 @@ public:
                         }
                     }
                     if (graph.hasProp("@_state"))
-                        wu->setGraphState(graphName, (WUGraphState) graph.getPropInt("@_state"));
+                        wu->setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
                 }
             }
             wu->commit();

+ 2 - 2
thorlcr/graph/thgraphmaster.cpp

@@ -1774,7 +1774,7 @@ bool CJobMaster::go()
     try
     {
         startJob();
-        workunit->setGraphState(queryGraphName(), WUGraphRunning);
+        workunit->setGraphState(queryGraphName(), getWfid(), WUGraphRunning);
         Owned<IThorGraphIterator> iter = queryJobChannel(0).getSubGraphs();
         CICopyArrayOf<CMasterGraph> toRun;
         ForEach(*iter)
@@ -1807,7 +1807,7 @@ bool CJobMaster::go()
     }
     catch (IException *e) { fireException(e); e->Release(); }
     catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
-    workunit->setGraphState(queryGraphName(), aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
+    workunit->setGraphState(queryGraphName(), getWfid(), aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
 
     if (queryPausing())
         saveSpills();

+ 2 - 2
tools/wutool/wutool.cpp

@@ -1246,7 +1246,7 @@ protected:
         ASSERT(wu->queryGraphState("graph1")==WUGraphUnknown);
         ASSERT(wu->queryNodeState("graph1", 1)==WUGraphUnknown);
 
-        wu->setGraphState("graph1",WUGraphRunning);
+        wu->setGraphState("graph1",1,WUGraphRunning);
         ASSERT(wu->queryGraphState("graph1")==WUGraphRunning);
 
         wu->setNodeState("graph1", 1, WUGraphRunning);
@@ -1261,7 +1261,7 @@ protected:
         ret = wu->getRunningGraph(s, subid);
         ASSERT(!ret);
 
-        Owned<IWUGraphStats> progress = wu->updateStats("graph1", SCThthor, queryStatisticsComponentName(), 0, 1);
+        Owned<IWUGraphStats> progress = wu->updateStats("graph1", SCThthor, queryStatisticsComponentName(), 1, 1);
         IStatisticGatherer & stats = progress->queryStatsBuilder();
         {
             StatsSubgraphScope subgraph(stats, 1);