瀏覽代碼

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Refactor rather more to make it easier to support more stranded activities.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父節點
當前提交
b7709802ac
共有 1 個文件被更改,包括 239 次插入128 次删除
  1. 239 128
      roxie/ccd/ccdserver.cpp

+ 239 - 128
roxie/ccd/ccdserver.cpp

@@ -894,6 +894,7 @@ extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxi
 
 class CRoxieServerActivity : public CInterface, implements IRoxieServerActivity, implements IFinalRoxieInput, implements IEngineRowStream, implements IRoxieContextLogger
 {
+    friend class StrandProcessor;
 protected:
     IFinalRoxieInput *input;
     unsigned sourceIdx;
@@ -1235,7 +1236,7 @@ public:
         }
     }
 
-    inline void stop()
+    virtual void stop()
     {
         // NOTE - don't be tempted to skip the stop for activities that are reset - splitters need to see the stops
         if (state != STATEstopped)
@@ -1460,7 +1461,6 @@ public:
 
 class CRoxieServerLateStartActivity : public CRoxieServerActivity
 {
-
 protected:
     bool prefiltered;
     bool eof;
@@ -1513,6 +1513,224 @@ public:
         CRoxieServerActivity::reset();
         prefiltered = false;
     }
+};
+
+//=================================================================================
+
+class StrandOptions
+{
+    // Typically set from hints, common to many stranded activities
+public:
+    StrandOptions(IPropertyTree &_graphNode)
+    {
+        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
+        blockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
+        forcePreserveOrder = _graphNode.getPropBool("hint[@name='strandordered']/@value");
+    }
+    StrandOptions(const StrandOptions &from, IRoxieSlaveContext *ctx)
+    {
+        numStrands = from.numStrands;
+        blockSize = from.blockSize;
+        forcePreserveOrder = from.forcePreserveOrder;
+
+        if (!blockSize)
+            blockSize = ctx->queryOptions().strandBlockSize;
+        // Could consider some similar option for numStrands...
+    }
+public:
+    unsigned numStrands = 1;
+    unsigned blockSize = 0;
+    bool forcePreserveOrder = false;
+};
+
+class StrandProcessor : public CInterfaceOf<IEngineRowStream>
+{
+protected:
+    CRoxieServerActivity &parent;
+    IEngineRowAllocator *rowAllocator;
+
+    IEngineRowStream *inputStream;
+    bool timeActivities;
+    ActivityTimeAccumulator totalCycles;
+    unsigned processed = 0;
+    bool stopped = false;
+public:
+    explicit StrandProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream)
+      : parent(_parent), inputStream(_inputStream)
+    {
+        timeActivities = parent.timeActivities;
+        rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), roxiemem::RHFunique);
+    }
+    virtual void start() = 0;
+    virtual void stop()
+    {
+        parent.processed += processed;  // MORE - Should be atomic
+        // Also merge the cycles up somehow (and any other relevant stats)
+        if (!stopped)
+        {
+            inputStream->stop();
+            parent.stop();
+        }
+        stopped = true;
+    }
+    virtual void reset()
+    {
+        stopped = false;
+    }
+    virtual void resetEOF()
+    {
+        inputStream->resetEOF();
+    }
+};
+
+class CRoxieServerStrandedActivity : public CRoxieServerActivity
+{
+protected:
+    StrandOptions strandOptions;
+    IArrayOf<StrandProcessor> strands;
+    Owned<IStrandBranch> branch;
+    Owned<IStrandJunction> splitter;
+    unsigned active;  // SHould really be atomic
+public:
+    CRoxieServerStrandedActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
+        : CRoxieServerActivity(_ctx, _factory, _probeManager),
+          strandOptions(_strandOptions, ctx)
+    {
+        active = 0;
+    }
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        ForEachItemIn(idx, strands)
+        {
+            strands.item(idx).start();
+            active++;
+        }
+        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        if (splitter)
+            splitter->ready();
+    }
+
+    virtual void reset()
+    {
+        // assertex(active==0);  Disable for now as we know that stop() is nt being called on the strands.
+        if (splitter)
+            splitter->reset();
+        CRoxieServerActivity::reset();
+    }
+
+    virtual void stop()
+    {
+        // Called from the strands... which should ensure that stop is not called more than once per strand
+        assertex(active);
+        active--;
+        if (!active)
+            CRoxieServerActivity::stop();
+    }
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
+    {
+        assertex(idx == 0);
+        CRoxieServerActivity::connectDependencies(flags);
+        Owned <IStrandJunction> recombiner;
+        if (strandOptions.numStrands == 1)
+        {
+            // 1 means explicitly requested single-strand.
+            IEngineRowStream *instream = connectSingleStream(ctx, input, sourceIdx, junction, flags);
+            strands.append(*createStrandProcessor(instream));
+        }
+        else
+        {
+            if (strandOptions.forcePreserveOrder)
+                flags |= SFpreserveOrder;
+            PointerArrayOf<IEngineRowStream> instreams;
+            recombiner.setown(input->getOutputStreams(ctx, sourceIdx, instreams, strandOptions.numStrands != 1, flags));
+            if (instreams.length() == 1 && strandOptions.numStrands > 1)             // 0 means did not specify - we should use the strands that our upstream provides
+            {
+                assertex(recombiner == NULL);
+                // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
+                if (flags & SFpreserveOrder)
+                {
+                    branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, false));
+                    splitter.set(branch->queryInputJunction());
+                    recombiner.set(branch->queryOutputJunction());
+                }
+                else
+                {
+                    splitter.setown(createStrandJunction(ctx->queryRowManager(), 1, strandOptions.numStrands, strandOptions.blockSize, false));
+                }
+                splitter->setInput(0, instreams.item(0));
+                for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
+                    strands.append(*createStrandProcessor(splitter->queryOutput(strandNo)));
+            }
+            else
+            {
+                // Ignore my hint and just use the width already split into...
+                ForEachItemIn(strandNo, instreams)
+                    strands.append(*createStrandProcessor(instreams.item(strandNo)));
+            }
+        }
+        ForEachItemIn(i, strands)
+            streams.append(&strands.item(i));
+        return recombiner.getClear();
+    }
+
+    virtual StrandProcessor *createStrandProcessor(IEngineRowStream *instream) = 0;
+
+    virtual const void * nextRow()
+    {
+        throwUnexpected();
+    }
+};
+
+class CRoxieServerStrandedLateStartActivity : public CRoxieServerStrandedActivity
+{
+protected:
+    bool prefiltered;
+    bool eof;
+
+    void lateStart(unsigned parentExtractSize, const byte *parentExtract, bool any)
+    {
+        prefiltered = !any;
+        eof = prefiltered;
+        if (!prefiltered)
+        {
+            ForEachItemIn(idx, strands)
+                strands.item(idx).start();
+            input->start(parentExtractSize, parentExtract, false);
+            if (splitter)
+                splitter->ready();
+        }
+        else
+        {
+            if (traceStartStop)
+                CTXLOG("strandedLateStart activity stopping input early as prefiltered");
+            ForEachItemIn(idx, strands)
+                strands.item(idx).stop();
+        }
+    }
+
+public:
+
+    CRoxieServerStrandedLateStartActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
+        : CRoxieServerStrandedActivity(_ctx, _factory, _probeManager, _strandOptions)
+    {
+        prefiltered = false;
+        eof = false;
+    }
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        IFinalRoxieInput *save = input;
+        input = NULL;   // Make sure parent does not start the chain yet - but we do want to do the dependencies (because the decision about whether to start may depend on them)
+        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        input = save;
+    }
+
+    virtual void reset()
+    {
+        CRoxieServerStrandedActivity::reset();
+        prefiltered = false;
+    }
 
 };
 
@@ -13235,59 +13453,24 @@ IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsig
     return new CRoxieServerFilterProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
 }
 
-//=================================================================================
-
-class StrandOptions
-{
-    // Typically set from hints, common to many stranded activities
-public:
-    StrandOptions(IPropertyTree &_graphNode)
-    {
-        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
-        blockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
-        forcePreserveOrder = _graphNode.getPropInt("hint[@name='strandordered']/@value", 0);
-    }
-    StrandOptions(const StrandOptions &from, IRoxieSlaveContext *ctx)
-    {
-        numStrands = from.numStrands;
-        blockSize = from.blockSize;
-        forcePreserveOrder = from.forcePreserveOrder;
-
-        if (!blockSize)
-            blockSize = ctx->queryOptions().strandBlockSize;
-        // Could consider some similar option for numStrands...
-    }
-public:
-    unsigned numStrands = 1;
-    unsigned blockSize = 0;
-    bool forcePreserveOrder = false;
-};
+//=====================================================
 
-class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
+class CRoxieServerStrandedProjectActivity : public CRoxieServerStrandedActivity
 {
-    unsigned numProcessedLastGroup;
-    StrandOptions strandOptions;
-
-    class ProjectProcessor : public CInterfaceOf<IEngineRowStream>
+    class ProjectProcessor : public StrandProcessor
     {
-        CRoxieServerParallelProjectActivity &parent;
-        IEngineRowAllocator *rowAllocator;
-
-        // All these probably should go in a common base class StrandProcessor. Might even want that class to replace the corresponding fields in CRoxieServerActivity
-        IEngineRowStream *inputStream;
-        IHThorArg &basehelper;
-        bool timeActivities;
-    public:
-        ActivityTimeAccumulator totalCycles;
-        unsigned processed = 0;
+    protected:
         unsigned numProcessedLastGroup = 0;
+        IHThorProjectArg &helper;
 
     public:
-        explicit ProjectProcessor(CRoxieServerParallelProjectActivity &_parent, IEngineRowStream *_inputStream)
-          : parent(_parent), inputStream(_inputStream), basehelper(parent.basehelper)
+        ProjectProcessor(CRoxieServerActivity &_parent, IEngineRowStream *_inputStream, IHThorProjectArg &_helper)
+        : StrandProcessor(_parent, _inputStream), helper(_helper)
         {
-            timeActivities = parent.timeActivities;
-            rowAllocator = parent.ctx->getRowAllocatorEx(parent.meta.queryOriginal(), parent.activityId, roxiemem::RHFunique);
+        }
+        virtual void start()
+        {
+            numProcessedLastGroup = 0;
         }
         virtual const void * nextRow()
         {
@@ -13310,7 +13493,7 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
                 {
                     RtlDynamicRowBuilder rowBuilder(rowAllocator);
                     size32_t outSize;
-                    outSize = ((IHThorProjectArg &) basehelper).transform(rowBuilder, in);
+                    outSize = helper.transform(rowBuilder, in);
                     if (outSize)
                     {
                         processed++;
@@ -13323,90 +13506,17 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
                 }
             }
         }
-        virtual void stop()
-        {
-            parent.processed += processed;  // MORE - Should be atomic
-            // Also merge the cycles up somehow (and any other relevant stats)
-            inputStream->stop();
-        }
-        virtual void resetEOF()
-        {
-            inputStream->resetEOF();
-        }
     };
-    IArrayOf<ProjectProcessor> strands;
-    Owned<IStrandBranch> branch;
-    Owned<IStrandJunction> splitter;
 
 public:
-    CRoxieServerParallelProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
-        : CRoxieServerActivity(_ctx, _factory, _probeManager),
-          strandOptions(_strandOptions, ctx)
+    CRoxieServerStrandedProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
+        : CRoxieServerStrandedActivity(_ctx, _factory, _probeManager, _strandOptions)
     {
     }
 
-    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    virtual StrandProcessor *createStrandProcessor(IEngineRowStream *instream)
     {
-        ForEachItemIn(idx, strands)
-        {
-            strands.item(idx).numProcessedLastGroup = 0;
-        }
-        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
-        if (splitter)
-            splitter->ready();
-    }
-
-    virtual void reset()
-    {
-        if (splitter)
-            splitter->reset();
-        CRoxieServerActivity::reset();
-    }
-
-    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
-    {
-        assertex(idx == 0);
-        CRoxieServerActivity::connectDependencies(flags);
-        if (strandOptions.forcePreserveOrder)
-            flags |= SFpreserveOrder;
-        PointerArrayOf<IEngineRowStream> instreams;
-        Owned <IStrandJunction> recombiner = input->getOutputStreams(ctx, sourceIdx, instreams, true, flags);
-
-        // If my input was already split into streams, I just use those...
-        if (instreams.length() == 1)
-        {
-            assertex(recombiner == NULL);
-            // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
-            if (flags & SFpreserveOrder)
-            {
-                branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, false));
-                splitter.set(branch->queryInputJunction());
-                recombiner.set(branch->queryOutputJunction());
-            }
-            else
-            {
-                splitter.setown(createStrandJunction(ctx->queryRowManager(), 1, strandOptions.numStrands, strandOptions.blockSize, false));
-            }
-            splitter->setInput(0, instreams.item(0));
-            for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
-                strands.append(*new ProjectProcessor(*this, splitter->queryOutput(strandNo)));
-        }
-        else
-        {
-            // Ignore my hint and just use the width already split into...
-            ForEachItemIn(strandNo, instreams)
-                strands.append(*new ProjectProcessor(*this, instreams.item(strandNo)));
-        }
-        ForEachItemIn(i, strands)
-        {
-            streams.append(&strands.item(i));
-        }
-        return recombiner.getClear();
-    }
-
-    virtual const void * nextRow()
-    {
-        throwUnexpected();
+        return new ProjectProcessor(*this, instream, (IHThorProjectArg &) basehelper);
     }
 };
 
@@ -13488,8 +13598,9 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
     {
-        if (kind == TAKproject && strandOptions.numStrands > 1)  // Not supported on prefetch or count projects
-            return new CRoxieServerParallelProjectActivity(_ctx, this, _probeManager, strandOptions);
+        // NOTE - if they explicitly requested numStrands = 1 via  hint, we could use either of the below. For now use the stranded version to ensure it is tested.
+        if (kind == TAKproject)  // Not supported on prefetch or count projects
+            return new CRoxieServerStrandedProjectActivity(_ctx, this, _probeManager, strandOptions);
         else
             return new CRoxieServerProjectActivity(_ctx, this, _probeManager, count);
     }