ソースを参照

Merge pull request #7364 from jakesmith/hpcc-13612

HPCC-13612 Fix race condition on pipe stop that caused deadlock

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年 前
コミット
1532b91e42
1 ファイル変更44 行追加49 行削除
  1. 44 49
      system/jlib/jthread.cpp

+ 44 - 49
system/jlib/jthread.cpp

@@ -1732,7 +1732,7 @@ class CLinuxPipeProcess: public CInterface, implements IPipeProcess
             }
             return 0;
         }
-        void stop() 
+        void stop()
         {
             stopsem.signal();
             Thread::join();
@@ -1780,6 +1780,27 @@ protected: friend class PipeWriterThread;
     StringArray envVars;
     StringArray envValues;
 
+    void clearUtilityThreads()
+    {
+        Owned<cForkThread> ft;
+        cStdErrorBufferThread *et;
+        {
+            CriticalBlock block(sect); // clear forkthread and stderrbufferthread
+            ft.setown(forkthread.getClear());
+            et = stderrbufferthread;
+            stderrbufferthread = NULL;
+        }
+        if (ft)
+        {
+            ft->join();
+            ft.clear();
+        }
+        if (et)
+        {
+            et->stop();
+            delete et;
+        }
+    }
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -1805,22 +1826,7 @@ public:
         closeInput();
         closeOutput();
         closeError();
-
-        Owned<cForkThread> ft;
-        cStdErrorBufferThread *et;
-        {   CriticalBlock block(sect); // clear forkthread  and stderrbufferthread
-            ft.setown(forkthread.getClear());
-            et = stderrbufferthread;
-            stderrbufferthread = NULL;
-        }
-        if (ft) {
-            ft->join();
-            ft.clear();
-        }
-        if (et) {
-            et->stop();
-            delete et;
-        }
+        clearUtilityThreads();
     }
 
 
@@ -2083,51 +2089,40 @@ public:
             pipeProcess = (HANDLE)-1;
         }
     }
-    
-    
+
     unsigned wait()
     {
-        CriticalBlock block(sect); 
-        if (stderrbufferthread)
-            stderrbufferthread->stop();
-        if (forkthread) {
-            {
-                CriticalUnblock unblock(sect);
-                forkthread->join();
-            }
-            if (pipeProcess != (HANDLE)-1) {
-                if (title.length())
-                    PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
-                pipeProcess = (HANDLE)-1;
-            }
-            forkthread.clear();
-        }
-        return retcode;
+        bool timedout;
+        return wait(INFINITE, timedout);
     }
 
     unsigned wait(unsigned timeoutms, bool &timedout)
     {
-        CriticalBlock block(sect); 
         timedout = false;
-        if (forkthread) {
+        if (INFINITE != timeoutms)
+        {
+            CriticalBlock block(sect);
+            if (forkthread)
             {
-                CriticalUnblock unblock(sect);
-                if (!forkthread->join(timeoutms)) {
-                    timedout = true;
-                    return retcode;
+                {
+                    CriticalUnblock unblock(sect);
+                    if (!forkthread->join(timeoutms))
+                    {
+                        timedout = true;
+                        return retcode;
+                    }
                 }
-
-            }
-            if (pipeProcess != (HANDLE)-1) {
-                if (title.length())
-                    PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
-                pipeProcess = (HANDLE)-1;
             }
-            forkthread.clear();
+        }
+        clearUtilityThreads(); // NB: will recall forkthread->join(), but doesn't matter
+        if (pipeProcess != (HANDLE)-1)
+        {
+            if (title.length())
+                PROGLOG("%s: Pipe: process %d complete %d", title.get(), pipeProcess, retcode);
+            pipeProcess = (HANDLE)-1;
         }
         return retcode;
     }
-    
 
     void closeOutput()
     {