jqueue.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jmutex.hpp"
  15. #include "jqueue.hpp"
  16. //---------------------------------------------------------------------------------------------------------------------
  17. template <typename state_t, unsigned readerBits, unsigned writerBits, unsigned maxSlotBits, unsigned slotBits>
  18. class CRowQueue : implements CInterfaceOf<IRowQueue>
  19. {
  20. public:
  21. CRowQueue(unsigned _maxItems, unsigned _numProducers, unsigned _numConsumers) : queue(_numProducers, _numConsumers, _maxItems), numConsumers(_numConsumers), numProducers(_numProducers)
  22. {
  23. }
  24. virtual bool enqueue(const void * const item)
  25. {
  26. return queue.enqueue(item);
  27. }
  28. virtual bool dequeue(const void * & result)
  29. {
  30. return queue.dequeue(result, false);
  31. }
  32. virtual bool tryDequeue(const void * & result)
  33. {
  34. return queue.dequeue(result, true);
  35. }
  36. virtual void reset()
  37. {
  38. //This resets the state, but does not clear the elements, they must be dequeued and released.
  39. queue.reset();
  40. //How clean up the queue and ensure the elements are disposed of?
  41. }
  42. virtual void noteReaderStopped()
  43. {
  44. queue.noteReaderStopped();
  45. }
  46. virtual void noteWriterStopped()
  47. {
  48. queue.noteWriterStopped();
  49. }
  50. virtual void abort()
  51. {
  52. queue.abort();
  53. }
  54. private:
  55. ReaderWriterQueue<const void *, state_t, readerBits, writerBits, maxSlotBits, slotBits> queue;
  56. const unsigned numConsumers;
  57. const unsigned numProducers;
  58. };
  59. IRowQueue * createRowQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots)
  60. {
  61. //Ideally if the numberOfReaders or writers is 1 then ideally, supply 1 for the relevant values.
  62. //
  63. assertex(maxSlots == 0 || maxItems < maxSlots);
  64. if ((numReaders == 1) && (numWriters == 1) && (maxItems < 256))
  65. return new CRowQueue<unsigned, 1, 1, 8, 8>(maxItems, numWriters, numReaders);
  66. if ((numReaders == 1) && (numWriters == 1) && (maxItems < 0x4000))
  67. return new CRowQueue<unsigned, 1, 1, 14, 0>(maxItems, numWriters, numReaders);
  68. if ((numReaders == 1) && (numWriters <= 127) && (maxItems < 256))
  69. return new CRowQueue<unsigned, 1, 7, 8, 0>(maxItems, numWriters, numReaders);
  70. if ((numWriters == 1) && (numReaders <= 255) && (maxItems < 2048))
  71. return new CRowQueue<unsigned, 8, 1, 11, 0>(maxItems, numWriters, numReaders);
  72. if ((numReaders <= 31) && (numWriters <= 31) && (maxItems < 128))
  73. return new CRowQueue<unsigned, 6, 6, 7, 0>(maxItems, numWriters, numReaders);
  74. assertex((numReaders < 0x1000) && (numWriters < 0x400));
  75. return new CRowQueue<unsigned __int64, 12, 10, 16, 0>(maxItems, numWriters, numReaders);
  76. }
  77. //MORE:
  78. //use likely()/unlikely() - they might improve the code
  79. //Adaptive spin counts - separate variables for reader/writer. reduce number of spins if a producer/consumer blocks.
  80. //Add options to indicate spinning for readers or writers is ok
  81. //Use a traits class instead of multiple parameters?
  82. //If readers or writers spin, then there is no need to keep track of waiting counts in the state.
  83. //Base the blog on an earlier simpler example - without the full templatization