浏览代码

Merge pull request #5250 from richardkchapman/roxie-thor

HPCC-10576 Add code to roxie to call Thor graphs

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 年之前
父节点
当前提交
c234036c56
共有 5 个文件被更改,包括 301 次插入46 次删除
  1. 1 2
      ecl/eclagent/eclgraph.cpp
  2. 2 0
      roxie/ccd/ccd.hpp
  3. 273 26
      roxie/ccd/ccdcontext.cpp
  4. 2 0
      roxie/ccd/ccdmain.cpp
  5. 23 18
      roxie/ccd/ccdquery.cpp

+ 1 - 2
ecl/eclagent/eclgraph.cpp

@@ -1520,7 +1520,7 @@ void EclAgent::executeThorGraph(const char * graphName)
             {
                 CriticalBlock b(crit);
                 if (0 == subId) return;
-                if (valueLen && valueLen==strlen("resume") && (0 == strncmp("resume", (const char *)valueData, valueLen)))
+                if (valueLen==strlen("resume") && (0 == strncmp("resume", (const char *)valueData, valueLen)))
                     sem.signal();
             }
             bool wait()
@@ -1562,7 +1562,6 @@ void EclAgent::executeThorGraph(const char * graphName)
         {
             Semaphore sem;
             bool stopped;
-            unsigned starttime;
             IJobQueue *jq;
             IConstWorkUnit *wu;
         public:

+ 2 - 0
roxie/ccd/ccd.hpp

@@ -119,6 +119,7 @@ extern unsigned myNodeIndex;
 #endif
 
 #define ROXIE_DALI_CONNECT_TIMEOUT 5000
+#define ABORT_POLL_PERIOD 5000
 
 class RemoteActivityId
 {
@@ -359,6 +360,7 @@ extern unsigned socketCheckInterval;
 extern memsize_t defaultMemoryLimit;
 extern unsigned defaultTimeLimit[3];
 extern unsigned defaultWarnTimeLimit[3];
+extern unsigned defaultThorConnectTimeout;
 extern bool pretendAllOpt;
 extern ClientCertificate clientCert;
 extern bool useHardLink;

+ 273 - 26
roxie/ccd/ccdcontext.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include "jlib.hpp"
 
+#include "wujobq.hpp"
 #include "nbcd.hpp"
 #include "rtlread_imp.hpp"
 #include "thorplugin.hpp"
@@ -1045,38 +1046,44 @@ public:
         if (queryTraceLevel() > 8)
             CTXLOG("Executing graph %s", name);
 
-        assertex(!realThor);
-        bool created = false;
-        cycle_t startCycles = get_cycles_now();
-        try
+        if (realThor)
         {
-            beginGraph(name);
-            created = true;
-            runGraph();
+            executeThorGraph(name);
         }
-        catch (IException *e)
+        else
         {
-            if (e->errorAudience() == MSGAUD_operator)
-                EXCLOG(e, "Exception thrown in query - cleaning up");  // if an IException is throw let EXCLOG determine if a trap should be generated
-            else
+            bool created = false;
+            cycle_t startCycles = get_cycles_now();
+            try
             {
-                StringBuffer s;
-                CTXLOG("Exception thrown in query - cleaning up: %d: %s", e->errorCode(), e->errorMessage(s).str());
+                beginGraph(name);
+                created = true;
+                runGraph();
             }
-            if (created)
-                endGraph(startCycles, true);
-            CTXLOG("Done cleaning up");
-            throw;
-        }
-        catch (...)
-        {
-            CTXLOG("Exception thrown in query - cleaning up");
-            if (created)
-                endGraph(startCycles, true);
-            CTXLOG("Done cleaning up");
-            throw;
+            catch (IException *e)
+            {
+                if (e->errorAudience() == MSGAUD_operator)
+                    EXCLOG(e, "Exception thrown in query - cleaning up");  // if an IException is throw let EXCLOG determine if a trap should be generated
+                else
+                {
+                    StringBuffer s;
+                    CTXLOG("Exception thrown in query - cleaning up: %d: %s", e->errorCode(), e->errorMessage(s).str());
+                }
+                if (created)
+                    endGraph(startCycles, true);
+                CTXLOG("Done cleaning up");
+                throw;
+            }
+            catch (...)
+            {
+                CTXLOG("Exception thrown in query - cleaning up");
+                if (created)
+                    endGraph(startCycles, true);
+                CTXLOG("Done cleaning up");
+                throw;
+            }
+            endGraph(startCycles, false);
         }
-        endGraph(startCycles, false);
     }
 
     virtual IActivityGraph * queryChildGraph(unsigned  id)
@@ -1679,6 +1686,246 @@ protected:
         }
         throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value %s", stepname);
     }
+
+    // Copied from eclgraph.cpp, in the hope that we will be deleting that code soon
+    void executeThorGraph(const char *graphName)
+    {
+        assertex(workUnit);
+        SCMStringBuffer wuid;
+        workUnit->getWuid(wuid);
+
+        SCMStringBuffer cluster;
+        SCMStringBuffer owner;
+        workUnit->getClusterName(cluster);
+        workUnit->getUser(owner);
+        int priority = workUnit->getPriorityValue();
+        unsigned timelimit = workUnit->getDebugValueInt("thorConnectTimeout", defaultThorConnectTimeout);
+        Owned<IConstWUClusterInfo> c = getTargetClusterInfo(cluster.str());
+        if (!c)
+            throw MakeStringException(0, "Invalid thor cluster %s", cluster.str());
+        SCMStringBuffer queueName;
+        c->getThorQueue(queueName);
+        Owned<IJobQueue> jq = createJobQueue(queueName.str());
+
+        bool resubmit;
+        do // loop if pause interrupted graph and needs resubmitting on resume
+        {
+            resubmit = false; // set if job interrupted in thor
+            class CWorkunitResumeHandler : public CInterface, implements ISDSSubscription
+            {
+                IConstWorkUnit &wu;
+                StringBuffer xpath;
+                StringAttr wuid;
+                SubscriptionId subId;
+                CriticalSection crit;
+                Semaphore sem;
+
+                void unsubscribe()
+                {
+                    CriticalBlock b(crit);
+                    if (subId)
+                    {
+                        SubscriptionId _subId = subId;
+                        subId = 0;
+                        querySDS().unsubscribe(_subId);
+                    }
+                }
+            public:
+                IMPLEMENT_IINTERFACE;
+                CWorkunitResumeHandler(IConstWorkUnit &_wu) : wu(_wu)
+                {
+                    xpath.append("/WorkUnits/");
+                    SCMStringBuffer istr;
+                    wu.getWuid(istr);
+                    wuid.set(istr.str());
+                    xpath.append(wuid.get()).append("/Action");
+                    subId = 0;
+                }
+                ~CWorkunitResumeHandler()
+                {
+                    unsubscribe();
+                }
+                void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+                {
+                    CriticalBlock b(crit);
+                    if (0 == subId) return;
+                    if (valueLen==strlen("resume") && (0 == strncmp("resume", (const char *)valueData, valueLen)))
+                        sem.signal();
+                }
+                bool wait()
+                {
+                    subId = querySDS().subscribe(xpath.str(), *this, false, true);
+                    assertex(subId);
+                    PROGLOG("Job %s paused, waiting for resume/abort", wuid.get());
+                    bool ret = true;
+                    while (!sem.wait(10000))
+                    {
+                        wu.forceReload();
+                        if (WUStatePaused != wu.getState() || wu.aborting())
+                        {
+                            SCMStringBuffer str;
+                            wu.getStateDesc(str);
+                            PROGLOG("Aborting pause job %s, state : %s", wuid.get(), str.str());
+                            ret = false;
+                            break;
+                        }
+                    }
+                    unsubscribe();
+                    return ret;
+                }
+            } workunitResumeHandler(*workUnit);
+
+            if (WUStatePaused == workUnit->getState()) // check initial state - and wait if paused
+            {
+                if (!workunitResumeHandler.wait())
+                    throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
+            }
+            setWUState(WUStateBlocked);
+
+            class cPollThread: public Thread  // MORE - why do we ned a thread here?
+            {
+                Semaphore sem;
+                bool stopped;
+                IJobQueue *jq;
+                IConstWorkUnit *wu;
+            public:
+
+                bool timedout;
+                CTimeMon tm;
+                cPollThread(IJobQueue *_jq, IConstWorkUnit *_wu, unsigned timelimit)
+                    : tm(timelimit)
+                {
+                    stopped = false;
+                    jq = _jq;
+                    wu = _wu;
+                    timedout = false;
+                }
+                ~cPollThread()
+                {
+                    stop();
+                }
+                int run()
+                {
+                    while (!stopped) {
+                        sem.wait(ABORT_POLL_PERIOD);
+                        if (stopped)
+                            break;
+                        if (tm.timedout()) {
+                            timedout = true;
+                            stopped = true;
+                            jq->cancelInitiateConversation();
+                        }
+                        else if (wu->aborting()) {
+                            stopped = true;
+                            jq->cancelInitiateConversation();
+                        }
+
+                    }
+                    return 0;
+                }
+                void stop()
+                {
+                    stopped = true;
+                    sem.signal();
+                }
+            } pollthread(jq, workUnit, timelimit*1000);
+
+            pollthread.start();
+
+            PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
+            IJobQueueItem* item = createJobQueueItem(wuid.str());
+            item->setOwner(owner.str());
+            item->setPriority(priority);
+            Owned<IConversation> conversation = jq->initiateConversation(item);
+            bool got = conversation.get()!=NULL;
+            pollthread.stop();
+            pollthread.join();
+            if (!got)
+            {
+                if (pollthread.timedout)
+                    throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d) seconds", wuid.str(), timelimit);
+                throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
+            }
+            // get the thor ep from whoever picked up
+
+            SocketEndpoint thorMaster;
+            MemoryBuffer msg;
+            if (!conversation->recv(msg,1000*60)) {
+                throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
+            }
+            thorMaster.deserialize(msg);
+            msg.clear().append(graphName);
+            SocketEndpoint myep;
+            myep.setLocalHost(0);
+            myep.serialize(msg);  // only used for tracing
+            if (!conversation->send(msg)) {
+                StringBuffer s("Failed to send query to Thor on ");
+                thorMaster.getUrlStr(s);
+                throw MakeStringExceptionDirect(-1, s.str()); // maybe retry?
+            }
+
+            StringBuffer eps;
+            PROGLOG("Thor on %s running %s",thorMaster.getUrlStr(eps).str(),wuid.str());
+            MemoryBuffer reply;
+            try
+            {
+                if (!conversation->recv(reply,INFINITE))
+                {
+                    StringBuffer s("Failed to receive reply from thor ");
+                    thorMaster.getUrlStr(s);
+                    throw MakeStringExceptionDirect(-1, s.str());
+                }
+            }
+            catch (IException *e)
+            {
+                StringBuffer s("Failed to receive reply from thor ");
+                thorMaster.getUrlStr(s);
+                s.append("; (").append(e->errorCode()).append(", ");
+                e->errorMessage(s).append(")");
+                throw MakeStringExceptionDirect(-1, s.str());
+            }
+            ThorReplyCodes replyCode;
+            reply.read((unsigned &)replyCode);
+            switch (replyCode)
+            {
+                case DAMP_THOR_REPLY_PAUSED:
+                {
+                    bool isException ;
+                    reply.read(isException);
+                    if (isException)
+                    {
+                        Owned<IException> e = deserializeException(reply);
+                        VStringBuffer str("Pausing job %s caused exception", wuid.str());
+                        EXCLOG(e, str.str());
+                    }
+                    WorkunitUpdate w(&workUnit->lock());
+                    w->setState(WUStatePaused); // will trigger executeThorGraph to pause next time around.
+                    WUAction action = w->getAction();
+                    switch (action)
+                    {
+                        case WUActionPause:
+                        case WUActionPauseNow:
+                            w->setAction(WUActionUnknown);
+                    }
+                    resubmit = true; // JCSMORE - all subgraph _could_ be done, thor will check though and not rerun
+                    break;
+                }
+                case DAMP_THOR_REPLY_GOOD:
+                    break;
+                case DAMP_THOR_REPLY_ERROR:
+                {
+                    throw deserializeException(reply);
+                }
+                case DAMP_THOR_REPLY_ABORT:
+                    throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
+                default:
+                    throwUnexpected();
+            }
+            workUnit->forceReload();
+        }
+        while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
+
+    }
 };
 
 IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, unsigned _timeLimit, memsize_t _memoryLimit, IRoxieQueryPacket *packet)

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -117,6 +117,7 @@ unsigned memoryStatsInterval = 0;
 memsize_t defaultMemoryLimit;
 unsigned defaultTimeLimit[3] = {0, 0, 0};
 unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
+unsigned defaultThorConnectTimeout;
 
 unsigned defaultParallelJoinPreload = 0;
 unsigned defaultPrefetchProjectPreload = 10;
@@ -706,6 +707,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
         defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
         defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
+        defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
 
         defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
         defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);

+ 23 - 18
roxie/ccd/ccdquery.cpp

@@ -284,6 +284,7 @@ protected:
 
     bool isSuspended;
     bool enableFieldTranslation;
+    ClusterType targetClusterType;
     unsigned timeLimit;
     unsigned warnTimeLimit;
     memsize_t memoryLimit;
@@ -928,6 +929,8 @@ public:
             warnTimeLimit = (unsigned) wu->getDebugValueInt("warnTimeLimit", 0);
             SCMStringBuffer bStr;
             enableFieldTranslation = strToBool(wu->getDebugValue("layoutTranslationEnabled", bStr).str());
+            bStr.clear();
+            targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
 
             // MORE - does package override stateInfo, or vice versa?
 
@@ -939,26 +942,28 @@ public:
                 timeLimit = (unsigned) stateInfo->getPropInt("@timeLimit", timeLimit);
                 warnTimeLimit = (unsigned) stateInfo->getPropInt("@warnTimeLimit", warnTimeLimit);
             }
-
-            Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
-            SCMStringBuffer graphNameStr;
-            ForEach(*graphs)
+            if (targetClusterType == RoxieCluster)
             {
-                graphs->query().getName(graphNameStr);
-                const char *graphName = graphNameStr.s.str();
-                Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
-                try
-                {
-                    ActivityArray *activities = loadGraph(*graphXgmml, graphName);
-                    graphMap.setValue(graphName, activities);
-                }
-                catch (IException *E)
+                Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
+                SCMStringBuffer graphNameStr;
+                ForEach(*graphs)
                 {
-                    StringBuffer m;
-                    E->errorMessage(m);
-                    suspend(true, m.str(), NULL, false);
-                    ERRLOG("Query %s suspended: %s", id.get(), m.str());
-                    E->Release();
+                    graphs->query().getName(graphNameStr);
+                    const char *graphName = graphNameStr.s.str();
+                    Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
+                    try
+                    {
+                        ActivityArray *activities = loadGraph(*graphXgmml, graphName);
+                        graphMap.setValue(graphName, activities);
+                    }
+                    catch (IException *E)
+                    {
+                        StringBuffer m;
+                        E->errorMessage(m);
+                        suspend(true, m.str(), NULL, false);
+                        ERRLOG("Query %s suspended: %s", id.get(), m.str());
+                        E->Release();
+                    }
                 }
             }
         }