thorstrand.hpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef THORSTRAND_HPP
  14. #define THORSTRAND_HPP
  15. #include "jqueue.hpp"
  16. #include "thorhelper.hpp"
  17. #include "roxiestream.hpp"
  18. #include "roxiemem.hpp"
  19. interface IStrandJunction : extends IInterface
  20. {
  21. public:
  22. virtual IEngineRowStream * queryOutput(unsigned n) = 0;
  23. virtual void setInput(unsigned n, IEngineRowStream * _stream) = 0;
  24. virtual void start() = 0;
  25. virtual void reset() = 0;
  26. virtual void abort() = 0;
  27. };
  28. inline void startJunction(IStrandJunction * junction) { if (junction) junction->start(); }
  29. inline void resetJunction(IStrandJunction * junction) { if (junction) junction->reset(); }
  30. interface IStrandThreaded : extends IThreaded
  31. {
  32. virtual void stopStream() = 0;
  33. };
  34. interface IStrandBarrier : extends IInterface
  35. {
  36. public:
  37. virtual void startStrand(IStrandThreaded & strand) = 0;
  38. virtual void waitForStrands() = 0;
  39. virtual void noteStrandFinished(IRowStream * stream) = 0;
  40. };
  41. interface IManyToOneRowStream : extends IRowStream
  42. {
  43. public:
  44. virtual IRowWriterEx * getWriter(unsigned n) = 0;
  45. virtual void abort() = 0;
  46. };
  47. interface IStrandBranch : extends IInterface
  48. {
  49. virtual IStrandJunction * queryInputJunction() = 0;
  50. virtual IStrandJunction * queryOutputJunction() = 0;
  51. };
  52. interface IOrderedOutputCallback
  53. {
  54. virtual bool noteEndOfInputChunk() = 0;
  55. virtual void noteEndOfInput() = 0;
  56. };
  57. interface IOrderedCallbackCollection
  58. {
  59. virtual IOrderedOutputCallback * queryCallback(unsigned i) = 0;
  60. };
  61. extern THORHELPER_API IStrandJunction * createStrandJunction(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered);
  62. extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & _rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped, bool inputIsStreamed, IOrderedCallbackCollection * orderedCallbacks);
  63. extern THORHELPER_API void clearRowQueue(IRowQueue * queue);
  64. extern THORHELPER_API IStrandBarrier * createStrandBarrier();
  65. extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered);
  66. extern THORHELPER_API const void * queryEndOfSectionMarker();
  67. //---------------------------------------------------------------------------------------------------------------------
  68. class RowBlockAllocator;
  69. class THORHELPER_API RoxieRowBlock
  70. {
  71. public:
  72. const static unsigned numDummyDynamicRows = 1;
  73. explicit RoxieRowBlock(unsigned _maxRows) noexcept : maxRows(_maxRows)
  74. {
  75. readPos = 0;
  76. writePos = 0;
  77. endOfChunk = false;
  78. }
  79. ~RoxieRowBlock();
  80. inline bool addRowNowFull(const void * row)
  81. {
  82. dbgassertex(writePos < maxRows);
  83. rows[writePos] = row;
  84. return (++writePos == maxRows);
  85. }
  86. bool empty() const;
  87. IException * getClearException()
  88. {
  89. return exception.getClear();
  90. }
  91. inline bool isEndOfChunk() const { return endOfChunk; }
  92. inline bool nextRow(const void * & row)
  93. {
  94. if (readPos >= writePos)
  95. return false;
  96. row = rows[readPos++];
  97. return true;
  98. }
  99. inline size32_t numRows() const { return writePos - readPos; }
  100. bool readFromStream(IRowStream * stream);
  101. inline void releaseBlock()
  102. {
  103. //This function is called instead of directly calling delete in case a cache is introduced later.
  104. delete this;
  105. }
  106. void releaseRows();
  107. inline void setEndOfChunk() { endOfChunk = true; }
  108. inline void setExceptionOwn(IException * e) { exception.setown(e); }
  109. void throwAnyPendingException();
  110. static void operator delete (void * ptr);
  111. protected:
  112. Owned<IException> exception;
  113. const size32_t maxRows;
  114. size32_t readPos;
  115. size32_t writePos;
  116. bool endOfChunk;
  117. const void * rows[numDummyDynamicRows]; // Actually multiple rows. Memory is allocated by the RowBlockAllocator.
  118. };
  119. class THORHELPER_API RowBlockAllocator
  120. {
  121. public:
  122. RowBlockAllocator(roxiemem::IRowManager & _rowManager, unsigned rowsPerBlock);
  123. RoxieRowBlock * newBlock();
  124. size32_t maxRowsPerBlock() const { return rowsPerBlock; }
  125. public:
  126. size32_t rowsPerBlock;
  127. Owned<roxiemem::IFixedRowHeap> heap;
  128. };
  129. //---------------------------------------------------------------------------------------------------------------------
  130. typedef IQueueOf<RoxieRowBlock *> IRowBlockQueue;
  131. //MORE: This implementation should be improved! Directly use the correct queue implementation??
  132. class CRowBlockQueue : implements CInterfaceOf<IRowBlockQueue>
  133. {
  134. public:
  135. CRowBlockQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots)
  136. {
  137. queue.setown(createRowQueue(numReaders, numWriters, maxItems, maxSlots));
  138. }
  139. virtual bool enqueue(RoxieRowBlock * const item)
  140. {
  141. return queue->enqueue(reinterpret_cast<const void *>(item));
  142. }
  143. virtual bool dequeue(RoxieRowBlock * & result)
  144. {
  145. const void * tempResult;
  146. bool ok = queue->dequeue(tempResult);
  147. result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
  148. return ok;
  149. }
  150. virtual bool tryDequeue(RoxieRowBlock * & result)
  151. {
  152. const void * tempResult;
  153. bool ok = queue->tryDequeue(tempResult);
  154. result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
  155. return ok;
  156. }
  157. virtual void reset()
  158. {
  159. queue->reset();
  160. }
  161. virtual void noteWriterStopped()
  162. {
  163. queue->noteWriterStopped();
  164. }
  165. virtual void abort()
  166. {
  167. queue->abort();
  168. }
  169. private:
  170. Owned<IRowQueue> queue;
  171. };
  172. #endif // THORSTRAND_HPP