jqueue.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __JQUEUE__
  14. #define __JQUEUE__
  15. #include "jlib.hpp"
  16. #include <atomic>
  17. #include <utility>
  18. #include "jatomic.hpp"
  19. // A generalised queue interface.
  20. template <typename ELEMENT>
  21. interface IQueueOf : extends IInterface
  22. {
  23. public:
  24. virtual bool enqueue(const ELEMENT item) = 0;
  25. virtual bool dequeue(ELEMENT & result) = 0;
  26. virtual bool tryDequeue(ELEMENT & result) = 0;
  27. virtual void noteWriterStopped() = 0;
  28. virtual void abort() = 0;
  29. virtual void reset() = 0;
  30. };
  31. typedef IQueueOf<const void *> IRowQueue;
  32. extern jlib_decl IRowQueue * createRowQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots);
  /*
   * The ReaderWriterQueue is a bounded inter-thread queue that aims to be lock free when adding an item to the queue
   * if there is space, and when removing items from a non-empty queue.
   * Normally lock free implementations may be unbounded - which can cause memory to be exhausted if the producers
   * are faster than the consumers. They also tend to spin until they can proceed - but this performs poorly if the
   * consumer and producer are not perfectly balanced. The high level of contention can also cause cache issues.
   *
   * This implementation will wait on a semaphore if there is no room, or if the queue is empty.
   * It uses a single state field which combines information about the queue state and the number of waiting readers/writers.
   *
   * The queue also has support for
   *  - aborting - causing all consumers and producers to fail to enqueue/dequeue
   *  - noting that writers have completed. This is particularly useful for the consumers of an M:1 queue,
   *    which can then know that there will be no more items.
   */
  //NOTE: maxSlotBits * 2 + writerBits * 2 + readerBits <= sizeof(state_t)*8
  48. template <typename ELEMENT, typename state_t, unsigned readerBits, unsigned writerBits, unsigned maxSlotBits, unsigned fixedSlotBits=0>
  49. class ReaderWriterQueue
  50. {
  51. //This uses one spare slot in the array to ensure the count field cannot overflow.
  52. //state has [dequeue-pos][reader-count][writer-count][num-items] in a single field.
  53. //num-items is the least significant field (so does not need to be shifted)
  54. //dequeue-pos it the most significant field, so there is no need to worry about wrapping into other fields.
  55. //dequeue-pos must have enough extra bits to disambiguate N writers all waiting to write to the same slot
  56. class BufferElement
  57. {
  58. public:
  59. ELEMENT value;
  60. std::atomic<unsigned> sequence;
  61. };
  62. // max readers = 2^readerBits-1
  63. // max writers = 2^writerBits-1
  64. // max queuesize = 2^(sequenceBits-1)-1
  65. //Derived constants
  66. const static unsigned extraSequenceBits = writerBits; // Possibly this could be reduced to min(readerBits, writerBits)
  67. const static unsigned stateBits = (unsigned)(sizeof(state_t)*8);
  68. const static unsigned padBits = stateBits - (extraSequenceBits + writerBits + 2 * maxSlotBits + readerBits);
  69. const static unsigned countBits = maxSlotBits + padBits; // ensure the sequence wraps as expected in the top bits.
  70. const static unsigned sequenceBits = extraSequenceBits+maxSlotBits;
  71. const static unsigned countShift = 0;
  72. const static unsigned writerShift = countBits;
  73. const static unsigned readerShift = countBits + writerBits;
  74. const static unsigned dequeueShift = countBits + writerBits + readerBits;
  75. const static state_t readerMask = ((state_t)1 << dequeueShift) - ((state_t)1 << readerShift);
  76. const static state_t writerMask = ((state_t)1 << readerShift) - ((state_t)1 << writerShift);
  77. const static state_t sequenceMask = ((state_t)1 << sequenceBits) - 1;
  78. const static state_t countMask = ((state_t)1 << countBits) - 1;;
  79. const static state_t dequeueMask = (sequenceMask << dequeueShift);
  80. const static unsigned maxSlots = (1U << maxSlotBits) - 1;
  81. const static unsigned initialSpinsBeforeWait = 2000;
  82. const static unsigned slotUnavailableSpins = 50; // If not available for a short time then the thread must have been rescheduled
  83. const static state_t fixedSlotMask = (1U << fixedSlotBits) - 1;
  84. public:
  85. ReaderWriterQueue(unsigned _maxWriters, unsigned _maxItems) : maxItems(_maxItems), maxWriters(_maxWriters)
  86. {
  87. //printf("element(%u) pad(%u) write(%u), read(%u) slot(%u) count(%u) max(%u)\n", stateBits, padBits, writerBits, readerBits, maxSlotBits, countBits, maxItems);
  88. //Check all the bits are used, and none of the bits overlap.
  89. assertex(padBits < stateBits);
  90. assertex(countBits >= maxSlotBits);
  91. assertex(sequenceBits + readerBits + writerBits + countBits == stateBits);
  92. assertex((readerMask | writerMask | countMask | dequeueMask) == (state_t)-1);
  93. assertex((readerMask | writerMask | countMask | dequeueMask) == (readerMask ^ writerMask ^ countMask ^ dequeueMask));
  94. // Reserve at least one free entry to ensure the count bitfield does not overflow.
  95. const unsigned minSpace = 1;
  96. assertex(maxItems != 0);
  97. assertex(maxItems + minSpace <= maxSlots);
  98. unsigned numSlots;
  99. if (fixedSlotBits == 0)
  100. {
  101. //Ensure the array is a power of two, so the sequence can be mapped to an item using an AND
  102. numSlots = 1;
  103. while (numSlots < maxItems + minSpace)
  104. numSlots += numSlots;
  105. dynamicSlotMask = numSlots - 1;
  106. }
  107. else
  108. {
  109. numSlots = fixedSlotMask + 1;
  110. dynamicSlotMask = fixedSlotMask;
  111. }
  112. activeWriters.store(maxWriters, std::memory_order_relaxed);
  113. aborted.store(false, std::memory_order_relaxed);
  114. state.store(0, std::memory_order_relaxed);
  115. values = new BufferElement[numSlots];
  116. for (unsigned i=0; i < numSlots; i++)
  117. values[i].sequence.store(i, std::memory_order_relaxed);
  118. }
  119. ~ReaderWriterQueue()
  120. {
  121. delete [] values;
  122. }
  123. //Should possibly have the following functions instead for correct C++11 integration...
  124. //void enqueue(const ELEMENT & value);
  125. //void enqueue(ELEMENT && value);
  126. bool enqueue(const ELEMENT value)
  127. {
  128. if (aborted.load(std::memory_order_relaxed))
  129. return false;
  130. dbgassertex(!allWritersStopped());
  131. //Note, compare_exchange_weak updates curState when it fails, so don't read inside the main loop
  132. unsigned numSpins = initialSpinsBeforeWait;
  133. state_t curState = state.load(std::memory_order_acquire);
  134. loop
  135. {
  136. unsigned curCount = (curState & countMask);
  137. if (curCount == maxItems)
  138. {
  139. if (--numSpins != 0) // likely
  140. {
  141. curState = state.load(std::memory_order_acquire);
  142. continue;
  143. }
  144. numSpins = initialSpinsBeforeWait;
  145. //The list is currently full, increment the number of writers waiting.
  146. //This can never overflow...
  147. const state_t nextState = curState + ((state_t)1 << writerShift);
  148. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  149. {
  150. if (aborted.load(std::memory_order_acquire))
  151. return false;
  152. writers.wait();
  153. if (aborted.load(std::memory_order_acquire))
  154. return false;
  155. curState = state.load(std::memory_order_acquire);
  156. }
  157. }
  158. else
  159. {
  160. //Increment the number of items (which can never overflow), and possibly decrease readers
  161. state_t nextState = (curState + 1);
  162. //If a reader is waiting then decrement the count, and signal later..
  163. //Note, this test is a constant folded
  164. if (readerBits == 1)
  165. {
  166. //More efficient to perform an unconditional mask
  167. nextState &= ~readerMask;
  168. }
  169. else
  170. {
  171. if ((curState & readerMask) != 0)
  172. nextState -= (1 << readerShift);
  173. }
  174. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  175. {
  176. unsigned slotMask = (fixedSlotBits ? fixedSlotMask : dynamicSlotMask);
  177. unsigned curDequeueSeq = (curState >> dequeueShift); // No need to mask since the top field
  178. unsigned curEnqueueSeq = (curDequeueSeq + curCount) & sequenceMask;
  179. unsigned filledSeq = (curEnqueueSeq + 1) & sequenceMask;
  180. unsigned curEnqueueSlot = curEnqueueSeq & slotMask;
  181. BufferElement & cur = values[curEnqueueSlot];
  182. //MORE: Another producer has been interrupted while writing to the same slot
  183. //or the consumer has not yet read from the slot.
  184. //spin until that has been consumed.
  185. unsigned spins = 0;
  186. while (cur.sequence.load(std::memory_order_acquire) != curEnqueueSeq)
  187. {
  188. if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
  189. {
  190. ThreadYield();
  191. spins = 0;
  192. }
  193. else
  194. spinPause();
  195. }
  196. //enqueue takes ownership of the object -> use std::move
  197. cur.value = std::move(value);
  198. cur.sequence.store(filledSeq, std::memory_order_release);
  199. if ((curState & readerMask) != 0)
  200. readers.signal();
  201. return true;
  202. }
  203. }
  204. }
  205. }
  206. bool dequeue(ELEMENT & result, bool returnIfEmpty)
  207. {
  208. if (aborted.load(std::memory_order_relaxed))
  209. return false;
  210. unsigned numSpins = initialSpinsBeforeWait;
  211. //Note, compare_exchange_weak updates curState when it fails, so don't read inside the main loop
  212. state_t curState = state.load(std::memory_order_acquire);
  213. loop
  214. {
  215. unsigned curCount = (curState & countMask);
  216. //Check if the queue is empty
  217. if (curCount == 0)
  218. {
  219. if (returnIfEmpty)
  220. return false;
  221. //If all writers have finished then no more items will be enqueued
  222. if (allWritersStopped())
  223. {
  224. curState = state.load(std::memory_order_acquire);
  225. //Check that nothing has been added since the previous load. (Very small window.)
  226. if ((curState & countMask) == 0)
  227. return false;
  228. continue;
  229. }
  230. //We must check numSpins before we try and increment the number of readers
  231. if (--numSpins != 0)
  232. {
  233. curState = state.load(std::memory_order_acquire);
  234. continue;
  235. }
  236. numSpins = initialSpinsBeforeWait;
  237. //The list is currently empty, increment the number of readers waiting.
  238. //This can never overflow...
  239. state_t nextState = curState + (1 << readerShift);
  240. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  241. {
  242. if (aborted.load(std::memory_order_acquire))
  243. return false;
  244. //If no longer any active writers it may have happened before the cas, so the semaphore may not
  245. //have been signalled. Either new items have been added, or the loop will terminate - so loop again.
  246. if (!allWritersStopped())
  247. {
  248. readers.wait();
  249. if (aborted.load(std::memory_order_acquire))
  250. return false;
  251. }
  252. curState = state.load(std::memory_order_acquire);
  253. }
  254. }
  255. else
  256. {
  257. //Increase the dequeue position (which will harmlessly wrap), and decrement the count.
  258. state_t nextState = (curState + ((state_t)1 << dequeueShift) - 1);
  259. //If a reader is waiting then decrement the count, and signal later..
  260. if (writerBits == 1)
  261. {
  262. //More efficient to perform an unconditional mask
  263. //NOTE: check assembler to ensure it is merged with previous mask above
  264. nextState &= ~writerMask;
  265. }
  266. else
  267. {
  268. if ((curState & writerMask) != 0)
  269. nextState -= (1 << writerShift);
  270. }
  271. if (state.compare_exchange_weak(curState, nextState, std::memory_order_relaxed))
  272. {
  273. unsigned curDequeueSeq = (curState >> dequeueShift); // No need to mask since the top field
  274. unsigned slotMask = (fixedSlotBits ? fixedSlotMask : dynamicSlotMask);
  275. unsigned expectedSeq = (curDequeueSeq + 1) & sequenceMask;
  276. unsigned curDequeueSlot = (curDequeueSeq & slotMask);
  277. BufferElement & cur = values[curDequeueSlot];
  278. unsigned spins = 0;
  279. loop
  280. {
  281. unsigned sequence = cur.sequence.load(std::memory_order_acquire);
  282. if (sequence == expectedSeq)
  283. break;
  284. //possibly yield every n iterations?
  285. if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
  286. {
  287. ThreadYield();
  288. spins = 0;
  289. }
  290. else
  291. spinPause();
  292. }
  293. result = std::move(cur.value);
  294. const unsigned numSlots = slotMask + 1;
  295. unsigned nextSeq = (curDequeueSeq + numSlots) & sequenceMask;
  296. cur.sequence.store(nextSeq, std::memory_order_release);
  297. if ((curState & writerMask) != 0)
  298. writers.signal();
  299. return true;
  300. }
  301. }
  302. }
  303. }
  304. virtual void reset()
  305. {
  306. activeWriters.store(maxWriters, std::memory_order_relaxed);
  307. aborted.store(false, std::memory_order_relaxed);
  308. readers.reinit(0);
  309. writers.reinit(0);
  310. }
  311. virtual void noteWriterStopped()
  312. {
  313. //MORE: If this reduces activeProducers to 0 then it may need to wake up any waiting threads.
  314. if (--activeWriters <= 0)
  315. {
  316. state_t curState = state.load(std::memory_order_acquire);
  317. unsigned readersWaiting = (curState & readerMask) >> readerShift;
  318. readers.signal(readersWaiting);
  319. }
  320. }
  321. virtual void abort()
  322. {
  323. //readers and writers may enqueue/dequeue another row before this takes effect
  324. aborted.store(true, std::memory_order_release);
  325. state_t curState = state.load(std::memory_order_acquire);
  326. unsigned readersWaiting = (curState & readerMask) >> readerShift;
  327. unsigned writersWaiting = (curState & writerMask) >> writerShift;
  328. readers.signal(readersWaiting);
  329. writers.signal(writersWaiting);
  330. }
  331. inline bool allWritersStopped() const { return activeWriters.load(std::memory_order_acquire) <= 0; }
  332. protected:
  333. BufferElement * values;
  334. unsigned dynamicSlotMask;
  335. unsigned maxItems;
  336. unsigned maxWriters;
  337. std::atomic<int> activeWriters;
  338. std::atomic<bool> aborted;
  339. char pad1[64];
  340. Semaphore readers;
  341. char pad2[64-sizeof(Semaphore)];
  342. Semaphore writers;
  343. char pad3[64-sizeof(Semaphore)];
  344. //Ensure the state is not on the same cache line as anything else, especially anything that is modified.
  345. std::atomic<state_t> state;
  346. };
  347. #endif