Browse Source

Merge pull request #8031 from richardkchapman/split-stream-input

HPCC-14656 Split concepts of an input and a row stream

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
d2e70c3582

+ 22 - 47
common/thorhelper/roxiehelper.cpp

@@ -82,7 +82,7 @@ CRHRollingCache::~CRHRollingCache()
     }  
 }
 
-void CRHRollingCache::init(IInputBase *_in, unsigned _max)
+void CRHRollingCache::init(IRowStream *_in, unsigned _max)
 {
     max = _max;
     in =_in;
@@ -177,7 +177,7 @@ CRHDualCache::~CRHDualCache()
     }  
 }
 
-void CRHDualCache::init(IInputBase * _in)
+void CRHDualCache::init(IRowStream * _in)
 {
     in = _in;
     cache.clear();
@@ -243,21 +243,6 @@ bool CRHDualCache::get(unsigned n, CRHRollingCacheElem *&out)
     return true;
 }
 
-size32_t CRHDualCache::getRecordSize(const void *ptr)
-{
-    return in->queryOutputMeta()->getRecordSize(ptr);
-}
-
-size32_t CRHDualCache::getFixedSize() const
-{
-    return in->queryOutputMeta()->getFixedSize();
-}
-
-size32_t CRHDualCache::getMinRecordSize() const
-{
-    return in->queryOutputMeta()->getMinRecordSize();
-}
-
 CRHDualCache::cOut::cOut(CRHDualCache *_parent, unsigned &_pos) 
 : pos(_pos)
 {
@@ -275,11 +260,6 @@ const void * CRHDualCache::cOut::nextRow()
     return e->row;
 }
 
-IOutputMetaData * CRHDualCache::cOut::queryOutputMeta() const
-{
-    return parent->input()->queryOutputMeta();
-}
-
 void CRHDualCache::cOut::stop()
 {
     pos = (unsigned)-1;
@@ -295,7 +275,7 @@ IRHLimitedCompareHelper *createRHLimitedCompareHelper()
 
 //CRHLimitedCompareHelper
 void CRHLimitedCompareHelper::init( unsigned _atmost,
-                                 IInputBase *_in,
+                                 IRowStream *_in,
                                  ICompare * _cmp,
                                  ICompare * _limitedcmp )
 {
@@ -414,7 +394,7 @@ bool CRHLimitedCompareHelper::getGroup(OwnedRowArray &group, const void *left)
 //=========================================================================================
 
 // default implementations - can be overridden for efficiency...
-bool ISimpleInputBase::nextGroup(ConstPointerArray & group)
+bool IEngineRowStream::nextGroup(ConstPointerArray & group)
 {
     // MORE - this should be replaced with a version that reads to a builder
     const void * next;
@@ -425,7 +405,7 @@ bool ISimpleInputBase::nextGroup(ConstPointerArray & group)
     return false;
 }
 
-void ISimpleInputBase::readAll(RtlLinkedDatasetBuilder &builder)
+void IEngineRowStream::readAll(RtlLinkedDatasetBuilder &builder)
 {
     loop
     {
@@ -451,18 +431,13 @@ using roxiemem::OwnedConstRoxieRow;
 class InputReaderBase  : public CInterfaceOf<IGroupedInput>
 {
 protected:
-    IInputBase *input;
+    IEngineRowStream *input;
 public:
-    InputReaderBase(IInputBase *_input)
+    InputReaderBase(IEngineRowStream *_input)
     : input(_input)
     {
     }
 
-    virtual IOutputMetaData * queryOutputMeta() const
-    {
-        return input->queryOutputMeta();
-    }
-
     virtual void stop()
     {
         input->stop();
@@ -478,7 +453,7 @@ protected:
     OwnedConstRoxieRow next;
     const ICompare *compare;
 public:
-    GroupedInputReader(IInputBase *_input, const ICompare *_compare)
+    GroupedInputReader(IEngineRowStream *_input, const ICompare *_compare)
     : InputReaderBase(_input), compare(_compare)
     {
         firstRead = false;
@@ -518,7 +493,7 @@ public:
 class DegroupedInputReader : public InputReaderBase
 {
 public:
-    DegroupedInputReader(IInputBase *_input) : InputReaderBase(_input)
+    DegroupedInputReader(IEngineRowStream *_input) : InputReaderBase(_input)
     {
     }
     virtual const void *nextRow()
@@ -534,7 +509,7 @@ protected:
     Owned<ISortAlgorithm> sorter;
     bool firstRead;
 public:
-    SortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
+    SortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter)
       : InputReaderBase(_input), degroupedInput(_input), sorter(_sorter), firstRead(false)
     {
         sorter->reset();
@@ -559,7 +534,7 @@ protected:
     OwnedConstRoxieRow next;
     const ICompare *compare;
 public:
-    SortedGroupedInputReader(IInputBase *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
+    SortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
       : SortedInputReader(_input, _sorter), compare(_compare), eof(false), endGroupPending(false)
     {
     }
@@ -594,25 +569,25 @@ public:
     }
 };
 
-extern IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare)
+extern IGroupedInput *createGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare)
 {
     dbgassertex(_input && _groupCompare);
     return new GroupedInputReader(_input, _groupCompare);
 }
 
-extern IGroupedInput *createDegroupedInputReader(IInputBase *_input)
+extern IGroupedInput *createDegroupedInputReader(IEngineRowStream *_input)
 {
     dbgassertex(_input);
     return new DegroupedInputReader(_input);
 }
 
-extern IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
+extern IGroupedInput *createSortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter)
 {
     dbgassertex(_input && _sorter);
     return new SortedInputReader(_input, _sorter);
 }
 
-extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
+extern IGroupedInput *createSortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
 {
     dbgassertex(_input && _groupCompare && _sorter);
     return new SortedGroupedInputReader(_input, _groupCompare, _sorter);
@@ -686,7 +661,7 @@ class CQuickSortAlgorithm : public CInplaceSortAlgorithm
 public:
     CQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         curIndex = 0;
         if (input->nextGroup(sorted))
@@ -703,7 +678,7 @@ class CParallelQuickSortAlgorithm : public CInplaceSortAlgorithm
 public:
     CParallelQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         curIndex = 0;
         if (input->nextGroup(sorted))
@@ -720,7 +695,7 @@ class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
 public:
     CTbbQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         curIndex = 0;
         if (input->nextGroup(sorted))
@@ -739,7 +714,7 @@ public:
 
     virtual void sortRows(void * * rows, size_t numRows, void * * temp) = 0;
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         curIndex = 0;
         if (input->nextGroup(sorted))
@@ -988,7 +963,7 @@ public:
         blockNo = 0;
     }
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         blockNo = 0;
         curBlock = new SortedBlock(blockNo++, rowManager, activityId);
@@ -1194,7 +1169,7 @@ public:
         sequences.kill();
     }
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         inputAlreadySorted = true;
         curIndex = 0;
@@ -1267,7 +1242,7 @@ public:
 
     virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) = 0;
 
-    virtual void prepare(IInputBase *input)
+    virtual void prepare(IEngineRowStream *input)
     {
         loop
         {

+ 6 - 6
common/thorhelper/roxiehelper.hpp

@@ -121,7 +121,7 @@ typedef enum {
 
 interface ISortAlgorithm : extends IInterface
 {
-    virtual void prepare(IInputBase *input) = 0;
+    virtual void prepare(IEngineRowStream *input) = 0;
     virtual const void *next() = 0;
     virtual void reset() = 0;
     virtual void getSortedGroup(ConstPointerArray & result) = 0;
@@ -144,14 +144,14 @@ extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm alg
 
 //=========================================================================================
 
-interface IGroupedInput : extends IInputBase
+interface IGroupedInput : extends IEngineRowStream  // MORE rename to IGroupedRowStream
 {
 };
 
-extern THORHELPER_API IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare);
-extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IInputBase *_input);
-extern THORHELPER_API IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter);
-extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
+extern THORHELPER_API IGroupedInput *createGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare);
+extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IEngineRowStream *_input);
+extern THORHELPER_API IGroupedInput *createSortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter);
+extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
 
 //=========================================================================================
 

+ 5 - 14
common/thorhelper/roxiehelper.ipp

@@ -19,24 +19,14 @@
 #define ROXIEHELPER_IPP
 
 #include "thorhelper.hpp"
-#include "rtlds_imp.hpp"
+#include "roxiestream.hpp"
 #include "jlog.hpp"
-#include "jio.hpp"
 
 extern THORHELPER_API unsigned traceLevel;
-
-//---------------------------------------------------
-// Base classes for all Roxie/HThor activities
-//---------------------------------------------------
-struct THORHELPER_API ISimpleInputBase : public IRowStream //base for IInputBase and IHThorSimpleInput
-{
-    virtual bool nextGroup(ConstPointerArray & group);      // note: default implementation can be overridden for efficiency...
-    virtual void readAll(RtlLinkedDatasetBuilder &builder); // note: default implementation can be overridden for efficiency...
-};
-
 interface IOutputMetaData;
-struct IInputBase : public ISimpleInputBase  //base for IRoxieInput and IHThorInput
+struct IInputBase : public IEngineRowStream // Should be derived from IInterface  //base for IRoxieInput and IHThorInput
 {
+    virtual IEngineRowStream &queryInput() const { UNIMPLEMENTED; };
     virtual IOutputMetaData * queryOutputMeta() const = 0;
 };
 
@@ -96,13 +86,14 @@ interface IRoxieContextLogger : extends IContextLogger
 //===================================================================================
 
 //IRHLimitedCompareHelper copied from THOR ILimitedCompareHelper, and modified to get input from IHThorInput instead of IReadSeqVar
+// Can probably common back up now
 class OwnedRowArray;
 interface ICompare;
 interface IRHLimitedCompareHelper: public IInterface
 {
     virtual void init(
             unsigned atmost,
-            IInputBase *strm,
+            IRowStream *strm,
             ICompare *compare,
             ICompare *limcompare
         )=0;

+ 13 - 18
common/thorhelper/roxielmj.hpp

@@ -49,11 +49,11 @@ class CRHRollingCache: extends CInterface
 {
     unsigned max; // max cache size
     QueueOf<CRHRollingCacheElem,true> cache;
-    IInputBase * in;
+    IRowStream * in;
     bool eos;
 public:
     ~CRHRollingCache();
-    void init(IInputBase *_in, unsigned _max);
+    void init(IRowStream *_in, unsigned _max);
     
 #ifdef TRACEROLLING
     void PrintCache();
@@ -64,12 +64,14 @@ public:
 };
 
 //===================================================================================
-//CRHDualCache copied from THOR CDualCache, and modified to get input from IInputBase instead 
+//CRHDualCache copied from THOR CDualCache, and modified to get input from IRowStream instead
 //of IReadSeqVar and to manage rows as OwnedRoxieRow types
-class THORHELPER_API CRHDualCache: public CInterface, public IRecordSize
+// Could probably be combined with CDualCache now ?
+
+class THORHELPER_API CRHDualCache: public CInterface
 {
     // similar to rolling cache - should be combined really
-    IInputBase * in;
+    IRowStream * in;
     MemoryAttr varbuf;
     bool eos;
     unsigned base;
@@ -82,8 +84,8 @@ public:
 
     CRHDualCache();
     ~CRHDualCache();
-    void init(IInputBase * _in);
-    inline IInputBase * input() {return in;}
+    void init(IRowStream * _in);
+    inline IRowStream * input() { return in; }
 
 #ifdef TRACEROLLING
     void PrintCache();
@@ -91,12 +93,7 @@ public:
 
     bool get(unsigned n, CRHRollingCacheElem *&out);
 
-    //interface IRecordSize:
-    virtual size32_t getRecordSize(const void *ptr);
-    virtual size32_t getFixedSize() const;
-    virtual size32_t getMinRecordSize() const;
-
-    class cOut: public CInterface, public IInputBase
+    class cOut: public CInterface, public IRowStream
     {
     private:
         CRHDualCache *parent;
@@ -106,14 +103,12 @@ public:
         IMPLEMENT_IINTERFACE;
         cOut(CRHDualCache *_parent, unsigned &_pos); 
         const void * nextRow();
-        IOutputMetaData * queryOutputMeta() const;
     private:
         void stop();
-        virtual IRecordSize * queryRecordSize() { return parent; }
     } *strm1, *strm2;
 
-    IInputBase *queryOut1() { return strm1; }
-    IInputBase *queryOut2() { return strm2; }
+    IRowStream *queryOut1() { return strm1; }
+    IRowStream *queryOut2() { return strm2; }
 
 };
 
@@ -134,7 +129,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     void init( unsigned _atmost,
-               IInputBase *_in,
+               IRowStream *_in,
                ICompare * _cmp,
                ICompare * _limitedcmp );
 

+ 38 - 0
common/thorhelper/roxiestream.hpp

@@ -0,0 +1,38 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ROXIESTREAM_HPP
+#define ROXIESTREAM_HPP
+
+#include "thorhelper.hpp"
+#include "rtlds_imp.hpp"
+#include "jio.hpp"
+
+//---------------------------------------------------
+// Base classes for all Roxie/HThor/Thor input streams
+//---------------------------------------------------
+
+class SmartStepExtra;
+
+interface THORHELPER_API IEngineRowStream : public IRowStream
+{
+    virtual bool nextGroup(ConstPointerArray & group);      // note: default implementation can be overridden for efficiency...
+    virtual void readAll(RtlLinkedDatasetBuilder &builder); // note: default implementation can be overridden for efficiency...
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { throwUnexpected(); }    // can only be called on stepping fields.
+};
+
+#endif // ROXIESTREAM_HPP

+ 4 - 4
ecl/eclagent/eclagent.cpp

@@ -3667,9 +3667,9 @@ public:
             return NULL;
     }
 
-    virtual const void * nextGE(const void * seek, unsigned numFields)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextGE(seek, numFields);
+        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual const void *nextRow()
@@ -4052,9 +4052,9 @@ public:
         InputProbe::ready();
     }
 
-    virtual const void * nextGE(const void * seek, unsigned numFields)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextGE(seek, numFields);
+        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual void stop()

+ 20 - 20
ecl/hthor/hthor.cpp

@@ -2383,12 +2383,12 @@ const void *CHThorGroupDedupKeepLeftActivity::nextRow()
     return ret;
 }
 
-const void * CHThorGroupDedupKeepLeftActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorGroupDedupKeepLeftActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     OwnedConstRoxieRow next;
     loop
     {
-        next.setown(input->nextGE(seek, numFields));
+        next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
         if (!prev || !next || !helper.matches(prev,next))
         {
             numKept = 0;
@@ -2720,12 +2720,12 @@ const void * CHThorFilterActivity::nextRow()
     }
 }
 
-const void * CHThorFilterActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorFilterActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (eof)
         return NULL;
 
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (!ret)
         return NULL;
 
@@ -2811,7 +2811,7 @@ const void * CHThorFilterGroupActivity::nextRow()
     }
 }
 
-const void * CHThorFilterGroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorFilterGroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (eof)
         return NULL;
@@ -2831,7 +2831,7 @@ const void * CHThorFilterGroupActivity::nextGE(const void * seek, unsigned numFi
         pending.clear();
     }
 
-    const void * ret = input->nextGE(seek, numFields);
+    const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     while (ret)
     {
         pending.append(ret);
@@ -2882,9 +2882,9 @@ const void * CHThorLimitActivity::nextRow()
     return ret.getClear();
 }
 
-const void * CHThorLimitActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorLimitActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (ret)
     {
         if (++numGot > rowLimit)
@@ -2963,11 +2963,11 @@ const void * CHThorCatchActivity::nextRow()
     throwUnexpected(); // onExceptionCaught should have thrown something
 }
 
-const void * CHThorCatchActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorCatchActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     try
     {
-        OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+        OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
         if (ret)
             processed++;
         return ret.getClear();
@@ -3648,9 +3648,9 @@ const void * CHThorDegroupActivity::nextRow()
     return ret;
 }
 
-const void * CHThorDegroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorDegroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    const void * ret = input->nextGE(seek, numFields);
+    const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     if (ret)
         processed++;
     return ret;
@@ -3717,7 +3717,7 @@ const void *CHThorGroupActivity::nextRow()
     return prev.getClear();
 }
 
-const void * CHThorGroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorGroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (firstDone)
     {
@@ -3727,7 +3727,7 @@ const void * CHThorGroupActivity::nextGE(const void * seek, unsigned numFields)
                 return nextRow();
         }
     }
-    next.setown(input->nextGE(seek, numFields));
+    next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     firstDone = true;
     return nextRow();
 }
@@ -4248,7 +4248,7 @@ const void *CHThorSortedActivity::nextRow()
     return prev.getClear();
 }
 
-const void * CHThorSortedActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorSortedActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (next)
     {
@@ -4257,7 +4257,7 @@ const void * CHThorSortedActivity::nextGE(const void * seek, unsigned numFields)
     }
 
     firstDone = true;
-    next.setown(input->nextGE(seek, numFields));
+    next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     return nextRow();
 }
 
@@ -4307,9 +4307,9 @@ const void *CHThorTraceActivity::nextRow()
     return ret.getClear();
 }
 
-const void * CHThorTraceActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorTraceActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (ret)
     {
         onTrace(ret);
@@ -9987,11 +9987,11 @@ const void * CHThorNWaySelectActivity::nextRow()
 }
 
 
-const void * CHThorNWaySelectActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorNWaySelectActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (!selectedInput)
         return NULL;
-    return selectedInput->nextGE(seek, numFields);
+    return selectedInput->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 }
 
 IInputSteppingMeta * CHThorNWaySelectActivity::querySteppingMeta()

+ 1 - 2
ecl/hthor/hthor.hpp

@@ -39,7 +39,7 @@ void HTHOR_API setHThorRowManager(roxiemem::IRowManager * manager); // do not ca
 class PointerArray;
 class EclGraphElement;
 
-inline const void * ungroupedNextRow(ISimpleInputBase * input)
+inline const void * ungroupedNextRow(IEngineRowStream * input)
 {
     const void * ret = input->nextRow();
     if (!ret)
@@ -56,7 +56,6 @@ struct IHThorInput : public IInputBase
 
     virtual void ready() = 0;
     virtual void updateProgress(IStatisticGatherer &progress) const = 0;
-    virtual const void * nextGE(const void * seek, unsigned numFields) { throwUnexpected(); }   // can only be called on stepping fields.
     virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual void resetEOF() { }

+ 16 - 15
ecl/hthor/hthor.ipp

@@ -122,7 +122,8 @@ public:
         if (seek)
         {
             //MORE: Should think about implementing isCompleteMatch in hthor
-            next = inputArray[i]->nextGE(seek, numFields);      // , inputIsCompleteMatch
+            bool inputIsCompleteMatch;
+            next = inputArray[i]->nextRowGE(seek, numFields, inputIsCompleteMatch, *stepExtra);
         }
         else
         {
@@ -469,7 +470,7 @@ public:
     //interface IHThorInput
     virtual const void *nextRow();
 
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
     virtual void resetEOF();
@@ -740,7 +741,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
     virtual void resetEOF();
@@ -760,7 +761,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorLimitActivity : public CHThorSteppableActivityBase
@@ -775,7 +776,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorSkipLimitActivity : public CHThorSimpleActivityBase
@@ -814,7 +815,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorSkipCatchActivity : public CHThorSimpleActivityBase
@@ -1014,7 +1015,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual bool isGrouped();
 };
 
@@ -1026,7 +1027,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual bool isGrouped();
 };
 
@@ -1241,7 +1242,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorTraceActivity : public CHThorSteppableActivityBase
@@ -1260,7 +1261,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 protected:
     void onTrace(const void *row);
 };
@@ -1376,7 +1377,7 @@ class CHThorSelfJoinActivity : public CHThorActivityBase
     Owned<IRHLimitedCompareHelper> limitedhelper;
     Owned<CRHDualCache> dualcache;
     Owned<IGroupedInput> groupedInput;
-    IInputBase *dualCacheInput;
+    IRowStream *dualCacheInput;
 private:
     bool fillGroup();
     const void * joinRecords(const void * curLeft, const void * curRight, unsigned counter, IException * except);
@@ -2565,7 +2566,7 @@ protected:
 };
 
 
-class LocalResultInput : public CInterfaceOf<ISimpleInputBase>
+class LocalResultInput : public CInterfaceOf<IEngineRowStream>
 {
 public:
     void init(IHThorGraphResult * _result)      
@@ -2590,7 +2591,7 @@ protected:
 
 
 
-class ConstPointerArrayInput : public CInterfaceOf<ISimpleInputBase>
+class ConstPointerArrayInput : public CInterfaceOf<IEngineRowStream>
 {
 public:
     void init(ConstPointerArray * _array)       { array = _array; curRow = 0; }
@@ -2619,7 +2620,7 @@ protected:
 class CHThorLoopActivity : public CHThorSimpleActivityBase
 {
     IHThorLoopArg &helper;
-    ISimpleInputBase * curInput;
+    IEngineRowStream * curInput;
     ConstPointerArray loopPending; //MORE: would be safer and neater to use an OwnedRowArray, but would need to change prototype of IHThorBoundLoopGraph::execute
     ConstPointerArrayInput arrayInput;
     LocalResultInput resultInput; 
@@ -2801,7 +2802,7 @@ public:
     virtual void stop();
     virtual void ready();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
 };
 

+ 3 - 2
ecl/hthor/hthorkey.cpp

@@ -772,7 +772,7 @@ public:
     //interface IHThorInput
     virtual void ready();
     virtual const void *nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual IInputSteppingMeta * querySteppingMeta();
 
@@ -974,8 +974,9 @@ const void *CHThorIndexReadActivity::nextRow()
 }
 
 
-const void *CHThorIndexReadActivity::nextGE(const void * seek, unsigned numFields)
+const void *CHThorIndexReadActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
+    // MORE - should set wasCompleteMatch
     if(keyedLimitReached && !keyedLimitSkips)
         helper.onKeyedLimitExceeded(); // should throw exception
 

+ 4 - 60
ecl/hthor/hthorstep.cpp

@@ -54,8 +54,8 @@ const void * CHThorSteppedInput::nextInputRow()
 
 const void * CHThorSteppedInput::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
 {
-    //Currently isCompleteMatch is not handled by hthor
-    return input->nextGE(seek, numFields);
+    //Currently isCompleteMatch is not properly handled by hthor
+    return input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 }
 
 IInputSteppingMeta * CHThorSteppedInput::queryInputSteppingMeta()
@@ -163,9 +163,8 @@ IInputSteppingMeta * CHThorNWayMergeActivity::querySteppingMeta()
     return &meta;
 }
 
-const void * CHThorNWayMergeActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorNWayMergeActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    SmartStepExtra stepExtra(SSEFreadAhead, NULL);
     bool matched = true;
     const void * next = merger.nextRowGE(seek, numFields, matched, stepExtra);
     if (next)
@@ -223,9 +222,8 @@ const void * CHThorMergeJoinBaseActivity::nextRow()
     return next;
 }
 
-const void * CHThorMergeJoinBaseActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorMergeJoinBaseActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    SmartStepExtra stepExtra(SSEFreadAhead, NULL);
     bool matched = true;
     const void * next = processor.nextGE(seek, numFields, matched, stepExtra);
     if (next)
@@ -265,60 +263,6 @@ CHThorProximityJoinActivity::CHThorProximityJoinActivity(IAgentContext & _agent,
 
 //---------------------------------------------------------------------------
 
-#ifdef archived_old_code
-
-CHThorNWayJoinActivity::CHThorNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg), helper(_arg), processor(_inputAllocator, _outputAllocator, _arg)
-{
-}
-
-void CHThorNWayJoinActivity::stop()
-{
-    processor.afterProcessing();
-    CHThorNaryActivity::stop();
-}
-
-void CHThorNWayJoinActivity::ready()
-{
-    CHThorNaryActivity::ready();
-
-    UnsignedArray inputValues;
-    ForEachItemIn(i1, expandedInputs)
-    {
-        IHThorInput * cur = expandedInputs.item(i1);
-        Owned<CHThorSteppedInput> stepInput = new CHThorSteppedInput(cur);
-        inputValues.append(processor.addInput(stepInput, cur->querySteppingMeta()));
-    }
-    processor.addJoin(helper, inputValues);
-    processor.beforeProcessing();
-}
-
-
-IInputSteppingMeta * CHThorNWayJoinActivity::querySteppingMeta() 
-{ 
-    return processor.querySteppingMeta();
-}
-
-const void * CHThorNWayJoinActivity::nextRow()
-{
-    const void * next = processor.nextRow();
-    if (next)
-        processed++;
-    return next;
-}
-
-const void * CHThorNWayJoinActivity::nextGE(const void * seek, unsigned numFields)
-{
-    const void * next = processor.nextGE(seek, numFields);
-    if (next)
-        processed++;
-    return next;
-}
-
-#endif
-
-//---------------------------------------------------------------------------
-
-
 extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind)
 {
     unsigned flags = _arg.getJoinFlags();

+ 2 - 28
ecl/hthor/hthorstep.ipp

@@ -67,7 +67,7 @@ public:
     virtual void ready();
     virtual void stop();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
 
 protected:
@@ -87,7 +87,7 @@ public:
     virtual void ready();
     virtual void stop();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
 
@@ -138,30 +138,4 @@ protected:
     CProximityJoinProcessor proximityProcessor;
 };
 
-
-#ifdef archived_old_code
-
-class CHThorNWayJoinActivity : public CHThorNaryActivity
-{
-public:
-    CHThorNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator);
-
-    //interface IHThorInput
-    virtual void ready();
-    virtual void stop();
-    virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
-    virtual IInputSteppingMeta * querySteppingMeta();
-
-protected:
-    void afterProcessing();
-    void beforeProcessing();
-
-protected:
-    IHThorNWayMergeJoinArg & helper;
-    CNaryJoinProcessor processor;
-};
-
-#endif
-
 #endif

+ 6 - 6
roxie/ccd/ccddebug.cpp

@@ -161,9 +161,9 @@ public:
         }
         return ret;
     }
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
-        const void *ret = in->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void *ret = in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (ret && wasCompleteMatch)  // GH is this test right?
         {
             size32_t size = inMeta->getRecordSize(ret);
@@ -238,10 +238,10 @@ public:
         return ret;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         // MORE - should probably only note them when wasCompleteMatch is true?
-        return _next(InputProbe::nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra));
+        return _next(InputProbe::nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     }
     virtual const void *nextRow()
     {
@@ -919,7 +919,7 @@ public:
         }
     }
 
-    virtual const void *nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         // MORE - not sure that skip is safe here? Should the incomplete matches even be returned?
         // Code is a little complex to avoid interpreting a skip on all rows in a group as EOF
@@ -930,7 +930,7 @@ public:
                 return NULL;
             loop
             {
-                const void *ret = InputProbe::nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+                const void *ret = InputProbe::nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
                 if (!ret)
                 {
                     if (EOGseen)

+ 49 - 49
roxie/ccd/ccdserver.cpp

@@ -2932,7 +2932,7 @@ class CRemoteResultAdaptor :public CInterface, implements IRoxieInput, implement
             return skipped;
         }
 
-        const void * nextSteppedGE(const void * seek, const void *rawSeek, unsigned numFields, unsigned seeklen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+        const void * nextRowGE(const void * seek, const void *rawSeek, unsigned numFields, unsigned seeklen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
         {
             // We discard all rows < seekval from all entries in heap
             // If this results in additional slave requests, we return NULL so that we can wait for them
@@ -3826,7 +3826,7 @@ public:
         return owner->queryInput(idx);
     }
 
-    const void * nextSteppedGE(const void *seek, const void *rawSeek, unsigned numFields, unsigned seekLen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    const void * nextRowGE(const void *seek, const void *rawSeek, unsigned numFields, unsigned seekLen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         if (activity.queryLogCtx().queryTraceLevel() > 20)
         {
@@ -3836,7 +3836,7 @@ public:
             {
                 recstr.appendf("%02x ", ((unsigned char *) rawSeek)[i]);
             }
-            activity.queryLogCtx().CTXLOG("CRemoteResultAdaptor::nextSteppedGE(rawSeek=%s numFields=%d, seeklen=%d, returnMismatches=%d)", recstr.str(), numFields, seekLen, stepExtra.returnMismatches());
+            activity.queryLogCtx().CTXLOG("CRemoteResultAdaptor::nextRowGE(rawSeek=%s numFields=%d, seeklen=%d, returnMismatches=%d)", recstr.str(), numFields, seekLen, stepExtra.returnMismatches());
         }
         assertex(mergeOrder);
         if (deferredStart)
@@ -3849,7 +3849,7 @@ public:
                 {
                     p.setDelayed(false);
                     if (activity.queryLogCtx().queryTraceLevel() > 10)
-                        activity.queryLogCtx().CTXLOG("About to send deferred start from nextSteppedGE, setting requireExact to %d", !stepExtra.returnMismatches());
+                        activity.queryLogCtx().CTXLOG("About to send deferred start from nextRowGE, setting requireExact to %d", !stepExtra.returnMismatches());
                     MemoryBuffer serializedSkip;
                     activity.serializeSkipInfo(serializedSkip, seekLen, rawSeek, numFields, seek, stepExtra);
                     p.setPacket(p.queryPacket()->insertSkipData(serializedSkip.length(), serializedSkip.toByteArray()));
@@ -3868,7 +3868,7 @@ public:
         {
             if (merger.ready())
             {
-                const void *got = merger.nextSteppedGE(seek, rawSeek, numFields, seekLen, wasCompleteMatch, stepExtra);
+                const void *got = merger.nextRowGE(seek, rawSeek, numFields, seekLen, wasCompleteMatch, stepExtra);
                 if (got)
                 {
                     processRow(got);
@@ -4405,12 +4405,12 @@ public:
         CRemoteResultAdaptor::onStart(_parentExtractSize, _parentExtract);
     }
 
-    virtual const void * nextSteppedGE(const void *seek, const void *rawSeek, unsigned numFields, unsigned seeklen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void *seek, const void *rawSeek, unsigned numFields, unsigned seeklen, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         // MORE - not sure what we need to do about the skip case... but we need at least this to prevent issues with exception getting lost
         if (exception)
             throw exception.getClear();
-        return CRemoteResultAdaptor::nextSteppedGE(seek, rawSeek, numFields, seeklen, wasCompleteMatch, stepExtra);
+        return CRemoteResultAdaptor::nextRowGE(seek, rawSeek, numFields, seeklen, wasCompleteMatch, stepExtra);
     }
 
     virtual const void *nextRow()
@@ -4568,10 +4568,10 @@ public:
         return true;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
-        const void * next = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * next = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (next)
             processed++;
         return next;
@@ -5528,9 +5528,9 @@ public:
         return input->nextRow();
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
-        return input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual unsigned __int64 queryTotalCycles() const
@@ -6151,10 +6151,10 @@ public:
         return iter->querySteppingMeta();
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         assertex(iter);
-        return iter->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return iter->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 };
 
@@ -6274,10 +6274,10 @@ public:
         return true;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
-        const void * next = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * next = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (next)
             processed++;
         return next;
@@ -6405,13 +6405,13 @@ public:
         return next;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         const void * next;
         loop
         {
-            next = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+            next = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 
             //If the record was an in-exact match from the index then return it immediately
             //and don't cause it to dedup following legal records.
@@ -7811,10 +7811,10 @@ public:
         return ret;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
-        const void *ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void *ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (ret && prev && compare->docompare(prev, ret) > 0)
         {
             // MORE - better to give mismatching rows that indexes?
@@ -9014,17 +9014,17 @@ public:
         }
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         //Could assert that this isn't grouped
-        // MORE - will need rethinking once we rethink the nextSteppedGE interface for global smart-stepping.
+        // MORE - will need rethinking once we rethink the nextRowGE interface for global smart-stepping.
         ActivityTimer t(totalCycles, timeActivities);
         if (eof)
             return NULL;
 
         loop
         {
-            const void * ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+            const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
             if (!ret)
             {
                 eof = true;
@@ -9186,7 +9186,7 @@ public:
         }
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (eof)
@@ -9206,7 +9206,7 @@ public:
             }
             curIndex = 0;
             gathered.kill();
-            //nextSteppedGE never returns an end of group marker.
+            //nextRowGE never returns an end of group marker.
         }
 
         //Not completely sure about this - it could lead the the start of a group being skipped, 
@@ -9221,15 +9221,15 @@ public:
             if (stepExtra.returnMismatches())
             {
                 bool matchedCompletely = true;
-                ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+                ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
                 if (!wasCompleteMatch)
                     return ret;
             }
             else
-                ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+                ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 #endif
 
-        const void * ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         while (ret)
         {
             gathered.append(ret);
@@ -10125,12 +10125,12 @@ public:
         return ret;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (eof)
             return NULL;
-        const void * ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (ret)
             processed++;
         else
@@ -15453,7 +15453,7 @@ protected:
     inline const void * doNextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         assertex(wasCompleteMatch);
-        return input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
 
@@ -15518,7 +15518,7 @@ public:
         const void * next;
         bool matches = true;
         if (seek)
-            next = inputArray[i]->nextSteppedGE(seek, numFields, matches, *stepExtra);
+            next = inputArray[i]->nextRowGE(seek, numFields, matches, *stepExtra);
         else
             next = inputArray[i]->ungroupedNextRow();
         pending[i] = next;
@@ -15575,7 +15575,7 @@ public:
         return next;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         const void * next = merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
@@ -15689,7 +15689,7 @@ public:
         return next;
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         const void * next = processor.nextGE(seek, numFields, wasCompleteMatch, stepExtra);
@@ -15858,12 +15858,12 @@ public:
             selectedInput->resetEOF(); 
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (!selectedInput)
             return NULL;
-        return selectedInput->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        return selectedInput->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     IInputSteppingMeta * querySteppingMeta()
@@ -16523,7 +16523,7 @@ class CRoxieServerSelfJoinActivity : public CRoxieServerActivity
     Owned<IRHLimitedCompareHelper> limitedhelper;
     Owned<CRHDualCache> dualcache;
     Owned<IGroupedInput> groupedInput;
-    IInputBase *dualCacheInput;
+    IRowStream *dualCacheInput;
 
     bool fillGroup()
     {
@@ -18299,10 +18299,10 @@ public:
         }
         return ret;
     }
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
-        const void * ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (ret)
         {
             if (wasCompleteMatch)
@@ -18495,12 +18495,12 @@ public:
         throwUnexpected(); // onExceptionCaught should have thrown something
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         try
         {
             ActivityTimer t(totalCycles, timeActivities);
-            const void * ret = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+            const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
             if (ret && wasCompleteMatch)
                 processed++;
             return ret;
@@ -18816,11 +18816,11 @@ public:
         }
         return row;
     }
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
-        // MORE - will need rethinking once we rethink the nextSteppedGE interface for global smart-stepping.
+        // MORE - will need rethinking once we rethink the nextRowGE interface for global smart-stepping.
         ActivityTimer t(totalCycles, timeActivities);
-        const void * row = input->nextSteppedGE(seek, numFields, wasCompleteMatch, stepExtra);
+        const void * row = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
         if (row)
         {
             onTrace(row);
@@ -21848,7 +21848,7 @@ public:
 
     }
 
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         try
@@ -21867,7 +21867,7 @@ public:
                     rawSeek = (byte *)temp;
                 }
             }
-            const void *ret = remote.nextSteppedGE(seek, rawSeek, numFields, seeklen, wasCompleteMatch, stepExtra);
+            const void *ret = remote.nextRowGE(seek, rawSeek, numFields, seeklen, wasCompleteMatch, stepExtra);
             if (ret && wasCompleteMatch) // GH pleas confirm the wasCompleteMatch I just added here is right
                 processed++;
             return ret;
@@ -22101,7 +22101,7 @@ public:
     const void *nextRow()
     {
         bool matched = true;
-        return nextSteppedGE(NULL, 0, matched, dummySmartStepExtra);
+        return nextRowGE(NULL, 0, matched, dummySmartStepExtra);
     }
 
     unsigned __int64 checkCount(unsigned __int64 limit)
@@ -22121,7 +22121,7 @@ public:
         return result;
     }
 
-    virtual const void *nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (eof)
@@ -22211,7 +22211,7 @@ public:
 //          {
 //              seekStr.appendf("%02x ", ((unsigned char *) rawSeek)[i]);
 //          }
-//          DBGLOG("nextSteppedGE can skip offset %d size %d value %s", seekGEOffset, seekSize, seekStr.str());
+//          DBGLOG("nextRowGE can skip offset %d size %d value %s", seekGEOffset, seekSize, seekStr.str());
 #endif
         }
         const byte * originalRawSeek = rawSeek;
@@ -22234,7 +22234,7 @@ public:
 //          {
 //              recstr.appendf("%02x ", ((unsigned char *) keyRow)[i]);
 //          }
-//          DBGLOG("nextSteppedGE Got %s", recstr.str());
+//          DBGLOG("nextRowGE Got %s", recstr.str());
             if (originalRawSeek && memcmp(keyRow + seekGEOffset, originalRawSeek, seekSize) < 0)
                 assertex(!"smart seek failure");
 #endif

+ 0 - 1
roxie/ccd/ccdserver.hpp

@@ -100,7 +100,6 @@ interface IRoxieInput : extends IInputBase
 
     virtual unsigned __int64 queryTotalCycles() const = 0;
     virtual unsigned __int64 queryLocalCycles() const = 0;
-    virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra) { throwUnexpected(); }  // can only be called on stepping fields.
     virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual unsigned numConcreteOutputs() const { return 1; }

+ 2 - 2
thorlcr/slave/slave.hpp

@@ -36,6 +36,7 @@
 #include "eclhelper.hpp"        // for IRecordSize
 #include "thgraph.hpp"
 #include "thorstep.hpp"
+#include "roxiestream.hpp"
 
 
 /* ---- To implement IThorDataLink you need ----
@@ -73,11 +74,10 @@ struct ThorDataLinkMetaInfo
 #endif
 class CActivityBase;
 
-interface IThorDataLink : extends IRowStream
+interface IThorDataLink : extends IEngineRowStream
 {
     virtual void start() = 0;
     virtual bool isGrouped() = 0;
-    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { throwUnexpected(); }    // can only be called on stepping fields.
     virtual IInputSteppingMeta *querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual void resetEOF() { }