Bladeren bron

Fix issues with distributed results in child queries

Rewrite the way intermediate child query/loop query results are handled.
They need to remain distributed and accessible by other global queries,
but must also be collated (on demand) by local child queries or context requests.

Also fix an abort issue where a query could hang.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 jaren geleden
bovenliggende
commit
57702ef2b8

+ 12 - 131
thorlcr/activities/loop/thloop.cpp

@@ -124,7 +124,7 @@ public:
         if (!CLoopActivityMasterBase::doinit())
             return;
         IHThorLoopArg *helper = (IHThorLoopArg *) queryHelper();
-        Owned<IThorGraphResults> results = createThorGraphResults(3);
+        Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(3);
         queryContainer().queryLoopGraph()->prepareLoopResults(*this, results);
         if (helper->getFlags() & IHThorLoopArg::LFcounter)
             queryContainer().queryLoopGraph()->prepareCounterResult(*this, results, 1, 2);
@@ -172,7 +172,7 @@ public:
         if (!CLoopActivityMasterBase::doinit())
             return;
         IHThorGraphLoopArg *helper = (IHThorGraphLoopArg *) queryHelper();
-        Owned<IThorGraphResults> results = createThorGraphResults(1);
+        Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
         if (helper->getFlags() & IHThorGraphLoopArg::GLFcounter)
             queryContainer().queryLoopGraph()->prepareCounterResult(*this, results, 1, 0);
         loopGraph->setResults(results);
@@ -189,9 +189,8 @@ public:
             return;
         unsigned maxIterations = helper->numIterations();
         if ((int)maxIterations < 0) maxIterations = 0;
-        Owned<IThorGraphResults> loopResults = createThorGraphResults(maxIterations);
+        Owned<IThorGraphResults> loopResults = queryGraph().createThorGraphResults(maxIterations);
         IThorResult *result = loopResults->createResult(*this, 0, this, true);
-        Owned<IRowWriter> resultWriter = result->getWriter();
 
         helper->createParentExtract(extractBuilder);
 
@@ -215,139 +214,25 @@ CActivityBase *createGraphLoopActivityMaster(CMasterGraphElement *container)
 
 class CLocalResultActivityMasterBase : public CMasterActivity
 {
-    PointerArrayOf<CThorExpandingRowArray> results;
-
 protected:
     Owned<IRowInterfaces> inputRowIf;
 
 public:
     CLocalResultActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info)
     {
-        mpTag = container.queryJob().allocateMPTag();
-        for (unsigned n=0; n<container.queryJob().querySlaves(); n++)
-            results.append(new CThorExpandingRowArray(*this));
-    }
-    ~CLocalResultActivityMasterBase()
-    {
-        ForEachItemIn(r, results)
-        {
-            CThorExpandingRowArray *result = results.item(r);
-            delete result;
-        }
-        results.kill();
     }
     virtual void init()
     {
         reset();
     }
-    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
-    {
-        if (!container.queryLocalOrGrouped())
-            serializeMPtag(dst, mpTag);
-    }
-    virtual IThorResult *createResult() = 0;
+    virtual void createResult() = 0;
     virtual void process()
     {
         assertex(container.queryResultsGraph());
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
         inputRowIf.setown(createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext()));
-
-        IThorResult *result = createResult();
-        if (!result->isLocal())
-        {
-            Owned<IRowWriter> resultWriter = result->getWriter();
-            unsigned todo = container.queryJob().querySlaves();
-            for (unsigned n=0; n<todo; n++)
-                results.item(n)->kill();
-            rank_t sender;
-            MemoryBuffer mb;
-            CMessageBuffer msg;
-            Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
-            CThorStreamDeserializerSource rowSource(stream);
-            loop
-            {
-                loop
-                {
-                    if (abortSoon)
-                        return;
-                    msg.clear();
-                    if (receiveMsg(msg, RANK_ALL, mpTag, &sender, 60*1000))
-                        break;
-                    ActPrintLog("WARNING: tag %d timedout, retrying", (unsigned)mpTag);
-                }
-                sender = sender - 1; // 0 = master
-                if (!msg.length())
-                {
-                    --todo;
-                    if (0 == todo)
-                        break; // done
-                }
-                else
-                {
-                    ThorExpand(msg, mb.clear());
-
-                    CThorExpandingRowArray *slaveResults = results.item(sender);
-                    while (!rowSource.eos())
-                    {
-                        RtlDynamicRowBuilder rowBuilder(inputRowIf->queryRowAllocator());
-                        size32_t sz = inputRowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
-                        slaveResults->append(rowBuilder.finalizeRowClear(sz));
-                    }
-                }
-            }
-            mb.clear();
-            CMemoryRowSerializer mbs(mb);
-            CThorExpandingRowArray *slaveResult = results.item(0);
-            unsigned rowNum=0;
-            unsigned resultNum=1;
-            loop
-            {
-                while (resultNum)
-                {
-                    if (rowNum == slaveResult->ordinality())
-                    {
-                        loop
-                        {
-                            if (resultNum == results.ordinality())
-                            {
-                                resultNum = 0; // eos
-                                break;
-                            }
-                            slaveResult = results.item(resultNum++);
-                            if (slaveResult->ordinality())
-                            {
-                                rowNum = 0;
-                                break;
-                            }
-                        }
-                        if (!resultNum) // eos
-                            break;
-                    }
-                    const void *row = slaveResult->query(rowNum++);
-                    inputRowIf->queryRowSerializer()->serialize(mbs, (const byte *)row);
-                    LinkThorRow(row);
-                    resultWriter->putRow(row);
-                    if (mb.length() > 0x80000)
-                        break;
-                }
-                msg.clear();
-                if (mb.length())
-                {
-                    ThorCompress(mb.toByteArray(), mb.length(), msg);
-                    mb.clear();
-                }
-                BooleanOnOff onOff(receiving);
-                ((CJobMaster &)container.queryJob()).broadcastToSlaves(msg, mpTag, LONGTIMEOUT, NULL, NULL, true);
-                if (0 == msg.length())
-                    break;
-            }
-        }
-    }
-    virtual void abort()
-    {
-        CMasterActivity::abort();
-        cancelReceiveMsg(RANK_ALL, mpTag);
+        createResult();
     }
 };
 
@@ -357,15 +242,11 @@ public:
     CLocalResultActivityMaster(CMasterGraphElement *info) : CLocalResultActivityMasterBase(info)
     {
     }
-    virtual IThorResult *createResult()
+    virtual void createResult()
     {
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
-        bool local = container.queryOwner().isLocalOnly();
-        IThorResult *result = graph->queryResult(helper->querySequence());
-        if (result)
-            local = result->isLocal();
-        return graph->createResult(*this, helper->querySequence(), this, local); // NB graph owns result
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        graph->createResult(*this, helper->querySequence(), this, true); // NB graph owns result
     }
 };
 
@@ -380,11 +261,11 @@ public:
     CGraphLoopResultWriteActivityMaster(CMasterGraphElement *info) : CLocalResultActivityMasterBase(info)
     {
     }
-    virtual IThorResult *createResult()
+    virtual void createResult()
     {
         IHThorGraphLoopResultWriteArg *helper = (IHThorGraphLoopResultWriteArg *)queryHelper();
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
-        return graph->createGraphLoopResult(*this, inputRowIf); // NB graph owns result
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        graph->createGraphLoopResult(*this, inputRowIf, true); // NB graph owns result
     }
 };
 

+ 32 - 131
thorlcr/activities/loop/thloopslave.cpp

@@ -385,7 +385,7 @@ public:
         executed = false;
         maxIterations = helper->numIterations();
         if ((int)maxIterations < 0) maxIterations = 0;
-        loopResults.setown(createThorGraphResults(maxIterations));
+        loopResults.setown(queryGraph().createThorGraphResults(0));
         helper->createParentExtract(extractBuilder);
         dataLinkStart("GRAPHLOOP", container.queryId());
     }
@@ -394,7 +394,7 @@ public:
         if (!executed)
         {
             executed = true;
-            IThorResult *result = loopResults->createResult(*this, 0, this, true);
+            IThorResult *result = loopResults->createResult(*this, 0, this, !queryGraph().isLocalChild());
             Owned<IRowWriter> resultWriter = result->getWriter();
             loop
             {
@@ -447,6 +447,7 @@ class CLocalResultReadActivity : public CSlaveActivity, public CThorDataLink
     IHThorLocalResultReadArg *helper;
     Owned<IRowStream> resultStream;
     unsigned curRow;
+    mptag_t replyTag;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -456,6 +457,7 @@ public:
         helper = (IHThorLocalResultReadArg *)queryHelper();
         input = NULL;
         curRow = 0;
+        replyTag = createReplyTag();
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
@@ -471,14 +473,15 @@ public:
         graph_id resultGraphId = container.queryXGMML().getPropInt("att[@name=\"_graphId\"]/@value");
         if (!resultGraphId)
             resultGraphId = container.queryResultsGraph()->queryGraphId();
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(resultGraphId);
-        Owned<IThorResult> result = graph->getResult(helper->querySequence());
+        Owned<CGraphBase> graph = queryJob().getGraph(resultGraphId);
+        Owned<IThorResult> result = graph->getResult(helper->querySequence(), queryGraph().isLocalChild());
         resultStream.setown(result->getRowStream());
         dataLinkStart("LOCALRESULTREAD", container.queryId());
     }
     virtual void stop()
     {
         abortSoon = true;
+        resultStream.clear();
         dataLinkStop();
     }
     virtual void kill()
@@ -513,42 +516,12 @@ activityslaves_decl CActivityBase *createLocalResultReadSlave(CGraphElementBase
 }
 
 
-void receiveResult(CActivityBase &activity, IRowInterfaces &inputRowIf, IRowWriter &result)
-{
-    const mptag_t &mpTag = activity.queryMpTag();
-    ICommunicator &comm = activity.queryContainer().queryJob().queryJobComm();
-    IOutputRowDeserializer *deserializer = inputRowIf.queryRowDeserializer();
-    IEngineRowAllocator *allocator = inputRowIf.queryRowAllocator();
-    // receive full result back
-    CMessageBuffer msg;
-    MemoryBuffer mb;
-    Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
-    CThorStreamDeserializerSource rowSource(stream);
-    loop
-    {
-        if (activity.queryAbortSoon())
-            return;
-        msg.clear();
-        if (!activity.receiveMsg(msg, 0, mpTag))
-            return;
-        if (0 == msg.length())
-            break;
-        ThorExpand(msg, mb.clear());
-        while (!rowSource.eos()) 
-        {
-            RtlDynamicRowBuilder rowBuilder(allocator);
-            size32_t sz = deserializer->deserialize(rowBuilder, rowSource);
-            result.putRow(rowBuilder.finalizeRowClear(sz));
-        }
-    }
-}
-
 /////////////// local spill write
 
 class CLocalResultSpillActivity : public CSlaveActivity, public CThorDataLink
 {
     IHThorLocalResultSpillArg *helper;
-    bool eoi, lastNull, global;
+    bool eoi, lastNull;
     Owned<IRowWriter> resultWriter;
     MemoryBuffer mb;
 
@@ -578,14 +551,9 @@ public:
         lastNull = eoi = false;
         abortSoon = false;
         assertex(container.queryResultsGraph());
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
-        bool local = container.queryOwner().isLocalOnly();
-        IThorResult *result = graph->queryResult(helper->querySequence());
-        if (result)
-            local = result->isLocal();
-        result = graph->createResult(*this, helper->querySequence(), this, local);  // NB graph owns result
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        IThorResult *result = graph->createResult(*this, helper->querySequence(), this, !queryGraph().isLocalChild());  // NB graph owns result
         resultWriter.setown(result->getWriter());
-        global = !result->isLocal();
         startInput(inputs.item(0));
         dataLinkStart("LOCALRESULTSPILL", container.queryId());
     }
@@ -603,44 +571,21 @@ public:
                 lastNull = true;
             return NULL;
         }
-        if (lastNull)
+        if (lastNull) // looks like I should only bother with this if grouped
         {
-            if (!global)
-                resultWriter->putRow(NULL);
+            resultWriter->putRow(NULL);
             lastNull = false;
         }
-        if (global)
-        {
-            CMemoryRowSerializer mbs(mb);
-            queryRowSerializer()->serialize(mbs, (const byte *)row.get());
-            if (mb.length() > 0x80000)
-                sendResultSoFar();
-        }
-        else
-            resultWriter->putRow(row.getClear());
+        resultWriter->putRow(row.getClear());
         dataLinkIncrement();
         return row.getClear();
     }
     virtual void stop()
     {
-        if (global)
-        {
-            if (mb.length())
-                sendResultSoFar();
-            CMessageBuffer msg;
-            container.queryJob().queryJobComm().send(msg, 0, mpTag, LONGTIMEOUT);
-            receiveResult(*this, *this, *resultWriter);
-        }
         stopInput(inputs.item(0));
         abortSoon = true;
         dataLinkStop();
     }
-    virtual void abort()
-    {
-        CSlaveActivity::abort();
-        if (global)
-            cancelReceiveMsg(0, mpTag);
-    }
     virtual bool isGrouped() { return inputs.item(0)->isGrouped(); }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
@@ -661,11 +606,6 @@ public:
     {
         input = NULL;
     }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
-        if (!container.queryLocalOrGrouped())
-            mpTag = container.queryJob().deserializeMPTag(data);
-    }
     virtual IThorResult *createResult() = 0;
     virtual void process()
     {
@@ -676,60 +616,19 @@ public:
         IThorResult *result = createResult();
 
         Owned<IRowWriter> resultWriter = result->getWriter();
-        if (!result->isLocal())
-        {
-            CMessageBuffer msg;
-            MemoryBuffer mb;
-            CMemoryRowSerializer mbs(mb);
-            IRowInterfaces *inputRowIf = input->queryFromActivity();
-            loop
-            {
-                loop
-                {
-                    OwnedConstThorRow row = input->nextRow();
-                    if (!row)
-                    {
-                        row.setown(input->nextRow());
-                        if (!row)
-                            break;
-                    }
-                    inputRowIf->queryRowSerializer()->serialize(mbs, (const byte *)row.get());
-                    if (mb.length() > 0x80000)
-                        break;
-                }
-                msg.clear();
-                if (mb.length())
-                {
-                    ThorCompress(mb.toByteArray(), mb.length(), msg);
-                    mb.clear();
-                }
-                container.queryJob().queryJobComm().send(msg, 0, mpTag, LONGTIMEOUT);
-                if (0 == msg.length())
-                    break;
-            }
-            receiveResult(*this, *inputRowIf, *resultWriter);
-        }
-        else
+        loop
         {
-            loop
+            OwnedConstThorRow nextrec = input->nextRow();
+            if (!nextrec)
             {
-                OwnedConstThorRow nextrec = input->nextRow();
+                nextrec.setown(input->nextRow());
                 if (!nextrec)
-                {
-                    nextrec.setown(input->nextRow());
-                    if (!nextrec)
-                        break;
-                    resultWriter->putRow(NULL);
-                }
-                resultWriter->putRow(nextrec.getClear());
+                    break;
+                resultWriter->putRow(NULL);
             }
+            resultWriter->putRow(nextrec.getClear());
         }
     }
-    virtual void abort()
-    {
-        ProcessSlaveActivity::abort();
-        cancelReceiveMsg(0, mpTag);
-    }
     void endProcess()
     {
         if (processed & THORDATALINK_STARTED)
@@ -753,12 +652,8 @@ public:
     virtual IThorResult *createResult()
     {
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
-        bool local = container.queryOwner().isLocalOnly();
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
-        IThorResult *result = graph->queryResult(helper->querySequence());
-        if (result)
-            local = result->isLocal();
-        return graph->createResult(*this, helper->querySequence(), this, local);
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        return graph->createResult(*this, helper->querySequence(), this, !queryGraph().isLocalChild());
     }
 };
 
@@ -1145,6 +1040,11 @@ public:
     {
         appendOutputLinked(this);
     }
+    virtual void kill()
+    {
+        CSlaveActivity::kill();
+        resultStream.clear();
+    }
     virtual void start()
     {
         ActivityTimer s(totalCycles, timeActivities, NULL);
@@ -1156,8 +1056,8 @@ public:
             graph_id resultGraphId = container.queryXGMML().getPropInt("att[@name=\"_graphId\"]/@value");
             if (!resultGraphId)
                 resultGraphId = container.queryResultsGraph()->queryGraphId();
-            Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(resultGraphId);
-            Owned<IThorResult> result = graph->getGraphLoopResult(sequence);
+            Owned<CGraphBase> graph = queryJob().getGraph(resultGraphId);
+            Owned<IThorResult> result = graph->getGraphLoopResult(sequence, queryGraph().isLocalChild());
             resultStream.setown(result->getRowStream());
         }
         else
@@ -1182,6 +1082,7 @@ public:
     virtual void stop()
     {
         abortSoon = true;
+        resultStream.clear();
         dataLinkStop();
     }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
@@ -1279,8 +1180,8 @@ public:
     virtual IThorResult *createResult()
     {
         IHThorGraphLoopResultWriteArg *helper = (IHThorGraphLoopResultWriteArg *)queryHelper();
-        Owned<CGraphBase> graph = container.queryOwner().queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
-        return graph->createGraphLoopResult(*this, input->queryFromActivity());
+        Owned<CGraphBase> graph = queryJob().getGraph(container.queryResultsGraph()->queryGraphId());
+        return graph->createGraphLoopResult(*this, input->queryFromActivity(), !queryGraph().isLocalChild());
     }
 };
 

+ 1 - 1
thorlcr/activities/temptable/thtmptableslave.cpp

@@ -130,7 +130,7 @@ public:
         __uint64 numRows = helper->numRows();
         // local when generated from a child query (the range is per node, don't split)
         bool isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
-        if (!isLocal && (helper->getFlags() & TTFdistributed != 0))
+        if (!isLocal && ((helper->getFlags() & TTFdistributed) != 0))
         {
             __uint64 nodes = container.queryCodeContext()->getNodes();
             __uint64 nodeid = container.queryCodeContext()->getNodeNum();

+ 97 - 125
thorlcr/graph/thgraph.cpp

@@ -49,21 +49,19 @@ class CThorGraphResult : public CInterface, implements IThorResult, implements I
     Owned<IRowWriterMultiReader> rowBuffer;
     IRowInterfaces *rowIf;
     IEngineRowAllocator *allocator;
-    bool stopped, readers;
-    bool local;
+    bool stopped, readers, distributed;
 
     void init()
     {
-        stopped = false;
+        stopped = readers = false;
         allocator = rowIf->queryRowAllocator();
         meta = allocator->queryOutputMeta();
         rowStreamCount = 0;
-        readers = false;
     }
 public:
     IMPLEMENT_IINTERFACE;
 
-    CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _local) : activity(_activity), rowIf(_rowIf), local(_local)
+    CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _distributed) : activity(_activity), rowIf(_rowIf), distributed(_distributed)
     {
         init();
         rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
@@ -95,13 +93,13 @@ public:
         readers = true;
         return rowBuffer->getReader();
     }
-    virtual bool isLocal() const { return local; }
-    virtual IOutputMetaData *queryMeta() { return meta; }
-    virtual void getResult(size32_t &len, void * & data)
+    virtual IRowInterfaces *queryRowInterfaces() { return rowIf; }
+    virtual CActivityBase *queryActivity() { return &activity; }
+    virtual bool isDistributed() const { return distributed; }
+    virtual void serialize(MemoryBuffer &mb)
     {
         Owned<IRowStream> stream = getRowStream();
         bool grouped = meta->isGrouped();
-        MemoryBuffer mb;
         if (grouped)
         {
             OwnedConstThorRow prev = stream->nextRow();
@@ -138,6 +136,11 @@ public:
                 mb.append(sz, row.get());
             }
         }
+    }
+    virtual void getResult(size32_t &len, void * & data)
+    {
+        MemoryBuffer mb;
+        serialize(mb);
         len = mb.length();
         data = mb.detach();
     }
@@ -166,94 +169,21 @@ public:
 
 /////
 
-class CThorGraphResults : public CInterface, implements IThorGraphResults
+IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed)
 {
-    class CThorUninitializedGraphResults : public CInterface, implements IThorResult
-    {
-        unsigned id;
-
-    public:
-        IMPLEMENT_IINTERFACE
-
-        CThorUninitializedGraphResults(unsigned _id) { id = _id; }
-        virtual IRowWriter *getWriter() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual IRowStream *getRowStream() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual IOutputMetaData *queryMeta() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual bool isLocal() const { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual void getResult(size32_t & retSize, void * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-        virtual void getLinkedResult(unsigned & count, byte * * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
-    };
-    IArrayOf<IThorResult> results;
-    CriticalSection cs;
-    void ensureAtleast(unsigned id)
-    {
-        while (results.ordinality() < id)
-            results.append(*new CThorUninitializedGraphResults(results.ordinality()));
-    }
-public:
-    IMPLEMENT_IINTERFACE;
+    Owned<IThorResult> result = ::createResult(activity, rowIf, distributed);
+    setResult(id, result);
+    return result;
+}
 
-    CThorGraphResults(unsigned _numResults) { }
-    virtual void clear()
-    {
-        CriticalBlock procedure(cs);
-        results.kill();
-    }
-    virtual IThorResult *queryResult(unsigned id)
-    {
-        CriticalBlock procedure(cs);
-        if (results.ordinality() <= id)
-            return NULL;
-        IThorResult *res = &results.item(id);
-        if (QUERYINTERFACE(res, CThorUninitializedGraphResults))
-            return NULL;
-        return res;
-    }
-    virtual IThorResult *getResult(unsigned id)
-    {
-        CriticalBlock procedure(cs);
-        ensureAtleast(id+1);
-        // NB: stream static after this, i.e. nothing can be added to this result
-        return LINK(&results.item(id));
-    }
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool local=false)
-    {
-        Owned<CThorGraphResult> result = new CThorGraphResult(activity, rowIf, local);
-        setResult(id, result);
-        return result;
-    }
-    virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool local=false)
-    {
-        return createResult(activity, results.ordinality(), rowIf, local);
-    }
-    virtual void setResult(unsigned id, IThorResult *result)
-    {
-        CriticalBlock procedure(cs);
-        ensureAtleast(id);
-        if (results.isItem(id))
-            results.replace(*LINK(result), id);
-        else
-            results.append(*LINK(result));
-    }
-    virtual unsigned count() { return results.ordinality(); }
-    virtual void getResult(size32_t & retSize, void * & ret, unsigned id)
-    {
-        results.item(id).getResult(retSize, ret);
-    }
-    virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
-    {
-        results.item(id).getLinkedResult(count, ret);
-    }
-};
+/////
 
-IThorGraphResults *createThorGraphResults(unsigned num)
+IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed)
 {
-    return new CThorGraphResults(num);
+    return new CThorGraphResult(activity, rowIf, distributed);
 }
 
-//////////
-
+/////
 class CThorBoundLoopGraph : public CInterface, implements IThorBoundLoopGraph
 {
     CGraphBase *graph;
@@ -277,7 +207,7 @@ public:
         thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
         *res = loopCounter;
         OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
-        IThorResult *counterResult = results->createResult(activity, pos, countRowIf, true);
+        IThorResult *counterResult = results->createResult(activity, pos, countRowIf, false);
         Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
         counterResultWriter->putRow(counterRowFinal.getClear());
     }
@@ -285,12 +215,12 @@ public:
     {
         if (!resultRowIf)
             resultRowIf.setown(createRowInterfaces(resultMeta, activityId, activity.queryCodeContext()));
-        IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, true); // loop output, create and mark local
-        IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, true);
+        IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
+        IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild());
     }
     virtual IRowStream *execute(CActivityBase &activity, unsigned counter, IRowWriterMultiReader *inputStream, unsigned rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
     {
-        Owned<IThorGraphResults> results = createThorGraphResults(3);
+        Owned<IThorGraphResults> results = graph->createThorGraphResults(3);
         prepareLoopResults(activity, results);
         if (counter)
         {
@@ -301,12 +231,14 @@ public:
         if (inputStream)
             inputResult->setResultStream(inputStream, rowStreamCount);
         graph->executeChild(parentExtractSz, parentExtract, results, NULL);
+        if (0 == graph->queryJob().queryMyRank())
+            return NULL; // unset and unused on master
         Owned<IThorResult> result0 = results->getResult(0);
         return result0->getRowStream();
     }
     virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
     {
-        Owned<IThorGraphResults> results = createThorGraphResults(1);
+        Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
         if (counter)
         {
             prepareCounterResult(activity, results, counter, 0);
@@ -1304,13 +1236,18 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
     }
     try
     {
-        if (exception && !queryOwner())
+        if (exception)
         {
-            StringBuffer str;
-            Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
-            e->setGraphId(graphId);
-            e->setAction(tea_abort);
-            fireException(e);
+            if (NULL == owner || isGlobal())
+                waitBarrier->cancel();
+            if (!queryOwner())
+            {
+                StringBuffer str;
+                Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
+                e->setGraphId(graphId);
+                e->setAction(tea_abort);
+                fireException(e);
+            }
         }
     }
     catch (IException *e)
@@ -1666,6 +1603,14 @@ void CGraphBase::GraphPrintLog(IException *e, const char *format, ...)
     va_end(args);
 }
 
+void CGraphBase::setGlobal(bool tf)
+{
+    global = tf;
+    Owned<IThorGraphIterator> iter = getChildGraphs();
+    ForEach(*iter)
+        iter->query().setGlobal(tf);
+}
+
 void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph)
 {
     owner = _owner;
@@ -1692,7 +1637,7 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
         tmpHandler.setown(queryJob().createTempHandler());
     }
 
-    bool localChild = false;
+    localChild = false;
     if (owner && parentActivityId)
     {
         CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
@@ -1704,9 +1649,11 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
             case TAKloopdataset:
             case TAKgraphloop:
             case TAKparallelgraphloop:
-                if (!parentElement->queryLocal())
-                    global = true;
+            {
+                if (parentElement->queryOwner().isLocalChild())
+                    localChild = true;
                 break;
+            }
             default:
                 localChild = true;
                 break;
@@ -1741,6 +1688,35 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
                 global = isGlobalActivity(*act);
         }
     }
+
+    // Only for graphs that are not local children: walk the activity nodes and,
+    // for each loop-kind activity, decide whether its loop subgraph must run globally.
+    if (!localChild)
+    {
+        ForEach(*nodes)
+        {
+            IPropertyTree &e = nodes->query();
+            ThorActivityKind kind = (ThorActivityKind) e.getPropInt("att[@name=\"_kind\"]/@value");
+            switch (kind)
+            {
+                case TAKlooprow:
+                case TAKloopcount:
+                case TAKloopdataset:
+                case TAKgraphloop:
+                case TAKparallelgraphloop:
+                {
+                    unsigned loopId = e.getPropInt("att[@name=\"_loopid\"]/@value");
+                    // if the loop subgraph (or anything it depends on) is not local-only,
+                    // then it and all of its child graphs must be executed globally (sync'd with master)
+                    Owned<CGraphBase> loopGraph = getChildGraph(loopId);
+                    if (!loopGraph->isLocalOnly())
+                        loopGraph->setGlobal();
+                    break;
+                }
+                default:
+                    break;
+            }
+        }
+    }
+
     Owned<IPropertyTreeIterator> edges = xgmml->getElements("edge");
     ForEach(*edges)
     {
@@ -1854,46 +1830,36 @@ void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtrac
     doExecuteChild(parentExtractSz, parentExtract);
 }
 
-IThorResult *CGraphBase::queryResult(unsigned id)
-{
-    return localResults->queryResult(id);
-}
-
-IThorResult *CGraphBase::getResult(unsigned id)
+IThorResult *CGraphBase::getResult(unsigned id, bool distributed)
 {
-    return localResults->getResult(id);
+    return localResults->getResult(id, distributed);
 }
 
-IThorResult *CGraphBase::queryGraphLoopResult(unsigned id)
+IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
 {
-    return graphLoopResults->queryResult(id);
+    return graphLoopResults->getResult(id, distributed);
 }
 
-IThorResult *CGraphBase::getGraphLoopResult(unsigned id)
+IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed)
 {
-    return graphLoopResults->getResult(id);
+    return localResults->createResult(activity, id, rowIf, distributed);
 }
 
-IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool local)
+IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed)
 {
-    return localResults->createResult(activity, id, rowIf, local);
-}
-
-IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf)
-{
-    return graphLoopResults->createResult(activity, rowIf, true);
+    return graphLoopResults->createResult(activity, rowIf, distributed);
 }
 
 // ILocalGraph
 void CGraphBase::getResult(size32_t & len, void * & data, unsigned id)
 {
-    Owned<IThorResult> result = localResults->getResult(id);
+    Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
     result->getResult(len, data);
 }
 
 void CGraphBase::getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
 {
-    Owned<IThorResult> result = localResults->getResult(id);
+    Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
     result->getLinkedResult(count, ret);
 }
 
@@ -1966,6 +1932,12 @@ bool CGraphBase::isLocalOnly() const // checks all dependencies, if something ne
     return 1==localOnly;
 }
 
+// Default results factory: returns a new CThorGraphResults pre-sized for 'num' results.
+// Declared virtual in the class, so derived graph types can return their own container.
+IThorGraphResults *CGraphBase::createThorGraphResults(unsigned num)
+{
+    return new CThorGraphResults(num);
+}
+
+
 ////
 
 void CGraphTempHandler::registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)

+ 101 - 16
thorlcr/graph/thgraph.hpp

@@ -73,7 +73,8 @@ enum msgids
     Shutdown,
     GraphInit,
     GraphEnd,
-    GraphAbort
+    GraphAbort,
+    GraphGetResult
 };
 
 interface ICodeContextExt : extends ICodeContext
@@ -114,8 +115,10 @@ interface IThorResult : extends IInterface
     virtual IRowWriter *getWriter() = 0;
     virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count) = 0;
     virtual IRowStream *getRowStream() = 0;
-    virtual IOutputMetaData *queryMeta() = 0;
-    virtual bool isLocal() const = 0;
+    virtual IRowInterfaces *queryRowInterfaces() = 0;
+    virtual CActivityBase *queryActivity() = 0;
+    virtual bool isDistributed() const = 0;
+    virtual void serialize(MemoryBuffer &mb) = 0;
     virtual void getResult(size32_t & retSize, void * & ret) = 0;
     virtual void getLinkedResult(unsigned & count, byte * * & ret) = 0;
 };
@@ -125,10 +128,10 @@ class CActivityBase;
 interface IThorGraphResults : extends IEclGraphResults
 {
     virtual void clear() = 0;
-    virtual IThorResult *queryResult(unsigned id) = 0;
-    virtual IThorResult *getResult(unsigned id) = 0;
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool local=false) = 0;
-    virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool local=false) = 0;
+    virtual IThorResult *getResult(unsigned id, bool distributed=false) = 0;
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed) = 0;
+    virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed) = 0;
+    virtual unsigned addResult(IThorResult *result) = 0;
     virtual void setResult(unsigned id, IThorResult *result) = 0;
     virtual unsigned count() = 0;
 };
@@ -432,12 +435,11 @@ class graph_decl CGraphBase : public CInterface, implements ILocalGraph, impleme
     CriticalSection evaluateCrit;
     CGraphElementTable containers;
     CGraphElementArray sinks;
-    bool sink, complete, global;
+    bool sink, complete, global, localChild;
     mutable int localOnly;
     activity_id parentActivityId;
     IPropertyTree *xgmml;
     CGraphTable childGraphs;
-    Owned<IThorGraphResults> localResults, graphLoopResults;
     Owned<IGraphTempHandler> tmpHandler;
 
     void clean();
@@ -527,6 +529,7 @@ class graph_decl CGraphBase : public CInterface, implements ILocalGraph, impleme
     } graphCodeContext;
 
 protected:
+    Owned<IThorGraphResults> localResults, graphLoopResults;
     CGraphBase *owner, *parent;
     Owned<IException> abortException;
     CGraphElementArrayCopy ifs;
@@ -619,7 +622,9 @@ public:
     bool isCreated() const { return created; }
     bool isStarted() const { return started; }
     bool isLocalOnly() const; // this graph and all upstream dependencies
+    bool isLocalChild() const { return localChild; }
     void setCompleteEx(bool tf=true) { complete = tf; }
+    void setGlobal(bool tf=true);
     const byte *setParentCtx(size32_t _parentExtractSz, const byte *parentExtract)
     {
         parentExtractSz = _parentExtractSz;
@@ -729,16 +734,15 @@ public:
     virtual void done();
     virtual void end();
     virtual void abort(IException *e);
+    virtual IThorGraphResults *createThorGraphResults(unsigned num);
 
 // IExceptionHandler
     virtual bool fireException(IException *e);
 
-    virtual IThorResult *queryResult(unsigned id);
-    virtual IThorResult *getResult(unsigned id);
-    virtual IThorResult *queryGraphLoopResult(unsigned id);
-    virtual IThorResult *getGraphLoopResult(unsigned id);
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool local=false);
-    virtual IThorResult *createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf);
+    virtual IThorResult *getResult(unsigned id, bool distributed=false);
+    virtual IThorResult *getGraphLoopResult(unsigned id, bool distributed=false);
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed);
+    virtual IThorResult *createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed);
 
 // ILocalGraph
     virtual void getResult(size32_t & len, void * & data, unsigned id);
@@ -949,6 +953,7 @@ public:
     ~CActivityBase();
     CGraphElementBase &queryContainer() const { return container; }
     CJobBase &queryJob() const { return container.queryJob(); }
+    CGraphBase &queryGraph() const { return container.queryOwner(); }
     inline const mptag_t queryMpTag() const { return mpTag; }
     inline const bool &queryAbortSoon() const { return abortSoon; }
     inline IHThorArg *queryHelper() const { return baseHelper; }
@@ -1037,11 +1042,91 @@ public:
     virtual IFileInProgressHandler &queryFileInProgressHandler() { UNIMPLEMENTED; return *((IFileInProgressHandler *)NULL); }
 };
 
+// Container of graph results, indexed by result id.
+// Slots that exist but have not been given a real result hold a placeholder
+// object whose every member throws. Most members serialize access through 'cs'.
+class graph_decl CThorGraphResults : public CInterface, implements IThorGraphResults
+{
+protected:
+    // Placeholder stored in slots with no real result yet: every IThorResult member
+    // throws a "accessed before it is created" exception quoting the slot id.
+    class CThorUninitializedGraphResults : public CInterface, implements IThorResult
+    {
+        unsigned id;
+
+    public:
+        IMPLEMENT_IINTERFACE
+
+        CThorUninitializedGraphResults(unsigned _id) { id = _id; }
+        virtual IRowWriter *getWriter() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual IRowStream *getRowStream() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual IRowInterfaces *queryRowInterfaces() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual CActivityBase *queryActivity() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual bool isDistributed() const { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual void serialize(MemoryBuffer &mb) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual void getResult(size32_t & retSize, void * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+        virtual void getLinkedResult(unsigned & count, byte * * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
+    };
+    IArrayOf<IThorResult> results;
+    CriticalSection cs;
+    // Pad 'results' with placeholders until it holds at least 'id' entries.
+    // NB: the argument is a count, so callers that need slot 'id' to exist pass id+1.
+    void ensureAtLeast(unsigned id)
+    {
+        while (results.ordinality() < id)
+            results.append(*new CThorUninitializedGraphResults(results.ordinality()));
+    }
+public:
+    IMPLEMENT_IINTERFACE;
+
+    // Pre-populates '_numResults' placeholder slots.
+    CThorGraphResults(unsigned _numResults) { ensureAtLeast(_numResults); }
+    // Drop every result (and placeholder).
+    virtual void clear()
+    {
+        CriticalBlock procedure(cs);
+        results.kill();
+    }
+    // Return a linked reference to slot 'id', creating placeholder slots up to it if needed.
+    // The 'distributed' argument is not used by this base implementation.
+    virtual IThorResult *getResult(unsigned id, bool distributed)
+    {
+        CriticalBlock procedure(cs);
+        ensureAtLeast(id+1);
+        // NB: stream static after this, i.e. nothing can be added to this result
+        return LINK(&results.item(id));
+    }
+    // Defined out of line (not visible here).
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed);
+    // Create a result in the next free slot (the current number of entries is used as the id).
+    // NOTE(review): results.ordinality() is read here without holding 'cs' - confirm callers cannot race.
+    virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed)
+    {
+        return createResult(activity, results.ordinality(), rowIf, distributed);
+    }
+    // Store 'result' in the next free slot and return the id it was given.
+    // NOTE(review): 'cs' is taken here and again inside setResult() - assumes CriticalSection is re-entrant; confirm.
+    virtual unsigned addResult(IThorResult *result)
+    {
+        CriticalBlock procedure(cs);
+        unsigned id = results.ordinality();
+        setResult(id, result);
+        return id;
+    }
+    // Store a linked reference to 'result' in slot 'id': replaces the slot if it already
+    // exists, otherwise pads up to 'id' entries and appends so that 'result' lands at 'id'.
+    virtual void setResult(unsigned id, IThorResult *result)
+    {
+        CriticalBlock procedure(cs);
+        ensureAtLeast(id);
+        if (results.isItem(id))
+            results.replace(*LINK(result), id);
+        else
+            results.append(*LINK(result));
+    }
+    // Number of slots (including placeholders); read without taking 'cs'.
+    virtual unsigned count() { return results.ordinality(); }
+    // Fetch slot 'id' via the virtual getResult(id, true) and return its contents through retSize/ret.
+    virtual void getResult(size32_t & retSize, void * & ret, unsigned id)
+    {
+        Owned<IThorResult> result = getResult(id, true);
+        result->getResult(retSize, ret);
+    }
+    // As above, but returns the rows through count/ret via IThorResult::getLinkedResult().
+    virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
+    {
+        Owned<IThorResult> result = getResult(id, true);
+        result->getLinkedResult(count, ret);
+    }
+};
+
+extern graph_decl IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed);
+
+
 class CGraphElementBase;
 typedef CGraphElementBase *(*CreateFunc)(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph);
 extern graph_decl void registerCreateFunc(CreateFunc func);
 extern graph_decl CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph);
-extern graph_decl IThorGraphResults *createThorGraphResults(unsigned num);
 extern graph_decl IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId);
 extern graph_decl bool isDiskInput(ThorActivityKind kind);
 

+ 181 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -44,6 +44,8 @@
 #include "eclhelper.hpp"
 #include "thexception.hpp"
 #include "thactivitymaster.ipp"
+#include "thmem.hpp"
+#include "thcompressutil.hpp"
 
 static CriticalSection *jobManagerCrit;
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -286,7 +288,7 @@ void CSlaveMessageHandler::main()
                 }
                 case smt_actMsg:
                 {
-                    LOG(MCdebugProgress, unknownJob, "smt_podata called from slave %d", sender-1);
+                    LOG(MCdebugProgress, unknownJob, "smt_actMsg called from slave %d", sender-1);
                     graph_id gid;
                     msg.read(gid);
                     activity_id id;
@@ -300,6 +302,21 @@ void CSlaveMessageHandler::main()
                     activity->handleSlaveMessage(msg); // don't block
                     break;
                 }
+                case smt_getresult:
+                {
+                    // A slave is asking for result 'resultId' of graph 'gid'; stream it
+                    // back to that slave in chunks on the reply tag carried in the request.
+                    LOG(MCdebugProgress, unknownJob, "smt_getresult called from slave %d", sender-1);
+                    graph_id gid;
+                    msg.read(gid);
+                    Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
+                    // validate the lookup before 'graph' is dereferenced below
+                    assertex(graph);
+                    unsigned resultId;
+                    msg.read(resultId);
+                    mptag_t replyTag = graph->queryJob().deserializeMPTag(msg);
+                    Owned<IThorResult> result = graph->getResult(resultId);
+                    Owned<IRowStream> resultStream = result->getRowStream();
+                    sendInChunks(graph->queryJob().queryJobComm(), sender, replyTag, resultStream, result->queryRowInterfaces());
+                    break;
+                }
             }
         }
     }
@@ -1734,6 +1751,153 @@ bool CJobMaster::fireException(IException *e)
 
 ///////////////////
 
+// Master-side IThorResult that is populated lazily: on first read it broadcasts a
+// GraphGetResult request to the slaves, gathers each slave's rows (kept per slave in
+// 'results') and concatenates them, in slave order, into a single local result.
+// Writing to it directly is not supported (getWriter/setResultStream throw).
+class CCollatedResult : public CSimpleInterface, implements IThorResult
+{
+    CMasterGraph &graph;
+    CActivityBase &activity;
+    IRowInterfaces *rowIf;
+    unsigned id;
+    CriticalSection crit;
+    PointerArrayOf<CThorExpandingRowArray> results; // one row array per slave, deleted in the destructor
+    Owned<IThorResult> result;                      // the collated result, set once ensure() has completed
+
+    // Build 'result' if it has not been built yet. Holds 'crit' for the whole exchange.
+    // Throws if the activity is aborted while waiting, or if a slave reports an error,
+    // so that callers never continue with a NULL 'result'.
+    void ensure()
+    {
+        CriticalBlock b(crit);
+        if (result)
+            return;
+        mptag_t replyTag = createReplyTag();
+        CMessageBuffer msg;
+        msg.append(GraphGetResult);
+        msg.append(activity.queryJob().queryKey());
+        msg.append(graph.queryGraphId());
+        msg.append(id);
+        msg.append(replyTag);
+        ((CJobMaster &)graph.queryJob()).broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, NULL, NULL, true);
+
+        unsigned numSlaves = graph.queryJob().querySlaves();
+        // discard anything left over from an earlier attempt that did not complete
+        for (unsigned n=0; n<numSlaves; n++)
+            results.item(n)->kill();
+        rank_t sender;
+        MemoryBuffer mb;
+        Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
+        CThorStreamDeserializerSource rowSource(stream);
+        unsigned todo = numSlaves; // slaves that have not yet sent their empty end-of-data message
+
+        loop
+        {
+            loop
+            {
+                // Previously this returned with 'result' still NULL and the caller then
+                // dereferenced it; fail explicitly instead.
+                if (activity.queryAbortSoon())
+                    throw MakeStringException(0, "Graph Result %u: aborted whilst collating results from slaves", id);
+                msg.clear();
+                if (activity.receiveMsg(msg, RANK_ALL, replyTag, &sender, 60*1000))
+                    break;
+                ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
+            }
+            sender = sender - 1; // 0 = master
+            if (!msg.length())
+            {
+                // an empty message marks the end of one slave's data
+                --todo;
+                if (0 == todo)
+                    break; // done
+            }
+            else
+            {
+                bool error;
+                msg.read(error);
+                if (error)
+                {
+                    Owned<IThorException> e = deserializeThorException(msg);
+                    e->setSlave(sender);
+                    throw e.getClear();
+                }
+                ThorExpand(msg, mb.clear());
+
+                CThorExpandingRowArray *slaveResults = results.item(sender);
+                while (!rowSource.eos())
+                {
+                    RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
+                    size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
+                    slaveResults->append(rowBuilder.finalizeRowClear(sz));
+                }
+            }
+        }
+        // concatenate the per-slave rows, in slave order, into one local result
+        Owned<IThorResult> _result = ::createResult(activity, rowIf, false);
+        Owned<IRowWriter> resultWriter = _result->getWriter();
+        for (unsigned s=0; s<numSlaves; s++)
+        {
+            CThorExpandingRowArray *slaveResult = results.item(s);
+            ForEachItemIn(r, *slaveResult)
+            {
+                const void *row = slaveResult->query(r);
+                LinkThorRow(row);
+                resultWriter->putRow(row);
+            }
+        }
+        result.setown(_result.getClear());
+    }
+
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+    // Allocates one (empty) row array per slave.
+    CCollatedResult(CMasterGraph &_graph, CActivityBase &_activity, IRowInterfaces *_rowIf, unsigned _id) : graph(_graph), activity(_activity), rowIf(_rowIf), id(_id)
+    {
+        for (unsigned n=0; n<graph.queryJob().querySlaves(); n++)
+            results.append(new CThorExpandingRowArray(activity));
+    }
+    ~CCollatedResult()
+    {
+        ForEachItemIn(l, results)
+        {
+            CThorExpandingRowArray *result = results.item(l);
+            delete result;
+        }
+    }
+    // Change the result id that will be quoted in the request sent to the slaves.
+    void setId(unsigned _id)
+    {
+        id = _id;
+    }
+
+// IThorResult
+    virtual IRowWriter *getWriter() { throwUnexpected(); }
+    virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
+    {
+        throwUnexpected();
+    }
+    virtual IRowStream *getRowStream()
+    {
+        ensure();
+        return result->getRowStream();
+    }
+    virtual IRowInterfaces *queryRowInterfaces()
+    {
+        return rowIf;
+    }
+    virtual CActivityBase *queryActivity()
+    {
+        return &activity;
+    }
+    virtual bool isDistributed() const { return false; }
+    virtual void serialize(MemoryBuffer &mb)
+    {
+        ensure();
+        result->serialize(mb);
+    }
+    virtual void getResult(size32_t & retSize, void * & ret)
+    {
+        ensure();
+        return result->getResult(retSize, ret);
+    }
+    virtual void getLinkedResult(unsigned & count, byte * * & ret)
+    {
+        ensure();
+        return result->getLinkedResult(count, ret);
+    }
+};
+
+///////////////////
+
 //
 // CMasterGraph impl.
 //
@@ -2411,6 +2575,22 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
     return true;
 }
 
+// Master-side variant: creates a CCollatedResult for slot 'id' and stores it in localResults.
+// The 'distributed' argument is not used here.
+// NB: the returned pointer is not linked for the caller - the local Owned<> drops its reference on return.
+// NOTE(review): relies on localResults->setResult() keeping its own link to the object - confirm.
+IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed)
+{
+    Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id);
+    localResults->setResult(id, result);
+    return result;
+}
+
+// Master-side variant: creates a CCollatedResult, adds it to graphLoopResults and then
+// tells the result which id it was assigned (it is constructed with a provisional id of 0).
+// The 'distributed' argument is not used here.
+// NB: the returned pointer is not linked for the caller - the local Owned<> drops its reference on return.
+// NOTE(review): relies on graphLoopResults->addResult() keeping its own link to the object - confirm.
+IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed)
+{
+    Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, 0);
+    unsigned id = graphLoopResults->addResult(result);
+    result->setId(id);
+    return result;
+}
+
+
 ///////////////////////////////////////////////////
 
 CThorStats::CThorStats()

+ 3 - 0
thorlcr/graph/thgraphmaster.ipp

@@ -78,6 +78,9 @@ public:
     virtual void start();
     virtual void done();
     virtual void abort(IException *e);
+    IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed);
+    IThorResult *createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed);
+
 // IExceptionHandler
     virtual bool fireException(IException *e);
 // IThorChildGraph impl.

+ 107 - 0
thorlcr/graph/thgraphslave.cpp

@@ -27,6 +27,7 @@
 #include "thorport.hpp"
 #include "slwatchdog.hpp"
 #include "thgraphslave.hpp"
+#include "thcompressutil.hpp"
 
 //////////////////////////////////
 
@@ -758,6 +759,112 @@ void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
     getDoneSem.signal();
 }
 
+
+// Slave-side results container. On top of the local results held by the base class it
+// caches, per result id, the "global" version of a distributed result obtained through
+// CSlaveGraph::getGlobalResult(), so that it is only fetched once.
+class CThorSlaveGraphResults : public CThorGraphResults
+{
+    CSlaveGraph &graph;
+    IArrayOf<IThorResult> globalResults;               // cached global results; placeholder until fetched
+    PointerArrayOf<CriticalSection> globalResultCrits; // one lock per global slot, owned by this object
+    // Pad the global arrays (result placeholder + its lock) until they hold at least 'id' entries.
+    void ensureAtLeastGlobals(unsigned id)
+    {
+        while (globalResults.ordinality() < id)
+        {
+            globalResults.append(*new CThorUninitializedGraphResults(globalResults.ordinality()));
+            globalResultCrits.append(new CriticalSection);
+        }
+    }
+    // The pointer array does not free its entries, so delete the locks explicitly.
+    void freeGlobalResultCrits()
+    {
+        ForEachItemIn(i, globalResultCrits)
+            delete globalResultCrits.item(i);
+        globalResultCrits.kill();
+    }
+public:
+    CThorSlaveGraphResults(CSlaveGraph &_graph,unsigned numResults) : CThorGraphResults(numResults), graph(_graph)
+    {
+    }
+    // Without this the per-slot locks leaked unless clear() happened to be called first.
+    ~CThorSlaveGraphResults()
+    {
+        freeGlobalResultCrits();
+    }
+    // Drop all local and cached global results, together with the per-slot locks.
+    virtual void clear()
+    {
+        CriticalBlock procedure(cs);
+        results.kill();
+        globalResults.kill();
+        freeGlobalResultCrits();
+    }
+    // Return a linked result for slot 'id'.
+    // If 'distributed' is false, or the local result is not a distributed one, the local
+    // result is returned. Otherwise the cached global result is returned, fetching it
+    // through the graph on first use.
+    // Throws if the fetch was abandoned because the activity is aborting.
+    IThorResult *getResult(unsigned id, bool distributed)
+    {
+        Linked<IThorResult> result;
+        {
+            CriticalBlock procedure(cs);
+            ensureAtLeast(id+1);
+
+            result.set(&results.item(id));
+            if (!distributed || !result->isDistributed())
+                return result.getClear();
+            ensureAtLeastGlobals(id+1);
+        }
+        CriticalBlock b(*globalResultCrits.item(id)); // block other global requests for this result
+        IThorResult *globalResult = &globalResults.item(id);
+        if (!QUERYINTERFACE(globalResult, CThorUninitializedGraphResults))
+            return LINK(globalResult);
+        Owned<IThorResult> gr = graph.getGlobalResult(*result->queryActivity(), result->queryRowInterfaces(), id);
+        // getGlobalResult() returns NULL when the activity is aborting; do not cache or dereference that
+        if (!gr)
+            throw MakeStringException(0, "Graph Result %u: aborted whilst retrieving global result", id);
+        globalResults.replace(*gr.getLink(), id);
+        return gr.getClear();
+    }
+};
+
+// Slave graphs use CThorSlaveGraphResults (bound to this graph) as their results container.
+IThorGraphResults *CSlaveGraph::createThorGraphResults(unsigned num)
+{
+    return new CThorSlaveGraphResults(*this, num);
+}
+
+// Send an smt_getresult request for result 'id' of this graph to rank 0 and rebuild the
+// rows locally from the chunked reply received on a private reply tag.
+// Each non-empty reply is an error flag followed by compressed serialized rows; an empty
+// reply ends the transfer. Returns NULL if the activity is flagged as aborting while
+// waiting; throws if the reply carries an error.
+IThorResult *CSlaveGraph::getGlobalResult(CActivityBase &activity, IRowInterfaces *rowIf, unsigned id)
+{
+    mptag_t replyTag = createReplyTag();
+    CMessageBuffer msg;
+    msg.setReplyTag(replyTag);
+    msg.append(smt_getresult);
+    msg.append(graphId);
+    msg.append(id);
+    msg.append(replyTag);
+
+    bool sent = queryJob().queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT);
+    if (!sent)
+        throwUnexpected();
+
+    Owned<IThorResult> collated = ::createResult(activity, rowIf, false);
+    Owned<IRowWriter> collatedWriter = collated->getWriter();
+
+    MemoryBuffer expanded;
+    Owned<ISerialStream> expandedStream = createMemoryBufferSerialStream(expanded);
+    CThorStreamDeserializerSource rowSource(expandedStream);
+
+    for (;;)
+    {
+        // wait for the next chunk, re-checking the abort flag after every timeout
+        bool received = false;
+        while (!received)
+        {
+            if (activity.queryAbortSoon())
+                return NULL;
+            msg.clear();
+            received = activity.receiveMsg(msg, 0, replyTag, NULL, 60*1000);
+            if (!received)
+                ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
+        }
+        if (0 == msg.length())
+            break; // done
+
+        bool error;
+        msg.read(error);
+        if (error)
+            throw deserializeThorException(msg);
+        ThorExpand(msg, expanded.clear());
+        while (!rowSource.eos())
+        {
+            RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
+            size32_t rowSz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
+            collatedWriter->putRow(rowBuilder.finalizeRowClear(rowSz));
+        }
+    }
+    return collated.getClear();
+}
+
+
 ///////////////////////////
 
 class CThorCodeContextSlave : public CThorCodeContextBase

+ 2 - 0
thorlcr/graph/thgraphslave.hpp

@@ -107,6 +107,7 @@ public:
     void initWithActData(MemoryBuffer &in, MemoryBuffer &out);
     void getDone(MemoryBuffer &doneInfoMb);
     void serializeDone(MemoryBuffer &mb);
+    IThorResult *getGlobalResult(CActivityBase &activity, IRowInterfaces *rowIf, unsigned id);
 
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
     virtual void serializeStats(MemoryBuffer &mb);
@@ -117,6 +118,7 @@ public:
     virtual void abort(IException *e);
     virtual void done();
     virtual void end();
+    virtual IThorGraphResults *createThorGraphResults(unsigned num);
 
 // IExceptionHandler
     virtual bool fireException(IException *e)

+ 33 - 0
thorlcr/slave/slavmain.cpp

@@ -44,6 +44,7 @@
 #include "thorport.hpp"
 #include "thgraphslave.hpp"
 #include "slave.ipp"
+#include "thcompressutil.hpp"
 
 //---------------------------------------------------------------------------
 
@@ -344,6 +345,38 @@ public:
                         stopped = true;
                         break;
                     }
+                    // Request for one of this slave's graph results.
+                    // Message layout as read below: job key, graph id, result id, reply tag.
+                    case GraphGetResult:
+                    {
+                        StringAttr jobKey;
+                        msg.read(jobKey);
+                        PROGLOG("GraphGetResult: %s", jobKey.get());
+                        CJobSlave *job = jobs.find(jobKey.get());
+                        // NB: a request for an unknown job is ignored here (no result is sent)
+                        if (job)
+                        {
+                            graph_id gid;
+                            msg.read(gid);
+                            Owned<CGraphBase> graph = job->getGraph(gid);
+                            if (!graph)
+                            {
+                                Owned<IThorException> e = MakeThorException(0, "GraphGetResult: graph not found");
+                                e->setGraphId(gid);
+                                throw e.getClear();
+                            }
+                            unsigned resultId;
+                            msg.read(resultId);
+                            mptag_t replyTag = job->deserializeMPTag(msg);
+                            msg.setReplyTag(replyTag);
+                            // 'distributed' defaults to false: this fetches the slave's own local result
+                            Owned<IThorResult> result = graph->getResult(resultId);
+                            if (!result)
+                                throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
+                            msg.clear();
+
+                            // stream the rows to rank 0 on the reply tag; sendInChunks carries the whole
+                            // response, so the handler's own reply is switched off
+                            Owned<IRowStream> resultStream = result->getRowStream();
+                            sendInChunks(job->queryJobComm(), 0, replyTag, resultStream, result->queryRowInterfaces());
+                            doReply = false;
+                        }
+                        break;
+                    }
                     default:
                         throwUnexpected();
                 }

+ 35 - 1
thorlcr/thorutil/thormisc.cpp

@@ -40,6 +40,7 @@
 #include "thgraph.hpp"
 #include "thbufdef.hpp"
 #include "thmem.hpp"
+#include "thcompressutil.hpp"
 
 #include "eclrtl.hpp"
 #include "eclhelper.hpp"
@@ -1195,7 +1196,6 @@ public:
                     if (!row)
                         break;
                     activity->queryRowSerializer()->serialize(mbs,(const byte *)row.get());
-
                 } while (mb.length() < fetchBuffSize); // NB: allows at least 1
                 if (!comm.send(mb, sender, mpTag, LONGTIMEOUT))
                     throw MakeStringException(0, "CRowStreamFromNode: Failed to send data back to node: %d", activity->queryContainer().queryJob().queryMyRank());
@@ -1236,3 +1236,37 @@ IRowStream *createUngroupStream(IRowStream *input)
     };
     return new CUngroupStream(input);
 }
+
+// Serialize every row of 'input' and send it to node 'dst' on 'mpTag' as a series of messages.
+// Each data message is a bool error flag (always false here) followed by the compressed
+// serialized rows; a final empty message tells the receiver that the stream is complete.
+// A single NULL row from 'input' is skipped; two consecutive NULLs are treated as end of stream.
+// Throws if a message cannot be sent, so the failure is not silently lost.
+void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpTag, IRowStream *input, IRowInterfaces *rowIf)
+{
+    CMessageBuffer msg;
+    MemoryBuffer mb;
+    CMemoryRowSerializer mbs(mb);
+    IOutputRowSerializer *serializer = rowIf->queryRowSerializer();
+    bool eos = false; // once set, 'input' is not read again
+    loop
+    {
+        while (!eos)
+        {
+            OwnedConstThorRow row = input->nextRow();
+            if (!row)
+            {
+                row.setown(input->nextRow());
+                if (!row)
+                {
+                    eos = true;
+                    break;
+                }
+            }
+            serializer->serialize(mbs, (const byte *)row.get());
+            if (mb.length() > 0x80000) // flush once more than 512K of serialized rows has accumulated
+                break;
+        }
+        msg.clear();
+        if (mb.length())
+        {
+            msg.append(false); // no error
+            ThorCompress(mb.toByteArray(), mb.length(), msg);
+            mb.clear();
+        }
+        // an unchecked failure here would leave the receiver waiting for data that never arrives
+        if (!comm.send(msg, dst, mpTag, LONGTIMEOUT))
+            throw MakeStringException(0, "sendInChunks: Failed to send data to node: %u", (unsigned)dst);
+        if (0 == msg.length())
+            break;
+    }
+}

+ 4 - 1
thorlcr/thorutil/thormisc.hpp

@@ -300,7 +300,7 @@ extern graph_decl void reportExceptionToWorkunit(IConstWorkUnit &workunit,IExcep
 
 extern graph_decl IPropertyTree *globals;
 extern graph_decl mptag_t masterSlaveMpTag;
-enum SlaveMsgTypes { smt_errorMsg=1, smt_initGraphReq, smt_initActDataReq, smt_dataReq, smt_getPhysicalName, smt_getFileOffset, smt_actMsg };
+enum SlaveMsgTypes { smt_errorMsg=1, smt_initGraphReq, smt_initActDataReq, smt_dataReq, smt_getPhysicalName, smt_getFileOffset, smt_actMsg, smt_getresult };
 // Logging
 extern graph_decl const LogMsgJobInfo thorJob;
 
@@ -340,5 +340,8 @@ extern graph_decl IRowServer *createRowServer(CActivityBase *activity, IRowStrea
 
 extern graph_decl IRowStream *createUngroupStream(IRowStream *input);
 
+interface IRowInterfaces;
+extern graph_decl void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpTag, IRowStream *input, IRowInterfaces *rowIf);
+
 #endif