12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #include "jlib.hpp"
- #include "jlog.hpp"
- #include "jqueue.tpp"
- #include "jexcept.hpp"
- #include "thorcommon.hpp"
- #include "thorstep.ipp"
- #include "thorstep2.ipp"
- #ifdef _DEBUG
- #define CHECK_CONSISTENCY
- #endif
- const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
- const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
- const static SmartStepExtra nonSeekStepExtra(SSEFreturnUnbufferedMatches, NULL); // if doing next() instead of nextGE()
- const static SmartStepExtra nonBufferedMatchStepExtra(SSEFreturnUnbufferedMatches, NULL);
- const static SmartStepExtra nonBufferedMismatchStepExtra(SSEFreturnMismatches, NULL);
- bool stepFieldsMatch(const CFieldOffsetSize * leftFields, unsigned leftIndex, const CFieldOffsetSize * rightFields, unsigned rightIndex)
- {
- const CFieldOffsetSize * leftField = leftFields + leftIndex;
- const CFieldOffsetSize * rightField = rightFields + rightIndex;
- return (leftField->offset == rightField->offset) && (leftField->size == rightField->size);
- }
- bool stepFieldsMatch(ISteppingMeta * leftMeta, unsigned leftIndex, ISteppingMeta * rightMeta, unsigned rightIndex)
- {
- if ((leftIndex >= leftMeta->getNumFields()) || (rightIndex >= rightMeta->getNumFields()))
- return false;
- return stepFieldsMatch(leftMeta->queryFields(), leftIndex, rightMeta->queryFields(), rightIndex);
- }
- unsigned getNumMatchingFields(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
- {
- unsigned numStepableFields = 0;
- if (inputStepping && callerStepping)
- {
- //Determine where the stepping fields overlap, and work out the extent.
- unsigned inputCount = inputStepping->getNumFields();
- for (unsigned i=0; i < inputCount; i++)
- {
- if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
- break;
- numStepableFields++;
- }
- }
- return numStepableFields;
- }
- void verifySteppingCompatible(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
- {
- if (inputStepping && callerStepping)
- {
- //Determine where the stepping fields overlap, and work out the extent.
- unsigned parentCount = callerStepping->getNumFields();
- unsigned inputCount = inputStepping->getNumFields();
- unsigned max = parentCount < inputCount ? parentCount : inputCount;
- for (unsigned i=0; i < max; i++)
- {
- if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
- throw MakeStringException(999, "Stepping field %d, input and join do not match", i);
- }
- }
- }
- //---------------------------------------------------------------------------
- void CSteppingMeta::intersect(IInputSteppingMeta * inputMeta)
- {
- if (inputMeta)
- {
- unsigned maxFields = inputMeta->getNumFields();
- if (maxFields > numFields)
- maxFields = numFields;
- for (unsigned curField = 0; curField < maxFields; curField++)
- {
- if (!stepFieldsMatch(inputMeta->queryFields(), curField, fields, curField))
- {
- numFields = curField;
- break;
- }
- }
- if (inputMeta->hasPostFilter())
- postFiltered = true;
- if (inputMeta->isDistributed())
- setDistributed();
- unsigned inputFlags = inputMeta->getSteppedFlags();
- double inputPriority = inputMeta->getPriority();
- if (hadStepExtra)
- {
- stepFlags &= inputFlags;
- if (priority != inputPriority)
- stepFlags &= ~SSFhaspriority;
- }
- else
- {
- hadStepExtra = true;
- stepFlags = inputFlags;
- priority = inputPriority;
- }
- }
- else
- numFields = 0;
- }
- //---------------------------------------------------------------------------
- CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSteppingMeta * _inputStepping, IEngineRowAllocator * _rowAllocator, IRangeCompare * _compare, bool _paranoid)
- : input(_input), compare(_compare)
- {
- maxFields = compare ? compare->maxFields() : 0;
- pending = NULL;
- pendingMatches = true;
- stepFlagsMask = 0;
- stepFlagsValue = 0;
- paranoid = _paranoid;
- previousPending = NULL;
- rowAllocator.set(_rowAllocator);
- inputStepping = _inputStepping;
- numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
- isPostFiltered = inputStepping ? inputStepping->hasPostFilter() : false;
- setRestriction(NULL, 0);
- lowestFrequencyInput = NULL;
- }
- CSteppedInputLookahead::~CSteppedInputLookahead()
- {
- if (previousPending)
- rowAllocator->releaseRow(previousPending);
- if (pending)
- rowAllocator->releaseRow(pending);
- }
- const void * CSteppedInputLookahead::nextInputRow()
- {
- if (readAheadRows.ordinality())
- return readAheadRows.dequeue();
- if (seekRows.ordinality())
- return seekRows.dequeue();
- return input->nextInputRow();
- }
-
- const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- while (readAheadRows.ordinality())
- {
- OwnedLCRow next = readAheadRows.dequeue();
- if (compare->docompare(next, seek, numFields) >= 0)
- {
- assertex(wasCompleteMatch);
- return (void *)next.getClear();
- }
- }
- while (seekRows.ordinality())
- {
- OwnedLCRow next = seekRows.dequeue();
- if (compare->docompare(next, seek, numFields) >= 0)
- {
- assertex(wasCompleteMatch);
- return (void *)next.getClear();
- }
- }
- return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
- {
- //Transfer any rows with fields before the seek position to a list of pending rows, so we don't waste
- //time sending seek rows that can't match..
- while (seekRows.ordinality())
- {
- const void * next = seekRows.head();
- if (compare->docompare(next, seek, numFields) >= 0)
- {
- //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
- seek = seekRows.tail();
- break;
- }
- readAheadRows.enqueue(seekRows.dequeue());
- }
- //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
- unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
- SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
- seekRows.ensure(maxcount);
- while (seekRows.ordinality() < maxcount)
- {
- bool wasCompleteMatch = true;
- const void * next = input->nextInputRowGE(seek, numFields, wasCompleteMatch, inputStepExtra);
- if (!next)
- break;
- //wasCompleteMatch can be false if we've just read the last row returned from a block of reads,
- //but if so the next read request will do another blocked read, so just ignore this one.
- if (wasCompleteMatch)
- {
- seekRows.enqueue(next);
- //update the seek pointer to the best value.
- seek = next;
- }
- else
- rowAllocator->releaseRow(next);
- }
- }
- unsigned CSteppedInputLookahead::ordinality() const
- {
- //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
- if ((readAheadRows.ordinality() == 0) && pending)
- return seekRows.ordinality() + 1;
- return seekRows.ordinality();
- }
- const void * CSteppedInputLookahead::querySeek(unsigned i) const
- {
- //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
- if ((readAheadRows.ordinality() == 0) && pending)
- {
- if (i == 0)
- return pending;
- i--;
- }
- return seekRows.item(i);
- }
- const void * CSteppedInputLookahead::consume()
- {
- if (!pending)
- fill();
- if (!includeInOutput(pending))
- return NULL;
- if (paranoid && pending)
- {
- if (previousPending)
- rowAllocator->releaseRow(previousPending);
- previousPending = rowAllocator->linkRow(pending);
- }
- const void * ret = pending;
- pending = NULL;
- pendingMatches = true;
- return ret;
- }
- IMultipleStepSeekInfo * CSteppedInputLookahead::createMutipleReadWrapper()
- {
- return this;
- }
- void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper)
- {
- lowestFrequencyInput = wrapper;
- }
- void CSteppedInputLookahead::fill()
- {
- pendingMatches = true;
- if (restrictValue && numStepableFields)
- {
- //note - this will either return a valid value to be included in the range,
- //or if invalid then it must be out of range -> will fail includeInOutput later,
- //but we may as well keep the row
- unsigned numFields = numRestrictFields < numStepableFields ? numRestrictFields : numStepableFields;
- //Default to returning mismatches, but could be overidden from outside
- unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
- SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
- pending = nextInputRowGE(restrictValue, numFields, pendingMatches, inputStepExtra);
- if (paranoid && pending)
- {
- int c = compare->docompare(pending, restrictValue, numFields);
- if (c < 0)
- throw MakeStringException(1001, "Input to stepped join preceeds seek point");
- if ((c == 0) && !pendingMatches)
- throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
- }
- }
- else
- {
- //Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
- //Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
- pending = nextInputRow();
- }
- if (paranoid && pending && previousPending && compare)
- {
- if (compare->docompare(previousPending, pending, maxFields) > 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- }
- const void * CSteppedInputLookahead::next()
- {
- if (!pendingMatches)
- {
- if (includeInOutput(pending))
- skip();
- else
- return NULL;
- }
- if (!pending)
- fill();
- if (!includeInOutput(pending))
- return NULL;
- return pending;
- }
// Positions on, and returns, the next row whose leading numFields stepping fields
// compare >= seek. Returns NULL if the input is exhausted or the row found is
// rejected by includeInOutput(). The returned row stays owned by this object as
// `pending`: consume() hands it over, and skip() discards it.
// wasCompleteMatch is an in/out flag. On the stepped paths it is set from
// pendingMatches. It is left unchanged when an already-matching pending row, or a
// row from the next()-based fallback loop, is returned. Callers are presumably
// expected to initialise it to true - confirm at call sites.
const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
{
    // A row is already buffered: reuse it if it is at or beyond the seek point, otherwise discard it.
    if (pending)
    {
        int c = compare->docompare(pending, seek, numFields);
        if (c >= 0)
        {
            if (!includeInOutput(pending))
                return NULL;
            if (pendingMatches)
                return pending;
            //pending Row is beyond seek point => ok to return an incomplete match
            if (stepExtra.returnMismatches() && (c != 0))
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
        }
        skip();
    }
    if (numStepableFields)
    {
        //This class is directly told whether it should be using readAhead, so need to create a modified stepExtra
        unsigned flags = (stepExtra.queryFlags() & ~stepFlagsMask) | stepFlagsValue;
        SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
        // The input can only be stepped on its own stepping fields, so seek on at most numStepableFields of them.
        unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
        // This loop only exits via return (assuming `loop` is jlib's unconditional for(;;)),
        // so the fallback loop below is only reached when numStepableFields == 0.
        loop
        {
            pendingMatches = true;
            pending = nextInputRowGE(seek, stepFields, pendingMatches, inputStepExtra);
            if (paranoid && pending)
            {
                int c = compare->docompare(pending, seek, stepFields);
                if (c < 0)
                    throw MakeStringException(1001, "Input to stepped join preceeds seek point");
                if ((c == 0) && !pendingMatches)
                    throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
            }
            if (!pending || !includeInOutput(pending))
                return NULL;
            // The seek covered every requested field, so the input's match status is final.
            if (numFields <= numStepableFields)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            //if !pendingMatches then isCompleteMatch must have been provided => ok to return a mismatch
            //if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
            if (!pendingMatches)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            // Matched on the stepped prefix: check the remaining (unsteppable) fields by hand.
            if (compare->docompare(pending, seek, numFields) >= 0)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            skip();
        }
        //now need to do an incremental seek on the subsequent fields to find an exact value >
        // NOTE(review): the comment above sits after a loop that never falls through, so it
        // describes no code - presumably a leftover TODO.
    }
    //now narrow down
    // Fallback for inputs with no stepping fields: read sequentially until a row reaches the seek point.
    loop
    {
        const void * cur = next();
        if (!cur)
            return NULL;
        if (compare->docompare(cur, seek, numFields) >= 0)
            return cur;
        skip();
    }
}
- unsigned CSteppedInputLookahead::queryMaxStepable(ISteppingMeta * callerStepping) const
- {
- return getNumMatchingFields(inputStepping, callerStepping);
- }
- void CSteppedInputLookahead::setAlwaysReadExact()
- {
- //can be used to force reading only exact matches (for the known lowest priority input)
- stepFlagsMask |= SSEFreturnMismatches;
- }
- void CSteppedInputLookahead::setReadAhead(bool value)
- {
- //This never removes readahead if requested somewhere else, so don't update the mask.
- if (value)
- stepFlagsValue |= SSEFreadAhead;
- else
- stepFlagsValue &= ~SSEFreadAhead;
- }
- void CSteppedInputLookahead::setRestriction(const void * _value, unsigned _num)
- {
- restrictValue = _value;
- numRestrictFields = _num;
- }
- void CSteppedInputLookahead::resetEOF()
- {
- if (numRestrictFields == 0)
- resetInputEOF();
- }
- void CSteppedInputLookahead::skip()
- {
- if (paranoid)
- {
- if (previousPending)
- rowAllocator->releaseRow(previousPending);
- previousPending = pending;
- }
- else
- {
- if (pending)
- rowAllocator->releaseRow(pending);
- }
- //NB: Don't read ahead until we have to...
- pending = NULL;
- pendingMatches = true;
- }
- const void * CSteppedInputLookahead::skipnext()
- {
- skip();
- return next();
- }
- //---------------------------------------------------------------------------
- void CUnfilteredSteppedMerger::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext, const bool * matched)
- {
- merger.setCandidateRow(_equalityRow);
- unsigned numInputs = inputArray->ordinality();
- for (unsigned i=0; i< numInputs; i++)
- {
- if (!needToVerifyNext || matched[i])
- firstCandidateRows[i] = inputArray->item(i).consume();
- else
- firstCandidateRows[i] = NULL;
- }
- merger.primeRows(firstCandidateRows);
- }
- //---------------------------------------------------------------------------
- CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRangeCompare * _stepCompare, ICompare * _equalCompare, CSteppedInputLookahead * _input, unsigned _numEqualFields)
- {
- allocator = _allocator;
- stepCompare = _stepCompare;
- equalCompare = _equalCompare;
- input = _input;
- matched.setown(createBitSet());
- numMatched = 0;
- readIndex = 0;
- numEqualFields = _numEqualFields;
- }
CFilteredInputBuffer::~CFilteredInputBuffer()
{
    // Intentionally empty: buffered rows are released by reset(), not here.
    // NOTE(review): rows still held when the buffer is destroyed without a prior
    // reset() would not be released - confirm every owner calls reset() first.
}
- const void * CFilteredInputBuffer::consume()
- {
- if (!rows.isItem(readIndex))
- return NULL;
- const void * ret = rows.item(readIndex);
- rows.replace(NULL, readIndex);
- readIndex++;
- return ret;
- }
- const void * CFilteredInputBuffer::consumeGE(const void * seek, unsigned numFields)
- {
- while (rows.isItem(readIndex))
- {
- const void * cur = rows.item(readIndex);
- if (stepCompare->docompare(cur, seek, numFields) >= 0)
- {
- rows.replace(NULL, readIndex);
- readIndex++;
- return cur;
- }
- readIndex++;
- }
- return NULL;
- }
- void CFilteredInputBuffer::fill(const void * equalityRow)
- {
- const void * next = input->consume();
- assertex(next);
- append(next);
- if (equalityRow)
- {
- loop
- {
- bool matches = true;
- SmartStepExtra stepExtra(SSEFreturnMismatches, NULL);
- const void * next = input->nextGE(equalityRow, numEqualFields, matches, stepExtra);
- if (!next || !matches || equalCompare->docompare(equalityRow, next) != 0)
- break;
- append(input->consume());
- }
- }
- else
- {
- loop
- {
- const void * next = input->consume();
- if (!next)
- break;
- append(next);
- }
- }
- }
- void CFilteredInputBuffer::removeMatched()
- {
- ForEachItemInRev(i, rows)
- {
- if (isMatched(i))
- remove(i);
- }
- }
- void CFilteredInputBuffer::removeUnmatched()
- {
- ForEachItemInRev(i, rows)
- {
- if (!isMatched(i))
- remove(i);
- }
- }
- void CFilteredInputBuffer::remove(unsigned i)
- {
- const void * row = rows.item(i);
- rows.remove(i);
- allocator->releaseRow(row);
- }
- void CFilteredInputBuffer::reset()
- {
- ForEachItemIn(i, rows)
- {
- const void * cur = rows.item(i);
- if (cur)
- allocator->releaseRow(cur);
- }
- rows.kill();
- matched->reset();
- numMatched = 0;
- readIndex = 0;
- }
- CFilteredSteppedMerger::CFilteredSteppedMerger()
- {
- matches = NULL;
- joinKind = 0;
- numInputs = 0;
- equalCompare = NULL;
- extraCompare = NULL;
- globalCompare = NULL;
- minMatches = 0;
- maxMatches = 0;
- fullyMatchedLevel = 0;
- }
- CFilteredSteppedMerger::~CFilteredSteppedMerger()
- {
- delete [] matches;
- }
- void CFilteredSteppedMerger::init(IEngineRowAllocator * _allocator, IHThorNWayMergeJoinArg & helper, CSteppedInputLookaheadArray * inputArray)
- {
- unsigned flags = helper.getJoinFlags();
- joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
- numInputs = inputArray->ordinality();
- matches = new const void * [numInputs];
- equalCompare = helper.queryEqualCompare();
- extraCompare = helper.queryNonSteppedCompare();
- globalCompare = NULL;
- unsigned numEqualFields = helper.numEqualFields();
- if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
- globalCompare = helper.queryGlobalCompare();
- if (joinKind == IHThorNWayMergeJoinArg::MJFmofn)
- {
- minMatches = helper.getMinMatches();
- maxMatches = helper.getMaxMatches();
- }
- else
- {
- minMatches = numInputs;
- maxMatches = numInputs;
- }
- IRangeCompare * stepCompare = helper.querySteppingMeta()->queryCompare();
- ForEachItemIn(i, *inputArray)
- inputs.append(*new CFilteredInputBuffer(_allocator, stepCompare, equalCompare, &inputArray->item(i), numEqualFields));
- merger.init(_allocator, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
- merger.initInputs(&inputs);
- }
- //ISteppedJoinRowGenerator
- void CFilteredSteppedMerger::beforeProcessCandidates(const void * equalityRow, bool needToVerifyNext, const bool * matched)
- {
- //Exaustively read from each of the inputs into each of the buffers
- ForEachItemIn(i, inputs)
- {
- if (!needToVerifyNext || matched[i])
- inputs.item(i).fill(equalityRow);
- }
- postFilterRows();
- //No point priming the rows here - will be just as efficient to use the default action
- }
- void CFilteredSteppedMerger::afterProcessCandidates()
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).reset();
- merger.reset();
- }
- void CFilteredSteppedMerger::cleanupAllCandidates()
- {
- merger.reset(); // not strictly necessary...
- }
- void CFilteredSteppedMerger::afterProcessingAll()
- {
- merger.cleanup();
- }
-
- const void * CFilteredSteppedMerger::nextOutputRow()
- {
- return merger.nextRow();
- }
- const void * CFilteredSteppedMerger::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- return merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
// Recursive helper for tagMatches(). It tries to extend the partial combination
// matches[0..numRows-1] with rows from input `level`, and recursively from the
// levels after it.
// Rows at this level that take part in at least one complete combination are
// tagged via noteMatch(). Returns true if any complete combination was found
// through this level.
//   level   - input being examined (called from tagMatches() starting at level+1 >= 1).
//   numRows - rows already placed in matches[]; rows at this level are compared
//             (extraCompare) against matches[numRows-1].
// A combination is checked against globalCompare (if any) when it reaches the last
// level. Levels may be skipped (M-of-N) only while enough levels remain to reach
// minMatches.
bool CFilteredSteppedMerger::tagMatches(unsigned level, unsigned numRows)
{
    CFilteredInputBuffer & right = inputs.item(level);
    unsigned maxLevel = inputs.ordinality()-1;
    ConstPointerArray & curRows = right.rows;
    if (curRows.ordinality())
    {
        bool valid = false;
        const void * lhs = matches[numRows-1];
        ForEachItemIn(i, curRows)
        {
            //If we have had a match at this level, and this item is already matched,
            //and all levels higher than this have already been completely matched),
            //then no need to check this item (and its children) again, since it won't change anything.
            bool alreadyMatched = right.isMatched(i);
            if (!valid || level + 1 < fullyMatchedLevel || !alreadyMatched)
            {
                const void * rhs = curRows.item(i);
                unsigned matchedRows = numRows;
                bool recurse;
                if (!extraCompare || extraCompare->match(lhs, rhs))
                {
                    // rhs joins the combination; stop extending once maxMatches would be exceeded.
                    matches[matchedRows++] = rhs;
                    recurse = matchedRows <= maxMatches;
                }
                else
                {
                    //for mofn, check enough levels left to create a potential match, for others it will fail.
                    unsigned remain = maxLevel-level;
                    recurse = (numRows + remain >= minMatches);
                }
                if (recurse)
                {
                    bool isFullMatch;
                    if (level == maxLevel)
                        isFullMatch = (!globalCompare || globalCompare->match(matchedRows, matches));
                    else
                        isFullMatch = tagMatches(level+1, matchedRows);
                    if (isFullMatch)
                    {
                        valid = true;
                        // NOTE(review): on the skip path above (rhs failed extraCompare),
                        // rhs is not part of the combination that succeeded, yet it is
                        // still tagged here. This looks suspicious for M-of-N joins -
                        // confirm it is intended.
                        if (!alreadyMatched)
                            right.noteMatch(i);
                        //If the previous level is fully matched, and so is this one - then update the minimum fully matched level
                        if ((level + 1 == fullyMatchedLevel) && right.isFullyMatched())
                            fullyMatchedLevel--;
                        //If all rows in this level and above are fully matched, then iterating any further will have no effect.
                        //Could potentially reduce a O(N^m) to O(mN) if the majority of elements match.
                        if (level >= fullyMatchedLevel)
                            break;
                    }
                }
            }
        }
        return valid;
    }
    else
    {
        //mofn may still be ok with a skipped level or two
        unsigned remain = maxLevel-level;
        if (numRows + remain >= minMatches)
        {
            if (level == maxLevel)
                return (!globalCompare || globalCompare->match(numRows, matches));
            else
                return tagMatches(level+1, numRows);
        }
        return false;
    }
}
- void CFilteredSteppedMerger::tagMatches()
- {
- unsigned numInputs = inputs.ordinality();
- fullyMatchedLevel = numInputs;
- //for m of n, need to start matching at levels 0,1,.. numLevels - minMatches
- unsigned iterateLevels = numInputs - minMatches;
- for (unsigned level =0; level <= iterateLevels; level++)
- {
- CFilteredInputBuffer & left = inputs.item(level);
- ForEachItemIn(i, left.rows)
- {
- matches[0] = left.rows.item(i);
- bool thisMatched;
- //mofn(1) may not have another level, to just check global compare.
- if (level == numInputs-1)
- thisMatched = (!globalCompare || globalCompare->match(1, matches));
- else
- thisMatched = tagMatches(level+1, 1);
- if (thisMatched)
- {
- if (!left.isMatched(i))
- left.noteMatch(i);
- //Check if this level, and all above are now fully matched. If so, we're done.
- if ((level + 1 == fullyMatchedLevel) && left.isFullyMatched())
- {
- fullyMatchedLevel--;
- break;
- }
- }
- }
- if (level >= fullyMatchedLevel)
- break;
- }
- }
- void CFilteredSteppedMerger::postFilterRows()
- {
- tagMatches();
- unsigned max = inputs.ordinality();
- switch (joinKind)
- {
- case IHThorNWayMergeJoinArg::MJFinner:
- case IHThorNWayMergeJoinArg::MJFmofn:
- {
- for (unsigned i=0; i < max; i++)
- inputs.item(i).removeUnmatched();
- break;
- }
- case IHThorNWayMergeJoinArg::MJFleftouter:
- {
- for (unsigned i=1; i < max; i++)
- inputs.item(i).removeUnmatched();
- break;
- }
- case IHThorNWayMergeJoinArg::MJFleftonly:
- {
- inputs.item(0).removeMatched();
- unsigned max = inputs.ordinality();
- for (unsigned i=1; i < max; i++)
- inputs.item(i).reset();
- break;
- }
- }
- }
- //---------------------------------------------------------------------------
- CMergeJoinProcessor::CMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : helper(_arg)
- {
- mergeSteppingMeta = helper.querySteppingMeta();
- assertex(mergeSteppingMeta);
- stepCompare = mergeSteppingMeta->queryCompare();
- equalCompare = helper.queryEqualCompare();
- equalCompareEq = helper.queryEqualCompareEq();
- numEqualFields = helper.numEqualFields();
- flags = helper.getJoinFlags();
- matched = NULL;
- candidateEqualityRow = NULL;
- numExternalEqualFields = 0;
- conjunctionOptimizer = NULL;
- tempSeekBuffer = NULL;
- lowestSeekRow = NULL;
- combineConjunctions = true;
- allInputsAreOuterInputs = false;
- maxSeekRecordSize = 0;
- numInputs = 0;
- eof = true;
- assertex(helper.numOrderFields() == mergeSteppingMeta->getNumFields());
- bool hasPostfilter = false;
- thisSteppingMeta.init(mergeSteppingMeta->getNumFields(), mergeSteppingMeta->queryFields(), stepCompare, mergeSteppingMeta->queryDistance(), hasPostfilter);
- }
- CMergeJoinProcessor::~CMergeJoinProcessor()
- {
- afterProcessing();
- }
- void CMergeJoinProcessor::addInput(ISteppedInput * _input)
- {
- IInputSteppingMeta * _meta = _input->queryInputSteppingMeta();
- verifySteppingCompatible(_meta, mergeSteppingMeta);
- rawInputs.append(*LINK(_input));
- }
- void CMergeJoinProcessor::afterProcessing()
- {
- cleanupCandidates();
- if (outputProcessor)
- {
- outputProcessor->afterProcessingAll();
- outputProcessor.clear();
- }
- if (conjunctionOptimizer)
- {
- conjunctionOptimizer->afterProcessing();
- delete conjunctionOptimizer;
- conjunctionOptimizer = NULL;
- }
- delete [] matched;
- matched = NULL;
- inputs.kill();
- rawInputs.kill();
- orderedInputs.kill();
- if (lowestSeekRow)
- {
- inputAllocator->releaseRow(lowestSeekRow);
- lowestSeekRow = NULL;
- }
- if (tempSeekBuffer)
- {
- inputAllocator->releaseRow(tempSeekBuffer);
- tempSeekBuffer = NULL;
- }
- //Now free the allocators
- inputAllocator.clear();
- outputAllocator.clear();
- maxSeekRecordSize = 0;
- }
- void CMergeJoinProcessor::createTempSeekBuffer()
- {
- tempSeekBuffer = inputAllocator->createRow();
- #ifdef _DEBUG
- //Clear the complete tempSeekBBuffer record, so that toXML() can be used to trace the seek row in roxie
- if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
- {
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, inputAllocator->queryOutputMeta()->getMinRecordSize());
- helper.createLowInputRow(rowBuilder);
- }
- #endif
- }
- void CMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- inputAllocator.set(_inputAllocator);
- outputAllocator.set(_outputAllocator);
- //The seek components must all be fixed width, so the seek record size must be <= the minimum size of the input record
- maxSeekRecordSize = inputAllocator->queryOutputMeta()->getMinRecordSize();
- bool paranoid = (flags & IHThorNWayMergeJoinArg::MJFassertsorted) != 0;
- ForEachItemIn(i1, rawInputs)
- {
- ISteppedInput & cur = rawInputs.item(i1);
- inputs.append(* new CSteppedInputLookahead(&cur, cur.queryInputSteppingMeta(), inputAllocator, stepCompare, paranoid));
- }
- if (flags & IHThorNWayMergeJoinArg::MJFhasclearlow)
- {
- RtlDynamicRowBuilder rowBuilder(inputAllocator);
- size32_t size = helper.createLowInputRow(rowBuilder);
- lowestSeekRow = rowBuilder.finalizeRowClear(size);
- }
- cleanupCandidates();
- eof = false;
- numInputs = inputs.ordinality();
- matched = new bool[numInputs];
- if (numInputs == 0)
- eof = true;
- //Sort the inputs by the preferred processing order (if provided), ensuring no duplicates
- clearMatches();
- ForEachItemIn(i2, searchOrder)
- {
- unsigned next = searchOrder.item(i2);
- if (next < numInputs && !matched[next])
- {
- orderedInputs.append(OLINK(inputs.item(next)));
- matched[next] = true;
- }
- }
- //MORE: We really should move the most-stepable inputs to the start
- for (unsigned i3 = 0; i3 < numInputs; i3++)
- {
- if (!matched[i3])
- orderedInputs.append(OLINK(inputs.item(i3)));
- }
- }
- bool CMergeJoinProcessor::createConjunctionOptimizer()
- {
- if (inputs.ordinality())
- {
- conjunctionOptimizer = new CSteppedConjunctionOptimizer(inputAllocator, helper, this);
- if (gatherConjunctions(*conjunctionOptimizer) && conjunctionOptimizer->worthCombining())
- {
- conjunctionOptimizer->beforeProcessing();
- return true;
- }
- delete conjunctionOptimizer;
- conjunctionOptimizer = NULL;
- }
- combineConjunctions = false;
- return false;
- }
- void CMergeJoinProcessor::createMerger()
- {
- ICompareEq * extraCompare = helper.queryNonSteppedCompare();
- bool hasGlobalCompare = (flags & IHThorNWayMergeJoinArg::MJFglobalcompare) != 0;
- if (!extraCompare && !hasGlobalCompare)
- {
- Owned<CUnfilteredSteppedMerger> simpleMerger = new CUnfilteredSteppedMerger(numEqualFields);
- simpleMerger->init(inputAllocator, equalCompare, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
- simpleMerger->initInputs(&inputs);
- outputProcessor.setown(simpleMerger.getClear());
- }
- else
- {
- Owned<CFilteredSteppedMerger> simpleMerger = new CFilteredSteppedMerger();
- simpleMerger->init(inputAllocator, helper, &inputs);
- outputProcessor.setown(simpleMerger.getClear());
- }
- }
- void CMergeJoinProcessor::createEqualityJoinProcessor()
- {
- if (numEqualFields >= helper.numOrderFields())
- outputProcessor.setown(new CEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- else
- outputProcessor.setown(new CSortedEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- }
- void CMergeJoinProcessor::finishCandidates()
- {
- if (outputProcessor)
- outputProcessor->afterProcessCandidates();
- assertex(hasCandidates());
- inputAllocator->releaseRow(candidateEqualityRow);
- candidateEqualityRow = NULL;
- }
- bool CMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- allInputsAreOuterInputs = true;
- ForEachItemIn(i, inputs)
- {
- CSteppedInputLookahead & cur = inputs.item(i);
- if (!cur.gatherConjunctions(collector))
- collector.addInput(cur);
- else
- {
- collector.addPseudoInput(cur);
- allInputsAreOuterInputs = false;
- }
- }
- collector.addJoin(*this);
- return true;
- }
- const void * CMergeJoinProcessor::nextInputRow()
- {
- if (!hasCandidates() && !findCandidates(NULL, 0))
- return NULL;
- loop
- {
- const void * next = nextCandidate();
- if (next)
- return next;
- finishCandidates();
- //Abort early if externally optimized, and not proximity (since they may not have read all records for this equality)
- if ((numEqualFields == numExternalEqualFields) && candidatesExhaustEquality())
- return NULL;
- if (!findCandidates(NULL, 0))
- return NULL;
- }
- }
- const void * CMergeJoinProcessor::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //First check the next row from the candidates, it may be ok.
- if (hasCandidates())
- {
- unsigned compareFields = numFields < numEqualFields ? numFields : numEqualFields;
- //check whether the candidates could possibly return the match
- if (stepCompare->docompare(candidateEqualityRow, seek, compareFields) == 0)
- {
- const void * next = nextCandidateGE(seek, numFields, wasCompleteMatch, stepExtra);
- if (next)
- return next; // note must match equality to have been returned.
- }
- finishCandidates();
- }
- if (!findCandidates(seek, numFields))
- return NULL;
- return nextInputRow();
- }
- void CMergeJoinProcessor::resetEOF()
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).resetEOF();
- }
- void CMergeJoinProcessor::queryResetEOF()
- {
- resetEOF();
- }
- const void * CMergeJoinProcessor::nextInGroup()
- {
- if (conjunctionOptimizer)
- return conjunctionOptimizer->next();
- if (combineConjunctions)
- {
- if (numExternalEqualFields == 0)
- {
- if (createConjunctionOptimizer())
- return conjunctionOptimizer->next();
- }
- else
- combineConjunctions = false; // being used inside a conjunction optimizer => don't create another..
- }
- return nextInputRow();
- }
- const void * CMergeJoinProcessor::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- if (conjunctionOptimizer)
- return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
- if (combineConjunctions)
- {
- if (createConjunctionOptimizer())
- return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- return nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- void CMergeJoinProcessor::startRestrictedJoin(const void * equalityRow, unsigned numEqualityFields)
- {
- assertex(numExternalEqualFields == 0);
- numExternalEqualFields = numEqualityFields;
- eof = false;
- }
- void CMergeJoinProcessor::stopRestrictedJoin()
- {
- numExternalEqualFields = 0;
- if (hasCandidates())
- finishCandidates();
- //There are no more matches for this (outer) equality condition, so all active rows need to be thrown away.
- if (outputProcessor)
- outputProcessor->cleanupAllCandidates();
- }
- void CMergeJoinProcessor::setCandidateRow(const void * row, bool inputsMayBeEmpty, const bool * matched)
- {
- candidateEqualityRow = inputAllocator->linkRow(row);
- const void * restrictionRow = (numEqualFields == numExternalEqualFields) ? NULL : candidateEqualityRow;
- outputProcessor->beforeProcessCandidates(restrictionRow, inputsMayBeEmpty, matched);
- }
- //---------------------------------------------------------------------------
- CAndMergeJoinProcessor::CAndMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- }
- void CAndMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- }
- bool CAndMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- if (eof)
- return false;
- const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
- const void * equalValue;
- unsigned firstInput = 0;
- if (inputsMustMatchEquality && allInputsAreOuterInputs)
- {
- //special case - all inputs are already advanced to the correct place, so just start generating candidates
- //for nested conjunctions they may already be exausted though
- equalValue = orderedInputs.item(firstInput).next();
- if (!equalValue)
- {
- eof = true;
- return false;
- }
- }
- else
- {
- if (!seekValue)
- {
- numSeekFields = numEqualFields;
- seekValue = lowestSeekRow;
- }
- bool matchedCompletely = true;
- equalValue = orderedInputs.item(firstInput).next(seekValue, numSeekFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (!equalValue)
- {
- eof = true;
- return false;
- }
- PreservedRow savedRow(inputAllocator);
- unsigned matchCount = 0;
- clearMatches();
- if (matchedCompletely)
- {
- matched[firstInput] = true;
- matchCount++;
- }
- else
- {
- equalValue = orderedInputs.item(firstInput).consume();
- savedRow.setown(equalValue);
- }
- unsigned lastInput = firstInput;
- while (matchCount != numInputs)
- {
- unsigned nextInput = nextToMatch(lastInput);
- lastInput = nextInput;
- bool matchedCompletely = true;
- const void * nextRow = orderedInputs.item(nextInput).nextGE(equalValue, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (!nextRow)
- {
- eof = true;
- return false;
- }
- #ifdef CHECK_CONSISTENCY
- if (inputsMustMatchEquality)
- {
- if (equalCompare->docompare(nextRow, equalValue) != 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- else
- {
- if (equalCompare->docompare(nextRow, equalValue) < 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- #endif
- if (!inputsMustMatchEquality)
- {
- if (!equalCompareEq->match(nextRow, equalValue))
- {
- //value didn't match => skip all the previously matched entries.
- for (unsigned i=0; i < numInputs; i++)
- {
- if (matched[i])
- {
- matched[i] = false;
- orderedInputs.item(i).skip();
- if (--matchCount == 0)
- break;
- }
- }
- if (!matchedCompletely)
- {
- //Need to preserve nextRow, otherwise it will be gone after we skip
- equalValue = orderedInputs.item(nextInput).consume();
- savedRow.setown(equalValue);
- }
- else
- equalValue = nextRow;
- }
- }
- if (matchedCompletely)
- {
- matched[nextInput] = true;
- matchCount++;
- }
- }
- }
- //Set up the mergeProcessor with the appropriate inputs. NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
- //with the candidate.
- //clone one of the rows
- setCandidateRow(equalValue, false, NULL);
- return true;
- }
- unsigned CAndMergeJoinProcessor::nextToMatch(unsigned lastInput) const
- {
- for (unsigned i=0; i < numInputs; i++)
- {
- //Don't seek on the last input again (it may have found a keyed match, but not matched the post filter)
- if ((i != lastInput) && !matched[i])
- return i;
- }
- throwUnexpected();
- }
- //---------------------------------------------------------------------------
- CAndLeftMergeJoinProcessor::CAndLeftMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- combineConjunctions = false; // No advantage using this as the base of a combined conjunction
- isLeftOnly = (flags & IHThorNWayMergeJoinArg::MJFkindmask) == IHThorNWayMergeJoinArg::MJFleftonly;
- //Left only with a not stepped comparison needs to be done as a left outer at the stepping level
- if (isLeftOnly && (helper.queryNonSteppedCompare() || helper.queryGlobalCompare()))
- isLeftOnly = false;
- }
- void CAndLeftMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- createTempSeekBuffer();
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- }
- bool CAndLeftMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- if (eof)
- return false;
- CSteppedInputLookahead & input0 = inputs.item(0);
- bool wasMatched = true;
- const void * lhs = input0.next(seekValue, numSeekFields, wasMatched, nonBufferedMatchStepExtra);
- assertex(wasMatched);
- if (!lhs)
- {
- eof = true;
- return false;
- }
- unsigned matchCount = 1;
- while (matchCount != numInputs)
- {
- bool matchedCompletely = true; // we don't care what the next rhs value is - as long as it can't match the left
- const void * rhs = orderedInputs.item(matchCount).nextGE(lhs, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (rhs)
- {
- int c = equalCompare->docompare(rhs, lhs);
- if (c < 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- if (c == 0)
- {
- assertex(matchedCompletely);
- //previously the (matchCount+1) test wasn't here, so it aborted as soon as there was any match.
- if (isLeftOnly && (matchCount+1 == numInputs))
- {
- if (numEqualFields == numExternalEqualFields)
- {
- //I think this is worth doing here...
- //Skip input0 to a mismatch value, so the optimizer doesn't waste time reading extra equalities
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
- input0.skip(); // invalidates lhs
- if (calculatedNextSeek)
- {
- bool wasMatched = true;
- input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
- }
- eof = true;
- return false;
- }
- //Create the next join value if that is possible
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
- input0.skip(); // invalidates lhs
- bool wasMatched = true;
- if (calculatedNextSeek)
- lhs = input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
- else
- lhs = input0.next();
- if (!lhs)
- {
- eof = true;
- return false;
- }
- matchCount = 0; //incremented at tail of loop
- }
- }
- else
- break;
- }
- else
- break;
- matchCount++;
- }
- clearMatches();
- matched[0] = true;
- if (matchCount != numInputs)
- {
- //Failed to match completely => generate a match for just the left. Skip any matched rows so far and break out.
- for (unsigned i=1; i < matchCount; i++)
- orderedInputs.item(i).skip();
- matchCount = 1;
- }
- else
- {
- for (unsigned i=1; i < numInputs; i++)
- matched[i] = true;
- }
- //LEFT ONLY will only merge 1 stream, LEFT OUTER will merge as many as match LEFT
- setCandidateRow(lhs, true, matched);
- return true;
- }
- bool CAndLeftMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- CSteppedInputLookahead & cur = inputs.item(0);
- if (!cur.gatherConjunctions(collector))
- collector.addInput(cur);
- collector.addJoin(*this);
- return true;
- }
- //---------------------------------------------------------------------------
- void BestMatchManager::associate(unsigned input, const void * value)
- {
- unsigned curIndex = 0;
- while (curIndex != numEntries)
- {
- BestMatchItem & cur = matches.item(curIndex);
- int c = compare->docompare(value, cur.value);
- if (c <= 0)
- {
- if (c == 0)
- {
- //insert at the end of the duplicates
- curIndex += cur.duplicates;
- cur.duplicates++;
- }
- //Move a record at the end of the list to the correct position, ready for updating.
- if (curIndex != numEntries)
- matches.rotateR(curIndex, numEntries);
- break; // now go and modify record at position curIndex
- }
- curIndex += cur.duplicates;
- }
- assertex(matches.isItem(curIndex));
- BestMatchItem & inserted = matches.item(curIndex);
- inserted.duplicates = 1;
- inserted.value = value;
- inserted.input = input;
- numEntries++;
- return;
- }
- unsigned BestMatchManager::getValueOffset(unsigned idx) const
- {
- unsigned offset = 0;
- while (idx--)
- offset += matches.item(offset).duplicates;
- return offset;
- }
- void BestMatchManager::init(ICompare * _compare, unsigned numInputs)
- {
- compare = _compare;
- numEntries = 0;
- for (unsigned i=0; i < numInputs; i++)
- matches.append(* new BestMatchItem);
- }
- void BestMatchManager::kill()
- {
- matches.kill();
- }
- unsigned BestMatchManager::getInput(unsigned whichValue, unsigned inputIndex) const
- {
- return matches.item(getValueOffset(whichValue) + inputIndex).input;
- }
- unsigned BestMatchManager::getInput0(unsigned inputIndex) const
- {
- return matches.item(inputIndex).input;
- }
- unsigned BestMatchManager::numInputs(unsigned whichValue) const
- {
- return matches.item(getValueOffset(whichValue)).duplicates;
- }
- void BestMatchManager::remove(unsigned whichValue)
- {
- unsigned offset = getValueOffset(whichValue);
- unsigned duplicates = matches.item(offset).duplicates;
- matches.rotateLN(offset, numEntries-1, duplicates);
- numEntries -= duplicates;
- }
- const void * BestMatchManager::queryValue(unsigned whichValue) const
- {
- return matches.item(getValueOffset(whichValue)).value;
- }
- //---------------------------------------------------------------------------
- CMofNMergeJoinProcessor::CMofNMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- combineConjunctions = false;
- alive = NULL;
- candidateMask = NULL;
- minMatches = 0;
- maxMatches = 0;
- numActive = 0;
- }
- void CMofNMergeJoinProcessor::afterProcessing()
- {
- delete [] alive;
- delete [] candidateMask;
- alive = NULL;
- candidateMask = NULL;
- matches.kill();
- CMergeJoinProcessor::afterProcessing();
- }
- void CMofNMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- minMatches = helper.getMinMatches();
- maxMatches = helper.getMaxMatches();
- if (minMatches == 0)
- throw MakeStringException(99, "Need a non-zero minimum number of matches");
- alive = new bool [numInputs];
- candidateMask = new bool [numInputs];
- for (unsigned i= 0; i < numInputs; i++)
- alive[i] = true;
- numActive = numInputs;
- matches.init(equalCompare, numInputs);
- }
- bool CMofNMergeJoinProcessor::findCandidates(const void * originalSeekValue, unsigned numOriginalSeekFields)
- {
- if (numActive < minMatches)
- return false;
-
- unsigned numFreeToMismatch = numActive - minMatches;
- const void * seekValue = originalSeekValue;
- unsigned numSeekFields = numOriginalSeekFields;
- //This should be true, because after candidates are matched their values are removed.
- assertex(matches.numInputs() <= numFreeToMismatch); //
- //MORE: This needs rewriting, so that mismatches are handled coorectly. In particular,
- while (matches.numInputs() < numActive)
- {
- unsigned nextInput = nextToMatch();
- bool matchedCompletely = true;
- // MORE: This needs rewriting, so that mismatches are handled coorectly. In particular, the matches need to retain information about whether
- // they matched fully, since that will optimize where could be sought next.
- const void * value = inputs.item(nextInput).next(seekValue, numSeekFields, matchedCompletely, nonBufferedMatchStepExtra);
- //NOTE: matchedCompletely is currently always true. More work is needed if not true.
- assertex(matchedCompletely);
- if (value)
- {
- if (matchedCompletely)
- {
- matched[nextInput] = true;
- matches.associate(nextInput, value);
- }
- }
- else
- {
- alive[nextInput] = false;
- numActive--;
- numFreeToMismatch--;
- if (numActive < minMatches)
- return false;
- }
- unsigned matchCount = matches.numInputs();
- if (matchCount > numFreeToMismatch)
- {
- unsigned numMatch0 = matches.numInputs0();
- if ((matchCount - numMatch0 > numFreeToMismatch) || (numMatch0 > maxMatches))
- {
- //clear seekValue, because seek value won't be valid any more after the skips - may be updated later.
- seekValue = originalSeekValue;
- numSeekFields = numOriginalSeekFields;
- //No way that the first element is going to match, so remove all inputs associated with it.
- for (unsigned i= 0; i < numMatch0; i++)
- {
- unsigned input = matches.getInput0(i);
- inputs.item(input).skip();
- matched[input] = false;
- }
- matches.remove(0);
- }
- //Lowest element now provides the best seek position.
- if (matches.numInputs() > numFreeToMismatch)
- {
- seekValue = matches.queryValue(0);
- numSeekFields = numEqualFields;
- }
- }
- }
- //matches(0) contains the next match set, set a set of flags indicating which inputs to use
- unsigned numMatches = matches.numInputs0();
- for (unsigned i1=0; i1< numInputs; i1++)
- candidateMask[i1] = false;
- for (unsigned i2=0; i2 < numMatches; i2++)
- candidateMask[matches.getInput0(i2)] = true;
- setCandidateRow(matches.queryValue(0), true, candidateMask);
- //Now cleanup housekeeping, so that findCandidates() is ready to find the next block
- for (unsigned i3=0; i3 < numInputs; i3++)
- if (candidateMask[i3])
- matched[i3] = false;
- matches.remove(0);
- return true;
- }
- bool CMofNMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- //MORE: We may need to create pseudo inputs in order to process these optimially.
- return false;
- }
- unsigned CMofNMergeJoinProcessor::nextToMatch() const
- {
- for (unsigned i= 0; i < numInputs; i++)
- {
- if (alive[i] && !matched[i])
- return i;
- }
- throwUnexpected();
- }
- //---------------------------------------------------------------------------
- /*
- NOTES on the distances... this is far from simple once you get arbitrary trees involved.
- given a join expression right.x between left.x - a and left.x + b i.e. up to a Before and b after
- a is maxRightBeforeLeft
- b is maxLeftBeforeRight
- We define a function D(x,y) which is the maximum value which can be deducted from row x, to provide a valid value for row y
- Given a tree of join expressions, we can calculate a distance function between any pair of inputs.
- J1(a,b,c) = D(i,i+1)=4, D(i+1,i) = 10
- J2(d, e) = D(i,i+1)=-1, D(i+1,i) = 12
- J3(J1, J2) = D(i,i+1)=0 D(i+1,i) = 5
- =>
- For each join we define
- D(i, lowest) - maximum value to deduct from row i to obtain the lowest
- D(highest, i) - maximum value to deduct from highest to obtain row i
- by definition these must both be >= 0, for a simple input they are both 0
- A join's extent is given by
- D(highest,lowest) = max(D(highest,i)+D(i,lowest))
- We're only interested in the maximum values, which are obtained by the maximum distances between the elements. The lowest and highest members
- of the group are going to be the ends. So we use the maximum of those distances to work out D(i,low) and D(high, i), being careful to only use
- the range if it is valid (e.g., the end must be possible to be the highest/lowest)
- D(a,b) = 4, D(b,a) = 10
- D(b,c) = 4, D(c,b) = 10
- D(a,c) = 8, D(c,a) = 20
- D(d,e) = -1, D(e,d) = 12
- D(a, lowest) = 8 D(highest, a) = 20
- D(b, lowest) = 10 D(highest, b) = 10
- D(c, lowest) = 20 D(highest, c) = 8
- D(highest, lowest) = 28
- Then assuming the left is the highest and the right is the lowest we have
- D(a,e) = DJ1(a, lowest) + DJ3(i,i+1) + DJ2(highest, e)
- = 8 + 0 + 0
- For >2 terms, you also need to take into account the size of a term given by D(highest,lowest)
- */
- inline unsigned __int64 adjustRangeValue(unsigned __int64 rangeValue, __int64 delta)
- {
- if ((delta >= 0) || (rangeValue > (unsigned __int64)-delta))
- return rangeValue + delta;
- return 0;
- }
- //---------------------------------------------------------------------------
- //This class is created for each each input of a nary-join, it maintains a queue of potential records.
- CNaryJoinLookaheadQueue::CNaryJoinLookaheadQueue(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookahead * _input, CNaryJoinLookaheadQueue * _left, const void * * _activeRowPtr) : helper(_helper), rows(_inputAllocator), unmatchedRows(_inputAllocator)
- {
- equalCompareEq = helper.queryEqualCompareEq();
- nonSteppedCompareEq = helper.queryNonSteppedCompare();
- numEqualFields = helper.numEqualFields();
- stepCompare = helper.querySteppingMeta()->queryCompare();
- input.set(_input);
- activeRowPtr = _activeRowPtr;
- left = _left;
- equalityRow = NULL;
- curRow = 0;
- maxRow = 0;
- numSkipped = 0;
- done = true;
- }
- bool CNaryJoinLookaheadQueue::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext)
- {
- done = false;
- equalityRow = _equalityRow;
- rows.kill();
- numSkipped = 0;
- if (matchedLeft)
- matchedLeft->reset();
- // next is guaranteed to match the equality condition for AND, proximity but not for m of n/left outer...
- if (!needToVerifyNext || nextUnqueued())
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- void CNaryJoinLookaheadQueue::clearPending()
- {
- rows.kill();
- }
- bool CNaryJoinLookaheadQueue::ensureNonEmpty()
- {
- if (rows.ordinality())
- return true;
- if (nextUnqueued())
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- bool CNaryJoinLookaheadQueue::firstSelection()
- {
- if (!left)
- {
- assertex(maxRow != 0);
- curRow = 0;
- *activeRowPtr = rows.item(curRow);
- return true;
- }
-
- if (!left->firstSelection())
- return false;
- return findValidSelection(0);
- }
- bool CNaryJoinLookaheadQueue::findValidSelection(unsigned initialRow)
- {
- assertex(left);
- const unsigned max = maxRow;
- unsigned candidateRow = initialRow;
- loop
- {
- const void * leftRow = left->activeRow();
- while (candidateRow < max)
- {
- const void * rightRow = rows.item(candidateRow);
- if (!nonSteppedCompareEq || nonSteppedCompareEq->match(leftRow, rightRow))
- {
- curRow = candidateRow;
- *activeRowPtr = rightRow;
- return true;
- }
- candidateRow++;
- }
- if (!left->nextSelection())
- return false;
- candidateRow = 0;
- }
- }
- const void * CNaryJoinLookaheadQueue::nextUnqueued()
- {
- if (equalityRow)
- {
- bool matches = true;
- const void * next = input->nextGE(equalityRow, numEqualFields, matches, nonBufferedMismatchStepExtra);
- if (next && matches && equalCompareEq->match(next, equalityRow))
- return next;
- return NULL;
- }
- else
- return input->next();
- }
- bool CNaryJoinLookaheadQueue::nextSelection()
- {
- if (left)
- return findValidSelection(curRow+1);
- curRow++;
- if (curRow >= maxRow)
- return false;
- *activeRowPtr = rows.item(curRow);
- return true;
- }
- bool CNaryJoinLookaheadQueue::ensureCandidateExists(unsigned __int64 minDistance, unsigned __int64 maxDistance)
- {
- loop
- {
- const void * next = rows.head();
- if (!next)
- break;
- unsigned __int64 distance = helper.extractRangeValue(next);
- if (distance >= minDistance)
- {
- assertex(distance <= maxDistance);
- return true;
- }
- rows.skip();
- }
- loop
- {
- const void * next = nextUnqueued();
- if (!next)
- return false;
- unsigned __int64 distance = helper.extractRangeValue(next);
- if (distance >= minDistance)
- {
- if (distance <= maxDistance)
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- input->skip();
- }
- }
- bool CNaryJoinLookaheadQueue::checkExistsGE(const void * seek, unsigned numFields)
- {
- loop
- {
- const void * next = rows.head();
- if (!next)
- return false;
- if (stepCompare->docompare(next, seek, numFields) >= 0)
- return true;
- rows.skip();
- }
- }
- unsigned CNaryJoinLookaheadQueue::readAheadTo(unsigned __int64 maxDistance, bool canConsumeBeyondMax)
- {
- const void * tail = rows.tail();
- if (tail && helper.extractRangeValue(tail) > maxDistance)
- {
- unsigned limit = rows.ordinality() - 1;
- //Already have all the records, return how many...
- while (limit > 0)
- {
- const void * prev = rows.item(limit-1);
- if (helper.extractRangeValue(prev) <= maxDistance)
- return limit;
- --limit;
- }
- return 0;
- }
- while (!done)
- {
- const void * next = nextUnqueued();
- if (!next)
- {
- done = true;
- break;
- }
- //This is a bit nasty. We need to consume the next value to ensure that the lowest spotter always has the next valid
- //but it means we might be reading this input for too long
- if (helper.extractRangeValue(next) > maxDistance)
- {
- if (!canConsumeBeyondMax)
- break;
- consumeNextInput();
- return rows.ordinality()-1;
- }
- consumeNextInput();
- }
- return rows.ordinality();
- }
- void CNaryJoinLookaheadQueue::readCandidateAll()
- {
- loop
- {
- const void * next = nextUnqueued();
- if (!next)
- return;
- consumeNextInput();
- }
- }
- void CNaryJoinLookaheadQueue::skip()
- {
- if (matchedLeft && !matchedLeft->test(numSkipped))
- {
- unmatchedRows.enqueue(rows.dequeue());
- }
- else
- rows.skip();
- numSkipped++;
- }
- bool CNaryJoinLookaheadQueue::flushUnmatched()
- {
- while (rows.ordinality())
- skip();
- return unmatchedRows.ordinality() != 0;
- }
- //---------------------------------------------------------------------------
- CProximityJoinProcessor::CProximityJoinProcessor(IHThorNWayMergeJoinArg & _helper) :
- CMergeJoinProcessor(_helper)
- {
- maxRightBeforeLeft = 0;
- maxLeftBeforeRight = 0;
- }
- void CProximityJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- createTempSeekBuffer();
- //Have to delay creating the actual join joinProcessor because maxRightBeforeLeft() etc. can be onStart dependant.
- maxRightBeforeLeft = helper.maxRightBeforeLeft();
- maxLeftBeforeRight = helper.maxLeftBeforeRight();
- //Handle phrases using a different class i) because the general scheme doesn't quite work and ii) for efficiency
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- {
- if ((maxRightBeforeLeft < 0 || maxLeftBeforeRight < 0))
- outputProcessor.setown(new CAnchoredRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- else
- outputProcessor.setown(new CProximityRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- }
- else
- createMerger();
- }
- bool CProximityJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- unsigned firstInput = 0;//searchOrder.item(0);
- bool wasCompleteMatch = true;
- if (eof || !inputs.item(firstInput).next(seekValue, numSeekFields, wasCompleteMatch, nonBufferedMatchStepExtra))
- return false;
- unsigned matchCount = 1;
- clearMatches();
- matched[firstInput] = true;
- const unsigned numJoinFields = numEqualFields + 1;
- const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
- while (matchCount != numInputs)
- {
- unsigned nextInput = nextToMatch();
- unsigned baseInput = getBestToSeekFrom(nextInput);
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- helper.adjustRangeValue(rowBuilder, inputs.item(baseInput).next(), -maxDistanceBefore(baseInput, nextInput));
- bool wasCompleteMatch = true;
- //MORE: Would it help to allow mismatches? I would have thought so, but there was a previous comment sayimg "I don't think so because of the range calculation"
- const void * nextRow = inputs.item(nextInput).nextGE(tempSeekBuffer, numJoinFields, wasCompleteMatch, nonBufferedMatchStepExtra);
- assertex(wasCompleteMatch);
- if (!nextRow)
- {
- eof = true;
- return false;
- }
- if (inputsMustMatchEquality || equalityComponentMatches(nextRow, tempSeekBuffer))
- {
- //Now check if this new record causes other records to be too far away
- unsigned __int64 thisRangeValue = helper.extractRangeValue(nextRow);
- for (unsigned i=0; i<numInputs; i++)
- {
- if (matched[i])
- {
- unsigned __int64 seekRangeValue = adjustRangeValue(thisRangeValue, -maxDistanceBefore(nextInput, i));
- if (getRangeValue(i) < seekRangeValue)
- {
- inputs.item(i).skip();
- matched[i] = false;
- if (--matchCount == 0)
- break;
- }
- }
- }
- }
- else
- {
- for (unsigned i=0; i<numInputs; i++)
- {
- if (matched[i])
- {
- inputs.item(i).skip();
- matched[i] = false;
- matchCount--;
- }
- }
- }
- matched[nextInput] = true;
- matchCount++;
- }
- setCandidateRow(inputs.item(0).next(), false, NULL);
- return true;
- }
- __int64 CProximityJoinProcessor::maxDistanceBefore(unsigned fixedInput, unsigned searchInput) const
- {
- assertex(outputProcessor); // sanity check to ensure this isn't called before maxXBeforeY are set up
- if (searchInput < fixedInput)
- return maxLeftBeforeRight * (fixedInput - searchInput);
- else
- return maxRightBeforeLeft * (searchInput - fixedInput);
- }
- unsigned CProximityJoinProcessor::nextToMatch() const
- {
- for (unsigned i=0; i < numInputs; i++)
- {
- unsigned next = i;//searchOrder.item(i);
- if (!matched[next])
- return next;
- }
- throwUnexpected();
- }
- //Choose the input to seek from that restricts the input being sought the most.
- unsigned CProximityJoinProcessor::getBestToSeekFrom(unsigned seekInput) const
- {
- unsigned __int64 bestRangeValue = 0;
- unsigned best = NotFound;
- //MORE: This can be optimized!
- for (unsigned i=0; i < numInputs; i++)
- {
- if (matched[i])
- {
- //Calculate the value of the distance
- __int64 distanceBefore = maxDistanceBefore(i, seekInput);
- unsigned __int64 rangeValue = adjustRangeValue(getRangeValue(i), -distanceBefore);
- if (rangeValue >= bestRangeValue)
- {
- bestRangeValue = rangeValue;
- best = i;
- }
- }
- }
- assertex(best != NotFound);
- return best;
- }
- //---------------------------------------------------------------------------
- //NULL passed to CSteppedInputLookahead first parameter means nextGE() must be overridden
- CJoinGenerator::CJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- helper(_helper), inputAllocator(_inputAllocator), outputAllocator(_outputAllocator)
- {
- state = JSdone;
- unsigned flags = helper.getJoinFlags();
- stepCompare = helper.querySteppingMeta()->queryCompare();
- globalCompare = NULL;
- if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
- globalCompare = helper.queryGlobalCompare();
- unsigned numInputs = _inputs.ordinality();
- rows = new const void * [numInputs];
- CNaryJoinLookaheadQueue * prev = NULL;
- ForEachItemIn(i, _inputs)
- {
- CNaryJoinLookaheadQueue * queue = new CNaryJoinLookaheadQueue(inputAllocator, helper, &_inputs.item(i), prev, rows + i);
- inputs.append(*queue);
- prev = queue;
- }
- isSpecialLeftJoin = false;
- numActiveInputs = numInputs;
- lastActiveInput = numInputs ? &inputs.tos() : NULL;
- joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
- switch (joinKind)
- {
- case IHThorNWayMergeJoinArg::MJFleftonly:
- case IHThorNWayMergeJoinArg::MJFleftouter:
- if (helper.queryNonSteppedCompare() || globalCompare)
- {
- isSpecialLeftJoin = true;
- if (numInputs)
- inputs.item(0).trackUnmatched();
- }
- break;
- case IHThorNWayMergeJoinArg::MJFmofn:
- if (helper.queryNonSteppedCompare() || globalCompare)
- throw MakeStringException(99, "MOFN JOIN with non stepped condition not yet supported");
- break;
- }
- }
- CJoinGenerator::~CJoinGenerator()
- {
- delete [] rows;
- }
- void CJoinGenerator::beforeProcessCandidates(const void * candidateRow, bool needToVerifyNext, const bool * matched)
- {
- if (needToVerifyNext)
- {
- numActiveInputs = 0;
- CNaryJoinLookaheadQueue * prev = NULL;
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & cur = inputs.item(i);
- if (cur.beforeProcessCandidates(candidateRow, needToVerifyNext))
- {
- cur.updateContext(prev, rows + numActiveInputs);
- prev = &cur;
- numActiveInputs++;
- }
- }
- lastActiveInput = prev;
- }
- else
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).beforeProcessCandidates(candidateRow, needToVerifyNext);
- }
- state = JSfirst;
- }
- void CJoinGenerator::cleanupAllCandidates()
- {
- //Remove all pending candidates - only called if outer join optimization is enabled
- //afterProcessCandidates() will already have been called.
- ForEachItemIn(i, inputs)
- inputs.item(i).clearPending();
- }
- void CJoinGenerator::afterProcessCandidates()
- {
- }
- const void * CJoinGenerator::nextOutputRow()
- {
- RtlDynamicRowBuilder rowBuilder(outputAllocator, false);
- loop
- {
- if (isSpecialLeftJoin)
- {
- CNaryJoinLookaheadQueue & left = inputs.item(0);
- loop
- {
- const void * unmatchedLeft = left.nextUnmatched();
- if (!unmatchedLeft)
- break;
- rowBuilder.ensureRow();
- size32_t retSize = helper.transform(rowBuilder, 1, &unmatchedLeft);
- left.skipUnmatched();
- if (retSize)
- return rowBuilder.finalizeRowClear(retSize);
- }
- }
- switch (state)
- {
- case JSdone:
- if (isSpecialLeftJoin)
- {
- CNaryJoinLookaheadQueue & left = inputs.item(0);
- left.readCandidateAll();
- if (left.flushUnmatched())
- break; // round again
- }
- return NULL;
- case JShascandidate:
- {
- state = JSnextcandidate;
- //If is left only join, and has an additional equality criteria, then ignore matches.
- //If left only, and no extra equality - or only one dataset has matches, then all matches are real left only matches
- if (isSpecialLeftJoin && (joinKind == IHThorNWayMergeJoinArg::MJFleftonly) && (numActiveInputs != 1))
- break;
- rowBuilder.ensureRow();
- size32_t retSize = helper.transform(rowBuilder, numActiveInputs, rows);
- if (retSize)
- return rowBuilder.finalizeRowClear(retSize);
- break;
- }
- case JSnextcandidate:
- if (nextCandidate())
- state = JShascandidate;
- else
- {
- if (state != JSdone)
- state = JSgathercandidates;
- }
- break;
- case JSfirst:
- case JSgathercandidates:
- if (gatherNextCandidates())
- state = JShascandidate;
- else
- state = JSdone;
- break;
- default:
- throwUnexpected();
- }
- }
- }
- const void * CJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //A stupid version. We could possibly skip on the lowest value if we knew the fields were assigned from the lowest value in the input
- //which would potentially save a lot of transforms.
- //would also probably need the input to match the output.
- loop
- {
- const void * next = nextOutputRow();
- if (!next || stepCompare->docompare(next, seek, numFields) >= 0)
- return next;
- outputAllocator->releaseRow(next);
- }
- }
- bool CJoinGenerator::firstSelection()
- {
- if (lastActiveInput->firstSelection())
- {
- if (globalCompare && !globalCompare->match(numActiveInputs, rows))
- return nextSelection();
- if (isSpecialLeftJoin)
- inputs.item(0).noteMatched();
- return true;
- }
- return false;
- }
- bool CJoinGenerator::nextSelection()
- {
- while (lastActiveInput->nextSelection())
- {
- if (!globalCompare || globalCompare->match(numActiveInputs, rows))
- {
- if (isSpecialLeftJoin)
- inputs.item(0).noteMatched();
- return true;
- }
- }
- return false;
- }
- //---------------------------------------------------------------------------
- CEqualityJoinGenerator::CEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- lowestInput = NULL;
- }
- void CEqualityJoinGenerator::afterProcessCandidates()
- {
- lowestInput = NULL;
- CJoinGenerator::afterProcessCandidates();
- }
- bool CEqualityJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- selectNextLowestInput();
- return false;
- }
- /*
- o Walk the input which is guaranteed to be the lowest
- o Once that is done throw away that record, and choose the next.
- */
- bool CEqualityJoinGenerator::doGatherNextCandidates()
- {
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (&curInput != lowestInput)
- curInput.setCandidateAll();
- else
- curInput.setCandidateLowest();
- }
- return firstSelection();
- }
- bool CEqualityJoinGenerator::gatherNextCandidates()
- {
- if (state == JSfirst)
- {
- prefetchAllCandidates();
- selectLowestInput();
- }
- else if (lowestInput->empty())
- return false;
- loop
- {
- if (doGatherNextCandidates())
- return true;
- if (!selectNextLowestInput())
- return false;
- }
- }
- void CEqualityJoinGenerator::prefetchAllCandidates()
- {
- //could be done in parallel, but
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(i);
- curInput.readCandidateAll();
- }
- }
- void CEqualityJoinGenerator::selectLowestInput()
- {
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(i);
- if (!curInput.empty())
- {
- lowestInput = &curInput;
- return;
- }
- }
- throwUnexpected();
- }
- bool CEqualityJoinGenerator::selectNextLowestInput()
- {
- lowestInput->skip();
- if (lowestInput->empty())
- {
- state = JSdone;
- return false;
- }
- return true;
- }
- //---------------------------------------------------------------------------
- CSortedEqualityJoinGenerator::CSortedEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CEqualityJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
- {
- lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
- lowestSpotter.initInputs();
- }
- CSortedEqualityJoinGenerator::~CSortedEqualityJoinGenerator()
- {
- lowestSpotter.cleanup();
- }
- void CSortedEqualityJoinGenerator::afterProcessCandidates()
- {
- lowestSpotter.reset();
- CEqualityJoinGenerator::afterProcessCandidates();
- }
- void CSortedEqualityJoinGenerator::selectLowestInput()
- {
- unsigned iLowest = lowestSpotter.queryNextInput();
- assertex(iLowest != NotFound);
- lowestInput = &inputs.item(iLowest);
- }
- bool CSortedEqualityJoinGenerator::selectNextLowestInput()
- {
- lowestSpotter.skipRow();
- if (lowestInput->empty())
- {
- state = JSdone;
- return false;
- }
- CSortedEqualityJoinGenerator::selectLowestInput();
- return true;
- }
- //---------------------------------------------------------------------------
- CRangeJoinGenerator::CRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- maxRightBeforeLeft = helper.maxRightBeforeLeft();
- maxLeftBeforeRight = helper.maxLeftBeforeRight();
- }
- //---------------------------------------------------------------------------
- CAnchoredRangeJoinGenerator::CAnchoredRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- iLowest = maxRightBeforeLeft < 0 ? 0 : inputs.ordinality()-1;
- lowestInput = &inputs.item(iLowest);
- }
- bool CAnchoredRangeJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- lowestInput->skip();
- return false;
- }
- /*
- o Walk the input which is guaranteed to be the lowest
- o Once that is done throw away that record, and choose the next.
- */
- bool CAnchoredRangeJoinGenerator::doGatherNextCandidates()
- {
- const void * lowestRow = lowestInput->next();
- if (!lowestRow)
- return false;
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- __int64 maxLowestBeforeCur = maxDistanceAfterLowest(iInput);
- assertex(maxLowestBeforeCur > 0);
- unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
- if (!curInput.setCandidateRange(maxDistance, false))
- return false;
- }
- }
- lowestInput->setCandidateLowest();
- return firstSelection();
- }
- const void * CAnchoredRangeJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //Note: Skip any lower values that are less than seek value, but don't read any more
- if (!lowestInput->checkExistsGE(seek, numFields))
- return NULL;
- return CRangeJoinGenerator::nextOutputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- bool CAnchoredRangeJoinGenerator::nextMatchesAnyConsumed()
- {
- const void * lowestRow = lowestInput->next();
- bool consumePending = false;
- if (!lowestRow)
- {
- lowestRow = lowestInput->nextUnqueued();
- if (!lowestRow)
- return false;
- consumePending = true;
- }
- //Throw any non-matching rows away, and return true if there are no other rows left.
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- //note: maxRightBeforeLeft is -minRightAfterLeft
- __int64 minCurAfterLowest;
- if (iInput < iLowest)
- minCurAfterLowest = (-maxLeftBeforeRight) * (iLowest - iInput);
- else
- minCurAfterLowest = (-maxRightBeforeLeft) * (iInput - iLowest);
- assertex(minCurAfterLowest >= 0);
- if (!curInput.ensureCandidateExists(lowestDistance+minCurAfterLowest, lowestDistance + maxDistanceAfterLowest(iInput)))
- return false;
- }
- }
- //A potential match, so consume the potential start word and try again
- if (consumePending)
- lowestInput->consumeNextInput();
- return true;
- }
- bool CAnchoredRangeJoinGenerator::gatherNextCandidates()
- {
- loop
- {
- if (!nextMatchesAnyConsumed())
- return false;
- if (doGatherNextCandidates())
- return true;
- lowestInput->skip();
- }
- }
- //---------------------------------------------------------------------------
- CProximityRangeJoinGenerator::CProximityRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
- {
- lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
- lowestSpotter.initInputs();
- }
- CProximityRangeJoinGenerator::~CProximityRangeJoinGenerator()
- {
- lowestSpotter.cleanup();
- }
- void CProximityRangeJoinGenerator::afterProcessCandidates()
- {
- lowestSpotter.reset();
- CRangeJoinGenerator::afterProcessCandidates();
- }
- bool CProximityRangeJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- if (!lowestSpotter.skipNextLowest())
- state = JSdone;
- return false;
- }
- /*
- First version.....
- o Walk the input datasets in the order lowest first.
- o Perform the cross product of that record with all others that could possibly match.
- o Once that is done throw away that record, and choose the next.
- o Abort as soon as any of the inputs contains no records within potential range.
- */
- bool CProximityRangeJoinGenerator::gatherNextCandidates(unsigned iLowest)
- {
- CNaryJoinLookaheadQueue & lowestInput = inputs.item(iLowest);
- const void * lowestRow = lowestInput.next();
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- __int64 maxLowestBeforeCur;
- if (iInput < iLowest)
- maxLowestBeforeCur = maxRightBeforeLeft * (iLowest - iInput);
- else
- maxLowestBeforeCur = maxLeftBeforeRight * (iInput - iLowest);
- assertex(maxLowestBeforeCur >= 0); // should have created an anchored varient if not true
- // maxLowestBeforeCur = maxCurAfterLowest
- unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
- if (!curInput.setCandidateRange(maxDistance, true))
- return false;
- }
- else
- curInput.setCandidateLowest();
- }
- return firstSelection();
- }
- bool CProximityRangeJoinGenerator::gatherNextCandidates()
- {
- loop
- {
- unsigned iLowest = lowestSpotter.queryNextInput();
- assertex(iLowest != NotFound);
- if (gatherNextCandidates(iLowest))
- return true;
- //It would be really nice to break out early if there were no more potential matches, but even if there
- //is only one matching stream we need to keep walking the consumed records, because the later records
- //may pull in the relevant related records, and we can't sensibly put back the consumed records
- if (!lowestSpotter.skipNextLowest())
- {
- //No more records within this document => can't ever match
- state = JSdone;
- return false;
- }
- }
- }
|