Browse Source

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Support hint to force order preservation, and refactor a little ready for
supporting more activities.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
2431e7e463
1 changed file with 41 additions and 21 deletions
  1. 41 21
      roxie/ccd/ccdserver.cpp

+ 41 - 21
roxie/ccd/ccdserver.cpp

@@ -13237,11 +13237,36 @@ IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsig
 
 //=================================================================================
 
+class StrandOptions
+{
+    // Settings shared by stranded activities; typically populated from graph-node hints
+public:
+    StrandOptions(IPropertyTree &_graphNode)  // reads the numstrands, strandblocksize and strandordered hints from the graph node
+    {
+        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);  // second argument is presumably the value used when the hint is absent - confirm against getPropInt
+        blockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);  // 0 is resolved later by the copying constructor below
+        forcePreserveOrder = _graphNode.getPropInt("hint[@name='strandordered']/@value", 0);  // integer converted to bool: any non-zero value enables it
+    }
+    StrandOptions(const StrandOptions &from, IRoxieSlaveContext *ctx)  // copies 'from', then fills in an unset blockSize from the options held by ctx
+    {
+        numStrands = from.numStrands;
+        blockSize = from.blockSize;
+        forcePreserveOrder = from.forcePreserveOrder;
+
+        if (!blockSize)  // zero means no block size was supplied, so fall back to the context's strandBlockSize option
+            blockSize = ctx->queryOptions().strandBlockSize;
+        // Could consider some similar option for numStrands...
+    }
+public:
+    unsigned numStrands = 1;  // number of strands the activity creates processors for
+    unsigned blockSize = 0;  // block size handed to the strand branch/junction; 0 = not set
+    bool forcePreserveOrder = false;  // when true, the activity adds SFpreserveOrder to its stream flags
+};
+
 class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
 {
     unsigned numProcessedLastGroup;
-    unsigned numStrands;
-    unsigned blockSize;
+    StrandOptions strandOptions;
 
     class ProjectProcessor : public CInterfaceOf<IEngineRowStream>
     {
@@ -13258,7 +13283,7 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
         unsigned numProcessedLastGroup = 0;
 
     public:
-        ProjectProcessor(CRoxieServerParallelProjectActivity &_parent, IEngineRowStream *_inputStream)
+        explicit ProjectProcessor(CRoxieServerParallelProjectActivity &_parent, IEngineRowStream *_inputStream)
           : parent(_parent), inputStream(_inputStream), basehelper(parent.basehelper)
         {
             timeActivities = parent.timeActivities;
@@ -13314,13 +13339,10 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
     Owned<IStrandJunction> splitter;
 
 public:
-    CRoxieServerParallelProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numStrands, unsigned _blockSize)
+    CRoxieServerParallelProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
         : CRoxieServerActivity(_ctx, _factory, _probeManager),
-          numStrands(_numStrands), blockSize(_blockSize)
+          strandOptions(_strandOptions, ctx)
     {
-        if (!blockSize)
-            blockSize = ctx->queryOptions().strandBlockSize;
-
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -13345,7 +13367,8 @@ public:
     {
         assertex(idx == 0);
         CRoxieServerActivity::connectDependencies(flags);
-
+        if (strandOptions.forcePreserveOrder)
+            flags |= SFpreserveOrder;
         PointerArrayOf<IEngineRowStream> instreams;
         Owned <IStrandJunction> recombiner = input->getOutputStreams(ctx, sourceIdx, instreams, true, flags);
 
@@ -13354,18 +13377,18 @@ public:
         {
             assertex(recombiner == NULL);
             // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
-            if (flags & SFpreserveOrder || true)
+            if (flags & SFpreserveOrder)
             {
-                branch.setown(createStrandBranch(ctx->queryRowManager(), numStrands, blockSize, true, false));
+                branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, false));
                 splitter.set(branch->queryInputJunction());
                 recombiner.set(branch->queryOutputJunction());
             }
             else
             {
-                splitter.setown(createStrandJunction(ctx->queryRowManager(), 1, numStrands, blockSize, false));
+                splitter.setown(createStrandJunction(ctx->queryRowManager(), 1, strandOptions.numStrands, strandOptions.blockSize, false));
             }
             splitter->setInput(0, instreams.item(0));
-            for (unsigned strandNo = 0; strandNo < numStrands; strandNo++)
+            for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
                 strands.append(*new ProjectProcessor(*this, splitter->queryOutput(strandNo)));
         }
         else
@@ -13454,22 +13477,19 @@ class CRoxieServerProjectActivity : public CRoxieServerActivity
 class CRoxieServerProjectActivityFactory : public CRoxieServerActivityFactory
 {
 protected:
-    unsigned numStrands;
-    unsigned strandBlockSize;
+    StrandOptions strandOptions;
     bool count;
 public:
     CRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
-        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
+        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind), strandOptions(_graphNode)
     {
-        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
-        strandBlockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
-        count = (_kind==TAKcountproject || _kind==TAKprefetchcountproject);
+        count = (kind==TAKcountproject || kind==TAKprefetchcountproject);
     }
 
     virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
     {
-        if (kind == TAKproject && numStrands > 1)  // Not supported on prefetch or count projects
-            return new CRoxieServerParallelProjectActivity(_ctx, this, _probeManager, numStrands, strandBlockSize);
+        if (kind == TAKproject && strandOptions.numStrands > 1)  // Not supported on prefetch or count projects
+            return new CRoxieServerParallelProjectActivity(_ctx, this, _probeManager, strandOptions);
         else
             return new CRoxieServerProjectActivity(_ctx, this, _probeManager, count);
     }