瀏覽代碼

Merge branch 'candidate-4.2.8' into closedown-4.2.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父節點
當前提交
b3395bc909

+ 39 - 0
common/workunit/workflow.cpp

@@ -882,6 +882,45 @@ void WorkflowMachine::doExecuteEndWaitItem(IRuntimeWorkflowItem & item)
 }
 
 
+bool WorkflowMachine::isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item)
+{
+    time_t thisTime;
+    if (!getPersistTime(thisTime, item))
+        return false;  // if no time must be older than the persist
+    return when < thisTime;
+}
+
+bool WorkflowMachine::isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item)
+{
+    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
+    ForEach(*iter)
+    {
+        unsigned cur = iter->query();
+
+        IRuntimeWorkflowItem & other = workflow->queryWfid(cur);
+        if (isPersist(other))
+        {
+            if (isOlderThanPersist(when, other))
+                return true;
+        }
+        else
+        {
+            if (isOlderThanInputPersists(when, other))
+                return true;
+        }
+    }
+    return false;
+}
+
+bool WorkflowMachine::isItemOlderThanInputPersists(IRuntimeWorkflowItem & item)
+{
+    time_t curWhen;
+    if (!getPersistTime(curWhen, item))
+        return false; // if no time then old and can't tell
+
+    return isOlderThanInputPersists(curWhen, item);
+}
+
 void WorkflowMachine::performItem(unsigned wfid, unsigned scheduledWfid)
 {
 #ifdef TRACE_WORKFLOW

+ 6 - 0
common/workunit/workflow.hpp

@@ -76,6 +76,7 @@ public:
     const char * queryEventExtra() const;
     bool hasItemsWaiting() const { return (itemsWaiting > 0); }
     void setCondition(bool value) { condition = value; }
+    bool isItemOlderThanInputPersists(IRuntimeWorkflowItem & item);
 
 protected:
     // Machine specific prologue/epilogue
@@ -90,6 +91,7 @@ protected:
     virtual void checkForAbort(unsigned wfid, IException * handling) = 0;
     // Persistence styles varies from machine to machine
     virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) = 0;
+    virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item) = 0;
 
     // Check conditions, item type and call operations below based on type
     bool executeItem(unsigned wfid, unsigned scheduledWfid);
@@ -108,6 +110,10 @@ protected:
     // Unblock the scheduled workflow item, which should mean execution continues.
     void doExecuteEndWaitItem(IRuntimeWorkflowItem & item);
 
+    //Used for checking if a persist is older than its inputs
+    bool isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item);
+    bool isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item);
+
     bool attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid);
     void handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep);
 

+ 26 - 0
common/workunit/workunit.cpp

@@ -1895,6 +1895,7 @@ mapEnums querySortFields[] =
    { WUQSFpriority, "@priority" },
    { WUQSFpriorityHi, "@priority" },
    { WUQSFQuerySet, "../@id" },
+   { WUQSFLibrary, "Library"},
    { WUQSFterm, NULL }
 };
 
@@ -9734,7 +9735,30 @@ extern WORKUNIT_API IPropertyTree * getQueryRegistryRoot()
             return NULL;
 }
 
+extern WORKUNIT_API void checkAddLibrariesToQueryEntry(IPropertyTree *queryTree, IConstWULibraryIterator *libraries)
+{
+    if (!queryTree || !libraries)
+        return;
+    if (queryTree->hasProp("@libCount")) //already added
+        return;
+    unsigned libCount=0;
+    ForEach(*libraries)
+    {
+        IConstWULibrary &library = libraries->query();
+        SCMStringBuffer libname;
+        if (!library.getName(libname).length())
+            continue;
+        queryTree->addProp("Library", libname.str());
+        libCount++;
+    }
+    queryTree->setPropInt("@libCount", libCount);
+}
 
+extern WORKUNIT_API void checkAddLibrariesToQueryEntry(IPropertyTree *queryTree, IConstWorkUnit *cw)
+{
+    Owned<IConstWULibraryIterator> libraries = &cw->getLibraries();
+    checkAddLibrariesToQueryEntry(queryTree, libraries);
+}
 
 extern WORKUNIT_API IPropertyTree * getQueryRegistry(const char * wsEclId, bool readonly)
 {
@@ -9853,6 +9877,8 @@ void addQueryToQuerySet(IWorkUnit *workunit, IPropertyTree *queryRegistry, const
     }
 
     IPropertyTree *newEntry = addNamedQuery(queryRegistry, cleanQueryName, wuid.str(), dllName.str(), isLibrary(workunit), userid, snapshot.str());
+    Owned<IConstWULibraryIterator> libraries = &workunit->getLibraries();
+    checkAddLibrariesToQueryEntry(newEntry, libraries);
     newQueryId.append(newEntry->queryProp("@id"));
     workunit->setIsQueryService(true); //will check querysets before delete
     workunit->commit();

+ 5 - 1
common/workunit/workunit.hpp

@@ -640,7 +640,7 @@ interface IConstWorkflowItem : extends IInterface
     virtual unsigned queryScheduledWfid() const = 0;
     virtual IStringVal & queryCluster(IStringVal & val) const = 0;
 };
-
+inline bool isPersist(const IConstWorkflowItem & item) { return item.queryMode() == WFModePersist; }
 
 interface IRuntimeWorkflowItem : extends IConstWorkflowItem
 {
@@ -1104,6 +1104,7 @@ enum WUQuerySortField
     WUQSFpriority = 11,
     WUQSFpriorityHi = 12,
     WUQSFQuerySet = 13,
+    WUQSFLibrary = 14,
     WUQSFterm = 0,
     WUQSFreverse = 256,
     WUQSFnocase = 512,
@@ -1255,6 +1256,9 @@ extern WORKUNIT_API IPropertyTree * resolveQueryAlias(const char *queryset, cons
 extern WORKUNIT_API IPropertyTree * getQueryRegistry(const char * wsEclId, bool readonly);
 extern WORKUNIT_API IPropertyTree * getQueryRegistryRoot();
 
+extern WORKUNIT_API void checkAddLibrariesToQueryEntry(IPropertyTree *queryTree, IConstWULibraryIterator *libraries);
+extern WORKUNIT_API void checkAddLibrariesToQueryEntry(IPropertyTree *queryTree, IConstWorkUnit *cw);
+
 extern WORKUNIT_API void setQueryCommentForNamedQuery(IPropertyTree * queryRegistry, const char *id, const char *queryComment);
 
 extern WORKUNIT_API void setQuerySuspendedState(IPropertyTree * queryRegistry, const char * name, bool suspend, const char *userid);

+ 27 - 8
ecl/eclagent/eclagent.cpp

@@ -2228,6 +2228,20 @@ void EclAgentWorkflowMachine::checkForAbort(unsigned wfid, IException * handling
     }
 }
 
+bool EclAgentWorkflowMachine::getPersistTime(time_t & when, IRuntimeWorkflowItem & item)
+{
+    SCMStringBuffer name;
+    const char *logicalName = item.getPersistName(name).str();
+    StringBuffer whenName;
+    agent.expandLogicalName(whenName, logicalName);
+    whenName.append("$when");
+    if (!agent.isResult(whenName, ResultSequencePersist))
+        return false;
+
+    when = agent.getResultInt(whenName, ResultSequencePersist);
+    return true;
+}
+
 void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
 {
     if (agent.isStandAloneExe)
@@ -2271,7 +2285,7 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
     {
         agent.checkPersistMatches(logicalName, thisPersist->eclCRC);
     }
-    else if(!agent.isPersistUptoDate(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC, thisPersist->isFile))
+    else if(!agent.isPersistUptoDate(persistLock, item, logicalName, thisPersist->eclCRC, thisPersist->allCRC, thisPersist->isFile))
     {
         // We used to call agent.clearPersist(logicalName) here - but that means if the persist rebuild fails, we forget WHY we wanted to.
         // New persist model allows dependencies to be executed AFTER checking if the persist is up to date
@@ -2469,7 +2483,7 @@ unsigned __int64 EclAgent::getDatasetHash(const char * logicalName, unsigned __i
 
 //---------------------------------------------------------------------------
 
-bool EclAgent::checkPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer &errText)
+bool EclAgent::checkPersistUptoDate(IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer &errText)
 {
     StringBuffer lfn, crcName, eclName;
     expandLogicalName(lfn, logicalName);
@@ -2490,6 +2504,8 @@ bool EclAgent::checkPersistUptoDate(const char * logicalName, unsigned eclCRC, u
             errText.appendf("Rebuilding PERSIST('%s'): ECL has changed", logicalName);
         else if (savedCRC != allCRC)
             errText.appendf("Rebuilding PERSIST('%s'): Input files have changed", logicalName);
+        else if (workflow->isItemOlderThanInputPersists(item))
+            errText.appendf("Rebuilding PERSIST('%s'): Input persists are more recent", logicalName);
         else
             return true;
     }
@@ -2497,6 +2513,7 @@ bool EclAgent::checkPersistUptoDate(const char * logicalName, unsigned eclCRC, u
     return false;
 }
 
+
 bool EclAgent::changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat)
 {
     LOG(MCrunlock, unknownJob, "Waiting to change persist lock to %s for %s", (mode == RTM_LOCK_WRITE) ? "write" : "read", name);
@@ -2580,7 +2597,7 @@ void EclAgent::setBlockedOnPersist(const char * logicalName)
     w->setStateEx(s.str());
 }
 
-bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
+bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile)
 {
     //Loop trying to get a write lock - if it fails, then release the read lock, otherwise
     //you can get a deadlock with several things waiting to read, and none being able to write.
@@ -2588,7 +2605,7 @@ bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const ch
     loop
     {
         StringBuffer dummy;
-        if (checkPersistUptoDate(logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
+        if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, dummy) && !rebuildAllPersists)
         {
             StringBuffer msg;
             msg.append("PERSIST('").append(logicalName).append("') is up to date");
@@ -2611,7 +2628,7 @@ bool EclAgent::isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const ch
 
     //Check again whether up to date, someone else might have updated it!
     StringBuffer errText;
-    if (checkPersistUptoDate(logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
+    if (checkPersistUptoDate(item, logicalName, eclCRC, allCRC, isFile, errText) && !rebuildAllPersists)
     {
         StringBuffer msg;
         msg.append("PERSIST('").append(logicalName).append("') is up to date (after being calculated by another job)");
@@ -2638,13 +2655,15 @@ void EclAgent::clearPersist(const char * logicalName)
 
 void EclAgent::updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC)
 {
-    StringBuffer lfn, crcName, eclName;
+    StringBuffer lfn, crcName, eclName, whenName;
     expandLogicalName(lfn, logicalName);
     crcName.append(lfn).append("$crc");
     eclName.append(lfn).append("$eclcrc");
+    whenName.append(lfn).append("$when");
 
-    setResultInt(crcName,(unsigned)-2,allCRC);
-    setResultInt(eclName,(unsigned)-2,eclCRC);
+    setResultInt(crcName,ResultSequencePersist,allCRC);
+    setResultInt(eclName,ResultSequencePersist,eclCRC);
+    setResultInt(whenName,ResultSequencePersist,time(NULL));
 
     reportProgress("Convert persist write lock to read lock");
     changePersistLockMode(persistLock, RTM_LOCK_READ, logicalName, true);

+ 3 - 2
ecl/eclagent/eclagent.ipp

@@ -293,6 +293,7 @@ protected:
     virtual void reportContingencyFailure(char const * type, IException * e);
     virtual void checkForAbort(unsigned wfid, IException * handling);
     virtual void doExecutePersistItem(IRuntimeWorkflowItem & item);
+    virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item);
 
 private:
     void prelockPersists();
@@ -402,8 +403,8 @@ private:
     void deleteTempFiles();
 
     void processXmlParams(const IPropertyTree *params);
-    bool checkPersistUptoDate(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer & errText);
-    bool isPersistUptoDate(Owned<IRemoteConnection> &persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile);
+    bool checkPersistUptoDate(IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile, StringBuffer & errText);
+    bool isPersistUptoDate(Owned<IRemoteConnection> &persistLock, IRuntimeWorkflowItem & item, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile);
     bool changePersistLockMode(IRemoteConnection *persistLock, unsigned mode, const char * name, bool repeat);
     bool expandLogicalName(StringBuffer & fullname, const char * logicalName);
     IRemoteConnection *getPersistReadLock(const char * logicalName);

+ 1 - 0
esp/files/templates/QuerySetQueryWidget.html

@@ -21,6 +21,7 @@
                         	<div id="${id}FilterForm" style="width:460px" data-dojo-type="dijit.form.Form">
                                 <div data-dojo-props="cols:2" data-dojo-type="dojox.layout.TableContainer">
                         			<input id="${id}ClusterTargetSelect" title="Cluster:" name="QuerySetName" colspan="2" data-dojo-props="trim: true, placeHolder:'r?x*'" data-dojo-type="TargetSelectWidget" />
+                                    <input id="${id}LibraryName" title="Library&nbsp;Name:" name="LibraryName" colspan="2" data-dojo-props="trim: true" data-dojo-type="dijit.form.TextBox" />
                         			<input id="${id}Active" title="Active:" name="Active" colspan="2" data-dojo-props="trim: true, placeHolder:'jsmi*'" data-dojo-type="dijit.form.CheckBox" />
                         			<input id="${id}Suspended" title="Suspended:" name="Suspended" colspan="2" data-dojo-props="trim: true, placeHolder:'jsmi*'" data-dojo-type="dijit.form.CheckBox" />
                         		</div>

+ 1 - 0
esp/scm/ws_workunits.ecm

@@ -1230,6 +1230,7 @@ ESPrequest [nil_remove] WUListQueriesRequest
 {
     string  QuerySetName;
     string  ClusterName;
+    string  LibraryName;
     int64 MemoryLimitLow;
     int64 MemoryLimitHigh;
     nonNegativeInteger TimeLimitLow;

+ 7 - 2
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -707,7 +707,6 @@ bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo *cl
     }
 }
 
-
 bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
 {
     StringBuffer wuid = req.getWuid();
@@ -1218,6 +1217,7 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
     MemoryBuffer filterBuf;
     const char* clusterReq = req.getClusterName();
     addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQuerySetName(), WUQSFQuerySet);
+    addWUQSQueryFilter(filters, filterCount, filterBuf, req.getLibraryName(), (WUQuerySortField) (WUQSFLibrary | WUQSFnocase));
     if (!req.getMemoryLimitLow_isNull())
         addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitLow(), (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric));
     if (!req.getMemoryLimitHigh_isNull())
@@ -1862,6 +1862,12 @@ public:
                 if (!destQuery->hasProp(atname))
                     destQuery->setProp(atname, aiter->queryValue());
             }
+            Owned<IPropertyTreeIterator> children = query->getElements("*");
+            ForEach(*children)
+            {
+                IPropertyTree &child = children->query();
+                destQuery->addPropTree(child.queryName(), createPTreeFromIPT(&child));
+            }
             if (cloneFilesEnabled && wufiles)
                 wufiles->addFilesFromQuery(workunit, pm, newQueryId);
         }
@@ -1936,7 +1942,6 @@ public:
     StringArray copiedQueryIds;
 };
 
-
 bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
 {
     const char *source = req.getSource();

+ 38 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -47,6 +47,8 @@
 
 #define ESP_WORKUNIT_DIR "workunits/"
 
+#define SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
+
 class NewWsWorkunit : public Owned<IWorkUnit>
 {
 public:
@@ -662,6 +664,41 @@ bool doAction(IEspContext& context, StringArray& wuids, int action, IProperties*
     return bAllSuccess;
 }
 
+static void checkUpdateQuerysetLibraries()
+{
+    Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+    if (!globalLock)
+        return;
+
+    IPropertyTree *root = globalLock->queryRoot();
+    if (!root)
+        return;
+
+    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+    Owned<IPropertyTreeIterator> querySets = root->getElements("QuerySet");
+    ForEach(*querySets)
+    {
+        IPropertyTree &querySet = querySets->query();
+        if (querySet.hasProp("@updatedLibraries")) //only need to do this once, then publish and copy will keep up to date
+            continue;
+        Owned<IPropertyTreeIterator> queries = querySet.getElements("Query");
+        ForEach(*queries)
+        {
+            IPropertyTree &query = queries->query();
+            if (query.hasProp("@libCount"))
+                continue;
+            const char *wuid = query.queryProp("@wuid");
+            if (!wuid || !*wuid)
+                continue;
+            Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
+            if (!cw)
+                continue;
+            checkAddLibrariesToQueryEntry(&query, cw);
+        }
+        querySet.setPropBool("@updatedLibraries", true);
+    }
+}
+
 MapStringTo<int> wuActionTable;
 
 void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *service)
@@ -675,6 +712,7 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 
     DBGLOG("Initializing %s service [process = %s]", service, process);
 
+    checkUpdateQuerysetLibraries();
     refreshValidClusters();
 
     daliServers.set(cfg->queryProp("Software/EspProcess/@daliServers"));

+ 2 - 0
roxie/ccd/ccdcontext.cpp

@@ -224,6 +224,8 @@ protected:
     virtual void reportContingencyFailure(char const * type, IException * e) {}
     virtual void checkForAbort(unsigned wfid, IException * handling) {}
     virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) { throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Persists not supported in roxie"); }
+    virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item) { return false; }
+
 private:
     IPropertyTree *workflowInfo;
     bool doOnce;