thorstep.cpp 83 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jlog.hpp"
  15. #include "jqueue.tpp"
  16. #include "jexcept.hpp"
  17. #include "thorcommon.hpp"
  18. #include "thorstep.ipp"
  19. #include "thorstep2.ipp"
  20. #include "roxiemem.hpp"
  21. #ifdef _DEBUG
  22. #define CHECK_CONSISTENCY
  23. #endif
  24. using roxiemem::OwnedConstRoxieRow;
  25. const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
  26. const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  27. const static SmartStepExtra nonSeekStepExtra(SSEFreturnUnbufferedMatches, NULL); // if doing next() instead of nextGE()
  28. const static SmartStepExtra nonBufferedMatchStepExtra(SSEFreturnUnbufferedMatches, NULL);
  29. const static SmartStepExtra nonBufferedMismatchStepExtra(SSEFreturnMismatches, NULL);
  30. bool stepFieldsMatch(const CFieldOffsetSize * leftFields, unsigned leftIndex, const CFieldOffsetSize * rightFields, unsigned rightIndex)
  31. {
  32. const CFieldOffsetSize * leftField = leftFields + leftIndex;
  33. const CFieldOffsetSize * rightField = rightFields + rightIndex;
  34. return (leftField->offset == rightField->offset) && (leftField->size == rightField->size);
  35. }
  36. bool stepFieldsMatch(ISteppingMeta * leftMeta, unsigned leftIndex, ISteppingMeta * rightMeta, unsigned rightIndex)
  37. {
  38. if ((leftIndex >= leftMeta->getNumFields()) || (rightIndex >= rightMeta->getNumFields()))
  39. return false;
  40. return stepFieldsMatch(leftMeta->queryFields(), leftIndex, rightMeta->queryFields(), rightIndex);
  41. }
  42. unsigned getNumMatchingFields(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  43. {
  44. unsigned numStepableFields = 0;
  45. if (inputStepping && callerStepping)
  46. {
  47. //Determine where the stepping fields overlap, and work out the extent.
  48. unsigned inputCount = inputStepping->getNumFields();
  49. for (unsigned i=0; i < inputCount; i++)
  50. {
  51. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  52. break;
  53. numStepableFields++;
  54. }
  55. }
  56. return numStepableFields;
  57. }
  58. void verifySteppingCompatible(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  59. {
  60. if (inputStepping && callerStepping)
  61. {
  62. //Determine where the stepping fields overlap, and work out the extent.
  63. unsigned parentCount = callerStepping->getNumFields();
  64. unsigned inputCount = inputStepping->getNumFields();
  65. unsigned max = parentCount < inputCount ? parentCount : inputCount;
  66. for (unsigned i=0; i < max; i++)
  67. {
  68. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  69. throw MakeStringException(999, "Stepping field %d, input and join do not match", i);
  70. }
  71. }
  72. }
  73. //---------------------------------------------------------------------------
  74. void CSteppingMeta::intersect(IInputSteppingMeta * inputMeta)
  75. {
  76. if (inputMeta)
  77. {
  78. unsigned maxFields = inputMeta->getNumFields();
  79. if (maxFields > numFields)
  80. maxFields = numFields;
  81. for (unsigned curField = 0; curField < maxFields; curField++)
  82. {
  83. if (!stepFieldsMatch(inputMeta->queryFields(), curField, fields, curField))
  84. {
  85. numFields = curField;
  86. break;
  87. }
  88. }
  89. if (inputMeta->hasPostFilter())
  90. postFiltered = true;
  91. if (inputMeta->isDistributed())
  92. setDistributed();
  93. unsigned inputFlags = inputMeta->getSteppedFlags();
  94. double inputPriority = inputMeta->getPriority();
  95. if (hadStepExtra)
  96. {
  97. stepFlags &= inputFlags;
  98. if (priority != inputPriority)
  99. stepFlags &= ~SSFhaspriority;
  100. }
  101. else
  102. {
  103. hadStepExtra = true;
  104. stepFlags = inputFlags;
  105. priority = inputPriority;
  106. }
  107. }
  108. else
  109. numFields = 0;
  110. }
  111. //---------------------------------------------------------------------------
  112. CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSteppingMeta * _inputStepping, IEngineRowAllocator * _rowAllocator, IRangeCompare * _compare, bool _paranoid)
  113. : input(_input), compare(_compare)
  114. {
  115. maxFields = compare ? compare->maxFields() : 0;
  116. readAheadRow = NULL;
  117. readAheadRowIsExactMatch = true;
  118. stepFlagsMask = 0;
  119. stepFlagsValue = 0;
  120. paranoid = _paranoid;
  121. previousReadAheadRow = NULL;
  122. rowAllocator.set(_rowAllocator);
  123. inputStepping = _inputStepping;
  124. numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
  125. isPostFiltered = inputStepping ? inputStepping->hasPostFilter() : false;
  126. setRestriction(NULL, 0);
  127. lowestFrequencyInput = NULL;
  128. eof = false;
  129. }
  130. CSteppedInputLookahead::~CSteppedInputLookahead()
  131. {
  132. if (previousReadAheadRow)
  133. rowAllocator->releaseRow(previousReadAheadRow);
  134. if (readAheadRow)
  135. rowAllocator->releaseRow(readAheadRow);
  136. }
  137. const void * CSteppedInputLookahead::nextInputRow()
  138. {
  139. if (readAheadRows.ordinality())
  140. return readAheadRows.dequeue();
  141. if (eof)
  142. return NULL;
  143. const void * ret = input->nextInputRow();
  144. if (!ret)
  145. eof = true;
  146. return ret;
  147. }
  148. const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  149. {
  150. while (readAheadRows.ordinality())
  151. {
  152. OwnedConstRoxieRow next = readAheadRows.dequeue();
  153. if (compare->docompare(next, seek, numFields) >= 0)
  154. {
  155. assertex(wasCompleteMatch);
  156. return (void *)next.getClear();
  157. }
  158. }
  159. if (eof)
  160. return NULL;
  161. const void * ret = input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  162. if (!ret)
  163. eof = true;
  164. return ret;
  165. }
// Tries to buffer up to maxcount distinct seek positions (rows >= seek on the first numFields
// fields) in seekRows, so they can be inspected through ordinality()/querySeek().
// Complete-match rows read from the input here are also queued in readAheadRows, so later calls
// to nextInputRow()/nextInputRowGE() return them before reading from the input again.
// seekRows entries hold their own link (linkRow) on the row.
void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
{
    const void * lastSeekRow = NULL;
    //Remove any rows from the seek list that occur before the new seek row
    while (seekRows.ordinality())
    {
        const void * next = seekRows.head();
        if (compare->docompare(next, seek, numFields) >= 0)
        {
            //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
            seek = seekRows.tail();
            lastSeekRow = seek;
            break;
        }
        rowAllocator->releaseRow(seekRows.dequeue());
    }
    //Could the current readahead row be part of the seek set.
    if (readAheadRow && compare->docompare(readAheadRow, seek, numFields) >= 0)
    {
        //Check not already added - could conceivably happen after rows are read directly beyond the matching seeks.
        if (!lastSeekRow || compare->docompare(readAheadRow, lastSeekRow, numFields) > 0)
        {
            seekRows.enqueue(rowAllocator->linkRow(readAheadRow));
            lastSeekRow = readAheadRow;
            seek = readAheadRow;
        }
    }
    if (eof)
        return;
    //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
    unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
    SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
    seekRows.ensure(maxcount);
    //Read directly from the input (not via nextInputRowGE()) until enough seek positions are held or eof
    while (seekRows.ordinality() < maxcount)
    {
        bool wasCompleteMatch = true;
        const void * next = input->nextInputRowGE(seek, numFields, wasCompleteMatch, inputStepExtra);
        if (!next)
        {
            eof = true;
            break;
        }
        //wasCompleteMatch can be false if we've just read the last row returned from a block of reads,
        //but if so the next read request will do another blocked read, so just ignore this one.
        if (wasCompleteMatch)
        {
            //The read-ahead queue takes over ownership of the row just read
            readAheadRows.enqueue(next);
            if (!lastSeekRow || compare->docompare(next, lastSeekRow, numFields) > 0)
            {
                //Only record unique seek positions in the seek rows
                seekRows.enqueue(rowAllocator->linkRow(next));
                lastSeekRow = next;
            }
            //update the seek pointer to the best value.
            seek = next;
        }
        else
            rowAllocator->releaseRow(next);
    }
}
  226. unsigned CSteppedInputLookahead::ordinality() const
  227. {
  228. return seekRows.ordinality();
  229. }
  230. const void * CSteppedInputLookahead::querySeek(unsigned i) const
  231. {
  232. return seekRows.item(i);
  233. }
  234. const void * CSteppedInputLookahead::consume()
  235. {
  236. if (!readAheadRow)
  237. fill();
  238. if (!includeInOutput(readAheadRow))
  239. return NULL;
  240. if (paranoid && readAheadRow)
  241. {
  242. if (previousReadAheadRow)
  243. rowAllocator->releaseRow(previousReadAheadRow);
  244. previousReadAheadRow = rowAllocator->linkRow(readAheadRow);
  245. }
  246. const void * ret = readAheadRow;
  247. readAheadRow = NULL;
  248. readAheadRowIsExactMatch = true;
  249. return ret;
  250. }
  251. IMultipleStepSeekInfo * CSteppedInputLookahead::createMutipleReadWrapper()
  252. {
  253. return this;
  254. }
  255. void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper)
  256. {
  257. lowestFrequencyInput = wrapper;
  258. }
  259. void CSteppedInputLookahead::fill()
  260. {
  261. readAheadRowIsExactMatch = true;
  262. if (restrictValue && numStepableFields)
  263. {
  264. //note - this will either return a valid value to be included in the range,
  265. //or if invalid then it must be out of range -> will fail includeInOutput later,
  266. //but we may as well keep the row
  267. unsigned numFields = numRestrictFields < numStepableFields ? numRestrictFields : numStepableFields;
  268. //Default to returning mismatches, but could be overidden from outside
  269. unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
  270. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  271. readAheadRow = nextInputRowGE(restrictValue, numFields, readAheadRowIsExactMatch, inputStepExtra);
  272. if (paranoid && readAheadRow)
  273. {
  274. int c = compare->docompare(readAheadRow, restrictValue, numFields);
  275. if (c < 0)
  276. throw MakeStringException(1001, "Input to stepped join preceeds seek point");
  277. if ((c == 0) && !readAheadRowIsExactMatch)
  278. throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
  279. }
  280. }
  281. else
  282. {
  283. //Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
  284. //Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
  285. readAheadRow = nextInputRow();
  286. }
  287. if (paranoid && readAheadRow && previousReadAheadRow && compare)
  288. {
  289. if (compare->docompare(previousReadAheadRow, readAheadRow, maxFields) > 0)
  290. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  291. }
  292. }
  293. const void * CSteppedInputLookahead::next()
  294. {
  295. if (!readAheadRowIsExactMatch)
  296. {
  297. if (includeInOutput(readAheadRow))
  298. skip();
  299. else
  300. return NULL;
  301. }
  302. if (!readAheadRow)
  303. fill();
  304. if (!includeInOutput(readAheadRow))
  305. return NULL;
  306. return readAheadRow;
  307. }
  308. const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  309. {
  310. if (readAheadRow)
  311. {
  312. int c = compare->docompare(readAheadRow, seek, numFields);
  313. if (c >= 0)
  314. {
  315. if (!includeInOutput(readAheadRow))
  316. return NULL;
  317. if (readAheadRowIsExactMatch)
  318. return readAheadRow;
  319. //readAheadRow is beyond seek point => ok to return an incomplete match
  320. if (stepExtra.returnMismatches() && (c != 0))
  321. {
  322. wasCompleteMatch = readAheadRowIsExactMatch;
  323. return readAheadRow;
  324. }
  325. }
  326. skip();
  327. }
  328. if (numStepableFields)
  329. {
  330. //This class is directly told whether it should be using readAhead, so need to create a modified stepExtra
  331. unsigned flags = (stepExtra.queryFlags() & ~stepFlagsMask) | stepFlagsValue;
  332. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  333. unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
  334. loop
  335. {
  336. readAheadRowIsExactMatch = true;
  337. readAheadRow = nextInputRowGE(seek, stepFields, readAheadRowIsExactMatch, inputStepExtra);
  338. if (paranoid && readAheadRow)
  339. {
  340. int c = compare->docompare(readAheadRow, seek, stepFields);
  341. if (c < 0)
  342. throw MakeStringException(1001, "Input to stepped join preceeds seek point");
  343. if ((c == 0) && !readAheadRowIsExactMatch)
  344. throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
  345. }
  346. if (!readAheadRow || !includeInOutput(readAheadRow))
  347. return NULL;
  348. if (numFields <= numStepableFields)
  349. {
  350. wasCompleteMatch = readAheadRowIsExactMatch;
  351. return readAheadRow;
  352. }
  353. //if !readAheadRowIsExactMatch then isCompleteMatch must have been provided => ok to return a mismatch
  354. //if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
  355. if (!readAheadRowIsExactMatch)
  356. {
  357. wasCompleteMatch = readAheadRowIsExactMatch;
  358. return readAheadRow;
  359. }
  360. if (compare->docompare(readAheadRow, seek, numFields) >= 0)
  361. {
  362. wasCompleteMatch = readAheadRowIsExactMatch;
  363. return readAheadRow;
  364. }
  365. skip();
  366. }
  367. //now need to do an incremental seek on the subsequent fields to find an exact value >
  368. }
  369. //now narrow down
  370. loop
  371. {
  372. const void * cur = next();
  373. if (!cur)
  374. return NULL;
  375. if (compare->docompare(cur, seek, numFields) >= 0)
  376. return cur;
  377. skip();
  378. }
  379. }
  380. unsigned CSteppedInputLookahead::queryMaxStepable(ISteppingMeta * callerStepping) const
  381. {
  382. return getNumMatchingFields(inputStepping, callerStepping);
  383. }
  384. void CSteppedInputLookahead::setAlwaysReadExact()
  385. {
  386. //can be used to force reading only exact matches (for the known lowest priority input)
  387. stepFlagsMask |= SSEFreturnMismatches;
  388. }
  389. void CSteppedInputLookahead::setReadAhead(bool value)
  390. {
  391. //This never removes readahead if requested somewhere else, so don't update the mask.
  392. if (value)
  393. stepFlagsValue |= SSEFreadAhead;
  394. else
  395. stepFlagsValue &= ~SSEFreadAhead;
  396. }
  397. void CSteppedInputLookahead::setRestriction(const void * _value, unsigned _num)
  398. {
  399. restrictValue = _value;
  400. numRestrictFields = _num;
  401. }
  402. void CSteppedInputLookahead::resetEOF()
  403. {
  404. if (numRestrictFields == 0)
  405. resetInputEOF();
  406. }
  407. void CSteppedInputLookahead::skip()
  408. {
  409. if (paranoid)
  410. {
  411. if (previousReadAheadRow)
  412. rowAllocator->releaseRow(previousReadAheadRow);
  413. previousReadAheadRow = readAheadRow;
  414. }
  415. else
  416. {
  417. if (readAheadRow)
  418. rowAllocator->releaseRow(readAheadRow);
  419. }
  420. //NB: Don't read ahead until we have to...
  421. readAheadRow = NULL;
  422. readAheadRowIsExactMatch = true;
  423. }
  424. const void * CSteppedInputLookahead::skipnext()
  425. {
  426. skip();
  427. return next();
  428. }
  429. //---------------------------------------------------------------------------
  430. void CUnfilteredSteppedMerger::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext, const bool * matched)
  431. {
  432. merger.setCandidateRow(_equalityRow);
  433. unsigned numInputs = inputArray->ordinality();
  434. for (unsigned i=0; i< numInputs; i++)
  435. {
  436. if (!needToVerifyNext || matched[i])
  437. firstCandidateRows[i] = inputArray->item(i).consume();
  438. else
  439. firstCandidateRows[i] = NULL;
  440. }
  441. merger.primeRows(firstCandidateRows);
  442. }
  443. //---------------------------------------------------------------------------
  444. CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRangeCompare * _stepCompare, ICompare * _equalCompare, CSteppedInputLookahead * _input, unsigned _numEqualFields)
  445. {
  446. allocator = _allocator;
  447. stepCompare = _stepCompare;
  448. equalCompare = _equalCompare;
  449. input = _input;
  450. matched.setown(createBitSet());
  451. numMatched = 0;
  452. readIndex = 0;
  453. numEqualFields = _numEqualFields;
  454. }
  455. CFilteredInputBuffer::~CFilteredInputBuffer()
  456. {
  457. }
  458. const void * CFilteredInputBuffer::consume()
  459. {
  460. if (!rows.isItem(readIndex))
  461. return NULL;
  462. const void * ret = rows.item(readIndex);
  463. rows.replace(NULL, readIndex);
  464. readIndex++;
  465. return ret;
  466. }
  467. const void * CFilteredInputBuffer::consumeGE(const void * seek, unsigned numFields)
  468. {
  469. while (rows.isItem(readIndex))
  470. {
  471. const void * cur = rows.item(readIndex);
  472. if (stepCompare->docompare(cur, seek, numFields) >= 0)
  473. {
  474. rows.replace(NULL, readIndex);
  475. readIndex++;
  476. return cur;
  477. }
  478. readIndex++;
  479. }
  480. return NULL;
  481. }
  482. void CFilteredInputBuffer::fill(const void * equalityRow)
  483. {
  484. const void * next = input->consume();
  485. assertex(next);
  486. append(next);
  487. if (equalityRow)
  488. {
  489. loop
  490. {
  491. bool matches = true;
  492. SmartStepExtra stepExtra(SSEFreturnMismatches, NULL);
  493. const void * next = input->nextGE(equalityRow, numEqualFields, matches, stepExtra);
  494. if (!next || !matches || equalCompare->docompare(equalityRow, next) != 0)
  495. break;
  496. append(input->consume());
  497. }
  498. }
  499. else
  500. {
  501. loop
  502. {
  503. const void * next = input->consume();
  504. if (!next)
  505. break;
  506. append(next);
  507. }
  508. }
  509. }
  510. void CFilteredInputBuffer::removeMatched()
  511. {
  512. ForEachItemInRev(i, rows)
  513. {
  514. if (isMatched(i))
  515. remove(i);
  516. }
  517. }
  518. void CFilteredInputBuffer::removeUnmatched()
  519. {
  520. ForEachItemInRev(i, rows)
  521. {
  522. if (!isMatched(i))
  523. remove(i);
  524. }
  525. }
  526. void CFilteredInputBuffer::remove(unsigned i)
  527. {
  528. const void * row = rows.item(i);
  529. rows.remove(i);
  530. allocator->releaseRow(row);
  531. }
  532. void CFilteredInputBuffer::reset()
  533. {
  534. ForEachItemIn(i, rows)
  535. {
  536. const void * cur = rows.item(i);
  537. if (cur)
  538. allocator->releaseRow(cur);
  539. }
  540. rows.kill();
  541. matched->reset();
  542. numMatched = 0;
  543. readIndex = 0;
  544. }
  545. CFilteredSteppedMerger::CFilteredSteppedMerger()
  546. {
  547. matches = NULL;
  548. joinKind = 0;
  549. numInputs = 0;
  550. equalCompare = NULL;
  551. extraCompare = NULL;
  552. globalCompare = NULL;
  553. minMatches = 0;
  554. maxMatches = 0;
  555. fullyMatchedLevel = 0;
  556. }
  557. CFilteredSteppedMerger::~CFilteredSteppedMerger()
  558. {
  559. delete [] matches;
  560. }
  561. void CFilteredSteppedMerger::init(IEngineRowAllocator * _allocator, IHThorNWayMergeJoinArg & helper, CSteppedInputLookaheadArray * inputArray)
  562. {
  563. unsigned flags = helper.getJoinFlags();
  564. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  565. numInputs = inputArray->ordinality();
  566. matches = new const void * [numInputs];
  567. equalCompare = helper.queryEqualCompare();
  568. extraCompare = helper.queryNonSteppedCompare();
  569. globalCompare = NULL;
  570. unsigned numEqualFields = helper.numEqualFields();
  571. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  572. globalCompare = helper.queryGlobalCompare();
  573. if (joinKind == IHThorNWayMergeJoinArg::MJFmofn)
  574. {
  575. minMatches = helper.getMinMatches();
  576. maxMatches = helper.getMaxMatches();
  577. }
  578. else
  579. {
  580. minMatches = numInputs;
  581. maxMatches = numInputs;
  582. }
  583. IRangeCompare * stepCompare = helper.querySteppingMeta()->queryCompare();
  584. ForEachItemIn(i, *inputArray)
  585. inputs.append(*new CFilteredInputBuffer(_allocator, stepCompare, equalCompare, &inputArray->item(i), numEqualFields));
  586. merger.init(_allocator, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  587. merger.initInputs(&inputs);
  588. }
  589. //ISteppedJoinRowGenerator
  590. void CFilteredSteppedMerger::beforeProcessCandidates(const void * equalityRow, bool needToVerifyNext, const bool * matched)
  591. {
  592. //Exaustively read from each of the inputs into each of the buffers
  593. ForEachItemIn(i, inputs)
  594. {
  595. if (!needToVerifyNext || matched[i])
  596. inputs.item(i).fill(equalityRow);
  597. }
  598. postFilterRows();
  599. //No point priming the rows here - will be just as efficient to use the default action
  600. }
  601. void CFilteredSteppedMerger::afterProcessCandidates()
  602. {
  603. ForEachItemIn(i, inputs)
  604. inputs.item(i).reset();
  605. merger.reset();
  606. }
  607. void CFilteredSteppedMerger::cleanupAllCandidates()
  608. {
  609. merger.reset(); // not strictly necessary...
  610. }
  611. void CFilteredSteppedMerger::afterProcessingAll()
  612. {
  613. merger.cleanup();
  614. }
  615. const void * CFilteredSteppedMerger::nextOutputRow()
  616. {
  617. return merger.nextRow();
  618. }
  619. const void * CFilteredSteppedMerger::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  620. {
  621. return merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  622. }
  623. bool CFilteredSteppedMerger::tagMatches(unsigned level, unsigned numRows)
  624. {
  625. CFilteredInputBuffer & right = inputs.item(level);
  626. unsigned maxLevel = inputs.ordinality()-1;
  627. ConstPointerArray & curRows = right.rows;
  628. if (curRows.ordinality())
  629. {
  630. bool valid = false;
  631. const void * lhs = matches[numRows-1];
  632. ForEachItemIn(i, curRows)
  633. {
  634. //If we have had a match at this level, and this item is already matched,
  635. //and all levels higher than this have already been completely matched),
  636. //then no need to check this item (and its children) again, since it won't change anything.
  637. bool alreadyMatched = right.isMatched(i);
  638. if (!valid || level + 1 < fullyMatchedLevel || !alreadyMatched)
  639. {
  640. const void * rhs = curRows.item(i);
  641. unsigned matchedRows = numRows;
  642. bool recurse;
  643. if (!extraCompare || extraCompare->match(lhs, rhs))
  644. {
  645. matches[matchedRows++] = rhs;
  646. recurse = matchedRows <= maxMatches;
  647. }
  648. else
  649. {
  650. //for mofn, check enough levels left to create a potential match, for others it will fail.
  651. unsigned remain = maxLevel-level;
  652. recurse = (numRows + remain >= minMatches);
  653. }
  654. if (recurse)
  655. {
  656. bool isFullMatch;
  657. if (level == maxLevel)
  658. isFullMatch = (!globalCompare || globalCompare->match(matchedRows, matches));
  659. else
  660. isFullMatch = tagMatches(level+1, matchedRows);
  661. if (isFullMatch)
  662. {
  663. valid = true;
  664. if (!alreadyMatched)
  665. right.noteMatch(i);
  666. //If the previous level is fully matched, and so is this one - then update the minimum fully matched level
  667. if ((level + 1 == fullyMatchedLevel) && right.isFullyMatched())
  668. fullyMatchedLevel--;
  669. //If all rows in this level and above are fully matched, then iterating any further will have no effect.
  670. //Could potentially reduce a O(N^m) to O(mN) if the majority of elements match.
  671. if (level >= fullyMatchedLevel)
  672. break;
  673. }
  674. }
  675. }
  676. }
  677. return valid;
  678. }
  679. else
  680. {
  681. //mofn may still be ok with a skipped level or two
  682. unsigned remain = maxLevel-level;
  683. if (numRows + remain >= minMatches)
  684. {
  685. if (level == maxLevel)
  686. return (!globalCompare || globalCompare->match(numRows, matches));
  687. else
  688. return tagMatches(level+1, numRows);
  689. }
  690. return false;
  691. }
  692. }
  693. void CFilteredSteppedMerger::tagMatches()
  694. {
  695. unsigned numInputs = inputs.ordinality();
  696. fullyMatchedLevel = numInputs;
  697. //for m of n, need to start matching at levels 0,1,.. numLevels - minMatches
  698. unsigned iterateLevels = numInputs - minMatches;
  699. for (unsigned level =0; level <= iterateLevels; level++)
  700. {
  701. CFilteredInputBuffer & left = inputs.item(level);
  702. ForEachItemIn(i, left.rows)
  703. {
  704. matches[0] = left.rows.item(i);
  705. bool thisMatched;
  706. //mofn(1) may not have another level, to just check global compare.
  707. if (level == numInputs-1)
  708. thisMatched = (!globalCompare || globalCompare->match(1, matches));
  709. else
  710. thisMatched = tagMatches(level+1, 1);
  711. if (thisMatched)
  712. {
  713. if (!left.isMatched(i))
  714. left.noteMatch(i);
  715. //Check if this level, and all above are now fully matched. If so, we're done.
  716. if ((level + 1 == fullyMatchedLevel) && left.isFullyMatched())
  717. {
  718. fullyMatchedLevel--;
  719. break;
  720. }
  721. }
  722. }
  723. if (level >= fullyMatchedLevel)
  724. break;
  725. }
  726. }
  727. void CFilteredSteppedMerger::postFilterRows()
  728. {
  729. tagMatches();
  730. unsigned max = inputs.ordinality();
  731. switch (joinKind)
  732. {
  733. case IHThorNWayMergeJoinArg::MJFinner:
  734. case IHThorNWayMergeJoinArg::MJFmofn:
  735. {
  736. for (unsigned i=0; i < max; i++)
  737. inputs.item(i).removeUnmatched();
  738. break;
  739. }
  740. case IHThorNWayMergeJoinArg::MJFleftouter:
  741. {
  742. for (unsigned i=1; i < max; i++)
  743. inputs.item(i).removeUnmatched();
  744. break;
  745. }
  746. case IHThorNWayMergeJoinArg::MJFleftonly:
  747. {
  748. inputs.item(0).removeMatched();
  749. unsigned max = inputs.ordinality();
  750. for (unsigned i=1; i < max; i++)
  751. inputs.item(i).reset();
  752. break;
  753. }
  754. }
  755. }
  756. //---------------------------------------------------------------------------
  757. CMergeJoinProcessor::CMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : helper(_arg)
  758. {
  759. mergeSteppingMeta = helper.querySteppingMeta();
  760. assertex(mergeSteppingMeta);
  761. stepCompare = mergeSteppingMeta->queryCompare();
  762. equalCompare = helper.queryEqualCompare();
  763. equalCompareEq = helper.queryEqualCompareEq();
  764. numEqualFields = helper.numEqualFields();
  765. flags = helper.getJoinFlags();
  766. matched = NULL;
  767. candidateEqualityRow = NULL;
  768. numExternalEqualFields = 0;
  769. conjunctionOptimizer = NULL;
  770. tempSeekBuffer = NULL;
  771. lowestSeekRow = NULL;
  772. combineConjunctions = true;
  773. allInputsAreOuterInputs = false;
  774. maxSeekRecordSize = 0;
  775. numInputs = 0;
  776. eof = true;
  777. assertex(helper.numOrderFields() == mergeSteppingMeta->getNumFields());
  778. bool hasPostfilter = false;
  779. thisSteppingMeta.init(mergeSteppingMeta->getNumFields(), mergeSteppingMeta->queryFields(), stepCompare, mergeSteppingMeta->queryDistance(), hasPostfilter);
  780. }
  781. CMergeJoinProcessor::~CMergeJoinProcessor()
  782. {
  783. afterProcessing();
  784. }
  785. void CMergeJoinProcessor::addInput(ISteppedInput * _input)
  786. {
  787. IInputSteppingMeta * _meta = _input->queryInputSteppingMeta();
  788. verifySteppingCompatible(_meta, mergeSteppingMeta);
  789. rawInputs.append(*LINK(_input));
  790. }
  791. void CMergeJoinProcessor::afterProcessing()
  792. {
  793. cleanupCandidates();
  794. if (outputProcessor)
  795. {
  796. outputProcessor->afterProcessingAll();
  797. outputProcessor.clear();
  798. }
  799. if (conjunctionOptimizer)
  800. {
  801. conjunctionOptimizer->afterProcessing();
  802. delete conjunctionOptimizer;
  803. conjunctionOptimizer = NULL;
  804. }
  805. delete [] matched;
  806. matched = NULL;
  807. inputs.kill();
  808. rawInputs.kill();
  809. orderedInputs.kill();
  810. if (lowestSeekRow)
  811. {
  812. inputAllocator->releaseRow(lowestSeekRow);
  813. lowestSeekRow = NULL;
  814. }
  815. if (tempSeekBuffer)
  816. {
  817. inputAllocator->releaseRow(tempSeekBuffer);
  818. tempSeekBuffer = NULL;
  819. }
  820. //Now free the allocators
  821. inputAllocator.clear();
  822. outputAllocator.clear();
  823. maxSeekRecordSize = 0;
  824. }
  825. void CMergeJoinProcessor::createTempSeekBuffer()
  826. {
  827. tempSeekBuffer = inputAllocator->createRow();
  828. #ifdef _DEBUG
  829. //Clear the complete tempSeekBBuffer record, so that toXML() can be used to trace the seek row in roxie
  830. if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
  831. {
  832. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, inputAllocator->queryOutputMeta()->getMinRecordSize());
  833. helper.createLowInputRow(rowBuilder);
  834. }
  835. #endif
  836. }
  837. void CMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  838. {
  839. inputAllocator.set(_inputAllocator);
  840. outputAllocator.set(_outputAllocator);
  841. //The seek components must all be fixed width, so the seek record size must be <= the minimum size of the input record
  842. maxSeekRecordSize = inputAllocator->queryOutputMeta()->getMinRecordSize();
  843. bool paranoid = (flags & IHThorNWayMergeJoinArg::MJFassertsorted) != 0;
  844. ForEachItemIn(i1, rawInputs)
  845. {
  846. ISteppedInput & cur = rawInputs.item(i1);
  847. inputs.append(* new CSteppedInputLookahead(&cur, cur.queryInputSteppingMeta(), inputAllocator, stepCompare, paranoid));
  848. }
  849. if (flags & IHThorNWayMergeJoinArg::MJFhasclearlow)
  850. {
  851. RtlDynamicRowBuilder rowBuilder(inputAllocator);
  852. size32_t size = helper.createLowInputRow(rowBuilder);
  853. lowestSeekRow = rowBuilder.finalizeRowClear(size);
  854. }
  855. cleanupCandidates();
  856. eof = false;
  857. numInputs = inputs.ordinality();
  858. matched = new bool[numInputs];
  859. if (numInputs == 0)
  860. eof = true;
  861. //Sort the inputs by the preferred processing order (if provided), ensuring no duplicates
  862. clearMatches();
  863. ForEachItemIn(i2, searchOrder)
  864. {
  865. unsigned next = searchOrder.item(i2);
  866. if (next < numInputs && !matched[next])
  867. {
  868. orderedInputs.append(OLINK(inputs.item(next)));
  869. matched[next] = true;
  870. }
  871. }
  872. //MORE: We really should move the most-stepable inputs to the start
  873. for (unsigned i3 = 0; i3 < numInputs; i3++)
  874. {
  875. if (!matched[i3])
  876. orderedInputs.append(OLINK(inputs.item(i3)));
  877. }
  878. }
  879. bool CMergeJoinProcessor::createConjunctionOptimizer()
  880. {
  881. if (inputs.ordinality())
  882. {
  883. conjunctionOptimizer = new CSteppedConjunctionOptimizer(inputAllocator, helper, this);
  884. if (gatherConjunctions(*conjunctionOptimizer) && conjunctionOptimizer->worthCombining())
  885. {
  886. conjunctionOptimizer->beforeProcessing();
  887. return true;
  888. }
  889. delete conjunctionOptimizer;
  890. conjunctionOptimizer = NULL;
  891. }
  892. combineConjunctions = false;
  893. return false;
  894. }
  895. void CMergeJoinProcessor::createMerger()
  896. {
  897. ICompareEq * extraCompare = helper.queryNonSteppedCompare();
  898. bool hasGlobalCompare = (flags & IHThorNWayMergeJoinArg::MJFglobalcompare) != 0;
  899. if (!extraCompare && !hasGlobalCompare)
  900. {
  901. Owned<CUnfilteredSteppedMerger> simpleMerger = new CUnfilteredSteppedMerger(numEqualFields);
  902. simpleMerger->init(inputAllocator, equalCompare, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  903. simpleMerger->initInputs(&inputs);
  904. outputProcessor.setown(simpleMerger.getClear());
  905. }
  906. else
  907. {
  908. Owned<CFilteredSteppedMerger> simpleMerger = new CFilteredSteppedMerger();
  909. simpleMerger->init(inputAllocator, helper, &inputs);
  910. outputProcessor.setown(simpleMerger.getClear());
  911. }
  912. }
  913. void CMergeJoinProcessor::createEqualityJoinProcessor()
  914. {
  915. if (numEqualFields >= helper.numOrderFields())
  916. outputProcessor.setown(new CEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  917. else
  918. outputProcessor.setown(new CSortedEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  919. }
  920. void CMergeJoinProcessor::finishCandidates()
  921. {
  922. if (outputProcessor)
  923. outputProcessor->afterProcessCandidates();
  924. assertex(hasCandidates());
  925. inputAllocator->releaseRow(candidateEqualityRow);
  926. candidateEqualityRow = NULL;
  927. }
  928. bool CMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  929. {
  930. allInputsAreOuterInputs = true;
  931. ForEachItemIn(i, inputs)
  932. {
  933. CSteppedInputLookahead & cur = inputs.item(i);
  934. if (!cur.gatherConjunctions(collector))
  935. collector.addInput(cur);
  936. else
  937. {
  938. collector.addPseudoInput(cur);
  939. allInputsAreOuterInputs = false;
  940. }
  941. }
  942. collector.addJoin(*this);
  943. return true;
  944. }
  945. const void * CMergeJoinProcessor::nextInputRow()
  946. {
  947. if (!hasCandidates() && !findCandidates(NULL, 0))
  948. return NULL;
  949. loop
  950. {
  951. const void * next = nextCandidate();
  952. if (next)
  953. return next;
  954. finishCandidates();
  955. //Abort early if externally optimized, and not proximity (since they may not have read all records for this equality)
  956. if ((numEqualFields == numExternalEqualFields) && candidatesExhaustEquality())
  957. return NULL;
  958. if (!findCandidates(NULL, 0))
  959. return NULL;
  960. }
  961. }
  962. const void * CMergeJoinProcessor::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  963. {
  964. //First check the next row from the candidates, it may be ok.
  965. if (hasCandidates())
  966. {
  967. unsigned compareFields = numFields < numEqualFields ? numFields : numEqualFields;
  968. //check whether the candidates could possibly return the match
  969. if (stepCompare->docompare(candidateEqualityRow, seek, compareFields) == 0)
  970. {
  971. const void * next = nextCandidateGE(seek, numFields, wasCompleteMatch, stepExtra);
  972. if (next)
  973. return next; // note must match equality to have been returned.
  974. }
  975. finishCandidates();
  976. }
  977. if (!findCandidates(seek, numFields))
  978. return NULL;
  979. return nextInputRow();
  980. }
  981. void CMergeJoinProcessor::resetEOF()
  982. {
  983. ForEachItemIn(i, inputs)
  984. inputs.item(i).resetEOF();
  985. }
  986. void CMergeJoinProcessor::queryResetEOF()
  987. {
  988. resetEOF();
  989. }
  990. const void * CMergeJoinProcessor::nextInGroup()
  991. {
  992. if (conjunctionOptimizer)
  993. return conjunctionOptimizer->next();
  994. if (combineConjunctions)
  995. {
  996. if (numExternalEqualFields == 0)
  997. {
  998. if (createConjunctionOptimizer())
  999. return conjunctionOptimizer->next();
  1000. }
  1001. else
  1002. combineConjunctions = false; // being used inside a conjunction optimizer => don't create another..
  1003. }
  1004. return nextInputRow();
  1005. }
  1006. const void * CMergeJoinProcessor::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  1007. {
  1008. if (conjunctionOptimizer)
  1009. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  1010. if (combineConjunctions)
  1011. {
  1012. if (createConjunctionOptimizer())
  1013. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  1014. }
  1015. return nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  1016. }
  1017. void CMergeJoinProcessor::startRestrictedJoin(const void * equalityRow, unsigned numEqualityFields)
  1018. {
  1019. assertex(numExternalEqualFields == 0);
  1020. numExternalEqualFields = numEqualityFields;
  1021. eof = false;
  1022. }
  1023. void CMergeJoinProcessor::stopRestrictedJoin()
  1024. {
  1025. numExternalEqualFields = 0;
  1026. if (hasCandidates())
  1027. finishCandidates();
  1028. //There are no more matches for this (outer) equality condition, so all active rows need to be thrown away.
  1029. if (outputProcessor)
  1030. outputProcessor->cleanupAllCandidates();
  1031. }
  1032. void CMergeJoinProcessor::setCandidateRow(const void * row, bool inputsMayBeEmpty, const bool * matched)
  1033. {
  1034. candidateEqualityRow = inputAllocator->linkRow(row);
  1035. const void * restrictionRow = (numEqualFields == numExternalEqualFields) ? NULL : candidateEqualityRow;
  1036. outputProcessor->beforeProcessCandidates(restrictionRow, inputsMayBeEmpty, matched);
  1037. }
  1038. //---------------------------------------------------------------------------
// Processor for inner (AND) stepped merge joins; all state is initialised by the CMergeJoinProcessor base.
CAndMergeJoinProcessor::CAndMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
{
}
  1042. void CAndMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1043. {
  1044. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1045. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1046. createEqualityJoinProcessor();
  1047. else
  1048. createMerger();
  1049. }
/*
 * Locate the next equality value for which every input has a row (inner AND join), and
 * start a candidate group for it via setCandidateRow().
 *
 * seekValue/numSeekFields - optional position to seek the first ordered input to.  When seekValue
 *                           is NULL the seek uses lowestSeekRow and numEqualFields (lowestSeekRow is
 *                           built in beforeProcessing only when MJFhasclearlow is set).
 * Returns true if a candidate group was started.  Returns false (setting eof) as soon as any
 * input is exhausted.
 */
bool CAndMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
{
    if (eof)
        return false;
    //True when the whole equality is supplied externally (restricted join); rows read are then expected
    //to equal the current value (checked under CHECK_CONSISTENCY)
    const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
    const void * equalValue;
    unsigned firstInput = 0;
    if (inputsMustMatchEquality && allInputsAreOuterInputs)
    {
        //special case - all inputs are already advanced to the correct place, so just start generating candidates
        //for nested conjunctions they may already be exhausted though
        equalValue = orderedInputs.item(firstInput).next();
        if (!equalValue)
        {
            eof = true;
            return false;
        }
    }
    else
    {
        if (!seekValue)
        {
            numSeekFields = numEqualFields;
            seekValue = lowestSeekRow;
        }
        bool matchedCompletely = true;
        equalValue = orderedInputs.item(firstInput).next(seekValue, numSeekFields, matchedCompletely, unknownFrequencyTermStepExtra);
        if (!equalValue)
        {
            eof = true;
            return false;
        }
        //savedRow takes ownership (setown) of any row consumed below, so equalValue stays valid
        //after the input it came from moves on
        PreservedRow savedRow(inputAllocator);
        unsigned matchCount = 0;
        clearMatches();
        if (matchedCompletely)
        {
            matched[firstInput] = true;
            matchCount++;
        }
        else
        {
            //Not a complete match: keep the row only as the value to seek the other inputs to
            equalValue = orderedInputs.item(firstInput).consume();
            savedRow.setown(equalValue);
        }
        unsigned lastInput = firstInput;
        //Seek each unmatched input to equalValue until every input agrees on the same value
        while (matchCount != numInputs)
        {
            unsigned nextInput = nextToMatch(lastInput);
            lastInput = nextInput;
            //NOTE: deliberately a new variable, shadowing the one declared above
            bool matchedCompletely = true;
            const void * nextRow = orderedInputs.item(nextInput).nextGE(equalValue, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
            if (!nextRow)
            {
                eof = true;
                return false;
            }
#ifdef CHECK_CONSISTENCY
            if (inputsMustMatchEquality)
            {
                if (equalCompare->docompare(nextRow, equalValue) != 0)
                    throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            }
            else
            {
                if (equalCompare->docompare(nextRow, equalValue) < 0)
                    throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            }
#endif
            if (!inputsMustMatchEquality)
            {
                if (!equalCompareEq->match(nextRow, equalValue))
                {
                    //value didn't match => skip all the previously matched entries.
                    for (unsigned i=0; i < numInputs; i++)
                    {
                        if (matched[i])
                        {
                            matched[i] = false;
                            orderedInputs.item(i).skip();
                            if (--matchCount == 0)
                                break;
                        }
                    }
                    if (!matchedCompletely)
                    {
                        //Need to preserve nextRow, otherwise it will be gone after we skip
                        equalValue = orderedInputs.item(nextInput).consume();
                        savedRow.setown(equalValue);
                    }
                    else
                        equalValue = nextRow; // still owned by its input; it becomes the first match of the new value below
                }
            }
            if (matchedCompletely)
            {
                matched[nextInput] = true;
                matchCount++;
            }
        }
    }
    //Set up the mergeProcessor with the appropriate inputs. NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
    //with the candidate.
    //clone one of the rows (setCandidateRow links equalValue, so it outlives savedRow)
    setCandidateRow(equalValue, false, NULL);
    return true;
}
  1157. unsigned CAndMergeJoinProcessor::nextToMatch(unsigned lastInput) const
  1158. {
  1159. for (unsigned i=0; i < numInputs; i++)
  1160. {
  1161. //Don't seek on the last input again (it may have found a keyed match, but not matched the post filter)
  1162. if ((i != lastInput) && !matched[i])
  1163. return i;
  1164. }
  1165. throwUnexpected();
  1166. }
  1167. //---------------------------------------------------------------------------
  1168. CAndLeftMergeJoinProcessor::CAndLeftMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1169. {
  1170. combineConjunctions = false; // No advantage using this as the base of a combined conjunction
  1171. isLeftOnly = (flags & IHThorNWayMergeJoinArg::MJFkindmask) == IHThorNWayMergeJoinArg::MJFleftonly;
  1172. //Left only with a not stepped comparison needs to be done as a left outer at the stepping level
  1173. if (isLeftOnly && (helper.queryNonSteppedCompare() || helper.queryGlobalCompare()))
  1174. isLeftOnly = false;
  1175. }
  1176. void CAndLeftMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1177. {
  1178. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1179. createTempSeekBuffer();
  1180. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1181. createEqualityJoinProcessor();
  1182. else
  1183. createMerger();
  1184. }
/*
 * Find the next candidate group for a LEFT OUTER / LEFT ONLY stepped join.
 * Input 0 (inputs.item(0)) supplies the left row.  orderedInputs entries 1..numInputs-1 are each
 * sought to the left row's equality value, stopping at the first one that does not match.
 *  - If every right input matches and isLeftOnly is set, the left row is skipped and the search restarts
 *    with the next left row.  When the equality is entirely external, it instead ends with eof.
 *  - Otherwise the candidate is the left row plus all inputs (every input matched), or just the left row.
 * Returns false and sets eof when no further left row is available.
 *
 * NOTE(review): the left row comes from inputs.item(0) while the right rows come from orderedInputs, and
 * matched[] is indexed by orderedInputs position but passed to setCandidateRow.  This assumes
 * orderedInputs has the same order as inputs (no searchOrder reordering) - confirm.
 */
bool CAndLeftMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
{
    if (eof)
        return false;
    CSteppedInputLookahead & input0 = inputs.item(0);
    bool wasMatched = true;
    const void * lhs = input0.next(seekValue, numSeekFields, wasMatched, nonBufferedMatchStepExtra);
    assertex(wasMatched);
    if (!lhs)
    {
        eof = true;
        return false;
    }
    //matchCount = number of leading inputs (including the left) that currently match lhs
    unsigned matchCount = 1;
    while (matchCount != numInputs)
    {
        bool matchedCompletely = true; // we don't care what the next rhs value is - as long as it can't match the left
        const void * rhs = orderedInputs.item(matchCount).nextGE(lhs, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
        if (rhs)
        {
            int c = equalCompare->docompare(rhs, lhs);
            if (c < 0)
                throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
            if (c == 0)
            {
                assertex(matchedCompletely);
                //previously the (matchCount+1) test wasn't here, so it aborted as soon as there was any match.
                //LEFT ONLY and every right input matches => this left row must not be output
                if (isLeftOnly && (matchCount+1 == numInputs))
                {
                    if (numEqualFields == numExternalEqualFields)
                    {
                        //I think this is worth doing here...
                        //Skip input0 to a mismatch value, so the optimizer doesn't waste time reading extra equalities
                        RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
                        bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
                        input0.skip(); // invalidates lhs
                        if (calculatedNextSeek)
                        {
                            bool wasMatched = true;
                            input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
                        }
                        eof = true;
                        return false;
                    }
                    //Create the next join value if that is possible
                    RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
                    bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
                    input0.skip(); // invalidates lhs
                    bool wasMatched = true;
                    if (calculatedNextSeek)
                        lhs = input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
                    else
                        lhs = input0.next();
                    if (!lhs)
                    {
                        eof = true;
                        return false;
                    }
                    matchCount = 0; //incremented at tail of loop
                }
            }
            else
                break; // this right input is beyond lhs => no complete match
        }
        else
            break; // this right input is exhausted => no complete match
        matchCount++;
    }
    clearMatches();
    matched[0] = true;
    if (matchCount != numInputs)
    {
        //Failed to match completely => generate a match for just the left. Skip any matched rows so far and break out.
        for (unsigned i=1; i < matchCount; i++)
            orderedInputs.item(i).skip();
        matchCount = 1;
    }
    else
    {
        for (unsigned i=1; i < numInputs; i++)
            matched[i] = true;
    }
    //LEFT ONLY will only merge 1 stream, LEFT OUTER will merge as many as match LEFT
    setCandidateRow(lhs, true, matched);
    return true;
}
  1271. bool CAndLeftMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1272. {
  1273. CSteppedInputLookahead & cur = inputs.item(0);
  1274. if (!cur.gatherConjunctions(collector))
  1275. collector.addInput(cur);
  1276. collector.addJoin(*this);
  1277. return true;
  1278. }
  1279. //---------------------------------------------------------------------------
/*
 * Record that 'input' currently holds the row 'value'.
 * The active entries [0, numEntries) are kept ordered by 'compare'.  Entries with equal values form a
 * contiguous run; the first entry of a run carries the run length in 'duplicates' (new duplicates are
 * appended at the end of the run with duplicates == 1).
 * The spare slot at position numEntries (preallocated by init()) is moved to the insertion point and
 * overwritten, so the number of calls between removals must not exceed the number of slots (assertex).
 */
void BestMatchManager::associate(unsigned input, const void * value)
{
    unsigned curIndex = 0;
    //Walk run by run (curIndex always addresses the first entry of a run)
    while (curIndex != numEntries)
    {
        BestMatchItem & cur = matches.item(curIndex);
        int c = compare->docompare(value, cur.value);
        if (c <= 0)
        {
            if (c == 0)
            {
                //insert at the end of the duplicates
                curIndex += cur.duplicates;
                cur.duplicates++;
            }
            //Move a record at the end of the list to the correct position, ready for updating.
            if (curIndex != numEntries)
                matches.rotateR(curIndex, numEntries);
            break; // now go and modify record at position curIndex
        }
        curIndex += cur.duplicates;
    }
    assertex(matches.isItem(curIndex));
    BestMatchItem & inserted = matches.item(curIndex);
    inserted.duplicates = 1;
    inserted.value = value;
    inserted.input = input;
    numEntries++;
    return;
}
  1310. unsigned BestMatchManager::getValueOffset(unsigned idx) const
  1311. {
  1312. unsigned offset = 0;
  1313. while (idx--)
  1314. offset += matches.item(offset).duplicates;
  1315. return offset;
  1316. }
  1317. void BestMatchManager::init(ICompare * _compare, unsigned numInputs)
  1318. {
  1319. compare = _compare;
  1320. numEntries = 0;
  1321. for (unsigned i=0; i < numInputs; i++)
  1322. matches.append(* new BestMatchItem);
  1323. }
  1324. void BestMatchManager::kill()
  1325. {
  1326. matches.kill();
  1327. }
  1328. unsigned BestMatchManager::getInput(unsigned whichValue, unsigned inputIndex) const
  1329. {
  1330. return matches.item(getValueOffset(whichValue) + inputIndex).input;
  1331. }
  1332. unsigned BestMatchManager::getInput0(unsigned inputIndex) const
  1333. {
  1334. return matches.item(inputIndex).input;
  1335. }
  1336. unsigned BestMatchManager::numInputs(unsigned whichValue) const
  1337. {
  1338. return matches.item(getValueOffset(whichValue)).duplicates;
  1339. }
  1340. void BestMatchManager::remove(unsigned whichValue)
  1341. {
  1342. unsigned offset = getValueOffset(whichValue);
  1343. unsigned duplicates = matches.item(offset).duplicates;
  1344. matches.rotateLN(offset, numEntries-1, duplicates);
  1345. numEntries -= duplicates;
  1346. }
  1347. const void * BestMatchManager::queryValue(unsigned whichValue) const
  1348. {
  1349. return matches.item(getValueOffset(whichValue)).value;
  1350. }
  1351. //---------------------------------------------------------------------------
  1352. CMofNMergeJoinProcessor::CMofNMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1353. {
  1354. combineConjunctions = false;
  1355. alive = NULL;
  1356. candidateMask = NULL;
  1357. minMatches = 0;
  1358. maxMatches = 0;
  1359. numActive = 0;
  1360. }
  1361. void CMofNMergeJoinProcessor::afterProcessing()
  1362. {
  1363. delete [] alive;
  1364. delete [] candidateMask;
  1365. alive = NULL;
  1366. candidateMask = NULL;
  1367. matches.kill();
  1368. CMergeJoinProcessor::afterProcessing();
  1369. }
  1370. void CMofNMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1371. {
  1372. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1373. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1374. createEqualityJoinProcessor();
  1375. else
  1376. createMerger();
  1377. minMatches = helper.getMinMatches();
  1378. maxMatches = helper.getMaxMatches();
  1379. if (minMatches == 0)
  1380. throw MakeStringException(99, "Need a non-zero minimum number of matches");
  1381. alive = new bool [numInputs];
  1382. candidateMask = new bool [numInputs];
  1383. for (unsigned i= 0; i < numInputs; i++)
  1384. alive[i] = true;
  1385. numActive = numInputs;
  1386. matches.init(equalCompare, numInputs);
  1387. }
/*
 * Find the next value shared by at least minMatches inputs, but by no more than maxMatches
 * (a lowest value shared by more than maxMatches inputs is discarded).
 * 'matches' (a BestMatchManager) holds the current row of each matched input, ordered by value,
 * so value 0 is always the lowest.  Inputs that run out are marked not alive.  Once fewer than
 * minMatches inputs remain alive, no further candidates can exist and false is returned.
 * On success the inputs sharing value 0 are flagged in candidateMask, a candidate group is started,
 * and that value is removed from 'matches'.
 */
bool CMofNMergeJoinProcessor::findCandidates(const void * originalSeekValue, unsigned numOriginalSeekFields)
{
    if (numActive < minMatches)
        return false;
    //How many alive inputs may disagree with a value while it still has minMatches supporters
    unsigned numFreeToMismatch = numActive - minMatches;
    const void * seekValue = originalSeekValue;
    unsigned numSeekFields = numOriginalSeekFields;
    //This should be true, because after candidates are matched their values are removed.
    assertex(matches.numInputs() <= numFreeToMismatch); //
    //MORE: This needs rewriting, so that mismatches are handled correctly. In particular,
    while (matches.numInputs() < numActive)
    {
        unsigned nextInput = nextToMatch();
        bool matchedCompletely = true;
        // MORE: This needs rewriting, so that mismatches are handled correctly. In particular, the matches need to retain information about whether
        // they matched fully, since that will optimize where could be sought next.
        const void * value = inputs.item(nextInput).next(seekValue, numSeekFields, matchedCompletely, nonBufferedMatchStepExtra);
        //NOTE: matchedCompletely is currently always true. More work is needed if not true.
        assertex(matchedCompletely);
        if (value)
        {
            if (matchedCompletely)
            {
                matched[nextInput] = true;
                matches.associate(nextInput, value);
            }
        }
        else
        {
            //This input is exhausted
            alive[nextInput] = false;
            numActive--;
            numFreeToMismatch--;
            if (numActive < minMatches)
                return false;
        }
        unsigned matchCount = matches.numInputs();
        if (matchCount > numFreeToMismatch)
        {
            unsigned numMatch0 = matches.numInputs0();
            //Too many inputs are above the lowest value for it to reach minMatches, or it has more than maxMatches
            if ((matchCount - numMatch0 > numFreeToMismatch) || (numMatch0 > maxMatches))
            {
                //clear seekValue, because seek value won't be valid any more after the skips - may be updated later.
                seekValue = originalSeekValue;
                numSeekFields = numOriginalSeekFields;
                //No way that the first element is going to match, so remove all inputs associated with it.
                for (unsigned i= 0; i < numMatch0; i++)
                {
                    unsigned input = matches.getInput0(i);
                    inputs.item(input).skip();
                    matched[input] = false;
                }
                matches.remove(0);
            }
            //Lowest element now provides the best seek position.
            if (matches.numInputs() > numFreeToMismatch)
            {
                seekValue = matches.queryValue(0);
                numSeekFields = numEqualFields;
            }
        }
    }
    //matches(0) contains the next match set, set a set of flags indicating which inputs to use
    unsigned numMatches = matches.numInputs0();
    for (unsigned i1=0; i1< numInputs; i1++)
        candidateMask[i1] = false;
    for (unsigned i2=0; i2 < numMatches; i2++)
        candidateMask[matches.getInput0(i2)] = true;
    setCandidateRow(matches.queryValue(0), true, candidateMask);
    //Now cleanup housekeeping, so that findCandidates() is ready to find the next block
    for (unsigned i3=0; i3 < numInputs; i3++)
        if (candidateMask[i3])
            matched[i3] = false;
    matches.remove(0);
    return true;
}
  1463. bool CMofNMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1464. {
  1465. //MORE: We may need to create pseudo inputs in order to process these optimially.
  1466. return false;
  1467. }
  1468. unsigned CMofNMergeJoinProcessor::nextToMatch() const
  1469. {
  1470. for (unsigned i= 0; i < numInputs; i++)
  1471. {
  1472. if (alive[i] && !matched[i])
  1473. return i;
  1474. }
  1475. throwUnexpected();
  1476. }
  1477. //---------------------------------------------------------------------------
  1478. /*
  1479. NOTES on the distances... this is far from simple once you get arbitrary trees involved.
  1480. given a join expression right.x between left.x - a and left.x + b i.e. up to a Before and b after
  1481. a is maxRightBeforeLeft
  1482. b is maxLeftBeforeRight
  1483. We define a function D(x,y) which is the maximum value that can be deducted from row x to provide a valid value for row y
  1484. Given a tree of join expressions, we can calculate a distance function between any pair of inputs.
  1485. J1(a,b,c) = D(i,i+1)=4, D(i+1,i) = 10
  1486. J2(d, e) = D(i,i+1)=-1, D(i+1,i) = 12
  1487. J3(J1, J2) = D(i,i+1)=0 D(i+1,i) = 5
  1488. =>
  1489. For each join we define
  1490. D(i, lowest) - maximum value to deduct from row i to obtain the lowest
  1491. D(highest, i) - maximum value to deduct from highest to obtain row i
  1492. by definition these must both be >= 0, for a simple input they are both 0
  1493. A join's extent is given by
  1494. D(highest,lowest) = max(D(highest,i)+D(i,lowest))
  1495. We're only interested in the maximum values, which are obtained by the maximum distances between the elements. The lowest and highest members
  1496. of the group are going to be the ends. So we use the maximum of those distances to work out D(i,low) and D(high, i), being careful to only use
  1497. the range if it is valid (e.g., the end must be possible to be the highest/lowest)
  1498. D(a,b) = 4, D(b,a) = 10
  1499. D(b,c) = 4, D(c,b) = 10
  1500. D(a,c) = 8, D(c,a) = 20
  1501. D(d,e) = -1, D(e,d) = 12
  1502. D(a, lowest) = 8 D(highest, a) = 20
  1503. D(b, lowest) = 10 D(highest, b) = 10
  1504. D(c, lowest) = 20 D(highest, c) = 8
  1505. D(highest, lowest) = 28
  1506. Then assuming the left is the highest and the right is the lowest we have
  1507. D(a,e) = DJ1(a, lowest) + DJ3(i,i+1) + DJ2(highest, e)
  1508. = 8 + 0 + 0
  1509. For >2 terms, you also need to take into account the size of a term given by D(highest,lowest)
  1510. */
  1511. inline unsigned __int64 adjustRangeValue(unsigned __int64 rangeValue, __int64 delta)
  1512. {
  1513. if ((delta >= 0) || (rangeValue > (unsigned __int64)-delta))
  1514. return rangeValue + delta;
  1515. return 0;
  1516. }
  1517. //---------------------------------------------------------------------------
  1518. //This class is created for each input of an n-ary join; it maintains a queue of potential records.
  1519. CNaryJoinLookaheadQueue::CNaryJoinLookaheadQueue(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookahead * _input, CNaryJoinLookaheadQueue * _left, const void * * _activeRowPtr) : helper(_helper), rows(_inputAllocator), unmatchedRows(_inputAllocator)
  1520. {
  1521. equalCompareEq = helper.queryEqualCompareEq();
  1522. nonSteppedCompareEq = helper.queryNonSteppedCompare();
  1523. numEqualFields = helper.numEqualFields();
  1524. stepCompare = helper.querySteppingMeta()->queryCompare();
  1525. input.set(_input);
  1526. activeRowPtr = _activeRowPtr;
  1527. left = _left;
  1528. equalityRow = NULL;
  1529. curRow = 0;
  1530. maxRow = 0;
  1531. numSkipped = 0;
  1532. done = true;
  1533. }
  1534. bool CNaryJoinLookaheadQueue::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext)
  1535. {
  1536. done = false;
  1537. equalityRow = _equalityRow;
  1538. rows.kill();
  1539. numSkipped = 0;
  1540. if (matchedLeft)
  1541. matchedLeft->reset();
  1542. // next is guaranteed to match the equality condition for AND, proximity but not for m of n/left outer...
  1543. if (!needToVerifyNext || nextUnqueued())
  1544. {
  1545. consumeNextInput();
  1546. return true;
  1547. }
  1548. return false;
  1549. }
  1550. void CNaryJoinLookaheadQueue::clearPending()
  1551. {
  1552. rows.kill();
  1553. }
  1554. bool CNaryJoinLookaheadQueue::ensureNonEmpty()
  1555. {
  1556. if (rows.ordinality())
  1557. return true;
  1558. if (nextUnqueued())
  1559. {
  1560. consumeNextInput();
  1561. return true;
  1562. }
  1563. return false;
  1564. }
  1565. bool CNaryJoinLookaheadQueue::firstSelection()
  1566. {
  1567. if (!left)
  1568. {
  1569. assertex(maxRow != 0);
  1570. curRow = 0;
  1571. *activeRowPtr = rows.item(curRow);
  1572. return true;
  1573. }
  1574. if (!left->firstSelection())
  1575. return false;
  1576. return findValidSelection(0);
  1577. }
  1578. bool CNaryJoinLookaheadQueue::findValidSelection(unsigned initialRow)
  1579. {
  1580. assertex(left);
  1581. const unsigned max = maxRow;
  1582. unsigned candidateRow = initialRow;
  1583. loop
  1584. {
  1585. const void * leftRow = left->activeRow();
  1586. while (candidateRow < max)
  1587. {
  1588. const void * rightRow = rows.item(candidateRow);
  1589. if (!nonSteppedCompareEq || nonSteppedCompareEq->match(leftRow, rightRow))
  1590. {
  1591. curRow = candidateRow;
  1592. *activeRowPtr = rightRow;
  1593. return true;
  1594. }
  1595. candidateRow++;
  1596. }
  1597. if (!left->nextSelection())
  1598. return false;
  1599. candidateRow = 0;
  1600. }
  1601. }
  1602. const void * CNaryJoinLookaheadQueue::nextUnqueued()
  1603. {
  1604. if (equalityRow)
  1605. {
  1606. bool matches = true;
  1607. const void * next = input->nextGE(equalityRow, numEqualFields, matches, nonBufferedMismatchStepExtra);
  1608. if (next && matches && equalCompareEq->match(next, equalityRow))
  1609. return next;
  1610. return NULL;
  1611. }
  1612. else
  1613. return input->next();
  1614. }
  1615. bool CNaryJoinLookaheadQueue::nextSelection()
  1616. {
  1617. if (left)
  1618. return findValidSelection(curRow+1);
  1619. curRow++;
  1620. if (curRow >= maxRow)
  1621. return false;
  1622. *activeRowPtr = rows.item(curRow);
  1623. return true;
  1624. }
  1625. bool CNaryJoinLookaheadQueue::ensureCandidateExists(unsigned __int64 minDistance, unsigned __int64 maxDistance)
  1626. {
  1627. loop
  1628. {
  1629. const void * next = rows.head();
  1630. if (!next)
  1631. break;
  1632. unsigned __int64 distance = helper.extractRangeValue(next);
  1633. if (distance >= minDistance)
  1634. {
  1635. assertex(distance <= maxDistance);
  1636. return true;
  1637. }
  1638. rows.skip();
  1639. }
  1640. loop
  1641. {
  1642. const void * next = nextUnqueued();
  1643. if (!next)
  1644. return false;
  1645. unsigned __int64 distance = helper.extractRangeValue(next);
  1646. if (distance >= minDistance)
  1647. {
  1648. if (distance <= maxDistance)
  1649. {
  1650. consumeNextInput();
  1651. return true;
  1652. }
  1653. return false;
  1654. }
  1655. input->skip();
  1656. }
  1657. }
  1658. bool CNaryJoinLookaheadQueue::checkExistsGE(const void * seek, unsigned numFields)
  1659. {
  1660. loop
  1661. {
  1662. const void * next = rows.head();
  1663. if (!next)
  1664. return false;
  1665. if (stepCompare->docompare(next, seek, numFields) >= 0)
  1666. return true;
  1667. rows.skip();
  1668. }
  1669. }
  1670. unsigned CNaryJoinLookaheadQueue::readAheadTo(unsigned __int64 maxDistance, bool canConsumeBeyondMax)
  1671. {
  1672. const void * tail = rows.tail();
  1673. if (tail && helper.extractRangeValue(tail) > maxDistance)
  1674. {
  1675. unsigned limit = rows.ordinality() - 1;
  1676. //Already have all the records, return how many...
  1677. while (limit > 0)
  1678. {
  1679. const void * prev = rows.item(limit-1);
  1680. if (helper.extractRangeValue(prev) <= maxDistance)
  1681. return limit;
  1682. --limit;
  1683. }
  1684. return 0;
  1685. }
  1686. while (!done)
  1687. {
  1688. const void * next = nextUnqueued();
  1689. if (!next)
  1690. {
  1691. done = true;
  1692. break;
  1693. }
  1694. //This is a bit nasty. We need to consume the next value to ensure that the lowest spotter always has the next valid
  1695. //but it means we might be reading this input for too long
  1696. if (helper.extractRangeValue(next) > maxDistance)
  1697. {
  1698. if (!canConsumeBeyondMax)
  1699. break;
  1700. consumeNextInput();
  1701. return rows.ordinality()-1;
  1702. }
  1703. consumeNextInput();
  1704. }
  1705. return rows.ordinality();
  1706. }
  1707. void CNaryJoinLookaheadQueue::readCandidateAll()
  1708. {
  1709. loop
  1710. {
  1711. const void * next = nextUnqueued();
  1712. if (!next)
  1713. return;
  1714. consumeNextInput();
  1715. }
  1716. }
  1717. void CNaryJoinLookaheadQueue::skip()
  1718. {
  1719. if (matchedLeft && !matchedLeft->test(numSkipped))
  1720. {
  1721. unmatchedRows.enqueue(rows.dequeue());
  1722. }
  1723. else
  1724. rows.skip();
  1725. numSkipped++;
  1726. }
  1727. bool CNaryJoinLookaheadQueue::flushUnmatched()
  1728. {
  1729. while (rows.ordinality())
  1730. skip();
  1731. return unmatchedRows.ordinality() != 0;
  1732. }
  1733. //---------------------------------------------------------------------------
  1734. CProximityJoinProcessor::CProximityJoinProcessor(IHThorNWayMergeJoinArg & _helper) :
  1735. CMergeJoinProcessor(_helper)
  1736. {
  1737. maxRightBeforeLeft = 0;
  1738. maxLeftBeforeRight = 0;
  1739. }
  1740. void CProximityJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1741. {
  1742. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1743. createTempSeekBuffer();
  1744. //Have to delay creating the actual join joinProcessor because maxRightBeforeLeft() etc. can be onStart dependant.
  1745. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  1746. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  1747. //Handle phrases using a different class i) because the general scheme doesn't quite work and ii) for efficiency
  1748. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1749. {
  1750. if ((maxRightBeforeLeft < 0 || maxLeftBeforeRight < 0))
  1751. outputProcessor.setown(new CAnchoredRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1752. else
  1753. outputProcessor.setown(new CProximityRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1754. }
  1755. else
  1756. createMerger();
  1757. }
  1758. bool CProximityJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
  1759. {
  1760. unsigned firstInput = 0;//searchOrder.item(0);
  1761. bool wasCompleteMatch = true;
  1762. if (eof || !inputs.item(firstInput).next(seekValue, numSeekFields, wasCompleteMatch, nonBufferedMatchStepExtra))
  1763. return false;
  1764. unsigned matchCount = 1;
  1765. clearMatches();
  1766. matched[firstInput] = true;
  1767. const unsigned numJoinFields = numEqualFields + 1;
  1768. const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
  1769. while (matchCount != numInputs)
  1770. {
  1771. unsigned nextInput = nextToMatch();
  1772. unsigned baseInput = getBestToSeekFrom(nextInput);
  1773. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
  1774. helper.adjustRangeValue(rowBuilder, inputs.item(baseInput).next(), -maxDistanceBefore(baseInput, nextInput));
  1775. bool wasCompleteMatch = true;
  1776. //MORE: Would it help to allow mismatches? I would have thought so, but there was a previous comment sayimg "I don't think so because of the range calculation"
  1777. const void * nextRow = inputs.item(nextInput).nextGE(tempSeekBuffer, numJoinFields, wasCompleteMatch, nonBufferedMatchStepExtra);
  1778. assertex(wasCompleteMatch);
  1779. if (!nextRow)
  1780. {
  1781. eof = true;
  1782. return false;
  1783. }
  1784. if (inputsMustMatchEquality || equalityComponentMatches(nextRow, tempSeekBuffer))
  1785. {
  1786. //Now check if this new record causes other records to be too far away
  1787. unsigned __int64 thisRangeValue = helper.extractRangeValue(nextRow);
  1788. for (unsigned i=0; i<numInputs; i++)
  1789. {
  1790. if (matched[i])
  1791. {
  1792. unsigned __int64 seekRangeValue = adjustRangeValue(thisRangeValue, -maxDistanceBefore(nextInput, i));
  1793. if (getRangeValue(i) < seekRangeValue)
  1794. {
  1795. inputs.item(i).skip();
  1796. matched[i] = false;
  1797. if (--matchCount == 0)
  1798. break;
  1799. }
  1800. }
  1801. }
  1802. }
  1803. else
  1804. {
  1805. for (unsigned i=0; i<numInputs; i++)
  1806. {
  1807. if (matched[i])
  1808. {
  1809. inputs.item(i).skip();
  1810. matched[i] = false;
  1811. matchCount--;
  1812. }
  1813. }
  1814. }
  1815. matched[nextInput] = true;
  1816. matchCount++;
  1817. }
  1818. setCandidateRow(inputs.item(0).next(), false, NULL);
  1819. return true;
  1820. }
  1821. __int64 CProximityJoinProcessor::maxDistanceBefore(unsigned fixedInput, unsigned searchInput) const
  1822. {
  1823. assertex(outputProcessor); // sanity check to ensure this isn't called before maxXBeforeY are set up
  1824. if (searchInput < fixedInput)
  1825. return maxLeftBeforeRight * (fixedInput - searchInput);
  1826. else
  1827. return maxRightBeforeLeft * (searchInput - fixedInput);
  1828. }
  1829. unsigned CProximityJoinProcessor::nextToMatch() const
  1830. {
  1831. for (unsigned i=0; i < numInputs; i++)
  1832. {
  1833. unsigned next = i;//searchOrder.item(i);
  1834. if (!matched[next])
  1835. return next;
  1836. }
  1837. throwUnexpected();
  1838. }
  1839. //Choose the input to seek from that restricts the input being sought the most.
  1840. unsigned CProximityJoinProcessor::getBestToSeekFrom(unsigned seekInput) const
  1841. {
  1842. unsigned __int64 bestRangeValue = 0;
  1843. unsigned best = NotFound;
  1844. //MORE: This can be optimized!
  1845. for (unsigned i=0; i < numInputs; i++)
  1846. {
  1847. if (matched[i])
  1848. {
  1849. //Calculate the value of the distance
  1850. __int64 distanceBefore = maxDistanceBefore(i, seekInput);
  1851. unsigned __int64 rangeValue = adjustRangeValue(getRangeValue(i), -distanceBefore);
  1852. if (rangeValue >= bestRangeValue)
  1853. {
  1854. bestRangeValue = rangeValue;
  1855. best = i;
  1856. }
  1857. }
  1858. }
  1859. assertex(best != NotFound);
  1860. return best;
  1861. }
  1862. //---------------------------------------------------------------------------
  1863. //NULL passed to CSteppedInputLookahead first parameter means nextGE() must be overridden
  1864. CJoinGenerator::CJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  1865. helper(_helper), inputAllocator(_inputAllocator), outputAllocator(_outputAllocator)
  1866. {
  1867. state = JSdone;
  1868. unsigned flags = helper.getJoinFlags();
  1869. stepCompare = helper.querySteppingMeta()->queryCompare();
  1870. globalCompare = NULL;
  1871. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  1872. globalCompare = helper.queryGlobalCompare();
  1873. unsigned numInputs = _inputs.ordinality();
  1874. rows = new const void * [numInputs];
  1875. CNaryJoinLookaheadQueue * prev = NULL;
  1876. ForEachItemIn(i, _inputs)
  1877. {
  1878. CNaryJoinLookaheadQueue * queue = new CNaryJoinLookaheadQueue(inputAllocator, helper, &_inputs.item(i), prev, rows + i);
  1879. inputs.append(*queue);
  1880. prev = queue;
  1881. }
  1882. isSpecialLeftJoin = false;
  1883. numActiveInputs = numInputs;
  1884. lastActiveInput = numInputs ? &inputs.tos() : NULL;
  1885. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  1886. switch (joinKind)
  1887. {
  1888. case IHThorNWayMergeJoinArg::MJFleftonly:
  1889. case IHThorNWayMergeJoinArg::MJFleftouter:
  1890. if (helper.queryNonSteppedCompare() || globalCompare)
  1891. {
  1892. isSpecialLeftJoin = true;
  1893. if (numInputs)
  1894. inputs.item(0).trackUnmatched();
  1895. }
  1896. break;
  1897. case IHThorNWayMergeJoinArg::MJFmofn:
  1898. if (helper.queryNonSteppedCompare() || globalCompare)
  1899. throw MakeStringException(99, "MOFN JOIN with non stepped condition not yet supported");
  1900. break;
  1901. }
  1902. }
  1903. CJoinGenerator::~CJoinGenerator()
  1904. {
  1905. delete [] rows;
  1906. }
  1907. void CJoinGenerator::beforeProcessCandidates(const void * candidateRow, bool needToVerifyNext, const bool * matched)
  1908. {
  1909. if (needToVerifyNext)
  1910. {
  1911. numActiveInputs = 0;
  1912. CNaryJoinLookaheadQueue * prev = NULL;
  1913. ForEachItemIn(i, inputs)
  1914. {
  1915. CNaryJoinLookaheadQueue & cur = inputs.item(i);
  1916. if (cur.beforeProcessCandidates(candidateRow, needToVerifyNext))
  1917. {
  1918. cur.updateContext(prev, rows + numActiveInputs);
  1919. prev = &cur;
  1920. numActiveInputs++;
  1921. }
  1922. }
  1923. lastActiveInput = prev;
  1924. }
  1925. else
  1926. {
  1927. ForEachItemIn(i, inputs)
  1928. inputs.item(i).beforeProcessCandidates(candidateRow, needToVerifyNext);
  1929. }
  1930. state = JSfirst;
  1931. }
  1932. void CJoinGenerator::cleanupAllCandidates()
  1933. {
  1934. //Remove all pending candidates - only called if outer join optimization is enabled
  1935. //afterProcessCandidates() will already have been called.
  1936. ForEachItemIn(i, inputs)
  1937. inputs.item(i).clearPending();
  1938. }
  1939. void CJoinGenerator::afterProcessCandidates()
  1940. {
  1941. }
  1942. const void * CJoinGenerator::nextOutputRow()
  1943. {
  1944. RtlDynamicRowBuilder rowBuilder(outputAllocator, false);
  1945. loop
  1946. {
  1947. if (isSpecialLeftJoin)
  1948. {
  1949. CNaryJoinLookaheadQueue & left = inputs.item(0);
  1950. loop
  1951. {
  1952. const void * unmatchedLeft = left.nextUnmatched();
  1953. if (!unmatchedLeft)
  1954. break;
  1955. rowBuilder.ensureRow();
  1956. size32_t retSize = helper.transform(rowBuilder, 1, &unmatchedLeft);
  1957. left.skipUnmatched();
  1958. if (retSize)
  1959. return rowBuilder.finalizeRowClear(retSize);
  1960. }
  1961. }
  1962. switch (state)
  1963. {
  1964. case JSdone:
  1965. if (isSpecialLeftJoin)
  1966. {
  1967. CNaryJoinLookaheadQueue & left = inputs.item(0);
  1968. left.readCandidateAll();
  1969. if (left.flushUnmatched())
  1970. break; // round again
  1971. }
  1972. return NULL;
  1973. case JShascandidate:
  1974. {
  1975. state = JSnextcandidate;
  1976. //If is left only join, and has an additional equality criteria, then ignore matches.
  1977. //If left only, and no extra equality - or only one dataset has matches, then all matches are real left only matches
  1978. if (isSpecialLeftJoin && (joinKind == IHThorNWayMergeJoinArg::MJFleftonly) && (numActiveInputs != 1))
  1979. break;
  1980. rowBuilder.ensureRow();
  1981. size32_t retSize = helper.transform(rowBuilder, numActiveInputs, rows);
  1982. if (retSize)
  1983. return rowBuilder.finalizeRowClear(retSize);
  1984. break;
  1985. }
  1986. case JSnextcandidate:
  1987. if (nextCandidate())
  1988. state = JShascandidate;
  1989. else
  1990. {
  1991. if (state != JSdone)
  1992. state = JSgathercandidates;
  1993. }
  1994. break;
  1995. case JSfirst:
  1996. case JSgathercandidates:
  1997. if (gatherNextCandidates())
  1998. state = JShascandidate;
  1999. else
  2000. state = JSdone;
  2001. break;
  2002. default:
  2003. throwUnexpected();
  2004. }
  2005. }
  2006. }
  2007. const void * CJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  2008. {
  2009. //A stupid version. We could possibly skip on the lowest value if we knew the fields were assigned from the lowest value in the input
  2010. //which would potentially save a lot of transforms.
  2011. //would also probably need the input to match the output.
  2012. loop
  2013. {
  2014. const void * next = nextOutputRow();
  2015. if (!next || stepCompare->docompare(next, seek, numFields) >= 0)
  2016. return next;
  2017. outputAllocator->releaseRow(next);
  2018. }
  2019. }
  2020. bool CJoinGenerator::firstSelection()
  2021. {
  2022. if (lastActiveInput->firstSelection())
  2023. {
  2024. if (globalCompare && !globalCompare->match(numActiveInputs, rows))
  2025. return nextSelection();
  2026. if (isSpecialLeftJoin)
  2027. inputs.item(0).noteMatched();
  2028. return true;
  2029. }
  2030. return false;
  2031. }
  2032. bool CJoinGenerator::nextSelection()
  2033. {
  2034. while (lastActiveInput->nextSelection())
  2035. {
  2036. if (!globalCompare || globalCompare->match(numActiveInputs, rows))
  2037. {
  2038. if (isSpecialLeftJoin)
  2039. inputs.item(0).noteMatched();
  2040. return true;
  2041. }
  2042. }
  2043. return false;
  2044. }
  2045. //---------------------------------------------------------------------------
  2046. CEqualityJoinGenerator::CEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2047. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2048. {
  2049. lowestInput = NULL;
  2050. }
  2051. void CEqualityJoinGenerator::afterProcessCandidates()
  2052. {
  2053. lowestInput = NULL;
  2054. CJoinGenerator::afterProcessCandidates();
  2055. }
  2056. bool CEqualityJoinGenerator::nextCandidate()
  2057. {
  2058. if (nextSelection())
  2059. return true;
  2060. selectNextLowestInput();
  2061. return false;
  2062. }
  2063. /*
  2064. o Walk the input which is guaranteed to be the lowest
  2065. o Once that is done throw away that record, and choose the next.
  2066. */
  2067. bool CEqualityJoinGenerator::doGatherNextCandidates()
  2068. {
  2069. ForEachItemIn(iInput, inputs)
  2070. {
  2071. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2072. if (&curInput != lowestInput)
  2073. curInput.setCandidateAll();
  2074. else
  2075. curInput.setCandidateLowest();
  2076. }
  2077. return firstSelection();
  2078. }
  2079. bool CEqualityJoinGenerator::gatherNextCandidates()
  2080. {
  2081. if (state == JSfirst)
  2082. {
  2083. prefetchAllCandidates();
  2084. selectLowestInput();
  2085. }
  2086. else if (lowestInput->empty())
  2087. return false;
  2088. loop
  2089. {
  2090. if (doGatherNextCandidates())
  2091. return true;
  2092. if (!selectNextLowestInput())
  2093. return false;
  2094. }
  2095. }
  2096. void CEqualityJoinGenerator::prefetchAllCandidates()
  2097. {
  2098. //could be done in parallel, but
  2099. ForEachItemIn(i, inputs)
  2100. {
  2101. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2102. curInput.readCandidateAll();
  2103. }
  2104. }
  2105. void CEqualityJoinGenerator::selectLowestInput()
  2106. {
  2107. ForEachItemIn(i, inputs)
  2108. {
  2109. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2110. if (!curInput.empty())
  2111. {
  2112. lowestInput = &curInput;
  2113. return;
  2114. }
  2115. }
  2116. throwUnexpected();
  2117. }
  2118. bool CEqualityJoinGenerator::selectNextLowestInput()
  2119. {
  2120. lowestInput->skip();
  2121. if (lowestInput->empty())
  2122. {
  2123. state = JSdone;
  2124. return false;
  2125. }
  2126. return true;
  2127. }
  2128. //---------------------------------------------------------------------------
  2129. CSortedEqualityJoinGenerator::CSortedEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2130. CEqualityJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2131. {
  2132. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2133. lowestSpotter.initInputs();
  2134. }
  2135. CSortedEqualityJoinGenerator::~CSortedEqualityJoinGenerator()
  2136. {
  2137. lowestSpotter.cleanup();
  2138. }
  2139. void CSortedEqualityJoinGenerator::afterProcessCandidates()
  2140. {
  2141. lowestSpotter.reset();
  2142. CEqualityJoinGenerator::afterProcessCandidates();
  2143. }
  2144. void CSortedEqualityJoinGenerator::selectLowestInput()
  2145. {
  2146. unsigned iLowest = lowestSpotter.queryNextInput();
  2147. assertex(iLowest != NotFound);
  2148. lowestInput = &inputs.item(iLowest);
  2149. }
  2150. bool CSortedEqualityJoinGenerator::selectNextLowestInput()
  2151. {
  2152. lowestSpotter.skipRow();
  2153. if (lowestInput->empty())
  2154. {
  2155. state = JSdone;
  2156. return false;
  2157. }
  2158. CSortedEqualityJoinGenerator::selectLowestInput();
  2159. return true;
  2160. }
  2161. //---------------------------------------------------------------------------
  2162. CRangeJoinGenerator::CRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2163. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2164. {
  2165. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  2166. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  2167. }
  2168. //---------------------------------------------------------------------------
  2169. CAnchoredRangeJoinGenerator::CAnchoredRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2170. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2171. {
  2172. iLowest = maxRightBeforeLeft < 0 ? 0 : inputs.ordinality()-1;
  2173. lowestInput = &inputs.item(iLowest);
  2174. }
  2175. bool CAnchoredRangeJoinGenerator::nextCandidate()
  2176. {
  2177. if (nextSelection())
  2178. return true;
  2179. lowestInput->skip();
  2180. return false;
  2181. }
  2182. /*
  2183. o Walk the input which is guaranteed to be the lowest
  2184. o Once that is done throw away that record, and choose the next.
  2185. */
  2186. bool CAnchoredRangeJoinGenerator::doGatherNextCandidates()
  2187. {
  2188. const void * lowestRow = lowestInput->next();
  2189. if (!lowestRow)
  2190. return false;
  2191. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2192. ForEachItemIn(iInput, inputs)
  2193. {
  2194. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2195. if (iInput != iLowest)
  2196. {
  2197. __int64 maxLowestBeforeCur = maxDistanceAfterLowest(iInput);
  2198. assertex(maxLowestBeforeCur > 0);
  2199. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2200. if (!curInput.setCandidateRange(maxDistance, false))
  2201. return false;
  2202. }
  2203. }
  2204. lowestInput->setCandidateLowest();
  2205. return firstSelection();
  2206. }
  2207. const void * CAnchoredRangeJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  2208. {
  2209. //Note: Skip any lower values that are less than seek value, but don't read any more
  2210. if (!lowestInput->checkExistsGE(seek, numFields))
  2211. return NULL;
  2212. return CRangeJoinGenerator::nextOutputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  2213. }
  2214. bool CAnchoredRangeJoinGenerator::nextMatchesAnyConsumed()
  2215. {
  2216. const void * lowestRow = lowestInput->next();
  2217. bool consumePending = false;
  2218. if (!lowestRow)
  2219. {
  2220. lowestRow = lowestInput->nextUnqueued();
  2221. if (!lowestRow)
  2222. return false;
  2223. consumePending = true;
  2224. }
  2225. //Throw any non-matching rows away, and return true if there are no other rows left.
  2226. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2227. ForEachItemIn(iInput, inputs)
  2228. {
  2229. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2230. if (iInput != iLowest)
  2231. {
  2232. //note: maxRightBeforeLeft is -minRightAfterLeft
  2233. __int64 minCurAfterLowest;
  2234. if (iInput < iLowest)
  2235. minCurAfterLowest = (-maxLeftBeforeRight) * (iLowest - iInput);
  2236. else
  2237. minCurAfterLowest = (-maxRightBeforeLeft) * (iInput - iLowest);
  2238. assertex(minCurAfterLowest >= 0);
  2239. if (!curInput.ensureCandidateExists(lowestDistance+minCurAfterLowest, lowestDistance + maxDistanceAfterLowest(iInput)))
  2240. return false;
  2241. }
  2242. }
  2243. //A potential match, so consume the potential start word and try again
  2244. if (consumePending)
  2245. lowestInput->consumeNextInput();
  2246. return true;
  2247. }
  2248. bool CAnchoredRangeJoinGenerator::gatherNextCandidates()
  2249. {
  2250. loop
  2251. {
  2252. if (!nextMatchesAnyConsumed())
  2253. return false;
  2254. if (doGatherNextCandidates())
  2255. return true;
  2256. lowestInput->skip();
  2257. }
  2258. }
  2259. //---------------------------------------------------------------------------
  2260. CProximityRangeJoinGenerator::CProximityRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2261. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2262. {
  2263. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2264. lowestSpotter.initInputs();
  2265. }
  2266. CProximityRangeJoinGenerator::~CProximityRangeJoinGenerator()
  2267. {
  2268. lowestSpotter.cleanup();
  2269. }
  2270. void CProximityRangeJoinGenerator::afterProcessCandidates()
  2271. {
  2272. lowestSpotter.reset();
  2273. CRangeJoinGenerator::afterProcessCandidates();
  2274. }
  2275. bool CProximityRangeJoinGenerator::nextCandidate()
  2276. {
  2277. if (nextSelection())
  2278. return true;
  2279. if (!lowestSpotter.skipNextLowest())
  2280. state = JSdone;
  2281. return false;
  2282. }
  2283. /*
  2284. First version.....
  2285. o Walk the input datasets in the order lowest first.
  2286. o Perform the cross product of that record with all others that could possibly match.
  2287. o Once that is done throw away that record, and choose the next.
  2288. o Abort as soon as any of the inputs contains no records within potential range.
  2289. */
  2290. bool CProximityRangeJoinGenerator::gatherNextCandidates(unsigned iLowest)
  2291. {
  2292. CNaryJoinLookaheadQueue & lowestInput = inputs.item(iLowest);
  2293. const void * lowestRow = lowestInput.next();
  2294. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2295. ForEachItemIn(iInput, inputs)
  2296. {
  2297. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2298. if (iInput != iLowest)
  2299. {
  2300. __int64 maxLowestBeforeCur;
  2301. if (iInput < iLowest)
  2302. maxLowestBeforeCur = maxRightBeforeLeft * (iLowest - iInput);
  2303. else
  2304. maxLowestBeforeCur = maxLeftBeforeRight * (iInput - iLowest);
  2305. assertex(maxLowestBeforeCur >= 0); // should have created an anchored varient if not true
  2306. // maxLowestBeforeCur = maxCurAfterLowest
  2307. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2308. if (!curInput.setCandidateRange(maxDistance, true))
  2309. return false;
  2310. }
  2311. else
  2312. curInput.setCandidateLowest();
  2313. }
  2314. return firstSelection();
  2315. }
  2316. bool CProximityRangeJoinGenerator::gatherNextCandidates()
  2317. {
  2318. loop
  2319. {
  2320. unsigned iLowest = lowestSpotter.queryNextInput();
  2321. assertex(iLowest != NotFound);
  2322. if (gatherNextCandidates(iLowest))
  2323. return true;
  2324. //It would be really nice to break out early if there were no more potential matches, but even if there
  2325. //is only one matching stream we need to keep walking the consumed records, because the later records
  2326. //may pull in the relevant related records, and we can't sensibly put back the consumed records
  2327. if (!lowestSpotter.skipNextLowest())
  2328. {
  2329. //No more records within this document => can't ever match
  2330. state = JSdone;
  2331. return false;
  2332. }
  2333. }
  2334. }