Browse Source

Merge pull request #8323 from richardkchapman/parallel-child-mk2

HPCC-15062 Design and implement parallel child queries

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
98bd1c41c7
4 changed files with 95 additions and 30 deletions
  1. 1 1
      roxie/ccd/ccdactivities.cpp
  2. 1 1
      roxie/ccd/ccdquery.cpp
  3. 91 25
      roxie/ccd/ccdserver.cpp
  4. 2 3
      roxie/ccd/ccdserver.hpp

+ 1 - 1
roxie/ccd/ccdactivities.cpp

@@ -218,7 +218,7 @@ public:
             {
             {
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
                     _probeManager = queryContext->queryProbeManager();
                     _probeManager = queryContext->queryProbeManager();
-                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
+                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx, 1); // MORE - the parent is wrong!
                 childGraphs.append(*childGraph);
                 childGraphs.append(*childGraph);
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
                 childGraph->onCreate(colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
                 childGraph->onCreate(colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.

+ 1 - 1
roxie/ccd/ccdquery.cpp

@@ -1280,7 +1280,7 @@ public:
         assertex(name && *name);
         assertex(name && *name);
         ActivityArrayPtr *graph = graphMap.getValue(name);
         ActivityArrayPtr *graph = graphMap.getValue(name);
         assertex(graph);
         assertex(graph);
-        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx);
+        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx, 1);
         return ret.getClear();
         return ret.getClear();
     }
     }
 
 

+ 91 - 25
roxie/ccd/ccdserver.cpp

@@ -571,18 +571,11 @@ public:
         CActivityFactory::addChildQuery(id, childQuery);
         CActivityFactory::addChildQuery(id, childQuery);
     }
     }
 
 
-    virtual void createChildQueries(IRoxieSlaveContext * ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const
+    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel) const
     {
     {
         ForEachItemIn(idx, childQueries)
         ForEachItemIn(idx, childQueries)
         {
         {
-            childGraphs.append(*createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), parentActivity, _probeManager, _logctx));
-        }
-    }
-
-    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const
-    {
-        ForEachItemIn(idx, childGraphs)
-        {
+            childGraphs.append(*createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), parentActivity, _probeManager, _logctx, numParallel));
             ctx->noteChildGraph(childQueryIndexes.item(idx), &childGraphs.item(idx));
             ctx->noteChildGraph(childQueryIndexes.item(idx), &childGraphs.item(idx));
             childGraphs.item(idx).onCreate(colocalArg);
             childGraphs.item(idx).onCreate(colocalArg);
         }
         }
@@ -592,7 +585,7 @@ public:
     {
     {
         unsigned match = childQueryIndexes.find(childId);
         unsigned match = childQueryIndexes.find(childId);
         assertex(match != NotFound);
         assertex(match != NotFound);
-        Owned<IActivityGraph> graph = createActivityGraph(ctx, NULL, childQueryIndexes.item(match), childQueries.item(match), parentActivity, _probeManager, _logctx);
+        Owned<IActivityGraph> graph = createActivityGraph(ctx, NULL, childQueryIndexes.item(match), childQueries.item(match), parentActivity, _probeManager, _logctx, 1);
         graph->onCreate(colocalArg);
         graph->onCreate(colocalArg);
         return graph.getClear();
         return graph.getClear();
     }
     }
@@ -941,7 +934,7 @@ protected:
     bool timeActivities;
     bool timeActivities;
     bool aborted;
     bool aborted;
     bool connected = false;
     bool connected = false;
-
+    IProbeManager *probeManager = NULL;
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
@@ -951,7 +944,8 @@ public:
           factory(_factory),
           factory(_factory),
           basehelper(_factory->getHelper()),
           basehelper(_factory->getHelper()),
           activityId(_factory->queryId()),
           activityId(_factory->queryId()),
-          stats(_factory ? _factory->queryStatsMapping() : actStatistics)
+          stats(_factory ? _factory->queryStatsMapping() : actStatistics),
+          probeManager(_probeManager)
     {
     {
         input = NULL;
         input = NULL;
         sourceIdx = 0;
         sourceIdx = 0;
@@ -959,8 +953,6 @@ public:
         meta.set(basehelper.queryOutputMeta());
         meta.set(basehelper.queryOutputMeta());
         processed = 0;
         processed = 0;
         localCycles = 0;
         localCycles = 0;
-        if (factory)
-            factory->createChildQueries(ctx, childGraphs, this, _probeManager, *this);
         state=STATEreset;
         state=STATEreset;
         rowAllocator = NULL;
         rowAllocator = NULL;
         debugging = _probeManager != NULL; // Don't want to collect timing stats from debug sessions
         debugging = _probeManager != NULL; // Don't want to collect timing stats from debug sessions
@@ -1163,7 +1155,7 @@ public:
 
 
     virtual bool needsAllocator() const { return false; }
     virtual bool needsAllocator() const { return false; }
 
 
-    virtual void onCreate(IHThorArg *_colocalParent)
+    void _onCreate(IHThorArg *_colocalParent, unsigned _numParallel)
     {
     {
         colocalParent = _colocalParent;
         colocalParent = _colocalParent;
         createPending = true;
         createPending = true;
@@ -1171,11 +1163,16 @@ public:
             createRowAllocator();
             createRowAllocator();
         processed = 0;
         processed = 0;
         if (factory)
         if (factory)
-            factory->onCreateChildQueries(ctx, &basehelper, childGraphs);
+            factory->onCreateChildQueries(ctx, &basehelper, childGraphs, this, probeManager, *this, _numParallel);
         if (ctx)
         if (ctx)
             timeActivities = ctx->queryOptions().timeActivities;
             timeActivities = ctx->queryOptions().timeActivities;
     }
     }
 
 
+    virtual void onCreate(IHThorArg *_colocalParent)
+    {
+        _onCreate(_colocalParent, 1);
+    }
+
     virtual void serializeCreateStartContext(MemoryBuffer &out)
     virtual void serializeCreateStartContext(MemoryBuffer &out)
     {
     {
         //This should only be called after onStart has been called on the helper
         //This should only be called after onStart has been called on the helper
@@ -1664,6 +1661,11 @@ public:
         active = 0;
         active = 0;
     }
     }
 
 
+    virtual void onCreate(IHThorArg *_colocalArg)
+    {
+        CRoxieServerActivity::_onCreate(_colocalArg, strands.ordinality());
+    }
+
     //This function is pure (But also implemented out of line) to force the derived classes to implement it.
     //This function is pure (But also implemented out of line) to force the derived classes to implement it.
     //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
     //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
     //this must also happen before any rows are read from the strands (e.g., by a source junction)
     //this must also happen before any rows are read from the strands (e.g., by a source junction)
@@ -26736,6 +26738,64 @@ public:
     }
     }
 };
 };
 
 
+class CProxyActivityGraph : public CInterface, implements IActivityGraph, implements IThorChildGraph
+{
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CProxyActivityGraph(IRoxieSlaveContext *_ctx, const char *_graphName, unsigned _id, ActivityArray &_graphDefinition, const IRoxieContextLogger &_logctx, unsigned _numParallel)
+        : ctx(_ctx), graphName(_graphName), id(_id), graphDefinition(_graphDefinition), logctx(_logctx), numParallel(_numParallel)
+    {
+    }
+    virtual void abort() { throwUnexpected(); }
+    virtual void reset() { throwUnexpected(); }
+    virtual void execute() { throwUnexpected(); }
+    virtual void getProbeResponse(IPropertyTree *query) { throwUnexpected(); }
+    virtual void onCreate(IHThorArg *_colocalArg)
+    {
+        colocalArg = _colocalArg;
+    }
+    virtual void noteException(IException *E) { throwUnexpected(); }
+    virtual void checkAbort() { throwUnexpected(); }
+    virtual IThorChildGraph * queryChildGraph() { return this; }
+    virtual IEclGraphResults * queryLocalGraph() { throwUnexpected(); }
+    virtual IRoxieServerChildGraph * queryLoopGraph() { throwUnexpected(); }
+    virtual IRoxieServerChildGraph * createGraphLoopInstance(IRoxieSlaveContext *ctx, unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) { throwUnexpected(); }
+    virtual const char *queryName() const { throwUnexpected(); }
+
+    virtual IEclGraphResults * evaluate(unsigned parentExtractSize, const byte * parentExtract)
+    {
+        Owned<CActivityGraph> realGraph;
+        {
+            CriticalBlock b(graphCrit);
+            if (stack.length())
+                realGraph.setown(&stack.popGet());
+        }
+        if (!realGraph)
+        {
+            realGraph.setown(new CActivityGraph(ctx, graphName, id, graphDefinition, NULL, logctx));
+            realGraph->createGraph(ctx);
+            realGraph->onCreate(colocalArg);
+        }
+        Owned<IEclGraphResults> results = realGraph->evaluate(parentExtractSize, parentExtract);
+        {
+            CriticalBlock b(graphCrit);
+            stack.append(*realGraph.getClear());
+        }
+        return results.getClear();
+    }
+
+protected:
+    IRoxieSlaveContext *ctx;
+    StringAttr graphName;
+    unsigned id;
+    ActivityArray &graphDefinition;
+    const IRoxieContextLogger &logctx;
+    unsigned numParallel;
+    IHThorArg *colocalArg = nullptr;
+    CriticalSection graphCrit;
+    CIArrayOf<CActivityGraph> stack;
+};
 
 
 class CIterationActivityGraph : public CActivityGraph
 class CIterationActivityGraph : public CActivityGraph
 {
 {
@@ -26940,22 +27000,28 @@ public:
 
 
 
 
 
 
-IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *_graphName, unsigned id, ActivityArray &childFactories, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx)
+IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *_graphName, unsigned id, ActivityArray &childFactories, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel)
 {
 {
     if (childFactories.isDelayed())
     if (childFactories.isDelayed())
     {
     {
+        assertex(numParallel==1);
         return new CDelayedActivityGraph(_graphName, id, childFactories, _probeManager);
         return new CDelayedActivityGraph(_graphName, id, childFactories, _probeManager);
     }
     }
     else
     else
     {
     {
-        Owned<IProbeManager> childProbe;
-        if (_probeManager)
-            childProbe.setown(_probeManager->startChildGraph(id, parentActivity));
-        Owned<CActivityGraph> ret = new CActivityGraph(ctx, _graphName, id, childFactories, childProbe, _logctx);
-        ret->createGraph(ctx);
-        if (_probeManager)
-            _probeManager->endChildGraph(childProbe, parentActivity);
-        return ret.getClear();
+        if (numParallel==1 || _probeManager != nullptr)
+        {
+            Owned<IProbeManager> childProbe;
+            if (_probeManager)
+                childProbe.setown(_probeManager->startChildGraph(id, parentActivity));
+            Owned<CActivityGraph> ret = new CActivityGraph(ctx, _graphName, id, childFactories, childProbe, _logctx);
+            ret->createGraph(ctx);
+            if (_probeManager)
+                _probeManager->endChildGraph(childProbe, parentActivity);
+            return ret.getClear();
+        }
+        else
+            return new CProxyActivityGraph(ctx, _graphName, id, childFactories, _logctx, numParallel);
     }
     }
 }
 }
 
 

+ 2 - 3
roxie/ccd/ccdserver.hpp

@@ -203,8 +203,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual IRoxieServerActivity *createFunction(IHThorArg &helper, IProbeManager *_probeManager) const = 0;
     virtual IRoxieServerActivity *createFunction(IHThorArg &helper, IProbeManager *_probeManager) const = 0;
     virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
     virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, cycle_t localCycles) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, cycle_t localCycles) const = 0;
-    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const = 0;
-    virtual void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
+    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel) const = 0;
     virtual void noteStarted() const = 0;
     virtual void noteStarted() const = 0;
     virtual void noteStarted(unsigned idx) const = 0;
     virtual void noteStarted(unsigned idx) const = 0;
     virtual void noteDependent(unsigned target) = 0;
     virtual void noteDependent(unsigned target) = 0;
@@ -261,7 +260,7 @@ interface IRoxieServerChildGraph : public IInterface
 
 
 interface IQueryFactory;
 interface IQueryFactory;
 
 
-extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
+extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx, unsigned numParallel);
 
 
 extern ruid_t getNextRuid();
 extern ruid_t getNextRuid();
 extern void setStartRuid(unsigned restarts);
 extern void setStartRuid(unsigned restarts);