Jelajahi Sumber

Merge pull request #8033 from richardkchapman/split-stream-phase2

HPCC-14656 Split concepts of an input and a row stream

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 tahun lalu
induk
melakukan
20b891d893

+ 15 - 2
common/thorhelper/roxiehelper.ipp

@@ -24,10 +24,23 @@
 
 extern THORHELPER_API unsigned traceLevel;
 interface IOutputMetaData;
-struct IInputBase : public IEngineRowStream // Should be derived from IInterface  //base for IRoxieInput and IHThorInput
+interface IInputSteppingMeta;
+
+struct IInputBase : public IInterface //base for IRoxieInput and IHThorInput
 {
-    virtual IEngineRowStream &queryInput() const { UNIMPLEMENTED; };
     virtual IOutputMetaData * queryOutputMeta() const = 0;
+    virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
+    virtual void resetEOF() = 0;
+
+    // These will need some thought
+    virtual IEngineRowStream &queryStream() = 0;
+    inline bool nextGroup(ConstPointerArray & group) { return queryStream().nextGroup(group); }
+    inline void readAll(RtlLinkedDatasetBuilder &builder) { return queryStream().readAll(builder); }
+    inline const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { return queryStream().nextRowGE(seek, numFields, wasCompleteMatch, stepExtra); }
+    inline const void *nextRow() { return queryStream().nextRow(); }
+    inline void stop() { queryStream().stop(); }
+    inline const void *ungroupedNextRow() { return queryStream().ungroupedNextRow(); }
+
 };
 
 //---------------------------------------------------

+ 3 - 1
ecl/eclagent/eclagent.cpp

@@ -3605,7 +3605,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
 
 //=======================================================================================
 //copied/modified from ccdserver
-class InputProbe : public CInterface, implements IHThorInput // base class for the edge probes used for tracing and debugging....
+class InputProbe : public CInterface, implements IHThorInput, implements IEngineRowStream // base class for the edge probes used for tracing and debugging....
 {
 protected:
     IHThorInput *in;
@@ -4052,6 +4052,8 @@ public:
         InputProbe::ready();
     }
 
+    virtual IEngineRowStream &queryStream() { return *this; }
+
     virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
         return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);

+ 9 - 2
ecl/eclagent/eclagent.ipp

@@ -845,7 +845,7 @@ class EclSubGraph : public CInterface, implements ILocalEclGraphResults, public
     friend class EclGraphElement;
 private:
 
-    class LegacyInputProbe : public CInterfaceOf<IHThorInput>
+    class LegacyInputProbe : public CInterface, implements IHThorInput, implements IEngineRowStream
     {
         IHThorInput  *in;
         EclSubGraph  *owner;
@@ -856,6 +856,8 @@ private:
         StringAttr edgeId;
 
     public:
+        IMPLEMENT_IINTERFACE;
+
         LegacyInputProbe(IHThorInput *_in, EclSubGraph *_owner, unsigned _sourceId, int outputidx)
             : in(_in), owner(_owner), sourceId(_sourceId), outputIndex(outputidx)
         {
@@ -876,6 +878,11 @@ private:
             in->stop();
         }
 
+        IEngineRowStream &queryStream()
+        {
+            return *this;
+        }
+
         bool isGrouped() { return in->isGrouped(); }
 
         bool nextGroup(ConstPointerArray & group)
@@ -1004,7 +1011,7 @@ public:
     unsigned numResults;
     CIArrayOf<EclGraphElement> elements;
 
-    IArrayOf<LegacyInputProbe> probes;
+    IArrayOf<IHThorInput> probes;
     CIArrayOf<EclSubGraph> subgraphs;
     Owned<IHThorGraphResults> localResults;
     Owned<IHThorGraphResults> graphLoopResults;

+ 7 - 7
ecl/hthor/hthor.cpp

@@ -4369,14 +4369,14 @@ void CHThorJoinActivity::ready()
     StringBuffer tempBase;
     agent.getTempfileBase(tempBase);
     if (helper.isLeftAlreadySorted())
-        sortedLeftInput.setown(createDegroupedInputReader(input));
+        sortedLeftInput.setown(createDegroupedInputReader(&input->queryStream()));
     else
-        sortedLeftInput.setown(createSortedInputReader(input, createSortAlgorithm(sortAlgorithm, helper.queryCompareLeft(), *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
+        sortedLeftInput.setown(createSortedInputReader(&input->queryStream(), createSortAlgorithm(sortAlgorithm, helper.queryCompareLeft(), *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
     ICompare *compareRight = helper.queryCompareRight();
     if (helper.isRightAlreadySorted())
-        groupedSortedRightInput.setown(createGroupedInputReader(input1, compareRight));
+        groupedSortedRightInput.setown(createGroupedInputReader(&input1->queryStream(), compareRight));
     else
-        groupedSortedRightInput.setown(createSortedGroupedInputReader(input1, compareRight, createSortAlgorithm(sortAlgorithm, compareRight, *queryRowManager(), input1->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
+        groupedSortedRightInput.setown(createSortedGroupedInputReader(&input1->queryStream(), compareRight, createSortAlgorithm(sortAlgorithm, compareRight, *queryRowManager(), input1->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
     outBuilder.setAllocator(rowAllocator);
     leftOuterJoin = (helper.getJoinFlags() & JFleftouter) != 0;
     rightOuterJoin = (helper.getJoinFlags() & JFrightouter) != 0;
@@ -4985,14 +4985,14 @@ void CHThorSelfJoinActivity::ready()
     outBuilder.setAllocator(rowAllocator);
     ICompare *compareLeft = helper.queryCompareLeft();
     if (helper.isLeftAlreadySorted())
-        groupedInput.setown(createGroupedInputReader(input, compareLeft));
+        groupedInput.setown(createGroupedInputReader(&input->queryStream(), compareLeft));
     else
     {
         bool isStable = (helper.getJoinFlags() & JFunstable) == 0;
         RoxieSortAlgorithm sortAlgorithm = isStable ? stableSpillingQuickSortAlgorithm : spillingQuickSortAlgorithm;
         StringBuffer tempBase;
         agent.getTempfileBase(tempBase);
-        groupedInput.setown(createSortedGroupedInputReader(input, compareLeft, createSortAlgorithm(sortAlgorithm, compareLeft, *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
+        groupedInput.setown(createSortedGroupedInputReader(&input->queryStream(), compareLeft, createSortAlgorithm(sortAlgorithm, compareLeft, *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
     }
     leftOuterJoin = (helper.getJoinFlags() & JFleftouter) != 0;
     rightOuterJoin = (helper.getJoinFlags() & JFrightouter) != 0;
@@ -9289,7 +9289,7 @@ CHThorLoopActivity::~CHThorLoopActivity()
 
 void CHThorLoopActivity::ready()
 {
-    curInput = input;
+    curInput = &input->queryStream();
     eof = false;
     loopCounter = 1;
     CHThorSimpleActivityBase::ready(); 

+ 0 - 2
ecl/hthor/hthor.hpp

@@ -52,11 +52,9 @@ interface IInputSteppingMeta;
 struct IHThorInput : public IInputBase
 {
     virtual bool isGrouped() = 0;
-    virtual IOutputMetaData * queryOutputMeta() const = 0;
 
     virtual void ready() = 0;
     virtual void updateProgress(IStatisticGatherer &progress) const = 0;
-    virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual void resetEOF() { }
 };

+ 5 - 2
ecl/hthor/hthor.ipp

@@ -191,7 +191,7 @@ static bool verifyFormatCrcSuper(unsigned helperCrc, IDistributedFile * df, bool
     virtual bool isGrouped() { throwUnexpected(); } \
     virtual IOutputMetaData * queryOutputMeta() const   { throwUnexpected(); } 
 
-class CHThorActivityBase : public CInterface, implements IHThorActivity, implements IHThorInput
+class CHThorActivityBase : public CInterface, implements IHThorActivity, implements IHThorInput, implements IEngineRowStream
 {
 protected:
     enum ActivityState { StateCreated, StateReady, StateDone };
@@ -230,6 +230,8 @@ public:
     virtual bool needsAllocator() const { return false; }       
     void createRowAllocator();                                  
     virtual bool isPassThrough();
+    virtual IEngineRowStream &queryStream() { return *this; }
+    inline const void *ungroupedNextRow() { return IEngineRowStream::ungroupedNextRow(); }
 
 protected:
     void updateProgressForOther(IStatisticGatherer &progress, unsigned otherActivity, unsigned otherSubgraph, unsigned whichOutput, unsigned __int64 numProcessed) const;
@@ -2733,7 +2735,7 @@ protected:
 
 
 class CHThorLibraryCallActivity;
-class LibraryCallOutput : public CInterface, public IHThorInput
+class LibraryCallOutput : public CInterface, public IHThorInput, public IEngineRowStream
 {
 public:
     LibraryCallOutput(CHThorLibraryCallActivity * _owner, unsigned _output, IOutputMetaData * _meta);
@@ -2744,6 +2746,7 @@ public:
 
     virtual void ready();
     virtual void stop();
+    virtual IEngineRowStream &queryStream() { return *this; }
     virtual void updateProgress(IStatisticGatherer &progress) const;
 
 protected:

+ 7 - 3
roxie/ccd/ccddebug.cpp

@@ -30,7 +30,7 @@ using roxiemem::IRowManager;
 
 //=======================================================================================================================
 
-class InputProbe : public CInterface, implements IRoxieInput // base class for the edge probes used for tracing and debugging....
+class InputProbe : public CInterface, implements IRoxieInput, implements IEngineRowStream // base class for the edge probes used for tracing and debugging....
 {
 protected:
     IRoxieInput *in;
@@ -104,6 +104,10 @@ public:
     {
         return in->queryOutputMeta();
     }
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
         // NOTE: totalRowCount/maxRowSize not reset, as we want them cumulative when working in a child query.
@@ -278,7 +282,7 @@ public:
 
 class CProbeManager : public CInterface, implements IProbeManager
 {
-    IArrayOf<TraceProbe> probes; // May want to replace with hash table at some point....
+    IArrayOf<IRoxieInput> probes; // May want to replace with hash table at some point....
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -302,7 +306,7 @@ public:
         {
             idx++;
             if (idx>=probeCount) idx = 0;
-            TraceProbe &p = probes.item(idx);
+            TraceProbe &p = static_cast<TraceProbe &> (probes.item(idx));
             if (p.matches(edge, forNode))
             {
                 startat = idx;

+ 48 - 16
roxie/ccd/ccdserver.cpp

@@ -847,7 +847,7 @@ private:
     IRoxieServerActivityCopyArray & activities;
 };
 
-class CRoxieServerActivity : public CInterface, implements IRoxieServerActivity, implements IRoxieInput, implements IRoxieContextLogger
+class CRoxieServerActivity : public CInterface, implements IRoxieServerActivity, implements IRoxieInput, implements IEngineRowStream, implements IRoxieContextLogger
 {
 protected:
     IRoxieInput *input;
@@ -956,6 +956,11 @@ public:
         return *this;
     }
 
+    virtual IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
+
     virtual void mergeStats(MemoryBuffer &buf)
     {
         stats.deserializeMerge(buf);
@@ -1697,7 +1702,7 @@ public:
 
 // MORE - this code copied from ThreadedConcat code - may be able to common up some.
 
-class CRoxieServerReadAheadInput : public CInterface, implements IRoxieInput, implements IRecordPullerCallback
+class CRoxieServerReadAheadInput : public CInterface, implements IRoxieInput, implements IRecordPullerCallback, implements IEngineRowStream
 {
     QueueOf<const void, true> buffer;
     InterruptableSemaphore ready;
@@ -1730,6 +1735,11 @@ public:
             timeActivities = ctx->queryOptions().timeActivities;
     }
 
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
+
     virtual IRoxieServerActivity *queryActivity()
     {
         return puller.queryInput()->queryActivity();
@@ -2613,7 +2623,7 @@ void throwRemoteException(IMessageUnpackCursor *extra)
     throwUnexpected();
 }
 
-class CRemoteResultAdaptor :public CInterface, implements IRoxieInput, implements IExceptionHandler
+class CRemoteResultAdaptor :public CInterface, implements IRoxieInput, implements IExceptionHandler, implements IEngineRowStream
 {
     friend class CRemoteResultMerger;
     class CRemoteResultMerger
@@ -3494,6 +3504,11 @@ public:
         delete [] buffers;
     }
 
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
+
     void setMeta(IOutputMetaData *newmeta)
     {
         meta.set(newmeta);
@@ -5379,12 +5394,16 @@ public:
 };
 
 
-class CSafeRoxieInput : public CInterface, implements IRoxieInput
+class CSafeRoxieInput : public CInterface, implements IRoxieInput, implements IEngineRowStream
 {
 public:
     CSafeRoxieInput(IRoxieInput * _input) : input(_input) {}
     IMPLEMENT_IINTERFACE
 
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
     virtual IOutputMetaData * queryOutputMeta() const
     {
         return input->queryOutputMeta();
@@ -5457,7 +5476,7 @@ private:
 
 //=================================================================================
 
-class CPseudoRoxieInput : public CInterface, implements IRoxieInput
+class CPseudoRoxieInput : public CInterface, implements IRoxieInput, implements IEngineRowStream
 {
 public:
     IMPLEMENT_IINTERFACE;
@@ -5489,6 +5508,10 @@ public:
     {
         throwUnexpected();
     }
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
 
     virtual IOutputMetaData * queryOutputMeta() const { throwUnexpected(); }
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) { }
@@ -7434,7 +7457,7 @@ public:
                 sortAlgorithm = useAlgorithm(algorithmName, sortFlags);
                 sorter.setown(createSortAlgorithm(sortAlgorithm, compare, ctx->queryRowManager(), meta, ctx->queryCodeContext(), tempDirectory, activityId));
             }
-            sorter->prepare(input);
+            sorter->prepare(&input->queryStream());
             noteStatistic(StTimeSortElapsed, cycle_to_nanosec(sorter->getElapsedCycles(true)));
             readInput = true;
         }
@@ -7622,7 +7645,7 @@ public:
                 }
                 else
                 {
-                    sorter->prepare(input);
+                    sorter->prepare(&input->queryStream());
                     sorter->getSortedGroup(sorted);
                 }
 
@@ -7898,7 +7921,7 @@ public:
     unsigned headIdx;
     Owned<IException> error;
 
-    class OutputAdaptor : public CInterface, implements IRoxieInput
+    class OutputAdaptor : public CInterface, implements IRoxieInput, implements IEngineRowStream
     {
         bool eof, eofpending, stopped;
 
@@ -8045,6 +8068,11 @@ public:
             parent->checkAbort();
         }
 
+        IEngineRowStream &queryStream()
+        {
+            return *this;
+        }
+
     } *adaptors;
     bool *used;
 
@@ -9245,7 +9273,7 @@ public:
         else
             eof = true;
 
-        return ungroupedNextRow();
+        return IEngineRowStream::ungroupedNextRow();
     }
 
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) 
@@ -11333,14 +11361,14 @@ public:
         else
             sortAlgorithm = isStable ? stableQuickSortAlgorithm : quickSortAlgorithm;
         if (helper.isLeftAlreadySorted())
-            sortedLeft.setown(createDegroupedInputReader(input));
+            sortedLeft.setown(createDegroupedInputReader(&input->queryStream()));
         else
-            sortedLeft.setown(createSortedInputReader(input, createSortAlgorithm(sortAlgorithm, helper.queryCompareLeft(), ctx->queryRowManager(), input->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
+            sortedLeft.setown(createSortedInputReader(&input->queryStream(), createSortAlgorithm(sortAlgorithm, helper.queryCompareLeft(), ctx->queryRowManager(), input->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
         ICompare *compareRight = helper.queryCompareRight();
         if (helper.isRightAlreadySorted())
-            groupedSortedRight.setown(createGroupedInputReader(input1, compareRight));
+            groupedSortedRight.setown(createGroupedInputReader(&input1->queryStream(), compareRight));
         else
-            groupedSortedRight.setown(createSortedGroupedInputReader(input1, compareRight, createSortAlgorithm(sortAlgorithm, compareRight, ctx->queryRowManager(), input1->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
+            groupedSortedRight.setown(createSortedGroupedInputReader(&input1->queryStream(), compareRight, createSortAlgorithm(sortAlgorithm, compareRight, ctx->queryRowManager(), input1->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
         if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit())
         {   //limited match join (s[1..n])
             limitedhelper.setown(createRHLimitedCompareHelper());
@@ -16722,7 +16750,7 @@ public:
             createDefaultRight();
         ICompare *compareLeft = helper.queryCompareLeft();
         if (helper.isLeftAlreadySorted())
-            groupedInput.setown(createGroupedInputReader(input, compareLeft));
+            groupedInput.setown(createGroupedInputReader(&input->queryStream(), compareLeft));
         else
         {
             bool isStable = (helper.getJoinFlags() & JFunstable) == 0;
@@ -16731,7 +16759,7 @@ public:
                 sortAlgorithm = isStable ? stableSpillingQuickSortAlgorithm : spillingQuickSortAlgorithm;
             else
                 sortAlgorithm = isStable ? stableQuickSortAlgorithm : quickSortAlgorithm;
-            groupedInput.setown(createSortedGroupedInputReader(input, compareLeft, createSortAlgorithm(sortAlgorithm, compareLeft, ctx->queryRowManager(), input->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
+            groupedInput.setown(createSortedGroupedInputReader(&input->queryStream(), compareLeft, createSortAlgorithm(sortAlgorithm, compareLeft, ctx->queryRowManager(), input->queryOutputMeta(), ctx->queryCodeContext(), tempDirectory, activityId)));
         }
         if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) 
         {   //limited match join (s[1..n])
@@ -26255,7 +26283,7 @@ public:
     virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 } testMeta;
 
-class TestInput : public CInterface, implements IRoxieInput
+class TestInput : public CInterface, implements IRoxieInput, implements IEngineRowStream
 {
     char const * const *input;
     IRoxieSlaveContext *ctx;
@@ -26294,6 +26322,10 @@ public:
         ASSERT(state == STATEreset);
         state = STATEstarted; 
     }
+    IEngineRowStream &queryStream()
+    {
+        return *this;
+    }
     virtual IRoxieServerActivity *queryActivity()
     {
         throwUnexpected();

+ 0 - 4
roxie/ccd/ccdserver.hpp

@@ -93,26 +93,22 @@ class ClusterWriteHandler;
 interface IRoxieInput : extends IInputBase
 {
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
-    virtual void stop() = 0;
     virtual void reset() = 0;
     virtual void checkAbort() = 0;
     virtual unsigned queryId() const = 0;
 
     virtual unsigned __int64 queryTotalCycles() const = 0;
     virtual unsigned __int64 queryLocalCycles() const = 0;
-    virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual unsigned numConcreteOutputs() const { return 1; }
     virtual IRoxieInput * queryConcreteInput(unsigned idx) { assertex(idx==0); return this; }
     virtual IRoxieInput *queryInput(unsigned idx) const = 0;
     virtual IRoxieServerActivity *queryActivity() = 0;
-    virtual void resetEOF() = 0;
     virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
 };
 
 
 interface ISteppedConjunctionCollector;
-interface IInputSteppingMeta;
 
 interface IIndexReadActivityInfo
 {