Browse Source

Merge pull request #8054 from richardkchapman/split-stream-phase-2b

HPCC-14656 Split concepts of an input and a row stream

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
6d7419ad61
3 changed files with 3 additions and 58 deletions
  1. 1 9
      roxie/ccd/ccddebug.cpp
  2. 2 47
      roxie/ccd/ccdserver.cpp
  3. 0 2
      roxie/ccd/ccdserver.hpp

+ 1 - 9
roxie/ccd/ccddebug.cpp

@@ -129,10 +129,6 @@ public:
         hasStarted = false;
         in->reset();
     }
-    virtual void checkAbort()
-    {
-        in->checkAbort();
-    }
     virtual unsigned queryId() const
     {
         return in->queryId();
@@ -141,10 +137,6 @@ public:
     {
         return in->queryTotalCycles();
     }
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        return in->queryLocalCycles();
-    }
     virtual IRoxieInput *queryInput(unsigned idx) const
     {
         if (!idx)
@@ -264,7 +256,7 @@ public:
             totalTime += 10; // Fudge factor - I don't really know the times but this makes the graph more useable than not supplying a totalTime value
         if (totalTime)
             putStatsValue(&node, "totalTime", "sum", totalTime);
-        unsigned localTime = isOutput ? 10 : (unsigned) (cycle_to_nanosec(in->queryLocalCycles())/1000); // Fudge factor - I don't really know the times but this makes the graph more useable than not supplying a localTime value
+        unsigned localTime = isOutput ? 10 : (unsigned) (cycle_to_nanosec(in->queryActivity()->queryLocalCycles())/1000); // Fudge factor - I don't really know the times but this makes the graph more useable than not supplying a localTime value
         if (localTime)
             putStatsValue(&node, "localTime", "sum", localTime);
     }

+ 2 - 47
roxie/ccd/ccdserver.cpp

@@ -1798,11 +1798,6 @@ public:
         return puller.queryInput()->queryOutputMeta(); 
     }
 
-    virtual void checkAbort() 
-    {
-        puller.queryInput()->checkAbort();
-    }
-
     void setInput(unsigned idx, IRoxieInput *_in)
     {
         assertex(!idx);
@@ -1814,13 +1809,6 @@ public:
         return totalCycles;
     }
 
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        __int64 ret = totalCycles - puller.queryInput()->queryTotalCycles();
-        if (ret < 0) ret = 0;
-        return ret;
-    }
-
     virtual IRoxieInput *queryInput(unsigned idx) const
     {
         return puller.queryInput()->queryInput(idx);
@@ -3735,11 +3723,6 @@ public:
         owner->start(parentExtractSize, parentExtract, paused);
     }
 
-    void checkAbort()
-    {
-        owner->checkAbort();
-    }
-
     void setLimits(unsigned __int64 _rowLimit, unsigned __int64 _keyedLimit, unsigned __int64 _stopAfter)
     {
         if (ctx->queryProbeManager())
@@ -3831,11 +3814,6 @@ public:
         return totalCycles;
     }
 
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        return owner->queryLocalCycles();
-    }
-
     virtual IRoxieInput *queryInput(unsigned idx) const
     {
         return owner->queryInput(idx);
@@ -4001,14 +3979,14 @@ public:
         {
             checkDelayed();
             unsigned timeout = remoteId.isSLAPriority() ? slaTimeout : (remoteId.isHighPriority() ? highTimeout : lowTimeout);
-            owner->checkAbort();
+            activity.queryContext()->checkAbort();
             bool anyActivity;
             if (ctxTraceLevel > 5)
                 activity.queryLogCtx().CTXLOG("Calling getNextUnpacker(%d)", timeout);
             mr.setown(mc->getNextResult(timeout, anyActivity));
             if (ctxTraceLevel > 6)
                 activity.queryLogCtx().CTXLOG("Called getNextUnpacker(%d), activity=%d", timeout, anyActivity);
-            owner->checkAbort();
+            activity.queryContext()->checkAbort();
             if (mr)
             {
                 unsigned roxieHeaderLen;
@@ -5416,10 +5394,6 @@ public:
     {
         return input->queryTotalCycles();
     }
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        return input->queryLocalCycles();
-    }
     virtual IRoxieInput *queryInput(unsigned idx) const
     {
         return input->queryInput(idx);
@@ -5452,11 +5426,6 @@ public:
         CriticalBlock procedure(cs);
         input->resetEOF();
     }
-    virtual void checkAbort()
-    {
-        CriticalBlock procedure(cs);
-        input->checkAbort();
-    }
     virtual const void *nextRow()
     {
         CriticalBlock procedure(cs);
@@ -5541,10 +5510,6 @@ public:
     { 
         input->reset();
     }
-    virtual void checkAbort() 
-    {
-        input->checkAbort();
-    }
 
     virtual const void * nextRow()
     {
@@ -5561,11 +5526,6 @@ public:
         return input->queryTotalCycles();
     }
 
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        return input->queryLocalCycles();
-    }
-
     virtual IRoxieInput *queryInput(unsigned idx) const
     {
         return input->queryInput(idx);
@@ -13940,11 +13900,6 @@ public:
         savedParentExtractSize = 0;
     }
 
-    virtual IRoxieInput *queryInput(unsigned idx) const
-    {
-        return safeInput->queryInput(idx);
-    }
-
     void setInput(CRoxieServerParallelLoopActivity * _activity, IRoxieInput *_input, unsigned _flags)
     {
         activity = _activity;

+ 0 - 2
roxie/ccd/ccdserver.hpp

@@ -94,11 +94,9 @@ interface IRoxieInput : extends IInputBase
 {
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
     virtual void reset() = 0;
-    virtual void checkAbort() = 0;
     virtual unsigned queryId() const = 0;
 
     virtual unsigned __int64 queryTotalCycles() const = 0;
-    virtual unsigned __int64 queryLocalCycles() const = 0;
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual unsigned numConcreteOutputs() const { return 1; }
     virtual IRoxieInput * queryConcreteInput(unsigned idx) { assertex(idx==0); return this; }