Browse Source

HPCC-15062 Design and implement parallel child queries

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
ecb3b8b0a7
4 changed files with 95 additions and 30 deletions
  1. 1 1
      roxie/ccd/ccdactivities.cpp
  2. 1 1
      roxie/ccd/ccdquery.cpp
  3. 91 25
      roxie/ccd/ccdserver.cpp
  4. 2 3
      roxie/ccd/ccdserver.hpp

+ 1 - 1
roxie/ccd/ccdactivities.cpp

@@ -218,7 +218,7 @@ public:
             {
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
                     _probeManager = queryContext->queryProbeManager();
-                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
+                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx, 1); // MORE - the parent is wrong!
                 childGraphs.append(*childGraph);
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
                 childGraph->onCreate(colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.

+ 1 - 1
roxie/ccd/ccdquery.cpp

@@ -1280,7 +1280,7 @@ public:
         assertex(name && *name);
         ActivityArrayPtr *graph = graphMap.getValue(name);
         assertex(graph);
-        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx);
+        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx, 1);
         return ret.getClear();
     }
 

+ 91 - 25
roxie/ccd/ccdserver.cpp

@@ -570,18 +570,11 @@ public:
         CActivityFactory::addChildQuery(id, childQuery);
     }
 
-    virtual void createChildQueries(IRoxieSlaveContext * ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const
+    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel) const
     {
         ForEachItemIn(idx, childQueries)
         {
-            childGraphs.append(*createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), parentActivity, _probeManager, _logctx));
-        }
-    }
-
-    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const
-    {
-        ForEachItemIn(idx, childGraphs)
-        {
+            childGraphs.append(*createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), parentActivity, _probeManager, _logctx, numParallel));
             ctx->noteChildGraph(childQueryIndexes.item(idx), &childGraphs.item(idx));
             childGraphs.item(idx).onCreate(colocalArg);
         }
@@ -591,7 +584,7 @@ public:
     {
         unsigned match = childQueryIndexes.find(childId);
         assertex(match != NotFound);
-        Owned<IActivityGraph> graph = createActivityGraph(ctx, NULL, childQueryIndexes.item(match), childQueries.item(match), parentActivity, _probeManager, _logctx);
+        Owned<IActivityGraph> graph = createActivityGraph(ctx, NULL, childQueryIndexes.item(match), childQueries.item(match), parentActivity, _probeManager, _logctx, 1);
         graph->onCreate(colocalArg);
         return graph.getClear();
     }
@@ -940,7 +933,7 @@ protected:
     bool timeActivities;
     bool aborted;
     bool connected = false;
-
+    IProbeManager *probeManager = NULL;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -950,7 +943,8 @@ public:
           factory(_factory),
           basehelper(_factory->getHelper()),
           activityId(_factory->queryId()),
-          stats(_factory ? _factory->queryStatsMapping() : actStatistics)
+          stats(_factory ? _factory->queryStatsMapping() : actStatistics),
+          probeManager(_probeManager)
     {
         input = NULL;
         sourceIdx = 0;
@@ -958,8 +952,6 @@ public:
         meta.set(basehelper.queryOutputMeta());
         processed = 0;
         localCycles = 0;
-        if (factory)
-            factory->createChildQueries(ctx, childGraphs, this, _probeManager, *this);
         state=STATEreset;
         rowAllocator = NULL;
         debugging = _probeManager != NULL; // Don't want to collect timing stats from debug sessions
@@ -1162,7 +1154,7 @@ public:
 
     virtual bool needsAllocator() const { return false; }
 
-    virtual void onCreate(IHThorArg *_colocalParent)
+    void _onCreate(IHThorArg *_colocalParent, unsigned _numParallel)
     {
         colocalParent = _colocalParent;
         createPending = true;
@@ -1170,11 +1162,16 @@ public:
             createRowAllocator();
         processed = 0;
         if (factory)
-            factory->onCreateChildQueries(ctx, &basehelper, childGraphs);
+            factory->onCreateChildQueries(ctx, &basehelper, childGraphs, this, probeManager, *this, _numParallel);
         if (ctx)
             timeActivities = ctx->queryOptions().timeActivities;
     }
 
+    virtual void onCreate(IHThorArg *_colocalParent)
+    {
+        _onCreate(_colocalParent, 1);
+    }
+
     virtual void serializeCreateStartContext(MemoryBuffer &out)
     {
         //This should only be called after onStart has been called on the helper
@@ -1663,6 +1660,11 @@ public:
         active = 0;
     }
 
+    virtual void onCreate(IHThorArg *_colocalArg)
+    {
+        CRoxieServerActivity::_onCreate(_colocalArg, strands.ordinality());
+    }
+
     //This function is pure (But also implemented out of line) to force the derived classes to implement it.
     //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
     //this must also happen before any rows are read from the strands (e.g., by a source junction)
@@ -26735,6 +26737,64 @@ public:
     }
 };
 
+class CProxyActivityGraph : public CInterface, implements IActivityGraph, implements IThorChildGraph
+{
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CProxyActivityGraph(IRoxieSlaveContext *_ctx, const char *_graphName, unsigned _id, ActivityArray &_graphDefinition, const IRoxieContextLogger &_logctx, unsigned _numParallel)
+        : ctx(_ctx), graphName(_graphName), id(_id), graphDefinition(_graphDefinition), logctx(_logctx), numParallel(_numParallel)
+    {
+    }
+    virtual void abort() { throwUnexpected(); }
+    virtual void reset() { throwUnexpected(); }
+    virtual void execute() { throwUnexpected(); }
+    virtual void getProbeResponse(IPropertyTree *query) { throwUnexpected(); }
+    virtual void onCreate(IHThorArg *_colocalArg)
+    {
+        colocalArg = _colocalArg;
+    }
+    virtual void noteException(IException *E) { throwUnexpected(); }
+    virtual void checkAbort() { throwUnexpected(); }
+    virtual IThorChildGraph * queryChildGraph() { return this; }
+    virtual IEclGraphResults * queryLocalGraph() { throwUnexpected(); }
+    virtual IRoxieServerChildGraph * queryLoopGraph() { throwUnexpected(); }
+    virtual IRoxieServerChildGraph * createGraphLoopInstance(IRoxieSlaveContext *ctx, unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) { throwUnexpected(); }
+    virtual const char *queryName() const { throwUnexpected(); }
+
+    virtual IEclGraphResults * evaluate(unsigned parentExtractSize, const byte * parentExtract)
+    {
+        Owned<CActivityGraph> realGraph;
+        {
+            CriticalBlock b(graphCrit);
+            if (stack.length())
+                realGraph.setown(&stack.popGet());
+        }
+        if (!realGraph)
+        {
+            realGraph.setown(new CActivityGraph(ctx, graphName, id, graphDefinition, NULL, logctx));
+            realGraph->createGraph(ctx);
+            realGraph->onCreate(colocalArg);
+        }
+        Owned<IEclGraphResults> results = realGraph->evaluate(parentExtractSize, parentExtract);
+        {
+            CriticalBlock b(graphCrit);
+            stack.append(*realGraph.getClear());
+        }
+        return results.getClear();
+    }
+
+protected:
+    IRoxieSlaveContext *ctx;
+    StringAttr graphName;
+    unsigned id;
+    ActivityArray &graphDefinition;
+    const IRoxieContextLogger &logctx;
+    unsigned numParallel;
+    IHThorArg *colocalArg = nullptr;
+    CriticalSection graphCrit;
+    CIArrayOf<CActivityGraph> stack;
+};
 
 class CIterationActivityGraph : public CActivityGraph
 {
@@ -26939,22 +26999,28 @@ public:
 
 
 
-IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *_graphName, unsigned id, ActivityArray &childFactories, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx)
+IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *_graphName, unsigned id, ActivityArray &childFactories, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel)
 {
     if (childFactories.isDelayed())
     {
+        assertex(numParallel==1);
         return new CDelayedActivityGraph(_graphName, id, childFactories, _probeManager);
     }
     else
     {
-        Owned<IProbeManager> childProbe;
-        if (_probeManager)
-            childProbe.setown(_probeManager->startChildGraph(id, parentActivity));
-        Owned<CActivityGraph> ret = new CActivityGraph(ctx, _graphName, id, childFactories, childProbe, _logctx);
-        ret->createGraph(ctx);
-        if (_probeManager)
-            _probeManager->endChildGraph(childProbe, parentActivity);
-        return ret.getClear();
+        if (numParallel==1 || _probeManager != nullptr)
+        {
+            Owned<IProbeManager> childProbe;
+            if (_probeManager)
+                childProbe.setown(_probeManager->startChildGraph(id, parentActivity));
+            Owned<CActivityGraph> ret = new CActivityGraph(ctx, _graphName, id, childFactories, childProbe, _logctx);
+            ret->createGraph(ctx);
+            if (_probeManager)
+                _probeManager->endChildGraph(childProbe, parentActivity);
+            return ret.getClear();
+        }
+        else
+            return new CProxyActivityGraph(ctx, _graphName, id, childFactories, _logctx, numParallel);
     }
 }
 

+ 2 - 3
roxie/ccd/ccdserver.hpp

@@ -203,8 +203,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual IRoxieServerActivity *createFunction(IHThorArg &helper, IProbeManager *_probeManager) const = 0;
     virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, cycle_t localCycles) const = 0;
-    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const = 0;
-    virtual void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
+    virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx, unsigned numParallel) const = 0;
     virtual void noteStarted() const = 0;
     virtual void noteStarted(unsigned idx) const = 0;
     virtual void noteDependent(unsigned target) = 0;
@@ -261,7 +260,7 @@ interface IRoxieServerChildGraph : public IInterface
 
 interface IQueryFactory;
 
-extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
+extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx, unsigned numParallel);
 
 extern ruid_t getNextRuid();
 extern void setStartRuid(unsigned restarts);