Bläddra i källkod

HPCC-16387 Ensure stop is not called until after all rows returned

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 8 år sedan
förälder
incheckning
6f18b59575

+ 2 - 1
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -40,7 +40,7 @@ protected:
         if (inputStopped)
             return;
         inputStopped = true;
-        PARENT::stop();
+        stopInput(0);
     }
     virtual void start() override
     {
@@ -157,6 +157,7 @@ public:
     virtual void stop() override
     {
         doStopInput();
+        PARENT::stop();
     }
     CATCH_NEXTROW()
     {

+ 1 - 1
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -53,8 +53,8 @@ public:
         {
             abortSoon = true;
             stopped = true;
-            PARENT::stop();
         }
+        PARENT::stop();
     }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {

+ 18 - 14
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2022,9 +2022,8 @@ public:
             distributor->disconnect(true);
             distributor->join();
         }
-        stopInput();
         instrm.clear();
-        dataLinkStop();
+        PARENT::stop();
     }
     virtual void kill() override
     {
@@ -2900,10 +2899,15 @@ public:
         if (!inputstopped)
         {
             SpinBlock b(stopSpin);
-            PARENT::stop();
+            PARENT::stopInput(0);
             inputstopped = true;
         }
     }
+    virtual void stop() override
+    {
+        stopInput();
+        PARENT::stop();
+    }
     void kill()
     {
         ActPrintLog("kill");
@@ -3441,11 +3445,6 @@ public:
         : HashDedupSlaveActivityBase(container, true)
     {
     }
-    void stop()
-    {
-        ActPrintLog("stopping");
-        stopInput();
-    }
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         initMetaInfo(info);
@@ -3455,6 +3454,8 @@ public:
 
 class GlobalHashDedupSlaveActivity : public HashDedupSlaveActivityBase, implements IStopInput
 {
+    typedef HashDedupSlaveActivityBase PARENT;
+
     mptag_t mptag;
     CriticalSection stopsect;
     IHashDistributor *distributor;
@@ -3477,11 +3478,6 @@ public:
             distributor->Release();
         }
     }
-    void stopInput()
-    {
-        CriticalBlock block(stopsect);  // can be called async by distribute
-        HashDedupSlaveActivityBase::stopInput();
-    }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         HashDedupSlaveActivityBase::init(data, slaveData);
@@ -3510,6 +3506,7 @@ public:
             distributor->join();
         }
         stopInput();
+        PARENT::stop();
     }
     virtual void abort()
     {
@@ -3523,6 +3520,13 @@ public:
         info.canStall = true;
         info.unknownRowsOutput = true;
     }
+
+// IStopInput
+    virtual void stopInput() override
+    {
+        CriticalBlock block(stopsect);  // can be called async by distribute
+        HashDedupSlaveActivityBase::stopInput();
+    }
 };
 
 //===========================================================================
@@ -3675,7 +3679,7 @@ public:
             CriticalBlock b(joinHelperCrit);
             joinhelper.clear();
         }
-        dataLinkStop();
+        PARENT::stop();
     }
     void kill()
     {

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1749,7 +1749,7 @@ public:
         if (!inputStopped)
         {
             inputStopped = true;
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
     }
     void doAbortLimit(CJoinGroup *jg)
@@ -2071,6 +2071,7 @@ public:
             }
         }
         stopInput();
+        PARENT::stop();
 #ifdef TRACE_JOINGROUPS
         ActPrintLog("groupsPendsNoted = %d", groupsPendsNoted);
         ActPrintLog("fetchReadBack = %d", fetchReadBack);
@@ -2079,7 +2080,6 @@ public:
         ActPrintLog("wroteToFetchPipe = %d", wroteToFetchPipe);
         ActPrintLog("groupsComplete = %d", groupsComplete);
 #endif
-        dataLinkStop();
     }
     const void *doDenormTransform(RtlDynamicRowBuilder &target, CJoinGroup &group)
     {

+ 2 - 1
thorlcr/activities/limit/thlimitslave.cpp

@@ -38,7 +38,7 @@ protected:
         {
             stopped = true;
             sendResult(c);
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
     }
 
@@ -75,6 +75,7 @@ public:
     virtual void stop() override
     {
         stopInput(getDataLinkCount());
+        PARENT::stop();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override

+ 4 - 2
thorlcr/activities/loop/thloopslave.cpp

@@ -105,7 +105,7 @@ public:
     void doStop()
     {
         sendEndLooping();
-        PARENT::stop();
+        stopInput(0);
     }
 // IThorDataLink
     virtual bool isGrouped() const override { return false; }
@@ -427,6 +427,7 @@ public:
     {
         if (nextRowFeeder)
             nextRowFeeder->stop(); // NB: This will block if this slave's loop hasn't hit eof, it will continue looping until 'finishedLooping'
+        PARENT::stop();
     }
 };
 
@@ -500,7 +501,8 @@ public:
     {
         finalResultStream.clear();
         loopResults.clear();
-        CLoopSlaveActivityBase::doStop();
+        doStop();
+        PARENT::stop();
     }
 };
 

+ 1 - 1
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -82,13 +82,13 @@ public:
     virtual void stop()
     {
         out.clear();
-        PARENT::stop();
         if (hasStarted())
         {
             CriticalBlock block(statsCs);
             mergeStats(spillStats, iLoader);
             iLoader.clear();
         }
+        PARENT::stop();
     }
     CATCH_NEXTROW()
     {

+ 2 - 3
thorlcr/activities/msort/thmsortslave.cpp

@@ -119,7 +119,7 @@ public:
                 abortSoon,
                 auxrowif);
 
-            PARENT::stop();
+            PARENT::stopInput(0);
             if (abortSoon)
             {
                 ActPrintLogEx(&queryContainer(), thorlog_null, MCwarning, "MSortSlaveActivity::start aborting");
@@ -161,13 +161,12 @@ public:
             barrier->wait(false);
             ActPrintLog("SORT barrier.2 raised");
         }
-        PARENT::stop();
         if (queryInputStarted(0))
         {
             ActPrintLog("SORT waiting for merge");
             sorter->stopMerge();
         }
-        dataLinkStop();
+        PARENT::stop();
     }
     virtual void reset() override
     {

+ 5 - 10
thorlcr/activities/rollup/throllupslave.cpp

@@ -156,7 +156,6 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
     typedef CSlaveActivity PARENT;
 
     bool rollup;
-    CriticalSection stopsect;
     Linked<IThorRowInterfaces> rowif;
 
 protected:
@@ -177,11 +176,6 @@ public:
         global = _global;
         groupOp = _groupOp;
     }
-    virtual void stopInput()
-    {
-        CriticalBlock block(stopsect);  // can be called async by distribute
-        PARENT::stop();
-    }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
@@ -196,10 +190,6 @@ public:
         eogNext = eos = false;
         numKept = 0;
     }
-    virtual void stop()
-    {
-        stopInput();
-    }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         if (global) {
@@ -281,6 +271,11 @@ public:
         if (global)
             cancelReceiveMsg(queryJobChannel().queryMyRank(), mpTag);
     }
+// IStopInput
+    virtual void stopInput() override
+    {
+        PARENT::stopInput(0);
+    }
 };
 
 class CDedupBaseSlaveActivity : public CDedupRollupBaseActivity

+ 4 - 4
thorlcr/activities/topn/thtopnslave.cpp

@@ -174,14 +174,14 @@ public:
         }
         if (global || 0 == topNLimit || 0 == sortedCount)
         {
-            PARENT::stop();
+            PARENT::stopInput(0);
             if (!global || 0 == topNLimit)
                 eos = true;
         }
         return retStream.getClear();
     }
 // IThorDataLink
-    virtual void start()
+    virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
@@ -192,7 +192,7 @@ public:
         if (0 == topNLimit)
         {
             eos = true;
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
         else
         {
@@ -202,7 +202,7 @@ public:
         eog = false;
     }
     virtual bool isGrouped() const override { return grouped; }
-    virtual void stop()
+    virtual void stop() override
     {
         if (out)
             out->stop();