Kaynağa Gözat

HPCC-15401 Fix reuse of changed inputStream spotted in review

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 yıl önce
ebeveyn
işleme
324fa5be62

+ 7 - 1
thorlcr/activities/enth/thenthslave.cpp

@@ -28,6 +28,7 @@ protected:
     Semaphore finishedSem;
     rowcount_t counter = 0, localRecCount = 0;
     rowcount_t denominator = 0, numerator = 0;
+    IEngineRowStream *originalInputStream = nullptr;
 
     bool haveLocalCount() { return RCUNBOUND != localRecCount; }
     inline bool wanted()
@@ -81,7 +82,7 @@ protected:
         else
         {
             localRecCount = RCUNBOUND;
-            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
+            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, originalInputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
             setLookAhead(0, lookAhead); // NB: this is post base start()
             lookAhead->start();
         }
@@ -96,6 +97,11 @@ public:
     {
         appendOutputLinked(this);
     }
+    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    {
+        PARENT::setInputStream(index, _input, consumerOrdered);
+        originalInputStream = inputStream;
+    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);

+ 7 - 1
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -216,6 +216,7 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
     rowcount_t maxres, skipped, totallimit;
     bool firstget;
     ThorDataLinkMetaInfo inputMeta;
+    IEngineRowStream *originalInputStream = nullptr;
 
 protected:
     virtual void doStop()
@@ -234,6 +235,11 @@ public:
         PARENT::init(data, slaveData);
         mpTag = container.queryJobChannel().deserializeMPTag(data);
     }
+    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    {
+        PARENT::setInputStream(index, _input, consumerOrdered);
+        originalInputStream = inputStream;
+    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
@@ -246,7 +252,7 @@ public:
         totallimit = (rowcount_t)helper->getLimit();
         rowcount_t _skipCount = validRC(helper->numToSkip()); // max
         rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
-        IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
+        IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, originalInputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
                                                                               maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
         setLookAhead(0, lookAhead);
         lookAhead->start();