Explorar o código

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Refactoring onCreate vs constructor calls to ensure ctx is available sooner,
so that we can pass RowManager to the junction creation code.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=9) %!d(string=hai) anos
pai
achega
70840f6afc

+ 6 - 6
roxie/ccd/ccdactivities.cpp

@@ -210,7 +210,7 @@ public:
         if (datafile)
             addXrefFileInfo(reply, datafile);
     }
-    void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
+    void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
     {
         if (childQueries.length())
         {
@@ -218,10 +218,10 @@ public:
             {
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
                     _probeManager = queryContext->queryProbeManager();
-                IActivityGraph *childGraph = createActivityGraph(NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
+                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
                 childGraphs.append(*childGraph);
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
-                childGraph->onCreate(queryContext, colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
+                childGraph->onCreate(colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
             }
         }
     }
@@ -335,9 +335,9 @@ protected:
         // MORE - need to consider debugging....
         if (probeAllRows)
             probeManager.setown(createProbeManager());
-        basefactory->createChildQueries(childGraphs, basehelper, probeManager, queryContext, logctx);
+        basefactory->createChildQueries(queryContext, childGraphs, basehelper, probeManager, queryContext, logctx);
 #else
-        basefactory->createChildQueries(childGraphs, basehelper, NULL, queryContext, logctx);
+        basefactory->createChildQueries(queryContext, childGraphs, basehelper, NULL, queryContext, logctx);
 #endif
         if (meta.needsSerializeDisk())
             serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
@@ -5166,7 +5166,7 @@ public:
             remoteGraph->beforeExecute();
             Owned<IFinalRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
             Owned <IStrandJunction> junction;
-            IEngineRowStream *stream = connectSingleStream(input, 0, junction, 0);
+            IEngineRowStream *stream = connectSingleStream(queryContext, input, 0, junction, 0);
 
             while (!aborted)
             {

+ 4 - 4
roxie/ccd/ccdcontext.cpp

@@ -1310,13 +1310,13 @@ public:
     {
         if (extra.embedded)
         {
-            return factory->lookupGraph(extra.embeddedGraphName, probeManager, *this, parentActivity);
+            return factory->lookupGraph(this, extra.embeddedGraphName, probeManager, *this, parentActivity);
         }
         else
         {
             Owned<IQueryFactory> libraryQuery = factory->lookupLibrary(extra.libraryName, extra.interfaceHash, *this);
             assertex(libraryQuery);
-            return libraryQuery->lookupGraph("graph1", probeManager, *this, parentActivity);
+            return libraryQuery->lookupGraph(this, "graph1", probeManager, *this, parentActivity);
         }
     }
 
@@ -1330,8 +1330,8 @@ public:
         }
         else if (probeAllRows || probeQuery != NULL)
             probeManager.setown(createProbeManager());
-        graph.setown(factory->lookupGraph(graphName, probeManager, *this, NULL));
-        graph->onCreate(this, NULL);  // MORE - is that right
+        graph.setown(factory->lookupGraph(this, graphName, probeManager, *this, NULL));
+        graph->onCreate(NULL);  // MORE - is that right
         if (debugContext)
             debugContext->checkBreakpoint(DebugStateGraphStart, NULL, graphName);
         if (workUnit)

+ 2 - 2
roxie/ccd/ccddebug.cpp

@@ -77,11 +77,11 @@ public:
         return in->gatherConjunctions(collector);
     }
 
-    virtual IStrandJunction *getOutputStreams(unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
     {
         assertex (!idx);
         PointerArrayOf<IEngineRowStream> instreams;
-        Owned<IStrandJunction> junction = in->getOutputStreams(sourceIdx, instreams, false, flags | SFforceSingle);
+        Owned<IStrandJunction> junction = in->getOutputStreams(ctx, sourceIdx, instreams, false, flags | SFforceSingle);
         // We forced to single, so should not be getting anything but a single stream back
         assertex(junction==NULL);
         assertex(instreams.length()==1);

+ 2 - 2
roxie/ccd/ccdquery.cpp

@@ -1266,12 +1266,12 @@ public:
         return *graphMap.getValue(name);
     }
 
-    virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
+    virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
     {
         assertex(name && *name);
         ActivityArrayPtr *graph = graphMap.getValue(name);
         assertex(graph);
-        Owned<IActivityGraph> ret = ::createActivityGraph(name, 0, **graph, parentActivity, probeManager, logctx);
+        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx);
         return ret.getClear();
     }
 

+ 3 - 3
roxie/ccd/ccdquery.hpp

@@ -58,13 +58,13 @@ interface IActivityGraph : extends IInterface
     virtual void reset() = 0;
     virtual void execute() = 0;
     virtual void getProbeResponse(IPropertyTree *query) = 0;
-    virtual void onCreate(IRoxieSlaveContext *ctx, IHThorArg *colocalArg) = 0;
+    virtual void onCreate(IHThorArg *colocalArg) = 0;
     virtual void noteException(IException *E) = 0;
     virtual void checkAbort() = 0;
     virtual IThorChildGraph * queryChildGraph() = 0;
     virtual IEclGraphResults * queryLocalGraph() = 0;
     virtual IRoxieServerChildGraph * queryLoopGraph() = 0;
-    virtual IRoxieServerChildGraph * createGraphLoopInstance(unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) = 0;
+    virtual IRoxieServerChildGraph * createGraphLoopInstance(IRoxieSlaveContext *ctx, unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) = 0;
     virtual const char *queryName() const = 0;
 };
 
@@ -134,7 +134,7 @@ private:
 interface IQueryFactory : extends IInterface
 {
     virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const = 0;
-    virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
+    virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
     virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const = 0;
     virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const = 0;
     virtual hash64_t queryHash() const = 0;

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 786 - 801
roxie/ccd/ccdserver.cpp


+ 7 - 7
roxie/ccd/ccdserver.hpp

@@ -109,10 +109,10 @@ interface IFinalRoxieInput : extends IInputBase
     virtual IRoxieServerActivity *queryActivity() = 0;
     virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
 
-    virtual IStrandJunction *getOutputStreams(unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
 };
 
-extern IEngineRowStream *connectSingleStream(IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, unsigned flags);
+extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, unsigned flags);
 
 interface ISteppedConjunctionCollector;
 
@@ -148,9 +148,9 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual IFinalRoxieInput *queryOutput(unsigned idx) = 0;
     virtual IFinalRoxieInput *queryInput(unsigned idx) const = 0;
     virtual void execute(unsigned parentExtractSize, const byte *parentExtract) = 0;
-    virtual void onCreate(IRoxieSlaveContext *ctx, IHThorArg *colocalArg) = 0;
+    virtual void onCreate(IHThorArg *colocalArg) = 0;
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
-    virtual IStrandJunction *getOutputStreams(unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
 
     virtual void stop() = 0;
     virtual void abort() = 0;
@@ -189,7 +189,7 @@ interface IRoxieServerActivity : extends IActivityBase
 
 interface IRoxieServerActivityFactory : extends IActivityFactory
 {
-    virtual IRoxieServerActivity *createActivity(IProbeManager *_probemanager) const = 0;
+    virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probemanager) const = 0;
     virtual void setInput(unsigned idx, unsigned source, unsigned sourceidx) = 0;
     virtual bool isSink() const = 0;
     virtual bool isFunction() const = 0;
@@ -208,7 +208,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, cycle_t localCycles) const = 0;
     virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const = 0;
-    virtual void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
+    virtual void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
     virtual void noteStarted() const = 0;
     virtual void noteStarted(unsigned idx) const = 0;
     virtual void noteDependent(unsigned target) = 0;
@@ -264,7 +264,7 @@ interface IRoxieServerChildGraph : public IInterface
 
 interface IQueryFactory;
 
-extern IActivityGraph *createActivityGraph(const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
+extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
 
 extern ruid_t getNextRuid();
 extern void setStartRuid(unsigned restarts);