소스 검색

HPCC-14656 Split concepts of an input and a row stream

Files other than ccdserver.cpp all now compile without the extra methods in
the IRoxieInput class.

Temporarily added them back so that this commit builds...

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 년 전
부모
커밋
8f665b69a7
7개의 변경된 파일55개의 추가작업 그리고 39개의 파일을 삭제
  1. 1 1
      common/thorhelper/roxiedebug.ipp
  2. 12 11
      ecl/eclagent/eclagent.cpp
  3. 1 0
      ecl/eclagent/eclgraph.cpp
  4. 9 0
      ecl/hthor/hthor.hpp
  5. 3 6
      roxie/ccd/ccdactivities.cpp
  6. 15 14
      roxie/ccd/ccddebug.cpp
  7. 14 7
      roxie/ccd/ccdserver.cpp

+ 1 - 1
common/thorhelper/roxiedebug.ipp

@@ -163,7 +163,7 @@ interface IDebugGraphManager : extends IInterface
 
 interface IProbeManager : public IInterface
 {
-    virtual IInputBase *createProbe(IInputBase *in, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration) = 0;
+    virtual IInputBase *createProbe(IInputBase *in, IEngineRowStream *_inStream, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration) = 0;
     virtual void getProbeResponse(IPropertyTree *query) = 0;
     virtual void noteSink(IActivityBase *sink) = 0;
     virtual void noteDependency(IActivityBase *sourceActivity, unsigned sourceIndex, unsigned controlId, const char *edgeId, IActivityBase *targetActivity) = 0;

+ 12 - 11
ecl/eclagent/eclagent.cpp

@@ -3609,6 +3609,7 @@ class InputProbe : public CInterface, implements IHThorInput, implements IEngine
 {
 protected:
     IHThorInput *in;
+    IEngineRowStream *stream;
     unsigned sourceId;
     unsigned sourceIdx;
     unsigned targetId;
@@ -3625,8 +3626,8 @@ protected:
     bool everStarted;
         
 public:
-    InputProbe(IHThorInput *_in, unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
-        : in(_in), sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
+    InputProbe(IHThorInput *_in, IEngineRowStream *_stream, unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
+        : in(_in), stream(_stream), sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
     {
         hasStarted = false;
         hasStopped = false;
@@ -3669,12 +3670,12 @@ public:
 
     virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return stream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual const void *nextRow()
     {
-        const void *ret = in->nextRow();
+        const void *ret = stream->nextRow();
         if (ret)
         {
             size32_t size = in->queryOutputMeta()->getRecordSize(ret);
@@ -3702,7 +3703,7 @@ public:
     virtual void stop() 
     { 
         hasStopped = true;
-        in->stop(); 
+        stream->stop();
     }
 };
 
@@ -3760,8 +3761,8 @@ class DebugProbe : public InputProbe, implements IActivityDebugContext
     }
 
 public:
-    DebugProbe(IHThorInput *_in, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
-        : InputProbe(_in, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
+    DebugProbe(IHThorInput *_in, IEngineRowStream *_stream, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
+        : InputProbe(_in, _stream, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
           sourceAct(_sourceAct), targetAct(_targetAct), debugContext(_debugContext)
     {
         historyCapacity = debugContext->getDefaultHistoryCapacity();
@@ -4056,12 +4057,12 @@ public:
 
     virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return stream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual void stop()
     {
-        in->stop();
+        stream->stop();
     }
 
     virtual const void *nextRow()
@@ -4161,7 +4162,7 @@ public:
     {
     }
 
-    IInputBase *createProbe(IInputBase *in, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    IInputBase *createProbe(IInputBase *in, IEngineRowStream *stream, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
     {
         CriticalBlock b(crit);
         unsigned channel = debugContext->queryChannel();
@@ -4169,7 +4170,7 @@ public:
         unsigned targetId = targetAct->queryId();
         DebugActivityRecord *sourceActRecord = noteActivity(sourceAct, iteration, channel, debugContext->querySequence());
         DebugActivityRecord *targetActRecord = noteActivity(targetAct, iteration, channel, debugContext->querySequence());
-        DebugProbe *probe = new DebugProbe(dynamic_cast<IHThorInput*>(in), sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
+        DebugProbe *probe = new DebugProbe(dynamic_cast<IHThorInput*>(in), stream, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
     #ifdef _DEBUG
         DBGLOG("Creating probe for edge id %s in graphManager %p", probe->queryEdgeId(), this);
     #endif

+ 1 - 0
ecl/eclagent/eclgraph.cpp

@@ -489,6 +489,7 @@ void EclGraphElement::createActivity(IAgentContext & agent, EclSubGraph * owner)
                     {
                         IInputBase *base = probeManager->createProbe(
                                                         input.queryOutput(branchIndexes.item(i2)),  //input
+                                                        &input.queryOutput(branchIndexes.item(i2))->queryStream(),  //stream
                                                         input.activity.get(),   //Source act
                                                         activity.get(),         //target activity
                                                         0,//input.id, 

+ 9 - 0
ecl/hthor/hthor.hpp

@@ -57,6 +57,15 @@ struct IHThorInput : public IInputBase
     virtual void updateProgress(IStatisticGatherer &progress) const = 0;
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual void resetEOF() { }
+
+    // HThor is not going to support parallel streams
+    inline bool nextGroup(ConstPointerArray & group) { return queryStream().nextGroup(group); }
+    inline void readAll(RtlLinkedDatasetBuilder &builder) { return queryStream().readAll(builder); }
+    inline const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { return queryStream().nextRowGE(seek, numFields, wasCompleteMatch, stepExtra); }
+    inline const void *nextRow() { return queryStream().nextRow(); }
+    inline void stop() { queryStream().stop(); }
+    inline const void *ungroupedNextRow() { return queryStream().ungroupedNextRow(); }
+
 };
 
 struct IHThorNWayInput

+ 3 - 6
roxie/ccd/ccdactivities.cpp

@@ -5164,15 +5164,12 @@ public:
         {
             remoteGraph->beforeExecute();
             Owned<IRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
+            IEngineRowStream &stream = input->queryStream();
             while (!aborted)
             {
-                const void * next = input->nextRow();
+                const void * next = stream.ungroupedNextRow();
                 if (!next)
-                {
-                    next = input->nextRow();
-                    if (!next)
-                        break;
-                }
+                    break;
 
                 size32_t nextSize = meta.getRecordSize(next);
                 //MORE - what about grouping?

+ 15 - 14
roxie/ccd/ccddebug.cpp

@@ -34,6 +34,7 @@ class InputProbe : public CInterface, implements IRoxieInput, implements IEngine
 {
 protected:
     IRoxieInput *in;
+    IEngineRowStream *inStream;
     unsigned sourceId;
     unsigned sourceIdx;
     unsigned targetId;
@@ -51,9 +52,9 @@ protected:
     bool hasStopped;
 
 public:
-    InputProbe(IRoxieInput *_in, IDebuggableContext *_debugContext,
+    InputProbe(IRoxieInput *_in, IEngineRowStream *_inStream, IDebuggableContext *_debugContext,
         unsigned _sourceId, unsigned _sourceIdx, unsigned _targetId, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
-        : in(_in),  debugContext(_debugContext),
+        : in(_in),  inStream(_inStream), debugContext(_debugContext),
           sourceId(_sourceId), sourceIdx(_sourceIdx), targetId(_targetId), targetIdx(_targetIdx), iteration(_iteration), channel(_channel)
     {
         hasStarted = false;
@@ -75,7 +76,7 @@ public:
     }
     virtual void resetEOF()
     {
-        in->resetEOF();
+        inStream->resetEOF();
     }
     virtual unsigned numConcreteOutputs() const
     {
@@ -122,7 +123,7 @@ public:
     virtual void stop()
     {
         hasStopped = true;
-        in->stop();
+        inStream->stop();
     }
     virtual void reset()
     {
@@ -135,7 +136,7 @@ public:
     }
     virtual const void *nextRow()
     {
-        const void *ret = in->nextRow();
+        const void *ret = inStream->nextRow();
         if (ret)
         {
             size32_t size = inMeta->getRecordSize(ret);
@@ -148,7 +149,7 @@ public:
     }
     virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
-        const void *ret = in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void *ret = inStream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (ret && wasCompleteMatch)  // GH is this test right?
         {
             size32_t size = inMeta->getRecordSize(ret);
@@ -167,8 +168,8 @@ class TraceProbe : public InputProbe
 public:
     IMPLEMENT_IINTERFACE;
 
-    TraceProbe(IRoxieInput *_in, unsigned _sourceId, unsigned _targetId, unsigned _sourceIdx, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
-        : InputProbe(_in, NULL, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel)
+    TraceProbe(IRoxieInput *_in, IEngineRowStream *_inStream, unsigned _sourceId, unsigned _targetId, unsigned _sourceIdx, unsigned _targetIdx, unsigned _iteration, unsigned _channel)
+        : InputProbe(_in, _inStream, NULL, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel)
     {
     }
 
@@ -267,11 +268,11 @@ class CProbeManager : public CInterface, implements IProbeManager
 public:
     IMPLEMENT_IINTERFACE;
 
-    IInputBase *createProbe(IInputBase *in, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    IInputBase *createProbe(IInputBase *in, IEngineRowStream *_inStream, IActivityBase *inAct, IActivityBase *outAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
     {
         unsigned idIn = inAct->queryId();
         unsigned idOut = outAct->queryId();
-        TraceProbe *probe = new TraceProbe(static_cast<IRoxieInput*>(in), idIn, idOut, sourceIdx, targetIdx, iteration, 0);
+        TraceProbe *probe = new TraceProbe(static_cast<IRoxieInput*>(in), _inStream, idIn, idOut, sourceIdx, targetIdx, iteration, 0);
         probes.append(*probe);
         return probe;
     }
@@ -521,8 +522,8 @@ class DebugProbe : public InputProbe, implements IActivityDebugContext
     }
 
 public:
-    DebugProbe(IInputBase *_in, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
-        : InputProbe(static_cast<IRoxieInput*>(_in), _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
+    DebugProbe(IInputBase *_in, IEngineRowStream *_inStream, unsigned _sourceId, unsigned _sourceIdx, DebugActivityRecord *_sourceAct, unsigned _targetId, unsigned _targetIdx, DebugActivityRecord *_targetAct, unsigned _iteration, unsigned _channel, IDebuggableContext *_debugContext)
+        : InputProbe(static_cast<IRoxieInput*>(_in), _inStream, _debugContext, _sourceId, _sourceIdx, _targetId, _targetIdx, _iteration, _channel),
           sourceAct(_sourceAct), targetAct(_targetAct)
     {
         historyCapacity = debugContext->getDefaultHistoryCapacity();
@@ -1006,7 +1007,7 @@ public:
         return CInterface::Release();
     }
 
-    virtual IInputBase *createProbe(IInputBase *in, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    virtual IInputBase *createProbe(IInputBase *in, IEngineRowStream *inStream, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
     {
         CriticalBlock b(crit);
         if (!iteration)
@@ -1016,7 +1017,7 @@ public:
         unsigned targetId = targetAct->queryId();
         DebugActivityRecord *sourceActRecord = noteActivity(sourceAct, iteration, channel, debugContext->querySequence());
         DebugActivityRecord *targetActRecord = noteActivity(targetAct, iteration, channel, debugContext->querySequence());
-        DebugProbe *probe = new DebugProbe(in, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
+        DebugProbe *probe = new DebugProbe(in, inStream, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
 #ifdef _DEBUG
         DBGLOG("Creating probe for edge id %s in graphManager %p", probe->queryEdgeId(), this);
 #endif

+ 14 - 7
roxie/ccd/ccdserver.cpp

@@ -14478,12 +14478,13 @@ private:
     unsigned sourceIdx;
     Linked<IRoxieServerActivity> sourceAct;
     Linked<IRoxieInput> sourceInput;
+    Linked<IEngineRowStream> sourceStream;
     unsigned numUses;
     unsigned iteration;
 
 public:
-    CGraphIterationInfo(IRoxieServerActivity * _sourceAct, IRoxieInput *_input, unsigned _sourceIdx, unsigned _iteration)
-        : sourceAct(_sourceAct), sourceInput(_input), sourceIdx(_sourceIdx), iteration(_iteration)
+    CGraphIterationInfo(IRoxieServerActivity * _sourceAct, IRoxieInput *_input, IEngineRowStream *_stream, unsigned _sourceIdx, unsigned _iteration)
+        : sourceAct(_sourceAct), sourceInput(_input), sourceStream(_stream),  sourceIdx(_sourceIdx), iteration(_iteration)
     {
         numUses = 0;
     }
@@ -14503,7 +14504,7 @@ public:
             IRoxieInput *input = sourceAct->queryOutput(sourceIdx);
             if (probeManager)
             {
-                IInputBase * inputBase = probeManager->createProbe(static_cast<IInputBase*>(input), sourceAct, splitter, sourceIdx, 0, iteration);
+                IInputBase * inputBase = probeManager->createProbe(static_cast<IInputBase*>(input), sourceStream, sourceAct, splitter, sourceIdx, 0, iteration);
                 input = static_cast<IRoxieInput*>(inputBase);
                 // MORE - shouldn't this be added to probes?
             }
@@ -14511,6 +14512,7 @@ public:
             sourceAct->setInput(0, input);
             sourceIdx = 0;
             sourceInput.clear();
+            sourceStream.clear();
         }
     }
 
@@ -14518,11 +14520,14 @@ public:
     {
         // MORE - not really necessary to create splitters in separate pass, is it?
         if (factory) // we created a splitter....
+        {
             sourceInput.set(sourceAct->queryOutput(sourceIdx));
+            sourceStream.set(&sourceInput->queryStream());
+        }
         IRoxieInput *ret = sourceInput;
         if (probeManager)
         {
-            IInputBase *inputBase = probeManager->createProbe(ret, sourceAct, targetAct, sourceIdx, targetIdx, iteration);
+            IInputBase *inputBase = probeManager->createProbe(sourceInput, sourceStream, sourceAct, targetAct, sourceIdx, targetIdx, iteration);
             ret = static_cast<IRoxieInput *>(inputBase);
             probes.append(*LINK(ret));
         }
@@ -14625,7 +14630,7 @@ public:
     {
         //result(0) is the input to the graph.
         resultInput = inputExtractMapper;
-        outputs.append(* new CGraphIterationInfo(resultInput->queryActivity(), resultInput, 0, 1));
+        outputs.append(* new CGraphIterationInfo(resultInput->queryActivity(), resultInput, &resultInput->queryStream(), 0, 1));
 
         for (createLoopCounter=1; createLoopCounter <= maxIterations; createLoopCounter++)
         {
@@ -25695,7 +25700,7 @@ public:
         IRoxieInput * output = sourceActivity.queryOutput(sourceIdx);
         if (probeManager)
         {
-            IInputBase * inputBase = probeManager->createProbe(static_cast<IInputBase*>(output), &sourceActivity, &targetActivity, sourceIdx, targetIdx, iteration);
+            IInputBase * inputBase = probeManager->createProbe(static_cast<IInputBase*>(output), &output->queryStream(), &sourceActivity, &targetActivity, sourceIdx, targetIdx, iteration);
             output = static_cast<IRoxieInput*>(inputBase);
             probes.append(*LINK(output));
         }
@@ -26136,7 +26141,9 @@ public:
     virtual CGraphIterationInfo *selectGraphLoopOutput()
     {
         IRoxieServerActivity &sourceActivity = activities.item(graphOutputActivityIndex);
-        return new CGraphIterationInfo(&sourceActivity, sourceActivity.queryOutput(0), 0, loopCounter);
+        IRoxieInput *sourceInput = sourceActivity.queryOutput(0);
+        IEngineRowStream *sourceStream = &sourceInput->queryStream();
+        return new CGraphIterationInfo(&sourceActivity, sourceInput, sourceStream, 0, loopCounter);
     }
 
     virtual void gatherIterationUsage(IRoxieServerLoopResultProcessor & processor)