Просмотр исходного кода

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Add code to support parallel project.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 лет назад
Родитель
Сommit
7a117c3318

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -432,6 +432,7 @@ extern unsigned defaultFetchPreload;
 extern unsigned defaultFullKeyedJoinPreload;
 extern unsigned defaultKeyedJoinPreload;
 extern unsigned defaultPrefetchProjectPreload;
+extern unsigned defaultStrandBlockSize;
 extern bool defaultCheckingHeap;
 
 extern unsigned slaveQueryReleaseDelaySeconds;

+ 1 - 1
roxie/ccd/ccddebug.cpp

@@ -541,7 +541,7 @@ class DebugProbe : public InputProbe, implements IActivityDebugContext
 
 public:
     DebugProbe(IInputBase *_in,  unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
-        : InputProbe(static_cast<IFinalRoxieInput*>(_in), debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
+        : InputProbe(static_cast<IFinalRoxieInput*>(_in), _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
           sourceAct(_sourceAct), targetAct(_targetAct)
     {
         historyCapacity = debugContext->getDefaultHistoryCapacity();

+ 1 - 0
roxie/ccd/ccdmain.cpp

@@ -132,6 +132,7 @@ unsigned defaultFullKeyedJoinPreload = 0;
 unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
+unsigned defaultStrandBlockSize = 512;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
 unsigned coresPerQuery = 0;

+ 6 - 2
roxie/ccd/ccdquery.cpp

@@ -289,6 +289,7 @@ QueryOptions::QueryOptions()
     fetchPreload = defaultFetchPreload;
     prefetchProjectPreload = defaultPrefetchProjectPreload;
     bindCores = coresPerQuery;
+    strandBlockSize = defaultStrandBlockSize;
 
     checkingHeap = defaultCheckingHeap;
     disableLocalOptimizations = false;  // No global default for this
@@ -316,6 +317,7 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     fetchPreload = other.fetchPreload;
     prefetchProjectPreload = other.prefetchProjectPreload;
     bindCores = other.bindCores;
+    strandBlockSize = other.strandBlockSize;
 
     checkingHeap = other.checkingHeap;
     disableLocalOptimizations = other.disableLocalOptimizations;
@@ -353,6 +355,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
     updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
     updateFromWorkUnit(bindCores, wu, "bindCores");
+    updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
 
     updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
     updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
@@ -400,6 +403,7 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
         updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
         updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
+        updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
 
         updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
         // Note: disableLocalOptimizations is not permitted at context level (too late)
@@ -536,7 +540,7 @@ protected:
             return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKproject:
         case TAKcountproject:
-            return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind); // code is common between Project, CountProject
+            return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
         case TAKfilterproject:
             return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKdatasetresult:
@@ -846,7 +850,7 @@ protected:
         case TAKnonempty:
             return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKprefetchproject:
-            return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKwhen_dataset:
             return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKwhen_action:

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -109,6 +109,7 @@ public:
     int fetchPreload;
     int prefetchProjectPreload;
     int bindCores;
+    unsigned strandBlockSize;
 
     bool checkingHeap;
     bool disableLocalOptimizations;

+ 182 - 20
roxie/ccd/ccdserver.cpp

@@ -848,8 +848,6 @@ private:
     IRoxieServerActivityCopyArray & activities;
 };
 
-#define JUNCTION_BLOCK_SIZE 512 // Make configurable at some point
-
 extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, unsigned flags)
 {
     if (input)
@@ -860,7 +858,11 @@ extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxi
         {
             assertex(instreams.length());
             if (!junction)
-                junction.setown(createStrandJunction(ctx->queryRowManager(), instreams.length(), 1, JUNCTION_BLOCK_SIZE, false));
+                junction.setown(createStrandJunction(ctx->queryRowManager(), instreams.length(), 1, ctx->queryOptions().strandBlockSize, false));
+            ForEachItemIn(stream, instreams)
+            {
+                junction->setInput(stream, instreams.item(stream));
+            }
             return junction->queryOutput(0);
         }
         else
@@ -1173,6 +1175,8 @@ public:
         basehelper.onStart(parentExtract, NULL);
         if (factory)
             factory->noteStarted();
+        if (junction)
+            junction->ready();
     }
 
     void executeDependencies(unsigned parentExtractSize, const byte *parentExtract, unsigned controlId)
@@ -1327,8 +1331,13 @@ public:
 
     virtual void connectOutputStreams(unsigned flags)
     {
-        if (input)  // && !inputStream ?
+        if (input && !inputStream)
             inputStream = connectSingleStream(ctx, input, sourceIdx, junction, flags);
+        connectDependencies(flags);
+    }
+
+    void connectDependencies(unsigned flags)
+    {
         ForEachItemIn(i, dependencies)
         {
             dependencies.item(i).connectOutputStreams(flags);
@@ -7533,7 +7542,7 @@ public:
         }
         else
             sortFlags = TAFstable|TAFconstant;
-        bool forceSpill = _queryFactory.queryOptions().allSortsMaySpill || _graphNode.getPropBool("hint[@name='spill']/@value", false);;
+        bool forceSpill = _queryFactory.queryOptions().allSortsMaySpill || _graphNode.getPropBool("hint[@name='spill']/@value", false);
         if (forceSpill)
             sortFlags |= TAFspill;
         if (!(sortFlags & TAFunstable))
@@ -13221,25 +13230,171 @@ IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsig
 }
 
 //=================================================================================
+
+class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
+{
+    unsigned numProcessedLastGroup;
+    unsigned numStrands;
+    unsigned blockSize;
+
+    class ProjectProcessor : public CInterfaceOf<IEngineRowStream>
+    {
+        CRoxieServerParallelProjectActivity &parent;
+
+        // All these probably should go in a common base class StrandProcessor. Might even want that class to replace the corresponding fields in CRoxieServerActivity
+        IEngineRowStream *inputStream;
+        IHThorArg &basehelper;
+        bool timeActivities;
+    public:
+        ActivityTimeAccumulator totalCycles;
+        unsigned processed = 0;
+        unsigned numProcessedLastGroup = 0;
+
+    public:
+        ProjectProcessor(CRoxieServerParallelProjectActivity &_parent, IEngineRowStream *_inputStream)
+          : parent(_parent), inputStream(_inputStream), basehelper(parent.basehelper)
+        {
+            timeActivities = parent.timeActivities;
+        }
+        virtual const void * nextRow()
+        {
+            ActivityTimer t(totalCycles, timeActivities);
+            loop
+            {
+                OwnedConstRoxieRow in = inputStream->nextRow();
+                if (!in)
+                {
+                    if (numProcessedLastGroup == processed)
+                        in.setown(inputStream->nextRow());
+                    if (!in)
+                    {
+                        numProcessedLastGroup = processed;
+                        return NULL;
+                    }
+                }
+
+                try
+                {
+                    RtlDynamicRowBuilder rowBuilder(parent.rowAllocator);
+                    size32_t outSize;
+                    outSize = ((IHThorProjectArg &) basehelper).transform(rowBuilder, in);
+                    if (outSize)
+                    {
+                        processed++;
+                        return rowBuilder.finalizeRowClear(outSize);
+                    }
+                }
+                catch (IException *E)
+                {
+                    throw parent.makeWrappedException(E);
+                }
+            }
+        }
+        virtual void stop()
+        {
+            parent.processed += processed;  // MORE - Should be atomic
+            // Also merge the cycles up somehow (and any other relevant stats)
+            inputStream->stop();
+        }
+        virtual void resetEOF()
+        {
+            inputStream->resetEOF();
+        }
+    };
+    IArrayOf<ProjectProcessor> strands;
+    Owned<IStrandBranch> branch;
+    Owned<IStrandJunction> splitter;
+
+public:
+    CRoxieServerParallelProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numStrands, unsigned _blockSize)
+        : CRoxieServerActivity(_ctx, _factory, _probeManager),
+          numStrands(_numStrands), blockSize(_blockSize)
+    {
+        if (!blockSize)
+            blockSize = ctx->queryOptions().strandBlockSize;
+    }
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        ForEachItemIn(idx, strands)
+        {
+            strands.item(idx).numProcessedLastGroup = 0;
+        }
+        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        if (splitter)
+            splitter->ready();
+    }
+
+    virtual void reset()
+    {
+        if (splitter)
+            splitter->reset();
+        CRoxieServerActivity::reset();
+    }
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
+    {
+        assertex(idx == 0);
+        CRoxieServerActivity::connectDependencies(flags);
+
+        PointerArrayOf<IEngineRowStream> instreams;
+        Owned <IStrandJunction> recombiner = input->getOutputStreams(ctx, sourceIdx, instreams, true, flags);
+
+        // If my input was already split into streams, I just use those...
+        if (instreams.length() == 1)
+        {
+            assertex(recombiner == NULL);
+            // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
+            if (flags & SFpreserveOrder || true)
+            {
+                branch.setown(createStrandBranch(ctx->queryRowManager(), numStrands, blockSize, true, false));
+                splitter.set(branch->queryInputJunction());
+                recombiner.set(branch->queryOutputJunction());
+            }
+            else
+            {
+                splitter.setown(createStrandJunction(ctx->queryRowManager(), 1, numStrands, blockSize, false));
+            }
+            splitter->setInput(0, instreams.item(0));
+            for (unsigned strandNo = 0; strandNo < numStrands; strandNo++)
+                strands.append(*new ProjectProcessor(*this, splitter->queryOutput(strandNo)));
+        }
+        else
+        {
+            // Ignore my hint and just use the width already split into...
+            ForEachItemIn(strandNo, instreams)
+                strands.append(*new ProjectProcessor(*this, instreams.item(strandNo)));
+        }
+        ForEachItemIn(i, strands)
+        {
+            streams.append(&strands.item(i));
+        }
+        return recombiner.getClear();
+    }
+
+    virtual bool needsAllocator() const { return true; }
+
+    virtual const void * nextRow()
+    {
+        throwUnexpected();
+    }
+};
+
 class CRoxieServerProjectActivity : public CRoxieServerActivity
 {
     unsigned numProcessedLastGroup;
-    bool count;
     unsigned __int64 recordCount;
+    bool count;
 
-public:
+ public:
     CRoxieServerProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _count)
         : CRoxieServerActivity(_ctx, _factory, _probeManager),
-        count(_count)
+          count(_count)
     {
         numProcessedLastGroup = 0;
         recordCount = 0;
     }
 
-    ~CRoxieServerProjectActivity()
-    {
-    }
-
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
         numProcessedLastGroup = 0;
@@ -13292,23 +13447,30 @@ public:
 class CRoxieServerProjectActivityFactory : public CRoxieServerActivityFactory
 {
 protected:
+    unsigned numStrands;
+    unsigned strandBlockSize;
     bool count;
 public:
-    CRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+    CRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
         : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
     {
+        numStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
+        strandBlockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
         count = (_kind==TAKcountproject || _kind==TAKprefetchcountproject);
     }
 
     virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
     {
-        return new CRoxieServerProjectActivity(_ctx, this, _probeManager, count);
+        if (kind == TAKproject && numStrands > 1)  // Not supported on prefetch or count projects
+            return new CRoxieServerParallelProjectActivity(_ctx, this, _probeManager, numStrands, strandBlockSize);
+        else
+            return new CRoxieServerProjectActivity(_ctx, this, _probeManager, count);
     }
 };
 
-IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
 {
-    return new CRoxieServerProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+    return new CRoxieServerProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
 }
 
 //=================================================================================
@@ -13534,8 +13696,8 @@ public:
 class CRoxieServerPrefetchProjectActivityFactory : public CRoxieServerProjectActivityFactory 
 {
 public:
-    CRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
-        : CRoxieServerProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
+    CRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
+        : CRoxieServerProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode)
     {
     }
 
@@ -13545,9 +13707,9 @@ public:
     }
 };
 
-extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
 {
-    return new CRoxieServerPrefetchProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+    return new CRoxieServerPrefetchProjectActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
 }
 
 //=================================================================================

+ 2 - 2
roxie/ccd/ccdserver.hpp

@@ -377,7 +377,7 @@ extern IRoxieServerActivityFactory *createRoxieServerRegroupActivityFactory(unsi
 extern IRoxieServerActivityFactory *createRoxieServerCombineActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerCombineGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerRollupGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
 extern IRoxieServerActivityFactory *createRoxieServerGraphLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
@@ -429,7 +429,7 @@ extern IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(uns
 extern IRoxieServerActivityFactory *createRoxieServerIfActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerSequentialActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerParallelActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerStreamedIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);