// jqueue.hpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __JQUEUE__
  14. #define __JQUEUE__
  15. #include "jlib.hpp"
  16. #include <atomic>
  17. #include <utility>
  18. #include "jatomic.hpp"
  19. // A generalised queue interface.
template <typename ELEMENT>
interface IQueueOf : extends IInterface
{
public:
    // Add an item to the queue.  Returns false if the item could not be added
    // (e.g. the queue was aborted).  Implementations in this file block while the queue is full.
    virtual bool enqueue(const ELEMENT item) = 0;
    // Remove the next item, blocking until one is available.
    // Returns false if the queue is aborted or no more items can ever arrive.
    virtual bool dequeue(ELEMENT & result) = 0;
    // Non-blocking variant of dequeue: returns false immediately if the queue is empty.
    virtual bool tryDequeue(ELEMENT & result) = 0;
    // Called by a consumer that will read no more items - may release blocked writers.
    virtual void noteReaderStopped() = 0;
    // Called by a producer that will write no more items - may release blocked readers.
    virtual void noteWriterStopped() = 0;
    // Cause all current and future enqueue/dequeue calls to fail.
    virtual void abort() = 0;
    // Restore the queue to its initial (non-aborted) state so it can be reused.
    virtual void reset() = 0;
};
// Specialisation used for queues of (opaque) row pointers.
typedef IQueueOf<const void *> IRowQueue;
// Factory for a bounded multi-reader/multi-writer row queue.
// NOTE(review): presumably maxItems bounds the number of queued rows and maxSlots sizes the
// underlying buffer - confirm against the implementation in the matching .cpp file.
extern jlib_decl IRowQueue * createRowQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots);
  34. /*
  35. * The ReaderWriterQueue is a bounded inter-thread queue that aims to be lock free when adding an item to the queue
  36. * if there is space, and removing items from a non empty queue.
  37. * Normally lock free implementations may be unbounded - which can cause memory to be exhausted if the producers
  38. * are faster than the consumer. They also tend to spin until they can proceed - but this performs poorly if the
  39. * consumer and producer are not perfectly balanced. The high level of contention can also cause cache issues.
  40. *
  41. * This implementations will wait on a semaphore if there is no room, or if the queue is empty.
  42. * It uses a single state field which combines information about the queue state and the number of waiting readers/writers.
  43. *
  44. * The queue also has support for
  45. * aborting - causing all consumers and producers to fail to enqueue/dequeue
  46. * noting writers have completed. This is particularly useful consumers of a M:1 queue knowing there will be no more items.
  47. */
  48. //NOTE: maxSlotBits * 2 + writerBits * 2 + readerBits <= sizeof(ELEMENT)*8)
  49. template <typename ELEMENT, typename state_t, unsigned readerBits, unsigned writerBits, unsigned maxSlotBits, unsigned fixedSlotBits=0>
  50. class ReaderWriterQueue
  51. {
  52. //This uses one spare slot in the array to ensure the count field cannot overflow.
  53. //state has [dequeue-pos][reader-count][writer-count][num-items] in a single field.
  54. //num-items is the least significant field (so does not need to be shifted)
  55. //dequeue-pos it the most significant field, so there is no need to worry about wrapping into other fields.
  56. //dequeue-pos must have enough extra bits to disambiguate N writers all waiting to write to the same slot
  57. class BufferElement
  58. {
  59. public:
  60. ELEMENT value;
  61. std::atomic<unsigned> sequence;
  62. };
  63. // max readers = 2^readerBits-1
  64. // max writers = 2^writerBits-1
  65. // max queuesize = 2^(sequenceBits-1)-1
  66. //Derived constants
  67. const static unsigned extraSequenceBits = writerBits; // Possibly this could be reduced to min(readerBits, writerBits)
  68. const static unsigned stateBits = (unsigned)(sizeof(state_t)*8);
  69. const static unsigned padBits = stateBits - (extraSequenceBits + writerBits + 2 * maxSlotBits + readerBits);
  70. const static unsigned countBits = maxSlotBits + padBits; // ensure the sequence wraps as expected in the top bits.
  71. const static unsigned sequenceBits = extraSequenceBits+maxSlotBits;
  72. const static unsigned countShift = 0;
  73. const static unsigned writerShift = countBits;
  74. const static unsigned readerShift = countBits + writerBits;
  75. const static unsigned dequeueShift = countBits + writerBits + readerBits;
  76. const static state_t readerMask = ((state_t)1 << dequeueShift) - ((state_t)1 << readerShift);
  77. const static state_t writerMask = ((state_t)1 << readerShift) - ((state_t)1 << writerShift);
  78. const static state_t sequenceMask = ((state_t)1 << sequenceBits) - 1;
  79. const static state_t countMask = ((state_t)1 << countBits) - 1;;
  80. const static state_t dequeueMask = (sequenceMask << dequeueShift);
  81. const static unsigned maxSlots = (1U << maxSlotBits) - 1;
  82. const static unsigned initialSpinsBeforeWait = 2000;
  83. const static unsigned slotUnavailableSpins = 50; // If not available for a short time then the thread must have been rescheduled
  84. const static state_t fixedSlotMask = (1U << fixedSlotBits) - 1;
  85. public:
  86. ReaderWriterQueue(unsigned _maxWriters, unsigned _maxReaders, unsigned _maxItems) : maxItems(_maxItems), maxReaders(_maxReaders), maxWriters(_maxWriters)
  87. {
  88. //printf("element(%u) pad(%u) write(%u), read(%u) slot(%u) count(%u) max(%u)\n", stateBits, padBits, writerBits, readerBits, maxSlotBits, countBits, maxItems);
  89. //Check all the bits are used, and none of the bits overlap.
  90. assertex(padBits < stateBits);
  91. assertex(countBits >= maxSlotBits);
  92. assertex(sequenceBits + readerBits + writerBits + countBits == stateBits);
  93. assertex((readerMask | writerMask | countMask | dequeueMask) == (state_t)-1);
  94. assertex((readerMask | writerMask | countMask | dequeueMask) == (readerMask ^ writerMask ^ countMask ^ dequeueMask));
  95. // Reserve at least one free entry to ensure the count bitfield does not overflow.
  96. const unsigned minSpace = 1;
  97. assertex(maxItems != 0);
  98. assertex(maxItems + minSpace <= maxSlots);
  99. unsigned numSlots;
  100. if (fixedSlotBits == 0)
  101. {
  102. //Ensure the array is a power of two, so the sequence can be mapped to an item using an AND
  103. numSlots = 1;
  104. while (numSlots < maxItems + minSpace)
  105. numSlots += numSlots;
  106. dynamicSlotMask = numSlots - 1;
  107. }
  108. else
  109. {
  110. numSlots = fixedSlotMask + 1;
  111. dynamicSlotMask = fixedSlotMask;
  112. }
  113. activeReaders.store(maxReaders, std::memory_order_relaxed);
  114. activeWriters.store(maxWriters, std::memory_order_relaxed);
  115. aborted.store(false, std::memory_order_relaxed);
  116. state.store(0, std::memory_order_relaxed);
  117. values = new BufferElement[numSlots];
  118. for (unsigned i=0; i < numSlots; i++)
  119. values[i].sequence.store(i, std::memory_order_relaxed);
  120. }
  121. ~ReaderWriterQueue()
  122. {
  123. delete [] values;
  124. }
  125. //Should possibly have the following functions instead for correct C++11 integration...
  126. //void enqueue(const ELEMENT & value);
  127. //void enqueue(ELEMENT && value);
  128. bool enqueue(const ELEMENT value)
  129. {
  130. if (aborted.load(std::memory_order_relaxed))
  131. return false;
  132. dbgassertex(!allWritersStopped());
  133. //Note, compare_exchange_weak updates curState when it fails, so don't read inside the main loop
  134. unsigned numSpins = initialSpinsBeforeWait;
  135. state_t curState = state.load(std::memory_order_acquire);
  136. loop
  137. {
  138. unsigned curCount = (curState & countMask);
  139. if (curCount == maxItems)
  140. {
  141. if (allReadersStopped())
  142. return false;
  143. if (--numSpins != 0) // likely
  144. {
  145. curState = state.load(std::memory_order_acquire);
  146. continue;
  147. }
  148. numSpins = initialSpinsBeforeWait;
  149. //The list is currently full, increment the number of writers waiting.
  150. //This can never overflow...
  151. const state_t nextState = curState + ((state_t)1 << writerShift);
  152. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  153. {
  154. if (aborted.load(std::memory_order_acquire))
  155. return false;
  156. if (allReadersStopped())
  157. return false;
  158. writers.wait();
  159. if (aborted.load(std::memory_order_acquire))
  160. return false;
  161. curState = state.load(std::memory_order_acquire);
  162. }
  163. }
  164. else
  165. {
  166. //Increment the number of items (which can never overflow), and possibly decrease readers
  167. state_t nextState = (curState + 1);
  168. //If a reader is waiting then decrement the count, and signal later..
  169. //Note, this test is a constant folded
  170. if (readerBits == 1)
  171. {
  172. //More efficient to perform an unconditional mask
  173. nextState &= ~readerMask;
  174. }
  175. else
  176. {
  177. if ((curState & readerMask) != 0)
  178. nextState -= (1 << readerShift);
  179. }
  180. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  181. {
  182. unsigned slotMask = (fixedSlotBits ? fixedSlotMask : dynamicSlotMask);
  183. unsigned curDequeueSeq = (curState >> dequeueShift); // No need to mask since the top field
  184. unsigned curEnqueueSeq = (curDequeueSeq + curCount) & sequenceMask;
  185. unsigned filledSeq = (curEnqueueSeq + 1) & sequenceMask;
  186. unsigned curEnqueueSlot = curEnqueueSeq & slotMask;
  187. BufferElement & cur = values[curEnqueueSlot];
  188. //MORE: Another producer has been interrupted while writing to the same slot
  189. //or the consumer has not yet read from the slot.
  190. //spin until that has been consumed.
  191. unsigned spins = 0;
  192. while (cur.sequence.load(std::memory_order_acquire) != curEnqueueSeq)
  193. {
  194. if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
  195. {
  196. ThreadYield();
  197. spins = 0;
  198. }
  199. else
  200. spinPause();
  201. }
  202. //enqueue takes ownership of the object -> use std::move
  203. cur.value = std::move(value);
  204. cur.sequence.store(filledSeq, std::memory_order_release);
  205. if ((curState & readerMask) != 0)
  206. readers.signal();
  207. return true;
  208. }
  209. }
  210. }
  211. }
  212. bool dequeue(ELEMENT & result, bool returnIfEmpty)
  213. {
  214. if (aborted.load(std::memory_order_relaxed))
  215. return false;
  216. dbgassertex(!allReadersStopped());
  217. unsigned numSpins = initialSpinsBeforeWait;
  218. //Note, compare_exchange_weak updates curState when it fails, so don't read inside the main loop
  219. state_t curState = state.load(std::memory_order_acquire);
  220. loop
  221. {
  222. unsigned curCount = (curState & countMask);
  223. //Check if the queue is empty
  224. if (curCount == 0)
  225. {
  226. if (returnIfEmpty)
  227. return false;
  228. //If all writers have finished then no more items will be enqueued
  229. if (allWritersStopped())
  230. {
  231. curState = state.load(std::memory_order_acquire);
  232. //Check that nothing has been added since the previous load. (Very small window.)
  233. if ((curState & countMask) == 0)
  234. return false;
  235. continue;
  236. }
  237. //We must check numSpins before we try and increment the number of readers
  238. if (--numSpins != 0)
  239. {
  240. curState = state.load(std::memory_order_acquire);
  241. continue;
  242. }
  243. numSpins = initialSpinsBeforeWait;
  244. //The list is currently empty, increment the number of readers waiting.
  245. //This can never overflow...
  246. state_t nextState = curState + (1 << readerShift);
  247. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  248. {
  249. if (aborted.load(std::memory_order_acquire))
  250. return false;
  251. //If no longer any active writers it may have happened before the cas, so the semaphore may not
  252. //have been signalled. Either new items have been added, or the loop will terminate - so loop again.
  253. if (!allWritersStopped())
  254. {
  255. readers.wait();
  256. if (aborted.load(std::memory_order_acquire))
  257. return false;
  258. }
  259. curState = state.load(std::memory_order_acquire);
  260. }
  261. }
  262. else
  263. {
  264. //Increase the dequeue position (which will harmlessly wrap), and decrement the count.
  265. state_t nextState = (curState + ((state_t)1 << dequeueShift) - 1);
  266. //If a reader is waiting then decrement the count, and signal later..
  267. if (writerBits == 1)
  268. {
  269. //More efficient to perform an unconditional mask
  270. //NOTE: check assembler to ensure it is merged with previous mask above
  271. nextState &= ~writerMask;
  272. }
  273. else
  274. {
  275. if ((curState & writerMask) != 0)
  276. nextState -= (1 << writerShift);
  277. }
  278. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  279. {
  280. unsigned curDequeueSeq = (curState >> dequeueShift); // No need to mask since the top field
  281. unsigned slotMask = (fixedSlotBits ? fixedSlotMask : dynamicSlotMask);
  282. unsigned expectedSeq = (curDequeueSeq + 1) & sequenceMask;
  283. unsigned curDequeueSlot = (curDequeueSeq & slotMask);
  284. BufferElement & cur = values[curDequeueSlot];
  285. unsigned spins = 0;
  286. loop
  287. {
  288. unsigned sequence = cur.sequence.load(std::memory_order_acquire);
  289. if (sequence == expectedSeq)
  290. break;
  291. //possibly yield every n iterations?
  292. if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
  293. {
  294. ThreadYield();
  295. spins = 0;
  296. }
  297. else
  298. spinPause();
  299. }
  300. result = std::move(cur.value);
  301. const unsigned numSlots = slotMask + 1;
  302. unsigned nextSeq = (curDequeueSeq + numSlots) & sequenceMask;
  303. cur.sequence.store(nextSeq, std::memory_order_release);
  304. if ((curState & writerMask) != 0)
  305. writers.signal();
  306. return true;
  307. }
  308. }
  309. }
  310. }
  311. virtual void reset()
  312. {
  313. activeReaders.store(maxReaders, std::memory_order_relaxed);
  314. activeWriters.store(maxWriters, std::memory_order_relaxed);
  315. aborted.store(false, std::memory_order_relaxed);
  316. readers.reinit(0);
  317. writers.reinit(0);
  318. }
  319. virtual void noteReaderStopped()
  320. {
  321. //MORE: If this reduces activeProducers to 0 then it may need to wake up any waiting threads.
  322. if (--activeReaders <= 0)
  323. {
  324. state_t curState = state.load(std::memory_order_acquire);
  325. unsigned writersWaiting = (unsigned)((curState & writerMask) >> writerShift);
  326. writers.signal(writersWaiting);
  327. }
  328. }
  329. virtual void noteWriterStopped()
  330. {
  331. //MORE: If this reduces activeProducers to 0 then it may need to wake up any waiting threads.
  332. if (--activeWriters <= 0)
  333. {
  334. state_t curState = state.load(std::memory_order_acquire);
  335. unsigned readersWaiting = (unsigned)((curState & readerMask) >> readerShift);
  336. readers.signal(readersWaiting);
  337. }
  338. }
  339. virtual void abort()
  340. {
  341. //readers and writers may enqueue/dequeue another row before this takes effect
  342. aborted.store(true, std::memory_order_release);
  343. state_t curState = state.load(std::memory_order_acquire);
  344. unsigned readersWaiting = (unsigned)((curState & readerMask) >> readerShift);
  345. unsigned writersWaiting = (unsigned)((curState & writerMask) >> writerShift);
  346. readers.signal(readersWaiting);
  347. writers.signal(writersWaiting);
  348. }
  349. inline bool allWritersStopped() const { return activeWriters.load(std::memory_order_acquire) <= 0; }
  350. inline bool allReadersStopped() const { return activeReaders.load(std::memory_order_acquire) <= 0; }
  351. protected:
  352. BufferElement * values;
  353. unsigned dynamicSlotMask;
  354. unsigned maxItems;
  355. unsigned maxReaders;
  356. unsigned maxWriters;
  357. std::atomic<int> activeReaders;
  358. std::atomic<int> activeWriters;
  359. std::atomic<bool> aborted;
  360. Semaphore readers __attribute__((aligned(CACHE_LINE_SIZE)));
  361. Semaphore writers __attribute__((aligned(CACHE_LINE_SIZE)));
  362. //Ensure the state is not on the same cache line as anything else, especially anything that is modified.
  363. std::atomic<state_t> state __attribute__((aligned(CACHE_LINE_SIZE)));
  364. };
  365. #endif