Преглед изворни кода

Merge pull request #7652 from richardkchapman/cassandra-sasha

HPCC-14031 Sasha accesses dali workunit tree directly

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 9 година
родитељ
комит
1380d52171

+ 155 - 140
common/workunit/workunit.cpp

@@ -938,6 +938,7 @@ public:
             if (progress)
             {
                 StringBuffer path;
+                // NOTE - the node state info still uses the old graph layout, even when the stats are using the new...
                 path.append("node[@id=\"").append(nodeId).append("\"]/@_state");
                 return (WUGraphState) progress->getPropInt(path, (unsigned) WUGraphUnknown);
             }
@@ -1057,6 +1058,7 @@ public:
         VStringBuffer path("/GraphProgress/%s", queryWuid());
         Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
         IPTree *progress = ensurePTree(conn->queryRoot(), graphName);
+        // NOTE - the node state info still uses the old graph layout, even when the stats are using the new...
         path.clear().append("node[@id=\"").append(nodeId).append("\"]");
         IPropertyTree *node = progress->queryPropTree(path.str());
         if (!node)
@@ -1102,6 +1104,17 @@ protected:
         VStringBuffer path("/GraphProgress/%s/%s", queryWuid(), graphName);
         return querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
     }
+    // Builds a free-standing "GraphProgress" tree and merges the root of the progress connection into it.
+    // Returns NULL when queryProgressConnection() yields no connection; otherwise the caller owns the result.
+    IPropertyTree *getGraphProgressTree() const
+    {
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return NULL;
+        Owned<IPropertyTree> snapshot = createPTree("GraphProgress");
+        mergePTree(snapshot, progressConn->queryRoot());
+        return snapshot.getClear();
+    }
     Owned<IRemoteConnection> connection;
     mutable Owned<IRemoteConnection> progressConnection;
 };
@@ -2170,6 +2183,105 @@ IWorkUnit* CWorkUnitFactory::updateWorkUnit(const char *wuid, ISecManager *secmg
     }
 }
 
+// Looks up the branch matching xpath under 'from'. If found, a new tree is created from it
+// (createPTreeFromIPT), the branch is removed from 'from', and the new tree is returned to the
+// caller, who owns it. Returns NULL when no branch matches.
+IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath)
+{
+    IPropertyTree * match = from->queryPropTree(xpath);
+    if (!match)
+        return NULL;
+    Owned<IPropertyTree> detached = createPTreeFromIPT(match);
+    from->removeTree(match);
+    return detached.getClear();
+}
+
+// Restores an archived workunit from the file <base>/<wuid>.xml.
+// The loaded tree is stamped with @restoredDate and handed to the repository-specific _restoreWorkUnit();
+// afterwards any generated dlls recorded in the archive are copied back and registered with the dll server,
+// and files listed under Query/Associated are copied back to their recorded locations.
+// Returns false if wuid is NULL or empty, if the XML file yields no tree, or if _restoreWorkUnit() fails.
+// Exceptions raised while copying/registering generated dlls propagate to the caller; exceptions raised
+// while copying associated files are logged and that file is skipped.
+bool CWorkUnitFactory::restoreWorkUnit(const char *base, const char *wuid)
+{
+    // Callers may pass a wuid taken straight from a property lookup, so reject a missing one here
+    // rather than building a bogus "<base>/.xml" path or handing NULL to the repository code.
+    if (!wuid || !*wuid)
+        return false;
+    StringBuffer path(base);
+    addPathSepChar(path).append(wuid).append(".xml");
+    Owned<IPTree> pt = createPTreeFromXMLFile(path);
+    if (!pt)
+        return false;
+    CDateTime dt;
+    dt.setNow();
+    StringBuffer dts;
+    dt.getString(dts);
+    pt->setProp("@restoredDate", dts.str());
+    // GeneratedDlls is detached from the tree before it is stored; Query/Associated is only copied,
+    // so it stays in the stored workunit as well.
+    Owned<IPropertyTree> generatedDlls = pruneBranch(pt, "GeneratedDlls[1]");
+    Owned<IPropertyTree> associatedFiles;
+    IPropertyTree *srcAssociated = pt->queryPropTree("Query/Associated");
+    if (srcAssociated)
+        associatedFiles.setown(createPTreeFromIPT(srcAssociated));
+    // The updating of the repo is implementation specific...
+    if (!_restoreWorkUnit(pt.getClear(), wuid))
+        return false;
+    // now kludge back GeneratedDlls
+    if (generatedDlls)
+    {
+        Owned<IPropertyTreeIterator> dlls = generatedDlls->getElements("GeneratedDll");
+        for(dlls->first(); dlls->isValid(); dlls->next())
+        {
+            IPropertyTree & dll = dlls->query();
+            char const * name = dll.queryProp("@name");
+            char const * kind = dll.queryProp("@kind");
+            char const * location = dll.queryProp("@location");
+            Owned<IDllEntry> got = queryDllServer().getEntry(name);
+            if (!got)
+            {
+                // Not known to the dll server: copy <base>/<tail of location> back to location and register it
+                RemoteFilename dstRfn;
+                dstRfn.setRemotePath(location);
+                StringBuffer srcPath(base);
+                addPathSepChar(srcPath);
+                dstRfn.getTail(srcPath);
+                OwnedIFile srcFile = createIFile(srcPath);
+                OwnedIFile dstFile = createIFile(dstRfn);
+                copyFile(dstFile, srcFile);
+                queryDllServer().registerDll(name, kind, location);
+            }
+        }
+    }
+    if (associatedFiles)
+    {
+        Owned<IPropertyTreeIterator> associated = associatedFiles->getElements("*");
+        ForEach(*associated)
+        {
+            IPropertyTree &file = associated->query();
+            const char *filename = file.queryProp("@filename");
+            SocketEndpoint ep(file.queryProp("@ip"));
+            RemoteFilename rfn;
+            rfn.setPath(ep, filename);
+            OwnedIFile dstFile = createIFile(rfn);
+            StringBuffer srcPath(base), name;
+            addPathSepChar(srcPath);
+            rfn.getTail(name);
+            srcPath.append(name);
+            if (generatedDlls)
+            {
+                VStringBuffer gDllPath("GeneratedDll[@name=\"%s\"]", name.str());
+                if (generatedDlls->hasProp(gDllPath))
+                    continue; // generated dlls handled separately - see above
+            }
+
+            OwnedIFile srcFile = createIFile(srcPath);
+            if (srcFile->exists())
+            {
+                try
+                {
+                    copyFile(dstFile, srcFile);
+                }
+                catch (IException *e)
+                {
+                    // Best effort: a failed associated file does not fail the restore
+                    VStringBuffer msg("Failed to restore associated file '%s' to destination '%s'", srcFile->queryFilename(), dstFile->queryFilename());
+                    EXCLOG(e, msg.str());
+                    e->Release();
+                }
+            }
+        }
+    }
+    return true;
+}
+
+
 int CWorkUnitFactory::setTracingLevel(int newLevel)
 {
     if (newLevel)
@@ -2513,7 +2625,42 @@ public:
         else
             return NULL;
     }
+    // Dali-specific part of a restore. Takes ownership of _pt, stores it as the root of the workunit's
+    // SDS path (from getXPath), then puts any GraphProgress branch found in the tree back under
+    // /GraphProgress/<wuid>. Returns false if the workunit connection cannot be made or its root
+    // already has children; a failure to connect for the progress branch is not treated as an error.
+    virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
+    {
+        Owned<IPTree> wuTree(_pt);
+        // Detach the progress branch first so it is not stored inside the workunit tree
+        Owned<IPropertyTree> savedProgress = pruneBranch(wuTree, "GraphProgress[1]");
+        StringBuffer wuPath;
+        getXPath(wuPath, wuid);
+        Owned<IRemoteConnection> conn = sdsManager->connect(wuPath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        if (!conn)
+        {
+            ERRLOG("restoreWorkUnit could not create to %s", wuPath.str());
+            return false;
+        }
+        IPropertyTree *wuRoot = conn->queryRoot();
+        if (wuRoot->hasChildren())
+        {
+            ERRLOG("restoreWorkUnit WUID %s already exists", wuid);
+            return false;
+        }
+        wuRoot->setPropTree(NULL, wuTree.getClear());
+        conn.clear();
 
+        if (!savedProgress)
+            return true;
+        // now kludge back GraphProgress
+        VStringBuffer progressPath("/GraphProgress/%s", wuid);
+        conn.setown(querySDS().connect(progressPath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
+        if (conn)
+        {
+            IPropertyTree *progressRoot = conn->queryRoot();
+            if (progressRoot->hasChildren())
+                WARNLOG("restoreWorkUnit WUID %s graphprogress already exists, replacing",wuid);
+            progressRoot->setPropTree(NULL, savedProgress.getClear());
+        }
+        return true;
+    }
     virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer wuRoot;
@@ -2920,6 +3067,10 @@ public:
         if (!secUser) secUser = defaultSecUser.get();
         return baseFactory->updateWorkUnit(wuid, secMgr, secUser);
     }
+    virtual bool restoreWorkUnit(const char *base, const char *wuid) // Restore request on the wrapping factory
+    {
+        return baseFactory->restoreWorkUnit(base, wuid); // forwards both arguments unchanged to baseFactory and returns its result
+    }
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();
@@ -3251,15 +3402,11 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
     exportWorkUnitToXML(this, buf, false, false, true);
 
     StringBuffer extraWorkUnitXML;
-    StringBuffer xpath("/GraphProgress/");
-    xpath.append(wuid);
-    Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-    if (conn)
+    Owned<IPTree> graphProgress = getGraphProgressTree();
+    if (graphProgress)
     {
-        Owned<IPropertyTree> tmp = createPTree("GraphProgress");
-        mergePTree(tmp,conn->queryRoot());
-        toXML(tmp,extraWorkUnitXML,1,XML_Format);
-        conn->close();
+        toXML(graphProgress,extraWorkUnitXML,1,XML_Format);
+        graphProgress.clear();
     }
 
     Owned<IConstWUQuery> q = getQuery();
@@ -3385,138 +3532,6 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
     return true;
 }
 
-IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath)
-{
-    Owned<IPropertyTree> ret;
-    IPropertyTree * branch = from->queryPropTree(xpath);
-    if(branch) {
-        ret.setown(createPTreeFromIPT(branch));
-        from->removeTree(branch);
-    }
-    return ret.getClear();
-}
-
-bool restoreWorkUnit(const char *base,const char *wuid)
-{
-    StringBuffer path(base);
-    if (!wuid||!*wuid)
-        return false;
-    addPathSepChar(path).append(wuid).append(".xml");
-    Owned<IFile> file = createIFile(path.str());
-    if (!file)
-        return false;
-    Owned<IFileIO> fileio = file->open(IFOread);
-    if (!fileio)
-        return false;
-    Owned<IPropertyTree> pt = createPTree(*fileio);
-    if (!pt)
-        return false;
-    CDateTime dt;
-    dt.setNow();
-    StringBuffer dts;
-    dt.getString(dts);
-    pt->setProp("@restoredDate", dts.str());
-    VStringBuffer xpath("/WorkUnits/%s", wuid);
-    Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    if (!conn)
-    {
-        ERRLOG("restoreWorkUnit could not create to %s", xpath.str());
-        return false;
-    }
-    IPropertyTree *root = conn->queryRoot();
-    if (root->hasChildren())
-    {
-        ERRLOG("restoreWorkUnit WUID %s already exists", wuid);
-        return false;
-    }
-    Owned<IPropertyTree> gprogress = pruneBranch(pt, "GraphProgress[1]");
-    Owned<IPropertyTree> generatedDlls = pruneBranch(pt, "GeneratedDlls[1]");
-    Owned<IPropertyTree> associatedFiles;
-    IPropertyTree *srcAssociated = pt->queryPropTree("Query/Associated");
-    if (srcAssociated)
-        associatedFiles.setown(createPTreeFromIPT(srcAssociated));
-    root->setPropTree(NULL, pt.getClear());
-    conn.clear();
-
-    // now kludge back GraphProgress and GeneratedDlls
-    if (gprogress)
-    {
-        VStringBuffer xpath("/GraphProgress/%s", wuid);
-        conn.setown(querySDS().connect(xpath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
-        if (conn)
-        {
-            IPropertyTree *groot = conn->queryRoot();
-            if (groot->hasChildren())
-                WARNLOG("restoreWorkUnit WUID %s graphprogress already exists, replacing",wuid);
-            groot->setPropTree(NULL, gprogress.getClear());
-        }
-    }
-
-    if (generatedDlls)
-    {
-        Owned<IPropertyTreeIterator> dlls = generatedDlls->getElements("GeneratedDll");
-        for(dlls->first(); dlls->isValid(); dlls->next())
-        {
-            IPropertyTree & dll = dlls->query();
-            char const * name = dll.queryProp("@name");
-            char const * kind = dll.queryProp("@kind");
-            char const * location = dll.queryProp("@location");
-            Owned<IDllEntry> got = queryDllServer().getEntry(name);
-            if (!got)
-            {
-                RemoteFilename dstRfn;
-                dstRfn.setRemotePath(location);
-                StringBuffer srcPath(base);
-                addPathSepChar(srcPath);
-                dstRfn.getTail(srcPath);
-                OwnedIFile srcFile = createIFile(srcPath);
-                OwnedIFile dstFile = createIFile(dstRfn);
-                copyFile(dstFile, srcFile);
-                queryDllServer().registerDll(name, kind, location);
-            }
-        }
-    }
-    if (associatedFiles)
-    {
-        Owned<IPropertyTreeIterator> associated = associatedFiles->getElements("*");
-        ForEach(*associated)
-        {
-            IPropertyTree &file = associated->query();
-            const char *filename = file.queryProp("@filename");
-            SocketEndpoint ep(file.queryProp("@ip"));
-            RemoteFilename rfn;
-            rfn.setPath(ep, filename);
-            OwnedIFile dstFile = createIFile(rfn);
-            StringBuffer srcPath(base), name;
-            addPathSepChar(srcPath);
-            rfn.getTail(name);
-            srcPath.append(name);
-            if (generatedDlls)
-            {
-                VStringBuffer gDllPath("GeneratedDll[@name=\"%s\"]", name.str());
-                if (generatedDlls->hasProp(gDllPath))
-                    continue; // generated dlls handled separately - see above
-            }
-
-            OwnedIFile srcFile = createIFile(srcPath);
-            if (srcFile->exists())
-            {
-                try
-                {
-                    copyFile(dstFile, srcFile);
-                }
-                catch (IException *e)
-                {
-                    VStringBuffer msg("Failed to restore associated file '%s' to destination '%s'", srcFile->queryFilename(), dstFile->queryFilename());
-                    EXCLOG(e, msg.str());
-                    e->Release();
-                }
-            }
-        }
-    }
-    return true;
-}
-
 void CLocalWorkUnit::loadXML(const char *xml)
 {
     CriticalBlock block(crit);

+ 1 - 1
common/workunit/workunit.hpp

@@ -1246,6 +1246,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual IConstWorkUnit * openWorkUnit(const char *wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
+    virtual bool restoreWorkUnit(const char *base, const char *wuid) = 0;
     virtual int setTracingLevel(int newlevel) = 0;
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char * scope, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
@@ -1340,7 +1341,6 @@ extern WORKUNIT_API IStatisticGatherer * createGlobalStatisticGatherer(IWorkUnit
 extern WORKUNIT_API WUGraphType getGraphTypeFromString(const char* type);
 
 extern WORKUNIT_API bool getWorkUnitCreateTime(const char *wuid,CDateTime &time); // based on WUID
-extern WORKUNIT_API bool restoreWorkUnit(const char *base,const char *wuid);
 extern WORKUNIT_API void clientShutdownWorkUnit();
 extern WORKUNIT_API IExtendedWUInterface * queryExtendedWU(IConstWorkUnit * wu);
 extern WORKUNIT_API const IExtendedWUInterface * queryExtendedWU(const IConstWorkUnit * wu);

+ 4 - 1
common/workunit/workunit.ipp

@@ -549,6 +549,7 @@ protected:
     void checkAgentRunning(WUState & state);
 
     // Implemented by derived classes
+    virtual IPropertyTree *getGraphProgressTree() const { return NULL; };
     virtual void unsubscribe() {};
     virtual void _lockRemote() {};
     virtual void _unlockRemote() {};
@@ -659,6 +660,7 @@ public:
     virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     virtual IConstWorkUnit * openWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
+    virtual bool restoreWorkUnit(const char *base, const char *wuid);
     virtual int setTracingLevel(int newlevel);
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *scope, ISecManager *secmgr, ISecUser *secuser);
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr, ISecUser *secuser) = 0;
@@ -692,7 +694,7 @@ protected:
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
     virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;  // for read access
     virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;  // for write access
-
+    virtual bool _restoreWorkUnit(IPTree *pt, const char *wuid) = 0;
 };
 
 class CLocalWUGraph : public CInterface, implements IConstWUGraph
@@ -778,5 +780,6 @@ public:
 #define PROGRESS_FORMAT_V 2
 
 extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress);
+extern WORKUNIT_API  IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath);
 
 #endif

+ 18 - 15
dali/sasha/saarch.cpp

@@ -665,22 +665,25 @@ static bool doArchiveWorkUnit(IWorkUnitFactory *wufactory,const char *wuid, Stri
 
 static bool doRestoreWorkUnit(IWorkUnitFactory *wufactory,const char *wuid, StringBuffer &res)
 {
-    CriticalBlock block(archivingSect);
-    res.append("RESTORE: ").append(wuid).append(" ");
-    StringBuffer ldspath("Archive/WorkUnits");
-    splitWUIDpath(wuid,ldspath);
-    StringBuffer path;
-    getLdsPath(ldspath.str(),path);
-    try {
-        if (restoreWorkUnit(path.str(),wuid)) {
-            res.append("OK");
-            return true;
+    if (wuid && *wuid)
+    {
+        CriticalBlock block(archivingSect);
+        res.append("RESTORE: ").append(wuid).append(" ");
+        StringBuffer ldspath("Archive/WorkUnits");
+        splitWUIDpath(wuid,ldspath);
+        StringBuffer base;
+        getLdsPath(ldspath.str(), base);
+        try {
+            if (wufactory->restoreWorkUnit(base, wuid)) {
+                res.append("OK");
+                return true;
+            }
+        }
+        catch (IException *e) {
+            e->errorMessage(res);
+            res.append(' ');
+            e->Release();
         }
-    }
-    catch (IException *e) {
-        e->errorMessage(res);
-        res.append(' ');
-        e->Release();
     }
     res.append("FAILED");
     return false;

+ 6 - 9
dali/sasha/saqmon.cpp

@@ -170,7 +170,7 @@ public:
             return false;
 
         // see if can find candidate on another queue
-
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         ForEachItemIn(i1,queues) {
             if (i1!=qi) {
                 IJobQueue &srcq = queues.item(i1);
@@ -180,15 +180,12 @@ public:
                 ForEach(*iter) {
                     const char *wuid = iter->query().queryWUID();
                     if (wuid&&*wuid) {
-                        StringBuffer xpath("/WorkUnits/");
-                        xpath.append(wuid);
-                        Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-                        if (conn) {
-                            Owned<IPropertyTree> ptree=conn->getRoot();
-                            const char *allowedclusters = ptree->queryProp("Debug/allowedclusters");
-                            if (allowedclusters&&*allowedclusters) {
+                        Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);
+                        if (wu) {
+                            SCMStringBuffer allowedClusters;
+                            if (wu->getAllowedClusters(allowedClusters).length()) {
                                 StringArray acs;
-                                acs.appendListUniq(allowedclusters, ",");
+                                acs.appendListUniq(allowedClusters.str(), ",");
                                 bool found = true;
                                 ForEachItemIn(i,acs) {
                                     if (strcmp(cnames.item(qi),acs.item(i))==0) 

+ 1 - 1
ecl/wutest/wutest.cpp

@@ -353,7 +353,7 @@ int main(int argc, const char *argv[])
                 if (from.length()==0)
                     from.append('.');
                 const char *wuid = globals->queryProp("WUID");
-                if (restoreWorkUnit(from.str(),wuid))
+                if (factory->restoreWorkUnit(from.str(),wuid))
                     printf("restored %s\n", wuid);
                 else
                     printf("failed to restore %s\n", wuid);

+ 155 - 26
plugins/cassandra/cassandrawu.cpp

@@ -2107,11 +2107,10 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser,  IRemoteConnection *_daliLock)
-        : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock)
+    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
+        : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock), allDirty(_allDirty)
     {
         CPersistedWorkUnit::loadPTree(wuXML);
-        allDirty = false;   // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
         memset(childLoaded, 0, sizeof(childLoaded));
         if (daliLock)
             createBatch();
@@ -2535,40 +2534,26 @@ public:
     class CCassandraWuGraphStats : public CWuGraphStats
     {
     public:
-        CCassandraWuGraphStats(const char *_wuid, const ICassandraSession *_sessionCache, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
+        CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
         : CWuGraphStats(createPTree(_rootScope), _creatorType, _creator, _rootScope, _id),
-          wuid(_wuid), sessionCache(_sessionCache)
+          parent(_parent)
         {
         }
         virtual void beforeDispose()
         {
             CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sqNN where nn==id
-            CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
-            statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
-            statement.bindString(1, wuid);
-            statement.bindString(2, progress->queryName());
-            statement.bindInt64(3, id);
-            statement.bindString(4, creator);
-            StringBuffer tag;
-            tag.append("sg").append(id);
-            IPTree *sq = progress->queryPropTree(tag);
-            assertex(sq);
-            StringBuffer xml;
-            toXML(sq, xml);
-            statement.bindString(5, xml);
-            CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
-            future.wait("update stats");
+            parent->setGraphProgress(progress, progress->queryName(), id, creator);
         }
 
     protected:
-        Linked<const ICassandraSession> sessionCache;
+        Linked<const CCassandraWorkUnit> parent;
         StringAttr wuid;
     };
 
 
     IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
     {
-        return new CCassandraWuGraphStats(queryWuid(), sessionCache, creatorType, creator, graphName, subgraph);
+        return new CCassandraWuGraphStats(this, creatorType, creator, graphName, subgraph);
     }
 
 
@@ -2649,6 +2634,98 @@ public:
             checkChildLoaded(**table);
         return p;
     }
+
+    // Inserts one subgraph's progress into the wuGraphProgress table for this workunit.
+    // 'progress' must contain a child element named sg<subid> (asserted); that child is serialised
+    // to XML and stored together with the graph id, subgraph id and creator. Waits for the insert to finish.
+    void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator) const
+    {
+        const char *wuid = queryWuid();
+        CassandraStatement insert(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
+        unsigned partition = rtlHash32VStr(wuid, 0) % NUM_PARTITIONS;
+        insert.bindInt32(0, partition);
+        insert.bindString(1, wuid);
+        insert.bindString(2, gid);
+        insert.bindInt64(3, subid);
+        insert.bindString(4, creator);
+        // Only the sg<subid> child of 'progress' is written, not the whole tree
+        StringBuffer subgraphTag("sg");
+        subgraphTag.append(subid);
+        IPTree *subgraph = progress->queryPropTree(subgraphTag);
+        assertex(subgraph);
+        StringBuffer progressXml;
+        toXML(subgraph, progressXml);
+        insert.bindString(5, progressXml);
+        CassandraFuture pending(cass_session_execute(sessionCache->querySession(), insert));
+        pending.wait("update stats");
+    }
+    // Rebuilds a "GraphProgress" tree for this workunit from the wuGraphProgress and wuGraphState tables.
+    // Each wuGraphProgress row contributes its stored XML as a child of an element named after the graph;
+    // each wuGraphState row then sets @_state either on the graph element (subgraph id 0) or on a
+    // node[@id=<subgraph id>] child of it. State rows for graphs with no progress rows are ignored.
+    // Returns NULL when wuGraphProgress holds no rows for the workunit; otherwise the caller owns the result.
+    virtual IPropertyTree *getGraphProgressTree() const
+    {
+        CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
+        const char *wuid = queryWuid();
+        graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        graphQuery.bindString(1, wuid);
+        CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
+        future.wait("getGraphProgress");
+        CassandraResult result(cass_future_get_result(future));
+        if (!cass_result_row_count(result))
+            return NULL;
+        Owned<IPTree> progress = createPTree("GraphProgress");
+        CassandraIterator rows(cass_iterator_from_result(result));
+        while (cass_iterator_next(rows))
+        {
+            const CassRow *row = cass_iterator_get_row(rows);
+            StringBuffer graphName, creator, xml;
+            getCassString(graphName, cass_row_get_column(row, 0));
+            WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+            getCassString(creator, cass_row_get_column(row, 2));
+            getCassString(xml, cass_row_get_column(row, 3));
+            if (!progress->hasProp(graphName))
+                progress->setPropTree(graphName, createPTree(graphName));
+            IPTree *graph = progress->queryPropTree(graphName);
+            graph->setPropBool("@stats", true);
+            graph->setPropInt("@format", PROGRESS_FORMAT_V);
+            IPTree *stats = createPTreeFromXMLString(xml);
+            // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
+            graph->addPropTree(stats->queryName(), stats);
+        }
+        // Now fill in the graph/node states
+        CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
+        stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        stateQuery.bindString(1, wuid);
+        CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
+        stateFuture.wait("getGraphStateProgress");
+        CassandraResult stateResult(cass_future_get_result(stateFuture));
+        if (cass_result_row_count(stateResult))
+        {
+            CassandraIterator stateRows(cass_iterator_from_result(stateResult));
+            while (cass_iterator_next(stateRows))
+            {
+                const CassRow *row = cass_iterator_get_row(stateRows);
+                StringBuffer graphName;
+                getCassString(graphName, cass_row_get_column(row, 0));
+                WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+                unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
+                IPTree *node = progress->queryPropTree(graphName);
+                if (node)
+                {
+                    if (subId)
+                    {
+                        // This is what you might expect it to say...
+                        //StringBuffer sg("sg");
+                        //sg.append(subId);
+                        //node = node->queryPropTree(sg);
+                        // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
+                        // The xpath must be well formed (node[@id='N']) for the defensive removal to be able to match
+                        StringBuffer xpath("node[@id='");
+                        xpath.append(subId).append("']");
+                        node->removeProp(xpath);  // Shouldn't be one, just playing safe
+                        node = node->addPropTree("node", createPTree("node"));
+                        node->setPropInt("@id", subId);
+                        node->setPropInt("@_state", state);
+                    }
+                    else
+                        node->setPropInt("@_state", state);
+                }
+            }
+        }
+        return progress.getClear();
+    }
 protected:
     void createBatch()
     {
@@ -2952,7 +3029,7 @@ public:
                 wuXML->setPropInt("@wuidVersion", WUID_VERSION);  // we implement the latest version.
                 Owned<IRemoteConnection> daliLock;
                 lockWuid(daliLock, useWuid);
-                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
+                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
                 return wu.getClear();
             }
             suffix = rand_r(&randState);
@@ -2964,7 +3041,7 @@ public:
     {
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
         if (wuXML)
-            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL);
+            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
         else
             return NULL;
     }
@@ -2974,9 +3051,60 @@ public:
         Owned<IRemoteConnection> daliLock;
         lockWuid(daliLock, wuid);
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
-        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
+        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
         return wu.getClear();
     }
+    // Cassandra-specific part of a restore. Takes ownership of _pt, locks the wuid, wraps the tree in a
+    // CCassandraWorkUnit created with allDirty=true, replays any GraphProgress branch found in the tree
+    // through setGraphProgress/setNodeState/setGraphState, and commits.
+    // Returns true on success; any IException is logged and released, and false is returned.
+    virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
+    {
+        Owned<IPTree> wuTree(_pt);
+        try
+        {
+            Owned<IRemoteConnection> daliLock;
+            lockWuid(daliLock, wuid);
+            Owned<IPropertyTree> savedProgress = pruneBranch(wuTree, "GraphProgress[1]");
+            Owned<CCassandraWorkUnit> restored = new CCassandraWorkUnit(this, wuTree.getClear(), NULL, NULL, daliLock.getClear(), true);
+            if (savedProgress)
+            {
+                Owned<IPTreeIterator> graphIter = savedProgress->getElements("*");
+                ForEach(*graphIter)
+                {
+                    IPTree &graph = graphIter->query();
+                    const char *graphName = graph.queryName();
+                    Owned<IPTreeIterator> childIter = graph.getElements("*");
+                    ForEach(*childIter)
+                    {
+                        IPTree &child = childIter->query();
+                        const char *tag = child.queryName();
+                        if (tag[0]=='s' && tag[1]=='g')
+                        {
+                            // sgNN element: the digits after "sg" give the subgraph id
+                            restored->setGraphProgress(&graph, graphName, atoi(tag+2), child.queryProp("@creator"));
+                            continue;
+                        }
+                        if (!streq(tag, "node"))
+                            continue;
+                        unsigned subid = child.getPropInt("@id");
+                        if (!subid)
+                            continue;
+                        if (child.hasChildren()) // Old format
+                            restored->setGraphProgress(&child, graphName, subid, child.queryProp("@creator"));
+                        if (child.hasProp("@_state"))
+                            restored->setNodeState(graphName, subid, (WUGraphState) child.getPropInt("@_state"));
+                    }
+                    if (graph.hasProp("@_state"))
+                        restored->setGraphState(graphName, (WUGraphState) graph.getPropInt("@_state"));
+                }
+            }
+            restored->commit();
+            return true;
+        }
+        catch (IException *E)
+        {
+            EXCLOG(E);
+            ::Release(E);
+            return false;
+        }
+    }
 
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
     {
@@ -2984,7 +3112,7 @@ public:
         Owned<IRemoteConnection> daliLock;
         lockWuid(daliLock, GLOBAL_WORKUNIT);
         Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
-        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear());
+        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
         return &wu->lockRemote(false);
     }
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
@@ -3419,6 +3547,7 @@ public:
 
     virtual void createRepository()
     {
+        cluster.disconnect();
         CassandraSession s(cass_session_new());
         CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
         future.wait("connect without keyspace");