Browse Source

HPCC-14343 Deadlock fix for global LOOP loopagain if stopped early

When a global loop activity was stopped early (e.g. by firstn), it
was no longer executed on that slave. If other slaves had not
finished, they would deadlock at the global loop synchronization
point.

Fix by continuing to loop, ensuring that the finished slaves
continue to signal that they are finished until all are finished.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 years ago
parent
commit
ad7e43a523
1 changed files with 119 additions and 115 deletions
  1. 119 115
      thorlcr/activities/loop/thloopslave.cpp

+ 119 - 115
thorlcr/activities/loop/thloopslave.cpp

@@ -28,90 +28,6 @@
 #include "eclrtl_imp.hpp"
 #include "thcompressutil.hpp"
 
-class CNextRowFeeder : public CSimpleInterface, implements IThreaded, implements IRowStream
-{
-    CThreaded threaded;
-    CActivityBase *activity;
-    Owned<ISmartRowBuffer> smartbuf;
-    Owned<IRowStream> in;
-    bool stopped;
-    Owned<IException> exception;
-    IRowInterfaces *rowInterfaces;
-public:
-    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-
-    CNextRowFeeder(CActivityBase *_activity, IRowStream *_in) : in(_in), threaded("CNextRowFeeder"), activity(_activity), rowInterfaces(_activity)
-    {
-        stopped = true;
-        smartbuf.setown(createSmartInMemoryBuffer(activity, activity, SMALL_SMART_BUFFER_SIZE));
-        threaded.init(this);
-    }
-    ~CNextRowFeeder()
-    {
-        stopThread();
-    }
-    void stopThread()
-    {
-        if (!stopped)
-        {
-            stopped = true;
-            threaded.join();
-        }
-    }
-    void main()
-    {
-        stopped = false;
-        Linked<IRowWriter> writer = smartbuf->queryWriter();
-        try
-        {
-            while (!stopped)
-            {
-                OwnedConstThorRow row = in->nextRow();
-                if (!row)
-                {
-                    row.setown(in->nextRow());
-                    if (!row)
-                        break;
-                    else
-                        writer->putRow(NULL); // eog
-                }
-                writer->putRow(row.getClear());
-            }
-            in->stop();
-        }
-        catch (IException *e)
-        {
-            ActPrintLog(activity, e);
-            exception.setown(e);
-        }
-        try { writer->flush(); }
-        catch (IException *e)
-        {
-            ActPrintLog(activity, e, "Exception in writer->flush");
-            if (!exception.get())
-                exception.setown(e);
-        }
-    }
-    virtual const void *nextRow()
-    {
-        OwnedConstThorRow row = smartbuf->nextRow();
-        if (exception) 
-            throw exception.getClear();
-        return row.getClear();
-    }
-    virtual void stop()
-    {
-        if (smartbuf)
-        {
-            smartbuf->stop(); // just in case blocked
-            stopThread();
-            smartbuf.clear();
-            if (exception) 
-                throw exception.getClear();
-        }
-    }
-};
-
 class CLoopSlaveActivityBase : public CSlaveActivity, public CThorDataLink
 {
 protected:
@@ -216,7 +132,6 @@ public:
 class CLoopSlaveActivity : public CLoopSlaveActivityBase
 {
     Owned<IRowStream> curInput;
-    Owned<CNextRowFeeder> nextRowFeeder;
     Owned<IRowWriterMultiReader> loopPending;
     rowcount_t loopPendingCount;
     unsigned flags, lastMs;
@@ -224,6 +139,103 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
     bool eof, finishedLooping;
     Owned<IBarrier> barrier;
 
+    class CNextRowFeeder : public CSimpleInterface, implements IThreaded, implements IRowStream
+    {
+        CThreaded threaded;
+        CLoopSlaveActivity *activity;
+        Owned<ISmartRowBuffer> smartbuf;
+        bool stopped, stopping;
+        Owned<IException> exception;
+        IRowInterfaces *rowInterfaces;
+
+        void stopThread()
+        {
+            if (!stopped)
+            {
+                stopped = true;
+                threaded.join();
+            }
+        }
+    public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+        CNextRowFeeder(CLoopSlaveActivity *_activity) : threaded("CNextRowFeeder"), activity(_activity), rowInterfaces(_activity)
+        {
+            stopped = true;
+            stopping = false;
+            smartbuf.setown(createSmartInMemoryBuffer(activity, activity, SMALL_SMART_BUFFER_SIZE));
+            threaded.init(this);
+        }
+        ~CNextRowFeeder()
+        {
+            stopThread();
+        }
+        void abort()
+        {
+            if (smartbuf)
+            {
+                smartbuf->stop(); // just in case blocked
+                stopThread();
+            }
+        }
+        void main()
+        {
+            stopped = false;
+            Linked<IRowWriter> writer = smartbuf->queryWriter();
+            try
+            {
+                while (!stopped)
+                {
+                    OwnedConstThorRow row = activity->getNextRow(stopping);
+                    if (!row)
+                    {
+                        row.setown(activity->getNextRow(stopping));
+                        if (!row)
+                            break;
+                        else if (!stopping)
+                            writer->putRow(NULL); // eog
+                    }
+                    if (!stopping)
+                        writer->putRow(row.getClear());
+                }
+                activity->doStop();
+            }
+            catch (IException *e)
+            {
+                ::ActPrintLog(activity, e);
+                exception.setown(e);
+            }
+            try { writer->flush(); }
+            catch (IException *e)
+            {
+                ::ActPrintLog(activity, e, "Exception in writer->flush");
+                if (!exception.get())
+                    exception.setown(e);
+            }
+        }
+        virtual const void *nextRow()
+        {
+            OwnedConstThorRow row = smartbuf->nextRow();
+            if (exception)
+                throw exception.getClear();
+            return row.getClear();
+        }
+        virtual void stop()
+        {
+            /* NB: signals wants to stop and discards further rows coming out of loop,
+             * but reader thread keeps looping, until finishedLooping=true.
+             */
+            stopping = true;
+            smartbuf->stop(); // just in case blocked
+            threaded.join();
+            stopped = true;
+            smartbuf.clear();
+            if (exception)
+                throw exception.getClear();
+        }
+    };
+    Owned<CNextRowFeeder> nextRowFeeder;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -252,6 +264,8 @@ public:
     virtual void abort()
     {
         CLoopSlaveActivityBase::abort();
+        if (nextRowFeeder)
+            nextRowFeeder->abort();
         if (barrier)
             barrier->cancel();
     }
@@ -272,31 +286,16 @@ public:
         curInput.set(input);
         lastMs = msTick();
 
-        class CWrapper : public CSimpleInterface, implements IRowStream
-        {
-            CLoopSlaveActivity &activity;
-        public:
-            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-            CWrapper(CLoopSlaveActivity &_activity) : activity(_activity) { }
-            virtual const void *nextRow()
-            {
-                return activity.getNextRow();
-            }
-            virtual void stop()
-            {
-                activity.doStop();
-            }
-        };
         ActPrintLog("maxIterations = %d", maxIterations);
         dataLinkStart();
-        nextRowFeeder.setown(new CNextRowFeeder(this, new CWrapper(*this)));
+        nextRowFeeder.setown(new CNextRowFeeder(this));
     }
     void doStop()
     {
         loopPending.clear();
         CLoopSlaveActivityBase::doStop();
     }
-    const void *getNextRow()
+    const void *getNextRow(bool stopping)
     {
         ActivityTimer t(totalCycles, timeActivities);
         if (!abortSoon && !eof)
@@ -326,19 +325,24 @@ public:
                 if (abortSoon)
                     break;
 
-                switch (container.getKind())
+                if (stopping)
+                    finishedLooping = true;
+                else
                 {
-                case TAKloopdataset:
-                    assertex(flags & IHThorLoopArg::LFnewloopagain);
-                    // NB: finishedLooping set at end of loop, based on loopAgain result
-                    break;
-                case TAKlooprow:
-                    if (0 == loopPendingCount)
-                        finishedLooping = true; // This slave has finished
-                    break;
-                case TAKloopcount:
-                    // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
-                    break;
+                    switch (container.getKind())
+                    {
+                    case TAKloopdataset:
+                        assertex(flags & IHThorLoopArg::LFnewloopagain);
+                        // NB: finishedLooping set at end of loop, based on loopAgain result
+                        break;
+                    case TAKlooprow:
+                        if (0 == loopPendingCount)
+                            finishedLooping = true; // This slave has finished
+                        break;
+                    case TAKloopcount:
+                        // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
+                        break;
+                    }
                 }
 
                 if (loopPendingCount)
@@ -427,7 +431,7 @@ public:
     }
     virtual void stop()
     {
-        nextRowFeeder->stop();
+        nextRowFeeder->stop(); // NB: This will block if this slave's loop hasn't hit eof, it will continue looping until 'finishedLooping'
     }
 };