jthread.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifndef __JTHREAD__
  15. #define __JTHREAD__
  16. #include "jexpdef.hpp"
  17. #include "jiface.hpp"
  18. #include "jmutex.hpp"
  19. #include "jexcept.hpp"
  20. #include "jhash.hpp"
  21. #ifdef _WIN32
  22. #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL
  23. #else
  24. // no thread priority handling?
  25. #endif
  26. interface jlib_decl IThread : public IInterface
  27. {
  28. virtual void start() = 0;
  29. virtual int run() = 0;
  30. };
  31. interface jlib_decl IThreadName
  32. {
  33. virtual const char *get()=0;
  34. };
  35. extern jlib_decl void addThreadExceptionHandler(IExceptionHandler *handler);
  36. extern jlib_decl void removeThreadExceptionHandler(IExceptionHandler *handler);
  37. extern jlib_decl void enableThreadSEH();
  38. extern jlib_decl void disableThreadSEH();
  39. extern jlib_decl unsigned threadLogID(); // for use in logging
  40. class jlib_decl Thread : public CInterface, public IThread
  41. {
  42. private:
  43. ThreadId threadid;
  44. unsigned short stacksize; // in 4K blocks
  45. char prioritydelta;
  46. char nicelevel;
  47. bool alive;
  48. unsigned tidlog;
  49. #ifdef _WIN32
  50. HANDLE hThread;
  51. static unsigned WINAPI _threadmain(LPVOID v);
  52. #else
  53. static void *_threadmain(void *v);
  54. #endif
  55. virtual int begin();
  56. void init(const char *name);
  57. void handleException(IException *e);
  58. void adjustNiceLevel();
  59. protected:
  60. struct cThreadName: implements IThreadName
  61. {
  62. char *threadname;
  63. const char *get() { return threadname; }
  64. } cthreadname;
  65. IThreadName *ithreadname;
  66. public:
  67. #ifndef _WIN32
  68. Semaphore suspend;
  69. Semaphore starting;
  70. #endif
  71. Semaphore stopped;
  72. IMPLEMENT_IINTERFACE;
  73. Thread(const char *_name) { init(_name); }
  74. Thread() { init(NULL); }
  75. ~Thread();
  76. void adjustPriority(char delta);
  77. void setNice(char nicelevel);
  78. void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously)
  79. const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }
  80. bool isAlive() { return alive; }
  81. bool join(unsigned timeout=INFINITE);
  82. virtual void start();
  83. virtual void startRelease();
  84. StringBuffer &getInfo(StringBuffer &str) { str.appendf("%8"I64F"X %6"I64F"d %u: %s",(__int64)threadid,(__int64)threadid,tidlog,getName()); return str; }
  85. const char *getLogInfo(int &thandle,unsigned &tid) {
  86. #ifdef _WIN32
  87. thandle = (int)(memsize_t)hThread;
  88. #elif defined __FreeBSD__
  89. thandle = (int)(memsize_t)threadid;
  90. #else
  91. thandle = threadid;
  92. #endif
  93. tid = tidlog;
  94. return getName();
  95. }
  96. // run method not implemented - concrete derived classes must do so
  97. static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:)
  98. IThreadName *queryThreadName() { return ithreadname; }
  99. void setThreadName(IThreadName *name) { ithreadname = name; }
  100. };
  101. interface IThreaded
  102. {
  103. virtual void main() = 0;
  104. };
  105. // utility class, useful for containing a thread
  106. class CThreaded : public Thread
  107. {
  108. IThreaded *owner;
  109. public:
  110. CThreaded(const char *name) : Thread(name) { }
  111. void init(IThreaded *_owner) { owner = _owner; start(); }
  112. virtual int run() { owner->main(); return 1; }
  113. };
  114. // Asynchronous 'for' utility class
  115. // see HRPCUTIL.CPP for example of usage
  116. class jlib_decl CAsyncFor
  117. {
  118. public:
  119. void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false);
  120. virtual void Do(unsigned idx=0)=0;
  121. };
  122. // Thread local storage - use MAKETHREADLOCALIINTERFACE macro to get a thread local type
  123. template <class CLASS, class CLASSINIT = CLASS, class MAP = MapBetween<ThreadId, ThreadId, CLASS, CLASSINIT> >
  124. class ThreadLocalOf : public MAP
  125. {
  126. public:
  127. CLASS * query()
  128. {
  129. CLASS * find = threadMap.getValue(GetCurrentThreadId());
  130. if(find) return find;
  131. threadMap.setValue(GetCurrentThreadId(), CLASSINIT());
  132. return threadMap.getValue(GetCurrentThreadId());
  133. }
  134. operator CLASS & ()
  135. {
  136. return *query();
  137. }
  138. private:
  139. MAP threadMap;
  140. };
  141. #define MAKETHREADLOCALIINTERFACE(C, CI, NAME) \
  142. typedef ThreadLocalOf<C, CI> NAME
  143. // ---------------------------------------------------------------------------
  144. // Thread Pools
  145. // ---------------------------------------------------------------------------
  146. interface IPooledThread: extends IInterface // base class for deriving pooled thread (alternative to Thread)
  147. {
  148. public:
  149. virtual void init(void *param)=0; // called before main started (from within start)
  150. virtual void main()=0; // where threads code goes (param is passed from start)
  151. virtual bool stop()=0; // called to cause main to return, returns false if request rejected
  152. virtual bool canReuse()=0; // return true if object can be re-used (after stopped), otherwise released
  153. };
  154. interface IThreadFactory: extends IInterface // factory for creating new pooled instances (called when pool empty)
  155. {
  156. virtual IPooledThread *createNew()=0;
  157. };
  158. typedef IIteratorOf<IPooledThread> IPooledThreadIterator;
  159. typedef unsigned PooledThreadHandle;
  160. interface IThreadPool : extends IInterface
  161. {
  162. virtual PooledThreadHandle start(void *param)=0; // starts a new thread reuses stopped pool entries
  163. virtual PooledThreadHandle start(void *param,const char *name)=0; // starts a new thread reuses stopped pool entries
  164. virtual PooledThreadHandle start(void *param,const char *name,unsigned timeout)=0; // starts a new thread reuses stopped pool entries, throws exception if can't start within timeout
  165. virtual bool stop(PooledThreadHandle handle)=0; // initiates stop on specified thread (may return false)
  166. virtual bool stopAll(bool tryall=false)=0; // initiates stop on all threads, if tryall continues even if one or more fails
  167. virtual bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)=0;
  168. // waits for a single thread to terminate
  169. virtual bool joinAll(bool del=true,unsigned timeout=INFINITE)=0; // waits for all threads in thread pool to terminate
  170. // if del true frees all pooled threads
  171. virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads
  172. virtual unsigned runningCount()=0; // number of currently running threads
  173. virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
  174. virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception
  175. };
  176. extern jlib_decl IThreadPool *createThreadPool(
  177. const char *poolname, // trace name of pool
  178. IThreadFactory *factory, // factory for creating new thread instances
  179. IExceptionHandler *exceptionHandler=NULL, // optional exception handler
  180. unsigned defaultmax=50, // maximum number of threads before starts blocking
  181. unsigned delay=1000, // maximum delay on each block
  182. unsigned stacksize=0, // stack size (bytes) 0 is default
  183. unsigned timeoutOnRelease=INFINITE, // maximum time waited for thread to terminate on releasing pool
  184. unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax)
  185. );
  186. extern jlib_decl StringBuffer &getThreadList(StringBuffer &str);
  187. extern jlib_decl unsigned getThreadCount();
  188. extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0
  189. // Simple pipe process support
  190. interface ISimpleReadStream;
  191. interface IPipeProcess: extends IInterface
  192. {
  193. virtual bool run(const char *title,const char *prog, const char *dir,
  194. bool hasinput,bool hasoutput,bool haserror=false,
  195. size32_t stderrbufsize=0) = 0; // set to non-zero to automatically buffer stderror output
  196. virtual bool hasInput() = 0; // i.e. can write to pipe
  197. virtual size32_t write(size32_t sz, const void *buffer) = 0; // write pipe process standard output
  198. virtual bool hasOutput() = 0; // i.e. can read from pipe
  199. virtual size32_t read(size32_t sz, void *buffer) = 0; // read from pipe process standard output
  200. virtual ISimpleReadStream *getOutputStream() = 0; // read from pipe process standard output
  201. virtual bool hasError() = 0; // i.e. can read from pipe stderr
  202. virtual size32_t readError(size32_t sz, void *buffer) = 0; // read from pipe process standard error
  203. virtual ISimpleReadStream *getErrorStream() = 0; // read from pipe process standard error
  204. virtual unsigned wait() = 0; // returns return code
  205. virtual unsigned wait(unsigned timeoutms, bool &timedout) = 0; // sets timedout to true if times out
  206. virtual void closeInput() = 0; // indicate finished input to pipe
  207. virtual void closeOutput() = 0; // indicate finished reading from pipe (generally called automatically)
  208. virtual void closeError() = 0; // indicate finished reading from pipe stderr
  209. virtual void abort() = 0;
  210. virtual void notifyTerminated(HANDLE pid,unsigned retcode) = 0; // internal
  211. virtual HANDLE getProcessHandle() = 0; // used to auto kill
  212. };
  213. extern jlib_decl IPipeProcess *createPipeProcess(const char *allowedprograms=NULL);
  214. //--------------------------------------------------------
  215. interface IWorkQueueItem: extends IInterface
  216. {
  217. virtual void execute()=0;
  218. };
  219. interface IWorkQueueThread: extends IInterface
  220. {
  221. virtual void post(IWorkQueueItem *item)=0; // takes ownership of item
  222. virtual void wait()=0;
  223. virtual unsigned pending()=0;
  224. };
  225. // Simple lightweight async worker queue
  226. // internally thread persists for specified time waiting before self destroying
  227. extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=1000*60);
  228. #endif