浏览代码

Merge pull request #5373 from richardkchapman/roxiepersist

HPCC-10574 Support persists in roxie

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 年之前
父节点
当前提交
e99945028e
共有 7 个文件被更改,包括 482 次插入27 次删除
  1. 1 2
      ecl/hqlcpp/hqlcerrors.hpp
  2. 2 6
      ecl/hqlcpp/hqlttcpp.cpp
  3. 470 13
      roxie/ccd/ccdcontext.cpp
  4. 2 1
      roxie/ccd/ccdcontext.hpp
  5. 3 3
      roxie/ccd/ccdquery.cpp
  6. 2 1
      roxie/ccd/ccdquery.hpp
  7. 2 1
      roxie/ccd/ccdstate.cpp

+ 1 - 2
ecl/hqlcpp/hqlcerrors.hpp

@@ -69,7 +69,7 @@
 #define HQLERR_RowTooLarge                      4043
 #define HQLERR_RowTooLarge                      4043
 #define HQLERR_ShouldHaveBeenHoisted            4044
 #define HQLERR_ShouldHaveBeenHoisted            4044
 #define HQLERR_NoArgumentsInValidator           4045
 #define HQLERR_NoArgumentsInValidator           4045
-#define HQLERR_NotSupportInRoxie                4046
+
 #define HQLERR_InputMergeNotSorted              4047
 #define HQLERR_InputMergeNotSorted              4047
 #define HQLERR_TooComplicatedToPreload          4048
 #define HQLERR_TooComplicatedToPreload          4048
 #define HQLERR_KeyedNotKeyed                    4049
 #define HQLERR_KeyedNotKeyed                    4049
@@ -356,7 +356,6 @@
 #define HQLERR_RowTooLarge_Text                 "Row size %u exceeds the maximum specified (%u)"
 #define HQLERR_RowTooLarge_Text                 "Row size %u exceeds the maximum specified (%u)"
 #define HQLERR_ShouldHaveBeenHoisted_Text       "Select expression should have been hoisted"
 #define HQLERR_ShouldHaveBeenHoisted_Text       "Select expression should have been hoisted"
 #define HQLERR_NoArgumentsInValidator_Text      "%s() cannot have a parameter inside a VALIDATE"
 #define HQLERR_NoArgumentsInValidator_Text      "%s() cannot have a parameter inside a VALIDATE"
-#define HQLERR_NotSupportInRoxie_Text           "%s is not supported in roxie queries"
 #define HQLERR_InputMergeNotSorted_Text         "Input to MERGE does not appear to be sorted"
 #define HQLERR_InputMergeNotSorted_Text         "Input to MERGE does not appear to be sorted"
 #define HQLERR_TooComplicatedToPreload_Text     "Expression is too complicated to preload"
 #define HQLERR_TooComplicatedToPreload_Text     "Expression is too complicated to preload"
 #define HQLERR_KeyedNotKeyed_Text               "KEYED(%s) couldn't be looked up in a key."
 #define HQLERR_KeyedNotKeyed_Text               "KEYED(%s) couldn't be looked up in a key."

+ 2 - 6
ecl/hqlcpp/hqlttcpp.cpp

@@ -5674,13 +5674,9 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
         switch (curOp)
         switch (curOp)
         {
         {
         case no_persist:
         case no_persist:
-            if (isRoxie && translator.getCheckRoxieRestrictions())
+            if (isRoxie)
             {
             {
-                StringBuffer s;
-                IHqlExpression * name = cur.queryChild(0);
-                OwnedHqlExpr seq = getGlobalSequenceNumber();
-                getStoredDescription(s, seq, name, true);
-                throwError1(HQLERR_NotSupportInRoxie, s.str());
+                // MORE - Add dynamic attribute to ensure the file is not pre-resolved
             }
             }
             //fall through
             //fall through
         case no_checkpoint:
         case no_checkpoint:

+ 470 - 13
roxie/ccd/ccdcontext.cpp

@@ -34,6 +34,7 @@
 #include "ccdqueue.ipp"
 #include "ccdqueue.ipp"
 #include "ccdsnmp.hpp"
 #include "ccdsnmp.hpp"
 #include "ccdstate.hpp"
 #include "ccdstate.hpp"
+#include "roxiehelper.hpp"
 
 
 using roxiemem::IRowManager;
 using roxiemem::IRowManager;
 
 
@@ -189,15 +190,34 @@ public:
 };
 };
 
 
 //=======================================================================================================================
 //=======================================================================================================================
+#define DEFAULT_PERSIST_COPIES (-1)
+#define PERSIST_LOCK_TIMEOUT 10000
+#define PERSIST_LOCK_SLEEP 5000
 
 
 class CRoxieWorkflowMachine : public WorkflowMachine
 class CRoxieWorkflowMachine : public WorkflowMachine
 {
 {
+    class PersistVersion : public CInterface
+    {
+    public:
+        PersistVersion(char const * _logicalName, unsigned _eclCRC, unsigned __int64 _allCRC, bool _isFile) : logicalName(_logicalName), eclCRC(_eclCRC), allCRC(_allCRC), isFile(_isFile) {}
+        StringAttr logicalName;
+        unsigned eclCRC;
+        unsigned __int64 allCRC;
+        bool isFile;
+    };
+
 public:
 public:
-    CRoxieWorkflowMachine(IPropertyTree *_workflowInfo, bool _doOnce, const IRoxieContextLogger &_logctx) : WorkflowMachine(_logctx)
+    CRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *_wu, bool _doOnce, const IRoxieContextLogger &_logctx)
+    : WorkflowMachine(_logctx)
     {
     {
+        workunit = _wu;
         workflowInfo = _workflowInfo;
         workflowInfo = _workflowInfo;
         doOnce = _doOnce;
         doOnce = _doOnce;
     }
     }
+    void returnPersistVersion(char const * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
+    {
+        persist.setown(new PersistVersion(logicalName, eclCRC, allCRC, isFile));
+    }
 protected:
 protected:
     virtual void begin()
     virtual void begin()
     {
     {
@@ -225,15 +245,401 @@ protected:
     virtual bool schedulingPullStop() { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported in roxie"); }
     virtual bool schedulingPullStop() { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Scheduling not supported in roxie"); }
     virtual void reportContingencyFailure(char const * type, IException * e) {}
     virtual void reportContingencyFailure(char const * type, IException * e) {}
     virtual void checkForAbort(unsigned wfid, IException * handling) {}
     virtual void checkForAbort(unsigned wfid, IException * handling) {}
-    virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Persists not supported in roxie"); }
+    virtual void doExecutePersistItem(IRuntimeWorkflowItem & item)
+    {
+        if (!workunit)
+        {
+            throw MakeStringException(0, "PERSIST not supported when running predeployed queries");
+        }
+        unsigned wfid = item.queryWfid();
+        // Old persist model requires dependencies to be executed BEFORE checking if the persist is up to date
+        // Defaults to old model, in case executing a WU that is created by earlier eclcc
+        if (!workunit->getDebugValueBool("expandPersistInputDependencies", false))
+            doExecuteItemDependencies(item, wfid);
+        SCMStringBuffer name;
+        const char *logicalName = item.getPersistName(name).str();
+        int maxPersistCopies = item.queryPersistCopies();
+        if (maxPersistCopies < 0)
+            maxPersistCopies = DEFAULT_PERSIST_COPIES;
+        Owned<IRemoteConnection> persistLock;
+        persistLock.setown(startPersist(logicalName));
+        doExecuteItemDependency(item, item.queryPersistWfid(), wfid, true);  // generated code should end up calling back to returnPersistVersion, which sets persist
+        if (!persist)
+        {
+            StringBuffer errmsg;
+            errmsg.append("Internal error in generated code: for wfid ").append(wfid).append(", persist CRC wfid ").append(item.queryPersistWfid()).append(" did not call returnPersistVersion");
+            throw MakeStringExceptionDirect(0, errmsg.str());
+        }
+        Owned<PersistVersion> thisPersist = persist.getClear();
+        if (strcmp(logicalName, thisPersist->logicalName.get()) != 0)
+        {
+            StringBuffer errmsg;
+            errmsg.append("Failed workflow/persist consistency check: wfid ").append(wfid).append(", WU persist name ").append(logicalName).append(", runtime persist name ").append(thisPersist->logicalName.get());
+            throw MakeStringExceptionDirect(0, errmsg.str());
+        }
+        if (workunit->getDebugValueInt("freezepersists", 0) != 0)
+        {
+            checkPersistMatches(logicalName, thisPersist->eclCRC);
+        }
+        else if(!isPersistUptoDate(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC, thisPersist->isFile))
+        {
+            // We used to call agent.clearPersist(logicalName) here - but that means if the persist rebuild fails, we forget WHY we wanted to.
+            // New persist model allows dependencies to be executed AFTER checking if the persist is up to date
+            if (workunit->getDebugValueBool("expandPersistInputDependencies", false))
+                doExecuteItemDependencies(item, wfid);
+            if (maxPersistCopies > 0)
+                deleteLRUPersists(logicalName, (unsigned) maxPersistCopies-1);
+            doExecuteItem(item, wfid);
+            updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
+        }
+        finishPersist(persistLock.getClear());
+    }
+
 private:
 private:
+
+    bool isResult(const char *name, unsigned sequence)
+    {
+        Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
+        return r != NULL && r->getResultStatus() != ResultStatusUndefined;
+    }
+
+    unsigned getResultHash(const char * name, unsigned sequence)
+    {
+        Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
+        if (!r)
+            throw MakeStringException(ROXIE_INTERNAL_ERROR, "Failed to retrieve hash value %s from workunit", name);
+        return r->getResultHash();
+    }
+
+    unsigned __int64 getResultInt(const char * name, unsigned sequence)
+    {
+        Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
+        if (!r)
+            throw MakeStringException(ROXIE_INTERNAL_ERROR, "Failed to retrieve persist value %s from workunit", name);
+        return r->getResultInt();
+    }
+
+    void setResultInt(const char * name, unsigned sequence, unsigned __int64 value)
+    {
+        WorkunitUpdate w(&workunit->lock());
+        w->setResultInt(name, sequence, value);
+    }
+
+    inline bool fileExists(const char *lfn)
+    {
+        Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn, NULL);  // MORE - need a userdescriptor from workunit
+        if (f)
+            return true;
+        return false;
+    }
+
+    inline IUserDescriptor *queryUserDescriptor()
+    {
+        return workunit->queryUserDescriptor();
+    }
+
+    bool checkPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer &errText)
+    {
+        StringBuffer lfn, crcName, eclName;
+        expandLogicalFilename(lfn, logicalName, workunit, false);
+        crcName.append(lfn).append("$crc");
+        eclName.append(lfn).append("$eclcrc");
+
+        if (!isResult(lfn, ResultSequencePersist))
+            errText.appendf("Building PERSIST('%s'): It hasn't been calculated before", logicalName);
+        else if (!isResult(crcName, ResultSequencePersist))
+            errText.appendf("Rebuilding PERSIST('%s'): Saved CRC isn't present", logicalName);
+        else if (isFile && !fileExists(lfn))
+            errText.appendf("Rebuilding PERSIST('%s'): Persistent file does not exist", logicalName);
+        else
+        {
+            unsigned savedEclCRC = (unsigned) getResultInt(eclName, ResultSequencePersist);
+            unsigned __int64 savedCRC = (unsigned __int64)getResultInt(crcName, ResultSequencePersist);
+            if (savedEclCRC != eclCRC)
+                errText.appendf("Rebuilding PERSIST('%s'): ECL has changed", logicalName);
+            else if (savedCRC != allCRC)
+                errText.appendf("Rebuilding PERSIST('%s'): Input files have changed", logicalName);
+            else
+                return true;
+        }
+
+        return false;
+    }
+
+    bool changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat)
+    {
+        logctx.CTXLOG("Waiting to change persist lock to %s for %s", (mode == RTM_LOCK_WRITE) ? "write" : "read", name);  // MORE - pass a logctx around?
+        //When converting a read lock to a write lock so the persist can be rebuilt hold onto the lock as short as
+        //possible.  Otherwise lots of workunits each trying to convert read locks to write locks will mean
+        //that the read lock is never released by all the workunits at the same time, so no workunit can progress.
+        unsigned timeout = repeat ? PERSIST_LOCK_TIMEOUT : 0;
+        loop
+        {
+            try
+            {
+                persistLock->changeMode(mode, timeout);
+                logctx.CTXLOG("Changed persist lock");
+                return true;
+            }
+            catch(ISDSException *E)
+            {
+                if (SDSExcpt_LockTimeout != E->errorCode())
+                    throw E;
+                E->Release();
+            }
+            if (!repeat)
+            {
+                logctx.CTXLOG("Failed to convert persist lock");
+                return false;
+            }
+            //This is only executed when converting write->read.  There is significant doubt whether the changeMode()
+            //can ever fail - and whether the execution can ever get here.
+            logctx.CTXLOG("Waiting to convert persist lock"); // MORE - give a chance to abort
+        }
+    }
+
+    IRemoteConnection *getPersistReadLock(const char * logicalName)
+    {
+        StringBuffer lfn;
+        expandLogicalFilename(lfn, logicalName, workunit, false);
+        if (!lfn.length())
+            throw MakeStringException(0, "Invalid persist name used : '%s'", logicalName);
+
+        const char * name = lfn;
+
+        StringBuffer xpath;
+        xpath.append("/PersistRunLocks/");
+        if (isdigit(*name))
+            xpath.append("_");
+        for (const char * cur = name;*cur;cur++)
+            xpath.append(isalnum(*cur) ? *cur : '_');
+
+        logctx.CTXLOG("Waiting for persist read lock for %s", name);
+        Owned<IRemoteConnection> persistLock;
+        loop
+        {
+            try
+            {
+                unsigned mode = RTM_CREATE_QUERY | RTM_LOCK_READ;
+                if (queryDaliServerVersion().compare("1.4") >= 0)
+                    mode |= RTM_DELETE_ON_DISCONNECT;
+                persistLock.setown(querySDS().connect(xpath.str(), myProcessSession(), mode, PERSIST_LOCK_TIMEOUT));
+            }
+            catch(ISDSException *E)
+            {
+                if (SDSExcpt_LockTimeout != E->errorCode())
+                    throw E;
+                E->Release();
+            }
+            if (persistLock)
+                break;
+            logctx.CTXLOG("Waiting for persist read lock"); // MORE - give a chance to abort
+        }
+
+        logctx.CTXLOG("Obtained persist read lock");
+        return persistLock.getClear();
+    }
+
+    void setBlockedOnPersist(const char * logicalName)
+    {
+        StringBuffer s;
+        s.append("Waiting for persist ").append(logicalName);
+        WorkunitUpdate w(&workunit->lock());
+        w->setState(WUStateBlocked);
+        w->setStateEx(s.str());
+    }
+
+    bool isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
+    {
+        //Loop trying to get a write lock - if it fails, then release the read lock, otherwise
+        //you can get a deadlock with several things waiting to read, and none being able to write.
+        bool rebuildAllPersists = false;   // Useful for debugging purposes
+        loop
+        {
+            StringBuffer dummy;
+            if (checkPersistUptoDate(logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
+            {
+                logctx.CTXLOG("PERSIST('%s') is up to date", logicalName);
+                return true;
+            }
+
+            //Get a write lock
+            setBlockedOnPersist(logicalName);
+            if (changePersistLockMode(persistLock, RTM_LOCK_WRITE, logicalName, false))
+                break;
+
+            //failed to get a write lock, so release our read lock
+            persistLock.clear();
+            MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
+            persistLock.setown(getPersistReadLock(logicalName));
+        }
+        WorkunitUpdate w(&workunit->lock());
+        w->setState(WUStateRunning);
+
+        //Check again whether up to date, someone else might have updated it!
+        StringBuffer errText;
+        if (checkPersistUptoDate(logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
+        {
+            logctx.CTXLOG("PERSIST('%s') is up to date (after being calculated by another job)", logicalName);
+            changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
+            return true;
+        }
+        if (errText.length())
+            logctx.CTXLOG("%s", errText.str());
+        return false;
+    }
+
+    void clearPersist(const char * logicalName)
+    {
+        StringBuffer lfn, crcName, eclName;
+        expandLogicalFilename(lfn, logicalName, workunit, false);
+        crcName.append(lfn).append("$crc");
+        eclName.append(lfn).append("$eclcrc");
+
+        setResultInt(crcName, ResultSequencePersist, 0);
+        setResultInt(eclName, ResultSequencePersist, 0);
+        logctx.CTXLOG("Recalculate persistent value %s", logicalName);
+    }
+
+    void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
+    {
+        StringBuffer lfn, crcName, eclName;
+        expandLogicalFilename(lfn, logicalName, workunit, false);
+        crcName.append(lfn).append("$crc");
+        eclName.append(lfn).append("$eclcrc");
+
+        setResultInt(crcName, ResultSequencePersist, allCRC);
+        setResultInt(eclName, ResultSequencePersist, eclCRC);
+
+        logctx.CTXLOG("Convert persist write lock to read lock");
+        changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
+    }
+
+    IRemoteConnection *startPersist(const char * logicalName)
+    {
+        setBlockedOnPersist(logicalName);
+        IRemoteConnection *persistLock = getPersistReadLock(logicalName);
+        WorkunitUpdate w(&workunit->lock());
+        w->setState(WUStateRunning);
+        return persistLock;
+    }
+
+    void finishPersist(IRemoteConnection *persistLock)
+    {
+        logctx.CTXLOG("Finished persists - add to read lock list");
+        persistReadLocks.append(*persistLock);
+    }
+
+    void checkPersistMatches(const char * logicalName, unsigned eclCRC)
+    {
+        StringBuffer lfn, eclName;
+        expandLogicalFilename(lfn, logicalName, workunit, true);
+        eclName.append(lfn).append("$eclcrc");
+
+        if (!isResult(lfn, ResultSequencePersist))
+            throw MakeStringException(ROXIE_INTERNAL_ERROR, "Frozen PERSIST('%s') hasn't been calculated ", logicalName);
+        if (isResult(eclName, ResultSequencePersist) && (getResultInt(eclName, ResultSequencePersist) != eclCRC))
+            throw MakeStringException(ROXIE_INTERNAL_ERROR, "Frozen PERSIST('%s') ECL has changed", logicalName);
+
+        StringBuffer msg;
+        msg.append("Frozen PERSIST('").append(logicalName).append("') is up to date");
+        logctx.CTXLOG("%s", msg.str());
+    }
+
+    static int comparePersistAccess(IInterface **_a, IInterface **_b)
+    {
+        IPropertyTree *a = *(IPropertyTree **)_a;
+        IPropertyTree *b = *(IPropertyTree **)_b;
+        const char *accessedA = a->queryProp("@accessed");
+        const char *accessedB = b->queryProp("@accessed");
+        if (accessedA && accessedB)
+            return strcmp(accessedB, accessedA);
+        else if (accessedB)
+            return -1;
+        else if (accessedA)
+            return 1;
+        else
+            return 0;
+
+    }
+
+    void deleteLRUPersists(const char * logicalName, unsigned keep)
+    {
+        StringBuffer lfn;
+        expandLogicalFilename(lfn, logicalName, workunit, false);
+        logicalName = lfn.str();
+        const char *tail = strrchr(logicalName, '_');     // Locate the trailing double-underbar
+        assertex(tail);
+        StringBuffer head(tail-logicalName+1, logicalName);
+        head.append("p*");                                  // Multi-mode persist names end with __pNNNNNNN
+        loop  // Until we manage to delete without things changing beneath us...
+        {
+            IArrayOf<IPropertyTree> persists;
+            Owned<IDFAttributesIterator> iter = queryDistributedFileDirectory().getDFAttributesIterator(head,queryUserDescriptor(),false,false,NULL);
+            ForEach(*iter)
+            {
+                IPropertyTree &pt = iter->query();
+                const char *name = pt.queryProp("@name");
+                if (stricmp(name, logicalName) == 0)   // Don't include the one we are intending to recreate in the LRU list (keep value does not include it)
+                    continue;
+                if (pt.getPropBool("@persistent", false))
+                {
+                    // Paranoia - check as far as we can that it really is another instance of this persist
+                    tail = strrchr(name, '_');     // Locate the trailing double-underbar
+                    assertex(tail);
+                    tail++;
+                    bool crcSuffix = (*tail++=='p');
+                    while (crcSuffix && *tail)
+                    {
+                        if (!isdigit(*tail))
+                            crcSuffix = false;
+                        tail++;
+                    }
+                    if (crcSuffix)
+                        persists.append(*LINK(&pt));
+                }
+            }
+            if (persists.ordinality() > keep)
+            {
+                persists.sort(comparePersistAccess);
+                while (persists.ordinality() > keep)
+                {
+                    Owned<IPropertyTree> oldest = &persists.popGet();
+                    const char *oldAccessTime = oldest->queryProp("@accessed");
+                    VStringBuffer goer("~%s", oldest->queryProp("@name"));   // Make sure we don't keep adding the scope
+                    Owned<IRemoteConnection> persistLock = getPersistReadLock(goer);
+                    while (!changePersistLockMode(persistLock, RTM_LOCK_WRITE, goer, false))
+                    {
+                        persistLock.clear();
+                        MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
+                        persistLock.setown(getPersistReadLock(goer));
+                    }
+                    Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(goer, queryUserDescriptor(), true);
+                    if (!f)
+                        continue; // Persist has been deleted since last checked - repeat the whole process
+                    const char *newAccessTime = f->queryAttributes().queryProp("@accessed");
+                    if (oldAccessTime && newAccessTime && !streq(oldAccessTime, newAccessTime))
+                        continue; // Persist has been accessed since last checked - repeat the whole process
+                    else if (newAccessTime && !oldAccessTime)
+                        continue; // Persist has been accessed since last checked - repeat the whole process
+                    DBGLOG("Deleting LRU persist %s (last accessed at %s)", goer.str(), oldAccessTime);
+                    f->detach();
+                }
+            }
+            break;
+        }
+    }
+
+    IConstWorkUnit *workunit;
     IPropertyTree *workflowInfo;
     IPropertyTree *workflowInfo;
+    Owned<PersistVersion> persist;
+    Array persistReadLocks;
     bool doOnce;
     bool doOnce;
 };
 };
 
 
-WorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, bool _doOnce, const IRoxieContextLogger &_logctx)
+CRoxieWorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *_wu, bool _doOnce, const IRoxieContextLogger &_logctx)
 {
 {
-    return new CRoxieWorkflowMachine(_workflowInfo, _doOnce, _logctx);
+    return new CRoxieWorkflowMachine(_workflowInfo, _wu, _doOnce, _logctx);
 }
 }
 
 
 //=======================================================================================================================
 //=======================================================================================================================
@@ -688,6 +1094,7 @@ public:
         ctxFetchPreload = 0;
         ctxFetchPreload = 0;
         ctxPrefetchProjectPreload = 0;
         ctxPrefetchProjectPreload = 0;
         traceActivityTimes = _traceActivityTimes;
         traceActivityTimes = _traceActivityTimes;
+        persists = NULL;
         temporaries = NULL;
         temporaries = NULL;
         deserializedResultStore = NULL;
         deserializedResultStore = NULL;
         rereadResults = NULL;
         rereadResults = NULL;
@@ -715,6 +1122,7 @@ public:
     ~CSlaveContext()
     ~CSlaveContext()
     {
     {
         ::Release(rereadResults);
         ::Release(rereadResults);
+        ::Release(persists);
         ::Release(temporaries);
         ::Release(temporaries);
         ::Release(deserializedResultStore);
         ::Release(deserializedResultStore);
     }
     }
@@ -1548,6 +1956,7 @@ public:
 protected:
 protected:
     mutable CriticalSection contextCrit;
     mutable CriticalSection contextCrit;
     Owned<IPropertyTree> context;
     Owned<IPropertyTree> context;
+    IPropertyTree *persists;
     IPropertyTree *temporaries;
     IPropertyTree *temporaries;
     IPropertyTree *rereadResults;
     IPropertyTree *rereadResults;
     PTreeReaderOptions xmlStoredDatasetReadFlags;
     PTreeReaderOptions xmlStoredDatasetReadFlags;
@@ -1564,7 +1973,12 @@ protected:
             else
             else
                 throw MakeStringException(ROXIE_CODEGEN_ERROR, "Code generation error - attempting to access stored variable on slave");
                 throw MakeStringException(ROXIE_CODEGEN_ERROR, "Code generation error - attempting to access stored variable on slave");
         case ResultSequencePersist:
         case ResultSequencePersist:
-            throwUnexpected();  // Do not expect to see in Roxie
+            {
+                CriticalBlock b(contextCrit);
+                if (!persists)
+                    persists = createPTree();
+                return *persists;
+            }
         case ResultSequenceInternal:
         case ResultSequenceInternal:
             {
             {
                 CriticalBlock b(contextCrit);
                 CriticalBlock b(contextCrit);
@@ -2103,7 +2517,7 @@ class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext
     unsigned lastHeartBeat;
     unsigned lastHeartBeat;
 
 
 protected:
 protected:
-    Owned<WorkflowMachine> workflow;
+    Owned<CRoxieWorkflowMachine> workflow;
     SafeSocket *client;
     SafeSocket *client;
     bool isBlocked;
     bool isBlocked;
     bool isHttp;
     bool isHttp;
@@ -2235,7 +2649,7 @@ public:
     {
     {
         init();
         init();
         rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
         rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
-        workflow.setown(_factory->createWorkflowMachine(true, logctx));
+        workflow.setown(_factory->createWorkflowMachine(workUnit, true, logctx));
         context.setown(createPTree(ipt_caseInsensitive));
         context.setown(createPTree(ipt_caseInsensitive));
     }
     }
 
 
@@ -2245,7 +2659,7 @@ public:
         init();
         init();
         workUnit.set(_workUnit);
         workUnit.set(_workUnit);
         rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
         rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
-        workflow.setown(_factory->createWorkflowMachine(false, logctx));
+        workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
         context.setown(createPTree(ipt_caseInsensitive));
         context.setown(createPTree(ipt_caseInsensitive));
 
 
         //MORE: Use various debug settings to override settings:
         //MORE: Use various debug settings to override settings:
@@ -2295,7 +2709,7 @@ public:
         rowManager->setActivityTracking(context->getPropBool("_TraceMemory", false));
         rowManager->setActivityTracking(context->getPropBool("_TraceMemory", false));
         rowManager->setMemoryLimit((memsize_t) context->getPropInt64("_MemoryLimit", _factory->getMemoryLimit()));
         rowManager->setMemoryLimit((memsize_t) context->getPropInt64("_MemoryLimit", _factory->getMemoryLimit()));
         authToken.append(httpHelper.queryAuthToken());
         authToken.append(httpHelper.queryAuthToken());
-        workflow.setown(_factory->createWorkflowMachine(false, logctx));
+        workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
 
 
         ctxParallelJoinPreload = context->getPropInt("_ParallelJoinPreload", defaultParallelJoinPreload);
         ctxParallelJoinPreload = context->getPropInt("_ParallelJoinPreload", defaultParallelJoinPreload);
         ctxFullKeyedJoinPreload = context->getPropInt("_FullKeyedJoinPreload", defaultFullKeyedJoinPreload);
         ctxFullKeyedJoinPreload = context->getPropInt("_FullKeyedJoinPreload", defaultFullKeyedJoinPreload);
@@ -3175,15 +3589,24 @@ public:
         }
         }
     }
     }
 
 
-    // persist-related code - usage of persist should have been caught and rejected at codegen time
-    virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
+    // persist-related code
+    virtual char * getExpandLogicalName(const char * logicalName)
+    {
+        StringBuffer lfn;
+        expandLogicalFilename(lfn, logicalName, workUnit, false);
+        return lfn.detach();
+    }
     virtual IRemoteConnection *startPersist(const char * name) { throwUnexpected(); }
     virtual IRemoteConnection *startPersist(const char * name) { throwUnexpected(); }
     virtual void finishPersist(IRemoteConnection *) { throwUnexpected(); }
     virtual void finishPersist(IRemoteConnection *) { throwUnexpected(); }
     virtual void clearPersist(const char * logicalName) { throwUnexpected(); }
     virtual void clearPersist(const char * logicalName) { throwUnexpected(); }
     virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) { throwUnexpected(); }
     virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) { throwUnexpected(); }
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC) { throwUnexpected(); }
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC) { throwUnexpected(); }
     virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }
     virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }
-    virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile) { throwUnexpected(); }
+    virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
+    {
+        if (workflow)
+            workflow->returnPersistVersion(logicalName, eclCRC, allCRC, isFile);
+    }
     virtual void fail(int code, const char *text)
     virtual void fail(int code, const char *text)
     {
     {
         addWuException(text, code, 2, "user");
         addWuException(text, code, 2, "user");
@@ -3211,7 +3634,41 @@ public:
     virtual void doWait(unsigned code, char const * extra) { UNIMPLEMENTED; }
     virtual void doWait(unsigned code, char const * extra) { UNIMPLEMENTED; }
     virtual void doWaitCond(unsigned code, char const * extra, int sequence, char const * alias, unsigned wfid) { UNIMPLEMENTED; }
     virtual void doWaitCond(unsigned code, char const * extra, int sequence, char const * alias, unsigned wfid) { UNIMPLEMENTED; }
 
 
-    virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); }
+    static unsigned __int64 crcLogicalFileTime(IDistributedFile * file, unsigned __int64 crc, const char * filename)
+    {
+        CDateTime dt;
+        StringBuffer dtstr;
+        file->getModificationTime(dt);
+        unsigned __int64 modifiedTime = dt.getSimple();
+        return rtlHash64Data(sizeof(modifiedTime), &modifiedTime, crc);
+    }
+
+    virtual unsigned __int64 getDatasetHash(const char * logicalName, unsigned __int64 crc)
+    {
+        StringBuffer fullname;
+        expandLogicalFilename(fullname, logicalName, workUnit, false);
+        Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(fullname.str(),queryUserDescriptor());
+        if (file)
+        {
+            WorkunitUpdate wu = updateWorkUnit();
+            wu->noteFileRead(file);
+            IDistributedSuperFile * super = file->querySuperFile();
+            if (super)
+            {
+                Owned<IDistributedFileIterator> iter = super->getSubFileIterator(true);
+                ForEach(*iter)
+                {
+                    IDistributedFile & cur = iter->query();
+                    const char * name = cur.queryLogicalName();
+                    crc = rtlHash64Data(strlen(name), name, crc);
+                    crc = crcLogicalFileTime(&cur, crc, name);
+                }
+            }
+            else
+                crc = crcLogicalFileTime(file, crc, fullname.str());
+        }
+        return crc;
+    }
 
 
     virtual int queryLastFailCode() { UNIMPLEMENTED; }
     virtual int queryLastFailCode() { UNIMPLEMENTED; }
     virtual void getLastFailMessage(size32_t & outLen, char * &outStr, const char * tag) { UNIMPLEMENTED; }
     virtual void getLastFailMessage(size32_t & outLen, char * &outStr, const char * tag) { UNIMPLEMENTED; }

+ 2 - 1
roxie/ccd/ccdcontext.hpp

@@ -107,12 +107,13 @@ interface IDeserializedResultStore : public IInterface
 };
 };
 
 
 typedef IEclProcess* (* EclProcessFactory)();
 typedef IEclProcess* (* EclProcessFactory)();
+class CRoxieWorkflowMachine;
 
 
 extern IDeserializedResultStore *createDeserializedResultStore();
 extern IDeserializedResultStore *createDeserializedResultStore();
 extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const SlaveContextLogger &logctx, unsigned timeLimit, memsize_t memoryLimit, IRoxieQueryPacket *packet);
 extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const SlaveContextLogger &logctx, unsigned timeLimit, memsize_t memoryLimit, IRoxieQueryPacket *packet);
 extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, unsigned priority, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
 extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, unsigned priority, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
 extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const ContextLogger &_logctx);
 extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const ContextLogger &_logctx);
 extern IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &logctx);
 extern IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &logctx);
-extern WorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, bool doOnce, const IRoxieContextLogger &_logctx);
+extern CRoxieWorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *wu, bool doOnce, const IRoxieContextLogger &_logctx);
 
 
 #endif
 #endif

+ 3 - 3
roxie/ccd/ccdquery.cpp

@@ -1194,7 +1194,7 @@ public:
     {
     {
         return package;
         return package;
     }
     }
-    virtual WorkflowMachine *createWorkflowMachine(bool isOnce, const ContextLogger &logctx) const
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
     {
     {
         throwUnexpected();  // only on server...
         throwUnexpected();  // only on server...
     }
     }
@@ -1380,12 +1380,12 @@ public:
         return createWorkUnitServerContext(wu, this, _logctx);
         return createWorkUnitServerContext(wu, this, _logctx);
     }
     }
 
 
-    virtual WorkflowMachine *createWorkflowMachine(bool isOnce, const ContextLogger &logctx) const
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
     {
     {
         IPropertyTree *workflow = queryWorkflowTree();
         IPropertyTree *workflow = queryWorkflowTree();
         if (workflow)
         if (workflow)
         {
         {
-            return ::createRoxieWorkflowMachine(workflow, isOnce, logctx);
+            return ::createRoxieWorkflowMachine(workflow, wu, isOnce, logctx);
         }
         }
         else
         else
             return NULL;
             return NULL;

+ 2 - 1
roxie/ccd/ccdquery.hpp

@@ -70,6 +70,7 @@ interface IActivityGraph : extends IInterface
 
 
 interface IRoxiePackage;
 interface IRoxiePackage;
 interface IDeserializedResultStore;
 interface IDeserializedResultStore;
+class CRoxieWorkflowMachine;
 
 
 interface ISharedOnceContext : extends IInterface
 interface ISharedOnceContext : extends IInterface
 {
 {
@@ -108,7 +109,7 @@ interface IQueryFactory : extends IInterface
     virtual void getActivityMetrics(StringBuffer &reply) const = 0;
     virtual void getActivityMetrics(StringBuffer &reply) const = 0;
 
 
     virtual IPropertyTree *cloneQueryXGMML() const = 0;
     virtual IPropertyTree *cloneQueryXGMML() const = 0;
-    virtual WorkflowMachine *createWorkflowMachine(bool isOnce, const ContextLogger &logctx) const = 0;
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const = 0;
     virtual char *getEnv(const char *name, const char *defaultValue) const = 0;
     virtual char *getEnv(const char *name, const char *defaultValue) const = 0;
     virtual unsigned getPriority() const = 0;
     virtual unsigned getPriority() const = 0;
     virtual unsigned getWarnTimeLimit() const = 0;
     virtual unsigned getWarnTimeLimit() const = 0;

+ 2 - 1
roxie/ccd/ccdstate.cpp

@@ -38,6 +38,7 @@
 #include "dautils.hpp"
 #include "dautils.hpp"
 
 
 #include "pkgimpl.hpp"
 #include "pkgimpl.hpp"
+#include "roxiehelper.hpp"
 
 
 //-------------------------------------------------------------------------------------------
 //-------------------------------------------------------------------------------------------
 // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie. 
 // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie. 
@@ -401,7 +402,7 @@ protected:
                 {
                 {
                     if (subFileName.charAt(0)=='~')
                     if (subFileName.charAt(0)=='~')
                     {
                     {
-                        // implies that a package file had ~ in subfile names - shouldn;t really, but we allow it (and just strip the ~
+                        // implies that a package file had ~ in subfile names - shouldn't really, but we allow it (and just strip the ~
                         subFileName.remove(0,1);
                         subFileName.remove(0,1);
                     }
                     }
                     if (traceLevel > 9)
                     if (traceLevel > 9)