1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jthread.hpp"
- #include "jlib.hpp"
- #include "jfile.hpp"
- #include "jmutex.hpp"
- #include "jexcept.hpp"
- #include "jmisc.hpp"
- #include "jqueue.tpp"
- #include "jregexp.hpp"
- #include <assert.h>
- #ifdef _WIN32
- #include <process.h>
- #else
- #include <unistd.h>
- #include <sys/wait.h>
- #include <sys/syscall.h>
- #include <sys/types.h>
- #include <sys/resource.h>
- #endif
- #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
- #undef new
- #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
- #endif
- #define LINUX_STACKSIZE_CAP (0x200000)
- //#define NO_CATCHALL
- static __thread ThreadTermFunc threadTerminationHook;
- ThreadTermFunc addThreadTermFunc(ThreadTermFunc onTerm)
- {
- ThreadTermFunc old = threadTerminationHook;
- threadTerminationHook = onTerm;
- return old;
- }
- PointerArray *exceptionHandlers = NULL;
- MODULE_INIT(INIT_PRIORITY_JTHREAD)
- {
- if (threadTerminationHook)
- (*threadTerminationHook)(); // May be too late :(
- exceptionHandlers = new PointerArray();
- return true;
- }
- MODULE_EXIT()
- {
- delete exceptionHandlers;
- }
- void addThreadExceptionHandler(IExceptionHandler *handler)
- {
- assertex(exceptionHandlers); // have to ensure MODULE_INIT has appropriate priority.
- exceptionHandlers->append(handler);
- }
- void removeThreadExceptionHandler(IExceptionHandler *handler)
- {
- exceptionHandlers->zap(handler);
- }
- static bool SEHHandling = false;
- void enableThreadSEH() { SEHHandling=true; }
- void disableThreadSEH() { SEHHandling=false; } // only prevents new threads from having SEH handler, no mech. for turning off existing threads SEH handling.
- static ICopyArrayOf<Thread> ThreadList;
- static CriticalSection ThreadListSem;
- static size32_t defaultThreadStackSize=0;
- static ICopyArrayOf<Thread> ThreadDestroyList;
- static SpinLock ThreadDestroyListLock;
- #ifdef _WIN32
- extern void *EnableSEHtranslation();
- unsigned WINAPI Thread::_threadmain(LPVOID v)
- #else
- void *Thread::_threadmain(void *v)
- #endif
- {
- Thread * t = (Thread *)v;
- #ifdef _WIN32
- if (SEHHandling)
- EnableSEHtranslation();
- #else
- t->tidlog = threadLogID();
- #endif
- int ret = t->begin();
- char *&threadname = t->cthreadname.threadname;
- if (threadname) {
- memsize_t l=strlen(threadname);
- char *newname = (char *)malloc(l+8+1);
- memcpy(newname,"Stopped ",8);
- memcpy(newname+8,threadname,l+1);
- char *oldname = threadname;
- threadname = newname;
- free(oldname);
- }
- {
- // need to ensure joining thread does not race with us to release
- t->Link(); // extra safety link
- {
- SpinBlock block(ThreadDestroyListLock);
- ThreadDestroyList.append(*t);
- }
- try {
- t->stopped.signal();
- if (t->Release()) {
- PROGLOG("extra unlinked thread");
- PrintStackReport();
- }
- else
- t->Release();
- }
- catch (...) {
- PROGLOG("thread release exception");
- throw;
- }
- {
- SpinBlock block(ThreadDestroyListLock);
- ThreadDestroyList.zap(*t); // hopefully won't get too big (i.e. one entry!)
- }
- }
- #if defined(_WIN32)
- return ret;
- #else
- return (void *) (memsize_t)ret;
- #endif
- }
- // JCSMORE - should have a setPriority(), unsupported under _WIN32
- void Thread::adjustPriority(char delta)
- {
- if (delta < -2)
- prioritydelta = -2;
- else if (delta > 2)
- prioritydelta = 2;
- else
- prioritydelta = delta;
- if (alive)
- {
- #if defined(_WIN32)
- int priority;
- switch (delta)
- {
- case -2: priority = THREAD_PRIORITY_LOWEST; break;
- case -1: priority = THREAD_PRIORITY_BELOW_NORMAL; break;
- case 0: priority = THREAD_PRIORITY_NORMAL; break;
- case +1: priority = THREAD_PRIORITY_ABOVE_NORMAL; break;
- case +2: priority = THREAD_PRIORITY_HIGHEST; break;
- }
- SetThreadPriority(hThread, priority);
- #else
- //MORE - What control is there?
- int policy;
- sched_param param;
- int rc;
- if (( rc = pthread_getschedparam(threadid, &policy, ¶m)) != 0)
- DBGLOG("pthread_getschedparam error: %d", rc);
- switch (delta)
- {
- // JCS - doubtful whether these good values...
- case -2: param.sched_priority = 0; policy =SCHED_OTHER; break;
- case -1: param.sched_priority = 0; policy =SCHED_OTHER; break;
- case 0: param.sched_priority = 0; policy =SCHED_OTHER; break;
- case +1: param.sched_priority = (sched_get_priority_max(SCHED_RR)-sched_get_priority_min(SCHED_RR))/2; policy =SCHED_RR; break;
- case +2: param.sched_priority = sched_get_priority_max(SCHED_RR); policy =SCHED_RR; break;
- }
- if(( rc = pthread_setschedparam(threadid, policy, ¶m)) != 0)
- DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%"I64F"u PID=%i", rc,policy,param.sched_priority,(unsigned __int64) threadid,getpid());
- else
- DBGLOG("priority set id=%"I64F"u policy=%i pri=%i PID=%i",(unsigned __int64) threadid,policy,param.sched_priority,getpid());
- #endif
- }
- }
- void Thread::adjustNiceLevel()
- {
- #if defined(_WIN32)
- int priority;
- if(nicelevel < -15)
- priority = THREAD_PRIORITY_TIME_CRITICAL;
- else if(nicelevel >= -15 && nicelevel < -10)
- priority = THREAD_PRIORITY_HIGHEST;
- else if(nicelevel >= -10 && nicelevel < 0)
- priority = THREAD_PRIORITY_ABOVE_NORMAL;
- else if(nicelevel == 0)
- priority = THREAD_PRIORITY_NORMAL;
- else if(nicelevel > 0 && nicelevel <= 10)
- priority = THREAD_PRIORITY_BELOW_NORMAL;
- else if(nicelevel > 10 && nicelevel <= 15)
- priority = THREAD_PRIORITY_LOWEST;
- else if(nicelevel >15)
- priority = THREAD_PRIORITY_IDLE;
- SetThreadPriority(hThread, priority);
- #elif defined(__linux__)
- setpriority(PRIO_PROCESS, 0, nicelevel);
- #else
- UNIMPLEMENTED;
- #endif
- }
- bool Thread::isCurrentThread() const
- {
- return GetCurrentThreadId() == threadid;
- }
- // _nicelevel ranges from -20 to 19, the higher the nice level, the less cpu time the thread will get.
- void Thread::setNice(char _nicelevel)
- {
- if (_nicelevel < -20 || _nicelevel > 19)
- throw MakeStringException(0, "nice level should be between -20 and 19");
- if(alive)
- throw MakeStringException(0, "nice can only be set before the thread is started.");
-
- nicelevel = _nicelevel;
- }
- void Thread::setStackSize(size32_t size)
- {
- stacksize = (unsigned short)(size/0x1000);
- }
- void Thread::setDefaultStackSize(size32_t size)
- {
- defaultThreadStackSize = size; // has no effect under windows (though may be used for calculations later)
- }
- int Thread::begin()
- {
- if(nicelevel)
- adjustNiceLevel();
- #ifndef _WIN32
- starting.signal();
- suspend.wait();
- #endif
- int ret=-1;
- try {
- ret = run();
- }
- catch (IException *e)
- {
- handleException(e);
- }
- #ifndef NO_CATCHALL
- catch (...)
- {
- handleException(MakeStringException(0, "Unknown exception in Thread %s", getName()));
- }
- #endif
- if (threadTerminationHook)
- {
- (*threadTerminationHook)();
- threadTerminationHook = NULL;
- }
- #ifdef _WIN32
- #ifndef _DEBUG
- CloseHandle(hThread); // leak handle when debugging,
- // fixes some lockups/crashes in the debugger when lots of threads being created
- #endif
- hThread = NULL;
- #endif
- //alive = false; // not safe here
- return ret;
- }
- void Thread::handleException(IException *e)
- {
- assertex(exceptionHandlers);
- if (exceptionHandlers->ordinality() == 0)
- {
- PrintExceptionLog(e,getName());
- //throw; // don't rethrow unhandled, preferable over alternative of causing process death
- e->Release();
- }
- else
- {
- PrintExceptionLog(e,getName());
- bool handled = false;
- ForEachItemIn(ie, *exceptionHandlers)
- {
- IExceptionHandler *handler = (IExceptionHandler *) exceptionHandlers->item(ie);
- handled = handler->fireException(e) || handled;
- }
- if (!handled)
- {
- // if nothing choose to handle it.
- EXCLOG(e, NULL);
- //throw e; // don't rethrow unhandled, preferable over alternative of causing process death
- }
- e->Release();
- }
- }
- void Thread::init(const char *_name)
- {
- #ifdef _WIN32
- hThread = NULL;
- #endif
- threadid = 0;
- tidlog = 0;
- alive = false;
- cthreadname.threadname = (NULL == _name) ? NULL : strdup(_name);
- ithreadname = &cthreadname;
- prioritydelta = 0;
- nicelevel = 0;
- stacksize = 0; // default is EXE default stack size (set by /STACK)
- }
- void Thread::start()
- {
- if (alive) {
- WARNLOG("Thread::start(%s) - Thread already started!",getName());
- PrintStackReport();
- #ifdef _DEBUG
- throw MakeStringException(-1,"Thread::start(%s) - Thread already started!",getName());
- #endif
- return;
- }
- Link();
- startRelease();
- }
- void Thread::startRelease()
- {
- assertex(!alive);
- stopped.reinit(0); // just in case restarting
- #ifdef _WIN32
- hThread = (HANDLE)_beginthreadex(NULL, 0x1000*(unsigned)stacksize, Thread::_threadmain, this, CREATE_SUSPENDED, (unsigned *)&threadid);
- if (!hThread || !threadid)
- {
- Release();
- throw MakeOsException(GetLastError());
- }
- #else
- int status;
- unsigned numretrys = 8;
- unsigned delay = 1000;
- loop {
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (stacksize)
- pthread_attr_setstacksize(&attr, (unsigned)stacksize*0x1000);
- else if (defaultThreadStackSize)
- pthread_attr_setstacksize(&attr, defaultThreadStackSize);
- else {
- #ifndef __64BIT__ // no need to cap 64bit
- size_t defss=0;
- pthread_attr_getstacksize(&attr, &defss);
- if (defss>LINUX_STACKSIZE_CAP)
- pthread_attr_setstacksize(&attr, LINUX_STACKSIZE_CAP);
- #endif
- }
- status = pthread_create(&threadid, &attr, Thread::_threadmain, this);
- if ((status==EAGAIN)||(status==EINTR)) {
- if (numretrys--==0)
- break;
- WARNLOG("pthread_create(%d): Out of threads, retrying...",status);
- Sleep(delay);
- delay *= 2;
- }
- else
- break;
- }
- if (status) {
- threadid = 0;
- Release();
- ERRLOG("pthread_create returns %d",status);
- PrintStackReport();
- PrintMemoryReport();
- StringBuffer s;
- getThreadList(s);
- ERRLOG("Running threads:\n %s",s.str());
- throw MakeOsException(status);
- }
- unsigned retryCount = 10;
- loop
- {
- if (starting.wait(1000*10))
- break;
- else if (0 == --retryCount)
- throw MakeStringException(-1, "Thread::start(%s) failed", getName());
- WARNLOG("Thread::start(%s) stalled, waiting to start, retrying", getName());
- }
- #endif
- alive = true;
- if (prioritydelta)
- adjustPriority(prioritydelta);
- {
- CriticalBlock block(ThreadListSem);
- ThreadList.zap(*this); // just in case restarting
- ThreadList.append(*this);
- }
- #ifdef _WIN32
- DWORD count = ResumeThread(hThread);
- assertex(count == 1);
- #else
- suspend.signal();
- #endif
- }
- bool Thread::join(unsigned timeout)
- {
- if (!alive&&!threadid) {
- #ifdef _DEBUG
- PROGLOG("join on unstarted thread!");
- PrintStackReport();
- #endif
- return true;
- }
- if (!stopped.wait(timeout))
- return false;
- if (!alive) // already joined
- {
- stopped.signal();
- return true;
- }
- unsigned count = 0;
- unsigned st = 0;
- loop { // this is to prevent race with destroy
- // (because Thread objects are not always link counted!)
- {
- SpinBlock block(ThreadDestroyListLock);
- if (ThreadDestroyList.find(*this)==NotFound)
- break;
- }
- #ifdef _DEBUG
- if (st==10)
- PROGLOG("Thread::join race");
- #endif
- Sleep(st); // switch back to exiting thread (not very elegant!)
- st++;
- if (st>10)
- st = 10; // note must be non-zero for high priority threads
- }
- #ifdef _DEBUG
- int c = getLinkCount();
- if (c>=DEAD_PSEUDO_COUNT) {
- PROGLOG("Dead/Dying thread joined! %d",c);
- PrintStackReport();
- }
- #endif
- alive = false; // should be safe here
- stopped.signal(); // signal stopped again, to prevent any parallel call from blocking.
- return true;
- }
- Thread::~Thread()
- {
- ithreadname = &cthreadname; // safer (as derived classes destroyed)
- #ifdef _DEBUG
- if (alive) {
- if (!stopped.wait(0)) { // see if fell out of threadmain and signal stopped
- PROGLOG("Live thread killed! %s",getName());
- PrintStackReport();
- }
- // don't need to resignal as we are on way out
- }
- #endif
- Link();
-
- // DBGLOG("Thread %x (%s) destroyed\n", threadid, threadname);
- {
- CriticalBlock block(ThreadListSem);
- ThreadList.zap(*this);
- }
- free(cthreadname.threadname);
- cthreadname.threadname = NULL;
- }
- unsigned getThreadCount()
- {
- CriticalBlock block(ThreadListSem);
- return ThreadList.ordinality();
- }
- StringBuffer & getThreadList(StringBuffer &str)
- {
- CriticalBlock block(ThreadListSem);
- ForEachItemIn(i,ThreadList) {
- Thread &item=ThreadList.item(i);
- item.getInfo(str).append("\n");
- }
- return str;
- }
- StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
- {
- CriticalBlock block(ThreadListSem);
- bool found=false;
- ForEachItemIn(i,ThreadList) {
- Thread &item=ThreadList.item(i);
- int h;
- unsigned t;
- const char *s = item.getLogInfo(h,t);
- if (s&&*s&&((thandle==0)||(h==thandle))&&((tid==0)||(t==tid))) {
- if (found) {
- name.clear();
- break; // only return if unambiguous
- }
- name.append(s);
- found = true;
- }
- }
- return name;
- }
- // CThreadedPersistent
- CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner)
- {
- halt = false;
- atomic_set(&state, s_ready);
- athread.start();
- }
- CThreadedPersistent::~CThreadedPersistent()
- {
- join(INFINITE);
- halt = true;
- sem.signal();
- athread.join();
- }
- void CThreadedPersistent::main()
- {
- loop
- {
- sem.wait();
- if (halt)
- break;
- try
- {
- owner->main();
- // Note we do NOT call the thread reset hook here - these threads are expected to be able to preserve state, I think
- }
- catch (IException *e)
- {
- VStringBuffer errMsg("CThreadedPersistent (%s)", athread.getName());
- EXCLOG(e, errMsg.str());
- exception.setown(e);
- joinSem.signal(); // leave in running state, signal to join to handle
- continue;
- }
- if (!atomic_cas(&state, s_ready, s_running))
- if (atomic_cas(&state, s_ready, s_joining))
- joinSem.signal();
- }
- }
- void CThreadedPersistent::start()
- {
- if (!atomic_cas(&state, s_running, s_ready))
- {
- VStringBuffer msg("CThreadedPersistent::start(%s) - not ready", athread.getName());
- WARNLOG("%s", msg.str());
- PrintStackReport();
- throw MakeStringException(-1, "%s", msg.str());
- }
- sem.signal();
- }
- bool CThreadedPersistent::join(unsigned timeout)
- {
- if (atomic_cas(&state, s_joining, s_running))
- {
- if (!joinSem.wait(timeout))
- {
- if (atomic_cas(&state, s_running, s_joining)) // if still joining, restore running state
- return false;
- // if here, main() set s_ready after timeout and has or will signal
- if (!joinSem.wait(60000)) // should be instant
- throwUnexpected();
- return true;
- }
- if (exception.get())
- {
- // switch back to ready state and throw
- Owned<IException> e = exception.getClear();
- if (!atomic_cas(&state, s_ready, s_joining))
- throwUnexpected();
- throw e.getClear();
- }
- }
- return true;
- }
- //class CAsyncFor
- void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
- {
- if (num <= 1)
- {
- if (num == 1)
- Do(0);
- return;
- }
- Mutex errmutex;
- Semaphore ready;
- Semaphore finished;
- IException *e=NULL;
- Owned<IShuffledIterator> shuffler;
- if (shuffled) {
- shuffler.setown(createShuffledIterator(num));
- shuffler->first(); // prime (needed to make thread safe)
- }
- unsigned i;
- if (maxatonce==1) { // no need for threads
- for (i=0;i<num;i++) {
- unsigned idx = shuffled?shuffler->lookup(i):i;
- try {
- Do(idx);
- }
- catch (IException * _e)
- {
- if (e)
- _e->Release(); // only return first
- else
- e = _e;
- if (abortFollowingException)
- break;
- }
- }
- }
- else {
- class cdothread: public Thread
- {
- public:
- Mutex *errmutex;
- Semaphore &ready;
- Semaphore &finished;
- int timeout;
- IException *&erre;
- unsigned idx;
- CAsyncFor *self;
- cdothread(CAsyncFor *_self,unsigned _idx,Semaphore &_ready,Semaphore &_finished,Mutex *_errmutex,IException *&_e)
- : Thread("CAsyncFor"),ready(_ready),finished(_finished),erre(_e)
- {
- errmutex =_errmutex;
- idx = _idx;
- self = _self;
- }
- int run()
- {
- try {
- self->Do(idx);
- }
- catch (IException * _e)
- {
- synchronized block(*errmutex);
- if (erre)
- _e->Release(); // only return first
- else
- erre = _e;
- }
- #ifndef NO_CATCHALL
- catch (...)
- {
- synchronized block(*errmutex);
- if (!erre)
- erre = MakeStringException(0, "Unknown exception in Thread %s", getName());
- }
- #endif
- ready.signal();
- finished.signal();
- return 0;
- }
- };
- if (maxatonce==0)
- maxatonce = num;
- for (i=0;(i<num)&&(i<maxatonce);i++)
- ready.signal();
- for (i=0;i<num;i++) {
- ready.wait();
- if (abortFollowingException && e) break;
- Thread *thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,finished,&errmutex,e);
- thread->startRelease();
- }
- while (i--)
- finished.wait();
- }
- if (e)
- throw e;
- }
- // ---------------------------------------------------------------------------
- // Thread Pools
- // ---------------------------------------------------------------------------
- class CPooledThreadWrapper;
- class CThreadPoolBase
- {
- public:
- virtual ~CThreadPoolBase() {}
- protected: friend class CPooledThreadWrapper;
- IExceptionHandler *exceptionHandler;
- CriticalSection crit;
- StringAttr poolname;
- int donewaiting;
- Semaphore donesem;
- PointerArray waitingsems;
- UnsignedArray waitingids;
- bool stopall;
- unsigned defaultmax;
- unsigned targetpoolsize;
- unsigned delay;
- Semaphore availsem;
- atomic_t numrunning;
- virtual void notifyStarted(CPooledThreadWrapper *item)=0;
- virtual bool notifyStopped(CPooledThreadWrapper *item)=0;
- };
- class CPooledThreadWrapper: public Thread
- {
- PooledThreadHandle handle;
- IPooledThread *thread;
- Semaphore sem;
- CThreadPoolBase &parent;
- char *runningname;
- public:
- IMPLEMENT_IINTERFACE;
- CPooledThreadWrapper(CThreadPoolBase &_parent,
- PooledThreadHandle _handle,
- IPooledThread *_thread) // takes ownership of thread
- : Thread(StringBuffer("Member of thread pool: ").append(_parent.poolname).str()), parent(_parent)
- {
- thread = _thread;
- handle = _handle;
- runningname = strdup(_parent.poolname);
- }
- ~CPooledThreadWrapper()
- {
- thread->Release();
- free(runningname);
- }
- void setName(const char *name) { free(runningname); runningname=strdup(name); }
- void setHandle(PooledThreadHandle _handle) { handle = _handle; }
- PooledThreadHandle queryHandle() { return handle; }
- IPooledThread &queryThread() { return *thread; }
- void setThread(IPooledThread *_thread) { thread = _thread; } // takes ownership
- bool isStopped() { return (handle==0); }
- PooledThreadHandle markStopped()
- {
- PooledThreadHandle ret=handle;
- handle = 0;
- if (ret) // JCSMORE - I can't see how handle can not be set if here..
- atomic_dec(&parent.numrunning);
- return ret;
- }
- void markStarted()
- {
- atomic_inc(&parent.numrunning);
- }
- int run()
- {
- do
- {
- sem.wait();
- {
- CriticalBlock block(parent.crit); // to synchronize
- if (parent.stopall)
- break;
- }
- parent.notifyStarted(this);
- try
- {
- char *&threadname = cthreadname.threadname;
- char *temp = threadname; // swap running name and threadname
- threadname = runningname;
- runningname = temp;
- thread->main();
- temp = threadname; // and back
- threadname = runningname;
- runningname = temp;
- }
- catch (IException *e)
- {
- char *&threadname = cthreadname.threadname;
- char *temp = threadname; // swap back
- threadname = runningname;
- runningname = temp;
- handleException(e);
- }
- #ifndef NO_CATCHALL
- catch (...)
- {
- char *&threadname = cthreadname.threadname;
- char *temp = threadname; // swap back
- threadname = runningname;
- runningname = temp;
- handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
- }
- #endif
- if (threadTerminationHook)
- {
- (*threadTerminationHook)(); // Reset any pre-thread state.
- threadTerminationHook = NULL;
- }
- } while (parent.notifyStopped(this));
- return 0;
- }
- void cycle()
- {
- sem.signal();
- }
- void go(void *param)
- {
- thread->init(param);
- cycle();
- }
- bool stop()
- {
- if (handle)
- return thread->stop();
- return true;
- }
- void handleException(IException *e)
- {
- CriticalBlock block(parent.crit);
- PrintExceptionLog(e,parent.poolname.get());
- if (!parent.exceptionHandler||!parent.exceptionHandler->fireException(e)) {
- }
- e->Release();
- }
- };
- class CPooledThreadIterator: public CInterface , implements IPooledThreadIterator
- {
- unsigned current;
- public:
- IArrayOf<IPooledThread> threads;
- IMPLEMENT_IINTERFACE;
- CPooledThreadIterator()
- {
- current = 0;
- }
- bool first()
- {
- current = 0;
- return threads.isItem(current);
- }
- bool next()
- {
- current++;
- return threads.isItem(current);
- }
- bool isValid()
- {
- return threads.isItem(current);
- }
- IPooledThread & query()
- {
- return threads.item(current);
- }
- };
- class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInterface
- {
- CIArrayOf<CPooledThreadWrapper> threadwrappers;
- PooledThreadHandle nextid;
- IThreadFactory *factory;
- unsigned stacksize;
- unsigned timeoutOnRelease;
- unsigned traceStartDelayPeriod;
- unsigned startsInPeriod;
- cycle_t startDelayInPeriod;
- CCycleTimer overAllTimer;
- PooledThreadHandle _start(void *param,const char *name, bool noBlock, unsigned timeout=0)
- {
- CCycleTimer startTimer;
- bool timedout = defaultmax && !availsem.wait(noBlock ? 0 : (timeout>0?timeout:delay));
- PooledThreadHandle ret;
- {
- CriticalBlock block(crit);
- if (timedout&&!availsem.wait(0)) { // make sure take allocated sem if has become available
- if (noBlock || timeout > 0)
- throw MakeStringException(0, "No threads available in pool %s", poolname.get());
- WARNLOG("Pool limit exceeded for %s", poolname.get());
- }
- if (traceStartDelayPeriod)
- {
- ++startsInPeriod;
- startDelayInPeriod += startTimer.elapsedCycles();
- if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
- {
- cycle_t avg = startDelayInPeriod/startsInPeriod;
- unsigned avgMs = static_cast<unsigned>(cycle_to_nanosec(avg)/1000000);
- PROGLOG("%s: %d threads started in last %d seconds, average delay = %d milliseconds", poolname.get(), startsInPeriod, traceStartDelayPeriod, avgMs);
- startsInPeriod = 0;
- startDelayInPeriod = 0;
- overAllTimer.reset();
- }
- }
- CPooledThreadWrapper &t = allocThread();
- if (name)
- t.setName(name);
- t.go(param);
- ret = t.queryHandle();
- }
- Sleep(0);
- return ret;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CThreadPool(IThreadFactory *_factory,IExceptionHandler *_exceptionHandler,const char *_poolname,unsigned _defaultmax, unsigned _delay, unsigned _stacksize, unsigned _timeoutOnRelease, unsigned _targetpoolsize)
- {
- poolname.set(_poolname);
- factory = LINK(_factory);
- exceptionHandler = _exceptionHandler;
- nextid = 1;
- stopall = false;
- defaultmax = _defaultmax;
- delay = _delay;
- if (defaultmax)
- availsem.signal(defaultmax);
- stacksize = _stacksize;
- timeoutOnRelease = _timeoutOnRelease;
- targetpoolsize = _targetpoolsize?_targetpoolsize:defaultmax;
- atomic_set(&numrunning,0);
- traceStartDelayPeriod = 0;
- startsInPeriod = 0;
- startDelayInPeriod = 0;
- }
- ~CThreadPool()
- {
- stopAll(true);
- if (!joinAll(true, timeoutOnRelease))
- WARNLOG("%s; timedout[%d] waiting for threads in pool", poolname.get(), timeoutOnRelease);
- CriticalBlock block(crit);
- bool first=true;
- ForEachItemIn(i,threadwrappers)
- {
- CPooledThreadWrapper &t = threadwrappers.item(i);
- if (!t.isStopped())
- {
- if (first)
- {
- WARNLOG("Threads still active: ");
- first = false;
- }
- StringBuffer threadInfo;
- PROGLOG("Active thread: %s, info: %s", t.getName(), t.getInfo(threadInfo).str());
- }
- }
- factory->Release();
- }
- CPooledThreadWrapper &allocThread()
- { // called in critical section
- PooledThreadHandle newid=nextid++;
- if (newid==0)
- newid=nextid++;
- ForEachItemIn(i,threadwrappers) {
- CPooledThreadWrapper &it = threadwrappers.item(i);
- if (it.isStopped()) {
- it.setHandle(newid);
- if (!it.queryThread().canReuse()) {
- it.queryThread().Release();
- it.setThread(factory->createNew());
- }
- return it;
- }
- }
- CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew());
- if (stacksize)
- ret.setStackSize(stacksize);
- ret.start();
- threadwrappers.append(ret);
- return ret;
- }
- CPooledThreadWrapper *findThread(PooledThreadHandle handle)
- { // called in critical section
- ForEachItemIn(i,threadwrappers) {
- CPooledThreadWrapper &it = threadwrappers.item(i);
- if (it.queryHandle()==handle)
- return ⁢
- }
- return NULL;
- }
- PooledThreadHandle startNoBlock(void *param)
- {
- return _start(param, NULL, true);
- }
- PooledThreadHandle startNoBlock(void *param,const char *name)
- {
- return _start(param, name, true);
- }
- PooledThreadHandle start(void *param)
- {
- return _start(param, NULL, false);
- }
- PooledThreadHandle start(void *param,const char *name)
- {
- return _start(param, name, false);
- }
- PooledThreadHandle start(void *param,const char *name, unsigned timeout)
- {
- return _start(param, name, false, timeout);
- }
- bool stop(PooledThreadHandle handle)
- {
- CriticalBlock block(crit);
- CPooledThreadWrapper *t = findThread(handle);
- if (t)
- return t->stop();
- return true; // already stopped
- }
- bool stopAll(bool tryall=false)
- {
- availsem.signal(1000);
- availsem.wait();
- CriticalBlock block(crit);
- bool ret=true;
- ForEachItemIn(i,threadwrappers) {
- CPooledThreadWrapper &it = threadwrappers.item(i);
- if (!it.stop()) {
- ret = false;
- if (!tryall)
- break;
- }
- }
- return ret;
- }
- bool joinWait(CPooledThreadWrapper &t,unsigned timeout)
- {
- // called in critical section
- if (t.isStopped())
- return true;
- Semaphore sem;
- waitingsems.append(&sem);
- waitingids.append(t.queryHandle());
- crit.leave();
- bool ret = sem.wait(timeout);
- crit.enter();
- unsigned i = waitingsems.find(&sem);
- if (i!=NotFound) {
- waitingids.remove(i);
- waitingsems.remove(i);
- }
- return ret;
- }
- bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)
- {
- CriticalBlock block(crit);
- CPooledThreadWrapper *t = findThread(handle);
- if (!t)
- return true; // already stopped
- return joinWait(*t,timeout);
- }
- virtual bool joinAll(bool del,unsigned timeout=INFINITE)
- { // note timeout is for each join
- CriticalBlock block(crit);
- CIArrayOf<CPooledThreadWrapper> tojoin;
- ForEachItemIn(i1,threadwrappers) {
- CPooledThreadWrapper &it = threadwrappers.item(i1);
- it.Link();
- tojoin.append(it);
- }
-
- ForEachItemIn(i2,tojoin)
- if (!joinWait(tojoin.item(i2),timeout))
- return false;
- if (del) {
- stopall = true;
- ForEachItemIn(i3,tojoin)
- tojoin.item(i3).cycle();
- {
- CriticalUnblock unblock(crit);
- ForEachItemIn(i4,tojoin)
- tojoin.item(i4).join();
- }
- threadwrappers.kill();
- stopall = false;
- }
- return true;
- }
- IPooledThreadIterator *running()
- {
- CriticalBlock block(crit);
- CPooledThreadIterator *ret = new CPooledThreadIterator;
- ForEachItemIn(i,threadwrappers) {
- CPooledThreadWrapper &it = threadwrappers.item(i);
- if (!it.isStopped()) {
- IPooledThread &t = it.queryThread();
- t.Link();
- ret->threads.append(t);
- }
- }
- return ret;
- }
- unsigned runningCount()
- {
- return (unsigned)atomic_read(&numrunning);
- }
- void notifyStarted(CPooledThreadWrapper *item)
- {
- item->markStarted();
- }
- bool notifyStopped(CPooledThreadWrapper *item)
- {
- CriticalBlock block(crit);
- PooledThreadHandle myid = item->markStopped();
- ForEachItemIn(i1,waitingids) { // tell anyone waiting
- if (waitingids.item(i1)==myid)
- ((Semaphore *)waitingsems.item(i1))->signal();
- }
- bool ret = true;
- if (defaultmax) {
- unsigned n=threadwrappers.ordinality();
- for (unsigned i2=targetpoolsize;i2<n;i2++) { // only check excess for efficiency
- if (item==&threadwrappers.item(i2)) {
- threadwrappers.remove(i2);
- ret = false;
- break;
- }
- }
- availsem.signal();
- }
- return ret;
- }
- void setStartDelayTracing(unsigned secs)
- {
- traceStartDelayPeriod = secs;
- }
- };
- IThreadPool *createThreadPool(const char *poolname,IThreadFactory *factory,IExceptionHandler *exceptionHandler,unsigned defaultmax, unsigned delay, unsigned stacksize, unsigned timeoutOnRelease, unsigned targetpoolsize)
- {
- return new CThreadPool(factory,exceptionHandler,poolname,defaultmax,delay,stacksize,timeoutOnRelease,targetpoolsize);
- }
- //=======================================================================================================
- static void CheckAllowedProgram(const char *prog,const char *allowed)
- {
- if (!prog||!allowed||(strcmp(allowed,"*")==0))
- return;
- StringBuffer head;
- bool inq = false;
- // note don't have to be too worried about odd quoting as matching fixed list
- while (*prog&&((*prog!=' ')||inq)) {
- if (*prog=='"')
- inq = !inq;
- head.append(*(prog++));
- }
- StringArray list;
- list.appendList(allowed, ",");
- ForEachItemIn(i,list) {
- if (WildMatch(head.str(),list.item(i)))
- return;
- }
- ERRLOG("Unauthorized pipe program(%s)",head.str());
- throw MakeStringException(-1,"Unauthorized pipe program(%s)",head.str());
- }
- class CSimplePipeStream: public CInterface, implements ISimpleReadStream
- {
- public:
- IMPLEMENT_IINTERFACE;
- CSimplePipeStream(IPipeProcess *_pipe, bool _isStderr) : pipe(_pipe), isStderr(_isStderr) {}
- virtual size32_t read(size32_t sz, void * data)
- {
- if (isStderr)
- return pipe->readError(sz, data);
- else
- return pipe->read(sz, data);
- }
- private:
- Owned<IPipeProcess> pipe;
- bool isStderr;
- };
- #ifdef _WIN32
- class CWindowsPipeProcess: public CInterface, implements IPipeProcess
- {
- HANDLE pipeProcess;
- HANDLE hInput;
- HANDLE hOutput;
- HANDLE hError;
- StringAttr title;
- unsigned retcode;
- CriticalSection sect;
- bool aborted;
- StringAttr allowedprogs;
- StringArray envVars;
- StringArray envValues;
- public:
- IMPLEMENT_IINTERFACE;
- CWindowsPipeProcess(const char *_allowedprogs)
- : allowedprogs(_allowedprogs)
- {
- pipeProcess = (HANDLE)-1;
- hInput=(HANDLE)-1;
- hOutput=(HANDLE)-1;
- hError=(HANDLE)-1;
- retcode = (unsigned)-1;
- aborted = false;
- }
- ~CWindowsPipeProcess()
- {
- kill();
- }
- void kill()
- {
- doCloseInput();
- doCloseOutput();
- doCloseError();
- if (pipeProcess != (HANDLE)-1) {
- CloseHandle(pipeProcess);
- pipeProcess = (HANDLE)-1;
- }
- }
- bool run(const char *_title,const char *prog,const char *dir,bool hasinput,bool hasoutput,bool haserror, size32_t stderrbufsize)
- {
- // size32_t stderrbufsize ignored as not required (I think)
- CriticalBlock block(sect);
- kill();
- title.clear();
- if (_title) {
- title.set(_title);
- PROGLOG("%s: Creating PIPE process : %s", title.get(), prog);
- }
- CheckAllowedProgram(prog,allowedprogs);
- SECURITY_ATTRIBUTES sa;
- sa.nLength = sizeof(SECURITY_ATTRIBUTES);
- sa.bInheritHandle = TRUE;
- sa.lpSecurityDescriptor = NULL;
- HANDLE hProgOutput=(HANDLE)-1;
- HANDLE hProgInput=(HANDLE)-1;
- HANDLE hProgError=(HANDLE)-1;
-
- HANDLE h;
- //NB: Create a pipe handles that are not inherited our end
- if (hasinput) {
- CreatePipe(&hProgInput,&h,&sa,0);
- DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hInput, 0, FALSE, DUPLICATE_SAME_ACCESS);
- CloseHandle(h);
- }
- if (hasoutput) {
- CreatePipe(&h,&hProgOutput,&sa,0);
- DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hOutput, 0, FALSE, DUPLICATE_SAME_ACCESS);
- CloseHandle(h);
- }
- if (haserror) {
- CreatePipe(&h,&hProgError,&sa,0);
- DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hError, 0, FALSE, DUPLICATE_SAME_ACCESS);
- CloseHandle(h);
- }
- STARTUPINFO StartupInfo;
- _clear(StartupInfo);
- StartupInfo.cb = sizeof(StartupInfo);
- StartupInfo.wShowWindow = SW_HIDE;
- StartupInfo.dwFlags = STARTF_USESTDHANDLES|STARTF_USESHOWWINDOW ;
- StartupInfo.hStdOutput = hasoutput?hProgOutput:GetStdHandle(STD_OUTPUT_HANDLE);
- StartupInfo.hStdError = haserror?hProgError:GetStdHandle(STD_ERROR_HANDLE);
- StartupInfo.hStdInput = hasinput?hProgInput:GetStdHandle(STD_INPUT_HANDLE);
-
- PROCESS_INFORMATION ProcessInformation;
- // MORE - should create a new environment block that is copy of parent's, then set all the values in envVars/envValues, and pass it
- if (!CreateProcess(NULL, (char *)prog, NULL,NULL,TRUE,0,NULL, dir&&*dir?dir:NULL, &StartupInfo,&ProcessInformation)) {
- if (_title) {
- StringBuffer errstr;
- formatSystemError(errstr, GetLastError());
- ERRLOG("%s: PIPE process '%s' failed: %s", title.get(), prog, errstr.str());
- }
- return false;
- }
- pipeProcess = ProcessInformation.hProcess;
- CloseHandle(ProcessInformation.hThread);
- if (hasoutput)
- CloseHandle(hProgOutput);
- if (hasinput)
- CloseHandle(hProgInput);
- if (haserror)
- CloseHandle(hProgError);
- return true;
- }
- virtual void setenv(const char *var, const char *value)
- {
- assertex(var);
- if (!value)
- value = "";
- envVars.append(var);
- envValues.append(value);
- }
- size32_t read(size32_t sz, void *buf)
- {
- DWORD sizeRead;
- if (!ReadFile(hOutput, buf, sz, &sizeRead, NULL)) {
- //raise error here
- if(aborted)
- return 0;
- int err=GetLastError();
- switch(err)
- {
- case ERROR_HANDLE_EOF:
- case ERROR_BROKEN_PIPE:
- case ERROR_NO_DATA:
- return 0;
- default:
- aborted = true;
- IException *e = MakeOsException(err, "Pipe: ReadFile failed (size %d)", sz);
- PrintExceptionLog(e, NULL);
- throw e;
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeRead);
- }
- ISimpleReadStream *getOutputStream()
- {
- return new CSimplePipeStream(LINK(this), false);
- }
- size32_t readError(size32_t sz, void *buf)
- {
- DWORD sizeRead;
- if (!ReadFile(hError, buf, sz, &sizeRead, NULL)) {
- //raise error here
- if(aborted)
- return 0;
- int err=GetLastError();
- switch(err)
- {
- case ERROR_HANDLE_EOF:
- case ERROR_BROKEN_PIPE:
- case ERROR_NO_DATA:
- return 0;
- default:
- aborted = true;
- IException *e = MakeOsException(err, "Pipe: ReadError failed (size %d)", sz);
- PrintExceptionLog(e, NULL);
- throw e;
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeRead);
- }
- ISimpleReadStream *getErrorStream()
- {
- return new CSimplePipeStream(LINK(this), true);
- }
- size32_t write(size32_t sz, const void *buf)
- {
- DWORD sizeWritten;
- if (!WriteFile(hInput, buf, sz, &sizeWritten, NULL)) {
- int err=GetLastError();
- if ((err==ERROR_HANDLE_EOF)||aborted)
- sizeWritten = 0;
- else {
- IException *e = MakeOsException(err, "Pipe: WriteFile failed (size %d)", sz);
- PrintExceptionLog(e, NULL);
- throw e;
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeWritten);
- }
- unsigned wait()
- {
- CriticalBlock block(sect);
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
- {
- CriticalUnblock unblock(sect);
- WaitForSingleObject(pipeProcess, INFINITE);
- }
- if (pipeProcess != (HANDLE)-1) {
- GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
- CloseHandle(pipeProcess);
- pipeProcess = (HANDLE)-1;
- }
- if (title.length())
- PROGLOG("%s: Pipe: process complete",title.get());
- }
- return retcode;
- }
- unsigned wait(unsigned timeoutms, bool &timedout)
- {
- CriticalBlock block(sect);
- timedout = false;
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
- {
- CriticalUnblock unblock(sect);
- if (WaitForSingleObject(pipeProcess, timeoutms)!=WAIT_OBJECT_0) {
- timedout = true;
- return retcode;
- }
- }
- if (pipeProcess != (HANDLE)-1) {
- GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
- CloseHandle(pipeProcess);
- pipeProcess = (HANDLE)-1;
- }
- if (title.length())
- PROGLOG("%s: Pipe: process complete",title.get());
- }
- return retcode;
- }
- void notifyTerminated(HANDLE pid,unsigned _retcode)
- {
- CriticalBlock block(sect);
- if ((pid!=(HANDLE)-1)&&(pid==pipeProcess)) {
- retcode = _retcode;
- pipeProcess = (HANDLE)-1;
- }
- }
-
- void doCloseInput()
- {
- CriticalBlock block(sect);
- if (hInput != (HANDLE)-1) {
- CloseHandle(hInput);
- hInput = (HANDLE)-1;
- }
- }
- void doCloseOutput()
- {
- CriticalBlock block(sect);
- if (hOutput != (HANDLE)-1) {
- CloseHandle(hOutput);
- hOutput = (HANDLE)-1;
- }
- }
- void doCloseError()
- {
- CriticalBlock block(sect);
- if (hError != (HANDLE)-1) {
- CloseHandle(hError);
- hError = (HANDLE)-1;
- }
- }
- void closeInput()
- {
- doCloseInput();
- }
- void closeOutput()
- {
- doCloseOutput();
- }
- void closeError()
- {
- doCloseError();
- }
- void abort()
- {
- CriticalBlock block(sect);
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe Aborting",title.get());
- aborted = true;
- //doCloseOutput(); // seems to work better without this
- doCloseInput();
- {
- CriticalUnblock unblock(sect);
- Sleep(100);
- }
- try { // this code is problematic for some reason
- if (pipeProcess != (HANDLE)-1) {
- TerminateProcess(pipeProcess, 255);
- CloseHandle(pipeProcess);
- pipeProcess = (HANDLE)-1;
- }
- }
- catch (...) {
- // ignore errors
- }
- if (title.length())
- PROGLOG("%s: Pipe Aborted",title.get());
- }
- }
- bool hasInput()
- {
- return hInput!=(HANDLE)-1;
- }
- bool hasOutput()
- {
- return hOutput!=(HANDLE)-1;
- }
- bool hasError()
- {
- return hError!=(HANDLE)-1;
- }
- HANDLE getProcessHandle()
- {
- return pipeProcess;
- }
- };
- IPipeProcess *createPipeProcess(const char *allowedprogs)
- {
- return new CWindowsPipeProcess(allowedprogs);
- }
- #else
- class CIgnoreSIGPIPE
- {
- public:
- CIgnoreSIGPIPE()
- {
- struct sigaction act;
- sigset_t blockset;
- sigemptyset(&blockset);
- act.sa_mask = blockset;
- act.sa_handler = SIG_IGN;
- sigaction(SIGPIPE, &act, NULL);
- }
- ~CIgnoreSIGPIPE()
- {
- signal(SIGPIPE, SIG_DFL);
- }
- };
- #define WHITESPACE " \t\n\r"
- #define START_FAILURE (199)
- static unsigned dowaitpid(HANDLE pid, int mode)
- {
- while (pid != (HANDLE)-1) {
- int stat=-1;
- int ret = waitpid(pid, &stat, mode);
- if (ret>0)
- {
- if (WIFEXITED(stat))
- return WEXITSTATUS(stat);
- else if (WIFSIGNALED(stat))
- {
- ERRLOG("Program was terminated by signal %u", (unsigned) WTERMSIG(stat));
- if (WTERMSIG(stat)==SIGPIPE)
- return 0;
- return 254;
- }
- else
- {
- return 254;
- }
- }
- if (ret==0)
- break;
- int err = errno;
- if (err == ECHILD)
- break;
- if (err!=EINTR) {
- ERRLOG("dowait failed with errcode %d",err);
- return (unsigned)-1;
- }
- }
- return 0;
- }
- class CLinuxPipeProcess: public CInterface, implements IPipeProcess
- {
- class cForkThread: public Thread
- {
- CLinuxPipeProcess *parent;
- public:
- cForkThread(CLinuxPipeProcess *_parent)
- {
- parent = _parent;
- }
- int run()
- {
- parent->run();
- return 0;
- }
- };
- Owned<cForkThread> forkthread;
- class cStdErrorBufferThread: public Thread
- {
- MemoryAttr buf;
- size32_t bufsize;
- Semaphore stopsem;
- CriticalSection §
- int &hError;
- public:
- cStdErrorBufferThread(size32_t maxbufsize,int &_hError,CriticalSection &_sect)
- : hError(_hError), sect(_sect)
- {
- buf.allocate(maxbufsize);
- bufsize = 0;
- }
- int run()
- {
- while (!stopsem.wait(1000)) {
- CriticalBlock block(sect);
- if (hError!=(HANDLE)-1) { // hmm who did that
- fcntl(hError,F_SETFL,O_NONBLOCK); // make sure non-blocking
- if (bufsize<buf.length()) {
- size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
- if ((int)sizeRead>0) {
- bufsize += sizeRead;
- }
- }
- else { // flush (to avoid process blocking)
- byte tmp[1024];
- size32_t totsz = 0;
- for (unsigned i=0;i<1024;i++) {
- size32_t sz = (size32_t)::read(hError, tmp, sizeof(tmp));
- if ((int)sz<=0)
- break;
- totsz+=sz;
- }
- if (totsz)
- WARNLOG("Lost %d bytes of stderr output",totsz);
- }
- }
- }
- if (hError!=(HANDLE)-1) { // hmm who did that
- fcntl(hError,F_SETFL,0); // read any remaining data in blocking mode
- while (bufsize<buf.length()) {
- size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
- if ((int)sizeRead>0)
- bufsize += sizeRead;
- else
- break;
- }
- }
- return 0;
- }
- void stop()
- {
- stopsem.signal();
- Thread::join();
- }
- size32_t read(size32_t sz,void *out)
- {
- CriticalBlock block(sect);
- if (bufsize<sz)
- sz = bufsize;
- if (sz>0) {
- memcpy(out,buf.bufferBase(),sz);
- if (sz!=bufsize) {
- bufsize -= sz;
- memmove(buf.bufferBase(),(byte *)buf.bufferBase()+sz,bufsize); // not ideal but hopefully not large
- }
- else
- bufsize = 0;
- }
- return sz;
- }
- } *stderrbufferthread;
- protected: friend class PipeWriterThread;
- HANDLE pipeProcess;
- HANDLE hInput;
- HANDLE hOutput;
- HANDLE hError;
- bool hasinput;
- bool hasoutput;
- bool haserror;
- StringAttr title;
- StringAttr cmd;
- StringAttr prog;
- StringAttr dir;
- int retcode;
- CriticalSection sect;
- Semaphore started;
- bool aborted;
- MemoryBuffer stderrbuf;
- size32_t stderrbufsize;
- StringAttr allowedprogs;
- StringArray envVars;
- StringArray envValues;
- public:
- IMPLEMENT_IINTERFACE;
- CLinuxPipeProcess(const char *_allowedprogs)
- : allowedprogs(_allowedprogs)
- {
- pipeProcess = (HANDLE)-1;
- hInput=(HANDLE)-1;
- hOutput=(HANDLE)-1;
- hError=(HANDLE)-1;
- retcode = -1;
- aborted = false;
- stderrbufferthread = NULL;
- }
- ~CLinuxPipeProcess()
- {
- kill();
- }
- void kill()
- {
- closeInput();
- closeOutput();
- closeError();
- Owned<cForkThread> ft;
- cStdErrorBufferThread *et;
- { CriticalBlock block(sect); // clear forkthread and stderrbufferthread
- ft.setown(forkthread.getClear());
- et = stderrbufferthread;
- stderrbufferthread = NULL;
- }
- if (ft) {
- ft->join();
- ft.clear();
- }
- if (et) {
- et->stop();
- delete et;
- }
- }
- char **splitargs(const char *line,unsigned &argc)
- {
- char *buf = strdup(line);
- // first count params (this probably could be improved)
- char *s = buf;
- argc = 0;
- while (readarg(s))
- argc++;
- free(buf);
- size32_t l = strlen(line)+1;
- size32_t al = (argc+1)*sizeof(char *);
- char **argv = (char **)malloc(al+l);
- argv[argc] = NULL;
- s = ((char *)argv)+al;
- memcpy(s,line,l);
- for (unsigned i=0;i<argc;i++)
- argv[i] = readarg(s);
- return argv;
- }
- void run()
- {
- int inpipe[2];
- int outpipe[2];
- int errpipe[2];
- if (hasinput)
- if (::pipe(inpipe)==-1)
- throw MakeOsException(errno);
- if (hasoutput)
- if (::pipe(outpipe)==-1)
- throw MakeOsException(errno);
- if (haserror)
- if (::pipe(errpipe)==-1)
- throw MakeOsException(errno);
- loop {
- pipeProcess = (HANDLE)fork();
- if (pipeProcess!=(HANDLE)-1)
- break;
- if (errno!=EAGAIN) {
- if (hasinput) {
- close(inpipe[0]);
- close(inpipe[1]);
- }
- if (hasoutput) {
- close(outpipe[0]);
- close(outpipe[1]);
- }
- if (haserror) {
- close(errpipe[0]);
- close(errpipe[1]);
- }
- retcode = START_FAILURE;
- started.signal();
- return;
- }
- }
- if (pipeProcess==0) { // child
- if (hasinput) {
- dup2(inpipe[0],0);
- close(inpipe[0]);
- close(inpipe[1]);
- }
- if (hasoutput) {
- dup2(outpipe[1],1);
- close(outpipe[0]);
- close(outpipe[1]);
- }
- if (haserror) {
- dup2(errpipe[1],2);
- close(errpipe[0]);
- close(errpipe[1]);
- }
- unsigned argc;
- char **argv=splitargs(prog,argc);
- if (dir.get()) {
- if (chdir(dir) == -1)
- throw MakeStringException(-1, "CLinuxPipeProcess::run: could not change dir to %s", dir.get());
- }
- ForEachItemIn(idx, envVars)
- {
- ::setenv(envVars.item(idx), envValues.item(idx), 1);
- }
- execvp(argv[0],argv);
- _exit(START_FAILURE); // must be _exit!!
- }
- if (hasinput)
- close(inpipe[0]);
- if (hasoutput)
- close(outpipe[1]);
- if (haserror)
- close(errpipe[1]);
- hInput = hasinput?inpipe[1]:((HANDLE)-1);
- hOutput = hasoutput?outpipe[0]:((HANDLE)-1);
- hError = haserror?errpipe[0]:((HANDLE)-1);
- started.signal();
- retcode = dowaitpid(pipeProcess, 0);
- if (retcode==START_FAILURE)
- closeOutput();
- }
- bool run(const char *_title,const char *_prog,const char *_dir,bool _hasinput,bool _hasoutput, bool _haserror, size32_t stderrbufsize)
- {
- static CriticalSection runsect; // single thread process start to avoid forked handle open/closes interleaving
- CriticalBlock runblock(runsect);
- kill();
- CriticalBlock block(sect);
- hasinput = _hasinput;
- hasoutput = _hasoutput;
- haserror = _haserror;
- title.clear();
- prog.set(_prog);
- dir.set(_dir);
- if (_title) {
- title.set(_title);
- PROGLOG("%s: Creating PIPE program process : '%s' - hasinput=%d, hasoutput=%d stderrbufsize=%d", title.get(), prog.get(),(int)hasinput, (int)hasoutput, stderrbufsize);
- }
- CheckAllowedProgram(prog,allowedprogs);
- retcode = 0;
- if (forkthread) {
- {
- CriticalUnblock unblock(sect);
- forkthread->join();
- }
- forkthread.clear();
- }
- forkthread.setown(new cForkThread(this));
- forkthread->start();
- {
- CriticalUnblock unblock(sect);
- started.wait();
- forkthread->join(50); // give a chance to fail
- }
- if (retcode==START_FAILURE) {
- ERRLOG("%s: PIPE process '%s' failed to start", title.get()?title.get():"CLinuxPipeProcess", prog.get());
- forkthread.clear();
- return false;
- }
- if (stderrbufsize) {
- if (stderrbufferthread) {
- stderrbufferthread->stop();
- delete stderrbufferthread;
- }
- stderrbufferthread = new cStdErrorBufferThread(stderrbufsize,hError,sect);
- stderrbufferthread->start();
- }
- return true;
- }
- virtual void setenv(const char *var, const char *value)
- {
- assertex(var);
- if (!value)
- value = "";
- envVars.append(var);
- envValues.append(value);
- }
-
- size32_t read(size32_t sz, void *buf)
- {
- CriticalBlock block(sect);
- if (aborted)
- return (size32_t)-1;
- if (hOutput==(HANDLE)-1)
- return 0;
- size32_t sizeRead;
- loop {
- {
- CriticalUnblock unblock(sect);
- sizeRead = (size32_t)::read(hOutput, buf, sz);
- }
- if (sizeRead!=(size32_t)-1)
- break;
- if (aborted)
- break;
- if (errno!=EINTR) {
- aborted = true;
- throw MakeErrnoException(errno,"Pipe: read failed (size %d)", sz);
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeRead);
- }
- ISimpleReadStream *getOutputStream()
- {
- return new CSimplePipeStream(LINK(this), false);
- }
-
- size32_t write(size32_t sz, const void *buf)
- {
- CriticalBlock block(sect);
- CIgnoreSIGPIPE ignoresigpipe;
- if (aborted)
- return (size32_t)-1;
- if (hInput==(HANDLE)-1)
- return 0;
- size32_t sizeWritten;
- loop {
- {
- CriticalUnblock unblock(sect);
- sizeWritten = (size32_t)::write(hInput, buf, sz);
- }
- if (sizeWritten!=(size32_t)-1)
- break;
- if (aborted)
- break;
- if (errno!=EINTR) {
- throw MakeErrnoException(errno,"Pipe: write failed (size %d)", sz);
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeWritten);
- }
- size32_t readError(size32_t sz, void *buf)
- {
- CriticalBlock block(sect);
- if (stderrbufferthread)
- return stderrbufferthread->read(sz,buf);
- if (aborted)
- return (size32_t)-1;
- if (hError==(HANDLE)-1)
- return 0;
- size32_t sizeRead;
- loop {
- {
- CriticalUnblock unblock(sect);
- sizeRead = (size32_t)::read(hError, buf, sz);
- }
- if (sizeRead!=(size32_t)-1)
- break;
- if (aborted)
- break;
- if (errno!=EINTR) {
- aborted = true;
- throw MakeErrnoException(errno,"Pipe: readError failed (size %d)", sz);
- }
- }
- return aborted?((size32_t)-1):((size32_t)sizeRead);
- }
- ISimpleReadStream *getErrorStream()
- {
- return new CSimplePipeStream(LINK(this), true);
- }
- void notifyTerminated(HANDLE pid,unsigned _retcode)
- {
- CriticalBlock block(sect);
- if (((int)pid>0)&&(pid==pipeProcess)) {
- retcode = _retcode;
- pipeProcess = (HANDLE)-1;
- }
- }
-
-
- unsigned wait()
- {
- CriticalBlock block(sect);
- if (stderrbufferthread)
- stderrbufferthread->stop();
- if (forkthread) {
- {
- CriticalUnblock unblock(sect);
- forkthread->join();
- }
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
- pipeProcess = (HANDLE)-1;
- }
- forkthread.clear();
- }
- return retcode;
- }
- unsigned wait(unsigned timeoutms, bool &timedout)
- {
- CriticalBlock block(sect);
- timedout = false;
- if (forkthread) {
- {
- CriticalUnblock unblock(sect);
- if (!forkthread->join(timeoutms)) {
- timedout = true;
- return retcode;
- }
- }
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
- pipeProcess = (HANDLE)-1;
- }
- forkthread.clear();
- }
- return retcode;
- }
-
- void closeOutput()
- {
- CriticalBlock block(sect);
- if (hOutput != (HANDLE)-1) {
- ::close(hOutput);
- hOutput = (HANDLE)-1;
- }
- }
-
- void closeInput()
- {
- CriticalBlock block(sect);
- if (hInput != (HANDLE)-1) {
- ::close(hInput);
- hInput = (HANDLE)-1;
- }
- }
- void closeError()
- {
- CriticalBlock block(sect);
- if (hError != (HANDLE)-1) {
- ::close(hError);
- hError = (HANDLE)-1;
- }
- }
- void abort()
- {
- CriticalBlock block(sect);
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Pipe Aborting",title.get());
- aborted = true;
- closeInput();
- {
- CriticalUnblock unblock(sect);
- forkthread->join(1000);
- }
- if (pipeProcess != (HANDLE)-1) {
- if (title.length())
- PROGLOG("%s: Forcibly killing pipe process %d",title.get(),pipeProcess);
- ::kill(pipeProcess,SIGKILL); // if this doesn't kill it we are in trouble
- CriticalUnblock unblock(sect);
- wait();
- }
- if (title.length())
- PROGLOG("%s: Pipe Aborted",title.get());
- retcode = -1;
- forkthread.clear();
- }
- }
-
- bool hasInput()
- {
- CriticalBlock block(sect);
- return hInput!=(HANDLE)-1;
- }
-
- bool hasOutput()
- {
- CriticalBlock block(sect);
- return hOutput!=(HANDLE)-1;
- }
- bool hasError()
- {
- CriticalBlock block(sect);
- return hError!=(HANDLE)-1;
- }
- HANDLE getProcessHandle()
- {
- CriticalBlock block(sect);
- return pipeProcess;
- }
- };
- IPipeProcess *createPipeProcess(const char *allowedprogs)
- {
- return new CLinuxPipeProcess(allowedprogs);
- }
- #endif
- // Worker thread
- class CWorkQueueThread: public CInterface, implements IWorkQueueThread
- {
- public:
- IMPLEMENT_IINTERFACE;
- CriticalSection crit;
- unsigned persisttime;
- class cWorkerThread: public Thread
- {
- unsigned persisttime;
- CWorkQueueThread *parent;
- CriticalSection &crit;
- public:
- IMPLEMENT_IINTERFACE;
- cWorkerThread(CWorkQueueThread *_parent,CriticalSection &_crit,unsigned _persisttime)
- : crit(_crit)
- {
- parent = _parent;
- persisttime = _persisttime;
- }
- QueueOf<IWorkQueueItem,false> queue;
- Semaphore sem;
- int run()
- {
- loop {
- IWorkQueueItem * work;
- bool wr = sem.wait(persisttime);
- {
- CriticalBlock block(crit);
- if (!wr) {
- wr = sem.wait(0); // catch race
- if (!wr)
- break; // timed out
- }
- work = queue.dequeue();
- }
- if (!work)
- break;
- try {
- work->execute();
- work->Release();
- }
- catch (IException *e)
- {
- EXCLOG(e,"CWorkQueueThread item execute");
- e->Release();
- }
- }
- CriticalBlock block(crit);
- parent->worker=NULL; // this should be safe
- return 0;
- }
-
- } *worker;
- CWorkQueueThread(unsigned _persisttime)
- {
- persisttime = _persisttime;
- worker = NULL;
- }
- ~CWorkQueueThread()
- {
- wait();
- }
- void post(IWorkQueueItem *packet)
- {
- CriticalBlock block(crit);
- if (!worker) {
- worker = new cWorkerThread(this,crit,persisttime);
- worker->startRelease();
- }
- worker->queue.enqueue(packet);
- worker->sem.signal();
- }
- void wait()
- {
- CriticalBlock block(crit);
- if (worker) {
- worker->queue.enqueue(NULL);
- worker->sem.signal();
- Linked<cWorkerThread> wt;
- wt.set(worker);
- CriticalUnblock unblock(crit);
- wt->join();
- }
- }
- unsigned pending()
- {
- CriticalBlock block(crit);
- unsigned ret = 0;
- if (worker)
- ret = worker->queue.ordinality();
- return ret;
- }
- };
- IWorkQueueThread *createWorkQueueThread(unsigned persisttime)
- {
- return new CWorkQueueThread(persisttime);
- }
- unsigned threadLogID() // for use in logging
- {
- #ifndef _WIN32
- #ifdef SYS_gettid
- return (unsigned) (memsize_t) syscall(SYS_gettid);
- #endif
- #endif
- return (unsigned)(memsize_t) GetCurrentThreadId(); // truncated in 64bit
- }
|