Browse Source

Merge pull request #7557 from richardkchapman/cassandra-progress

HPCC-13899 Add support for graph progress in Cassandra

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
381555c440

+ 295 - 400
common/workunit/workunit.cpp

@@ -172,232 +172,79 @@ void doDescheduleWorkkunit(char const * wuid)
  * Graph progress support
  */
 
-#define PROGRESS_FORMAT_V 2
+/*
+ * Prepares a statistics gatherer for one subgraph.
+ *   _progress              - tree that beforeDispose() later writes the gathered statistics into
+ *   _creatorType, _creator - identify the component producing the statistics
+ *   _rootScope             - scope text for the root of the gathered statistics; it must be accepted by
+ *                            StatsScopeId::setScopeText (enforced with verifyex)
+ *   _id                    - subgraph id, used by beforeDispose() to build the "sg<id>" tag
+ * NOTE(review): the derived class passes a LINKed tree, so the progress member presumably takes ownership
+ * of the reference it is given - confirm against the member declaration in the header.
+ */
+CWuGraphStats::CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
+    : progress(_progress), creatorType(_creatorType), creator(_creator), id(_id)
+{
+    StatsScopeId rootScopeId;
+    verifyex(rootScopeId.setScopeText(_rootScope));
+    collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
+}
 
-class CConstGraphProgress : public CInterface, implements IConstWUGraphProgress
+void CWuGraphStats::beforeDispose()
 {
-    class CWuGraphStats : public CInterfaceOf<IWUGraphStats>
+    Owned<IStatisticCollection> stats = collector->getResult();
+
+    MemoryBuffer compressed;
     {
-    public:
-        CWuGraphStats(CConstGraphProgress &_parent, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
-            : parent(_parent), creatorType(_creatorType), creator(_creator), id(_id)
-        {
-            StringBuffer subgraphScopeName;
-            subgraphScopeName.append(_rootScope);
+        MemoryBuffer serialized;
+        serializeStatisticCollection(serialized, stats);
+        compressToBuffer(compressed, serialized.length(), serialized.toByteArray());
+    }
 
-            StatisticScopeType scopeType = SSTgraph;
-            StatsScopeId rootScopeId;
-            verifyex(rootScopeId.setScopeText(_rootScope));
-            collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
-        }
-        virtual void beforeDispose()
-        {
-            Owned<IStatisticCollection> stats = collector->getResult();
+    unsigned minActivity = 0;
+    unsigned maxActivity = 0;
+    stats->getMinMaxActivity(minActivity, maxActivity);
 
-            MemoryBuffer compressed;
-            {
-                MemoryBuffer serialized;
-                serializeStatisticCollection(serialized, stats);
-                compressToBuffer(compressed, serialized.length(), serialized.toByteArray());
-            }
+    StringBuffer tag;
+    tag.append("sg").append(id);
 
-            unsigned minActivity = 0;
-            unsigned maxActivity = 0;
-            stats->getMinMaxActivity(minActivity, maxActivity);
-            parent.setSubgraphStats(creatorType, creator, id, compressed, minActivity, maxActivity);
-        }
-        virtual IStatisticGatherer & queryStatsBuilder()
-        {
-            return *collector;
-        }
+    IPropertyTree * subgraph = createPTree(tag);
+    subgraph->setProp("@c", queryCreatorTypeName(creatorType));
+    subgraph->setProp("@creator", creator);
+    subgraph->setPropInt("@minActivity", minActivity);
+    subgraph->setPropInt("@maxActivity", maxActivity);
 
-    protected:
-        CConstGraphProgress &parent;
-        Owned<IStatisticGatherer> collector;
-        StringAttr creator;
-        StatisticCreatorType creatorType;
-        unsigned id;
-    };
+    //Replace the particular subgraph statistics added by this creator
+    StringBuffer qualified(tag);
+    qualified.append("[@creator='").append(creator).append("']");
+    progress->removeProp(qualified);
+    subgraph = progress->addPropTree(tag, subgraph);
+    subgraph->setPropBin("Stats", compressed.length(), compressed.toByteArray());
+    if (!progress->getPropBool("@stats", false))
+        progress->setPropBool("@stats", true);
+}
 
-    class CGraphProgress : public CInterface, implements IWUGraphProgress
-    {
-        CConstGraphProgress &parent;
-    public:
-        IMPLEMENT_IINTERFACE;
-        CGraphProgress(CConstGraphProgress &_parent) : parent(_parent)
-        {
-            parent.lockWrite();
-        }
-        ~CGraphProgress()
-        {
-            parent.unlock();
-        }
-        virtual IPropertyTree * getProgressTree() { return parent.getProgressTree(); }
-        virtual WUGraphState queryGraphState() { return parent.queryGraphState(); }
-        virtual WUGraphState queryNodeState(WUGraphIDType nodeId) { return parent.queryNodeState(nodeId); }
-        virtual IWUGraphProgress * update() { throwUnexpected(); }
-        virtual IWUGraphStats * update(StatisticCreatorType creatorType, const char * creator, unsigned subgraph) { throwUnexpected(); }
-        virtual unsigned queryFormatVersion() { return parent.queryFormatVersion(); }
-        virtual void setGraphState(WUGraphState state)
-        {
-            parent.setGraphState(state);
-        }
-        virtual void setNodeState(WUGraphIDType nodeId, WUGraphState state)
-        {
-            parent.setNodeState(nodeId, state);
-        }
-    };
-    void clearConnection()
-    {
-        conn.clear();
-        progress.clear();
-    }
+IStatisticGatherer & CWuGraphStats::queryStatsBuilder() // Returns a reference to the gatherer created by the constructor
+{
+    return *collector;
+}
+
+class CConstGraphProgress : public CInterface, implements IConstWUGraphProgress
+{
 public:
     IMPLEMENT_IINTERFACE;
-    static void deleteWuidProgress(const char *wuid)
-    {
-        StringBuffer path("/GraphProgress/");
-        path.append(wuid);
-        Owned<IRemoteConnection> conn = querySDS().connect(path.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-        if (conn)
-            conn->close(true);
-    }
-    CConstGraphProgress(const char *_wuid, const char *_graphName) : wuid(_wuid), graphName(_graphName)
-    {
-        rootPath.append("/GraphProgress/").append(wuid).append('/').append(graphName).append('/');
-        connected = connectedWrite = false;
-        formatVersion = 0;
-    }
     CConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress) : wuid(_wuid), graphName(_graphName), progress(_progress)
     {
-        formatVersion = progress->getPropInt("@format");
-        connectedWrite = false; // should never be
-        connected = true;
-    }
-    void connect()
-    {
-        clearConnection();
-        conn.setown(querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_READ|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
-
-        progress.set(conn->queryRoot());
-        formatVersion = progress->getPropInt("@format");
-        connected = true;
-    }
-    void lockWrite()
-    {
-        if (connectedWrite) return;
-        if (!rootPath.length())
-            throw MakeStringException(WUERR_GraphProgressWriteUnsupported, "Writing to graph progress unsupported in this context");
-        // JCSMORE - look at using changeMode here.
-        if (conn)
-            clearConnection();
-        conn.setown(querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
-        progress.set(conn->queryRoot());
-        if (!progress->hasChildren()) // i.e. blank.
-        {
-            formatVersion = PROGRESS_FORMAT_V;
-            progress->setPropInt("@format", PROGRESS_FORMAT_V);
-        }
-        else
-            formatVersion = progress->getPropInt("@format");
-        connected = connectedWrite = true;
-    }
-    void unlock()
-    {
-        connected = false;
-        connectedWrite = false;
-        clearConnection();
-    }
-    static bool getRunningGraph(const char *wuid, IStringVal &graphName, WUGraphIDType &subId)
-    {
-        StringBuffer path;
-        Owned<IRemoteConnection> conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
-        if (!conn) return false;
-
-        const char *name = conn->queryRoot()->queryProp("Running/@graph");
-        if (name)
-        {
-            graphName.set(name);
-            subId = conn->queryRoot()->getPropInt64("Running/@subId");
-            return true;
-        }
-        else
-            return false;
-    }
-    void setGraphState(WUGraphState state)
-    {
-        progress->setPropInt("@_state", (unsigned)state);
-    }
-    void setNodeState(WUGraphIDType nodeId, WUGraphState state)
-    {
-        if (!connectedWrite) lockWrite();
-        StringBuffer path;
-        path.append("node[@id=\"").append(nodeId).append("\"]");
-        IPropertyTree *node = progress->queryPropTree(path.str());
-        if (!node)
-        {
-            node = progress->addPropTree("node", createPTree());
-            node->setPropInt("@id", (int)nodeId);
-        }
-        node->setPropInt("@_state", (unsigned)state);
-        
-        switch (state)
-        {
-            case WUGraphRunning:
-            {
-                StringBuffer path;
-                Owned<IRemoteConnection> conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-                IPropertyTree *running = conn->queryRoot()->setPropTree("Running", createPTree());
-                running->setProp("@graph", graphName);
-                running->setPropInt64("@subId", nodeId);
-                break;
-            }
-            case WUGraphComplete:
-            {
-                StringBuffer path;
-                Owned<IRemoteConnection> conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-                conn->queryRoot()->removeProp("Running"); // only one thing running at any given time and one thing with lockWrite access
-                break;
-            }
-        }
+        if (!progress)
+            progress.setown(createPTree());
+        formatVersion = progress->getPropInt("@format", PROGRESS_FORMAT_V);
     }
     virtual IPropertyTree * getProgressTree()
     {
-        if (!connected) connect();
         if (progress->getPropBool("@stats"))
-            return createProcessTreeFromStats();
+            return createProcessTreeFromStats(); // Should we cache that?
         return LINK(progress);
     }
-    virtual WUGraphState queryGraphState()
-    {
-        return (WUGraphState)queryProgressStateTree()->getPropInt("@_state", (unsigned)WUGraphUnknown);
-    }
-    virtual WUGraphState queryNodeState(WUGraphIDType nodeId)
-    {
-        StringBuffer path;
-        path.append("node[@id=\"").append(nodeId).append("\"]/@_state");
-        return (WUGraphState)queryProgressStateTree()->getPropInt(path.str(), (unsigned)WUGraphUnknown);
-    }
-    virtual IWUGraphProgress * update()
-    {
-        return new CGraphProgress(*this);
-    }
-    virtual IWUGraphStats * update(StatisticCreatorType creatorType, const char * creator, unsigned subgraph)
-    {
-        return new CWuGraphStats(*this, creatorType, creator, graphName, subgraph);
-    }
-
     virtual unsigned queryFormatVersion()
     {
-        if (!connected) connect();
         return formatVersion;
     }
 
-private:
-    IPropertyTree * queryProgressStateTree()
+protected:
+    CConstGraphProgress(const char *_wuid, const char *_graphName) : wuid(_wuid), graphName(_graphName)
     {
-        if (!connected) connect();
-        return progress;
+        formatVersion = PROGRESS_FORMAT_V;
     }
     static void expandStats(IPropertyTree * target, IStatisticCollection & collection)
     {
@@ -497,37 +344,17 @@ private:
         return progressTree.getClear();
     }
 
-    void setSubgraphStats(StatisticCreatorType creatorType, const char * creator, unsigned id, const MemoryBuffer & compressed, unsigned minActivity, unsigned maxActivity)
-    {
-        StringBuffer tag;
-        tag.append("sg").append(id);
-
-        IPropertyTree * subgraph = createPTree(tag);
-        subgraph->setProp("@c", queryCreatorTypeName(creatorType));
-        subgraph->setProp("@creator", creator);
-        subgraph->setPropInt("@minActivity", minActivity);
-        subgraph->setPropInt("@maxActivity", maxActivity);
-
-        //Replace the particular subgraph statistics added by this creator
-        tag.append("[@creator='").append(creator).append("']");
-
-        lockWrite();
-        subgraph = progress->setPropTree(tag, subgraph);
-        subgraph->setPropBin("Stats", compressed.length(), compressed.toByteArray());
-        if (!progress->getPropBool("@stats", false))
-            progress->setPropBool("@stats", true);
-        unlock();
-    }
-
-private:
-    Owned<IRemoteConnection> conn;
+protected:
     Linked<IPropertyTree> progress;
     StringAttr wuid, graphName;
-    StringBuffer rootPath;
-    bool connected, connectedWrite;
     unsigned formatVersion;
 };
 
+extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress) // Factory exposing CConstGraphProgress outside this file
+{
+    return new CConstGraphProgress(_wuid, _graphName, _progress); // newly allocated object is handed to the caller; the constructor substitutes an empty tree when _progress is NULL
+}
+
 //--------------------------------------------------------------------------------------------------------------------
 
 class ExtractedStatistic : public CInterfaceOf<IConstWUStatistic>
@@ -1038,6 +865,17 @@ extern IConstWorkUnitInfo *createConstWorkUnitInfo(IPropertyTree &p)
     return new CLightweightWorkunitInfo(p);
 }
 
+class CDaliWuGraphStats : public CWuGraphStats // CWuGraphStats variant that also holds the Dali connection whose root tree it updates
+{
+public:
+    CDaliWuGraphStats(IRemoteConnection *_conn, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
+        : CWuGraphStats(LINK(_conn->queryRoot()), _creatorType, _creator, _rootScope, _id), conn(_conn) // base gets a linked reference to the connection's root tree; _conn is dereferenced here so it must not be NULL
+    {
+    }
+protected:
+    Owned<IRemoteConnection> conn; // NOTE(review): presumably takes over the caller's reference to _conn and releases it on destruction - confirm against Owned<> semantics
+};
+
 class CDaliWorkUnit : public CPersistedWorkUnit
 {
 public:
@@ -1053,6 +891,75 @@ public:
         // We use the beforeDispose() in base class to help ensure this
         p.clear();
     }
+    // Returns a new progress object for the named graph, or NULL when there is no
+    // progress connection or the connection has no entry for graphName.
+    IConstWUGraphProgress *getGraphProgress(const char *graphName) const
+    {
+        CriticalBlock block(crit);
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return NULL;
+        IPTree *graphTree = progressConn->queryRoot()->queryPropTree(graphName);
+        if (!graphTree)
+            return NULL;
+        return new CConstGraphProgress(p->queryName(), graphName, graphTree);
+    }
+    // Reads the @_state attribute recorded for the named graph; WUGraphUnknown is returned
+    // when there is no progress connection, no entry for the graph, or no recorded state.
+    virtual WUGraphState queryGraphState(const char *graphName) const
+    {
+        CriticalBlock block(crit);
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return WUGraphUnknown;
+        IPTree *graphTree = progressConn->queryRoot()->queryPropTree(graphName);
+        if (!graphTree)
+            return WUGraphUnknown;
+        return (WUGraphState) graphTree->getPropInt("@_state", (unsigned) WUGraphUnknown);
+    }
+    // Reads the @_state attribute of the node with the given id inside the named graph's
+    // progress entry; WUGraphUnknown is returned when any part of that path is missing.
+    virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
+    {
+        CriticalBlock block(crit);
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return WUGraphUnknown;
+        IPTree *graphTree = progressConn->queryRoot()->queryPropTree(graphName);
+        if (!graphTree)
+            return WUGraphUnknown;
+        StringBuffer stateXPath;
+        stateXPath.append("node[@id=\"").append(nodeId).append("\"]/@_state");
+        return (WUGraphState) graphTree->getPropInt(stateXPath, (unsigned) WUGraphUnknown);
+    }
+
+    virtual void clearGraphProgress() const // Removes this workunit's /GraphProgress/<wuid> branch when one can be connected to
+    {
+        CriticalBlock block(crit);
+        progressConnection.clear();  // Make sure nothing is locking for read or we won't be able to lock for write
+        StringBuffer path("/GraphProgress/");
+        path.append(p->queryName());
+        Owned<IRemoteConnection> delconn = querySDS().connect(path.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
+        if (delconn) // no create flag is passed, so presumably no connection is returned when the branch does not exist
+            delconn->close(true); // NOTE(review): close(true) is assumed to delete the connected branch - confirm against IRemoteConnection
+    }
+
+    // Reports the graph and subgraph id stored under "Running" in the progress information.
+    // Returns false, leaving the outputs untouched, when there is no progress connection
+    // or no running graph is recorded.
+    virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
+    {
+        CriticalBlock block(crit);
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return false;
+        const char *runningName = progressConn->queryRoot()->queryProp("Running/@graph");
+        if (!runningName)
+            return false;
+        graphName.set(runningName);
+        subId = progressConn->queryRoot()->getPropInt64("Running/@subId");
+        return true;
+    }
+
 
     virtual void forceReload()
     {
@@ -1072,6 +979,7 @@ public:
     virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
     {
         CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
+        clearGraphProgress();
         connection->close(true);
         connection.clear();
     }
@@ -1121,8 +1029,65 @@ public:
             throw;
         }
     }
+    virtual void setGraphState(const char *graphName, WUGraphState state) const // Writes the graph's overall state to @_state under /GraphProgress/<wuid>/<graphName>
+    {
+        Owned<IRemoteConnection> conn = getWritableProgressConnection(graphName); // separate write-locked connection, released when conn goes out of scope
+        conn->queryRoot()->setPropInt("@_state", state); // NOTE(review): unlike setNodeState this neither takes crit nor clears the cached progressConnection - confirm readers still see the new state
+    }
+    // Records the state of one node of a graph under /GraphProgress/<wuid>/<graphName>, creating
+    // the node entry on first use, and maintains the workunit-level "Running" entry: it is set
+    // when a node starts running and removed when a node completes.
+    virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
+    {
+        CriticalBlock block(crit);
+        progressConnection.clear();  // Make sure nothing is locking for read or we won't be able to lock for write
+        VStringBuffer wuProgressPath("/GraphProgress/%s", queryWuid());
+        Owned<IRemoteConnection> conn = querySDS().connect(wuProgressPath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        IPTree *graphTree = ensurePTree(conn->queryRoot(), graphName);
+
+        StringBuffer nodeXPath;
+        nodeXPath.append("node[@id=\"").append(nodeId).append("\"]");
+        IPropertyTree *node = graphTree->queryPropTree(nodeXPath.str());
+        if (!node)
+        {
+            node = graphTree->addPropTree("node", createPTree());
+            node->setPropInt64("@id", nodeId);
+        }
+        node->setPropInt("@_state", (unsigned)state);
+
+        if (state == WUGraphRunning)
+        {
+            IPropertyTree *running = conn->queryRoot()->setPropTree("Running", createPTree());
+            running->setProp("@graph", graphName);
+            running->setPropInt64("@subId", nodeId);
+        }
+        else if (state == WUGraphComplete)
+        {
+            conn->queryRoot()->removeProp("Running"); // only one thing running at any given time and one thing with lockWrite access
+        }
+    }
+    virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const // Returns a new stats object bound to a writable connection on /GraphProgress/<wuid>/<graphName>
+    {
+        return new CDaliWuGraphStats(getWritableProgressConnection(graphName), creatorType, creator, graphName, subgraph); // graphName is also passed as the root scope text for the statistics
+    }
+
 protected:
+    // Returns the cached connection to /GraphProgress/<wuid>, opening it on first use.
+    // The returned pointer is not linked; the result of the connect call is cached as-is.
+    IRemoteConnection *queryProgressConnection() const
+    {
+        CriticalBlock block(crit);
+        if (progressConnection)
+            return progressConnection;
+        VStringBuffer progressPath("/GraphProgress/%s", queryWuid());
+        // Note - we don't lock. The writes are atomic.
+        progressConnection.setown(querySDS().connect(progressPath, myProcessSession(), 0, SDS_LOCK_TIMEOUT));
+        return progressConnection;
+    }
+    IRemoteConnection *getWritableProgressConnection(const char *graphName) const // Opens a new write-locked connection to this workunit's progress branch for graphName; callers hold the result in an Owned<>
+    {
+        VStringBuffer path("/GraphProgress/%s/%s", queryWuid(), graphName);
+        return querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); // NOTE(review): RTM_CREATE_QUERY presumably creates the branch when it does not exist yet - confirm
+    }
     Owned<IRemoteConnection> connection;
+    mutable Owned<IRemoteConnection> progressConnection;
 };
 
 class CLockedWorkUnit : public CInterface, implements ILocalWorkUnit, implements IExtendedWUInterface
@@ -1337,6 +1302,19 @@ public:
             { return c->getProcesses(type, instance); }
     virtual unsigned getTotalThorTime() const
             { return c->getTotalThorTime(); }
+    virtual WUGraphState queryGraphState(const char *graphName) const
+            { return c->queryGraphState(graphName); } // delegates unchanged to c
+    virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
+            { return c->queryNodeState(graphName, nodeId); } // delegates unchanged to c
+    virtual void setGraphState(const char *graphName, WUGraphState state) const
+            { c->setGraphState(graphName, state); } // delegates unchanged to c
+    virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
+            { c->setNodeState(graphName, nodeId, state); } // delegates unchanged to c
+    virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
+            { return c->updateStats(graphName, creatorType, creator, subgraph); } // delegates unchanged to c; the new stats object goes straight back to the caller
+    virtual void clearGraphProgress() const
+            { c->clearGraphProgress(); } // delegates unchanged to c
+
 
     virtual void clearExceptions()
             { c->clearExceptions(); }
@@ -1438,8 +1416,6 @@ public:
             { c->noteFileRead(file); }
     virtual void releaseFile(const char *fileName)
             { c->releaseFile(fileName); }
-    virtual void clearGraphProgress()
-            { c->clearGraphProgress(); }
     virtual void resetBeforeGeneration()
             { c->resetBeforeGeneration(); }
     virtual void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned)
@@ -2446,7 +2422,11 @@ public:
     virtual void deleteRepository(bool recreate)
     {
         Owned<IRemoteConnection> conn = sdsManager->connect("/WorkUnits", session, RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-        conn->close(true);
+        if (conn)
+            conn->close(true);
+        conn.setown(sdsManager->connect("/GraphProgress", session, RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
+        if (conn)
+            conn->close(true);
     }
     virtual void createRepository()
     {
@@ -3228,7 +3208,6 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll, bool deleteOwned, const Strin
     { 
         WARNLOG("Unknown exception during cleanupAndDelete: %s", p->queryName()); 
     }
-    CConstGraphProgress::deleteWuidProgress(p->queryName());
 }
 
 void CLocalWorkUnit::setTimeScheduled(const IJlibDateTime &val)
@@ -3671,7 +3650,8 @@ void CLocalWorkUnit::setSecurityToken(const char *value)
 
 bool CLocalWorkUnit::getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
 {
-    return CConstGraphProgress::getRunningGraph(p->queryName(), graphName, subId);
+    // Only implemented in derived classes
+    return false;
 }
 
 void CLocalWorkUnit::setJobName(const char *value)
@@ -6168,9 +6148,8 @@ void CLocalWorkUnit::releaseFile(const char *fileName)
     }
 }
 
-void CLocalWorkUnit::clearGraphProgress()
+void CLocalWorkUnit::clearGraphProgress() const
 {
-    CConstGraphProgress::deleteWuidProgress(p->queryName());
 }
 
 void CLocalWorkUnit::resetBeforeGeneration()
@@ -6353,10 +6332,7 @@ IStringVal& CLocalWUGraph::getLabel(IStringVal &str) const
 
 WUGraphState CLocalWUGraph::getState() const
 {
-    Owned<IConstWUGraphProgress> graphProgress = owner.getGraphProgress(p->queryProp("@name"));
-    if (!graphProgress)
-        return WUGraphUnknown;
-    return graphProgress->queryGraphState();
+    return owner.queryGraphState(p->queryProp("@name"));
 }
 
 
@@ -6509,117 +6485,60 @@ void CLocalWorkUnit::setHash(unsigned __int64 hash)
     p->setPropInt64("@hash", hash);
 }
 
+// getGraphs / getGraphsMeta
+// These are basically the same except for the amount of preloading they do, and the type of the iterator they return...
+// If a type other than any is requested, a postfilter is needed.
+
+/*
+ * Post-filtering wrapper around a graph iterator: only elements whose getType() equals the
+ * requested type are visited.
+ *   T - the iterator interface that is both wrapped and implemented
+ *   U - the element type returned by query()
+ * The wrapper holds the base iterator in an Owned<T>.
+ * NOTE(review): it presumably takes over the caller's reference to _base - confirm against Owned<> semantics.
+ */
+template <class T, class U> class CFilteredGraphIteratorOf : public CInterfaceOf<T>
+{
+    WUGraphType type;
+    Owned<T> base;
+    // True when the element the base iterator is currently positioned on has the wanted type
+    bool match()
+    {
+        return  base->query().getType()==type;
+    }
+public:
+    // Declared with the injected class name: a template-id is not permitted in a constructor
+    // declaration from C++20 onwards. Initialisers are listed in member declaration order.
+    CFilteredGraphIteratorOf(T *_base, WUGraphType _type)
+        : type(_type), base(_base)
+    {
+    }
+    // Positions on the first matching element; returns false when there is none
+    virtual bool first()
+    {
+        if (!base->first())
+            return false;
+        if (match())
+            return true;
+        return next();
+    }
+    // Advances to the next matching element; returns false once the base iterator is exhausted
+    virtual bool next()
+    {
+        while (base->next())
+            if (match())
+                return true;
+        return false;
+    }
+    virtual bool isValid()
+    {
+        return base->isValid();
+    }
+    // Only meaningful while isValid() is true
+    virtual U & query()
+    {
+        return base->query();
+    }
+};
+
 IConstWUGraphMetaIterator& CLocalWorkUnit::getGraphsMeta(WUGraphType type) const
 {
     /* NB: this method should be 'cheap', loadGraphs() creates IConstWUGraph interfaces to the graphs
-     * it does not actually pull the graph data. We only use IConstWUGraphMeta here, which never probes the xgmml
-     * This method also connects to the graph progress (/GraphProgress/<wuid>) using a single connection, in order
-     * to get state information, it does not pull all the progress data.
+     * it does not actually pull the graph data. We only use IConstWUGraphMeta here, which never probes the xgmml.
      */
 
     CriticalBlock block(crit);
     loadGraphs(false);
-
-    class CConstWUGraphMetaIterator: public CInterface, implements IConstWUGraphMetaIterator, implements IConstWUGraphMeta
-    {
-        StringAttr wuid;
-        WUGraphType type;
-        Owned<IConstWUGraphIterator> graphIter;
-        IConstWUGraph *curGraph;
-        Owned<IConstWUGraphProgress> curGraphProgress;
-        Owned<IRemoteConnection> progressConn;
-        bool match()
-        {
-            return (GraphTypeAny == type) || (type == graphIter->query().getType());
-        }
-
-        void setCurrent(IConstWUGraph &graph)
-        {
-            curGraph = &graph;
-            SCMStringBuffer graphName;
-            curGraph->getName(graphName);
-            if (progressConn)
-            {
-                IPropertyTree *progress = progressConn->queryRoot()->queryPropTree(graphName.str());
-                if (progress)
-                {
-                    curGraphProgress.setown(new CConstGraphProgress(wuid, graphName.str(), progress));
-                    return;
-                }
-            }
-            curGraphProgress.clear();
-        }
-    public:
-        IMPLEMENT_IINTERFACE;
-        CConstWUGraphMetaIterator(const char *_wuid, IConstWUGraphIterator *_graphIter, WUGraphType _type)
-            : wuid(_wuid), graphIter(_graphIter), type(_type)
-        {
-            curGraph = NULL;
-            StringBuffer progressPath;
-            progressPath.append("/GraphProgress/").append(wuid);
-            progressConn.setown(querySDS().connect(progressPath.str(), myProcessSession(), RTM_NONE, SDS_LOCK_TIMEOUT));
-        }
-        virtual bool first()
-        {
-            curGraph = NULL;
-            curGraphProgress.clear();
-            if (!graphIter->first())
-                return false;
-            if (match())
-            {
-                setCurrent(graphIter->query());
-                return true;
-            }
-            return next();
-        }
-        virtual bool next()
-        {
-            while (graphIter->next())
-            {
-                if (match())
-                {
-                    setCurrent(graphIter->query());
-                    return true;
-                }
-            }
-            curGraph = NULL;
-            curGraphProgress.clear();
-            return false;
-        }
-        virtual bool isValid()
-        {
-            return NULL != curGraph;
-        }
-        virtual IConstWUGraphMeta & query()
-        {
-            return *this;
-        }
-        // IConstWUGraphMeta
-        virtual IStringVal & getName(IStringVal & ret) const
-        {
-            return curGraph->getName(ret);
-        }
-        virtual IStringVal & getLabel(IStringVal & ret) const
-        {
-            return curGraph->getLabel(ret);
-        }
-        virtual IStringVal & getTypeName(IStringVal & ret) const
-        {
-            return curGraph->getTypeName(ret);
-        }
-        virtual WUGraphType getType() const
-        {
-            return curGraph->getType();
-        }
-        virtual WUGraphState getState() const
-        {
-            if (!curGraphProgress)
-                return WUGraphUnknown;
-            return curGraphProgress->queryGraphState();
-        }
-    };
-    IConstWUGraphIterator *graphIter = new CArrayIteratorOf<IConstWUGraph,IConstWUGraphIterator> (graphs, 0, (IConstWorkUnit *) this);
-    return * new CConstWUGraphMetaIterator(p->queryName(), graphIter, type);
+    IConstWUGraphMetaIterator *giter = new CArrayIteratorOf<IConstWUGraph,IConstWUGraphMetaIterator> (graphs, 0, (IConstWorkUnit *) this);
+    if (type!=GraphTypeAny)
+        giter = new CFilteredGraphIteratorOf<IConstWUGraphMetaIterator, IConstWUGraphMeta>(giter,type);
+    return *giter;
 }
 
 IConstWUGraphIterator& CLocalWorkUnit::getGraphs(WUGraphType type) const
@@ -6627,48 +6546,8 @@ IConstWUGraphIterator& CLocalWorkUnit::getGraphs(WUGraphType type) const
     CriticalBlock block(crit);
     loadGraphs(true);
     IConstWUGraphIterator *giter = new CArrayIteratorOf<IConstWUGraph,IConstWUGraphIterator> (graphs, 0, (IConstWorkUnit *) this);
-    if (type!=GraphTypeAny) {
-        class CConstWUGraphIterator: public CInterface, implements IConstWUGraphIterator
-        {
-            WUGraphType type;
-            Owned<IConstWUGraphIterator> base;
-            bool match()
-            {
-                return  base->query().getType()==type;
-            }
-        public:
-            IMPLEMENT_IINTERFACE;
-            CConstWUGraphIterator(IConstWUGraphIterator *_base,WUGraphType _type)
-                : base(_base)
-            {
-                type = _type;
-            }
-            bool first()
-            {
-                if (!base->first())
-                    return false;
-                if (match())
-                    return true;
-                return next();
-            }
-            bool next()
-            {
-                while (base->next())
-                    if (match())
-                        return true;
-                return false;
-            }
-            virtual bool isValid()
-            {
-                return base->isValid();
-            }
-            IConstWUGraph & query()
-            {
-                return base->query();
-            }
-        };
-        giter = new CConstWUGraphIterator(giter,type);
-    }
+    if (type!=GraphTypeAny)
+        giter = new CFilteredGraphIteratorOf<IConstWUGraphIterator, IConstWUGraph>(giter, type);
     return *giter;
 }
 
@@ -6700,15 +6579,32 @@ void CLocalWorkUnit::createGraph(const char * name, const char *label, WUGraphTy
 
 IConstWUGraphProgress *CLocalWorkUnit::getGraphProgress(const char *name) const
 {
-    CriticalBlock block(crit);
-    return new CConstGraphProgress(p->queryName(), name);
+    throwUnexpected();   // Should only be used for persisted workunits
+}
+WUGraphState CLocalWorkUnit::queryGraphState(const char *graphName) const // Base-class placeholder: unconditionally calls throwUnexpected()
+{
+    throwUnexpected();   // Should only be used for persisted workunits
+}
+WUGraphState CLocalWorkUnit::queryNodeState(const char *graphName, WUGraphIDType nodeId) const // Base-class placeholder: unconditionally calls throwUnexpected()
+{
+    throwUnexpected();   // Should only be used for persisted workunits
+}
+void CLocalWorkUnit::setGraphState(const char *graphName, WUGraphState state) const // Base-class placeholder: unconditionally calls throwUnexpected()
+{
+    throwUnexpected();   // Should only be used for persisted workunits
+}
+void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const // Base-class placeholder: unconditionally calls throwUnexpected()
+{
+    throwUnexpected();   // Should only be used for persisted workunits
+}
+IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const // Base-class placeholder: unconditionally calls throwUnexpected()
+{
+    throwUnexpected();   // Should only be used for persisted workunits
+}
 
 void CLocalWUGraph::setName(const char *str)
 {
     p->setProp("@name", str);
-    progress.clear();
-    progress.setown(new CConstGraphProgress(owner.queryWuid(), str));
 }
 
 void CLocalWUGraph::setLabel(const char *str)
@@ -6834,17 +6730,16 @@ IPropertyTree * CLocalWUGraph::getXGMMLTree(bool doMergeProgress) const
     else
     {
         Owned<IPropertyTree> copy = createPTreeFromIPT(graph);
-        Owned<IConstWUGraphProgress> _progress;
-        if (progress) _progress.set(progress);
-        else
-            _progress.setown(new CConstGraphProgress(owner.queryWuid(), p->queryProp("@name")));
-
-        //MORE: Eventually this should directly access the new stats structure
-        unsigned progressV = _progress->queryFormatVersion();
-        Owned<IPropertyTree> progressTree = _progress->getProgressTree();
-        Owned<IPropertyTreeIterator> nodeIterator = copy->getElements("node");
-        ForEach (*nodeIterator)
-            mergeProgress(nodeIterator->query(), *progressTree, progressV);
+        Owned<IConstWUGraphProgress> progress = owner.getGraphProgress(p->queryProp("@name"));
+        if (progress)
+        {
+            //MORE: Eventually this should directly access the new stats structure
+            unsigned progressV = progress->queryFormatVersion();
+            Owned<IPropertyTree> progressTree = progress->getProgressTree();
+            Owned<IPropertyTreeIterator> nodeIterator = copy->getElements("node");
+            ForEach (*nodeIterator)
+                mergeProgress(nodeIterator->query(), *progressTree, progressV);
+        }
         return copy.getClear();
     }
 }

+ 10 - 12
common/workunit/workunit.hpp

@@ -722,10 +722,6 @@ interface IPropertyTree;
 interface IConstWUGraphProgress : extends IInterface  // Read-only view of a graph's progress; state queries and updates are removed from this interface by this change
 {
     virtual IPropertyTree * getProgressTree() = 0;  // Progress tree; CLocalWUGraph::getXGMMLTree holds the result in an Owned<> and passes it to mergeProgress()
-    virtual WUGraphState queryGraphState() = 0;
-    virtual WUGraphState queryNodeState(WUGraphIDType nodeId) = 0;
-    virtual IWUGraphProgress * update() = 0;
-    virtual IWUGraphStats * update(StatisticCreatorType creatorType, const char * creator, unsigned subgraph) = 0;
     virtual unsigned queryFormatVersion() = 0;  // Format version of the progress data; passed to mergeProgress() alongside the tree
 };
 
@@ -735,12 +731,6 @@ interface IWUGraphStats : public IInterface
     virtual IStatisticGatherer & queryStatsBuilder() = 0;
 };
 
-interface IWUGraphProgress : extends IConstWUGraphProgress
-{
-    virtual void setGraphState(WUGraphState state) = 0;
-    virtual void setNodeState(WUGraphIDType nodeId, WUGraphState state) = 0;
-};
-
 
 interface IConstWUTimeStamp : extends IInterface
 {
@@ -1053,6 +1043,16 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual IStringIterator *getLogs(const char *type, const char *instance=NULL) const = 0;
     virtual IStringIterator *getProcesses(const char *type) const = 0;
     virtual IPropertyTreeIterator* getProcesses(const char *type, const char *instance) const = 0;
+
+    // Note that these don't read/modify the workunit itself, but rather the associated progress info.
+    // As such they can be called without locking the workunit, and are 'const' as far as the WU is concerned.
+
+    virtual WUGraphState queryGraphState(const char *graphName) const = 0;
+    virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
+    virtual void setGraphState(const char *graphName, WUGraphState state) const = 0;
+    virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
+    virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const = 0;
+    virtual void clearGraphProgress() const = 0;
 };
 
 
@@ -1115,7 +1115,6 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual void setIsClone(bool value) = 0;
     virtual void setTimeScheduled(const IJlibDateTime & val) = 0;
     virtual void noteFileRead(IDistributedFile * file) = 0;
-    virtual void clearGraphProgress() = 0;
     virtual void resetBeforeGeneration() = 0;
     virtual bool switchThorQueue(const char * newcluster, IQueueSwitcher * qs) = 0;
     virtual void setAllowedClusters(const char * value) = 0;
@@ -1134,7 +1133,6 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val) = 0;
     virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val) = 0;
     virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val) = 0;
-//  virtual void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val) = 0;
     virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val) = 0;
     virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *) = 0;
     virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val) = 0;

+ 25 - 2
common/workunit/workunit.ipp

@@ -249,6 +249,13 @@ public:
     virtual IConstWUGraphMetaIterator & getGraphsMeta(WUGraphType type) const;
     virtual IConstWUGraph * getGraph(const char *name) const;
     virtual IConstWUGraphProgress * getGraphProgress(const char * name) const;
+    virtual WUGraphState queryGraphState(const char *graphName) const;
+    virtual void setGraphState(const char *graphName, WUGraphState state) const;
+    virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
+    virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
+    virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const;
+    void clearGraphProgress() const;
+
     virtual const char *queryJobName() const;
     virtual IConstWUPlugin * getPluginByName(const char * name) const;
     virtual IConstWUPluginIterator & getPlugins() const;
@@ -371,7 +378,6 @@ public:
     void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner);
     void noteFileRead(IDistributedFile *file);
     void releaseFile(const char *fileName);
-    void clearGraphProgress();
     void resetBeforeGeneration();
     void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned);
     void deleteTemporaries();
@@ -689,7 +695,6 @@ class CLocalWUGraph : public CInterface, implements IConstWUGraph
     const CLocalWorkUnit &owner;
     Owned<IPropertyTree> p;
     mutable Owned<IPropertyTree> graph; // cached copy of graph xgmml
-    mutable Linked<IConstWUGraphProgress> progress;
     unsigned wuidVersion;
 
     void mergeProgress(IPropertyTree &tree, IPropertyTree &progressTree, const unsigned &progressV) const;
@@ -714,4 +719,22 @@ public:
     void setXGMMLTree(IPropertyTree * tree);
 };
 
+class CWuGraphStats : public CInterfaceOf<IWUGraphStats>  // IWUGraphStats implementation: wraps a statistics gatherer whose result is serialized and compressed in beforeDispose()
+{
+public:
+    CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id);  // _rootScope must parse as scope text (the constructor checks it with verifyex)
+    virtual void beforeDispose();  // Takes the collector's result, serializes it and compresses the bytes
+    virtual IStatisticGatherer & queryStatsBuilder();
+protected:
+    Owned<IPropertyTree> progress;            // initialised from the constructor's _progress argument
+    Owned<IStatisticGatherer> collector;      // created in the constructor body via createStatisticsGatherer()
+    StatisticCreatorType creatorType;         // declared before 'creator' so declaration order matches the constructor's initializer list (avoids -Wreorder)
+    StringAttr creator;
+    unsigned id;
+};
+
+#define PROGRESS_FORMAT_V 2
+
+extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress);
+
 #endif

+ 2 - 2
deployment/deploy/deploy.cpp

@@ -1127,8 +1127,8 @@ IPropertyTree* getInstances(const IPropertyTree* pEnvRoot, const char* compName,
     IPropertyTree* pComponent = &iter->query();
     const char* type = pComponent->queryName();
     if (stricmp(type, "Topology")!=0 && stricmp(type, "Directories")!=0 && 
-        ((!compName && !compType) || (compName && !strcmp(pComponent->queryProp("@name"), compName)) ||
-        (!compName && compType && !strcmp(pComponent->queryProp("@buildSet"), compType))))
+        ((!compName && !compType) || (compName && pComponent->queryProp("@name") && !strcmp(pComponent->queryProp("@name"), compName)) ||
+        (!compName && compType && pComponent->queryProp("@buildSet") && !strcmp(pComponent->queryProp("@buildSet"), compType))))
     {
       const char* name    = pComponent->queryProp("@name");
       const char* build   = pComponent->queryProp("@build");

+ 1 - 1
ecl/eclagent/eclagent.cpp

@@ -1843,7 +1843,7 @@ void EclAgent::doProcess()
             if (isRemoteWorkunit)
             {
                 w->setAgentSession(myProcessSession());
-                w->clearGraphProgress();
+                w->clearGraphProgress();  // Should Roxie do this too??
             }
             if (debugContext)   
             {

+ 2 - 1
ecl/eclagent/eclagent.ipp

@@ -1063,7 +1063,8 @@ public:
     void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml, bool enableProbe);
     void execute(const byte * parentExtract);
     void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
-    IConstWUGraphProgress * getGraphProgress();
+    IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned subgraph);
+
     EclSubGraph * idToGraph(unsigned id);
     EclGraphElement * idToActivity(unsigned id);
     const char *queryGraphName() { return graphName; }

+ 35 - 71
ecl/eclagent/eclgraph.cpp

@@ -728,42 +728,6 @@ IHThorException * EclGraphElement::makeWrappedException(IException * e)
 
 //---------------------------------------------------------------------------
 
-class GraphRunningState
-{
-public:
-    GraphRunningState(EclGraph & parent, unsigned _id) : graphProgress(parent.getGraphProgress()), id(_id), running(true)
-    {
-        set(WUGraphRunning);
-    }
-
-    ~GraphRunningState()
-    {
-        if(running)        
-            set(WUGraphFailed);
-    }
-
-    void complete()
-    {
-        set(WUGraphComplete);
-        running = false;
-    }
-
-private:
-    void set(WUGraphState state)
-    {
-        Owned<IWUGraphProgress> progress = graphProgress->update();
-        if(id)
-            progress->setNodeState(id, state);
-        else
-            progress->setGraphState(state);
-    }
-
-private:
-    Owned<IConstWUGraphProgress> graphProgress;
-    unsigned id;
-    bool running;
-};
-
 EclSubGraph::EclSubGraph(IAgentContext & _agent, EclGraph & _parent, EclSubGraph * _owner, unsigned _seqNo, bool enableProbe, CHThorDebugContext * _debugContext, IProbeManager * _probeManager)
     : parent(_parent), owner(_owner), seqNo(_seqNo), probeEnabled(enableProbe), debugContext(_debugContext), probeManager(_probeManager)
 {
@@ -883,8 +847,7 @@ void EclSubGraph::updateProgress()
 {
     if (!isChildGraph && agent->queryRemoteWorkunit())
     {
-        Owned<IConstWUGraphProgress> graphProgress = parent.getGraphProgress();
-        Owned<IWUGraphStats> progress = graphProgress->update(queryStatisticsComponentType(), queryStatisticsComponentName(), id);
+        Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), id);
         IStatisticGatherer & stats = progress->queryStatsBuilder();
         updateProgress(stats);
     }
@@ -1198,42 +1161,47 @@ void EclGraph::createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml, boo
 
 void EclGraph::execute(const byte * parentExtract)
 {
-    GraphRunningState * run = NULL;
     if (agent->queryRemoteWorkunit())
-        run = new GraphRunningState(*this, 0);
+        wu->setGraphState(queryGraphName(), WUGraphRunning);  // mark the graph as running (remote workunits only) before any subgraph executes
 
-    unsigned startTime = msTick();
-    aindex_t lastSink = -1;
-    ForEachItemIn(idx, graphs)
+    try  // any exception below marks the graph as failed and is rethrown (see the catch block)
     {
-        EclSubGraph & cur = graphs.item(idx);
-        if (cur.isSink)
-            cur.execute(parentExtract);
-    }
+        unsigned startTime = msTick();
+        aindex_t lastSink = -1;  // NOTE(review): never referenced again in this function
+        ForEachItemIn(idx, graphs)
+        {
+            EclSubGraph & cur = graphs.item(idx);
+            if (cur.isSink)  // only sink subgraphs are executed directly here
+                cur.execute(parentExtract);
+        }
 
-    {
-        unsigned elapsed = msTick()-startTime;
+        {
+            unsigned elapsed = msTick()-startTime;  // milliseconds spent executing the sink subgraphs
 
-        Owned<IWorkUnit> wu(agent->updateWorkUnit());
+            Owned<IWorkUnit> wu(agent->updateWorkUnit());  // local 'wu' hides the outer 'wu' used by the setGraphState() calls until this inner block ends
 
-        StringBuffer description;
-        formatGraphTimerLabel(description, queryGraphName(), 0, 0);
+            StringBuffer description;
+            formatGraphTimerLabel(description, queryGraphName(), 0, 0);
 
-        unsigned __int64 totalTimeNs = 0;
-        unsigned __int64 totalThisTimeNs = 0;
-        unsigned __int64 elapsedNs = milliToNano(elapsed);
-        const char *totalTimeStr = "Total cluster time";
-        getWorkunitTotalTime(wu, "hthor", totalTimeNs, totalThisTimeNs);
+            unsigned __int64 totalTimeNs = 0;
+            unsigned __int64 totalThisTimeNs = 0;
+            unsigned __int64 elapsedNs = milliToNano(elapsed);
+            const char *totalTimeStr = "Total cluster time";
+            getWorkunitTotalTime(wu, "hthor", totalTimeNs, totalThisTimeNs);  // fetch the existing totals so this graph's elapsed time can be added to them
 
-        updateWorkunitTimeStat(wu, SSTgraph, queryGraphName(), StTimeElapsed, description.str(), elapsedNs);
-        updateWorkunitTimeStat(wu, SSTglobal, GLOBAL_SCOPE, StTimeElapsed, NULL, totalThisTimeNs+elapsedNs);
-        wu->setStatistic(SCTsummary, "hthor", SSTglobal, GLOBAL_SCOPE, StTimeElapsed, totalTimeStr, totalTimeNs+elapsedNs, 1, 0, StatsMergeReplace);
-    }
+            updateWorkunitTimeStat(wu, SSTgraph, queryGraphName(), StTimeElapsed, description.str(), elapsedNs);
+            updateWorkunitTimeStat(wu, SSTglobal, GLOBAL_SCOPE, StTimeElapsed, NULL, totalThisTimeNs+elapsedNs);
+            wu->setStatistic(SCTsummary, "hthor", SSTglobal, GLOBAL_SCOPE, StTimeElapsed, totalTimeStr, totalTimeNs+elapsedNs, 1, 0, StatsMergeReplace);
+        }
 
-    if (run)
+        if (agent->queryRemoteWorkunit())
+            wu->setGraphState(queryGraphName(), WUGraphComplete);  // outer 'wu' again - the local IWorkUnit above is out of scope here
+    }
+    catch (...)
     {
-        run->complete();
-        delete run;
+        if (agent->queryRemoteWorkunit())
+            wu->setGraphState(queryGraphName(), WUGraphFailed);
+        throw;  // rethrow the original exception after recording the failure
     }
 }
 
@@ -1288,12 +1256,10 @@ void EclGraph::updateLibraryProgress()
     //Check for old format embedded graph names, and don't update the stats if not the correct format
     if (!MATCHES_CONST_PREFIX(queryGraphName(), GraphScopePrefix))
         return;
-
-    Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
     ForEachItemIn(idx, graphs)
     {
         EclSubGraph & cur = graphs.item(idx);
-        Owned<IWUGraphStats> progress = graphProgress->update(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.id);
+        Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), cur.id);
         cur.updateProgress(progress->queryStatsBuilder());
     }
 }
@@ -1434,11 +1400,9 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)
 
 //---------------------------------------------------------------------------
 
-
-
-IConstWUGraphProgress * EclGraph::getGraphProgress()
+IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned subgraph)  // Forwards to the workunit's updateStats(), supplying this graph's name
 {
-    return wu->getGraphProgress(queryGraphName());
+    return wu->updateStats(queryGraphName(), creatorType, creator, subgraph);
 }
 
 IThorChildGraph * EclGraph::resolveChildQuery(unsigned subgraphId)

+ 54 - 1
ecl/wutest/wutest.cpp

@@ -419,6 +419,7 @@ class WuTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testCopy);
         CPPUNIT_TEST(testGraph);
+        CPPUNIT_TEST(testGraphProgress);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
@@ -864,8 +865,60 @@ protected:
             numIterated++;
         }
         ASSERT(numIterated==2);
+        wu.clear();
+        factory->deleteWorkUnit(wuid);
+    }
+    void testGraphProgress()  // Exercises graph/node state, running-graph lookup, updateStats and clearGraphProgress on a reopened workunit
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        Owned<IWorkUnit> createWu = factory->createWorkUnit("WuTest", NULL, NULL, NULL);
+        StringBuffer wuid(createWu->queryWuid());
+        createWu->createGraph("graph1", "graphLabel", GraphTypeActivities, createPTreeFromXMLString("<graph><node id='1'/></graph>"));  // one graph containing a single node with id 1
+        createWu->setState(WUStateCompleted);
+        createWu->commit();
+        createWu.clear();
+        Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);  // reopened as IConstWorkUnit: every progress call below goes through the const interface
+        ASSERT(streq(wu->queryWuid(), wuid));
+
+        SCMStringBuffer s;
+        s.set("Not empty");
+        WUGraphIDType subid = 10;
+        bool ret = wu->getRunningGraph(s, subid);  // nothing has been marked running yet
+        ASSERT(!ret);
+        ASSERT(wu->queryGraphState("graph1")==WUGraphUnknown);
+        ASSERT(wu->queryNodeState("graph1", 1)==WUGraphUnknown);
+
+        wu->setGraphState("graph1",WUGraphRunning);
+        ASSERT(wu->queryGraphState("graph1")==WUGraphRunning);
+
+        wu->setNodeState("graph1", 1, WUGraphRunning);  // a running node should be reported by getRunningGraph()
+        ASSERT(wu->queryNodeState("graph1", 1)==WUGraphRunning);
+        ret = wu->getRunningGraph(s, subid);
+        ASSERT(ret);
+        ASSERT(streq(s.str(), "graph1"));
+        ASSERT(subid==1);
+
+        wu->setNodeState("graph1", 1, WUGraphComplete);  // completing the node should leave no running graph
+        ASSERT(wu->queryNodeState("graph1", 1)==WUGraphComplete);
+        ret = wu->getRunningGraph(s, subid);
+        ASSERT(!ret);
+
+        Owned<IWUGraphStats> progress = wu->updateStats("graph1", SCThthor, queryStatisticsComponentName(), 1);
+        IStatisticGatherer & stats = progress->queryStatsBuilder();
+        {
+            StatsSubgraphScope subgraph(stats, 1);
+            stats.addStatistic(StTimeElapsed, 5000);  // added while the subgraph scope object is alive
+        }
+        progress.clear();  // release the stats object (presumably this is what triggers the write - the test does not read the stats back)
+
+
+        ASSERT(wu->queryGraphState("graph1")==WUGraphRunning);  // graph state is still Running: only the node was set to Complete
+        wu->clearGraphProgress();
+        ASSERT(wu->queryGraphState("graph1")==WUGraphUnknown);  // clearing progress resets the graph state to Unknown
+        wu.clear();
+        factory->deleteWorkUnit(wuid);
+    }
 
-}
     void sortStatistics(StringBuffer &xml)
     {
         Owned<IPropertyTree> p = createPTreeFromXMLString(xml);

+ 5 - 9
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -760,16 +760,12 @@ void WsWuInfo::getGraphInfo(IEspECLWorkunit &info, unsigned flags)
                     g->setWhenFinished(whenGraphFinished->getFormattedValue(s).str());
             }
 
-            Owned<IConstWUGraphProgress> progress = cw->getGraphProgress(name.str());
-            if (progress)
+            WUGraphState graphstate = cw->queryGraphState(name.str());
+            if (graphstate == WUGraphComplete)
+                g->setComplete(true);
+            if (version > 1.13 && graphstate == WUGraphFailed)
             {
-                WUGraphState graphstate= progress->queryGraphState();
-                if (graphstate == WUGraphComplete)
-                    g->setComplete(true);
-                if (version > 1.13 && graphstate == WUGraphFailed)
-                {
-                    g->setFailed(true);
-                }
+                g->setFailed(true);
             }
             graphs.append(*g.getLink());
         }

+ 207 - 203
plugins/cassandra/cassandrawu.cpp

@@ -263,63 +263,6 @@ public:
     }
 } wuidColumnMapper;
 
-static class GraphIdColumnMapper : implements CassandraColumnMapper
-{
-public:
-    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
-    {
-        rtlDataAttr str;
-        unsigned chars;
-        getUTF8Result(NULL, value, chars, str.refstr());
-        StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
-        if (strcmp(s, "Running")==0)  // The input XML structure is a little odd
-            return row;
-        else
-        {
-            if (!row->hasProp(s))
-                row->addPropTree(s, createPTree());
-            return row->queryPropTree(s);
-        }
-    }
-    virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
-    {
-        const char *value = row->queryName();
-        if (!value)
-            return false;
-        if (statement)
-            statement->bindString(idx, value);
-        return true;
-    }
-} graphIdColumnMapper;
-
-static class ProgressColumnMapper : implements CassandraColumnMapper
-{
-public:
-    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
-    {
-        rtlDataAttr str;
-        unsigned chars;
-        getDataResult(NULL, value, chars, str.refdata());  // Stored as a blob in case we want to compress
-        IPTree *child = createPTreeFromXMLString(chars, str.getstr());  // For now, assume we did not compress!
-        row->addPropTree(child->queryName(), child);
-        return child;
-    }
-    virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
-    {
-        // MORE - may need to read, and probably should write, compressed.
-        StringBuffer value;
-        ::toXML(row, value, 0, 0);
-        if (value.length())
-        {
-            if (statement)
-                statement->bindBytes(idx, (const cass_byte_t *) value.str(), value.length());
-            return true;
-        }
-        else
-            return false;
-    }
-} progressColumnMapper;
-
 static class BoolColumnMapper : implements CassandraColumnMapper
 {
 public:
@@ -459,27 +402,6 @@ public:
     }
 } bigintColumnMapper;
 
-static class SubgraphIdColumnMapper : implements CassandraColumnMapper
-{
-public:
-    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
-    {
-        __int64 id = getSignedResult(NULL, value);
-        if (id)
-            row->addPropInt64(name, id);
-        return row;
-    }
-    virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
-    {
-        if (statement)
-        {
-            int value = row->getPropInt(name);
-            statement->bindInt64(idx, value);
-        }
-        return true;
-    }
-} subgraphIdColumnMapper;
-
 static class SimpleMapColumnMapper : implements CassandraColumnMapper
 {
 public:
@@ -1049,7 +971,7 @@ static const CassandraXmlMapping filesReadSearchMappings [] =
 
 // The following describe child tables - all keyed by wuid
 
-enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuGraphProgressChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild,ChildTablesSize };
+enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild,ChildTablesSize };
 
 struct ChildTableInfo
 {
@@ -1095,24 +1017,6 @@ static const ChildTableInfo wuStatisticsTable =
     wuStatisticsMappings
 };
 
-static const CassandraXmlMapping wuGraphProgressMappings [] =
-{
-    {"partition", "int", NULL, hashRootNameColumnMapper},
-    {"wuid", "text", NULL, rootNameColumnMapper},
-    {"graphID", "text", NULL, graphIdColumnMapper},
-    {"progress", "blob", NULL, progressColumnMapper},  // NOTE - order of these is significant - this creates the subtree that ones below will modify
-    {"subgraphID", "text", "@id", subgraphIdColumnMapper},
-    {"state", "int", "@_state", intColumnMapper},
-    { NULL, "wuGraphProgress", "((partition, wuid), graphid, subgraphid)", stringColumnMapper}
-};
-
-static const ChildTableInfo wuGraphProgressTable =
-{
-    "Bit of a", "Special case",
-    WuGraphProgressChild,
-    wuGraphProgressMappings
-};
-
 static const CassandraXmlMapping wuGraphsMappings [] =
 {
     {"partition", "int", NULL, hashRootNameColumnMapper},
@@ -1222,7 +1126,39 @@ static const ChildTableInfo wuFilesReadTable =
 };
 
 // Order should match the enum above
-static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuGraphProgressTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, NULL };
+static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, NULL };
+
+// Graph progress tables are read directly, XML mappers not used
+
+static const CassandraXmlMapping wuGraphProgressMappings [] =  // Column list for the wuGraphProgress table
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"graphID", "text", NULL, stringColumnMapper},
+    {"subgraphID", "text", NULL, stringColumnMapper},  // NOTE(review): "text" here but "int" in wuGraphRunningMappings - confirm the difference is intended
+    {"creator", "text", NULL, stringColumnMapper},
+    {"progress", "blob", NULL, blobColumnMapper},
+    { NULL, "wuGraphProgress", "((partition, wuid), graphID, subgraphID, creator)", stringColumnMapper}  // final row (NULL column name): table name and key specification
+};
+
+static const CassandraXmlMapping wuGraphStateMappings [] =  // Column list for the wuGraphState table
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"graphID", "text", NULL, stringColumnMapper},
+    {"subgraphID", "text", NULL, stringColumnMapper},  // NOTE(review): "text" here but "int" in wuGraphRunningMappings - confirm the difference is intended
+    {"state", "int", NULL, intColumnMapper},
+    { NULL, "wuGraphState", "((partition, wuid), graphID, subgraphID)", stringColumnMapper}  // final row (NULL column name): table name and key specification
+};
+
+static const CassandraXmlMapping wuGraphRunningMappings [] =  // Column list for the wuGraphRunning table, which getRunningGraph() queries by partition and wuid
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"graphID", "text", NULL, stringColumnMapper},
+    {"subgraphID", "int", NULL, intColumnMapper},  // numeric here, unlike the "text" subgraphID columns in the progress/state mappings
+    { NULL, "wuGraphRunning", "((partition, wuid))", stringColumnMapper}  // final row (NULL column name): table name and key specification (partition and wuid only)
+};
 
 interface ICassandraSession : public IInterface  // MORE - rename!
 {
@@ -1426,111 +1362,6 @@ extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *bat
     childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
 }
 
-/*
-extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
-{
-    StringBuffer names;
-    StringBuffer bindings;
-    StringBuffer tableName;
-    int numBound = getFieldNames(graphProgressMappings, names, bindings, tableName);
-    VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-    DBGLOG("%s", insertQuery.str());
-    CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
-    CassandraFuture futurePrep(cass_session_prepare(session, insertQuery));
-    futurePrep.wait("prepare statement");
-    CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
-
-    Owned<IPTreeIterator> graphs = inXML->getElements("./graph*");
-    ForEach(*graphs)
-    {
-        IPTree &graph = graphs->query();
-        Owned<IPTreeIterator> subgraphs = graph.getElements("./node");
-        ForEach(*subgraphs)
-        {
-            IPTree &subgraph = subgraphs->query();
-            CassandraStatement update(cass_prepared_bind(prepared));
-            graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
-            graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
-            unsigned colidx = 2;
-            while (graphProgressMappings[colidx].columnName)
-            {
-                graphProgressMappings[colidx].mapper.fromXML(update, colidx, &subgraph, graphProgressMappings[colidx].xpath);
-                colidx++;
-            }
-            check(cass_batch_add_statement(batch, update));
-        }
-        // And one more with subgraphid = 0 for the graph status
-        CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
-        graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
-        graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
-        check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
-        unsigned colidx = 4;  // we skip progress and subgraphid
-        while (graphProgressMappings[colidx].columnName)
-        {
-            graphProgressMappings[colidx].mapper.fromXML(update, colidx, &graph, graphProgressMappings[colidx].xpath);
-            colidx++;
-        }
-        check(cass_batch_add_statement(batch, update));
-    }
-    if (inXML->hasProp("Running"))
-    {
-        IPTree *running = inXML->queryPropTree("Running");
-        CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
-        graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
-        graphProgressMappings[1].mapper.fromXML(update, 1, running, graphProgressMappings[1].xpath);
-        graphProgressMappings[2].mapper.fromXML(update, 2, running, graphProgressMappings[2].xpath);
-        check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
-        check(cass_batch_add_statement(batch, update));
-    }
-    CassandraFuture futureBatch(cass_session_execute_batch(session, batch));
-    futureBatch.wait("execute");
-}
-
-extern void cassandraToGraphProgressXML(CassSession *session, const char *wuid)
-{
-    CassandraResult result(fetchDataForWu(wuid, session, graphProgressMappings));
-    Owned<IPTree> progress = createPTree(wuid);
-    CassandraIterator rows(cass_iterator_from_result(result));
-    while (cass_iterator_next(rows))
-    {
-        CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-        unsigned colidx = 1;  // wuid is not returned
-        IPTree *ptree = progress;
-        while (cass_iterator_next(cols))
-        {
-            assertex(graphProgressMappings[colidx].columnName);
-            const CassValue *value = cass_iterator_get_column(cols);
-            // NOTE - this relies on the fact that progress is NULL when subgraphId=0, so that the status and id fields
-            // get set on the graph instead of on the child node in those cases.
-            if (value && !cass_value_is_null(value))
-                ptree = graphProgressMappings[colidx].mapper.toXML(ptree, graphProgressMappings[colidx].xpath, value);
-            colidx++;
-        }
-    }
-    StringBuffer out;
-    toXML(progress, out, 0, XML_SortTags|XML_Format);
-    printf("%s", out.str());
-}
-*/
-
-/*
-extern void cassandraTestGraphProgressXML()
-{
-    CassandraCluster cluster(cass_cluster_new());
-    cass_cluster_set_contact_points(cluster, "127.0.0.1");
-    CassandraSession session(cass_session_new());
-    CassandraFuture future(cass_session_connect_keyspace(session, cluster, "hpcc"));
-    future.wait("connect");
-
-    ensureTable(session, graphProgressMappings);
-    Owned<IPTree> inXML = createPTreeFromXMLFile("/data/rchapman/hpcc/testing/regress/ecl/a.xml");
-    graphProgressXMLtoCassandra(session, inXML);
-    const char *wuid = inXML->queryName();
-    cassandraToGraphProgressXML(session, wuid);
-}
-
-*/
-
 static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
 {
     CassandraIterator cols(cass_iterator_from_row(row));
@@ -2248,6 +2079,9 @@ public:
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
         deleteChildren(wuid);
         deleteSecondaries(wuid);
+        sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, *batch);
+        sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, *batch);
+        sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, *batch);
         CassandraStatement update(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
         update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         update.bindString(1, wuid);
@@ -2472,6 +2306,169 @@ public:
         }
     }
 
+    virtual void clearGraphProgress() const
+    {
+        // Hand one unlogged batch to the delete helper for each of the three graph tables
+        // (progress, state, running) for this workunit, then execute the batch and wait for it.
+        CassandraBatch deletions(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
+        const char *thisWuid = queryWuid();
+        sessionCache->deleteChildByWuid(wuGraphProgressMappings, thisWuid, deletions);
+        sessionCache->deleteChildByWuid(wuGraphStateMappings, thisWuid, deletions);
+        sessionCache->deleteChildByWuid(wuGraphRunningMappings, thisWuid, deletions);
+        CassandraFuture pending(cass_session_execute_batch(sessionCache->querySession(), deletions));
+        pending.wait("clearGraphProgress");
+    }
+    virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
+    {
+        // Reads the wuGraphRunning row for this workunit.
+        // Returns false (leaving graphName and subId untouched) when no row is present;
+        // otherwise fills both outputs from the first row and returns true.
+        CassandraStatement query(sessionCache->prepareStatement("SELECT graphID, subgraphID FROM wuGraphRunning where partition=? and wuid=?;"));
+        const char *thisWuid = queryWuid();
+        query.bindInt32(0, rtlHash32VStr(thisWuid, 0) % NUM_PARTITIONS);
+        query.bindString(1, thisWuid);
+        CassandraFuture pending(cass_session_execute(sessionCache->querySession(), query));
+        pending.wait("getRunningGraph");
+        CassandraResult result(cass_future_get_result(pending));
+        if (!cass_result_row_count(result))
+            return false;
+        const CassRow *row = cass_result_first_row(result);
+        assertex(row);
+        StringBuffer name;
+        getCassString(name, cass_row_get_column(row, 0));   // column 0: graphID
+        graphName.set(name);
+        subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));   // column 1: subgraphID
+        return true;
+    }
+    virtual IConstWUGraphProgress *getGraphProgress(const char *graphName) const
+    {
+        // Builds a progress tree for the named graph from the rows held in wuGraphProgress.
+        // Returns NULL when the query yields no rows; otherwise the XML stored in each row's
+        // progress column is parsed and attached as a child of a tree named after the graph.
+        CassandraStatement statement(sessionCache->prepareStatement("SELECT subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=? and graphID=?;"));
+        const char *wuid = queryWuid();
+        statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        statement.bindString(1, wuid);
+        statement.bindString(2, graphName);
+        CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
+        future.wait("getGraphProgress");
+        CassandraResult result(cass_future_get_result(future));
+        CassandraIterator rows(cass_iterator_from_result(result));
+        if (!cass_result_row_count(result))
+            return NULL;
+        Owned<IPropertyTree> progress = createPTree(graphName);
+        progress->setPropBool("@stats", true);
+        progress->setPropInt("@format", PROGRESS_FORMAT_V);
+        while (cass_iterator_next(rows))
+        {
+            const CassRow *row = cass_iterator_get_row(rows);
+            // subId and creator are read but not otherwise used yet - see the consistency check suggested below
+            unsigned subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
+            StringBuffer creator, xml;
+            getCassString(creator, cass_row_get_column(row, 1));
+            getCassString(xml, cass_row_get_column(row, 2));
+            IPTree *stats = createPTreeFromXMLString(xml);
+            // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
+            progress->addPropTree(stats->queryName(), stats);   // NOTE(review): presumably takes ownership of stats - confirm
+        }
+        return createConstGraphProgress(queryWuid(), graphName, progress); // Links progress
+    }
+    WUGraphState queryGraphState(const char *graphName) const
+    {
+        return queryNodeState(graphName, 0);   // the graph-level state is looked up under node id 0
+    }
+    WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
+    {
+        // Looks up the state stored in wuGraphState for (wuid, graphName, nodeId).
+        // Returns WUGraphUnknown when no matching row exists.
+        CassandraStatement query(sessionCache->prepareStatement("SELECT state FROM wuGraphState where partition=? and wuid=? and graphID=? and subgraphID=?;"));
+        const char *thisWuid = queryWuid();
+        query.bindInt32(0, rtlHash32VStr(thisWuid, 0) % NUM_PARTITIONS);
+        query.bindString(1, thisWuid);
+        query.bindString(2, graphName);
+        query.bindInt32(3, nodeId);
+        CassandraFuture pending(cass_session_execute(sessionCache->querySession(), query));
+        pending.wait("queryNodeState");
+        CassandraResult result(cass_future_get_result(pending));
+        if (!cass_result_row_count(result))
+            return WUGraphUnknown;
+        return (WUGraphState) getUnsignedResult(NULL, getSingleResult(result));
+    }
+    void setGraphState(const char *graphName, WUGraphState state) const
+    {
+        setNodeState(graphName, 0, state);   // the graph-level state is recorded under node id 0
+    }
+    void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
+    {
+        // Records the state of one node of a graph in wuGraphState.
+        // For a non-zero nodeId the wuGraphRunning table is also maintained: a row is inserted
+        // when the node starts running, and the workunit's rows are removed when a node completes.
+        const char *thisWuid = queryWuid();
+        {
+            CassandraStatement stateInsert(sessionCache->prepareStatement("INSERT INTO wuGraphState (partition, wuid, graphID, subgraphID, state) values (?,?,?,?,?);"));
+            stateInsert.bindInt32(0, rtlHash32VStr(thisWuid, 0) % NUM_PARTITIONS);
+            stateInsert.bindString(1, thisWuid);
+            stateInsert.bindString(2, graphName);
+            stateInsert.bindInt32(3, nodeId);
+            stateInsert.bindInt32(4, (int) state);
+            CassandraFuture pending(cass_session_execute(sessionCache->querySession(), stateInsert));
+            pending.wait("setNodeState update state");
+        }
+        if (!nodeId)
+            return;   // node id 0 never touches the running table
+        if (state == WUGraphRunning)
+        {
+            CassandraStatement runningInsert(sessionCache->prepareStatement("INSERT INTO wuGraphRunning (partition, wuid, graphID, subgraphID) values (?,?,?,?);"));
+            runningInsert.bindInt32(0, rtlHash32VStr(thisWuid, 0) % NUM_PARTITIONS);
+            runningInsert.bindString(1, thisWuid);
+            runningInsert.bindString(2, graphName);
+            runningInsert.bindInt32(3, nodeId);
+            CassandraFuture pending(cass_session_execute(sessionCache->querySession(), runningInsert));
+            pending.wait("setNodeState update running");
+        }
+        else if (state == WUGraphComplete)
+        {
+            CassandraStatement runningDelete(sessionCache->prepareStatement("DELETE FROM wuGraphRunning where partition=? and wuid=?;"));
+            runningDelete.bindInt32(0, rtlHash32VStr(thisWuid, 0) % NUM_PARTITIONS);
+            runningDelete.bindString(1, thisWuid);
+            CassandraFuture pending(cass_session_execute(sessionCache->querySession(), runningDelete));
+            pending.wait("setNodeState remove running");
+        }
+    }
+    // Statistics gatherer for one subgraph whose result is written to the wuGraphProgress
+    // table when the object is disposed.
+    class CCassandraWuGraphStats : public CWuGraphStats
+    {
+    public:
+        // _rootScope is used both as the name of the progress tree and as the graphID column value.
+        // Initializers are listed in member declaration order.
+        CCassandraWuGraphStats(const char *_wuid, const ICassandraSession *_sessionCache, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
+        : CWuGraphStats(createPTree(_rootScope), _creatorType, _creator, _rootScope, _id),
+          sessionCache(_sessionCache), wuid(_wuid)
+        {
+        }
+        virtual void beforeDispose()
+        {
+            CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sgNN where NN==id
+            CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
+            statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+            statement.bindString(1, wuid);
+            statement.bindString(2, progress->queryName());
+            statement.bindInt32(3, id);
+            statement.bindString(4, creator);
+            // Only the subgraph's own child tree ("sg<id>") is serialized into the progress column
+            StringBuffer tag;
+            tag.append("sg").append(id);
+            IPTree *sg = progress->queryPropTree(tag);
+            assertex(sg);
+            StringBuffer xml;
+            toXML(sg, xml);
+            statement.bindString(5, xml);
+            CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
+            future.wait("update stats");
+        }
+
+    protected:
+        Linked<const ICassandraSession> sessionCache;
+        StringAttr wuid;
+    };
+
+
+    IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
+    {
+        return new CCassandraWuGraphStats(queryWuid(), sessionCache, creatorType, creator, graphName, subgraph);   // newly allocated; graphName becomes the stats root scope
+    }
+
+
     virtual void _loadFilesRead() const
     {
         checkChildLoaded(wuFilesReadTable);        // Lazy populate the FilesRead branch of p from Cassandra
@@ -3203,6 +3200,10 @@ public:
         errCount += checkOrphans(searchMappings, 3, batch);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
             errCount += checkOrphans(table[0]->mappings, 1, batch);
+        errCount += checkOrphans(wuGraphProgressMappings, 1, batch);
+        errCount += checkOrphans(wuGraphStateMappings, 1, batch);
+        errCount += checkOrphans(wuGraphRunningMappings, 1, batch);
+
         // 3. Commit fixes
         if (batch)
         {
@@ -3240,6 +3241,9 @@ public:
         ensureTable(session, filesReadSearchMappings);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
             ensureTable(session, table[0]->mappings);
+        ensureTable(session, wuGraphProgressMappings);
+        ensureTable(session, wuGraphStateMappings);
+        ensureTable(session, wuGraphRunningMappings);
     }
 
     virtual const char *queryStoreType() const

+ 2 - 9
roxie/ccd/ccdcontext.cpp

@@ -1016,8 +1016,7 @@ public:
 class CRoxieContextBase : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
 {
 protected:
-    Owned<IConstWUGraphProgress> progress;  // These need to be destroyed very late (particularly, after the childgraphs)
-    Owned<IWUGraphStats> graphStats;
+    Owned<IWUGraphStats> graphStats;   // This needs to be destroyed very late (particularly, after the childgraphs)
     mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
     Owned<IRowManager> rowManager; // NOTE: the order of destruction here is significant. For leak check to work destroy this BEFORE allAllocators, but after most other things
     Owned <IDebuggableContext> debugContext;
@@ -1322,10 +1321,7 @@ public:
         if (debugContext)
             debugContext->checkBreakpoint(DebugStateGraphStart, NULL, graphName);
         if (workUnit)
-        {
-            progress.setown(workUnit->getGraphProgress(graph->queryName()));
-            graphStats.setown(progress->update(SCTroxie, queryStatisticsComponentName(), 0));
-        }
+            graphStats.setown(workUnit->updateStats(graph->queryName(), SCTroxie, queryStatisticsComponentName(), 0));
     }
 
     virtual void endGraph(cycle_t startCycles, bool aborting)
@@ -1356,10 +1352,7 @@ public:
             graph.clear();
             childGraphs.kill();
             if (graphStats)
-            {
                 graphStats.clear();
-                progress.clear();
-            }
         }
     }
 

+ 7 - 26
thorlcr/graph/thgraphmaster.cpp

@@ -1533,8 +1533,7 @@ void CJobMaster::saveSpills()
     {
         CFileUsageEntry &entry = iter->query();
         StringAttr tmpName = entry.queryName();
-        Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
-        if (WUGraphComplete == graphProgress->queryNodeState(entry.queryGraphId()))
+        if (WUGraphComplete == workunit->queryNodeState(queryGraphName(), entry.queryGraphId()))
         {
             IArrayOf<IGroup> groups;
             StringArray clusters;
@@ -1690,27 +1689,22 @@ bool CJobMaster::go()
     if (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction())
         throw MakeStringException(0, "Job paused at start, exiting");
 
-    Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
     bool allDone = true;
     unsigned concurrentSubGraphs = (unsigned)getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
     try
     {
         startJob();
-        Owned<IWUGraphProgress> progress = graphProgress->update();
-        progress->setGraphState(WUGraphRunning);
-        progress.clear();
-        
+        workunit->setGraphState(queryGraphName(), WUGraphRunning);
         Owned<IThorGraphIterator> iter = getSubGraphs();
         CICopyArrayOf<CMasterGraph> toRun;
         ForEach(*iter)
         {
             CMasterGraph &graph = (CMasterGraph &)iter->query();
-            if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == graphProgress->queryNodeState(graph.queryGraphId()))
+            if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == workunit->queryNodeState(queryGraphName(), graph.queryGraphId()))
                 graph.setCompleteEx();
             else
                 toRun.append(graph);
         }
-        graphProgress.clear();
         ForEachItemInRev(g, toRun)
         {
             if (aborted) break;
@@ -1733,11 +1727,7 @@ bool CJobMaster::go()
     }
     catch (IException *e) { fireException(e); e->Release(); }
     catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
-    graphProgress.setown(getGraphProgress());
-    Owned<IWUGraphProgress> progress = graphProgress->update();
-    progress->setGraphState(aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
-    progress.clear();
-    graphProgress.clear();
+    workunit->setGraphState(queryGraphName(), aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
 
     if (queryPausing())
         saveSpills();
@@ -2397,8 +2387,7 @@ void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentE
     {
         if (!queryOwner())
         {
-            Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
-            if (WUGraphComplete == graphProgress->queryNodeState(graphId))
+            if (WUGraphComplete == job.queryWorkUnit().queryNodeState(job.queryGraphName(), graphId))
                 setCompleteEx();
         }
     }
@@ -2498,12 +2487,7 @@ bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
             return false;
     }
     if (!queryOwner())
-    {
-        Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
-        Owned<IWUGraphProgress> progress = graphProgress->update();
-        progress->setNodeState(graphId, WUGraphRunning);
-        progress.clear();
-    }
+        job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, WUGraphRunning);
     return true;
 }
 
@@ -2664,10 +2648,7 @@ void CMasterGraph::setComplete(bool tf)
     CGraphBase::setComplete(tf);
     if (tf && !queryOwner())
     {
-        Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
-        Owned<IWUGraphProgress> progress = graphProgress->update();
-        progress->setNodeState(graphId, graphDone?WUGraphComplete:WUGraphFailed);
-        progress.clear();
+        job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, graphDone?WUGraphComplete:WUGraphFailed);
     }
 }
 

+ 0 - 1
thorlcr/graph/thgraphmaster.ipp

@@ -162,7 +162,6 @@ public:
         }
         return NULL;
     }
-    IConstWUGraphProgress *getGraphProgress() { return workunit->getGraphProgress(queryGraphName()); }
 
     CGraphTableCopy executed;
     CriticalSection exceptCrit;

+ 4 - 8
thorlcr/master/thdemonserver.cpp

@@ -126,14 +126,13 @@ private:
             try
             {
                 IConstWorkUnit &currentWU = activeGraphs.item(0).queryJob().queryWorkUnit();
-                Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)activeGraphs.item(0).queryJob()).getGraphProgress();
+                const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName();
                 ForEachItemIn (g, activeGraphs)
                 {
                     CGraphBase &graph = activeGraphs.item(g);
-                    Owned<IWUGraphStats> stats = graphProgress->update(SCTthor, queryStatisticsComponentName(), graph.queryGraphId());
+                    Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), graph.queryGraphId());
                     reportGraph(stats->queryStatsBuilder(), &graph, finished);
                 }
-                graphProgress.clear();
                 Owned<IWorkUnit> wu = &currentWU.lock();
                 ForEachItemIn (g2, activeGraphs)
                 {
@@ -156,15 +155,12 @@ private:
         try
         {
             IConstWorkUnit &currentWU = graph->queryJob().queryWorkUnit();
-            Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)graph->queryJob()).getGraphProgress();
-
+            const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName();
             {
-                Owned<IWUGraphStats> stats = graphProgress->update(SCTthor, queryStatisticsComponentName(), graph->queryGraphId());
+                Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), graph->queryGraphId());
                 reportGraph(stats->queryStatsBuilder(), graph, finished);
             }
 
-            graphProgress.clear();
-
             Owned<IWorkUnit> wu = &currentWU.lock();
             reportStatus(wu, *graph, startTime, finished, success);