thorstep.cpp 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jlog.hpp"
  15. #include "jqueue.tpp"
  16. #include "jexcept.hpp"
  17. #include "thorcommon.hpp"
  18. #include "thorstep.ipp"
  19. #include "thorstep2.ipp"
  20. #include "roxiemem.hpp"
  21. #ifdef _DEBUG
  22. #define CHECK_CONSISTENCY
  23. #endif
  24. using roxiemem::OwnedConstRoxieRow;
  25. const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
  26. const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  27. const static SmartStepExtra nonSeekStepExtra(SSEFreturnUnbufferedMatches, NULL); // if doing next() instead of nextGE()
  28. const static SmartStepExtra nonBufferedMatchStepExtra(SSEFreturnUnbufferedMatches, NULL);
  29. const static SmartStepExtra nonBufferedMismatchStepExtra(SSEFreturnMismatches, NULL);
  30. bool stepFieldsMatch(const CFieldOffsetSize * leftFields, unsigned leftIndex, const CFieldOffsetSize * rightFields, unsigned rightIndex)
  31. {
  32. const CFieldOffsetSize * leftField = leftFields + leftIndex;
  33. const CFieldOffsetSize * rightField = rightFields + rightIndex;
  34. return (leftField->offset == rightField->offset) && (leftField->size == rightField->size);
  35. }
  36. bool stepFieldsMatch(ISteppingMeta * leftMeta, unsigned leftIndex, ISteppingMeta * rightMeta, unsigned rightIndex)
  37. {
  38. if ((leftIndex >= leftMeta->getNumFields()) || (rightIndex >= rightMeta->getNumFields()))
  39. return false;
  40. return stepFieldsMatch(leftMeta->queryFields(), leftIndex, rightMeta->queryFields(), rightIndex);
  41. }
  42. unsigned getNumMatchingFields(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  43. {
  44. unsigned numStepableFields = 0;
  45. if (inputStepping && callerStepping)
  46. {
  47. //Determine where the stepping fields overlap, and work out the extent.
  48. unsigned inputCount = inputStepping->getNumFields();
  49. for (unsigned i=0; i < inputCount; i++)
  50. {
  51. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  52. break;
  53. numStepableFields++;
  54. }
  55. }
  56. return numStepableFields;
  57. }
  58. void verifySteppingCompatible(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  59. {
  60. if (inputStepping && callerStepping)
  61. {
  62. //Determine where the stepping fields overlap, and work out the extent.
  63. unsigned parentCount = callerStepping->getNumFields();
  64. unsigned inputCount = inputStepping->getNumFields();
  65. unsigned max = parentCount < inputCount ? parentCount : inputCount;
  66. for (unsigned i=0; i < max; i++)
  67. {
  68. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  69. throw MakeStringException(999, "Stepping field %d, input and join do not match", i);
  70. }
  71. }
  72. }
  73. //---------------------------------------------------------------------------
  74. void CSteppingMeta::intersect(IInputSteppingMeta * inputMeta)
  75. {
  76. if (inputMeta)
  77. {
  78. unsigned maxFields = inputMeta->getNumFields();
  79. if (maxFields > numFields)
  80. maxFields = numFields;
  81. for (unsigned curField = 0; curField < maxFields; curField++)
  82. {
  83. if (!stepFieldsMatch(inputMeta->queryFields(), curField, fields, curField))
  84. {
  85. numFields = curField;
  86. break;
  87. }
  88. }
  89. if (inputMeta->hasPostFilter())
  90. postFiltered = true;
  91. if (inputMeta->isDistributed())
  92. setDistributed();
  93. unsigned inputFlags = inputMeta->getSteppedFlags();
  94. double inputPriority = inputMeta->getPriority();
  95. if (hadStepExtra)
  96. {
  97. stepFlags &= inputFlags;
  98. if (priority != inputPriority)
  99. stepFlags &= ~SSFhaspriority;
  100. }
  101. else
  102. {
  103. hadStepExtra = true;
  104. stepFlags = inputFlags;
  105. priority = inputPriority;
  106. }
  107. }
  108. else
  109. numFields = 0;
  110. }
  111. //---------------------------------------------------------------------------
  112. CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSteppingMeta * _inputStepping, IEngineRowAllocator * _rowAllocator, IRangeCompare * _compare, bool _paranoid)
  113. : input(_input), compare(_compare)
  114. {
  115. maxFields = compare ? compare->maxFields() : 0;
  116. readAheadRow = NULL;
  117. readAheadRowIsExactMatch = true;
  118. stepFlagsMask = 0;
  119. stepFlagsValue = 0;
  120. paranoid = _paranoid;
  121. previousReadAheadRow = NULL;
  122. rowAllocator.set(_rowAllocator);
  123. inputStepping = _inputStepping;
  124. numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
  125. isPostFiltered = inputStepping ? inputStepping->hasPostFilter() : false;
  126. setRestriction(NULL, 0);
  127. lowestFrequencyInput = NULL;
  128. }
  129. CSteppedInputLookahead::~CSteppedInputLookahead()
  130. {
  131. if (previousReadAheadRow)
  132. rowAllocator->releaseRow(previousReadAheadRow);
  133. if (readAheadRow)
  134. rowAllocator->releaseRow(readAheadRow);
  135. }
  136. const void * CSteppedInputLookahead::nextInputRow()
  137. {
  138. if (readAheadRows.ordinality())
  139. return readAheadRows.dequeue();
  140. return input->nextInputRow();
  141. }
  142. const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  143. {
  144. while (readAheadRows.ordinality())
  145. {
  146. OwnedConstRoxieRow next = readAheadRows.dequeue();
  147. if (compare->docompare(next, seek, numFields) >= 0)
  148. {
  149. assertex(wasCompleteMatch);
  150. return (void *)next.getClear();
  151. }
  152. }
  153. return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  154. }
  155. void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
  156. {
  157. const void * lastSeekRow = NULL;
  158. //Remove any rows from the seek list that occur before the new seek row
  159. while (seekRows.ordinality())
  160. {
  161. const void * next = seekRows.head();
  162. if (compare->docompare(next, seek, numFields) >= 0)
  163. {
  164. //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
  165. seek = seekRows.tail();
  166. lastSeekRow = seek;
  167. break;
  168. }
  169. rowAllocator->releaseRow(seekRows.dequeue());
  170. }
  171. //Could the current readahead row be part of the seek set.
  172. if (readAheadRow && compare->docompare(readAheadRow, seek, numFields) >= 0)
  173. {
  174. //Check not already added - could conceivably happen after rows are read directly beyond the matching seeks.
  175. if (!lastSeekRow || compare->docompare(readAheadRow, lastSeekRow, numFields) > 0)
  176. {
  177. seekRows.enqueue(rowAllocator->linkRow(readAheadRow));
  178. lastSeekRow = readAheadRow;
  179. seek = readAheadRow;
  180. }
  181. }
  182. //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
  183. unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
  184. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  185. seekRows.ensure(maxcount);
  186. while (seekRows.ordinality() < maxcount)
  187. {
  188. bool wasCompleteMatch = true;
  189. const void * next = input->nextInputRowGE(seek, numFields, wasCompleteMatch, inputStepExtra);
  190. if (!next)
  191. break;
  192. //wasCompleteMatch can be false if we've just read the last row returned from a block of reads,
  193. //but if so the next read request will do another blocked read, so just ignore this one.
  194. if (wasCompleteMatch)
  195. {
  196. readAheadRows.enqueue(next);
  197. if (!lastSeekRow || compare->docompare(next, lastSeekRow, numFields) > 0)
  198. {
  199. //Only record unique seek positions in the seek rows
  200. seekRows.enqueue(rowAllocator->linkRow(next));
  201. lastSeekRow = next;
  202. }
  203. //update the seek pointer to the best value.
  204. seek = next;
  205. }
  206. else
  207. rowAllocator->releaseRow(next);
  208. }
  209. }
  210. unsigned CSteppedInputLookahead::ordinality() const
  211. {
  212. return seekRows.ordinality();
  213. }
  214. const void * CSteppedInputLookahead::querySeek(unsigned i) const
  215. {
  216. return seekRows.item(i);
  217. }
  218. const void * CSteppedInputLookahead::consume()
  219. {
  220. if (!readAheadRow)
  221. fill();
  222. if (!includeInOutput(readAheadRow))
  223. return NULL;
  224. if (paranoid && readAheadRow)
  225. {
  226. if (previousReadAheadRow)
  227. rowAllocator->releaseRow(previousReadAheadRow);
  228. previousReadAheadRow = rowAllocator->linkRow(readAheadRow);
  229. }
  230. const void * ret = readAheadRow;
  231. readAheadRow = NULL;
  232. readAheadRowIsExactMatch = true;
  233. return ret;
  234. }
  235. IMultipleStepSeekInfo * CSteppedInputLookahead::createMutipleReadWrapper()
  236. {
  237. return this;
  238. }
  239. void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper)
  240. {
  241. lowestFrequencyInput = wrapper;
  242. }
  243. void CSteppedInputLookahead::fill()
  244. {
  245. readAheadRowIsExactMatch = true;
  246. if (restrictValue && numStepableFields)
  247. {
  248. //note - this will either return a valid value to be included in the range,
  249. //or if invalid then it must be out of range -> will fail includeInOutput later,
  250. //but we may as well keep the row
  251. unsigned numFields = numRestrictFields < numStepableFields ? numRestrictFields : numStepableFields;
  252. //Default to returning mismatches, but could be overidden from outside
  253. unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
  254. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  255. readAheadRow = nextInputRowGE(restrictValue, numFields, readAheadRowIsExactMatch, inputStepExtra);
  256. if (paranoid && readAheadRow)
  257. {
  258. int c = compare->docompare(readAheadRow, restrictValue, numFields);
  259. if (c < 0)
  260. throw MakeStringException(1001, "Input to stepped join preceeds seek point");
  261. if ((c == 0) && !readAheadRowIsExactMatch)
  262. throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
  263. }
  264. }
  265. else
  266. {
  267. //Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
  268. //Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
  269. readAheadRow = nextInputRow();
  270. }
  271. if (paranoid && readAheadRow && previousReadAheadRow && compare)
  272. {
  273. if (compare->docompare(previousReadAheadRow, readAheadRow, maxFields) > 0)
  274. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  275. }
  276. }
  277. const void * CSteppedInputLookahead::next()
  278. {
  279. if (!readAheadRowIsExactMatch)
  280. {
  281. if (includeInOutput(readAheadRow))
  282. skip();
  283. else
  284. return NULL;
  285. }
  286. if (!readAheadRow)
  287. fill();
  288. if (!includeInOutput(readAheadRow))
  289. return NULL;
  290. return readAheadRow;
  291. }
  292. const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  293. {
  294. if (readAheadRow)
  295. {
  296. int c = compare->docompare(readAheadRow, seek, numFields);
  297. if (c >= 0)
  298. {
  299. if (!includeInOutput(readAheadRow))
  300. return NULL;
  301. if (readAheadRowIsExactMatch)
  302. return readAheadRow;
  303. //readAheadRow is beyond seek point => ok to return an incomplete match
  304. if (stepExtra.returnMismatches() && (c != 0))
  305. {
  306. wasCompleteMatch = readAheadRowIsExactMatch;
  307. return readAheadRow;
  308. }
  309. }
  310. skip();
  311. }
  312. if (numStepableFields)
  313. {
  314. //This class is directly told whether it should be using readAhead, so need to create a modified stepExtra
  315. unsigned flags = (stepExtra.queryFlags() & ~stepFlagsMask) | stepFlagsValue;
  316. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  317. unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
  318. for (;;)
  319. {
  320. readAheadRowIsExactMatch = true;
  321. readAheadRow = nextInputRowGE(seek, stepFields, readAheadRowIsExactMatch, inputStepExtra);
  322. if (paranoid && readAheadRow)
  323. {
  324. int c = compare->docompare(readAheadRow, seek, stepFields);
  325. if (c < 0)
  326. throw MakeStringException(1001, "Input to stepped join preceeds seek point");
  327. if ((c == 0) && !readAheadRowIsExactMatch)
  328. throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
  329. }
  330. if (!readAheadRow || !includeInOutput(readAheadRow))
  331. return NULL;
  332. if (numFields <= numStepableFields)
  333. {
  334. wasCompleteMatch = readAheadRowIsExactMatch;
  335. return readAheadRow;
  336. }
  337. //if !readAheadRowIsExactMatch then isCompleteMatch must have been provided => ok to return a mismatch
  338. //if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
  339. if (!readAheadRowIsExactMatch)
  340. {
  341. wasCompleteMatch = readAheadRowIsExactMatch;
  342. return readAheadRow;
  343. }
  344. if (compare->docompare(readAheadRow, seek, numFields) >= 0)
  345. {
  346. wasCompleteMatch = readAheadRowIsExactMatch;
  347. return readAheadRow;
  348. }
  349. skip();
  350. }
  351. //now need to do an incremental seek on the subsequent fields to find an exact value >
  352. }
  353. //now narrow down
  354. for (;;)
  355. {
  356. const void * cur = next();
  357. if (!cur)
  358. return NULL;
  359. if (compare->docompare(cur, seek, numFields) >= 0)
  360. return cur;
  361. skip();
  362. }
  363. }
  364. unsigned CSteppedInputLookahead::queryMaxStepable(ISteppingMeta * callerStepping) const
  365. {
  366. return getNumMatchingFields(inputStepping, callerStepping);
  367. }
  368. void CSteppedInputLookahead::setAlwaysReadExact()
  369. {
  370. //can be used to force reading only exact matches (for the known lowest priority input)
  371. stepFlagsMask |= SSEFreturnMismatches;
  372. }
  373. void CSteppedInputLookahead::setReadAhead(bool value)
  374. {
  375. //This never removes readahead if requested somewhere else, so don't update the mask.
  376. if (value)
  377. stepFlagsValue |= SSEFreadAhead;
  378. else
  379. stepFlagsValue &= ~SSEFreadAhead;
  380. }
  381. void CSteppedInputLookahead::setRestriction(const void * _value, unsigned _num)
  382. {
  383. restrictValue = _value;
  384. numRestrictFields = _num;
  385. }
  386. void CSteppedInputLookahead::resetEOF()
  387. {
  388. if (numRestrictFields == 0)
  389. resetInputEOF();
  390. }
  391. void CSteppedInputLookahead::skip()
  392. {
  393. if (paranoid)
  394. {
  395. if (previousReadAheadRow)
  396. rowAllocator->releaseRow(previousReadAheadRow);
  397. previousReadAheadRow = readAheadRow;
  398. }
  399. else
  400. {
  401. if (readAheadRow)
  402. rowAllocator->releaseRow(readAheadRow);
  403. }
  404. //NB: Don't read ahead until we have to...
  405. readAheadRow = NULL;
  406. readAheadRowIsExactMatch = true;
  407. }
  408. const void * CSteppedInputLookahead::skipnext()
  409. {
  410. skip();
  411. return next();
  412. }
  413. //---------------------------------------------------------------------------
  414. void CUnfilteredSteppedMerger::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext, const bool * matched)
  415. {
  416. merger.setCandidateRow(_equalityRow);
  417. unsigned numInputs = inputArray->ordinality();
  418. for (unsigned i=0; i< numInputs; i++)
  419. {
  420. if (!needToVerifyNext || matched[i])
  421. firstCandidateRows[i] = inputArray->item(i).consume();
  422. else
  423. firstCandidateRows[i] = NULL;
  424. }
  425. merger.primeRows(firstCandidateRows);
  426. }
  427. //---------------------------------------------------------------------------
  428. CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRangeCompare * _stepCompare, ICompare * _equalCompare, CSteppedInputLookahead * _input, unsigned _numEqualFields)
  429. {
  430. allocator = _allocator;
  431. stepCompare = _stepCompare;
  432. equalCompare = _equalCompare;
  433. input = _input;
  434. matched.setown(createThreadSafeBitSet());
  435. numMatched = 0;
  436. readIndex = 0;
  437. numEqualFields = _numEqualFields;
  438. }
  439. CFilteredInputBuffer::~CFilteredInputBuffer()
  440. {
  441. }
  442. const void * CFilteredInputBuffer::consume()
  443. {
  444. if (!rows.isItem(readIndex))
  445. return NULL;
  446. const void * ret = rows.item(readIndex);
  447. rows.replace(NULL, readIndex);
  448. readIndex++;
  449. return ret;
  450. }
  451. const void * CFilteredInputBuffer::consumeGE(const void * seek, unsigned numFields)
  452. {
  453. while (rows.isItem(readIndex))
  454. {
  455. const void * cur = rows.item(readIndex);
  456. if (stepCompare->docompare(cur, seek, numFields) >= 0)
  457. {
  458. rows.replace(NULL, readIndex);
  459. readIndex++;
  460. return cur;
  461. }
  462. readIndex++;
  463. }
  464. return NULL;
  465. }
  466. void CFilteredInputBuffer::fill(const void * equalityRow)
  467. {
  468. const void * next = input->consume();
  469. assertex(next);
  470. append(next);
  471. if (equalityRow)
  472. {
  473. for (;;)
  474. {
  475. bool matches = true;
  476. SmartStepExtra stepExtra(SSEFreturnMismatches, NULL);
  477. const void * next = input->nextGE(equalityRow, numEqualFields, matches, stepExtra);
  478. if (!next || !matches || equalCompare->docompare(equalityRow, next) != 0)
  479. break;
  480. append(input->consume());
  481. }
  482. }
  483. else
  484. {
  485. for (;;)
  486. {
  487. const void * next = input->consume();
  488. if (!next)
  489. break;
  490. append(next);
  491. }
  492. }
  493. }
  494. void CFilteredInputBuffer::removeMatched()
  495. {
  496. ForEachItemInRev(i, rows)
  497. {
  498. if (isMatched(i))
  499. remove(i);
  500. }
  501. }
  502. void CFilteredInputBuffer::removeUnmatched()
  503. {
  504. ForEachItemInRev(i, rows)
  505. {
  506. if (!isMatched(i))
  507. remove(i);
  508. }
  509. }
  510. void CFilteredInputBuffer::remove(unsigned i)
  511. {
  512. const void * row = rows.item(i);
  513. rows.remove(i);
  514. allocator->releaseRow(row);
  515. }
  516. void CFilteredInputBuffer::reset()
  517. {
  518. ForEachItemIn(i, rows)
  519. {
  520. const void * cur = rows.item(i);
  521. if (cur)
  522. allocator->releaseRow(cur);
  523. }
  524. rows.kill();
  525. matched->reset();
  526. numMatched = 0;
  527. readIndex = 0;
  528. }
  529. CFilteredSteppedMerger::CFilteredSteppedMerger()
  530. {
  531. matches = NULL;
  532. joinKind = 0;
  533. numInputs = 0;
  534. equalCompare = NULL;
  535. extraCompare = NULL;
  536. globalCompare = NULL;
  537. minMatches = 0;
  538. maxMatches = 0;
  539. fullyMatchedLevel = 0;
  540. }
  541. CFilteredSteppedMerger::~CFilteredSteppedMerger()
  542. {
  543. delete [] matches;
  544. }
  545. void CFilteredSteppedMerger::init(IEngineRowAllocator * _allocator, IHThorNWayMergeJoinArg & helper, CSteppedInputLookaheadArray * inputArray)
  546. {
  547. unsigned flags = helper.getJoinFlags();
  548. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  549. numInputs = inputArray->ordinality();
  550. matches = new const void * [numInputs];
  551. equalCompare = helper.queryEqualCompare();
  552. extraCompare = helper.queryNonSteppedCompare();
  553. globalCompare = NULL;
  554. unsigned numEqualFields = helper.numEqualFields();
  555. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  556. globalCompare = helper.queryGlobalCompare();
  557. if (joinKind == IHThorNWayMergeJoinArg::MJFmofn)
  558. {
  559. minMatches = helper.getMinMatches();
  560. maxMatches = helper.getMaxMatches();
  561. }
  562. else
  563. {
  564. minMatches = numInputs;
  565. maxMatches = numInputs;
  566. }
  567. IRangeCompare * stepCompare = helper.querySteppingMeta()->queryCompare();
  568. ForEachItemIn(i, *inputArray)
  569. inputs.append(*new CFilteredInputBuffer(_allocator, stepCompare, equalCompare, &inputArray->item(i), numEqualFields));
  570. merger.init(_allocator, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  571. merger.initInputs(&inputs);
  572. }
  573. //ISteppedJoinRowGenerator
  574. void CFilteredSteppedMerger::beforeProcessCandidates(const void * equalityRow, bool needToVerifyNext, const bool * matched)
  575. {
  576. //Exaustively read from each of the inputs into each of the buffers
  577. ForEachItemIn(i, inputs)
  578. {
  579. if (!needToVerifyNext || matched[i])
  580. inputs.item(i).fill(equalityRow);
  581. }
  582. postFilterRows();
  583. //No point priming the rows here - will be just as efficient to use the default action
  584. }
  585. void CFilteredSteppedMerger::afterProcessCandidates()
  586. {
  587. ForEachItemIn(i, inputs)
  588. inputs.item(i).reset();
  589. merger.reset();
  590. }
  591. void CFilteredSteppedMerger::cleanupAllCandidates()
  592. {
  593. merger.reset(); // not strictly necessary...
  594. }
  595. void CFilteredSteppedMerger::afterProcessingAll()
  596. {
  597. merger.cleanup();
  598. }
  599. const void * CFilteredSteppedMerger::nextOutputRow()
  600. {
  601. return merger.nextRow();
  602. }
  603. const void * CFilteredSteppedMerger::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  604. {
  605. return merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  606. }
//Recursive step of the match tagging.  matches[0..numRows-1] holds the rows selected from earlier levels;
//this call tries to extend that partial match with each buffered row of input "level".
//  level   - index of the input buffer being examined
//  numRows - number of rows currently held in matches[]
//Returns true if at least one combination built on the current partial match was accepted.
//Rows of this level that took part in an accepted combination are recorded with noteMatch().
bool CFilteredSteppedMerger::tagMatches(unsigned level, unsigned numRows)
{
    CFilteredInputBuffer & right = inputs.item(level);
    unsigned maxLevel = inputs.ordinality()-1;
    ConstPointerArray & curRows = right.rows;
    if (curRows.ordinality())
    {
        bool valid = false;
        //The row most recently added to the partial match is the left side of the pairwise comparison
        const void * lhs = matches[numRows-1];
        ForEachItemIn(i, curRows)
        {
            //If we have had a match at this level, and this item is already matched,
            //and all levels higher than this have already been completely matched,
            //then no need to check this item (and its children) again, since it won't change anything.
            bool alreadyMatched = right.isMatched(i);
            if (!valid || level + 1 < fullyMatchedLevel || !alreadyMatched)
            {
                const void * rhs = curRows.item(i);
                unsigned matchedRows = numRows;
                bool recurse;
                if (!extraCompare || extraCompare->match(lhs, rhs))
                {
                    //Row is accepted by the pairwise filter: add it, but do not continue if that exceeds maxMatches
                    matches[matchedRows++] = rhs;
                    recurse = matchedRows <= maxMatches;
                }
                else
                {
                    //for mofn, check enough levels left to create a potential match, for others it will fail.
                    unsigned remain = maxLevel-level;
                    recurse = (numRows + remain >= minMatches);
                }
                if (recurse)
                {
                    bool isFullMatch;
                    //At the last level the combination is complete, so apply the global compare (if any);
                    //otherwise carry on with the next level.
                    if (level == maxLevel)
                        isFullMatch = (!globalCompare || globalCompare->match(matchedRows, matches));
                    else
                        isFullMatch = tagMatches(level+1, matchedRows);
                    if (isFullMatch)
                    {
                        valid = true;
                        if (!alreadyMatched)
                            right.noteMatch(i);
                        //If the previous level is fully matched, and so is this one - then update the minimum fully matched level
                        if ((level + 1 == fullyMatchedLevel) && right.isFullyMatched())
                            fullyMatchedLevel--;
                        //If all rows in this level and above are fully matched, then iterating any further will have no effect.
                        //Could potentially reduce a O(N^m) to O(mN) if the majority of elements match.
                        if (level >= fullyMatchedLevel)
                            break;
                    }
                }
            }
        }
        return valid;
    }
    else
    {
        //No rows buffered for this level.
        //mofn may still be ok with a skipped level or two
        unsigned remain = maxLevel-level;
        if (numRows + remain >= minMatches)
        {
            if (level == maxLevel)
                return (!globalCompare || globalCompare->match(numRows, matches));
            else
                return tagMatches(level+1, numRows);
        }
        return false;
    }
}
  677. void CFilteredSteppedMerger::tagMatches()
  678. {
  679. unsigned numInputs = inputs.ordinality();
  680. fullyMatchedLevel = numInputs;
  681. //for m of n, need to start matching at levels 0,1,.. numLevels - minMatches
  682. unsigned iterateLevels = numInputs - minMatches;
  683. for (unsigned level =0; level <= iterateLevels; level++)
  684. {
  685. CFilteredInputBuffer & left = inputs.item(level);
  686. ForEachItemIn(i, left.rows)
  687. {
  688. matches[0] = left.rows.item(i);
  689. bool thisMatched;
  690. //mofn(1) may not have another level, to just check global compare.
  691. if (level == numInputs-1)
  692. thisMatched = (!globalCompare || globalCompare->match(1, matches));
  693. else
  694. thisMatched = tagMatches(level+1, 1);
  695. if (thisMatched)
  696. {
  697. if (!left.isMatched(i))
  698. left.noteMatch(i);
  699. //Check if this level, and all above are now fully matched. If so, we're done.
  700. if ((level + 1 == fullyMatchedLevel) && left.isFullyMatched())
  701. {
  702. fullyMatchedLevel--;
  703. break;
  704. }
  705. }
  706. }
  707. if (level >= fullyMatchedLevel)
  708. break;
  709. }
  710. }
  711. void CFilteredSteppedMerger::postFilterRows()
  712. {
  713. tagMatches();
  714. unsigned max = inputs.ordinality();
  715. switch (joinKind)
  716. {
  717. case IHThorNWayMergeJoinArg::MJFinner:
  718. case IHThorNWayMergeJoinArg::MJFmofn:
  719. {
  720. for (unsigned i=0; i < max; i++)
  721. inputs.item(i).removeUnmatched();
  722. break;
  723. }
  724. case IHThorNWayMergeJoinArg::MJFleftouter:
  725. {
  726. for (unsigned i=1; i < max; i++)
  727. inputs.item(i).removeUnmatched();
  728. break;
  729. }
  730. case IHThorNWayMergeJoinArg::MJFleftonly:
  731. {
  732. inputs.item(0).removeMatched();
  733. unsigned max = inputs.ordinality();
  734. for (unsigned i=1; i < max; i++)
  735. inputs.item(i).reset();
  736. break;
  737. }
  738. }
  739. }
  740. //---------------------------------------------------------------------------
  741. CMergeJoinProcessor::CMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : helper(_arg)
  742. {
  743. mergeSteppingMeta = helper.querySteppingMeta();
  744. assertex(mergeSteppingMeta);
  745. stepCompare = mergeSteppingMeta->queryCompare();
  746. equalCompare = helper.queryEqualCompare();
  747. equalCompareEq = helper.queryEqualCompareEq();
  748. numEqualFields = helper.numEqualFields();
  749. flags = helper.getJoinFlags();
  750. matched = NULL;
  751. candidateEqualityRow = NULL;
  752. numExternalEqualFields = 0;
  753. conjunctionOptimizer = NULL;
  754. tempSeekBuffer = NULL;
  755. lowestSeekRow = NULL;
  756. combineConjunctions = true;
  757. allInputsAreOuterInputs = false;
  758. maxSeekRecordSize = 0;
  759. numInputs = 0;
  760. eof = true;
  761. assertex(helper.numOrderFields() == mergeSteppingMeta->getNumFields());
  762. bool hasPostfilter = false;
  763. thisSteppingMeta.init(mergeSteppingMeta->getNumFields(), mergeSteppingMeta->queryFields(), stepCompare, mergeSteppingMeta->queryDistance(), hasPostfilter, SSFhaspriority|SSFisjoin);
  764. }
  765. CMergeJoinProcessor::~CMergeJoinProcessor()
  766. {
  767. afterProcessing();
  768. }
  769. void CMergeJoinProcessor::addInput(ISteppedInput * _input)
  770. {
  771. IInputSteppingMeta * _meta = _input->queryInputSteppingMeta();
  772. verifySteppingCompatible(_meta, mergeSteppingMeta);
  773. rawInputs.append(*LINK(_input));
  774. if (_meta)
  775. {
  776. if (!_meta->hasPriority())
  777. thisSteppingMeta.removePriority();
  778. else
  779. thisSteppingMeta.intersectPriority(_meta->getPriority());
  780. if (_meta->isDistributed())
  781. thisSteppingMeta.setDistributed();
  782. }
  783. else
  784. thisSteppingMeta.removePriority();
  785. }
  786. void CMergeJoinProcessor::afterProcessing()
  787. {
  788. cleanupCandidates();
  789. if (outputProcessor)
  790. {
  791. outputProcessor->afterProcessingAll();
  792. outputProcessor.clear();
  793. }
  794. if (conjunctionOptimizer)
  795. {
  796. conjunctionOptimizer->afterProcessing();
  797. delete conjunctionOptimizer;
  798. conjunctionOptimizer = NULL;
  799. }
  800. delete [] matched;
  801. matched = NULL;
  802. inputs.kill();
  803. rawInputs.kill();
  804. orderedInputs.kill();
  805. if (lowestSeekRow)
  806. {
  807. inputAllocator->releaseRow(lowestSeekRow);
  808. lowestSeekRow = NULL;
  809. }
  810. if (tempSeekBuffer)
  811. {
  812. inputAllocator->releaseRow(tempSeekBuffer);
  813. tempSeekBuffer = NULL;
  814. }
  815. //Now free the allocators
  816. inputAllocator.clear();
  817. outputAllocator.clear();
  818. maxSeekRecordSize = 0;
  819. }
  820. void CMergeJoinProcessor::createTempSeekBuffer()
  821. {
  822. tempSeekBuffer = inputAllocator->createRow();
  823. #ifdef _DEBUG
  824. //Clear the complete tempSeekBBuffer record, so that toXML() can be used to trace the seek row in roxie
  825. if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
  826. {
  827. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, inputAllocator->queryOutputMeta()->getMinRecordSize());
  828. helper.createLowInputRow(rowBuilder);
  829. }
  830. #endif
  831. }
  832. void CMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  833. {
  834. inputAllocator.set(_inputAllocator);
  835. outputAllocator.set(_outputAllocator);
  836. //The seek components must all be fixed width, so the seek record size must be <= the minimum size of the input record
  837. maxSeekRecordSize = inputAllocator->queryOutputMeta()->getMinRecordSize();
  838. bool paranoid = (flags & IHThorNWayMergeJoinArg::MJFassertsorted) != 0;
  839. ForEachItemIn(i1, rawInputs)
  840. {
  841. ISteppedInput & cur = rawInputs.item(i1);
  842. inputs.append(* new CSteppedInputLookahead(&cur, cur.queryInputSteppingMeta(), inputAllocator, stepCompare, paranoid));
  843. }
  844. if (flags & IHThorNWayMergeJoinArg::MJFhasclearlow)
  845. {
  846. RtlDynamicRowBuilder rowBuilder(inputAllocator);
  847. size32_t size = helper.createLowInputRow(rowBuilder);
  848. lowestSeekRow = rowBuilder.finalizeRowClear(size);
  849. }
  850. cleanupCandidates();
  851. eof = false;
  852. numInputs = inputs.ordinality();
  853. matched = new bool[numInputs];
  854. if (numInputs == 0)
  855. eof = true;
  856. //Sort the inputs by the preferred processing order (if provided), ensuring no duplicates
  857. clearMatches();
  858. ForEachItemIn(i2, searchOrder)
  859. {
  860. unsigned next = searchOrder.item(i2);
  861. if (next < numInputs && !matched[next])
  862. {
  863. orderedInputs.append(OLINK(inputs.item(next)));
  864. matched[next] = true;
  865. }
  866. }
  867. //MORE: We really should move the most-stepable inputs to the start
  868. for (unsigned i3 = 0; i3 < numInputs; i3++)
  869. {
  870. if (!matched[i3])
  871. orderedInputs.append(OLINK(inputs.item(i3)));
  872. }
  873. }
  874. bool CMergeJoinProcessor::createConjunctionOptimizer()
  875. {
  876. if (inputs.ordinality())
  877. {
  878. conjunctionOptimizer = new CSteppedConjunctionOptimizer(inputAllocator, helper, this);
  879. if (gatherConjunctions(*conjunctionOptimizer) && conjunctionOptimizer->worthCombining())
  880. {
  881. conjunctionOptimizer->beforeProcessing();
  882. return true;
  883. }
  884. delete conjunctionOptimizer;
  885. conjunctionOptimizer = NULL;
  886. }
  887. combineConjunctions = false;
  888. return false;
  889. }
  890. void CMergeJoinProcessor::createMerger()
  891. {
  892. ICompareEq * extraCompare = helper.queryNonSteppedCompare();
  893. bool hasGlobalCompare = (flags & IHThorNWayMergeJoinArg::MJFglobalcompare) != 0;
  894. if (!extraCompare && !hasGlobalCompare)
  895. {
  896. Owned<CUnfilteredSteppedMerger> simpleMerger = new CUnfilteredSteppedMerger(numEqualFields);
  897. simpleMerger->init(inputAllocator, equalCompare, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  898. simpleMerger->initInputs(&inputs);
  899. outputProcessor.setown(simpleMerger.getClear());
  900. }
  901. else
  902. {
  903. Owned<CFilteredSteppedMerger> simpleMerger = new CFilteredSteppedMerger();
  904. simpleMerger->init(inputAllocator, helper, &inputs);
  905. outputProcessor.setown(simpleMerger.getClear());
  906. }
  907. }
  908. void CMergeJoinProcessor::createEqualityJoinProcessor()
  909. {
  910. if (numEqualFields >= helper.numOrderFields())
  911. outputProcessor.setown(new CEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  912. else
  913. outputProcessor.setown(new CSortedEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  914. }
  915. void CMergeJoinProcessor::finishCandidates()
  916. {
  917. if (outputProcessor)
  918. outputProcessor->afterProcessCandidates();
  919. assertex(hasCandidates());
  920. inputAllocator->releaseRow(candidateEqualityRow);
  921. candidateEqualityRow = NULL;
  922. }
  923. bool CMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  924. {
  925. allInputsAreOuterInputs = true;
  926. ForEachItemIn(i, inputs)
  927. {
  928. CSteppedInputLookahead & cur = inputs.item(i);
  929. if (!cur.gatherConjunctions(collector))
  930. collector.addInput(cur);
  931. else
  932. {
  933. collector.addPseudoInput(cur);
  934. allInputsAreOuterInputs = false;
  935. }
  936. }
  937. collector.addJoin(*this);
  938. return true;
  939. }
  940. const void * CMergeJoinProcessor::nextInputRow()
  941. {
  942. if (!hasCandidates() && !findCandidates(NULL, 0))
  943. return NULL;
  944. for (;;)
  945. {
  946. const void * next = nextCandidate();
  947. if (next)
  948. return next;
  949. finishCandidates();
  950. //Abort early if externally optimized, and not proximity (since they may not have read all records for this equality)
  951. if ((numEqualFields == numExternalEqualFields) && candidatesExhaustEquality())
  952. return NULL;
  953. if (!findCandidates(NULL, 0))
  954. return NULL;
  955. }
  956. }
  957. const void * CMergeJoinProcessor::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  958. {
  959. //First check the next row from the candidates, it may be ok.
  960. if (hasCandidates())
  961. {
  962. unsigned compareFields = numFields < numEqualFields ? numFields : numEqualFields;
  963. //check whether the candidates could possibly return the match
  964. if (stepCompare->docompare(candidateEqualityRow, seek, compareFields) == 0)
  965. {
  966. const void * next = nextCandidateGE(seek, numFields, wasCompleteMatch, stepExtra);
  967. if (next)
  968. return next; // note must match equality to have been returned.
  969. }
  970. finishCandidates();
  971. }
  972. if (!findCandidates(seek, numFields))
  973. return NULL;
  974. return nextInputRow();
  975. }
  976. void CMergeJoinProcessor::resetEOF()
  977. {
  978. ForEachItemIn(i, inputs)
  979. inputs.item(i).resetEOF();
  980. }
  981. void CMergeJoinProcessor::queryResetEOF()
  982. {
  983. resetEOF();
  984. }
  985. const void * CMergeJoinProcessor::nextRow()
  986. {
  987. if (conjunctionOptimizer)
  988. return conjunctionOptimizer->next();
  989. if (combineConjunctions)
  990. {
  991. if (numExternalEqualFields == 0)
  992. {
  993. if (createConjunctionOptimizer())
  994. return conjunctionOptimizer->next();
  995. }
  996. else
  997. combineConjunctions = false; // being used inside a conjunction optimizer => don't create another..
  998. }
  999. return nextInputRow();
  1000. }
  1001. const void * CMergeJoinProcessor::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  1002. {
  1003. if (conjunctionOptimizer)
  1004. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  1005. if (combineConjunctions)
  1006. {
  1007. if (createConjunctionOptimizer())
  1008. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  1009. }
  1010. return nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  1011. }
  1012. void CMergeJoinProcessor::startRestrictedJoin(const void * equalityRow, unsigned numEqualityFields)
  1013. {
  1014. assertex(numExternalEqualFields == 0);
  1015. numExternalEqualFields = numEqualityFields;
  1016. eof = false;
  1017. }
  1018. void CMergeJoinProcessor::stopRestrictedJoin()
  1019. {
  1020. numExternalEqualFields = 0;
  1021. if (hasCandidates())
  1022. finishCandidates();
  1023. //There are no more matches for this (outer) equality condition, so all active rows need to be thrown away.
  1024. if (outputProcessor)
  1025. outputProcessor->cleanupAllCandidates();
  1026. }
  1027. void CMergeJoinProcessor::setCandidateRow(const void * row, bool inputsMayBeEmpty, const bool * matched)
  1028. {
  1029. candidateEqualityRow = inputAllocator->linkRow(row);
  1030. const void * restrictionRow = (numEqualFields == numExternalEqualFields) ? NULL : candidateEqualityRow;
  1031. outputProcessor->beforeProcessCandidates(restrictionRow, inputsMayBeEmpty, matched);
  1032. }
  1033. void CMergeJoinProcessor::connectRemotePriorityInputs()
  1034. {
  1035. CIArrayOf<OrderedInput> orderedInputs;
  1036. ForEachItemIn(i, inputs)
  1037. {
  1038. CSteppedInputLookahead & cur = inputs.item(i);
  1039. if (!cur.hasPriority() || !cur.readsRowsRemotely())
  1040. return;
  1041. orderedInputs.append(*new OrderedInput(cur, i, false));
  1042. }
  1043. orderedInputs.sort(compareInitialInputOrder);
  1044. associateRemoteInputs(orderedInputs, orderedInputs.ordinality());
  1045. combineConjunctions = false;
  1046. }
  1047. //---------------------------------------------------------------------------
  1048. CAndMergeJoinProcessor::CAndMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1049. {
  1050. }
  1051. void CAndMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1052. {
  1053. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1054. connectRemotePriorityInputs();
  1055. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1056. createEqualityJoinProcessor();
  1057. else
  1058. createMerger();
  1059. }
//Locate the next equality value for which every input has a row, and make it the current candidate.
//  seekValue / numSeekFields - optional position to start from; when seekValue is NULL the search
//                              starts from lowestSeekRow using the equality fields.
//Returns false (and sets eof) as soon as any input has no further rows, true once a candidate has been set.
bool CAndMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
{
    if (eof)
        return false;
    const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
    //Holds ownership of a row that has been consumed from an input, so that equalValue stays valid
    PreservedRow savedRow(inputAllocator);
    const void * equalValue;
    unsigned firstInput = 0;
    if (inputsMustMatchEquality && allInputsAreOuterInputs)
    {
        //special case - all inputs are already advanced to the correct place, so just start generating candidates
        //for nested conjunctions they may already be exhausted though
        equalValue = orderedInputs.item(firstInput).next();
        if (!equalValue)
        {
            eof = true;
            return false;
        }
    }
    else
    {
        if (!seekValue)
        {
            numSeekFields = numEqualFields;
            seekValue = lowestSeekRow;
        }
        bool matchedCompletely = true;
        equalValue = orderedInputs.item(firstInput).next(seekValue, numSeekFields, matchedCompletely, unknownFrequencyTermStepExtra);
        if (!equalValue)
        {
            eof = true;
            return false;
        }
        unsigned matchCount = 0;
        clearMatches();
        if (matchedCompletely)
        {
            matched[firstInput] = true;
            matchCount++;
        }
        else
        {
            //Row is not a complete match: consume it, and keep it alive purely as a value to seek from
            equalValue = orderedInputs.item(firstInput).consume();
            savedRow.setown(equalValue);
        }
        unsigned lastInput = firstInput;
        //Keep seeking the unmatched inputs to the current equality value until all of them agree
        while (matchCount != numInputs)
        {
            unsigned nextInput = nextToMatch(lastInput);
            lastInput = nextInput;
            //NB: intentionally a fresh flag for this input (hides the outer matchedCompletely)
            bool matchedCompletely = true;
            const void * nextRow = orderedInputs.item(nextInput).nextGE(equalValue, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
            if (!nextRow)
            {
                eof = true;
                return false;
            }
#ifdef CHECK_CONSISTENCY
            if (inputsMustMatchEquality)
            {
                if (equalCompare->docompare(nextRow, equalValue) != 0)
                    throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            }
            else
            {
                if (equalCompare->docompare(nextRow, equalValue) < 0)
                    throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            }
#endif
            if (!inputsMustMatchEquality)
            {
                if (!equalCompareEq->match(nextRow, equalValue))
                {
                    //value didn't match => skip all the previously matched entries.
                    for (unsigned i=0; i < numInputs; i++)
                    {
                        if (matched[i])
                        {
                            matched[i] = false;
                            orderedInputs.item(i).skip();
                            if (--matchCount == 0)
                                break;
                        }
                    }
                    //The row just read becomes the new equality value to match against
                    if (!matchedCompletely)
                    {
                        //Need to preserve nextRow, otherwise it will be gone after we skip
                        equalValue = orderedInputs.item(nextInput).consume();
                        savedRow.setown(equalValue);
                    }
                    else
                        equalValue = nextRow;
                }
            }
            if (matchedCompletely)
            {
                matched[nextInput] = true;
                matchCount++;
            }
        }
    }
    //Set up the mergeProcessor with the appropriate inputs.  NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
    //with the candidate.
    //clone one of the rows
    setCandidateRow(equalValue, false, NULL);
    return true;
}
  1167. unsigned CAndMergeJoinProcessor::nextToMatch(unsigned lastInput) const
  1168. {
  1169. for (unsigned i=0; i < numInputs; i++)
  1170. {
  1171. //Don't seek on the last input again (it may have found a keyed match, but not matched the post filter)
  1172. if ((i != lastInput) && !matched[i])
  1173. return i;
  1174. }
  1175. throwUnexpected();
  1176. }
  1177. //---------------------------------------------------------------------------
  1178. CAndLeftMergeJoinProcessor::CAndLeftMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1179. {
  1180. combineConjunctions = false; // No advantage using this as the base of a combined conjunction
  1181. isLeftOnly = (flags & IHThorNWayMergeJoinArg::MJFkindmask) == IHThorNWayMergeJoinArg::MJFleftonly;
  1182. //Left only with a not stepped comparison needs to be done as a left outer at the stepping level
  1183. if (isLeftOnly && (helper.queryNonSteppedCompare() || helper.queryGlobalCompare()))
  1184. isLeftOnly = false;
  1185. }
  1186. void CAndLeftMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1187. {
  1188. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1189. createTempSeekBuffer();
  1190. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1191. createEqualityJoinProcessor();
  1192. else
  1193. createMerger();
  1194. }
//Find the next candidate driven by the first (left) input.
//The next left row is read, and the remaining inputs are then sought to its equality value in turn.
//  - If all of them match, every input is flagged in matched[].
//  - If one does not, only the left input is flagged and the right inputs matched so far are skipped.
//  - For a left only join, a left row that is matched by all the other inputs is discarded and the
//    search restarts from the following left value (or stops when the equality is fixed externally).
//Returns false (and sets eof) when the left input is exhausted, true once a candidate has been set.
bool CAndLeftMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
{
    if (eof)
        return false;
    CSteppedInputLookahead & input0 = inputs.item(0);
    bool wasMatched = true;
    const void * lhs = input0.next(seekValue, numSeekFields, wasMatched, nonBufferedMatchStepExtra);
    assertex(wasMatched);
    if (!lhs)
    {
        eof = true;
        return false;
    }
    //matchCount is the number of inputs (including the left) known to have the current equality value
    unsigned matchCount = 1;
    while (matchCount != numInputs)
    {
        bool matchedCompletely = true; // we don't care what the next rhs value is - as long as it can't match the left
        const void * rhs = orderedInputs.item(matchCount).nextGE(lhs, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
        if (rhs)
        {
            int c = equalCompare->docompare(rhs, lhs);
            if (c < 0)
                throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            if (c == 0)
            {
                assertex(matchedCompletely);
                //previously the (matchCount+1) test wasn't here, so it aborted as soon as there was any match.
                if (isLeftOnly && (matchCount+1 == numInputs))
                {
                    //Left only, and the last input also matches => this left row must not be returned
                    if (numEqualFields == numExternalEqualFields)
                    {
                        //I think this is worth doing here...
                        //Skip input0 to a mismatch value, so the optimizer doesn't waste time reading extra equalities
                        RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
                        bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
                        input0.skip(); // invalidates lhs
                        if (calculatedNextSeek)
                        {
                            bool wasMatched = true;
                            input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
                        }
                        eof = true;
                        return false;
                    }
                    //Create the next join value if that is possible
                    RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
                    bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
                    input0.skip(); // invalidates lhs
                    bool wasMatched = true;
                    if (calculatedNextSeek)
                        lhs = input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
                    else
                        lhs = input0.next();
                    if (!lhs)
                    {
                        eof = true;
                        return false;
                    }
                    //Start matching the right inputs again for the new left row
                    matchCount = 0; //incremented at tail of loop
                }
            }
            else
                break;      // rhs is beyond lhs => this input has no match for the left row
        }
        else
            break;          // this input has no more rows
        matchCount++;
    }
    clearMatches();
    matched[0] = true;
    if (matchCount != numInputs)
    {
        //Failed to match completely => generate a match for just the left.  Skip any matched rows so far and break out.
        for (unsigned i=1; i < matchCount; i++)
            orderedInputs.item(i).skip();
        matchCount = 1;
    }
    else
    {
        for (unsigned i=1; i < numInputs; i++)
            matched[i] = true;
    }
    //LEFT ONLY will only merge 1 stream, LEFT OUTER will merge as many as match LEFT
    setCandidateRow(lhs, true, matched);
    return true;
}
  1281. bool CAndLeftMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1282. {
  1283. CSteppedInputLookahead & cur = inputs.item(0);
  1284. if (!cur.gatherConjunctions(collector))
  1285. collector.addInput(cur);
  1286. collector.addJoin(*this);
  1287. return true;
  1288. }
  1289. //---------------------------------------------------------------------------
  1290. void BestMatchManager::associate(unsigned input, const void * value)
  1291. {
  1292. unsigned curIndex = 0;
  1293. while (curIndex != numEntries)
  1294. {
  1295. BestMatchItem & cur = matches.item(curIndex);
  1296. int c = compare->docompare(value, cur.value);
  1297. if (c <= 0)
  1298. {
  1299. if (c == 0)
  1300. {
  1301. //insert at the end of the duplicates
  1302. curIndex += cur.duplicates;
  1303. cur.duplicates++;
  1304. }
  1305. //Move a record at the end of the list to the correct position, ready for updating.
  1306. if (curIndex != numEntries)
  1307. matches.rotateR(curIndex, numEntries);
  1308. break; // now go and modify record at position curIndex
  1309. }
  1310. curIndex += cur.duplicates;
  1311. }
  1312. assertex(matches.isItem(curIndex));
  1313. BestMatchItem & inserted = matches.item(curIndex);
  1314. inserted.duplicates = 1;
  1315. inserted.value = value;
  1316. inserted.input = input;
  1317. numEntries++;
  1318. return;
  1319. }
  1320. unsigned BestMatchManager::getValueOffset(unsigned idx) const
  1321. {
  1322. unsigned offset = 0;
  1323. while (idx--)
  1324. offset += matches.item(offset).duplicates;
  1325. return offset;
  1326. }
  1327. void BestMatchManager::init(ICompare * _compare, unsigned numInputs)
  1328. {
  1329. compare = _compare;
  1330. numEntries = 0;
  1331. for (unsigned i=0; i < numInputs; i++)
  1332. matches.append(* new BestMatchItem);
  1333. }
  1334. void BestMatchManager::kill()
  1335. {
  1336. matches.kill();
  1337. }
  1338. unsigned BestMatchManager::getInput(unsigned whichValue, unsigned inputIndex) const
  1339. {
  1340. return matches.item(getValueOffset(whichValue) + inputIndex).input;
  1341. }
  1342. unsigned BestMatchManager::getInput0(unsigned inputIndex) const
  1343. {
  1344. return matches.item(inputIndex).input;
  1345. }
  1346. unsigned BestMatchManager::numInputs(unsigned whichValue) const
  1347. {
  1348. return matches.item(getValueOffset(whichValue)).duplicates;
  1349. }
  1350. void BestMatchManager::remove(unsigned whichValue)
  1351. {
  1352. unsigned offset = getValueOffset(whichValue);
  1353. unsigned duplicates = matches.item(offset).duplicates;
  1354. matches.rotateLN(offset, numEntries-1, duplicates);
  1355. numEntries -= duplicates;
  1356. }
  1357. const void * BestMatchManager::queryValue(unsigned whichValue) const
  1358. {
  1359. return matches.item(getValueOffset(whichValue)).value;
  1360. }
  1361. //---------------------------------------------------------------------------
  1362. CMofNMergeJoinProcessor::CMofNMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1363. {
  1364. combineConjunctions = false;
  1365. alive = NULL;
  1366. candidateMask = NULL;
  1367. minMatches = 0;
  1368. maxMatches = 0;
  1369. numActive = 0;
  1370. }
  1371. void CMofNMergeJoinProcessor::afterProcessing()
  1372. {
  1373. delete [] alive;
  1374. delete [] candidateMask;
  1375. alive = NULL;
  1376. candidateMask = NULL;
  1377. matches.kill();
  1378. CMergeJoinProcessor::afterProcessing();
  1379. }
  1380. void CMofNMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1381. {
  1382. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1383. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1384. createEqualityJoinProcessor();
  1385. else
  1386. createMerger();
  1387. minMatches = helper.getMinMatches();
  1388. maxMatches = helper.getMaxMatches();
  1389. if (minMatches == 0)
  1390. throw MakeStringException(99, "Need a non-zero minimum number of matches");
  1391. alive = new bool [numInputs];
  1392. candidateMask = new bool [numInputs];
  1393. for (unsigned i= 0; i < numInputs; i++)
  1394. alive[i] = true;
  1395. numActive = numInputs;
  1396. matches.init(equalCompare, numInputs);
  1397. }
//Find the next equality value that could be supplied by at least minMatches of the inputs.
//Each alive input that has no value recorded yet is read and its value recorded in "matches" (kept in value order).
//Whenever the lowest recorded value can no longer form a valid match it is discarded and its inputs are skipped.
//  originalSeekValue / numOriginalSeekFields - position the inputs are initially sought to.
//Returns false once fewer than minMatches inputs still have rows; otherwise the lowest value is made
//the candidate, with candidateMask[] indicating which inputs supplied it, and true is returned.
bool CMofNMergeJoinProcessor::findCandidates(const void * originalSeekValue, unsigned numOriginalSeekFields)
{
    if (numActive < minMatches)
        return false;
    //The number of alive inputs that are allowed to be missing from a match
    unsigned numFreeToMismatch = numActive - minMatches;
    const void * seekValue = originalSeekValue;
    unsigned numSeekFields = numOriginalSeekFields;
    //This should be true, because after candidates are matched their values are removed.
    assertex(matches.numInputs() <= numFreeToMismatch);
    //Read from inputs until every alive input has a value recorded
    while (matches.numInputs() < numActive)
    {
        unsigned nextInput = nextToMatch();
        bool matchedCompletely = true;
        // MORE: This needs rewriting, so that mismatches are handled correctly.  In particular, the matches need to retain information about whether
        // they matched fully, since that will optimize where could be sought next.
        const void * value = inputs.item(nextInput).next(seekValue, numSeekFields, matchedCompletely, nonBufferedMatchStepExtra);
        //NOTE: matchedCompletely is currently always true.  More work is needed if not true.
        assertex(matchedCompletely);
        if (value)
        {
            if (matchedCompletely)
            {
                matched[nextInput] = true;
                matches.associate(nextInput, value);
            }
        }
        else
        {
            //This input has no more rows, so it can never contribute to a match again
            alive[nextInput] = false;
            numActive--;
            numFreeToMismatch--;
            if (numActive < minMatches)
                return false;
        }
        unsigned matchCount = matches.numInputs();
        if (matchCount > numFreeToMismatch)
        {
            unsigned numMatch0 = matches.numInputs0();
            //Too many inputs already have a different value, or too many inputs have the lowest value
            if ((matchCount - numMatch0 > numFreeToMismatch) || (numMatch0 > maxMatches))
            {
                //clear seekValue, because seek value won't be valid any more after the skips - may be updated later.
                seekValue = originalSeekValue;
                numSeekFields = numOriginalSeekFields;
                //No way that the first element is going to match, so remove all inputs associated with it.
                for (unsigned i= 0; i < numMatch0; i++)
                {
                    unsigned input = matches.getInput0(i);
                    inputs.item(input).skip();
                    matched[input] = false;
                }
                matches.remove(0);
            }
            //Lowest element now provides the best seek position.
            if (matches.numInputs() > numFreeToMismatch)
            {
                seekValue = matches.queryValue(0);
                numSeekFields = numEqualFields;
            }
        }
    }
    //matches(0) contains the next match set, set a set of flags indicating which inputs to use
    unsigned numMatches = matches.numInputs0();
    for (unsigned i1=0; i1< numInputs; i1++)
        candidateMask[i1] = false;
    for (unsigned i2=0; i2 < numMatches; i2++)
        candidateMask[matches.getInput0(i2)] = true;
    setCandidateRow(matches.queryValue(0), true, candidateMask);
    //Now cleanup housekeeping, so that findCandidates() is ready to find the next block
    for (unsigned i3=0; i3 < numInputs; i3++)
        if (candidateMask[i3])
            matched[i3] = false;
    matches.remove(0);
    return true;
}
  1473. bool CMofNMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1474. {
  1475. //MORE: We may need to create pseudo inputs in order to process these optimially.
  1476. return false;
  1477. }
  1478. unsigned CMofNMergeJoinProcessor::nextToMatch() const
  1479. {
  1480. for (unsigned i= 0; i < numInputs; i++)
  1481. {
  1482. if (alive[i] && !matched[i])
  1483. return i;
  1484. }
  1485. throwUnexpected();
  1486. }
  1487. //---------------------------------------------------------------------------
  1488. /*
  1489. NOTES on the distances... this is far from simple once you get arbitrary trees involved.
  1490. given a join expression right.x between left.x - a and left.x + b i.e. up to a Before and b after
  1491. a is maxRightBeforeLeft
  1492. b is maxLeftBeforeRight
  We define a function D(x,y) which is the maximum value which can be deducted from row x to provide a valid value for row y
  1494. Given a tree of join expressions, we can calculate a distance function between any pair of inputs.
  1495. J1(a,b,c) = D(i,i+1)=4, D(i+1,i) = 10
  1496. J2(d, e) = D(i,i+1)=-1, D(i+1,i) = 12
  1497. J3(J1, J2) = D(i,i+1)=0 D(i+1,i) = 5
  1498. =>
  1499. For each join we define
  1500. D(i, lowest) - maximum value to deduct from row i to obtain the lowest
  1501. D(highest, i) - maximum value to deduct from highest to obtain row i
  1502. by definition these must both be >= 0, for a simple input they are both 0
  A join's extent is given by
  1504. D(highest,lowest) = max(D(highest,i)+D(i,lowest))
  We're only interested in the maximum values, which are obtained by the maximum distances between the elements. The lowest and highest members
  1506. of the group are going to be the ends. So we use the maximum of those distances to work out D(i,low) and D(high, i), being careful to only use
  1507. the range if it is valid (e.g., the end must be possible to be the highest/lowest)
  1508. D(a,b) = 4, D(b,a) = 10
  1509. D(b,c) = 4, D(c,b) = 10
  1510. D(a,c) = 8, D(c,a) = 20
  1511. D(d,e) = -1, D(e,d) = 12
  1512. D(a, lowest) = 8 D(highest, a) = 20
  1513. D(b, lowest) = 10 D(highest, b) = 10
  1514. D(c, lowest) = 20 D(highest, c) = 8
  1515. D(highest, lowest) = 28
  1516. Then assuming the left is the highest and the right is the lowest we have
  1517. D(a,e) = DJ1(a, lowest) + DJ3(i,i+1) + DJ2(highest, e)
  1518. = 8 + 0 + 0
  1519. For >2 terms, you also need to take into account the size of a term given by D(highest,lowest)
  1520. */
  /*
   * Applies a signed delta to an unsigned range value, clamping at zero.
   *
   * rangeValue - the unsigned position to adjust
   * delta      - signed adjustment; a negative value moves the position down
   *
   * Returns rangeValue + delta, or 0 if a negative delta is at least as large as rangeValue.
   * A positive delta is added with normal unsigned (wrapping) arithmetic.
   */
  inline unsigned __int64 adjustRangeValue(unsigned __int64 rangeValue, __int64 delta)
  {
      if (delta >= 0)
          return rangeValue + (unsigned __int64)delta;

      //Negate in the unsigned domain: "-delta" on the signed value is undefined for the most negative __int64.
      const unsigned __int64 magnitude = 0 - (unsigned __int64)delta;
      if (rangeValue > magnitude)
          return rangeValue - magnitude;
      return 0;
  }
  1527. //---------------------------------------------------------------------------
  //This class is created for each input of a nary-join; it maintains a queue of potential records.
  1529. CNaryJoinLookaheadQueue::CNaryJoinLookaheadQueue(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookahead * _input, CNaryJoinLookaheadQueue * _left, const void * * _activeRowPtr) : helper(_helper), rows(_inputAllocator), unmatchedRows(_inputAllocator)
  1530. {
  1531. equalCompareEq = helper.queryEqualCompareEq();
  1532. nonSteppedCompareEq = helper.queryNonSteppedCompare();
  1533. numEqualFields = helper.numEqualFields();
  1534. stepCompare = helper.querySteppingMeta()->queryCompare();
  1535. input.set(_input);
  1536. activeRowPtr = _activeRowPtr;
  1537. left = _left;
  1538. equalityRow = NULL;
  1539. curRow = 0;
  1540. maxRow = 0;
  1541. numSkipped = 0;
  1542. done = true;
  1543. }
  1544. bool CNaryJoinLookaheadQueue::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext)
  1545. {
  1546. done = false;
  1547. equalityRow = _equalityRow;
  1548. rows.kill();
  1549. numSkipped = 0;
  1550. if (matchedLeft)
  1551. matchedLeft->reset();
  1552. // next is guaranteed to match the equality condition for AND, proximity but not for m of n/left outer...
  1553. if (!needToVerifyNext || nextUnqueued())
  1554. {
  1555. consumeNextInput();
  1556. return true;
  1557. }
  1558. return false;
  1559. }
  1560. void CNaryJoinLookaheadQueue::clearPending()
  1561. {
  1562. rows.kill();
  1563. }
  1564. bool CNaryJoinLookaheadQueue::ensureNonEmpty()
  1565. {
  1566. if (rows.ordinality())
  1567. return true;
  1568. if (nextUnqueued())
  1569. {
  1570. consumeNextInput();
  1571. return true;
  1572. }
  1573. return false;
  1574. }
  1575. bool CNaryJoinLookaheadQueue::firstSelection()
  1576. {
  1577. if (!left)
  1578. {
  1579. assertex(maxRow != 0);
  1580. curRow = 0;
  1581. *activeRowPtr = rows.item(curRow);
  1582. return true;
  1583. }
  1584. if (!left->firstSelection())
  1585. return false;
  1586. return findValidSelection(0);
  1587. }
  1588. bool CNaryJoinLookaheadQueue::findValidSelection(unsigned initialRow)
  1589. {
  1590. assertex(left);
  1591. const unsigned max = maxRow;
  1592. unsigned candidateRow = initialRow;
  1593. for (;;)
  1594. {
  1595. const void * leftRow = left->activeRow();
  1596. while (candidateRow < max)
  1597. {
  1598. const void * rightRow = rows.item(candidateRow);
  1599. if (!nonSteppedCompareEq || nonSteppedCompareEq->match(leftRow, rightRow))
  1600. {
  1601. curRow = candidateRow;
  1602. *activeRowPtr = rightRow;
  1603. return true;
  1604. }
  1605. candidateRow++;
  1606. }
  1607. if (!left->nextSelection())
  1608. return false;
  1609. candidateRow = 0;
  1610. }
  1611. }
  1612. const void * CNaryJoinLookaheadQueue::nextUnqueued()
  1613. {
  1614. if (equalityRow)
  1615. {
  1616. bool matches = true;
  1617. const void * next = input->nextGE(equalityRow, numEqualFields, matches, nonBufferedMismatchStepExtra);
  1618. if (next && matches && equalCompareEq->match(next, equalityRow))
  1619. return next;
  1620. return NULL;
  1621. }
  1622. else
  1623. return input->next();
  1624. }
  1625. bool CNaryJoinLookaheadQueue::nextSelection()
  1626. {
  1627. if (left)
  1628. return findValidSelection(curRow+1);
  1629. curRow++;
  1630. if (curRow >= maxRow)
  1631. return false;
  1632. *activeRowPtr = rows.item(curRow);
  1633. return true;
  1634. }
  1635. bool CNaryJoinLookaheadQueue::ensureCandidateExists(unsigned __int64 minDistance, unsigned __int64 maxDistance)
  1636. {
  1637. for (;;)
  1638. {
  1639. const void * next = rows.head();
  1640. if (!next)
  1641. break;
  1642. unsigned __int64 distance = helper.extractRangeValue(next);
  1643. if (distance >= minDistance)
  1644. {
  1645. assertex(distance <= maxDistance);
  1646. return true;
  1647. }
  1648. rows.skip();
  1649. }
  1650. for (;;)
  1651. {
  1652. const void * next = nextUnqueued();
  1653. if (!next)
  1654. return false;
  1655. unsigned __int64 distance = helper.extractRangeValue(next);
  1656. if (distance >= minDistance)
  1657. {
  1658. if (distance <= maxDistance)
  1659. {
  1660. consumeNextInput();
  1661. return true;
  1662. }
  1663. return false;
  1664. }
  1665. input->skip();
  1666. }
  1667. }
  1668. bool CNaryJoinLookaheadQueue::checkExistsGE(const void * seek, unsigned numFields)
  1669. {
  1670. for (;;)
  1671. {
  1672. const void * next = rows.head();
  1673. if (!next)
  1674. return false;
  1675. if (stepCompare->docompare(next, seek, numFields) >= 0)
  1676. return true;
  1677. rows.skip();
  1678. }
  1679. }
  1680. unsigned CNaryJoinLookaheadQueue::readAheadTo(unsigned __int64 maxDistance, bool canConsumeBeyondMax)
  1681. {
  1682. const void * tail = rows.tail();
  1683. if (tail && helper.extractRangeValue(tail) > maxDistance)
  1684. {
  1685. unsigned limit = rows.ordinality() - 1;
  1686. //Already have all the records, return how many...
  1687. while (limit > 0)
  1688. {
  1689. const void * prev = rows.item(limit-1);
  1690. if (helper.extractRangeValue(prev) <= maxDistance)
  1691. return limit;
  1692. --limit;
  1693. }
  1694. return 0;
  1695. }
  1696. while (!done)
  1697. {
  1698. const void * next = nextUnqueued();
  1699. if (!next)
  1700. {
  1701. done = true;
  1702. break;
  1703. }
  1704. //This is a bit nasty. We need to consume the next value to ensure that the lowest spotter always has the next valid
  1705. //but it means we might be reading this input for too long
  1706. if (helper.extractRangeValue(next) > maxDistance)
  1707. {
  1708. if (!canConsumeBeyondMax)
  1709. break;
  1710. consumeNextInput();
  1711. return rows.ordinality()-1;
  1712. }
  1713. consumeNextInput();
  1714. }
  1715. return rows.ordinality();
  1716. }
  1717. void CNaryJoinLookaheadQueue::readCandidateAll()
  1718. {
  1719. for (;;)
  1720. {
  1721. const void * next = nextUnqueued();
  1722. if (!next)
  1723. return;
  1724. consumeNextInput();
  1725. }
  1726. }
  1727. void CNaryJoinLookaheadQueue::skip()
  1728. {
  1729. if (matchedLeft && !matchedLeft->test(numSkipped))
  1730. {
  1731. unmatchedRows.enqueue(rows.dequeue());
  1732. }
  1733. else
  1734. rows.skip();
  1735. numSkipped++;
  1736. }
  1737. bool CNaryJoinLookaheadQueue::flushUnmatched()
  1738. {
  1739. while (rows.ordinality())
  1740. skip();
  1741. return unmatchedRows.ordinality() != 0;
  1742. }
  1743. //---------------------------------------------------------------------------
  1744. CProximityJoinProcessor::CProximityJoinProcessor(IHThorNWayMergeJoinArg & _helper) :
  1745. CMergeJoinProcessor(_helper)
  1746. {
  1747. maxRightBeforeLeft = 0;
  1748. maxLeftBeforeRight = 0;
  1749. }
  1750. void CProximityJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1751. {
  1752. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1753. createTempSeekBuffer();
  1754. //Have to delay creating the actual join joinProcessor because maxRightBeforeLeft() etc. can be onStart dependant.
  1755. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  1756. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  1757. //Handle phrases using a different class i) because the general scheme doesn't quite work and ii) for efficiency
  1758. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1759. {
  1760. if ((maxRightBeforeLeft < 0 || maxLeftBeforeRight < 0))
  1761. outputProcessor.setown(new CAnchoredRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1762. else
  1763. outputProcessor.setown(new CProximityRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1764. }
  1765. else
  1766. createMerger();
  1767. }
  1768. bool CProximityJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
  1769. {
  1770. unsigned firstInput = 0;//searchOrder.item(0);
  1771. bool wasCompleteMatch = true;
  1772. if (eof || !inputs.item(firstInput).next(seekValue, numSeekFields, wasCompleteMatch, nonBufferedMatchStepExtra))
  1773. return false;
  1774. unsigned matchCount = 1;
  1775. clearMatches();
  1776. matched[firstInput] = true;
  1777. const unsigned numJoinFields = numEqualFields + 1;
  1778. const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
  1779. while (matchCount != numInputs)
  1780. {
  1781. unsigned nextInput = nextToMatch();
  1782. unsigned baseInput = getBestToSeekFrom(nextInput);
  1783. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
  1784. helper.adjustRangeValue(rowBuilder, inputs.item(baseInput).next(), -maxDistanceBefore(baseInput, nextInput));
  1785. bool wasCompleteMatch = true;
  1786. //MORE: Would it help to allow mismatches? I would have thought so, but there was a previous comment sayimg "I don't think so because of the range calculation"
  1787. const void * nextRow = inputs.item(nextInput).nextGE(tempSeekBuffer, numJoinFields, wasCompleteMatch, nonBufferedMatchStepExtra);
  1788. assertex(wasCompleteMatch);
  1789. if (!nextRow)
  1790. {
  1791. eof = true;
  1792. return false;
  1793. }
  1794. if (inputsMustMatchEquality || equalityComponentMatches(nextRow, tempSeekBuffer))
  1795. {
  1796. //Now check if this new record causes other records to be too far away
  1797. unsigned __int64 thisRangeValue = helper.extractRangeValue(nextRow);
  1798. for (unsigned i=0; i<numInputs; i++)
  1799. {
  1800. if (matched[i])
  1801. {
  1802. unsigned __int64 seekRangeValue = adjustRangeValue(thisRangeValue, -maxDistanceBefore(nextInput, i));
  1803. if (getRangeValue(i) < seekRangeValue)
  1804. {
  1805. inputs.item(i).skip();
  1806. matched[i] = false;
  1807. if (--matchCount == 0)
  1808. break;
  1809. }
  1810. }
  1811. }
  1812. }
  1813. else
  1814. {
  1815. for (unsigned i=0; i<numInputs; i++)
  1816. {
  1817. if (matched[i])
  1818. {
  1819. inputs.item(i).skip();
  1820. matched[i] = false;
  1821. matchCount--;
  1822. }
  1823. }
  1824. }
  1825. matched[nextInput] = true;
  1826. matchCount++;
  1827. }
  1828. setCandidateRow(inputs.item(0).next(), false, NULL);
  1829. return true;
  1830. }
  1831. __int64 CProximityJoinProcessor::maxDistanceBefore(unsigned fixedInput, unsigned searchInput) const
  1832. {
  1833. assertex(outputProcessor); // sanity check to ensure this isn't called before maxXBeforeY are set up
  1834. if (searchInput < fixedInput)
  1835. return maxLeftBeforeRight * (fixedInput - searchInput);
  1836. else
  1837. return maxRightBeforeLeft * (searchInput - fixedInput);
  1838. }
  1839. unsigned CProximityJoinProcessor::nextToMatch() const
  1840. {
  1841. for (unsigned i=0; i < numInputs; i++)
  1842. {
  1843. unsigned next = i;//searchOrder.item(i);
  1844. if (!matched[next])
  1845. return next;
  1846. }
  1847. throwUnexpected();
  1848. }
  1849. //Choose the input to seek from that restricts the input being sought the most.
  1850. unsigned CProximityJoinProcessor::getBestToSeekFrom(unsigned seekInput) const
  1851. {
  1852. unsigned __int64 bestRangeValue = 0;
  1853. unsigned best = NotFound;
  1854. //MORE: This can be optimized!
  1855. for (unsigned i=0; i < numInputs; i++)
  1856. {
  1857. if (matched[i])
  1858. {
  1859. //Calculate the value of the distance
  1860. __int64 distanceBefore = maxDistanceBefore(i, seekInput);
  1861. unsigned __int64 rangeValue = adjustRangeValue(getRangeValue(i), -distanceBefore);
  1862. if (rangeValue >= bestRangeValue)
  1863. {
  1864. bestRangeValue = rangeValue;
  1865. best = i;
  1866. }
  1867. }
  1868. }
  1869. assertex(best != NotFound);
  1870. return best;
  1871. }
  1872. //---------------------------------------------------------------------------
  1873. //NULL passed to CSteppedInputLookahead first parameter means nextGE() must be overridden
  1874. CJoinGenerator::CJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  1875. helper(_helper), inputAllocator(_inputAllocator), outputAllocator(_outputAllocator)
  1876. {
  1877. state = JSdone;
  1878. unsigned flags = helper.getJoinFlags();
  1879. stepCompare = helper.querySteppingMeta()->queryCompare();
  1880. globalCompare = NULL;
  1881. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  1882. globalCompare = helper.queryGlobalCompare();
  1883. unsigned numInputs = _inputs.ordinality();
  1884. rows = new const void * [numInputs];
  1885. CNaryJoinLookaheadQueue * prev = NULL;
  1886. ForEachItemIn(i, _inputs)
  1887. {
  1888. CNaryJoinLookaheadQueue * queue = new CNaryJoinLookaheadQueue(inputAllocator, helper, &_inputs.item(i), prev, rows + i);
  1889. inputs.append(*queue);
  1890. prev = queue;
  1891. }
  1892. isSpecialLeftJoin = false;
  1893. numActiveInputs = numInputs;
  1894. lastActiveInput = numInputs ? &inputs.tos() : NULL;
  1895. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  1896. switch (joinKind)
  1897. {
  1898. case IHThorNWayMergeJoinArg::MJFleftonly:
  1899. case IHThorNWayMergeJoinArg::MJFleftouter:
  1900. if (helper.queryNonSteppedCompare() || globalCompare)
  1901. {
  1902. isSpecialLeftJoin = true;
  1903. if (numInputs)
  1904. inputs.item(0).trackUnmatched();
  1905. }
  1906. break;
  1907. case IHThorNWayMergeJoinArg::MJFmofn:
  1908. if (helper.queryNonSteppedCompare() || globalCompare)
  1909. throw MakeStringException(99, "MOFN JOIN with non stepped condition not yet supported");
  1910. break;
  1911. }
  1912. }
  1913. CJoinGenerator::~CJoinGenerator()
  1914. {
  1915. delete [] rows;
  1916. }
  1917. void CJoinGenerator::beforeProcessCandidates(const void * candidateRow, bool needToVerifyNext, const bool * matched)
  1918. {
  1919. if (needToVerifyNext)
  1920. {
  1921. numActiveInputs = 0;
  1922. CNaryJoinLookaheadQueue * prev = NULL;
  1923. ForEachItemIn(i, inputs)
  1924. {
  1925. CNaryJoinLookaheadQueue & cur = inputs.item(i);
  1926. if (cur.beforeProcessCandidates(candidateRow, needToVerifyNext))
  1927. {
  1928. cur.updateContext(prev, rows + numActiveInputs);
  1929. prev = &cur;
  1930. numActiveInputs++;
  1931. }
  1932. }
  1933. lastActiveInput = prev;
  1934. }
  1935. else
  1936. {
  1937. ForEachItemIn(i, inputs)
  1938. inputs.item(i).beforeProcessCandidates(candidateRow, needToVerifyNext);
  1939. }
  1940. state = JSfirst;
  1941. }
  1942. void CJoinGenerator::cleanupAllCandidates()
  1943. {
  1944. //Remove all pending candidates - only called if outer join optimization is enabled
  1945. //afterProcessCandidates() will already have been called.
  1946. ForEachItemIn(i, inputs)
  1947. inputs.item(i).clearPending();
  1948. }
  1949. void CJoinGenerator::afterProcessCandidates()
  1950. {
  1951. }
  1952. const void * CJoinGenerator::nextOutputRow()
  1953. {
  1954. RtlDynamicRowBuilder rowBuilder(outputAllocator, false);
  1955. for (;;)
  1956. {
  1957. if (isSpecialLeftJoin)
  1958. {
  1959. CNaryJoinLookaheadQueue & left = inputs.item(0);
  1960. for (;;)
  1961. {
  1962. const void * unmatchedLeft = left.nextUnmatched();
  1963. if (!unmatchedLeft)
  1964. break;
  1965. rowBuilder.ensureRow();
  1966. size32_t retSize = helper.transform(rowBuilder, 1, &unmatchedLeft);
  1967. left.skipUnmatched();
  1968. if (retSize)
  1969. return rowBuilder.finalizeRowClear(retSize);
  1970. }
  1971. }
  1972. switch (state)
  1973. {
  1974. case JSdone:
  1975. if (isSpecialLeftJoin)
  1976. {
  1977. CNaryJoinLookaheadQueue & left = inputs.item(0);
  1978. left.readCandidateAll();
  1979. if (left.flushUnmatched())
  1980. break; // round again
  1981. }
  1982. return NULL;
  1983. case JShascandidate:
  1984. {
  1985. state = JSnextcandidate;
  1986. //If is left only join, and has an additional equality criteria, then ignore matches.
  1987. //If left only, and no extra equality - or only one dataset has matches, then all matches are real left only matches
  1988. if (isSpecialLeftJoin && (joinKind == IHThorNWayMergeJoinArg::MJFleftonly) && (numActiveInputs != 1))
  1989. break;
  1990. rowBuilder.ensureRow();
  1991. size32_t retSize = helper.transform(rowBuilder, numActiveInputs, rows);
  1992. if (retSize)
  1993. return rowBuilder.finalizeRowClear(retSize);
  1994. break;
  1995. }
  1996. case JSnextcandidate:
  1997. if (nextCandidate())
  1998. state = JShascandidate;
  1999. else
  2000. {
  2001. if (state != JSdone)
  2002. state = JSgathercandidates;
  2003. }
  2004. break;
  2005. case JSfirst:
  2006. case JSgathercandidates:
  2007. if (gatherNextCandidates())
  2008. state = JShascandidate;
  2009. else
  2010. state = JSdone;
  2011. break;
  2012. default:
  2013. throwUnexpected();
  2014. }
  2015. }
  2016. }
  2017. const void * CJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  2018. {
  2019. //A stupid version. We could possibly skip on the lowest value if we knew the fields were assigned from the lowest value in the input
  2020. //which would potentially save a lot of transforms.
  2021. //would also probably need the input to match the output.
  2022. for (;;)
  2023. {
  2024. const void * next = nextOutputRow();
  2025. if (!next || stepCompare->docompare(next, seek, numFields) >= 0)
  2026. return next;
  2027. outputAllocator->releaseRow(next);
  2028. }
  2029. }
  2030. bool CJoinGenerator::firstSelection()
  2031. {
  2032. if (lastActiveInput->firstSelection())
  2033. {
  2034. if (globalCompare && !globalCompare->match(numActiveInputs, rows))
  2035. return nextSelection();
  2036. if (isSpecialLeftJoin)
  2037. inputs.item(0).noteMatched();
  2038. return true;
  2039. }
  2040. return false;
  2041. }
  2042. bool CJoinGenerator::nextSelection()
  2043. {
  2044. while (lastActiveInput->nextSelection())
  2045. {
  2046. if (!globalCompare || globalCompare->match(numActiveInputs, rows))
  2047. {
  2048. if (isSpecialLeftJoin)
  2049. inputs.item(0).noteMatched();
  2050. return true;
  2051. }
  2052. }
  2053. return false;
  2054. }
  2055. //---------------------------------------------------------------------------
  2056. CEqualityJoinGenerator::CEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2057. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2058. {
  2059. lowestInput = NULL;
  2060. }
  2061. void CEqualityJoinGenerator::afterProcessCandidates()
  2062. {
  2063. lowestInput = NULL;
  2064. CJoinGenerator::afterProcessCandidates();
  2065. }
  2066. bool CEqualityJoinGenerator::nextCandidate()
  2067. {
  2068. if (nextSelection())
  2069. return true;
  2070. selectNextLowestInput();
  2071. return false;
  2072. }
  2073. /*
  2074. o Walk the input which is guaranteed to be the lowest
  2075. o Once that is done throw away that record, and choose the next.
  2076. */
  2077. bool CEqualityJoinGenerator::doGatherNextCandidates()
  2078. {
  2079. ForEachItemIn(iInput, inputs)
  2080. {
  2081. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2082. if (&curInput != lowestInput)
  2083. curInput.setCandidateAll();
  2084. else
  2085. curInput.setCandidateLowest();
  2086. }
  2087. return firstSelection();
  2088. }
  2089. bool CEqualityJoinGenerator::gatherNextCandidates()
  2090. {
  2091. if (state == JSfirst)
  2092. {
  2093. prefetchAllCandidates();
  2094. selectLowestInput();
  2095. }
  2096. else if (lowestInput->empty())
  2097. return false;
  2098. for (;;)
  2099. {
  2100. if (doGatherNextCandidates())
  2101. return true;
  2102. if (!selectNextLowestInput())
  2103. return false;
  2104. }
  2105. }
  2106. void CEqualityJoinGenerator::prefetchAllCandidates()
  2107. {
  2108. //could be done in parallel, but
  2109. ForEachItemIn(i, inputs)
  2110. {
  2111. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2112. curInput.readCandidateAll();
  2113. }
  2114. }
  2115. void CEqualityJoinGenerator::selectLowestInput()
  2116. {
  2117. ForEachItemIn(i, inputs)
  2118. {
  2119. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2120. if (!curInput.empty())
  2121. {
  2122. lowestInput = &curInput;
  2123. return;
  2124. }
  2125. }
  2126. throwUnexpected();
  2127. }
  2128. bool CEqualityJoinGenerator::selectNextLowestInput()
  2129. {
  2130. lowestInput->skip();
  2131. if (lowestInput->empty())
  2132. {
  2133. state = JSdone;
  2134. return false;
  2135. }
  2136. return true;
  2137. }
  2138. //---------------------------------------------------------------------------
  2139. CSortedEqualityJoinGenerator::CSortedEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2140. CEqualityJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2141. {
  2142. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2143. lowestSpotter.initInputs();
  2144. }
  2145. CSortedEqualityJoinGenerator::~CSortedEqualityJoinGenerator()
  2146. {
  2147. lowestSpotter.cleanup();
  2148. }
  2149. void CSortedEqualityJoinGenerator::afterProcessCandidates()
  2150. {
  2151. lowestSpotter.reset();
  2152. CEqualityJoinGenerator::afterProcessCandidates();
  2153. }
  2154. void CSortedEqualityJoinGenerator::selectLowestInput()
  2155. {
  2156. unsigned iLowest = lowestSpotter.queryNextInput();
  2157. assertex(iLowest != NotFound);
  2158. lowestInput = &inputs.item(iLowest);
  2159. }
  2160. bool CSortedEqualityJoinGenerator::selectNextLowestInput()
  2161. {
  2162. lowestSpotter.skipRow();
  2163. if (lowestInput->empty())
  2164. {
  2165. state = JSdone;
  2166. return false;
  2167. }
  2168. CSortedEqualityJoinGenerator::selectLowestInput();
  2169. return true;
  2170. }
  2171. //---------------------------------------------------------------------------
  2172. CRangeJoinGenerator::CRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2173. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2174. {
  2175. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  2176. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  2177. }
  2178. //---------------------------------------------------------------------------
  2179. CAnchoredRangeJoinGenerator::CAnchoredRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2180. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2181. {
  2182. iLowest = maxRightBeforeLeft < 0 ? 0 : inputs.ordinality()-1;
  2183. lowestInput = &inputs.item(iLowest);
  2184. }
  2185. bool CAnchoredRangeJoinGenerator::nextCandidate()
  2186. {
  2187. if (nextSelection())
  2188. return true;
  2189. lowestInput->skip();
  2190. return false;
  2191. }
  2192. /*
  2193. o Walk the input which is guaranteed to be the lowest
  2194. o Once that is done throw away that record, and choose the next.
  2195. */
  2196. bool CAnchoredRangeJoinGenerator::doGatherNextCandidates()
  2197. {
  2198. const void * lowestRow = lowestInput->next();
  2199. if (!lowestRow)
  2200. return false;
  2201. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2202. ForEachItemIn(iInput, inputs)
  2203. {
  2204. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2205. if (iInput != iLowest)
  2206. {
  2207. __int64 maxLowestBeforeCur = maxDistanceAfterLowest(iInput);
  2208. assertex(maxLowestBeforeCur > 0);
  2209. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2210. if (!curInput.setCandidateRange(maxDistance, false))
  2211. return false;
  2212. }
  2213. }
  2214. lowestInput->setCandidateLowest();
  2215. return firstSelection();
  2216. }
  2217. const void * CAnchoredRangeJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  2218. {
  2219. //Note: Skip any lower values that are less than seek value, but don't read any more
  2220. if (!lowestInput->checkExistsGE(seek, numFields))
  2221. return NULL;
  2222. return CRangeJoinGenerator::nextOutputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  2223. }
  2224. bool CAnchoredRangeJoinGenerator::nextMatchesAnyConsumed()
  2225. {
  2226. const void * lowestRow = lowestInput->next();
  2227. bool consumePending = false;
  2228. if (!lowestRow)
  2229. {
  2230. lowestRow = lowestInput->nextUnqueued();
  2231. if (!lowestRow)
  2232. return false;
  2233. consumePending = true;
  2234. }
  2235. //Throw any non-matching rows away, and return true if there are no other rows left.
  2236. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2237. ForEachItemIn(iInput, inputs)
  2238. {
  2239. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2240. if (iInput != iLowest)
  2241. {
  2242. //note: maxRightBeforeLeft is -minRightAfterLeft
  2243. __int64 minCurAfterLowest;
  2244. if (iInput < iLowest)
  2245. minCurAfterLowest = (-maxLeftBeforeRight) * (iLowest - iInput);
  2246. else
  2247. minCurAfterLowest = (-maxRightBeforeLeft) * (iInput - iLowest);
  2248. assertex(minCurAfterLowest >= 0);
  2249. if (!curInput.ensureCandidateExists(lowestDistance+minCurAfterLowest, lowestDistance + maxDistanceAfterLowest(iInput)))
  2250. return false;
  2251. }
  2252. }
  2253. //A potential match, so consume the potential start word and try again
  2254. if (consumePending)
  2255. lowestInput->consumeNextInput();
  2256. return true;
  2257. }
  2258. bool CAnchoredRangeJoinGenerator::gatherNextCandidates()
  2259. {
  2260. for (;;)
  2261. {
  2262. if (!nextMatchesAnyConsumed())
  2263. return false;
  2264. if (doGatherNextCandidates())
  2265. return true;
  2266. lowestInput->skip();
  2267. }
  2268. }
  2269. //---------------------------------------------------------------------------
  2270. CProximityRangeJoinGenerator::CProximityRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2271. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2272. {
  2273. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2274. lowestSpotter.initInputs();
  2275. }
  2276. CProximityRangeJoinGenerator::~CProximityRangeJoinGenerator()
  2277. {
  2278. lowestSpotter.cleanup();
  2279. }
  2280. void CProximityRangeJoinGenerator::afterProcessCandidates()
  2281. {
  2282. lowestSpotter.reset();
  2283. CRangeJoinGenerator::afterProcessCandidates();
  2284. }
  2285. bool CProximityRangeJoinGenerator::nextCandidate()
  2286. {
  2287. if (nextSelection())
  2288. return true;
  2289. if (!lowestSpotter.skipNextLowest())
  2290. state = JSdone;
  2291. return false;
  2292. }
  2293. /*
  2294. First version.....
  2295. o Walk the input datasets in the order lowest first.
  2296. o Perform the cross product of that record with all others that could possibly match.
  2297. o Once that is done throw away that record, and choose the next.
  2298. o Abort as soon as any of the inputs contains no records within potential range.
  2299. */
  2300. bool CProximityRangeJoinGenerator::gatherNextCandidates(unsigned iLowest)
  2301. {
  2302. CNaryJoinLookaheadQueue & lowestInput = inputs.item(iLowest);
  2303. const void * lowestRow = lowestInput.next();
  2304. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2305. ForEachItemIn(iInput, inputs)
  2306. {
  2307. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2308. if (iInput != iLowest)
  2309. {
  2310. __int64 maxLowestBeforeCur;
  2311. if (iInput < iLowest)
  2312. maxLowestBeforeCur = maxRightBeforeLeft * (iLowest - iInput);
  2313. else
  2314. maxLowestBeforeCur = maxLeftBeforeRight * (iInput - iLowest);
  2315. assertex(maxLowestBeforeCur >= 0); // should have created an anchored variant if not true
  2316. // maxLowestBeforeCur = maxCurAfterLowest
  2317. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2318. if (!curInput.setCandidateRange(maxDistance, true))
  2319. return false;
  2320. }
  2321. else
  2322. curInput.setCandidateLowest();
  2323. }
  2324. return firstSelection();
  2325. }
  2326. bool CProximityRangeJoinGenerator::gatherNextCandidates()
  2327. {
  2328. for (;;)
  2329. {
  2330. unsigned iLowest = lowestSpotter.queryNextInput();
  2331. assertex(iLowest != NotFound);
  2332. if (gatherNextCandidates(iLowest))
  2333. return true;
  2334. //It would be really nice to break out early if there were no more potential matches, but even if there
  2335. //is only one matching stream we need to keep walking the consumed records, because the later records
  2336. //may pull in the relevant related records, and we can't sensibly put back the consumed records
  2337. if (!lowestSpotter.skipNextLowest())
  2338. {
  2339. //No more records within this document => can't ever match
  2340. state = JSdone;
  2341. return false;
  2342. }
  2343. }
  2344. }