jthread.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifndef __JTHREAD__
  15. #define __JTHREAD__
  16. #include "jexpdef.hpp"
  17. #include "jiface.hpp"
  18. #include "jmutex.hpp"
  19. #include "jexcept.hpp"
  20. #include "jhash.hpp"
  21. #ifdef _WIN32
  22. #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL
  23. #else
  24. // no thread priority handling?
  25. #endif
  26. interface jlib_decl IThread : public IInterface
  27. {
  28. virtual void start() = 0;
  29. virtual int run() = 0;
  30. };
  31. interface jlib_decl IThreadName
  32. {
  33. virtual const char *get()=0;
  34. };
  35. extern jlib_decl void addThreadExceptionHandler(IExceptionHandler *handler);
  36. extern jlib_decl void removeThreadExceptionHandler(IExceptionHandler *handler);
  37. extern jlib_decl void enableThreadSEH();
  38. extern jlib_decl void disableThreadSEH();
  39. extern jlib_decl unsigned threadLogID(); // for use in logging
  40. class jlib_decl Thread : public CInterface, public IThread
  41. {
  42. private:
  43. ThreadId threadid;
  44. unsigned short stacksize; // in 4K blocks
  45. char prioritydelta;
  46. char nicelevel;
  47. bool alive;
  48. unsigned tidlog;
  49. #ifdef _WIN32
  50. HANDLE hThread;
  51. static unsigned WINAPI _threadmain(LPVOID v);
  52. #else
  53. static void *_threadmain(void *v);
  54. #endif
  55. virtual int begin();
  56. void init(const char *name);
  57. void handleException(IException *e);
  58. void adjustNiceLevel();
  59. protected:
  60. struct cThreadName: implements IThreadName
  61. {
  62. char *threadname;
  63. const char *get() { return threadname; }
  64. } cthreadname;
  65. IThreadName *ithreadname;
  66. public:
  67. #ifndef _WIN32
  68. Semaphore suspend;
  69. Semaphore starting;
  70. #endif
  71. Semaphore stopped;
  72. IMPLEMENT_IINTERFACE;
  73. Thread(const char *_name) { init(_name); }
  74. Thread() { init(NULL); }
  75. ~Thread();
  76. void adjustPriority(char delta);
  77. void setNice(char nicelevel);
  78. void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously)
  79. const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }
  80. bool isAlive() { return alive; }
  81. bool join(unsigned timeout=INFINITE);
  82. virtual void start();
  83. virtual void startRelease();
  84. StringBuffer &getInfo(StringBuffer &str) { str.appendf("%8"I64F"X %6"I64F"d %u: %s",(__int64)threadid,(__int64)threadid,tidlog,getName()); return str; }
  85. const char *getLogInfo(int &thandle,unsigned &tid) {
  86. #ifdef _WIN32
  87. thandle = (int)(memsize_t)hThread;
  88. #elif defined __FreeBSD__ || defined __APPLE__
  89. thandle = (int)(memsize_t)threadid;
  90. #else
  91. thandle = threadid;
  92. #endif
  93. tid = tidlog;
  94. return getName();
  95. }
  96. // run method not implemented - concrete derived classes must do so
  97. static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:)
  98. IThreadName *queryThreadName() { return ithreadname; }
  99. void setThreadName(IThreadName *name) { ithreadname = name; }
  100. };
  101. interface IThreaded
  102. {
  103. virtual void main() = 0;
  104. };
  105. // utility class, useful for containing a thread
  106. class CThreaded : public Thread
  107. {
  108. IThreaded *owner;
  109. public:
  110. inline CThreaded(const char *name, IThreaded *_owner) : Thread(name), owner(_owner) { }
  111. inline CThreaded(const char *name) : Thread(name) { owner = NULL; }
  112. inline void init(IThreaded *_owner) { owner = _owner; start(); }
  113. virtual int run() { owner->main(); return 1; }
  114. };
  115. // Similar to above, but the underlying thread always remains running. This can make repeated start + join's significantly quicker
  116. class jlib_decl CThreadedPersistent : public CInterface
  117. {
  118. class CAThread : public Thread
  119. {
  120. CThreadedPersistent &owner;
  121. public:
  122. CAThread(CThreadedPersistent &_owner, const char *name) : Thread(name), owner(_owner) { }
  123. virtual int run() { owner.main(); return 1; }
  124. } athread;
  125. Owned<IException> exception;
  126. IThreaded *owner;
  127. Semaphore sem, joinSem;
  128. atomic_t state;
  129. bool halt;
  130. enum ThreadStates { s_ready, s_running, s_joining };
  131. void main();
  132. public:
  133. CThreadedPersistent(const char *name, IThreaded *_owner);
  134. ~CThreadedPersistent();
  135. void start();
  136. bool join(unsigned timeout=INFINITE);
  137. };
  138. // Asynchronous 'for' utility class
  139. // see HRPCUTIL.CPP for example of usage
  140. class jlib_decl CAsyncFor
  141. {
  142. public:
  143. void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false);
  144. virtual void Do(unsigned idx=0)=0;
  145. };
  146. // Thread local storage - use MAKETHREADLOCALIINTERFACE macro to get a thread local type
  147. template <class CLASS, class CLASSINIT = CLASS, class MAP = MapBetween<ThreadId, ThreadId, CLASS, CLASSINIT> >
  148. class ThreadLocalOf : public MAP
  149. {
  150. public:
  151. CLASS * query()
  152. {
  153. CLASS * find = threadMap.getValue(GetCurrentThreadId());
  154. if(find) return find;
  155. threadMap.setValue(GetCurrentThreadId(), CLASSINIT());
  156. return threadMap.getValue(GetCurrentThreadId());
  157. }
  158. operator CLASS & ()
  159. {
  160. return *query();
  161. }
  162. private:
  163. MAP threadMap;
  164. };
  165. #define MAKETHREADLOCALIINTERFACE(C, CI, NAME) \
  166. typedef ThreadLocalOf<C, CI> NAME
  167. // ---------------------------------------------------------------------------
  168. // Thread Pools
  169. // ---------------------------------------------------------------------------
  170. interface IPooledThread: extends IInterface // base class for deriving pooled thread (alternative to Thread)
  171. {
  172. public:
  173. virtual void init(void *param)=0; // called before main started (from within start)
  174. virtual void main()=0; // where threads code goes (param is passed from start)
  175. virtual bool stop()=0; // called to cause main to return, returns false if request rejected
  176. virtual bool canReuse()=0; // return true if object can be re-used (after stopped), otherwise released
  177. };
  178. interface IThreadFactory: extends IInterface // factory for creating new pooled instances (called when pool empty)
  179. {
  180. virtual IPooledThread *createNew()=0;
  181. };
  182. typedef IIteratorOf<IPooledThread> IPooledThreadIterator;
  183. typedef unsigned PooledThreadHandle;
  184. interface IThreadPool : extends IInterface
  185. {
  186. virtual PooledThreadHandle start(void *param)=0; // starts a new thread reuses stopped pool entries
  187. virtual PooledThreadHandle start(void *param,const char *name)=0; // starts a new thread reuses stopped pool entries
  188. virtual PooledThreadHandle start(void *param,const char *name,unsigned timeout)=0; // starts a new thread reuses stopped pool entries, throws exception if can't start within timeout
  189. virtual bool stop(PooledThreadHandle handle)=0; // initiates stop on specified thread (may return false)
  190. virtual bool stopAll(bool tryall=false)=0; // initiates stop on all threads, if tryall continues even if one or more fails
  191. virtual bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)=0;
  192. // waits for a single thread to terminate
  193. virtual bool joinAll(bool del=true,unsigned timeout=INFINITE)=0; // waits for all threads in thread pool to terminate
  194. // if del true frees all pooled threads
  195. virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads
  196. virtual unsigned runningCount()=0; // number of currently running threads
  197. virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
  198. virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception
  199. };
  200. extern jlib_decl IThreadPool *createThreadPool(
  201. const char *poolname, // trace name of pool
  202. IThreadFactory *factory, // factory for creating new thread instances
  203. IExceptionHandler *exceptionHandler=NULL, // optional exception handler
  204. unsigned defaultmax=50, // maximum number of threads before starts blocking
  205. unsigned delay=1000, // maximum delay on each block
  206. unsigned stacksize=0, // stack size (bytes) 0 is default
  207. unsigned timeoutOnRelease=INFINITE, // maximum time waited for thread to terminate on releasing pool
  208. unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax)
  209. );
  210. extern jlib_decl StringBuffer &getThreadList(StringBuffer &str);
  211. extern jlib_decl unsigned getThreadCount();
  212. extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0
  213. // Simple pipe process support
  214. interface ISimpleReadStream;
  215. interface IPipeProcess: extends IInterface
  216. {
  217. virtual bool run(const char *title,const char *prog, const char *dir,
  218. bool hasinput,bool hasoutput,bool haserror=false,
  219. size32_t stderrbufsize=0) = 0; // set to non-zero to automatically buffer stderror output
  220. virtual bool hasInput() = 0; // i.e. can write to pipe
  221. virtual size32_t write(size32_t sz, const void *buffer) = 0; // write pipe process standard output
  222. virtual bool hasOutput() = 0; // i.e. can read from pipe
  223. virtual size32_t read(size32_t sz, void *buffer) = 0; // read from pipe process standard output
  224. virtual ISimpleReadStream *getOutputStream() = 0; // read from pipe process standard output
  225. virtual bool hasError() = 0; // i.e. can read from pipe stderr
  226. virtual size32_t readError(size32_t sz, void *buffer) = 0; // read from pipe process standard error
  227. virtual ISimpleReadStream *getErrorStream() = 0; // read from pipe process standard error
  228. virtual unsigned wait() = 0; // returns return code
  229. virtual unsigned wait(unsigned timeoutms, bool &timedout) = 0; // sets timedout to true if times out
  230. virtual void closeInput() = 0; // indicate finished input to pipe
  231. virtual void closeOutput() = 0; // indicate finished reading from pipe (generally called automatically)
  232. virtual void closeError() = 0; // indicate finished reading from pipe stderr
  233. virtual void abort() = 0;
  234. virtual void notifyTerminated(HANDLE pid,unsigned retcode) = 0; // internal
  235. virtual HANDLE getProcessHandle() = 0; // used to auto kill
  236. };
  237. extern jlib_decl IPipeProcess *createPipeProcess(const char *allowedprograms=NULL);
  238. //--------------------------------------------------------
  239. interface IWorkQueueItem: extends IInterface
  240. {
  241. virtual void execute()=0;
  242. };
  243. interface IWorkQueueThread: extends IInterface
  244. {
  245. virtual void post(IWorkQueueItem *item)=0; // takes ownership of item
  246. virtual void wait()=0;
  247. virtual unsigned pending()=0;
  248. };
  249. // Simple lightweight async worker queue
  250. // internally thread persists for specified time waiting before self destroying
  251. extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=1000*60);
  252. #endif