12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jexcept.hpp"
- #include "jmisc.hpp"
- #include "jthread.hpp"
- #include "jsocket.hpp"
- #include "jprop.hpp"
- #include "jdebug.hpp"
- #include "jlzw.hpp"
- #include "junicode.hpp"
- #include "eclhelper.hpp"
- #include "thorcommon.ipp"
- #include "eclrtl.hpp"
- #include "rtlread_imp.hpp"
- #include <algorithm>
- #ifdef _USE_NUMA
- #include <numa.h>
- #endif
- #include "thorstep.hpp"
- #define ROWAGG_PERROWOVERHEAD (sizeof(AggregateRowBuilder)) // per-entry bookkeeping cost: RowAggregator::addRow() adds this to 'overhead' for every AggregateRowBuilder it creates
- RowAggregator::RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : helper(_helper)
- {
- comparer = _extra.queryCompareRowElement();
- hasher = _extra.queryHash();
- elementHasher = _extra.queryHashElement();
- elementComparer = _extra.queryCompareElements();
- cursor = NULL;
- eof = false;
- totalSize = overhead = 0;
- }
- RowAggregator::~RowAggregator()
- {
- reset();
- }
- void RowAggregator::start(IEngineRowAllocator *_rowAllocator)
- {
- rowAllocator.set(_rowAllocator);
- }
- void RowAggregator::reset()
- {
- while (!eof)
- {
- AggregateRowBuilder *n = nextResult();
- if (n)
- n->Release();
- }
- SuperHashTable::_releaseAll();
- eof = false;
- cursor = NULL;
- rowAllocator.clear();
- totalSize = overhead = 0;
- }
- AggregateRowBuilder &RowAggregator::addRow(const void * row)
- {
- AggregateRowBuilder *result;
- unsigned hash = hasher->hash(row);
- void * match = find(hash, row);
- if (match)
- {
- result = static_cast<AggregateRowBuilder *>(match);
- totalSize -= result->querySize();
- size32_t sz = helper.processNext(*result, row);
- result->setSize(sz);
- totalSize += sz;
- }
- else
- {
- Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
- helper.clearAggregate(*rowBuilder);
- size32_t sz = helper.processFirst(*rowBuilder, row);
- rowBuilder->setSize(sz);
- result = rowBuilder.getClear();
- addNew(result, hash);
- totalSize += sz;
- overhead += ROWAGG_PERROWOVERHEAD;
- }
- return *result;
- }
- void RowAggregator::mergeElement(const void * otherElement)
- {
- unsigned hash = elementHasher->hash(otherElement);
- void * match = findElement(hash, otherElement);
- if (match)
- {
- AggregateRowBuilder *rowBuilder = static_cast<AggregateRowBuilder *>(match);
- totalSize -= rowBuilder->querySize();
- size32_t sz = helper.mergeAggregate(*rowBuilder, otherElement);
- rowBuilder->setSize(sz);
- totalSize += sz;
- }
- else
- {
- Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
- rowBuilder->setSize(cloneRow(*rowBuilder, otherElement, rowAllocator->queryOutputMeta()));
- addNew(rowBuilder.getClear(), hash);
- }
- }
- const void * RowAggregator::getFindParam(const void *et) const
- {
- // Slightly odd name for this function... it actually gets the comparable element
- const AggregateRowBuilder *rb = static_cast<const AggregateRowBuilder*>(et);
- return rb->row();
- }
- bool RowAggregator::matchesFindParam(const void *et, const void *key, unsigned fphash) const
- {
- if (fphash != hashFromElement(et))
- return false;
- // et = element in the table (an AggregateRowBuilder) key = new row (in input row layout).
- return comparer->docompare(key, getFindParam(et)) == 0;
- }
- bool RowAggregator::matchesElement(const void *et, const void * searchET) const
- {
- return elementComparer->docompare(getFindParam(et), searchET) == 0;
- }
- AggregateRowBuilder *RowAggregator::nextResult()
- {
- void *ret = next(cursor);
- if (!ret)
- {
- eof = true;
- return NULL;
- }
- cursor = ret;
- return static_cast<AggregateRowBuilder *>(ret);
- }
- //=====================================================================================================
- void CStreamMerger::fillheap(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
- {
- assertex(activeInputs == 0);
- for(unsigned i = 0; i < numInputs; i++)
- if(pullInput(i, seek, numFields, stepExtra))
- mergeheap[activeInputs++] = i;
- }
- void CStreamMerger::permute(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
- {
- // the tree structure: element p has children p*2+1 and p*2+2, or element c has parent (unsigned)(c-1)/2
- // the heap property: no element should be smaller than its parent
- // the dedup variant: if(dedup), the top of the heap should also not be equal to either child
- // the method: establish this by starting with the parent of the bottom element and working up to the top element, sifting each down to its correct place
- if (activeInputs >= 2)
- for(unsigned p = (activeInputs-2)/2; p > 0; --p)
- siftDown(p);
- if(dedup)
- siftDownDedupTop(seek, numFields, stepExtra);
- else
- siftDown(0);
- }
- const void * CStreamMerger::consumeTop()
- {
- unsigned top = mergeheap[0];
- if (!pullConsumes)
- consumeInput(top);
- const void *next = pending[top];
- pending[top] = NULL;
- return next;
- }
- bool CStreamMerger::ensureNext(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra * stepExtra)
- {
- //wasCompleteMatch must be initialised from the actual row returned. (See bug #30388)
- if (first)
- {
- fillheap(seek, numFields, stepExtra);
- permute(seek, numFields, stepExtra);
- first = false;
- if (activeInputs == 0)
- return false;
- unsigned top = mergeheap[0];
- wasCompleteMatch = pendingMatches[top];
- return true;
- }
- while (activeInputs)
- {
- unsigned top = mergeheap[0];
- const void *next = pending[top];
- if (next)
- {
- if (seek)
- {
- int c = rangeCompare->docompare(next, seek, numFields);
- if (c >= 0)
- {
- if (stepExtra->returnMismatches() && (c > 0))
- {
- wasCompleteMatch = pendingMatches[top];
- return true;
- }
- else
- {
- if (pendingMatches[top])
- return true;
- }
- }
- }
- else
- {
- if (pendingMatches[top])
- return true;
- }
- skipInput(top);
- }
- if(!pullInput(top, seek, numFields, stepExtra))
- if(!promote(0))
- return false;
- // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
- if(dedup)
- siftDownDedupTop(seek, numFields, stepExtra);
- else
- siftDown(0);
- }
- return false;
- }
- bool CStreamMerger::ensureNext()
- {
- bool isCompleteMatch = true;
- return ensureNext(NULL, 0, isCompleteMatch, NULL);
- }
- void CStreamMerger::permute()
- {
- permute(NULL, 0, NULL);
- }
- bool CStreamMerger::promote(unsigned p)
- {
- activeInputs--;
- if(activeInputs == p)
- return false;
- mergeheap[p] = mergeheap[activeInputs];
- return true;
- }
- void CStreamMerger::siftDownDedupTop(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
- {
- // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
- if(activeInputs < 2)
- return;
- unsigned c = 1;
- int childcmp = 1;
- if(activeInputs >= 3)
- {
- childcmp = compare->docompare(pending[mergeheap[2]], pending[mergeheap[1]]);
- if(childcmp < 0)
- c = 2;
- }
- int cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
- if(cmp > 0)
- return;
- // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
- while(cmp <= 0)
- {
- if(cmp == 0)
- {
- if(mergeheap[c] < mergeheap[0])
- {
- unsigned r = mergeheap[c];
- mergeheap[c] = mergeheap[0];
- mergeheap[0] = r;
- }
- unsigned top = mergeheap[c];
- skipInput(top);
- if(!pullInput(top, seek, numFields, stepExtra))
- if(!promote(c))
- break;
- siftDown(c);
- }
- else
- {
- unsigned r = mergeheap[c];
- mergeheap[c] = mergeheap[0];
- mergeheap[0] = r;
- if(siftDown(c))
- break;
- }
- cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
- }
- // the following loop ensures the uniqueness property holds on the other branch too
- c = 3-c;
- if(activeInputs <= c)
- return;
- while(childcmp == 0)
- {
- if(mergeheap[c] < mergeheap[0])
- {
- unsigned r = mergeheap[c];
- mergeheap[c] = mergeheap[0];
- mergeheap[0] = r;
- }
- unsigned top = mergeheap[c];
- skipInput(top);
- if(!pullInput(top, seek, numFields, stepExtra))
- if(!promote(c))
- break;
- siftDown(c);
- childcmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
- }
- }
- void CStreamMerger::cleanup()
- {
- clearPending();
- delete [] pending;
- pending = NULL;
- delete [] pendingMatches;
- pendingMatches = NULL;
- delete [] mergeheap;
- mergeheap = NULL;
- }
- void CStreamMerger::clearPending()
- {
- if (pending && activeInputs)
- {
- for(unsigned i = 0; i < numInputs; i++)
- {
- if (pullConsumes)
- releaseRow(pending[i]);
- pending[i] = NULL;
- }
- activeInputs = 0;
- }
- first = true;
- }
- CStreamMerger::CStreamMerger(bool _pullConsumes)
- {
- pending = NULL;
- pendingMatches = NULL;
- mergeheap = NULL;
- compare = NULL;
- rangeCompare = NULL;
- dedup = false;
- activeInputs = 0;
- pullConsumes = _pullConsumes;
- numInputs = 0;
- first = true;
- }
- CStreamMerger::~CStreamMerger()
- {
- //can't call cleanup() because virtual releaseRow() won't be defined.
- // NOTE: use assert rather than assertex as exceptions from within destructors are not handled well.
- assert(!pending && !mergeheap);
- }
- void CStreamMerger::init(ICompare * _compare, bool _dedup, IRangeCompare * _rangeCompare)
- {
- compare = _compare;
- dedup = _dedup;
- rangeCompare = _rangeCompare;
- }
- void CStreamMerger::initInputs(unsigned _numInputs)
- {
- assertex(!pending); // cleanup should have been called before reinitializing
- numInputs = _numInputs;
- mergeheap = new unsigned[numInputs];
- pending = new const void *[numInputs];
- pendingMatches = new bool [numInputs];
- for (unsigned i = 0; i < numInputs; i++)
- pending[i] = NULL;
- activeInputs = 0;
- first = true;
- }
- void CStreamMerger::consumeInput(unsigned i)
- {
- //should be over-ridden if pullConsumes is false;
- throwUnexpected();
- }
- void CStreamMerger::skipInput(unsigned i)
- {
- if (!pullConsumes)
- consumeInput(i);
- releaseRow(pending[i]);
- pending[i] = NULL;
- }
- void CStreamMerger::primeRows(const void * * rows)
- {
- assertex(first && (activeInputs == 0));
- first = false;
- for(unsigned i = 0; i < numInputs; i++)
- {
- if ((pending[i] = rows[i]) != NULL)
- {
- mergeheap[activeInputs++] = i;
- pendingMatches[i] = true;
- }
- }
- permute();
- }
- const void * CStreamMerger::nextRow()
- {
- if (ensureNext())
- return consumeTop();
- return NULL;
- }
- const void * CStreamMerger::queryNextRow()
- {
- if (ensureNext())
- return pending[mergeheap[0]];
- return NULL;
- }
- unsigned CStreamMerger::queryNextInput()
- {
- if (ensureNext())
- return mergeheap[0];
- return NotFound;
- }
- const void * CStreamMerger::nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- if (ensureNext(seek, numFields, wasCompleteMatch, &stepExtra))
- return consumeTop();
- return NULL;
- }
- void CStreamMerger::skipRow()
- {
- assertex(!first);
- skipInput(mergeheap[0]);
- }
- //=====================================================================================================
- CThorDemoRowSerializer::CThorDemoRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
- {
- nesting = 0;
- }
- void CThorDemoRowSerializer::put(size32_t len, const void * ptr)
- {
- buffer.append(len, ptr);
- //ok to flush if nesting == 0;
- }
- size32_t CThorDemoRowSerializer::beginNested(size32_t count)
- {
- nesting++;
- unsigned pos = buffer.length();
- buffer.append((size32_t)0);
- return pos;
- }
- void CThorDemoRowSerializer::endNested(size32_t sizePos)
- {
- unsigned pos = buffer.length();
- buffer.rewrite(sizePos);
- buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
- buffer.rewrite(pos);
- nesting--;
- }
- IOutputRowSerializer * CachedOutputMetaData::createDiskSerializer(ICodeContext * ctx, unsigned activityId) const
- {
- if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
- return meta->createDiskSerializer(ctx, activityId);
- if (isFixedSize())
- return new CSimpleFixedRowSerializer(getFixedSize());
- return new CSimpleVariableRowSerializer(this);
- }
- IOutputRowDeserializer * CachedOutputMetaData::createDiskDeserializer(ICodeContext * ctx, unsigned activityId) const
- {
- if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
- return meta->createDiskDeserializer(ctx, activityId);
- if (isFixedSize())
- return new CSimpleFixedRowDeserializer(getFixedSize());
- throwUnexpectedX("createDiskDeserializer variable meta has no serializer");
- }
- IOutputRowSerializer * CachedOutputMetaData::createInternalSerializer(ICodeContext * ctx, unsigned activityId) const
- {
- if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
- return meta->createInternalSerializer(ctx, activityId);
- if (isFixedSize())
- return new CSimpleFixedRowSerializer(getFixedSize());
- return new CSimpleVariableRowSerializer(this);
- }
- IOutputRowDeserializer * CachedOutputMetaData::createInternalDeserializer(ICodeContext * ctx, unsigned activityId) const
- {
- if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
- return meta->createInternalDeserializer(ctx, activityId);
- if (isFixedSize())
- return new CSimpleFixedRowDeserializer(getFixedSize());
- throwUnexpectedX("createInternalDeserializer variable meta has no serializer");
- }
- void CSizingSerializer::put(size32_t len, const void * ptr)
- {
- totalsize += len;
- }
- size32_t CSizingSerializer::beginNested(size32_t count)
- {
- totalsize += sizeof(size32_t);
- return totalsize;
- }
- void CSizingSerializer::endNested(size32_t position)
- {
- }
- void CMemoryRowSerializer::put(size32_t len, const void * ptr)
- {
- buffer.append(len, ptr);
- }
- size32_t CMemoryRowSerializer::beginNested(size32_t count)
- {
- nesting++;
- unsigned pos = buffer.length();
- buffer.append((size32_t)0);
- return pos;
- }
- void CMemoryRowSerializer::endNested(size32_t sizePos)
- {
- size32_t sz = buffer.length()-(sizePos + sizeof(size32_t));
- buffer.writeDirect(sizePos,sizeof(sz),&sz);
- nesting--;
- }
- static void ensureClassesAreNotAbstract()
- {
- MemoryBuffer temp;
- CThorStreamDeserializerSource x1(NULL);
- CThorContiguousRowBuffer x2(NULL);
- CSizingSerializer x3;
- CMemoryRowSerializer x4(temp);
- }
- //=====================================================================================================
- //the visitor callback is used to ensure link counts for children are updated.
- size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta)
- {
- size32_t rowSize = meta->getRecordSize(row); // TBD could be better?
- byte * self = rowBuilder.ensureCapacity(rowSize, NULL);
- memcpy(self, row, rowSize);
- if (meta->getMetaFlags() & MDFneeddestruct)
- {
- ChildRowLinkerWalker walker;
- meta->walkIndirectMembers(self, walker);
- }
- return rowSize;
- }
- //---------------------------------------------------------------------------------------------------
- extern const char * getActivityText(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKnone: return "None";
- case TAKdiskwrite: return "Disk Write";
- case TAKsort: return "Sort";
- case TAKdedup: return "Dedup";
- case TAKfilter: return "Filter";
- case TAKsplit: return "Split";
- case TAKproject: return "Project";
- case TAKrollup: return "Rollup";
- case TAKiterate: return "Iterate";
- case TAKaggregate: return "Aggregate";
- case TAKhashaggregate: return "Hash Aggregate";
- case TAKfirstn: return "Firstn";
- case TAKsample: return "Sample";
- case TAKdegroup: return "Degroup";
- case TAKjoin: return "Join";
- case TAKhashjoin: return "Hash Join";
- case TAKlookupjoin: return "Lookup Join";
- case TAKselfjoin: return "Self Join";
- case TAKkeyedjoin: return "Keyed Join";
- case TAKgroup: return "Group";
- case TAKworkunitwrite: return "Output";
- case TAKfunnel: return "Funnel";
- case TAKapply: return "Apply";
- case TAKinlinetable: return "Inline Dataset";
- case TAKhashdistribute: return "Hash Distribute";
- case TAKhashdedup: return "Hash Dedup";
- case TAKnormalize: return "Normalize";
- case TAKremoteresult: return "Remote Result";
- case TAKpull: return "Pull";
- case TAKdenormalize: return "Denormalize";
- case TAKnormalizechild: return "Normalize Child";
- case TAKchilddataset: return "Child Dataset";
- case TAKselectn: return "Select Nth";
- case TAKenth: return "Enth";
- case TAKif: return "If";
- case TAKnull: return "Null";
- case TAKdistribution: return "Distribution";
- case TAKcountproject: return "Count Project";
- case TAKchoosesets: return "Choose Sets";
- case TAKpiperead: return "Pipe Read";
- case TAKpipewrite: return "Pipe Write";
- case TAKcsvwrite: return "Csv Write";
- case TAKpipethrough: return "Pipe Through";
- case TAKindexwrite: return "Index Write";
- case TAKchoosesetsenth: return "Choose Sets Enth";
- case TAKchoosesetslast: return "Choose Sets Last";
- case TAKfetch: return "Fetch";
- case TAKhashdenormalize: return "Hash Denormalize";
- case TAKworkunitread: return "Read";
- case TAKthroughaggregate: return "Through Aggregate";
- case TAKspill: return "Spill";
- case TAKcase: return "Case";
- case TAKlimit: return "Limit";
- case TAKcsvfetch: return "Csv Fetch";
- case TAKxmlwrite: return "Xml Write";
- case TAKjsonwrite: return "Json Write";
- case TAKparse: return "Parse";
- case TAKsideeffect: return "Simple Action";
- case TAKtopn: return "Top N";
- case TAKmerge: return "Merge";
- case TAKxmlfetch: return "Xml Fetch";
- case TAKjsonfetch: return "Json Fetch";
- case TAKxmlparse: return "Parse Xml";
- case TAKkeyeddistribute: return "Keyed Distribute";
- case TAKjoinlight: return "Lightweight Join";
- case TAKalljoin: return "All Join";
- case TAKsoap_rowdataset: return "SOAP dataset";
- case TAKsoap_rowaction: return "SOAP action";
- case TAKsoap_datasetdataset: return "SOAP dataset";
- case TAKsoap_datasetaction: return "SOAP action";
- case TAKkeydiff: return "Key Difference";
- case TAKkeypatch: return "Key Patch";
- case TAKkeyeddenormalize: return "Keyed Denormalize";
- case TAKsequential: return "Sequential";
- case TAKparallel: return "Parallel";
- case TAKchilditerator: return "Child Dataset";
- case TAKdatasetresult: return "Dataset Result";
- case TAKrowresult: return "Row Result";
- case TAKchildif: return "If";
- case TAKpartition: return "Partition Distribute";
- case TAKsubgraph: return "Sub Graph";
- case TAKlocalgraph: return "Local Graph";
- case TAKifaction: return "If Action";
- case TAKemptyaction: return "Empty Action";
- case TAKskiplimit: return "Skip Limit";
- case TAKdiskread: return "Disk Read";
- case TAKdisknormalize: return "Disk Normalize";
- case TAKdiskaggregate: return "Disk Aggregate";
- case TAKdiskcount: return "Disk Count";
- case TAKdiskgroupaggregate: return "Disk Grouped Aggregate";
- case TAKindexread: return "Index Read";
- case TAKindexnormalize: return "Index Normalize";
- case TAKindexaggregate: return "Index Aggregate";
- case TAKindexcount: return "Index Count";
- case TAKindexgroupaggregate: return "Index Grouped Aggregate";
- case TAKchildnormalize: return "Child Normalize";
- case TAKchildaggregate: return "Child Aggregate";
- case TAKchildgroupaggregate: return "Child Grouped Aggregate";
- case TAKchildthroughnormalize: return "Normalize";
- case TAKcsvread: return "Csv Read";
- case TAKxmlread: return "Xml Read";
- case TAKjsonread: return "Json Read";
- case TAKlocalresultread: return "Read Local Result";
- case TAKlocalresultwrite: return "Local Result";
- case TAKcombine: return "Combine";
- case TAKregroup: return "Regroup";
- case TAKrollupgroup: return "Rollup Group";
- case TAKcombinegroup: return "Combine Group";
- case TAKlookupdenormalize: return "Lookup Denormalize";
- case TAKalldenormalize: return "All Denormalize";
- case TAKdenormalizegroup: return "Denormalize Group";
- case TAKhashdenormalizegroup: return "Hash Denormalize Group";
- case TAKlookupdenormalizegroup: return "Lookup Denormalize Group";
- case TAKkeyeddenormalizegroup: return "Keyed Denormalize Group";
- case TAKalldenormalizegroup: return "All Denormalize Group";
- case TAKlocalresultspill: return "Spill Local Result";
- case TAKsimpleaction: return "Action";
- case TAKloopcount: return "Loop";
- case TAKlooprow: return "Loop";
- case TAKloopdataset: return "Loop";
- case TAKchildcase: return "Case";
- case TAKremotegraph: return "Remote";
- case TAKlibrarycall: return "Library Call";
- case TAKlocalstreamread: return "Read Input";
- case TAKprocess: return "Process";
- case TAKgraphloop: return "Graph";
- case TAKparallelgraphloop: return "Graph";
- case TAKgraphloopresultread: return "Graph Input";
- case TAKgraphloopresultwrite: return "Graph Result";
- case TAKgrouped: return "Grouped";
- case TAKsorted: return "Sorted";
- case TAKdistributed: return "Distributed";
- case TAKnwayjoin: return "Join";
- case TAKnwaymerge: return "Merge";
- case TAKnwaymergejoin: return "Merge Join";
- case TAKnwayinput: return "Nway Input";
- case TAKnwaygraphloopresultread: return "Nway Graph Input";
- case TAKnwayselect: return "Select Nway Input";
- case TAKnonempty: return "Non Empty";
- case TAKcreaterowlimit: return "OnFail Limit";
- case TAKexistsaggregate: return "Exists";
- case TAKcountaggregate: return "Count";
- case TAKprefetchproject: return "Prefetch Project";
- case TAKprefetchcountproject: return "Prefetch Count Project";
- case TAKfiltergroup: return "Filter Group";
- case TAKmemoryspillread: return "Read Spill";
- case TAKmemoryspillwrite: return "Write Spill";
- case TAKmemoryspillsplit: return "Spill";
- case TAKsection: return "Section";
- case TAKlinkedrawiterator: return "Child Dataset";
- case TAKnormalizelinkedchild: return "Normalize";
- case TAKfilterproject: return "Filtered Project";
- case TAKcatch: return "Catch";
- case TAKskipcatch: return "Skip Catch";
- case TAKcreaterowcatch: return "OnFail Catch";
- case TAKsectioninput: return "Section Input";
- case TAKindexgroupcount: return "Index Grouped Count";
- case TAKindexgroupexists: return "Index Grouped Exists";
- case TAKhashdistributemerge: return "Distribute Merge";
- case TAKselfjoinlight: return "Lightweight Self Join";
- case TAKwhen_dataset: return "When";
- case TAKhttp_rowdataset: return "HTTP dataset";
- case TAKstreamediterator: return "Streamed Dataset";
- case TAKexternalsource: return "User Source";
- case TAKexternalsink: return "User Output";
- case TAKexternalprocess: return "User Proceess";
- case TAKwhen_action: return "When";
- case TAKsubsort: return "Sub Sort";
- case TAKdictionaryworkunitwrite:return "Dictionary Write";
- case TAKdictionaryresultwrite: return "Dictionary Result";
- case TAKsmartjoin: return "Smart Join";
- case TAKsmartdenormalize: return "Smart Denormalize";
- case TAKsmartdenormalizegroup: return "Smart Denormalize Group";
- case TAKselfdenormalize: return "Self Denormalize";
- case TAKselfdenormalizegroup: return "Self Denormalize Group";
- case TAKtrace: return "Trace";
- case TAKquantile: return "Quantile";
- }
- throwUnexpected();
- }
- extern bool isActivitySource(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKpiperead:
- case TAKinlinetable:
- case TAKworkunitread:
- case TAKnull:
- case TAKsideeffect:
- case TAKsoap_rowdataset:
- case TAKsoap_rowaction:
- case TAKkeydiff:
- case TAKkeypatch:
- case TAKchilditerator:
- case TAKlocalgraph:
- case TAKemptyaction:
- case TAKdiskread:
- case TAKdisknormalize:
- case TAKdiskaggregate:
- case TAKdiskcount:
- case TAKdiskgroupaggregate:
- case TAKindexread:
- case TAKindexnormalize:
- case TAKindexaggregate:
- case TAKindexcount:
- case TAKindexgroupaggregate:
- case TAKchildnormalize:
- case TAKchildaggregate:
- case TAKchildgroupaggregate:
- case TAKcsvread:
- case TAKxmlread:
- case TAKjsonread:
- case TAKlocalresultread:
- case TAKsimpleaction:
- case TAKlocalstreamread:
- case TAKgraphloopresultread:
- case TAKnwaygraphloopresultread:
- case TAKlinkedrawiterator:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- case TAKstreamediterator:
- case TAKexternalsource:
- return true;
- }
- return false;
- }
- extern bool isActivitySink(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKdiskwrite:
- case TAKworkunitwrite:
- case TAKapply:
- case TAKremoteresult:
- case TAKdistribution:
- case TAKpipewrite:
- case TAKcsvwrite:
- case TAKindexwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- case TAKsoap_rowaction:
- case TAKsoap_datasetaction:
- case TAKkeydiff:
- case TAKkeypatch:
- case TAKdatasetresult:
- case TAKrowresult:
- case TAKemptyaction:
- case TAKlocalresultwrite:
- case TAKgraphloopresultwrite:
- case TAKsimpleaction:
- case TAKexternalsink:
- case TAKifaction:
- case TAKparallel:
- case TAKsequential:
- case TAKwhen_action:
- case TAKdictionaryworkunitwrite:
- case TAKdictionaryresultwrite:
- return true;
- }
- return false;
- }
- //=====================================================================================================
- CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
- {
- buffer = NULL;
- maxOffset = 0;
- readOffset = 0;
- }
- void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
- {
- ensureAccessible(readOffset + len);
- memcpy(ptr, buffer+readOffset, len);
- readOffset += len;
- }
- size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
- {
- doRead(len, ptr);
- return len;
- }
- size32_t CThorContiguousRowBuffer::readSize()
- {
- size32_t value;
- doRead(sizeof(value), &value);
- return value;
- }
- size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
- {
- size32_t size = sizePackedInt();
- doRead(size, ptr);
- return size;
- }
- size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
- {
- if (len == 0)
- return 0;
- size32_t size = sizeUtf8(len);
- byte * self = target.ensureCapacity(fixedSize + size, NULL);
- doRead(size, self+offset);
- return size;
- }
- size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
- {
- size32_t size = sizeVStr();
- byte * self = target.ensureCapacity(fixedSize + size, NULL);
- doRead(size, self+offset);
- return size;
- }
- size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
- {
- size32_t size = sizeVUni();
- byte * self = target.ensureCapacity(fixedSize + size, NULL);
- doRead(size, self+offset);
- return size;
- }
- size32_t CThorContiguousRowBuffer::sizePackedInt()
- {
- ensureAccessible(readOffset+1);
- return rtlGetPackedSizeFromFirst(buffer[readOffset]);
- }
- size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
- {
- if (len == 0)
- return 0;
- //The len is the number of utf characters, size depends on which characters are included.
- size32_t nextOffset = readOffset;
- while (len)
- {
- ensureAccessible(nextOffset+1);
- for (;nextOffset < maxOffset;)
- {
- nextOffset += readUtf8Size(buffer+nextOffset); // This function only accesses the first byte
- if (--len == 0)
- break;
- }
- }
- return nextOffset - readOffset;
- }
- size32_t CThorContiguousRowBuffer::sizeVStr()
- {
- size32_t nextOffset = readOffset;
- loop
- {
- ensureAccessible(nextOffset+1);
- for (; nextOffset < maxOffset; nextOffset++)
- {
- if (buffer[nextOffset] == 0)
- return (nextOffset + 1) - readOffset;
- }
- }
- }
- size32_t CThorContiguousRowBuffer::sizeVUni()
- {
- size32_t nextOffset = readOffset;
- const size32_t sizeOfUChar = 2;
- loop
- {
- ensureAccessible(nextOffset+sizeOfUChar);
- for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
- {
- if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
- return (nextOffset + sizeOfUChar) - readOffset;
- }
- }
- }
- void CThorContiguousRowBuffer::reportReadFail()
- {
- throwUnexpected();
- }
- const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
- {
- if (maxSize+readOffset > maxOffset)
- doPeek(maxSize+readOffset);
- return buffer + readOffset;
- }
- offset_t CThorContiguousRowBuffer::beginNested()
- {
- size32_t len = readSize();
- return len+readOffset;
- }
- bool CThorContiguousRowBuffer::finishedNested(offset_t & endPos)
- {
- return readOffset >= endPos;
- }
- void CThorContiguousRowBuffer::skip(size32_t size)
- {
- ensureAccessible(readOffset+size);
- readOffset += size;
- }
- void CThorContiguousRowBuffer::skipPackedInt()
- {
- size32_t size = sizePackedInt();
- ensureAccessible(readOffset+size);
- readOffset += size;
- }
- void CThorContiguousRowBuffer::skipUtf8(size32_t len)
- {
- size32_t size = sizeUtf8(len);
- ensureAccessible(readOffset+size);
- readOffset += size;
- }
- void CThorContiguousRowBuffer::skipVStr()
- {
- size32_t size = sizeVStr();
- ensureAccessible(readOffset+size);
- readOffset += size;
- }
- void CThorContiguousRowBuffer::skipVUni()
- {
- size32_t size = sizeVUni();
- ensureAccessible(readOffset+size);
- readOffset += size;
- }
- // ===========================================
- IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context)
- {
- class cRowInterfaces: public CSimpleInterface, implements IRowInterfaces
- {
- Linked<IOutputMetaData> meta;
- ICodeContext* context;
- unsigned actid;
- Linked<IEngineRowAllocator> allocator;
- Linked<IOutputRowSerializer> serializer;
- Linked<IOutputRowDeserializer> deserializer;
- CSingletonLock allocatorlock;
- CSingletonLock serializerlock;
- CSingletonLock deserializerlock;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, ICodeContext *_context)
- : meta(_meta)
- {
- context = _context;
- actid = _actid;
- }
- IEngineRowAllocator * queryRowAllocator()
- {
- if (allocatorlock.lock()) {
- if (!allocator&&meta)
- allocator.setown(context->getRowAllocator(meta, actid));
- allocatorlock.unlock();
- }
- return allocator;
- }
- IOutputRowSerializer * queryRowSerializer()
- {
- if (serializerlock.lock()) {
- if (!serializer&&meta)
- serializer.setown(meta->createDiskSerializer(context,actid));
- serializerlock.unlock();
- }
- return serializer;
- }
- IOutputRowDeserializer * queryRowDeserializer()
- {
- if (deserializerlock.lock()) {
- if (!deserializer&&meta)
- deserializer.setown(meta->createDiskDeserializer(context,actid));
- deserializerlock.unlock();
- }
- return deserializer;
- }
- IOutputMetaData *queryRowMetaData()
- {
- return meta;
- }
- unsigned queryActivityId() const
- {
- return actid;
- }
- ICodeContext *queryCodeContext()
- {
- return context;
- }
- };
- return new cRowInterfaces(meta,actid,context);
- };
- class CRowStreamReader : public CSimpleInterface, implements IExtRowStream
- {
- Linked<IFileIO> fileio;
- Linked<IMemoryMappedFile> mmfile;
- Linked<IOutputRowDeserializer> deserializer;
- Linked<IEngineRowAllocator> allocator;
- Owned<ISerialStream> strm;
- CThorStreamDeserializerSource source;
- Owned<ISourceRowPrefetcher> prefetcher;
- CThorContiguousRowBuffer prefetchBuffer; // used if prefetcher set
- bool grouped;
- unsigned __int64 maxrows;
- unsigned __int64 rownum;
- bool eoi;
- bool eos;
- bool eog;
- offset_t bufofs;
- #ifdef TRACE_CREATE
- static unsigned rdnum;
- #endif
- class : implements IFileSerialStreamCallback
- {
- public:
- CRC32 crc;
- void process(offset_t ofs, size32_t sz, const void *buf)
- {
- crc.tally(sz,buf);
- }
- } crccb;
-
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, bool _grouped)
- : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL)
- {
- #ifdef TRACE_CREATE
- PROGLOG("CRowStreamReader %d = %p",++rdnum,this);
- #endif
- maxrows = _maxrows;
- grouped = _grouped;
- eoi = false;
- eos = maxrows==0;
- eog = false;
- bufofs = 0;
- rownum = 0;
- if (fileio)
- strm.setown(createFileSerialStream(fileio,_ofs,_len,(size32_t)-1, _tallycrc?&crccb:NULL));
- else
- strm.setown(createFileSerialStream(mmfile,_ofs,_len,_tallycrc?&crccb:NULL));
- prefetcher.setown(rowif->queryRowMetaData()->createDiskPrefetcher(rowif->queryCodeContext(), rowif->queryActivityId()));
- if (prefetcher)
- prefetchBuffer.setStream(strm);
- source.setStream(strm);
- deserializer.set(rowif->queryRowDeserializer());
- }
- ~CRowStreamReader()
- {
- #ifdef TRACE_CREATE
- PROGLOG("~CRowStreamReader %d = %p",rdnum--,this);
- #endif
- }
- void reinit(offset_t _ofs,offset_t _len,unsigned __int64 _maxrows)
- {
- maxrows = _maxrows;
- eoi = false;
- eos = (maxrows==0)||(_len==0);
- eog = false;
- bufofs = 0;
- rownum = 0;
- strm->reset(_ofs,_len);
- }
- const void *nextRow()
- {
- if (eog) {
- eog = false;
- return NULL;
- }
- if (eos)
- return NULL;
- if (source.eos()) {
- eos = true;
- return NULL;
- }
- RtlDynamicRowBuilder rowBuilder(allocator);
- size_t size = deserializer->deserialize(rowBuilder,source);
- if (grouped && !eos) {
- byte b;
- source.read(sizeof(b),&b);
- eog = (b==1);
- }
- if (++rownum==maxrows)
- eos = true;
- return rowBuilder.finalizeRowClear(size);
- }
- const void *prefetchRow(size32_t *sz)
- {
- if (eog)
- eog = false;
- else if (!eos) {
- if (source.eos())
- eos = true;
- else {
- assertex(prefetcher);
- prefetcher->readAhead(prefetchBuffer);
- const byte * ret = prefetchBuffer.queryRow();
- if (sz)
- *sz = prefetchBuffer.queryRowSize();
- return ret;
- }
- }
- if (sz)
- sz = 0;
- return NULL;
- }
- void prefetchDone()
- {
- prefetchBuffer.finishedRow();
- if (grouped) {
- byte b;
- strm->get(sizeof(b),&b);
- eog = (b==1);
- }
- }
- virtual void stop()
- {
- stop(NULL);
- }
- void clear()
- {
- strm.clear();
- source.clearStream();
- fileio.clear();
- }
- void stop(CRC32 *crcout)
- {
- if (!eos) {
- eos = true;
- clear();
- }
- // NB CRC will only be right if stopped at eos
- if (crcout)
- *crcout = crccb.crc;
- }
- offset_t getOffset()
- {
- return source.tell();
- }
- virtual unsigned __int64 getStatistic(StatisticKind kind)
- {
- if (fileio)
- return fileio->getStatistic(kind);
- return 0;
- }
- };
- #ifdef TRACE_CREATE
- unsigned CRowStreamReader::rdnum;
- #endif
- bool UseMemoryMappedRead = false;
- IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp)
- {
- bool compressed = TestRwFlag(rwFlags, rw_compress);
- if (UseMemoryMappedRead && !compressed)
- {
- PROGLOG("Memory Mapped read of %s",file->queryFilename());
- Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
- if (!mmfile)
- return NULL;
- return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
- }
- else
- {
- Owned<IFileIO> fileio;
- if (compressed)
- {
- // JCSMORE should pass in a flag for rw_compressblkcrc I think, doesn't look like it (or anywhere else)
- // checks the block crc's at the moment.
- fileio.setown(createCompressedFileReader(file, eexp, UseMemoryMappedRead));
- }
- else
- fileio.setown(file->open(IFOread));
- if (!fileio)
- return NULL;
- return new CRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
- }
- }
- IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowIf, unsigned rwFlags, IExpander *eexp)
- {
- return createRowStreamEx(file, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, eexp);
- }
/**
 * Enables or disables memory mapped reads.
 *
 * Memory map sizes can be big, so the request only takes effect on 64-bit builds
 * (and on debug builds); on other builds the call has no effect.
 */
void useMemoryMappedRead(bool on)
{
#if !defined(_DEBUG) && !defined(__64BIT__)
    // Neither a debug nor a 64-bit build: leave the setting unchanged.
#else
    UseMemoryMappedRead = on;
#endif
}
- #define ROW_WRITER_BUFFERSIZE (0x100000)
- class CRowStreamWriter : public CSimpleInterface, private IRowSerializerTarget, implements IExtRowWriter
- {
- Linked<IFileIOStream> stream;
- Linked<IOutputRowSerializer> serializer;
- Linked<IEngineRowAllocator> allocator;
- CRC32 crc;
- bool grouped;
- bool tallycrc;
- unsigned nested;
- MemoryAttr ma;
- MemoryBuffer extbuf; // may need to spill to disk at some point
- byte *buf;
- size32_t bufpos;
- bool autoflush;
- #ifdef TRACE_CREATE
- static unsigned wrnum;
- #endif
- void flushBuffer(bool final)
- {
- try
- {
- if (bufpos) {
- stream->write(bufpos,buf);
- if (tallycrc)
- crc.tally(bufpos,buf);
- bufpos = 0;
- }
- size32_t extpos = extbuf.length();
- if (!extpos)
- return;
- if (!final)
- extpos = (extpos/ROW_WRITER_BUFFERSIZE)*ROW_WRITER_BUFFERSIZE;
- if (extpos) {
- stream->write(extpos,extbuf.toByteArray());
- if (tallycrc)
- crc.tally(extpos,extbuf.toByteArray());
- }
- if (extpos<extbuf.length()) {
- bufpos = extbuf.length()-extpos;
- memcpy(buf,extbuf.toByteArray()+extpos,bufpos);
- }
- extbuf.clear();
- }
- catch (IException *e)
- {
- autoflush = false; // avoid follow-on errors
- EXCLOG(e, "flushBuffer");
- throw;
- }
- }
- void streamFlush()
- {
- try
- {
- stream->flush();
- }
- catch (IException *e)
- {
- autoflush = false; // avoid follow-on errors
- EXCLOG(e, "streamFlush");
- throw;
- }
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CRowStreamWriter(IFileIOStream *_stream,IOutputRowSerializer *_serializer,IEngineRowAllocator *_allocator,bool _grouped, bool _tallycrc, bool _autoflush)
- : stream(_stream), serializer(_serializer), allocator(_allocator)
- {
- #ifdef TRACE_CREATE
- PROGLOG("createRowWriter %d = %p",++wrnum,this);
- #endif
- grouped = _grouped;
- tallycrc = _tallycrc;
- nested = 0;
- buf = (byte *)ma.allocate(ROW_WRITER_BUFFERSIZE);
- bufpos = 0;
- autoflush = _autoflush;
- }
- ~CRowStreamWriter()
- {
- #ifdef TRACE_CREATE
- PROGLOG("~createRowWriter %d = %p",wrnum--,this);
- #endif
- if (autoflush)
- flush();
- else if (bufpos+extbuf.length()) {
- #ifdef _DEBUG
- PrintStackReport();
- #endif
- WARNLOG("CRowStreamWriter closed with %d bytes unflushed",bufpos+extbuf.length());
- }
- }
- void putRow(const void *row)
- {
- if (row) {
- serializer->serialize(*this,(const byte *)row);
- if (grouped) {
- byte b = 0;
- if (bufpos<ROW_WRITER_BUFFERSIZE)
- buf[bufpos++] = b;
- else
- extbuf.append(b);
- }
- allocator->releaseRow(row);
- }
- else if (grouped) { // backpatch
- byte b = 1;
- if (extbuf.length())
- extbuf.writeDirect(extbuf.length()-1,sizeof(b),&b);
- else {
- assertex(bufpos);
- buf[bufpos-1] = b;
- }
- }
- }
- void flush()
- {
- flushBuffer(true);
- streamFlush();
- }
- void flush(CRC32 *crcout)
- {
- flushBuffer(true);
- streamFlush();
- if (crcout)
- *crcout = crc;
- }
- offset_t getPosition()
- {
- return stream->tell()+bufpos+extbuf.length();
- }
- void put(size32_t len, const void * ptr)
- {
- // first fill buf
- loop {
- if (bufpos<ROW_WRITER_BUFFERSIZE) {
- size32_t wr = ROW_WRITER_BUFFERSIZE-bufpos;
- if (wr>len)
- wr = len;
- memcpy(buf+bufpos,ptr,wr);
- bufpos += wr;
- len -= wr;
- if (len==0)
- break; // quick exit
- ptr = (const byte *)ptr + wr;
- }
- if (nested) {
- // have to append to ext buffer (will need to spill to disk here if gets *too* big)
- extbuf.append(len,ptr);
- break;
- }
- else
- flushBuffer(false);
- }
- }
- size32_t beginNested(size32_t count)
- {
- if (nested++==0)
- if (bufpos==ROW_WRITER_BUFFERSIZE)
- flushBuffer(false);
- size32_t ret = bufpos+extbuf.length();
- size32_t sz = 0;
- put(sizeof(sz),&sz);
- return ret;
- }
- void endNested(size32_t pos)
- {
- size32_t sz = bufpos+extbuf.length()-(pos + sizeof(size32_t));
- size32_t wr = sizeof(size32_t);
- byte *out = (byte *)&sz;
- if (pos<ROW_WRITER_BUFFERSIZE) {
- size32_t space = ROW_WRITER_BUFFERSIZE-pos;
- if (space>wr)
- space = wr;
- memcpy(buf+pos,out,space);
- wr -= space;
- if (wr==0) {
- --nested;
- return; // quick exit
- }
- out += space;
- pos += space;
- }
- extbuf.writeDirect(pos-ROW_WRITER_BUFFERSIZE,wr,out);
- --nested;
- }
- };
- #ifdef TRACE_CREATE
- unsigned CRowStreamWriter::wrnum=0;
- #endif
- IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor)
- {
- OwnedIFileIO iFileIO;
- if (TestRwFlag(flags, rw_compress))
- {
- size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize();
- if (fixedSize && TestRwFlag(flags, rw_grouped))
- ++fixedSize; // row writer will include a grouping byte
- iFileIO.setown(createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, getCompMethod(flags)));
- }
- else
- iFileIO.setown(iFile->open((flags & rw_extend)?IFOwrite:IFOcreate));
- if (!iFileIO)
- return NULL;
- flags &= ~COMP_MASK;
- return createRowWriter(iFileIO, rowIf, flags);
- }
- IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags)
- {
- if (TestRwFlag(flags, rw_compress))
- throw MakeStringException(0, "Unsupported createRowWriter flags");
- Owned<IFileIOStream> stream;
- if (TestRwFlag(flags, rw_buffered))
- stream.setown(createBufferedIOStream(iFileIO));
- else
- stream.setown(createIOStream(iFileIO));
- if (flags & rw_extend)
- stream->seek(0, IFSend);
- flags &= ~((unsigned)(rw_extend|rw_buffered));
- return createRowWriter(stream, rowIf, flags);
- }
- IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags)
- {
- if (0 != (flags & (rw_extend|rw_buffered|COMP_MASK)))
- throw MakeStringException(0, "Unsupported createRowWriter flags");
- Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), TestRwFlag(flags, rw_grouped), TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
- return writer.getClear();
- }
- class CDiskMerger : public CInterface, implements IDiskMerger
- {
- IArrayOf<IFile> tempfiles;
- IRowStream **strms;
- Linked<IRecordSize> irecsize;
- StringAttr tempnamebase;
- Linked<IRowLinkCounter> linker;
- Linked<IRowInterfaces> rowInterfaces;
-
- public:
- IMPLEMENT_IINTERFACE;
- CDiskMerger(IRowInterfaces *_rowInterfaces, IRowLinkCounter *_linker, const char *_tempnamebase)
- : rowInterfaces(_rowInterfaces), linker(_linker), tempnamebase(_tempnamebase)
- {
- strms = NULL;
- }
- ~CDiskMerger()
- {
- for (unsigned i=0;i<tempfiles.ordinality();i++) {
- if (strms&&strms[i])
- strms[i]->Release();
- try
- {
- tempfiles.item(i).remove();
- }
- catch (IException * e)
- {
- //Exceptions inside destructors are bad.
- EXCLOG(e);
- e->Release();
- }
- }
- free(strms);
- }
- IRowWriter *createWriteBlock()
- {
- StringBuffer tempname(tempnamebase);
- tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
- IFile *file = createIFile(tempname.str());
- tempfiles.append(*file);
- return createRowWriter(file, rowInterfaces);
- }
- void put(const void **rows,unsigned numrows)
- {
- Owned<IRowWriter> out = createWriteBlock();
- for (unsigned i=0;i<numrows;i++)
- out->putRow(rows[i]);
- }
- void putIndirect(const void ***rowptrs,unsigned numrows)
- {
- Owned<IRowWriter> out = createWriteBlock();
- for (unsigned i=0;i<numrows;i++)
- out->putRow(*(rowptrs[i]));
- }
- virtual void put(ISortedRowProvider *rows)
- {
- Owned<IRowWriter> out = createWriteBlock();
- void * row;
- while((row = rows->getNextSorted()) != NULL)
- out->putRow(row);
- }
- IRowStream *merge(ICompare *icompare, bool partdedup)
- {
- unsigned numstrms = tempfiles.ordinality();
- strms = (IRowStream **)calloc(numstrms,sizeof(IRowStream *));
- unsigned i;
- for (i=0;i<numstrms;i++) {
- strms[i] = createRowStream(&tempfiles.item(i), rowInterfaces);
- }
- if (numstrms==1)
- return LINK(strms[0]);
- if (icompare)
- return createRowStreamMerger(numstrms, strms, icompare, partdedup, linker);
- return createConcatRowStream(numstrms,strms);
- }
- virtual count_t mergeTo(IRowWriter *dest, ICompare *icompare, bool partdedup)
- {
- count_t count = 0;
- Owned<IRowStream> mergedStream = merge(icompare, partdedup);
- loop
- {
- const void *row = mergedStream->nextRow();
- if (!row)
- return count;
- dest->putRow(row); // takes ownership
- ++count;
- }
- return count;
- }
- };
- IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase)
- {
- return new CDiskMerger(rowInterfaces, linker, tempnamebase);
- }
- //---------------------------------------------------------------------------------------------------------------------
- void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
- {
- if (totalCycles)
- {
- builder.addStatistic(StWhenFirstRow, firstRow);
- builder.addStatistic(StTimeElapsed, elapsed());
- builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
- builder.addStatistic(StTimeFirstExecute, latency());
- }
- }
- void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
- {
- if (other.totalCycles)
- {
- if (totalCycles)
- {
- //Record the earliest start, the latest end, the longest latencies
- cycle_t thisLatency = latencyCycles();
- cycle_t otherLatency = other.latencyCycles();
- cycle_t maxLatency = std::max(thisLatency, otherLatency);
- if (startCycles > other.startCycles)
- {
- startCycles = other.startCycles;
- firstRow =other.firstRow;
- }
- firstExitCycles = startCycles + maxLatency;
- if (endCycles < other.endCycles)
- endCycles = other.endCycles;
- totalCycles += other.totalCycles;
- }
- else
- *this = other;
- }
- }
- //---------------------------------------------------------------------------------------------------------------------
- //MORE: Not currently implemented for windows.
- #ifdef CPU_SETSIZE
- static unsigned getCpuId(const char * text, char * * next)
- {
- unsigned cpu = (unsigned)strtoul(text, next, 10);
- if (*next == text)
- throw makeStringExceptionV(1, "Invalid CPU: %s", text);
- else if (cpu >= CPU_SETSIZE)
- throw makeStringExceptionV(1, "CPU %u is out of range 0..%u", cpu, CPU_SETSIZE);
- return cpu;
- }
- #endif
- void setProcessAffinity(const char * cpuList)
- {
- assertex(cpuList);
- #ifdef CPU_ZERO
- cpu_set_t cpus;
- CPU_ZERO(&cpus);
- const char * cur = cpuList;
- loop
- {
- char * next;
- unsigned cpu1 = getCpuId(cur, &next);
- if (*next == '-')
- {
- const char * range = next+1;
- unsigned cpu2 = getCpuId(range, &next);
- for (unsigned cpu= cpu1; cpu <= cpu2; cpu++)
- CPU_SET(cpu, &cpus);
- }
- else
- CPU_SET(cpu1, &cpus);
- if (*next == '\0')
- break;
- if (*next != ',')
- throw makeStringExceptionV(1, "Invalid cpu affinity list %s", cur);
- cur = next+1;
- }
- if (sched_setaffinity(0, sizeof(cpu_set_t), &cpus))
- throw makeStringException(errno, "Failed to set affinity");
- DBGLOG("Process affinity set to %s", cpuList);
- #endif
- clearAffinityCache();
- }
- void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char * optNodes)
- {
- #if defined(CPU_ZERO) && defined(_USE_NUMA)
- if (processPerMachine <= 1)
- return;
- if (numa_available() == -1)
- {
- DBGLOG("Numa functions not available");
- return;
- }
- if (optNodes)
- throw makeStringException(1, "Numa node list not yet supported");
- unsigned numNumaNodes = numa_max_node()+1;
- if (numNumaNodes <= 1)
- return;
- //MORE: If processPerMachine < numNumaNodes we may want to associate with > 1 node.
- unsigned curNode = curProcess % numNumaNodes;
- #if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
- struct bitmask * cpus = numa_allocate_cpumask();
- numa_node_to_cpus(curNode, cpus);
- bool ok = (numa_sched_setaffinity(0, cpus) == 0);
- numa_bitmask_free(cpus);
- #else
- cpu_set_t cpus;
- CPU_ZERO(&cpus);
- numa_node_to_cpus(curNode, (unsigned long *) &cpus, sizeof (cpus));
- bool ok = sched_setaffinity (0, sizeof(cpus), &cpus) != 0;
- #endif
- if (!ok)
- throw makeStringExceptionV(1, "Failed to set affinity for node %u", curNode);
- DBGLOG("Process bound to numa node %u of %u", curNode, numNumaNodes);
- #endif
- clearAffinityCache();
- }
/**
 * Binds memory allocation to the numa nodes the process is currently allowed to run on.
 *
 * Only active with libnuma API version 2 or later. The bind policy is set first; the
 * memory binding itself is applied only when more than one numa node exists.
 */
void bindMemoryToLocalNodes()
{
#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
    numa_set_bind_policy(1);
    const unsigned numNumaNodes = numa_max_node() + 1;
    if (numNumaNodes > 1)
    {
        struct bitmask *runNodes = numa_get_run_node_mask();
        numa_set_membind(runNodes);
        // Only the first word of the node mask is logged.
        DBGLOG("Process memory bound to numa nodemask 0x%x (of %u nodes total)", (unsigned)(*(runNodes->maskp)), numNumaNodes);
        numa_bitmask_free(runNodes);
    }
#endif
}
|