Prechádzať zdrojové kódy

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Fix remaining issues picked up by regression suite run with strands
forced on.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 rokov pred
rodič
commit
45c16b70c5

+ 38 - 0
common/thorhelper/thorcommon.cpp

@@ -27,6 +27,7 @@
 #include "thorcommon.ipp"
 #include "eclrtl.hpp"
 #include "rtlread_imp.hpp"
+#include <algorithm>
 
 #include "thorstep.hpp"
 
@@ -1677,6 +1678,43 @@ IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *li
     return new CDiskMerger(rowInterfaces, linker, tempnamebase);
 }
 
+//---------------------------------------------------------------------------------------------------------------------
+
+void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
+{
+    if (totalCycles)
+    {
+        builder.addStatistic(StWhenFirstRow, firstRow);
+        builder.addStatistic(StTimeElapsed, elapsed());
+        builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
+        builder.addStatistic(StTimeFirstExecute, latency());
+    }
+}
+
+void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
+{
+    if (other.totalCycles)
+    {
+        if (totalCycles)
+        {
+            //Record the earliest start, the latest end, the longest latencies
+            cycle_t thisLatency = latencyCycles();
+            cycle_t otherLatency = other.latencyCycles();
+            cycle_t maxLatency = std::max(thisLatency, otherLatency);
+            if (startCycles > other.startCycles)
+            {
+                startCycles = other.startCycles;
+                firstRow =other.firstRow;
+            }
+            firstExitCycles = startCycles + maxLatency;
+            if (endCycles < other.endCycles)
+                endCycles = other.endCycles;
+            totalCycles += other.totalCycles;
+        }
+        else
+            *this = other;
+    }
+}
 
 
 

+ 12 - 17
common/thorhelper/thorcommon.hpp

@@ -131,9 +131,6 @@ interface THORHELPER_API IDiskMerger : extends IInterface
 
 extern THORHELPER_API IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase);
 
-extern THORHELPER_API void testDiskSort();
-
-
 
 #define TIME_ACTIVITIES
 class ActivityTimeAccumulator
@@ -142,11 +139,7 @@ class ActivityTimeAccumulator
 public:
     ActivityTimeAccumulator()
     {
-        startCycles = 0;
-        totalCycles = 0;
-        endCycles = 0;
-        firstRow = 0;
-        firstExitCycles = 0;
+        reset();
     }
 public:
     cycle_t startCycles; // Wall clock time of first entry to this activity
@@ -158,17 +151,19 @@ public:
     // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
     inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
     // Return the total amount of time (in nanoseconds) spent in the first call of this activity (first entry to first exit)
-    inline unsigned __int64 latency() const { return cycle_to_nanosec(firstExitCycles-startCycles); }
+    inline unsigned __int64 latency() const { return cycle_to_nanosec(latencyCycles()); }
+    inline cycle_t latencyCycles() const { return firstExitCycles-startCycles; }
+
+    void addStatistics(IStatisticGatherer & builder) const;
+    void merge(const ActivityTimeAccumulator & other);
 
-    void addStatistics(IStatisticGatherer & builder) const
+    void reset()
     {
-        if (totalCycles)
-        {
-            builder.addStatistic(StWhenFirstRow, firstRow);
-            builder.addStatistic(StTimeElapsed, elapsed());
-            builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
-            builder.addStatistic(StTimeFirstExecute, latency());
-        }
+        startCycles = 0;
+        totalCycles = 0;
+        endCycles = 0;
+        firstRow = 0;
+        firstExitCycles = 0;
     }
 };
 

+ 2 - 0
roxie/ccd/ccd.hpp

@@ -433,6 +433,8 @@ extern unsigned defaultFullKeyedJoinPreload;
 extern unsigned defaultKeyedJoinPreload;
 extern unsigned defaultPrefetchProjectPreload;
 extern unsigned defaultStrandBlockSize;
+extern unsigned defaultForceNumStrands;
+
 extern bool defaultCheckingHeap;
 
 extern unsigned slaveQueryReleaseDelaySeconds;

+ 3 - 0
roxie/ccd/ccdmain.cpp

@@ -133,6 +133,7 @@ unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
 unsigned defaultStrandBlockSize = 512;
+unsigned defaultForceNumStrands = 0;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
 unsigned coresPerQuery = 0;
@@ -764,6 +765,8 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
         defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
         defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
+        defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
+        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
         defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
 
         slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);

+ 5 - 1
roxie/ccd/ccdquery.cpp

@@ -290,6 +290,7 @@ QueryOptions::QueryOptions()
     prefetchProjectPreload = defaultPrefetchProjectPreload;
     bindCores = coresPerQuery;
     strandBlockSize = defaultStrandBlockSize;
+    forceNumStrands = defaultForceNumStrands;
 
     checkingHeap = defaultCheckingHeap;
     disableLocalOptimizations = false;  // No global default for this
@@ -318,6 +319,7 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     prefetchProjectPreload = other.prefetchProjectPreload;
     bindCores = other.bindCores;
     strandBlockSize = other.strandBlockSize;
+    forceNumStrands = other.forceNumStrands;
 
     checkingHeap = other.checkingHeap;
     disableLocalOptimizations = other.disableLocalOptimizations;
@@ -356,6 +358,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
     updateFromWorkUnit(bindCores, wu, "bindCores");
     updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
+    updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
 
     updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
     updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
@@ -404,6 +407,7 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
         updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
         updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
+        updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
 
         updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
         // Note: disableLocalOptimizations is not permitted at context level (too late)
@@ -696,7 +700,7 @@ protected:
         case TAKsimpleaction:
             return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
         case TAKparse:
-            return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, this);
+            return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
         case TAKworkunitwrite:
             return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
         case TAKdictionaryworkunitwrite:

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -110,6 +110,7 @@ public:
     int prefetchProjectPreload;
     int bindCores;
     unsigned strandBlockSize;
+    unsigned forceNumStrands;
 
     bool checkingHeap;
     bool disableLocalOptimizations;

+ 210 - 137
roxie/ccd/ccdserver.cpp

@@ -912,6 +912,7 @@ protected:
     IHThorArg *colocalParent;
     IEngineRowAllocator *rowAllocator;
     CriticalSection statecrit;
+    CriticalSection statscrit;
 
     mutable CRuntimeStatisticCollection stats;
     unsigned processed;
@@ -923,6 +924,8 @@ protected:
     bool debugging;
     bool timeActivities;
     bool aborted;
+    bool connected = false;
+
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -932,7 +935,7 @@ public:
           factory(_factory),
           basehelper(_factory->getHelper()),
           activityId(_factory->queryId()),
-          stats(_factory ? factory->queryStatsMapping() : actStatistics)
+          stats(_factory ? _factory->queryStatsMapping() : actStatistics)
     {
         input = NULL;
         sourceIdx = 0;
@@ -1008,18 +1011,32 @@ public:
     {
         return *this;
     }
-
+    inline const StatisticsMapping & queryStatsMapping() const
+    {
+        return factory ? factory->queryStatsMapping() : actStatistics;
+    }
     virtual void mergeStats(MemoryBuffer &buf)
     {
         stats.deserializeMerge(buf);
     }
-
+    void mergeStrandStats(unsigned strandProcessed, const ActivityTimeAccumulator & strandCycles, const CRuntimeStatisticCollection & strandStats)
+    {
+        CriticalBlock cb(statscrit);
+        processed += strandProcessed;
+        totalCycles.merge(strandCycles);
+        stats.merge(strandStats);
+    }
     inline void createRowAllocator()
     {
         if (!rowAllocator) 
             rowAllocator = ctx->queryCodeContext()->getRowAllocator(meta.queryOriginal(), activityId);
     }
 
+    inline ICodeContext *queryCodeContext()
+    {
+        return ctx->queryCodeContext();
+    }
+
     // MORE - most of this is copied from ccd.hpp - can't we refactor?
     virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
     {
@@ -1182,8 +1199,7 @@ public:
         basehelper.onStart(parentExtract, NULL);
         if (factory)
             factory->noteStarted();
-        if (junction)
-            junction->ready();
+        startJunction(junction);
     }
 
     void executeDependencies(unsigned parentExtractSize, const byte *parentExtract, unsigned controlId)
@@ -1298,6 +1314,7 @@ public:
                     }
                 }
 #endif
+                resetJunction(junction);
                 ForEachItemIn(idx, dependencies)
                     dependencies.item(idx).reset();
                 localCycles = queryLocalCycles();  // We can't call queryLocalCycles() in the destructor, so save the information here when we can.
@@ -1335,20 +1352,21 @@ public:
     {
         throw MakeStringException(ROXIE_SINK, "Internal error: stopSink() requires a suitable sink");
     }
-
     virtual void connectOutputStreams(unsigned flags)
     {
         if (input && !inputStream)
             inputStream = connectSingleStream(ctx, input, sourceIdx, junction, flags);
-        connectDependencies(flags);
+        if (!connected)
+        {
+            connectDependencies(flags);
+            connected = true;
+        }
     }
 
     void connectDependencies(unsigned flags)
     {
         ForEachItemIn(i, dependencies)
-        {
             dependencies.item(i).connectOutputStreams(flags);
-        }
     }
 
     virtual __int64 evaluate() 
@@ -1371,6 +1389,7 @@ public:
         assertex(!idx);
         input = _in;
         sourceIdx = _sourceIdx;
+        inputStream = NULL;
     }
 
     virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
@@ -1521,11 +1540,11 @@ class StrandOptions
 {
     // Typically set from hints, common to many stranded activities
 public:
-    StrandOptions(IPropertyTree &_graphNode)
+    explicit StrandOptions(IPropertyTree &_graphNode)
     {
-        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
+        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 0);
         blockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
-        forcePreserveOrder = _graphNode.getPropBool("hint[@name='strandordered']/@value");
+        forcePreserveOrder = _graphNode.getPropBool("hint[@name='strandordered']/@value", true);
     }
     StrandOptions(const StrandOptions &from, IRoxieSlaveContext *ctx)
     {
@@ -1535,10 +1554,12 @@ public:
 
         if (!blockSize)
             blockSize = ctx->queryOptions().strandBlockSize;
+        if (!numStrands)
+            numStrands = ctx->queryOptions().forceNumStrands;
         // Could consider some similar option for numStrands...
     }
 public:
-    unsigned numStrands = 1;
+    unsigned numStrands = 0;
     unsigned blockSize = 0;
     bool forcePreserveOrder = false;
 };
@@ -1548,28 +1569,41 @@ class StrandProcessor : public CInterfaceOf<IEngineRowStream>
 protected:
     CRoxieServerActivity &parent;
     IEngineRowAllocator *rowAllocator;
-
     IEngineRowStream *inputStream;
-    bool timeActivities;
     ActivityTimeAccumulator totalCycles;
+    mutable CRuntimeStatisticCollection stats;
     unsigned processed = 0;
+    unsigned numProcessedLastGroup = 0;
+    const bool timeActivities;
     bool stopped = false;
+
 public:
-    explicit StrandProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream)
-      : parent(_parent), inputStream(_inputStream)
+    explicit StrandProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream, bool needsAllocator)
+      : parent(_parent), inputStream(_inputStream), stats(parent.queryStatsMapping()), timeActivities(_parent.timeActivities)
+    {
+        if (needsAllocator)
+            rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), roxiemem::RHFunique);
+        else
+            rowAllocator = NULL;
+    }
+    ~StrandProcessor()
     {
-        timeActivities = parent.timeActivities;
-        rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), roxiemem::RHFunique);
+        ::Release(rowAllocator);
+    }
+    virtual void start()
+    {
+        processed = 0;
+        numProcessedLastGroup = 0;
+        totalCycles.reset();
+        stats.reset();
     }
-    virtual void start() = 0;
     virtual void stop()
     {
-        parent.processed += processed;  // MORE - Should be atomic
-        // Also merge the cycles up somehow (and any other relevant stats)
         if (!stopped)
         {
             inputStream->stop();
             parent.stop();
+            parent.mergeStrandStats(processed, totalCycles, stats);
         }
         stopped = true;
     }
@@ -1607,15 +1641,15 @@ public:
             active++;
         }
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
-        if (splitter)
-            splitter->ready();
+        startJunction(splitter);
     }
 
     virtual void reset()
     {
         // assertex(active==0);  Disable for now as we know that stop() is nt being called on the strands.
-        if (splitter)
-            splitter->reset();
+        ForEachItemIn(idx, strands)
+            strands.item(idx).reset();
+        resetJunction(splitter);
         CRoxieServerActivity::reset();
     }
 
@@ -1631,6 +1665,7 @@ public:
     virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
     {
         assertex(idx == 0);
+        assertex(strands.empty());
         CRoxieServerActivity::connectDependencies(flags);
         Owned <IStrandJunction> recombiner;
         if (strandOptions.numStrands == 1)
@@ -1651,7 +1686,7 @@ public:
                 // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
                 if (flags & SFpreserveOrder)
                 {
-                    branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, false));
+                    branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, input->queryOutputMeta()->isGrouped()));
                     splitter.set(branch->queryInputJunction());
                     recombiner.set(branch->queryOutputJunction());
                 }
@@ -1698,8 +1733,7 @@ protected:
             ForEachItemIn(idx, strands)
                 strands.item(idx).start();
             input->start(parentExtractSize, parentExtract, false);
-            if (splitter)
-                splitter->ready();
+            startJunction(splitter);
         }
         else
         {
@@ -1842,6 +1876,7 @@ public:
         eof = false;
         eog = false;
         input->start(parentExtractSize, parentExtract, paused);
+        startJunction(junction);
         try
         {
             if (preload && !paused)
@@ -1895,6 +1930,7 @@ public:
     void reset()
     {
         input->reset();
+        resetJunction(junction);
     }
 
     virtual int run()
@@ -2213,6 +2249,7 @@ public:
     {
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
         input1->start(parentExtractSize, parentExtract, paused);
+        startJunction(junction1);
     }
 
     virtual void stop()
@@ -2253,6 +2290,7 @@ public:
         CRoxieServerActivity::reset(); 
         if (input1)
             input1->reset();
+        resetJunction(junction1);
     }
 
     virtual void setInput(unsigned idx, unsigned _sourceIdx, IFinalRoxieInput *_in)
@@ -2338,6 +2376,10 @@ public:
     {
         for (unsigned i = 0; i < numInputs; i++)
             inputArray[i]->reset();
+        for (unsigned iS = 0; iS < numStreams; iS++)
+        {
+            resetJunction(junctionArray[iS]);
+        }
         CRoxieServerActivity::reset(); 
     }
 
@@ -2374,6 +2416,10 @@ public:
         {
             inputArray[i]->start(parentExtractSize, parentExtract, paused);
         }
+        for (unsigned iS = 0; iS < numStreams; iS++)
+        {
+            startJunction(junctionArray[iS]);
+        }
     }
 
     virtual void stop()
@@ -5726,6 +5772,7 @@ public:
     {
         CriticalBlock procedure(cs);
         input->start(parentExtractSize, parentExtract, paused);
+        startJunction(junction);
     }
     virtual void stop()
     {
@@ -5736,6 +5783,7 @@ public:
     {
         CriticalBlock procedure(cs);
         input->reset();
+        resetJunction(junction);
     }
     virtual void resetEOF()
     {
@@ -5814,6 +5862,7 @@ public:
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) 
     {
         input->start(parentExtractSize, parentExtract, paused);
+        startJunction(junction);
     }
     virtual void stop()
     {
@@ -5822,6 +5871,7 @@ public:
     virtual void reset()
     { 
         input->reset();
+        resetJunction(junction);
     }
 
     virtual const void * nextRow()
@@ -5910,7 +5960,7 @@ public:
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
-        input->start(savedParentExtractSize, savedParentExtract, paused);
+        CIndirectRoxieInput::start(savedParentExtractSize, savedParentExtract, paused);
     }
 
     void setParentExtract(unsigned _savedParentExtractSize, const byte * _savedParentExtract)
@@ -6149,6 +6199,7 @@ public:
     {
         if (id == sequence)
         {
+
             CRoxieServerActivity::setInput(0, _sourceIdx, _input);
             connectOutputStreams(0);
             return true;
@@ -6381,6 +6432,7 @@ public:
                 }
             }
         }
+        startJunction(iterJunction);
     }
 
     virtual void stop()
@@ -6396,8 +6448,10 @@ public:
             CriticalBlock b(iterCrit);
             if (iterInput)
                 iterInput->reset();
+            resetJunction(iterJunction);
             iterInput.clear();
             iterStream.clear();
+            iterJunction.clear();
         }
         CRoxieServerActivity::reset(); 
     };
@@ -12667,6 +12721,7 @@ public:
             {
                 selectedStream = streamArray[i];
                 inputArray[i]->start(savedParentExtractSize, savedParentExtract, false);  // Assumes 1:1 mapping streams to inputs
+                startJunction(junctionArray[i]);
                 const void * next = selectedStream->nextRow();
                 if (next)
                 {
@@ -13460,18 +13515,13 @@ class CRoxieServerStrandedProjectActivity : public CRoxieServerStrandedActivity
     class ProjectProcessor : public StrandProcessor
     {
     protected:
-        unsigned numProcessedLastGroup = 0;
         IHThorProjectArg &helper;
 
     public:
         ProjectProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream, IHThorProjectArg &_helper)
-        : StrandProcessor(_parent, _inputStream), helper(_helper)
+        : StrandProcessor(_parent, _inputStream, true), helper(_helper)
         {
         }
-        virtual void start()
-        {
-            numProcessedLastGroup = 0;
-        }
         virtual const void * nextRow()
         {
             ActivityTimer t(totalCycles, timeActivities);
@@ -14922,9 +14972,10 @@ public:
         inputExtractMapper->setInput(_sourceIdx, _in);
     }
 
+
     virtual void connectOutputStreams(unsigned flags)
     {
-        inputExtractMapper->connectOutputStreams(ctx, flags);
+        //NB: inputExtractMapper is not connected at this point - only if/when it isused from within the graph
         CRoxieServerActivity::connectOutputStreams(flags);
     }
 
@@ -14943,6 +14994,7 @@ public:
 
         createExpandedGraph(GraphExtractBuilder.size(), GraphExtractBuilder.getbytes(), probeManager);
         resultInput->start(GraphExtractBuilder.size(), GraphExtractBuilder.getbytes(), paused);
+        startJunction(resultJunction);
     }
 
     virtual void stop()
@@ -14956,8 +15008,10 @@ public:
     {
         if (resultInput)
             resultInput->reset();
+        resetJunction(resultJunction);
         resultInput = NULL;
         resultStream = NULL;
+        resultJunction.clear();
         outputs.kill();
         iterationGraphs.kill(); // must be done after all activities killed
         if (probeManager)
@@ -15621,12 +15675,16 @@ public:
             ForEachItemIn(i, selectedInputs)
                 selectedInputs.item(i)->start(parentExtractSize, parentExtract, paused);
         }
+        ForEachItemIn(i, selectedStreams)
+            startJunction(resultJunctions[i]);
     }
 
     virtual void reset()    
     {
         resultReaders.kill();
         CRoxieServerNWayInputBaseActivity::reset();
+        ForEachItemIn(i, selectedStreams)
+            resetJunction(resultJunctions[i]);
     }
 
     virtual void gatherIterationUsage(IRoxieServerLoopResultProcessor & processor, unsigned parentExtractSize, const byte * parentExtract)
@@ -15789,7 +15847,7 @@ class CRoxieServerNaryActivity : public CRoxieServerMultiInputActivity
 {
 public:
     CRoxieServerNaryActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
-        : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs)
+        : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), expandedJunctions(nullptr)
     {
     }
 
@@ -15810,14 +15868,18 @@ public:
         ForEachItemIn(idx, expandedInputs)
         {
             expandedStreams.append(connectSingleStream(ctx, expandedInputs.item(idx), 0, expandedJunctions[idx], 0));  // MORE - is the index 0 right?
+            startJunction(expandedJunctions[idx]);
         }
     }
 
     virtual void reset()    
     {
+        ForEachItemIn(idx, expandedInputs)
+            resetJunction(expandedJunctions[idx]);
         expandedInputs.kill();
         expandedStreams.kill();
         delete [] expandedJunctions;
+        expandedJunctions = nullptr;
         CRoxieServerMultiInputActivity::reset(); 
     }
 
@@ -16162,12 +16224,14 @@ public:
                 whichInput -= numRealInputs;
             }
         }
+        startJunction(selectedJunction);
     }
 
     virtual void reset()    
     {
         selectedInput = NULL;
         selectedStream = NULL;
+        resetJunction(selectedJunction);
         selectedJunction.clear();
         CRoxieServerMultiInputActivity::reset(); 
     }
@@ -19245,6 +19309,7 @@ public:
         if (cond >= numInputs)
             cond = numInputs - 1;
         inputArray[cond]->start(parentExtractSize, parentExtract, paused);
+        startJunction(junctionArray[cond]);
         assertex(numInputs==numStreams);
         for (unsigned idx = 0; idx < numStreams; idx++)
         {
@@ -19347,13 +19412,17 @@ public:
         if (cond)
         {
             inputTrue->start(parentExtractSize, parentExtract, paused);
+            startJunction(junctionTrue);
             if (streamFalse)
                 streamFalse->stop(); // Note: stopping unused branches early helps us avoid buffering splits too long.
         }
         else 
         {
             if (inputFalse)
+            {
                 inputFalse->start(parentExtractSize, parentExtract, paused);
+                startJunction(junctionFalse);
+            }
             streamTrue->stop();
         }
         unusedStopped = true;
@@ -19407,6 +19476,8 @@ public:
         inputTrue->reset();
         if (inputFalse)
             inputFalse->reset();
+        resetJunction(junctionTrue);
+        resetJunction(junctionFalse);
         unusedStopped = false;
     }
 
@@ -19873,130 +19944,134 @@ extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(u
 }
 
 //=================================================================================
-class CRoxieServerParseActivity : public CRoxieServerActivity, implements IMatchedAction
-{
-    IHThorParseArg &helper;
-    INlpParser * parser;
-    INlpResultIterator * rowIter;
-    const void * in;
-    char * curSearchText;
-    INlpParseAlgorithm * algorithm;
-    size32_t curSearchTextLen;
-    bool anyThisGroup;
 
-    bool processRecord(const void * inRec)
-    {
-        if (helper.searchTextNeedsFree())
-            rtlFree(curSearchText);
-
-        curSearchTextLen = 0;
-        curSearchText = NULL;
-        helper.getSearchText(curSearchTextLen, curSearchText, inRec);
-
-        return parser->performMatch(*this, in, curSearchTextLen, curSearchText);
-    }
 
-public:
-    CRoxieServerParseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, INlpParseAlgorithm * _algorithm)
-        : CRoxieServerActivity(_ctx, _factory, _probeManager),
-        helper((IHThorParseArg &)basehelper), algorithm(_algorithm)
-    {
-        parser = NULL;
-        rowIter = NULL;
-        in = NULL;
-        curSearchText = NULL;
-        anyThisGroup = false;
-        curSearchTextLen = 0;
-    }
+class CRoxieServerStrandedParseActivity : public CRoxieServerStrandedActivity
+{
+    INlpParseAlgorithm * algorithm;
 
-    ~CRoxieServerParseActivity()
+    class ParseProcessor : public StrandProcessor, implements IMatchedAction
     {
-        ::Release(parser);
-    }
-
-    virtual bool needsAllocator() const { return true; }
+    protected:
+        IHThorParseArg &helper;
+        INlpParser * parser;
+        INlpResultIterator * rowIter;
+        const void * in;
+        char * curSearchText;
+        size32_t curSearchTextLen;
 
-    virtual void onCreate(IHThorArg *_colocalParent)
-    {
-        CRoxieServerActivity::onCreate(_colocalParent);
-        parser = algorithm->createParser(ctx->queryCodeContext(), activityId, helper.queryHelper(), &helper);
-        rowIter = parser->queryResultIter();
-    }
+        bool processRecord(const void * inRec)
+        {
+            if (helper.searchTextNeedsFree())
+                rtlFree(curSearchText);
 
-    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
-    {
-        anyThisGroup = false;
-        curSearchTextLen = 0;
-        curSearchText = NULL;
-        in = NULL;
-        parser->reset();
-        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
-    }
+            curSearchTextLen = 0;
+            curSearchText = NULL;
+            helper.getSearchText(curSearchTextLen, curSearchText, inRec);
 
-    virtual void reset()
-    {
-        if (helper.searchTextNeedsFree())
-            rtlFree(curSearchText);
-        curSearchText = NULL;
-        ReleaseClearRoxieRow(in);
-        CRoxieServerActivity::reset();
-    }
+            return parser->performMatch(*this, in, curSearchTextLen, curSearchText);
+        }
 
-    virtual unsigned onMatch(ARowBuilder & self, const void * curRecord, IMatchedResults * results, IMatchWalker * walker)
-    {
-        try
+    public:
+        ParseProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream, IHThorParseArg &_helper, INlpParseAlgorithm * _algorithm)
+        : StrandProcessor(_parent, _inputStream, true), helper(_helper)
         {
-            return helper.transform(self, curRecord, results, walker);
+            parser = _algorithm->createParser(parent.queryCodeContext(), parent.queryId(), helper.queryHelper(), &helper);
+            rowIter = parser->queryResultIter();
+            in = NULL;
+            curSearchText = NULL;
+            curSearchTextLen = 0;
         }
-        catch (IException *E)
+        ~ParseProcessor()
         {
-            throw makeWrappedException(E);
+            ::Release(parser);
         }
-    }
-
-    virtual const void * nextRow()
-    {
-        ActivityTimer t(totalCycles, timeActivities);
-        loop
+        virtual void start()
         {
-            if (rowIter->isValid())
-            {
-                anyThisGroup = true;
-                OwnedConstRoxieRow out = rowIter->getRow();
-                rowIter->next();
-                processed++;
-                return out.getClear();
-            }
-
+            numProcessedLastGroup = 0;
+            curSearchTextLen = 0;
+            curSearchText = NULL;
+            in = NULL;
+            parser->reset();
+            StrandProcessor::start();
+        }
+        virtual void reset()
+        {
+            if (helper.searchTextNeedsFree())
+                rtlFree(curSearchText);
+            curSearchText = NULL;
             ReleaseClearRoxieRow(in);
-            in = inputStream->nextRow();
-            if (!in)
+            StrandProcessor::reset();
+        }
+        virtual const void * nextRow()
+        {
+            ActivityTimer t(totalCycles, timeActivities);
+            loop
             {
-                if (anyThisGroup)
+                if (rowIter->isValid())
                 {
-                    anyThisGroup = false;
-                    return NULL;
+                    OwnedConstRoxieRow out = rowIter->getRow();
+                    rowIter->next();
+                    processed++;
+                    return out.getClear();
                 }
+
+                ReleaseClearRoxieRow(in);
                 in = inputStream->nextRow();
                 if (!in)
-                    return NULL;
-            }
+                {
+                    if (processed == numProcessedLastGroup)
+                        in = inputStream->nextRow();
+                    if (!in)
+                    {
+                        processed = numProcessedLastGroup;
+                        return NULL;
+                    }
 
-            processRecord(in);
-            rowIter->first();
+                    in = inputStream->nextRow();
+                    if (!in)
+                        return NULL;
+                }
+
+                processRecord(in);
+                rowIter->first();
+            }
         }
+        virtual unsigned onMatch(ARowBuilder & self, const void * curRecord, IMatchedResults * results, IMatchWalker * walker)
+        {
+            try
+            {
+                return helper.transform(self, curRecord, results, walker);
+            }
+            catch (IException *E)
+            {
+                throw parent.makeWrappedException(E);
+            }
+        }
+    };
+
+public:
+    CRoxieServerStrandedParseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, INlpParseAlgorithm * _algorithm, const StrandOptions &_strandOptions)
+        : CRoxieServerStrandedActivity(_ctx, _factory, _probeManager, _strandOptions), algorithm(_algorithm)
+    {
     }
 
+    virtual StrandProcessor *createStrandProcessor(IEngineRowStream *instream)
+    {
+        return new ParseProcessor(*this, instream, (IHThorParseArg &) basehelper, algorithm);
+    }
 };
 
+
 class CRoxieServerParseActivityFactory : public CRoxieServerActivityFactory
 {
     Owned<INlpParseAlgorithm> algorithm;
     Owned<IHThorParseArg> helper;
+    StrandOptions strandOptions;
 
 public:
-    CRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IResourceContext *rc)
-        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
+    CRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, IResourceContext *rc)
+        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind), strandOptions(_graphNode)
     {
         helper.setown((IHThorParseArg *) helperFactory());
         algorithm.setown(createThorParser(rc, *helper));
@@ -20004,13 +20079,13 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
     {
-        return new CRoxieServerParseActivity(_ctx, this, _probeManager, algorithm);
+        return new CRoxieServerStrandedParseActivity(_ctx, this, _probeManager, algorithm, strandOptions);
     }
 };
 
-IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IResourceContext *rc)
+IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, IResourceContext *rc)
 {
-    return new CRoxieServerParseActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, rc);
+    return new CRoxieServerParseActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode, rc);
 }
 
 //=====================================================================================================
@@ -24234,7 +24309,6 @@ class CRoxieServerFullKeyedJoinHead: public CRoxieServerActivity, implements IRe
     Owned<const IResolvedFile> varFileInfo;
     IFinalRoxieInput *indexReadInput;
     unsigned indexReadIdx = 0;
-    Owned<IStrandJunction> indexReadJunction;
     IIndexReadActivityInfo *rootIndex;
 
 public:
@@ -24284,8 +24358,7 @@ public:
     virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
     {
         puller.connectOutputStreams(ctx, flags);
-        if (indexReadInput)
-            connectSingleStream(ctx, indexReadInput, indexReadIdx, indexReadJunction, flags);  // We never actually pull the stream
+        //No rows are read from indexReadInput, so no need to extract the streams
         return CRoxieServerActivity::getOutputStreams(ctx, idx, streams, multiOk, flags);
     }
 

+ 1 - 1
roxie/ccd/ccdserver.hpp

@@ -396,7 +396,7 @@ extern IRoxieServerActivityFactory *createRoxieServerSkipLimitActivityFactory(un
 extern IRoxieServerActivityFactory *createRoxieServerCatchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerCaseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
 extern IRoxieServerActivityFactory *createRoxieServerIfActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
-extern IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IResourceContext *rc);
+extern IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, IResourceContext *rc);
 extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);