Browse Source

Merge pull request #2563 from jakesmith/gh-2476

Add methods for adding/retrieving logs to workunit

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
de9c0d3769

+ 56 - 0
common/workunit/workunit.cpp

@@ -622,6 +622,8 @@ public:
     virtual IStringVal & getXmlParams(IStringVal & params) const;
     virtual const IPropertyTree *getXmlParams() const;
     virtual unsigned __int64 getHash() const;
+    virtual IStringIterator *getLogs(const char *type, const char *component) const;
+    virtual IStringIterator *getProcesses(const char *type) const;
 
     virtual bool getWuDate(unsigned & year, unsigned & month, unsigned& day);
     virtual IStringVal & getSnapshot(IStringVal & str) const;
@@ -663,6 +665,7 @@ public:
     IWUException *createException();
     void setTimeStamp(const char *name, const char *instance, const char *event);
     void addTimeStamp(const char * name, const char * instance, const char *event);
+    void addProcess(const char *type, const char *instance, const char *log);
     void setAction(WUAction action);
     void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite);
     void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite);
@@ -1076,6 +1079,10 @@ public:
             { return c->getXmlParams(); }
     virtual unsigned __int64 getHash() const
             { return c->getHash(); }
+    virtual IStringIterator *getLogs(const char *type, const char *instance) const
+            { return c->getLogs(type, instance); }
+    virtual IStringIterator *getProcesses(const char *type) const
+            { return c->getProcesses(type); }
 
     virtual void clearExceptions()
             { c->clearExceptions(); }
@@ -1087,6 +1094,8 @@ public:
             { c->setTimeStamp(name, instance, event); }
     virtual void addTimeStamp(const char * name, const char * instance, const char *event)
             { c->addTimeStamp(name, instance, event); }
+    virtual void addProcess(const char *type, const char *instance, const char *log)
+            { c->addProcess(type, instance, log); }
     virtual void protect(bool protectMode)
             { c->protect(protectMode); }
     virtual void setBilled(bool billed)
@@ -1839,6 +1848,7 @@ public:
     }
 };      
 
+#define WUID_VERSION 1 // recorded in each wuid created, useful for bkwd compat. checks
 
 class CWorkUnitFactory : public CInterface, implements IWorkUnitFactory, implements IDaliClientShutdown
 {
@@ -1898,6 +1908,7 @@ public:
         IWorkUnit* ret = &cw->lockRemote(false);
         ret->setDebugValue("CREATED_BY", app, true);
         ret->setDebugValue("CREATED_FOR", user, true);
+        ret->setDebugValueInt("WUID_VERSION", WUID_VERSION, true);
         if (user)
             cw->setWuScope(user);
         return ret;
@@ -4644,6 +4655,51 @@ bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const
     return p->getPropBool(prop.str(), defVal); 
 }
 
+IStringIterator *CLocalWorkUnit::getLogs(const char *type, const char *instance) const
+{
+    VStringBuffer xpath("Process/%s/", type);
+    if (instance)
+        xpath.append(instance);
+    else
+        xpath.append("*");
+    CriticalBlock block(crit);
+    if (p->getPropInt("WUID_VERSION") < 1) // legacy wuid
+    {
+        if (!instance)
+            return new CStringPTreeTagIterator(p->getElements("Debug/*log*"));
+        else if(streq("EclAgent", instance))
+            return new CStringPTreeTagIterator(p->getElements("Debug/eclagentlog"));
+        else if (streq("Thor", instance))
+            return new CStringPTreeTagIterator(p->getElements("Debug/thorlog*"));
+        VStringBuffer xpath("Debug/%s", instance);
+        return new CStringPTreeAttrIterator(p->getElements(xpath.str()), xpath.str());
+    }
+    else
+        return new CStringPTreeAttrIterator(p->getElements(xpath.str()), "@log");
+}
+
+IStringIterator *CLocalWorkUnit::getProcesses(const char *type) const
+{
+    VStringBuffer xpath("Process/%s/*", type);
+    CriticalBlock block(crit);
+    return new CStringPTreeTagIterator(p->getElements(xpath.str()));
+}
+
+void CLocalWorkUnit::addProcess(const char *type, const char *instance, const char *log)
+{
+    VStringBuffer processType("Process/%s", type);
+    VStringBuffer xpath("%s/%s", processType.str(), instance);
+    if (log)
+        xpath.appendf("[@log=\"%s\"]", log);
+    CriticalBlock block(crit);
+    if (!p->hasProp(xpath))
+    {
+        IPropertyTree *node = ensurePTree(p, processType.str());
+        node = node->addPropTree(instance, createPTree());
+        node->setProp("@log", log);
+    }
+}
+
 void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite)
 {
     StringBuffer lower;

+ 3 - 0
common/workunit/workunit.hpp

@@ -918,6 +918,8 @@ interface IConstWorkUnit : extends IInterface
     virtual IStringVal & getXmlParams(IStringVal & params) const = 0;
     virtual const IPropertyTree * getXmlParams() const = 0;
     virtual unsigned __int64 getHash() const = 0;
+    virtual IStringIterator *getLogs(const char *type, const char *instance=NULL) const = 0;
+    virtual IStringIterator *getProcesses(const char *type) const = 0;
 };
 
 
@@ -930,6 +932,7 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual IWUException * createException() = 0;
     virtual void setTimeStamp(const char * name, const char * instance, const char * event) = 0;
     virtual void addTimeStamp(const char * name, const char * instance, const char * event) = 0;
+    virtual void addProcess(const char *type, const char *instance, const char *log=NULL) = 0;
     virtual void setAction(WUAction action) = 0;
     virtual void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite) = 0;
     virtual void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite) = 0;

+ 2 - 2
ecl/eclagent/eclagent.cpp

@@ -1853,8 +1853,8 @@ void EclAgent::doProcess()
             LOG(MCrunlock, unknownJob, "Obtained workunit lock");
             if (w->hasDebugValue("traceLevel"))
                 traceLevel = w->getDebugValueInt("traceLevel", 10);
-            w->setTracingValue("EclAgentBuild", BUILD_TAG);  
-            w->setDebugValue("EclAgentLog", logname.str(), true);
+            w->setTracingValue("EclAgentBuild", BUILD_TAG);
+            w->addProcess("EclAgent", agentTopology->queryProp("@name"), logname.str());
             if (checkVersion && ((w->getCodeVersion() > ACTIVITY_INTERFACE_VERSION) || (w->getCodeVersion() < MIN_ACTIVITY_INTERFACE_VERSION)))
                 failv(0, "Workunit was compiled for eclagent interface version %d, this eclagent requires version %d..%d", w->getCodeVersion(), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
             if(noRetry && (w->getState() == WUStateFailed))

+ 8 - 2
testing/ecl/mergededup.ecl

@@ -215,7 +215,6 @@ subcnt3 := COUNT(srtbfnl) - COUNT(mrgsrtb);
 ok3 := IF(subcnt3=0, 'counts okay', 'counts wrong');
 diff3 := IF(subcnt3=0, COUNT(JOIN(PROJECT(srtbfnl,AddSeq(LEFT,COUNTER)), PROJECT(DISTRIBUTED(mrgsrtb,j),AddSeq(LEFT,COUNTER)),(LEFT.k=RIGHT.k)AND(LEFT.i=RIGHT.i)AND(LEFT.j=RIGHT.j), LEFT ONLY, LOCAL)), subcnt3);
 //diff3 := IF(subcnt3=0, COUNT(COMBINE(srtbfnl, mrgsrtb, diff(LEFT, RIGHT))((i != 0) OR (j != 0))), subcnt3);
-
 ddpbfnl := DEDUP(srtbfnl, i, LOCAL);
 mrgddpb := MERGE(ddpb1, ddpb2, ddpb3, ddpb4, ddpb5, ddpb6, ddpb7, ddpb8, ddpb9, ddpb10, ddpb11, ddpb12, ddpb13, ddpb14, ddpb15, ddpb16, ddpb17, ddpb18, ddpb19, ddpb20, ddpb21, ddpb22, ddpb23, ddpb24, ddpb25, DEDUP, sorted(i),LOCAL);
 subcnt4 := COUNT(ddpbfnl) - COUNT(mrgddpb);
@@ -223,11 +222,18 @@ ok4 := IF(subcnt4=0, 'counts okay', 'counts wrong');
 diff4 := IF(subcnt4=0, COUNT(JOIN(PROJECT(ddpbfnl,AddSeq(LEFT,COUNTER)), PROJECT(DISTRIBUTED(mrgddpb,j),AddSeq(LEFT,COUNTER)),(LEFT.k=RIGHT.k)AND(LEFT.i=RIGHT.i)AND(LEFT.j=RIGHT.j), LEFT ONLY, LOCAL)), subcnt4);
 //diff4 := IF(subcnt4=0, COUNT(COMBINE(ddpbfnl, mrgddpb, diff(LEFT, RIGHT))((i != 0) OR (j != 0))), subcnt4);
 
+mrgsrtc := MERGE([srtb1, srtb2, srtb3, srtb4, srtb5, srtb6, srtb7, srtb8, srtb9, srtb10, srtb11, srtb12, srtb13, srtb14, srtb15, srtb16, srtb17, srtb18, srtb19, srtb20, srtb21, srtb22, srtb23, srtb24, srtb25], sorted(i));
+subcnt5 := COUNT(srtbfnl) - COUNT(mrgsrtc);
+ok5 := IF(subcnt5=0, 'counts okay', 'counts wrong');
+diff5 := IF(subcnt5=0, COUNT(JOIN(PROJECT(srtbfnl,AddSeq(LEFT,COUNTER)), PROJECT(DISTRIBUTED(mrgsrtc,j),AddSeq(LEFT,COUNTER)),(LEFT.k=RIGHT.k)AND(LEFT.i=RIGHT.i)AND(LEFT.j=RIGHT.j), LEFT ONLY, LOCAL)), subcnt5);
+
+
 output(JOIN(PROJECT(srtfnl,AddSeq(LEFT,COUNTER)), PROJECT(mrgsrt,AddSeq(LEFT,COUNTER)),(LEFT.k=RIGHT.k)AND(LEFT.i=RIGHT.i)AND(LEFT.j=RIGHT.j), LEFT ONLY));
-SEQUENTIAL(OUTPUT(ok1), OUTPUT(diff1), OUTPUT(ok2), OUTPUT(diff2), OUTPUT(ok3), OUTPUT(diff3), OUTPUT(ok4), OUTPUT(diff4));
+SEQUENTIAL(OUTPUT(ok1), OUTPUT(diff1), OUTPUT(ok2), OUTPUT(diff2), OUTPUT(ok3), OUTPUT(diff3), OUTPUT(ok4), OUTPUT(diff4), OUTPUT(ok5), OUTPUT(diff5));
 
 
 // Global tests for Thor
+
 unsigned numrecs := 1000 : stored('numrecs');
 
 trec := record

+ 3 - 26
thorlcr/master/thgraphmanager.cpp

@@ -643,35 +643,10 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     {
         Owned<IWorkUnit> wu = &workunit.lock();
         wu->setTracingValue("ThorBuild", BUILD_TAG);
-        // expect there to be 1 or 2 of these, so scan/check if log exists already and add if not
         StringBuffer log, logUrl;
         logHandler->getLogName(log);
         createUNCFilename(log, logUrl, false);
-        const char *nLog = logUrl.str();
-        Owned<IStringIterator> siter = &wu->getDebugValues("ThorLog*");
-        unsigned last=0;
-        bool found=false;
-        ForEach(*siter)
-        {
-            SCMStringBuffer istr;
-            const char *prop = siter->str(istr).str();
-            SCMStringBuffer dV;
-            workunit.getDebugValue(prop, dV);
-            if (0 == stricmp(dV.str(), nLog))
-            {
-                found = true;
-                break;
-            }
-            const char *tail = prop+7;
-            unsigned n = (*tail) ? atoi(tail) : 1;
-            if (n>=last) last = n;
-        }
-        if (!found)
-        {
-            StringBuffer newThorLog("ThorLog");
-            if (last) newThorLog.append(++last);
-            wu->setDebugValue(newThorLog.str(), nLog, true);
-        }
+        wu->addProcess("Thor", globals->queryProp("@name"), logUrl.str());
         StringBuffer tsStr("Thor - ");
         wu->setTimeStamp(tsStr.append(graphName).str(), GetCachedHostName(), "Started");
     }
@@ -746,6 +721,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         {
             Owned<IWorkUnit> wu = &workunit.lock();
             wu->setState(WUStateRunning);
+            VStringBuffer version("%d.%d", THOR_VERSION_MAJOR, THOR_VERSION_MINOR);
+            wu->setDebugValue("ThorVersion", version.str(), true);
         }
 
         SCMStringBuffer wuid, clusterName;