浏览代码

HPCC-14031 Sasha accesses dali workunit tree directly

Also includes fixes for archiveWorkunit and restoreWorkUnit methods which were
being used by sasha (and which accessed dali directly for at least some of the
information being archived/restored).

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
10b433cfdf

+ 154 - 140
common/workunit/workunit.cpp

@@ -923,6 +923,7 @@ public:
             if (progress)
             {
                 StringBuffer path;
+                // MORE I don't think this works! Gavin changed the format... Jake?
                 path.append("node[@id=\"").append(nodeId).append("\"]/@_state");
                 return (WUGraphState) progress->getPropInt(path, (unsigned) WUGraphUnknown);
             }
@@ -1042,6 +1043,7 @@ public:
         VStringBuffer path("/GraphProgress/%s", queryWuid());
         Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
         IPTree *progress = ensurePTree(conn->queryRoot(), graphName);
+        // Actually, maybe it does work, but by luck?
         path.clear().append("node[@id=\"").append(nodeId).append("\"]");
         IPropertyTree *node = progress->queryPropTree(path.str());
         if (!node)
@@ -1087,6 +1089,17 @@ protected:
         VStringBuffer path("/GraphProgress/%s/%s", queryWuid(), graphName);
         return querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
     }
+    // Builds a new "GraphProgress" tree with the progress connection's root merged
+    // into it and hands ownership to the caller. Returns NULL when there is no
+    // progress connection.
+    IPropertyTree *getGraphProgressTree() const
+    {
+        IRemoteConnection *progressConn = queryProgressConnection();
+        if (!progressConn)
+            return NULL;
+        // Merge into a fresh tree rather than returning the connection's own root
+        Owned<IPropertyTree> snapshot = createPTree("GraphProgress");
+        mergePTree(snapshot, progressConn->queryRoot());
+        return snapshot.getClear();
+    }
     Owned<IRemoteConnection> connection;
     mutable Owned<IRemoteConnection> progressConnection;
 };
@@ -2152,6 +2165,104 @@ IWorkUnit* CWorkUnitFactory::updateWorkUnit(const char *wuid, ISecManager *secmg
     }
 }
 
+// Detaches the subtree of 'from' selected by 'xpath': a copy of the branch is
+// returned (ownership passes to the caller) and the branch itself is removed
+// from 'from'. Returns NULL, leaving 'from' untouched, when nothing matches.
+IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath)
+{
+    IPropertyTree * match = from->queryPropTree(xpath);
+    if (!match)
+        return NULL;
+    // Take the copy first - 'match' must not be used once it has been removed
+    Owned<IPropertyTree> copy = createPTreeFromIPT(match);
+    from->removeTree(match);
+    return copy.getClear();
+}
+
+// Restores an archived workunit from <base>/<wuid>.xml.
+//
+// The archive XML is loaded and stamped with @restoredDate. The GeneratedDlls
+// branch is detached and Query/Associated is copied aside, then the remaining tree
+// is handed to the implementation-specific _restoreWorkUnit(). Afterwards, generated
+// dlls that the dll server does not already have an entry for are copied back from
+// 'base' and registered, and associated files present under 'base' are copied back
+// to the location recorded in the archive.
+//
+// base - directory containing the archived XML and its files
+// wuid - id of the workunit to restore
+//
+// Returns false if wuid is NULL/empty, the archive could not be loaded, or
+// _restoreWorkUnit() reported failure; true otherwise. Exceptions raised while
+// copying/registering generated dlls are not caught here; failures copying
+// associated files are logged and ignored.
+bool CWorkUnitFactory::restoreWorkUnit(const char *base, const char *wuid)
+{
+    // Not every caller validates the WUID before calling, so reject a missing one
+    // here rather than going on to build a path such as "<base>/.xml"
+    if (!wuid || !*wuid)
+        return false;
+    StringBuffer path(base);
+    addPathSepChar(path).append(wuid).append(".xml");
+    Owned<IPTree> pt = createPTreeFromXMLFile(path);
+    if (!pt)
+        return false;
+    CDateTime dt;
+    dt.setNow();
+    StringBuffer dts;
+    dt.getString(dts);
+    pt->setProp("@restoredDate", dts.str());
+    // GeneratedDlls is removed from the tree being restored; Query/Associated stays
+    // in the tree, so only a copy is kept for the file restore below
+    Owned<IPropertyTree> generatedDlls = pruneBranch(pt, "GeneratedDlls[1]");
+    Owned<IPropertyTree> associatedFiles;
+    IPropertyTree *srcAssociated = pt->queryPropTree("Query/Associated");
+    if (srcAssociated)
+        associatedFiles.setown(createPTreeFromIPT(srcAssociated));
+    // The updating of the repo is implementation specific...
+    if (!_restoreWorkUnit(pt.getClear(), wuid))
+        return false;
+    // now kludge back GeneratedDlls
+    if (generatedDlls)
+    {
+        Owned<IPropertyTreeIterator> dlls = generatedDlls->getElements("GeneratedDll");
+        for(dlls->first(); dlls->isValid(); dlls->next())
+        {
+            IPropertyTree & dll = dlls->query();
+            char const * name = dll.queryProp("@name");
+            char const * kind = dll.queryProp("@kind");
+            char const * location = dll.queryProp("@location");
+            Owned<IDllEntry> got = queryDllServer().getEntry(name);
+            if (!got)
+            {
+                // Source is the archived copy under 'base', named after the tail of the
+                // original location; destination is the original location itself
+                RemoteFilename dstRfn;
+                dstRfn.setRemotePath(location);
+                StringBuffer srcPath(base);
+                addPathSepChar(srcPath);
+                dstRfn.getTail(srcPath);
+                OwnedIFile srcFile = createIFile(srcPath);
+                OwnedIFile dstFile = createIFile(dstRfn);
+                copyFile(dstFile, srcFile);
+                queryDllServer().registerDll(name, kind, location);
+            }
+        }
+    }
+    if (associatedFiles)
+    {
+        Owned<IPropertyTreeIterator> associated = associatedFiles->getElements("*");
+        ForEach(*associated)
+        {
+            IPropertyTree &file = associated->query();
+            const char *filename = file.queryProp("@filename");
+            SocketEndpoint ep(file.queryProp("@ip"));
+            RemoteFilename rfn;
+            rfn.setPath(ep, filename);
+            OwnedIFile dstFile = createIFile(rfn);
+            StringBuffer srcPath(base), name;
+            addPathSepChar(srcPath);
+            rfn.getTail(name);
+            srcPath.append(name);
+            if (generatedDlls)
+            {
+                VStringBuffer gDllPath("GeneratedDll[@name=\"%s\"]", name.str());
+                if (generatedDlls->hasProp(gDllPath))
+                    continue; // generated dlls handled separately - see above
+            }
+
+            OwnedIFile srcFile = createIFile(srcPath);
+            if (srcFile->exists())
+            {
+                // Best effort: a file that cannot be copied back is logged, not fatal
+                try
+                {
+                    copyFile(dstFile, srcFile);
+                }
+                catch (IException *e)
+                {
+                    VStringBuffer msg("Failed to restore associated file '%s' to destination '%s'", srcFile->queryFilename(), dstFile->queryFilename());
+                    EXCLOG(e, msg.str());
+                    e->Release();
+                }
+            }
+        }
+    }
+    return true;
+}
+
+
 int CWorkUnitFactory::setTracingLevel(int newLevel)
 {
     if (newLevel)
@@ -2495,7 +2606,42 @@ public:
         else
             return NULL;
     }
+    // Dali-backed restore: stores the archived workunit tree at the workunit's SDS
+    // path and puts any archived GraphProgress branch back under
+    // /GraphProgress/<wuid>. Takes ownership of _pt. Returns false (after logging)
+    // if the workunit path cannot be connected to or already has children.
+    virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
+    {
+        Owned<IPTree> wuTree(_pt);
+        // GraphProgress is written to its own SDS path below, so split it off first
+        Owned<IPropertyTree> progressTree = pruneBranch(wuTree, "GraphProgress[1]");
+        StringBuffer wuXPath;
+        getXPath(wuXPath, wuid);
+        Owned<IRemoteConnection> wuConn = sdsManager->connect(wuXPath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        if (!wuConn)
+        {
+            ERRLOG("restoreWorkUnit could not create to %s", wuXPath.str());
+            return false;
+        }
+        if (wuConn->queryRoot()->hasChildren())
+        {
+            ERRLOG("restoreWorkUnit WUID %s already exists", wuid);
+            return false;
+        }
+        wuConn->queryRoot()->setPropTree(NULL, wuTree.getClear());
+        wuConn.clear();

+        // now kludge back GraphProgress
+        if (!progressTree)
+            return true;
+        VStringBuffer progressXPath("/GraphProgress/%s", wuid);
+        Owned<IRemoteConnection> progressConn = querySDS().connect(progressXPath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        if (progressConn)
+        {
+            IPropertyTree *progressRoot = progressConn->queryRoot();
+            // Existing progress is replaced, not treated as an error
+            if (progressRoot->hasChildren())
+                WARNLOG("restoreWorkUnit WUID %s graphprogress already exists, replacing",wuid);
+            progressRoot->setPropTree(NULL, progressTree.getClear());
+        }
+        return true;
+    }
     virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer wuRoot;
@@ -2902,6 +3048,10 @@ public:
         if (!secUser) secUser = defaultSecUser.get();
         return baseFactory->updateWorkUnit(wuid, secMgr, secUser);
     }
+    virtual bool restoreWorkUnit(const char *base, const char *wuid) // Restores an archived workunit by delegating to the wrapped factory
+    {
+        return baseFactory->restoreWorkUnit(base, wuid); // forwarded unchanged; no secMgr/secUser is passed, unlike the neighbouring wrappers
+    }
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();
@@ -3233,15 +3383,11 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
     exportWorkUnitToXML(this, buf, false, false, true);
 
     StringBuffer extraWorkUnitXML;
-    StringBuffer xpath("/GraphProgress/");
-    xpath.append(wuid);
-    Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-    if (conn)
+    Owned<IPTree> graphProgress = getGraphProgressTree();
+    if (graphProgress)
     {
-        Owned<IPropertyTree> tmp = createPTree("GraphProgress");
-        mergePTree(tmp,conn->queryRoot());
-        toXML(tmp,extraWorkUnitXML,1,XML_Format);
-        conn->close();
+        toXML(graphProgress,extraWorkUnitXML,1,XML_Format);
+        graphProgress.clear();
     }
 
     Owned<IConstWUQuery> q = getQuery();
@@ -3367,138 +3513,6 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
     return true;
 }
 
-IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath)
-{
-    Owned<IPropertyTree> ret;
-    IPropertyTree * branch = from->queryPropTree(xpath);
-    if(branch) {
-        ret.setown(createPTreeFromIPT(branch));
-        from->removeTree(branch);
-    }
-    return ret.getClear();
-}
-
-bool restoreWorkUnit(const char *base,const char *wuid)
-{
-    StringBuffer path(base);
-    if (!wuid||!*wuid)
-        return false;
-    addPathSepChar(path).append(wuid).append(".xml");
-    Owned<IFile> file = createIFile(path.str());
-    if (!file)
-        return false;
-    Owned<IFileIO> fileio = file->open(IFOread);
-    if (!fileio)
-        return false;
-    Owned<IPropertyTree> pt = createPTree(*fileio);
-    if (!pt)
-        return false;
-    CDateTime dt;
-    dt.setNow();
-    StringBuffer dts;
-    dt.getString(dts);
-    pt->setProp("@restoredDate", dts.str());
-    VStringBuffer xpath("/WorkUnits/%s", wuid);
-    Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    if (!conn)
-    {
-        ERRLOG("restoreWorkUnit could not create to %s", xpath.str());
-        return false;
-    }
-    IPropertyTree *root = conn->queryRoot();
-    if (root->hasChildren())
-    {
-        ERRLOG("restoreWorkUnit WUID %s already exists", wuid);
-        return false;
-    }
-    Owned<IPropertyTree> gprogress = pruneBranch(pt, "GraphProgress[1]");
-    Owned<IPropertyTree> generatedDlls = pruneBranch(pt, "GeneratedDlls[1]");
-    Owned<IPropertyTree> associatedFiles;
-    IPropertyTree *srcAssociated = pt->queryPropTree("Query/Associated");
-    if (srcAssociated)
-        associatedFiles.setown(createPTreeFromIPT(srcAssociated));
-    root->setPropTree(NULL, pt.getClear());
-    conn.clear();
-
-    // now kludge back GraphProgress and GeneratedDlls
-    if (gprogress)
-    {
-        VStringBuffer xpath("/GraphProgress/%s", wuid);
-        conn.setown(querySDS().connect(xpath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
-        if (conn)
-        {
-            IPropertyTree *groot = conn->queryRoot();
-            if (groot->hasChildren())
-                WARNLOG("restoreWorkUnit WUID %s graphprogress already exists, replacing",wuid);
-            groot->setPropTree(NULL, gprogress.getClear());
-        }
-    }
-
-    if (generatedDlls)
-    {
-        Owned<IPropertyTreeIterator> dlls = generatedDlls->getElements("GeneratedDll");
-        for(dlls->first(); dlls->isValid(); dlls->next())
-        {
-            IPropertyTree & dll = dlls->query();
-            char const * name = dll.queryProp("@name");
-            char const * kind = dll.queryProp("@kind");
-            char const * location = dll.queryProp("@location");
-            Owned<IDllEntry> got = queryDllServer().getEntry(name);
-            if (!got)
-            {
-                RemoteFilename dstRfn;
-                dstRfn.setRemotePath(location);
-                StringBuffer srcPath(base);
-                addPathSepChar(srcPath);
-                dstRfn.getTail(srcPath);
-                OwnedIFile srcFile = createIFile(srcPath);
-                OwnedIFile dstFile = createIFile(dstRfn);
-                copyFile(dstFile, srcFile);
-                queryDllServer().registerDll(name, kind, location);
-            }
-        }
-    }
-    if (associatedFiles)
-    {
-        Owned<IPropertyTreeIterator> associated = associatedFiles->getElements("*");
-        ForEach(*associated)
-        {
-            IPropertyTree &file = associated->query();
-            const char *filename = file.queryProp("@filename");
-            SocketEndpoint ep(file.queryProp("@ip"));
-            RemoteFilename rfn;
-            rfn.setPath(ep, filename);
-            OwnedIFile dstFile = createIFile(rfn);
-            StringBuffer srcPath(base), name;
-            addPathSepChar(srcPath);
-            rfn.getTail(name);
-            srcPath.append(name);
-            if (generatedDlls)
-            {
-                VStringBuffer gDllPath("GeneratedDll[@name=\"%s\"]", name.str());
-                if (generatedDlls->hasProp(gDllPath))
-                    continue; // generated dlls handled separately - see above
-            }
-
-            OwnedIFile srcFile = createIFile(srcPath);
-            if (srcFile->exists())
-            {
-                try
-                {
-                    copyFile(dstFile, srcFile);
-                }
-                catch (IException *e)
-                {
-                    VStringBuffer msg("Failed to restore associated file '%s' to destination '%s'", srcFile->queryFilename(), dstFile->queryFilename());
-                    EXCLOG(e, msg.str());
-                    e->Release();
-                }
-            }
-        }
-    }
-    return true;
-}
-
 void CLocalWorkUnit::loadXML(const char *xml)
 {
     CriticalBlock block(crit);

+ 1 - 1
common/workunit/workunit.hpp

@@ -1244,6 +1244,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual IConstWorkUnit * openWorkUnit(const char *wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
+    virtual bool restoreWorkUnit(const char *base, const char *wuid) = 0;
     virtual int setTracingLevel(int newlevel) = 0;
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char * scope, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
@@ -1338,7 +1339,6 @@ extern WORKUNIT_API IStatisticGatherer * createGlobalStatisticGatherer(IWorkUnit
 extern WORKUNIT_API WUGraphType getGraphTypeFromString(const char* type);
 
 extern WORKUNIT_API bool getWorkUnitCreateTime(const char *wuid,CDateTime &time); // based on WUID
-extern WORKUNIT_API bool restoreWorkUnit(const char *base,const char *wuid);
 extern WORKUNIT_API void clientShutdownWorkUnit();
 extern WORKUNIT_API IExtendedWUInterface * queryExtendedWU(IConstWorkUnit * wu);
 extern WORKUNIT_API const IExtendedWUInterface * queryExtendedWU(const IConstWorkUnit * wu);

+ 4 - 1
common/workunit/workunit.ipp

@@ -548,6 +548,7 @@ protected:
     void checkAgentRunning(WUState & state);
 
     // Implemented by derived classes
+    virtual IPropertyTree *getGraphProgressTree() const { return NULL; };
     virtual void unsubscribe() {};
     virtual void _lockRemote() {};
     virtual void _unlockRemote() {};
@@ -657,6 +658,7 @@ public:
     virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     virtual IConstWorkUnit * openWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
+    virtual bool restoreWorkUnit(const char *base, const char *wuid);
     virtual int setTracingLevel(int newlevel);
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *scope, ISecManager *secmgr, ISecUser *secuser);
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr, ISecUser *secuser) = 0;
@@ -690,7 +692,7 @@ protected:
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
     virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;  // for read access
     virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;  // for write access
-
+    virtual bool _restoreWorkUnit(IPTree *pt, const char *wuid) = 0;
 };
 
 class CLocalWUGraph : public CInterface, implements IConstWUGraph
@@ -776,5 +778,6 @@ public:
 #define PROGRESS_FORMAT_V 2
 
 extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress);
+extern WORKUNIT_API  IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath);
 
 #endif

+ 18 - 15
dali/sasha/saarch.cpp

@@ -665,22 +665,25 @@ static bool doArchiveWorkUnit(IWorkUnitFactory *wufactory,const char *wuid, Stri
 
 static bool doRestoreWorkUnit(IWorkUnitFactory *wufactory,const char *wuid, StringBuffer &res)
 {
-    CriticalBlock block(archivingSect);
-    res.append("RESTORE: ").append(wuid).append(" ");
-    StringBuffer ldspath("Archive/WorkUnits");
-    splitWUIDpath(wuid,ldspath);
-    StringBuffer path;
-    getLdsPath(ldspath.str(),path);
-    try {
-        if (restoreWorkUnit(path.str(),wuid)) {
-            res.append("OK");
-            return true;
+    if (wuid && *wuid)
+    {
+        CriticalBlock block(archivingSect);
+        res.append("RESTORE: ").append(wuid).append(" ");
+        StringBuffer ldspath("Archive/WorkUnits");
+        splitWUIDpath(wuid,ldspath);
+        StringBuffer base;
+        getLdsPath(ldspath.str(), base);
+        try {
+            if (wufactory->restoreWorkUnit(base, wuid)) {
+                res.append("OK");
+                return true;
+            }
+        }
+        catch (IException *e) {
+            e->errorMessage(res);
+            res.append(' ');
+            e->Release();
         }
-    }
-    catch (IException *e) {
-        e->errorMessage(res);
-        res.append(' ');
-        e->Release();
     }
     res.append("FAILED");
     return false;

+ 6 - 9
dali/sasha/saqmon.cpp

@@ -170,7 +170,7 @@ public:
             return false;
 
         // see if can find candidate on another queue
-
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         ForEachItemIn(i1,queues) {
             if (i1!=qi) {
                 IJobQueue &srcq = queues.item(i1);
@@ -180,15 +180,12 @@ public:
                 ForEach(*iter) {
                     const char *wuid = iter->query().queryWUID();
                     if (wuid&&*wuid) {
-                        StringBuffer xpath("/WorkUnits/");
-                        xpath.append(wuid);
-                        Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-                        if (conn) {
-                            Owned<IPropertyTree> ptree=conn->getRoot();
-                            const char *allowedclusters = ptree->queryProp("Debug/allowedclusters");
-                            if (allowedclusters&&*allowedclusters) {
+                        Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);
+                        if (wu) {
+                            SCMStringBuffer allowedClusters;
+                            if (wu->getAllowedClusters(allowedClusters).length()) {
                                 StringArray acs;
-                                acs.appendListUniq(allowedclusters, ",");
+                                acs.appendListUniq(allowedClusters.str(), ",");
                                 bool found = true;
                                 ForEachItemIn(i,acs) {
                                     if (strcmp(cnames.item(qi),acs.item(i))==0) 

+ 1 - 1
ecl/wutest/wutest.cpp

@@ -351,7 +351,7 @@ int main(int argc, const char *argv[])
                 if (from.length()==0)
                     from.append('.');
                 const char *wuid = globals->queryProp("WUID");
-                if (restoreWorkUnit(from.str(),wuid))
+                if (factory->restoreWorkUnit(from.str(),wuid))
                     printf("restored %s\n", wuid);
                 else
                     printf("failed to restore %s\n", wuid);

+ 155 - 26
plugins/cassandra/cassandrawu.cpp

@@ -2053,11 +2053,10 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser,  IRemoteConnection *_daliLock)
-        : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock)
+    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
+        : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock), allDirty(_allDirty)
     {
         CPersistedWorkUnit::loadPTree(wuXML);
-        allDirty = false;   // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
         memset(childLoaded, 0, sizeof(childLoaded));
         if (daliLock)
             createBatch();
@@ -2454,40 +2453,26 @@ public:
     class CCassandraWuGraphStats : public CWuGraphStats
     {
     public:
-        CCassandraWuGraphStats(const char *_wuid, const ICassandraSession *_sessionCache, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
+        CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
         : CWuGraphStats(createPTree(_rootScope), _creatorType, _creator, _rootScope, _id),
-          wuid(_wuid), sessionCache(_sessionCache)
+          parent(_parent)
         {
         }
         virtual void beforeDispose()
         {
             CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sqNN where nn==id
-            CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
-            statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
-            statement.bindString(1, wuid);
-            statement.bindString(2, progress->queryName());
-            statement.bindInt64(3, id);
-            statement.bindString(4, creator);
-            StringBuffer tag;
-            tag.append("sg").append(id);
-            IPTree *sq = progress->queryPropTree(tag);
-            assertex(sq);
-            StringBuffer xml;
-            toXML(sq, xml);
-            statement.bindString(5, xml);
-            CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
-            future.wait("update stats");
+            parent->setGraphProgress(progress, progress->queryName(), id, creator);
         }
 
     protected:
-        Linked<const ICassandraSession> sessionCache;
+        Linked<const CCassandraWorkUnit> parent;
         StringAttr wuid;
     };
 
 
     IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
     {
-        return new CCassandraWuGraphStats(queryWuid(), sessionCache, creatorType, creator, graphName, subgraph);
+        return new CCassandraWuGraphStats(this, creatorType, creator, graphName, subgraph);
     }
 
 
@@ -2562,6 +2547,98 @@ public:
             checkChildLoaded(**table);
         return p;
     }
+
+    // Writes one subgraph's progress to the wuGraphProgress table.
+    //   progress - tree that must contain a child named "sg<subid>" (asserted)
+    //   gid      - graph id stored in the graphID column
+    //   subid    - subgraph id; also selects the "sg<subid>" child to serialise
+    //   creator  - stored in the creator column
+    // Waits for the insert to complete before returning.
+    void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator) const
+    {
+        const char *id = queryWuid();
+        CassandraStatement insert(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
+
+        // Locate and serialise the subgraph's own element
+        StringBuffer sgName("sg");
+        sgName.append(subid);
+        IPTree *subgraph = progress->queryPropTree(sgName);
+        assertex(subgraph);
+        StringBuffer progressXml;
+        toXML(subgraph, progressXml);
+
+        // Bind parameters in column order
+        insert.bindInt32(0, rtlHash32VStr(id, 0) % NUM_PARTITIONS);
+        insert.bindString(1, id);
+        insert.bindString(2, gid);
+        insert.bindInt64(3, subid);
+        insert.bindString(4, creator);
+        insert.bindString(5, progressXml);
+
+        CassandraFuture pending(cass_session_execute(sessionCache->querySession(), insert));
+        pending.wait("update stats");
+    }
+    // Rebuilds a "GraphProgress" tree for this workunit from Cassandra.
+    //
+    // One child per graph is created from the rows of wuGraphProgress, each flagged
+    // with @stats and @format and holding the parsed progress XML of every row for
+    // that graph. Rows of wuGraphState are then applied: a row with subgraph id 0
+    // sets @_state on the graph element, any other row adds a <node @id @_state>
+    // child to the graph element. State rows for graphs with no progress rows are
+    // ignored.
+    //
+    // Returns NULL if there are no wuGraphProgress rows for the workunit; otherwise
+    // ownership of the new tree passes to the caller.
+    virtual IPropertyTree *getGraphProgressTree() const
+    {
+        CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
+        const char *wuid = queryWuid();
+        graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        graphQuery.bindString(1, wuid);
+        CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
+        future.wait("getGraphProgress");
+        CassandraResult result(cass_future_get_result(future));
+        if (!cass_result_row_count(result))
+            return NULL;
+        Owned<IPTree> progress = createPTree("GraphProgress");
+        CassandraIterator rows(cass_iterator_from_result(result));
+        while (cass_iterator_next(rows))
+        {
+            const CassRow *row = cass_iterator_get_row(rows);
+            StringBuffer graphName, creator, xml;
+            getCassString(graphName, cass_row_get_column(row, 0));
+            // subId and creator are fetched but only the progress XML is used - see below
+            WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+            getCassString(creator, cass_row_get_column(row, 2));
+            getCassString(xml, cass_row_get_column(row, 3));
+            if (!progress->hasProp(graphName))
+                progress->setPropTree(graphName, createPTree(graphName));
+            IPTree *graph = progress->queryPropTree(graphName);
+            graph->setPropBool("@stats", true);
+            graph->setPropInt("@format", PROGRESS_FORMAT_V);
+            IPTree *stats = createPTreeFromXMLString(xml);
+            // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
+            graph->addPropTree(stats->queryName(), stats);
+        }
+        // Now fill in the graph/node states
+        CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
+        stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        stateQuery.bindString(1, wuid);
+        CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
+        stateFuture.wait("getGraphStateProgress");
+        CassandraResult stateResult(cass_future_get_result(stateFuture));
+        if (cass_result_row_count(stateResult))
+        {
+            CassandraIterator stateRows(cass_iterator_from_result(stateResult));
+            while (cass_iterator_next(stateRows))
+            {
+                const CassRow *row = cass_iterator_get_row(stateRows);
+                StringBuffer graphName;
+                getCassString(graphName, cass_row_get_column(row, 0));
+                WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+                unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
+                IPTree *node = progress->queryPropTree(graphName);
+                if (node)
+                {
+                    if (subId)
+                    {
+                        // This is what you might expect it to say...
+                        //StringBuffer sg("sg");
+                        //sg.append(subId);
+                        //node = node->queryPropTree(sg);
+                        // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
+                        StringBuffer xpath("node[@id='");
+                        xpath.append(subId).append("']");  // predicate closes with "']" - no trailing ')'
+                        node->removeProp(xpath);  // Shouldn't be one, just playing safe
+                        node = node->addPropTree("node", createPTree("node"));
+                        node->setPropInt("@id", subId);
+                        node->setPropInt("@_state", state);
+                    }
+                    else
+                        node->setPropInt("@_state", state);
+                }
+            }
+        }
+        return progress.getClear();
+    }
 protected:
     void createBatch()
     {
@@ -2834,7 +2911,7 @@ public:
                 wuXML->setPropInt("@wuidVersion", WUID_VERSION);  // we implement the latest version.
                 Owned<IRemoteConnection> daliLock;
                 lockWuid(daliLock, useWuid);
-                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
+                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
                 return wu.getClear();
             }
             suffix = rand_r(&randState);
@@ -2846,7 +2923,7 @@ public:
     {
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
         if (wuXML)
-            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL);
+            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
         else
             return NULL;
     }
@@ -2856,9 +2933,60 @@ public:
         Owned<IRemoteConnection> daliLock;
         lockWuid(daliLock, wuid);
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
-        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear());
+        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
         return wu.getClear();
     }
+    virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid) // Cassandra restore of an archived workunit tree; returns true on success, false if an IException was raised
+    {
+        Owned<IPTree> pt(_pt); // takes ownership of the caller's tree
+        try
+        {
+            Owned<IRemoteConnection> daliLock;
+            lockWuid(daliLock, wuid); // presumably acquires the per-wuid lock into daliLock - helper not visible here, confirm
+            Owned<IPropertyTree> gProgress = pruneBranch(pt, "GraphProgress[1]"); // graph progress is written separately below, so detach it from the main tree
+            Owned<CCassandraWorkUnit> wu = new CCassandraWorkUnit(this, pt.getClear(), NULL, NULL, daliLock.getClear(), true); // final argument is _allDirty=true. NOTE(review): no "already exists" check here, unlike the Dali implementation - confirm intended
+            if (gProgress)
+            {
+                Owned<IPTreeIterator> graphs = gProgress->getElements("*"); // every child of GraphProgress is treated as a graph
+                ForEach(*graphs)
+                {
+                    IPTree &graph = graphs->query();
+                    const char *graphName = graph.queryName();
+                    Owned<IPTreeIterator> subs = graph.getElements("*");
+                    ForEach(*subs)
+                    {
+                        IPTree &sub = subs->query();
+                        const char *name=sub.queryName();
+                        if (name[0]=='s' && name[1]=='g') // element named "sg<id>"
+                        {
+                            wu->setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator")); // setGraphProgress looks up "sg<id>" beneath the tree it is given, hence &graph rather than &sub
+                        }
+                        else if (streq(name, "node"))
+                        {
+                            unsigned subid = sub.getPropInt("@id");
+                            if (subid) // nodes with a zero or missing @id are skipped
+                            {
+                                if (sub.hasChildren()) // Old format
+                                    wu->setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator")); // NOTE(review): setGraphProgress asserts that its tree has an "sg<subid>" child - confirm old-format node elements contain one
+                                if (sub.hasProp("@_state"))
+                                    wu->setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
+                            }
+                        }
+                    }
+                    if (graph.hasProp("@_state")) // graph-level state is held on the graph element itself
+                        wu->setGraphState(graphName, (WUGraphState) graph.getPropInt("@_state"));
+                }
+            }
+            wu->commit();
+            return true;
+        }
+        catch (IException *E) // failures are logged and reported as false rather than propagated
+        {
+            EXCLOG(E);
+            ::Release(E);
+            return false;
+        }
+    }
 
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
     {
@@ -2866,7 +2994,7 @@ public:
         Owned<IRemoteConnection> daliLock;
         lockWuid(daliLock, GLOBAL_WORKUNIT);
         Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
-        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear());
+        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
         return &wu->lockRemote(false);
     }
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
@@ -3296,6 +3424,7 @@ public:
 
     virtual void createRepository()
     {
+        cluster.disconnect();
         CassandraSession s(cass_session_new());
         CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
         future.wait("connect without keyspace");