1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jlib.hpp"
- #include "jlog.hpp"
- #include "jqueue.tpp"
- #include "jexcept.hpp"
- #include "thorcommon.hpp"
- #include "thorstep.ipp"
- #include "thorstep2.ipp"
- #include "roxiemem.hpp"
- #ifdef _DEBUG
- #define CHECK_CONSISTENCY
- #endif
- using roxiemem::OwnedConstRoxieRow;
- const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
- const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
- const static SmartStepExtra nonSeekStepExtra(SSEFreturnUnbufferedMatches, NULL); // if doing next() instead of nextGE()
- const static SmartStepExtra nonBufferedMatchStepExtra(SSEFreturnUnbufferedMatches, NULL);
- const static SmartStepExtra nonBufferedMismatchStepExtra(SSEFreturnMismatches, NULL);
- bool stepFieldsMatch(const CFieldOffsetSize * leftFields, unsigned leftIndex, const CFieldOffsetSize * rightFields, unsigned rightIndex)
- {
- const CFieldOffsetSize * leftField = leftFields + leftIndex;
- const CFieldOffsetSize * rightField = rightFields + rightIndex;
- return (leftField->offset == rightField->offset) && (leftField->size == rightField->size);
- }
- bool stepFieldsMatch(ISteppingMeta * leftMeta, unsigned leftIndex, ISteppingMeta * rightMeta, unsigned rightIndex)
- {
- if ((leftIndex >= leftMeta->getNumFields()) || (rightIndex >= rightMeta->getNumFields()))
- return false;
- return stepFieldsMatch(leftMeta->queryFields(), leftIndex, rightMeta->queryFields(), rightIndex);
- }
- unsigned getNumMatchingFields(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
- {
- unsigned numStepableFields = 0;
- if (inputStepping && callerStepping)
- {
- //Determine where the stepping fields overlap, and work out the extent.
- unsigned inputCount = inputStepping->getNumFields();
- for (unsigned i=0; i < inputCount; i++)
- {
- if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
- break;
- numStepableFields++;
- }
- }
- return numStepableFields;
- }
- void verifySteppingCompatible(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
- {
- if (inputStepping && callerStepping)
- {
- //Determine where the stepping fields overlap, and work out the extent.
- unsigned parentCount = callerStepping->getNumFields();
- unsigned inputCount = inputStepping->getNumFields();
- unsigned max = parentCount < inputCount ? parentCount : inputCount;
- for (unsigned i=0; i < max; i++)
- {
- if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
- throw MakeStringException(999, "Stepping field %d, input and join do not match", i);
- }
- }
- }
- //---------------------------------------------------------------------------
- void CSteppingMeta::intersect(IInputSteppingMeta * inputMeta)
- {
- if (inputMeta)
- {
- unsigned maxFields = inputMeta->getNumFields();
- if (maxFields > numFields)
- maxFields = numFields;
- for (unsigned curField = 0; curField < maxFields; curField++)
- {
- if (!stepFieldsMatch(inputMeta->queryFields(), curField, fields, curField))
- {
- numFields = curField;
- break;
- }
- }
- if (inputMeta->hasPostFilter())
- postFiltered = true;
- if (inputMeta->isDistributed())
- setDistributed();
- unsigned inputFlags = inputMeta->getSteppedFlags();
- double inputPriority = inputMeta->getPriority();
- if (hadStepExtra)
- {
- stepFlags &= inputFlags;
- if (priority != inputPriority)
- stepFlags &= ~SSFhaspriority;
- }
- else
- {
- hadStepExtra = true;
- stepFlags = inputFlags;
- priority = inputPriority;
- }
- }
- else
- numFields = 0;
- }
- //---------------------------------------------------------------------------
- CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSteppingMeta * _inputStepping, IEngineRowAllocator * _rowAllocator, IRangeCompare * _compare, bool _paranoid)
- : input(_input), compare(_compare)
- {
- maxFields = compare ? compare->maxFields() : 0;
- readAheadRow = NULL;
- readAheadRowIsExactMatch = true;
- stepFlagsMask = 0;
- stepFlagsValue = 0;
- paranoid = _paranoid;
- previousReadAheadRow = NULL;
- rowAllocator.set(_rowAllocator);
- inputStepping = _inputStepping;
- numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
- isPostFiltered = inputStepping ? inputStepping->hasPostFilter() : false;
- setRestriction(NULL, 0);
- lowestFrequencyInput = NULL;
- }
- CSteppedInputLookahead::~CSteppedInputLookahead()
- {
- if (previousReadAheadRow)
- rowAllocator->releaseRow(previousReadAheadRow);
- if (readAheadRow)
- rowAllocator->releaseRow(readAheadRow);
- }
- const void * CSteppedInputLookahead::nextInputRow()
- {
- if (readAheadRows.ordinality())
- return readAheadRows.dequeue();
- return input->nextInputRow();
- }
-
- const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- while (readAheadRows.ordinality())
- {
- OwnedConstRoxieRow next = readAheadRows.dequeue();
- if (compare->docompare(next, seek, numFields) >= 0)
- {
- assertex(wasCompleteMatch);
- return (void *)next.getClear();
- }
- }
- return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
- {
- const void * lastSeekRow = NULL;
- //Remove any rows from the seek list that occur before the new seek row
- while (seekRows.ordinality())
- {
- const void * next = seekRows.head();
- if (compare->docompare(next, seek, numFields) >= 0)
- {
- //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
- seek = seekRows.tail();
- lastSeekRow = seek;
- break;
- }
- rowAllocator->releaseRow(seekRows.dequeue());
- }
- //Could the current readahead row be part of the seek set.
- if (readAheadRow && compare->docompare(readAheadRow, seek, numFields) >= 0)
- {
- //Check not already added - could conceivably happen after rows are read directly beyond the matching seeks.
- if (!lastSeekRow || compare->docompare(readAheadRow, lastSeekRow, numFields) > 0)
- {
- seekRows.enqueue(rowAllocator->linkRow(readAheadRow));
- lastSeekRow = readAheadRow;
- seek = readAheadRow;
- }
- }
- //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
- unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
- SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
- seekRows.ensure(maxcount);
- while (seekRows.ordinality() < maxcount)
- {
- bool wasCompleteMatch = true;
- const void * next = input->nextInputRowGE(seek, numFields, wasCompleteMatch, inputStepExtra);
- if (!next)
- break;
- //wasCompleteMatch can be false if we've just read the last row returned from a block of reads,
- //but if so the next read request will do another blocked read, so just ignore this one.
- if (wasCompleteMatch)
- {
- readAheadRows.enqueue(next);
- if (!lastSeekRow || compare->docompare(next, lastSeekRow, numFields) > 0)
- {
- //Only record unique seek positions in the seek rows
- seekRows.enqueue(rowAllocator->linkRow(next));
- lastSeekRow = next;
- }
- //update the seek pointer to the best value.
- seek = next;
- }
- else
- rowAllocator->releaseRow(next);
- }
- }
- unsigned CSteppedInputLookahead::ordinality() const
- {
- return seekRows.ordinality();
- }
- const void * CSteppedInputLookahead::querySeek(unsigned i) const
- {
- return seekRows.item(i);
- }
- const void * CSteppedInputLookahead::consume()
- {
- if (!readAheadRow)
- fill();
- if (!includeInOutput(readAheadRow))
- return NULL;
- if (paranoid && readAheadRow)
- {
- if (previousReadAheadRow)
- rowAllocator->releaseRow(previousReadAheadRow);
- previousReadAheadRow = rowAllocator->linkRow(readAheadRow);
- }
- const void * ret = readAheadRow;
- readAheadRow = NULL;
- readAheadRowIsExactMatch = true;
- return ret;
- }
- IMultipleStepSeekInfo * CSteppedInputLookahead::createMutipleReadWrapper()
- {
- return this;
- }
- void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper)
- {
- lowestFrequencyInput = wrapper;
- }
- void CSteppedInputLookahead::fill()
- {
- readAheadRowIsExactMatch = true;
- if (restrictValue && numStepableFields)
- {
- //note - this will either return a valid value to be included in the range,
- //or if invalid then it must be out of range -> will fail includeInOutput later,
- //but we may as well keep the row
- unsigned numFields = numRestrictFields < numStepableFields ? numRestrictFields : numStepableFields;
- //Default to returning mismatches, but could be overidden from outside
- unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
- SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
- readAheadRow = nextInputRowGE(restrictValue, numFields, readAheadRowIsExactMatch, inputStepExtra);
- if (paranoid && readAheadRow)
- {
- int c = compare->docompare(readAheadRow, restrictValue, numFields);
- if (c < 0)
- throw MakeStringException(1001, "Input to stepped join preceeds seek point");
- if ((c == 0) && !readAheadRowIsExactMatch)
- throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
- }
- }
- else
- {
- //Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
- //Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
- readAheadRow = nextInputRow();
- }
- if (paranoid && readAheadRow && previousReadAheadRow && compare)
- {
- if (compare->docompare(previousReadAheadRow, readAheadRow, maxFields) > 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- }
- const void * CSteppedInputLookahead::next()
- {
- if (!readAheadRowIsExactMatch)
- {
- if (includeInOutput(readAheadRow))
- skip();
- else
- return NULL;
- }
- if (!readAheadRow)
- fill();
- if (!includeInOutput(readAheadRow))
- return NULL;
- return readAheadRow;
- }
- const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- if (readAheadRow)
- {
- int c = compare->docompare(readAheadRow, seek, numFields);
- if (c >= 0)
- {
- if (!includeInOutput(readAheadRow))
- return NULL;
- if (readAheadRowIsExactMatch)
- return readAheadRow;
- //readAheadRow is beyond seek point => ok to return an incomplete match
- if (stepExtra.returnMismatches() && (c != 0))
- {
- wasCompleteMatch = readAheadRowIsExactMatch;
- return readAheadRow;
- }
- }
- skip();
- }
- if (numStepableFields)
- {
- //This class is directly told whether it should be using readAhead, so need to create a modified stepExtra
- unsigned flags = (stepExtra.queryFlags() & ~stepFlagsMask) | stepFlagsValue;
- SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
- unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
- for (;;)
- {
- readAheadRowIsExactMatch = true;
- readAheadRow = nextInputRowGE(seek, stepFields, readAheadRowIsExactMatch, inputStepExtra);
- if (paranoid && readAheadRow)
- {
- int c = compare->docompare(readAheadRow, seek, stepFields);
- if (c < 0)
- throw MakeStringException(1001, "Input to stepped join preceeds seek point");
- if ((c == 0) && !readAheadRowIsExactMatch)
- throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
- }
- if (!readAheadRow || !includeInOutput(readAheadRow))
- return NULL;
- if (numFields <= numStepableFields)
- {
- wasCompleteMatch = readAheadRowIsExactMatch;
- return readAheadRow;
- }
- //if !readAheadRowIsExactMatch then isCompleteMatch must have been provided => ok to return a mismatch
- //if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
- if (!readAheadRowIsExactMatch)
- {
- wasCompleteMatch = readAheadRowIsExactMatch;
- return readAheadRow;
- }
- if (compare->docompare(readAheadRow, seek, numFields) >= 0)
- {
- wasCompleteMatch = readAheadRowIsExactMatch;
- return readAheadRow;
- }
- skip();
- }
- //now need to do an incremental seek on the subsequent fields to find an exact value >
- }
- //now narrow down
- for (;;)
- {
- const void * cur = next();
- if (!cur)
- return NULL;
- if (compare->docompare(cur, seek, numFields) >= 0)
- return cur;
- skip();
- }
- }
- unsigned CSteppedInputLookahead::queryMaxStepable(ISteppingMeta * callerStepping) const
- {
- return getNumMatchingFields(inputStepping, callerStepping);
- }
- void CSteppedInputLookahead::setAlwaysReadExact()
- {
- //can be used to force reading only exact matches (for the known lowest priority input)
- stepFlagsMask |= SSEFreturnMismatches;
- }
- void CSteppedInputLookahead::setReadAhead(bool value)
- {
- //This never removes readahead if requested somewhere else, so don't update the mask.
- if (value)
- stepFlagsValue |= SSEFreadAhead;
- else
- stepFlagsValue &= ~SSEFreadAhead;
- }
- void CSteppedInputLookahead::setRestriction(const void * _value, unsigned _num)
- {
- restrictValue = _value;
- numRestrictFields = _num;
- }
- void CSteppedInputLookahead::resetEOF()
- {
- if (numRestrictFields == 0)
- resetInputEOF();
- }
- void CSteppedInputLookahead::skip()
- {
- if (paranoid)
- {
- if (previousReadAheadRow)
- rowAllocator->releaseRow(previousReadAheadRow);
- previousReadAheadRow = readAheadRow;
- }
- else
- {
- if (readAheadRow)
- rowAllocator->releaseRow(readAheadRow);
- }
- //NB: Don't read ahead until we have to...
- readAheadRow = NULL;
- readAheadRowIsExactMatch = true;
- }
- const void * CSteppedInputLookahead::skipnext()
- {
- skip();
- return next();
- }
- //---------------------------------------------------------------------------
- void CUnfilteredSteppedMerger::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext, const bool * matched)
- {
- merger.setCandidateRow(_equalityRow);
- unsigned numInputs = inputArray->ordinality();
- for (unsigned i=0; i< numInputs; i++)
- {
- if (!needToVerifyNext || matched[i])
- firstCandidateRows[i] = inputArray->item(i).consume();
- else
- firstCandidateRows[i] = NULL;
- }
- merger.primeRows(firstCandidateRows);
- }
- //---------------------------------------------------------------------------
- CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRangeCompare * _stepCompare, ICompare * _equalCompare, CSteppedInputLookahead * _input, unsigned _numEqualFields)
- {
- allocator = _allocator;
- stepCompare = _stepCompare;
- equalCompare = _equalCompare;
- input = _input;
- matched.setown(createThreadSafeBitSet());
- numMatched = 0;
- readIndex = 0;
- numEqualFields = _numEqualFields;
- }
- CFilteredInputBuffer::~CFilteredInputBuffer()
- {
- }
- const void * CFilteredInputBuffer::consume()
- {
- if (!rows.isItem(readIndex))
- return NULL;
- const void * ret = rows.item(readIndex);
- rows.replace(NULL, readIndex);
- readIndex++;
- return ret;
- }
- const void * CFilteredInputBuffer::consumeGE(const void * seek, unsigned numFields)
- {
- while (rows.isItem(readIndex))
- {
- const void * cur = rows.item(readIndex);
- if (stepCompare->docompare(cur, seek, numFields) >= 0)
- {
- rows.replace(NULL, readIndex);
- readIndex++;
- return cur;
- }
- readIndex++;
- }
- return NULL;
- }
- void CFilteredInputBuffer::fill(const void * equalityRow)
- {
- const void * next = input->consume();
- assertex(next);
- append(next);
- if (equalityRow)
- {
- for (;;)
- {
- bool matches = true;
- SmartStepExtra stepExtra(SSEFreturnMismatches, NULL);
- const void * next = input->nextGE(equalityRow, numEqualFields, matches, stepExtra);
- if (!next || !matches || equalCompare->docompare(equalityRow, next) != 0)
- break;
- append(input->consume());
- }
- }
- else
- {
- for (;;)
- {
- const void * next = input->consume();
- if (!next)
- break;
- append(next);
- }
- }
- }
- void CFilteredInputBuffer::removeMatched()
- {
- ForEachItemInRev(i, rows)
- {
- if (isMatched(i))
- remove(i);
- }
- }
- void CFilteredInputBuffer::removeUnmatched()
- {
- ForEachItemInRev(i, rows)
- {
- if (!isMatched(i))
- remove(i);
- }
- }
- void CFilteredInputBuffer::remove(unsigned i)
- {
- const void * row = rows.item(i);
- rows.remove(i);
- allocator->releaseRow(row);
- }
- void CFilteredInputBuffer::reset()
- {
- ForEachItemIn(i, rows)
- {
- const void * cur = rows.item(i);
- if (cur)
- allocator->releaseRow(cur);
- }
- rows.kill();
- matched->reset();
- numMatched = 0;
- readIndex = 0;
- }
- CFilteredSteppedMerger::CFilteredSteppedMerger()
- {
- matches = NULL;
- joinKind = 0;
- numInputs = 0;
- equalCompare = NULL;
- extraCompare = NULL;
- globalCompare = NULL;
- minMatches = 0;
- maxMatches = 0;
- fullyMatchedLevel = 0;
- }
- CFilteredSteppedMerger::~CFilteredSteppedMerger()
- {
- delete [] matches;
- }
- void CFilteredSteppedMerger::init(IEngineRowAllocator * _allocator, IHThorNWayMergeJoinArg & helper, CSteppedInputLookaheadArray * inputArray)
- {
- unsigned flags = helper.getJoinFlags();
- joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
- numInputs = inputArray->ordinality();
- matches = new const void * [numInputs];
- equalCompare = helper.queryEqualCompare();
- extraCompare = helper.queryNonSteppedCompare();
- globalCompare = NULL;
- unsigned numEqualFields = helper.numEqualFields();
- if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
- globalCompare = helper.queryGlobalCompare();
- if (joinKind == IHThorNWayMergeJoinArg::MJFmofn)
- {
- minMatches = helper.getMinMatches();
- maxMatches = helper.getMaxMatches();
- }
- else
- {
- minMatches = numInputs;
- maxMatches = numInputs;
- }
- IRangeCompare * stepCompare = helper.querySteppingMeta()->queryCompare();
- ForEachItemIn(i, *inputArray)
- inputs.append(*new CFilteredInputBuffer(_allocator, stepCompare, equalCompare, &inputArray->item(i), numEqualFields));
- merger.init(_allocator, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
- merger.initInputs(&inputs);
- }
- //ISteppedJoinRowGenerator
- void CFilteredSteppedMerger::beforeProcessCandidates(const void * equalityRow, bool needToVerifyNext, const bool * matched)
- {
- //Exaustively read from each of the inputs into each of the buffers
- ForEachItemIn(i, inputs)
- {
- if (!needToVerifyNext || matched[i])
- inputs.item(i).fill(equalityRow);
- }
- postFilterRows();
- //No point priming the rows here - will be just as efficient to use the default action
- }
- void CFilteredSteppedMerger::afterProcessCandidates()
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).reset();
- merger.reset();
- }
- void CFilteredSteppedMerger::cleanupAllCandidates()
- {
- merger.reset(); // not strictly necessary...
- }
- void CFilteredSteppedMerger::afterProcessingAll()
- {
- merger.cleanup();
- }
-
- const void * CFilteredSteppedMerger::nextOutputRow()
- {
- return merger.nextRow();
- }
- const void * CFilteredSteppedMerger::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- return merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- bool CFilteredSteppedMerger::tagMatches(unsigned level, unsigned numRows)
- {
- CFilteredInputBuffer & right = inputs.item(level);
- unsigned maxLevel = inputs.ordinality()-1;
- ConstPointerArray & curRows = right.rows;
- if (curRows.ordinality())
- {
- bool valid = false;
- const void * lhs = matches[numRows-1];
- ForEachItemIn(i, curRows)
- {
- //If we have had a match at this level, and this item is already matched,
- //and all levels higher than this have already been completely matched),
- //then no need to check this item (and its children) again, since it won't change anything.
- bool alreadyMatched = right.isMatched(i);
- if (!valid || level + 1 < fullyMatchedLevel || !alreadyMatched)
- {
- const void * rhs = curRows.item(i);
- unsigned matchedRows = numRows;
- bool recurse;
- if (!extraCompare || extraCompare->match(lhs, rhs))
- {
- matches[matchedRows++] = rhs;
- recurse = matchedRows <= maxMatches;
- }
- else
- {
- //for mofn, check enough levels left to create a potential match, for others it will fail.
- unsigned remain = maxLevel-level;
- recurse = (numRows + remain >= minMatches);
- }
- if (recurse)
- {
- bool isFullMatch;
- if (level == maxLevel)
- isFullMatch = (!globalCompare || globalCompare->match(matchedRows, matches));
- else
- isFullMatch = tagMatches(level+1, matchedRows);
- if (isFullMatch)
- {
- valid = true;
- if (!alreadyMatched)
- right.noteMatch(i);
- //If the previous level is fully matched, and so is this one - then update the minimum fully matched level
- if ((level + 1 == fullyMatchedLevel) && right.isFullyMatched())
- fullyMatchedLevel--;
- //If all rows in this level and above are fully matched, then iterating any further will have no effect.
- //Could potentially reduce a O(N^m) to O(mN) if the majority of elements match.
- if (level >= fullyMatchedLevel)
- break;
- }
- }
- }
- }
- return valid;
- }
- else
- {
- //mofn may still be ok with a skipped level or two
- unsigned remain = maxLevel-level;
- if (numRows + remain >= minMatches)
- {
- if (level == maxLevel)
- return (!globalCompare || globalCompare->match(numRows, matches));
- else
- return tagMatches(level+1, numRows);
- }
- return false;
- }
- }
- void CFilteredSteppedMerger::tagMatches()
- {
- unsigned numInputs = inputs.ordinality();
- fullyMatchedLevel = numInputs;
- //for m of n, need to start matching at levels 0,1,.. numLevels - minMatches
- unsigned iterateLevels = numInputs - minMatches;
- for (unsigned level =0; level <= iterateLevels; level++)
- {
- CFilteredInputBuffer & left = inputs.item(level);
- ForEachItemIn(i, left.rows)
- {
- matches[0] = left.rows.item(i);
- bool thisMatched;
- //mofn(1) may not have another level, to just check global compare.
- if (level == numInputs-1)
- thisMatched = (!globalCompare || globalCompare->match(1, matches));
- else
- thisMatched = tagMatches(level+1, 1);
- if (thisMatched)
- {
- if (!left.isMatched(i))
- left.noteMatch(i);
- //Check if this level, and all above are now fully matched. If so, we're done.
- if ((level + 1 == fullyMatchedLevel) && left.isFullyMatched())
- {
- fullyMatchedLevel--;
- break;
- }
- }
- }
- if (level >= fullyMatchedLevel)
- break;
- }
- }
- void CFilteredSteppedMerger::postFilterRows()
- {
- tagMatches();
- unsigned max = inputs.ordinality();
- switch (joinKind)
- {
- case IHThorNWayMergeJoinArg::MJFinner:
- case IHThorNWayMergeJoinArg::MJFmofn:
- {
- for (unsigned i=0; i < max; i++)
- inputs.item(i).removeUnmatched();
- break;
- }
- case IHThorNWayMergeJoinArg::MJFleftouter:
- {
- for (unsigned i=1; i < max; i++)
- inputs.item(i).removeUnmatched();
- break;
- }
- case IHThorNWayMergeJoinArg::MJFleftonly:
- {
- inputs.item(0).removeMatched();
- unsigned max = inputs.ordinality();
- for (unsigned i=1; i < max; i++)
- inputs.item(i).reset();
- break;
- }
- }
- }
- //---------------------------------------------------------------------------
- CMergeJoinProcessor::CMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : helper(_arg)
- {
- mergeSteppingMeta = helper.querySteppingMeta();
- assertex(mergeSteppingMeta);
- stepCompare = mergeSteppingMeta->queryCompare();
- equalCompare = helper.queryEqualCompare();
- equalCompareEq = helper.queryEqualCompareEq();
- numEqualFields = helper.numEqualFields();
- flags = helper.getJoinFlags();
- matched = NULL;
- candidateEqualityRow = NULL;
- numExternalEqualFields = 0;
- conjunctionOptimizer = NULL;
- tempSeekBuffer = NULL;
- lowestSeekRow = NULL;
- combineConjunctions = true;
- allInputsAreOuterInputs = false;
- maxSeekRecordSize = 0;
- numInputs = 0;
- eof = true;
- assertex(helper.numOrderFields() == mergeSteppingMeta->getNumFields());
- bool hasPostfilter = false;
- thisSteppingMeta.init(mergeSteppingMeta->getNumFields(), mergeSteppingMeta->queryFields(), stepCompare, mergeSteppingMeta->queryDistance(), hasPostfilter, SSFhaspriority|SSFisjoin);
- }
- CMergeJoinProcessor::~CMergeJoinProcessor()
- {
- afterProcessing();
- }
- void CMergeJoinProcessor::addInput(ISteppedInput * _input)
- {
- IInputSteppingMeta * _meta = _input->queryInputSteppingMeta();
- verifySteppingCompatible(_meta, mergeSteppingMeta);
- rawInputs.append(*LINK(_input));
- if (_meta)
- {
- if (!_meta->hasPriority())
- thisSteppingMeta.removePriority();
- else
- thisSteppingMeta.intersectPriority(_meta->getPriority());
- if (_meta->isDistributed())
- thisSteppingMeta.setDistributed();
- }
- else
- thisSteppingMeta.removePriority();
- }
- void CMergeJoinProcessor::afterProcessing()
- {
- cleanupCandidates();
- if (outputProcessor)
- {
- outputProcessor->afterProcessingAll();
- outputProcessor.clear();
- }
- if (conjunctionOptimizer)
- {
- conjunctionOptimizer->afterProcessing();
- delete conjunctionOptimizer;
- conjunctionOptimizer = NULL;
- }
- delete [] matched;
- matched = NULL;
- inputs.kill();
- rawInputs.kill();
- orderedInputs.kill();
- if (lowestSeekRow)
- {
- inputAllocator->releaseRow(lowestSeekRow);
- lowestSeekRow = NULL;
- }
- if (tempSeekBuffer)
- {
- inputAllocator->releaseRow(tempSeekBuffer);
- tempSeekBuffer = NULL;
- }
- //Now free the allocators
- inputAllocator.clear();
- outputAllocator.clear();
- maxSeekRecordSize = 0;
- }
- void CMergeJoinProcessor::createTempSeekBuffer()
- {
- tempSeekBuffer = inputAllocator->createRow();
- #ifdef _DEBUG
- //Clear the complete tempSeekBBuffer record, so that toXML() can be used to trace the seek row in roxie
- if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
- {
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, inputAllocator->queryOutputMeta()->getMinRecordSize());
- helper.createLowInputRow(rowBuilder);
- }
- #endif
- }
- void CMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- inputAllocator.set(_inputAllocator);
- outputAllocator.set(_outputAllocator);
- //The seek components must all be fixed width, so the seek record size must be <= the minimum size of the input record
- maxSeekRecordSize = inputAllocator->queryOutputMeta()->getMinRecordSize();
- bool paranoid = (flags & IHThorNWayMergeJoinArg::MJFassertsorted) != 0;
- ForEachItemIn(i1, rawInputs)
- {
- ISteppedInput & cur = rawInputs.item(i1);
- inputs.append(* new CSteppedInputLookahead(&cur, cur.queryInputSteppingMeta(), inputAllocator, stepCompare, paranoid));
- }
- if (flags & IHThorNWayMergeJoinArg::MJFhasclearlow)
- {
- RtlDynamicRowBuilder rowBuilder(inputAllocator);
- size32_t size = helper.createLowInputRow(rowBuilder);
- lowestSeekRow = rowBuilder.finalizeRowClear(size);
- }
- cleanupCandidates();
- eof = false;
- numInputs = inputs.ordinality();
- matched = new bool[numInputs];
- if (numInputs == 0)
- eof = true;
- //Sort the inputs by the preferred processing order (if provided), ensuring no duplicates
- clearMatches();
- ForEachItemIn(i2, searchOrder)
- {
- unsigned next = searchOrder.item(i2);
- if (next < numInputs && !matched[next])
- {
- orderedInputs.append(OLINK(inputs.item(next)));
- matched[next] = true;
- }
- }
- //MORE: We really should move the most-stepable inputs to the start
- for (unsigned i3 = 0; i3 < numInputs; i3++)
- {
- if (!matched[i3])
- orderedInputs.append(OLINK(inputs.item(i3)));
- }
- }
- bool CMergeJoinProcessor::createConjunctionOptimizer()
- {
- if (inputs.ordinality())
- {
- conjunctionOptimizer = new CSteppedConjunctionOptimizer(inputAllocator, helper, this);
- if (gatherConjunctions(*conjunctionOptimizer) && conjunctionOptimizer->worthCombining())
- {
- conjunctionOptimizer->beforeProcessing();
- return true;
- }
- delete conjunctionOptimizer;
- conjunctionOptimizer = NULL;
- }
- combineConjunctions = false;
- return false;
- }
- void CMergeJoinProcessor::createMerger()
- {
- ICompareEq * extraCompare = helper.queryNonSteppedCompare();
- bool hasGlobalCompare = (flags & IHThorNWayMergeJoinArg::MJFglobalcompare) != 0;
- if (!extraCompare && !hasGlobalCompare)
- {
- Owned<CUnfilteredSteppedMerger> simpleMerger = new CUnfilteredSteppedMerger(numEqualFields);
- simpleMerger->init(inputAllocator, equalCompare, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
- simpleMerger->initInputs(&inputs);
- outputProcessor.setown(simpleMerger.getClear());
- }
- else
- {
- Owned<CFilteredSteppedMerger> simpleMerger = new CFilteredSteppedMerger();
- simpleMerger->init(inputAllocator, helper, &inputs);
- outputProcessor.setown(simpleMerger.getClear());
- }
- }
- void CMergeJoinProcessor::createEqualityJoinProcessor()
- {
- if (numEqualFields >= helper.numOrderFields())
- outputProcessor.setown(new CEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- else
- outputProcessor.setown(new CSortedEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- }
- void CMergeJoinProcessor::finishCandidates()
- {
- if (outputProcessor)
- outputProcessor->afterProcessCandidates();
- assertex(hasCandidates());
- inputAllocator->releaseRow(candidateEqualityRow);
- candidateEqualityRow = NULL;
- }
- bool CMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- allInputsAreOuterInputs = true;
- ForEachItemIn(i, inputs)
- {
- CSteppedInputLookahead & cur = inputs.item(i);
- if (!cur.gatherConjunctions(collector))
- collector.addInput(cur);
- else
- {
- collector.addPseudoInput(cur);
- allInputsAreOuterInputs = false;
- }
- }
- collector.addJoin(*this);
- return true;
- }
- const void * CMergeJoinProcessor::nextInputRow()
- {
- if (!hasCandidates() && !findCandidates(NULL, 0))
- return NULL;
- for (;;)
- {
- const void * next = nextCandidate();
- if (next)
- return next;
- finishCandidates();
- //Abort early if externally optimized, and not proximity (since they may not have read all records for this equality)
- if ((numEqualFields == numExternalEqualFields) && candidatesExhaustEquality())
- return NULL;
- if (!findCandidates(NULL, 0))
- return NULL;
- }
- }
- const void * CMergeJoinProcessor::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //First check the next row from the candidates, it may be ok.
- if (hasCandidates())
- {
- unsigned compareFields = numFields < numEqualFields ? numFields : numEqualFields;
- //check whether the candidates could possibly return the match
- if (stepCompare->docompare(candidateEqualityRow, seek, compareFields) == 0)
- {
- const void * next = nextCandidateGE(seek, numFields, wasCompleteMatch, stepExtra);
- if (next)
- return next; // note must match equality to have been returned.
- }
- finishCandidates();
- }
- if (!findCandidates(seek, numFields))
- return NULL;
- return nextInputRow();
- }
- void CMergeJoinProcessor::resetEOF()
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).resetEOF();
- }
- void CMergeJoinProcessor::queryResetEOF()
- {
- resetEOF();
- }
- const void * CMergeJoinProcessor::nextRow()
- {
- if (conjunctionOptimizer)
- return conjunctionOptimizer->next();
- if (combineConjunctions)
- {
- if (numExternalEqualFields == 0)
- {
- if (createConjunctionOptimizer())
- return conjunctionOptimizer->next();
- }
- else
- combineConjunctions = false; // being used inside a conjunction optimizer => don't create another..
- }
- return nextInputRow();
- }
- const void * CMergeJoinProcessor::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- if (conjunctionOptimizer)
- return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
- if (combineConjunctions)
- {
- if (createConjunctionOptimizer())
- return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- return nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- void CMergeJoinProcessor::startRestrictedJoin(const void * equalityRow, unsigned numEqualityFields)
- {
- assertex(numExternalEqualFields == 0);
- numExternalEqualFields = numEqualityFields;
- eof = false;
- }
- void CMergeJoinProcessor::stopRestrictedJoin()
- {
- numExternalEqualFields = 0;
- if (hasCandidates())
- finishCandidates();
- //There are no more matches for this (outer) equality condition, so all active rows need to be thrown away.
- if (outputProcessor)
- outputProcessor->cleanupAllCandidates();
- }
- void CMergeJoinProcessor::setCandidateRow(const void * row, bool inputsMayBeEmpty, const bool * matched)
- {
- candidateEqualityRow = inputAllocator->linkRow(row);
- const void * restrictionRow = (numEqualFields == numExternalEqualFields) ? NULL : candidateEqualityRow;
- outputProcessor->beforeProcessCandidates(restrictionRow, inputsMayBeEmpty, matched);
- }
- void CMergeJoinProcessor::connectRemotePriorityInputs()
- {
- CIArrayOf<OrderedInput> orderedInputs;
- ForEachItemIn(i, inputs)
- {
- CSteppedInputLookahead & cur = inputs.item(i);
- if (!cur.hasPriority() || !cur.readsRowsRemotely())
- return;
- orderedInputs.append(*new OrderedInput(cur, i, false));
- }
- orderedInputs.sort(compareInitialInputOrder);
- associateRemoteInputs(orderedInputs, orderedInputs.ordinality());
- combineConjunctions = false;
- }
- //---------------------------------------------------------------------------
- CAndMergeJoinProcessor::CAndMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- }
- void CAndMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- connectRemotePriorityInputs();
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- }
- bool CAndMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- if (eof)
- return false;
- const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
- PreservedRow savedRow(inputAllocator);
- const void * equalValue;
- unsigned firstInput = 0;
- if (inputsMustMatchEquality && allInputsAreOuterInputs)
- {
- //special case - all inputs are already advanced to the correct place, so just start generating candidates
- //for nested conjunctions they may already be exausted though
- equalValue = orderedInputs.item(firstInput).next();
- if (!equalValue)
- {
- eof = true;
- return false;
- }
- }
- else
- {
- if (!seekValue)
- {
- numSeekFields = numEqualFields;
- seekValue = lowestSeekRow;
- }
- bool matchedCompletely = true;
- equalValue = orderedInputs.item(firstInput).next(seekValue, numSeekFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (!equalValue)
- {
- eof = true;
- return false;
- }
- unsigned matchCount = 0;
- clearMatches();
- if (matchedCompletely)
- {
- matched[firstInput] = true;
- matchCount++;
- }
- else
- {
- equalValue = orderedInputs.item(firstInput).consume();
- savedRow.setown(equalValue);
- }
- unsigned lastInput = firstInput;
- while (matchCount != numInputs)
- {
- unsigned nextInput = nextToMatch(lastInput);
- lastInput = nextInput;
- bool matchedCompletely = true;
- const void * nextRow = orderedInputs.item(nextInput).nextGE(equalValue, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (!nextRow)
- {
- eof = true;
- return false;
- }
- #ifdef CHECK_CONSISTENCY
- if (inputsMustMatchEquality)
- {
- if (equalCompare->docompare(nextRow, equalValue) != 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- else
- {
- if (equalCompare->docompare(nextRow, equalValue) < 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- }
- #endif
- if (!inputsMustMatchEquality)
- {
- if (!equalCompareEq->match(nextRow, equalValue))
- {
- //value didn't match => skip all the previously matched entries.
- for (unsigned i=0; i < numInputs; i++)
- {
- if (matched[i])
- {
- matched[i] = false;
- orderedInputs.item(i).skip();
- if (--matchCount == 0)
- break;
- }
- }
- if (!matchedCompletely)
- {
- //Need to preserve nextRow, otherwise it will be gone after we skip
- equalValue = orderedInputs.item(nextInput).consume();
- savedRow.setown(equalValue);
- }
- else
- equalValue = nextRow;
- }
- }
- if (matchedCompletely)
- {
- matched[nextInput] = true;
- matchCount++;
- }
- }
- }
- //Set up the mergeProcessor with the appropriate inputs. NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
- //with the candidate.
- //clone one of the rows
- setCandidateRow(equalValue, false, NULL);
- return true;
- }
- unsigned CAndMergeJoinProcessor::nextToMatch(unsigned lastInput) const
- {
- for (unsigned i=0; i < numInputs; i++)
- {
- //Don't seek on the last input again (it may have found a keyed match, but not matched the post filter)
- if ((i != lastInput) && !matched[i])
- return i;
- }
- throwUnexpected();
- }
- //---------------------------------------------------------------------------
- CAndLeftMergeJoinProcessor::CAndLeftMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- combineConjunctions = false; // No advantage using this as the base of a combined conjunction
- isLeftOnly = (flags & IHThorNWayMergeJoinArg::MJFkindmask) == IHThorNWayMergeJoinArg::MJFleftonly;
- //Left only with a not stepped comparison needs to be done as a left outer at the stepping level
- if (isLeftOnly && (helper.queryNonSteppedCompare() || helper.queryGlobalCompare()))
- isLeftOnly = false;
- }
- void CAndLeftMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- createTempSeekBuffer();
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- }
- bool CAndLeftMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- if (eof)
- return false;
- CSteppedInputLookahead & input0 = inputs.item(0);
- bool wasMatched = true;
- const void * lhs = input0.next(seekValue, numSeekFields, wasMatched, nonBufferedMatchStepExtra);
- assertex(wasMatched);
- if (!lhs)
- {
- eof = true;
- return false;
- }
- unsigned matchCount = 1;
- while (matchCount != numInputs)
- {
- bool matchedCompletely = true; // we don't care what the next rhs value is - as long as it can't match the left
- const void * rhs = orderedInputs.item(matchCount).nextGE(lhs, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
- if (rhs)
- {
- int c = equalCompare->docompare(rhs, lhs);
- if (c < 0)
- throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
- if (c == 0)
- {
- assertex(matchedCompletely);
- //previously the (matchCount+1) test wasn't here, so it aborted as soon as there was any match.
- if (isLeftOnly && (matchCount+1 == numInputs))
- {
- if (numEqualFields == numExternalEqualFields)
- {
- //I think this is worth doing here...
- //Skip input0 to a mismatch value, so the optimizer doesn't waste time reading extra equalities
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
- input0.skip(); // invalidates lhs
- if (calculatedNextSeek)
- {
- bool wasMatched = true;
- input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
- }
- eof = true;
- return false;
- }
- //Create the next join value if that is possible
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
- input0.skip(); // invalidates lhs
- bool wasMatched = true;
- if (calculatedNextSeek)
- lhs = input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
- else
- lhs = input0.next();
- if (!lhs)
- {
- eof = true;
- return false;
- }
- matchCount = 0; //incremented at tail of loop
- }
- }
- else
- break;
- }
- else
- break;
- matchCount++;
- }
- clearMatches();
- matched[0] = true;
- if (matchCount != numInputs)
- {
- //Failed to match completely => generate a match for just the left. Skip any matched rows so far and break out.
- for (unsigned i=1; i < matchCount; i++)
- orderedInputs.item(i).skip();
- matchCount = 1;
- }
- else
- {
- for (unsigned i=1; i < numInputs; i++)
- matched[i] = true;
- }
- //LEFT ONLY will only merge 1 stream, LEFT OUTER will merge as many as match LEFT
- setCandidateRow(lhs, true, matched);
- return true;
- }
- bool CAndLeftMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- CSteppedInputLookahead & cur = inputs.item(0);
- if (!cur.gatherConjunctions(collector))
- collector.addInput(cur);
- collector.addJoin(*this);
- return true;
- }
- //---------------------------------------------------------------------------
- void BestMatchManager::associate(unsigned input, const void * value)
- {
- unsigned curIndex = 0;
- while (curIndex != numEntries)
- {
- BestMatchItem & cur = matches.item(curIndex);
- int c = compare->docompare(value, cur.value);
- if (c <= 0)
- {
- if (c == 0)
- {
- //insert at the end of the duplicates
- curIndex += cur.duplicates;
- cur.duplicates++;
- }
- //Move a record at the end of the list to the correct position, ready for updating.
- if (curIndex != numEntries)
- matches.rotateR(curIndex, numEntries);
- break; // now go and modify record at position curIndex
- }
- curIndex += cur.duplicates;
- }
- assertex(matches.isItem(curIndex));
- BestMatchItem & inserted = matches.item(curIndex);
- inserted.duplicates = 1;
- inserted.value = value;
- inserted.input = input;
- numEntries++;
- return;
- }
- unsigned BestMatchManager::getValueOffset(unsigned idx) const
- {
- unsigned offset = 0;
- while (idx--)
- offset += matches.item(offset).duplicates;
- return offset;
- }
- void BestMatchManager::init(ICompare * _compare, unsigned numInputs)
- {
- compare = _compare;
- numEntries = 0;
- for (unsigned i=0; i < numInputs; i++)
- matches.append(* new BestMatchItem);
- }
- void BestMatchManager::kill()
- {
- matches.kill();
- }
- unsigned BestMatchManager::getInput(unsigned whichValue, unsigned inputIndex) const
- {
- return matches.item(getValueOffset(whichValue) + inputIndex).input;
- }
- unsigned BestMatchManager::getInput0(unsigned inputIndex) const
- {
- return matches.item(inputIndex).input;
- }
- unsigned BestMatchManager::numInputs(unsigned whichValue) const
- {
- return matches.item(getValueOffset(whichValue)).duplicates;
- }
- void BestMatchManager::remove(unsigned whichValue)
- {
- unsigned offset = getValueOffset(whichValue);
- unsigned duplicates = matches.item(offset).duplicates;
- matches.rotateLN(offset, numEntries-1, duplicates);
- numEntries -= duplicates;
- }
- const void * BestMatchManager::queryValue(unsigned whichValue) const
- {
- return matches.item(getValueOffset(whichValue)).value;
- }
- //---------------------------------------------------------------------------
- CMofNMergeJoinProcessor::CMofNMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
- {
- combineConjunctions = false;
- alive = NULL;
- candidateMask = NULL;
- minMatches = 0;
- maxMatches = 0;
- numActive = 0;
- }
- void CMofNMergeJoinProcessor::afterProcessing()
- {
- delete [] alive;
- delete [] candidateMask;
- alive = NULL;
- candidateMask = NULL;
- matches.kill();
- CMergeJoinProcessor::afterProcessing();
- }
- void CMofNMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- createEqualityJoinProcessor();
- else
- createMerger();
- minMatches = helper.getMinMatches();
- maxMatches = helper.getMaxMatches();
- if (minMatches == 0)
- throw MakeStringException(99, "Need a non-zero minimum number of matches");
- alive = new bool [numInputs];
- candidateMask = new bool [numInputs];
- for (unsigned i= 0; i < numInputs; i++)
- alive[i] = true;
- numActive = numInputs;
- matches.init(equalCompare, numInputs);
- }
- bool CMofNMergeJoinProcessor::findCandidates(const void * originalSeekValue, unsigned numOriginalSeekFields)
- {
- if (numActive < minMatches)
- return false;
-
- unsigned numFreeToMismatch = numActive - minMatches;
- const void * seekValue = originalSeekValue;
- unsigned numSeekFields = numOriginalSeekFields;
- //This should be true, because after candidates are matched their values are removed.
- assertex(matches.numInputs() <= numFreeToMismatch); //
- //MORE: This needs rewriting, so that mismatches are handled coorectly. In particular,
- while (matches.numInputs() < numActive)
- {
- unsigned nextInput = nextToMatch();
- bool matchedCompletely = true;
- // MORE: This needs rewriting, so that mismatches are handled coorectly. In particular, the matches need to retain information about whether
- // they matched fully, since that will optimize where could be sought next.
- const void * value = inputs.item(nextInput).next(seekValue, numSeekFields, matchedCompletely, nonBufferedMatchStepExtra);
- //NOTE: matchedCompletely is currently always true. More work is needed if not true.
- assertex(matchedCompletely);
- if (value)
- {
- if (matchedCompletely)
- {
- matched[nextInput] = true;
- matches.associate(nextInput, value);
- }
- }
- else
- {
- alive[nextInput] = false;
- numActive--;
- numFreeToMismatch--;
- if (numActive < minMatches)
- return false;
- }
- unsigned matchCount = matches.numInputs();
- if (matchCount > numFreeToMismatch)
- {
- unsigned numMatch0 = matches.numInputs0();
- if ((matchCount - numMatch0 > numFreeToMismatch) || (numMatch0 > maxMatches))
- {
- //clear seekValue, because seek value won't be valid any more after the skips - may be updated later.
- seekValue = originalSeekValue;
- numSeekFields = numOriginalSeekFields;
- //No way that the first element is going to match, so remove all inputs associated with it.
- for (unsigned i= 0; i < numMatch0; i++)
- {
- unsigned input = matches.getInput0(i);
- inputs.item(input).skip();
- matched[input] = false;
- }
- matches.remove(0);
- }
- //Lowest element now provides the best seek position.
- if (matches.numInputs() > numFreeToMismatch)
- {
- seekValue = matches.queryValue(0);
- numSeekFields = numEqualFields;
- }
- }
- }
- //matches(0) contains the next match set, set a set of flags indicating which inputs to use
- unsigned numMatches = matches.numInputs0();
- for (unsigned i1=0; i1< numInputs; i1++)
- candidateMask[i1] = false;
- for (unsigned i2=0; i2 < numMatches; i2++)
- candidateMask[matches.getInput0(i2)] = true;
- setCandidateRow(matches.queryValue(0), true, candidateMask);
- //Now cleanup housekeeping, so that findCandidates() is ready to find the next block
- for (unsigned i3=0; i3 < numInputs; i3++)
- if (candidateMask[i3])
- matched[i3] = false;
- matches.remove(0);
- return true;
- }
- bool CMofNMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
- {
- //MORE: We may need to create pseudo inputs in order to process these optimially.
- return false;
- }
- unsigned CMofNMergeJoinProcessor::nextToMatch() const
- {
- for (unsigned i= 0; i < numInputs; i++)
- {
- if (alive[i] && !matched[i])
- return i;
- }
- throwUnexpected();
- }
- //---------------------------------------------------------------------------
- /*
- NOTES on the distances... this is far from simple once you get arbitrary trees involved.
- given a join expression right.x between left.x - a and left.x + b i.e. up to a Before and b after
- a is maxRightBeforeLeft
- b is maxLeftBeforeRight
- We define a function D(x,y) which is the maximum value which can be deducted from row a, to provide a valid value for row b
- Given a tree of join expressions, we can calculate a distance function between any pair of inputs.
- J1(a,b,c) = D(i,i+1)=4, D(i+1,i) = 10
- J2(d, e) = D(i,i+1)=-1, D(i+1,i) = 12
- J3(J1, J2) = D(i,i+1)=0 D(i+1,i) = 5
- =>
- For each join we define
- D(i, lowest) - maximum value to deduct from row i to obtain the lowest
- D(highest, i) - maximum value to deduct from highest to obtain row i
- by definition these must both be >= 0, for a simple input they are both 0
- A join's extend is given by
- D(highest,lowest) = max(D(highest,i)+D(i,lowest))
- We're only interested in the maximum values, which are obtained by the maximum distances between the elements. The lowest and highest memebers
- of the group are going to be the ends. So we use the maximum of those distances to work out D(i,low) and D(high, i), being careful to only use
- the range if it is valid (e.g., the end must be possible to be the highest/lowest)
- D(a,b) = 4, D(b,a) = 10
- D(b,c) = 4, D(c,b) = 10
- D(a,c) = 8, D(c,a) = 20
- D(d,e) = -1, D(e,d) = 12
- D(a, lowest) = 8 D(highest, a) = 20
- D(b, lowest) = 10 D(highest, b) = 10
- D(c, lowest) = 20 D(highest, c) = 8
- D(highest, lowest) = 28
- Then assuming the left is the highest and the right is the lowest we have
- D(a,e) = DJ1(a, lowest) + DJ3(i,i+1) + DJ2(highest, e)
- = 8 + 0 + 0
- For >2 terms, you also need to take into account the size of a term given by D(highest,lowest)
- */
- inline unsigned __int64 adjustRangeValue(unsigned __int64 rangeValue, __int64 delta)
- {
- if ((delta >= 0) || (rangeValue > (unsigned __int64)-delta))
- return rangeValue + delta;
- return 0;
- }
- //---------------------------------------------------------------------------
- //This class is created for each each input of a nary-join, it maintains a queue of potential records.
- CNaryJoinLookaheadQueue::CNaryJoinLookaheadQueue(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookahead * _input, CNaryJoinLookaheadQueue * _left, const void * * _activeRowPtr) : helper(_helper), rows(_inputAllocator), unmatchedRows(_inputAllocator)
- {
- equalCompareEq = helper.queryEqualCompareEq();
- nonSteppedCompareEq = helper.queryNonSteppedCompare();
- numEqualFields = helper.numEqualFields();
- stepCompare = helper.querySteppingMeta()->queryCompare();
- input.set(_input);
- activeRowPtr = _activeRowPtr;
- left = _left;
- equalityRow = NULL;
- curRow = 0;
- maxRow = 0;
- numSkipped = 0;
- done = true;
- }
- bool CNaryJoinLookaheadQueue::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext)
- {
- done = false;
- equalityRow = _equalityRow;
- rows.kill();
- numSkipped = 0;
- if (matchedLeft)
- matchedLeft->reset();
- // next is guaranteed to match the equality condition for AND, proximity but not for m of n/left outer...
- if (!needToVerifyNext || nextUnqueued())
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- void CNaryJoinLookaheadQueue::clearPending()
- {
- rows.kill();
- }
- bool CNaryJoinLookaheadQueue::ensureNonEmpty()
- {
- if (rows.ordinality())
- return true;
- if (nextUnqueued())
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- bool CNaryJoinLookaheadQueue::firstSelection()
- {
- if (!left)
- {
- assertex(maxRow != 0);
- curRow = 0;
- *activeRowPtr = rows.item(curRow);
- return true;
- }
-
- if (!left->firstSelection())
- return false;
- return findValidSelection(0);
- }
- bool CNaryJoinLookaheadQueue::findValidSelection(unsigned initialRow)
- {
- assertex(left);
- const unsigned max = maxRow;
- unsigned candidateRow = initialRow;
- for (;;)
- {
- const void * leftRow = left->activeRow();
- while (candidateRow < max)
- {
- const void * rightRow = rows.item(candidateRow);
- if (!nonSteppedCompareEq || nonSteppedCompareEq->match(leftRow, rightRow))
- {
- curRow = candidateRow;
- *activeRowPtr = rightRow;
- return true;
- }
- candidateRow++;
- }
- if (!left->nextSelection())
- return false;
- candidateRow = 0;
- }
- }
- const void * CNaryJoinLookaheadQueue::nextUnqueued()
- {
- if (equalityRow)
- {
- bool matches = true;
- const void * next = input->nextGE(equalityRow, numEqualFields, matches, nonBufferedMismatchStepExtra);
- if (next && matches && equalCompareEq->match(next, equalityRow))
- return next;
- return NULL;
- }
- else
- return input->next();
- }
- bool CNaryJoinLookaheadQueue::nextSelection()
- {
- if (left)
- return findValidSelection(curRow+1);
- curRow++;
- if (curRow >= maxRow)
- return false;
- *activeRowPtr = rows.item(curRow);
- return true;
- }
- bool CNaryJoinLookaheadQueue::ensureCandidateExists(unsigned __int64 minDistance, unsigned __int64 maxDistance)
- {
- for (;;)
- {
- const void * next = rows.head();
- if (!next)
- break;
- unsigned __int64 distance = helper.extractRangeValue(next);
- if (distance >= minDistance)
- {
- assertex(distance <= maxDistance);
- return true;
- }
- rows.skip();
- }
- for (;;)
- {
- const void * next = nextUnqueued();
- if (!next)
- return false;
- unsigned __int64 distance = helper.extractRangeValue(next);
- if (distance >= minDistance)
- {
- if (distance <= maxDistance)
- {
- consumeNextInput();
- return true;
- }
- return false;
- }
- input->skip();
- }
- }
- bool CNaryJoinLookaheadQueue::checkExistsGE(const void * seek, unsigned numFields)
- {
- for (;;)
- {
- const void * next = rows.head();
- if (!next)
- return false;
- if (stepCompare->docompare(next, seek, numFields) >= 0)
- return true;
- rows.skip();
- }
- }
- unsigned CNaryJoinLookaheadQueue::readAheadTo(unsigned __int64 maxDistance, bool canConsumeBeyondMax)
- {
- const void * tail = rows.tail();
- if (tail && helper.extractRangeValue(tail) > maxDistance)
- {
- unsigned limit = rows.ordinality() - 1;
- //Already have all the records, return how many...
- while (limit > 0)
- {
- const void * prev = rows.item(limit-1);
- if (helper.extractRangeValue(prev) <= maxDistance)
- return limit;
- --limit;
- }
- return 0;
- }
- while (!done)
- {
- const void * next = nextUnqueued();
- if (!next)
- {
- done = true;
- break;
- }
- //This is a bit nasty. We need to consume the next value to ensure that the lowest spotter always has the next valid
- //but it means we might be reading this input for too long
- if (helper.extractRangeValue(next) > maxDistance)
- {
- if (!canConsumeBeyondMax)
- break;
- consumeNextInput();
- return rows.ordinality()-1;
- }
- consumeNextInput();
- }
- return rows.ordinality();
- }
- void CNaryJoinLookaheadQueue::readCandidateAll()
- {
- for (;;)
- {
- const void * next = nextUnqueued();
- if (!next)
- return;
- consumeNextInput();
- }
- }
- void CNaryJoinLookaheadQueue::skip()
- {
- if (matchedLeft && !matchedLeft->test(numSkipped))
- {
- unmatchedRows.enqueue(rows.dequeue());
- }
- else
- rows.skip();
- numSkipped++;
- }
- bool CNaryJoinLookaheadQueue::flushUnmatched()
- {
- while (rows.ordinality())
- skip();
- return unmatchedRows.ordinality() != 0;
- }
- //---------------------------------------------------------------------------
- CProximityJoinProcessor::CProximityJoinProcessor(IHThorNWayMergeJoinArg & _helper) :
- CMergeJoinProcessor(_helper)
- {
- maxRightBeforeLeft = 0;
- maxLeftBeforeRight = 0;
- }
- void CProximityJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
- {
- CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
- createTempSeekBuffer();
- //Have to delay creating the actual join joinProcessor because maxRightBeforeLeft() etc. can be onStart dependant.
- maxRightBeforeLeft = helper.maxRightBeforeLeft();
- maxLeftBeforeRight = helper.maxLeftBeforeRight();
- //Handle phrases using a different class i) because the general scheme doesn't quite work and ii) for efficiency
- if (flags & IHThorNWayMergeJoinArg::MJFtransform)
- {
- if ((maxRightBeforeLeft < 0 || maxLeftBeforeRight < 0))
- outputProcessor.setown(new CAnchoredRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- else
- outputProcessor.setown(new CProximityRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
- }
- else
- createMerger();
- }
- bool CProximityJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
- {
- unsigned firstInput = 0;//searchOrder.item(0);
- bool wasCompleteMatch = true;
- if (eof || !inputs.item(firstInput).next(seekValue, numSeekFields, wasCompleteMatch, nonBufferedMatchStepExtra))
- return false;
- unsigned matchCount = 1;
- clearMatches();
- matched[firstInput] = true;
- const unsigned numJoinFields = numEqualFields + 1;
- const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
- while (matchCount != numInputs)
- {
- unsigned nextInput = nextToMatch();
- unsigned baseInput = getBestToSeekFrom(nextInput);
- RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
- helper.adjustRangeValue(rowBuilder, inputs.item(baseInput).next(), -maxDistanceBefore(baseInput, nextInput));
- bool wasCompleteMatch = true;
- //MORE: Would it help to allow mismatches? I would have thought so, but there was a previous comment sayimg "I don't think so because of the range calculation"
- const void * nextRow = inputs.item(nextInput).nextGE(tempSeekBuffer, numJoinFields, wasCompleteMatch, nonBufferedMatchStepExtra);
- assertex(wasCompleteMatch);
- if (!nextRow)
- {
- eof = true;
- return false;
- }
- if (inputsMustMatchEquality || equalityComponentMatches(nextRow, tempSeekBuffer))
- {
- //Now check if this new record causes other records to be too far away
- unsigned __int64 thisRangeValue = helper.extractRangeValue(nextRow);
- for (unsigned i=0; i<numInputs; i++)
- {
- if (matched[i])
- {
- unsigned __int64 seekRangeValue = adjustRangeValue(thisRangeValue, -maxDistanceBefore(nextInput, i));
- if (getRangeValue(i) < seekRangeValue)
- {
- inputs.item(i).skip();
- matched[i] = false;
- if (--matchCount == 0)
- break;
- }
- }
- }
- }
- else
- {
- for (unsigned i=0; i<numInputs; i++)
- {
- if (matched[i])
- {
- inputs.item(i).skip();
- matched[i] = false;
- matchCount--;
- }
- }
- }
- matched[nextInput] = true;
- matchCount++;
- }
- setCandidateRow(inputs.item(0).next(), false, NULL);
- return true;
- }
- __int64 CProximityJoinProcessor::maxDistanceBefore(unsigned fixedInput, unsigned searchInput) const
- {
- assertex(outputProcessor); // sanity check to ensure this isn't called before maxXBeforeY are set up
- if (searchInput < fixedInput)
- return maxLeftBeforeRight * (fixedInput - searchInput);
- else
- return maxRightBeforeLeft * (searchInput - fixedInput);
- }
- unsigned CProximityJoinProcessor::nextToMatch() const
- {
- for (unsigned i=0; i < numInputs; i++)
- {
- unsigned next = i;//searchOrder.item(i);
- if (!matched[next])
- return next;
- }
- throwUnexpected();
- }
- //Choose the input to seek from that restricts the input being sought the most.
- unsigned CProximityJoinProcessor::getBestToSeekFrom(unsigned seekInput) const
- {
- unsigned __int64 bestRangeValue = 0;
- unsigned best = NotFound;
- //MORE: This can be optimized!
- for (unsigned i=0; i < numInputs; i++)
- {
- if (matched[i])
- {
- //Calculate the value of the distance
- __int64 distanceBefore = maxDistanceBefore(i, seekInput);
- unsigned __int64 rangeValue = adjustRangeValue(getRangeValue(i), -distanceBefore);
- if (rangeValue >= bestRangeValue)
- {
- bestRangeValue = rangeValue;
- best = i;
- }
- }
- }
- assertex(best != NotFound);
- return best;
- }
- //---------------------------------------------------------------------------
- //NULL passed to CSteppedInputLookahead first parameter means nextGE() must be overridden
- CJoinGenerator::CJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- helper(_helper), inputAllocator(_inputAllocator), outputAllocator(_outputAllocator)
- {
- state = JSdone;
- unsigned flags = helper.getJoinFlags();
- stepCompare = helper.querySteppingMeta()->queryCompare();
- globalCompare = NULL;
- if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
- globalCompare = helper.queryGlobalCompare();
- unsigned numInputs = _inputs.ordinality();
- rows = new const void * [numInputs];
- CNaryJoinLookaheadQueue * prev = NULL;
- ForEachItemIn(i, _inputs)
- {
- CNaryJoinLookaheadQueue * queue = new CNaryJoinLookaheadQueue(inputAllocator, helper, &_inputs.item(i), prev, rows + i);
- inputs.append(*queue);
- prev = queue;
- }
- isSpecialLeftJoin = false;
- numActiveInputs = numInputs;
- lastActiveInput = numInputs ? &inputs.tos() : NULL;
- joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
- switch (joinKind)
- {
- case IHThorNWayMergeJoinArg::MJFleftonly:
- case IHThorNWayMergeJoinArg::MJFleftouter:
- if (helper.queryNonSteppedCompare() || globalCompare)
- {
- isSpecialLeftJoin = true;
- if (numInputs)
- inputs.item(0).trackUnmatched();
- }
- break;
- case IHThorNWayMergeJoinArg::MJFmofn:
- if (helper.queryNonSteppedCompare() || globalCompare)
- throw MakeStringException(99, "MOFN JOIN with non stepped condition not yet supported");
- break;
- }
- }
- CJoinGenerator::~CJoinGenerator()
- {
- delete [] rows;
- }
- void CJoinGenerator::beforeProcessCandidates(const void * candidateRow, bool needToVerifyNext, const bool * matched)
- {
- if (needToVerifyNext)
- {
- numActiveInputs = 0;
- CNaryJoinLookaheadQueue * prev = NULL;
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & cur = inputs.item(i);
- if (cur.beforeProcessCandidates(candidateRow, needToVerifyNext))
- {
- cur.updateContext(prev, rows + numActiveInputs);
- prev = &cur;
- numActiveInputs++;
- }
- }
- lastActiveInput = prev;
- }
- else
- {
- ForEachItemIn(i, inputs)
- inputs.item(i).beforeProcessCandidates(candidateRow, needToVerifyNext);
- }
- state = JSfirst;
- }
- void CJoinGenerator::cleanupAllCandidates()
- {
- //Remove all pending candidates - only called if outer join optimization is enabled
- //afterProcessCandidates() will already have been called.
- ForEachItemIn(i, inputs)
- inputs.item(i).clearPending();
- }
- void CJoinGenerator::afterProcessCandidates()
- {
- }
- const void * CJoinGenerator::nextOutputRow()
- {
- RtlDynamicRowBuilder rowBuilder(outputAllocator, false);
- for (;;)
- {
- if (isSpecialLeftJoin)
- {
- CNaryJoinLookaheadQueue & left = inputs.item(0);
- for (;;)
- {
- const void * unmatchedLeft = left.nextUnmatched();
- if (!unmatchedLeft)
- break;
- rowBuilder.ensureRow();
- size32_t retSize = helper.transform(rowBuilder, 1, &unmatchedLeft);
- left.skipUnmatched();
- if (retSize)
- return rowBuilder.finalizeRowClear(retSize);
- }
- }
- switch (state)
- {
- case JSdone:
- if (isSpecialLeftJoin)
- {
- CNaryJoinLookaheadQueue & left = inputs.item(0);
- left.readCandidateAll();
- if (left.flushUnmatched())
- break; // round again
- }
- return NULL;
- case JShascandidate:
- {
- state = JSnextcandidate;
- //If is left only join, and has an additional equality criteria, then ignore matches.
- //If left only, and no extra equality - or only one dataset has matches, then all matches are real left only matches
- if (isSpecialLeftJoin && (joinKind == IHThorNWayMergeJoinArg::MJFleftonly) && (numActiveInputs != 1))
- break;
- rowBuilder.ensureRow();
- size32_t retSize = helper.transform(rowBuilder, numActiveInputs, rows);
- if (retSize)
- return rowBuilder.finalizeRowClear(retSize);
- break;
- }
- case JSnextcandidate:
- if (nextCandidate())
- state = JShascandidate;
- else
- {
- if (state != JSdone)
- state = JSgathercandidates;
- }
- break;
- case JSfirst:
- case JSgathercandidates:
- if (gatherNextCandidates())
- state = JShascandidate;
- else
- state = JSdone;
- break;
- default:
- throwUnexpected();
- }
- }
- }
- const void * CJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //A stupid version. We could possibly skip on the lowest value if we knew the fields were assigned from the lowest value in the input
- //which would potentially save a lot of transforms.
- //would also probably need the input to match the output.
- for (;;)
- {
- const void * next = nextOutputRow();
- if (!next || stepCompare->docompare(next, seek, numFields) >= 0)
- return next;
- outputAllocator->releaseRow(next);
- }
- }
- bool CJoinGenerator::firstSelection()
- {
- if (lastActiveInput->firstSelection())
- {
- if (globalCompare && !globalCompare->match(numActiveInputs, rows))
- return nextSelection();
- if (isSpecialLeftJoin)
- inputs.item(0).noteMatched();
- return true;
- }
- return false;
- }
- bool CJoinGenerator::nextSelection()
- {
- while (lastActiveInput->nextSelection())
- {
- if (!globalCompare || globalCompare->match(numActiveInputs, rows))
- {
- if (isSpecialLeftJoin)
- inputs.item(0).noteMatched();
- return true;
- }
- }
- return false;
- }
- //---------------------------------------------------------------------------
- CEqualityJoinGenerator::CEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- lowestInput = NULL;
- }
- void CEqualityJoinGenerator::afterProcessCandidates()
- {
- lowestInput = NULL;
- CJoinGenerator::afterProcessCandidates();
- }
- bool CEqualityJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- selectNextLowestInput();
- return false;
- }
- /*
- o Walk the input which is guaranteed to be the lowest
- o Once that is done throw away that record, and choose the next.
- */
- bool CEqualityJoinGenerator::doGatherNextCandidates()
- {
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (&curInput != lowestInput)
- curInput.setCandidateAll();
- else
- curInput.setCandidateLowest();
- }
- return firstSelection();
- }
- bool CEqualityJoinGenerator::gatherNextCandidates()
- {
- if (state == JSfirst)
- {
- prefetchAllCandidates();
- selectLowestInput();
- }
- else if (lowestInput->empty())
- return false;
- for (;;)
- {
- if (doGatherNextCandidates())
- return true;
- if (!selectNextLowestInput())
- return false;
- }
- }
- void CEqualityJoinGenerator::prefetchAllCandidates()
- {
- //could be done in parallel, but
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(i);
- curInput.readCandidateAll();
- }
- }
- void CEqualityJoinGenerator::selectLowestInput()
- {
- ForEachItemIn(i, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(i);
- if (!curInput.empty())
- {
- lowestInput = &curInput;
- return;
- }
- }
- throwUnexpected();
- }
- bool CEqualityJoinGenerator::selectNextLowestInput()
- {
- lowestInput->skip();
- if (lowestInput->empty())
- {
- state = JSdone;
- return false;
- }
- return true;
- }
- //---------------------------------------------------------------------------
- CSortedEqualityJoinGenerator::CSortedEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CEqualityJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
- {
- lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
- lowestSpotter.initInputs();
- }
- CSortedEqualityJoinGenerator::~CSortedEqualityJoinGenerator()
- {
- lowestSpotter.cleanup();
- }
- void CSortedEqualityJoinGenerator::afterProcessCandidates()
- {
- lowestSpotter.reset();
- CEqualityJoinGenerator::afterProcessCandidates();
- }
- void CSortedEqualityJoinGenerator::selectLowestInput()
- {
- unsigned iLowest = lowestSpotter.queryNextInput();
- assertex(iLowest != NotFound);
- lowestInput = &inputs.item(iLowest);
- }
- bool CSortedEqualityJoinGenerator::selectNextLowestInput()
- {
- lowestSpotter.skipRow();
- if (lowestInput->empty())
- {
- state = JSdone;
- return false;
- }
- CSortedEqualityJoinGenerator::selectLowestInput();
- return true;
- }
- //---------------------------------------------------------------------------
- CRangeJoinGenerator::CRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- maxRightBeforeLeft = helper.maxRightBeforeLeft();
- maxLeftBeforeRight = helper.maxLeftBeforeRight();
- }
- //---------------------------------------------------------------------------
- CAnchoredRangeJoinGenerator::CAnchoredRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
- {
- iLowest = maxRightBeforeLeft < 0 ? 0 : inputs.ordinality()-1;
- lowestInput = &inputs.item(iLowest);
- }
- bool CAnchoredRangeJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- lowestInput->skip();
- return false;
- }
- /*
- o Walk the input which is guaranteed to be the lowest
- o Once that is done throw away that record, and choose the next.
- */
- bool CAnchoredRangeJoinGenerator::doGatherNextCandidates()
- {
- const void * lowestRow = lowestInput->next();
- if (!lowestRow)
- return false;
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- __int64 maxLowestBeforeCur = maxDistanceAfterLowest(iInput);
- assertex(maxLowestBeforeCur > 0);
- unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
- if (!curInput.setCandidateRange(maxDistance, false))
- return false;
- }
- }
- lowestInput->setCandidateLowest();
- return firstSelection();
- }
- const void * CAnchoredRangeJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
- {
- //Note: Skip any lower values that are less than seek value, but don't read any more
- if (!lowestInput->checkExistsGE(seek, numFields))
- return NULL;
- return CRangeJoinGenerator::nextOutputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
- }
- bool CAnchoredRangeJoinGenerator::nextMatchesAnyConsumed()
- {
- const void * lowestRow = lowestInput->next();
- bool consumePending = false;
- if (!lowestRow)
- {
- lowestRow = lowestInput->nextUnqueued();
- if (!lowestRow)
- return false;
- consumePending = true;
- }
- //Throw any non-matching rows away, and return true if there are no other rows left.
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- //note: maxRightBeforeLeft is -minRightAfterLeft
- __int64 minCurAfterLowest;
- if (iInput < iLowest)
- minCurAfterLowest = (-maxLeftBeforeRight) * (iLowest - iInput);
- else
- minCurAfterLowest = (-maxRightBeforeLeft) * (iInput - iLowest);
- assertex(minCurAfterLowest >= 0);
- if (!curInput.ensureCandidateExists(lowestDistance+minCurAfterLowest, lowestDistance + maxDistanceAfterLowest(iInput)))
- return false;
- }
- }
- //A potential match, so consume the potential start word and try again
- if (consumePending)
- lowestInput->consumeNextInput();
- return true;
- }
- bool CAnchoredRangeJoinGenerator::gatherNextCandidates()
- {
- for (;;)
- {
- if (!nextMatchesAnyConsumed())
- return false;
- if (doGatherNextCandidates())
- return true;
- lowestInput->skip();
- }
- }
- //---------------------------------------------------------------------------
- CProximityRangeJoinGenerator::CProximityRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
- CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
- {
- lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
- lowestSpotter.initInputs();
- }
- CProximityRangeJoinGenerator::~CProximityRangeJoinGenerator()
- {
- lowestSpotter.cleanup();
- }
- void CProximityRangeJoinGenerator::afterProcessCandidates()
- {
- lowestSpotter.reset();
- CRangeJoinGenerator::afterProcessCandidates();
- }
- bool CProximityRangeJoinGenerator::nextCandidate()
- {
- if (nextSelection())
- return true;
- if (!lowestSpotter.skipNextLowest())
- state = JSdone;
- return false;
- }
- /*
- First version.....
- o Walk the input datasets in the order lowest first.
- o Perform the cross product of that record with all others that could possibly match.
- o Once that is done throw away that record, and choose the next.
- o Abort as soon as any of the inputs contains no records within potential range.
- */
- bool CProximityRangeJoinGenerator::gatherNextCandidates(unsigned iLowest)
- {
- CNaryJoinLookaheadQueue & lowestInput = inputs.item(iLowest);
- const void * lowestRow = lowestInput.next();
- unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
- ForEachItemIn(iInput, inputs)
- {
- CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
- if (iInput != iLowest)
- {
- __int64 maxLowestBeforeCur;
- if (iInput < iLowest)
- maxLowestBeforeCur = maxRightBeforeLeft * (iLowest - iInput);
- else
- maxLowestBeforeCur = maxLeftBeforeRight * (iInput - iLowest);
- assertex(maxLowestBeforeCur >= 0); // should have created an anchored variant if not true
- // maxLowestBeforeCur = maxCurAfterLowest
- unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
- if (!curInput.setCandidateRange(maxDistance, true))
- return false;
- }
- else
- curInput.setCandidateLowest();
- }
- return firstSelection();
- }
- bool CProximityRangeJoinGenerator::gatherNextCandidates()
- {
- for (;;)
- {
- unsigned iLowest = lowestSpotter.queryNextInput();
- assertex(iLowest != NotFound);
- if (gatherNextCandidates(iLowest))
- return true;
- //It would be really nice to break out early if there were no more potential matches, but even if there
- //is only one matching stream we need to keep walking the consumed records, because the later records
- //may pull in the relevant related records, and we can't sensibly put back the consumed records
- if (!lowestSpotter.skipNextLowest())
- {
- //No more records within this document => can't ever match
- state = JSdone;
- return false;
- }
- }
- }
|