pqueue.hpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifndef __PQUEUE_HPP
  15. #define __PQUEUE_HPP
  16. #ifdef _MSC_VER
  17. #pragma warning( push )
  18. #pragma warning(disable : 4284 ) // return type for '...::operator ->' is '...' (ie; not a UDT or reference to a UDT. Will produce errors if applied using infix notation)
  19. #endif
  20. #include "jlibplus.hpp"
  21. #include "jmutex.hpp"
  22. #include "jthread.hpp"
  23. #include "jmisc.hpp"
  24. #include <list>
  25. #include <vector>
  26. namespace esp
  27. {
  28. template<typename T> class WaitQueue: public CInterface, protected Mutex, implements IInterface
  29. {
  30. public:
  31. IMPLEMENT_IINTERFACE;
  32. WaitQueue(): counter(), stopped(false), waiting(0)
  33. {
  34. }
  35. ~WaitQueue()
  36. {
  37. stop();
  38. synchronized block(*this);
  39. while(waiting)
  40. {
  41. counter.signal(waiting); // actually need only one and only once
  42. wait(INFINITE);
  43. }
  44. }
  45. unsigned size()
  46. {
  47. synchronized block(*this);
  48. return queue.size();
  49. }
  50. T get(unsigned timeout=INFINITE)
  51. {
  52. synchronized block(*this);
  53. for(;;)
  54. {
  55. if(stopped)
  56. return 0;
  57. if(queue.size())
  58. break;
  59. if(!wait(timeout))
  60. return 0;
  61. }
  62. T item=queue.front();
  63. queue.pop_front();
  64. return item;
  65. }
  66. bool put(const T& item)
  67. {
  68. synchronized block(*this);
  69. if(stopped)
  70. return true;
  71. queue.push_back(item);
  72. counter.signal();
  73. return waiting>0;
  74. }
  75. void stop()
  76. {
  77. synchronized block(*this);
  78. stopped=true;
  79. queue.clear();
  80. counter.signal(waiting);
  81. }
  82. bool isStopped()
  83. {
  84. synchronized block(*this);
  85. return stopped;
  86. }
  87. private:
  88. bool wait(unsigned timeout)
  89. {
  90. bool ret=false;
  91. waiting++;
  92. int locked = unlockAll();
  93. ret=counter.wait(timeout);
  94. lockAll(locked);
  95. waiting--;
  96. return ret;
  97. }
  98. Semaphore counter;
  99. std::list<T> queue;
  100. volatile unsigned waiting;
  101. volatile bool stopped; //need event
  102. };
  103. interface ITask: extends IInterface
  104. {
  105. virtual int run()=0;
  106. virtual bool stop()=0;
  107. };
  108. interface IErrorListener: extends IInterface
  109. {
  110. virtual void reportError(const char* err,...) __attribute__((format(printf, 2, 3))) =0;
  111. };
  112. class TaskQueue
  113. {
  114. public:
  115. TaskQueue(size32_t _maxsize,IErrorListener* _err=0): maxsize(_maxsize), err(_err)
  116. {
  117. }
  118. ~TaskQueue()
  119. {
  120. stop();
  121. join();
  122. }
  123. void put(ITask* task)
  124. {
  125. bool needthread=!queue.put(task);
  126. if(needthread)
  127. {
  128. synchronized block(mworkers);
  129. if(workers.size()<maxsize)
  130. {
  131. workers.push_back(new WorkerThread(*this));
  132. workers.back()->start();
  133. }
  134. // PrintLog("%d threads",workers.size());
  135. }
  136. }
  137. void stop()
  138. {
  139. queue.stop();
  140. synchronized block(mworkers);
  141. for(Workers::iterator it=workers.begin();it!=workers.end();it++)
  142. (*it)->stop(); // no good if threads did not clean up
  143. }
  144. void join()
  145. {
  146. synchronized block(mworkers);
  147. while(!workers.empty())
  148. {
  149. mworkers.wait();
  150. }
  151. }
  152. void setErrorListener(IErrorListener* _err)
  153. {
  154. err.set(_err);
  155. }
  156. void reportError(const char* e)
  157. {
  158. if(err)
  159. {
  160. synchronized block(merr);
  161. err->reportError(e);
  162. }
  163. }
  164. private:
  165. class WorkerThread: public Thread
  166. {
  167. public:
  168. WorkerThread(TaskQueue& _tq): tq(_tq), stopped(false)
  169. {
  170. }
  171. virtual int run()
  172. {
  173. for(;;)
  174. {
  175. try
  176. {
  177. task.setown(tq.queue.get(1000).get());
  178. if(stopped || !task)
  179. break;
  180. task->run();
  181. }
  182. catch (IException *E)
  183. {
  184. StringBuffer err;
  185. E->errorMessage(err);
  186. tq.reportError(err.str());
  187. E->Release();
  188. }
  189. catch (...)
  190. {
  191. tq.reportError("Unknown exception ");
  192. }
  193. task.clear();
  194. }
  195. Release(); // There should be one more
  196. return 0;
  197. }
  198. bool stop()
  199. {
  200. stopped=true;
  201. Linked<ITask> t(task.get());
  202. return t ? t->stop() : true;
  203. }
  204. virtual void beforeDispose()
  205. {
  206. tq.remove(this);
  207. }
  208. private:
  209. TaskQueue& tq;
  210. volatile bool stopped;
  211. Owned<ITask> task;
  212. };
  213. void remove(WorkerThread* th)
  214. {
  215. synchronized block(mworkers);
  216. workers.remove(th);
  217. if(workers.empty())
  218. mworkers.notifyAll();
  219. }
  220. WaitQueue<StlLinked<ITask> > queue;
  221. size32_t maxsize;
  222. friend WorkerThread;
  223. typedef std::list<WorkerThread*> Workers;
  224. Workers workers;
  225. Monitor mworkers;
  226. Linked<IErrorListener> err;
  227. Mutex merr;
  228. };
  229. }
  230. #ifdef _MSC_VER
  231. #pragma warning(pop)
  232. #endif
  233. #endif