Kaynağa Gözat

Merge branch 'candidate-6.2.0'

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 8 yıl önce
ebeveyn
işleme
583a202da8

+ 2 - 1
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -40,7 +40,7 @@ protected:
         if (inputStopped)
             return;
         inputStopped = true;
-        PARENT::stop();
+        stopInput(0);
     }
     virtual void start() override
     {
@@ -159,6 +159,7 @@ public:
     virtual void stop() override
     {
         doStopInput();
+        PARENT::stop();
     }
     CATCH_NEXTROW()
     {

+ 1 - 7
thorlcr/activities/filter/thfilterslave.cpp

@@ -73,15 +73,9 @@ public:
         {
             dataLinkStart();
             abortSoon = true;
-            stop();
+            stopInput(0);
         }
     }
-    virtual void stop() override
-    {
-        if (!queryInputStopped(0))
-            PARENT::stop();
-        // else already stopped
-    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities);

+ 7 - 7
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -53,8 +53,8 @@ public:
         {
             abortSoon = true;
             stopped = true;
-            PARENT::stop();
         }
+        PARENT::stop();
     }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
@@ -100,7 +100,7 @@ public:
                     OwnedConstThorRow row = inputStream->ungroupedNextRow();
                     if (!row)
                     {
-                        stop();
+                        stopInput(0);
                         return NULL;
                     }
                     skipped++;
@@ -115,7 +115,7 @@ public:
                     return row.getClear();
                 }
             }
-            stop(); // NB: really whatever is pulling, should stop asap.
+            stopInput(0); // NB: really whatever is pulling, should stop asap.
         }
         return NULL;
     }
@@ -160,7 +160,7 @@ public:
                         {
                             if (0 == skipped)
                             {
-                                stop();
+                                stopInput(0);
                                 return NULL;
                             }
                             skipped = 0; // reset, skip group
@@ -179,7 +179,7 @@ public:
                     }
                     else if (0 == countThisGroup && 0==skipCount)
                     {
-                        stop();
+                        stopInput(0);
                         return NULL;
                     }
                 }
@@ -330,7 +330,7 @@ public:
                     OwnedConstThorRow row = inputStream->ungroupedNextRow();
                     if (!row)
                     {
-                        stop();
+                        stopInput(0);
                         return NULL;
                     }
                     skipped++;
@@ -345,7 +345,7 @@ public:
                     return row.getClear();
                 }
             }
-            stop(); // NB: really whatever is pulling, should stop asap.
+            stopInput(0); // NB: really whatever is pulling, should stop asap.
         }
         return NULL;
     }

+ 19 - 15
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2024,9 +2024,8 @@ public:
             distributor->disconnect(true);
             distributor->join();
         }
-        stopInput();
         instrm.clear();
-        dataLinkStop();
+        PARENT::stop();
     }
     virtual void kill() override
     {
@@ -2178,7 +2177,7 @@ public:
                 }
                 out->flush();
                 sz = out->getPosition();
-                activity->stop();
+                activity->stopInput(0);
             }
             ret.setown(createRowStream(tempfile, activity, rwFlags));
         }
@@ -2894,10 +2893,15 @@ public:
         if (!inputstopped)
         {
             SpinBlock b(stopSpin);
-            PARENT::stop();
+            PARENT::stopInput(0);
             inputstopped = true;
         }
     }
+    virtual void stop() override
+    {
+        stopInput();
+        PARENT::stop();
+    }
     void kill()
     {
         ActPrintLog("kill");
@@ -3435,11 +3439,6 @@ public:
         : HashDedupSlaveActivityBase(container, true)
     {
     }
-    void stop()
-    {
-        ActPrintLog("stopping");
-        stopInput();
-    }
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         initMetaInfo(info);
@@ -3449,6 +3448,8 @@ public:
 
 class GlobalHashDedupSlaveActivity : public HashDedupSlaveActivityBase, implements IStopInput
 {
+    typedef HashDedupSlaveActivityBase PARENT;
+
     mptag_t mptag;
     CriticalSection stopsect;
     IHashDistributor *distributor;
@@ -3471,11 +3472,6 @@ public:
             distributor->Release();
         }
     }
-    void stopInput()
-    {
-        CriticalBlock block(stopsect);  // can be called async by distribute
-        HashDedupSlaveActivityBase::stopInput();
-    }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         HashDedupSlaveActivityBase::init(data, slaveData);
@@ -3504,6 +3500,7 @@ public:
             distributor->join();
         }
         stopInput();
+        PARENT::stop();
     }
     virtual void abort()
     {
@@ -3517,6 +3514,13 @@ public:
         info.canStall = true;
         info.unknownRowsOutput = true;
     }
+
+// IStopInput
+    virtual void stopInput() override
+    {
+        CriticalBlock block(stopsect);  // can be called async by distribute
+        HashDedupSlaveActivityBase::stopInput();
+    }
 };
 
 //===========================================================================
@@ -3669,7 +3673,7 @@ public:
             CriticalBlock b(joinHelperCrit);
             joinhelper.clear();
         }
-        dataLinkStop();
+        PARENT::stop();
     }
     void kill()
     {

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1749,7 +1749,7 @@ public:
         if (!inputStopped)
         {
             inputStopped = true;
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
     }
     void doAbortLimit(CJoinGroup *jg)
@@ -2071,6 +2071,7 @@ public:
             }
         }
         stopInput();
+        PARENT::stop();
 #ifdef TRACE_JOINGROUPS
         ActPrintLog("groupsPendsNoted = %d", groupsPendsNoted);
         ActPrintLog("fetchReadBack = %d", fetchReadBack);
@@ -2079,7 +2080,6 @@ public:
         ActPrintLog("wroteToFetchPipe = %d", wroteToFetchPipe);
         ActPrintLog("groupsComplete = %d", groupsComplete);
 #endif
-        dataLinkStop();
     }
     const void *doDenormTransform(RtlDynamicRowBuilder &target, CJoinGroup &group)
     {

+ 2 - 1
thorlcr/activities/limit/thlimitslave.cpp

@@ -38,7 +38,7 @@ protected:
         {
             stopped = true;
             sendResult(c);
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
     }
 
@@ -77,6 +77,7 @@ public:
     virtual void stop() override
     {
         stopInput(getDataLinkCount());
+        PARENT::stop();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override

+ 4 - 2
thorlcr/activities/loop/thloopslave.cpp

@@ -107,7 +107,7 @@ public:
     void doStop()
     {
         sendEndLooping();
-        PARENT::stop();
+        stopInput(0);
     }
 // IThorDataLink
     virtual bool isGrouped() const override { return false; }
@@ -429,6 +429,7 @@ public:
     {
         if (nextRowFeeder)
             nextRowFeeder->stop(); // NB: This will block if this slave's loop hasn't hit eof, it will continue looping until 'finishedLooping'
+        PARENT::stop();
     }
 };
 
@@ -502,7 +503,8 @@ public:
     {
         finalResultStream.clear();
         loopResults.clear();
-        CLoopSlaveActivityBase::doStop();
+        doStop();
+        PARENT::stop();
     }
 };
 

+ 1 - 1
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -80,13 +80,13 @@ public:
     virtual void stop()
     {
         out.clear();
-        PARENT::stop();
         if (hasStarted())
         {
             CriticalBlock block(statsCs);
             mergeStats(spillStats, iLoader);
             iLoader.clear();
         }
+        PARENT::stop();
     }
     CATCH_NEXTROW()
     {

+ 2 - 3
thorlcr/activities/msort/thmsortslave.cpp

@@ -119,7 +119,7 @@ public:
                 abortSoon,
                 auxrowif);
 
-            PARENT::stop();
+            PARENT::stopInput(0);
             if (abortSoon)
             {
                 ActPrintLogEx(&queryContainer(), thorlog_null, MCwarning, "MSortSlaveActivity::start aborting");
@@ -161,13 +161,12 @@ public:
             barrier->wait(false);
             ActPrintLog("SORT barrier.2 raised");
         }
-        PARENT::stop();
         if (queryInputStarted(0))
         {
             ActPrintLog("SORT waiting for merge");
             sorter->stopMerge();
         }
-        dataLinkStop();
+        PARENT::stop();
     }
     virtual void reset() override
     {

+ 5 - 10
thorlcr/activities/rollup/throllupslave.cpp

@@ -156,7 +156,6 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
     typedef CSlaveActivity PARENT;
 
     bool rollup;
-    CriticalSection stopsect;
     Linked<IThorRowInterfaces> rowif;
 
 protected:
@@ -179,11 +178,6 @@ public:
         if (!global)
             setRequireInitData(false);
     }
-    virtual void stopInput()
-    {
-        CriticalBlock block(stopsect);  // can be called async by distribute
-        PARENT::stop();
-    }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
@@ -198,10 +192,6 @@ public:
         eogNext = eos = false;
         numKept = 0;
     }
-    virtual void stop()
-    {
-        stopInput();
-    }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         if (global) {
@@ -283,6 +273,11 @@ public:
         if (global)
             cancelReceiveMsg(queryJobChannel().queryMyRank(), mpTag);
     }
+// IStopInput
+    virtual void stopInput() override
+    {
+        PARENT::stopInput(0);
+    }
 };
 
 class CDedupBaseSlaveActivity : public CDedupRollupBaseActivity

+ 8 - 9
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -68,7 +68,7 @@ private:
         Owned<IThorRowLoader> iLoader = createThorRowLoader(*this, ::queryRowInterfaces(input), compare, isUnstable() ? stableSort_none : stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> rs = iLoader->load(inputStream, abortSoon);
         mergeStats(spillStats, iLoader);  // Not sure of the best policy if rs spills later on.
-        PARENT::stop();
+        PARENT::stopInput(0);
         return rs.getClear();
     }
 
@@ -78,7 +78,7 @@ private:
         ActPrintLog("SELFJOIN: Performing global self-join");
 #endif
         sorter->Gather(::queryRowInterfaces(input), inputStream, compare, NULL, NULL, keyserializer, NULL, NULL, false, isUnstable(), abortSoon, NULL);
-        PARENT::stop();
+        PARENT::stopInput(0);
         if (abortSoon)
         {
             barrier->cancel();
@@ -176,7 +176,7 @@ public:
         {
             strm.setown(isLocal ? doLocalSelfJoin() : doGlobalSelfJoin());
             assertex(strm);
-            // NB: PARENT::stop() will now have been called
+            // NB: PARENT::stopInput(0) will now have been called
         }
 
         joinhelper->init(strm, NULL, ::queryRowAllocator(queryInput(0)), ::queryRowAllocator(queryInput(0)), ::queryRowMetaData(queryInput(0)));
@@ -201,14 +201,13 @@ public:
                 CriticalBlock b(joinHelperCrit);
                 joinhelper.clear();
             }
-            if (isLightweight)
-                PARENT::stop();
-            else if (strm) // if !isLightWeight, PARENT::stop() will have been called in start()
+            if (strm)
+            {
                 strm->stop();
-            strm.clear();
+                strm.clear();
+            }
         }
-        else
-            PARENT::stop();
+        PARENT::stop();
     }
     
     CATCH_NEXTROW()

+ 4 - 4
thorlcr/activities/topn/thtopnslave.cpp

@@ -176,14 +176,14 @@ public:
         }
         if (global || 0 == topNLimit || 0 == sortedCount)
         {
-            PARENT::stop();
+            PARENT::stopInput(0);
             if (!global || 0 == topNLimit)
                 eos = true;
         }
         return retStream.getClear();
     }
 // IThorDataLink
-    virtual void start()
+    virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
@@ -194,7 +194,7 @@ public:
         if (0 == topNLimit)
         {
             eos = true;
-            PARENT::stop();
+            PARENT::stopInput(0);
         }
         else
         {
@@ -204,7 +204,7 @@ public:
         eog = false;
     }
     virtual bool isGrouped() const override { return grouped; }
-    virtual void stop()
+    virtual void stop() override
     {
         if (out)
             out->stop();

+ 3 - 3
thorlcr/graph/thgraphslave.hpp

@@ -33,9 +33,6 @@
 #include "traceslave.hpp"
 #include "thorstrand.hpp"
 
-// #define OUTPUT_RECORDSIZE // causes the record size to be logged for each activity on the 1st call to dataLinkIncrement
-
-
 interface IStartableEngineRowStream : extends IEngineRowStream
 {
     virtual void start() = 0;
@@ -85,6 +82,8 @@ public:
     inline void dataLinkIncrement() { dataLinkIncrement(1); }
     inline void dataLinkIncrement(rowcount_t v)
     {
+#ifdef _TESTING
+        assertex(hasStarted());
 #ifdef OUTPUT_RECORDSIZE
         if (count==THORDATALINK_STARTED)
         {
@@ -92,6 +91,7 @@ public:
             parent.ActPrintLog("Record size %s= %d", parent.queryRowMetaData(this)->isVariableSize()?"(min) ":"",rsz);
         }
 #endif
+#endif
         icount += v;
         count += v;
     }