/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #ifndef __JTHREAD__ #define __JTHREAD__ #include "jexpdef.hpp" #include "jiface.hpp" #include "jmutex.hpp" #include "jexcept.hpp" #include "jhash.hpp" #ifdef _WIN32 #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL #else // no thread priority handling? #endif interface jlib_decl IThread : public IInterface { virtual void start() = 0; virtual int run() = 0; }; interface jlib_decl IThreadName { virtual const char *get()=0; }; extern jlib_decl void addThreadExceptionHandler(IExceptionHandler *handler); extern jlib_decl void removeThreadExceptionHandler(IExceptionHandler *handler); extern jlib_decl void enableThreadSEH(); extern jlib_decl void disableThreadSEH(); extern jlib_decl unsigned threadLogID(); // for use in logging class jlib_decl Thread : public CInterface, public IThread { private: ThreadId threadid; unsigned short stacksize; // in 4K blocks char prioritydelta; char nicelevel; bool alive; unsigned tidlog; #ifdef _WIN32 HANDLE hThread; static unsigned WINAPI _threadmain(LPVOID v); #else static void *_threadmain(void *v); #endif virtual int begin(); void init(const char *name); void handleException(IException *e); void adjustNiceLevel(); protected: struct cThreadName: implements IThreadName { char *threadname; const char *get() { return threadname; } } cthreadname; IThreadName *ithreadname; public: #ifndef _WIN32 Semaphore suspend; Semaphore starting; #endif Semaphore stopped; IMPLEMENT_IINTERFACE; Thread(const char *_name) { init(_name); } Thread() { init(NULL); } ~Thread(); void adjustPriority(char delta); void setNice(char nicelevel); void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously) const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; } bool isAlive() { return alive; } bool join(unsigned timeout=INFINITE); virtual void start(); virtual void startRelease(); StringBuffer &getInfo(StringBuffer &str) { str.appendf("%8"I64F"X %6"I64F"d %u: %s",(__int64)threadid,(__int64)threadid,tidlog,getName()); return str; } const char *getLogInfo(int &thandle,unsigned &tid) { #ifdef _WIN32 thandle = (int)(memsize_t)hThread; #elif defined __FreeBSD__ thandle = (int)(memsize_t)threadid; #else thandle = threadid; #endif tid = tidlog; return getName(); } // run method not implemented - concrete derived classes must do so static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:) IThreadName *queryThreadName() { return ithreadname; } void setThreadName(IThreadName *name) { ithreadname = name; } }; interface IThreaded { virtual void main() = 0; }; // utility class, useful for containing a thread class CThreaded : public Thread { IThreaded *owner; public: CThreaded(const char *name) : Thread(name) { } void init(IThreaded *_owner) { owner = _owner; start(); } virtual int run() { owner->main(); return 1; } }; // Asynchronous 'for' utility class // see HRPCUTIL.CPP for example of usage class jlib_decl CAsyncFor { public: void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false); virtual void Do(unsigned idx=0)=0; }; // Thread local storage - use MAKETHREADLOCALIINTERFACE macro to get a thread local type template > class ThreadLocalOf : public MAP { public: CLASS * query() { CLASS * find = threadMap.getValue(GetCurrentThreadId()); if(find) return find; threadMap.setValue(GetCurrentThreadId(), CLASSINIT()); return threadMap.getValue(GetCurrentThreadId()); } operator CLASS & () { return *query(); } private: MAP threadMap; }; #define MAKETHREADLOCALIINTERFACE(C, CI, NAME) \ typedef ThreadLocalOf NAME // --------------------------------------------------------------------------- // Thread Pools // --------------------------------------------------------------------------- interface IPooledThread: extends IInterface // base class for deriving pooled thread (alternative to Thread) { public: virtual void init(void *param)=0; // called before main started (from within start) virtual void main()=0; // where threads code goes (param is passed from start) virtual bool stop()=0; // called to cause main to return, returns false if request rejected virtual bool canReuse()=0; // return true if object can be re-used (after stopped), otherwise released }; interface IThreadFactory: extends IInterface // factory for creating new pooled instances (called when pool empty) { virtual IPooledThread *createNew()=0; }; typedef IIteratorOf IPooledThreadIterator; typedef unsigned PooledThreadHandle; interface IThreadPool : extends IInterface { virtual PooledThreadHandle start(void *param)=0; // starts a new thread reuses stopped pool entries virtual PooledThreadHandle start(void *param,const char *name)=0; // starts a new thread reuses stopped pool entries virtual PooledThreadHandle start(void *param,const char *name,unsigned timeout)=0; // starts a new thread reuses stopped pool entries, throws exception if can't start within timeout virtual bool stop(PooledThreadHandle handle)=0; // initiates stop on specified thread (may return false) virtual bool stopAll(bool tryall=false)=0; // initiates stop on all threads, if tryall continues even if one or more fails virtual bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)=0; // waits for a single thread to terminate virtual bool joinAll(bool del=true,unsigned timeout=INFINITE)=0; // waits for all threads in thread pool to terminate // if del true frees all pooled threads virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads virtual unsigned runningCount()=0; // number of currently running threads virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception }; extern jlib_decl IThreadPool *createThreadPool( const char *poolname, // trace name of pool IThreadFactory *factory, // factory for creating new thread instances IExceptionHandler *exceptionHandler=NULL, // optional exception handler unsigned defaultmax=50, // maximum number of threads before starts blocking unsigned delay=1000, // maximum delay on each block unsigned stacksize=0, // stack size (bytes) 0 is default unsigned timeoutOnRelease=INFINITE, // maximum time waited for thread to terminate on releasing pool unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax) ); extern jlib_decl StringBuffer &getThreadList(StringBuffer &str); extern jlib_decl unsigned getThreadCount(); extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0 // Simple pipe process support interface ISimpleReadStream; interface IPipeProcess: extends IInterface { virtual bool run(const char *title,const char *prog, const char *dir, bool hasinput,bool hasoutput,bool haserror=false, size32_t stderrbufsize=0) = 0; // set to non-zero to automatically buffer stderror output virtual bool hasInput() = 0; // i.e. can write to pipe virtual size32_t write(size32_t sz, const void *buffer) = 0; // write pipe process standard output virtual bool hasOutput() = 0; // i.e. can read from pipe virtual size32_t read(size32_t sz, void *buffer) = 0; // read from pipe process standard output virtual ISimpleReadStream *getOutputStream() = 0; // read from pipe process standard output virtual bool hasError() = 0; // i.e. can read from pipe stderr virtual size32_t readError(size32_t sz, void *buffer) = 0; // read from pipe process standard error virtual ISimpleReadStream *getErrorStream() = 0; // read from pipe process standard error virtual unsigned wait() = 0; // returns return code virtual unsigned wait(unsigned timeoutms, bool &timedout) = 0; // sets timedout to true if times out virtual void closeInput() = 0; // indicate finished input to pipe virtual void closeOutput() = 0; // indicate finished reading from pipe (generally called automatically) virtual void closeError() = 0; // indicate finished reading from pipe stderr virtual void abort() = 0; virtual void notifyTerminated(HANDLE pid,unsigned retcode) = 0; // internal virtual HANDLE getProcessHandle() = 0; // used to auto kill }; extern jlib_decl IPipeProcess *createPipeProcess(const char *allowedprograms=NULL); //-------------------------------------------------------- interface IWorkQueueItem: extends IInterface { virtual void execute()=0; }; interface IWorkQueueThread: extends IInterface { virtual void post(IWorkQueueItem *item)=0; // takes ownership of item virtual void wait()=0; virtual unsigned pending()=0; }; // Simple lightweight async worker queue // internally thread persists for specified time waiting before self destroying extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=1000*60); #endif