pqueue.hpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __PQUEUE_HPP
  14. #define __PQUEUE_HPP
  15. #ifdef _MSC_VER
  16. #pragma warning( push )
  17. #pragma warning(disable : 4284 ) // return type for '...::operator ->' is '...' (ie; not a UDT or reference to a UDT. Will produce errors if applied using infix notation)
  18. #endif
  19. #include "jlibplus.hpp"
  20. #include "jmutex.hpp"
  21. #include "jthread.hpp"
  22. #include "jmisc.hpp"
  23. #include <list>
  24. #include <vector>
  25. namespace esp
  26. {
  27. template<typename T> class WaitQueue: public CInterface, protected Mutex, implements IInterface
  28. {
  29. public:
  30. IMPLEMENT_IINTERFACE;
  31. WaitQueue(): counter(), stopped(false), waiting(0)
  32. {
  33. }
  34. ~WaitQueue()
  35. {
  36. stop();
  37. synchronized block(*this);
  38. while(waiting)
  39. {
  40. counter.signal(waiting); // actually need only one and only once
  41. wait(INFINITE);
  42. }
  43. }
  44. unsigned size()
  45. {
  46. synchronized block(*this);
  47. return queue.size();
  48. }
  49. T get(unsigned timeout=INFINITE)
  50. {
  51. synchronized block(*this);
  52. for(;;)
  53. {
  54. if(stopped)
  55. return 0;
  56. if(queue.size())
  57. break;
  58. if(!wait(timeout))
  59. return 0;
  60. }
  61. T item=queue.front();
  62. queue.pop_front();
  63. return item;
  64. }
  65. bool put(const T& item)
  66. {
  67. synchronized block(*this);
  68. if(stopped)
  69. return true;
  70. queue.push_back(item);
  71. counter.signal();
  72. return waiting>0;
  73. }
  74. void stop()
  75. {
  76. synchronized block(*this);
  77. stopped=true;
  78. queue.clear();
  79. counter.signal(waiting);
  80. }
  81. bool isStopped()
  82. {
  83. synchronized block(*this);
  84. return stopped;
  85. }
  86. private:
  87. bool wait(unsigned timeout)
  88. {
  89. bool ret=false;
  90. waiting++;
  91. int locked = unlockAll();
  92. ret=counter.wait(timeout);
  93. lockAll(locked);
  94. waiting--;
  95. return ret;
  96. }
  97. Semaphore counter;
  98. std::list<T> queue;
  99. volatile unsigned waiting;
  100. volatile bool stopped; //need event
  101. };
  102. interface ITask: extends IInterface
  103. {
  104. virtual int run()=0;
  105. virtual bool stop()=0;
  106. };
  107. interface IErrorListener: extends IInterface
  108. {
  109. virtual void reportError(const char* err,...) __attribute__((format(printf, 2, 3))) =0;
  110. };
  111. class TaskQueue
  112. {
  113. public:
  114. TaskQueue(size32_t _maxsize,IErrorListener* _err=0): maxsize(_maxsize), err(_err)
  115. {
  116. }
  117. ~TaskQueue()
  118. {
  119. stop();
  120. join();
  121. }
  122. void put(ITask* task)
  123. {
  124. bool needthread=!queue.put(task);
  125. if(needthread)
  126. {
  127. synchronized block(mworkers);
  128. if(workers.size()<maxsize)
  129. {
  130. workers.push_back(new WorkerThread(*this));
  131. workers.back()->start();
  132. }
  133. // DBGLOG("%d threads",workers.size());
  134. }
  135. }
  136. void stop()
  137. {
  138. queue.stop();
  139. synchronized block(mworkers);
  140. for(Workers::iterator it=workers.begin();it!=workers.end();it++)
  141. (*it)->stop(); // no good if threads did not clean up
  142. }
  143. void join()
  144. {
  145. synchronized block(mworkers);
  146. while(!workers.empty())
  147. {
  148. mworkers.wait();
  149. }
  150. }
  151. void setErrorListener(IErrorListener* _err)
  152. {
  153. err.set(_err);
  154. }
  155. void reportError(const char* e)
  156. {
  157. if(err)
  158. {
  159. synchronized block(merr);
  160. err->reportError(e);
  161. }
  162. }
  163. private:
  164. class WorkerThread: public Thread
  165. {
  166. public:
  167. WorkerThread(TaskQueue& _tq): tq(_tq), stopped(false)
  168. {
  169. }
  170. virtual int run()
  171. {
  172. for(;;)
  173. {
  174. try
  175. {
  176. task.setown(tq.queue.get(1000).get());
  177. if(stopped || !task)
  178. break;
  179. task->run();
  180. }
  181. catch (IException *E)
  182. {
  183. StringBuffer err;
  184. E->errorMessage(err);
  185. tq.reportError(err.str());
  186. E->Release();
  187. }
  188. catch (...)
  189. {
  190. tq.reportError("Unknown exception ");
  191. }
  192. task.clear();
  193. }
  194. Release(); // There should be one more
  195. return 0;
  196. }
  197. bool stop()
  198. {
  199. stopped=true;
  200. Linked<ITask> t(task.get());
  201. return t ? t->stop() : true;
  202. }
  203. virtual void beforeDispose()
  204. {
  205. tq.remove(this);
  206. }
  207. private:
  208. TaskQueue& tq;
  209. volatile bool stopped;
  210. Owned<ITask> task;
  211. };
  212. void remove(WorkerThread* th)
  213. {
  214. synchronized block(mworkers);
  215. workers.remove(th);
  216. if(workers.empty())
  217. mworkers.notifyAll();
  218. }
  219. WaitQueue<Linked<ITask> > queue;
  220. size32_t maxsize;
  221. friend WorkerThread;
  222. typedef std::list<WorkerThread*> Workers;
  223. Workers workers;
  224. Monitor mworkers;
  225. Linked<IErrorListener> err;
  226. Mutex merr;
  227. };
  228. }
  229. #ifdef _MSC_VER
  230. #pragma warning(pop)
  231. #endif
  232. #endif