Explorar o código

HPCC-14656 Split concepts of an input and a row stream

Common up the hthor nextGE method with the ROxie/thor equivalents. Note that
this does not fully implement the completeMatch-related stuff in hthor.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=9) %!d(string=hai) anos
pai
achega
305c9dea34

+ 4 - 4
ecl/eclagent/eclagent.cpp

@@ -3667,9 +3667,9 @@ public:
             return NULL;
     }
 
-    virtual const void * nextGE(const void * seek, unsigned numFields)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextGE(seek, numFields);
+        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual const void *nextRow()
@@ -4052,9 +4052,9 @@ public:
         InputProbe::ready();
     }
 
-    virtual const void * nextGE(const void * seek, unsigned numFields)
+    virtual const void * nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
     {
-        return in->nextGE(seek, numFields);
+        return in->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
 
     virtual void stop()

+ 20 - 20
ecl/hthor/hthor.cpp

@@ -2383,12 +2383,12 @@ const void *CHThorGroupDedupKeepLeftActivity::nextRow()
     return ret;
 }
 
-const void * CHThorGroupDedupKeepLeftActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorGroupDedupKeepLeftActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     OwnedConstRoxieRow next;
     loop
     {
-        next.setown(input->nextGE(seek, numFields));
+        next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
         if (!prev || !next || !helper.matches(prev,next))
         {
             numKept = 0;
@@ -2720,12 +2720,12 @@ const void * CHThorFilterActivity::nextRow()
     }
 }
 
-const void * CHThorFilterActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorFilterActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (eof)
         return NULL;
 
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (!ret)
         return NULL;
 
@@ -2811,7 +2811,7 @@ const void * CHThorFilterGroupActivity::nextRow()
     }
 }
 
-const void * CHThorFilterGroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorFilterGroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (eof)
         return NULL;
@@ -2831,7 +2831,7 @@ const void * CHThorFilterGroupActivity::nextGE(const void * seek, unsigned numFi
         pending.clear();
     }
 
-    const void * ret = input->nextGE(seek, numFields);
+    const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     while (ret)
     {
         pending.append(ret);
@@ -2882,9 +2882,9 @@ const void * CHThorLimitActivity::nextRow()
     return ret.getClear();
 }
 
-const void * CHThorLimitActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorLimitActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (ret)
     {
         if (++numGot > rowLimit)
@@ -2963,11 +2963,11 @@ const void * CHThorCatchActivity::nextRow()
     throwUnexpected(); // onExceptionCaught should have thrown something
 }
 
-const void * CHThorCatchActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorCatchActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     try
     {
-        OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+        OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
         if (ret)
             processed++;
         return ret.getClear();
@@ -3648,9 +3648,9 @@ const void * CHThorDegroupActivity::nextRow()
     return ret;
 }
 
-const void * CHThorDegroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorDegroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    const void * ret = input->nextGE(seek, numFields);
+    const void * ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     if (ret)
         processed++;
     return ret;
@@ -3717,7 +3717,7 @@ const void *CHThorGroupActivity::nextRow()
     return prev.getClear();
 }
 
-const void * CHThorGroupActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorGroupActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (firstDone)
     {
@@ -3727,7 +3727,7 @@ const void * CHThorGroupActivity::nextGE(const void * seek, unsigned numFields)
                 return nextRow();
         }
     }
-    next.setown(input->nextGE(seek, numFields));
+    next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     firstDone = true;
     return nextRow();
 }
@@ -4248,7 +4248,7 @@ const void *CHThorSortedActivity::nextRow()
     return prev.getClear();
 }
 
-const void * CHThorSortedActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorSortedActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (next)
     {
@@ -4257,7 +4257,7 @@ const void * CHThorSortedActivity::nextGE(const void * seek, unsigned numFields)
     }
 
     firstDone = true;
-    next.setown(input->nextGE(seek, numFields));
+    next.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     return nextRow();
 }
 
@@ -4307,9 +4307,9 @@ const void *CHThorTraceActivity::nextRow()
     return ret.getClear();
 }
 
-const void * CHThorTraceActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorTraceActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    OwnedConstRoxieRow ret(input->nextGE(seek, numFields));
+    OwnedConstRoxieRow ret(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
     if (ret)
     {
         onTrace(ret);
@@ -9987,11 +9987,11 @@ const void * CHThorNWaySelectActivity::nextRow()
 }
 
 
-const void * CHThorNWaySelectActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorNWaySelectActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
     if (!selectedInput)
         return NULL;
-    return selectedInput->nextGE(seek, numFields);
+    return selectedInput->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 }
 
 IInputSteppingMeta * CHThorNWaySelectActivity::querySteppingMeta()

+ 0 - 1
ecl/hthor/hthor.hpp

@@ -56,7 +56,6 @@ struct IHThorInput : public IInputBase
 
     virtual void ready() = 0;
     virtual void updateProgress(IStatisticGatherer &progress) const = 0;
-    virtual const void * nextGE(const void * seek, unsigned numFields) { throwUnexpected(); }   // can only be called on stepping fields.  // Not sure why this is different from the Roxie/thor versions
     virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual void resetEOF() { }

+ 12 - 11
ecl/hthor/hthor.ipp

@@ -122,7 +122,8 @@ public:
         if (seek)
         {
             //MORE: Should think about implementing isCompleteMatch in hthor
-            next = inputArray[i]->nextGE(seek, numFields);      // , inputIsCompleteMatch
+            bool inputIsCompleteMatch;
+            next = inputArray[i]->nextRowGE(seek, numFields, inputIsCompleteMatch, *stepExtra);
         }
         else
         {
@@ -469,7 +470,7 @@ public:
     //interface IHThorInput
     virtual const void *nextRow();
 
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
     virtual void resetEOF();
@@ -740,7 +741,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
     virtual void resetEOF();
@@ -760,7 +761,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorLimitActivity : public CHThorSteppableActivityBase
@@ -775,7 +776,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorSkipLimitActivity : public CHThorSimpleActivityBase
@@ -814,7 +815,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorSkipCatchActivity : public CHThorSimpleActivityBase
@@ -1014,7 +1015,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual bool isGrouped();
 };
 
@@ -1026,7 +1027,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual bool isGrouped();
 };
 
@@ -1241,7 +1242,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 };
 
 class CHThorTraceActivity : public CHThorSteppableActivityBase
@@ -1260,7 +1261,7 @@ public:
 
     //interface IHThorInput
     virtual const void *nextRow();
-    virtual const void *nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 protected:
     void onTrace(const void *row);
 };
@@ -2801,7 +2802,7 @@ public:
     virtual void stop();
     virtual void ready();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
 };
 

+ 3 - 2
ecl/hthor/hthorkey.cpp

@@ -772,7 +772,7 @@ public:
     //interface IHThorInput
     virtual void ready();
     virtual const void *nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
 
     virtual IInputSteppingMeta * querySteppingMeta();
 
@@ -974,8 +974,9 @@ const void *CHThorIndexReadActivity::nextRow()
 }
 
 
-const void *CHThorIndexReadActivity::nextGE(const void * seek, unsigned numFields)
+const void *CHThorIndexReadActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
+    // MORE - should set wasCompleteMatch
     if(keyedLimitReached && !keyedLimitSkips)
         helper.onKeyedLimitExceeded(); // should throw exception
 

+ 4 - 60
ecl/hthor/hthorstep.cpp

@@ -54,8 +54,8 @@ const void * CHThorSteppedInput::nextInputRow()
 
 const void * CHThorSteppedInput::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
 {
-    //Currently isCompleteMatch is not handled by hthor
-    return input->nextGE(seek, numFields);
+    //Currently isCompleteMatch is not properly handled by hthor
+    return input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 }
 
 IInputSteppingMeta * CHThorSteppedInput::queryInputSteppingMeta()
@@ -163,9 +163,8 @@ IInputSteppingMeta * CHThorNWayMergeActivity::querySteppingMeta()
     return &meta;
 }
 
-const void * CHThorNWayMergeActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorNWayMergeActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    SmartStepExtra stepExtra(SSEFreadAhead, NULL);
     bool matched = true;
     const void * next = merger.nextRowGE(seek, numFields, matched, stepExtra);
     if (next)
@@ -223,9 +222,8 @@ const void * CHThorMergeJoinBaseActivity::nextRow()
     return next;
 }
 
-const void * CHThorMergeJoinBaseActivity::nextGE(const void * seek, unsigned numFields)
+const void * CHThorMergeJoinBaseActivity::nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
 {
-    SmartStepExtra stepExtra(SSEFreadAhead, NULL);
     bool matched = true;
     const void * next = processor.nextGE(seek, numFields, matched, stepExtra);
     if (next)
@@ -265,60 +263,6 @@ CHThorProximityJoinActivity::CHThorProximityJoinActivity(IAgentContext & _agent,
 
 //---------------------------------------------------------------------------
 
-#ifdef archived_old_code
-
-CHThorNWayJoinActivity::CHThorNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg), helper(_arg), processor(_inputAllocator, _outputAllocator, _arg)
-{
-}
-
-void CHThorNWayJoinActivity::stop()
-{
-    processor.afterProcessing();
-    CHThorNaryActivity::stop();
-}
-
-void CHThorNWayJoinActivity::ready()
-{
-    CHThorNaryActivity::ready();
-
-    UnsignedArray inputValues;
-    ForEachItemIn(i1, expandedInputs)
-    {
-        IHThorInput * cur = expandedInputs.item(i1);
-        Owned<CHThorSteppedInput> stepInput = new CHThorSteppedInput(cur);
-        inputValues.append(processor.addInput(stepInput, cur->querySteppingMeta()));
-    }
-    processor.addJoin(helper, inputValues);
-    processor.beforeProcessing();
-}
-
-
-IInputSteppingMeta * CHThorNWayJoinActivity::querySteppingMeta() 
-{ 
-    return processor.querySteppingMeta();
-}
-
-const void * CHThorNWayJoinActivity::nextRow()
-{
-    const void * next = processor.nextRow();
-    if (next)
-        processed++;
-    return next;
-}
-
-const void * CHThorNWayJoinActivity::nextGE(const void * seek, unsigned numFields)
-{
-    const void * next = processor.nextGE(seek, numFields);
-    if (next)
-        processed++;
-    return next;
-}
-
-#endif
-
-//---------------------------------------------------------------------------
-
-
 extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind)
 {
     unsigned flags = _arg.getJoinFlags();

+ 2 - 28
ecl/hthor/hthorstep.ipp

@@ -67,7 +67,7 @@ public:
     virtual void ready();
     virtual void stop();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
 
 protected:
@@ -87,7 +87,7 @@ public:
     virtual void ready();
     virtual void stop();
     virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
+    virtual const void *nextRowGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra);
     virtual IInputSteppingMeta * querySteppingMeta();
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector);
 
@@ -138,30 +138,4 @@ protected:
     CProximityJoinProcessor proximityProcessor;
 };
 
-
-#ifdef archived_old_code
-
-class CHThorNWayJoinActivity : public CHThorNaryActivity
-{
-public:
-    CHThorNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator);
-
-    //interface IHThorInput
-    virtual void ready();
-    virtual void stop();
-    virtual const void * nextRow();
-    virtual const void * nextGE(const void * seek, unsigned numFields);
-    virtual IInputSteppingMeta * querySteppingMeta();
-
-protected:
-    void afterProcessing();
-    void beforeProcessing();
-
-protected:
-    IHThorNWayMergeJoinArg & helper;
-    CNaryJoinProcessor processor;
-};
-
-#endif
-
 #endif