Sfoglia il codice sorgente

Merge pull request #15320 from jakesmith/HPCC-26457-wuidread-delay-resultread

HPCC-26457 Prevent wuidread resolving result too early.

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 3 anni fa
parent
commit
e8c4e4af7e

+ 2 - 44
thorlcr/activities/wuidread/thwuidread.cpp

@@ -27,7 +27,7 @@ class CWorkUnitReadMaster : public CMasterActivity
 public:
     CWorkUnitReadMaster(CMasterGraphElement * info) : CMasterActivity(info) { }
 
-    virtual void handleSlaveMessage(CMessageBuffer &msg)
+    virtual void handleSlaveMessage(CMessageBuffer &msg) override
     {
         IHThorWorkunitReadArg *helper = (IHThorWorkunitReadArg *)queryHelper();
         size32_t lenData;
@@ -43,49 +43,7 @@ public:
     }
 };
 
-static bool getWorkunitResultFilename(CGraphElementBase &container, StringBuffer & diskFilename, const char * wuid, const char * stepname, int sequence)
-{
-    try
-    {
-        Owned<IConstWUResult> result;
-        if (wuid)
-            result.setown(container.queryCodeContext()->getExternalResult(wuid, stepname, sequence));
-        else
-            result.setown(container.queryCodeContext()->getResultForGet(stepname, sequence));
-        if (!result)
-            throw MakeThorException(TE_FailedToRetrieveWorkunitValue, "Failed to find value %s:%d in workunit %s", stepname?stepname:"(null)", sequence, wuid?wuid:"(null)");
-
-        SCMStringBuffer tempFilename;
-        result->getResultFilename(tempFilename);
-        if (tempFilename.length() == 0)
-            return false;
-
-        diskFilename.append("~").append(tempFilename.str());
-        return true;
-    }
-    catch (IException * e) 
-    {
-        StringBuffer text; 
-        e->errorMessage(text); 
-        e->Release();
-        throw MakeThorException(TE_FailedToRetrieveWorkunitValue, "Failed to find value %s:%d in workunit %s [%s]", stepname?stepname:"(null)", sequence, wuid?wuid:"(null)", text.str());
-    }
-    return false;
-}
-
-
 CActivityBase *createWorkUnitActivityMaster(CMasterGraphElement *container)
 {
-    StringBuffer diskFilename;
-    IHThorWorkunitReadArg *wuReadHelper = (IHThorWorkunitReadArg *)container->queryHelper();
-    wuReadHelper->onCreate(container->queryCodeContext(), NULL, NULL);
-    OwnedRoxieString fromWuid(wuReadHelper->getWUID());
-    if (getWorkunitResultFilename(*container, diskFilename, fromWuid, wuReadHelper->queryName(), wuReadHelper->querySequence()))
-    {
-        Owned<IHThorDiskReadArg> diskReadHelper = createWorkUnitReadArg(diskFilename, LINK(wuReadHelper));
-        Owned<CActivityBase> retAct = createDiskReadActivityMaster(container, diskReadHelper);
-        return retAct.getClear();
-    }
-    else
-        return new CWorkUnitReadMaster(container);
+    return new CWorkUnitReadMaster(container);
 }

+ 63 - 9
thorlcr/master/thactivitymaster.cpp

@@ -80,28 +80,82 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 
 CActivityBase *createGroupActivityMaster(CMasterGraphElement *container);
 
+static bool getWorkunitResultFilename(CGraphElementBase &container, StringBuffer & diskFilename, const char * wuid, const char * stepname, int sequence)
+{
+    try
+    {
+        Owned<IConstWUResult> result;
+        if (wuid)
+            result.setown(container.queryCodeContext()->getExternalResult(wuid, stepname, sequence));
+        else
+            result.setown(container.queryCodeContext()->getResultForGet(stepname, sequence));
+        if (!result)
+            throw MakeThorException(TE_FailedToRetrieveWorkunitValue, "Failed to find value %s:%d in workunit %s", stepname?stepname:"(null)", sequence, wuid?wuid:"(null)");
+
+        SCMStringBuffer tempFilename;
+        result->getResultFilename(tempFilename);
+        if (tempFilename.length() == 0)
+            return false;
+
+        diskFilename.append("~").append(tempFilename.str());
+        return true;
+    }
+    catch (IException * e) 
+    {
+        StringBuffer text; 
+        e->errorMessage(text); 
+        e->Release();
+        throw MakeThorException(TE_FailedToRetrieveWorkunitValue, "Failed to find value %s:%d in workunit %s [%s]", stepname?stepname:"(null)", sequence, wuid?wuid:"(null)", text.str());
+    }
+    return false;
+}
+
+
 class CGenericMasterGraphElement : public CMasterGraphElement
 {
+    StringBuffer wuidDiskFilename;
+    bool wuidDiskReadChecked = false;
+
+    void checkWuidDiskRead()
+    {
+        if (wuidDiskReadChecked)
+            return;
+        // replace wuidread act. with diskread act. if needed
+        wuidDiskFilename.clear();
+        IHThorWorkunitReadArg *wuReadHelper = (IHThorWorkunitReadArg *)queryHelper();
+        wuReadHelper->onCreate(queryCodeContext(), NULL, NULL);
+        OwnedRoxieString fromWuid(wuReadHelper->getWUID());
+        if (getWorkunitResultFilename(*this, wuidDiskFilename, fromWuid, wuReadHelper->queryName(), wuReadHelper->querySequence()))
+        {
+            Owned<IHThorDiskReadArg> diskReadHelper = createWorkUnitReadArg(wuidDiskFilename, LINK(wuReadHelper));
+            activity.setown(createDiskReadActivityMaster(this, diskReadHelper));
+        }
+        else
+            activity.setown(factory(kind));
+        wuidDiskReadChecked = true;
+    }
 public:
     CGenericMasterGraphElement(CGraphBase &owner, IPropertyTree &xgmml) : CMasterGraphElement(owner, xgmml)
     {
     }
-    virtual void serializeCreateContext(MemoryBuffer &mb)
+    virtual void serializeCreateContext(MemoryBuffer &mb) override
     {
         // bit of hack, need to tell slave if wuidread converted to diskread (see master activity)
         CMasterGraphElement::serializeCreateContext(mb);
         if (kind == TAKworkunitread)
         {
-            if (!activity)
-                doCreateActivity();
-            IHThorArg *helper = activity->queryHelper();
-            IHThorDiskReadArg *diskHelper = QUERYINTERFACE(helper, IHThorDiskReadArg);
-            mb.append(NULL != diskHelper); // flag to slaves that they should create diskread
-            if (diskHelper)
+            checkWuidDiskRead();
+
+            /* the meta information below is used by the worker at factory time
+             * to create the correct activity
+             */
+            if (wuidDiskFilename.length())
             {
-                OwnedRoxieString fileName(diskHelper->getFileName());
-                mb.append(fileName);
+                mb.append(true); // flag to slaves that they should create diskread
+                mb.append(wuidDiskFilename);
             }
+            else
+                mb.append(false);
         }
     }
     virtual CActivityBase *factory(ThorActivityKind kind)