Browse Source

HPCC-14656 Split concepts of an input and a row stream

Swicth some usage of IInputBase to IRowStream, clean up unused code.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
dd00a92bcb

+ 3 - 23
common/thorhelper/roxiehelper.cpp

@@ -82,7 +82,7 @@ CRHRollingCache::~CRHRollingCache()
     }  
 }
 
-void CRHRollingCache::init(IInputBase *_in, unsigned _max)
+void CRHRollingCache::init(IRowStream *_in, unsigned _max)
 {
     max = _max;
     in =_in;
@@ -177,7 +177,7 @@ CRHDualCache::~CRHDualCache()
     }  
 }
 
-void CRHDualCache::init(IInputBase * _in)
+void CRHDualCache::init(IRowStream * _in)
 {
     in = _in;
     cache.clear();
@@ -243,21 +243,6 @@ bool CRHDualCache::get(unsigned n, CRHRollingCacheElem *&out)
     return true;
 }
 
-size32_t CRHDualCache::getRecordSize(const void *ptr)
-{
-    return in->queryOutputMeta()->getRecordSize(ptr);
-}
-
-size32_t CRHDualCache::getFixedSize() const
-{
-    return in->queryOutputMeta()->getFixedSize();
-}
-
-size32_t CRHDualCache::getMinRecordSize() const
-{
-    return in->queryOutputMeta()->getMinRecordSize();
-}
-
 CRHDualCache::cOut::cOut(CRHDualCache *_parent, unsigned &_pos) 
 : pos(_pos)
 {
@@ -275,11 +260,6 @@ const void * CRHDualCache::cOut::nextRow()
     return e->row;
 }
 
-IOutputMetaData * CRHDualCache::cOut::queryOutputMeta() const
-{
-    return parent->input()->queryOutputMeta();
-}
-
 void CRHDualCache::cOut::stop()
 {
     pos = (unsigned)-1;
@@ -295,7 +275,7 @@ IRHLimitedCompareHelper *createRHLimitedCompareHelper()
 
 //CRHLimitedCompareHelper
 void CRHLimitedCompareHelper::init( unsigned _atmost,
-                                 IInputBase *_in,
+                                 IRowStream *_in,
                                  ICompare * _cmp,
                                  ICompare * _limitedcmp )
 {

+ 2 - 1
common/thorhelper/roxiehelper.ipp

@@ -85,13 +85,14 @@ interface IRoxieContextLogger : extends IContextLogger
 //===================================================================================
 
 //IRHLimitedCompareHelper copied from THOR ILimitedCompareHelper, and modified to get input from IHThorInput instead of IReadSeqVar
+// Can probably common back up now
 class OwnedRowArray;
 interface ICompare;
 interface IRHLimitedCompareHelper: public IInterface
 {
     virtual void init(
             unsigned atmost,
-            IInputBase *strm,
+            IRowStream *strm,
             ICompare *compare,
             ICompare *limcompare
         )=0;

+ 13 - 18
common/thorhelper/roxielmj.hpp

@@ -49,11 +49,11 @@ class CRHRollingCache: extends CInterface
 {
     unsigned max; // max cache size
     QueueOf<CRHRollingCacheElem,true> cache;
-    IInputBase * in;
+    IRowStream * in;
     bool eos;
 public:
     ~CRHRollingCache();
-    void init(IInputBase *_in, unsigned _max);
+    void init(IRowStream *_in, unsigned _max);
     
 #ifdef TRACEROLLING
     void PrintCache();
@@ -64,12 +64,14 @@ public:
 };
 
 //===================================================================================
-//CRHDualCache copied from THOR CDualCache, and modified to get input from IInputBase instead 
+//CRHDualCache copied from THOR CDualCache, and modified to get input from IRowStream instead
 //of IReadSeqVar and to manage rows as OwnedRoxieRow types
-class THORHELPER_API CRHDualCache: public CInterface, public IRecordSize
+// Could probably be combined with CDualCache now ?
+
+class THORHELPER_API CRHDualCache: public CInterface
 {
     // similar to rolling cache - should be combined really
-    IInputBase * in;
+    IRowStream * in;
     MemoryAttr varbuf;
     bool eos;
     unsigned base;
@@ -82,8 +84,8 @@ public:
 
     CRHDualCache();
     ~CRHDualCache();
-    void init(IInputBase * _in);
-    inline IInputBase * input() {return in;}
+    void init(IRowStream * _in);
+    inline IRowStream * input() { return in; }
 
 #ifdef TRACEROLLING
     void PrintCache();
@@ -91,12 +93,7 @@ public:
 
     bool get(unsigned n, CRHRollingCacheElem *&out);
 
-    //interface IRecordSize:
-    virtual size32_t getRecordSize(const void *ptr);
-    virtual size32_t getFixedSize() const;
-    virtual size32_t getMinRecordSize() const;
-
-    class cOut: public CInterface, public IInputBase
+    class cOut: public CInterface, public IRowStream
     {
     private:
         CRHDualCache *parent;
@@ -106,14 +103,12 @@ public:
         IMPLEMENT_IINTERFACE;
         cOut(CRHDualCache *_parent, unsigned &_pos); 
         const void * nextRow();
-        IOutputMetaData * queryOutputMeta() const;
     private:
         void stop();
-        virtual IRecordSize * queryRecordSize() { return parent; }
     } *strm1, *strm2;
 
-    IInputBase *queryOut1() { return strm1; }
-    IInputBase *queryOut2() { return strm2; }
+    IRowStream *queryOut1() { return strm1; }
+    IRowStream *queryOut2() { return strm2; }
 
 };
 
@@ -134,7 +129,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     void init( unsigned _atmost,
-               IInputBase *_in,
+               IRowStream *_in,
                ICompare * _cmp,
                ICompare * _limitedcmp );
 

+ 2 - 1
common/thorhelper/roxiestream.hpp

@@ -28,8 +28,9 @@
 
 class SmartStepExtra;
 
-interface THORHELPER_API IEngineRowStream : public IRowStream //base for IInputBase and IHThorSimpleInput
+interface THORHELPER_API IEngineRowStream : public IRowStream
 {
+//    virtual IRowStream &queryStream() const = 0;
     virtual bool nextGroup(ConstPointerArray & group);      // note: default implementation can be overridden for efficiency...
     virtual void readAll(RtlLinkedDatasetBuilder &builder); // note: default implementation can be overridden for efficiency...
     virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { throwUnexpected(); }    // can only be called on stepping fields.

+ 1 - 1
ecl/hthor/hthor.ipp

@@ -1377,7 +1377,7 @@ class CHThorSelfJoinActivity : public CHThorActivityBase
     Owned<IRHLimitedCompareHelper> limitedhelper;
     Owned<CRHDualCache> dualcache;
     Owned<IGroupedInput> groupedInput;
-    IInputBase *dualCacheInput;
+    IRowStream *dualCacheInput;
 private:
     bool fillGroup();
     const void * joinRecords(const void * curLeft, const void * curRight, unsigned counter, IException * except);

+ 1 - 1
roxie/ccd/ccdserver.cpp

@@ -16523,7 +16523,7 @@ class CRoxieServerSelfJoinActivity : public CRoxieServerActivity
     Owned<IRHLimitedCompareHelper> limitedhelper;
     Owned<CRHDualCache> dualcache;
     Owned<IGroupedInput> groupedInput;
-    IInputBase *dualCacheInput;
+    IRowStream *dualCacheInput;
 
     bool fillGroup()
     {