Просмотр исходного кода

HPCC-24027 Ensure thormaster exception is cleared by agent.

Clear the exception passed back from a k8s wuid job, so that
if the workunit is resubmitted (e.g. due to RECOVERY) it will not
cause the same exception to be seen again.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 лет назад
Родитель
Коммит
91d30d74d5

Разница между файлами не показана из-за своего большого размера
+ 401 - 113
common/workunit/workunit.cpp


+ 6 - 4
common/workunit/workunit.hpp

@@ -139,9 +139,9 @@ enum WUAction
     WUActionCheck = 2,
     WUActionRun = 3,
     WUActionExecuteExisting = 4,
-    WUActionPause = 5, 
-    WUActionPauseNow = 6, 
-    WUActionResume = 7, 
+    WUActionPause = 5,
+    WUActionPauseNow = 6,
+    WUActionResume = 7,
     WUActionSize = 8
 };
 
@@ -1496,7 +1496,7 @@ interface IExtendedWUInterface
     virtual bool archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned,bool exportAssociatedFiles) = 0;
     virtual IPropertyTree *getUnpackedTree(bool includeProgress) const = 0;
     virtual IPropertyTree *queryPTree() const = 0;
-    
+
 };
 
 //Do not mark this as WORKUNIT_API - all functions are inline, and it causes windows link errors
@@ -1691,6 +1691,8 @@ extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 
 inline cost_type calcCost(cost_type ratePerHour, unsigned __int64 ms) { return ratePerHour * ms / 1000 / 3600; }
 
+extern WORKUNIT_API void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IPropertyTree &config);
+
 #ifdef _CONTAINERIZED
 extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName, const char *multiJobLingerQueueName);
 extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);

+ 74 - 74
ecl/eclagent/eclagent.cpp

@@ -68,8 +68,8 @@ using roxiemem::OwnedRoxieString;
 
 //#define LEAK_FILE         "c:\\leaks.txt"
 
-#define MONITOR_ECLAGENT_STATUS     
- 
+#define MONITOR_ECLAGENT_STATUS
+
 //#define ROOT_DRIVE      "c:"
 
 //#define DEFAULT_REALTHOR_HOST "localhost"
@@ -154,7 +154,7 @@ class CHThorDebugSocketListener : public Thread, implements IHThorDebugSocketLis
 
 public:
     IMPLEMENT_IINTERFACE;
-    CHThorDebugSocketListener(CHThorDebugContext * _debugContext) 
+    CHThorDebugSocketListener(CHThorDebugContext * _debugContext)
         : Thread("CHThorDebugSocketListener"), debugContext(_debugContext)
     {
         port = 0;
@@ -204,7 +204,7 @@ public:
         port = HTHOR_DEBUG_BASE_PORT;
         for (;;)
         {
-            try 
+            try
             {
                 DBGLOG("CHThorDebugSocketListener trying port %d", port);
                 socket.setown( ISocket::create(port) );
@@ -428,7 +428,7 @@ public:
     virtual bool stop() override
     {
         IERRLOG("CHThorDebugSocketWorker stopped with queries active");
-        return true; 
+        return true;
     }
 
 };
@@ -440,7 +440,7 @@ IPooledThread *CHThorDebugSocketListener::createNew()
 
 //=======================================================================================
 
-CHThorDebugContext::CHThorDebugContext(const IContextLogger &_logctx, IPropertyTree *_queryXGMML, EclAgent *_eclAgent) 
+CHThorDebugContext::CHThorDebugContext(const IContextLogger &_logctx, IPropertyTree *_queryXGMML, EclAgent *_eclAgent)
     : CBaseServerDebugContext(_logctx, _queryXGMML), eclAgent(_eclAgent)
 {
 }
@@ -616,7 +616,7 @@ ICodeContext *EclAgent::queryCodeContext()
 
 const char *EclAgent::queryTempfilePath()
 {
-    if (agentTempDir.isEmpty()) 
+    if (agentTempDir.isEmpty())
     {
         StringBuffer dir;
         getTempFilePath(dir, "eclagent", agentTopology);
@@ -693,7 +693,7 @@ void EclAgent::unlockWorkUnit()
     if (wuWrite)
     {
         IWorkUnit *w = wuWrite.getClear();
-        if (!w->Release()) 
+        if (!w->Release())
             IERRLOG("EclAgent::unlockWorkUnit workunit not released");
     }
 }
@@ -843,7 +843,7 @@ void EclAgent::setResultReal(const char *name, unsigned sequence, double val)
     Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
-        r->setResultReal(val);  
+        r->setResultReal(val);
         r->setResultStatus(ResultStatusCalculated);
     }
     else
@@ -1023,16 +1023,16 @@ void EclAgent::getExternalResultRaw(unsigned & tlen, void * & tgt, const char *
     {
         Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
         if (!r) failv(0, "Failed to find raw value %s:%d in workunit %s", nullText(stepname),sequence, wuid);
-        
+
         Variable2IDataVal result(&tlen, &tgt);
         Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
         Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
         r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
     }
-    catch (IException * e) 
+    catch (IException * e)
     {
-        StringBuffer text; 
-        e->errorMessage(text); 
+        StringBuffer text;
+        e->errorMessage(text);
         e->Release();
         failv(0, "value %s:%d in workunit %s contains an invalid raw value [%s]", nullText(stepname), sequence, wuid, text.str());
     }
@@ -1101,10 +1101,10 @@ bool EclAgent::getWorkunitResultFilename(StringBuffer & diskFilename, const char
         diskFilename.append("~").append(tempFilename.str());
         return true;
     }
-    catch (IException * e) 
+    catch (IException * e)
     {
-        StringBuffer text; 
-        e->errorMessage(text); 
+        StringBuffer text;
+        e->errorMessage(text);
         e->Release();
         failv(0, "Failed to find value %s:%d in workunit %s [%s]", nullText(stepname),sequence, nullText(wuid), text.str());
     }
@@ -1144,7 +1144,7 @@ void EclAgent::doSetResultString(type_t type, const char *name, unsigned sequenc
     Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
-        r->setResultString(val, len);   
+        r->setResultString(val, len);
         r->setResultStatus(ResultStatusCalculated);
     }
     else
@@ -1194,7 +1194,7 @@ void EclAgent::setResultSet(const char * name, unsigned sequence, bool isAll, si
     if (r)
     {
         r->setResultIsAll(isAll);
-        r->setResultRaw(len, val, ResultFormatRaw); 
+        r->setResultRaw(len, val, ResultFormatRaw);
         r->setResultStatus(ResultStatusCalculated);
     }
     else
@@ -1210,7 +1210,7 @@ void EclAgent::setResultSet(const char * name, unsigned sequence, bool isAll, si
             {
                 SimpleOutputWriter x;
                 xform->toXML(isAll, len, (const byte *) val, x);
-                outputSerializer->printf(sequence, "%s]", x.str()); 
+                outputSerializer->printf(sequence, "%s]", x.str());
             }
             else
                 outputSerializer->printf(sequence, "?]");
@@ -1379,7 +1379,7 @@ bool EclAgent::expandLogicalName(StringBuffer & fullname, const char * logicalNa
         logicalName++;
         useScope = false;
     }
-    else if (isAbsolutePath(logicalName)) 
+    else if (isAbsolutePath(logicalName))
     {
         fullname.append(logicalName);
         return false;
@@ -1410,9 +1410,9 @@ ILocalOrDistributedFile *EclAgent::resolveLFN(const char *fname, const char *err
         }
         else
         {
-            makeAbsolutePath(lfn.str(), name);  
+            makeAbsolutePath(lfn.str(), name);
         }
-        lfn.clear().append(name);  
+        lfn.clear().append(name);
     }
     if (expandedlfn)
         *expandedlfn = lfn;
@@ -1455,7 +1455,7 @@ bool EclAgent::fileExists(const char *name)
     expandLogicalName(lfn, name);
 
     Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn.str(),queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser);
-    if (f) 
+    if (f)
         return true;
     return false;
 }
@@ -1562,7 +1562,7 @@ char *EclAgent::getPlatform()
         /* NB: platform specs. are defined if agent is running in the context of
          * another engine, e.g. query has been submitted to Thor, but some code is
         * executing outside of it.
-        * 
+        *
         * If not defined then assumed to be executing in hthor context,
         * where platform() defaults to "hthor".
         */
@@ -1570,7 +1570,7 @@ char *EclAgent::getPlatform()
         if (!agentTopology->getProp("platform/@type", type))
             type.set("hthor"); // default
         return type.detach();
-#else            
+#else
         const char * cluster = clusterNames.tos();
         Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
         if (!clusterInfo)
@@ -1582,7 +1582,7 @@ char *EclAgent::getPlatform()
         return strdup("standalone");
 }
 
-char *EclAgent::getEnv(const char *name, const char *defaultValue) const 
+char *EclAgent::getEnv(const char *name, const char *defaultValue) const
 {
     const char *val = cmdLineArgs->queryProp(name);
     if (!val)
@@ -1628,15 +1628,15 @@ unsigned EclAgent::getNodes()//retrieve node count for current cluster
             /* NB: platform specs. are defined if agent is running in the context of
              * another engine, e.g. query has been submitted to Thor, but some code is
              * executing outside of it.
-             * 
+             *
              * If not defined then assumed to be executing in hthor context,
              * where getNodes() defaults to 1.
              */
             clusterWidth = agentTopology->getPropInt("platform/@width", 1);
-#else            
+#else
             const char * cluster = clusterNames.tos();
             Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
-            if (!clusterInfo) 
+            if (!clusterInfo)
                 throw MakeStringException(-1, "Unknown cluster '%s'", cluster);
             clusterWidth = clusterInfo->getSize();
             assertex(clusterWidth != 0);
@@ -1739,7 +1739,7 @@ IConstWorkUnit * EclAgent::resolveLibrary(const char * libraryName, unsigned exp
     Owned<IPropertyTree> resolved = queryRegistry ? resolveQueryAlias(queryRegistry, libraryName) : NULL;
     if (!resolved)
         throw MakeStringException(0, "No current implementation of library %s", libraryName);
-    
+
     const char * libraryWuid = resolved->queryProp("@wuid");
 
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
@@ -1886,7 +1886,7 @@ void EclAgent::doProcess()
                 w->setAgentSession(myProcessSession());
                 w->clearGraphProgress();  // Should Roxie do this too??
             }
-            if (debugContext)   
+            if (debugContext)
             {
                 w->setDebugAgentListenerPort(debugContext->queryPort());
 
@@ -2071,7 +2071,7 @@ void EclAgent::doProcess()
         logException((IException *) NULL);
     }
     try {
-        unlockWorkUnit(); 
+        unlockWorkUnit();
     }
     catch (IException *e)
     {
@@ -2197,7 +2197,7 @@ void EclAgent::runProcess(IEclProcess *process)
 
 unsigned EclAgent::getWorkflowId()
 {
-    return workflow->queryCurrentWfid(); 
+    return workflow->queryCurrentWfid();
 }
 
 //----------------------------------------------------------------
@@ -2544,7 +2544,7 @@ void EclAgent::logException(WorkflowException *e)
         e->errorMessage(m);
         code = e->errorCode();
     }
-    else    
+    else
         m.append("Unknown error");
 
     logException(SeverityError, aud, code, m.str(), isAbort);
@@ -2564,7 +2564,7 @@ void EclAgent::logException(IException *e)
         e->errorMessage(m);
         code = e->errorCode();
     }
-    else    
+    else
         m.append("Unknown error");
 
     logException(SeverityError, MSGAUD_programmer, code, m.str(), false);
@@ -3130,7 +3130,7 @@ char *EclAgent::getFilePart(const char *lfn, bool create)
                 OwnedIFile file = ldFile->getPartFile(0,copyno);
                 if (file->exists())
                 {
-                    StringBuffer p(file->queryFilename()); 
+                    StringBuffer p(file->queryFilename());
                     return p.detach();
                 }
             }
@@ -3198,7 +3198,7 @@ void EclAgent::addTimings()
 }
 
 // eclagent abort monitoring
-void EclAgent::abortMonitor() 
+void EclAgent::abortMonitor()
 {
     StringBuffer errorText;
     unsigned guillotineleft = 0;
@@ -3274,14 +3274,14 @@ void EclAgent::abortMonitor()
 void EclAgent::fatalAbort(bool userabort,const char *excepttext)
 {
     try {
-        CriticalBlock block(wusect); 
+        CriticalBlock block(wusect);
         WorkunitUpdate w = updateWorkUnit();
-        if (userabort) 
+        if (userabort)
             w->setState(WUStateAborted);
         if (excepttext&&*excepttext)
             addExceptionEx(SeverityError, MSGAUD_programmer, "eclagent", 1000, excepttext, NULL, 0, 0, true, false);
         w->deleteTempFiles(NULL, false, true);
-        wuRead.clear(); 
+        wuRead.clear();
         w->commit();        // needed because we can't unlock the workunit in this thread
         w.clear();
         deleteTempFiles();
@@ -3488,7 +3488,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
                     Owned<IPropertyTree> valtree = createPTreeFromXMLString(val);
                     query->setPropTree(key+1, valtree.getClear());
                 }
-                else 
+                else
                     query->setProp(key+1, val);
             }
         }
@@ -3516,7 +3516,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
     Owned<IConnectionMonitor> daliDownMonitor;
     try
     {
-#ifdef MONITOR_ECLAGENT_STATUS  
+#ifdef MONITOR_ECLAGENT_STATUS
         std::unique_ptr<CSDSServerStatus> serverstatus;
 #endif
         Owned<ILocalWorkUnit> standAloneWorkUnit;
@@ -3535,7 +3535,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
                 Owned<IGroup> serverGroup = createIGroupRetry(daliServers, DALI_SERVER_PORT);
                 initClientProcess(serverGroup, DCR_EclAgent, 0, NULL, NULL, MP_WAIT_FOREVER);
             }
-#ifdef MONITOR_ECLAGENT_STATUS  
+#ifdef MONITOR_ECLAGENT_STATUS
             serverstatus.reset(new CSDSServerStatus("ECLagent"));
             serverstatus->queryProperties()->setPropInt("Pid", GetCurrentProcessId());
             serverstatus->commitProperties();
@@ -3574,7 +3574,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
             addMPConnectionMonitor(daliDownMonitor);
 
             LOG(MCoperatorInfo, "hthor build %s", BUILD_TAG);
-            startLogMsgParentReceiver();    
+            startLogMsgParentReceiver();
             connectLogMsgManagerToDali();
 
             StringBuffer baseDir;
@@ -3646,7 +3646,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
             wuid.set(uid);
         }
 
-#ifdef MONITOR_ECLAGENT_STATUS  
+#ifdef MONITOR_ECLAGENT_STATUS
         if (serverstatus)
         {
             serverstatus->queryProperties()->setProp("WorkUnit",wuid.str());
@@ -3840,7 +3840,7 @@ protected:
     bool hasStarted;
     bool hasStopped;
     bool everStarted;
-        
+
 public:
     InputProbe(IHThorInput *_in, IEngineRowStream *_stream, unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
         : in(_in), stream(_stream), sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
@@ -3854,26 +3854,26 @@ public:
         inMeta = NULL;
     }
 
-    virtual IInputSteppingMeta * querySteppingMeta() 
+    virtual IInputSteppingMeta * querySteppingMeta()
     {
         return in->querySteppingMeta();
     }
-    virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) 
+    virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector)
     {
-        return in->gatherConjunctions(collector); 
+        return in->gatherConjunctions(collector);
     }
-    virtual void resetEOF() 
-    { 
-        in->resetEOF(); 
+    virtual void resetEOF()
+    {
+        in->resetEOF();
     }
 
-    virtual IOutputMetaData * queryOutputMeta() const 
-    { 
-        return in->queryOutputMeta(); 
+    virtual IOutputMetaData * queryOutputMeta() const
+    {
+        return in->queryOutputMeta();
     }
 
-    virtual IOutputMetaData * queryOutputMeta() 
-    { 
+    virtual IOutputMetaData * queryOutputMeta()
+    {
         return in->queryOutputMeta();
     }
 
@@ -3903,22 +3903,22 @@ public:
         return ret;
     }
 
-    virtual bool isGrouped() 
-    { 
-        return in->isGrouped(); 
+    virtual bool isGrouped()
+    {
+        return in->isGrouped();
     }
 
-    virtual void ready() 
-    { 
+    virtual void ready()
+    {
         // NOTE: rowCount/maxRowSize not reset, as we want them cumulative when working in a child query.
         hasStarted = true;
         hasStopped = false;
         everStarted = true;
-        in->ready(); 
+        in->ready();
     }
 
-    virtual void stop() 
-    { 
+    virtual void stop()
+    {
         hasStopped = true;
         stream->stop();
     }
@@ -3943,7 +3943,7 @@ class DebugProbe : public InputProbe, implements IActivityDebugContext
     unsigned historySize;
     unsigned historyCapacity;
     unsigned nextHistorySlot;
-    
+
     mutable memsize_t proxyId; // MORE - do we need a critsec to protect too?
 
     DebugActivityRecord *sourceAct;
@@ -4027,7 +4027,7 @@ public:
 
     virtual void Link() const
     {
-        CInterface::Link(); 
+        CInterface::Link();
     }
 
     virtual bool Release() const
@@ -4041,12 +4041,12 @@ public:
         return proxyId;
     }
 
-    virtual void resetEOF() 
-    { 
+    virtual void resetEOF()
+    {
         forceEOF = false;
         EOGseen = false;
         EOGsent = false;
-        InputProbe::resetEOF(); 
+        InputProbe::resetEOF();
     }
 #if 0
     virtual unsigned queryId() const
@@ -4123,7 +4123,7 @@ public:
         }
     }
 
-    virtual void getXGMML(IXmlWriter *output) const 
+    virtual void getXGMML(IXmlWriter *output) const
     {
         output->outputBeginNested("edge", false);
         sourceAct->outputId(output, "@source");
@@ -4143,7 +4143,7 @@ public:
         output->outputEndNested("edge");
     }
 
-    virtual IOutputMetaData *queryOutputMeta() const 
+    virtual IOutputMetaData *queryOutputMeta() const
     {
         return InputProbe::queryOutputMeta();
     }
@@ -4154,7 +4154,7 @@ public:
     }
 
     // NOTE - these functions are threadsafe because only called when query locked by debugger.
-    // Even though this thread may not yet be blocked on the debugger's critsec, because all manipulation (including setting history rows) is from 
+    // Even though this thread may not yet be blocked on the debugger's critsec, because all manipulation (including setting history rows) is from
     // within debugger it is ok.
 
     virtual unsigned queryHistorySize() const
@@ -4362,7 +4362,7 @@ public:
     }
 
     virtual void updateProgress(IStatisticGatherer &progress) const
-    {   
+    {
         if (in)
             in->updateProgress(progress);
     }

+ 29 - 30
ecl/eclagent/eclagent.ipp

@@ -30,7 +30,7 @@
 #include "workflow.hpp"
 #include "roxierow.hpp"
 #include "roxiedebug.hpp"
-#include <stdexcept> 
+#include <stdexcept>
 #include "thorplugin.hpp"
 #include "thorcommon.hpp"
 #include "enginecontext.hpp"
@@ -117,28 +117,28 @@ public:
     {
         return ctx->queryResolveFilesLocally();
     }
-    virtual bool queryRemoteWorkunit() 
-    { 
-        return ctx->queryRemoteWorkunit(); 
+    virtual bool queryRemoteWorkunit()
+    {
+        return ctx->queryRemoteWorkunit();
     }
-    virtual bool queryWriteResultsToStdout() 
-    { 
+    virtual bool queryWriteResultsToStdout()
+    {
         return ctx->queryWriteResultsToStdout();
-    }   
+    }
     virtual outputFmts queryOutputFmt()
-    { 
+    {
         return ctx->queryOutputFmt();
     }
     virtual VOID outputFormattedResult(const char *name, unsigned sequence, bool close)
-    { 
+    {
         return ctx->outputFormattedResult(name, sequence, close);
     }
     virtual unsigned __int64 queryStopAfter()
-    { 
+    {
         return ctx->queryStopAfter();
     }
-    virtual IOrderedOutputSerializer * queryOutputSerializer() 
-    { 
+    virtual IOrderedOutputSerializer * queryOutputSerializer()
+    {
         return ctx->queryOutputSerializer();
     }
     virtual void setWorkflowCondition(bool value)
@@ -217,17 +217,17 @@ public:
     {
         return ctx->createGraphLoopResults();
     }
-    
+
     virtual const char *queryAllowedPipePrograms()
     {
         return ctx->queryAllowedPipePrograms();
     }
-    
+
     virtual IGroup *getHThorGroup(StringBuffer &name)
     {
         return ctx->getHThorGroup(name);
     }
-    
+
     virtual const char *queryWuid()
     {
         return ctx->queryWuid();
@@ -330,9 +330,9 @@ class CHThorDebugContext : extends CBaseServerDebugContext
 {
     Owned<CHThorDebugSocketListener> listener;
     EclAgent *eclAgent;
-    
+
 public:
-    CHThorDebugContext(const IContextLogger &_logctx, IPropertyTree *_queryXGMML, EclAgent *_eclAgent); 
+    CHThorDebugContext(const IContextLogger &_logctx, IPropertyTree *_queryXGMML, EclAgent *_eclAgent);
     inline unsigned queryPort();
     inline EclAgent * getEclAgent() { return eclAgent; };
 
@@ -582,12 +582,11 @@ public:
     virtual void getLastFailMessage(size32_t & outLen, char * & outStr, const char * tag);
     virtual void getEventName(size32_t & outLen, char * & outStr);
     virtual void getEventExtra(size32_t & outLen, char * & outStr, const char * tag);
-    //virtual void logException(IEclException *e);  
+    //virtual void logException(IEclException *e);
     virtual char *resolveName(const char *in, char *out, unsigned outlen);
     virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type);
     virtual ILocalOrDistributedFile  *resolveLFN(const char *logicalName, const char *errorTxt, bool optional, bool noteRead, bool write, StringBuffer * expandedlfn, bool isPrivilegedUser);
 
-    virtual void executeThorGraph(const char * graphName);
     virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract);
     virtual IHThorGraphResults * executeLibraryGraph(const char * libraryName, unsigned expectedInterfaceHash, unsigned activityId, const char * embeddedGraphName, const byte * parentExtract);
     virtual IThorChildGraph * resolveChildQuery(__int64 subgraphId, IHThorArg * colocal);
@@ -604,8 +603,8 @@ public:
 
     void addException(ErrorSeverity severity, const char * source, unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool failOnError, bool isAbort);
     void addExceptionEx(ErrorSeverity severity, MessageAudience aud, const char * source, unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool failOnError, bool isAbort);
-    void logException(IException *e);  
-    void logException(WorkflowException *e);  
+    void logException(IException *e);
+    void logException(WorkflowException *e);
     void logException(std::exception & e);
     void logException(ErrorSeverity severity, MessageAudience aud, unsigned code, const char * text, bool isAbort);
 
@@ -622,7 +621,7 @@ public:
     virtual unsigned getWorkflowId();
     virtual IConstWorkUnit *queryWorkUnit() const override;  // no link
     virtual IWorkUnit *updateWorkUnit() const; // links
-    virtual void unlockWorkUnit();      
+    virtual void unlockWorkUnit();
     virtual void reloadWorkUnit();
     void addTimings();
 
@@ -689,9 +688,9 @@ public:
     {
         return allowedPipeProgs.get();
     }
-    
+
     IGroup *getHThorGroup(StringBuffer &out);
-    
+
     virtual void updateWULogfile();
 
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
@@ -944,12 +943,12 @@ private:
 
         IOutputMetaData * queryOutputMeta() const { return in->queryOutputMeta(); }
 
-        void ready() 
+        void ready()
         {
             in->ready();
         }
-        
-        void stop() 
+
+        void stop()
         {
             in->stop();
         }
@@ -975,7 +974,7 @@ private:
                 return true;
             return false;
         }
-    
+
         const void *nextRow()
         {
             const void *ret = in->nextRow();
@@ -996,7 +995,7 @@ private:
             }
             if (in)
                 in->updateProgress(progress);
-        }   
+        }
     };
 
     RedirectedAgentContext subgraphAgentContext;
@@ -1121,7 +1120,7 @@ class EclGraph : public CInterface
         }
 
         IEclGraphResults * resolveLocalQuery(__int64 activityId)
-        { 
+        {
             return container->resolveLocalQuery((unsigned)activityId);
         }
         void setContainer(EclGraph * _container)

+ 23 - 289
ecl/eclagent/eclgraph.cpp

@@ -53,17 +53,17 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKdiskwrite:
     case TAKspillwrite:
         return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind);
-    case TAKsort: 
+    case TAKsort:
         return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind);
-    case TAKdedup: 
+    case TAKdedup:
         return createGroupDedupActivity(agent, activityId, subgraphId, (IHThorDedupArg &)arg, kind);
-    case TAKfilter: 
+    case TAKfilter:
         return createFilterActivity(agent, activityId, subgraphId, (IHThorFilterArg &)arg, kind);
-    case TAKproject: 
+    case TAKproject:
         return createProjectActivity(agent, activityId, subgraphId, (IHThorProjectArg &)arg, kind);
     case TAKprefetchproject:
         return createPrefetchProjectActivity(agent, activityId, subgraphId, (IHThorPrefetchProjectArg &)arg, kind);
-    case TAKfilterproject : 
+    case TAKfilterproject :
         return createFilterProjectActivity(agent, activityId, subgraphId, (IHThorFilterProjectArg &)arg, kind);
     case TAKrollup:
         return createRollupActivity(agent, activityId, subgraphId, (IHThorRollupArg &)arg, kind);
@@ -189,7 +189,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKxmlfetch:
     case TAKjsonfetch:
         return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind, node);
-    case TAKmerge: 
+    case TAKmerge:
         return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);
     case TAKhttp_rowdataset:
         return createHttpRowCallActivity(agent, activityId, subgraphId, (IHThorHttpCallArg &)arg, kind);
@@ -201,13 +201,13 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
         return createSoapDatasetCallActivity(agent, activityId, subgraphId, (IHThorSoapCallArg &)arg, kind);
     case TAKsoap_datasetaction:
         return createSoapDatasetActionActivity(agent, activityId, subgraphId, (IHThorSoapActionArg &)arg, kind);
-    case TAKchilditerator:          
+    case TAKchilditerator:
         return createChildIteratorActivity(agent, activityId, subgraphId, (IHThorChildIteratorArg &)arg, kind);
     case TAKlinkedrawiterator:
         return createLinkedRawIteratorActivity(agent, activityId, subgraphId, (IHThorLinkedRawIteratorArg &)arg, kind);
-    case TAKrowresult:          
+    case TAKrowresult:
         return createRowResultActivity(agent, activityId, subgraphId, (IHThorRowResultArg &)arg, kind);
-    case TAKdatasetresult:          
+    case TAKdatasetresult:
         return createDatasetResultActivity(agent, activityId, subgraphId, (IHThorDatasetResultArg &)arg, kind);
     case TAKwhen_dataset:
     case TAKwhen_action:
@@ -347,7 +347,7 @@ EclGraphElement::EclGraphElement(EclSubGraph * _subgraph, EclSubGraph * _results
 
 void EclGraphElement::addDependsOn(EclSubGraph & other, EclGraphElement * sourceActivity, int controlId)
 {
-    dependentOn.append(other); 
+    dependentOn.append(other);
     dependentOnActivity.append(*sourceActivity);
     dependentControlId.append(controlId);
 }
@@ -503,13 +503,13 @@ void EclGraphElement::createActivity(IAgentContext & agent, EclSubGraph * owner)
                                                         input.queryOutput(branchIndexes.item(i2)),  //input
                                                         input.activity.get(),   //Source act
                                                         activity.get(),         //target activity
-                                                        0,//input.id, 
-                                                        0,//id, 
+                                                        0,//input.id,
+                                                        0,//id,
                                                         0);
                         probe = & dynamic_cast<IHThorInput &> (base->queryInput());
                     }
                 }
-                else 
+                else
                 {
                     probe = subgraph->createLegacyProbe(input.queryOutput(branchIndexes.item(i2)),
                                                     input.id,
@@ -673,8 +673,8 @@ bool EclGraphElement::prepare(IAgentContext & agent, const byte * parentExtract,
         case TAKparallel:
             {
                 Owned<IHThorArg> helper = createHelper(agent, subgraph->owner);
-                unsigned numBranches = (kind == TAKsequential) ? 
-                                        ((IHThorSequentialArg *)helper.get())->numBranches() : 
+                unsigned numBranches = (kind == TAKsequential) ?
+                                        ((IHThorSequentialArg *)helper.get())->numBranches() :
                                         ((IHThorParallelArg *)helper.get())->numBranches();
                 for (unsigned branch=1; branch <= numBranches; branch++)
                     executeDependentActions(agent, parentExtract, branch);
@@ -1119,19 +1119,19 @@ EclGraphElement * EclSubGraph::idToActivity(unsigned id)
     return NULL;
 }
 
-void EclSubGraph::reset() 
-{ 
-    executed = false; 
+void EclSubGraph::reset()
+{
+    executed = false;
     ForEachItemIn(i, subgraphs)
         subgraphs.item(i).reset();
 }
 
 //---------------------------------------------------------------------------
 
-void EclAgentQueryLibrary::updateProgress() 
+void EclAgentQueryLibrary::updateProgress()
 {
     if (graph)
-        graph->updateLibraryProgress(); 
+        graph->updateLibraryProgress();
 }
 
 void EclAgentQueryLibrary::destroyGraph()
@@ -1195,7 +1195,7 @@ void EclGraph::createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml, boo
             {
                 targetGraph = targetActivity->subgraph;
                 targetGraphContext = targetGraph->parentActivityId;
-                if (sourceGraphContext == targetGraphContext) 
+                if (sourceGraphContext == targetGraphContext)
                     break;
 
                 targetActivity = recurseFindActivityFromId(targetGraph, targetGraphContext);
@@ -1524,272 +1524,6 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa
 
 extern IProbeManager *createDebugManager(IDebuggableContext *debugContext, const char *graphName);
 
-#define ABORT_POLL_PERIOD (5*1000)
-
-void EclAgent::executeThorGraph(const char * graphName)
-{
-    unsigned timelimit = queryWorkUnit()->getDebugValueInt("thorConnectTimeout", agentTopology->getPropInt("@thorConnectTimeout", 60));
-#ifdef _CONTAINERIZED
-    // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
-    setBlocked();
-    unlockWorkUnit();
-
-    WUState state = WUStateUnknown;
-    if (agentTopology->hasProp("@queue"))
-    {
-        bool multiJobLinger = agentTopology->getPropBool("@multiJobLinger");
-        if (executeGraphOnLingeringThor(*queryWorkUnit(), graphName, multiJobLinger ? agentTopology->queryProp("@queue") : nullptr))
-            PROGLOG("Existing lingering Thor handled graph: %s", graphName);
-        else
-        {
-            VStringBuffer queueName("%s.thor", agentTopology->queryProp("@queue"));
-            DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
-            Owned<IJobQueue> queue = createJobQueue(queueName.str());
-            VStringBuffer jobName("%s/%s", wuid.get(), graphName);
-            IJobQueueItem *item = createJobQueueItem(jobName);
-            queue->enqueue(item);
-        }
-
-        // NB: overall max runtime if guillotine set handled by abortmonitor
-        unsigned runningTimeLimit = queryWorkUnit()->getDebugValueInt("maxRunTime", 0);
-        runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;
-
-        std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
-        for (unsigned i=0; i<2; i++)
-        {
-            WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
-            DBGLOG("Got state: %s", getWorkunitStateStr(state));
-            if (WUStateWait == state) // already finished
-                break;
-            else if ((INFINITE != timelimit) && (WUStateUnknown == state))
-                throw makeStringExceptionV(0, "Query %s failed to start within specified timelimit (%u) seconds", wuid.str(), timelimit);
-            else
-            {
-                auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
-                if (it == expectedStates.end())
-                    throw makeStringExceptionV(0, "Query %s failed, state: %s", wuid.str(), getWorkunitStateStr(state));
-            }
-            timelimit = runningTimeLimit;
-            expectedStates = { WUStateWait };
-        }
-    }
-    else
-    {        
-        VStringBuffer job("%s-%s", wuid.str(), graphName);
-        runK8sJob("thormaster", wuid, job, queryComponentConfig().getPropBool("@deleteJobs", true), { { "graphName", graphName} });
-    }
-
-    if (wuRead->getExceptionCount())
-    {
-        Owned<IConstWUExceptionIterator> iter = &wuRead->getExceptions();
-        ForEach(*iter)
-        {
-            IConstWUException &e = iter->query();
-            SCMStringBuffer str;
-            e.getExceptionSource(str);
-            if (streq("thormasterexception", str.s))
-            {
-                str.clear();
-                e.getExceptionMessage(str);
-                throw makeStringException(e.getExceptionCode(), str.str());
-            }
-        }
-    }
-    else
-    {
-        WorkunitUpdate w = updateWorkUnit();
-        WUState state = w->getState();
-        if (WUStateFailed == state)
-            throw makeStringException(0, "Workunit failed");
-    }
-
-    setRunning();
-    unlockWorkUnit();
-#else
-    StringAttr wuid(wuRead->queryWuid());
-    StringAttr owner(wuRead->queryUser());
-    StringAttr cluster(wuRead->queryClusterName());
-    int priority = wuRead->getPriorityValue();
-    Owned<IConstWUClusterInfo> c = getTargetClusterInfo(cluster.str());
-    if (!c)
-        throw MakeStringException(0, "Invalid thor cluster %s", cluster.str());
-    SCMStringBuffer queueName;
-    c->getThorQueue(queueName);
-    Owned<IJobQueue> jq = createJobQueue(queueName.str());
-
-    bool resubmit;
-    Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
-    do // loop if pause interrupted graph and needs resubmitting on resume
-    {
-        resubmit = false; // set if job interrupted in thor
-        unlockWorkUnit();
-        if (WUStatePaused == queryWorkUnit()->getState()) // check initial state - and wait if paused
-        {
-            for (;;)
-            {
-                WUAction action = wuFactory->waitForWorkUnitAction(wuid, queryWorkUnit()->getAction());
-                if (action == WUActionUnknown)
-                    throw new WorkflowException(0, "Workunit aborting", 0, WorkflowException::ABORT, MSGAUD_user);
-                if (action != WUActionPause && action != WUActionPauseNow)
-                    break;
-            }
-        }
-        {
-            Owned <IWorkUnit> w = updateWorkUnit();
-            w->setState(WUStateBlocked);
-        }
-        unlockWorkUnit();
-            
-        class cPollThread: public Thread
-        {
-            Semaphore sem;
-            bool stopped;
-            IJobQueue *jq;
-            IConstWorkUnit *wu;
-        public:
-            
-            bool timedout;
-            CTimeMon tm;
-            cPollThread(IJobQueue *_jq, IConstWorkUnit *_wu, unsigned timelimit)
-                : tm(timelimit)
-            {
-                stopped = false;
-                jq = _jq;
-                wu = _wu;
-                timedout = false;
-            }
-            ~cPollThread()
-            {
-                stop();
-            }
-            int run()
-            {
-                while (!stopped) {
-                    sem.wait(ABORT_POLL_PERIOD);
-                    if (stopped)
-                        break;
-                    if (tm.timedout()) {
-                        timedout = true;
-                        stopped = true;
-                        jq->cancelInitiateConversation();
-                    }
-                    else if (wu->aborting()) {
-                        stopped = true;
-                        jq->cancelInitiateConversation();
-                    }
-
-                }
-                return 0;
-            }
-            void stop()
-            {
-                stopped = true;
-                sem.signal();
-            }
-        } pollthread(jq,wuRead,timelimit*1000);
-
-        pollthread.start();
-
-        PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
-        IJobQueueItem* item = createJobQueueItem(wuid.str());
-        item->setOwner(owner.str());
-        item->setPriority(priority);        
-        Owned<IConversation> conversation = jq->initiateConversation(item);
-        bool got = conversation.get()!=NULL;
-        pollthread.stop();
-        pollthread.join();
-        if (!got) {
-            if (pollthread.timedout)
-                throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d) seconds", wuid.str(), timelimit);
-            throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
-        }
-        // get the thor ep from whoever picked up
-
-        SocketEndpoint thorMaster;
-        MemoryBuffer msg;
-        if (!conversation->recv(msg,1000*60)) {
-            throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
-        }
-        thorMaster.deserialize(msg);
-        msg.clear().append(graphName);
-        SocketEndpoint myep;
-        myep.setLocalHost(0);
-        myep.serialize(msg);  // only used for tracing
-        if (!conversation->send(msg)) {
-            StringBuffer s("Failed to send query to Thor on ");
-            thorMaster.getUrlStr(s);
-            throw MakeStringExceptionDirect(-1, s.str()); // maybe retry?
-        }
-
-        StringBuffer eps;
-        PROGLOG("Thor on %s running %s",thorMaster.getUrlStr(eps).str(),wuid.str());
-        MemoryBuffer reply;
-        try
-        {
-            if (!conversation->recv(reply,INFINITE))
-            {
-                StringBuffer s("Failed to receive reply from thor ");
-                thorMaster.getUrlStr(s);
-                throw MakeStringExceptionDirect(-1, s.str());
-            }
-        }
-        catch (IException *e)
-        {
-            StringBuffer s("Failed to receive reply from thor ");
-            thorMaster.getUrlStr(s);
-            s.append("; (").append(e->errorCode()).append(", ");
-            e->errorMessage(s).append(")");
-            e->Release();
-            throw MakeStringExceptionDirect(-1, s.str());
-        }
-        unsigned replyCode;
-        reply.read(replyCode);
-        switch ((ThorReplyCodes) replyCode)
-        {
-            case DAMP_THOR_REPLY_PAUSED:
-            {
-                bool isException ;
-                reply.read(isException);
-                if (isException)
-                {
-                    Owned<IException> e = deserializeException(reply);
-                    VStringBuffer str("Pausing job %s caused exception", queryWorkUnit()->queryWuid());
-                    EXCLOG(e, str.str());
-                }
-                Owned <IWorkUnit> w = updateWorkUnit();
-                w->setState(WUStatePaused); // will trigger executeThorGraph to pause next time around.
-                WUAction action = w->getAction();
-                switch (action)
-                {
-                    case WUActionPause:
-                    case WUActionPauseNow:
-                        w->setAction(WUActionUnknown);
-                }
-                resubmit = true; // JCSMORE - all subgraph _could_ be done, thor will check though and not rerun
-                break;
-            }
-            case DAMP_THOR_REPLY_GOOD:
-                break;
-            case DAMP_THOR_REPLY_ERROR:
-            {
-                throw deserializeException(reply);
-            }
-            case DAMP_THOR_REPLY_ABORT:
-            {
-                Owned<IException> e = deserializeException(reply);
-                StringBuffer msg;
-                e->errorMessage(msg);
-                throw new WorkflowException(e->errorCode(), msg.str(), 0, WorkflowException::ABORT, MSGAUD_user);
-            }
-            default:
-                assertex(false);
-        }
-        reloadWorkUnit();
-    }
-    while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
-#endif
-}
-
 //In case of logfile rollover, update logfile name(s) stored in workunit
 
 void EclAgent::updateWULogfile()
@@ -1822,7 +1556,7 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
     {
         if (isStandAloneExe)
             throw MakeStringException(0, "Cannot execute Thor Graph in standalone mode");
-        executeThorGraph(graphName);
+        executeThorGraph(graphName, *wuRead, *agentTopology);
     }
     else
     {
@@ -1909,7 +1643,7 @@ IThorChildGraph * EclAgent::resolveChildQuery(__int64 subgraphId, IHThorArg * co
 }
 
 IEclGraphResults * EclAgent::resolveLocalQuery(__int64 activityId)
-{ 
+{
     throwUnexpected();
 }
 

+ 1 - 267
roxie/ccd/ccdcontext.cpp

@@ -1585,7 +1585,7 @@ public:
 
         if (realThor)
         {
-            executeThorGraph(name);
+            executeThorGraph(name, *workUnit, queryComponentConfig());
         }
         else
         {
@@ -2267,272 +2267,6 @@ protected:
         }
         throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
     }
-
-    // Copied from eclgraph.cpp, in the hope that we will be deleting that code soon
-    void executeThorGraph(const char *graphName)
-    {
-        assertex(workUnit);
-        StringAttr wuid(workUnit->queryWuid());
-
-        unsigned timelimit = workUnit->getDebugValueInt("thorConnectTimeout", defaultThorConnectTimeout);
-
-#ifdef _CONTAINERIZED
-        // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
-        setWUState(WUStateBlocked);
-
-        if (queryComponentConfig().hasProp("@queue"))
-        {
-            bool multiJobLinger = queryComponentConfig().getPropBool("@multiJobLinger");
-            if (executeGraphOnLingeringThor(*workUnit, graphName, multiJobLinger ? queryComponentConfig().queryProp("@queue") : nullptr))
-                PROGLOG("Existing lingering Thor handled graph: %s", graphName);
-            else
-            {
-                VStringBuffer queueName("%s.thor", queryComponentConfig().queryProp("@queue"));
-                DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
-                Owned<IJobQueue> queue = createJobQueue(queueName.str());
-                VStringBuffer jobName("%s/%s", wuid.get(), graphName);
-                IJobQueueItem *item = createJobQueueItem(jobName);
-                queue->enqueue(item);
-            }
-
-            unsigned runningTimeLimit = workUnit->getDebugValueInt("maxRunTime", 0);
-            runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;
-
-            std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
-            for (unsigned i=0; i<2; i++)
-            {
-                WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
-                DBGLOG("Got state: %s", getWorkunitStateStr(state));
-                if (WUStateWait == state) // already finished
-                    break;
-                else if ((INFINITE != timelimit) && (WUStateUnknown == state))
-                    throw makeStringExceptionV(0, "Query %s failed to start within specified timelimit (%u) seconds", wuid.str(), timelimit);
-                else
-                {
-                    auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
-                    if (it == expectedStates.end())
-                        throw makeStringExceptionV(0, "Query %s failed, state: %s", wuid.str(), getWorkunitStateStr(state));
-                }
-                timelimit = runningTimeLimit;
-                expectedStates = { WUStateWait };
-            }
-        }
-        else
-        {
-            VStringBuffer job("%s-%s", wuid.str(), graphName);
-            runK8sJob("thormaster", wuid, job, queryComponentConfig().getPropBool("@deleteJobs", true), { { "graphName", graphName} });
-        }
-
-        if (workUnit->getExceptionCount())
-        {
-            Owned<IConstWUExceptionIterator> iter = &workUnit->getExceptions();
-            ForEach(*iter)
-            {
-                IConstWUException &e = iter->query();
-                SCMStringBuffer str;
-                e.getExceptionSource(str);
-                if (streq("thormasterexception", str.s))
-                {
-                    str.clear();
-                    e.getExceptionMessage(str);
-                    throw makeStringException(e.getExceptionCode(), str.str());
-                }
-            }
-        }
-        else
-        {
-            WorkunitUpdate w(&workUnit->lock());
-            WUState state = w->getState();
-            if (WUStateFailed == state)
-                throw makeStringException(0, "Workunit failed");
-        }
-
-        setWUState(WUStateRunning);
-
-#else
-        StringAttr owner(workUnit->queryUser());
-        StringAttr cluster(workUnit->queryClusterName());
-
-        int priority = workUnit->getPriorityValue();
-#ifdef _CONTAINERIZED
-        StringBuffer queueName;
-        queueName.append(cluster).append(".thor");
-#else
-        Owned<IConstWUClusterInfo> c = getTargetClusterInfo(cluster.str());
-        if (!c)
-            throw MakeStringException(0, "Invalid thor cluster %s", cluster.str());
-        SCMStringBuffer queueName;
-        c->getThorQueue(queueName);
-#endif
-        Owned<IJobQueue> jq = createJobQueue(queueName.str());
-        Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
-        bool resubmit;
-        do // loop if pause interrupted graph and needs resubmitting on resume
-        {
-            resubmit = false; // set if job interrupted in thor
-            if (WUStatePaused == workUnit->getState()) // check initial state - and wait if paused
-            {
-                for (;;)
-                {
-                    WUAction action = wuFactory->waitForWorkUnitAction(wuid, queryWorkUnit()->getAction());
-                    if (action == WUActionUnknown)
-                        throw new WorkflowException(0, "Workunit aborting", 0, WorkflowException::ABORT, MSGAUD_user);
-                    if (action != WUActionPause && action != WUActionPauseNow)
-                        break;
-                }
-            }
-            setWUState(WUStateBlocked);
-
-            class cPollThread: public Thread  // MORE - why do we ned a thread here?
-            {
-                Semaphore sem;
-                bool stopped;
-                IJobQueue *jq;
-                IConstWorkUnit *wu;
-            public:
-
-                bool timedout;
-                CTimeMon tm;
-                cPollThread(IJobQueue *_jq, IConstWorkUnit *_wu, unsigned timelimit)
-                    : tm(timelimit)
-                {
-                    stopped = false;
-                    jq = _jq;
-                    wu = _wu;
-                    timedout = false;
-                }
-                ~cPollThread()
-                {
-                    stop();
-                }
-                int run()
-                {
-                    while (!stopped) {
-                        sem.wait(ABORT_POLL_PERIOD);
-                        if (stopped)
-                            break;
-                        if (tm.timedout()) {
-                            timedout = true;
-                            stopped = true;
-                            jq->cancelInitiateConversation();
-                        }
-                        else if (wu->aborting()) {
-                            stopped = true;
-                            jq->cancelInitiateConversation();
-                        }
-
-                    }
-                    return 0;
-                }
-                void stop()
-                {
-                    stopped = true;
-                    sem.signal();
-                }
-            } pollthread(jq, workUnit, timelimit*1000);
-
-            pollthread.start();
-
-            PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
-            IJobQueueItem* item = createJobQueueItem(wuid.str());
-            item->setOwner(owner.str());
-            item->setPriority(priority);
-            Owned<IConversation> conversation = jq->initiateConversation(item);
-            bool got = conversation.get()!=NULL;
-            pollthread.stop();
-            pollthread.join();
-            if (!got)
-            {
-                if (pollthread.timedout)
-                    throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d) seconds", wuid.str(), timelimit);
-                throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
-            }
-            // get the thor ep from whoever picked up
-
-            SocketEndpoint thorMaster;
-            MemoryBuffer msg;
-            if (!conversation->recv(msg,1000*60)) {
-                throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
-            }
-            thorMaster.deserialize(msg);
-            msg.clear().append(graphName);
-            SocketEndpoint myep;
-            myep.setLocalHost(0);
-            myep.serialize(msg);  // only used for tracing
-            if (!conversation->send(msg)) {
-                StringBuffer s("Failed to send query to Thor on ");
-                thorMaster.getUrlStr(s);
-                throw MakeStringExceptionDirect(-1, s.str()); // maybe retry?
-            }
-
-            StringBuffer eps;
-            PROGLOG("Thor on %s running %s",thorMaster.getUrlStr(eps).str(),wuid.str());
-            MemoryBuffer reply;
-            try
-            {
-                if (!conversation->recv(reply,INFINITE))
-                {
-                    StringBuffer s("Failed to receive reply from thor ");
-                    thorMaster.getUrlStr(s);
-                    throw MakeStringExceptionDirect(-1, s.str());
-                }
-            }
-            catch (IException *e)
-            {
-                StringBuffer s("Failed to receive reply from thor ");
-                thorMaster.getUrlStr(s);
-                s.append("; (").append(e->errorCode()).append(", ");
-                e->errorMessage(s).append(")");
-                e->Release();
-                throw MakeStringExceptionDirect(-1, s.str());
-            }
-            unsigned replyCode;
-            reply.read(replyCode);
-            switch ((ThorReplyCodes) replyCode)
-            {
-                case DAMP_THOR_REPLY_PAUSED:
-                {
-                    bool isException ;
-                    reply.read(isException);
-                    if (isException)
-                    {
-                        Owned<IException> e = deserializeException(reply);
-                        VStringBuffer str("Pausing job %s caused exception", wuid.str());
-                        EXCLOG(e, str.str());
-                    }
-                    WorkunitUpdate w(&workUnit->lock());
-                    w->setState(WUStatePaused); // will trigger executeThorGraph to pause next time around.
-                    WUAction action = w->getAction();
-                    switch (action)
-                    {
-                        case WUActionPause:
-                        case WUActionPauseNow:
-                            w->setAction(WUActionUnknown);
-                    }
-                    resubmit = true; // JCSMORE - all subgraph _could_ be done, thor will check though and not rerun
-                    break;
-                }
-                case DAMP_THOR_REPLY_GOOD:
-                    break;
-                case DAMP_THOR_REPLY_ERROR:
-                {
-                    throw deserializeException(reply);
-                }
-                case DAMP_THOR_REPLY_ABORT:
-                {
-                    Owned<IException> e = deserializeException(reply);
-                    StringBuffer msg;
-                    e->errorMessage(msg);
-                    throw new WorkflowException(e->errorCode(), msg.str(), 0, WorkflowException::ABORT, MSGAUD_user);
-                }
-                default:
-                    throwUnexpected();
-            }
-            workUnit->forceReload();
-        }
-        while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
-#endif
-    }
 };
 
 //-----------------------------------------------------------------------------------------------