jthread.cpp 59 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jthread.hpp"
  15. #include "jlib.hpp"
  16. #include "jfile.hpp"
  17. #include "jmutex.hpp"
  18. #include "jexcept.hpp"
  19. #include "jmisc.hpp"
  20. #include "jqueue.tpp"
  21. #include "jregexp.hpp"
  22. #include <assert.h>
  23. #ifdef _WIN32
  24. #include <process.h>
  25. #else
  26. #include <unistd.h>
  27. #include <sys/wait.h>
  28. #include <sys/syscall.h>
  29. #include <sys/types.h>
  30. #endif
  31. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  32. #undef new
  33. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  34. #endif
  35. #define LINUX_STACKSIZE_CAP (0x200000)
  36. //#define NO_CATCHALL
  37. PointerArray *exceptionHandlers = NULL;
  38. MODULE_INIT(INIT_PRIORITY_JTHREAD)
  39. {
  40. exceptionHandlers = new PointerArray();
  41. return true;
  42. }
  43. MODULE_EXIT()
  44. {
  45. delete exceptionHandlers;
  46. }
  47. void addThreadExceptionHandler(IExceptionHandler *handler)
  48. {
  49. assertex(exceptionHandlers); // have to ensure MODULE_INIT has appropriate priority.
  50. exceptionHandlers->append(handler);
  51. }
  52. void removeThreadExceptionHandler(IExceptionHandler *handler)
  53. {
  54. exceptionHandlers->zap(handler);
  55. }
  56. static bool SEHHandling = false;
  57. void enableThreadSEH() { SEHHandling=true; }
  58. void disableThreadSEH() { SEHHandling=false; } // only prevents new threads from having SEH handler, no mech. for turning off existing threads SEH handling.
  59. static ICopyArrayOf<Thread> ThreadList;
  60. static CriticalSection ThreadListSem;
  61. static size32_t defaultThreadStackSize=0;
  62. static ICopyArrayOf<Thread> ThreadDestroyList;
  63. static SpinLock ThreadDestroyListLock;
  64. #ifdef _WIN32
  65. extern void *EnableSEHtranslation();
  66. unsigned WINAPI Thread::_threadmain(LPVOID v)
  67. #else
  68. void *Thread::_threadmain(void *v)
  69. #endif
  70. {
  71. Thread * t = (Thread *)v;
  72. #ifdef _WIN32
  73. if (SEHHandling)
  74. EnableSEHtranslation();
  75. #else
  76. t->tidlog = threadLogID();
  77. #endif
  78. int ret = t->begin();
  79. char *&threadname = t->cthreadname.threadname;
  80. if (threadname) {
  81. memsize_t l=strlen(threadname);
  82. char *newname = (char *)malloc(l+8+1);
  83. memcpy(newname,"Stopped ",8);
  84. memcpy(newname+8,threadname,l+1);
  85. char *oldname = threadname;
  86. threadname = newname;
  87. free(oldname);
  88. }
  89. {
  90. // need to ensure joining thread does not race with us to release
  91. t->Link(); // extra safety link
  92. {
  93. SpinBlock block(ThreadDestroyListLock);
  94. ThreadDestroyList.append(*t);
  95. }
  96. try {
  97. t->stopped.signal();
  98. if (t->Release()) {
  99. PROGLOG("extra unlinked thread");
  100. PrintStackReport();
  101. }
  102. else
  103. t->Release();
  104. }
  105. catch (...) {
  106. PROGLOG("thread release exception");
  107. throw;
  108. }
  109. {
  110. SpinBlock block(ThreadDestroyListLock);
  111. ThreadDestroyList.zap(*t); // hopefully won't get too big (i.e. one entry!)
  112. }
  113. }
  114. #if defined(_WIN32)
  115. return ret;
  116. #else
  117. return (void *) (memsize_t)ret;
  118. #endif
  119. }
  120. // JCSMORE - should have a setPriority(), unsupported under _WIN32
  121. void Thread::adjustPriority(char delta)
  122. {
  123. if (delta < -2)
  124. prioritydelta = -2;
  125. else if (delta > 2)
  126. prioritydelta = 2;
  127. else
  128. prioritydelta = delta;
  129. if (alive)
  130. {
  131. #if defined(_WIN32)
  132. int priority;
  133. switch (delta)
  134. {
  135. case -2: priority = THREAD_PRIORITY_LOWEST; break;
  136. case -1: priority = THREAD_PRIORITY_BELOW_NORMAL; break;
  137. case 0: priority = THREAD_PRIORITY_NORMAL; break;
  138. case +1: priority = THREAD_PRIORITY_ABOVE_NORMAL; break;
  139. case +2: priority = THREAD_PRIORITY_HIGHEST; break;
  140. }
  141. SetThreadPriority(hThread, priority);
  142. #else
  143. //MORE - What control is there?
  144. int policy;
  145. sched_param param;
  146. int rc;
  147. if (( rc = pthread_getschedparam(threadid, &policy, &param)) != 0)
  148. DBGLOG("pthread_getschedparam error: %d", rc);
  149. switch (delta)
  150. {
  151. // JCS - doubtful whether these good values...
  152. case -2: param.sched_priority = 0; policy =SCHED_OTHER; break;
  153. case -1: param.sched_priority = 0; policy =SCHED_OTHER; break;
  154. case 0: param.sched_priority = 0; policy =SCHED_OTHER; break;
  155. case +1: param.sched_priority = (sched_get_priority_max(SCHED_RR)-sched_get_priority_min(SCHED_RR))/2; policy =SCHED_RR; break;
  156. case +2: param.sched_priority = sched_get_priority_max(SCHED_RR); policy =SCHED_RR; break;
  157. }
  158. if(( rc = pthread_setschedparam(threadid, policy, &param)) != 0)
  159. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%"I64F"u PID=%i", rc,policy,param.sched_priority,(unsigned __int64) threadid,getpid());
  160. else
  161. DBGLOG("priority set id=%"I64F"u policy=%i pri=%i PID=%i",(unsigned __int64) threadid,policy,param.sched_priority,getpid());
  162. #endif
  163. }
  164. }
  165. void Thread::adjustNiceLevel()
  166. {
  167. #if defined(_WIN32)
  168. int priority;
  169. if(nicelevel < -15)
  170. priority = THREAD_PRIORITY_TIME_CRITICAL;
  171. else if(nicelevel >= -15 && nicelevel < -10)
  172. priority = THREAD_PRIORITY_HIGHEST;
  173. else if(nicelevel >= -10 && nicelevel < 0)
  174. priority = THREAD_PRIORITY_ABOVE_NORMAL;
  175. else if(nicelevel == 0)
  176. priority = THREAD_PRIORITY_NORMAL;
  177. else if(nicelevel > 0 && nicelevel <= 10)
  178. priority = THREAD_PRIORITY_BELOW_NORMAL;
  179. else if(nicelevel > 10 && nicelevel <= 15)
  180. priority = THREAD_PRIORITY_LOWEST;
  181. else if(nicelevel >15)
  182. priority = THREAD_PRIORITY_IDLE;
  183. SetThreadPriority(hThread, priority);
  184. #elif defined(__linux__)
  185. setpriority(PRIO_PROCESS, 0, nicelevel);
  186. #else
  187. UNIMPLEMENTED;
  188. #endif
  189. }
  190. // _nicelevel ranges from -20 to 19, the higher the nice level, the less cpu time the thread will get.
  191. void Thread::setNice(char _nicelevel)
  192. {
  193. if (_nicelevel < -20 || _nicelevel > 19)
  194. throw MakeStringException(0, "nice level should be between -20 and 19");
  195. if(alive)
  196. throw MakeStringException(0, "nice can only be set before the thread is started.");
  197. nicelevel = _nicelevel;
  198. }
  199. void Thread::setStackSize(size32_t size)
  200. {
  201. stacksize = (unsigned short)(size/0x1000);
  202. }
  203. void Thread::setDefaultStackSize(size32_t size)
  204. {
  205. defaultThreadStackSize = size; // has no effect under windows (though may be used for calculations later)
  206. }
  207. int Thread::begin()
  208. {
  209. if(nicelevel)
  210. adjustNiceLevel();
  211. #ifndef _WIN32
  212. starting.signal();
  213. suspend.wait();
  214. #endif
  215. int ret=-1;
  216. try {
  217. ret = run();
  218. }
  219. catch (IException *e)
  220. {
  221. handleException(e);
  222. }
  223. #ifndef NO_CATCHALL
  224. catch (...)
  225. {
  226. handleException(MakeStringException(0, "Unknown exception in Thread %s", getName()));
  227. }
  228. #endif
  229. #ifdef _WIN32
  230. #ifndef _DEBUG
  231. CloseHandle(hThread); // leak handle when debugging,
  232. // fixes some lockups/crashes in the debugger when lots of threads being created
  233. #endif
  234. hThread = NULL;
  235. #endif
  236. //alive = false; // not safe here
  237. return ret;
  238. }
  239. void Thread::handleException(IException *e)
  240. {
  241. assertex(exceptionHandlers);
  242. if (exceptionHandlers->ordinality() == 0)
  243. {
  244. PrintExceptionLog(e,getName());
  245. //throw; // don't rethrow unhandled, preferable over alternative of causing process death
  246. e->Release();
  247. }
  248. else
  249. {
  250. PrintExceptionLog(e,getName());
  251. bool handled = false;
  252. ForEachItemIn(ie, *exceptionHandlers)
  253. {
  254. IExceptionHandler *handler = (IExceptionHandler *) exceptionHandlers->item(ie);
  255. handled = handler->fireException(e) || handled;
  256. }
  257. if (!handled)
  258. {
  259. // if nothing choose to handle it.
  260. EXCLOG(e, NULL);
  261. //throw e; // don't rethrow unhandled, preferable over alternative of causing process death
  262. }
  263. e->Release();
  264. }
  265. }
  266. void Thread::init(const char *_name)
  267. {
  268. #ifdef _WIN32
  269. hThread = NULL;
  270. #endif
  271. threadid = 0;
  272. tidlog = 0;
  273. alive = false;
  274. cthreadname.threadname = (NULL == _name) ? NULL : strdup(_name);
  275. ithreadname = &cthreadname;
  276. prioritydelta = 0;
  277. nicelevel = 0;
  278. stacksize = 0; // default is EXE default stack size (set by /STACK)
  279. }
  280. void Thread::start()
  281. {
  282. if (alive) {
  283. WARNLOG("Thread::start(%s) - Thread already started!",getName());
  284. PrintStackReport();
  285. #ifdef _DEBUG
  286. throw MakeStringException(-1,"Thread::start(%s) - Thread already started!",getName());
  287. #endif
  288. return;
  289. }
  290. Link();
  291. startRelease();
  292. }
  293. void Thread::startRelease()
  294. {
  295. assertex(!alive);
  296. stopped.reinit(0); // just in case restarting
  297. #ifdef _WIN32
  298. hThread = (HANDLE)_beginthreadex(NULL, 0x1000*(unsigned)stacksize, Thread::_threadmain, this, CREATE_SUSPENDED, (unsigned *)&threadid);
  299. if (!hThread || !threadid)
  300. {
  301. Release();
  302. throw MakeOsException(GetLastError());
  303. }
  304. #else
  305. int status;
  306. unsigned numretrys = 8;
  307. unsigned delay = 1000;
  308. loop {
  309. pthread_attr_t attr;
  310. pthread_attr_init(&attr);
  311. pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
  312. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  313. if (stacksize)
  314. pthread_attr_setstacksize(&attr, (unsigned)stacksize*0x1000);
  315. else if (defaultThreadStackSize)
  316. pthread_attr_setstacksize(&attr, defaultThreadStackSize);
  317. else {
  318. #ifndef __64BIT__ // no need to cap 64bit
  319. size_t defss=0;
  320. pthread_attr_getstacksize(&attr, &defss);
  321. if (defss>LINUX_STACKSIZE_CAP)
  322. pthread_attr_setstacksize(&attr, LINUX_STACKSIZE_CAP);
  323. #endif
  324. }
  325. status = pthread_create(&threadid, &attr, Thread::_threadmain, this);
  326. if ((status==EAGAIN)||(status==EINTR)) {
  327. if (numretrys--==0)
  328. break;
  329. WARNLOG("pthread_create(%d): Out of threads, retrying...",status);
  330. Sleep(delay);
  331. delay *= 2;
  332. }
  333. else
  334. break;
  335. }
  336. if (status) {
  337. threadid = 0;
  338. Release();
  339. ERRLOG("pthread_create returns %d",status);
  340. PrintStackReport();
  341. PrintMemoryReport();
  342. StringBuffer s;
  343. getThreadList(s);
  344. ERRLOG("Running threads:\n %s",s.str());
  345. throw MakeOsException(status);
  346. }
  347. if (!starting.wait(1000*10))
  348. throw MakeStringException(-1, "Thread::start(%s) failed",getName());
  349. #endif
  350. alive = true;
  351. if (prioritydelta)
  352. adjustPriority(prioritydelta);
  353. {
  354. CriticalBlock block(ThreadListSem);
  355. ThreadList.zap(*this); // just in case restarting
  356. ThreadList.append(*this);
  357. }
  358. #ifdef _WIN32
  359. DWORD count = ResumeThread(hThread);
  360. assertex(count == 1);
  361. #else
  362. suspend.signal();
  363. #endif
  364. }
  365. bool Thread::join(unsigned timeout)
  366. {
  367. if (!alive&&!threadid) {
  368. #ifdef _DEBUG
  369. PROGLOG("join on unstarted thread!");
  370. PrintStackReport();
  371. #endif
  372. return true;
  373. }
  374. if (!stopped.wait(timeout))
  375. return false;
  376. if (!alive) // already joined
  377. {
  378. stopped.signal();
  379. return true;
  380. }
  381. unsigned count = 0;
  382. unsigned st = 0;
  383. loop { // this is to prevent race with destroy
  384. // (because Thread objects are not always link counted!)
  385. {
  386. SpinBlock block(ThreadDestroyListLock);
  387. if (ThreadDestroyList.find(*this)==NotFound)
  388. break;
  389. }
  390. #ifdef _DEBUG
  391. if (st==10)
  392. PROGLOG("Thread::join race");
  393. #endif
  394. Sleep(st); // switch back to exiting thread (not very elegant!)
  395. st++;
  396. if (st>10)
  397. st = 10; // note must be non-zero for high priority threads
  398. }
  399. #ifdef _DEBUG
  400. int c = getLinkCount();
  401. if (c>=DEAD_PSEUDO_COUNT) {
  402. PROGLOG("Dead/Dying thread joined! %d",c);
  403. PrintStackReport();
  404. }
  405. #endif
  406. alive = false; // should be safe here
  407. stopped.signal(); // signal stopped again, to prevent any parallel call from blocking.
  408. return true;
  409. }
  410. Thread::~Thread()
  411. {
  412. ithreadname = &cthreadname; // safer (as derived classes destroyed)
  413. #ifdef _DEBUG
  414. if (alive) {
  415. if (!stopped.wait(0)) { // see if fell out of threadmain and signal stopped
  416. PROGLOG("Live thread killed! %s",getName());
  417. PrintStackReport();
  418. }
  419. // don't need to resignal as we are on way out
  420. }
  421. #endif
  422. Link();
  423. // DBGLOG("Thread %x (%s) destroyed\n", threadid, threadname);
  424. {
  425. CriticalBlock block(ThreadListSem);
  426. ThreadList.zap(*this);
  427. }
  428. free(cthreadname.threadname);
  429. cthreadname.threadname = NULL;
  430. }
  431. unsigned getThreadCount()
  432. {
  433. CriticalBlock block(ThreadListSem);
  434. return ThreadList.ordinality();
  435. }
  436. StringBuffer & getThreadList(StringBuffer &str)
  437. {
  438. CriticalBlock block(ThreadListSem);
  439. ForEachItemIn(i,ThreadList) {
  440. Thread &item=ThreadList.item(i);
  441. item.getInfo(str).append("\n");
  442. }
  443. return str;
  444. }
  445. StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
  446. {
  447. CriticalBlock block(ThreadListSem);
  448. bool found=false;
  449. ForEachItemIn(i,ThreadList) {
  450. Thread &item=ThreadList.item(i);
  451. int h;
  452. unsigned t;
  453. const char *s = item.getLogInfo(h,t);
  454. if (s&&*s&&((thandle==0)||(h==thandle))&&((tid==0)||(t==tid))) {
  455. if (found) {
  456. name.clear();
  457. break; // only return if unambiguous
  458. }
  459. name.append(s);
  460. found = true;
  461. }
  462. }
  463. return name;
  464. }
  465. //class CAsyncFor
  466. void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
  467. {
  468. if (num <= 1)
  469. {
  470. if (num == 1)
  471. Do(0);
  472. return;
  473. }
  474. Mutex errmutex;
  475. Semaphore ready;
  476. Semaphore finished;
  477. IException *e=NULL;
  478. Owned<IShuffledIterator> shuffler;
  479. if (shuffled) {
  480. shuffler.setown(createShuffledIterator(num));
  481. shuffler->first(); // prime (needed to make thread safe)
  482. }
  483. unsigned i;
  484. if (maxatonce==1) { // no need for threads
  485. for (i=0;i<num;i++) {
  486. unsigned idx = shuffled?shuffler->lookup(i):i;
  487. try {
  488. Do(idx);
  489. }
  490. catch (IException * _e)
  491. {
  492. if (e)
  493. _e->Release(); // only return first
  494. else
  495. e = _e;
  496. if (abortFollowingException)
  497. break;
  498. }
  499. }
  500. }
  501. else {
  502. class cdothread: public Thread
  503. {
  504. public:
  505. Mutex *errmutex;
  506. Semaphore &ready;
  507. Semaphore &finished;
  508. int timeout;
  509. IException *&erre;
  510. unsigned idx;
  511. CAsyncFor *self;
  512. cdothread(CAsyncFor *_self,unsigned _idx,Semaphore &_ready,Semaphore &_finished,Mutex *_errmutex,IException *&_e)
  513. : Thread("CAsyncFor"),ready(_ready),finished(_finished),erre(_e)
  514. {
  515. errmutex =_errmutex;
  516. idx = _idx;
  517. self = _self;
  518. }
  519. int run()
  520. {
  521. try {
  522. self->Do(idx);
  523. }
  524. catch (IException * _e)
  525. {
  526. synchronized block(*errmutex);
  527. if (erre)
  528. _e->Release(); // only return first
  529. else
  530. erre = _e;
  531. }
  532. #ifndef NO_CATCHALL
  533. catch (...)
  534. {
  535. synchronized block(*errmutex);
  536. if (!erre)
  537. erre = MakeStringException(0, "Unknown exception in Thread %s", getName());
  538. }
  539. #endif
  540. ready.signal();
  541. finished.signal();
  542. return 0;
  543. }
  544. };
  545. if (maxatonce==0)
  546. maxatonce = num;
  547. for (i=0;(i<num)&&(i<maxatonce);i++)
  548. ready.signal();
  549. for (i=0;i<num;i++) {
  550. ready.wait();
  551. if (abortFollowingException && e) break;
  552. Thread *thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,finished,&errmutex,e);
  553. thread->startRelease();
  554. }
  555. while (i--)
  556. finished.wait();
  557. }
  558. if (e)
  559. throw e;
  560. }
  561. // ---------------------------------------------------------------------------
  562. // Thread Pools
  563. // ---------------------------------------------------------------------------
  564. class CPooledThreadWrapper;
  565. class CThreadPoolBase
  566. {
  567. public:
  568. virtual ~CThreadPoolBase() {}
  569. protected: friend class CPooledThreadWrapper;
  570. IExceptionHandler *exceptionHandler;
  571. CriticalSection crit;
  572. StringAttr poolname;
  573. int donewaiting;
  574. Semaphore donesem;
  575. PointerArray waitingsems;
  576. UnsignedArray waitingids;
  577. bool stopall;
  578. unsigned defaultmax;
  579. unsigned targetpoolsize;
  580. unsigned delay;
  581. Semaphore availsem;
  582. atomic_t numrunning;
  583. virtual bool notifyStopped(CPooledThreadWrapper *item)=0;
  584. };
  585. class CPooledThreadWrapper: public Thread
  586. {
  587. PooledThreadHandle handle;
  588. IPooledThread *thread;
  589. Semaphore sem;
  590. CThreadPoolBase &parent;
  591. char *runningname;
  592. public:
  593. IMPLEMENT_IINTERFACE;
  594. CPooledThreadWrapper(CThreadPoolBase &_parent,
  595. PooledThreadHandle _handle,
  596. IPooledThread *_thread) // takes ownership of thread
  597. : Thread(StringBuffer("Member of thread pool: ").append(_parent.poolname).str()), parent(_parent)
  598. {
  599. thread = _thread;
  600. handle = _handle;
  601. runningname = strdup(_parent.poolname);
  602. }
  603. ~CPooledThreadWrapper()
  604. {
  605. thread->Release();
  606. free(runningname);
  607. }
  608. void setName(const char *name) { free(runningname); runningname=strdup(name); }
  609. void setHandle(PooledThreadHandle _handle) { handle = _handle; }
  610. PooledThreadHandle queryHandle() { return handle; }
  611. IPooledThread &queryThread() { return *thread; }
  612. void setThread(IPooledThread *_thread) { thread = _thread; } // takes ownership
  613. bool isStopped() { return (handle==0); }
  614. PooledThreadHandle markStopped() { PooledThreadHandle ret=handle; handle = 0; if (ret) atomic_dec(&parent.numrunning); return ret; }
  615. int run()
  616. {
  617. do {
  618. sem.wait();
  619. {
  620. CriticalBlock block(parent.crit); // to synchronize
  621. if (parent.stopall)
  622. break;
  623. }
  624. try {
  625. char *&threadname = cthreadname.threadname;
  626. char *temp = threadname; // swap running name and threadname
  627. threadname = runningname;
  628. runningname = temp;
  629. thread->main();
  630. temp = threadname; // and back
  631. threadname = runningname;
  632. runningname = temp;
  633. }
  634. catch (IException *e)
  635. {
  636. char *&threadname = cthreadname.threadname;
  637. char *temp = threadname; // swap back
  638. threadname = runningname;
  639. runningname = temp;
  640. handleException(e);
  641. }
  642. #ifndef NO_CATCHALL
  643. catch (...)
  644. {
  645. char *&threadname = cthreadname.threadname;
  646. char *temp = threadname; // swap back
  647. threadname = runningname;
  648. runningname = temp;
  649. handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
  650. }
  651. #endif
  652. } while (parent.notifyStopped(this));
  653. return 0;
  654. }
  655. void cycle()
  656. {
  657. sem.signal();
  658. }
  659. void go(void *param)
  660. {
  661. thread->init(param);
  662. cycle();
  663. }
  664. bool stop()
  665. {
  666. if (handle)
  667. return thread->stop();
  668. return true;
  669. }
  670. void handleException(IException *e)
  671. {
  672. CriticalBlock block(parent.crit);
  673. PrintExceptionLog(e,parent.poolname.get());
  674. if (!parent.exceptionHandler||!parent.exceptionHandler->fireException(e)) {
  675. }
  676. e->Release();
  677. }
  678. };
  679. class CPooledThreadIterator: public CInterface , implements IPooledThreadIterator
  680. {
  681. unsigned current;
  682. public:
  683. IArrayOf<IPooledThread> threads;
  684. IMPLEMENT_IINTERFACE;
  685. CPooledThreadIterator()
  686. {
  687. current = 0;
  688. }
  689. bool first()
  690. {
  691. current = 0;
  692. return threads.isItem(current);
  693. }
  694. bool next()
  695. {
  696. current++;
  697. return threads.isItem(current);
  698. }
  699. bool isValid()
  700. {
  701. return threads.isItem(current);
  702. }
  703. IPooledThread & query()
  704. {
  705. return threads.item(current);
  706. }
  707. };
  708. class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInterface
  709. {
  710. CIArrayOf<CPooledThreadWrapper> threadwrappers;
  711. PooledThreadHandle nextid;
  712. IThreadFactory *factory;
  713. unsigned stacksize;
  714. unsigned timeoutOnRelease;
  715. PooledThreadHandle _start(void *param,const char *name, bool noBlock, unsigned timeout=0)
  716. {
  717. bool timedout = defaultmax && !availsem.wait(noBlock ? 0 : (timeout>0?timeout:delay));
  718. PooledThreadHandle ret;
  719. {
  720. CriticalBlock block(crit);
  721. if (timedout&&!availsem.wait(0)) { // make sure take allocated sem if has become available
  722. if (noBlock || timeout > 0)
  723. throw MakeStringException(0, "No threads available in pool %s", poolname.get());
  724. PROGLOG("WARNING: Pool limit exceeded for %s", poolname.get());
  725. }
  726. CPooledThreadWrapper &t = allocThread();
  727. if (name)
  728. t.setName(name);
  729. t.go(param);
  730. ret = t.queryHandle();
  731. }
  732. Sleep(0);
  733. return ret;
  734. }
  735. public:
  736. IMPLEMENT_IINTERFACE;
  737. CThreadPool(IThreadFactory *_factory,IExceptionHandler *_exceptionHandler,const char *_poolname,unsigned _defaultmax, unsigned _delay, unsigned _stacksize, unsigned _timeoutOnRelease, unsigned _targetpoolsize)
  738. {
  739. poolname.set(_poolname);
  740. factory = LINK(_factory);
  741. exceptionHandler = _exceptionHandler;
  742. nextid = 1;
  743. stopall = false;
  744. defaultmax = _defaultmax;
  745. delay = _delay;
  746. if (defaultmax)
  747. availsem.signal(defaultmax);
  748. stacksize = _stacksize;
  749. timeoutOnRelease = _timeoutOnRelease;
  750. targetpoolsize = _targetpoolsize?_targetpoolsize:defaultmax;
  751. atomic_set(&numrunning,0);
  752. }
  753. ~CThreadPool()
  754. {
  755. stopAll(true);
  756. if (!joinAll(true, timeoutOnRelease))
  757. WARNLOG("%s; timedout[%d] waiting for threads in pool", poolname.get(), timeoutOnRelease);
  758. CriticalBlock block(crit);
  759. bool first=true;
  760. ForEachItemIn(i,threadwrappers)
  761. {
  762. CPooledThreadWrapper &t = threadwrappers.item(i);
  763. if (!t.isStopped())
  764. {
  765. if (first)
  766. {
  767. WARNLOG("Threads still active: ");
  768. first = false;
  769. }
  770. StringBuffer threadInfo;
  771. PROGLOG("Active thread: %s, info: %s", t.getName(), t.getInfo(threadInfo).str());
  772. }
  773. }
  774. factory->Release();
  775. }
  776. CPooledThreadWrapper &allocThread()
  777. { // called in critical section
  778. PooledThreadHandle newid=nextid++;
  779. if (newid==0)
  780. newid=nextid++;
  781. ForEachItemIn(i,threadwrappers) {
  782. CPooledThreadWrapper &it = threadwrappers.item(i);
  783. if (it.isStopped()) {
  784. it.setHandle(newid);
  785. if (!it.queryThread().canReuse()) {
  786. it.queryThread().Release();
  787. it.setThread(factory->createNew());
  788. }
  789. return it;
  790. }
  791. }
  792. CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew());
  793. if (stacksize)
  794. ret.setStackSize(stacksize);
  795. ret.start();
  796. atomic_inc(&numrunning);
  797. threadwrappers.append(ret);
  798. return ret;
  799. }
  800. CPooledThreadWrapper *findThread(PooledThreadHandle handle)
  801. { // called in critical section
  802. ForEachItemIn(i,threadwrappers) {
  803. CPooledThreadWrapper &it = threadwrappers.item(i);
  804. if (it.queryHandle()==handle)
  805. return &it;
  806. }
  807. return NULL;
  808. }
  809. PooledThreadHandle startNoBlock(void *param)
  810. {
  811. return _start(param, NULL, true);
  812. }
  813. PooledThreadHandle startNoBlock(void *param,const char *name)
  814. {
  815. return _start(param, name, true);
  816. }
  817. PooledThreadHandle start(void *param)
  818. {
  819. return _start(param, NULL, false);
  820. }
  821. PooledThreadHandle start(void *param,const char *name)
  822. {
  823. return _start(param, name, false);
  824. }
  825. PooledThreadHandle start(void *param,const char *name, unsigned timeout)
  826. {
  827. return _start(param, name, false, timeout);
  828. }
  829. bool stop(PooledThreadHandle handle)
  830. {
  831. CriticalBlock block(crit);
  832. CPooledThreadWrapper *t = findThread(handle);
  833. if (t)
  834. return t->stop();
  835. return true; // already stopped
  836. }
  837. bool stopAll(bool tryall=false)
  838. {
  839. availsem.signal(1000);
  840. availsem.wait();
  841. CriticalBlock block(crit);
  842. bool ret=true;
  843. ForEachItemIn(i,threadwrappers) {
  844. CPooledThreadWrapper &it = threadwrappers.item(i);
  845. if (!it.stop()) {
  846. ret = false;
  847. if (!tryall)
  848. break;
  849. }
  850. }
  851. return ret;
  852. }
  853. bool joinWait(CPooledThreadWrapper &t,unsigned timeout)
  854. {
  855. // called in critical section
  856. if (t.isStopped())
  857. return true;
  858. Semaphore sem;
  859. waitingsems.append(&sem);
  860. waitingids.append(t.queryHandle());
  861. crit.leave();
  862. bool ret = sem.wait(timeout);
  863. crit.enter();
  864. unsigned i = waitingsems.find(&sem);
  865. if (i!=NotFound) {
  866. waitingids.remove(i);
  867. waitingsems.remove(i);
  868. }
  869. return ret;
  870. }
  871. bool join(PooledThreadHandle handle,unsigned timeout=INFINITE)
  872. {
  873. CriticalBlock block(crit);
  874. CPooledThreadWrapper *t = findThread(handle);
  875. if (!t)
  876. return true; // already stopped
  877. return joinWait(*t,timeout);
  878. }
  879. virtual bool joinAll(bool del,unsigned timeout=INFINITE)
  880. { // note timeout is for each join
  881. CriticalBlock block(crit);
  882. CIArrayOf<CPooledThreadWrapper> tojoin;
  883. ForEachItemIn(i1,threadwrappers) {
  884. CPooledThreadWrapper &it = threadwrappers.item(i1);
  885. it.Link();
  886. tojoin.append(it);
  887. }
  888. ForEachItemIn(i2,tojoin)
  889. if (!joinWait(tojoin.item(i2),timeout))
  890. return false;
  891. if (del) {
  892. stopall = true;
  893. ForEachItemIn(i3,tojoin)
  894. tojoin.item(i3).cycle();
  895. {
  896. CriticalUnblock unblock(crit);
  897. ForEachItemIn(i4,tojoin)
  898. tojoin.item(i4).join();
  899. }
  900. threadwrappers.kill();
  901. stopall = false;
  902. }
  903. return true;
  904. }
  905. IPooledThreadIterator *running()
  906. {
  907. CriticalBlock block(crit);
  908. CPooledThreadIterator *ret = new CPooledThreadIterator;
  909. ForEachItemIn(i,threadwrappers) {
  910. CPooledThreadWrapper &it = threadwrappers.item(i);
  911. if (!it.isStopped()) {
  912. IPooledThread &t = it.queryThread();
  913. t.Link();
  914. ret->threads.append(t);
  915. }
  916. }
  917. return ret;
  918. }
  919. unsigned runningCount()
  920. {
  921. return (unsigned)atomic_read(&numrunning);
  922. }
  923. bool notifyStopped(CPooledThreadWrapper *item)
  924. {
  925. CriticalBlock block(crit);
  926. PooledThreadHandle myid = item->markStopped();
  927. ForEachItemIn(i1,waitingids) { // tell anyone waiting
  928. if (waitingids.item(i1)==myid)
  929. ((Semaphore *)waitingsems.item(i1))->signal();
  930. }
  931. bool ret = true;
  932. if (defaultmax) {
  933. unsigned n=threadwrappers.ordinality();
  934. for (unsigned i2=targetpoolsize;i2<n;i2++) { // only check excess for efficiency
  935. if (item==&threadwrappers.item(i2)) {
  936. threadwrappers.remove(i2);
  937. ret = false;
  938. break;
  939. }
  940. }
  941. availsem.signal();
  942. }
  943. return ret;
  944. }
  945. };
  946. IThreadPool *createThreadPool(const char *poolname,IThreadFactory *factory,IExceptionHandler *exceptionHandler,unsigned defaultmax, unsigned delay, unsigned stacksize, unsigned timeoutOnRelease, unsigned targetpoolsize)
  947. {
  948. return new CThreadPool(factory,exceptionHandler,poolname,defaultmax,delay,stacksize,timeoutOnRelease,targetpoolsize);
  949. }
  950. //=======================================================================================================
  951. static void CheckAllowedProgram(const char *prog,const char *allowed)
  952. {
  953. if (!prog||!allowed||(strcmp(allowed,"*")==0))
  954. return;
  955. StringBuffer head;
  956. bool inq = false;
  957. // note don't have to be too worried about odd quoting as matching fixed list
  958. while (*prog&&((*prog!=' ')||inq)) {
  959. if (*prog=='"')
  960. inq = !inq;
  961. head.append(*(prog++));
  962. }
  963. StringArray list;
  964. CslToStringArray(allowed, list);
  965. ForEachItemIn(i,list) {
  966. if (WildMatch(head.str(),list.item(i)))
  967. return;
  968. }
  969. ERRLOG("Unauthorized pipe program(%s)",head.str());
  970. throw MakeStringException(-1,"Unauthorized pipe program(%s)",head.str());
  971. }
  972. class CSimplePipeStream: public CInterface, implements ISimpleReadStream
  973. {
  974. public:
  975. IMPLEMENT_IINTERFACE;
  976. CSimplePipeStream(IPipeProcess *_pipe, bool _isStderr) : pipe(_pipe), isStderr(_isStderr) {}
  977. virtual size32_t read(size32_t sz, void * data)
  978. {
  979. if (isStderr)
  980. return pipe->readError(sz, data);
  981. else
  982. return pipe->read(sz, data);
  983. }
  984. private:
  985. Owned<IPipeProcess> pipe;
  986. bool isStderr;
  987. };
  988. #ifdef _WIN32
  989. class CWindowsPipeProcess: public CInterface, implements IPipeProcess
  990. {
  991. HANDLE pipeProcess;
  992. HANDLE hInput;
  993. HANDLE hOutput;
  994. HANDLE hError;
  995. StringAttr title;
  996. unsigned retcode;
  997. CriticalSection sect;
  998. bool aborted;
  999. StringAttr allowedprogs;
  1000. public:
  1001. IMPLEMENT_IINTERFACE;
  1002. CWindowsPipeProcess(const char *_allowedprogs)
  1003. : allowedprogs(_allowedprogs)
  1004. {
  1005. pipeProcess = (HANDLE)-1;
  1006. hInput=(HANDLE)-1;
  1007. hOutput=(HANDLE)-1;
  1008. hError=(HANDLE)-1;
  1009. retcode = (unsigned)-1;
  1010. aborted = false;
  1011. }
  1012. ~CWindowsPipeProcess()
  1013. {
  1014. kill();
  1015. }
  1016. void kill()
  1017. {
  1018. doCloseInput();
  1019. doCloseOutput();
  1020. doCloseError();
  1021. if (pipeProcess != (HANDLE)-1) {
  1022. CloseHandle(pipeProcess);
  1023. pipeProcess = (HANDLE)-1;
  1024. }
  1025. }
  1026. bool run(const char *_title,const char *prog,const char *dir,bool hasinput,bool hasoutput,bool haserror, size32_t stderrbufsize)
  1027. {
  1028. // size32_t stderrbufsize ignored as not required (I think)
  1029. CriticalBlock block(sect);
  1030. kill();
  1031. title.clear();
  1032. if (_title) {
  1033. title.set(_title);
  1034. PROGLOG("%s: Creating PIPE process : %s", title.get(), prog);
  1035. }
  1036. CheckAllowedProgram(prog,allowedprogs);
  1037. SECURITY_ATTRIBUTES sa;
  1038. sa.nLength = sizeof(SECURITY_ATTRIBUTES);
  1039. sa.bInheritHandle = TRUE;
  1040. sa.lpSecurityDescriptor = NULL;
  1041. HANDLE hProgOutput=(HANDLE)-1;
  1042. HANDLE hProgInput=(HANDLE)-1;
  1043. HANDLE hProgError=(HANDLE)-1;
  1044. HANDLE h;
  1045. //NB: Create a pipe handles that are not inherited our end
  1046. if (hasinput) {
  1047. CreatePipe(&hProgInput,&h,&sa,0);
  1048. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hInput, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1049. CloseHandle(h);
  1050. }
  1051. if (hasoutput) {
  1052. CreatePipe(&h,&hProgOutput,&sa,0);
  1053. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hOutput, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1054. CloseHandle(h);
  1055. }
  1056. if (haserror) {
  1057. CreatePipe(&h,&hProgError,&sa,0);
  1058. DuplicateHandle(GetCurrentProcess(),h, GetCurrentProcess(), &hError, 0, FALSE, DUPLICATE_SAME_ACCESS);
  1059. CloseHandle(h);
  1060. }
  1061. STARTUPINFO StartupInfo;
  1062. _clear(StartupInfo);
  1063. StartupInfo.cb = sizeof(StartupInfo);
  1064. StartupInfo.wShowWindow = SW_HIDE;
  1065. StartupInfo.dwFlags = STARTF_USESTDHANDLES|STARTF_USESHOWWINDOW ;
  1066. StartupInfo.hStdOutput = hasoutput?hProgOutput:GetStdHandle(STD_OUTPUT_HANDLE);
  1067. StartupInfo.hStdError = haserror?hProgError:GetStdHandle(STD_ERROR_HANDLE);
  1068. StartupInfo.hStdInput = hasinput?hProgInput:GetStdHandle(STD_INPUT_HANDLE);
  1069. PROCESS_INFORMATION ProcessInformation;
  1070. if (!CreateProcess(NULL, (char *)prog, NULL,NULL,TRUE,0,NULL, dir&&*dir?dir:NULL, &StartupInfo,&ProcessInformation)) {
  1071. if (_title) {
  1072. StringBuffer errstr;
  1073. formatSystemError(errstr, GetLastError());
  1074. ERRLOG("%s: PIPE process '%s' failed: %s", title.get(), prog, errstr.str());
  1075. }
  1076. return false;
  1077. }
  1078. pipeProcess = ProcessInformation.hProcess;
  1079. CloseHandle(ProcessInformation.hThread);
  1080. if (hasoutput)
  1081. CloseHandle(hProgOutput);
  1082. if (hasinput)
  1083. CloseHandle(hProgInput);
  1084. if (haserror)
  1085. CloseHandle(hProgError);
  1086. return true;
  1087. }
  1088. size32_t read(size32_t sz, void *buf)
  1089. {
  1090. DWORD sizeRead;
  1091. if (!ReadFile(hOutput, buf, sz, &sizeRead, NULL)) {
  1092. //raise error here
  1093. if(aborted)
  1094. return 0;
  1095. int err=GetLastError();
  1096. switch(err)
  1097. {
  1098. case ERROR_HANDLE_EOF:
  1099. case ERROR_BROKEN_PIPE:
  1100. case ERROR_NO_DATA:
  1101. return 0;
  1102. default:
  1103. aborted = true;
  1104. IException *e = MakeOsException(err, "Pipe: ReadFile failed (size %d)", sz);
  1105. PrintExceptionLog(e, NULL);
  1106. throw e;
  1107. }
  1108. }
  1109. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1110. }
  1111. ISimpleReadStream *getOutputStream()
  1112. {
  1113. return new CSimplePipeStream(LINK(this), false);
  1114. }
  1115. size32_t readError(size32_t sz, void *buf)
  1116. {
  1117. DWORD sizeRead;
  1118. if (!ReadFile(hError, buf, sz, &sizeRead, NULL)) {
  1119. //raise error here
  1120. if(aborted)
  1121. return 0;
  1122. int err=GetLastError();
  1123. switch(err)
  1124. {
  1125. case ERROR_HANDLE_EOF:
  1126. case ERROR_BROKEN_PIPE:
  1127. case ERROR_NO_DATA:
  1128. return 0;
  1129. default:
  1130. aborted = true;
  1131. IException *e = MakeOsException(err, "Pipe: ReadError failed (size %d)", sz);
  1132. PrintExceptionLog(e, NULL);
  1133. throw e;
  1134. }
  1135. }
  1136. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1137. }
  1138. ISimpleReadStream *getErrorStream()
  1139. {
  1140. return new CSimplePipeStream(LINK(this), true);
  1141. }
  1142. size32_t write(size32_t sz, const void *buf)
  1143. {
  1144. DWORD sizeWritten;
  1145. if (!WriteFile(hInput, buf, sz, &sizeWritten, NULL)) {
  1146. int err=GetLastError();
  1147. if ((err==ERROR_HANDLE_EOF)||aborted)
  1148. sizeWritten = 0;
  1149. else {
  1150. IException *e = MakeOsException(err, "Pipe: WriteFile failed (size %d)", sz);
  1151. PrintExceptionLog(e, NULL);
  1152. throw e;
  1153. }
  1154. }
  1155. return aborted?((size32_t)-1):((size32_t)sizeWritten);
  1156. }
  1157. unsigned wait()
  1158. {
  1159. CriticalBlock block(sect);
  1160. if (pipeProcess != (HANDLE)-1) {
  1161. if (title.length())
  1162. PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
  1163. {
  1164. CriticalUnblock unblock(sect);
  1165. WaitForSingleObject(pipeProcess, INFINITE);
  1166. }
  1167. if (pipeProcess != (HANDLE)-1) {
  1168. GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
  1169. CloseHandle(pipeProcess);
  1170. pipeProcess = (HANDLE)-1;
  1171. }
  1172. if (title.length())
  1173. PROGLOG("%s: Pipe: process complete",title.get());
  1174. }
  1175. return retcode;
  1176. }
  1177. unsigned wait(unsigned timeoutms, bool &timedout)
  1178. {
  1179. CriticalBlock block(sect);
  1180. timedout = false;
  1181. if (pipeProcess != (HANDLE)-1) {
  1182. if (title.length())
  1183. PROGLOG("%s: Pipe: Waiting for process to complete %d",title.get(),(unsigned)pipeProcess);
  1184. {
  1185. CriticalUnblock unblock(sect);
  1186. if (WaitForSingleObject(pipeProcess, timeoutms)!=WAIT_OBJECT_0) {
  1187. timedout = true;
  1188. return retcode;
  1189. }
  1190. }
  1191. if (pipeProcess != (HANDLE)-1) {
  1192. GetExitCodeProcess(pipeProcess,(LPDWORD)&retcode); // already got if notified
  1193. CloseHandle(pipeProcess);
  1194. pipeProcess = (HANDLE)-1;
  1195. }
  1196. if (title.length())
  1197. PROGLOG("%s: Pipe: process complete",title.get());
  1198. }
  1199. return retcode;
  1200. }
  1201. void notifyTerminated(HANDLE pid,unsigned _retcode)
  1202. {
  1203. CriticalBlock block(sect);
  1204. if ((pid!=(HANDLE)-1)&&(pid==pipeProcess)) {
  1205. retcode = _retcode;
  1206. pipeProcess = (HANDLE)-1;
  1207. }
  1208. }
  1209. void doCloseInput()
  1210. {
  1211. CriticalBlock block(sect);
  1212. if (hInput != (HANDLE)-1) {
  1213. CloseHandle(hInput);
  1214. hInput = (HANDLE)-1;
  1215. }
  1216. }
  1217. void doCloseOutput()
  1218. {
  1219. CriticalBlock block(sect);
  1220. if (hOutput != (HANDLE)-1) {
  1221. CloseHandle(hOutput);
  1222. hOutput = (HANDLE)-1;
  1223. }
  1224. }
  1225. void doCloseError()
  1226. {
  1227. CriticalBlock block(sect);
  1228. if (hError != (HANDLE)-1) {
  1229. CloseHandle(hError);
  1230. hError = (HANDLE)-1;
  1231. }
  1232. }
  1233. void closeInput()
  1234. {
  1235. doCloseInput();
  1236. }
  1237. void closeOutput()
  1238. {
  1239. doCloseOutput();
  1240. }
  1241. void closeError()
  1242. {
  1243. doCloseError();
  1244. }
  1245. void abort()
  1246. {
  1247. CriticalBlock block(sect);
  1248. if (pipeProcess != (HANDLE)-1) {
  1249. if (title.length())
  1250. PROGLOG("%s: Pipe Aborting",title.get());
  1251. aborted = true;
  1252. //doCloseOutput(); // seems to work better without this
  1253. doCloseInput();
  1254. {
  1255. CriticalUnblock unblock(sect);
  1256. Sleep(100);
  1257. }
  1258. try { // this code is problematic for some reason
  1259. if (pipeProcess != (HANDLE)-1) {
  1260. TerminateProcess(pipeProcess, 255);
  1261. CloseHandle(pipeProcess);
  1262. pipeProcess = (HANDLE)-1;
  1263. }
  1264. }
  1265. catch (...) {
  1266. // ignore errors
  1267. }
  1268. if (title.length())
  1269. PROGLOG("%s: Pipe Aborted",title.get());
  1270. }
  1271. }
  1272. bool hasInput()
  1273. {
  1274. return hInput!=(HANDLE)-1;
  1275. }
  1276. bool hasOutput()
  1277. {
  1278. return hOutput!=(HANDLE)-1;
  1279. }
  1280. bool hasError()
  1281. {
  1282. return hError!=(HANDLE)-1;
  1283. }
  1284. HANDLE getProcessHandle()
  1285. {
  1286. return pipeProcess;
  1287. }
  1288. };
  1289. IPipeProcess *createPipeProcess(const char *allowedprogs)
  1290. {
  1291. return new CWindowsPipeProcess(allowedprogs);
  1292. }
  1293. #else
  // Scope guard that ignores SIGPIPE for as long as it is alive.
  //
  // The constructor installs SIG_IGN for SIGPIPE and remembers the disposition
  // that was in force; the destructor puts that disposition back.  (Restoring
  // the saved action rather than forcing SIG_DFL keeps an application-level
  // handler or ignore setting intact.)
  //
  // Signal dispositions are process wide, so overlapping guards on different
  // threads restore in whatever order they are destroyed.
  class CIgnoreSIGPIPE
  {
      struct sigaction prev;  // disposition to restore on destruction
      bool installed;         // true if the constructor's sigaction() succeeded

      // not copyable: each guard restores exactly once
      CIgnoreSIGPIPE(const CIgnoreSIGPIPE &);
      CIgnoreSIGPIPE &operator=(const CIgnoreSIGPIPE &);
  public:
      CIgnoreSIGPIPE()
      {
          struct sigaction act;
          memset(&act, 0, sizeof(act));   // sa_flags must not be left indeterminate
          sigemptyset(&act.sa_mask);
          act.sa_handler = SIG_IGN;
          act.sa_flags = 0;
          memset(&prev, 0, sizeof(prev));
          installed = (sigaction(SIGPIPE, &act, &prev)==0);
      }
      ~CIgnoreSIGPIPE()
      {
          if (installed)
              sigaction(SIGPIPE, &prev, NULL);
      }
  };
  // Space, tab, newline and carriage return.
  // NOTE(review): not referenced in the code visible here - presumably used by
  // the argument parsing helpers elsewhere in the file; confirm.
  #define WHITESPACE " \t\n\r"
  // Exit status the forked child passes to _exit() when execvp() fails; also
  // stored in retcode when fork() itself fails, and compared against by run()
  // to detect a failed start.
  #define START_FAILURE (199)
  1313. static unsigned dowaitpid(HANDLE pid, int mode)
  1314. {
  1315. while (pid != (HANDLE)-1) {
  1316. int stat=-1;
  1317. int ret = waitpid(pid, &stat, mode);
  1318. if (ret>0)
  1319. {
  1320. if (WIFEXITED(stat))
  1321. return WEXITSTATUS(stat);
  1322. else if (WIFSIGNALED(stat))
  1323. {
  1324. ERRLOG("Program was terminated by signal %u", (unsigned) WTERMSIG(stat));
  1325. if (WTERMSIG(stat)==SIGPIPE)
  1326. return 0;
  1327. return 254;
  1328. }
  1329. else
  1330. {
  1331. return 254;
  1332. }
  1333. }
  1334. if (ret==0)
  1335. break;
  1336. int err = errno;
  1337. if (err == ECHILD)
  1338. break;
  1339. if (err!=EINTR) {
  1340. ERRLOG("dowait failed with errcode %d",err);
  1341. return (unsigned)-1;
  1342. }
  1343. }
  1344. return 0;
  1345. }
  1346. class CLinuxPipeProcess: public CInterface, implements IPipeProcess
  1347. {
  1348. class cForkThread: public Thread
  1349. {
  1350. CLinuxPipeProcess *parent;
  1351. public:
  1352. cForkThread(CLinuxPipeProcess *_parent)
  1353. {
  1354. parent = _parent;
  1355. }
  1356. int run()
  1357. {
  1358. parent->run();
  1359. return 0;
  1360. }
  1361. };
  1362. Owned<cForkThread> forkthread;
  1363. class cStdErrorBufferThread: public Thread
  1364. {
  1365. MemoryAttr buf;
  1366. size32_t bufsize;
  1367. Semaphore stopsem;
  1368. CriticalSection &sect;
  1369. int &hError;
  1370. public:
  1371. cStdErrorBufferThread(size32_t maxbufsize,int &_hError,CriticalSection &_sect)
  1372. : hError(_hError), sect(_sect)
  1373. {
  1374. buf.allocate(maxbufsize);
  1375. bufsize = 0;
  1376. }
  1377. int run()
  1378. {
  1379. while (!stopsem.wait(1000)) {
  1380. CriticalBlock block(sect);
  1381. if (hError!=(HANDLE)-1) { // hmm who did that
  1382. fcntl(hError,F_SETFL,O_NONBLOCK); // make sure non-blocking
  1383. if (bufsize<buf.length()) {
  1384. size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
  1385. if ((int)sizeRead>0) {
  1386. bufsize += sizeRead;
  1387. }
  1388. }
  1389. else { // flush (to avoid process blocking)
  1390. byte tmp[1024];
  1391. size32_t totsz = 0;
  1392. for (unsigned i=0;i<1024;i++) {
  1393. size32_t sz = (size32_t)::read(hError, tmp, sizeof(tmp));
  1394. if ((int)sz<=0)
  1395. break;
  1396. totsz+=sz;
  1397. }
  1398. if (totsz)
  1399. WARNLOG("Lost %d bytes of stderr output",totsz);
  1400. }
  1401. }
  1402. }
  1403. return 0;
  1404. }
  1405. void stop()
  1406. {
  1407. stopsem.signal();
  1408. Thread::join();
  1409. }
  1410. size32_t read(size32_t sz,void *out)
  1411. {
  1412. CriticalBlock block(sect);
  1413. if (bufsize<sz)
  1414. sz = bufsize;
  1415. if (sz>0) {
  1416. memcpy(out,buf.bufferBase(),sz);
  1417. if (sz!=bufsize) {
  1418. bufsize -= sz;
  1419. memmove(buf.bufferBase(),(byte *)buf.bufferBase()+sz,bufsize); // not ideal but hopefully not large
  1420. }
  1421. else
  1422. bufsize = 0;
  1423. }
  1424. return sz;
  1425. }
  1426. } *stderrbufferthread;
  1427. protected: friend class PipeWriterThread;
  1428. HANDLE pipeProcess;
  1429. HANDLE hInput;
  1430. HANDLE hOutput;
  1431. HANDLE hError;
  1432. bool hasinput;
  1433. bool hasoutput;
  1434. bool haserror;
  1435. StringAttr title;
  1436. StringAttr cmd;
  1437. StringAttr prog;
  1438. StringAttr dir;
  1439. int retcode;
  1440. CriticalSection sect;
  1441. Semaphore started;
  1442. bool aborted;
  1443. MemoryBuffer stderrbuf;
  1444. size32_t stderrbufsize;
  1445. StringAttr allowedprogs;
  1446. public:
  1447. IMPLEMENT_IINTERFACE;
  1448. CLinuxPipeProcess(const char *_allowedprogs)
  1449. : allowedprogs(_allowedprogs)
  1450. {
  1451. pipeProcess = (HANDLE)-1;
  1452. hInput=(HANDLE)-1;
  1453. hOutput=(HANDLE)-1;
  1454. hError=(HANDLE)-1;
  1455. retcode = -1;
  1456. aborted = false;
  1457. stderrbufferthread = NULL;
  1458. }
  1459. ~CLinuxPipeProcess()
  1460. {
  1461. kill();
  1462. }
  1463. void kill()
  1464. {
  1465. closeInput();
  1466. closeOutput();
  1467. closeError();
  1468. Owned<cForkThread> ft;
  1469. cStdErrorBufferThread *et;
  1470. { CriticalBlock block(sect); // clear forkthread and stderrbufferthread
  1471. ft.setown(forkthread.getClear());
  1472. et = stderrbufferthread;
  1473. stderrbufferthread = NULL;
  1474. }
  1475. if (ft) {
  1476. ft->join();
  1477. ft.clear();
  1478. }
  1479. if (et) {
  1480. et->stop();
  1481. delete et;
  1482. }
  1483. }
  1484. char **splitargs(const char *line,unsigned &argc)
  1485. {
  1486. char *buf = strdup(line);
  1487. // first count params (this probably could be improved)
  1488. char *s = buf;
  1489. argc = 0;
  1490. while (readarg(s))
  1491. argc++;
  1492. free(buf);
  1493. size32_t l = strlen(line)+1;
  1494. size32_t al = (argc+1)*sizeof(char *);
  1495. char **argv = (char **)malloc(al+l);
  1496. argv[argc] = NULL;
  1497. s = ((char *)argv)+al;
  1498. memcpy(s,line,l);
  1499. for (unsigned i=0;i<argc;i++)
  1500. argv[i] = readarg(s);
  1501. return argv;
  1502. }
  1503. void run()
  1504. {
  1505. int inpipe[2];
  1506. int outpipe[2];
  1507. int errpipe[2];
  1508. if (hasinput)
  1509. if (::pipe(inpipe)==-1)
  1510. throw MakeOsException(errno);
  1511. if (hasoutput)
  1512. if (::pipe(outpipe)==-1)
  1513. throw MakeOsException(errno);
  1514. if (haserror)
  1515. if (::pipe(errpipe)==-1)
  1516. throw MakeOsException(errno);
  1517. loop {
  1518. pipeProcess = (HANDLE)fork();
  1519. if (pipeProcess!=(HANDLE)-1)
  1520. break;
  1521. if (errno!=EAGAIN) {
  1522. if (hasinput) {
  1523. close(inpipe[0]);
  1524. close(inpipe[1]);
  1525. }
  1526. if (hasoutput) {
  1527. close(outpipe[0]);
  1528. close(outpipe[1]);
  1529. }
  1530. if (haserror) {
  1531. close(errpipe[0]);
  1532. close(errpipe[1]);
  1533. }
  1534. retcode = START_FAILURE;
  1535. started.signal();
  1536. return;
  1537. }
  1538. }
  1539. if (pipeProcess==0) { // child
  1540. if (hasinput) {
  1541. dup2(inpipe[0],0);
  1542. close(inpipe[0]);
  1543. close(inpipe[1]);
  1544. }
  1545. if (hasoutput) {
  1546. dup2(outpipe[1],1);
  1547. close(outpipe[0]);
  1548. close(outpipe[1]);
  1549. }
  1550. if (haserror) {
  1551. dup2(errpipe[1],2);
  1552. close(errpipe[0]);
  1553. close(errpipe[1]);
  1554. }
  1555. unsigned argc;
  1556. char **argv=splitargs(prog,argc);
  1557. if (dir.get())
  1558. chdir(dir);
  1559. execvp(argv[0],argv);
  1560. _exit(START_FAILURE); // must be _exit!!
  1561. }
  1562. if (hasinput)
  1563. close(inpipe[0]);
  1564. if (hasoutput)
  1565. close(outpipe[1]);
  1566. if (haserror)
  1567. close(errpipe[1]);
  1568. hInput = hasinput?inpipe[1]:((HANDLE)-1);
  1569. hOutput = hasoutput?outpipe[0]:((HANDLE)-1);
  1570. hError = haserror?errpipe[0]:((HANDLE)-1);
  1571. started.signal();
  1572. retcode = dowaitpid(pipeProcess, 0);
  1573. if (retcode==START_FAILURE)
  1574. closeOutput();
  1575. }
  1576. bool run(const char *_title,const char *_prog,const char *_dir,bool _hasinput,bool _hasoutput, bool _haserror, size32_t stderrbufsize)
  1577. {
  1578. static CriticalSection runsect; // single thread process start to avoid forked handle open/closes interleaving
  1579. CriticalBlock runblock(runsect);
  1580. kill();
  1581. CriticalBlock block(sect);
  1582. hasinput = _hasinput;
  1583. hasoutput = _hasoutput;
  1584. haserror = _haserror;
  1585. title.clear();
  1586. prog.set(_prog);
  1587. dir.set(_dir);
  1588. if (_title) {
  1589. title.set(_title);
  1590. PROGLOG("%s: Creating PIPE program process : '%s' - hasinput=%d, hasoutput=%d stderrbufsize=%d", title.get(), prog.get(),(int)hasinput, (int)hasoutput, stderrbufsize);
  1591. }
  1592. CheckAllowedProgram(prog,allowedprogs);
  1593. retcode = 0;
  1594. if (forkthread) {
  1595. {
  1596. CriticalUnblock unblock(sect);
  1597. forkthread->join();
  1598. }
  1599. forkthread.clear();
  1600. }
  1601. forkthread.setown(new cForkThread(this));
  1602. forkthread->start();
  1603. {
  1604. CriticalUnblock unblock(sect);
  1605. started.wait();
  1606. forkthread->join(50); // give a chance to fail
  1607. }
  1608. if (retcode==START_FAILURE) {
  1609. ERRLOG("%s: PIPE process '%s' failed to start", title.get()?title.get():"CLinuxPipeProcess", prog.get());
  1610. forkthread.clear();
  1611. return false;
  1612. }
  1613. if (stderrbufsize) {
  1614. if (stderrbufferthread) {
  1615. stderrbufferthread->stop();
  1616. delete stderrbufferthread;
  1617. }
  1618. stderrbufferthread = new cStdErrorBufferThread(stderrbufsize,hError,sect);
  1619. stderrbufferthread->start();
  1620. }
  1621. return true;
  1622. }
  1623. size32_t read(size32_t sz, void *buf)
  1624. {
  1625. CriticalBlock block(sect);
  1626. if (aborted)
  1627. return (size32_t)-1;
  1628. if (hOutput==(HANDLE)-1)
  1629. return 0;
  1630. size32_t sizeRead;
  1631. loop {
  1632. {
  1633. CriticalUnblock unblock(sect);
  1634. sizeRead = (size32_t)::read(hOutput, buf, sz);
  1635. }
  1636. if (sizeRead!=(size32_t)-1)
  1637. break;
  1638. if (aborted)
  1639. break;
  1640. if (errno!=EINTR) {
  1641. aborted = true;
  1642. throw MakeErrnoException(errno,"Pipe: read failed (size %d)", sz);
  1643. }
  1644. }
  1645. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1646. }
  1647. ISimpleReadStream *getOutputStream()
  1648. {
  1649. return new CSimplePipeStream(LINK(this), false);
  1650. }
  1651. size32_t write(size32_t sz, const void *buf)
  1652. {
  1653. CriticalBlock block(sect);
  1654. CIgnoreSIGPIPE ignoresigpipe;
  1655. if (aborted)
  1656. return (size32_t)-1;
  1657. if (hInput==(HANDLE)-1)
  1658. return 0;
  1659. size32_t sizeWritten;
  1660. loop {
  1661. {
  1662. CriticalUnblock unblock(sect);
  1663. sizeWritten = (size32_t)::write(hInput, buf, sz);
  1664. }
  1665. if (sizeWritten!=(size32_t)-1)
  1666. break;
  1667. if (aborted)
  1668. break;
  1669. if (errno!=EINTR) {
  1670. throw MakeErrnoException(errno,"Pipe: write failed (size %d)", sz);
  1671. }
  1672. }
  1673. return aborted?((size32_t)-1):((size32_t)sizeWritten);
  1674. }
  1675. size32_t readError(size32_t sz, void *buf)
  1676. {
  1677. CriticalBlock block(sect);
  1678. if (stderrbufferthread)
  1679. return stderrbufferthread->read(sz,buf);
  1680. if (aborted)
  1681. return (size32_t)-1;
  1682. if (hError==(HANDLE)-1)
  1683. return 0;
  1684. size32_t sizeRead;
  1685. loop {
  1686. {
  1687. CriticalUnblock unblock(sect);
  1688. sizeRead = (size32_t)::read(hError, buf, sz);
  1689. }
  1690. if (sizeRead!=(size32_t)-1)
  1691. break;
  1692. if (aborted)
  1693. break;
  1694. if (errno!=EINTR) {
  1695. aborted = true;
  1696. throw MakeErrnoException(errno,"Pipe: readError failed (size %d)", sz);
  1697. }
  1698. }
  1699. return aborted?((size32_t)-1):((size32_t)sizeRead);
  1700. }
  1701. ISimpleReadStream *getErrorStream()
  1702. {
  1703. return new CSimplePipeStream(LINK(this), true);
  1704. }
  1705. void notifyTerminated(HANDLE pid,unsigned _retcode)
  1706. {
  1707. CriticalBlock block(sect);
  1708. if (((int)pid>0)&&(pid==pipeProcess)) {
  1709. retcode = _retcode;
  1710. pipeProcess = (HANDLE)-1;
  1711. }
  1712. }
  1713. unsigned wait()
  1714. {
  1715. CriticalBlock block(sect);
  1716. if (forkthread) {
  1717. {
  1718. CriticalUnblock unblock(sect);
  1719. forkthread->join();
  1720. }
  1721. if (pipeProcess != (HANDLE)-1) {
  1722. if (title.length())
  1723. PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
  1724. pipeProcess = (HANDLE)-1;
  1725. }
  1726. forkthread.clear();
  1727. }
  1728. return retcode;
  1729. }
  1730. unsigned wait(unsigned timeoutms, bool &timedout)
  1731. {
  1732. CriticalBlock block(sect);
  1733. timedout = false;
  1734. if (forkthread) {
  1735. {
  1736. CriticalUnblock unblock(sect);
  1737. if (!forkthread->join(timeoutms)) {
  1738. timedout = true;
  1739. return retcode;
  1740. }
  1741. }
  1742. if (pipeProcess != (HANDLE)-1) {
  1743. if (title.length())
  1744. PROGLOG("%s: Pipe: process %d complete %d",title.get(),pipeProcess,retcode);
  1745. pipeProcess = (HANDLE)-1;
  1746. }
  1747. forkthread.clear();
  1748. }
  1749. return retcode;
  1750. }
  1751. void closeOutput()
  1752. {
  1753. CriticalBlock block(sect);
  1754. if (hOutput != (HANDLE)-1) {
  1755. ::close(hOutput);
  1756. hOutput = (HANDLE)-1;
  1757. }
  1758. }
  1759. void closeInput()
  1760. {
  1761. CriticalBlock block(sect);
  1762. if (hInput != (HANDLE)-1) {
  1763. ::close(hInput);
  1764. hInput = (HANDLE)-1;
  1765. }
  1766. }
  1767. void closeError()
  1768. {
  1769. CriticalBlock block(sect);
  1770. if (hError != (HANDLE)-1) {
  1771. ::close(hError);
  1772. hError = (HANDLE)-1;
  1773. }
  1774. }
  1775. void abort()
  1776. {
  1777. CriticalBlock block(sect);
  1778. if (pipeProcess != (HANDLE)-1) {
  1779. if (title.length())
  1780. PROGLOG("%s: Pipe Aborting",title.get());
  1781. aborted = true;
  1782. closeInput();
  1783. {
  1784. CriticalUnblock unblock(sect);
  1785. forkthread->join(1000);
  1786. }
  1787. if (pipeProcess != (HANDLE)-1) {
  1788. if (title.length())
  1789. PROGLOG("%s: Forcibly killing pipe process %d",title.get(),pipeProcess);
  1790. ::kill(pipeProcess,SIGKILL); // if this doesn't kill it we are in trouble
  1791. CriticalUnblock unblock(sect);
  1792. wait();
  1793. }
  1794. if (title.length())
  1795. PROGLOG("%s: Pipe Aborted",title.get());
  1796. retcode = -1;
  1797. forkthread.clear();
  1798. }
  1799. }
  1800. bool hasInput()
  1801. {
  1802. CriticalBlock block(sect);
  1803. return hInput!=(HANDLE)-1;
  1804. }
  1805. bool hasOutput()
  1806. {
  1807. CriticalBlock block(sect);
  1808. return hOutput!=(HANDLE)-1;
  1809. }
  1810. bool hasError()
  1811. {
  1812. CriticalBlock block(sect);
  1813. return hError!=(HANDLE)-1;
  1814. }
  1815. HANDLE getProcessHandle()
  1816. {
  1817. CriticalBlock block(sect);
  1818. return pipeProcess;
  1819. }
  1820. };
  1821. IPipeProcess *createPipeProcess(const char *allowedprogs)
  1822. {
  1823. return new CLinuxPipeProcess(allowedprogs);
  1824. }
  1825. #endif
  1826. // Worker thread
  1827. class CWorkQueueThread: public CInterface, implements IWorkQueueThread
  1828. {
  1829. public:
  1830. IMPLEMENT_IINTERFACE;
  1831. CriticalSection crit;
  1832. unsigned persisttime;
  1833. class cWorkerThread: public Thread
  1834. {
  1835. unsigned persisttime;
  1836. CWorkQueueThread *parent;
  1837. CriticalSection &crit;
  1838. public:
  1839. IMPLEMENT_IINTERFACE;
  1840. cWorkerThread(CWorkQueueThread *_parent,CriticalSection &_crit,unsigned _persisttime)
  1841. : crit(_crit)
  1842. {
  1843. parent = _parent;
  1844. persisttime = _persisttime;
  1845. }
  1846. QueueOf<IWorkQueueItem,false> queue;
  1847. Semaphore sem;
  1848. int run()
  1849. {
  1850. loop {
  1851. IWorkQueueItem * work;
  1852. bool wr = sem.wait(persisttime);
  1853. {
  1854. CriticalBlock block(crit);
  1855. if (!wr) {
  1856. wr = sem.wait(0); // catch race
  1857. if (!wr)
  1858. break; // timed out
  1859. }
  1860. work = queue.dequeue();
  1861. }
  1862. if (!work)
  1863. break;
  1864. try {
  1865. work->execute();
  1866. work->Release();
  1867. }
  1868. catch (IException *e)
  1869. {
  1870. EXCLOG(e,"CWorkQueueThread item execute");
  1871. e->Release();
  1872. }
  1873. }
  1874. CriticalBlock block(crit);
  1875. parent->worker=NULL; // this should be safe
  1876. return 0;
  1877. }
  1878. } *worker;
  1879. CWorkQueueThread(unsigned _persisttime)
  1880. {
  1881. persisttime = _persisttime;
  1882. worker = NULL;
  1883. }
  1884. ~CWorkQueueThread()
  1885. {
  1886. wait();
  1887. }
  1888. void post(IWorkQueueItem *packet)
  1889. {
  1890. CriticalBlock block(crit);
  1891. if (!worker) {
  1892. worker = new cWorkerThread(this,crit,persisttime);
  1893. worker->startRelease();
  1894. }
  1895. worker->queue.enqueue(packet);
  1896. worker->sem.signal();
  1897. }
  1898. void wait()
  1899. {
  1900. CriticalBlock block(crit);
  1901. if (worker) {
  1902. worker->queue.enqueue(NULL);
  1903. worker->sem.signal();
  1904. Linked<cWorkerThread> wt;
  1905. wt.set(worker);
  1906. CriticalUnblock unblock(crit);
  1907. wt->join();
  1908. }
  1909. }
  1910. unsigned pending()
  1911. {
  1912. CriticalBlock block(crit);
  1913. unsigned ret = 0;
  1914. if (worker)
  1915. ret = worker->queue.ordinality();
  1916. return ret;
  1917. }
  1918. };
  1919. IWorkQueueThread *createWorkQueueThread(unsigned persisttime)
  1920. {
  1921. return new CWorkQueueThread(persisttime);
  1922. }
  1923. unsigned threadLogID() // for use in logging
  1924. {
  1925. #ifndef _WIN32
  1926. #ifdef SYS_gettid
  1927. return (unsigned) (memsize_t) syscall(SYS_gettid);
  1928. #endif
  1929. #endif
  1930. return (unsigned)(memsize_t) GetCurrentThreadId(); // truncated in 64bit
  1931. }