12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jexcept.hpp"
- #include "jthread.hpp"
- #include "jqueue.hpp"
- #include "roxiemem.hpp"
- #include "thorstrand.hpp"
- #include <atomic>
- #define DEFAULT_ROWBLOCK_SIZE 500
- static const byte endOfSectionMarker = 0;
- const void * queryEndOfSectionMarker() { return &endOfSectionMarker; }
- //---------------------------------------------------------------------------------------------------------------------
- class CStrandBarrier : public CInterfaceOf<IStrandBarrier>
- {
- public:
- virtual void reset()
- {
- producerStopSem.reinit(0);
- }
- virtual void startStrand(IStrandThreaded & strand)
- {
- CThreaded * thread = new CThreaded("Strand", &strand);
- threads.append(*thread);
- thread->start();
- }
- virtual void waitForStrands()
- {
- producerStopSem.signal(threads.ordinality());
- ForEachItemIn(i, threads)
- threads.item(i).join();
- threads.kill();
- }
- virtual void noteStrandFinished(IRowStream * stream)
- {
- waitForStop();
- if (stream)
- stream->stop();
- }
- protected:
- void waitForStop()
- {
- producerStopSem.wait();
- }
- protected:
- Semaphore producerStopSem;
- CIArrayOf<CThreaded> threads;
- };
- IStrandBarrier * createStrandBarrier()
- {
- return new CStrandBarrier;
- }
- //---------------------------------------------------------------------------------------------------------------------
- class CStrandJunction : public CInterfaceOf<IStrandJunction>
- {
- public:
- explicit CStrandJunction(unsigned _numProducers, unsigned _numStrands)
- : numProducers(_numProducers), numStrands(_numStrands), stopping(false), started(false)
- {
- assertex(numProducers);
- }
- virtual void reset()
- {
- producerStopSem.reinit(0);
- producerStoppedSem.reinit(0);
- stopping.store(false, std::memory_order_relaxed);
- started = false;
- }
- inline bool isStopping() const
- {
- return stopping.load(std::memory_order_relaxed);
- }
- inline void noteStarted()
- {
- started = true;
- }
- void startProducerThread(IThreaded & mainthread)
- {
- CThreaded * thread = new CThreaded("ReadAheadThread", &mainthread);
- threads.append(*thread);
- thread->start();
- }
- void processConsumerStop()
- {
- if (started)
- {
- //Ensure only one producer triggers stopping on the inputs
- if (!stopping.exchange(true, std::memory_order_acq_rel))
- {
- stopActiveProducers();
- for (unsigned i=0; i < numProducers; i++)
- producerStoppedSem.wait();
- }
- }
- else
- stopInactiveProducers();
- }
- void noteProducerFinished(IRowStream * stream)
- {
- waitForStop();
- if (stream)
- stream->stop();
- notifyStopped();
- }
- inline unsigned getNumProducers() const { return numProducers; }
- protected:
- void waitForStop()
- {
- producerStopSem.wait();
- }
- void notifyStopped()
- {
- producerStoppedSem.signal();
- }
- //Wait for all active producers to complete - including calling stop on their inputs
- void waitForProducers()
- {
- producerStopSem.signal(numProducers);
- ForEachItemIn(i, threads)
- threads.item(i).join();
- threads.kill();
- }
- //Stop producers that have already been started()
- virtual void stopActiveProducers()
- {
- waitForProducers();
- }
- //Stop producers that have never been started()
- virtual void stopInactiveProducers() = 0;
- protected:
- const unsigned numStrands;
- private:
- const unsigned numProducers;
- Semaphore producerStopSem;
- Semaphore producerStoppedSem;
- CIArrayOf<CThreaded> threads;
- std::atomic<bool> stopping;
- bool started;
- };
- class OneToOneJunction : public CInterfaceOf<IStrandJunction>
- {
- public:
- OneToOneJunction() : stream(NULL) {}
- virtual IEngineRowStream * queryOutput(unsigned n)
- {
- assertex(n == 0);
- assertex(stream);
- return stream;
- }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n == 0);
- stream = _stream;
- }
- virtual void start()
- {
- }
- virtual void reset()
- {
- }
- virtual void abort()
- {
- }
- protected:
- IEngineRowStream * stream;
- };
- //---------------------------------------------------------------------------------------------------------------------
- RoxieRowBlock::~RoxieRowBlock()
- {
- releaseRows();
- }
- bool RoxieRowBlock::empty() const
- {
- return (readPos >= writePos) && !exception;
- }
- bool RoxieRowBlock::readFromStream(IRowStream * stream)
- {
- bool done = false;
- try
- {
- for (;;)
- {
- const void * row = stream->nextRow();
- if (!row)
- {
- done = true;
- break;
- }
- else if (row == &endOfSectionMarker)
- {
- setEndOfChunk();
- break;
- }
- if (addRowNowFull(row))
- break;
- }
- }
- catch (IException * e)
- {
- setExceptionOwn(e);
- done = true;
- }
- return done;
- }
- void RoxieRowBlock::releaseRows()
- {
- while (readPos < writePos)
- ReleaseRoxieRow(rows[readPos++]);
- }
- void RoxieRowBlock::throwAnyPendingException()
- {
- if (exception)
- throw exception.getClear();
- }
- void RoxieRowBlock::operator delete (void * ptr)
- {
- ReleaseRoxieRow(ptr);
- }
- //---------------------------------------------------------------------------------------------------------------------
- RowBlockAllocator::RowBlockAllocator(roxiemem::IRowManager & rowManager, size32_t minRowsPerBlock) : rowsPerBlock(0)
- {
- assertex(minRowsPerBlock);
- size_t classSize = sizeof(RoxieRowBlock) - RoxieRowBlock::numDummyDynamicRows * sizeof(void *);
- size_t requestedSize = classSize + minRowsPerBlock * sizeof(void*);
- roxiemem::RoxieHeapFlags heapFlags = roxiemem::RHFunique|roxiemem::RHFscanning;
- heap.setown(rowManager.createFixedRowHeap(requestedSize, 0, heapFlags));
- rowsPerBlock = (rowManager.getExpectedCapacity(requestedSize, heapFlags) - classSize ) / sizeof(void*);
- assertex(rowsPerBlock >= minRowsPerBlock);
- }
- //A bit of an experimental implementation - other options could include a list like the allocators
- RoxieRowBlock * RowBlockAllocator::newBlock()
- {
- return new (heap->allocate()) RoxieRowBlock(rowsPerBlock);
- }
- //---------------------------------------------------------------------------------------------------------------------
- static void resetBlockQueue(IRowQueue * queue)
- {
- queue->reset();
- for (;;)
- {
- const void * next;
- if (!queue->tryDequeue(next))
- break;
- RoxieRowBlock * curBlock = (RoxieRowBlock *)next;
- if (curBlock)
- curBlock->releaseBlock();
- }
- }
- class StreamToBlockQueueThread : public CInterface, implements IThreaded
- {
- public:
- StreamToBlockQueueThread(CStrandJunction & _junction, IRowQueue * _queue, RowBlockAllocator & _allocator)
- : junction(_junction), queue(_queue), stream(NULL), allocator(_allocator) {}
- virtual void threadmain() override
- {
- bool done = false;
- while (!done)
- {
- RoxieRowBlock * block = allocator.newBlock();
- done = block->readFromStream(stream);
- if (junction.isStopping() || block->empty() || !queue->enqueue(block))
- {
- block->releaseBlock();
- break;
- }
- }
- queue->noteWriterStopped();
- junction.noteProducerFinished(stream);
- }
- void setInput(IEngineRowStream * _input)
- {
- stream = _input;
- }
- void setQueue(IRowQueue * _queue)
- {
- queue = _queue;
- }
- void stopInput()
- {
- stream->stop();
- }
- protected:
- CStrandJunction & junction;
- RowBlockAllocator & allocator;
- IEngineRowStream * stream;
- IRowQueue * queue;
- };
- class StreamFromBlockQueue : public CInterfaceOf<IEngineRowStream>
- {
- public:
- StreamFromBlockQueue(CStrandJunction & _junction, IRowQueue & _queue) : junction(_junction), queue(_queue)
- {
- curBlock = NULL;
- }
- ~StreamFromBlockQueue()
- {
- reset();
- }
- virtual const void *nextRow()
- {
- const void * ret;
- for (;;)
- {
- if (curBlock)
- {
- if (curBlock->nextRow(ret))
- return ret;
- if (!pendingException)
- pendingException.setown(curBlock->getClearException());
- curBlock->releaseBlock();
- curBlock = NULL;
- }
- const void * next;
- if (!queue.dequeue(next))
- {
- //If inputs are unordered, process exceptions last of all
- if (pendingException)
- throw pendingException.getClear();
- return NULL;
- }
- curBlock = (RoxieRowBlock *)next;
- }
- }
- virtual void stop()
- {
- queue.noteReaderStopped();
- junction.processConsumerStop();
- }
- virtual void resetEOF()
- {
- throwUnexpectedX("resetEOF called on BlockedReader");
- }
- void reset()
- {
- if (curBlock)
- curBlock->releaseBlock();
- curBlock = NULL;
- pendingException.clear();
- }
- protected:
- CStrandJunction & junction;
- IRowQueue & queue;
- RoxieRowBlock * curBlock;
- Owned<IException> pendingException;
- };
- //---------------------------------------------------------------------------------------------------------------------
- class BlockedManyToOneJunction : public CStrandJunction
- {
- public:
- BlockedManyToOneJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize, IRowQueue * _queue)
- : CStrandJunction(_numStrands, _numStrands), queue(_queue), allocator(_rowManager, blockSize), consumer(*this, *_queue)
- {
- producers = new StreamToBlockQueueThread * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- producers[i] = new StreamToBlockQueueThread(*this, queue, allocator);
- }
- ~BlockedManyToOneJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->Release();
- delete [] producers;
- }
- virtual IEngineRowStream * queryOutput(unsigned n)
- {
- assertex(n == 0);
- return &consumer;
- }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n < numStrands);
- producers[n]->setInput(_stream);
- }
- virtual void abort()
- {
- queue->abort();
- }
- virtual void reset()
- {
- consumer.reset();
- resetBlockQueue(queue);
- CStrandJunction::reset();
- }
- virtual void start()
- {
- for (unsigned i=0; i < numStrands; i++)
- startProducerThread(*producers[i]);
- noteStarted();
- }
- static BlockedManyToOneJunction * create(roxiemem::IRowManager & rowManager, unsigned numStrands, unsigned blockSize)
- {
- const unsigned maxQueued = numStrands * 4;
- Owned<IRowQueue> queue = createRowQueue(1, numStrands, maxQueued, 0);
- return new BlockedManyToOneJunction(rowManager, numStrands, blockSize, queue.getClear());
- }
- protected:
- virtual void stopInactiveProducers()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->stopInput();
- }
- protected:
- Owned<IRowQueue> queue;
- RowBlockAllocator allocator;
- StreamToBlockQueueThread * * producers;
- StreamFromBlockQueue consumer;
- };
- //---------------------------------------------------------------------------------------------------------------------
- class BlockedOneToManyJunction : public CStrandJunction
- {
- public:
- BlockedOneToManyJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _maxQueueItems, unsigned _blockSize)
- : CStrandJunction(1, _numStrands), allocator(_rowManager, _blockSize), producer(*this, NULL, allocator)
- {
- queue.setown(createRowQueue(numStrands, 1, _maxQueueItems, 0));
- producer.setQueue(queue);
- consumers = new StreamFromBlockQueue * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- consumers[i] = new StreamFromBlockQueue(*this, *queue);
- }
- ~BlockedOneToManyJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- consumers[i]->Release();
- delete [] consumers;
- }
- virtual IEngineRowStream * queryOutput(unsigned n)
- {
- assertex(n < numStrands);
- return consumers[n];
- }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n == 0);
- producer.setInput(_stream);
- }
- virtual void abort()
- {
- queue->abort();
- }
- virtual void reset()
- {
- resetBlockQueue(queue);
- for (unsigned i=0; i < numStrands; i++)
- consumers[i]->reset();
- CStrandJunction::reset();
- }
- virtual void start()
- {
- startProducerThread(producer);
- noteStarted();
- }
- protected:
- virtual void stopInactiveProducers()
- {
- for (unsigned i=0; i < numStrands; i++)
- producer.stopInput();
- }
- protected:
- Owned<IRowQueue> queue;
- RowBlockAllocator allocator;
- StreamToBlockQueueThread producer;
- StreamFromBlockQueue * * consumers;
- };
- //---------------------------------------------------------------------------------------------------------------------
- //Trivial single element queue
- class SingleItemBlockQueue
- {
- public:
- SingleItemBlockQueue() : avail(0U), space(1U)
- {
- }
- ~SingleItemBlockQueue()
- {
- if (value)
- value->releaseBlock();
- }
- void abort()
- {
- abortSoon = true;
- avail.signal();
- space.signal();
- }
- void reset()
- {
- avail.reinit(0);
- space.reinit(1);
- if (value)
- {
- value->releaseBlock();
- value = NULL;
- }
- abortSoon = false;
- finishedWriting = false;
- finishedReading = false;
- }
- bool enqueue(RoxieRowBlock * next)
- {
- if (abortSoon || finishedReading)
- return false;
- space.wait();
- if (abortSoon || finishedReading)
- return false;
- value = next;
- avail.signal();
- return true;
- }
- void noteReaderStopped()
- {
- if (abortSoon)
- return;
- finishedReading = true;
- space.signal();
- }
- void noteWriterStopped()
- {
- if (abortSoon)
- return;
- space.wait();
- finishedWriting = true;
- avail.signal();
- }
- bool dequeue(RoxieRowBlock * & ret)
- {
- if (abortSoon)
- return false;
- avail.wait();
- if (abortSoon)
- return false;
- if (finishedWriting)
- {
- avail.signal();
- return false;
- }
- ret = value;
- value = NULL;
- space.signal();
- return true;
- }
- protected:
- RoxieRowBlock * value = nullptr;
- bool abortSoon = false;
- bool finishedWriting = false;
- bool finishedReading = false;
- Semaphore space __attribute__((aligned(CACHE_LINE_SIZE)));
- Semaphore avail __attribute__((aligned(CACHE_LINE_SIZE)));
- };
- //may replace with
- typedef SingleItemBlockQueue OrderedJunctionBlockQueue;
- class OrderedReadAheadThread : public CInterface, implements IThreaded
- {
- //friend class OrderedManyToOneJunction;
- public:
- OrderedReadAheadThread(CStrandJunction & _junction, RowBlockAllocator & _allocator) : junction(_junction), stream(NULL), allocator(_allocator)
- {
- finished = false;
- alive = true;
- }
- virtual void threadmain() override
- {
- bool done = false;
- while (!done)
- {
- RoxieRowBlock * block = allocator.newBlock();
- done = block->readFromStream(stream);
- if (block->empty() || !queue.enqueue(block))
- {
- block->releaseBlock();
- break;
- }
- }
- noteWriterStopped();
- junction.noteProducerFinished(stream);
- }
- void abort()
- {
- queue.abort();
- }
- void noteFinished()
- {
- assertex(finished);
- assertex(alive);
- alive = false;
- }
- void reset()
- {
- queue.reset();
- finished = false;
- alive = true;
- }
- void stopInput()
- {
- stream->stop();
- }
- void setInput(IEngineRowStream * _input)
- {
- stream = _input;
- }
- void noteWriterStopped()
- {
- finished = true;
- queue.noteWriterStopped();
- }
- inline bool isAlive() const { return alive; }
- inline OrderedJunctionBlockQueue & queryQueue() { return queue; }
- protected:
- CStrandJunction & junction;
- RowBlockAllocator & allocator;
- IEngineRowStream * stream;
- OrderedJunctionBlockQueue queue;
- bool finished;
- bool alive;
- };
- //This class primarily supports multi-stranded source activities which immediately feed into a single-stranded activity.
- class OrderedManyToOneJunction : public CStrandJunction, implements IEngineRowStream
- {
- public:
- IMPLEMENT_IINTERFACE_USING(CStrandJunction)
- OrderedManyToOneJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize)
- : CStrandJunction(_numStrands, _numStrands), allocator(_rowManager, blockSize)
- {
- producers = new OrderedReadAheadThread * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- producers[i] = new OrderedReadAheadThread(*this, allocator);
- curBlock = NULL;
- curStrand = 0;
- numActiveStrands = numStrands;
- }
- ~OrderedManyToOneJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->Release();
- delete [] producers;
- }
- virtual IEngineRowStream * queryOutput(unsigned n)
- {
- assertex(n == 0);
- return this;
- }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n < numStrands);
- producers[n]->setInput(_stream);
- }
- virtual void abort()
- {
- abortProducers();
- }
- virtual void reset()
- {
- if (curBlock)
- {
- curBlock->releaseBlock();
- curBlock = NULL;
- }
- for (unsigned strand=0; strand < numStrands; strand++)
- producers[strand]->reset();
- curStrand = 0;
- numActiveStrands = numStrands;
- CStrandJunction::reset();
- }
- virtual void start()
- {
- for (unsigned i=0; i < numStrands; i++)
- startProducerThread(*producers[i]);
- noteStarted();
- }
- virtual const void *nextRow()
- {
- if (numActiveStrands == 0)
- return NULL;
- for (;;)
- {
- if (curBlock)
- {
- const void * ret;
- if (curBlock->nextRow(ret))
- return ret;
- curBlock->throwAnyPendingException();
- bool isEnd = curBlock->isEndOfChunk();
- curBlock->releaseBlock();
- curBlock = NULL;
- if (isEnd)
- nextStrand();
- }
- for (;;)
- {
- OrderedReadAheadThread & curProducer = *(producers[curStrand]);
- OrderedJunctionBlockQueue & queue = curProducer.queryQueue();
- if (!queue.dequeue(curBlock))
- {
- //Abort requested
- numActiveStrands = 0;
- return NULL;
- }
- //DBGLOG("active(%d) strand(%d)", numActiveStrands, curStrand);
- if (curBlock)
- break;
- curProducer.noteFinished();
- if (--numActiveStrands == 0)
- return NULL;
- nextStrand();
- }
- }
- }
- virtual void stop()
- {
- //reading no more records => abort the queue and prevent the producers adding any more rows
- abortProducers();
- processConsumerStop();
- }
- virtual void resetEOF()
- {
- throwUnexpectedX("resetEOF called on OrderedManyToOneJunction");
- }
- protected:
- void nextStrand()
- {
- do
- {
- curStrand++;
- if (curStrand == numStrands)
- curStrand = 0;
- } while (!producers[curStrand]->isAlive());
- }
- virtual void stopInactiveProducers()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->stopInput();
- }
- void abortProducers()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->abort();
- }
- protected:
- unsigned numActiveStrands;
- RowBlockAllocator allocator;
- OrderedReadAheadThread * * producers;
- RoxieRowBlock * curBlock;
- unsigned curStrand;
- };
- //---------------------------------------------------------------------------------------------------------------------
- IStrandJunction * createStrandJunction(roxiemem::IRowManager & rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered)
- {
- if ((numInputs == 1) && (numOutputs == 1))
- return new OneToOneJunction();
- if (blockSize == 0)
- blockSize = DEFAULT_ROWBLOCK_SIZE;
- if (numOutputs == 1)
- {
- if (isOrdered)
- return new OrderedManyToOneJunction(rowManager, numInputs, blockSize);
- return BlockedManyToOneJunction::create(rowManager, numInputs, blockSize);
- }
- if (numInputs == 1)
- {
- unsigned maxQueueItems = numOutputs * 2;
- return new BlockedOneToManyJunction(rowManager, numOutputs, maxQueueItems, blockSize);
- }
- //More: We could implement M:N using the existing base classes if there was a need
- UNIMPLEMENTED_X("createStrandJunction M:N");
- }
- void clearRowQueue(IRowQueue * queue)
- {
- const void * next;
- while (queue->tryDequeue(next))
- ReleaseRoxieRow(next);
- }
- //---------------------------------------------------------------------------------------------------------------------
- //Class for managing processing on a single ordered strand
- class OrderedStrandRowBlockInput : public CInterfaceOf<IEngineRowStream>
- {
- friend class OrderedManyToOneJunction;
- public:
- OrderedStrandRowBlockInput(CStrandJunction & _splitJunction, IOrderedOutputCallback & _callback)
- : splitJunction(_splitJunction), callback(_callback)
- {
- curInputBlock = nullptr;
- }
- //interface IEngineRowStream for the rows being supplied to the strand.
- virtual const void *nextRow()
- {
- for (;;)
- {
- if (!curInputBlock)
- {
- if (!inputQueue.dequeue(curInputBlock))
- {
- callback.noteEndOfInput();
- return NULL;
- }
- }
- const void * row;
- if (curInputBlock->nextRow(row))
- return row;
- curInputBlock->throwAnyPendingException();
- if (curInputBlock->isEndOfChunk())
- {
- if (!callback.noteEndOfInputChunk())
- inputQueue.abort();
- }
- curInputBlock->releaseBlock();
- curInputBlock = NULL;
- }
- }
- virtual void stop()
- {
- //reading no more records => abort the queue and prevent the producer adding any more rows
- inputQueue.noteReaderStopped();
- splitJunction.processConsumerStop();
- }
- virtual void resetEOF()
- {
- throwUnexpectedX("resetEOF called on OrderedStrand");
- }
- void abort()
- {
- inputQueue.abort();
- }
- void reset()
- {
- inputQueue.reset();
- if (curInputBlock)
- curInputBlock->releaseBlock();
- curInputBlock = nullptr;
- }
- void noteWriterStopped()
- {
- queryInputQueue().noteWriterStopped();
- }
- inline OrderedJunctionBlockQueue & queryInputQueue() { return inputQueue; }
- protected:
- CStrandJunction & splitJunction;
- IOrderedOutputCallback & callback;
- OrderedJunctionBlockQueue inputQueue;
- RoxieRowBlock * curInputBlock;
- };
- class OrderedInputJunction : public CStrandJunction, implements IThreaded
- {
- public:
- OrderedInputJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _blockSize, bool _isGrouped, IOrderedCallbackCollection * callbacks)
- : CStrandJunction(1, _numStrands), inputBlockAllocator(_rowManager, _blockSize), isGrouped(_isGrouped)
- {
- strands = new OrderedStrandRowBlockInput * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- strands[i] = new OrderedStrandRowBlockInput(*this, *callbacks->queryCallback(i));
- blockSize = inputBlockAllocator.maxRowsPerBlock();
- minGroupBlockSize = (blockSize * 7 + 4) / 8; // Fill with groups until at least 7/8 filled.
- assertex(minGroupBlockSize != 0);
- }
- ~OrderedInputJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- delete strands[i];
- delete [] strands;
- }
- virtual IEngineRowStream * queryOutput(unsigned n) { assertex(n < numStrands); return strands[n]; }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n==0);
- input = _stream;
- }
- virtual void start()
- {
- startProducerThread(*this);
- noteStarted();
- }
- virtual void abort()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->abort();
- }
- virtual void reset()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->reset();
- CStrandJunction::reset();
- }
- virtual void stopInactiveProducers()
- {
- input->stop();
- }
- virtual void threadmain() override
- {
- unsigned curStrand = 0;
- bool done = false;
- size32_t endChunkSize = minGroupBlockSize-1;
- const void * prev = nullptr; // only checked if non null, never dereferenced
- while (!done)
- {
- RoxieRowBlock * block = inputBlockAllocator.newBlock();
- bool isEndOfChunk = !isGrouped;
- try
- {
- if (isGrouped)
- {
- for (;;)
- {
- const void * row = input->nextRow();
- if (!row)
- {
- if (!prev)
- {
- done = true;
- isEndOfChunk = true;
- }
- else if (block->numRows() >= endChunkSize)
- isEndOfChunk = true;
- }
- prev = row;
- if (block->addRowNowFull(row) || isEndOfChunk)
- break;
- }
- }
- else
- {
- //MORE: This could more efficiently loop 0..blockSize-1 and remove the test in addRowNowFull()
- for (;;)
- {
- const void * row = input->nextRow();
- if (unlikely(!row))
- {
- //Some activities rely on two successive nulls to indicate end of file, so read another row and check it
- row = input->nextRow();
- assertex(!row);
- done = true;
- break;
- }
- else
- {
- if (block->addRowNowFull(row))
- break;
- }
- if (isStopping())
- break;
- }
- }
- }
- catch (IException * e)
- {
- //MORE: Protect against exceptions, ensure exception is fed and processed by the strand. (Otherwise read ahead may cause
- //premature failure...
- block->setExceptionOwn(e);
- done = true;
- }
- if (isEndOfChunk)
- {
- block->setEndOfChunk();
- endChunkSize = minGroupBlockSize-1;
- }
- else
- endChunkSize = 0; // Switch to the next strand as soon as an end of group is encountered
- if (block->empty() || !strands[curStrand]->queryInputQueue().enqueue(block))
- {
- block->releaseBlock();
- break;
- }
- if (isEndOfChunk)
- {
- curStrand = curStrand+1;
- if (curStrand == numStrands)
- curStrand = 0;
- }
- }
- for (unsigned i=0; i < numStrands; i++)
- {
- strands[curStrand]->noteWriterStopped();
- curStrand = curStrand+1;
- if (curStrand == numStrands)
- curStrand = 0;
- }
- noteProducerFinished(input);
- }
- protected:
- RowBlockAllocator inputBlockAllocator;
- OrderedStrandRowBlockInput * * strands;
- IEngineRowStream * input = nullptr;
- unsigned blockSize;
- unsigned minGroupBlockSize;
- bool isGrouped;
- };
- //---------------------------------------------------------------------------------------------------------------------
- //Class for reading input from a streaming source activity.
- class OrderedStrandStreamInput : public CInterfaceOf<IEngineRowStream>
- {
- friend class OrderedManyToOneJunction;
- public:
- OrderedStrandStreamInput(CStrandJunction & _splitJunction, IOrderedOutputCallback & _callback)
- : splitJunction(_splitJunction), callback(_callback)
- {
- }
- void setInput(IEngineRowStream * _input)
- {
- stream = _input;
- }
- //interface IEngineRowStream for the rows being supplied to the strand.
- virtual const void *nextRow()
- {
- for (;;)
- {
- if (eof)
- return NULL;
- const void * row = stream->nextRow();
- if (likely(row != &endOfSectionMarker))
- return row;
- if (!callback.noteEndOfInputChunk())
- eof = true;
- }
- }
- virtual void stop()
- {
- stream->stop();
- }
- virtual void resetEOF()
- {
- throwUnexpectedX("resetEOF called on OrderedStrand");
- }
- void abort()
- {
- eof = true;
- //MORE: provide a callback for notifying the source?
- }
- void reset()
- {
- eof = false;
- }
- protected:
- CStrandJunction & splitJunction;
- IOrderedOutputCallback & callback;
- IRowStream * stream = nullptr;
- bool eof = false;
- };
- class OrderedSourceJunction : public CStrandJunction
- {
- public:
- OrderedSourceJunction(unsigned _numStrands, IOrderedCallbackCollection * callbacks)
- : CStrandJunction(_numStrands, _numStrands)
- {
- strands = new OrderedStrandStreamInput * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- strands[i] = new OrderedStrandStreamInput(*this, *callbacks->queryCallback(i));
- }
- ~OrderedSourceJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- delete strands[i];
- delete [] strands;
- }
- virtual IEngineRowStream * queryOutput(unsigned n)
- {
- assertex(n < numStrands);
- return strands[n];
- }
- virtual void reset()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->reset();
- CStrandJunction::reset();
- }
- virtual void setInput(unsigned n, IEngineRowStream * _stream)
- {
- assertex(n < numStrands);
- strands[n]->setInput(_stream);
- }
- virtual void start()
- {
- noteStarted();
- }
- virtual void abort()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->abort();
- }
- virtual void stopActiveProducers()
- {
- throwUnexpected();
- }
- virtual void stopInactiveProducers()
- {
- throwUnexpected();
- }
- protected:
- OrderedStrandStreamInput * * strands;
- };
- //---------------------------------------------------------------------------------------------------------------------
- class OrderedStrandRowBlockOutput : public CInterface, implements IThreaded, implements IOrderedOutputCallback
- {
- friend class OrderedManyToOneJunction;
- public:
- OrderedStrandRowBlockOutput(CStrandJunction & _joinJunction, RowBlockAllocator & _allocator)
- : joinJunction(_joinJunction), allocator(_allocator)
- {
- }
- //IThreaded - threadmain function used to read rows from the strand and add to the output
- virtual void threadmain() override
- {
- bool done = false;
- while (!done)
- {
- try
- {
- for (;;)
- {
- const void * row = strand->nextRow();
- //NB: Need to be check the final eog isn't lost when processing sequentially
- if (!row && eoi)
- {
- done = true;
- break;
- }
- //curOutputBlock may be modified within the call to strand->nextRow() above
- //(but not by any other threads)
- if (!curOutputBlock)
- curOutputBlock = allocator.newBlock();
- if (curOutputBlock->addRowNowFull(row))
- break;
- }
- }
- catch (IException * e)
- {
- if (!curOutputBlock)
- curOutputBlock = allocator.newBlock();
- curOutputBlock->setExceptionOwn(e);
- done = true;
- }
- if (curOutputBlock)
- {
- if (curOutputBlock->empty() || !outputQueue.enqueue(curOutputBlock))
- {
- curOutputBlock->releaseBlock();
- curOutputBlock = NULL;
- break;
- }
- curOutputBlock = NULL;
- }
- }
- finished = true;
- outputQueue.noteWriterStopped();
- joinJunction.noteProducerFinished(strand);
- }
- virtual bool noteEndOfInputChunk()
- {
- if (!curOutputBlock)
- curOutputBlock = allocator.newBlock();
- curOutputBlock->setEndOfChunk();
- bool success = true;
- if (!outputQueue.enqueue(curOutputBlock))
- {
- curOutputBlock->releaseBlock();
- success = false;
- }
- curOutputBlock = NULL;
- return success;
- }
- virtual void noteEndOfInput()
- {
- eoi = true;
- }
- void abort()
- {
- outputQueue.abort();
- }
- void reset()
- {
- outputQueue.reset();
- if (curOutputBlock)
- curOutputBlock->releaseBlock();
- curOutputBlock = nullptr;
- finished = false;
- alive = true;
- eoi = false;
- }
- void setStrand(IEngineRowStream * _input)
- {
- strand = _input;
- }
- void stopStrand()
- {
- strand->stop();
- }
- inline OrderedJunctionBlockQueue & queryOutputQueue() { return outputQueue; }
- protected:
- CStrandJunction & joinJunction;
- RowBlockAllocator & allocator;
- IEngineRowStream * strand = nullptr; // the stream that executes in parallel processing the stream of rows
- OrderedJunctionBlockQueue outputQueue;
- RoxieRowBlock * curOutputBlock = nullptr;
- bool finished = false;
- bool alive = true;
- bool eoi = false;
- };
- class OrderedOutputJunction : public CStrandJunction, implements IEngineRowStream, implements IOrderedCallbackCollection
- {
- public:
- OrderedOutputJunction(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned _blockSize)
- : CStrandJunction(_numStrands, _numStrands), outputBlockAllocator(_rowManager, _blockSize)
- {
- strands = new OrderedStrandRowBlockOutput * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- strands[i] = new OrderedStrandRowBlockOutput(*this, outputBlockAllocator);
- }
- ~OrderedOutputJunction()
- {
- for (unsigned i=0; i < numStrands; i++)
- delete strands[i];
- delete [] strands;
- }
- IMPLEMENT_IINTERFACE_USING(CStrandJunction)
- virtual IEngineRowStream * queryOutput(unsigned n) { assertex(n==0); return this; }
- virtual void setInput(unsigned n, IEngineRowStream * _stream) { strands[n]->setStrand(_stream); }
- virtual void start()
- {
- for (unsigned i=0; i < numStrands; i++)
- startProducerThread(*strands[i]);
- noteStarted();
- }
- virtual void abort()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->abort();
- }
- virtual void reset()
- {
- curOutputStrand = 0;
- if (curOutputBlock)
- curOutputBlock->releaseBlock();
- curOutputBlock = nullptr;
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->reset();
- CStrandJunction::reset();
- }
- virtual void stopInactiveProducers()
- {
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->stopStrand();
- }
- virtual IOrderedOutputCallback * queryCallback(unsigned i)
- {
- assertex(i < numStrands);
- return strands[i];
- }
- //implementation of IEngineRowStream
- virtual const void *nextRow()
- {
- for (;;)
- {
- if (likely(curOutputBlock))
- {
- const void * result;
- if (curOutputBlock->nextRow(result))
- return result;
- curOutputBlock->throwAnyPendingException();
- if (curOutputBlock->isEndOfChunk())
- {
- curOutputStrand++;
- if (curOutputStrand == numStrands)
- curOutputStrand = 0;
- }
- curOutputBlock->releaseBlock();
- curOutputBlock = NULL;
- }
- if (!strands[curOutputStrand]->queryOutputQueue().dequeue(curOutputBlock))
- {
- //If there is no more output on the next strand, then all the strands will have finished processing.
- return NULL;
- }
- }
- }
- virtual void stop()
- {
- //reading no more records => abort all strand queues and prevent the producers adding any more rows
- for (unsigned i=0; i < numStrands; i++)
- strands[i]->abort();
- processConsumerStop();
- }
- virtual void resetEOF()
- {
- throwUnexpectedX("resetEOF called on OrderedStrandBranch");
- }
- protected:
- RowBlockAllocator outputBlockAllocator;
- OrderedStrandRowBlockOutput * * strands;
- RoxieRowBlock * curOutputBlock = nullptr;
- unsigned curOutputStrand = 0;
- };
- //---------------------------------------------------------------------------------------------------------------------
- class CStrandBranch : public CInterfaceOf<IStrandBranch>
- {
- public:
- CStrandBranch(IStrandJunction * _input, IStrandJunction * _output) : input(_input), output(_output)
- {
- }
- virtual IStrandJunction * queryInputJunction()
- {
- return input;
- }
- virtual IStrandJunction * queryOutputJunction()
- {
- return output;
- }
- protected:
- Linked<IStrandJunction> input;
- Linked<IStrandJunction> output;
- };
- //---------------------------------------------------------------------------------------------------------------------
- extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped, bool inputIsStreamed, IOrderedCallbackCollection * orderedCallbacks)
- {
- Linked<IStrandJunction> input;
- Linked<IStrandJunction> output;
- //Slightly inefficient to go via a junction, but makes the testing code simpler!
- assertex(numStrands);
- if (numStrands == 1)
- {
- input.setown(new OneToOneJunction());
- output.setown(new OneToOneJunction());
- }
- else if (isOrdered || isGrouped)
- {
- //MORE To allow parallel sinks allow callbacks to be provided and have a dummy output junction or none
- //But the number will not be known until the branch is created, so the callback will need to create them
- IOrderedCallbackCollection * callbacks = orderedCallbacks;
- if (!callbacks)
- {
- Owned<OrderedOutputJunction> outputJunction = new OrderedOutputJunction(rowManager, numStrands, blockSize);
- callbacks = outputJunction;
- output.setown(outputJunction.getClear());
- }
- if (inputIsStreamed)
- {
- input.setown(new OrderedSourceJunction(numStrands, callbacks));
- }
- else
- {
- input.setown(new OrderedInputJunction(rowManager, numStrands, blockSize, isGrouped, callbacks));
- }
- }
- else
- {
- input.setown(createStrandJunction( rowManager, 1, numStrands, blockSize, false));
- output.setown(createStrandJunction( rowManager, numStrands, 1, blockSize, false));
- }
- return new CStrandBranch(input, output);
- }
- //---------------------------------------------------------------------------------------------------------------------
- class BlockedRowStreamWriter : public CInterfaceOf<IRowWriterEx>
- {
- public:
- BlockedRowStreamWriter(IRowQueue * _queue, RowBlockAllocator & _allocator) : queue(_queue), allocator(_allocator)
- {
- curBlock = NULL;
- }
- virtual void putRow(const void *row)
- {
- if (!curBlock)
- curBlock = allocator.newBlock();
- if (curBlock->addRowNowFull(row))
- {
- if (!queue->enqueue(curBlock))
- curBlock->releaseBlock();
- curBlock = NULL;
- }
- }
- virtual void flush()
- {
- if (curBlock)
- {
- if (!queue->enqueue(curBlock))
- curBlock->releaseBlock();
- curBlock = NULL;
- }
- }
- virtual void noteStopped()
- {
- flush();
- queue->noteWriterStopped();
- }
- protected:
- IRowQueue * queue;
- RowBlockAllocator & allocator;
- RoxieRowBlock * curBlock;
- };
- class UnorderedManyToOneRowStream : public CInterfaceOf<IManyToOneRowStream>
- {
- public:
- UnorderedManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned _numStrands, unsigned blockSize) : numStrands(_numStrands), allocator(_rowManager, blockSize)
- {
- const unsigned maxQueued = numStrands * 4;
- queue.setown(createRowQueue(1, numStrands, maxQueued, 0));
- producers = new BlockedRowStreamWriter * [numStrands];
- for (unsigned i=0; i < numStrands; i++)
- producers[i] = new BlockedRowStreamWriter(queue, allocator);
- curBlock = NULL;
- }
- ~UnorderedManyToOneRowStream()
- {
- for (unsigned i=0; i < numStrands; i++)
- producers[i]->Release();
- delete [] producers;
- }
- virtual void abort()
- {
- queue->abort();
- }
- virtual void stop()
- {
- queue->noteReaderStopped();
- }
- virtual void reset()
- {
- queue->reset();
- for (;;)
- {
- if (curBlock)
- curBlock->releaseBlock();
- const void * next;
- if (!queue->tryDequeue(next))
- break;
- curBlock = (RoxieRowBlock *)next;
- }
- curBlock = NULL;
- }
- virtual const void *nextRow()
- {
- const void * ret;
- for (;;)
- {
- if (curBlock)
- {
- if (curBlock->nextRow(ret))
- return ret;
- curBlock->releaseBlock();
- curBlock = NULL;
- }
- const void * next;
- if (!queue->dequeue(next))
- return NULL;
- curBlock = (RoxieRowBlock *)next;
- }
- }
- virtual IRowWriterEx * getWriter(unsigned n)
- {
- return LINK(producers[n]);
- }
- protected:
- unsigned numStrands;
- Owned<IRowQueue> queue;
- RowBlockAllocator allocator;
- BlockedRowStreamWriter * * producers;
- RoxieRowBlock * curBlock;
- };
- //---------------------------------------------------------------------------------------------------------------------
- extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered)
- {
- if (!isOrdered)
- return new UnorderedManyToOneRowStream(rowManager, numInputs, blockSize);
- return NULL;
- }
|