thorstrand.cpp 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "jthread.hpp"
  15. #include "jqueue.hpp"
  16. #include "roxiemem.hpp"
  17. #include "thorstrand.hpp"
  18. #include <atomic>
  19. #define DEFAULT_ROWBLOCK_SIZE 500
  20. static const byte endOfSectionMarker = 0;
  21. const void * queryEndOfSectionMarker() { return &endOfSectionMarker; }
  22. //---------------------------------------------------------------------------------------------------------------------
  23. class CStrandBarrier : public CInterfaceOf<IStrandBarrier>
  24. {
  25. public:
  26. virtual void reset()
  27. {
  28. producerStopSem.reinit(0);
  29. }
  30. virtual void startStrand(IStrandThreaded & strand)
  31. {
  32. CThreaded * thread = new CThreaded("Strand", &strand);
  33. threads.append(*thread);
  34. thread->start();
  35. }
  36. virtual void waitForStrands()
  37. {
  38. producerStopSem.signal(threads.ordinality());
  39. ForEachItemIn(i, threads)
  40. threads.item(i).join();
  41. threads.kill();
  42. }
  43. virtual void noteStrandFinished(IRowStream * stream)
  44. {
  45. waitForStop();
  46. if (stream)
  47. stream->stop();
  48. }
  49. protected:
  50. void waitForStop()
  51. {
  52. producerStopSem.wait();
  53. }
  54. protected:
  55. Semaphore producerStopSem;
  56. CIArrayOf<CThreaded> threads;
  57. };
  58. IStrandBarrier * createStrandBarrier()
  59. {
  60. return new CStrandBarrier;
  61. }
  62. //---------------------------------------------------------------------------------------------------------------------
  63. class CStrandJunction : public CInterfaceOf<IStrandJunction>
  64. {
  65. public:
  66. explicit CStrandJunction(unsigned _numProducers, unsigned _numStrands)
  67. : numProducers(_numProducers), numStrands(_numStrands), stopping(false), started(false)
  68. {
  69. assertex(numProducers);
  70. }
  71. virtual void reset()
  72. {
  73. producerStopSem.reinit(0);
  74. producerStoppedSem.reinit(0);
  75. stopping.store(false, std::memory_order_relaxed);
  76. started = false;
  77. }
  78. inline bool isStopping() const
  79. {
  80. return stopping.load(std::memory_order_relaxed);
  81. }
  82. inline void noteStarted()
  83. {
  84. started = true;
  85. }
  86. void startProducerThread(IThreaded & mainthread)
  87. {
  88. CThreaded * thread = new CThreaded("ReadAheadThread", &mainthread);
  89. threads.append(*thread);
  90. thread->start();
  91. }
  92. void processConsumerStop()
  93. {
  94. if (started)
  95. {
  96. //Ensure only one producer triggers stopping on the inputs
  97. if (!stopping.exchange(true, std::memory_order_acq_rel))
  98. {
  99. stopActiveProducers();
  100. for (unsigned i=0; i < numProducers; i++)
  101. producerStoppedSem.wait();
  102. }
  103. }
  104. else
  105. stopInactiveProducers();
  106. }
  107. void noteProducerFinished(IRowStream * stream)
  108. {
  109. waitForStop();
  110. if (stream)
  111. stream->stop();
  112. notifyStopped();
  113. }
  114. inline unsigned getNumProducers() const { return numProducers; }
  115. protected:
  116. void waitForStop()
  117. {
  118. producerStopSem.wait();
  119. }
  120. void notifyStopped()
  121. {
  122. producerStoppedSem.signal();
  123. }
  124. //Wait for all active producers to complete - including calling stop on their inputs
  125. void waitForProducers()
  126. {
  127. producerStopSem.signal(numProducers);
  128. ForEachItemIn(i, threads)
  129. threads.item(i).join();
  130. threads.kill();
  131. }
  132. //Stop producers that have already been started()
  133. virtual void stopActiveProducers()
  134. {
  135. waitForProducers();
  136. }
  137. //Stop producers that have never been started()
  138. virtual void stopInactiveProducers() = 0;
  139. protected:
  140. const unsigned numStrands;
  141. private:
  142. const unsigned numProducers;
  143. Semaphore producerStopSem;
  144. Semaphore producerStoppedSem;
  145. CIArrayOf<CThreaded> threads;
  146. std::atomic<bool> stopping;
  147. bool started;
  148. };
  149. class OneToOneJunction : public CInterfaceOf<IStrandJunction>
  150. {
  151. public:
  152. OneToOneJunction() : stream(NULL) {}
  153. virtual IEngineRowStream * queryOutput(unsigned n)
  154. {
  155. assertex(n == 0);
  156. assertex(stream);
  157. return stream;
  158. }
  159. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  160. {
  161. assertex(n == 0);
  162. stream = _stream;
  163. }
  164. virtual void start()
  165. {
  166. }
  167. virtual void reset()
  168. {
  169. }
  170. virtual void abort()
  171. {
  172. }
  173. protected:
  174. IEngineRowStream * stream;
  175. };
  176. //---------------------------------------------------------------------------------------------------------------------
  177. RoxieRowBlock::~RoxieRowBlock()
  178. {
  179. releaseRows();
  180. }
  181. bool RoxieRowBlock::empty() const
  182. {
  183. return (readPos >= writePos) && !exception;
  184. }
  185. bool RoxieRowBlock::readFromStream(IRowStream * stream)
  186. {
  187. bool done = false;
  188. try
  189. {
  190. for (;;)
  191. {
  192. const void * row = stream->nextRow();
  193. if (!row)
  194. {
  195. done = true;
  196. break;
  197. }
  198. else if (row == &endOfSectionMarker)
  199. {
  200. setEndOfChunk();
  201. break;
  202. }
  203. if (addRowNowFull(row))
  204. break;
  205. }
  206. }
  207. catch (IException * e)
  208. {
  209. setExceptionOwn(e);
  210. done = true;
  211. }
  212. return done;
  213. }
  214. void RoxieRowBlock::releaseRows()
  215. {
  216. while (readPos < writePos)
  217. ReleaseRoxieRow(rows[readPos++]);
  218. }
  219. void RoxieRowBlock::throwAnyPendingException()
  220. {
  221. if (exception)
  222. throw exception.getClear();
  223. }
  224. void RoxieRowBlock::operator delete (void * ptr)
  225. {
  226. ReleaseRoxieRow(ptr);
  227. }
  228. //---------------------------------------------------------------------------------------------------------------------
  229. RowBlockAllocator::RowBlockAllocator(roxiemem::IRowManager & rowManager, size32_t minRowsPerBlock) : rowsPerBlock(0)
  230. {
  231. assertex(minRowsPerBlock);
  232. size_t classSize = sizeof(RoxieRowBlock) - RoxieRowBlock::numDummyDynamicRows * sizeof(void *);
  233. size_t requestedSize = classSize + minRowsPerBlock * sizeof(void*);
  234. roxiemem::RoxieHeapFlags heapFlags = roxiemem::RHFunique|roxiemem::RHFnofragment;
  235. heap.setown(rowManager.createFixedRowHeap(requestedSize, 0, heapFlags, 0));
  236. rowsPerBlock = (rowManager.getExpectedCapacity(requestedSize, heapFlags) - classSize ) / sizeof(void*);
  237. assertex(rowsPerBlock >= minRowsPerBlock);
  238. }
  239. //A bit of an experimental implementation - other options could include a list like the allocators
  240. RoxieRowBlock * RowBlockAllocator::newBlock()
  241. {
  242. return new (heap->allocate()) RoxieRowBlock(rowsPerBlock);
  243. }
  244. //---------------------------------------------------------------------------------------------------------------------
  245. static void resetBlockQueue(IRowQueue * queue)
  246. {
  247. queue->reset();
  248. for (;;)
  249. {
  250. const void * next;
  251. if (!queue->tryDequeue(next))
  252. break;
  253. RoxieRowBlock * curBlock = (RoxieRowBlock *)next;
  254. if (curBlock)
  255. curBlock->releaseBlock();
  256. }
  257. }
  258. class StreamToBlockQueueThread : public CInterface, implements IThreaded
  259. {
  260. public:
  261. StreamToBlockQueueThread(CStrandJunction & _junction, IRowQueue * _queue, RowBlockAllocator & _allocator)
  262. : junction(_junction), queue(_queue), stream(NULL), allocator(_allocator) {}
  263. virtual void threadmain() override
  264. {
  265. bool done = false;
  266. while (!done)
  267. {
  268. RoxieRowBlock * block = allocator.newBlock();
  269. done = block->readFromStream(stream);
  270. if (junction.isStopping() || block->empty() || !queue->enqueue(block))
  271. {
  272. block->releaseBlock();
  273. break;
  274. }
  275. }
  276. queue->noteWriterStopped();
  277. junction.noteProducerFinished(stream);
  278. }
  279. void setInput(IEngineRowStream * _input)
  280. {
  281. stream = _input;
  282. }
  283. void setQueue(IRowQueue * _queue)
  284. {
  285. queue = _queue;
  286. }
  287. void stopInput()
  288. {
  289. stream->stop();
  290. }
  291. protected:
  292. CStrandJunction & junction;
  293. RowBlockAllocator & allocator;
  294. IEngineRowStream * stream;
  295. IRowQueue * queue;
  296. };
  297. class StreamFromBlockQueue : public CInterfaceOf<IEngineRowStream>
  298. {
  299. public:
  300. StreamFromBlockQueue(CStrandJunction & _junction, IRowQueue & _queue) : junction(_junction), queue(_queue)
  301. {
  302. curBlock = NULL;
  303. }
  304. ~StreamFromBlockQueue()
  305. {
  306. reset();
  307. }
  308. virtual const void *nextRow()
  309. {
  310. const void * ret;
  311. for (;;)
  312. {
  313. if (curBlock)
  314. {
  315. if (curBlock->nextRow(ret))
  316. return ret;
  317. if (!pendingException)
  318. pendingException.setown(curBlock->getClearException());
  319. curBlock->releaseBlock();
  320. curBlock = NULL;
  321. }
  322. const void * next;
  323. if (!queue.dequeue(next))
  324. {
  325. //If inputs are unordered, process exceptions last of all
  326. if (pendingException)
  327. throw pendingException.getClear();
  328. return NULL;
  329. }
  330. curBlock = (RoxieRowBlock *)next;
  331. }
  332. }
  333. virtual void stop()
  334. {
  335. queue.noteReaderStopped();
  336. junction.processConsumerStop();
  337. }
  338. virtual void resetEOF()
  339. {
  340. throwUnexpectedX("resetEOF called on BlockedReader");
  341. }
  342. void reset()
  343. {
  344. if (curBlock)
  345. curBlock->releaseBlock();
  346. curBlock = NULL;
  347. pendingException.clear();
  348. }
  349. protected:
  350. CStrandJunction & junction;
  351. IRowQueue & queue;
  352. RoxieRowBlock * curBlock;
  353. Owned<IException> pendingException;
  354. };
  355. //---------------------------------------------------------------------------------------------------------------------
  356. class BlockedManyToOneJunction : public CStrandJunction
  357. {
  358. public:
  359. BlockedManyToOneJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize, IRowQueue * _queue)
  360. : CStrandJunction(_numStrands, _numStrands), queue(_queue), allocator(_rowManager, blockSize), consumer(*this, *_queue)
  361. {
  362. producers = new StreamToBlockQueueThread * [numStrands];
  363. for (unsigned i=0; i < numStrands; i++)
  364. producers[i] = new StreamToBlockQueueThread(*this, queue, allocator);
  365. }
  366. ~BlockedManyToOneJunction()
  367. {
  368. for (unsigned i=0; i < numStrands; i++)
  369. producers[i]->Release();
  370. delete [] producers;
  371. }
  372. virtual IEngineRowStream * queryOutput(unsigned n)
  373. {
  374. assertex(n == 0);
  375. return &consumer;
  376. }
  377. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  378. {
  379. assertex(n < numStrands);
  380. producers[n]->setInput(_stream);
  381. }
  382. virtual void abort()
  383. {
  384. queue->abort();
  385. }
  386. virtual void reset()
  387. {
  388. consumer.reset();
  389. resetBlockQueue(queue);
  390. CStrandJunction::reset();
  391. }
  392. virtual void start()
  393. {
  394. for (unsigned i=0; i < numStrands; i++)
  395. startProducerThread(*producers[i]);
  396. noteStarted();
  397. }
  398. static BlockedManyToOneJunction * create(roxiemem::IRowManager & rowManager, unsigned numStrands, unsigned blockSize)
  399. {
  400. const unsigned maxQueued = numStrands * 4;
  401. Owned<IRowQueue> queue = createRowQueue(1, numStrands, maxQueued, 0);
  402. return new BlockedManyToOneJunction(rowManager, numStrands, blockSize, queue.getClear());
  403. }
  404. protected:
  405. virtual void stopInactiveProducers()
  406. {
  407. for (unsigned i=0; i < numStrands; i++)
  408. producers[i]->stopInput();
  409. }
  410. protected:
  411. Owned<IRowQueue> queue;
  412. RowBlockAllocator allocator;
  413. StreamToBlockQueueThread * * producers;
  414. StreamFromBlockQueue consumer;
  415. };
  416. //---------------------------------------------------------------------------------------------------------------------
  417. class BlockedOneToManyJunction : public CStrandJunction
  418. {
  419. public:
  420. BlockedOneToManyJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _maxQueueItems, unsigned _blockSize)
  421. : CStrandJunction(1, _numStrands), allocator(_rowManager, _blockSize), producer(*this, NULL, allocator)
  422. {
  423. queue.setown(createRowQueue(numStrands, 1, _maxQueueItems, 0));
  424. producer.setQueue(queue);
  425. consumers = new StreamFromBlockQueue * [numStrands];
  426. for (unsigned i=0; i < numStrands; i++)
  427. consumers[i] = new StreamFromBlockQueue(*this, *queue);
  428. }
  429. ~BlockedOneToManyJunction()
  430. {
  431. for (unsigned i=0; i < numStrands; i++)
  432. consumers[i]->Release();
  433. delete [] consumers;
  434. }
  435. virtual IEngineRowStream * queryOutput(unsigned n)
  436. {
  437. assertex(n < numStrands);
  438. return consumers[n];
  439. }
  440. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  441. {
  442. assertex(n == 0);
  443. producer.setInput(_stream);
  444. }
  445. virtual void abort()
  446. {
  447. queue->abort();
  448. }
  449. virtual void reset()
  450. {
  451. resetBlockQueue(queue);
  452. for (unsigned i=0; i < numStrands; i++)
  453. consumers[i]->reset();
  454. CStrandJunction::reset();
  455. }
  456. virtual void start()
  457. {
  458. startProducerThread(producer);
  459. noteStarted();
  460. }
  461. protected:
  462. virtual void stopInactiveProducers()
  463. {
  464. for (unsigned i=0; i < numStrands; i++)
  465. producer.stopInput();
  466. }
  467. protected:
  468. Owned<IRowQueue> queue;
  469. RowBlockAllocator allocator;
  470. StreamToBlockQueueThread producer;
  471. StreamFromBlockQueue * * consumers;
  472. };
  473. //---------------------------------------------------------------------------------------------------------------------
  474. //Trivial single element queue
  475. class SingleItemBlockQueue
  476. {
  477. public:
  478. SingleItemBlockQueue() : avail(0U), space(1U)
  479. {
  480. }
  481. ~SingleItemBlockQueue()
  482. {
  483. if (value)
  484. value->releaseBlock();
  485. }
  486. void abort()
  487. {
  488. abortSoon = true;
  489. avail.signal();
  490. space.signal();
  491. }
  492. void reset()
  493. {
  494. avail.reinit(0);
  495. space.reinit(1);
  496. if (value)
  497. {
  498. value->releaseBlock();
  499. value = NULL;
  500. }
  501. abortSoon = false;
  502. finishedWriting = false;
  503. finishedReading = false;
  504. }
  505. bool enqueue(RoxieRowBlock * next)
  506. {
  507. if (abortSoon || finishedReading)
  508. return false;
  509. space.wait();
  510. if (abortSoon || finishedReading)
  511. return false;
  512. value = next;
  513. avail.signal();
  514. return true;
  515. }
  516. void noteReaderStopped()
  517. {
  518. if (abortSoon)
  519. return;
  520. finishedReading = true;
  521. space.signal();
  522. }
  523. void noteWriterStopped()
  524. {
  525. if (abortSoon)
  526. return;
  527. space.wait();
  528. finishedWriting = true;
  529. avail.signal();
  530. }
  531. bool dequeue(RoxieRowBlock * & ret)
  532. {
  533. if (abortSoon)
  534. return false;
  535. avail.wait();
  536. if (abortSoon)
  537. return false;
  538. if (finishedWriting)
  539. {
  540. avail.signal();
  541. return false;
  542. }
  543. ret = value;
  544. value = NULL;
  545. space.signal();
  546. return true;
  547. }
  548. protected:
  549. RoxieRowBlock * value = nullptr;
  550. bool abortSoon = false;
  551. bool finishedWriting = false;
  552. bool finishedReading = false;
  553. Semaphore space __attribute__((aligned(CACHE_LINE_SIZE)));
  554. Semaphore avail __attribute__((aligned(CACHE_LINE_SIZE)));
  555. };
  556. //may replace with
  557. typedef SingleItemBlockQueue OrderedJunctionBlockQueue;
  558. class OrderedReadAheadThread : public CInterface, implements IThreaded
  559. {
  560. //friend class OrderedManyToOneJunction;
  561. public:
  562. OrderedReadAheadThread(CStrandJunction & _junction, RowBlockAllocator & _allocator) : junction(_junction), stream(NULL), allocator(_allocator)
  563. {
  564. finished = false;
  565. alive = true;
  566. }
  567. virtual void threadmain() override
  568. {
  569. bool done = false;
  570. while (!done)
  571. {
  572. RoxieRowBlock * block = allocator.newBlock();
  573. done = block->readFromStream(stream);
  574. if (block->empty() || !queue.enqueue(block))
  575. {
  576. block->releaseBlock();
  577. break;
  578. }
  579. }
  580. noteWriterStopped();
  581. junction.noteProducerFinished(stream);
  582. }
  583. void abort()
  584. {
  585. queue.abort();
  586. }
  587. void noteFinished()
  588. {
  589. assertex(finished);
  590. assertex(alive);
  591. alive = false;
  592. }
  593. void reset()
  594. {
  595. queue.reset();
  596. finished = false;
  597. alive = true;
  598. }
  599. void stopInput()
  600. {
  601. stream->stop();
  602. }
  603. void setInput(IEngineRowStream * _input)
  604. {
  605. stream = _input;
  606. }
  607. void noteWriterStopped()
  608. {
  609. finished = true;
  610. queue.noteWriterStopped();
  611. }
  612. inline bool isAlive() const { return alive; }
  613. inline OrderedJunctionBlockQueue & queryQueue() { return queue; }
  614. protected:
  615. CStrandJunction & junction;
  616. RowBlockAllocator & allocator;
  617. IEngineRowStream * stream;
  618. OrderedJunctionBlockQueue queue;
  619. bool finished;
  620. bool alive;
  621. };
  622. //This class primarily supports multi-stranded source activities which immediately feed into a single-stranded activity.
  623. class OrderedManyToOneJunction : public CStrandJunction, implements IEngineRowStream
  624. {
  625. public:
  626. IMPLEMENT_IINTERFACE_USING(CStrandJunction)
  627. OrderedManyToOneJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize)
  628. : CStrandJunction(_numStrands, _numStrands), allocator(_rowManager, blockSize)
  629. {
  630. producers = new OrderedReadAheadThread * [numStrands];
  631. for (unsigned i=0; i < numStrands; i++)
  632. producers[i] = new OrderedReadAheadThread(*this, allocator);
  633. curBlock = NULL;
  634. curStrand = 0;
  635. numActiveStrands = numStrands;
  636. }
  637. ~OrderedManyToOneJunction()
  638. {
  639. for (unsigned i=0; i < numStrands; i++)
  640. producers[i]->Release();
  641. delete [] producers;
  642. }
  643. virtual IEngineRowStream * queryOutput(unsigned n)
  644. {
  645. assertex(n == 0);
  646. return this;
  647. }
  648. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  649. {
  650. assertex(n < numStrands);
  651. producers[n]->setInput(_stream);
  652. }
  653. virtual void abort()
  654. {
  655. abortProducers();
  656. }
  657. virtual void reset()
  658. {
  659. if (curBlock)
  660. {
  661. curBlock->releaseBlock();
  662. curBlock = NULL;
  663. }
  664. for (unsigned strand=0; strand < numStrands; strand++)
  665. producers[strand]->reset();
  666. curStrand = 0;
  667. numActiveStrands = numStrands;
  668. CStrandJunction::reset();
  669. }
  670. virtual void start()
  671. {
  672. for (unsigned i=0; i < numStrands; i++)
  673. startProducerThread(*producers[i]);
  674. noteStarted();
  675. }
  676. virtual const void *nextRow()
  677. {
  678. if (numActiveStrands == 0)
  679. return NULL;
  680. for (;;)
  681. {
  682. if (curBlock)
  683. {
  684. const void * ret;
  685. if (curBlock->nextRow(ret))
  686. return ret;
  687. curBlock->throwAnyPendingException();
  688. bool isEnd = curBlock->isEndOfChunk();
  689. curBlock->releaseBlock();
  690. curBlock = NULL;
  691. if (isEnd)
  692. nextStrand();
  693. }
  694. for (;;)
  695. {
  696. OrderedReadAheadThread & curProducer = *(producers[curStrand]);
  697. OrderedJunctionBlockQueue & queue = curProducer.queryQueue();
  698. if (!queue.dequeue(curBlock))
  699. {
  700. //Abort requested
  701. numActiveStrands = 0;
  702. return NULL;
  703. }
  704. //DBGLOG("active(%d) strand(%d)", numActiveStrands, curStrand);
  705. if (curBlock)
  706. break;
  707. curProducer.noteFinished();
  708. if (--numActiveStrands == 0)
  709. return NULL;
  710. nextStrand();
  711. }
  712. }
  713. }
  714. virtual void stop()
  715. {
  716. //reading no more records => abort the queue and prevent the producers adding any more rows
  717. abortProducers();
  718. processConsumerStop();
  719. }
  720. virtual void resetEOF()
  721. {
  722. throwUnexpectedX("resetEOF called on OrderedManyToOneJunction");
  723. }
  724. protected:
  725. void nextStrand()
  726. {
  727. do
  728. {
  729. curStrand++;
  730. if (curStrand == numStrands)
  731. curStrand = 0;
  732. } while (!producers[curStrand]->isAlive());
  733. }
  734. virtual void stopInactiveProducers()
  735. {
  736. for (unsigned i=0; i < numStrands; i++)
  737. producers[i]->stopInput();
  738. }
  739. void abortProducers()
  740. {
  741. for (unsigned i=0; i < numStrands; i++)
  742. producers[i]->abort();
  743. }
  744. protected:
  745. unsigned numActiveStrands;
  746. RowBlockAllocator allocator;
  747. OrderedReadAheadThread * * producers;
  748. RoxieRowBlock * curBlock;
  749. unsigned curStrand;
  750. };
  751. //---------------------------------------------------------------------------------------------------------------------
  752. IStrandJunction * createStrandJunction(roxiemem::IRowManager & rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered)
  753. {
  754. if ((numInputs == 1) && (numOutputs == 1))
  755. return new OneToOneJunction();
  756. if (blockSize == 0)
  757. blockSize = DEFAULT_ROWBLOCK_SIZE;
  758. if (numOutputs == 1)
  759. {
  760. if (isOrdered)
  761. return new OrderedManyToOneJunction(rowManager, numInputs, blockSize);
  762. return BlockedManyToOneJunction::create(rowManager, numInputs, blockSize);
  763. }
  764. if (numInputs == 1)
  765. {
  766. unsigned maxQueueItems = numOutputs * 2;
  767. return new BlockedOneToManyJunction(rowManager, numOutputs, maxQueueItems, blockSize);
  768. }
  769. //More: We could implement M:N using the existing base classes if there was a need
  770. UNIMPLEMENTED_X("createStrandJunction M:N");
  771. }
  772. void clearRowQueue(IRowQueue * queue)
  773. {
  774. const void * next;
  775. while (queue->tryDequeue(next))
  776. ReleaseRoxieRow(next);
  777. }
  778. //---------------------------------------------------------------------------------------------------------------------
  779. //Class for managing processing on a single ordered strand
  780. class OrderedStrandRowBlockInput : public CInterfaceOf<IEngineRowStream>
  781. {
  782. friend class OrderedManyToOneJunction;
  783. public:
  784. OrderedStrandRowBlockInput(CStrandJunction & _splitJunction, IOrderedOutputCallback & _callback)
  785. : splitJunction(_splitJunction), callback(_callback)
  786. {
  787. curInputBlock = nullptr;
  788. }
  789. //interface IEngineRowStream for the rows being supplied to the strand.
  790. virtual const void *nextRow()
  791. {
  792. for (;;)
  793. {
  794. if (!curInputBlock)
  795. {
  796. if (!inputQueue.dequeue(curInputBlock))
  797. {
  798. callback.noteEndOfInput();
  799. return NULL;
  800. }
  801. }
  802. const void * row;
  803. if (curInputBlock->nextRow(row))
  804. return row;
  805. curInputBlock->throwAnyPendingException();
  806. if (curInputBlock->isEndOfChunk())
  807. {
  808. if (!callback.noteEndOfInputChunk())
  809. inputQueue.abort();
  810. }
  811. curInputBlock->releaseBlock();
  812. curInputBlock = NULL;
  813. }
  814. }
  815. virtual void stop()
  816. {
  817. //reading no more records => abort the queue and prevent the producer adding any more rows
  818. inputQueue.noteReaderStopped();
  819. splitJunction.processConsumerStop();
  820. }
  821. virtual void resetEOF()
  822. {
  823. throwUnexpectedX("resetEOF called on OrderedStrand");
  824. }
  825. void abort()
  826. {
  827. inputQueue.abort();
  828. }
  829. void reset()
  830. {
  831. inputQueue.reset();
  832. if (curInputBlock)
  833. curInputBlock->releaseBlock();
  834. curInputBlock = nullptr;
  835. }
  836. void noteWriterStopped()
  837. {
  838. queryInputQueue().noteWriterStopped();
  839. }
  840. inline OrderedJunctionBlockQueue & queryInputQueue() { return inputQueue; }
  841. protected:
  842. CStrandJunction & splitJunction;
  843. IOrderedOutputCallback & callback;
  844. OrderedJunctionBlockQueue inputQueue;
  845. RoxieRowBlock * curInputBlock;
  846. };
  847. class OrderedInputJunction : public CStrandJunction, implements IThreaded
  848. {
  849. public:
  850. OrderedInputJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _blockSize, bool _isGrouped, IOrderedCallbackCollection * callbacks)
  851. : CStrandJunction(1, _numStrands), inputBlockAllocator(_rowManager, _blockSize), isGrouped(_isGrouped)
  852. {
  853. strands = new OrderedStrandRowBlockInput * [numStrands];
  854. for (unsigned i=0; i < numStrands; i++)
  855. strands[i] = new OrderedStrandRowBlockInput(*this, *callbacks->queryCallback(i));
  856. blockSize = inputBlockAllocator.maxRowsPerBlock();
  857. minGroupBlockSize = (blockSize * 7 + 4) / 8; // Fill with groups until at least 7/8 filled.
  858. assertex(minGroupBlockSize != 0);
  859. }
  860. ~OrderedInputJunction()
  861. {
  862. for (unsigned i=0; i < numStrands; i++)
  863. delete strands[i];
  864. delete [] strands;
  865. }
  866. virtual IEngineRowStream * queryOutput(unsigned n) { assertex(n < numStrands); return strands[n]; }
  867. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  868. {
  869. assertex(n==0);
  870. input = _stream;
  871. }
  872. virtual void start()
  873. {
  874. startProducerThread(*this);
  875. noteStarted();
  876. }
  877. virtual void abort()
  878. {
  879. for (unsigned i=0; i < numStrands; i++)
  880. strands[i]->abort();
  881. }
  882. virtual void reset()
  883. {
  884. for (unsigned i=0; i < numStrands; i++)
  885. strands[i]->reset();
  886. CStrandJunction::reset();
  887. }
  888. virtual void stopInactiveProducers()
  889. {
  890. input->stop();
  891. }
  892. virtual void threadmain() override
  893. {
  894. unsigned curStrand = 0;
  895. bool done = false;
  896. size32_t endChunkSize = minGroupBlockSize-1;
  897. const void * prev = nullptr; // only checked if non null, never dereferenced
  898. while (!done)
  899. {
  900. RoxieRowBlock * block = inputBlockAllocator.newBlock();
  901. bool isEndOfChunk = !isGrouped;
  902. try
  903. {
  904. if (isGrouped)
  905. {
  906. for (;;)
  907. {
  908. const void * row = input->nextRow();
  909. if (!row)
  910. {
  911. if (!prev)
  912. {
  913. done = true;
  914. isEndOfChunk = true;
  915. }
  916. else if (block->numRows() >= endChunkSize)
  917. isEndOfChunk = true;
  918. }
  919. prev = row;
  920. if (block->addRowNowFull(row) || isEndOfChunk)
  921. break;
  922. }
  923. }
  924. else
  925. {
  926. //MORE: This could more efficiently loop 0..blockSize-1 and remove the test in addRowNowFull()
  927. for (;;)
  928. {
  929. const void * row = input->nextRow();
  930. if (unlikely(!row))
  931. {
  932. //Some activities rely on two successive nulls to indicate end of file, so read another row and check it
  933. row = input->nextRow();
  934. assertex(!row);
  935. done = true;
  936. break;
  937. }
  938. else
  939. {
  940. if (block->addRowNowFull(row))
  941. break;
  942. }
  943. if (isStopping())
  944. break;
  945. }
  946. }
  947. }
  948. catch (IException * e)
  949. {
  950. //MORE: Protect against exceptions, ensure exception is fed and processed by the strand. (Otherwise read ahead may cause
  951. //premature failure...
  952. block->setExceptionOwn(e);
  953. done = true;
  954. }
  955. if (isEndOfChunk)
  956. {
  957. block->setEndOfChunk();
  958. endChunkSize = minGroupBlockSize-1;
  959. }
  960. else
  961. endChunkSize = 0; // Switch to the next strand as soon as an end of group is encountered
  962. if (block->empty() || !strands[curStrand]->queryInputQueue().enqueue(block))
  963. {
  964. block->releaseBlock();
  965. break;
  966. }
  967. if (isEndOfChunk)
  968. {
  969. curStrand = curStrand+1;
  970. if (curStrand == numStrands)
  971. curStrand = 0;
  972. }
  973. }
  974. for (unsigned i=0; i < numStrands; i++)
  975. {
  976. strands[curStrand]->noteWriterStopped();
  977. curStrand = curStrand+1;
  978. if (curStrand == numStrands)
  979. curStrand = 0;
  980. }
  981. noteProducerFinished(input);
  982. }
  983. protected:
  984. RowBlockAllocator inputBlockAllocator;
  985. OrderedStrandRowBlockInput * * strands;
  986. IEngineRowStream * input = nullptr;
  987. unsigned blockSize;
  988. unsigned minGroupBlockSize;
  989. bool isGrouped;
  990. };
  991. //---------------------------------------------------------------------------------------------------------------------
  992. //Class for reading input from a streaming source activity.
  993. class OrderedStrandStreamInput : public CInterfaceOf<IEngineRowStream>
  994. {
  995. friend class OrderedManyToOneJunction;
  996. public:
  997. OrderedStrandStreamInput(CStrandJunction & _splitJunction, IOrderedOutputCallback & _callback)
  998. : splitJunction(_splitJunction), callback(_callback)
  999. {
  1000. }
  1001. void setInput(IEngineRowStream * _input)
  1002. {
  1003. stream = _input;
  1004. }
  1005. //interface IEngineRowStream for the rows being supplied to the strand.
  1006. virtual const void *nextRow()
  1007. {
  1008. for (;;)
  1009. {
  1010. if (eof)
  1011. return NULL;
  1012. const void * row = stream->nextRow();
  1013. if (likely(row != &endOfSectionMarker))
  1014. return row;
  1015. if (!callback.noteEndOfInputChunk())
  1016. eof = true;
  1017. }
  1018. }
  1019. virtual void stop()
  1020. {
  1021. stream->stop();
  1022. }
  1023. virtual void resetEOF()
  1024. {
  1025. throwUnexpectedX("resetEOF called on OrderedStrand");
  1026. }
  1027. void abort()
  1028. {
  1029. eof = true;
  1030. //MORE: provide a callback for notifying the source?
  1031. }
  1032. void reset()
  1033. {
  1034. eof = false;
  1035. }
  1036. protected:
  1037. CStrandJunction & splitJunction;
  1038. IOrderedOutputCallback & callback;
  1039. IRowStream * stream = nullptr;
  1040. bool eof = false;
  1041. };
  1042. class OrderedSourceJunction : public CStrandJunction
  1043. {
  1044. public:
  1045. OrderedSourceJunction(unsigned _numStrands, IOrderedCallbackCollection * callbacks)
  1046. : CStrandJunction(_numStrands, _numStrands)
  1047. {
  1048. strands = new OrderedStrandStreamInput * [numStrands];
  1049. for (unsigned i=0; i < numStrands; i++)
  1050. strands[i] = new OrderedStrandStreamInput(*this, *callbacks->queryCallback(i));
  1051. }
  1052. ~OrderedSourceJunction()
  1053. {
  1054. for (unsigned i=0; i < numStrands; i++)
  1055. delete strands[i];
  1056. delete [] strands;
  1057. }
  1058. virtual IEngineRowStream * queryOutput(unsigned n)
  1059. {
  1060. assertex(n < numStrands);
  1061. return strands[n];
  1062. }
  1063. virtual void reset()
  1064. {
  1065. for (unsigned i=0; i < numStrands; i++)
  1066. strands[i]->reset();
  1067. CStrandJunction::reset();
  1068. }
  1069. virtual void setInput(unsigned n, IEngineRowStream * _stream)
  1070. {
  1071. assertex(n < numStrands);
  1072. strands[n]->setInput(_stream);
  1073. }
  1074. virtual void start()
  1075. {
  1076. noteStarted();
  1077. }
  1078. virtual void abort()
  1079. {
  1080. for (unsigned i=0; i < numStrands; i++)
  1081. strands[i]->abort();
  1082. }
  1083. virtual void stopActiveProducers()
  1084. {
  1085. throwUnexpected();
  1086. }
  1087. virtual void stopInactiveProducers()
  1088. {
  1089. throwUnexpected();
  1090. }
  1091. protected:
  1092. OrderedStrandStreamInput * * strands;
  1093. };
  1094. //---------------------------------------------------------------------------------------------------------------------
  1095. class OrderedStrandRowBlockOutput : public CInterface, implements IThreaded, implements IOrderedOutputCallback
  1096. {
  1097. friend class OrderedManyToOneJunction;
  1098. public:
  1099. OrderedStrandRowBlockOutput(CStrandJunction & _joinJunction, RowBlockAllocator & _allocator)
  1100. : joinJunction(_joinJunction), allocator(_allocator)
  1101. {
  1102. }
  1103. //IThreaded - threadmain function used to read rows from the strand and add to the output
  1104. virtual void threadmain() override
  1105. {
  1106. bool done = false;
  1107. while (!done)
  1108. {
  1109. try
  1110. {
  1111. for (;;)
  1112. {
  1113. const void * row = strand->nextRow();
  1114. //NB: Need to be check the final eog isn't lost when processing sequentially
  1115. if (!row && eoi)
  1116. {
  1117. done = true;
  1118. break;
  1119. }
  1120. //curOutputBlock may be modified within the call to strand->nextRow() above
  1121. //(but not by any other threads)
  1122. if (!curOutputBlock)
  1123. curOutputBlock = allocator.newBlock();
  1124. if (curOutputBlock->addRowNowFull(row))
  1125. break;
  1126. }
  1127. }
  1128. catch (IException * e)
  1129. {
  1130. if (!curOutputBlock)
  1131. curOutputBlock = allocator.newBlock();
  1132. curOutputBlock->setExceptionOwn(e);
  1133. done = true;
  1134. }
  1135. if (curOutputBlock)
  1136. {
  1137. if (curOutputBlock->empty() || !outputQueue.enqueue(curOutputBlock))
  1138. {
  1139. curOutputBlock->releaseBlock();
  1140. curOutputBlock = NULL;
  1141. break;
  1142. }
  1143. curOutputBlock = NULL;
  1144. }
  1145. }
  1146. finished = true;
  1147. outputQueue.noteWriterStopped();
  1148. joinJunction.noteProducerFinished(strand);
  1149. }
  1150. virtual bool noteEndOfInputChunk()
  1151. {
  1152. if (!curOutputBlock)
  1153. curOutputBlock = allocator.newBlock();
  1154. curOutputBlock->setEndOfChunk();
  1155. bool success = true;
  1156. if (!outputQueue.enqueue(curOutputBlock))
  1157. {
  1158. curOutputBlock->releaseBlock();
  1159. success = false;
  1160. }
  1161. curOutputBlock = NULL;
  1162. return success;
  1163. }
  1164. virtual void noteEndOfInput()
  1165. {
  1166. eoi = true;
  1167. }
  1168. void abort()
  1169. {
  1170. outputQueue.abort();
  1171. }
  1172. void reset()
  1173. {
  1174. outputQueue.reset();
  1175. if (curOutputBlock)
  1176. curOutputBlock->releaseBlock();
  1177. curOutputBlock = nullptr;
  1178. finished = false;
  1179. alive = true;
  1180. eoi = false;
  1181. }
  1182. void setStrand(IEngineRowStream * _input)
  1183. {
  1184. strand = _input;
  1185. }
  1186. void stopStrand()
  1187. {
  1188. strand->stop();
  1189. }
  1190. inline OrderedJunctionBlockQueue & queryOutputQueue() { return outputQueue; }
  1191. protected:
  1192. CStrandJunction & joinJunction;
  1193. RowBlockAllocator & allocator;
  1194. IEngineRowStream * strand = nullptr; // the stream that executes in parallel processing the stream of rows
  1195. OrderedJunctionBlockQueue outputQueue;
  1196. RoxieRowBlock * curOutputBlock = nullptr;
  1197. bool finished = false;
  1198. bool alive = true;
  1199. bool eoi = false;
  1200. };
  1201. class OrderedOutputJunction : public CStrandJunction, implements IEngineRowStream, implements IOrderedCallbackCollection
  1202. {
  1203. public:
  1204. OrderedOutputJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _blockSize)
  1205. : CStrandJunction(_numStrands, _numStrands), outputBlockAllocator(_rowManager, _blockSize)
  1206. {
  1207. strands = new OrderedStrandRowBlockOutput * [numStrands];
  1208. for (unsigned i=0; i < numStrands; i++)
  1209. strands[i] = new OrderedStrandRowBlockOutput(*this, outputBlockAllocator);
  1210. }
  1211. ~OrderedOutputJunction()
  1212. {
  1213. for (unsigned i=0; i < numStrands; i++)
  1214. delete strands[i];
  1215. delete [] strands;
  1216. }
  1217. IMPLEMENT_IINTERFACE_USING(CStrandJunction)
  1218. virtual IEngineRowStream * queryOutput(unsigned n) { assertex(n==0); return this; }
  1219. virtual void setInput(unsigned n, IEngineRowStream * _stream) { strands[n]->setStrand(_stream); }
  1220. virtual void start()
  1221. {
  1222. for (unsigned i=0; i < numStrands; i++)
  1223. startProducerThread(*strands[i]);
  1224. noteStarted();
  1225. }
  1226. virtual void abort()
  1227. {
  1228. for (unsigned i=0; i < numStrands; i++)
  1229. strands[i]->abort();
  1230. }
  1231. virtual void reset()
  1232. {
  1233. curOutputStrand = 0;
  1234. if (curOutputBlock)
  1235. curOutputBlock->releaseBlock();
  1236. curOutputBlock = nullptr;
  1237. for (unsigned i=0; i < numStrands; i++)
  1238. strands[i]->reset();
  1239. CStrandJunction::reset();
  1240. }
  1241. virtual void stopInactiveProducers()
  1242. {
  1243. for (unsigned i=0; i < numStrands; i++)
  1244. strands[i]->stopStrand();
  1245. }
  1246. virtual IOrderedOutputCallback * queryCallback(unsigned i)
  1247. {
  1248. assertex(i < numStrands);
  1249. return strands[i];
  1250. }
  1251. //implementation of IEngineRowStream
  1252. virtual const void *nextRow()
  1253. {
  1254. for (;;)
  1255. {
  1256. if (likely(curOutputBlock))
  1257. {
  1258. const void * result;
  1259. if (curOutputBlock->nextRow(result))
  1260. return result;
  1261. curOutputBlock->throwAnyPendingException();
  1262. if (curOutputBlock->isEndOfChunk())
  1263. {
  1264. curOutputStrand++;
  1265. if (curOutputStrand == numStrands)
  1266. curOutputStrand = 0;
  1267. }
  1268. curOutputBlock->releaseBlock();
  1269. curOutputBlock = NULL;
  1270. }
  1271. if (!strands[curOutputStrand]->queryOutputQueue().dequeue(curOutputBlock))
  1272. {
  1273. //If there is no more output on the next strand, then all the strands will have finished processing.
  1274. return NULL;
  1275. }
  1276. }
  1277. }
  1278. virtual void stop()
  1279. {
  1280. //reading no more records => abort all strand queues and prevent the producers adding any more rows
  1281. for (unsigned i=0; i < numStrands; i++)
  1282. strands[i]->abort();
  1283. processConsumerStop();
  1284. }
  1285. virtual void resetEOF()
  1286. {
  1287. throwUnexpectedX("resetEOF called on OrderedStrandBranch");
  1288. }
  1289. protected:
  1290. RowBlockAllocator outputBlockAllocator;
  1291. OrderedStrandRowBlockOutput * * strands;
  1292. RoxieRowBlock * curOutputBlock = nullptr;
  1293. unsigned curOutputStrand = 0;
  1294. };
  1295. //---------------------------------------------------------------------------------------------------------------------
  1296. class CStrandBranch : public CInterfaceOf<IStrandBranch>
  1297. {
  1298. public:
  1299. CStrandBranch(IStrandJunction * _input, IStrandJunction * _output) : input(_input), output(_output)
  1300. {
  1301. }
  1302. virtual IStrandJunction * queryInputJunction()
  1303. {
  1304. return input;
  1305. }
  1306. virtual IStrandJunction * queryOutputJunction()
  1307. {
  1308. return output;
  1309. }
  1310. protected:
  1311. Linked<IStrandJunction> input;
  1312. Linked<IStrandJunction> output;
  1313. };
  1314. //---------------------------------------------------------------------------------------------------------------------
  1315. extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped, bool inputIsStreamed, IOrderedCallbackCollection * orderedCallbacks)
  1316. {
  1317. Linked<IStrandJunction> input;
  1318. Linked<IStrandJunction> output;
  1319. //Slightly inefficient to go via a junction, but makes the testing code simpler!
  1320. assertex(numStrands);
  1321. if (numStrands == 1)
  1322. {
  1323. input.setown(new OneToOneJunction());
  1324. output.setown(new OneToOneJunction());
  1325. }
  1326. else if (isOrdered || isGrouped)
  1327. {
  1328. //MORE To allow parallel sinks allow callbacks to be provided and have a dummy output junction or none
  1329. //But the number will not be known until the branch is created, so the callback will need to create them
  1330. IOrderedCallbackCollection * callbacks = orderedCallbacks;
  1331. if (!callbacks)
  1332. {
  1333. Owned<OrderedOutputJunction> outputJunction = new OrderedOutputJunction(rowManager, numStrands, blockSize);
  1334. callbacks = outputJunction;
  1335. output.setown(outputJunction.getClear());
  1336. }
  1337. if (inputIsStreamed)
  1338. {
  1339. input.setown(new OrderedSourceJunction(numStrands, callbacks));
  1340. }
  1341. else
  1342. {
  1343. input.setown(new OrderedInputJunction(rowManager, numStrands, blockSize, isGrouped, callbacks));
  1344. }
  1345. }
  1346. else
  1347. {
  1348. input.setown(createStrandJunction( rowManager, 1, numStrands, blockSize, false));
  1349. output.setown(createStrandJunction( rowManager, numStrands, 1, blockSize, false));
  1350. }
  1351. return new CStrandBranch(input, output);
  1352. }
  1353. //---------------------------------------------------------------------------------------------------------------------
  1354. class BlockedRowStreamWriter : public CInterfaceOf<IRowWriterEx>
  1355. {
  1356. public:
  1357. BlockedRowStreamWriter(IRowQueue * _queue, RowBlockAllocator & _allocator) : queue(_queue), allocator(_allocator)
  1358. {
  1359. curBlock = NULL;
  1360. }
  1361. virtual void putRow(const void *row)
  1362. {
  1363. if (!curBlock)
  1364. curBlock = allocator.newBlock();
  1365. if (curBlock->addRowNowFull(row))
  1366. {
  1367. if (!queue->enqueue(curBlock))
  1368. curBlock->releaseBlock();
  1369. curBlock = NULL;
  1370. }
  1371. }
  1372. virtual void flush()
  1373. {
  1374. if (curBlock)
  1375. {
  1376. if (!queue->enqueue(curBlock))
  1377. curBlock->releaseBlock();
  1378. curBlock = NULL;
  1379. }
  1380. }
  1381. virtual void noteStopped()
  1382. {
  1383. flush();
  1384. queue->noteWriterStopped();
  1385. }
  1386. protected:
  1387. IRowQueue * queue;
  1388. RowBlockAllocator & allocator;
  1389. RoxieRowBlock * curBlock;
  1390. };
  1391. class UnorderedManyToOneRowStream : public CInterfaceOf<IManyToOneRowStream>
  1392. {
  1393. public:
  1394. UnorderedManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize) : numStrands(_numStrands), allocator(_rowManager, blockSize)
  1395. {
  1396. const unsigned maxQueued = numStrands * 4;
  1397. queue.setown(createRowQueue(1, numStrands, maxQueued, 0));
  1398. producers = new BlockedRowStreamWriter * [numStrands];
  1399. for (unsigned i=0; i < numStrands; i++)
  1400. producers[i] = new BlockedRowStreamWriter(queue, allocator);
  1401. curBlock = NULL;
  1402. }
  1403. ~UnorderedManyToOneRowStream()
  1404. {
  1405. for (unsigned i=0; i < numStrands; i++)
  1406. producers[i]->Release();
  1407. delete [] producers;
  1408. }
  1409. virtual void abort()
  1410. {
  1411. queue->abort();
  1412. }
  1413. virtual void stop()
  1414. {
  1415. queue->noteReaderStopped();
  1416. }
  1417. virtual void reset()
  1418. {
  1419. queue->reset();
  1420. for (;;)
  1421. {
  1422. if (curBlock)
  1423. curBlock->releaseBlock();
  1424. const void * next;
  1425. if (!queue->tryDequeue(next))
  1426. break;
  1427. curBlock = (RoxieRowBlock *)next;
  1428. }
  1429. curBlock = NULL;
  1430. }
  1431. virtual const void *nextRow()
  1432. {
  1433. const void * ret;
  1434. for (;;)
  1435. {
  1436. if (curBlock)
  1437. {
  1438. if (curBlock->nextRow(ret))
  1439. return ret;
  1440. curBlock->releaseBlock();
  1441. curBlock = NULL;
  1442. }
  1443. const void * next;
  1444. if (!queue->dequeue(next))
  1445. return NULL;
  1446. curBlock = (RoxieRowBlock *)next;
  1447. }
  1448. }
  1449. virtual IRowWriterEx * getWriter(unsigned n)
  1450. {
  1451. return LINK(producers[n]);
  1452. }
  1453. protected:
  1454. unsigned numStrands;
  1455. Owned<IRowQueue> queue;
  1456. RowBlockAllocator allocator;
  1457. BlockedRowStreamWriter * * producers;
  1458. RoxieRowBlock * curBlock;
  1459. };
  1460. //---------------------------------------------------------------------------------------------------------------------
  1461. extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered)
  1462. {
  1463. if (!isOrdered)
  1464. return new UnorderedManyToOneRowStream(rowManager, numInputs, blockSize);
  1465. return NULL;
  1466. }