jpqueue.hpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __PQUEUE_HPP
  14. #define __PQUEUE_HPP
  15. #ifdef _MSC_VER
  16. #pragma warning( push )
  17. #pragma warning(disable : 4284 ) // return type for '...::operator ->' is '...' (ie; not a UDT or reference to a UDT. Will produce errors if applied using infix notation)
  18. #endif
  19. #include "jmutex.hpp"
  20. #include "jthread.hpp"
  21. #include "jmisc.hpp"
  22. #include <list>
  23. #include <vector>
  24. template<typename T> class WaitQueue: public CInterface, protected Mutex
  25. {
  26. public:
  27. WaitQueue(): counter(), stopped(false), waiting(0)
  28. {
  29. }
  30. ~WaitQueue()
  31. {
  32. stop();
  33. synchronized block(*this);
  34. while(waiting)
  35. {
  36. counter.signal(waiting); // actually need only one and only once
  37. wait(INFINITE);
  38. }
  39. }
  40. unsigned size()
  41. {
  42. synchronized block(*this);
  43. return queue.size();
  44. }
  45. T get(unsigned timeout=INFINITE)
  46. {
  47. synchronized block(*this);
  48. for(;;)
  49. {
  50. if(stopped)
  51. return 0;
  52. if(queue.size())
  53. break;
  54. if(!wait(timeout))
  55. return 0;
  56. }
  57. T item=queue.front();
  58. queue.pop_front();
  59. return item;
  60. }
  61. bool put(const T& item)
  62. {
  63. synchronized block(*this);
  64. if(stopped)
  65. return true;
  66. queue.push_back(item);
  67. counter.signal();
  68. return waiting>0;
  69. }
  70. void stop()
  71. {
  72. synchronized block(*this);
  73. stopped=true;
  74. queue.clear();
  75. counter.signal(waiting);
  76. }
  77. bool isStopped()
  78. {
  79. synchronized block(*this);
  80. return stopped;
  81. }
  82. private:
  83. bool wait(unsigned timeout)
  84. {
  85. bool ret=false;
  86. waiting++;
  87. int locked = unlockAll();
  88. ret=counter.wait(timeout);
  89. lockAll(locked);
  90. waiting--;
  91. return ret;
  92. }
  93. Semaphore counter;
  94. std::list<T> queue;
  95. volatile unsigned waiting;
  96. volatile bool stopped; //need event
  97. };
  98. interface ITask: extends IInterface
  99. {
  100. virtual int run()=0;
  101. virtual bool stop()=0;
  102. };
  103. interface IErrorListener: extends IInterface
  104. {
  105. virtual void reportError(const char* err,...)=0;
  106. };
  107. class TaskQueue
  108. {
  109. public:
  110. TaskQueue(size32_t _maxsize,IErrorListener* _err=0): maxsize(_maxsize), err(_err)
  111. {
  112. }
  113. ~TaskQueue()
  114. {
  115. stop();
  116. join();
  117. }
  118. void put(ITask* task)
  119. {
  120. bool needthread=!queue.put(task);
  121. if(needthread)
  122. {
  123. synchronized block(mworkers);
  124. if(workers.size()<maxsize)
  125. {
  126. workers.push_back(new WorkerThread(*this));
  127. workers.back()->start();
  128. }
  129. // PrintLog("%d threads",workers.size());
  130. }
  131. }
  132. void stop()
  133. {
  134. queue.stop();
  135. synchronized block(mworkers);
  136. for(Workers::iterator it=workers.begin();it!=workers.end();it++)
  137. (*it)->stop(); // no good if threads did not clean up
  138. }
  139. void join()
  140. {
  141. synchronized block(mworkers);
  142. while(!workers.empty())
  143. {
  144. mworkers.wait();
  145. }
  146. }
  147. void setErrorListener(IErrorListener* _err)
  148. {
  149. err.set(_err);
  150. }
  151. void reportError(const char* e)
  152. {
  153. if(err)
  154. {
  155. synchronized block(merr);
  156. err->reportError(e);
  157. }
  158. }
  159. private:
  160. class WorkerThread: public Thread
  161. {
  162. public:
  163. WorkerThread(TaskQueue& _tq): tq(_tq), stopped(false)
  164. {
  165. }
  166. virtual int run()
  167. {
  168. for(;;)
  169. {
  170. try
  171. {
  172. task.setown(tq.queue.get(1000).get());
  173. if(stopped || !task)
  174. break;
  175. task->run();
  176. }
  177. catch (IException *E)
  178. {
  179. StringBuffer err;
  180. E->errorMessage(err);
  181. tq.reportError(err.str());
  182. E->Release();
  183. }
  184. catch (...)
  185. {
  186. tq.reportError("Unknown exception ");
  187. }
  188. task.clear();
  189. }
  190. Release(); // There should be one more
  191. return 0;
  192. }
  193. bool stop()
  194. {
  195. stopped=true;
  196. Linked<ITask> t(task.get());
  197. return t ? t->stop() : true;
  198. }
  199. virtual void beforeDispose()
  200. {
  201. tq.remove(this);
  202. }
  203. private:
  204. TaskQueue& tq;
  205. volatile bool stopped;
  206. Owned<ITask> task;
  207. };
  208. void remove(WorkerThread* th)
  209. {
  210. synchronized block(mworkers);
  211. workers.remove(th);
  212. if(workers.empty())
  213. mworkers.notifyAll();
  214. }
  215. WaitQueue<Linked<ITask> > queue;
  216. size32_t maxsize;
  217. friend class WorkerThread;
  218. typedef std::list<WorkerThread*> Workers;
  219. Workers workers;
  220. Monitor mworkers;
  221. Linked<IErrorListener> err;
  222. Mutex merr;
  223. };
  224. #ifdef _MSC_VER
  225. #pragma warning(pop)
  226. #endif
  227. #endif