浏览代码

Implementation of a persist thread

Similar to CThreaded, but thread does not stop, instead waits
on a semaphore. To be used where a class reuses a thread quickly.
Preventing the creation/destruction or restarting/joining of a thread can
significanly speed up a process if a frequent event

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 年之前
父节点
当前提交
7145aec102
共有 2 个文件被更改,包括 31 次插入25 次删除
  1. 28 24
      system/jlib/jthread.cpp
  2. 3 1
      system/jlib/jthread.hpp

+ 28 - 24
system/jlib/jthread.cpp

@@ -514,6 +514,7 @@ StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
 
 CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner)
 {
+    halt = false;
     atomic_set(&state, s_ready);
     athread.start();
 }
@@ -521,7 +522,7 @@ CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) :
 CThreadedPersistent::~CThreadedPersistent()
 {
     join(INFINITE);
-    atomic_set(&state, s_stop);
+    halt = true;
     sem.signal();
     athread.join();
 }
@@ -531,9 +532,18 @@ void CThreadedPersistent::main()
     loop
     {
         sem.wait();
-        if (s_stop == atomic_read(&state))
+        if (halt)
             break;
-        owner->main();
+        try
+        {
+            owner->main();
+        }
+        catch (IException *e)
+        {
+            exception.setown(e);
+            joinSem.signal(); // leave in running state, signal to join to handle
+            continue;
+        }
         if (!atomic_cas(&state, s_ready, s_running))
             if (atomic_cas(&state, s_ready, s_joining))
                 joinSem.signal();
@@ -546,11 +556,8 @@ void CThreadedPersistent::start()
     {
         VStringBuffer msg("CThreadedPersistent::start(%s) - not ready", athread.getName());
         WARNLOG("%s", msg.str());
-#ifdef _DEBUG
         PrintStackReport();
         throw MakeStringException(-1, "%s", msg.str());
-#endif
-        return;
     }
     sem.signal();
 }
@@ -559,27 +566,24 @@ bool CThreadedPersistent::join(unsigned timeout)
 {
     if (atomic_cas(&state, s_joining, s_running))
     {
-        if (joinSem.wait(timeout)) // NB: state change to ready by signaller
-            return true;
-        if (!atomic_cas(&state, s_running, s_joining)) // put back if still in running state
-        {   // if here, either a) main() made s_ready after timeout and has or will signal, or b) stopped
-            if (s_ready == atomic_read(&state)) // need to swallow signal
-                if (!joinSem.wait(60000)) // should be instant
-                    throwUnexpected();
+        if (!joinSem.wait(timeout))
+        {
+            if (atomic_cas(&state, s_running, s_joining)) // if still joining, restore running state 
+                return false;
+            // if here, main() set s_ready after timeout and has or will signal
+            if (!joinSem.wait(60000)) // should be instant
+                throwUnexpected();
             return true;
         }
-        return false;
-    }
-#ifdef _DEBUG
-    ThreadStates s = (ThreadStates) atomic_read(&state);
-    if (s != s_stop && s != s_ready)
-    {
-        VStringBuffer msg("CThreadedPersistent(%s)::join called while not running, state now: %d", athread.getName(), s);
-        PROGLOG("%s", msg.str());
-        PrintStackReport();
-        throw MakeStringException(-1, "%s", msg.str());
+        if (exception.get())
+        {
+            // switch back to ready state and throw
+            Owned<IException> e = exception.getClear();
+            if (!atomic_cas(&state, s_ready, s_joining))
+                throwUnexpected();
+            throw e.getClear();
+        }
     }
-#endif
     return true;
 }
 

+ 3 - 1
system/jlib/jthread.hpp

@@ -151,10 +151,12 @@ class jlib_decl CThreadedPersistent : public CInterface
         CAThread(CThreadedPersistent &_owner, const char *name) : Thread(name), owner(_owner) { }
         virtual int run() { owner.main(); return 1; }
     } athread;
+    Owned<IException> exception;
     IThreaded *owner;
     Semaphore sem, joinSem;
     atomic_t state;
-    enum ThreadStates { s_stop, s_ready, s_running, s_joining };
+    bool halt;
+    enum ThreadStates { s_ready, s_running, s_joining };
 
     void main();
 public: