فهرست منبع

HPCC-10945 Eclccserver doesn't support aborting query compilation

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 سال پیش
والد
کامیت
102f9662e2
3فایلهای تغییر یافته به همراه63 افزوده شده و 4 حذف شده
  1. 51 1
      ecl/eclccserver/eclccserver.cpp
  2. 10 2
      system/jlib/jthread.cpp
  3. 2 1
      system/jlib/jthread.hpp

+ 51 - 1
ecl/eclccserver/eclccserver.cpp

@@ -17,6 +17,7 @@
 
 #include <jlib.hpp>
 #include <jmisc.hpp>
+#include <jisem.hpp>
 #include <jfile.hpp>
 #include <jencrypt.hpp>
 #include <jregexp.hpp>
@@ -122,6 +123,52 @@ private:
 };
 
 //------------------------------------------------------------------------------------------------------------------
+// Check for aborts of the workunit as it is compiling
+//------------------------------------------------------------------------------------------------------------------
+
+class AbortWaiter : public Thread
+{
+public:
+    AbortWaiter(IPipeProcess *_pipe, IConstWorkUnit *_wu)
+        : Thread("EclccCompileThread::AbortWaiter"), pipe(_pipe), wu(_wu)
+    {
+    }
+
+    virtual int run()
+    {
+        wu->subscribe(SubscribeOptionAbort);
+        try
+        {
+            loop
+            {
+                if (sem.wait(2000))
+                    break;
+                if (wu->aborting())
+                {
+                    pipe->abort();
+                    break;
+                }
+            }
+        }
+        catch (IException *E)
+        {
+            ::Release(E);
+        }
+        return 0;
+    }
+
+    void stop()
+    {
+        sem.interrupt(NULL);
+        join();
+    }
+private:
+    IPipeProcess *pipe;
+    IConstWorkUnit *wu;
+    InterruptableSemaphore sem;
+};
+
+//------------------------------------------------------------------------------------------------------------------
 // Class EclccCompileThread does the work of compiling workunits (using eclcc), and optionally then enqueueing them for execution by agentexec.
 // A threadpool is used to allow multiple compiles to be submitted at once. Threads are reused when compilation completes.
 //------------------------------------------------------------------------------------------------------------------
@@ -307,9 +354,11 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
         {
             unsigned time = msTick();
             Owned<ErrorReader> errorReader = new ErrorReader(pipe, this);
+            Owned<AbortWaiter> abortWaiter = new AbortWaiter(pipe, workunit);
             eclccCmd.insert(0, eclccProgName);
-            pipe->run(eclccProgName, eclccCmd, ".", true, false, true, 0);
+            pipe->run(eclccProgName, eclccCmd, ".", true, false, true, 0, true);
             errorReader->start();
+            abortWaiter->start();
             try
             {
                 pipe->write(eclQuery.s.length(), eclQuery.s.str());
@@ -322,6 +371,7 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
             }
             unsigned retcode = pipe->wait();
             errorReader->join();
+            abortWaiter->stop();
             if (retcode == 0)
             {
                 StringBuffer realdllname, dllurl;

+ 10 - 2
system/jlib/jthread.cpp

@@ -1758,6 +1758,7 @@ protected: friend class PipeWriterThread;
     bool hasinput;
     bool hasoutput;
     bool haserror;
+    bool newProcessGroup;
     StringAttr title;
     StringAttr cmd;
     StringAttr prog;
@@ -1785,6 +1786,7 @@ public:
         retcode = -1;
         aborted = false;
         stderrbufferthread = NULL;
+        newProcessGroup = false;
     }
     ~CLinuxPipeProcess()
     {
@@ -1873,6 +1875,8 @@ public:
             }
         }
         if (pipeProcess==0) { // child
+            if (newProcessGroup)//Force the child process into its own process group, so we can terminate it and its children.
+                setpgid(0,0);
             if (hasinput) {
                 dup2(inpipe[0],0);
                 close(inpipe[0]);
@@ -1917,7 +1921,7 @@ public:
             closeOutput();
     }
 
-    bool run(const char *_title,const char *_prog,const char *_dir,bool _hasinput,bool _hasoutput, bool _haserror, size32_t stderrbufsize)
+    bool run(const char *_title,const char *_prog,const char *_dir,bool _hasinput,bool _hasoutput, bool _haserror, size32_t stderrbufsize, bool _newProcessGroup)
     {
         static CriticalSection runsect; // single thread process start to avoid forked handle open/closes interleaving
         CriticalBlock runblock(runsect);
@@ -1926,6 +1930,7 @@ public:
         hasinput = _hasinput;
         hasoutput = _hasoutput;
         haserror = _haserror;
+        newProcessGroup = _newProcessGroup;
         title.clear();
         prog.set(_prog);
         dir.set(_dir);
@@ -2158,7 +2163,10 @@ public:
             if (pipeProcess != (HANDLE)-1) {
                 if (title.length())
                     PROGLOG("%s: Forcibly killing pipe process %d",title.get(),pipeProcess);
-                ::kill(pipeProcess,SIGKILL);            // if this doesn't kill it we are in trouble
+                if (newProcessGroup)
+                    ::kill(-pipeProcess,SIGKILL);
+                else
+                    ::kill(pipeProcess,SIGKILL);            // if this doesn't kill it we are in trouble
                 CriticalUnblock unblock(sect);
                 wait();
             }

+ 2 - 1
system/jlib/jthread.hpp

@@ -250,7 +250,8 @@ interface IPipeProcess: extends IInterface
 {
     virtual bool run(const char *title,const char *prog, const char *dir,
                        bool hasinput,bool hasoutput,bool haserror=false,
-                       size32_t stderrbufsize=0) = 0;               // set to non-zero to automatically buffer stderror output
+                       size32_t stderrbufsize=0,                      // set to non-zero to automatically buffer stderror output
+                       bool newProcessGroup=false) = 0;
     virtual bool hasInput() = 0;                                    // i.e. can write to pipe
     virtual size32_t write(size32_t sz, const void *buffer) = 0;    // write pipe process standard output
     virtual bool hasOutput() = 0;                                   // i.e. can read from pipe