|
@@ -2245,6 +2245,9 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
|
|
|
doExecuteItemDependencies(item, wfid);
|
|
|
SCMStringBuffer name;
|
|
|
const char *logicalName = item.getPersistName(name).str();
|
|
|
+ int maxPersistCopies = item.queryPersistCopies();
|
|
|
+ if (maxPersistCopies < 0)
|
|
|
+ maxPersistCopies = DEFAULT_PERSIST_COPIES;
|
|
|
Owned<IRemoteConnection> persistLock;
|
|
|
if(persistsPrelocked)
|
|
|
{
|
|
@@ -2278,6 +2281,8 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
|
|
|
// New persist model allows dependencies to be executed AFTER checking if the persist is up to date
|
|
|
if (agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
|
|
|
doExecuteItemDependencies(item, wfid);
|
|
|
+ if (maxPersistCopies > 0)
|
|
|
+ agent.deleteLRUPersists(logicalName, maxPersistCopies-1);
|
|
|
doExecuteItem(item, wfid);
|
|
|
agent.updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
|
|
|
}
|
|
@@ -2674,6 +2679,90 @@ void EclAgent::checkPersistMatches(const char * logicalName, unsigned eclCRC)
|
|
|
logException(ExceptionSeverityInformation, 0, msg.str(), false);
|
|
|
}
|
|
|
|
|
|
+static int comparePersistAccess(IInterface **_a, IInterface **_b)
|
|
|
+{
|
|
|
+ IPropertyTree *a = *(IPropertyTree **)_a;
|
|
|
+ IPropertyTree *b = *(IPropertyTree **)_b;
|
|
|
+ const char *accessedA = a->queryProp("@accessed");
|
|
|
+ const char *accessedB = b->queryProp("@accessed");
|
|
|
+ if (accessedA && accessedB)
|
|
|
+ return strcmp(accessedB, accessedA);
|
|
|
+ else if (accessedB)
|
|
|
+ return -1;
|
|
|
+ else if (accessedA)
|
|
|
+ return 1;
|
|
|
+ else
|
|
|
+ return 0;
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+void EclAgent::deleteLRUPersists(const char * logicalName, int keep)
|
|
|
+{
|
|
|
+ StringBuffer lfn;
|
|
|
+ expandLogicalName(lfn, logicalName);
|
|
|
+ logicalName = lfn.str();
|
|
|
+ const char *tail = strrchr(logicalName, '_'); // Locate the trailing double-underbar
|
|
|
+ assertex(tail);
|
|
|
+ StringBuffer head(tail-logicalName+1, logicalName);
|
|
|
+ head.append("p*"); // Multi-mode persist names end with __pNNNNNNN
|
|
|
+ loop // Until we manage to delete without things changing beneath us...
|
|
|
+ {
|
|
|
+ IArrayOf<IPropertyTree> persists;
|
|
|
+ Owned<IDFAttributesIterator> iter = queryDistributedFileDirectory().getDFAttributesIterator(head,queryUserDescriptor(),false,false,NULL);
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ IPropertyTree &pt = iter->query();
|
|
|
+ const char *name = pt.queryProp("@name");
|
|
|
+ if (stricmp(name, logicalName) == 0) // Don't include the one we are intending to recreate in the LRU list (keep value does not include it)
|
|
|
+ continue;
|
|
|
+ if (pt.getPropBool("@persistent", false))
|
|
|
+ {
|
|
|
+ // Paranoia - check as far as we can that it really is another instance of this persist
|
|
|
+ tail = strrchr(name, '_'); // Locate the trailing double-underbar
|
|
|
+ assertex(tail);
|
|
|
+ tail++;
|
|
|
+ bool crcSuffix = (*tail++=='p');
|
|
|
+ while (crcSuffix && *tail)
|
|
|
+ {
|
|
|
+ if (!isdigit(*tail))
|
|
|
+ crcSuffix = false;
|
|
|
+ tail++;
|
|
|
+ }
|
|
|
+ if (crcSuffix)
|
|
|
+ persists.append(*LINK(&pt));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (persists.ordinality() > keep)
|
|
|
+ {
|
|
|
+ persists.sort(comparePersistAccess);
|
|
|
+ while (persists.ordinality() > keep)
|
|
|
+ {
|
|
|
+ Owned<IPropertyTree> oldest = &persists.popGet();
|
|
|
+ const char *oldAccessTime = oldest->queryProp("@accessed");
|
|
|
+ VStringBuffer goer("~%s", oldest->queryProp("@name")); // Make sure we don't keep adding the scope
|
|
|
+ Owned<IRemoteConnection> persistLock = getPersistReadLock(goer);
|
|
|
+ while (!changePersistLockMode(persistLock, RTM_LOCK_WRITE, goer, false))
|
|
|
+ {
|
|
|
+ persistLock.clear();
|
|
|
+ MilliSleep(getRandom()%2000);
|
|
|
+ persistLock.setown(getPersistReadLock(goer));
|
|
|
+ }
|
|
|
+ Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(goer, queryUserDescriptor(), true);
|
|
|
+ if (!f)
|
|
|
+ continue; // Persist has been deleted since last checked - repeat the whole process
|
|
|
+ const char *newAccessTime = f->queryAttributes().queryProp("@accessed");
|
|
|
+ if (oldAccessTime && newAccessTime && !streq(oldAccessTime, newAccessTime))
|
|
|
+ continue; // Persist has been accessed since last checked - repeat the whole process
|
|
|
+ else if (newAccessTime && !oldAccessTime)
|
|
|
+ continue; // Persist has been accessed since last checked - repeat the whole process
|
|
|
+ DBGLOG("Deleting LRU persist %s (last accessed at %s)", goer.str(), oldAccessTime);
|
|
|
+ f->detach();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//---------------------------------------------------------------------------
|
|
|
|
|
|
char *EclAgent::getWuid()
|