Parcourir la source

HPCC-22136 Import WU from ZAP report

Revise based on review:
1. Call removeFileTraceIfFail() to remove uploaded ZAP report;
2. Throw an exception if the Import folder exists before created;
3. Call invoke_program() to run the unzip command for ZAP report;
4. Track the IPT's for removing the unused file tree nodes;
5. Get the importDateTime when using it;
6. Combine the zapReportFilePath with the zapReportFileName.

Revise based on review:
1. Fix 2 typos;
2. Break the line 5195 in workunit.cpp to 2 lines;
3. Use the GetCachedHostName() to replace the queryHostIP().

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx il y a 6 ans
Parent
commit
cd5f7588eb

+ 181 - 0
common/workunit/workunit.cpp

@@ -3829,6 +3829,21 @@ public:
     {
         return new CDaliWuGraphStats(getWritableProgressConnection(graphName, _wfid), creatorType, creator, _wfid, graphName, subgraph);
     }
+    virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
+    {
+        connection->queryRoot()->setPropTree(nullptr, LINK(wuTree));
+        loadPTree(connection->getRoot());
+
+        if (!graphProgressTree)
+            return;
+
+        VStringBuffer xpath("/GraphProgress/%s", queryWuid());
+        Owned<IRemoteConnection> progressConn = querySDS().connect(xpath, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
+        if (!progressConn)
+            throw MakeStringException(0, "Failed to access %s.", xpath.str());
+
+        progressConn->queryRoot()->setPropTree(nullptr, LINK(graphProgressTree));
+    }
 
 protected:
     IRemoteConnection *getProgressConnection() const
@@ -4118,6 +4133,8 @@ public:
             { return c->getAbortBy(str); }
     virtual unsigned __int64 getAbortTimeStamp() const
             { return c->getAbortTimeStamp(); }
+    virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
+            { return c->import(wuTree, graphProgressTree); }
 
 
     virtual void clearExceptions(const char *source=nullptr)
@@ -5113,6 +5130,165 @@ bool CWorkUnitFactory::restoreWorkUnit(const char *base, const char *wuid, bool
     return true;
 }
 
+void CWorkUnitFactory::importWorkUnit(const char *zapReportFileName, const char *zapReportPassword,
+    const char *importDir, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser)
+{
+    class CImportWorkUnitHelper
+    {
+        StringAttr zapReportFileName, zapReportPassword, importDir, user, wuid, fromWUID, scope;
+        StringBuffer unzipDir;
+        Owned<IPTree> wuTree, graphProgressTree;
+
+        bool findZAPFile(const char *mask, bool optional, StringBuffer &fileName)
+        {
+            Owned<IFile> d = createIFile(unzipDir);
+            if (!d->exists())
+                throw MakeStringException(WUERR_InvalidUserInput, "%s not found.", unzipDir.str());
+
+            Owned<IDirectoryIterator> di = d->directoryFiles(mask, false, false);
+            if (!di->first())
+            {
+                if (optional)
+                    return false;
+                throw MakeStringException(WUERR_InvalidUserInput, "Failed to find %s in %s.", mask, unzipDir.str());
+            }
+
+            fileName.set(unzipDir).append(PATHSEPSTR);
+            di->getName(fileName);
+            if (di->next())
+                throw MakeStringException(WUERR_InvalidUserInput, "More than 1 %s files found in %s.", mask, unzipDir.str());
+            return true;
+        }
+        IPTree *readWUXMLFile(const char *pattern, bool optional)
+        {
+            StringBuffer fileName;
+            if (!findZAPFile(pattern, optional, fileName))
+                return nullptr;
+
+            Owned<IPTree> tree = createPTreeFromXMLFile(fileName);
+            if (!tree)
+                throw MakeStringException(WUERR_InvalidUserInput, "Failed to retrieve %s.", pattern);
+
+            Owned<IFile> f = createIFile(fileName);
+            f->remove();
+            return tree.getClear();
+        }
+        void updateWUQueryAssociatedFilesAttrs(const char *localIP)
+        {
+            ICopyArrayOf<IPropertyTree> fileTreesToRemove;
+            IPropertyTree *queryAssociatedTreeRoot = wuTree->queryPropTree("Query/Associated");
+            Owned<IPropertyTreeIterator> itr = queryAssociatedTreeRoot->getElements("File");
+            ForEach (*itr)
+            {
+                IPropertyTree &fileTree = itr->query();
+                const char *fileName = fileTree.queryProp("@filename");
+
+                StringBuffer fileNameWithPath(unzipDir);
+                fileNameWithPath.append(PATHSEPSTR).append(pathTail(fileName));
+                if (checkFileExists(fileNameWithPath)) //Check if the ZAP report contains this file.
+                {
+                    fileTree.setProp("@ip", localIP);
+                    fileTree.setProp("@filename", fileNameWithPath);
+                }
+                else
+                    fileTreesToRemove.append(fileTree);
+            }
+            ForEachItemIn(r, fileTreesToRemove)
+                queryAssociatedTreeRoot->removeTree(&fileTreesToRemove.item(r));
+        }
+        void getWUAttributes(IWorkUnit *workunit)
+        {
+            wuid.set(workunit->queryWuid());
+            scope.set(workunit->queryWuScope());
+        }
+        IPTree *queryWUPTree() const
+        {
+            return wuTree;
+        }
+        IPTree *queryGraphProgressPTree() const
+        {
+            return graphProgressTree;
+        }
+        void setUNZIPDir()
+        {   //Set a unique unzip folder inside the component's data folder
+            unzipDir.append(importDir).append(PATHSEPSTR).append(wuid.get());
+        }
+        void unzipZAPReport()
+        {
+            Owned<IFile> unzipFolder = createIFile(unzipDir);
+            if (!unzipFolder->exists())
+                unzipFolder->createDirectory();
+            else
+            {//This should not happen. Just in case.
+                throw MakeStringException(WUERR_CannotImportWorkunit, "Workunit %s had been created twice and passed to import()..", wuid.get());
+            }
+
+            //Unzip ZAP Report
+            VStringBuffer zipCommand("unzip %s -d %s", zapReportFileName.get(), unzipDir.str());
+            if (!isEmptyString(zapReportPassword))
+                zipCommand.appendf(" -P %s", zapReportPassword.get());
+            DWORD runcode;
+            bool success = invoke_program(zipCommand.str(), runcode, true);
+            if (!success)
+                throw MakeStringException(WUERR_CannotImportWorkunit, "Failed in execution : %s.", zipCommand.str());
+            if (runcode != 0)
+                throw MakeStringException(WUERR_CannotImportWorkunit, "Failed in execution : %s, error(%" I64F "i)", zipCommand.str(), (unsigned __int64) runcode);
+        }
+        void buildPTreesFromZAPReport()
+        {
+            wuTree.setown(readWUXMLFile("ZAPReport_W*.xml", false));
+            fromWUID.set(wuTree->queryName());
+            wuTree->setProp("@scope", scope);
+
+            //update QueryAssociatedFiles in WU XML;
+            updateWUQueryAssociatedFilesAttrs(GetCachedHostName());
+
+            graphProgressTree.setown(readWUXMLFile("ZAPReport_*.graphprogress", true));
+        }
+        void updateJobName(IWorkUnit *workunit)
+        {
+            StringBuffer newJobName;
+            const char *jobName = workunit->queryJobName();
+            if (isEmptyString(jobName))
+                newJobName.appendf("IMPORTED from %s", fromWUID.get());
+            else
+                newJobName.appendf("IMPORTED: %s", jobName);
+            workunit->setJobName(newJobName);
+        }
+        void setImportDebugAttribute(IWorkUnit *workunit)
+        {
+            StringBuffer attr, importDateTime;
+            CDateTime dt;
+            dt.setNow();
+            dt.getString(importDateTime);
+
+            attr.append("FromWUID=").append(fromWUID).append(",");
+            attr.append("ImportDT=").append(importDateTime).append(",");
+            attr.append("ZAPReport=").append(pathTail(zapReportFileName));
+            workunit->setDebugValue("imported", attr, true);
+        }
+    public:
+        CImportWorkUnitHelper(const char *_zapReportFileName, const char *_zapReportPassword, const char *_importDir, const char *_user)
+            : zapReportFileName(_zapReportFileName), zapReportPassword(_zapReportPassword), importDir(_importDir), user(_user) { };
+
+        void import(IWorkUnit *workunit)
+        {
+            getWUAttributes(workunit);
+            setUNZIPDir();
+            unzipZAPReport();
+            buildPTreesFromZAPReport();
+            workunit->import(queryWUPTree(), queryGraphProgressPTree());
+            setImportDebugAttribute(workunit);
+            updateJobName(workunit);
+
+            removeFileTraceIfFail(zapReportFileName);
+        }
+    };
+
+    Owned<IWorkUnit> newWU = createWorkUnit(app, user, secMgr, secUser);
+    CImportWorkUnitHelper helper(zapReportFileName, zapReportPassword, importDir, user);
+    helper.import(newWU);
+}
 
 int CWorkUnitFactory::setTracingLevel(int newLevel)
 {
@@ -6126,6 +6302,11 @@ public:
     {
         return baseFactory->restoreWorkUnit(base, wuid, restoreAssociated);
     }
+    virtual void importWorkUnit(const char *zapReportFileName, const char *zapReportPassword,
+        const char *importDir, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser)
+    {
+        baseFactory->importWorkUnit(zapReportFileName, zapReportPassword, importDir, app, user, secMgr, secUser);
+    }
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();

+ 3 - 0
common/workunit/workunit.hpp

@@ -1345,6 +1345,7 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual void setResultBool(const char *name, unsigned sequence, bool val) = 0;
     virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val) = 0;
     virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend) = 0;
+    virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree = nullptr) = 0;
 };
 
 
@@ -1446,6 +1447,8 @@ typedef IIteratorOf<IPropertyTree> IConstQuerySetQueryIterator;
 interface IWorkUnitFactory : extends IPluggableFactory
 {
     virtual IWorkUnit *createWorkUnit(const char *app, const char *scope, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
+    virtual void importWorkUnit(const char *zapReportFileName, const char *zapReportPassword,
+        const char *importDir, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser) = 0;
     virtual bool deleteWorkUnit(const char *wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual bool deleteWorkUnitEx(const char *wuid, bool throwException, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstWorkUnit * openWorkUnit(const char *wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;

+ 3 - 0
common/workunit/workunit.ipp

@@ -278,6 +278,7 @@ public:
     virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
     virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override;
     void clearGraphProgress() const;
+    virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.
 
     virtual const char *queryJobName() const;
     virtual IConstWUPlugin * getPluginByName(const char * name) const;
@@ -624,6 +625,8 @@ public:
     // interface IWorkUnitFactory - some are left for derived classes
 
     virtual IWorkUnit * createWorkUnit(const char * app, const char * user, ISecManager *secmgr, ISecUser *secuser);
+    virtual void importWorkUnit(const char *zapReportFileName, const char *zapReportPassword,
+        const char *importDir, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser);
     virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     virtual bool deleteWorkUnitEx(const char * wuid, bool throwException, ISecManager *secmgr, ISecUser *secuser);
     virtual IConstWorkUnit * openWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);

+ 1 - 0
common/workunit/wuerror.hpp

@@ -53,4 +53,5 @@
 #define WUERR_WorkunitVersionMismatch           5028
 #define WUERR_InvalidFieldUsage                 5029
 #define WUERR_InvalidUserInput                  5030
+#define WUERR_CannotImportWorkunit              5031
 #endif

+ 13 - 16
esp/bindings/http/platform/httptransport.cpp

@@ -2041,19 +2041,17 @@ bool CHttpRequest::readUploadFileName(CMimeMultiPart* mimemultipart, StringBuffe
     return (fileName.length() > 0);
 }
 
-IFile* CHttpRequest::createUploadFile(const char * netAddress, const char* filePath, StringBuffer& fileName)
+IFile* CHttpRequest::createUploadFile(const char * netAddress, const char* filePath, const char* fileName, StringBuffer& fileNameWithPath)
 {
-    StringBuffer name(fileName), tmpFileName;
-    char* str = (char*) name.reverse().str();
-    char* pStr = (char*) strchr(str, '\\');
-    if (!pStr)
-        pStr = strchr(str, '/');
-    if (pStr)
-    {
-        pStr[0] = 0;
-        fileName.clear().append(str).reverse();
-    }
-    tmpFileName.appendf("%s/%s.part", filePath, fileName.str());
+    StringBuffer tmpFileName(filePath);
+    addPathSepChar(tmpFileName);
+    tmpFileName.append(pathTail(fileName));
+    fileNameWithPath.set(tmpFileName);
+    tmpFileName.append(".part");
+
+    //If no netAddress is specified, create a local file.
+    if (isEmptyString(netAddress))
+        return createIFile(tmpFileName);
 
     RemoteFilename rfn;
     SocketEndpoint ep;
@@ -2125,7 +2123,8 @@ int CHttpRequest::readContentToFiles(const char * netAddress, const char * path,
         }
 
         fileNames.append(fileName);
-        Owned<IFile> file = createUploadFile(netAddress, path, fileName);
+        StringBuffer fileNameWithPath;
+        Owned<IFile> file = createUploadFile(netAddress, path, fileName, fileNameWithPath);
         if (!file)
         {
             UERRLOG("Uploaded file %s cannot be created", fileName.str());
@@ -2169,9 +2168,7 @@ int CHttpRequest::readContentToFiles(const char * netAddress, const char * path,
         if (writeError)
             break;
 
-        StringBuffer fileNameWithPath;
-        fileNameWithPath.appendf("%s/%s", path, fileName.str());
-        file->rename(fileNameWithPath.str());
+        file->rename(fileNameWithPath);
 
         if (!foundAnotherFile)
             break;

+ 1 - 1
esp/bindings/http/platform/httptransport.ipp

@@ -361,7 +361,7 @@ public:
 
     bool readContentToBuffer(MemoryBuffer& fileContent, __int64& bytesNotRead);
     bool readUploadFileName(CMimeMultiPart* mimemultipart, StringBuffer& fileName, MemoryBuffer& contentBuffer, __int64& bytesNotRead);
-    IFile* createUploadFile(const char *netAddress, const char* filePath, StringBuffer& fileName);
+    IFile* createUploadFile(const char *netAddress, const char* filePath, const char* fileName, StringBuffer& fileNameWithPath);
     virtual int readContentToFiles(const char * netAddress, const char * path, StringArray& fileNames);
     virtual void readUploadFileContent(StringArray& fileNames, StringArray& files);
 };

+ 48 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -59,6 +59,7 @@
 #endif
 
 #define ESP_WORKUNIT_DIR "workunits/"
+static constexpr const char* zipFolder = "tempzipfiles" PATHSEPSTR;
 
 #define WU_SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
 const unsigned CHECK_QUERY_STATUS_THREAD_POOL_SIZE = 25;
@@ -428,6 +429,9 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 
     recursiveCreateDirectory(ESP_WORKUNIT_DIR);
 
+    getConfigurationDirectory(directories, "data", "esp", process, dataDirectory);
+    wuFactory.setown(getWorkUnitFactory());
+
     m_sched.start();
     filesInUse.subscribe();
 
@@ -4471,6 +4475,50 @@ int CWsWorkunitsSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* req
     return onGetNotFound(context, request, response, service);
 }
 
+int CWsWorkunitsSoapBindingEx::onStartUpload(IEspContext &ctx, CHttpRequest* request, CHttpResponse* response, const char *serv, const char *method)
+{
+    StringArray fileNames, files;
+    StringBuffer source;
+    Owned<IMultiException> me = MakeMultiException(source.setf("WsWorkunits::%s()", method).str());
+    try
+    {
+        if (strieq(method, "ImportWUZAPFile"))
+        {
+            SecAccessFlags accessOwn, accessOthers;
+            getUserWuAccessFlags(ctx, accessOwn, accessOthers, false);
+            if ((accessOwn != SecAccess_Full) || (accessOthers != SecAccess_Full))
+                throw MakeStringException(-1, "Permission denied.");
+    
+            StringBuffer password;
+            request->getParameter("Password", password);
+
+            request->readContentToFiles(nullptr, zipFolder, fileNames);
+            unsigned count = fileNames.ordinality();
+            if (count == 0)
+                throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to read upload content.");
+            //For now, we only support importing 1 ZAP report per ImportWUZAPFile request for a better response time.
+            //Some ZAP report could be very big. It may take a log time to import.
+            if (count > 1)
+                throw MakeStringException(ECLWATCH_INVALID_INPUT, "Only one WU ZAP report is allowed.");
+
+            VStringBuffer fileName("%s%s", zipFolder, fileNames.item(0));
+            wswService->queryWUFactory()->importWorkUnit(fileName, password,
+                wswService->getDataDirectory(), "ws_workunits", ctx.queryUserId(), ctx.querySecManager(), ctx.queryUser());
+        }
+        else
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "WsWorkunits::%s does not support the upload_ option.", method);
+    }
+    catch (IException* e)
+    {
+        me->append(*e);
+    }
+    catch (...)
+    {
+        me->append(*MakeStringExceptionDirect(ECLWATCH_INTERNAL_ERROR, "Unknown Exception"));
+    }
+    return onFinishUpload(ctx, request, response, serv, method, fileNames, files, me);
+}
+
 bool isDeploymentTypeCompressed(const char *type)
 {
     if (type && *type)

+ 5 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -203,6 +203,8 @@ public:
     void getGraphsByQueryId(const char *target, const char *queryId, const char *graphName, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs);
     void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
     void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
+    IWorkUnitFactory *queryWUFactory() { return wuFactory; };
+    const char *getDataDirectory() const { return dataDirectory.str(); };
 
     bool onWUQuery(IEspContext &context, IEspWUQueryRequest &req, IEspWUQueryResponse &resp);
     bool onWULightWeightQuery(IEspContext &context, IEspWULightWeightQueryRequest &req, IEspWULightWeightQueryResponse &resp);
@@ -374,6 +376,8 @@ private:
     int maxRequestEntityLength;
     Owned<IThreadPool> clusterQueryStatePool;
     unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
+    Owned<IWorkUnitFactory> wuFactory;
+    StringBuffer dataDirectory;
 
 public:
     QueryFilesInUse filesInUse;
@@ -417,6 +421,7 @@ public:
 
     int onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method);
     int onGet(CHttpRequest* request, CHttpResponse* response);
+    int onStartUpload(IEspContext& ctx, CHttpRequest* request, CHttpResponse* response, const char* service, const char* method);
 
     virtual void addService(const char * name, const char * host, unsigned short port, IEspService & service)
     {

+ 115 - 0
plugins/cassandra/cassandrawu.cpp

@@ -2367,6 +2367,79 @@ public:
         dirtyPaths.kill();
         dirtyResults.kill();
     }
+    virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
+    {
+        CPersistedWorkUnit::loadPTree(LINK(wuTree));
+
+        if (sessionCache->queryTraceLevel() >= 8)
+        {
+            StringBuffer s; toXML(wuTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
+        }
+
+        CIArrayOf<CassandraStatement> secondaryBatch;
+        CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
+        updateSecondaries(secondaryBatch);
+
+        // MORE can use the table?
+        childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, wuTree, "Graphs/Graph", 0);
+        childXMLtoCassandra(sessionCache, batch, wuResultsMappings, wuTree, "Results/Result", "0");
+        childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, wuTree, "Variables/Variable", "-1"); // ResultSequenceStored
+        childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, wuTree, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
+        childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, wuTree, "Exceptions/Exception", 0);
+        childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, wuTree, "Statistics/Statistic", 0);
+        childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, wuTree, "FilesRead/File", 0);
+        childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, wuTree, "Files/File", 0);
+        childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, wuTree, "usedsources/datasource", 0);
+
+        IPTree *query = wuTree->queryPropTree("Query");
+        if (query)
+            childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, queryWuid(), *query, 0);
+
+        if (sessionCache->queryTraceLevel() > 1)
+            DBGLOG("Executing commit batches");
+
+        CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
+        futureBatch.wait("commit updates");
+        executeAsync(secondaryBatch, "commit");
+
+        if (!graphProgressTree)
+            return;
+
+        if (sessionCache->queryTraceLevel() >= 8)
+        {
+            StringBuffer s; toXML(graphProgressTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
+        }
+
+        Owned<IPTreeIterator> graphs = graphProgressTree->getElements("*");
+        ForEach(*graphs)
+        {
+            IPTree &graph = graphs->query();
+            const char *graphName = graph.queryName();
+            Owned<IPTreeIterator> subs = graph.getElements("*");
+            ForEach(*subs)
+            {
+                IPTree &sub = subs->query();
+                const char *name=sub.queryName();
+                if (name[0]=='s' && name[1]=='g')
+                {
+                    setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"));
+                }
+                else if (streq(name, "node"))
+                {
+                    unsigned subid = sub.getPropInt("@id");
+                    if (subid)
+                    {
+                        if (sub.hasChildren()) // Old format
+                            setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"));
+                        if (sub.hasProp("@_state"))
+                            setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
+                    }
+                }
+            }
+            if (graph.hasProp("@_state"))
+                setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
+        }
+    }
     virtual IConstWUGraph *getGraph(const char *qname) const
     {
         // Just because we read one graph, does not mean we are likely to read more. So don't cache this result.
@@ -2952,6 +3025,13 @@ protected:
             simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
     }
 
+    void updateSecondaryTable(const char *xpath, const char *wuid, CIArrayOf<CassandraStatement> &batch)
+    {
+        const char *value = p->queryProp(xpath);
+        if (value && *value)
+            simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
+    }
+
     void deleteAppSecondaries(IPTree &pt, const char *wuid, CIArrayOf<CassandraStatement> &batch)
     {
         Owned<IPTreeIterator> apps = pt.getElements("Application");
@@ -3029,6 +3109,41 @@ protected:
         }
     }
 
+    void updateSecondaries(CIArrayOf<CassandraStatement> &batch)
+    {
+        const char *wuid = queryWuid();
+        const char * const *search;
+        for (search = searchPaths; *search; search++)
+            updateSecondaryTable(*search, wuid, batch);
+        for (search = wildSearchPaths; *search; search++)
+        {
+            const char *value = p->queryProp(*search);
+            if (value && *value)
+                addUniqueValue(sessionCache, batch, *search, value);
+        }
+        Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
+        ForEach(*appValues)
+        {
+            IConstWUAppValue& val=appValues->query();
+            addUniqueValue(sessionCache, batch, "Application", val.queryApplication());  // Used to populate droplists of applications
+            VStringBuffer key("@@%s", val.queryApplication());
+            addUniqueValue(sessionCache, batch, key, val.queryName());  // Used to populate droplists of value names for a given application
+            VStringBuffer xpath("Application/%s/%s", val.queryApplication(), val.queryName());
+            addUniqueValue(sessionCache, batch, xpath, val.queryValue());  // Used to get lists of values for a given app and name, and for filtering
+            simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
+        }
+        Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
+        ForEach(*filesRead)
+        {
+            addFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
+        }
+        Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
+        ForEach(*filesWritten)
+        {
+            addFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
+        }
+    }
+
     // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
 
     bool trackSecondaryChange(const char *newval, const char *xpath)