浏览代码

HPCC-10106 Persist should ignore other persists when determining rebuild

The previous fix was incomplete and would fail in multiple-persist cases, due to
global state issues in eclagent.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父节点
当前提交
b334ef94f8

+ 34 - 39
ecl/eclagent/eclagent.cpp

@@ -2134,8 +2134,8 @@ void EclAgentWorkflowMachine::prelockPersists()
     ForEachItemIn(idx, names)
     {
         char const * name = names.item(idx);
-        agent.startPersist(name);
-        agent.cachePersist(name);
+        persistCache.setValue(name, agent.startPersist(name));
+        LOG(MCrunlock, unknownJob, "Cached persist read lock for %s", name);
     }
     persistsPrelocked = true;
 }
@@ -2237,39 +2237,44 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
     if (!agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
         doExecuteItemDependencies(item, wfid);
     SCMStringBuffer name;
-    item.getPersistName(name);
+    const char *logicalName = item.getPersistName(name).str();
+    Owned<IRemoteConnection> persistLock;  // MORE - pass it to isPersistUptoDate (which may change it)
     if(persistsPrelocked)
-        agent.decachePersist(name.str());
+    {
+        persistLock.setown(persistCache.getValue(logicalName));
+        persistCache.setValue(logicalName, NULL);
+        LOG(MCrunlock, unknownJob, "Decached persist read lock for %s", logicalName);
+    }
     else
-        agent.startPersist(name.str());
-    doExecuteItemDependency(item, item.queryPersistWfid(), wfid, true);
+        persistLock.setown(agent.startPersist(logicalName));
+    doExecuteItemDependency(item, item.queryPersistWfid(), wfid, true);  // generated code should end up calling back to returnPersistVersion, which sets persist
     if(!persist)
     {
         StringBuffer errmsg;
         errmsg.append("Internal error in generated code: for wfid ").append(wfid).append(", persist CRC wfid ").append(item.queryPersistWfid()).append(" did not call returnPersistVersion");
         throw MakeStringExceptionDirect(0, errmsg.str());
     }
-    if(strcmp(name.str(), persist->logicalName.get()) != 0)
+    Owned<PersistVersion> thisPersist = persist.getClear();
+    if(strcmp(logicalName, thisPersist->logicalName.get()) != 0)
     {
         StringBuffer errmsg;
-        errmsg.append("Failed workflow/persist consistency check: wfid ").append(wfid).append(", WU persist name ").append(name.str()).append(", runtime persist name ").append(persist->logicalName.get());
+        errmsg.append("Failed workflow/persist consistency check: wfid ").append(wfid).append(", WU persist name ").append(logicalName).append(", runtime persist name ").append(thisPersist->logicalName.get());
         throw MakeStringExceptionDirect(0, errmsg.str());
     }
     if(agent.arePersistsFrozen())
     {
-        agent.checkPersistMatches(name.str(), persist->eclCRC);
+        agent.checkPersistMatches(logicalName, thisPersist->eclCRC);
     }
-    else if(!agent.isPersistUptoDate(name.str(), persist->eclCRC, persist->allCRC, persist->isFile))
+    else if(!agent.isPersistUptoDate(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC, thisPersist->isFile))
     {
-        agent.clearPersist(name.str());
+        agent.clearPersist(logicalName);
         // New persist model allows dependencies to be executed AFTER checking if the persist is up to date
         if (agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
             doExecuteItemDependencies(item, wfid);
         doExecuteItem(item, wfid);
-        agent.updatePersist(name.str(), persist->eclCRC, persist->allCRC);
+        agent.updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
     }
-    persist.clear();
-    agent.finishPersist();
+    agent.finishPersist(persistLock.getClear());
 }
 
 //----------------------------------------------------------------
@@ -2484,7 +2489,7 @@ bool EclAgent::checkPersistUptoDate(const char * logicalName, unsigned eclCRC, u
     return false;
 }
 
-bool EclAgent::changePersistLockMode(unsigned mode, const char * name, bool repeat)
+bool EclAgent::changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat)
 {
     LOG(MCrunlock, unknownJob, "Waiting to change persist lock to %s for %s", (mode == RTM_LOCK_WRITE) ? "write" : "read", name);
     loop
@@ -2510,7 +2515,7 @@ bool EclAgent::changePersistLockMode(unsigned mode, const char * name, bool repe
     }
 }
 
-void EclAgent::getPersistReadLock(const char * logicalName)
+IRemoteConnection *EclAgent::getPersistReadLock(const char * logicalName)
 {
     StringBuffer lfn;
     expandLogicalName(lfn, logicalName);
@@ -2527,6 +2532,7 @@ void EclAgent::getPersistReadLock(const char * logicalName)
         xpath.append(isalnum(*cur) ? *cur : '_');
 
     LOG(MCrunlock, unknownJob, "Waiting for persist read lock for %s", name);
+    Owned<IRemoteConnection> persistLock;
     loop
     {
         try
@@ -2548,6 +2554,7 @@ void EclAgent::getPersistReadLock(const char * logicalName)
     }
 
     reportProgress("Obtained persist read lock");
+    return persistLock.getClear();
 }
 
 void EclAgent::setBlockedOnPersist(const char * logicalName)
@@ -2559,7 +2566,7 @@ void EclAgent::setBlockedOnPersist(const char * logicalName)
     w->setStateEx(s.str());
 }
 
-bool EclAgent::isPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
+bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
 {
     //Loop trying to get a write lock - if it fails, then release the read lock, otherwise
     //you can get a deadlock with several things waiting to read, and none being able to write.
@@ -2577,13 +2584,13 @@ bool EclAgent::isPersistUptoDate(const char * logicalName, unsigned eclCRC, unsi
         //Get a write lock
         setBlockedOnPersist(logicalName);
         unlockWorkUnit();
-        if (changePersistLockMode(RTM_LOCK_WRITE, logicalName, false))
+        if (changePersistLockMode(persistLock, RTM_LOCK_WRITE, logicalName, false))
             break;
 
         //failed to get a write lock, so release our read lock
         persistLock.clear();
         MilliSleep(getRandom()%2000);
-        getPersistReadLock(logicalName);
+        persistLock.setown(getPersistReadLock(logicalName));
     }
     setRunning();
 
@@ -2594,7 +2601,7 @@ bool EclAgent::isPersistUptoDate(const char * logicalName, unsigned eclCRC, unsi
         StringBuffer msg;
         msg.append("PERSIST('").append(logicalName).append("') is up to date (after being calculated by another job)");
         logException(ExceptionSeverityInformation, 0, msg.str(), false);
-        changePersistLockMode(RTM_LOCK_READ, logicalName, true);
+        changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
         return true;
     }
     if (errText.length())
@@ -2614,7 +2621,7 @@ void EclAgent::clearPersist(const char * logicalName)
     LOG(MCrunlock, unknownJob, "Recalculate persistent value %s", logicalName);
 }
 
-void EclAgent::updatePersist(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
+void EclAgent::updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
 {
     StringBuffer lfn, crcName, eclName;
     expandLogicalName(lfn, logicalName);
@@ -2625,34 +2632,22 @@ void EclAgent::updatePersist(const char * logicalName, unsigned eclCRC, unsigned
     setResultInt(eclName,(unsigned)-2,eclCRC);
 
     reportProgress("Convert persist write lock to read lock");
-    changePersistLockMode(RTM_LOCK_READ, logicalName, true);
+    changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);
 }
 
-void EclAgent::startPersist(const char * logicalName)
+IRemoteConnection *EclAgent::startPersist(const char * logicalName)
 {
     setBlockedOnPersist(logicalName);
     unlockWorkUnit();
-    getPersistReadLock(logicalName);
+    IRemoteConnection *persistLock = getPersistReadLock(logicalName);
     setRunning();
+    return persistLock;
 }
 
-void EclAgent::cachePersist(const char * logicalName)
-{
-    persistCache.setValue(logicalName, persistLock.getClear());
-    LOG(MCrunlock, unknownJob, "Cached persist read lock for %s", logicalName);
-}
-
-void EclAgent::decachePersist(const char * logicalName)
-{
-    persistLock.setown(persistCache.getValue(logicalName));
-    persistCache.setValue(logicalName, NULL);
-    LOG(MCrunlock, unknownJob, "Decached persist read lock for %s", logicalName);
-}
-
-void EclAgent::finishPersist()
+void EclAgent::finishPersist(IRemoteConnection *persistLock)
 {
     LOG(MCrunlock, unknownJob, "Finished persists - add to read lock list");
-    persistReadLocks.append(*persistLock.getClear());
+    persistReadLocks.append(*persistLock);
 }
 
 void EclAgent::checkPersistMatches(const char * logicalName, unsigned eclCRC)

+ 13 - 16
ecl/eclagent/eclagent.ipp

@@ -110,13 +110,13 @@ public:
     {
         ctx->restoreCluster();
     }
-    virtual void startPersist(const char * name)
+    virtual IRemoteConnection *startPersist(const char * name)
     {
-        ctx->startPersist(name);
+        return ctx->startPersist(name);
     }
-    virtual void finishPersist()
+    virtual void finishPersist(IRemoteConnection *persistLock)
     {
-        ctx->finishPersist();
+        ctx->finishPersist(persistLock);
     }
     virtual bool queryResolveFilesLocally()
     {
@@ -150,9 +150,9 @@ public:
     {
         ctx->clearPersist(logicalName);
     }
-    virtual void updatePersist(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
+    virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
     {
-        ctx->updatePersist(logicalName, eclCRC, allCRC);
+        ctx->updatePersist(persistLock, logicalName, eclCRC, allCRC);
     }
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC)
     {
@@ -303,6 +303,7 @@ private:
     Owned<IWorkflowScheduleConnection> wfconn;
     Owned<PersistVersion> persist;
     bool persistsPrelocked;
+    MapStringToMyClass<IRemoteConnection> persistCache;
 };
 
 class EclAgentQueryLibrary : public CInterface
@@ -369,8 +370,6 @@ private:
     CriticalSection wusect;
     StringArray tempFiles;
     CriticalSection tfsect;
-    Owned<IRemoteConnection> persistLock;
-    MapStringToMyClass<IRemoteConnection> persistCache;
     Array persistReadLocks;
 
     Owned<ILoadedDllEntry> dll;
@@ -401,10 +400,10 @@ private:
 
     void processXmlParams(const IPropertyTree *params);
     bool checkPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer & errText);
-    bool isPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile);
-    bool changePersistLockMode(unsigned mode, const char * name, bool repeat);
+    bool isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile);
+    bool changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat);
     bool expandLogicalName(StringBuffer & fullname, const char * logicalName);
-    void getPersistReadLock(const char * logicalName);
+    IRemoteConnection *getPersistReadLock(const char * logicalName);
     void doSimpleResult(type_t type, int size, char * buffer, int sequence);
     IWUResult *updateResult(const char *name, unsigned sequence);
     IConstWUResult *getResult(const char *name, unsigned sequence);
@@ -505,12 +504,10 @@ public:
     virtual IUserDescriptor *queryUserDescriptor();
     virtual void selectCluster(const char * cluster);
     virtual void restoreCluster();
-    virtual void startPersist(const char * name);
-    virtual void cachePersist(const char * name);
-    virtual void decachePersist(const char * name);
-    virtual void finishPersist();
+    virtual IRemoteConnection *startPersist(const char * name);
+    virtual void finishPersist(IRemoteConnection *persistLock);
     virtual void clearPersist(const char * logicalName);
-    virtual void updatePersist(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC);
+    virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC);
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC);
     virtual bool queryResolveFilesLocally() { return resolveFilesLocally; }
     virtual bool queryRemoteWorkunit() { return isRemoteWorkunit; }

+ 0 - 10
ecl/hqlcpp/hqlcatom.cpp

@@ -126,9 +126,7 @@ IIdAtom * bool2VStrId;
 IIdAtom * bool2VStrXId;
 IIdAtom * castIntId[9][2];
 IIdAtom * checkFieldOverflowId;
-IIdAtom * checkPersistMatchesId;
 IIdAtom * checkRowOverflowId;
-IIdAtom * clearPersistId;
 IIdAtom * clibExpId;
 IIdAtom * cloneVStringId;
 IIdAtom * cloneVStringXId;
@@ -326,7 +324,6 @@ IIdAtom * failDivideByZeroId;
 IIdAtom * _failId;
 IIdAtom * fileExistsId;
 IIdAtom * finalizeRowClearId;
-IIdAtom * finishPersistId;
 IIdAtom * freeId;
 IIdAtom * freeExceptionId;
 IIdAtom * getBytesFromBuilderId;
@@ -607,7 +604,6 @@ IIdAtom * set2SetXId;
 IIdAtom * sinId;
 IIdAtom * sinhId;
 IIdAtom * sqrtId;
-IIdAtom * startPersistId;
 IIdAtom * str2DataId;
 IIdAtom * str2DataXId;
 IIdAtom * strToQStrId;
@@ -674,7 +670,6 @@ IIdAtom * unicodeStrcpyId;
 IIdAtom * unicodeStrlenId;
 IIdAtom * unicodeSubStrFXId;
 IIdAtom * unicodeSubStrFTXId;
-IIdAtom * updatePersistId;
 IIdAtom * utf82CodepageId;
 IIdAtom * utf82CodepageXId;
 IIdAtom * utf82DataId;
@@ -774,9 +769,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     castIntId[6][false] = createIdAtom("castUInt6");
     castIntId[7][false] = createIdAtom("castUInt7");
     MAKEID(checkFieldOverflow);
-    MAKEID(checkPersistMatches);
     MAKEID(checkRowOverflow);
-    MAKEID(clearPersist);
     MAKEID(cloneVString);
     MAKEID(cloneVStringX);
     MAKEID(codeGenerator);
@@ -972,7 +965,6 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEID(failDivideByZero);
     MAKEID(fileExists);
     MAKEID(finalizeRowClear);
-    MAKEID(finishPersist);
     MAKEID(free);
     MAKEID(freeException);
     MAKEID(getBytesFromBuilder);
@@ -1281,7 +1273,6 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     sinId = createIdAtom("_sin");
     sinhId = createIdAtom("_sinh");
     sqrtId = createIdAtom("_sqrt");
-    MAKEID(startPersist);
     MAKEID(str2Data);
     MAKEID(str2DataX);
     MAKEID(strToQStr);
@@ -1349,7 +1340,6 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEID(unicodeStrlen);
     MAKEID(unicodeSubStrFX);
     MAKEID(unicodeSubStrFTX);
-    MAKEID(updatePersist);
     MAKEID(utf82Codepage);
     MAKEID(utf82CodepageX);
     MAKEID(utf82Data);

+ 0 - 5
ecl/hqlcpp/hqlcatom.hpp

@@ -126,9 +126,7 @@ extern IIdAtom * bool2VStrId;
 extern IIdAtom * bool2VStrXId;
 extern IIdAtom * castIntId[9][2];
 extern IIdAtom * checkFieldOverflowId;
-extern IIdAtom * checkPersistMatchesId;
 extern IIdAtom * checkRowOverflowId;
-extern IIdAtom * clearPersistId;
 extern IIdAtom * clibExpId;
 extern IIdAtom * cloneVStringId;
 extern IIdAtom * cloneVStringXId;
@@ -326,7 +324,6 @@ extern IIdAtom * failDivideByZeroId;
 extern IIdAtom * _failId;
 extern IIdAtom * fileExistsId;
 extern IIdAtom * finalizeRowClearId;
-extern IIdAtom * finishPersistId;
 extern IIdAtom * freeId;
 extern IIdAtom * freeExceptionId;
 extern IIdAtom * getBytesFromBuilderId;
@@ -607,7 +604,6 @@ extern IIdAtom * set2SetXId;
 extern IIdAtom * sinId;
 extern IIdAtom * sinhId;
 extern IIdAtom * sqrtId;
-extern IIdAtom * startPersistId;
 extern IIdAtom * str2DataId;
 extern IIdAtom * str2DataXId;
 extern IIdAtom * strToQStrId;
@@ -674,7 +670,6 @@ extern IIdAtom * unicodeStrcpyId;
 extern IIdAtom * unicodeStrlenId;
 extern IIdAtom * unicodeSubStrFXId;
 extern IIdAtom * unicodeSubStrFTXId;
-extern IIdAtom * updatePersistId;
 extern IIdAtom * utf82CodepageId;
 extern IIdAtom * utf82CodepageXId;
 extern IIdAtom * utf82DataId;

+ 0 - 6
ecl/hqlcpp/hqlcppsys.ecl

@@ -613,12 +613,6 @@ const char * cppSystemText[]  = {
 
     "   selectCluster(const varstring src)  : gctxmethod,entrypoint='selectCluster';",
     "   restoreCluster()    : gctxmethod,entrypoint='restoreCluster';",
-    "   startPersist(const varstring src)   : gctxmethod,entrypoint='startPersist';",
-    "   finishPersist() : gctxmethod,entrypoint='finishPersist';",
-
-    "   clearPersist(const varstring name)  : gctxmethod,entrypoint='clearPersist';",
-    "   updatePersist(const varstring name, unsigned4 eclCRC, unsigned8 allCRC) : gctxmethod,entrypoint='updatePersist';",
-    "   checkPersistMatches(const varstring name, unsigned4 eclCRC) : gctxmethod,entrypoint='checkPersistMatches';",
 
     "   integer4 compareUtf8Utf8(const utf8 l, const utf8 r, const varstring codepage) : eclrtl,pure,library='eclrtl',entrypoint='rtlCompareUtf8Utf8';",
     "   integer4 compareUtf8Utf8Strength(const utf8 l, const utf8 r, const varstring loc, unsigned4 str) : eclrtl,pure,library='eclrtl',entrypoint='rtlCompareUtf8Utf8Strength';",

+ 3 - 3
roxie/ccd/ccdcontext.cpp

@@ -2871,10 +2871,10 @@ public:
 
     // persist-related code - usage of persist should have been caught and rejected at codegen time
     virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
-    virtual void startPersist(const char * name) { throwUnexpected(); }
-    virtual void finishPersist() { throwUnexpected(); }
+    virtual IRemoteConnection *startPersist(const char * name) { throwUnexpected(); }
+    virtual void finishPersist(IRemoteConnection *) { throwUnexpected(); }
     virtual void clearPersist(const char * logicalName) { throwUnexpected(); }
-    virtual void updatePersist(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) { throwUnexpected(); }
+    virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) { throwUnexpected(); }
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC) { throwUnexpected(); }
     virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }
     virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile) { throwUnexpected(); }

+ 7 - 3
rtl/include/eclhelper.hpp

@@ -2713,6 +2713,8 @@ struct IHThorDictionaryResultWriteArg : public IHThorArg
 
 //------------------------- Other stuff -------------------------
 
+struct IRemoteConnection;
+
 struct IGlobalCodeContext
 {
     virtual ICodeContext * queryCodeContext() = 0;
@@ -2729,10 +2731,12 @@ struct IGlobalCodeContext
 
     virtual void selectCluster(const char * cluster) = 0;
     virtual void restoreCluster() = 0;
-    virtual void startPersist(const char * name) = 0;
-    virtual void finishPersist() = 0;
+
+    // These next 5 are not used from generated code, and should be removed in 5.0
+    virtual IRemoteConnection *startPersist(const char * name) = 0;
+    virtual void finishPersist(IRemoteConnection *) = 0;
     virtual void clearPersist(const char * logicalName) = 0;
-    virtual void updatePersist(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) = 0;
+    virtual void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC) = 0;
     virtual void checkPersistMatches(const char * logicalName, unsigned eclCRC) = 0;
 
     virtual void setWorkflowCondition(bool value) = 0;