瀏覽代碼

Merge pull request #8061 from richardkchapman/split-stream-phase-2e

HPCC-14656 Split concepts of an input and a row stream

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 年之前
父節點
當前提交
80615cbe5b

+ 5 - 0
common/thorhelper/roxiehelper.cpp

@@ -450,6 +450,11 @@ public:
     {
         input->stop();
     }
+
+    virtual void resetEOF()
+    {
+        input->resetEOF();
+    }
 };
 
 class GroupedInputReader : public InputReaderBase

+ 1 - 1
common/thorhelper/roxiehelper.ipp

@@ -30,10 +30,10 @@ struct IInputBase : public IInterface //base for IRoxieInput and IHThorInput
 {
     virtual IOutputMetaData * queryOutputMeta() const = 0;
     virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
-    virtual void resetEOF() = 0;
 
     // These will need some thought
     virtual IEngineRowStream &queryStream() = 0;
+    inline void resetEOF() { queryStream().resetEOF(); }
     inline bool nextGroup(ConstPointerArray & group) { return queryStream().nextGroup(group); }
     inline void readAll(RtlLinkedDatasetBuilder &builder) { return queryStream().readAll(builder); }
     inline const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { return queryStream().nextRowGE(seek, numFields, wasCompleteMatch, stepExtra); }

+ 4 - 0
common/thorhelper/roxiestream.hpp

@@ -33,6 +33,10 @@ interface THORHELPER_API IEngineRowStream : public IRowStream
     virtual bool nextGroup(ConstPointerArray & group);      // note: default implementation can be overridden for efficiency...
     virtual void readAll(RtlLinkedDatasetBuilder &builder); // note: default implementation can be overridden for efficiency...
     virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
+
+    // Reinitialize the stream - called when smart-stepping potentially jumps forward in one of the inputs feeding into
+    // a join - other inputs may need to discard current state such as eof indicators, partially-delivered groups etc.
+    virtual void resetEOF() = 0;
 };
 
 #endif // ROXIESTREAM_HPP

+ 5 - 0
ecl/eclagent/eclagent.ipp

@@ -878,6 +878,11 @@ private:
             in->stop();
         }
 
+        virtual void resetEOF()
+        {
+            in->resetEOF();
+        }
+
         IEngineRowStream &queryStream()
         {
             return *this;

+ 18 - 0
ecl/hthor/hthor.cpp

@@ -213,6 +213,12 @@ void CHThorActivityBase::stop()
         input->stop();
 }
 
+void CHThorActivityBase::resetEOF()
+{
+    if (input)
+        input->resetEOF();
+}
+
 void CHThorActivityBase::updateProgress(IStatisticGatherer &progress) const
 {
     updateProgressForOther(progress, activityId, subgraphId);
@@ -6415,6 +6421,13 @@ void CHThorMultiInputActivity::stop()
         inputs.item(idx)->stop();
 }
 
+void CHThorMultiInputActivity::resetEOF()
+{
+    CHThorSimpleActivityBase::resetEOF();
+    ForEachItemIn(idx, inputs)
+        inputs.item(idx)->resetEOF();
+}
+
 void CHThorMultiInputActivity::setInput(unsigned index, IHThorInput *_input)
 {
     if (index==inputs.length())
@@ -9699,6 +9712,11 @@ void LibraryCallOutput::stop()
     result.clear();
 }
 
+void LibraryCallOutput::resetEOF()
+{
+    throwUnexpected();
+}
+
 void LibraryCallOutput::updateProgress(IStatisticGatherer &progress) const
 {
     owner->updateOutputProgress(progress, *this, processed);

+ 12 - 0
ecl/hthor/hthor.ipp

@@ -218,6 +218,7 @@ public:
     virtual void execute();
     virtual void extractResult(unsigned & len, void * & ret);
     virtual void stop();
+    virtual void resetEOF();
     virtual void setBoundGraph(IHThorBoundLoopGraph * graph) { UNIMPLEMENTED; }
     virtual __int64 getCount();
     virtual unsigned queryOutputs() { return 1; }
@@ -1731,6 +1732,7 @@ public:
 
     virtual void ready();
     virtual void stop();
+    virtual void resetEOF();
     virtual void setInput(unsigned, IHThorInput *);
 
     //interface IHThorInput
@@ -2586,6 +2588,11 @@ public:
     {
     }
 
+    virtual void resetEOF()
+    {
+        throwUnexpected();  // Should never be called on a source stream
+    }
+
 protected:
     Owned<IHThorGraphResult> result;
     unsigned curRow;
@@ -2613,6 +2620,10 @@ public:
     virtual void stop()
     {
     }
+    virtual void resetEOF()
+    {
+        throwUnexpected();  // Should never be called on a source stream
+    }
 
 protected:
     ConstPointerArray * array;
@@ -2746,6 +2757,7 @@ public:
 
     virtual void ready();
     virtual void stop();
+    virtual void resetEOF();
     virtual IEngineRowStream &queryStream() { return *this; }
     virtual void updateProgress(IStatisticGatherer &progress) const;
 

+ 4 - 0
ecl/hthor/hthorstep.cpp

@@ -185,6 +185,10 @@ void CHThorMergeJoinBaseActivity::stop()
     CHThorNaryActivity::stop();
 }
 
+void CHThorMergeJoinBaseActivity::resetEOF()
+{
+    processor.queryResetEOF();
+}
 
 bool CHThorMergeJoinBaseActivity::gatherConjunctions(ISteppedConjunctionCollector & collector)
 {

+ 1 - 0
ecl/hthor/hthorstep.ipp

@@ -86,6 +86,7 @@ public:
     //interface IHThorInput
     virtual void ready();
     virtual void stop();
+    virtual void resetEOF();
     virtual const void * nextRow();
     virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();