jthread.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __JTHREAD__
  14. #define __JTHREAD__
  15. #include "jiface.hpp"
  16. #include "jmutex.hpp"
  17. #include "jexcept.hpp"
  18. #include "jhash.hpp"
  19. #include <functional>
  20. #ifdef _WIN32
  21. #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL
  22. #else
  23. // no thread priority handling?
  24. #endif
  25. interface jlib_decl IThread : public IInterface
  26. {
  27. virtual void start() = 0;
  28. virtual int run() = 0;
  29. };
  30. interface jlib_decl IThreadName
  31. {
  32. virtual const char *get()=0;
  33. };
  34. extern jlib_decl bool isMainThread();
  35. extern jlib_decl void addThreadExceptionHandler(IExceptionHandler *handler);
  36. extern jlib_decl void removeThreadExceptionHandler(IExceptionHandler *handler);
  37. extern jlib_decl void enableThreadSEH();
  38. extern jlib_decl void disableThreadSEH();
  39. extern jlib_decl unsigned threadLogID(); // for use in logging
  40. // A function registered via addThreadTermFunc will be called when the thread that registered that function
  41. // terminates. Such a function should call on to the previously registered function (if any) - generally you
  42. // would expect to store that value in thread-local storage.
  43. // This can be used to ensure that thread-specific objects can be properly destructed.
  44. // Note that threadpools also call the thread termination hook when each thread's threadmain function terminates,
  45. // so the hook function should clear any variables if necessary rather than assuming that they will be cleared
  46. // at thread startup time.
  47. typedef bool (*ThreadTermFunc)(bool isPooled);
  48. extern jlib_decl void addThreadTermFunc(ThreadTermFunc onTerm);
  49. extern jlib_decl void callThreadTerminationHooks(bool isPooled);
  50. //An exception safe way of ensuring that the thread termination hooks are called.
  51. class jlib_decl QueryTerminationCleanup
  52. {
  53. bool isPooled;
  54. public:
  55. inline QueryTerminationCleanup(bool _isPooled) : isPooled(_isPooled) { }
  56. inline ~QueryTerminationCleanup() { callThreadTerminationHooks(isPooled); }
  57. };
  58. class jlib_decl Thread : public CInterface, public IThread
  59. {
  60. private:
  61. ThreadId threadid;
  62. unsigned short stacksize; // in 4K blocks
  63. int prioritydelta;
  64. int nicelevel;
  65. bool alive;
  66. unsigned tidlog;
  67. #ifdef _WIN32
  68. HANDLE hThread;
  69. static unsigned WINAPI _threadmain(LPVOID v);
  70. #else
  71. static void *_threadmain(void *v);
  72. #endif
  73. virtual int begin();
  74. void init(const char *name);
  75. void handleException(IException *e);
  76. void adjustNiceLevel();
  77. protected:
  78. struct cThreadName: implements IThreadName
  79. {
  80. char *threadname;
  81. const char *get() { return threadname; }
  82. } cthreadname;
  83. IThreadName *ithreadname;
  84. public:
  85. #ifndef _WIN32
  86. Semaphore suspend;
  87. Semaphore starting;
  88. #endif
  89. Semaphore stopped;
  90. IMPLEMENT_IINTERFACE;
  91. Thread(const char *_name) { init(_name); }
  92. Thread() { init(NULL); }
  93. ~Thread();
  94. void adjustPriority(int delta);
  95. bool isCurrentThread() const;
  96. void setNice(int nicelevel);
  97. void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously)
  98. const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }
  99. bool isAlive() { return alive; }
  100. bool join(unsigned timeout=INFINITE);
  101. virtual void start();
  102. virtual void startRelease();
  103. StringBuffer &getInfo(StringBuffer &str) { str.appendf("%8" I64F "X %6" I64F "d %u: %s",(__int64)threadid,(__int64)threadid,tidlog,getName()); return str; }
  104. const char *getLogInfo(int &thandle,unsigned &tid) {
  105. #ifdef _WIN32
  106. thandle = (int)(memsize_t)hThread;
  107. #elif defined __FreeBSD__ || defined __APPLE__
  108. thandle = (int)(memsize_t)threadid;
  109. #else
  110. thandle = (int)threadid;
  111. #endif
  112. tid = tidlog;
  113. return getName();
  114. }
  115. // run method not implemented - concrete derived classes must do so
  116. static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:)
  117. IThreadName *queryThreadName() { return ithreadname; }
  118. void setThreadName(IThreadName *name) { ithreadname = name; }
  119. };
  120. interface IThreaded
  121. {
  122. virtual void threadmain() = 0;
  123. protected:
  124. virtual ~IThreaded() {}
  125. };
  126. // utility class, useful for containing a thread
  127. class CThreaded : public Thread
  128. {
  129. IThreaded *owner;
  130. public:
  131. inline CThreaded(const char *name, IThreaded *_owner) : Thread(name), owner(_owner) { }
  132. inline CThreaded(const char *name) : Thread(name) { owner = NULL; }
  133. inline void init(IThreaded *_owner) { owner = _owner; start(); }
  134. virtual int run() { owner->threadmain(); return 1; }
  135. };
  136. extern jlib_decl void asyncStart(IThreaded & threaded);
  137. extern jlib_decl void asyncStart(const char * name, IThreaded & threaded);
  138. #if defined(__cplusplus) && __cplusplus >= 201100
  139. extern jlib_decl void asyncStart(std::function<void()> func);
  140. #endif
  141. // Similar to above, but the underlying thread always remains running. This can make repeated start + join's significantly quicker
  142. class jlib_decl CThreadedPersistent
  143. {
  144. class CAThread : public Thread
  145. {
  146. CThreadedPersistent &owner;
  147. public:
  148. CAThread(CThreadedPersistent &_owner, const char *name) : Thread(name), owner(_owner) { }
  149. virtual int run() { owner.threadmain(); return 1; }
  150. } athread;
  151. Owned<IException> exception;
  152. IThreaded *owner;
  153. Semaphore sem, joinSem;
  154. std::atomic_uint state;
  155. bool halt;
  156. enum ThreadStates { s_ready, s_running, s_joining };
  157. void threadmain();
  158. public:
  159. CThreadedPersistent(const char *name, IThreaded *_owner);
  160. ~CThreadedPersistent();
  161. void start();
  162. bool join(unsigned timeout, bool throwException = true);
  163. };
  164. // Asynchronous 'for' utility class
  165. // see HRPCUTIL.CPP for example of usage
  166. interface ITaskScheduler;
  167. class jlib_decl CAsyncFor
  168. {
  169. public:
  170. void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false);
  171. void TaskFor(unsigned num, ITaskScheduler & scheduler);
  172. virtual void Do(unsigned idx=0)=0;
  173. };
  174. template <typename AsyncFunc>
  175. class CAsyncForFunc final : public CAsyncFor
  176. {
  177. public:
  178. CAsyncForFunc(AsyncFunc _func) : func(_func) {}
  179. virtual void Do(unsigned idx=0) { func(idx); }
  180. private:
  181. AsyncFunc func;
  182. };
  183. //Utility functions for executing a lambda function in parallel, but allow the number of concurrent iterations
  184. //action on exception, and shuffling to be controlled.
  185. template <typename AsyncFunc>
  186. inline void asyncFor(unsigned num, unsigned maxAtOnce, bool abortFollowingException, bool shuffled, AsyncFunc func)
  187. {
  188. CAsyncForFunc<AsyncFunc> async(func);
  189. async.For(num, maxAtOnce, abortFollowingException, shuffled);
  190. }
  191. template <typename AsyncFunc>
  192. inline void asyncFor(unsigned num, unsigned maxAtOnce, bool abortFollowingException, AsyncFunc func)
  193. {
  194. asyncFor(num, maxAtOnce, abortFollowingException, false, func);
  195. }
  196. template <typename AsyncFunc>
  197. inline void asyncFor(unsigned num, unsigned maxAtOnce, AsyncFunc func)
  198. {
  199. asyncFor(num, maxAtOnce, false, false, func);
  200. }
  201. template <typename AsyncFunc>
  202. inline void asyncFor(unsigned num, AsyncFunc func)
  203. {
  204. asyncFor(num, num, false, false, func);
  205. }
  206. // ---------------------------------------------------------------------------
  207. // Thread Pools
  208. // ---------------------------------------------------------------------------
  209. interface IPooledThread: extends IInterface // base class for deriving pooled thread (alternative to Thread)
  210. {
  211. public:
  212. virtual void init(void *param) = 0; // called before threadmain started (from within start)
  213. virtual void threadmain() = 0; // where threads code goes (param is passed from start)
  214. virtual bool stop() = 0; // called to cause threadmain to return, returns false if request rejected
  215. virtual bool canReuse() const = 0; // return true if object can be re-used (after stopped), otherwise released
  216. };
  217. interface IThreadFactory: extends IInterface // factory for creating new pooled instances (called when pool empty)
  218. {
  219. virtual IPooledThread *createNew()=0;
  220. };
  221. typedef IIteratorOf<IPooledThread> IPooledThreadIterator;
  222. typedef unsigned PooledThreadHandle;
  223. interface IThreadPool : extends IInterface
  224. {
  225. virtual PooledThreadHandle start(void *param)=0; // starts a new thread reuses stopped pool entries
  226. virtual PooledThreadHandle start(void *param,const char *name)=0; // starts a new thread reuses stopped pool entries
  227. virtual PooledThreadHandle start(void *param,const char *name,unsigned timeout)=0; // starts a new thread reuses stopped pool entries, throws exception if can't start within timeout
  228. virtual bool stop(PooledThreadHandle handle)=0; // initiates stop on specified thread (may return false)
  229. virtual bool stopAll(bool tryall=false)=0; // initiates stop on all threads, if tryall continues even if one or more fails
  230. virtual bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)=0;
  231. // waits for a single thread to terminate
  232. virtual bool joinAll(bool del=true,unsigned timeout=INFINITE)=0; // waits for all threads in thread pool to terminate
  233. // if del true frees all pooled threads
  234. virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads
  235. virtual unsigned runningCount()=0; // number of currently running threads
  236. virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
  237. virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception
  238. virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period
  239. virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available
  240. };
  241. extern jlib_decl IThreadPool *createThreadPool(
  242. const char *poolname, // trace name of pool
  243. IThreadFactory *factory, // factory for creating new thread instances
  244. IExceptionHandler *exceptionHandler=NULL, // optional exception handler
  245. unsigned defaultmax=50, // maximum number of threads before starts blocking
  246. unsigned delay=1000, // maximum delay on each block
  247. unsigned stacksize=0, // stack size (bytes) 0 is default
  248. unsigned timeoutOnRelease=INFINITE, // maximum time waited for thread to terminate on releasing pool
  249. unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax)
  250. );
  251. extern jlib_decl StringBuffer &getThreadList(StringBuffer &str);
  252. extern jlib_decl unsigned getThreadCount();
  253. extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0
  254. // Simple pipe process support
  255. interface ISimpleReadStream;
  256. #define START_FAILURE (199) // return code if program cannot be started
  257. interface IPipeProcessException : extends IException
  258. {
  259. };
  260. extern jlib_decl IPipeProcessException *createPipeErrnoException(int code, const char *msg);
  261. extern jlib_decl IPipeProcessException *createPipeErrnoExceptionV(int code, const char *msg, ...) __attribute__((format(printf, 2, 3)));
  262. interface IPipeProcess: extends IInterface
  263. {
  264. virtual bool run(const char *title,const char *prog, const char *dir,
  265. bool hasinput,bool hasoutput,bool haserror=false,
  266. size32_t stderrbufsize=0, // set to non-zero to automatically buffer stderror output
  267. bool newProcessGroup=false) __attribute__ ((warn_unused_result)) = 0;
  268. virtual bool hasInput() = 0; // i.e. can write to pipe
  269. virtual size32_t write(size32_t sz, const void *buffer) = 0; // write pipe process standard output
  270. virtual bool hasOutput() = 0; // i.e. can read from pipe
  271. virtual size32_t read(size32_t sz, void *buffer) = 0; // read from pipe process standard output
  272. virtual ISimpleReadStream *getOutputStream() = 0; // read from pipe process standard output
  273. virtual bool hasError() = 0; // i.e. can read from pipe stderr
  274. virtual size32_t readError(size32_t sz, void *buffer) = 0; // read from pipe process standard error
  275. virtual ISimpleReadStream *getErrorStream() = 0; // read from pipe process standard error
  276. virtual unsigned wait() = 0; // returns return code
  277. virtual unsigned wait(unsigned timeoutms, bool &timedout) = 0; // sets timedout to true if times out
  278. virtual void closeInput() = 0; // indicate finished input to pipe
  279. virtual void closeOutput() = 0; // indicate finished reading from pipe (generally called automatically)
  280. virtual void closeError() = 0; // indicate finished reading from pipe stderr
  281. virtual void abort() = 0;
  282. virtual void notifyTerminated(HANDLE pid,unsigned retcode) = 0; // internal
  283. virtual HANDLE getProcessHandle() = 0; // used to auto kill
  284. virtual void setenv(const char *var, const char *value) = 0; // Set a value to be passed in the called process environment
  285. };
  286. extern jlib_decl IPipeProcess *createPipeProcess(const char *allowedprograms=NULL);
  287. //--------------------------------------------------------
  288. interface IWorkQueueItem: extends IInterface
  289. {
  290. virtual void execute()=0;
  291. };
  292. interface IWorkQueueThread: extends IInterface
  293. {
  294. virtual void post(IWorkQueueItem *item)=0; // takes ownership of item
  295. virtual void wait()=0;
  296. virtual unsigned pending()=0;
  297. };
  298. // Simple lightweight async worker queue
  299. // internally thread persists for specified time waiting before self destroying
  300. extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=1000*60);
  301. #endif