jpqueue.hpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifndef __PQUEUE_HPP
  15. #define __PQUEUE_HPP
  16. #ifdef _MSC_VER
  17. #pragma warning( push )
  18. #pragma warning(disable : 4284 ) // return type for '...::operator ->' is '...' (ie; not a UDT or reference to a UDT. Will produce errors if applied using infix notation)
  19. #endif
  20. #include "jmutex.hpp"
  21. #include "jthread.hpp"
  22. #include "jmisc.hpp"
  23. #include <list>
  24. #include <vector>
  25. template<typename T> class WaitQueue: public CInterface, protected Mutex, implements IInterface
  26. {
  27. public:
  28. IMPLEMENT_IINTERFACE;
  29. WaitQueue(): counter(), stopped(false), waiting(0)
  30. {
  31. }
  32. ~WaitQueue()
  33. {
  34. stop();
  35. synchronized block(*this);
  36. while(waiting)
  37. {
  38. counter.signal(waiting); // actually need only one and only once
  39. wait(INFINITE);
  40. }
  41. }
  42. unsigned size()
  43. {
  44. synchronized block(*this);
  45. return queue.size();
  46. }
  47. T get(unsigned timeout=INFINITE)
  48. {
  49. synchronized block(*this);
  50. for(;;)
  51. {
  52. if(stopped)
  53. return 0;
  54. if(queue.size())
  55. break;
  56. if(!wait(timeout))
  57. return 0;
  58. }
  59. T item=queue.front();
  60. queue.pop_front();
  61. return item;
  62. }
  63. bool put(const T& item)
  64. {
  65. synchronized block(*this);
  66. if(stopped)
  67. return true;
  68. queue.push_back(item);
  69. counter.signal();
  70. return waiting>0;
  71. }
  72. void stop()
  73. {
  74. synchronized block(*this);
  75. stopped=true;
  76. queue.clear();
  77. counter.signal(waiting);
  78. }
  79. bool isStopped()
  80. {
  81. synchronized block(*this);
  82. return stopped;
  83. }
  84. private:
  85. bool wait(unsigned timeout)
  86. {
  87. bool ret=false;
  88. waiting++;
  89. int locked = unlockAll();
  90. ret=counter.wait(timeout);
  91. lockAll(locked);
  92. waiting--;
  93. return ret;
  94. }
  95. Semaphore counter;
  96. std::list<T> queue;
  97. volatile unsigned waiting;
  98. volatile bool stopped; //need event
  99. };
  100. interface ITask: extends IInterface
  101. {
  102. virtual int run()=0;
  103. virtual bool stop()=0;
  104. };
  105. interface IErrorListener: extends IInterface
  106. {
  107. virtual void reportError(const char* err,...)=0;
  108. };
  109. class TaskQueue
  110. {
  111. public:
  112. TaskQueue(size32_t _maxsize,IErrorListener* _err=0): maxsize(_maxsize), err(_err)
  113. {
  114. }
  115. ~TaskQueue()
  116. {
  117. stop();
  118. join();
  119. }
  120. void put(ITask* task)
  121. {
  122. bool needthread=!queue.put(task);
  123. if(needthread)
  124. {
  125. synchronized block(mworkers);
  126. if(workers.size()<maxsize)
  127. {
  128. workers.push_back(new WorkerThread(*this));
  129. workers.back()->start();
  130. }
  131. // PrintLog("%d threads",workers.size());
  132. }
  133. }
  134. void stop()
  135. {
  136. queue.stop();
  137. synchronized block(mworkers);
  138. for(Workers::iterator it=workers.begin();it!=workers.end();it++)
  139. (*it)->stop(); // no good if threads did not clean up
  140. }
  141. void join()
  142. {
  143. synchronized block(mworkers);
  144. while(!workers.empty())
  145. {
  146. mworkers.wait();
  147. }
  148. }
  149. void setErrorListener(IErrorListener* _err)
  150. {
  151. err.set(_err);
  152. }
  153. void reportError(const char* e)
  154. {
  155. if(err)
  156. {
  157. synchronized block(merr);
  158. err->reportError(e);
  159. }
  160. }
  161. private:
  162. class WorkerThread: public Thread
  163. {
  164. public:
  165. WorkerThread(TaskQueue& _tq): tq(_tq), stopped(false)
  166. {
  167. }
  168. virtual int run()
  169. {
  170. for(;;)
  171. {
  172. try
  173. {
  174. task.setown(tq.queue.get(1000).get());
  175. if(stopped || !task)
  176. break;
  177. task->run();
  178. }
  179. catch (IException *E)
  180. {
  181. StringBuffer err;
  182. E->errorMessage(err);
  183. tq.reportError(err.str());
  184. E->Release();
  185. }
  186. catch (...)
  187. {
  188. tq.reportError("Unknown exception ");
  189. }
  190. task.clear();
  191. }
  192. Release(); // There should be one more
  193. return 0;
  194. }
  195. bool stop()
  196. {
  197. stopped=true;
  198. Linked<ITask> t(task.get());
  199. return t ? t->stop() : true;
  200. }
  201. virtual void beforeDispose()
  202. {
  203. tq.remove(this);
  204. }
  205. private:
  206. TaskQueue& tq;
  207. volatile bool stopped;
  208. Owned<ITask> task;
  209. };
  210. void remove(WorkerThread* th)
  211. {
  212. synchronized block(mworkers);
  213. workers.remove(th);
  214. if(workers.empty())
  215. mworkers.notifyAll();
  216. }
  217. WaitQueue<Linked<ITask> > queue;
  218. size32_t maxsize;
  219. friend class WorkerThread;
  220. typedef std::list<WorkerThread*> Workers;
  221. Workers workers;
  222. Monitor mworkers;
  223. Linked<IErrorListener> err;
  224. Mutex merr;
  225. };
  226. #ifdef _MSC_VER
  227. #pragma warning(pop)
  228. #endif
  229. #endif