Explorar o código

Merge pull request #7843 from jakesmith/hpcc-14343

HPCC-14343 Deadlock fix for global LOOP loopagain if stopped early

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday %!s(int64=9) %!d(string=hai) anos
pai
achega
a016258de1
Modificáronse 1 ficheiros con 119 adicións e 115 borrados
  1. 119 115
      thorlcr/activities/loop/thloopslave.cpp

+ 119 - 115
thorlcr/activities/loop/thloopslave.cpp

@@ -28,90 +28,6 @@
 #include "eclrtl_imp.hpp"
 #include "thcompressutil.hpp"
 
-class CNextRowFeeder : public CSimpleInterface, implements IThreaded, implements IRowStream
-{
-    CThreaded threaded;
-    CActivityBase *activity;
-    Owned<ISmartRowBuffer> smartbuf;
-    Owned<IRowStream> in;
-    bool stopped;
-    Owned<IException> exception;
-    IRowInterfaces *rowInterfaces;
-public:
-    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-
-    CNextRowFeeder(CActivityBase *_activity, IRowStream *_in) : in(_in), threaded("CNextRowFeeder"), activity(_activity), rowInterfaces(_activity)
-    {
-        stopped = true;
-        smartbuf.setown(createSmartInMemoryBuffer(activity, activity, SMALL_SMART_BUFFER_SIZE));
-        threaded.init(this);
-    }
-    ~CNextRowFeeder()
-    {
-        stopThread();
-    }
-    void stopThread()
-    {
-        if (!stopped)
-        {
-            stopped = true;
-            threaded.join();
-        }
-    }
-    void main()
-    {
-        stopped = false;
-        Linked<IRowWriter> writer = smartbuf->queryWriter();
-        try
-        {
-            while (!stopped)
-            {
-                OwnedConstThorRow row = in->nextRow();
-                if (!row)
-                {
-                    row.setown(in->nextRow());
-                    if (!row)
-                        break;
-                    else
-                        writer->putRow(NULL); // eog
-                }
-                writer->putRow(row.getClear());
-            }
-            in->stop();
-        }
-        catch (IException *e)
-        {
-            ActPrintLog(activity, e);
-            exception.setown(e);
-        }
-        try { writer->flush(); }
-        catch (IException *e)
-        {
-            ActPrintLog(activity, e, "Exception in writer->flush");
-            if (!exception.get())
-                exception.setown(e);
-        }
-    }
-    virtual const void *nextRow()
-    {
-        OwnedConstThorRow row = smartbuf->nextRow();
-        if (exception) 
-            throw exception.getClear();
-        return row.getClear();
-    }
-    virtual void stop()
-    {
-        if (smartbuf)
-        {
-            smartbuf->stop(); // just in case blocked
-            stopThread();
-            smartbuf.clear();
-            if (exception) 
-                throw exception.getClear();
-        }
-    }
-};
-
 class CLoopSlaveActivityBase : public CSlaveActivity, public CThorDataLink
 {
 protected:
@@ -216,7 +132,6 @@ public:
 class CLoopSlaveActivity : public CLoopSlaveActivityBase
 {
     Owned<IRowStream> curInput;
-    Owned<CNextRowFeeder> nextRowFeeder;
     Owned<IRowWriterMultiReader> loopPending;
     rowcount_t loopPendingCount;
     unsigned flags, lastMs;
@@ -224,6 +139,103 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
     bool eof, finishedLooping;
     Owned<IBarrier> barrier;
 
+    class CNextRowFeeder : public CSimpleInterface, implements IThreaded, implements IRowStream
+    {
+        CThreaded threaded;
+        CLoopSlaveActivity *activity;
+        Owned<ISmartRowBuffer> smartbuf;
+        bool stopped, stopping;
+        Owned<IException> exception;
+        IRowInterfaces *rowInterfaces;
+
+        void stopThread()
+        {
+            if (!stopped)
+            {
+                stopped = true;
+                threaded.join();
+            }
+        }
+    public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+        CNextRowFeeder(CLoopSlaveActivity *_activity) : threaded("CNextRowFeeder"), activity(_activity), rowInterfaces(_activity)
+        {
+            stopped = true;
+            stopping = false;
+            smartbuf.setown(createSmartInMemoryBuffer(activity, activity, SMALL_SMART_BUFFER_SIZE));
+            threaded.init(this);
+        }
+        ~CNextRowFeeder()
+        {
+            stopThread();
+        }
+        void abort()
+        {
+            if (smartbuf)
+            {
+                smartbuf->stop(); // just in case blocked
+                stopThread();
+            }
+        }
+        void main()
+        {
+            stopped = false;
+            Linked<IRowWriter> writer = smartbuf->queryWriter();
+            try
+            {
+                while (!stopped)
+                {
+                    OwnedConstThorRow row = activity->getNextRow(stopping);
+                    if (!row)
+                    {
+                        row.setown(activity->getNextRow(stopping));
+                        if (!row)
+                            break;
+                        else if (!stopping)
+                            writer->putRow(NULL); // eog
+                    }
+                    if (!stopping)
+                        writer->putRow(row.getClear());
+                }
+                activity->doStop();
+            }
+            catch (IException *e)
+            {
+                ::ActPrintLog(activity, e);
+                exception.setown(e);
+            }
+            try { writer->flush(); }
+            catch (IException *e)
+            {
+                ::ActPrintLog(activity, e, "Exception in writer->flush");
+                if (!exception.get())
+                    exception.setown(e);
+            }
+        }
+        virtual const void *nextRow()
+        {
+            OwnedConstThorRow row = smartbuf->nextRow();
+            if (exception)
+                throw exception.getClear();
+            return row.getClear();
+        }
+        virtual void stop()
+        {
+            /* NB: signals wants to stop and discards further rows coming out of loop,
+             * but reader thread keeps looping, until finishedLooping=true.
+             */
+            stopping = true;
+            smartbuf->stop(); // just in case blocked
+            threaded.join();
+            stopped = true;
+            smartbuf.clear();
+            if (exception)
+                throw exception.getClear();
+        }
+    };
+    Owned<CNextRowFeeder> nextRowFeeder;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -252,6 +264,8 @@ public:
     virtual void abort()
     {
         CLoopSlaveActivityBase::abort();
+        if (nextRowFeeder)
+            nextRowFeeder->abort();
         if (barrier)
             barrier->cancel();
     }
@@ -272,31 +286,16 @@ public:
         curInput.set(input);
         lastMs = msTick();
 
-        class CWrapper : public CSimpleInterface, implements IRowStream
-        {
-            CLoopSlaveActivity &activity;
-        public:
-            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-            CWrapper(CLoopSlaveActivity &_activity) : activity(_activity) { }
-            virtual const void *nextRow()
-            {
-                return activity.getNextRow();
-            }
-            virtual void stop()
-            {
-                activity.doStop();
-            }
-        };
         ActPrintLog("maxIterations = %d", maxIterations);
         dataLinkStart();
-        nextRowFeeder.setown(new CNextRowFeeder(this, new CWrapper(*this)));
+        nextRowFeeder.setown(new CNextRowFeeder(this));
     }
     void doStop()
     {
         loopPending.clear();
         CLoopSlaveActivityBase::doStop();
     }
-    const void *getNextRow()
+    const void *getNextRow(bool stopping)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (!abortSoon && !eof)
@@ -326,19 +325,24 @@ public:
                 if (abortSoon)
                     break;
 
-                switch (container.getKind())
+                if (stopping)
+                    finishedLooping = true;
+                else
                 {
-                case TAKloopdataset:
-                    assertex(flags & IHThorLoopArg::LFnewloopagain);
-                    // NB: finishedLooping set at end of loop, based on loopAgain result
-                    break;
-                case TAKlooprow:
-                    if (0 == loopPendingCount)
-                        finishedLooping = true; // This slave has finished
-                    break;
-                case TAKloopcount:
-                    // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
-                    break;
+                    switch (container.getKind())
+                    {
+                    case TAKloopdataset:
+                        assertex(flags & IHThorLoopArg::LFnewloopagain);
+                        // NB: finishedLooping set at end of loop, based on loopAgain result
+                        break;
+                    case TAKlooprow:
+                        if (0 == loopPendingCount)
+                            finishedLooping = true; // This slave has finished
+                        break;
+                    case TAKloopcount:
+                        // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
+                        break;
+                    }
                 }
 
                 if (loopPendingCount)
@@ -427,7 +431,7 @@ public:
     }
     virtual void stop()
     {
-        nextRowFeeder->stop();
+        nextRowFeeder->stop(); // NB: This will block if this slave's loop hasn't hit eof, it will continue looping until 'finishedLooping'
     }
 };