jthread.cpp 72 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jthread.hpp"
  14. #include "jlib.hpp"
  15. #include "jfile.hpp"
  16. #include "jmutex.hpp"
  17. #include "jexcept.hpp"
  18. #include "jmisc.hpp"
  19. #include "jqueue.tpp"
  20. #include "jregexp.hpp"
  21. #include <assert.h>
  22. #ifdef _WIN32
  23. #include <process.h>
  24. #else
  25. #include <unistd.h>
  26. #include <sys/wait.h>
  27. #include <sys/syscall.h>
  28. #include <sys/types.h>
  29. #include <sys/resource.h>
  30. #endif
  31. #define LINUX_STACKSIZE_CAP (0x200000)
  32. //#define NO_CATCHALL
  33. //static __thread ThreadTermFunc threadTerminationHook;
  34. static thread_local std::vector<ThreadTermFunc> threadTermHooks;
  35. static std::vector<ThreadTermFunc> mainThreadTermHooks;
  36. static struct MainThreadIdHelper
  37. {
  38. ThreadId tid;
  39. MainThreadIdHelper()
  40. {
  41. tid = GetCurrentThreadId();
  42. }
  43. } mainThreadIdHelper;
  44. /*
  45. * NB: Thread termination hook functions are tracked using a thread local vector (threadTermHooks).
  46. * However, hook functions installed on the main thread must be tracked separately in a non thread local vector (mainThreadTermHooks).
  47. * This is because thread local variables are destroyed before atexit functions are called and therefore before ModuleExitObjects().
  48. * The hooks tracked by mainThreadTermHooks are called by the MODULE_EXIT below.
  49. */
  50. void addThreadTermFunc(ThreadTermFunc onTerm)
  51. {
  52. auto &termHooks = (GetCurrentThreadId() == mainThreadIdHelper.tid) ? mainThreadTermHooks : threadTermHooks;
  53. for (auto hook: termHooks)
  54. {
  55. if (hook==onTerm)
  56. return;
  57. }
  58. termHooks.push_back(onTerm);
  59. }
  60. void callThreadTerminationHooks(bool isPooled)
  61. {
  62. std::vector<ThreadTermFunc> keepHooks;
  63. auto &termHooks = (GetCurrentThreadId() == mainThreadIdHelper.tid) ? mainThreadTermHooks : threadTermHooks;
  64. for (auto hook: termHooks)
  65. {
  66. if ((*hook)(isPooled) && isPooled)
  67. keepHooks.push_back(hook);
  68. }
  69. termHooks.swap(keepHooks);
  70. }
  71. PointerArray *exceptionHandlers = NULL;
  72. MODULE_INIT(INIT_PRIORITY_JTHREAD)
  73. {
  74. exceptionHandlers = new PointerArray();
  75. return true;
  76. }
  77. MODULE_EXIT()
  78. {
  79. callThreadTerminationHooks(false);
  80. delete exceptionHandlers;
  81. }
  82. void addThreadExceptionHandler(IExceptionHandler *handler)
  83. {
  84. assertex(exceptionHandlers); // have to ensure MODULE_INIT has appropriate priority.
  85. exceptionHandlers->append(handler);
  86. }
  87. void removeThreadExceptionHandler(IExceptionHandler *handler)
  88. {
  89. exceptionHandlers->zap(handler);
  90. }
  91. static bool SEHHandling = false;
  92. void enableThreadSEH() { SEHHandling=true; }
  93. void disableThreadSEH() { SEHHandling=false; } // only prevents new threads from having SEH handler, no mech. for turning off existing threads SEH handling.
  94. static ICopyArrayOf<Thread> ThreadList;
  95. static CriticalSection ThreadListSem;
  96. static size32_t defaultThreadStackSize=0;
  97. static ICopyArrayOf<Thread> ThreadDestroyList;
  98. static SpinLock ThreadDestroyListLock;
  99. #ifdef _WIN32
  100. extern void *EnableSEHtranslation();
  101. unsigned WINAPI Thread::_threadmain(LPVOID v)
  102. #else
  103. void *Thread::_threadmain(void *v)
  104. #endif
  105. {
  106. Thread * t = (Thread *)v;
  107. #ifdef _WIN32
  108. if (SEHHandling)
  109. EnableSEHtranslation();
  110. #else
  111. t->tidlog = threadLogID();
  112. #endif
  113. int ret = t->begin();
  114. char *&threadname = t->cthreadname.threadname;
  115. if (threadname) {
  116. memsize_t l=strlen(threadname);
  117. char *newname = (char *)malloc(l+8+1);
  118. memcpy(newname,"Stopped ",8);
  119. memcpy(newname+8,threadname,l+1);
  120. char *oldname = threadname;
  121. threadname = newname;
  122. free(oldname);
  123. }
  124. {
  125. // need to ensure joining thread does not race with us to release
  126. t->Link(); // extra safety link
  127. {
  128. SpinBlock block(ThreadDestroyListLock);
  129. ThreadDestroyList.append(*t);
  130. }
  131. try {
  132. t->stopped.signal();
  133. if (t->Release()) {
  134. PROGLOG("extra unlinked thread");
  135. PrintStackReport();
  136. }
  137. else
  138. t->Release();
  139. }
  140. catch (...) {
  141. PROGLOG("thread release exception");
  142. throw;
  143. }
  144. {
  145. SpinBlock block(ThreadDestroyListLock);
  146. ThreadDestroyList.zap(*t); // hopefully won't get too big (i.e. one entry!)
  147. }
  148. }
  149. #if defined(_WIN32)
  150. return ret;
  151. #else
  152. return (void *) (memsize_t)ret;
  153. #endif
  154. }
  155. // JCSMORE - should have a setPriority(), unsupported under _WIN32
  156. void Thread::adjustPriority(int delta)
  157. {
  158. if (delta < -2)
  159. prioritydelta = -2;
  160. else if (delta > 2)
  161. prioritydelta = 2;
  162. else
  163. prioritydelta = delta;
  164. if (alive)
  165. {
  166. #if defined(_WIN32)
  167. int priority;
  168. switch (delta)
  169. {
  170. case -2: priority = THREAD_PRIORITY_LOWEST; break;
  171. case -1: priority = THREAD_PRIORITY_BELOW_NORMAL; break;
  172. case 0: priority = THREAD_PRIORITY_NORMAL; break;
  173. case +1: priority = THREAD_PRIORITY_ABOVE_NORMAL; break;
  174. case +2: priority = THREAD_PRIORITY_HIGHEST; break;
  175. }
  176. SetThreadPriority(hThread, priority);
  177. #else
  178. //MORE - What control is there?
  179. int policy;
  180. sched_param param;
  181. int rc;
  182. if (( rc = pthread_getschedparam(threadid, &policy, &param)) != 0)
  183. DBGLOG("pthread_getschedparam error: %d", rc);
  184. switch (delta)
  185. {
  186. // JCS - doubtful whether these good values...
  187. case -2: param.sched_priority = 0; policy =SCHED_OTHER; break;
  188. case -1: param.sched_priority = 0; policy =SCHED_OTHER; break;
  189. case 0: param.sched_priority = 0; policy =SCHED_OTHER; break;
  190. case +1: param.sched_priority = (sched_get_priority_max(SCHED_RR)-sched_get_priority_min(SCHED_RR))/2; policy =SCHED_RR; break;
  191. case +2: param.sched_priority = sched_get_priority_max(SCHED_RR); policy =SCHED_RR; break;
  192. }
  193. if(( rc = pthread_setschedparam(threadid, policy, &param)) != 0)
  194. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "u PID=%i", rc,policy,param.sched_priority,(unsigned __int64) threadid,getpid());
  195. else
  196. DBGLOG("priority set id=%" I64F "u policy=%i pri=%i PID=%i",(unsigned __int64) threadid,policy,param.sched_priority,getpid());
  197. #endif
  198. }
  199. }
  200. void Thread::adjustNiceLevel()
  201. {
  202. #if defined(_WIN32)
  203. int priority;
  204. if(nicelevel < -15)
  205. priority = THREAD_PRIORITY_TIME_CRITICAL;
  206. else if(nicelevel >= -15 && nicelevel < -10)
  207. priority = THREAD_PRIORITY_HIGHEST;
  208. else if(nicelevel >= -10 && nicelevel < 0)
  209. priority = THREAD_PRIORITY_ABOVE_NORMAL;
  210. else if(nicelevel == 0)
  211. priority = THREAD_PRIORITY_NORMAL;
  212. else if(nicelevel > 0 && nicelevel <= 10)
  213. priority = THREAD_PRIORITY_BELOW_NORMAL;
  214. else if(nicelevel > 10 && nicelevel <= 15)
  215. priority = THREAD_PRIORITY_LOWEST;
  216. else if(nicelevel >15)
  217. priority = THREAD_PRIORITY_IDLE;
  218. SetThreadPriority(hThread, priority);
  219. #elif defined(__linux__)
  220. setpriority(PRIO_PROCESS, 0, nicelevel);
  221. #else
  222. UNIMPLEMENTED;
  223. #endif
  224. }
  225. bool Thread::isCurrentThread() const
  226. {
  227. return GetCurrentThreadId() == threadid;
  228. }
  229. // _nicelevel ranges from -20 to 19, the higher the nice level, the less cpu time the thread will get.
  230. void Thread::setNice(int _nicelevel)
  231. {
  232. if (_nicelevel < -20 || _nicelevel > 19)
  233. throw MakeStringException(0, "nice level should be between -20 and 19");
  234. if(alive)
  235. throw MakeStringException(0, "nice can only be set before the thread is started.");
  236. nicelevel = _nicelevel;
  237. }
  238. void Thread::setStackSize(size32_t size)
  239. {
  240. stacksize = (unsigned short)(size/0x1000);
  241. }
  242. void Thread::setDefaultStackSize(size32_t size)
  243. {
  244. defaultThreadStackSize = size; // has no effect under windows (though may be used for calculations later)
  245. }
  246. int Thread::begin()
  247. {
  248. if(nicelevel)
  249. adjustNiceLevel();
  250. #ifndef _WIN32
  251. starting.signal();
  252. suspend.wait();
  253. #endif
  254. int ret=-1;
  255. try {
  256. ret = run();
  257. }
  258. catch (IException *e)
  259. {
  260. handleException(e);
  261. }
  262. #ifndef NO_CATCHALL
  263. catch (...)
  264. {
  265. handleException(MakeStringException(0, "Unknown exception in Thread %s", getName()));
  266. }
  267. #endif
  268. callThreadTerminationHooks(false);
  269. #ifdef _WIN32
  270. #ifndef _DEBUG
  271. CloseHandle(hThread); // leak handle when debugging,
  272. // fixes some lockups/crashes in the debugger when lots of threads being created
  273. #endif
  274. hThread = NULL;
  275. #endif
  276. //alive = false; // not safe here
  277. return ret;
  278. }
  279. void Thread::handleException(IException *e)
  280. {
  281. assertex(exceptionHandlers);
  282. if (exceptionHandlers->ordinality() == 0)
  283. {
  284. PrintExceptionLog(e,getName());
  285. //throw; // don't rethrow unhandled, preferable over alternative of causing process death
  286. e->Release();
  287. }
  288. else
  289. {
  290. PrintExceptionLog(e,getName());
  291. bool handled = false;
  292. ForEachItemIn(ie, *exceptionHandlers)
  293. {
  294. IExceptionHandler *handler = (IExceptionHandler *) exceptionHandlers->item(ie);
  295. handled = handler->fireException(e) || handled;
  296. }
  297. if (!handled)
  298. {
  299. // if nothing choose to handle it.
  300. EXCLOG(e, NULL);
  301. //throw e; // don't rethrow unhandled, preferable over alternative of causing process death
  302. }
  303. e->Release();
  304. }
  305. }
  306. void Thread::init(const char *_name)
  307. {
  308. #ifdef _WIN32
  309. hThread = NULL;
  310. #endif
  311. threadid = 0;
  312. tidlog = 0;
  313. alive = false;
  314. cthreadname.threadname = (NULL == _name) ? NULL : strdup(_name);
  315. ithreadname = &cthreadname;
  316. prioritydelta = 0;
  317. nicelevel = 0;
  318. stacksize = 0; // default is EXE default stack size (set by /STACK)
  319. }
  320. void Thread::start()
  321. {
  322. if (alive) {
  323. IWARNLOG("Thread::start(%s) - Thread already started!",getName());
  324. PrintStackReport();
  325. #ifdef _DEBUG
  326. throw MakeStringException(-1,"Thread::start(%s) - Thread already started!",getName());
  327. #endif
  328. return;
  329. }
  330. Link();
  331. startRelease();
  332. }
  333. void Thread::startRelease()
  334. {
  335. assertex(!alive);
  336. stopped.reinit(0); // just in case restarting
  337. #ifdef _WIN32
  338. hThread = (HANDLE)_beginthreadex(NULL, 0x1000*(unsigned)stacksize, Thread::_threadmain, this, CREATE_SUSPENDED, (unsigned *)&threadid);
  339. if (!hThread || !threadid)
  340. {
  341. Release();
  342. throw makeOsException(GetLastError());
  343. }
  344. #else
  345. int status;
  346. unsigned numretrys = 8;
  347. unsigned delay = 1000;
  348. for (;;) {
  349. pthread_attr_t attr;
  350. pthread_attr_init(&attr);
  351. pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
  352. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  353. if (stacksize)
  354. pthread_attr_setstacksize(&attr, (unsigned)stacksize*0x1000);
  355. else if (defaultThreadStackSize)
  356. pthread_attr_setstacksize(&attr, defaultThreadStackSize);
  357. else {
  358. #ifndef __64BIT__ // no need to cap 64bit
  359. size_t defss=0;
  360. pthread_attr_getstacksize(&attr, &defss);
  361. if (defss>LINUX_STACKSIZE_CAP)
  362. pthread_attr_setstacksize(&attr, LINUX_STACKSIZE_CAP);
  363. #endif
  364. }
  365. status = pthread_create(&threadid, &attr, Thread::_threadmain, this);
  366. if ((status==EAGAIN)||(status==EINTR)) {
  367. if (numretrys--==0)
  368. break;
  369. IWARNLOG("pthread_create(%d): Out of threads, retrying...",status);
  370. Sleep(delay);
  371. delay *= 2;
  372. }
  373. else
  374. break;
  375. }
  376. if (status) {
  377. threadid = 0;
  378. Release();
  379. IERRLOG("pthread_create returns %d",status);
  380. PrintStackReport();
  381. PrintMemoryReport();
  382. StringBuffer s;
  383. getThreadList(s);
  384. IERRLOG("Running threads:\n %s",s.str());
  385. throw makeOsException(status);
  386. }
  387. unsigned retryCount = 10;
  388. for (;;)
  389. {
  390. if (starting.wait(1000*10))
  391. break;
  392. else if (0 == --retryCount)
  393. throw MakeStringException(-1, "Thread::start(%s) failed", getName());
  394. IWARNLOG("Thread::start(%s) stalled, waiting to start, retrying", getName());
  395. }
  396. #endif
  397. alive = true;
  398. if (prioritydelta)
  399. adjustPriority(prioritydelta);
  400. {
  401. CriticalBlock block(ThreadListSem);
  402. ThreadList.zap(*this); // just in case restarting
  403. ThreadList.append(*this);
  404. }
  405. #ifdef _WIN32
  406. DWORD count = ResumeThread(hThread);
  407. assertex(count == 1);
  408. #else
  409. suspend.signal();
  410. #endif
  411. }
  412. bool Thread::join(unsigned timeout)
  413. {
  414. if (!alive&&!threadid) {
  415. #ifdef _DEBUG
  416. PROGLOG("join on unstarted thread!");
  417. PrintStackReport();
  418. #endif
  419. return true;
  420. }
  421. if (!stopped.wait(timeout))
  422. return false;
  423. if (!alive) // already joined
  424. {
  425. stopped.signal();
  426. return true;
  427. }
  428. unsigned st = 0;
  429. for (;;) { // this is to prevent race with destroy
  430. // (because Thread objects are not always link counted!)
  431. {
  432. SpinBlock block(ThreadDestroyListLock);
  433. if (ThreadDestroyList.find(*this)==NotFound)
  434. break;
  435. }
  436. #ifdef _DEBUG
  437. if (st==10)
  438. PROGLOG("Thread::join race");
  439. #endif
  440. Sleep(st); // switch back to exiting thread (not very elegant!)
  441. st++;
  442. if (st>10)
  443. st = 10; // note must be non-zero for high priority threads
  444. }
  445. #ifdef _DEBUG
  446. int c = getLinkCount();
  447. if (c>=DEAD_PSEUDO_COUNT) {
  448. PROGLOG("Dead/Dying thread joined! %d",c);
  449. PrintStackReport();
  450. }
  451. #endif
  452. alive = false; // should be safe here
  453. stopped.signal(); // signal stopped again, to prevent any parallel call from blocking.
  454. return true;
  455. }
  456. Thread::~Thread()
  457. {
  458. ithreadname = &cthreadname; // safer (as derived classes destroyed)
  459. #ifdef _DEBUG
  460. if (alive) {
  461. if (!stopped.wait(0)) { // see if fell out of threadmain and signal stopped
  462. PROGLOG("Live thread killed! %s",getName());
  463. PrintStackReport();
  464. }
  465. // don't need to resignal as we are on way out
  466. }
  467. #endif
  468. Link();
  469. // DBGLOG("Thread %x (%s) destroyed\n", threadid, threadname);
  470. {
  471. CriticalBlock block(ThreadListSem);
  472. ThreadList.zap(*this);
  473. }
  474. free(cthreadname.threadname);
  475. cthreadname.threadname = NULL;
  476. }
  477. unsigned getThreadCount()
  478. {
  479. CriticalBlock block(ThreadListSem);
  480. return ThreadList.ordinality();
  481. }
  482. StringBuffer & getThreadList(StringBuffer &str)
  483. {
  484. CriticalBlock block(ThreadListSem);
  485. ForEachItemIn(i,ThreadList) {
  486. Thread &item=ThreadList.item(i);
  487. item.getInfo(str).append("\n");
  488. }
  489. return str;
  490. }
  491. StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
  492. {
  493. CriticalBlock block(ThreadListSem);
  494. bool found=false;
  495. ForEachItemIn(i,ThreadList) {
  496. Thread &item=ThreadList.item(i);
  497. int h;
  498. unsigned t;
  499. const char *s = item.getLogInfo(h,t);
  500. if (s&&*s&&((thandle==0)||(h==thandle))&&((tid==0)||(t==tid))) {
  501. if (found) {
  502. name.clear();
  503. break; // only return if unambiguous
  504. }
  505. name.append(s);
  506. found = true;
  507. }
  508. }
  509. return name;
  510. }
  511. // CThreadedPersistent
  512. CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner), state(s_ready)
  513. {
  514. halt = false;
  515. athread.start();
  516. }
  517. CThreadedPersistent::~CThreadedPersistent()
  518. {
  519. join(INFINITE, false);
  520. halt = true;
  521. sem.signal();
  522. athread.join();
  523. }
  524. void CThreadedPersistent::threadmain()
  525. {
  526. for (;;)
  527. {
  528. sem.wait();
  529. if (halt)
  530. break;
  531. try
  532. {
  533. owner->threadmain();
  534. // Note we do NOT call the thread reset hook here - these threads are expected to be able to preserve state, I think
  535. }
  536. catch (IException *e)
  537. {
  538. VStringBuffer errMsg("CThreadedPersistent (%s)", athread.getName());
  539. EXCLOG(e, errMsg.str());
  540. exception.setown(e);
  541. joinSem.signal(); // leave in running state, signal to join to handle
  542. continue;
  543. }
  544. unsigned expected = s_running;
  545. if (!state.compare_exchange_strong(expected, s_ready))
  546. {
  547. expected = s_joining;
  548. if (state.compare_exchange_strong(expected, s_ready))
  549. joinSem.signal();
  550. }
  551. }
  552. }
  553. void CThreadedPersistent::start()
  554. {
  555. unsigned expected = s_ready;
  556. if (!state.compare_exchange_strong(expected, s_running))
  557. {
  558. VStringBuffer msg("CThreadedPersistent::start(%s) - not ready", athread.getName());
  559. IWARNLOG("%s", msg.str());
  560. PrintStackReport();
  561. throw MakeStringExceptionDirect(-1, msg.str());
  562. }
  563. sem.signal();
  564. }
  565. bool CThreadedPersistent::join(unsigned timeout, bool throwException)
  566. {
  567. unsigned expected = s_running;
  568. if (state.compare_exchange_strong(expected, s_joining))
  569. {
  570. if (!joinSem.wait(timeout))
  571. {
  572. unsigned expected = s_joining;
  573. if (state.compare_exchange_strong(expected, s_running)) // if still joining, restore running state
  574. return false;
  575. // if here, threadmain() set s_ready after timeout and has or will signal
  576. if (!joinSem.wait(60000) && throwException) // should be instant
  577. throwUnexpected();
  578. return true;
  579. }
  580. if (throwException && exception.get())
  581. {
  582. // switch back to ready state and throw
  583. Owned<IException> e = exception.getClear();
  584. unsigned expected = s_joining;
  585. if (!state.compare_exchange_strong(expected, s_ready))
  586. throwUnexpected();
  587. throw e.getClear();
  588. }
  589. }
  590. return true;
  591. }
  592. //class CAsyncFor
  593. void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
  594. {
  595. if (num <= 1)
  596. {
  597. if (num == 1)
  598. Do(0);
  599. return;
  600. }
  601. Mutex errmutex;
  602. IException *e=NULL;
  603. Owned<IShuffledIterator> shuffler;
  604. if (shuffled) {
  605. shuffler.setown(createShuffledIterator(num));
  606. shuffler->first(); // prime (needed to make thread safe)
  607. }
  608. unsigned i;
  609. if (maxatonce==1) { // no need for threads
  610. for (i=0;i<num;i++) {
  611. unsigned idx = shuffled?shuffler->lookup(i):i;
  612. try {
  613. Do(idx);
  614. }
  615. catch (IException * _e)
  616. {
  617. if (e)
  618. _e->Release(); // only return first
  619. else
  620. e = _e;
  621. if (abortFollowingException)
  622. break;
  623. }
  624. }
  625. }
  626. else {
  627. if (maxatonce==0 || maxatonce > num)
  628. maxatonce = num;
  629. if (maxatonce < num)
  630. {
  631. class cdothread: public Thread
  632. {
  633. public:
  634. Mutex *errmutex;
  635. Semaphore &ready;
  636. IException *&erre;
  637. unsigned idx;
  638. CAsyncFor *self;
  639. cdothread(CAsyncFor *_self,unsigned _idx,Semaphore &_ready,Mutex *_errmutex,IException *&_e)
  640. : Thread("CAsyncFor"),ready(_ready),erre(_e)
  641. {
  642. errmutex =_errmutex;
  643. idx = _idx;
  644. self = _self;
  645. }
  646. int run()
  647. {
  648. try {
  649. self->Do(idx);
  650. }
  651. catch (IException * _e)
  652. {
  653. synchronized block(*errmutex);
  654. if (erre)
  655. _e->Release(); // only return first
  656. else
  657. erre = _e;
  658. }
  659. #ifndef NO_CATCHALL
  660. catch (...)
  661. {
  662. synchronized block(*errmutex);
  663. if (!erre)
  664. erre = MakeStringException(0, "Unknown exception in Thread %s", getName());
  665. }
  666. #endif
  667. ready.signal();
  668. return 0;
  669. }
  670. };
  671. Semaphore ready;
  672. for (i=0;(i<num)&&(i<maxatonce);i++)
  673. ready.signal();
  674. IArrayOf<Thread> started;
  675. started.ensure(num);
  676. for (i=0;i<num;i++) {
  677. ready.wait();
  678. if (abortFollowingException && e) break;
  679. Owned<Thread> thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,&errmutex,e);
  680. thread->start();
  681. started.append(*thread.getClear());
  682. }
  683. ForEachItemIn(idx, started)
  684. {
  685. started.item(idx).join();
  686. }
  687. }
  688. else
  689. {
  690. // Common case of execute all at once can be optimized a little
  691. // Note that shuffle and abortFollowingException are meaningless when executing all at once
  692. class cdothread: public Thread
  693. {
  694. public:
  695. Mutex *errmutex;
  696. IException *&erre;
  697. unsigned idx;
  698. CAsyncFor *self;
  699. cdothread(CAsyncFor *_self,unsigned _idx,Mutex *_errmutex,IException *&_e)
  700. : Thread("CAsyncFor"),erre(_e)
  701. {
  702. errmutex =_errmutex;
  703. idx = _idx;
  704. self = _self;
  705. }
  706. int run()
  707. {
  708. try {
  709. self->Do(idx);
  710. }
  711. catch (IException * _e)
  712. {
  713. synchronized block(*errmutex);
  714. if (erre)
  715. _e->Release(); // only return first
  716. else
  717. erre = _e;
  718. }
  719. #ifndef NO_CATCHALL
  720. catch (...)
  721. {
  722. synchronized block(*errmutex);
  723. if (!erre)
  724. erre = MakeStringException(0, "Unknown exception in Thread %s", getName());
  725. }
  726. #endif
  727. return 0;
  728. }
  729. };
  730. IArrayOf<Thread> started;
  731. started.ensure(num);
  732. for (i=0;i<num-1;i++)
  733. {
  734. Owned<Thread> thread = new cdothread(this,i,&errmutex,e);
  735. thread->start();
  736. started.append(*thread.getClear());
  737. }
  738. Do(num-1);
  739. ForEachItemIn(idx, started)
  740. {
  741. started.item(idx).join();
  742. }
  743. }
  744. }
  745. if (e)
  746. throw e;
  747. }
  748. //---------------------------------------------------------------------------------------------------------------------
  749. class CSimpleFunctionThread : public Thread
  750. {
  751. std::function<void()> func;
  752. public:
  753. inline CSimpleFunctionThread(std::function<void()> _func) : Thread("TaskProcessor"), func(_func) { }
  754. virtual int run()
  755. {
  756. func();
  757. return 1;
  758. }
  759. };
  760. void asyncStart(IThreaded & threaded)
  761. {
  762. CThreaded * thread = new CThreaded("AsyncStart", &threaded);
  763. thread->startRelease();
  764. }
  765. void asyncStart(const char * name, IThreaded & threaded)
  766. {
  767. CThreaded * thread = new CThreaded(name, &threaded);
  768. thread->startRelease();
  769. }
  770. //Experimental - is this a useful function to replace some uses of IThreaded?
  771. void asyncStart(std::function<void()> func)
  772. {
  773. (new CSimpleFunctionThread(func))->startRelease();
  774. }
  775. // ---------------------------------------------------------------------------
  776. // Thread Pools
  777. // ---------------------------------------------------------------------------
  778. class CPooledThreadWrapper;
  779. class CThreadPoolBase
  780. {
  781. public:
  782. virtual ~CThreadPoolBase() {}
  783. protected: friend class CPooledThreadWrapper;
  784. IExceptionHandler *exceptionHandler;
  785. CriticalSection crit;
  786. StringAttr poolname;
  787. int donewaiting;
  788. Semaphore donesem;
  789. PointerArray waitingsems;
  790. UnsignedArray waitingids;
  791. bool stopall;
  792. unsigned defaultmax;
  793. unsigned targetpoolsize;
  794. unsigned delay;
  795. Semaphore availsem;
  796. std::atomic_uint numrunning{0};
  797. virtual void notifyStarted(CPooledThreadWrapper *item)=0;
  798. virtual bool notifyStopped(CPooledThreadWrapper *item)=0;
  799. };
  800. class CPooledThreadWrapper: public Thread
  801. {
  802. PooledThreadHandle handle;
  803. IPooledThread *thread;
  804. Semaphore sem;
  805. CThreadPoolBase &parent;
  806. char *runningname;
  807. public:
  808. CPooledThreadWrapper(CThreadPoolBase &_parent,
  809. PooledThreadHandle _handle,
  810. IPooledThread *_thread) // takes ownership of thread
  811. : Thread(StringBuffer("Member of thread pool: ").append(_parent.poolname).str()), parent(_parent)
  812. {
  813. thread = _thread;
  814. handle = _handle;
  815. runningname = strdup(_parent.poolname);
  816. }
  817. ~CPooledThreadWrapper()
  818. {
  819. thread->Release();
  820. free(runningname);
  821. }
  822. void setName(const char *name) { free(runningname); runningname=strdup(name); }
  823. void setHandle(PooledThreadHandle _handle) { handle = _handle; }
  824. PooledThreadHandle queryHandle() { return handle; }
  825. IPooledThread &queryThread() { return *thread; }
  826. void setThread(IPooledThread *_thread) { thread = _thread; } // takes ownership
  827. bool isStopped() { return (handle==0); }
  828. PooledThreadHandle markStopped()
  829. {
  830. PooledThreadHandle ret=handle;
  831. handle = 0;
  832. if (ret) // JCSMORE - I can't see how handle can not be set if here..
  833. parent.numrunning--;
  834. return ret;
  835. }
  836. void markStarted()
  837. {
  838. parent.numrunning++;
  839. }
  840. int run()
  841. {
  842. do
  843. {
  844. sem.wait();
  845. {
  846. CriticalBlock block(parent.crit); // to synchronize
  847. if (parent.stopall)
  848. break;
  849. }
  850. parent.notifyStarted(this);
  851. try
  852. {
  853. char *&threadname = cthreadname.threadname;
  854. char *temp = threadname; // swap running name and threadname
  855. threadname = runningname;
  856. runningname = temp;
  857. thread->threadmain();
  858. temp = threadname; // and back
  859. threadname = runningname;
  860. runningname = temp;
  861. }
  862. catch (IException *e)
  863. {
  864. char *&threadname = cthreadname.threadname;
  865. char *temp = threadname; // swap back
  866. threadname = runningname;
  867. runningname = temp;
  868. handleException(e);
  869. }
  870. #ifndef NO_CATCHALL
  871. catch (...)
  872. {
  873. char *&threadname = cthreadname.threadname;
  874. char *temp = threadname; // swap back
  875. threadname = runningname;
  876. runningname = temp;
  877. handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
  878. }
  879. #endif
  880. callThreadTerminationHooks(true); // Reset any per-thread state.
  881. } while (parent.notifyStopped(this));
  882. return 0;
  883. }
  884. void cycle()
  885. {
  886. sem.signal();
  887. }
  888. void go(void *param)
  889. {
  890. thread->init(param);
  891. cycle();
  892. }
  893. bool stop()
  894. {
  895. if (handle)
  896. return thread->stop();
  897. return true;
  898. }
  899. void handleException(IException *e)
  900. {
  901. CriticalBlock block(parent.crit);
  902. PrintExceptionLog(e,parent.poolname.get());
  903. if (!parent.exceptionHandler||!parent.exceptionHandler->fireException(e)) {
  904. }
  905. e->Release();
  906. }
  907. };
  908. class CPooledThreadIterator: implements IPooledThreadIterator, public CInterface
  909. {
  910. unsigned current;
  911. public:
  912. IArrayOf<IPooledThread> threads;
  913. IMPLEMENT_IINTERFACE;
  914. CPooledThreadIterator()
  915. {
  916. current = 0;
  917. }
  918. bool first()
  919. {
  920. current = 0;
  921. return threads.isItem(current);
  922. }
  923. bool next()
  924. {
  925. current++;
  926. return threads.isItem(current);
  927. }
  928. bool isValid()
  929. {
  930. return threads.isItem(current);
  931. }
  932. IPooledThread & query()
  933. {
  934. return threads.item(current);
  935. }
  936. };
  937. class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInterface
  938. {
  939. CIArrayOf<CPooledThreadWrapper> threadwrappers;
  940. PooledThreadHandle nextid;
  941. IThreadFactory *factory;
  942. unsigned stacksize;
  943. unsigned timeoutOnRelease;
  944. unsigned traceStartDelayPeriod;
  945. unsigned startsInPeriod;
  946. cycle_t startDelayInPeriod;
  947. CCycleTimer overAllTimer;
  948. PooledThreadHandle _start(void *param,const char *name, bool noBlock, unsigned timeout=0)
  949. {
  950. CCycleTimer startTimer;
  951. bool timedout = defaultmax && !availsem.wait(noBlock ? 0 : (timeout>0?timeout:delay));
  952. PooledThreadHandle ret;
  953. {
  954. CriticalBlock block(crit);
  955. if (timedout)
  956. {
  957. if (!availsem.wait(0)) { // make sure take allocated sem if has become available
  958. if (noBlock || timeout > 0)
  959. throw MakeStringException(0, "No threads available in pool %s", poolname.get());
  960. IWARNLOG("Pool limit exceeded for %s", poolname.get());
  961. }
  962. else
  963. timedout = false;
  964. }
  965. if (traceStartDelayPeriod)
  966. {
  967. ++startsInPeriod;
  968. if (timedout)
  969. {
  970. startDelayInPeriod += startTimer.elapsedCycles();
  971. if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
  972. {
  973. double totalDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod)))/1000000;
  974. double avgDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod/startsInPeriod)))/1000000;
  975. unsigned totalElapsedSecs = overAllTimer.elapsedMs()/1000;
  976. PROGLOG("%s: %u threads started in last %u seconds, total delay = %0.2f milliseconds, average delay = %0.2f milliseconds, currently running = %u", poolname.get(), startsInPeriod, totalElapsedSecs, totalDelayMs, avgDelayMs, runningCount());
  977. startsInPeriod = 0;
  978. startDelayInPeriod = 0;
  979. overAllTimer.reset();
  980. }
  981. }
  982. }
  983. CPooledThreadWrapper &t = allocThread();
  984. if (name)
  985. t.setName(name);
  986. t.go(param);
  987. ret = t.queryHandle();
  988. }
  989. Sleep(0);
  990. return ret;
  991. }
  992. public:
  993. IMPLEMENT_IINTERFACE;
  994. CThreadPool(IThreadFactory *_factory,IExceptionHandler *_exceptionHandler,const char *_poolname,unsigned _defaultmax, unsigned _delay, unsigned _stacksize, unsigned _timeoutOnRelease, unsigned _targetpoolsize)
  995. {
  996. poolname.set(_poolname);
  997. factory = LINK(_factory);
  998. exceptionHandler = _exceptionHandler;
  999. nextid = 1;
  1000. stopall = false;
  1001. defaultmax = _defaultmax;
  1002. delay = _delay;
  1003. if (defaultmax)
  1004. availsem.signal(defaultmax);
  1005. stacksize = _stacksize;
  1006. timeoutOnRelease = _timeoutOnRelease;
  1007. targetpoolsize = _targetpoolsize?_targetpoolsize:defaultmax;
  1008. traceStartDelayPeriod = 0;
  1009. startsInPeriod = 0;
  1010. startDelayInPeriod = 0;
  1011. }
  1012. ~CThreadPool()
  1013. {
  1014. stopAll(true);
  1015. if (!joinAll(true, timeoutOnRelease))
  1016. IWARNLOG("%s; timedout[%d] waiting for threads in pool", poolname.get(), timeoutOnRelease);
  1017. CriticalBlock block(crit);
  1018. bool first=true;
  1019. ForEachItemIn(i,threadwrappers)
  1020. {
  1021. CPooledThreadWrapper &t = threadwrappers.item(i);
  1022. if (!t.isStopped())
  1023. {
  1024. if (first)
  1025. {
  1026. IWARNLOG("Threads still active: ");
  1027. first = false;
  1028. }
  1029. StringBuffer threadInfo;
  1030. PROGLOG("Active thread: %s, info: %s", t.getName(), t.getInfo(threadInfo).str());
  1031. }
  1032. }
  1033. factory->Release();
  1034. }
  1035. CPooledThreadWrapper &allocThread()
  1036. { // called in critical section
  1037. PooledThreadHandle newid=nextid++;
  1038. if (newid==0)
  1039. newid=nextid++;
  1040. ForEachItemIn(i,threadwrappers) {
  1041. CPooledThreadWrapper &it = threadwrappers.item(i);
  1042. if (it.isStopped()) {
  1043. it.setHandle(newid);
  1044. if (!it.queryThread().canReuse()) {
  1045. it.queryThread().Release();
  1046. it.setThread(factory->createNew());
  1047. }
  1048. return it;
  1049. }
  1050. }
  1051. CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew());
  1052. if (stacksize)
  1053. ret.setStackSize(stacksize);
  1054. ret.start();
  1055. threadwrappers.append(ret);
  1056. return ret;
  1057. }
  1058. CPooledThreadWrapper *findThread(PooledThreadHandle handle)
  1059. { // called in critical section
  1060. ForEachItemIn(i,threadwrappers) {
  1061. CPooledThreadWrapper &it = threadwrappers.item(i);
  1062. if (it.queryHandle()==handle)
  1063. return &it;
  1064. }
  1065. return NULL;
  1066. }
  1067. PooledThreadHandle startNoBlock(void *param)
  1068. {
  1069. return _start(param, NULL, true);
  1070. }
  1071. PooledThreadHandle startNoBlock(void *param,const char *name)
  1072. {
  1073. return _start(param, name, true);
  1074. }
  1075. PooledThreadHandle start(void *param)
  1076. {
  1077. return _start(param, NULL, false);
  1078. }
  1079. PooledThreadHandle start(void *param,const char *name)
  1080. {
  1081. return _start(param, name, false);
  1082. }
  1083. PooledThreadHandle start(void *param,const char *name, unsigned timeout)
  1084. {
  1085. return _start(param, name, false, timeout);
  1086. }
  1087. bool stop(PooledThreadHandle handle)
  1088. {
  1089. CriticalBlock block(crit);
  1090. CPooledThreadWrapper *t = findThread(handle);
  1091. if (t)
  1092. return t->stop();
  1093. return true; // already stopped
  1094. }
  1095. bool stopAll(bool tryall=false)
  1096. {
  1097. availsem.signal(1000);
  1098. availsem.wait();
  1099. CriticalBlock block(crit);
  1100. bool ret=true;
  1101. ForEachItemIn(i,threadwrappers) {
  1102. CPooledThreadWrapper &it = threadwrappers.item(i);
  1103. if (!it.stop()) {
  1104. ret = false;
  1105. if (!tryall)
  1106. break;
  1107. }
  1108. }
  1109. return ret;
  1110. }
  1111. bool joinWait(CPooledThreadWrapper &t,unsigned timeout)
  1112. {
  1113. // called in critical section
  1114. if (t.isStopped())
  1115. return true;
  1116. Semaphore sem;
  1117. waitingsems.append(&sem);
  1118. waitingids.append(t.queryHandle());
  1119. crit.leave();
  1120. bool ret = sem.wait(timeout);
  1121. crit.enter();
  1122. unsigned i = waitingsems.find(&sem);
  1123. if (i!=NotFound) {
  1124. waitingids.remove(i);
  1125. waitingsems.remove(i);
  1126. }
  1127. return ret;
  1128. }
  1129. bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)
  1130. {
  1131. CriticalBlock block(crit);
  1132. CPooledThreadWrapper *t = findThread(handle);
  1133. if (!t)
  1134. return true; // already stopped
  1135. return joinWait(*t,timeout);
  1136. }
  1137. virtual bool joinAll(bool del,unsigned timeout=INFINITE)
  1138. { // note timeout is for each join
  1139. CriticalBlock block(crit);
  1140. CIArrayOf<CPooledThreadWrapper> tojoin;
  1141. ForEachItemIn(i1,threadwrappers) {
  1142. CPooledThreadWrapper &it = threadwrappers.item(i1);
  1143. it.Link();
  1144. tojoin.append(it);
  1145. }
  1146. ForEachItemIn(i2,tojoin)
  1147. if (!joinWait(tojoin.item(i2),timeout))
  1148. return false;
  1149. if (del) {
  1150. stopall = true;
  1151. ForEachItemIn(i3,tojoin)
  1152. tojoin.item(i3).cycle();
  1153. {
  1154. CriticalUnblock unblock(crit);
  1155. ForEachItemIn(i4,tojoin)
  1156. tojoin.item(i4).join();
  1157. }
  1158. threadwrappers.kill();
  1159. stopall = false;
  1160. }
  1161. return true;
  1162. }
  1163. IPooledThreadIterator *running()
  1164. {
  1165. CriticalBlock block(crit);
  1166. CPooledThreadIterator *ret = new CPooledThreadIterator;
  1167. ForEachItemIn(i,threadwrappers) {
  1168. CPooledThreadWrapper &it = threadwrappers.item(i);
  1169. if (!it.isStopped()) {
  1170. IPooledThread &t = it.queryThread();
  1171. t.Link();
  1172. ret->threads.append(t);
  1173. }
  1174. }
  1175. return ret;
  1176. }
  1177. unsigned runningCount()
  1178. {
  1179. return numrunning;
  1180. }
  1181. void notifyStarted(CPooledThreadWrapper *item)
  1182. {
  1183. item->markStarted();
  1184. }
  1185. bool notifyStopped(CPooledThreadWrapper *item)
  1186. {
  1187. CriticalBlock block(crit);
  1188. PooledThreadHandle myid = item->markStopped();
  1189. ForEachItemIn(i1,waitingids) { // tell anyone waiting
  1190. if (waitingids.item(i1)==myid)
  1191. ((Semaphore *)waitingsems.item(i1))->signal();
  1192. }
  1193. bool ret = true;
  1194. if (defaultmax) {
  1195. unsigned n=threadwrappers.ordinality();
  1196. for (unsigned i2=targetpoolsize;i2<n;i2++) { // only check excess for efficiency
  1197. if (item==&threadwrappers.item(i2)) {
  1198. threadwrappers.remove(i2);
  1199. ret = false;
  1200. break;
  1201. }
  1202. }
  1203. availsem.signal();
  1204. }
  1205. return ret;
  1206. }
  1207. void setStartDelayTracing(unsigned secs)
  1208. {
  1209. traceStartDelayPeriod = secs;
  1210. }
  1211. bool waitAvailable(unsigned timeout)
  1212. {
  1213. if (!defaultmax)
  1214. return true;
  1215. if (availsem.wait(timeout))
  1216. {
  1217. availsem.signal();
  1218. return true;
  1219. }
  1220. return false;
  1221. }
  1222. };
  1223. IThreadPool *createThreadPool(const char *poolname,IThreadFactory *factory,IExceptionHandler *exceptionHandler,unsigned defaultmax, unsigned delay, unsigned stacksize, unsigned timeoutOnRelease, unsigned targetpoolsize)
  1224. {
  1225. return new CThreadPool(factory,exceptionHandler,poolname,defaultmax,delay,stacksize,timeoutOnRelease,targetpoolsize);
  1226. }
  1227. //=======================================================================================================
  1228. static void CheckAllowedProgram(const char *prog,const char *allowed)
  1229. {
  1230. if (!prog||!allowed||(strcmp(allowed,"*")==0))
  1231. return;
  1232. StringBuffer head;
  1233. bool inq = false;
  1234. // note don't have to be too worried about odd quoting as matching fixed list
  1235. while (*prog&&((*prog!=' ')||inq)) {
  1236. if (*prog=='"')
  1237. inq = !inq;
  1238. head.append(*(prog++));
  1239. }
  1240. StringArray list;
  1241. list.appendList(allowed, ",");
  1242. ForEachItemIn(i,list) {
  1243. if (WildMatch(head.str(),list.item(i)))
  1244. return;
  1245. }
  1246. AERRLOG("Unauthorized pipe program(%s)",head.str());
  1247. throw MakeStringException(-1,"Unauthorized pipe program(%s)",head.str());
  1248. }
  1249. class CPipeProcessException : public CSimpleInterfaceOf<IPipeProcessException>
  1250. {
  1251. int errCode;
  1252. StringAttr msg;
  1253. MessageAudience audience;
  1254. public:
  1255. CPipeProcessException(int _errCode, const char *_msg, MessageAudience _audience = MSGAUD_user) : errCode(_errCode), msg(_msg), audience(_audience)
  1256. {
  1257. }
  1258. virtual int errorCode() const override { return errCode; }
  1259. virtual StringBuffer & errorMessage(StringBuffer &str) const override
  1260. {
  1261. if (msg)
  1262. str.append(msg).append(", ");
  1263. return str.append(strerror(errCode));
  1264. }
  1265. MessageAudience errorAudience() const { return audience; }
  1266. };
  1267. IPipeProcessException *createPipeErrnoException(int code, const char *msg)
  1268. {
  1269. return new CPipeProcessException(code, msg);
  1270. }
  1271. IPipeProcessException *createPipeErrnoExceptionV(int code, const char *msg, ...)
  1272. {
  1273. StringBuffer eStr;
  1274. va_list args;
  1275. va_start(args, msg);
  1276. eStr.limited_valist_appendf(1024, msg, args);
  1277. va_end(args);
  1278. return new CPipeProcessException(code, eStr.str());
  1279. }
  1280. class CSimplePipeStream: implements ISimpleReadStream, public CInterface
  1281. {
  1282. public:
  1283. IMPLEMENT_IINTERFACE;
  1284. CSimplePipeStream(IPipeProcess *_pipe, bool _isStderr) : pipe(_pipe), isStderr(_isStderr) {}
  1285. virtual size32_t read(size32_t sz, void * data)
  1286. {
  1287. if (isStderr)
  1288. return pipe->readError(sz, data);
  1289. else
  1290. return pipe->read(sz, data);
  1291. }
  1292. private:
  1293. Owned<IPipeProcess> pipe;
  1294. bool isStderr;
  1295. };
  1296. #ifdef _WIN32
  1297. class CWindowsPipeProcess: implements IPipeProcess, public CInterface
  1298. {
  1299. HANDLE pipeProcess;
  1300. HANDLE hInput;
  1301. HANDLE hOutput;
  1302. HANDLE hError;
  1303. StringAttr title;
  1304. unsigned retcode;
  1305. CriticalSection sect;
  1306. bool aborted;
  1307. StringAttr allowedprogs;
  1308. StringArray envVars;
  1309. StringArray envValues;
  1310. public:
  1311. IMPLEMENT_IINTERFACE;
  1312. CWindowsPipeProcess(const char *_allowedprogs)
  1313. : allowedprogs(_allowedprogs)
  1314. {
  1315. pipeProcess = (HANDLE)-1;
  1316. hInput=(HANDLE)-1;
  1317. hOutput=(HANDLE)-1;
  1318. hError=(HANDLE)-1;
  1319. retcode = (unsigned)-1;
  1320. aborted = false;
  1321. }
  1322. ~CWindowsPipeProcess()
  1323. {
  1324. kill();
  1325. }
  1326. void kill()
  1327. {
  1328. doCloseInput();
  1329. doCloseOutput();
  1330. doCloseError();
  1331. if (pipeProcess != (HANDLE)-1) {
  1332. CloseHandle(pipeProcess);
  1333. pipeProcess = (HANDLE)-1;
  1334. }
  1335. }
  1336. bool run(const char *_title,const char *prog,const char *dir,bool hasinput,bool hasoutput,bool haserror, size32_t stderrbufsize,bool newProcessGroup)
  1337. {
  1338. // size32_t stderrbufsize ignored as not required (I think)
  1339. CriticalBlock block(sect);
  1340. kill();
  1341. title.clear();
  1342. if (_title) {
  1343. title.set(_title);
  1344. PROGLOG("%s: Creating PIPE process : %s", title.get(), prog);
  1345. }
  1346. CheckAllowedProgram(prog,allowedprogs);
  1347. SECURITY_ATTRIBUTES sa;
  1348. sa.nLength = sizeof(SECURITY_ATTRIBUTES);
  1349. sa.bInheritHandle = TRUE;
  1350. sa.lpSecurityDescriptor = NULL;
  1351. HANDLE hProgOutput=(HANDLE)-1;
  1352. HANDLE hProgInput=(HANDLE)-1;
  1353. HANDLE hProgError=(HANDLE)-1;
  1354. HANDLE h;
  1355. //NB: Create a pipe handles that are not inherited our end
  1356. if (hasinput) {
  1357. CreatePipe(&hProgInput,&h,&sa,0);
  1358. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hInput, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1359. CloseHandle(h);
  1360. }
  1361. if (hasoutput) {
  1362. CreatePipe(&h,&hProgOutput,&sa,0);
  1363. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hOutput, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1364. CloseHandle(h);
  1365. }
  1366. if (haserror) {
  1367. CreatePipe(&h,&hProgError,&sa,0);
  1368. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hError, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1369. CloseHandle(h);
  1370. }
  1371. STARTUPINFO StartupInfo;
  1372. _clear(StartupInfo);
  1373. StartupInfo.cb = sizeof(StartupInfo);
  1374. StartupInfo.wShowWindow = SW_HIDE;
  1375. StartupInfo.dwFlags = STARTF_USESTDHANDLES|STARTF_USESHOWWINDOW ;
  1376. StartupInfo.hStdOutput = hasoutput?hProgOutput:GetStdHandle(STD_OUTPUT_HANDLE);
  1377. StartupInfo.hStdError = haserror?hProgError:GetStdHandle(STD_ERROR_HANDLE);
  1378. StartupInfo.hStdInput = hasinput?hProgInput:GetStdHandle(STD_INPUT_HANDLE);
  1379. PROCESS_INFORMATION ProcessInformation;
  1380. // MORE - should create a new environment block that is copy of parent's, then set all the values in envVars/envValues, and pass it
  1381. if (!CreateProcess(NULL, (char *)prog, NULL,NULL,TRUE,0,NULL, dir&&*dir?dir:NULL, &StartupInfo,&ProcessInformation)) {
  1382. if (_title) {
  1383. StringBuffer errstr;
  1384. formatSystemError(errstr, GetLastError());
  1385. OERRLOG("%s: PIPE process '%s' failed: %s", title.get(), prog, errstr.str());
  1386. }
  1387. return false;
  1388. }
  1389. pipeProcess = ProcessInformation.hProcess;
  1390. CloseHandle(ProcessInformation.hThread);
  1391. if (hasoutput)
  1392. CloseHandle(hProgOutput);
  1393. if (hasinput)
  1394. CloseHandle(hProgInput);
  1395. if (haserror)
  1396. CloseHandle(hProgError);
  1397. return true;
  1398. }
  1399. virtual void setenv(const char *var, const char *value)
  1400. {
  1401. assertex(var);
  1402. if (!value)
  1403. value = "";
  1404. envVars.append(var);
  1405. envValues.append(value);
  1406. }
  1407. size32_t read(size32_t sz, void *buf)
  1408. {
  1409. DWORD sizeRead;
  1410. if (!ReadFile(hOutput, buf, sz, &sizeRead, NULL)) {
  1411. //raise error here
  1412. if(aborted)
  1413. return 0;
  1414. int err=GetLastError();
  1415. switch(err)
  1416. {
  1417. case ERROR_HANDLE_EOF:
  1418. case ERROR_BROKEN_PIPE:
  1419. case ERROR_NO_DATA:
  1420. return 0;
  1421. default:
  1422. aborted = true;
  1423. IException *e = makeOsExceptionV(err, "Pipe: ReadFile failed (size %d)", sz);
  1424. PrintExceptionLog(e, NULL);
  1425. throw e;
  1426. }
  1427. }
  1428. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1429. }
  1430. ISimpleReadStream *getOutputStream()
  1431. {
  1432. return new CSimplePipeStream(LINK(this), false);
  1433. }
  1434. size32_t readError(size32_t sz, void *buf)
  1435. {
  1436. DWORD sizeRead;
  1437. if (!ReadFile(hError, buf, sz, &sizeRead, NULL)) {
  1438. //raise error here
  1439. if(aborted)
  1440. return 0;
  1441. int err=GetLastError();
  1442. switch(err)
  1443. {
  1444. case ERROR_HANDLE_EOF:
  1445. case ERROR_BROKEN_PIPE:
  1446. case ERROR_NO_DATA:
  1447. return 0;
  1448. default:
  1449. aborted = true;
  1450. IException *e = makeOsExceptionV(err, "Pipe: ReadError failed (size %d)", sz);
  1451. PrintExceptionLog(e, NULL);
  1452. throw e;
  1453. }
  1454. }
  1455. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1456. }
  1457. ISimpleReadStream *getErrorStream()
  1458. {
  1459. return new CSimplePipeStream(LINK(this), true);
  1460. }
  1461. size32_t write(size32_t sz, const void *buf)
  1462. {
  1463. DWORD sizeWritten;
  1464. if (!WriteFile(hInput, buf, sz, &sizeWritten, NULL)) {
  1465. int err=GetLastError();
  1466. if ((err==ERROR_HANDLE_EOF)||aborted)
  1467. sizeWritten = 0;
  1468. else {
  1469. IException *e = makeOsExceptionV(err, "Pipe: WriteFile failed (size %d)", sz);
  1470. PrintExceptionLog(e, NULL);
  1471. throw e;
  1472. }
  1473. }
  1474. return aborted?((size32_t)-1):((size32_t)sizeWritten);
  1475. }
  1476. unsigned wait()
  1477. {
  1478. CriticalBlock block(sect);
  1479. if (pipeProcess != (HANDLE)-1) {
  1480. if (title.length())
  1481. PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
  1482. {
  1483. CriticalUnblock unblock(sect);
  1484. WaitForSingleObject(pipeProcess, INFINITE);
  1485. }
  1486. if (pipeProcess != (HANDLE)-1) {
  1487. GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
  1488. CloseHandle(pipeProcess);
  1489. pipeProcess = (HANDLE)-1;
  1490. }
  1491. if (title.length())
  1492. PROGLOG("%s: Pipe: process complete",title.get());
  1493. }
  1494. return retcode;
  1495. }
  1496. unsigned wait(unsigned timeoutms, bool &timedout)
  1497. {
  1498. CriticalBlock block(sect);
  1499. timedout = false;
  1500. if (pipeProcess != (HANDLE)-1) {
  1501. if (title.length())
  1502. PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
  1503. {
  1504. CriticalUnblock unblock(sect);
  1505. if (WaitForSingleObject(pipeProcess, timeoutms)!=WAIT_OBJECT_0) {
  1506. timedout = true;
  1507. return retcode;
  1508. }
  1509. }
  1510. if (pipeProcess != (HANDLE)-1) {
  1511. GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
  1512. CloseHandle(pipeProcess);
  1513. pipeProcess = (HANDLE)-1;
  1514. }
  1515. if (title.length())
  1516. PROGLOG("%s: Pipe: process complete",title.get());
  1517. }
  1518. return retcode;
  1519. }
  1520. void notifyTerminated(HANDLE pid,unsigned _retcode)
  1521. {
  1522. CriticalBlock block(sect);
  1523. if ((pid!=(HANDLE)-1)&&(pid==pipeProcess)) {
  1524. retcode = _retcode;
  1525. pipeProcess = (HANDLE)-1;
  1526. }
  1527. }
  1528. void doCloseInput()
  1529. {
  1530. CriticalBlock block(sect);
  1531. if (hInput != (HANDLE)-1) {
  1532. CloseHandle(hInput);
  1533. hInput = (HANDLE)-1;
  1534. }
  1535. }
  1536. void doCloseOutput()
  1537. {
  1538. CriticalBlock block(sect);
  1539. if (hOutput != (HANDLE)-1) {
  1540. CloseHandle(hOutput);
  1541. hOutput = (HANDLE)-1;
  1542. }
  1543. }
  1544. void doCloseError()
  1545. {
  1546. CriticalBlock block(sect);
  1547. if (hError != (HANDLE)-1) {
  1548. CloseHandle(hError);
  1549. hError = (HANDLE)-1;
  1550. }
  1551. }
  1552. void closeInput()
  1553. {
  1554. doCloseInput();
  1555. }
  1556. void closeOutput()
  1557. {
  1558. doCloseOutput();
  1559. }
  1560. void closeError()
  1561. {
  1562. doCloseError();
  1563. }
  1564. void abort()
  1565. {
  1566. CriticalBlock block(sect);
  1567. if (pipeProcess != (HANDLE)-1) {
  1568. if (title.length())
  1569. PROGLOG("%s: Pipe Aborting",title.get());
  1570. aborted = true;
  1571. //doCloseOutput(); // seems to work better without this
  1572. doCloseInput();
  1573. {
  1574. CriticalUnblock unblock(sect);
  1575. Sleep(100);
  1576. }
  1577. try { // this code is problematic for some reason
  1578. if (pipeProcess != (HANDLE)-1) {
  1579. TerminateProcess(pipeProcess, 255);
  1580. CloseHandle(pipeProcess);
  1581. pipeProcess = (HANDLE)-1;
  1582. }
  1583. }
  1584. catch (...) {
  1585. // ignore errors
  1586. }
  1587. if (title.length())
  1588. PROGLOG("%s: Pipe Aborted",title.get());
  1589. }
  1590. }
  1591. bool hasInput()
  1592. {
  1593. return hInput!=(HANDLE)-1;
  1594. }
  1595. bool hasOutput()
  1596. {
  1597. return hOutput!=(HANDLE)-1;
  1598. }
  1599. bool hasError()
  1600. {
  1601. return hError!=(HANDLE)-1;
  1602. }
  1603. HANDLE getProcessHandle()
  1604. {
  1605. return pipeProcess;
  1606. }
  1607. };
  1608. IPipeProcess *createPipeProcess(const char *allowedprogs)
  1609. {
  1610. return new CWindowsPipeProcess(allowedprogs);
  1611. }
  1612. #else
  1613. class CIgnoreSIGPIPE
  1614. {
  1615. public:
  1616. CIgnoreSIGPIPE()
  1617. {
  1618. oact.sa_handler = SIG_IGN;
  1619. struct sigaction act;
  1620. sigset_t blockset;
  1621. sigemptyset(&blockset);
  1622. act.sa_mask = blockset;
  1623. act.sa_handler = SIG_IGN;
  1624. act.sa_flags = 0;
  1625. sigaction(SIGPIPE, &act, &oact);
  1626. }
  1627. ~CIgnoreSIGPIPE()
  1628. {
  1629. if (oact.sa_handler != SIG_IGN)
  1630. sigaction(SIGPIPE, &oact, NULL);
  1631. }
  1632. private:
  1633. struct sigaction oact;
  1634. };
  1635. #define WHITESPACE " \t\n\r"
  1636. static unsigned dowaitpid(HANDLE pid, int mode)
  1637. {
  1638. while (pid != (HANDLE)-1) {
  1639. int stat=-1;
  1640. int ret = waitpid(pid, &stat, mode);
  1641. if (ret>0)
  1642. {
  1643. if (WIFEXITED(stat))
  1644. return WEXITSTATUS(stat);
  1645. else if (WIFSIGNALED(stat))
  1646. {
  1647. OERRLOG("Program was terminated by signal %u", (unsigned) WTERMSIG(stat));
  1648. if (WTERMSIG(stat)==SIGPIPE)
  1649. return 0;
  1650. return 254;
  1651. }
  1652. else
  1653. {
  1654. return 254;
  1655. }
  1656. }
  1657. if (ret==0)
  1658. break;
  1659. int err = errno;
  1660. if (err == ECHILD)
  1661. break;
  1662. if (err!=EINTR) {
  1663. OERRLOG("dowait failed with errcode %d",err);
  1664. return (unsigned)-1;
  1665. }
  1666. }
  1667. return 0;
  1668. }
  1669. static CriticalSection runsect; // single thread process start to avoid forked handle open/closes interleaving
  1670. class CLinuxPipeProcess: implements IPipeProcess, public CInterface
  1671. {
  1672. class cForkThread: public Thread
  1673. {
  1674. CLinuxPipeProcess *parent;
  1675. public:
  1676. cForkThread(CLinuxPipeProcess *_parent)
  1677. {
  1678. parent = _parent;
  1679. }
  1680. int run()
  1681. {
  1682. parent->run();
  1683. return 0;
  1684. }
  1685. };
  1686. Owned<cForkThread> forkthread;
  1687. class cStdErrorBufferThread: public Thread
  1688. {
  1689. MemoryAttr buf;
  1690. size32_t bufsize;
  1691. Semaphore stopsem;
  1692. CriticalSection &sect;
  1693. int &hError;
  1694. public:
  1695. cStdErrorBufferThread(size32_t maxbufsize,int &_hError,CriticalSection &_sect)
  1696. : sect(_sect), hError(_hError)
  1697. {
  1698. buf.allocate(maxbufsize);
  1699. bufsize = 0;
  1700. }
  1701. int run()
  1702. {
  1703. while (!stopsem.wait(1000)) {
  1704. CriticalBlock block(sect);
  1705. if (hError!=(HANDLE)-1) { // hmm who did that
  1706. fcntl(hError,F_SETFL,O_NONBLOCK); // make sure non-blocking
  1707. if (bufsize<buf.length()) {
  1708. size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
  1709. if ((int)sizeRead>0) {
  1710. bufsize += sizeRead;
  1711. }
  1712. }
  1713. else { // flush (to avoid process blocking)
  1714. byte tmp[1024];
  1715. size32_t totsz = 0;
  1716. for (unsigned i=0;i<1024;i++) {
  1717. size32_t sz = (size32_t)::read(hError, tmp, sizeof(tmp));
  1718. if ((int)sz<=0)
  1719. break;
  1720. totsz+=sz;
  1721. }
  1722. if (totsz)
  1723. IWARNLOG("Lost %d bytes of stderr output",totsz);
  1724. }
  1725. }
  1726. }
  1727. if (hError!=(HANDLE)-1) { // hmm who did that
  1728. fcntl(hError,F_SETFL,0); // read any remaining data in blocking mode
  1729. while (bufsize<buf.length()) {
  1730. size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
  1731. if ((int)sizeRead>0)
  1732. bufsize += sizeRead;
  1733. else
  1734. break;
  1735. }
  1736. }
  1737. return 0;
  1738. }
  1739. void stop()
  1740. {
  1741. stopsem.signal();
  1742. Thread::join();
  1743. }
  1744. size32_t read(size32_t sz,void *out)
  1745. {
  1746. CriticalBlock block(sect);
  1747. if (bufsize<sz)
  1748. sz = bufsize;
  1749. if (sz>0) {
  1750. memcpy(out,buf.bufferBase(),sz);
  1751. if (sz!=bufsize) {
  1752. bufsize -= sz;
  1753. memmove(buf.bufferBase(),(byte *)buf.bufferBase()+sz,bufsize); // not ideal but hopefully not large
  1754. }
  1755. else
  1756. bufsize = 0;
  1757. }
  1758. return sz;
  1759. }
  1760. } *stderrbufferthread;
  1761. protected: friend class PipeWriterThread;
  1762. HANDLE pipeProcess;
  1763. HANDLE hInput;
  1764. HANDLE hOutput;
  1765. HANDLE hError;
  1766. bool hasinput;
  1767. bool hasoutput;
  1768. bool haserror;
  1769. bool newProcessGroup;
  1770. StringAttr title;
  1771. StringAttr cmd;
  1772. StringAttr prog;
  1773. StringAttr dir;
  1774. int retcode;
  1775. CriticalSection sect;
  1776. Semaphore started;
  1777. bool aborted;
  1778. MemoryBuffer stderrbuf;
  1779. size32_t stderrbufsize;
  1780. StringAttr allowedprogs;
  1781. StringArray envVars;
  1782. StringArray envValues;
  1783. void clearUtilityThreads(bool clearStderr)
  1784. {
  1785. Owned<cForkThread> ft;
  1786. cStdErrorBufferThread *et;
  1787. {
  1788. CriticalBlock block(sect); // clear forkthread and optionally stderrbufferthread
  1789. ft.setown(forkthread.getClear());
  1790. et = stderrbufferthread;
  1791. if (clearStderr)
  1792. stderrbufferthread = nullptr;
  1793. }
  1794. if (ft)
  1795. {
  1796. ft->join();
  1797. ft.clear();
  1798. }
  1799. if (et)
  1800. {
  1801. et->stop();
  1802. if (clearStderr)
  1803. delete et;
  1804. }
  1805. }
  1806. public:
  1807. IMPLEMENT_IINTERFACE;
  1808. CLinuxPipeProcess(const char *_allowedprogs)
  1809. : allowedprogs(_allowedprogs)
  1810. {
  1811. pipeProcess = (HANDLE)-1;
  1812. hInput=(HANDLE)-1;
  1813. hOutput=(HANDLE)-1;
  1814. hError=(HANDLE)-1;
  1815. retcode = -1;
  1816. aborted = false;
  1817. stderrbufferthread = NULL;
  1818. newProcessGroup = false;
  1819. }
  1820. ~CLinuxPipeProcess()
  1821. {
  1822. kill();
  1823. }
  1824. void kill()
  1825. {
  1826. closeInput();
  1827. closeOutput();
  1828. closeError();
  1829. clearUtilityThreads(true);
  1830. }
  1831. char **splitargs(const char *line,unsigned &argc)
  1832. {
  1833. char *buf = strdup(line);
  1834. // first count params (this probably could be improved)
  1835. char *s = buf;
  1836. argc = 0;
  1837. while (readarg(s))
  1838. argc++;
  1839. free(buf);
  1840. size32_t l = strlen(line)+1;
  1841. size32_t al = (argc+1)*sizeof(char *);
  1842. char **argv = (char **)malloc(al+l);
  1843. argv[argc] = NULL;
  1844. s = ((char *)argv)+al;
  1845. memcpy(s,line,l);
  1846. for (unsigned i=0;i<argc;i++)
  1847. argv[i] = readarg(s);
  1848. return argv;
  1849. }
  1850. void run()
  1851. {
  1852. int inpipe[2];
  1853. int outpipe[2];
  1854. int errpipe[2];
  1855. if ((hasinput && (::pipe(inpipe)==-1)) ||
  1856. (hasoutput && (::pipe(outpipe)==-1)) ||
  1857. (haserror && (::pipe(errpipe)==-1)))
  1858. {
  1859. retcode = START_FAILURE;
  1860. started.signal();
  1861. throw makeOsException(errno);
  1862. }
  1863. /* NB: Important to call splitargs (which calls malloc) before the fork()
  1864. * and not in the child process. Because performing malloc in the child
  1865. * process, which then calls exec() can cause problems for TBB malloc proxy.
  1866. */
  1867. unsigned argc;
  1868. char **argv=splitargs(prog,argc);
  1869. for (;;)
  1870. {
  1871. pipeProcess = (HANDLE)fork();
  1872. if (pipeProcess!=(HANDLE)-1)
  1873. break;
  1874. if (errno!=EAGAIN) {
  1875. if (hasinput) {
  1876. close(inpipe[0]);
  1877. close(inpipe[1]);
  1878. }
  1879. if (hasoutput) {
  1880. close(outpipe[0]);
  1881. close(outpipe[1]);
  1882. }
  1883. if (haserror) {
  1884. close(errpipe[0]);
  1885. close(errpipe[1]);
  1886. }
  1887. retcode = START_FAILURE;
  1888. started.signal();
  1889. free(argv);
  1890. return;
  1891. }
  1892. }
  1893. if (pipeProcess==0) { // child
  1894. if (newProcessGroup)//Force the child process into its own process group, so we can terminate it and its children.
  1895. setpgid(0,0);
  1896. if (hasinput) {
  1897. dup2(inpipe[0],0);
  1898. close(inpipe[0]);
  1899. close(inpipe[1]);
  1900. }
  1901. if (hasoutput) {
  1902. dup2(outpipe[1],1);
  1903. close(outpipe[0]);
  1904. close(outpipe[1]);
  1905. }
  1906. if (haserror) {
  1907. dup2(errpipe[1],2);
  1908. close(errpipe[0]);
  1909. close(errpipe[1]);
  1910. }
  1911. if (dir.get()) {
  1912. if (chdir(dir) == -1)
  1913. throw MakeStringException(-1, "CLinuxPipeProcess::run: could not change dir to %s", dir.get());
  1914. }
  1915. ForEachItemIn(idx, envVars)
  1916. {
  1917. ::setenv(envVars.item(idx), envValues.item(idx), 1);
  1918. }
  1919. execvp(argv[0],argv);
  1920. if (haserror)
  1921. {
  1922. Owned<IException> e = createPipeErrnoExceptionV(errno, "exec failed: %s", prog.get());
  1923. StringBuffer eStr;
  1924. fprintf(stderr, "ERROR: %d: %s", e->errorCode(), e->errorMessage(eStr).str());
  1925. fflush(stderr);
  1926. }
  1927. _exit(START_FAILURE); // must be _exit!!
  1928. }
  1929. free(argv);
  1930. if (hasinput)
  1931. close(inpipe[0]);
  1932. if (hasoutput)
  1933. close(outpipe[1]);
  1934. if (haserror)
  1935. close(errpipe[1]);
  1936. hInput = hasinput?inpipe[1]:((HANDLE)-1);
  1937. hOutput = hasoutput?outpipe[0]:((HANDLE)-1);
  1938. hError = haserror?errpipe[0]:((HANDLE)-1);
  1939. started.signal();
  1940. retcode = dowaitpid(pipeProcess, 0);
  1941. if (retcode==START_FAILURE)
  1942. closeOutput();
  1943. }
  1944. bool run(const char *_title,const char *_prog,const char *_dir,bool _hasinput,bool _hasoutput, bool _haserror, size32_t stderrbufsize, bool _newProcessGroup)
  1945. {
  1946. CriticalBlock runblock(runsect);
  1947. kill();
  1948. CriticalBlock block(sect);
  1949. hasinput = _hasinput;
  1950. hasoutput = _hasoutput;
  1951. haserror = _haserror;
  1952. newProcessGroup = _newProcessGroup;
  1953. title.clear();
  1954. prog.set(_prog);
  1955. dir.set(_dir);
  1956. if (_title)
  1957. {
  1958. title.set(_title);
  1959. PROGLOG("%s: Creating PIPE program process : '%s' - hasinput=%d, hasoutput=%d stderrbufsize=%d", title.get(), prog.get(),(int)hasinput, (int)hasoutput, stderrbufsize);
  1960. }
  1961. CheckAllowedProgram(prog,allowedprogs);
  1962. retcode = 0;
  1963. if (forkthread)
  1964. {
  1965. {
  1966. CriticalUnblock unblock(sect);
  1967. forkthread->join();
  1968. }
  1969. forkthread.clear();
  1970. }
  1971. forkthread.setown(new cForkThread(this));
  1972. forkthread->start();
  1973. bool joined = false;
  1974. {
  1975. CriticalUnblock unblock(sect);
  1976. started.wait();
  1977. joined = forkthread->join(50); // give a chance to fail
  1978. }
  1979. // only check retcode if we were able to join
  1980. if ( (joined) && (retcode==START_FAILURE) )
  1981. {
  1982. DBGLOG("%s: PIPE process '%s' failed to start", title.get()?title.get():"CLinuxPipeProcess", prog.get());
  1983. forkthread.clear();
  1984. return false;
  1985. }
  1986. if (stderrbufsize)
  1987. {
  1988. if (stderrbufferthread)
  1989. {
  1990. stderrbufferthread->stop();
  1991. delete stderrbufferthread;
  1992. }
  1993. stderrbufferthread = new cStdErrorBufferThread(stderrbufsize,hError,sect);
  1994. stderrbufferthread->start();
  1995. }
  1996. return true;
  1997. }
  1998. virtual void setenv(const char *var, const char *value)
  1999. {
  2000. assertex(var);
  2001. if (!value)
  2002. value = "";
  2003. envVars.append(var);
  2004. envValues.append(value);
  2005. }
  2006. size32_t read(size32_t sz, void *buf)
  2007. {
  2008. CriticalBlock block(sect);
  2009. if (aborted)
  2010. return (size32_t)-1;
  2011. if (hOutput==(HANDLE)-1)
  2012. return 0;
  2013. size32_t sizeRead;
  2014. for (;;) {
  2015. {
  2016. CriticalUnblock unblock(sect);
  2017. sizeRead = (size32_t)::read(hOutput, buf, sz);
  2018. }
  2019. if (sizeRead!=(size32_t)-1)
  2020. break;
  2021. if (aborted)
  2022. break;
  2023. if (errno!=EINTR) {
  2024. aborted = true;
  2025. throw createPipeErrnoExceptionV(errno,"Pipe: read failed (size %d)", sz);
  2026. }
  2027. }
  2028. return aborted?((size32_t)-1):((size32_t)sizeRead);
  2029. }
  2030. ISimpleReadStream *getOutputStream()
  2031. {
  2032. return new CSimplePipeStream(LINK(this), false);
  2033. }
  2034. size32_t write(size32_t sz, const void *buf)
  2035. {
  2036. CriticalBlock block(sect);
  2037. CIgnoreSIGPIPE ignoresigpipe;
  2038. if (aborted)
  2039. return (size32_t)-1;
  2040. if (hInput==(HANDLE)-1)
  2041. return 0;
  2042. size32_t sizeWritten;
  2043. for (;;) {
  2044. {
  2045. CriticalUnblock unblock(sect);
  2046. sizeWritten = (size32_t)::write(hInput, buf, sz);
  2047. }
  2048. if (sizeWritten!=(size32_t)-1)
  2049. break;
  2050. if (aborted)
  2051. break;
  2052. if (errno!=EINTR) {
  2053. throw createPipeErrnoExceptionV(errno, "Pipe: write failed (size %d)", sz);
  2054. }
  2055. }
  2056. return aborted?((size32_t)-1):((size32_t)sizeWritten);
  2057. }
  2058. size32_t readError(size32_t sz, void *buf)
  2059. {
  2060. CriticalBlock block(sect);
  2061. if (stderrbufferthread)
  2062. return stderrbufferthread->read(sz,buf);
  2063. if (aborted)
  2064. return (size32_t)-1;
  2065. if (hError==(HANDLE)-1)
  2066. return 0;
  2067. size32_t sizeRead;
  2068. for (;;) {
  2069. {
  2070. CriticalUnblock unblock(sect);
  2071. sizeRead = (size32_t)::read(hError, buf, sz);
  2072. }
  2073. if (sizeRead!=(size32_t)-1)
  2074. break;
  2075. if (aborted)
  2076. break;
  2077. if (errno!=EINTR) {
  2078. aborted = true;
  2079. throw createPipeErrnoExceptionV(errno, "Pipe: readError failed (size %d)", sz);
  2080. }
  2081. }
  2082. return aborted?((size32_t)-1):((size32_t)sizeRead);
  2083. }
  2084. ISimpleReadStream *getErrorStream()
  2085. {
  2086. return new CSimplePipeStream(LINK(this), true);
  2087. }
  2088. void notifyTerminated(HANDLE pid,unsigned _retcode)
  2089. {
  2090. CriticalBlock block(sect);
  2091. if (((int)pid>0)&&(pid==pipeProcess)) {
  2092. retcode = _retcode;
  2093. pipeProcess = (HANDLE)-1;
  2094. }
  2095. }
  2096. unsigned wait()
  2097. {
  2098. bool timedout;
  2099. return wait(INFINITE, timedout);
  2100. }
  2101. unsigned wait(unsigned timeoutms, bool &timedout)
  2102. {
  2103. timedout = false;
  2104. if (INFINITE != timeoutms)
  2105. {
  2106. CriticalBlock block(sect);
  2107. if (forkthread)
  2108. {
  2109. {
  2110. CriticalUnblock unblock(sect);
  2111. if (!forkthread->join(timeoutms))
  2112. {
  2113. timedout = true;
  2114. return retcode;
  2115. }
  2116. }
  2117. }
  2118. }
  2119. // NOTE - we don't clear stderrbufferthread here, since we want to be able to still read the buffered data
  2120. clearUtilityThreads(false); // NB: will recall forkthread->join(), but doesn't matter
  2121. if (pipeProcess != (HANDLE)-1)
  2122. {
  2123. if (title.length())
  2124. PROGLOG("%s: Pipe: process %d complete %d", title.get(), pipeProcess, retcode);
  2125. pipeProcess = (HANDLE)-1;
  2126. }
  2127. return retcode;
  2128. }
  2129. void closeOutput()
  2130. {
  2131. CriticalBlock block(sect);
  2132. if (hOutput != (HANDLE)-1) {
  2133. ::close(hOutput);
  2134. hOutput = (HANDLE)-1;
  2135. }
  2136. }
  2137. void closeInput()
  2138. {
  2139. CriticalBlock block(sect);
  2140. if (hInput != (HANDLE)-1) {
  2141. ::close(hInput);
  2142. hInput = (HANDLE)-1;
  2143. }
  2144. }
  2145. void closeError()
  2146. {
  2147. CriticalBlock block(sect);
  2148. if (hError != (HANDLE)-1) {
  2149. ::close(hError);
  2150. hError = (HANDLE)-1;
  2151. }
  2152. }
  2153. void abort()
  2154. {
  2155. CriticalBlock block(sect);
  2156. if (pipeProcess != (HANDLE)-1) {
  2157. if (title.length())
  2158. PROGLOG("%s: Pipe Aborting",title.get());
  2159. aborted = true;
  2160. closeInput();
  2161. {
  2162. CriticalUnblock unblock(sect);
  2163. forkthread->join(1000);
  2164. }
  2165. if (pipeProcess != (HANDLE)-1) {
  2166. if (title.length())
  2167. PROGLOG("%s: Forcibly killing pipe process %d",title.get(),pipeProcess);
  2168. if (newProcessGroup)
  2169. ::kill(-pipeProcess,SIGKILL);
  2170. else
  2171. ::kill(pipeProcess,SIGKILL); // if this doesn't kill it we are in trouble
  2172. CriticalUnblock unblock(sect);
  2173. wait();
  2174. }
  2175. if (title.length())
  2176. PROGLOG("%s: Pipe Aborted",title.get());
  2177. retcode = -1;
  2178. forkthread.clear();
  2179. }
  2180. }
  2181. bool hasInput()
  2182. {
  2183. CriticalBlock block(sect);
  2184. return hInput!=(HANDLE)-1;
  2185. }
  2186. bool hasOutput()
  2187. {
  2188. CriticalBlock block(sect);
  2189. return hOutput!=(HANDLE)-1;
  2190. }
  2191. bool hasError()
  2192. {
  2193. CriticalBlock block(sect);
  2194. return hError!=(HANDLE)-1;
  2195. }
  2196. HANDLE getProcessHandle()
  2197. {
  2198. CriticalBlock block(sect);
  2199. return pipeProcess;
  2200. }
  2201. };
  2202. IPipeProcess *createPipeProcess(const char *allowedprogs)
  2203. {
  2204. return new CLinuxPipeProcess(allowedprogs);
  2205. }
  2206. #endif
  2207. // Worker thread
  2208. class CWorkQueueThread: implements IWorkQueueThread, public CInterface
  2209. {
  2210. public:
  2211. IMPLEMENT_IINTERFACE;
  2212. CriticalSection crit;
  2213. unsigned persisttime;
  2214. class cWorkerThread: public Thread
  2215. {
  2216. unsigned persisttime;
  2217. CWorkQueueThread *parent;
  2218. CriticalSection &crit;
  2219. public:
  2220. cWorkerThread(CWorkQueueThread *_parent,CriticalSection &_crit,unsigned _persisttime)
  2221. : crit(_crit)
  2222. {
  2223. parent = _parent;
  2224. persisttime = _persisttime;
  2225. }
  2226. QueueOf<IWorkQueueItem,false> queue;
  2227. Semaphore sem;
  2228. int run()
  2229. {
  2230. for (;;) {
  2231. IWorkQueueItem * work;
  2232. bool wr = sem.wait(persisttime);
  2233. {
  2234. CriticalBlock block(crit);
  2235. if (!wr) {
  2236. wr = sem.wait(0); // catch race
  2237. if (!wr)
  2238. break; // timed out
  2239. }
  2240. work = queue.dequeue();
  2241. }
  2242. if (!work)
  2243. break;
  2244. try {
  2245. work->execute();
  2246. work->Release();
  2247. }
  2248. catch (IException *e)
  2249. {
  2250. EXCLOG(e,"CWorkQueueThread item execute");
  2251. e->Release();
  2252. }
  2253. }
  2254. CriticalBlock block(crit);
  2255. parent->worker=NULL; // this should be safe
  2256. return 0;
  2257. }
  2258. } *worker;
  2259. CWorkQueueThread(unsigned _persisttime)
  2260. {
  2261. persisttime = _persisttime;
  2262. worker = NULL;
  2263. }
  2264. ~CWorkQueueThread()
  2265. {
  2266. wait();
  2267. }
  2268. void post(IWorkQueueItem *packet)
  2269. {
  2270. CriticalBlock block(crit);
  2271. if (!worker) {
  2272. worker = new cWorkerThread(this,crit,persisttime);
  2273. worker->startRelease();
  2274. }
  2275. worker->queue.enqueue(packet);
  2276. worker->sem.signal();
  2277. }
  2278. void wait()
  2279. {
  2280. CriticalBlock block(crit);
  2281. if (worker) {
  2282. worker->queue.enqueue(NULL);
  2283. worker->sem.signal();
  2284. Linked<cWorkerThread> wt;
  2285. wt.set(worker);
  2286. CriticalUnblock unblock(crit);
  2287. wt->join();
  2288. }
  2289. }
  2290. unsigned pending()
  2291. {
  2292. CriticalBlock block(crit);
  2293. unsigned ret = 0;
  2294. if (worker)
  2295. ret = worker->queue.ordinality();
  2296. return ret;
  2297. }
  2298. };
  2299. IWorkQueueThread *createWorkQueueThread(unsigned persisttime)
  2300. {
  2301. return new CWorkQueueThread(persisttime);
  2302. }
  2303. unsigned threadLogID() // for use in logging
  2304. {
  2305. #if defined(__APPLE__)
  2306. return pthread_mach_thread_np(pthread_self());
  2307. #elif !defined(_WIN32)
  2308. #ifdef SYS_gettid
  2309. return (unsigned) (memsize_t) syscall(SYS_gettid);
  2310. #endif
  2311. #endif
  2312. return (unsigned)(memsize_t) GetCurrentThreadId(); // truncated in 64bit
  2313. }