jqueue.tpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef JQueue_TINCL
  14. #define JQueue_TINCL
  15. #include "jmutex.hpp"
  16. // Simple Queue Template for an expanding circular buffer queue
  17. typedef bool (*priorityGreaterFunction)(const void * left, const void * right);
  18. template <class BASE, bool ALLOWNULLS>
  19. class QueueOf
  20. {
  21. typedef QueueOf<BASE, ALLOWNULLS> SELF;
  22. BASE **ptrs;
  23. unsigned headp;
  24. unsigned tailp;
  25. unsigned max;
  26. unsigned num;
  27. void expand()
  28. {
  29. unsigned inc;
  30. if (max>=0x4000)
  31. inc = 0x4000;
  32. else if (max)
  33. inc = max;
  34. else
  35. inc = 4;
  36. reserve(inc);
  37. }
  38. public:
  39. inline QueueOf() { ptrs = NULL; headp=0; tailp=0; max=0; num = 0; }
  40. inline ~QueueOf() { free(ptrs); }
  41. inline void clear() { free(ptrs); ptrs = NULL; headp=0; tailp=0; max=0; num = 0; }
  42. inline void ensure(unsigned minSize) { if (max < minSize) reserve(minSize-max); }
  43. void reserve(unsigned n)
  44. {
  45. max += n;
  46. ptrs = (BASE **)realloc(ptrs,max*sizeof(BASE *));
  47. assertex(!max || ptrs);
  48. if (headp>tailp) { // wrapped
  49. memmove(ptrs+headp+n, ptrs+headp, (num-tailp-1)*sizeof(BASE *));
  50. headp += n;
  51. }
  52. }
  53. inline BASE *head() const { return num?ptrs[headp]:NULL; }
  54. inline BASE *tail() const { return num?ptrs[tailp]:NULL; }
  55. inline BASE *item(unsigned idx) const {
  56. if (idx>=num)
  57. return NULL;
  58. idx+=headp;
  59. if (idx>=max)
  60. idx-=max;
  61. return ptrs[idx];
  62. }
  63. inline void enqueue(BASE *e)
  64. {
  65. if (ALLOWNULLS || e) {
  66. if (num==max)
  67. expand();
  68. if (num==0) {
  69. headp = 0;
  70. tailp = 0;
  71. }
  72. else {
  73. tailp++;
  74. if (tailp==max)
  75. tailp=0;
  76. }
  77. ptrs[tailp] = e;
  78. num++;
  79. }
  80. }
  81. void enqueueHead(BASE *e)
  82. {
  83. if (ALLOWNULLS || e) {
  84. if (num==max)
  85. expand();
  86. if (num==0) {
  87. headp = 0;
  88. tailp = 0;
  89. }
  90. else {
  91. if (headp==0)
  92. headp=max;
  93. headp--;
  94. }
  95. ptrs[headp] = e;
  96. num++;
  97. }
  98. }
  99. void enqueue(BASE *e,unsigned i)
  100. {
  101. if (ALLOWNULLS || e) {
  102. if (i==0)
  103. enqueueHead(e);
  104. else if (i>=num)
  105. enqueue(e);
  106. else {
  107. if (num==max)
  108. expand();
  109. i += headp;
  110. if (i>=max)
  111. i-=max;
  112. // do rotate slow way for simplicity
  113. tailp++;
  114. if (tailp==max)
  115. tailp = 0;
  116. unsigned p=tailp;
  117. do {
  118. unsigned n = (p==0)?max:p;
  119. n--;
  120. ptrs[p] = ptrs[n];
  121. p = n;
  122. } while (p!=i);
  123. ptrs[i] = e;
  124. num++;
  125. }
  126. }
  127. }
  128. void enqueue(BASE *e,priorityGreaterFunction pgf)
  129. {
  130. if (ALLOWNULLS || e) {
  131. unsigned p=tailp;
  132. unsigned i=num;
  133. while (i&&pgf(e,ptrs[p])) {
  134. i--;
  135. if (p==0)
  136. p=max;
  137. p--;
  138. }
  139. enqueue(e,i);
  140. }
  141. }
  142. inline BASE *dequeue()
  143. {
  144. if (!num)
  145. return NULL;
  146. BASE *ret = ptrs[headp];
  147. headp++;
  148. if (headp==max)
  149. headp = 0;
  150. num--;
  151. return ret;
  152. }
  153. BASE *dequeueTail()
  154. {
  155. if (!num)
  156. return NULL;
  157. BASE *ret = ptrs[tailp];
  158. if (tailp==0)
  159. tailp=max;
  160. tailp--;
  161. num--;
  162. return ret;
  163. }
  164. BASE *dequeue(unsigned i)
  165. {
  166. if (i==0)
  167. return dequeue();
  168. if (i>=num)
  169. return NULL;
  170. if (i+1==num)
  171. return dequeueTail();
  172. i += headp;
  173. if (i>=max)
  174. i-=max;
  175. BASE *ret = ptrs[i];
  176. // do rotate slow way for simplicity
  177. unsigned p=i;
  178. do {
  179. unsigned n = (p==0)?max:p;
  180. n--;
  181. ptrs[p] = ptrs[n];
  182. p = n;
  183. } while (p!=headp);
  184. headp++;
  185. if (headp==max)
  186. headp = 0;
  187. num--;
  188. return ret;
  189. }
  190. void set(unsigned idx, BASE *v)
  191. {
  192. assertex(idx<num);
  193. idx+=headp;
  194. if (idx>=max)
  195. idx-=max;
  196. ptrs[idx] = v;
  197. }
  198. BASE * query(unsigned idx)
  199. {
  200. assertex(idx<num);
  201. idx+=headp;
  202. if (idx>=max)
  203. idx-=max;
  204. return ptrs[idx];
  205. }
  206. unsigned find(BASE *e)
  207. { // simple linear search
  208. if (num!=0) {
  209. if (e==ptrs[tailp])
  210. return num-1;
  211. unsigned i=headp;
  212. for (;;) {
  213. if (ptrs[i]==e) {
  214. if (i<headp)
  215. i += max;
  216. return i-headp;
  217. }
  218. if (i==tailp)
  219. break;
  220. i++;
  221. if (i==max)
  222. i = 0;
  223. }
  224. }
  225. return (unsigned)-1;
  226. }
  227. BASE *dequeue(BASE *e)
  228. {
  229. return dequeue(find(e));
  230. }
  231. inline unsigned ordinality() const { return num; }
  232. };
  233. template <class BASE, bool ALLOWNULLS>
  234. class SafeQueueOf : private QueueOf<BASE, ALLOWNULLS>
  235. {
  236. typedef SafeQueueOf<BASE, ALLOWNULLS> SELF;
  237. protected:
  238. mutable CriticalSection crit;
  239. inline void unsafeenqueue(BASE *e) { QueueOf<BASE, ALLOWNULLS>::enqueue(e); }
  240. inline void unsafeenqueueHead(BASE *e) { QueueOf<BASE, ALLOWNULLS>::enqueue(e); }
  241. inline void unsafeenqueue(BASE *e,priorityGreaterFunction p) { QueueOf<BASE, ALLOWNULLS>::enqueue(e,p); }
  242. inline BASE *unsafedequeue() { return QueueOf<BASE, ALLOWNULLS>::dequeue(); }
  243. inline BASE *unsafedequeueTail() { return QueueOf<BASE, ALLOWNULLS>::dequeueTail(); }
  244. inline unsigned unsafeordinality() { return QueueOf<BASE, ALLOWNULLS>::ordinality(); }
  245. public:
  246. void clear() { CriticalBlock b(crit); QueueOf<BASE, ALLOWNULLS>::clear(); }
  247. BASE *head() const { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::head(); }
  248. BASE *tail() const { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::tail(); }
  249. BASE *item(unsigned idx) const { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::item(idx); }
  250. void enqueue(BASE *e) { CriticalBlock b(crit); QueueOf<BASE, ALLOWNULLS>::enqueue(e); }
  251. void enqueueHead(BASE *e) { CriticalBlock b(crit); QueueOf<BASE, ALLOWNULLS>::enqueueHead(e); }
  252. void enqueue(BASE *e,unsigned i) { CriticalBlock b(crit); QueueOf<BASE, ALLOWNULLS>::enqueue(e, i); }
  253. void enqueue(BASE *e,priorityGreaterFunction p) { CriticalBlock b(crit); QueueOf<BASE, ALLOWNULLS>::enqueue(e, p); }
  254. BASE *dequeue() { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeue(); }
  255. BASE *dequeueTail() { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeueTail(); }
  256. BASE *dequeue(unsigned i) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeue(i); }
  257. unsigned find(BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::find(e); }
  258. void dequeue(BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeue(e); }
  259. inline unsigned ordinality() const { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::ordinality(); }
  260. void set(unsigned idx, BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::set(idx, e); }
  261. };
  262. template <class BASE, bool ALLOWNULLS>
  263. class SimpleInterThreadQueueOf : protected SafeQueueOf<BASE, ALLOWNULLS>
  264. {
  265. typedef SimpleInterThreadQueueOf<BASE, ALLOWNULLS> SELF;
  266. protected:
  267. Semaphore deqwaitsem;
  268. unsigned deqwaiting;
  269. Semaphore enqwaitsem;
  270. unsigned enqwaiting;
  271. bool stopped;
  272. unsigned limit;
  273. bool qwait(Semaphore &sem,unsigned &waiting,unsigned timeout,unsigned &start)
  274. {
  275. // in crit block
  276. unsigned remaining;
  277. if (timeout) {
  278. if (timeout==INFINITE)
  279. remaining = timeout;
  280. else
  281. {
  282. if (!start) {
  283. start = msTick();
  284. remaining = timeout;
  285. }
  286. else {
  287. unsigned elapsed=msTick()-start;
  288. if (elapsed>=timeout)
  289. return false;
  290. remaining = (timeout-elapsed);
  291. }
  292. }
  293. }
  294. else
  295. return false;
  296. waiting++;
  297. bool wr;
  298. {
  299. CriticalUnblock unblock(SELF::crit);
  300. wr = sem.wait(remaining);
  301. }
  302. if (!wr) {
  303. wr = sem.wait(0); // catch race
  304. if (!wr) {
  305. waiting--;
  306. return false;
  307. }
  308. }
  309. return true;
  310. }
  311. inline bool get(BASE *&ret,bool tail)
  312. {
  313. if (ALLOWNULLS) {
  314. if (SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality()) {
  315. ret = tail?SafeQueueOf<BASE, ALLOWNULLS>::unsafedequeueTail():SafeQueueOf<BASE, ALLOWNULLS>::unsafedequeue();
  316. return true;
  317. }
  318. return false;
  319. }
  320. ret = tail?SafeQueueOf<BASE, ALLOWNULLS>::unsafedequeueTail():SafeQueueOf<BASE, ALLOWNULLS>::unsafedequeue();
  321. return ret!=NULL;
  322. }
  323. public:
  324. SimpleInterThreadQueueOf<BASE, ALLOWNULLS>()
  325. {
  326. limit = 0; // no limit
  327. reset();
  328. }
  329. ~SimpleInterThreadQueueOf<BASE, ALLOWNULLS>()
  330. {
  331. stop();
  332. }
  333. void reset()
  334. {
  335. enqwaiting = 0;
  336. deqwaiting = 0;
  337. stopped = false;
  338. }
  339. bool enqueue(BASE *e,unsigned timeout=INFINITE)
  340. {
  341. CriticalBlock b(SELF::crit);
  342. if (limit) {
  343. unsigned start=0;
  344. while (limit<=SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality())
  345. if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start))
  346. return false;
  347. }
  348. SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueue(e);
  349. if (enqwaiting) {
  350. enqwaitsem.signal(enqwaiting);
  351. enqwaiting = 0;
  352. }
  353. return true;
  354. }
  355. bool enqueueHead(BASE *e,unsigned timeout=INFINITE)
  356. {
  357. CriticalBlock b(SELF::crit);
  358. if (limit) {
  359. unsigned start=0;
  360. while (limit<=SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality())
  361. if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start))
  362. return false;
  363. }
  364. SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueueHead(e);
  365. if (enqwaiting) {
  366. enqwaitsem.signal(enqwaiting);
  367. enqwaiting = 0;
  368. }
  369. return true;
  370. }
  371. bool enqueue(BASE *e,priorityGreaterFunction p,unsigned timeout=INFINITE)
  372. {
  373. CriticalBlock b(SELF::crit);
  374. if (limit) {
  375. unsigned start=0;
  376. while (limit<=SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality())
  377. if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start))
  378. return false;
  379. }
  380. SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueue(e,p);
  381. if (enqwaiting) {
  382. enqwaitsem.signal(enqwaiting);
  383. enqwaiting = 0;
  384. }
  385. return true;
  386. }
  387. BASE *dequeue(unsigned timeout=INFINITE)
  388. {
  389. CriticalBlock b(SELF::crit);
  390. unsigned start=0;
  391. while (!stopped) {
  392. BASE *ret;
  393. if (get(ret,false)) {
  394. if (deqwaiting) {
  395. deqwaitsem.signal(deqwaiting);
  396. deqwaiting = 0;
  397. }
  398. return ret;
  399. }
  400. if (!qwait(enqwaitsem,enqwaiting,timeout,start))
  401. break;
  402. }
  403. return NULL;
  404. }
  405. BASE *dequeueTail(unsigned timeout=INFINITE)
  406. {
  407. CriticalBlock b(SELF::crit);
  408. unsigned start=0;
  409. while (!stopped) {
  410. BASE *ret;
  411. if (get(ret,true)) {
  412. if (deqwaiting) {
  413. deqwaitsem.signal(deqwaiting);
  414. deqwaiting = 0;
  415. }
  416. return ret;
  417. }
  418. if (!qwait(enqwaitsem,enqwaiting,timeout,start))
  419. break;
  420. }
  421. return NULL;
  422. }
  423. BASE *dequeueNow()
  424. {
  425. CriticalBlock b(SELF::crit);
  426. BASE * ret=NULL;
  427. if(get(ret,false)) {
  428. if(deqwaiting)
  429. {
  430. deqwaitsem.signal(deqwaiting);
  431. deqwaiting = 0;
  432. }
  433. }
  434. return ret;
  435. }
  436. bool waitMaxOrdinality(unsigned max,unsigned timeout)
  437. {
  438. CriticalBlock b(SELF::crit);
  439. unsigned start=0;
  440. while (!stopped) {
  441. if (SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality()<=max)
  442. return true;
  443. if (!qwait(deqwaitsem,deqwaiting,timeout,start))
  444. break;
  445. }
  446. return false;
  447. }
  448. inline unsigned ordinality() const { return SafeQueueOf<BASE, ALLOWNULLS>::ordinality(); }
  449. unsigned setLimit(unsigned num) { CriticalBlock b(SELF::crit); unsigned ret=limit; limit = num; return ret; }
  450. void stop() // stops all waiting operations
  451. {
  452. CriticalBlock b(SELF::crit);
  453. do {
  454. stopped = true;
  455. if (enqwaiting) {
  456. enqwaitsem.signal(enqwaiting);
  457. enqwaiting = 0;
  458. }
  459. if (deqwaiting) {
  460. deqwaitsem.signal(deqwaiting);
  461. deqwaiting = 0;
  462. }
  463. {
  464. CriticalUnblock ub(SELF::crit);
  465. Sleep(10); // bit of a kludge
  466. }
  467. } while (enqwaiting||deqwaiting);
  468. }
  469. };
  470. template <class BASE, class OWNER, bool ALLOWNULLS>
  471. class CallbackInterThreadQueueOf : public SimpleInterThreadQueueOf<BASE, ALLOWNULLS>
  472. {
  473. typedef CallbackInterThreadQueueOf<BASE, OWNER, ALLOWNULLS> SELF;
  474. public:
  475. BASE * dequeueAndNotify(OWNER * owner, unsigned timeout=INFINITE)
  476. {
  477. CriticalBlock b(SELF::crit);
  478. unsigned start=0;
  479. while (!SELF::stopped) {
  480. BASE *ret;
  481. if (this->get(ret,false)) {
  482. owner->notify(ret);
  483. if (SELF::deqwaiting) {
  484. SELF::deqwaitsem.signal(SELF::deqwaiting);
  485. SELF::deqwaiting = 0;
  486. }
  487. return ret;
  488. }
  489. if (!this->qwait(SELF::enqwaitsem,SELF::enqwaiting,timeout,start))
  490. break;
  491. }
  492. return NULL;
  493. }
  494. BASE *dequeueNowAndNotify(OWNER * owner)
  495. {
  496. CriticalBlock b(SELF::crit);
  497. BASE *ret=NULL;
  498. if (this->get(ret,false)) {
  499. owner->notify(ret);
  500. if(SELF::deqwaiting)
  501. {
  502. SELF::deqwaitsem.signal(SELF::deqwaiting);
  503. SELF::deqwaiting = 0;
  504. }
  505. }
  506. return ret;
  507. }
  508. };
  509. #define ForEachQueueItemIn(x,y) unsigned numItems##x = (y).ordinality(); \
  510. for (unsigned x = 0; x< numItems##x; ++x)
  511. #define ForEachQueueItemInRev(x,y) unsigned x = (y).ordinality(); \
  512. while (x--)
  513. #endif