Kaynağa Gözat

Merge pull request #8178 from richardkchapman/strandroxie

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 yıl önce
ebeveyn
işleme
1e9bd5cf2b

+ 2 - 0
common/thorhelper/CMakeLists.txt

@@ -38,6 +38,7 @@ set (    SRCS
          thorsoapcall.cpp 
          thorstep.cpp 
          thorstep2.cpp 
+         thorstrand.cpp
          thortalgo.cpp 
          thortlex.cpp 
          thortparse.cpp 
@@ -57,6 +58,7 @@ set (    SRCS
          thorrparse.hpp 
          thorsoapcall.hpp 
          thorstep.hpp 
+         thorstrand.hpp
          thorxmlread.hpp 
          thorxmlwrite.hpp
          roxierow.hpp

+ 1 - 2
common/thorhelper/roxiedebug.ipp

@@ -163,12 +163,11 @@ interface IDebugGraphManager : extends IInterface
 interface IRoxieProbe : public IInterface
 {
     virtual IInputBase &queryInput() = 0;
-    virtual IEngineRowStream &queryStream() = 0;
 };
 
 interface IProbeManager : public IInterface
 {
-    virtual IRoxieProbe *createProbe(IInputBase *in, IEngineRowStream *_inStream, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration) = 0;
+    virtual IRoxieProbe *createProbe(IInputBase *in, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration) = 0;
     virtual void getProbeResponse(IPropertyTree *query) = 0;
     virtual void noteSink(IActivityBase *sink) = 0;
     virtual void noteDependency(IActivityBase *sourceActivity, unsigned sourceIndex, unsigned controlId, const char *edgeId, IActivityBase *targetActivity) = 0;

+ 0 - 3
common/thorhelper/roxiehelper.ipp

@@ -30,9 +30,6 @@ struct IInputBase : public IInterface //base for IFinalRoxieInput and IHThorInpu
 {
     virtual IOutputMetaData * queryOutputMeta() const = 0;
     virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
-
-    // These will need some thought
-    virtual IEngineRowStream &queryStream() = 0;
 };
 
 //---------------------------------------------------

+ 38 - 0
common/thorhelper/thorcommon.cpp

@@ -27,6 +27,7 @@
 #include "thorcommon.ipp"
 #include "eclrtl.hpp"
 #include "rtlread_imp.hpp"
+#include <algorithm>
 
 #include "thorstep.hpp"
 
@@ -1677,6 +1678,43 @@ IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *li
     return new CDiskMerger(rowInterfaces, linker, tempnamebase);
 }
 
+//---------------------------------------------------------------------------------------------------------------------
+
+void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
+{
+    if (totalCycles)
+    {
+        builder.addStatistic(StWhenFirstRow, firstRow);
+        builder.addStatistic(StTimeElapsed, elapsed());
+        builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
+        builder.addStatistic(StTimeFirstExecute, latency());
+    }
+}
+
+void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
+{
+    if (other.totalCycles)
+    {
+        if (totalCycles)
+        {
+            //Record the earliest start, the latest end, the longest latencies
+            cycle_t thisLatency = latencyCycles();
+            cycle_t otherLatency = other.latencyCycles();
+            cycle_t maxLatency = std::max(thisLatency, otherLatency);
+            if (startCycles > other.startCycles)
+            {
+                startCycles = other.startCycles;
+                firstRow =other.firstRow;
+            }
+            firstExitCycles = startCycles + maxLatency;
+            if (endCycles < other.endCycles)
+                endCycles = other.endCycles;
+            totalCycles += other.totalCycles;
+        }
+        else
+            *this = other;
+    }
+}
 
 
 

+ 12 - 17
common/thorhelper/thorcommon.hpp

@@ -131,9 +131,6 @@ interface THORHELPER_API IDiskMerger : extends IInterface
 
 extern THORHELPER_API IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase);
 
-extern THORHELPER_API void testDiskSort();
-
-
 
 #define TIME_ACTIVITIES
 class ActivityTimeAccumulator
@@ -142,11 +139,7 @@ class ActivityTimeAccumulator
 public:
     ActivityTimeAccumulator()
     {
-        startCycles = 0;
-        totalCycles = 0;
-        endCycles = 0;
-        firstRow = 0;
-        firstExitCycles = 0;
+        reset();
     }
 public:
     cycle_t startCycles; // Wall clock time of first entry to this activity
@@ -158,17 +151,19 @@ public:
     // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
     inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
     // Return the total amount of time (in nanoseconds) spent in the first call of this activity (first entry to first exit)
-    inline unsigned __int64 latency() const { return cycle_to_nanosec(firstExitCycles-startCycles); }
+    inline unsigned __int64 latency() const { return cycle_to_nanosec(latencyCycles()); }
+    inline cycle_t latencyCycles() const { return firstExitCycles-startCycles; }
+
+    void addStatistics(IStatisticGatherer & builder) const;
+    void merge(const ActivityTimeAccumulator & other);
 
-    void addStatistics(IStatisticGatherer & builder) const
+    void reset()
     {
-        if (totalCycles)
-        {
-            builder.addStatistic(StWhenFirstRow, firstRow);
-            builder.addStatistic(StTimeElapsed, elapsed());
-            builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
-            builder.addStatistic(StTimeFirstExecute, latency());
-        }
+        startCycles = 0;
+        totalCycles = 0;
+        endCycles = 0;
+        firstRow = 0;
+        firstExitCycles = 0;
     }
 };
 

Dosya farkı çok büyük olduğundan ihmal edildi
+ 1441 - 0
common/thorhelper/thorstrand.cpp


+ 187 - 0
common/thorhelper/thorstrand.hpp

@@ -0,0 +1,187 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef THORSTRAND_HPP
+#define THORSTRAND_HPP
+
+#include "jqueue.hpp"
+#include "thorhelper.hpp"
+#include "roxiestream.hpp"
+#include "roxiemem.hpp"
+
+class IStrandJunction : extends IInterface
+{
+public:
+    virtual IEngineRowStream * queryOutput(unsigned n) = 0;
+    virtual void setInput(unsigned n, IEngineRowStream * _stream) = 0;
+    virtual void ready() = 0;
+    virtual void reset() = 0;
+    virtual void abort() = 0;
+};
+
+inline void startJunction(IStrandJunction * junction) { if (junction) junction->ready(); }
+inline void resetJunction(IStrandJunction * junction) { if (junction) junction->reset(); }
+
+interface IManyToOneRowStream : extends IRowStream
+{
+public:
+    virtual IRowWriterEx * getWriter(unsigned n) = 0;
+    virtual void abort() = 0;
+};
+
+interface IStrandBranch : extends IInterface
+{
+    virtual IStrandJunction * queryInputJunction() = 0;
+    virtual IStrandJunction * queryOutputJunction() = 0;
+};
+
+extern THORHELPER_API IStrandJunction * createStrandJunction(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered);
+extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & _rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped);
+extern THORHELPER_API void clearRowQueue(IRowQueue * queue);
+
+extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered);
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class RowBlockAllocator;
+class THORHELPER_API RoxieRowBlock
+{
+public:
+    const static unsigned numDummyDynamicRows = 1;
+    explicit RoxieRowBlock(unsigned _maxRows) noexcept : maxRows(_maxRows)
+    {
+        readPos = 0;
+        writePos = 0;
+        endOfChunk = false;
+    }
+    ~RoxieRowBlock();
+
+    inline bool addRowNowFull(const void * row)
+    {
+        dbgassertex(writePos < maxRows);
+        rows[writePos] = row;
+        return (++writePos == maxRows);
+    }
+
+    bool empty() const;
+    IException * getClearException()
+    {
+         return exception.getClear();
+    }
+    inline bool isEndOfChunk() const { return endOfChunk; }
+    inline bool nextRow(const void * & row)
+    {
+        if (readPos >= writePos)
+            return false;
+        row = rows[readPos++];
+        return true;
+    }
+    inline size32_t numRows() const { return writePos - readPos; }
+
+    bool readFromStream(IRowStream * stream);
+    inline void releaseBlock()
+    {
+        //This function is called instead of directly calling delete in case a cache is introduced later.
+        delete this;
+    }
+    void releaseRows();
+
+    inline void setEndOfChunk() { endOfChunk = true; }
+    inline void setExceptionOwn(IException * e) { exception.setown(e); }
+
+    void throwAnyPendingException();
+
+    static void operator delete (void * ptr);
+
+protected:
+    Owned<IException> exception;
+    const size32_t maxRows;
+    size32_t readPos;
+    size32_t writePos;
+    bool endOfChunk;
+    const void * rows[numDummyDynamicRows];        // Actually multiple rows.  Memory is allocated by the RowBlockAllocator.
+};
+
+
+class THORHELPER_API RowBlockAllocator
+{
+public:
+    RowBlockAllocator(roxiemem::IRowManager & _rowManager, unsigned rowsPerBlock);
+    RoxieRowBlock * newBlock();
+
+    size32_t maxRowsPerBlock() const { return rowsPerBlock; }
+
+public:
+    size32_t rowsPerBlock;
+    Owned<roxiemem::IFixedRowHeap> heap;
+};
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+typedef IQueueOf<RoxieRowBlock *> IRowBlockQueue;
+
+
+//MORE:  This implementation should be improved!  Directly use the correct queue implementation??
+class CRowBlockQueue : implements CInterfaceOf<IRowBlockQueue>
+{
+public:
+    CRowBlockQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots)
+    {
+        queue.setown(createRowQueue(numReaders, numWriters, maxItems, maxSlots));
+    }
+
+    virtual bool enqueue(RoxieRowBlock * const item)
+    {
+        return queue->enqueue(reinterpret_cast<const void *>(item));
+    }
+    virtual bool dequeue(RoxieRowBlock * & result)
+    {
+        const void * tempResult;
+        bool ok = queue->dequeue(tempResult);
+        result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
+        return ok;
+    }
+    virtual bool tryDequeue(RoxieRowBlock * & result)
+    {
+        const void * tempResult;
+        bool ok = queue->tryDequeue(tempResult);
+        result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
+        return ok;
+    }
+    virtual void reset()
+    {
+        queue->reset();
+    }
+    virtual void noteWriterStopped()
+    {
+        queue->noteWriterStopped();
+    }
+    virtual void abort()
+    {
+        queue->abort();
+    }
+
+private:
+    Owned<IRowQueue> queue;
+};
+
+
+
+
+
+#endif // THORSTRAND_HPP

+ 0 - 1
ecl/eclagent/eclgraph.cpp

@@ -489,7 +489,6 @@ void EclGraphElement::createActivity(IAgentContext & agent, EclSubGraph * owner)
                     {
                         IRoxieProbe *base = probeManager->createProbe(
                                                         input.queryOutput(branchIndexes.item(i2)),  //input
-                                                        &input.queryOutput(branchIndexes.item(i2))->queryStream(),  //stream
                                                         input.activity.get(),   //Source act
                                                         activity.get(),         //target activity
                                                         0,//input.id, 

+ 1 - 0
ecl/hthor/hthor.hpp

@@ -59,6 +59,7 @@ struct IHThorInput : public IInputBase
     virtual void resetEOF() { }
 
     // HThor is not going to support parallel streams
+    virtual IEngineRowStream &queryStream() = 0;
     inline bool nextGroup(ConstPointerArray & group) { return queryStream().nextGroup(group); }
     inline void readAll(RtlLinkedDatasetBuilder &builder) { return queryStream().readAll(builder); }
     inline const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { return queryStream().nextRowGE(seek, numFields, wasCompleteMatch, stepExtra); }

+ 1 - 1
plugins/cassandra/cpp-driver

@@ -1 +1 @@
-Subproject commit b4bb435129bab533612fa2caf194555fa943f925
+Subproject commit d7bbad34db39a51f209c6fadd07c9ec3a0bb86b7

+ 3 - 0
roxie/ccd/ccd.hpp

@@ -432,6 +432,9 @@ extern unsigned defaultFetchPreload;
 extern unsigned defaultFullKeyedJoinPreload;
 extern unsigned defaultKeyedJoinPreload;
 extern unsigned defaultPrefetchProjectPreload;
+extern unsigned defaultStrandBlockSize;
+extern unsigned defaultForceNumStrands;
+
 extern bool defaultCheckingHeap;
 
 extern unsigned slaveQueryReleaseDelaySeconds;

+ 12 - 9
roxie/ccd/ccdactivities.cpp

@@ -40,6 +40,7 @@
 #include "csvsplitter.hpp"
 #include "thorxmlread.hpp"
 #include "thorcommon.ipp"
+#include "thorstrand.hpp"
 #include "jstats.h"
 
 size32_t diskReadBufferSize = 0x10000;
@@ -209,7 +210,7 @@ public:
         if (datafile)
             addXrefFileInfo(reply, datafile);
     }
-    void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
+    void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
     {
         if (childQueries.length())
         {
@@ -217,10 +218,10 @@ public:
             {
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
                     _probeManager = queryContext->queryProbeManager();
-                IActivityGraph *childGraph = createActivityGraph(NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
+                IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
                 childGraphs.append(*childGraph);
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
-                childGraph->onCreate(queryContext, colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
+                childGraph->onCreate(colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
             }
         }
     }
@@ -334,9 +335,9 @@ protected:
         // MORE - need to consider debugging....
         if (probeAllRows)
             probeManager.setown(createProbeManager());
-        basefactory->createChildQueries(childGraphs, basehelper, probeManager, queryContext, logctx);
+        basefactory->createChildQueries(queryContext, childGraphs, basehelper, probeManager, queryContext, logctx);
 #else
-        basefactory->createChildQueries(childGraphs, basehelper, NULL, queryContext, logctx);
+        basefactory->createChildQueries(queryContext, childGraphs, basehelper, NULL, queryContext, logctx);
 #endif
         if (meta.needsSerializeDisk())
             serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
@@ -584,9 +585,9 @@ public:
         return logctx;
     }
 
-    virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const 
+    virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
     {
-        return queryContext->queryCodeContext()->getRowAllocator(meta, activityId); 
+        return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
     }
     virtual const char *cloneVString(const char *str) const
     {
@@ -5164,10 +5165,12 @@ public:
         {
             remoteGraph->beforeExecute();
             Owned<IFinalRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
-            IEngineRowStream &stream = input->queryStream();
+            Owned<IStrandJunction> junction;
+            IEngineRowStream *stream = connectSingleStream(queryContext, input, 0, junction, 0);
+
             while (!aborted)
             {
-                const void * next = stream.ungroupedNextRow();
+                const void * next = stream->ungroupedNextRow();
                 if (!next)
                     break;
 

+ 9 - 4
roxie/ccd/ccdcontext.cpp

@@ -1310,13 +1310,13 @@ public:
     {
         if (extra.embedded)
         {
-            return factory->lookupGraph(extra.embeddedGraphName, probeManager, *this, parentActivity);
+            return factory->lookupGraph(this, extra.embeddedGraphName, probeManager, *this, parentActivity);
         }
         else
         {
             Owned<IQueryFactory> libraryQuery = factory->lookupLibrary(extra.libraryName, extra.interfaceHash, *this);
             assertex(libraryQuery);
-            return libraryQuery->lookupGraph("graph1", probeManager, *this, parentActivity);
+            return libraryQuery->lookupGraph(this, "graph1", probeManager, *this, parentActivity);
         }
     }
 
@@ -1330,8 +1330,8 @@ public:
         }
         else if (probeAllRows || probeQuery != NULL)
             probeManager.setown(createProbeManager());
-        graph.setown(factory->lookupGraph(graphName, probeManager, *this, NULL));
-        graph->onCreate(this, NULL);  // MORE - is that right
+        graph.setown(factory->lookupGraph(this, graphName, probeManager, *this, NULL));
+        graph->onCreate(NULL);  // MORE - is that right
         if (debugContext)
             debugContext->checkBreakpoint(DebugStateGraphStart, NULL, graphName);
         if (workUnit)
@@ -1578,6 +1578,11 @@ public:
         return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
     }
 
+    virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+    {
+        return allocatorMetaCache->ensure(meta, activityId, flags);
+    }
+
     virtual const char *cloneVString(const char *str) const
     {
         return rowManager->cloneVString(str);

+ 2 - 0
roxie/ccd/ccdcontext.hpp

@@ -69,6 +69,8 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IConstWorkUnit *queryWorkUnit() const = 0;
     virtual IRoxieServerContext *queryServerContext() = 0;
     virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped) = 0;
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const = 0;
+
 };
 
 interface IRoxieServerContext : extends IInterface

+ 28 - 14
roxie/ccd/ccddebug.cpp

@@ -25,6 +25,7 @@
 #include "ccdqueue.ipp"
 #include "ccdsnmp.hpp"
 #include "ccdstate.hpp"
+#include "thorstrand.hpp"
 
 using roxiemem::IRowManager;
 
@@ -52,11 +53,12 @@ protected:
     bool hasStopped;
 
 public:
-    InputProbe(IFinalRoxieInput *_in, IEngineRowStream *_inStream, IDebuggableContext *_debugContext,
+    InputProbe(IFinalRoxieInput *_in, IDebuggableContext *_debugContext,
         unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
-        : in(_in),  inStream(_inStream), debugContext(_debugContext),
+        : in(_in),  debugContext(_debugContext),
           sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
     {
+        inStream = NULL;
         hasStarted = false;
         everStarted = false;
         hasStopped = false;
@@ -74,6 +76,22 @@ public:
     {
         return in->gatherConjunctions(collector);
     }
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
+    {
+        assertex (!idx);
+        PointerArrayOf<IEngineRowStream> instreams;
+        Owned<IStrandJunction> junction = in->getOutputStreams(ctx, sourceIdx, instreams, false, flags | SFforceSingle);
+        // We forced to single, so should not be getting anything but a single stream back
+        assertex(junction==NULL);
+        assertex(instreams.length()==1);
+        inStream = instreams.item(0);
+
+        // Return a single stream too...
+        streams.append(this);
+        return NULL;
+    }
+
     virtual void resetEOF()
     {
         inStream->resetEOF();
@@ -105,10 +123,6 @@ public:
     {
         return in->queryOutputMeta();
     }
-    IEngineRowStream &queryStream()
-    {
-        return *this;
-    }
     IInputBase &queryInput()
     {
         return *this;
@@ -172,8 +186,8 @@ class TraceProbe : public InputProbe
 public:
     IMPLEMENT_IINTERFACE;
 
-    TraceProbe(IFinalRoxieInput *_in, IEngineRowStream *_inStream, unsigned _sourceId, unsigned _targetId, unsigned _sourceIdx, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
-        : InputProbe(_in, _inStream, NULL, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel)
+    TraceProbe(IFinalRoxieInput *_in, unsigned _sourceId, unsigned _targetId, unsigned _sourceIdx, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
+        : InputProbe(_in, NULL, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel)
     {
     }
 
@@ -272,11 +286,11 @@ class CProbeManager : public CInterface, implements IProbeManager
 public:
     IMPLEMENT_IINTERFACE;
 
-    IRoxieProbe *createProbe(IInputBase *in, IEngineRowStream *_inStream, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    IRoxieProbe *createProbe(IInputBase *in, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
     {
         unsigned idIn = inAct->queryId();
         unsigned idOut = outAct->queryId();
-        TraceProbe *probe = new TraceProbe(static_cast<IFinalRoxieInput*>(in), _inStream, idIn, idOut, sourceIdx, targetIdx, iteration, 0);
+        TraceProbe *probe = new TraceProbe(static_cast<IFinalRoxieInput*>(in), idIn, idOut, sourceIdx, targetIdx, iteration, 0);
         probes.append(*probe);
         return probe;
     }
@@ -526,8 +540,8 @@ class DebugProbe : public InputProbe, implements IActivityDebugContext
     }
 
 public:
-    DebugProbe(IInputBase *_in, IEngineRowStream *_inStream, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
-        : InputProbe(static_cast<IFinalRoxieInput*>(_in), _inStream, _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
+    DebugProbe(IInputBase *_in,  unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
+        : InputProbe(static_cast<IFinalRoxieInput*>(_in), _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
           sourceAct(_sourceAct), targetAct(_targetAct)
     {
         historyCapacity = debugContext->getDefaultHistoryCapacity();
@@ -1011,7 +1025,7 @@ public:
         return CInterface::Release();
     }
 
-    virtual IRoxieProbe *createProbe(IInputBase *in, IEngineRowStream *inStream, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    virtual IRoxieProbe *createProbe(IInputBase *in, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
     {
         CriticalBlock b(crit);
         if (!iteration)
@@ -1021,7 +1035,7 @@ public:
         unsigned targetId = targetAct->queryId();
         DebugActivityRecord *sourceActRecord = noteActivity(sourceAct, iteration, channel, debugContext->querySequence());
         DebugActivityRecord *targetActRecord = noteActivity(targetAct, iteration, channel, debugContext->querySequence());
-        DebugProbe *probe = new DebugProbe(in, inStream, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
+        DebugProbe *probe = new DebugProbe(in, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
 #ifdef _DEBUG
         DBGLOG("Creating probe for edge id %s in graphManager %p", probe->queryEdgeId(), this);
 #endif

+ 4 - 0
roxie/ccd/ccdmain.cpp

@@ -132,6 +132,8 @@ unsigned defaultFullKeyedJoinPreload = 0;
 unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
+unsigned defaultStrandBlockSize = 512;
+unsigned defaultForceNumStrands = 0;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
 unsigned coresPerQuery = 0;
@@ -763,6 +765,8 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
         defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
         defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
+        defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
+        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
         defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
 
         slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);

+ 13 - 5
roxie/ccd/ccdquery.cpp

@@ -289,6 +289,8 @@ QueryOptions::QueryOptions()
     fetchPreload = defaultFetchPreload;
     prefetchProjectPreload = defaultPrefetchProjectPreload;
     bindCores = coresPerQuery;
+    strandBlockSize = defaultStrandBlockSize;
+    forceNumStrands = defaultForceNumStrands;
 
     checkingHeap = defaultCheckingHeap;
     disableLocalOptimizations = false;  // No global default for this
@@ -316,6 +318,8 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     fetchPreload = other.fetchPreload;
     prefetchProjectPreload = other.prefetchProjectPreload;
     bindCores = other.bindCores;
+    strandBlockSize = other.strandBlockSize;
+    forceNumStrands = other.forceNumStrands;
 
     checkingHeap = other.checkingHeap;
     disableLocalOptimizations = other.disableLocalOptimizations;
@@ -353,6 +357,8 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
     updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
     updateFromWorkUnit(bindCores, wu, "bindCores");
+    updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
+    updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
 
     updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
     updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
@@ -400,6 +406,8 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
         updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
         updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
+        updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
+        updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
 
         updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
         // Note: disableLocalOptimizations is not permitted at context level (too late)
@@ -536,7 +544,7 @@ protected:
             return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKproject:
         case TAKcountproject:
-            return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind); // code is common between Project, CountProject
+            return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
         case TAKfilterproject:
             return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKdatasetresult:
@@ -692,7 +700,7 @@ protected:
         case TAKsimpleaction:
             return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
         case TAKparse:
-            return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, this);
+            return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
         case TAKworkunitwrite:
             return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
         case TAKdictionaryworkunitwrite:
@@ -846,7 +854,7 @@ protected:
         case TAKnonempty:
             return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKprefetchproject:
-            return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKwhen_dataset:
             return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKwhen_action:
@@ -1266,12 +1274,12 @@ public:
         return *graphMap.getValue(name);
     }
 
-    virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
+    virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
     {
         assertex(name && *name);
         ActivityArrayPtr *graph = graphMap.getValue(name);
         assertex(graph);
-        Owned<IActivityGraph> ret = ::createActivityGraph(name, 0, **graph, parentActivity, probeManager, logctx);
+        Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx);
         return ret.getClear();
     }
 

+ 5 - 3
roxie/ccd/ccdquery.hpp

@@ -58,13 +58,13 @@ interface IActivityGraph : extends IInterface
     virtual void reset() = 0;
     virtual void execute() = 0;
     virtual void getProbeResponse(IPropertyTree *query) = 0;
-    virtual void onCreate(IRoxieSlaveContext *ctx, IHThorArg *colocalArg) = 0;
+    virtual void onCreate(IHThorArg *colocalArg) = 0;
     virtual void noteException(IException *E) = 0;
     virtual void checkAbort() = 0;
     virtual IThorChildGraph * queryChildGraph() = 0;
     virtual IEclGraphResults * queryLocalGraph() = 0;
     virtual IRoxieServerChildGraph * queryLoopGraph() = 0;
-    virtual IRoxieServerChildGraph * createGraphLoopInstance(unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) = 0;
+    virtual IRoxieServerChildGraph * createGraphLoopInstance(IRoxieSlaveContext *ctx, unsigned loopCounter, unsigned parentExtractSize, const byte * parentExtract, const IRoxieContextLogger &logctx) = 0;
     virtual const char *queryName() const = 0;
 };
 
@@ -109,6 +109,8 @@ public:
     int fetchPreload;
     int prefetchProjectPreload;
     int bindCores;
+    unsigned strandBlockSize;
+    unsigned forceNumStrands;
 
     bool checkingHeap;
     bool disableLocalOptimizations;
@@ -134,7 +136,7 @@ private:
 interface IQueryFactory : extends IInterface
 {
     virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const = 0;
-    virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
+    virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
     virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const = 0;
     virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const = 0;
     virtual hash64_t queryHash() const = 0;

Dosya farkı çok büyük olduğundan ihmal edildi
+ 1604 - 1006
roxie/ccd/ccdserver.cpp


+ 27 - 11
roxie/ccd/ccdserver.hpp

@@ -87,8 +87,16 @@ interface IRoxieServerActivity;
 interface IRoxieServerChildGraph;
 interface IRoxieServerContext;
 interface IRoxieSlaveContext;
+interface IStrandJunction;
+
 class ClusterWriteHandler;
 
+enum StrandFlags
+{
+    SFforceSingle   = 0x0001,     // Force entire subtree to be single-stranded - eg when debugging or smart-stepping
+    SFpreserveOrder = 0x0002,     // Order must be preserved by any multistranding - returns a  suitable M:1 junction object to restore the order
+};
+
 interface IFinalRoxieInput : extends IInputBase
 {
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
@@ -100,8 +108,12 @@ interface IFinalRoxieInput : extends IInputBase
     virtual IFinalRoxieInput * queryConcreteInput(unsigned idx) { assertex(idx==0); return this; }
     virtual IRoxieServerActivity *queryActivity() = 0;
     virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
 };
 
+extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, unsigned flags);
+
 interface ISteppedConjunctionCollector;
 
 interface IIndexReadActivityInfo
@@ -132,12 +144,14 @@ interface IRoxiePackage;
 
 interface IRoxieServerActivity : extends IActivityBase
 {
-    virtual void setInput(unsigned idx, IFinalRoxieInput *in) = 0;
+    virtual void setInput(unsigned idx, unsigned sourceIdx, IFinalRoxieInput *in) = 0;
     virtual IFinalRoxieInput *queryOutput(unsigned idx) = 0;
     virtual IFinalRoxieInput *queryInput(unsigned idx) const = 0;
     virtual void execute(unsigned parentExtractSize, const byte *parentExtract) = 0;
-    virtual void onCreate(IRoxieSlaveContext *ctx, IHThorArg *colocalArg) = 0;
+    virtual void onCreate(IHThorArg *colocalArg) = 0;
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
+
     virtual void stop() = 0;
     virtual void abort() = 0;
     virtual void reset() = 0;
@@ -150,9 +164,11 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void serializeCreateStartContext(MemoryBuffer &out) = 0;
     virtual void serializeExtra(MemoryBuffer &out) = 0;
     virtual void stopSink(unsigned idx) = 0;
-//Functions to support result streaming between parallel loop/graphloop/library implementations
+    virtual void connectOutputStreams(unsigned flags) = 0;
+
+    //Functions to support result streaming between parallel loop/graphloop/library implementations
     virtual IFinalRoxieInput * querySelectOutput(unsigned id) = 0;
-    virtual bool querySetStreamInput(unsigned id, IFinalRoxieInput * _input) = 0;
+    virtual bool querySetStreamInput(unsigned id, unsigned _sourceIdx, IFinalRoxieInput * _input) = 0;
     virtual void gatherIterationUsage(IRoxieServerLoopResultProcessor & processor, unsigned parentExtractSize, const byte * parentExtract) = 0;
     virtual void associateIterationOutputs(IRoxieServerLoopResultProcessor & processor, unsigned parentExtractSize, const byte * parentExtract, IProbeManager *probeManager, IArrayOf<IRoxieProbe> &probes) = 0;
     virtual void resetOutputsUsed() = 0;        // use for adjusting correct number of uses for a splitter
@@ -173,7 +189,7 @@ interface IRoxieServerActivity : extends IActivityBase
 
 interface IRoxieServerActivityFactory : extends IActivityFactory
 {
-    virtual IRoxieServerActivity *createActivity(IProbeManager *_probemanager) const = 0;
+    virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probemanager) const = 0;
     virtual void setInput(unsigned idx, unsigned source, unsigned sourceidx) = 0;
     virtual bool isSink() const = 0;
     virtual bool isFunction() const = 0;
@@ -192,7 +208,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, cycle_t localCycles) const = 0;
     virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const = 0;
-    virtual void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
+    virtual void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
     virtual void noteStarted() const = 0;
     virtual void noteStarted(unsigned idx) const = 0;
     virtual void noteDependent(unsigned target) = 0;
@@ -231,7 +247,7 @@ interface IRoxieServerChildGraph : public IInterface
     virtual IFinalRoxieInput * startOutput(unsigned id, unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
     virtual IFinalRoxieInput * selectOutput(unsigned id) = 0;
     virtual void setInputResult(unsigned id, IGraphResult * result) = 0;
-    virtual bool querySetInputResult(unsigned id, IFinalRoxieInput * result) = 0;
+    virtual bool querySetInputResult(unsigned id, unsigned _sourceIdx, IFinalRoxieInput * result) = 0;
     virtual void stopUnusedOutputs() = 0;
     virtual IRoxieGraphResults * execute(size32_t parentExtractSize, const byte *parentExtract) = 0;
     virtual void afterExecute() = 0;
@@ -248,7 +264,7 @@ interface IRoxieServerChildGraph : public IInterface
 
 interface IQueryFactory;
 
-extern IActivityGraph *createActivityGraph(const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
+extern IActivityGraph *createActivityGraph(IRoxieSlaveContext *ctx, const char *graphName, unsigned id, ActivityArray &x, IRoxieServerActivity *parent, IProbeManager *probeManager, const IRoxieContextLogger &logctx);
 
 extern ruid_t getNextRuid();
 extern void setStartRuid(unsigned restarts);
@@ -361,7 +377,7 @@ extern IRoxieServerActivityFactory *createRoxieServerRegroupActivityFactory(unsi
 extern IRoxieServerActivityFactory *createRoxieServerCombineActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerCombineGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerRollupGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
 extern IRoxieServerActivityFactory *createRoxieServerGraphLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
@@ -380,7 +396,7 @@ extern IRoxieServerActivityFactory *createRoxieServerSkipLimitActivityFactory(un
 extern IRoxieServerActivityFactory *createRoxieServerCatchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerCaseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
 extern IRoxieServerActivityFactory *createRoxieServerIfActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
-extern IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IResourceContext *rc);
+extern IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, IResourceContext *rc);
 extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
@@ -413,7 +429,7 @@ extern IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(uns
 extern IRoxieServerActivityFactory *createRoxieServerIfActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerSequentialActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerParallelActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerStreamedIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);

+ 4 - 4
system/jlib/jatomic.hpp

@@ -32,11 +32,11 @@ inline static void spinPause() { __pause(); }
 # else
 inline static void spinPause() { _mm_pause(); }
 # endif
+#elif defined(_ARCH_PPC64EL_)
+inline static void spinPause() { } // MORE: Is there an equivalent?
+#elif defined(_ARCH_ARM64_)
+inline static void spinPause() { } // MORE: Is there an equivalent?
 #else
-// _ARCH_ARM64_ || _ARCH_ARM32_
-// inline static void spinPause() { __nop(); }
-// _ARCH_PPC64EL_
-// __asm__ __volatile__ ("or 0,0,0");
 inline static void spinPause() { }
 #endif
 

+ 6 - 0
system/jlib/jio.hpp

@@ -169,6 +169,12 @@ interface IRowWriter: extends IInterface
     virtual void flush() = 0;
 };
 
+interface IRowWriterEx : extends IRowWriter
+{
+public:
+    virtual void noteStopped() = 0;
+};
+
 interface IRowLinkCounter: extends IInterface
 {
     virtual void linkRow(const void *row)=0;

+ 32 - 0
system/jlib/jthread.cpp

@@ -718,6 +718,38 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
         throw e;
 }
 
+//---------------------------------------------------------------------------------------------------------------------
+
+class CSimpleFunctionThread : public Thread
+{
+    std::function<void()> func;
+public:
+    inline CSimpleFunctionThread(std::function<void()> _func) : Thread("TaskProcessor"), func(_func) { }
+    virtual int run()
+    {
+        func();
+        return 1;
+    }
+};
+
+void asyncStart(IThreaded & threaded)
+{
+    CThreaded * thread = new CThreaded("AsyncStart", &threaded);
+    thread->startRelease();
+}
+
+void asyncStart(const char * name, IThreaded & threaded)
+{
+    CThreaded * thread = new CThreaded(name, &threaded);
+    thread->startRelease();
+}
+
+//Experimental - is this a useful function to replace some uses of IThreaded?
+void asyncStart(std::function<void()> func)
+{
+    (new CSimpleFunctionThread(func))->startRelease();
+}
+
 // ---------------------------------------------------------------------------
 // Thread Pools
 // ---------------------------------------------------------------------------

+ 7 - 0
system/jlib/jthread.hpp

@@ -24,6 +24,7 @@
 #include "jmutex.hpp"
 #include "jexcept.hpp"
 #include "jhash.hpp"
+#include <functional>
 
 #ifdef _WIN32
 #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL
@@ -153,6 +154,12 @@ public:
     virtual int run() { owner->main(); return 1; }
 };
 
+extern jlib_decl void asyncStart(IThreaded & threaded);
+extern jlib_decl void asyncStart(const char * name, IThreaded & threaded);
+#if defined(__cplusplus) and __cplusplus >= 201100
+extern jlib_decl void asyncStart(std::function<void()> func);
+#endif
+
 // Similar to above, but the underlying thread always remains running. This can make repeated start + join's significantly quicker
 class jlib_decl CThreadedPersistent : public CInterface
 {

+ 2 - 7
testing/unittests/jlibtests.cpp

@@ -528,6 +528,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( JlibStringBufferTest );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibStringBufferTest, "JlibStringBufferTest" );
 
 
+
 /* =========================================================== */
 
 static const unsigned split4_2[] = {0, 2, 4 };
@@ -617,7 +618,6 @@ public:
 CPPUNIT_TEST_SUITE_REGISTRATION( JlibQuantileTest );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibQuantileTest, "JlibQuantileTest" );
 
-
 /* =========================================================== */
 
 class JlibReaderWriterTest : public CppUnit::TestFixture
@@ -705,13 +705,12 @@ class JlibReaderWriterTest : public CppUnit::TestFixture
                     work = spinCalculation(work, workScale);
                 target.enqueue(buffer + i);
             }
+            target.noteWriterStopped();
             doneSem.signal();
             return 0;
         }
     };
 public:
-    JlibReaderWriterTest() { unitWorkTimeMs = 0; }
-
     const static size_t bufferSize = 0x100000;//0x100000*64;
     void testQueue(IRowQueue & queue, unsigned numProducers, unsigned numConsumers, unsigned queueElements, unsigned readerWork, unsigned writerWork)
     {
@@ -746,10 +745,6 @@ public:
         for (unsigned i7 = 0; i7 < numProducers; i7++)
             writerDoneSem.wait();
 
-        //Now add NULL records to the queue so the consumers know to terminate
-        for (unsigned i8 = 0; i8 < numConsumers; i8++)
-            queue.enqueue(NULL);
-
         //Wait for the readers to complete
         for (unsigned i3 = 0; i3 < numConsumers; i3++)
             stopSem.wait();