Browse Source

Optimizing rapid child query speed

Various optimizations to help child queries which are ran repeatedly
thousands of times in rapid succession.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 years ago
parent
commit
4a888292cc

+ 1 - 1
thorlcr/activities/diskread/thdiskread.cpp

@@ -78,7 +78,7 @@ public:
     virtual void done()
     {
         IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
-        if (actStarted && 0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
+        if (0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
             container.queryTempHandler()->deregisterFile(helper->getFileName(), fileDesc->queryProperties().getPropBool("@pausefile"));
     }
     virtual void init()

+ 3 - 6
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -33,7 +33,7 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
 {
     class CInputHandler : public CInterface, implements IThreaded
     {
-        CThreaded threaded;
+        CThreadedPersistent threaded;
         CParallelFunnel &funnel;
         Linked<IRowStream> input;
         CriticalSection stopCrit;
@@ -43,7 +43,7 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
         bool stopping;
     public:
         CInputHandler(CParallelFunnel &_funnel, IRowStream *_input, unsigned _inputIndex) 
-            : threaded("CInputHandler"), funnel(_funnel), input(_input), inputIndex(_inputIndex)
+            : threaded("CInputHandler", this), funnel(_funnel), input(_input), inputIndex(_inputIndex)
         {
             readThisInput = 0;
             StringBuffer s(funnel.idStr);
@@ -51,18 +51,15 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
             idStr.set(s.str());
             stopping = false;
         }
-
         ~CInputHandler()
         {
             // stop();      too late to call stop I think
         }
-
         void start()
         {
             // NB don't start in constructor
-            threaded.init(this);
+            threaded.start();
         }
-
         void stop()
         {
             CriticalBlock b(stopCrit);

+ 3 - 7
thorlcr/activities/iterate/thiterateslave.cpp

@@ -473,7 +473,7 @@ class CStreamedIteratorSlaveActivity : public CSlaveActivity, public CThorDataLi
 {
     IHThorStreamedIteratorArg *helper;
     Owned<IRowStream> rows;
-    bool eof, isLocal, isLocalCache;
+    bool eof, isLocal;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -486,15 +486,11 @@ public:
     {
         appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorStreamedIteratorArg *> (queryHelper());
-        isLocalCache = isLocal = false;
+        isLocal = false;
     }
     virtual void start()
     {
-        if (!isLocalCache)
-        {
-            isLocalCache = true;
-            isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
-        }
+        isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
         eof = isLocal ? false : (container.queryJob().queryMyRank()>1);
         if (!eof)
             rows.setown(helper->createInput());

+ 9 - 9
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -566,7 +566,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
     class CKeyedFetchHandler : public CSimpleInterface, implements IThreaded
     {
         CKeyedJoinSlave &owner;
-        CThreaded threaded;
+        CThreadedPersistent threaded;
         bool writeWaiting, replyWaiting, stopped, aborted;
         unsigned pendingSends, pendingReplies, nodes, minFetchSendSz, totalSz, fetchMin;
         size32_t perRowMin;
@@ -590,16 +590,16 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
     public:
         class CKeyedFetchResultProcessor : public CSimpleInterface, implements IThreaded
         {
-            CThreaded threaded;
+            CThreadedPersistent threaded;
             CKeyedJoinSlave &owner;
             ICommunicator &comm;
             mptag_t resultMpTag;
             bool aborted;
         public:
-            CKeyedFetchResultProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _mpTag) : threaded("CKeyedFetchResultProcessor"), owner(_owner), comm(_comm), resultMpTag(_mpTag)
+            CKeyedFetchResultProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _mpTag) : threaded("CKeyedFetchResultProcessor", this), owner(_owner), comm(_comm), resultMpTag(_mpTag)
             {
                 aborted = false;
-                threaded.init(this);
+                threaded.start();
             }
             ~CKeyedFetchResultProcessor()
             {
@@ -685,7 +685,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
         } *resultProcessor;
         class CKeyedFetchRequestProcessor : public CSimpleInterface, implements IThreaded
         {
-            CThreaded threaded;
+            CThreadedPersistent threaded;
             CKeyedJoinSlave &owner;
             ICommunicator &comm;
             mptag_t requestMpTag, resultMpTag;
@@ -704,10 +704,10 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
             }
 
         public:
-            CKeyedFetchRequestProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _requestMpTag, mptag_t _resultMpTag) : threaded("CKeyedFetchRequestProcessor"), owner(_owner), comm(_comm), requestMpTag(_requestMpTag), resultMpTag(_resultMpTag)
+            CKeyedFetchRequestProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _requestMpTag, mptag_t _resultMpTag) : threaded("CKeyedFetchRequestProcessor", this), owner(_owner), comm(_comm), requestMpTag(_requestMpTag), resultMpTag(_resultMpTag)
             {
                 aborted = false;
-                threaded.init(this);
+                threaded.start();
             }
             ~CKeyedFetchRequestProcessor()
             {
@@ -877,7 +877,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
             }
         } *requestProcessor;
     public:
-        CKeyedFetchHandler(CKeyedJoinSlave &_owner) : threaded("CKeyedFetchHandler"), owner(_owner)
+        CKeyedFetchHandler(CKeyedJoinSlave &_owner) : threaded("CKeyedFetchHandler", this), owner(_owner)
         {
             minFetchSendSz = NEWFETCHSENDMAX;
             totalSz = 0;
@@ -901,7 +901,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
             requestProcessor = new CKeyedFetchRequestProcessor(owner, owner.container.queryJob().queryJobComm(), requestMpTag, resultMpTag); // remote receive of fetch fpos'
             resultProcessor = new CKeyedFetchResultProcessor(owner, owner.container.queryJob().queryJobComm(), resultMpTag); // asynchronously receiving results back
 
-            threaded.init(this);
+            threaded.start();
         }
         ~CKeyedFetchHandler()
         {

+ 20 - 24
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -82,6 +82,7 @@ class NSplitterSlaveActivity : public CSlaveActivity
     bool eofHit;
     CriticalSection startCrit;
     bool inputsConfigured;
+    CriticalSection startLock;
     unsigned nstopped;
     rowcount_t recsReady;
     SpinLock timingLock;
@@ -89,7 +90,7 @@ class NSplitterSlaveActivity : public CSlaveActivity
     class CWriter : public CSimpleInterface, IThreaded
     {
         NSplitterSlaveActivity &parent;
-        CThreaded threaded;
+        CThreadedPersistent threaded;
         Semaphore sem;
         bool stopped;
         rowcount_t writerMax;
@@ -97,7 +98,7 @@ class NSplitterSlaveActivity : public CSlaveActivity
         SpinLock recLock;
 
     public:
-        CWriter(NSplitterSlaveActivity &_parent) : parent(_parent), threaded("CWriter")
+        CWriter(NSplitterSlaveActivity &_parent) : parent(_parent), threaded("CWriter", this)
         {
             stopped = true;
             writerMax = 0;
@@ -124,7 +125,7 @@ class NSplitterSlaveActivity : public CSlaveActivity
         {
             stopped = false;
             writerMax = 0;
-            threaded.init(this);
+            threaded.start();
         }
         virtual void stop()
         {
@@ -305,7 +306,7 @@ public:
     }
     void prepareInput(unsigned output)
     {
-        CriticalBlock block(startCrit);
+        CriticalBlock block(startLock);
         if (!input)
         {
             input = inputs.item(0);
@@ -313,17 +314,22 @@ public:
                 startInput(input);
                 grouped = input->isGrouped();
                 nstopped = outputs.ordinality();
-                if (spill)
-                {
-                    StringBuffer tempname;
-                    GetTempName(tempname,"nsplit",true); // use alt temp dir
-                    smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), outputs.ordinality(), queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
-                    ActPrintLog("Using temp spill file: %s", tempname.str());
-                }
+                if (smartBuf)
+                    smartBuf->reset();
                 else
                 {
-                    ActPrintLog("Spill is 'balanced'");
-                    smartBuf.setown(createSharedSmartMemBuffer(this, outputs.ordinality(), queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
+                    if (spill)
+                    {
+                        StringBuffer tempname;
+                        GetTempName(tempname,"nsplit",true); // use alt temp dir
+                        smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), outputs.ordinality(), queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
+                        ActPrintLog("Using temp spill file: %s", tempname.str());
+                    }
+                    else
+                    {
+                        ActPrintLog("Spill is 'balanced'");
+                        smartBuf.setown(createSharedSmartMemBuffer(this, outputs.ordinality(), queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
+                    }
                 }
                 writer.start();
             }
@@ -383,16 +389,6 @@ public:
             input = NULL;
         }
     }
-    void doStopInput()
-    {
-        CriticalBlock block(startCrit);
-        if (nstopped)
-        {
-            nstopped = 0;
-            stopInput(input);
-            input = NULL;
-        }
-    }
     void abort()
     {
         CSlaveActivity::abort();
@@ -432,7 +428,7 @@ void SplitterOutput::start()
 
 void SplitterOutput::stop() 
 { 
-    CriticalBlock block(activity.startCrit);
+    CriticalBlock block(activity.startLock);
     activity.smartBuf->queryOutput(output)->stop();
     activity.inputStopped();
     dataLinkStop();

+ 3 - 3
thorlcr/activities/project/thprojectslave.cpp

@@ -131,7 +131,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity, public CThorDataLin
     class CPrefetcher : public CSimpleInterface, implements IThreaded
     {
         CPrefetchProjectSlaveActivity &parent;
-        CThreaded threaded;
+        CThreadedPersistent threaded;
         rowcount_t recordCount;
         bool full, blocked, stopped, eoi, eog, eoq;
         QueueOf<PrefetchInfo, true> prefetchQueue;
@@ -139,7 +139,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity, public CThorDataLin
         Semaphore blockedSem, fullSem;
 
     public:
-        CPrefetcher(CPrefetchProjectSlaveActivity &_parent) : threaded("CPrefetcher"), parent(_parent)
+        CPrefetcher(CPrefetchProjectSlaveActivity &_parent) : threaded("CPrefetcher", this), parent(_parent)
         {
         }
         ~CPrefetcher() { stop(); }
@@ -159,7 +159,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity, public CThorDataLin
             eoi = true;
             return NULL;
         }
-        void start() { recordCount = 0; full = blocked = eoq = eoi = stopped = false; eog = true; threaded.init(this); }
+        void start() { recordCount = 0; full = blocked = eoq = eoi = stopped = false; eog = true; threaded.start(); }
         void stop()
         {
             stopped = true;

+ 3 - 7
thorlcr/activities/temptable/thtmptableslave.cpp

@@ -30,7 +30,7 @@ private:
     bool eof;
     unsigned currentRow;
     size32_t maxrecsize;
-    bool isLocal, isLocalCache;
+    bool isLocal;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -39,7 +39,7 @@ public:
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         appendOutputLinked(this);
-        isLocalCache = isLocal = false;
+        isLocal = false;
         helper = static_cast <IHThorTempTableArg *> (queryHelper());
     }
     void start()
@@ -47,11 +47,7 @@ public:
         ActivityTimer s(totalCycles, timeActivities, NULL);
         dataLinkStart("TEMPTABLE", container.queryId());
         currentRow = 0;
-        if (!isLocalCache)
-        {
-            isLocalCache = true;
-            isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
-        }
+        isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
         eof = isLocal ? false : (container.queryJob().queryMyRank()>1);
     }
     void stop()

+ 1 - 1
thorlcr/activities/thdiskbaseslave.cpp

@@ -246,7 +246,7 @@ const char *CDiskReadSlaveActivityBase::queryLogicalFilename(unsigned index)
 
 void CDiskReadSlaveActivityBase::kill()
 {
-    if (actStarted && !abortSoon && 0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
+    if (!abortSoon && 0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
     {
         if (1 == partDescs.ordinality() && !partDescs.item(0).queryOwner().queryProperties().getPropBool("@pausefile"))
         {

+ 9 - 20
thorlcr/graph/thgraph.cpp

@@ -719,11 +719,6 @@ void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExt
     activity->preStart(parentExtractSz, parentExtract);
 }
 
-void CGraphElementBase::start()
-{
-    queryActivity()->startProcess();
-}
-
 void CGraphElementBase::initActivity()
 {
     if (activity)
@@ -1167,8 +1162,10 @@ void CGraphBase::reset()
             element.reset();
         }
     }
-    job.queryTimeReporter().reset();
-    clearNodeStats();
+    if (!queryOwner() || isGlobal())
+        job.queryTimeReporter().reset();
+    if (!queryOwner())
+        clearNodeStats();
 }
 
 void CGraphBase::addChildGraph(CGraphBase &graph)
@@ -1200,16 +1197,6 @@ bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
     return true;
 }
 
-void CGraphBase::start()
-{
-    Owned<IThorActivityIterator> iter = getTraverseIterator();
-    ForEach (*iter)
-    {
-        CGraphElementBase &act = iter->query();
-        act.start();
-    }
-}
-
 void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
 {
     if (job.queryPausing())
@@ -1305,7 +1292,6 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
         }
         if (!preStart(parentExtractSz, parentExtract)) return;
         start();
-        postStart();
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinetely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
             GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
         graphDone = true;
@@ -1685,6 +1671,7 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
     sink = xgmml->getPropBool("att[@name=\"rootGraph\"]/@value", false);
     graphId = node->getPropInt("@id");
     global = false;
+    localOnly = -1; // unset
     parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
 
     CGraphBase *graphContainer = this;
@@ -1966,7 +1953,9 @@ static bool isLocalOnly(CGraphElementBase &activity)
 
 bool CGraphBase::isLocalOnly() // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
 {
-    return ::isLocalOnly(*this);
+    if (-1 == localOnly)
+        localOnly = (int)::isLocalOnly(*this);
+    return 1==localOnly;
 }
 
 ////
@@ -2551,7 +2540,7 @@ IThorResource &queryThor()
 CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
 {
     mpTag = TAG_NULL;
-    abortSoon = actStarted = cancelledReceive = false;
+    abortSoon = cancelledReceive = false;
     baseHelper.set(container.queryHelper());
     parentExtractSz = 0;
     parentExtract = NULL;

+ 6 - 7
thorlcr/graph/thgraph.hpp

@@ -283,7 +283,6 @@ public:
     void onCreate();
     void abort(IException *e);
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
-    virtual void start();
     const bool &isOnCreated() const { return onCreateCalled; }
     const bool &isPrepared() const { return prepared; }
     CGraphBase &queryOwner() const { return *owner; }
@@ -481,6 +480,7 @@ class graph_decl CGraphBase : public CInterface, implements ILocalGraph, impleme
     CGraphElementTable containers;
     CGraphElementArray sinks;
     bool sink, complete, global;
+    int localOnly;
     activity_id parentActivityId;
     IPropertyTree *xgmml;
     CGraphTable childGraphs;
@@ -676,7 +676,7 @@ public:
     const bool isGlobal() const { return global; }
     const bool isCreated() const { return created; }
     const bool isStarted() const { return started; }
-    bool isLocalOnly();
+    bool isLocalOnly(); // this graph and all upstream dependencies
     void setCompleteEx(bool tf=true) { complete = tf; }
     const byte *setParentCtx(size32_t _parentExtractSz, const byte *parentExtract)
     {
@@ -782,8 +782,7 @@ public:
     virtual bool prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async);
     virtual void create(size32_t parentExtractSz, const byte *parentExtract);
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);
-    virtual void start();
-    virtual void postStart() { }
+    virtual void start() = 0;
     virtual bool wait(unsigned timeout);
     virtual void done();
     virtual void end();
@@ -973,7 +972,7 @@ protected:
     CGraphElementBase &container;
     Linked<IHThorArg> baseHelper;
     mptag_t mpTag; // to be used by any direct inter master<->slave communication
-    bool abortSoon, actStarted;
+    bool abortSoon;
     const bool &timeActivities; // purely for access efficiency
     size32_t parentExtractSz;
     const byte *parentExtract;
@@ -997,9 +996,9 @@ public:
     virtual void clearConnections() { }
     virtual void releaseIOs() { }
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract) { }
-    virtual void startProcess() { actStarted = true; }
+    virtual void startProcess(bool async=true) { }
     virtual bool wait(unsigned timeout) { return true; } // NB: true == success
-    virtual void reset() { receiving = abortSoon = actStarted = cancelledReceive = false; }
+    virtual void reset() { receiving = abortSoon = cancelledReceive = false; }
     virtual void done() { }
     virtual void kill() { }
     virtual void abort();

+ 21 - 5
thorlcr/graph/thgraphmaster.cpp

@@ -306,11 +306,12 @@ void CSlaveMessageHandler::main()
 
 //////////////////////
 
-CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity")
+CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
 {
     notedWarnings = createBitSet();
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
+    asyncStart = false;
     if (container.isSink())
         progressInfo.append(*new ProgressInfo);
     else
@@ -323,7 +324,7 @@ CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(
 
 CMasterActivity::~CMasterActivity()
 {
-    if (actStarted)
+    if (asyncStart)
         threaded.join();
     notedWarnings->Release();
     container.queryJob().freeMPTag(mpTag);
@@ -365,14 +366,21 @@ void CMasterActivity::main()
     }
 }
 
-void CMasterActivity::startProcess()
+void CMasterActivity::startProcess(bool async)
 {
-    CActivityBase::startProcess();
-    threaded.init(this);
+    if (async)
+    {
+        asyncStart = true;
+        threaded.start();
+    }
+    else
+        main();
 }
 
 bool CMasterActivity::wait(unsigned timeout)
 {
+    if (!asyncStart)
+        return true;
     return threaded.join(timeout);
 }
 
@@ -402,6 +410,7 @@ bool CMasterActivity::fireException(IException *_e)
 
 void CMasterActivity::reset()
 {
+    asyncStart = false;
     CActivityBase::reset();
 }
 
@@ -1959,6 +1968,13 @@ void CMasterGraph::create(size32_t parentExtractSz, const byte *parentExtract)
     }
 }
 
+void CMasterGraph::start()
+{
+    Owned<IThorActivityIterator> iter = getTraverseIterator();
+    ForEach (*iter)
+        iter->query().queryActivity()->startProcess();
+}
+
 void CMasterGraph::sendActivityInitData()
 {
     CMessageBuffer msg;

+ 3 - 1
thorlcr/graph/thgraphmaster.ipp

@@ -74,6 +74,7 @@ public:
     virtual void create(size32_t parentExtractSz, const byte *parentExtract);
 
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);
+    virtual void start();
     virtual void done();
     virtual void abort(IException *e);
 // IExceptionHandler
@@ -220,6 +221,7 @@ typedef IArrayOf<ProgressInfo> ProgressInfoArray;
 class graphmaster_decl CMasterActivity : public CActivityBase, implements IThreaded
 {
     CThreaded threaded;
+    bool asyncStart;
     MemoryBuffer *data;
     CriticalSection progressCrit;
 protected:
@@ -247,7 +249,7 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) { }
     virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb) { }
 
-    virtual void startProcess();
+    virtual void startProcess(bool async=true);
     virtual bool wait(unsigned timeout);
 
 // IExceptionHandler

+ 24 - 10
thorlcr/graph/thgraphslave.cpp

@@ -468,9 +468,20 @@ bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
     return true;
 }
 
-void CSlaveGraph::postStart()
+void CSlaveGraph::start()
 {
-    CGraphBase::postStart();
+    bool forceAsync = !queryOwner() || isGlobal();
+    Owned<IThorActivityIterator> iter = getSinkIterator();
+    unsigned sinks = 0;
+    ForEach(*iter)
+        ++sinks;
+    ForEach(*iter)
+    {
+        CGraphElementBase &container = iter->query();
+        CActivityBase *sinkAct = (CActivityBase *)container.queryActivity();
+        --sinks;
+        sinkAct->startProcess(forceAsync || 0 != sinks); // async, unless last
+    }
     if (!queryOwner())
     {
         if (globals->getPropBool("@watchdogProgressEnabled"))
@@ -627,14 +638,17 @@ void CSlaveGraph::done()
 void CSlaveGraph::end()
 {
     CGraphBase::end();
-    if (atomic_read(&nodesLoaded)) // wouldn't mean much if parallel jobs running
-        GraphPrintLog("JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", atomic_read(&cacheAdds), atomic_read(&cacheHits), atomic_read(&nodesLoaded), atomic_read(&blobCacheHits), atomic_read(&blobCacheAdds), atomic_read(&leafCacheHits), atomic_read(&leafCacheAdds), atomic_read(&nodeCacheHits), atomic_read(&nodeCacheAdds));
-    JSocketStatistics stats;
-    getSocketStatistics(stats);
-    StringBuffer s;
-    getSocketStatisticsString(stats,s);
-    GraphPrintLog("Socket statistics : %s\n",s.str());
-    resetSocketStatistics();
+    if (!queryOwner())
+    {
+        if (atomic_read(&nodesLoaded)) // wouldn't mean much if parallel jobs running
+            GraphPrintLog("JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", atomic_read(&cacheAdds), atomic_read(&cacheHits), atomic_read(&nodesLoaded), atomic_read(&blobCacheHits), atomic_read(&blobCacheAdds), atomic_read(&leafCacheHits), atomic_read(&leafCacheAdds), atomic_read(&nodeCacheHits), atomic_read(&nodeCacheAdds));
+        JSocketStatistics stats;
+        getSocketStatistics(stats);
+        StringBuffer s;
+        getSocketStatisticsString(stats,s);
+        GraphPrintLog("Socket statistics : %s\n",s.str());
+        resetSocketStatistics();
+    }
 }
 
 void CSlaveGraph::serializeStats(MemoryBuffer &mb)

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -112,7 +112,7 @@ public:
     virtual void serializeStats(MemoryBuffer &mb);
     virtual bool prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async);
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);
-    virtual void postStart();
+    virtual void start();
     virtual void create(size32_t parentExtractSz, const byte *parentExtract);
     virtual void abort(IException *e);
     virtual void done();

+ 6 - 4
thorlcr/slave/slave.cpp

@@ -51,7 +51,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 
 // ProcessSlaveActivity
 
-ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), threaded("ProcessSlaveActivity")
+ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), threaded("ProcessSlaveActivity", this)
 {
     processed = 0;
 }
@@ -67,10 +67,12 @@ ProcessSlaveActivity::~ProcessSlaveActivity()
     ActPrintLog("AFTER ProcessSlaveActivity : joining process thread");
 }
 
-void ProcessSlaveActivity::startProcess()
+void ProcessSlaveActivity::startProcess(bool async)
 {
-    CSlaveActivity::startProcess();
-    threaded.init(this);
+    if (async)
+        threaded.start();
+    else
+        main();
 }
 
 void ProcessSlaveActivity::main() 

+ 2 - 2
thorlcr/slave/slave.ipp

@@ -34,7 +34,7 @@ class ProcessSlaveActivity : public CSlaveActivity, implements IThreaded
 {
 protected:
     Owned<IException> exception;
-    CThreaded threaded;
+    CThreadedPersistent threaded;
     rowcount_t processed;
     unsigned __int64 lastCycles;
 
@@ -47,7 +47,7 @@ public:
     ProcessSlaveActivity(CGraphElementBase *container);
     ~ProcessSlaveActivity();
 
-    virtual void startProcess();
+    virtual void startProcess(bool async=true);
     virtual bool wait(unsigned timeout);
     virtual void done();
 

+ 2 - 1
thorlcr/slave/slavmain.cpp

@@ -275,7 +275,8 @@ public:
 
                         job->addSubGraph(*LINK(subGraph));
                         job->addDependencies(job->queryXGMML(), false);
-                        job->startGraph(*((CSlaveGraph *)subGraph.get()), *job, 0, NULL); // handoff to job to manage graph asynchronously from this msg
+
+                        subGraph->execute(0, NULL, true, true);
 
                         msg.clear();
                         msg.append(false);

+ 64 - 25
thorlcr/thorutil/thbuf.cpp

@@ -806,8 +806,9 @@ public:
     CRowSet(unsigned _chunk) : chunk(_chunk)
     {
     }
-    ~CRowSet()
+    void reset(unsigned _chunk)
     {
+        chunk = _chunk;
         rows.clear();
     }
     inline const unsigned queryChunk() { return chunk; }
@@ -862,6 +863,21 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
     IArrayOf<IRowStream> outputs;
     unsigned readersWaiting;
 
+    virtual void init()
+    {
+        stopped = false;
+        writeAtEof = false;
+        rowsWritten = 0;
+        readersWaiting = 0;
+        totalChunksOut = lowestChunk = 0;
+        lowestOutput = 0;
+#ifdef TRACE_WRITEAHEAD
+        totalOutChunkSize = sizeof(unsigned);
+#else
+        totalOutChunkSize = 0;
+#endif
+    }
+
     inline bool isEof(rowcount_t rowsRead)
     {
         return stopped || writeAtEof && rowsWritten == rowsRead;
@@ -920,6 +936,14 @@ protected:
         CRowSet *rowSet;
         unsigned row, rowsInRowSet;
 
+        void init()
+        {
+            rowsRead = 0;
+            currentChunkNum = 0;
+            rowsInRowSet = row = 0;
+            readerWaiting = eof = false;
+            rowSet = NULL;
+        }
         inline void doStop()
         {
             if (eof) return;
@@ -962,11 +986,12 @@ protected:
 
         COutput(CSharedWriteAheadBase &_parent, unsigned _output) : parent(_parent), output(_output)
         {
-            rowsRead = 0;
-            currentChunkNum = 0;
-            rowsInRowSet = row = 0;
-            readerWaiting = eof = false;
-            rowSet = NULL;
+            init();
+        }
+        void reset()
+        {
+            init();
+            outputOwnedRows.clear();
         }
         inline CRowSet *queryRowSet() { return rowSet; }
         const void *nextRow()
@@ -1103,20 +1128,14 @@ protected:
     virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0;
     virtual void flushRows() = 0;
     virtual size32_t rowSize(const void *row) = 0;
-
-
-    COutput *o1, *o2;
 public:
 
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
     CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IRowInterfaces *rowIf) : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData())
     {
-        stopped = false;
-        writeAtEof = false;
-        rowsWritten = 0;
+        init();
         minChunkSize = 0x2000;
-        readersWaiting = 0;
         size32_t minSize = meta->getMinRecordSize();
         if (minChunkSize<minSize*10) // if rec minSize bigish, ensure reasonable minChunkSize
         {
@@ -1129,20 +1148,8 @@ public:
         for (; c<outputCount; c++)
         {
             outputs.append(* new COutput(*this, c));
-
-            if (0 == c)
-                o1 = &queryCOutput(0);
-            else if (1 == c)
-                o2 = &queryCOutput(1);
         }
-        totalChunksOut = lowestChunk = 0;
         inMemRows.setown(new CRowSet(0));
-        lowestOutput = 0;
-#ifdef TRACE_WRITEAHEAD
-        totalOutChunkSize = sizeof(unsigned);
-#else
-        totalOutChunkSize = 0;
-#endif
     }
     ~CSharedWriteAheadBase()
     {
@@ -1222,6 +1229,14 @@ public:
         stopAll();
         signalReaders();
     }
+    virtual void reset()
+    {
+        init();
+        unsigned c=0;
+        for (; c<outputCount; c++)
+            queryCOutput(c).reset();
+        inMemRows->reset(0);
+    }
 friend class COutput;
 };
 
@@ -1524,6 +1539,19 @@ public:
         }
         PROGLOG("CSharedWriteAheadDisk: highOffset=%"I64F"d", highOffset);
     }
+    virtual void reset()
+    {
+        CSharedWriteAheadBase::reset();
+        loop
+        {
+            Owned<Chunk> chunk = savedChunks.dequeue();
+            if (!chunk) break;
+        }
+        freeChunks.kill();
+        freeChunksSized.kill();
+        highOffset = 0;
+        spillFileIO->setSize(0);
+    }
 };
 
 ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IRowInterfaces *rowIf, IDiskUsage *iDiskUsage)
@@ -1619,6 +1647,17 @@ public:
                 break;
         }
     }
+    virtual void reset()
+    {
+        CSharedWriteAheadBase::reset();
+        loop
+        {
+            Owned<CRowSet> rowSet = chunkPool.dequeue();
+            if (!rowSet)
+                break;
+        }
+        writerBlocked = false;
+    }
 };
 
 ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IRowInterfaces *rowIf, unsigned buffSize)

+ 1 - 0
thorlcr/thorutil/thbuf.hpp

@@ -72,6 +72,7 @@ interface ISharedSmartBuffer : extends IRowWriter
 {
     virtual IRowStream *queryOutput(unsigned output) = 0;
     virtual void cancel()=0;
+    virtual void reset() = 0;
 };
 
 extern graph_decl ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IRowInterfaces *rowif, unsigned buffSize=((unsigned)-1));

+ 1 - 2
thorlcr/thorutil/thbufdef.hpp

@@ -41,8 +41,7 @@
 #define WORKUNITWRITE_SMART_BUFFER_SIZE         (0x100000*3)            // 3MB
 #define DEFAULT_BLOCK_INPUT_BUFFER_SIZE         (0x10000)               // 64K
 #define AGGREGATE_INPUT_BUFFER_SIZE             (0x10000)               // 64K
-#define NSPLITTER_SPILL_BUFFER_SIZE             (0x10000)               // 64K
-#define NSPLITTER_BLOCK_BUFFER_SIZE             (0x10000)               // 64K
+#define NSPLITTER_SPILL_BUFFER_SIZE             (0x100000)              // 1MB
 #define DISTRIBUTE_PULL_BUFFER_SIZE             (0x100000*32)           // 32MB
 #define SORT_BUFFER_TOTAL                       (0x100000*20)           // 20MB (estimate)
 #define DISTRIBUTE_SINGLE_BUFFER_SIZE           (0x10000)               // 64K  - NB per node and multiplied by async send