Explorar o código

HPCC-24490 Ensure queued thor job workflow items are unique by adding graph name

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith %!s(int64=4) %!d(string=hai) anos
pai
achega
7f68967011
Modificáronse 2 ficheiros con 16 adicións e 10 borrados
  1. 8 7
      common/workunit/workunit.cpp
  2. 8 3
      thorlcr/master/thgraphmanager.cpp

+ 8 - 7
common/workunit/workunit.cpp

@@ -13780,7 +13780,8 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
         pollthread.start();
 
         PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
-        IJobQueueItem* item = createJobQueueItem(wuid.str());
+        VStringBuffer wuidGraph("%s/%s", wuid.str(), graphName);
+        IJobQueueItem* item = createJobQueueItem(wuidGraph.str());
         item->setOwner(owner.str());
         item->setPriority(priority);
         Owned<IConversation> conversation = jq->initiateConversation(item);
@@ -13790,17 +13791,17 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
         if (!got)
         {
             if (pollthread.timedout)
-                throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d) seconds", wuid.str(), timelimit);
-            throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
+                throw MakeStringException(0, "Query %s failed to start within specified timelimit (%u) seconds", wuidGraph.str(), timelimit);
+            throw MakeStringException(0, "Query %s cancelled (1)", wuidGraph.str());
         }
         // get the thor ep from whoever picked up
 
         SocketEndpoint thorMaster;
         MemoryBuffer msg;
         if (!conversation->recv(msg,1000*60))
-            throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
+            throw MakeStringException(0, "Query %s cancelled (2)", wuidGraph.str());
         thorMaster.deserialize(msg);
-        msg.clear().append(graphName);
+        msg.clear();
         SocketEndpoint myep;
         myep.setLocalHost(0);
         myep.serialize(msg);  // only used for tracing
@@ -13811,7 +13812,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
         }
 
         StringBuffer eps;
-        PROGLOG("Thor on %s running %s",thorMaster.getUrlStr(eps).str(),wuid.str());
+        PROGLOG("Thor on %s running %s", thorMaster.getUrlStr(eps).str(), wuidGraph.str());
         MemoryBuffer reply;
         try
         {
@@ -13842,7 +13843,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
                 if (isException)
                 {
                     Owned<IException> e = deserializeException(reply);
-                    VStringBuffer str("Pausing job %s caused exception", wuid.get());
+                    VStringBuffer str("Pausing job %s caused exception", wuidGraph.str());
                     EXCLOG(e, str.str());
                 }
                 Owned<IWorkUnit> w = &workunit.lock();

+ 8 - 3
thorlcr/master/thgraphmanager.cpp

@@ -678,7 +678,14 @@ void CJobManager::run()
             PROGLOG("acceptConversation aborted - terminating");
             break;
         }
-        StringAttr graphName;
+        StringAttr graphName, wuid;
+        const char *wuidGraph = item->queryWUID(); // actually <wuid>/<graphName>
+        StringArray sArray;
+        sArray.appendList(wuidGraph, "/");
+        assertex(2 == sArray.ordinality());
+        wuid.set(sArray.item(0));
+        graphName.set(sArray.item(1));
+
         handlingConversation = true;
         SocketEndpoint agentep;
         try
@@ -695,7 +702,6 @@ void CJobManager::run()
                 IWARNLOG("recv conversation failed");
                 continue;
             }
-            msg.read(graphName);
             agentep.deserialize(msg);
         }
         catch (IException *e)
@@ -705,7 +711,6 @@ void CJobManager::run()
         }
         Owned<IWorkUnitFactory> factory;
         Owned<IConstWorkUnit> workunit;
-        const char *wuid = item->queryWUID();
         bool allDone = false;
         try
         {