thorstep.cpp 83 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jlib.hpp"
  15. #include "jlog.hpp"
  16. #include "jqueue.tpp"
  17. #include "jexcept.hpp"
  18. #include "thorcommon.hpp"
  19. #include "thorstep.ipp"
  20. #include "thorstep2.ipp"
  21. #ifdef _DEBUG
  22. #define CHECK_CONSISTENCY
  23. #endif
  24. const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
  25. const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  26. const static SmartStepExtra nonSeekStepExtra(SSEFreturnUnbufferedMatches, NULL); // if doing next() instead of nextGE()
  27. const static SmartStepExtra nonBufferedMatchStepExtra(SSEFreturnUnbufferedMatches, NULL);
  28. const static SmartStepExtra nonBufferedMismatchStepExtra(SSEFreturnMismatches, NULL);
  29. bool stepFieldsMatch(const CFieldOffsetSize * leftFields, unsigned leftIndex, const CFieldOffsetSize * rightFields, unsigned rightIndex)
  30. {
  31. const CFieldOffsetSize * leftField = leftFields + leftIndex;
  32. const CFieldOffsetSize * rightField = rightFields + rightIndex;
  33. return (leftField->offset == rightField->offset) && (leftField->size == rightField->size);
  34. }
  35. bool stepFieldsMatch(ISteppingMeta * leftMeta, unsigned leftIndex, ISteppingMeta * rightMeta, unsigned rightIndex)
  36. {
  37. if ((leftIndex >= leftMeta->getNumFields()) || (rightIndex >= rightMeta->getNumFields()))
  38. return false;
  39. return stepFieldsMatch(leftMeta->queryFields(), leftIndex, rightMeta->queryFields(), rightIndex);
  40. }
  41. unsigned getNumMatchingFields(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  42. {
  43. unsigned numStepableFields = 0;
  44. if (inputStepping && callerStepping)
  45. {
  46. //Determine where the stepping fields overlap, and work out the extent.
  47. unsigned inputCount = inputStepping->getNumFields();
  48. for (unsigned i=0; i < inputCount; i++)
  49. {
  50. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  51. break;
  52. numStepableFields++;
  53. }
  54. }
  55. return numStepableFields;
  56. }
  57. void verifySteppingCompatible(ISteppingMeta * inputStepping, ISteppingMeta * callerStepping)
  58. {
  59. if (inputStepping && callerStepping)
  60. {
  61. //Determine where the stepping fields overlap, and work out the extent.
  62. unsigned parentCount = callerStepping->getNumFields();
  63. unsigned inputCount = inputStepping->getNumFields();
  64. unsigned max = parentCount < inputCount ? parentCount : inputCount;
  65. for (unsigned i=0; i < max; i++)
  66. {
  67. if (!stepFieldsMatch(callerStepping, i, inputStepping, i))
  68. throw MakeStringException(999, "Stepping field %d, input and join do not match", i);
  69. }
  70. }
  71. }
  72. //---------------------------------------------------------------------------
  73. void CSteppingMeta::intersect(IInputSteppingMeta * inputMeta)
  74. {
  75. if (inputMeta)
  76. {
  77. unsigned maxFields = inputMeta->getNumFields();
  78. if (maxFields > numFields)
  79. maxFields = numFields;
  80. for (unsigned curField = 0; curField < maxFields; curField++)
  81. {
  82. if (!stepFieldsMatch(inputMeta->queryFields(), curField, fields, curField))
  83. {
  84. numFields = curField;
  85. break;
  86. }
  87. }
  88. if (inputMeta->hasPostFilter())
  89. postFiltered = true;
  90. if (inputMeta->isDistributed())
  91. setDistributed();
  92. unsigned inputFlags = inputMeta->getSteppedFlags();
  93. double inputPriority = inputMeta->getPriority();
  94. if (hadStepExtra)
  95. {
  96. stepFlags &= inputFlags;
  97. if (priority != inputPriority)
  98. stepFlags &= ~SSFhaspriority;
  99. }
  100. else
  101. {
  102. hadStepExtra = true;
  103. stepFlags = inputFlags;
  104. priority = inputPriority;
  105. }
  106. }
  107. else
  108. numFields = 0;
  109. }
  110. //---------------------------------------------------------------------------
  111. CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSteppingMeta * _inputStepping, IEngineRowAllocator * _rowAllocator, IRangeCompare * _compare, bool _paranoid)
  112. : input(_input), compare(_compare)
  113. {
  114. maxFields = compare ? compare->maxFields() : 0;
  115. pending = NULL;
  116. pendingMatches = true;
  117. stepFlagsMask = 0;
  118. stepFlagsValue = 0;
  119. paranoid = _paranoid;
  120. previousPending = NULL;
  121. rowAllocator.set(_rowAllocator);
  122. inputStepping = _inputStepping;
  123. numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
  124. isPostFiltered = inputStepping ? inputStepping->hasPostFilter() : false;
  125. setRestriction(NULL, 0);
  126. lowestFrequencyInput = NULL;
  127. }
  128. CSteppedInputLookahead::~CSteppedInputLookahead()
  129. {
  130. if (previousPending)
  131. rowAllocator->releaseRow(previousPending);
  132. if (pending)
  133. rowAllocator->releaseRow(pending);
  134. }
  135. const void * CSteppedInputLookahead::nextInputRow()
  136. {
  137. if (readAheadRows.ordinality())
  138. return readAheadRows.dequeue();
  139. if (seekRows.ordinality())
  140. return seekRows.dequeue();
  141. return input->nextInputRow();
  142. }
  143. const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  144. {
  145. while (readAheadRows.ordinality())
  146. {
  147. OwnedLCRow next = readAheadRows.dequeue();
  148. if (compare->docompare(next, seek, numFields) >= 0)
  149. {
  150. assertex(wasCompleteMatch);
  151. return (void *)next.getClear();
  152. }
  153. }
  154. while (seekRows.ordinality())
  155. {
  156. OwnedLCRow next = seekRows.dequeue();
  157. if (compare->docompare(next, seek, numFields) >= 0)
  158. {
  159. assertex(wasCompleteMatch);
  160. return (void *)next.getClear();
  161. }
  162. }
  163. return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  164. }
// Tops up seekRows so that it holds up to maxcount rows that are >= seek (on numFields fields).
// Buffered seek rows that precede seek are first moved to readAheadRows. nextInputRow() and
// nextInputRowGE() return readAheadRows before seekRows, so those rows are not lost.
// Only complete matches are added to seekRows; incomplete matches are released.
void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
{
    //Transfer any rows with fields before the seek position to the readAheadRows queue, so we don't waste
    //time sending seek rows that can't match..
    while (seekRows.ordinality())
    {
        const void * next = seekRows.head();
        if (compare->docompare(next, seek, numFields) >= 0)
        {
            //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
            //(the tail is the last row already buffered, so further reads start from there)
            seek = seekRows.tail();
            break;
        }
        readAheadRows.enqueue(seekRows.dequeue());
    }
    //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
    //(callers can still override the flags via stepFlagsMask/stepFlagsValue)
    unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
    SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
    seekRows.ensure(maxcount);
    while (seekRows.ordinality() < maxcount)
    {
        bool wasCompleteMatch = true;
        const void * next = input->nextInputRowGE(seek, numFields, wasCompleteMatch, inputStepExtra);
        if (!next)
            break;
        //wasCompleteMatch can be false if we've just read the last row returned from a block of reads,
        //but if so the next read request will do another blocked read, so just ignore this one.
        if (wasCompleteMatch)
        {
            seekRows.enqueue(next);
            //update the seek pointer to the best value.
            seek = next;
        }
        else
            rowAllocator->releaseRow(next);
    }
}
  202. unsigned CSteppedInputLookahead::ordinality() const
  203. {
  204. //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
  205. if ((readAheadRows.ordinality() == 0) && pending)
  206. return seekRows.ordinality() + 1;
  207. return seekRows.ordinality();
  208. }
  209. const void * CSteppedInputLookahead::querySeek(unsigned i) const
  210. {
  211. //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
  212. if ((readAheadRows.ordinality() == 0) && pending)
  213. {
  214. if (i == 0)
  215. return pending;
  216. i--;
  217. }
  218. return seekRows.item(i);
  219. }
  220. const void * CSteppedInputLookahead::consume()
  221. {
  222. if (!pending)
  223. fill();
  224. if (!includeInOutput(pending))
  225. return NULL;
  226. if (paranoid && pending)
  227. {
  228. if (previousPending)
  229. rowAllocator->releaseRow(previousPending);
  230. previousPending = rowAllocator->linkRow(pending);
  231. }
  232. const void * ret = pending;
  233. pending = NULL;
  234. pendingMatches = true;
  235. return ret;
  236. }
  237. IMultipleStepSeekInfo * CSteppedInputLookahead::createMutipleReadWrapper()
  238. {
  239. return this;
  240. }
  241. void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper)
  242. {
  243. lowestFrequencyInput = wrapper;
  244. }
  245. void CSteppedInputLookahead::fill()
  246. {
  247. pendingMatches = true;
  248. if (restrictValue && numStepableFields)
  249. {
  250. //note - this will either return a valid value to be included in the range,
  251. //or if invalid then it must be out of range -> will fail includeInOutput later,
  252. //but we may as well keep the row
  253. unsigned numFields = numRestrictFields < numStepableFields ? numRestrictFields : numStepableFields;
  254. //Default to returning mismatches, but could be overidden from outside
  255. unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
  256. SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
  257. pending = nextInputRowGE(restrictValue, numFields, pendingMatches, inputStepExtra);
  258. if (paranoid && pending)
  259. {
  260. int c = compare->docompare(pending, restrictValue, numFields);
  261. if (c < 0)
  262. throw MakeStringException(1001, "Input to stepped join preceeds seek point");
  263. if ((c == 0) && !pendingMatches)
  264. throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
  265. }
  266. }
  267. else
  268. {
  269. //Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
  270. //Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
  271. pending = nextInputRow();
  272. }
  273. if (paranoid && pending && previousPending && compare)
  274. {
  275. if (compare->docompare(previousPending, pending, maxFields) > 0)
  276. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  277. }
  278. }
  279. const void * CSteppedInputLookahead::next()
  280. {
  281. if (!pendingMatches)
  282. {
  283. if (includeInOutput(pending))
  284. skip();
  285. else
  286. return NULL;
  287. }
  288. if (!pending)
  289. fill();
  290. if (!includeInOutput(pending))
  291. return NULL;
  292. return pending;
  293. }
// Positions on the first row >= seek (comparing numFields fields). The row is left in pending
// (not consumed) and returned.
// Returns NULL if the input is exhausted or includeInOutput() rejects the row.
// wasCompleteMatch is set to false when an incomplete match (mismatch) is returned. Some return
// paths leave it untouched, so callers should initialise it to true.
// In paranoid mode, throws if the input returns rows that precede the seek point.
const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
{
    //First see whether the row we already hold satisfies the request.
    if (pending)
    {
        int c = compare->docompare(pending, seek, numFields);
        if (c >= 0)
        {
            if (!includeInOutput(pending))
                return NULL;
            if (pendingMatches)
                return pending;
            //pending Row is beyond seek point => ok to return an incomplete match
            if (stepExtra.returnMismatches() && (c != 0))
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
        }
        skip();
    }
    if (numStepableFields)
    {
        //This class is directly told whether it should be using readAhead, so need to create a modified stepExtra
        unsigned flags = (stepExtra.queryFlags() & ~stepFlagsMask) | stepFlagsValue;
        SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
        //The input can only seek on the fields it is stepped on.
        unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
        loop
        {
            pendingMatches = true;
            pending = nextInputRowGE(seek, stepFields, pendingMatches, inputStepExtra);
            if (paranoid && pending)
            {
                int c = compare->docompare(pending, seek, stepFields);
                if (c < 0)
                    throw MakeStringException(1001, "Input to stepped join preceeds seek point");
                if ((c == 0) && !pendingMatches)
                    throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
            }
            if (!pending || !includeInOutput(pending))
                return NULL;
            //All requested fields were seekable => the input's answer is final.
            if (numFields <= numStepableFields)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            //if !pendingMatches then isCompleteMatch must have been provided => ok to return a mismatch
            //if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
            if (!pendingMatches)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            //Matched on the stepable prefix - check the remaining (non-stepable) fields.
            if (compare->docompare(pending, seek, numFields) >= 0)
            {
                wasCompleteMatch = pendingMatches;
                return pending;
            }
            skip();
        }
        //NOTE(review): unreachable - the loop above only exits via return. Original note:
        //now need to do an incremental seek on the subsequent fields to find an exact value >
    }
    //now narrow down
    //No stepable fields: read rows sequentially until one is >= seek.
    loop
    {
        const void * cur = next();
        if (!cur)
            return NULL;
        if (compare->docompare(cur, seek, numFields) >= 0)
            return cur;
        skip();
    }
}
  366. unsigned CSteppedInputLookahead::queryMaxStepable(ISteppingMeta * callerStepping) const
  367. {
  368. return getNumMatchingFields(inputStepping, callerStepping);
  369. }
  370. void CSteppedInputLookahead::setAlwaysReadExact()
  371. {
  372. //can be used to force reading only exact matches (for the known lowest priority input)
  373. stepFlagsMask |= SSEFreturnMismatches;
  374. }
  375. void CSteppedInputLookahead::setReadAhead(bool value)
  376. {
  377. //This never removes readahead if requested somewhere else, so don't update the mask.
  378. if (value)
  379. stepFlagsValue |= SSEFreadAhead;
  380. else
  381. stepFlagsValue &= ~SSEFreadAhead;
  382. }
  383. void CSteppedInputLookahead::setRestriction(const void * _value, unsigned _num)
  384. {
  385. restrictValue = _value;
  386. numRestrictFields = _num;
  387. }
  388. void CSteppedInputLookahead::resetEOF()
  389. {
  390. if (numRestrictFields == 0)
  391. resetInputEOF();
  392. }
  393. void CSteppedInputLookahead::skip()
  394. {
  395. if (paranoid)
  396. {
  397. if (previousPending)
  398. rowAllocator->releaseRow(previousPending);
  399. previousPending = pending;
  400. }
  401. else
  402. {
  403. if (pending)
  404. rowAllocator->releaseRow(pending);
  405. }
  406. //NB: Don't read ahead until we have to...
  407. pending = NULL;
  408. pendingMatches = true;
  409. }
  410. const void * CSteppedInputLookahead::skipnext()
  411. {
  412. skip();
  413. return next();
  414. }
  415. //---------------------------------------------------------------------------
  416. void CUnfilteredSteppedMerger::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext, const bool * matched)
  417. {
  418. merger.setCandidateRow(_equalityRow);
  419. unsigned numInputs = inputArray->ordinality();
  420. for (unsigned i=0; i< numInputs; i++)
  421. {
  422. if (!needToVerifyNext || matched[i])
  423. firstCandidateRows[i] = inputArray->item(i).consume();
  424. else
  425. firstCandidateRows[i] = NULL;
  426. }
  427. merger.primeRows(firstCandidateRows);
  428. }
  429. //---------------------------------------------------------------------------
  430. CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRangeCompare * _stepCompare, ICompare * _equalCompare, CSteppedInputLookahead * _input, unsigned _numEqualFields)
  431. {
  432. allocator = _allocator;
  433. stepCompare = _stepCompare;
  434. equalCompare = _equalCompare;
  435. input = _input;
  436. matched.setown(createBitSet());
  437. numMatched = 0;
  438. readIndex = 0;
  439. numEqualFields = _numEqualFields;
  440. }
  441. CFilteredInputBuffer::~CFilteredInputBuffer()
  442. {
  443. }
  444. const void * CFilteredInputBuffer::consume()
  445. {
  446. if (!rows.isItem(readIndex))
  447. return NULL;
  448. const void * ret = rows.item(readIndex);
  449. rows.replace(NULL, readIndex);
  450. readIndex++;
  451. return ret;
  452. }
  453. const void * CFilteredInputBuffer::consumeGE(const void * seek, unsigned numFields)
  454. {
  455. while (rows.isItem(readIndex))
  456. {
  457. const void * cur = rows.item(readIndex);
  458. if (stepCompare->docompare(cur, seek, numFields) >= 0)
  459. {
  460. rows.replace(NULL, readIndex);
  461. readIndex++;
  462. return cur;
  463. }
  464. readIndex++;
  465. }
  466. return NULL;
  467. }
  468. void CFilteredInputBuffer::fill(const void * equalityRow)
  469. {
  470. const void * next = input->consume();
  471. assertex(next);
  472. append(next);
  473. if (equalityRow)
  474. {
  475. loop
  476. {
  477. bool matches = true;
  478. SmartStepExtra stepExtra(SSEFreturnMismatches, NULL);
  479. const void * next = input->nextGE(equalityRow, numEqualFields, matches, stepExtra);
  480. if (!next || !matches || equalCompare->docompare(equalityRow, next) != 0)
  481. break;
  482. append(input->consume());
  483. }
  484. }
  485. else
  486. {
  487. loop
  488. {
  489. const void * next = input->consume();
  490. if (!next)
  491. break;
  492. append(next);
  493. }
  494. }
  495. }
  496. void CFilteredInputBuffer::removeMatched()
  497. {
  498. ForEachItemInRev(i, rows)
  499. {
  500. if (isMatched(i))
  501. remove(i);
  502. }
  503. }
  504. void CFilteredInputBuffer::removeUnmatched()
  505. {
  506. ForEachItemInRev(i, rows)
  507. {
  508. if (!isMatched(i))
  509. remove(i);
  510. }
  511. }
  512. void CFilteredInputBuffer::remove(unsigned i)
  513. {
  514. const void * row = rows.item(i);
  515. rows.remove(i);
  516. allocator->releaseRow(row);
  517. }
  518. void CFilteredInputBuffer::reset()
  519. {
  520. ForEachItemIn(i, rows)
  521. {
  522. const void * cur = rows.item(i);
  523. if (cur)
  524. allocator->releaseRow(cur);
  525. }
  526. rows.kill();
  527. matched->reset();
  528. numMatched = 0;
  529. readIndex = 0;
  530. }
  531. CFilteredSteppedMerger::CFilteredSteppedMerger()
  532. {
  533. matches = NULL;
  534. joinKind = 0;
  535. numInputs = 0;
  536. equalCompare = NULL;
  537. extraCompare = NULL;
  538. globalCompare = NULL;
  539. minMatches = 0;
  540. maxMatches = 0;
  541. fullyMatchedLevel = 0;
  542. }
  543. CFilteredSteppedMerger::~CFilteredSteppedMerger()
  544. {
  545. delete [] matches;
  546. }
  547. void CFilteredSteppedMerger::init(IEngineRowAllocator * _allocator, IHThorNWayMergeJoinArg & helper, CSteppedInputLookaheadArray * inputArray)
  548. {
  549. unsigned flags = helper.getJoinFlags();
  550. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  551. numInputs = inputArray->ordinality();
  552. matches = new const void * [numInputs];
  553. equalCompare = helper.queryEqualCompare();
  554. extraCompare = helper.queryNonSteppedCompare();
  555. globalCompare = NULL;
  556. unsigned numEqualFields = helper.numEqualFields();
  557. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  558. globalCompare = helper.queryGlobalCompare();
  559. if (joinKind == IHThorNWayMergeJoinArg::MJFmofn)
  560. {
  561. minMatches = helper.getMinMatches();
  562. maxMatches = helper.getMaxMatches();
  563. }
  564. else
  565. {
  566. minMatches = numInputs;
  567. maxMatches = numInputs;
  568. }
  569. IRangeCompare * stepCompare = helper.querySteppingMeta()->queryCompare();
  570. ForEachItemIn(i, *inputArray)
  571. inputs.append(*new CFilteredInputBuffer(_allocator, stepCompare, equalCompare, &inputArray->item(i), numEqualFields));
  572. merger.init(_allocator, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  573. merger.initInputs(&inputs);
  574. }
  575. //ISteppedJoinRowGenerator
  576. void CFilteredSteppedMerger::beforeProcessCandidates(const void * equalityRow, bool needToVerifyNext, const bool * matched)
  577. {
  578. //Exaustively read from each of the inputs into each of the buffers
  579. ForEachItemIn(i, inputs)
  580. {
  581. if (!needToVerifyNext || matched[i])
  582. inputs.item(i).fill(equalityRow);
  583. }
  584. postFilterRows();
  585. //No point priming the rows here - will be just as efficient to use the default action
  586. }
  587. void CFilteredSteppedMerger::afterProcessCandidates()
  588. {
  589. ForEachItemIn(i, inputs)
  590. inputs.item(i).reset();
  591. merger.reset();
  592. }
  593. void CFilteredSteppedMerger::cleanupAllCandidates()
  594. {
  595. merger.reset(); // not strictly necessary...
  596. }
  597. void CFilteredSteppedMerger::afterProcessingAll()
  598. {
  599. merger.cleanup();
  600. }
  601. const void * CFilteredSteppedMerger::nextOutputRow()
  602. {
  603. return merger.nextRow();
  604. }
  605. const void * CFilteredSteppedMerger::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  606. {
  607. return merger.nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  608. }
  609. bool CFilteredSteppedMerger::tagMatches(unsigned level, unsigned numRows)
  610. {
  611. CFilteredInputBuffer & right = inputs.item(level);
  612. unsigned maxLevel = inputs.ordinality()-1;
  613. ConstPointerArray & curRows = right.rows;
  614. if (curRows.ordinality())
  615. {
  616. bool valid = false;
  617. const void * lhs = matches[numRows-1];
  618. ForEachItemIn(i, curRows)
  619. {
  620. //If we have had a match at this level, and this item is already matched,
  621. //and all levels higher than this have already been completely matched),
  622. //then no need to check this item (and its children) again, since it won't change anything.
  623. bool alreadyMatched = right.isMatched(i);
  624. if (!valid || level + 1 < fullyMatchedLevel || !alreadyMatched)
  625. {
  626. const void * rhs = curRows.item(i);
  627. unsigned matchedRows = numRows;
  628. bool recurse;
  629. if (!extraCompare || extraCompare->match(lhs, rhs))
  630. {
  631. matches[matchedRows++] = rhs;
  632. recurse = matchedRows <= maxMatches;
  633. }
  634. else
  635. {
  636. //for mofn, check enough levels left to create a potential match, for others it will fail.
  637. unsigned remain = maxLevel-level;
  638. recurse = (numRows + remain >= minMatches);
  639. }
  640. if (recurse)
  641. {
  642. bool isFullMatch;
  643. if (level == maxLevel)
  644. isFullMatch = (!globalCompare || globalCompare->match(matchedRows, matches));
  645. else
  646. isFullMatch = tagMatches(level+1, matchedRows);
  647. if (isFullMatch)
  648. {
  649. valid = true;
  650. if (!alreadyMatched)
  651. right.noteMatch(i);
  652. //If the previous level is fully matched, and so is this one - then update the minimum fully matched level
  653. if ((level + 1 == fullyMatchedLevel) && right.isFullyMatched())
  654. fullyMatchedLevel--;
  655. //If all rows in this level and above are fully matched, then iterating any further will have no effect.
  656. //Could potentially reduce a O(N^m) to O(mN) if the majority of elements match.
  657. if (level >= fullyMatchedLevel)
  658. break;
  659. }
  660. }
  661. }
  662. }
  663. return valid;
  664. }
  665. else
  666. {
  667. //mofn may still be ok with a skipped level or two
  668. unsigned remain = maxLevel-level;
  669. if (numRows + remain >= minMatches)
  670. {
  671. if (level == maxLevel)
  672. return (!globalCompare || globalCompare->match(numRows, matches));
  673. else
  674. return tagMatches(level+1, numRows);
  675. }
  676. return false;
  677. }
  678. }
  679. void CFilteredSteppedMerger::tagMatches()
  680. {
  681. unsigned numInputs = inputs.ordinality();
  682. fullyMatchedLevel = numInputs;
  683. //for m of n, need to start matching at levels 0,1,.. numLevels - minMatches
  684. unsigned iterateLevels = numInputs - minMatches;
  685. for (unsigned level =0; level <= iterateLevels; level++)
  686. {
  687. CFilteredInputBuffer & left = inputs.item(level);
  688. ForEachItemIn(i, left.rows)
  689. {
  690. matches[0] = left.rows.item(i);
  691. bool thisMatched;
  692. //mofn(1) may not have another level, to just check global compare.
  693. if (level == numInputs-1)
  694. thisMatched = (!globalCompare || globalCompare->match(1, matches));
  695. else
  696. thisMatched = tagMatches(level+1, 1);
  697. if (thisMatched)
  698. {
  699. if (!left.isMatched(i))
  700. left.noteMatch(i);
  701. //Check if this level, and all above are now fully matched. If so, we're done.
  702. if ((level + 1 == fullyMatchedLevel) && left.isFullyMatched())
  703. {
  704. fullyMatchedLevel--;
  705. break;
  706. }
  707. }
  708. }
  709. if (level >= fullyMatchedLevel)
  710. break;
  711. }
  712. }
  713. void CFilteredSteppedMerger::postFilterRows()
  714. {
  715. tagMatches();
  716. unsigned max = inputs.ordinality();
  717. switch (joinKind)
  718. {
  719. case IHThorNWayMergeJoinArg::MJFinner:
  720. case IHThorNWayMergeJoinArg::MJFmofn:
  721. {
  722. for (unsigned i=0; i < max; i++)
  723. inputs.item(i).removeUnmatched();
  724. break;
  725. }
  726. case IHThorNWayMergeJoinArg::MJFleftouter:
  727. {
  728. for (unsigned i=1; i < max; i++)
  729. inputs.item(i).removeUnmatched();
  730. break;
  731. }
  732. case IHThorNWayMergeJoinArg::MJFleftonly:
  733. {
  734. inputs.item(0).removeMatched();
  735. unsigned max = inputs.ordinality();
  736. for (unsigned i=1; i < max; i++)
  737. inputs.item(i).reset();
  738. break;
  739. }
  740. }
  741. }
  742. //---------------------------------------------------------------------------
  743. CMergeJoinProcessor::CMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : helper(_arg)
  744. {
  745. mergeSteppingMeta = helper.querySteppingMeta();
  746. assertex(mergeSteppingMeta);
  747. stepCompare = mergeSteppingMeta->queryCompare();
  748. equalCompare = helper.queryEqualCompare();
  749. equalCompareEq = helper.queryEqualCompareEq();
  750. numEqualFields = helper.numEqualFields();
  751. flags = helper.getJoinFlags();
  752. matched = NULL;
  753. candidateEqualityRow = NULL;
  754. numExternalEqualFields = 0;
  755. conjunctionOptimizer = NULL;
  756. tempSeekBuffer = NULL;
  757. lowestSeekRow = NULL;
  758. combineConjunctions = true;
  759. allInputsAreOuterInputs = false;
  760. maxSeekRecordSize = 0;
  761. numInputs = 0;
  762. eof = true;
  763. assertex(helper.numOrderFields() == mergeSteppingMeta->getNumFields());
  764. bool hasPostfilter = false;
  765. thisSteppingMeta.init(mergeSteppingMeta->getNumFields(), mergeSteppingMeta->queryFields(), stepCompare, mergeSteppingMeta->queryDistance(), hasPostfilter);
  766. }
  767. CMergeJoinProcessor::~CMergeJoinProcessor()
  768. {
  769. afterProcessing();
  770. }
  771. void CMergeJoinProcessor::addInput(ISteppedInput * _input)
  772. {
  773. IInputSteppingMeta * _meta = _input->queryInputSteppingMeta();
  774. verifySteppingCompatible(_meta, mergeSteppingMeta);
  775. rawInputs.append(*LINK(_input));
  776. }
  777. void CMergeJoinProcessor::afterProcessing()
  778. {
  779. cleanupCandidates();
  780. if (outputProcessor)
  781. {
  782. outputProcessor->afterProcessingAll();
  783. outputProcessor.clear();
  784. }
  785. if (conjunctionOptimizer)
  786. {
  787. conjunctionOptimizer->afterProcessing();
  788. delete conjunctionOptimizer;
  789. conjunctionOptimizer = NULL;
  790. }
  791. delete [] matched;
  792. matched = NULL;
  793. inputs.kill();
  794. rawInputs.kill();
  795. orderedInputs.kill();
  796. if (lowestSeekRow)
  797. {
  798. inputAllocator->releaseRow(lowestSeekRow);
  799. lowestSeekRow = NULL;
  800. }
  801. if (tempSeekBuffer)
  802. {
  803. inputAllocator->releaseRow(tempSeekBuffer);
  804. tempSeekBuffer = NULL;
  805. }
  806. //Now free the allocators
  807. inputAllocator.clear();
  808. outputAllocator.clear();
  809. maxSeekRecordSize = 0;
  810. }
  811. void CMergeJoinProcessor::createTempSeekBuffer()
  812. {
  813. tempSeekBuffer = inputAllocator->createRow();
  814. #ifdef _DEBUG
  815. //Clear the complete tempSeekBBuffer record, so that toXML() can be used to trace the seek row in roxie
  816. if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
  817. {
  818. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, inputAllocator->queryOutputMeta()->getMinRecordSize());
  819. helper.createLowInputRow(rowBuilder);
  820. }
  821. #endif
  822. }
  823. void CMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  824. {
  825. inputAllocator.set(_inputAllocator);
  826. outputAllocator.set(_outputAllocator);
  827. //The seek components must all be fixed width, so the seek record size must be <= the minimum size of the input record
  828. maxSeekRecordSize = inputAllocator->queryOutputMeta()->getMinRecordSize();
  829. bool paranoid = (flags & IHThorNWayMergeJoinArg::MJFassertsorted) != 0;
  830. ForEachItemIn(i1, rawInputs)
  831. {
  832. ISteppedInput & cur = rawInputs.item(i1);
  833. inputs.append(* new CSteppedInputLookahead(&cur, cur.queryInputSteppingMeta(), inputAllocator, stepCompare, paranoid));
  834. }
  835. if (flags & IHThorNWayMergeJoinArg::MJFhasclearlow)
  836. {
  837. RtlDynamicRowBuilder rowBuilder(inputAllocator);
  838. size32_t size = helper.createLowInputRow(rowBuilder);
  839. lowestSeekRow = rowBuilder.finalizeRowClear(size);
  840. }
  841. cleanupCandidates();
  842. eof = false;
  843. numInputs = inputs.ordinality();
  844. matched = new bool[numInputs];
  845. if (numInputs == 0)
  846. eof = true;
  847. //Sort the inputs by the preferred processing order (if provided), ensuring no duplicates
  848. clearMatches();
  849. ForEachItemIn(i2, searchOrder)
  850. {
  851. unsigned next = searchOrder.item(i2);
  852. if (next < numInputs && !matched[next])
  853. {
  854. orderedInputs.append(OLINK(inputs.item(next)));
  855. matched[next] = true;
  856. }
  857. }
  858. //MORE: We really should move the most-stepable inputs to the start
  859. for (unsigned i3 = 0; i3 < numInputs; i3++)
  860. {
  861. if (!matched[i3])
  862. orderedInputs.append(OLINK(inputs.item(i3)));
  863. }
  864. }
  865. bool CMergeJoinProcessor::createConjunctionOptimizer()
  866. {
  867. if (inputs.ordinality())
  868. {
  869. conjunctionOptimizer = new CSteppedConjunctionOptimizer(inputAllocator, helper, this);
  870. if (gatherConjunctions(*conjunctionOptimizer) && conjunctionOptimizer->worthCombining())
  871. {
  872. conjunctionOptimizer->beforeProcessing();
  873. return true;
  874. }
  875. delete conjunctionOptimizer;
  876. conjunctionOptimizer = NULL;
  877. }
  878. combineConjunctions = false;
  879. return false;
  880. }
  881. void CMergeJoinProcessor::createMerger()
  882. {
  883. ICompareEq * extraCompare = helper.queryNonSteppedCompare();
  884. bool hasGlobalCompare = (flags & IHThorNWayMergeJoinArg::MJFglobalcompare) != 0;
  885. if (!extraCompare && !hasGlobalCompare)
  886. {
  887. Owned<CUnfilteredSteppedMerger> simpleMerger = new CUnfilteredSteppedMerger(numEqualFields);
  888. simpleMerger->init(inputAllocator, equalCompare, helper.queryMergeCompare(), (flags & IHThorNWayMergeJoinArg::MJFdedup) != 0, stepCompare);
  889. simpleMerger->initInputs(&inputs);
  890. outputProcessor.setown(simpleMerger.getClear());
  891. }
  892. else
  893. {
  894. Owned<CFilteredSteppedMerger> simpleMerger = new CFilteredSteppedMerger();
  895. simpleMerger->init(inputAllocator, helper, &inputs);
  896. outputProcessor.setown(simpleMerger.getClear());
  897. }
  898. }
  899. void CMergeJoinProcessor::createEqualityJoinProcessor()
  900. {
  901. if (numEqualFields >= helper.numOrderFields())
  902. outputProcessor.setown(new CEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  903. else
  904. outputProcessor.setown(new CSortedEqualityJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  905. }
  906. void CMergeJoinProcessor::finishCandidates()
  907. {
  908. if (outputProcessor)
  909. outputProcessor->afterProcessCandidates();
  910. assertex(hasCandidates());
  911. inputAllocator->releaseRow(candidateEqualityRow);
  912. candidateEqualityRow = NULL;
  913. }
  914. bool CMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  915. {
  916. allInputsAreOuterInputs = true;
  917. ForEachItemIn(i, inputs)
  918. {
  919. CSteppedInputLookahead & cur = inputs.item(i);
  920. if (!cur.gatherConjunctions(collector))
  921. collector.addInput(cur);
  922. else
  923. {
  924. collector.addPseudoInput(cur);
  925. allInputsAreOuterInputs = false;
  926. }
  927. }
  928. collector.addJoin(*this);
  929. return true;
  930. }
  931. const void * CMergeJoinProcessor::nextInputRow()
  932. {
  933. if (!hasCandidates() && !findCandidates(NULL, 0))
  934. return NULL;
  935. loop
  936. {
  937. const void * next = nextCandidate();
  938. if (next)
  939. return next;
  940. finishCandidates();
  941. //Abort early if externally optimized, and not proximity (since they may not have read all records for this equality)
  942. if ((numEqualFields == numExternalEqualFields) && candidatesExhaustEquality())
  943. return NULL;
  944. if (!findCandidates(NULL, 0))
  945. return NULL;
  946. }
  947. }
  948. const void * CMergeJoinProcessor::nextInputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  949. {
  950. //First check the next row from the candidates, it may be ok.
  951. if (hasCandidates())
  952. {
  953. unsigned compareFields = numFields < numEqualFields ? numFields : numEqualFields;
  954. //check whether the candidates could possibly return the match
  955. if (stepCompare->docompare(candidateEqualityRow, seek, compareFields) == 0)
  956. {
  957. const void * next = nextCandidateGE(seek, numFields, wasCompleteMatch, stepExtra);
  958. if (next)
  959. return next; // note must match equality to have been returned.
  960. }
  961. finishCandidates();
  962. }
  963. if (!findCandidates(seek, numFields))
  964. return NULL;
  965. return nextInputRow();
  966. }
  967. void CMergeJoinProcessor::resetEOF()
  968. {
  969. ForEachItemIn(i, inputs)
  970. inputs.item(i).resetEOF();
  971. }
  972. void CMergeJoinProcessor::queryResetEOF()
  973. {
  974. resetEOF();
  975. }
  976. const void * CMergeJoinProcessor::nextInGroup()
  977. {
  978. if (conjunctionOptimizer)
  979. return conjunctionOptimizer->next();
  980. if (combineConjunctions)
  981. {
  982. if (numExternalEqualFields == 0)
  983. {
  984. if (createConjunctionOptimizer())
  985. return conjunctionOptimizer->next();
  986. }
  987. else
  988. combineConjunctions = false; // being used inside a conjunction optimizer => don't create another..
  989. }
  990. return nextInputRow();
  991. }
  992. const void * CMergeJoinProcessor::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  993. {
  994. if (conjunctionOptimizer)
  995. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  996. if (combineConjunctions)
  997. {
  998. if (createConjunctionOptimizer())
  999. return conjunctionOptimizer->nextGE(seek, numFields, wasCompleteMatch, stepExtra);
  1000. }
  1001. return nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  1002. }
  1003. void CMergeJoinProcessor::startRestrictedJoin(const void * equalityRow, unsigned numEqualityFields)
  1004. {
  1005. assertex(numExternalEqualFields == 0);
  1006. numExternalEqualFields = numEqualityFields;
  1007. eof = false;
  1008. }
  1009. void CMergeJoinProcessor::stopRestrictedJoin()
  1010. {
  1011. numExternalEqualFields = 0;
  1012. if (hasCandidates())
  1013. finishCandidates();
  1014. //There are no more matches for this (outer) equality condition, so all active rows need to be thrown away.
  1015. if (outputProcessor)
  1016. outputProcessor->cleanupAllCandidates();
  1017. }
  1018. void CMergeJoinProcessor::setCandidateRow(const void * row, bool inputsMayBeEmpty, const bool * matched)
  1019. {
  1020. candidateEqualityRow = inputAllocator->linkRow(row);
  1021. const void * restrictionRow = (numEqualFields == numExternalEqualFields) ? NULL : candidateEqualityRow;
  1022. outputProcessor->beforeProcessCandidates(restrictionRow, inputsMayBeEmpty, matched);
  1023. }
  1024. //---------------------------------------------------------------------------
  1025. CAndMergeJoinProcessor::CAndMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1026. {
  1027. }
  1028. void CAndMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1029. {
  1030. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1031. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1032. createEqualityJoinProcessor();
  1033. else
  1034. createMerger();
  1035. }
  // Find the next equality value on which every input has a complete match, leaving each input
  // positioned on its matching row, then start a candidate group via setCandidateRow().
  // seekValue/numSeekFields: optional position to seek from. When seekValue is NULL the search
  // starts at lowestSeekRow on all numEqualFields equality fields.
  // Returns false, and sets eof, as soon as any input has no further rows.
  1036. bool CAndMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
  1037. {
  1038. if (eof)
  1039. return false;
  // True when an outer conjunction already restricts every equality field, so the rows returned
  // by the inputs are expected to already carry the required equality value.
  1040. const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
  1041. const void * equalValue;
  1042. unsigned firstInput = 0;
  1043. if (inputsMustMatchEquality && allInputsAreOuterInputs)
  1044. {
  1045. //special case - all inputs are already advanced to the correct place, so just start generating candidates
  1046. //for nested conjunctions they may already be exhausted though
  1047. equalValue = orderedInputs.item(firstInput).next();
  1048. if (!equalValue)
  1049. {
  1050. eof = true;
  1051. return false;
  1052. }
  1053. }
  1054. else
  1055. {
  1056. if (!seekValue)
  1057. {
  1058. numSeekFields = numEqualFields;
  1059. seekValue = lowestSeekRow;
  1060. }
  // The first input in search order supplies the initial equality value.
  1061. bool matchedCompletely = true;
  1062. equalValue = orderedInputs.item(firstInput).next(seekValue, numSeekFields, matchedCompletely, unknownFrequencyTermStepExtra);
  1063. if (!equalValue)
  1064. {
  1065. eof = true;
  1066. return false;
  1067. }
  // savedRow keeps a consumed row alive while it serves as equalValue.
  // NOTE: matched[] is indexed by position in orderedInputs within this function.
  1068. PreservedRow savedRow(inputAllocator);
  1069. unsigned matchCount = 0;
  1070. clearMatches();
  1071. if (matchedCompletely)
  1072. {
  1073. matched[firstInput] = true;
  1074. matchCount++;
  1075. }
  1076. else
  1077. {
  // A partial match is not counted; its value is kept (consumed into savedRow) only as the seek
  // position for the other inputs.
  1078. equalValue = orderedInputs.item(firstInput).consume();
  1079. savedRow.setown(equalValue);
  1080. }
  1081. unsigned lastInput = firstInput;
  // Keep seeking the unmatched inputs to equalValue until every input has matched it completely.
  1082. while (matchCount != numInputs)
  1083. {
  // nextToMatch() never returns lastInput, so an input that just returned an incomplete match is
  // not re-sought straight away.
  // NOTE(review): if that input is then the only unmatched one, nextToMatch() throws - confirm
  // this combination cannot occur.
  1084. unsigned nextInput = nextToMatch(lastInput);
  1085. lastInput = nextInput;
  1086. bool matchedCompletely = true;
  1087. const void * nextRow = orderedInputs.item(nextInput).nextGE(equalValue, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
  1088. if (!nextRow)
  1089. {
  1090. eof = true;
  1091. return false;
  1092. }
  1093. #ifdef CHECK_CONSISTENCY
  1094. if (inputsMustMatchEquality)
  1095. {
  1096. if (equalCompare->docompare(nextRow, equalValue) != 0)
  1097. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  1098. }
  1099. else
  1100. {
  1101. if (equalCompare->docompare(nextRow, equalValue) < 0)
  1102. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  1103. }
  1104. #endif
  1105. if (!inputsMustMatchEquality)
  1106. {
  1107. if (!equalCompareEq->match(nextRow, equalValue))
  1108. {
  1109. //value didn't match => skip all the previously matched entries.
  1110. for (unsigned i=0; i < numInputs; i++)
  1111. {
  1112. if (matched[i])
  1113. {
  1114. matched[i] = false;
  1115. orderedInputs.item(i).skip();
  1116. if (--matchCount == 0)
  1117. break;
  1118. }
  1119. }
  // The row that caused the mismatch becomes the new equality value to seek to.
  1120. if (!matchedCompletely)
  1121. {
  1122. //Need to preserve nextRow, otherwise it will be gone after we skip
  1123. equalValue = orderedInputs.item(nextInput).consume();
  1124. savedRow.setown(equalValue);
  1125. }
  1126. else
  1127. equalValue = nextRow;
  1128. }
  1129. }
  1130. if (matchedCompletely)
  1131. {
  1132. matched[nextInput] = true;
  1133. matchCount++;
  1134. }
  1135. }
  1136. }
  1137. //Set up the mergeProcessor with the appropriate inputs. NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
  1138. //with the candidate.
  1139. //clone one of the rows
  // (setCandidateRow() takes its own link to equalValue, so it outlives savedRow.)
  1140. setCandidateRow(equalValue, false, NULL);
  1141. return true;
  1142. }
  1143. unsigned CAndMergeJoinProcessor::nextToMatch(unsigned lastInput) const
  1144. {
  1145. for (unsigned i=0; i < numInputs; i++)
  1146. {
  1147. //Don't seek on the last input again (it may have found a keyed match, but not matched the post filter)
  1148. if ((i != lastInput) && !matched[i])
  1149. return i;
  1150. }
  1151. throwUnexpected();
  1152. }
  1153. //---------------------------------------------------------------------------
  1154. CAndLeftMergeJoinProcessor::CAndLeftMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1155. {
  1156. combineConjunctions = false; // No advantage using this as the base of a combined conjunction
  1157. isLeftOnly = (flags & IHThorNWayMergeJoinArg::MJFkindmask) == IHThorNWayMergeJoinArg::MJFleftonly;
  1158. //Left only with a not stepped comparison needs to be done as a left outer at the stepping level
  1159. if (isLeftOnly && (helper.queryNonSteppedCompare() || helper.queryGlobalCompare()))
  1160. isLeftOnly = false;
  1161. }
  1162. void CAndLeftMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1163. {
  1164. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1165. createTempSeekBuffer();
  1166. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1167. createEqualityJoinProcessor();
  1168. else
  1169. createMerger();
  1170. }
  // Find the next left row (input 0) and the right inputs that share its equality value, then start
  // a candidate group for it via setCandidateRow(lhs, true, matched).
  // - LEFT OUTER: if every right input matches, all are flagged in matched[]; otherwise only the
  //   left is used.
  // - LEFT ONLY (isLeftOnly): a left row that every right input matches is dropped, and the search
  //   continues from the next join value.
  // Returns false, and sets eof, when the left input is exhausted. Also returns false (with eof set)
  // for LEFT ONLY when the equality is fully restricted externally and the current value matched.
  1171. bool CAndLeftMergeJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
  1172. {
  1173. if (eof)
  1174. return false;
  1175. CSteppedInputLookahead & input0 = inputs.item(0);
  1176. bool wasMatched = true;
  1177. const void * lhs = input0.next(seekValue, numSeekFields, wasMatched, nonBufferedMatchStepExtra);
  1178. assertex(wasMatched);
  1179. if (!lhs)
  1180. {
  1181. eof = true;
  1182. return false;
  1183. }
  // Right inputs are checked in order, starting at orderedInputs position 1.
  // NOTE(review): the left row comes from inputs.item(0) but the right rows from orderedInputs; this
  // assumes orderedInputs.item(0) is inputs.item(0) - confirm searchOrder cannot reorder the left input.
  1184. unsigned matchCount = 1;
  1185. while (matchCount != numInputs)
  1186. {
  1187. bool matchedCompletely = true; // we don't care what the next rhs value is - as long as it can't match the left
  1188. const void * rhs = orderedInputs.item(matchCount).nextGE(lhs, numEqualFields, matchedCompletely, unknownFrequencyTermStepExtra);
  1189. if (rhs)
  1190. {
  1191. int c = equalCompare->docompare(rhs, lhs);
  1192. if (c < 0)
  1193. throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
  1194. if (c == 0)
  1195. {
  1196. assertex(matchedCompletely);
  1197. //previously the (matchCount+1) test wasn't here, so it aborted as soon as there was any match.
  // LEFT ONLY and the last right input also matched => this left row must be suppressed.
  1198. if (isLeftOnly && (matchCount+1 == numInputs))
  1199. {
  1200. if (numEqualFields == numExternalEqualFields)
  1201. {
  1202. //I think this is worth doing here...
  1203. //Skip input0 to a mismatch value, so the optimizer doesn't waste time reading extra equalities
  1204. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
  1205. bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
  1206. input0.skip(); // invalidates lhs
  1207. if (calculatedNextSeek)
  1208. {
  1209. bool wasMatched = true;
  1210. input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
  1211. }
  1212. eof = true;
  1213. return false;
  1214. }
  1215. //Create the next join value if that is possible
  1216. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
  1217. bool calculatedNextSeek = helper.createNextJoinValue(rowBuilder, lhs);
  1218. input0.skip(); // invalidates lhs
  1219. bool wasMatched = true;
  1220. if (calculatedNextSeek)
  1221. lhs = input0.nextGE(tempSeekBuffer, numEqualFields, wasMatched, nonBufferedMatchStepExtra);
  1222. else
  1223. lhs = input0.next();
  1224. if (!lhs)
  1225. {
  1226. eof = true;
  1227. return false;
  1228. }
  // Restart checking the right inputs against the new left row (position 1 after the increment).
  1229. matchCount = 0; //incremented at tail of loop
  1230. }
  1231. }
  1232. else
  1233. break;
  1234. }
  1235. else
  1236. break;
  1237. matchCount++;
  1238. }
  1239. clearMatches();
  1240. matched[0] = true;
  1241. if (matchCount != numInputs)
  1242. {
  1243. //Failed to match completely => generate a match for just the left. Skip any matched rows so far and break out.
  1244. for (unsigned i=1; i < matchCount; i++)
  1245. orderedInputs.item(i).skip();
  // NOTE(review): matchCount is not read after this assignment.
  1246. matchCount = 1;
  1247. }
  1248. else
  1249. {
  1250. for (unsigned i=1; i < numInputs; i++)
  1251. matched[i] = true;
  1252. }
  1253. //LEFT ONLY will only merge 1 stream, LEFT OUTER will merge as many as match LEFT
  1254. setCandidateRow(lhs, true, matched);
  1255. return true;
  1256. }
  1257. bool CAndLeftMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1258. {
  1259. CSteppedInputLookahead & cur = inputs.item(0);
  1260. if (!cur.gatherConjunctions(collector))
  1261. collector.addInput(cur);
  1262. collector.addJoin(*this);
  1263. return true;
  1264. }
  1265. //---------------------------------------------------------------------------
  1266. void BestMatchManager::associate(unsigned input, const void * value)
  1267. {
  1268. unsigned curIndex = 0;
  1269. while (curIndex != numEntries)
  1270. {
  1271. BestMatchItem & cur = matches.item(curIndex);
  1272. int c = compare->docompare(value, cur.value);
  1273. if (c <= 0)
  1274. {
  1275. if (c == 0)
  1276. {
  1277. //insert at the end of the duplicates
  1278. curIndex += cur.duplicates;
  1279. cur.duplicates++;
  1280. }
  1281. //Move a record at the end of the list to the correct position, ready for updating.
  1282. if (curIndex != numEntries)
  1283. matches.rotateR(curIndex, numEntries);
  1284. break; // now go and modify record at position curIndex
  1285. }
  1286. curIndex += cur.duplicates;
  1287. }
  1288. assertex(matches.isItem(curIndex));
  1289. BestMatchItem & inserted = matches.item(curIndex);
  1290. inserted.duplicates = 1;
  1291. inserted.value = value;
  1292. inserted.input = input;
  1293. numEntries++;
  1294. return;
  1295. }
  1296. unsigned BestMatchManager::getValueOffset(unsigned idx) const
  1297. {
  1298. unsigned offset = 0;
  1299. while (idx--)
  1300. offset += matches.item(offset).duplicates;
  1301. return offset;
  1302. }
  1303. void BestMatchManager::init(ICompare * _compare, unsigned numInputs)
  1304. {
  1305. compare = _compare;
  1306. numEntries = 0;
  1307. for (unsigned i=0; i < numInputs; i++)
  1308. matches.append(* new BestMatchItem);
  1309. }
  1310. void BestMatchManager::kill()
  1311. {
  1312. matches.kill();
  1313. }
  1314. unsigned BestMatchManager::getInput(unsigned whichValue, unsigned inputIndex) const
  1315. {
  1316. return matches.item(getValueOffset(whichValue) + inputIndex).input;
  1317. }
  1318. unsigned BestMatchManager::getInput0(unsigned inputIndex) const
  1319. {
  1320. return matches.item(inputIndex).input;
  1321. }
  1322. unsigned BestMatchManager::numInputs(unsigned whichValue) const
  1323. {
  1324. return matches.item(getValueOffset(whichValue)).duplicates;
  1325. }
  1326. void BestMatchManager::remove(unsigned whichValue)
  1327. {
  1328. unsigned offset = getValueOffset(whichValue);
  1329. unsigned duplicates = matches.item(offset).duplicates;
  1330. matches.rotateLN(offset, numEntries-1, duplicates);
  1331. numEntries -= duplicates;
  1332. }
  1333. const void * BestMatchManager::queryValue(unsigned whichValue) const
  1334. {
  1335. return matches.item(getValueOffset(whichValue)).value;
  1336. }
  1337. //---------------------------------------------------------------------------
  1338. CMofNMergeJoinProcessor::CMofNMergeJoinProcessor(IHThorNWayMergeJoinArg & _arg) : CMergeJoinProcessor(_arg)
  1339. {
  1340. combineConjunctions = false;
  1341. alive = NULL;
  1342. candidateMask = NULL;
  1343. minMatches = 0;
  1344. maxMatches = 0;
  1345. numActive = 0;
  1346. }
  1347. void CMofNMergeJoinProcessor::afterProcessing()
  1348. {
  1349. delete [] alive;
  1350. delete [] candidateMask;
  1351. alive = NULL;
  1352. candidateMask = NULL;
  1353. matches.kill();
  1354. CMergeJoinProcessor::afterProcessing();
  1355. }
  1356. void CMofNMergeJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1357. {
  1358. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1359. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1360. createEqualityJoinProcessor();
  1361. else
  1362. createMerger();
  1363. minMatches = helper.getMinMatches();
  1364. maxMatches = helper.getMaxMatches();
  1365. if (minMatches == 0)
  1366. throw MakeStringException(99, "Need a non-zero minimum number of matches");
  1367. alive = new bool [numInputs];
  1368. candidateMask = new bool [numInputs];
  1369. for (unsigned i= 0; i < numInputs; i++)
  1370. alive[i] = true;
  1371. numActive = numInputs;
  1372. matches.init(equalCompare, numInputs);
  1373. }
  // Find the lowest equality value offered by at least minMatches (and at most maxMatches) of the
  // still-alive inputs.
  // - Each input's current value is recorded in "matches" (sorted, equal values grouped).
  // - The lowest group is discarded once it can no longer qualify.
  // - When a group qualifies, candidateMask flags its inputs and setCandidateRow() starts the group.
  // Returns false once fewer than minMatches inputs remain alive.
  1374. bool CMofNMergeJoinProcessor::findCandidates(const void * originalSeekValue, unsigned numOriginalSeekFields)
  1375. {
  1376. if (numActive < minMatches)
  1377. return false;
  // How many alive inputs may fail to share the candidate value while it can still qualify.
  1378. unsigned numFreeToMismatch = numActive - minMatches;
  1379. const void * seekValue = originalSeekValue;
  1380. unsigned numSeekFields = numOriginalSeekFields;
  1381. //This should be true, because after candidates are matched their values are removed.
  1382. assertex(matches.numInputs() <= numFreeToMismatch); //
  1383. //MORE: This needs rewriting, so that mismatches are handled correctly (see the note inside the loop).
  // Loop until every alive input has a value recorded in "matches".
  1384. while (matches.numInputs() < numActive)
  1385. {
  1386. unsigned nextInput = nextToMatch();
  1387. bool matchedCompletely = true;
  1388. // MORE: This needs rewriting, so that mismatches are handled correctly. In particular, the matches need to retain information about whether
  1389. // they matched fully, since that will optimize where could be sought next.
  1390. const void * value = inputs.item(nextInput).next(seekValue, numSeekFields, matchedCompletely, nonBufferedMatchStepExtra);
  1391. //NOTE: matchedCompletely is currently always true. More work is needed if not true.
  1392. assertex(matchedCompletely);
  1393. if (value)
  1394. {
  1395. if (matchedCompletely)
  1396. {
  1397. matched[nextInput] = true;
  1398. matches.associate(nextInput, value);
  1399. }
  1400. }
  1401. else
  1402. {
  // Input exhausted: it can no longer contribute.
  // NOTE: numFreeToMismatch (unsigned) wraps when it was 0, but numActive is then below
  // minMatches, so the function returns before the wrapped value is used.
  1403. alive[nextInput] = false;
  1404. numActive--;
  1405. numFreeToMismatch--;
  1406. if (numActive < minMatches)
  1407. return false;
  1408. }
  1409. unsigned matchCount = matches.numInputs();
  1410. if (matchCount > numFreeToMismatch)
  1411. {
  // The lowest group cannot qualify if too many recorded inputs hold other values, or if more
  // than maxMatches inputs share it.
  1412. unsigned numMatch0 = matches.numInputs0();
  1413. if ((matchCount - numMatch0 > numFreeToMismatch) || (numMatch0 > maxMatches))
  1414. {
  1415. //clear seekValue, because seek value won't be valid any more after the skips - may be updated later.
  1416. seekValue = originalSeekValue;
  1417. numSeekFields = numOriginalSeekFields;
  1418. //No way that the first element is going to match, so remove all inputs associated with it.
  1419. for (unsigned i= 0; i < numMatch0; i++)
  1420. {
  1421. unsigned input = matches.getInput0(i);
  1422. inputs.item(input).skip();
  1423. matched[input] = false;
  1424. }
  1425. matches.remove(0);
  1426. }
  1427. //Lowest element now provides the best seek position.
  1428. if (matches.numInputs() > numFreeToMismatch)
  1429. {
  1430. seekValue = matches.queryValue(0);
  1431. numSeekFields = numEqualFields;
  1432. }
  1433. }
  1434. }
  1435. //matches(0) contains the next match set, build a set of flags indicating which inputs to use
  1436. unsigned numMatches = matches.numInputs0();
  1437. for (unsigned i1=0; i1< numInputs; i1++)
  1438. candidateMask[i1] = false;
  1439. for (unsigned i2=0; i2 < numMatches; i2++)
  1440. candidateMask[matches.getInput0(i2)] = true;
  1441. setCandidateRow(matches.queryValue(0), true, candidateMask);
  1442. //Now cleanup housekeeping, so that findCandidates() is ready to find the next block
  1443. for (unsigned i3=0; i3 < numInputs; i3++)
  1444. if (candidateMask[i3])
  1445. matched[i3] = false;
  1446. matches.remove(0);
  1447. return true;
  1448. }
  1449. bool CMofNMergeJoinProcessor::gatherConjunctions(ISteppedConjunctionCollector & collector)
  1450. {
  1451. //MORE: We may need to create pseudo inputs in order to process these optimially.
  1452. return false;
  1453. }
  1454. unsigned CMofNMergeJoinProcessor::nextToMatch() const
  1455. {
  1456. for (unsigned i= 0; i < numInputs; i++)
  1457. {
  1458. if (alive[i] && !matched[i])
  1459. return i;
  1460. }
  1461. throwUnexpected();
  1462. }
  1463. //---------------------------------------------------------------------------
  1464. /*
  1465. NOTES on the distances... this is far from simple once you get arbitrary trees involved.
  1466. Given a join expression right.x BETWEEN left.x - a AND left.x + b, i.e. right may be up to a before and up to b after left:
  1467. a is maxRightBeforeLeft
  1468. b is maxLeftBeforeRight
  1469. We define a function D(x,y), the maximum value that can be deducted from row x to provide a valid value for row y.
  1470. Given a tree of join expressions, we can calculate a distance function between any pair of inputs.
  1471. J1(a,b,c) = D(i,i+1)=4, D(i+1,i) = 10
  1472. J2(d, e) = D(i,i+1)=-1, D(i+1,i) = 12
  1473. J3(J1, J2) = D(i,i+1)=0 D(i+1,i) = 5
  1474. =>
  1475. For each join we define
  1476. D(i, lowest) - maximum value to deduct from row i to obtain the lowest
  1477. D(highest, i) - maximum value to deduct from highest to obtain row i
  1478. by definition these must both be >= 0, for a simple input they are both 0
  1479. A join's extent is given by
  1480. D(highest,lowest) = max(D(highest,i)+D(i,lowest))
  1481. We're only interested in the maximum values, which are obtained from the maximum distances between the elements. The lowest and highest members
  1482. of the group are going to be the ends. So we use the maximum of those distances to work out D(i,low) and D(high, i), being careful to only use
  1483. the range if it is valid (e.g., the end must be possible to be the highest/lowest)
  1484. D(a,b) = 4, D(b,a) = 10
  1485. D(b,c) = 4, D(c,b) = 10
  1486. D(a,c) = 8, D(c,a) = 20
  1487. D(d,e) = -1, D(e,d) = 12
  1488. D(a, lowest) = 8 D(highest, a) = 20
  1489. D(b, lowest) = 10 D(highest, b) = 10
  1490. D(c, lowest) = 20 D(highest, c) = 8
  1491. D(highest, lowest) = 28
  1492. Then assuming the left is the highest and the right is the lowest we have
  1493. D(a,e) = DJ1(a, lowest) + DJ3(i,i+1) + DJ2(highest, e)
  1494. = 8 + 0 + 0
  1495. For >2 terms, you also need to take into account the size of a term given by D(highest,lowest)
  1496. */
  1497. inline unsigned __int64 adjustRangeValue(unsigned __int64 rangeValue, __int64 delta)
  1498. {
  1499. if ((delta >= 0) || (rangeValue > (unsigned __int64)-delta))
  1500. return rangeValue + delta;
  1501. return 0;
  1502. }
  1503. //---------------------------------------------------------------------------
  1504. //This class is created for each input of an n-ary join; it maintains a queue of potential records.
  1505. CNaryJoinLookaheadQueue::CNaryJoinLookaheadQueue(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookahead * _input, CNaryJoinLookaheadQueue * _left, const void * * _activeRowPtr) : helper(_helper), rows(_inputAllocator), unmatchedRows(_inputAllocator)
  1506. {
  1507. equalCompareEq = helper.queryEqualCompareEq();
  1508. nonSteppedCompareEq = helper.queryNonSteppedCompare();
  1509. numEqualFields = helper.numEqualFields();
  1510. stepCompare = helper.querySteppingMeta()->queryCompare();
  1511. input.set(_input);
  1512. activeRowPtr = _activeRowPtr;
  1513. left = _left;
  1514. equalityRow = NULL;
  1515. curRow = 0;
  1516. maxRow = 0;
  1517. numSkipped = 0;
  1518. done = true;
  1519. }
  1520. bool CNaryJoinLookaheadQueue::beforeProcessCandidates(const void * _equalityRow, bool needToVerifyNext)
  1521. {
  1522. done = false;
  1523. equalityRow = _equalityRow;
  1524. rows.kill();
  1525. numSkipped = 0;
  1526. if (matchedLeft)
  1527. matchedLeft->reset();
  1528. // next is guaranteed to match the equality condition for AND, proximity but not for m of n/left outer...
  1529. if (!needToVerifyNext || nextUnqueued())
  1530. {
  1531. consumeNextInput();
  1532. return true;
  1533. }
  1534. return false;
  1535. }
  1536. void CNaryJoinLookaheadQueue::clearPending()
  1537. {
  1538. rows.kill();
  1539. }
  1540. bool CNaryJoinLookaheadQueue::ensureNonEmpty()
  1541. {
  1542. if (rows.ordinality())
  1543. return true;
  1544. if (nextUnqueued())
  1545. {
  1546. consumeNextInput();
  1547. return true;
  1548. }
  1549. return false;
  1550. }
  1551. bool CNaryJoinLookaheadQueue::firstSelection()
  1552. {
  1553. if (!left)
  1554. {
  1555. assertex(maxRow != 0);
  1556. curRow = 0;
  1557. *activeRowPtr = rows.item(curRow);
  1558. return true;
  1559. }
  1560. if (!left->firstSelection())
  1561. return false;
  1562. return findValidSelection(0);
  1563. }
  1564. bool CNaryJoinLookaheadQueue::findValidSelection(unsigned initialRow)
  1565. {
  1566. assertex(left);
  1567. const unsigned max = maxRow;
  1568. unsigned candidateRow = initialRow;
  1569. loop
  1570. {
  1571. const void * leftRow = left->activeRow();
  1572. while (candidateRow < max)
  1573. {
  1574. const void * rightRow = rows.item(candidateRow);
  1575. if (!nonSteppedCompareEq || nonSteppedCompareEq->match(leftRow, rightRow))
  1576. {
  1577. curRow = candidateRow;
  1578. *activeRowPtr = rightRow;
  1579. return true;
  1580. }
  1581. candidateRow++;
  1582. }
  1583. if (!left->nextSelection())
  1584. return false;
  1585. candidateRow = 0;
  1586. }
  1587. }
  1588. const void * CNaryJoinLookaheadQueue::nextUnqueued()
  1589. {
  1590. if (equalityRow)
  1591. {
  1592. bool matches = true;
  1593. const void * next = input->nextGE(equalityRow, numEqualFields, matches, nonBufferedMismatchStepExtra);
  1594. if (next && matches && equalCompareEq->match(next, equalityRow))
  1595. return next;
  1596. return NULL;
  1597. }
  1598. else
  1599. return input->next();
  1600. }
  1601. bool CNaryJoinLookaheadQueue::nextSelection()
  1602. {
  1603. if (left)
  1604. return findValidSelection(curRow+1);
  1605. curRow++;
  1606. if (curRow >= maxRow)
  1607. return false;
  1608. *activeRowPtr = rows.item(curRow);
  1609. return true;
  1610. }
  1611. bool CNaryJoinLookaheadQueue::ensureCandidateExists(unsigned __int64 minDistance, unsigned __int64 maxDistance)
  1612. {
  1613. loop
  1614. {
  1615. const void * next = rows.head();
  1616. if (!next)
  1617. break;
  1618. unsigned __int64 distance = helper.extractRangeValue(next);
  1619. if (distance >= minDistance)
  1620. {
  1621. assertex(distance <= maxDistance);
  1622. return true;
  1623. }
  1624. rows.skip();
  1625. }
  1626. loop
  1627. {
  1628. const void * next = nextUnqueued();
  1629. if (!next)
  1630. return false;
  1631. unsigned __int64 distance = helper.extractRangeValue(next);
  1632. if (distance >= minDistance)
  1633. {
  1634. if (distance <= maxDistance)
  1635. {
  1636. consumeNextInput();
  1637. return true;
  1638. }
  1639. return false;
  1640. }
  1641. input->skip();
  1642. }
  1643. }
  1644. bool CNaryJoinLookaheadQueue::checkExistsGE(const void * seek, unsigned numFields)
  1645. {
  1646. loop
  1647. {
  1648. const void * next = rows.head();
  1649. if (!next)
  1650. return false;
  1651. if (stepCompare->docompare(next, seek, numFields) >= 0)
  1652. return true;
  1653. rows.skip();
  1654. }
  1655. }
  1656. unsigned CNaryJoinLookaheadQueue::readAheadTo(unsigned __int64 maxDistance, bool canConsumeBeyondMax)
  1657. {
  1658. const void * tail = rows.tail();
  1659. if (tail && helper.extractRangeValue(tail) > maxDistance)
  1660. {
  1661. unsigned limit = rows.ordinality() - 1;
  1662. //Already have all the records, return how many...
  1663. while (limit > 0)
  1664. {
  1665. const void * prev = rows.item(limit-1);
  1666. if (helper.extractRangeValue(prev) <= maxDistance)
  1667. return limit;
  1668. --limit;
  1669. }
  1670. return 0;
  1671. }
  1672. while (!done)
  1673. {
  1674. const void * next = nextUnqueued();
  1675. if (!next)
  1676. {
  1677. done = true;
  1678. break;
  1679. }
  1680. //This is a bit nasty. We need to consume the next value to ensure that the lowest spotter always has the next valid
  1681. //but it means we might be reading this input for too long
  1682. if (helper.extractRangeValue(next) > maxDistance)
  1683. {
  1684. if (!canConsumeBeyondMax)
  1685. break;
  1686. consumeNextInput();
  1687. return rows.ordinality()-1;
  1688. }
  1689. consumeNextInput();
  1690. }
  1691. return rows.ordinality();
  1692. }
  1693. void CNaryJoinLookaheadQueue::readCandidateAll()
  1694. {
  1695. loop
  1696. {
  1697. const void * next = nextUnqueued();
  1698. if (!next)
  1699. return;
  1700. consumeNextInput();
  1701. }
  1702. }
  1703. void CNaryJoinLookaheadQueue::skip()
  1704. {
  1705. if (matchedLeft && !matchedLeft->test(numSkipped))
  1706. {
  1707. unmatchedRows.enqueue(rows.dequeue());
  1708. }
  1709. else
  1710. rows.skip();
  1711. numSkipped++;
  1712. }
  1713. bool CNaryJoinLookaheadQueue::flushUnmatched()
  1714. {
  1715. while (rows.ordinality())
  1716. skip();
  1717. return unmatchedRows.ordinality() != 0;
  1718. }
  1719. //---------------------------------------------------------------------------
  1720. CProximityJoinProcessor::CProximityJoinProcessor(IHThorNWayMergeJoinArg & _helper) :
  1721. CMergeJoinProcessor(_helper)
  1722. {
  1723. maxRightBeforeLeft = 0;
  1724. maxLeftBeforeRight = 0;
  1725. }
  1726. void CProximityJoinProcessor::beforeProcessing(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator)
  1727. {
  1728. CMergeJoinProcessor::beforeProcessing(_inputAllocator, _outputAllocator);
  1729. createTempSeekBuffer();
  1730. //Have to delay creating the actual join joinProcessor because maxRightBeforeLeft() etc. can be onStart dependant.
  1731. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  1732. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  1733. //Handle phrases using a different class i) because the general scheme doesn't quite work and ii) for efficiency
  1734. if (flags & IHThorNWayMergeJoinArg::MJFtransform)
  1735. {
  1736. if ((maxRightBeforeLeft < 0 || maxLeftBeforeRight < 0))
  1737. outputProcessor.setown(new CAnchoredRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1738. else
  1739. outputProcessor.setown(new CProximityRangeJoinGenerator(inputAllocator, outputAllocator, helper, inputs));
  1740. }
  1741. else
  1742. createMerger();
  1743. }
  1744. bool CProximityJoinProcessor::findCandidates(const void * seekValue, unsigned numSeekFields)
  1745. {
  1746. unsigned firstInput = 0;//searchOrder.item(0);
  1747. bool wasCompleteMatch = true;
  1748. if (eof || !inputs.item(firstInput).next(seekValue, numSeekFields, wasCompleteMatch, nonBufferedMatchStepExtra))
  1749. return false;
  1750. unsigned matchCount = 1;
  1751. clearMatches();
  1752. matched[firstInput] = true;
  1753. const unsigned numJoinFields = numEqualFields + 1;
  1754. const bool inputsMustMatchEquality = (numEqualFields == numExternalEqualFields);
  1755. while (matchCount != numInputs)
  1756. {
  1757. unsigned nextInput = nextToMatch();
  1758. unsigned baseInput = getBestToSeekFrom(nextInput);
  1759. RtlStaticRowBuilder rowBuilder(tempSeekBuffer, maxSeekRecordSize);
  1760. helper.adjustRangeValue(rowBuilder, inputs.item(baseInput).next(), -maxDistanceBefore(baseInput, nextInput));
  1761. bool wasCompleteMatch = true;
  1762. //MORE: Would it help to allow mismatches? I would have thought so, but there was a previous comment sayimg "I don't think so because of the range calculation"
  1763. const void * nextRow = inputs.item(nextInput).nextGE(tempSeekBuffer, numJoinFields, wasCompleteMatch, nonBufferedMatchStepExtra);
  1764. assertex(wasCompleteMatch);
  1765. if (!nextRow)
  1766. {
  1767. eof = true;
  1768. return false;
  1769. }
  1770. if (inputsMustMatchEquality || equalityComponentMatches(nextRow, tempSeekBuffer))
  1771. {
  1772. //Now check if this new record causes other records to be too far away
  1773. unsigned __int64 thisRangeValue = helper.extractRangeValue(nextRow);
  1774. for (unsigned i=0; i<numInputs; i++)
  1775. {
  1776. if (matched[i])
  1777. {
  1778. unsigned __int64 seekRangeValue = adjustRangeValue(thisRangeValue, -maxDistanceBefore(nextInput, i));
  1779. if (getRangeValue(i) < seekRangeValue)
  1780. {
  1781. inputs.item(i).skip();
  1782. matched[i] = false;
  1783. if (--matchCount == 0)
  1784. break;
  1785. }
  1786. }
  1787. }
  1788. }
  1789. else
  1790. {
  1791. for (unsigned i=0; i<numInputs; i++)
  1792. {
  1793. if (matched[i])
  1794. {
  1795. inputs.item(i).skip();
  1796. matched[i] = false;
  1797. matchCount--;
  1798. }
  1799. }
  1800. }
  1801. matched[nextInput] = true;
  1802. matchCount++;
  1803. }
  1804. setCandidateRow(inputs.item(0).next(), false, NULL);
  1805. return true;
  1806. }
  1807. __int64 CProximityJoinProcessor::maxDistanceBefore(unsigned fixedInput, unsigned searchInput) const
  1808. {
  1809. assertex(outputProcessor); // sanity check to ensure this isn't called before maxXBeforeY are set up
  1810. if (searchInput < fixedInput)
  1811. return maxLeftBeforeRight * (fixedInput - searchInput);
  1812. else
  1813. return maxRightBeforeLeft * (searchInput - fixedInput);
  1814. }
  1815. unsigned CProximityJoinProcessor::nextToMatch() const
  1816. {
  1817. for (unsigned i=0; i < numInputs; i++)
  1818. {
  1819. unsigned next = i;//searchOrder.item(i);
  1820. if (!matched[next])
  1821. return next;
  1822. }
  1823. throwUnexpected();
  1824. }
  1825. //Choose the input to seek from that restricts the input being sought the most.
  1826. unsigned CProximityJoinProcessor::getBestToSeekFrom(unsigned seekInput) const
  1827. {
  1828. unsigned __int64 bestRangeValue = 0;
  1829. unsigned best = NotFound;
  1830. //MORE: This can be optimized!
  1831. for (unsigned i=0; i < numInputs; i++)
  1832. {
  1833. if (matched[i])
  1834. {
  1835. //Calculate the value of the distance
  1836. __int64 distanceBefore = maxDistanceBefore(i, seekInput);
  1837. unsigned __int64 rangeValue = adjustRangeValue(getRangeValue(i), -distanceBefore);
  1838. if (rangeValue >= bestRangeValue)
  1839. {
  1840. bestRangeValue = rangeValue;
  1841. best = i;
  1842. }
  1843. }
  1844. }
  1845. assertex(best != NotFound);
  1846. return best;
  1847. }
  1848. //---------------------------------------------------------------------------
  1849. //NULL passed to CSteppedInputLookahead first parameter means nextGE() must be overridden
  1850. CJoinGenerator::CJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  1851. helper(_helper), inputAllocator(_inputAllocator), outputAllocator(_outputAllocator)
  1852. {
  1853. state = JSdone;
  1854. unsigned flags = helper.getJoinFlags();
  1855. stepCompare = helper.querySteppingMeta()->queryCompare();
  1856. globalCompare = NULL;
  1857. if (flags & IHThorNWayMergeJoinArg::MJFglobalcompare)
  1858. globalCompare = helper.queryGlobalCompare();
  1859. unsigned numInputs = _inputs.ordinality();
  1860. rows = new const void * [numInputs];
  1861. CNaryJoinLookaheadQueue * prev = NULL;
  1862. ForEachItemIn(i, _inputs)
  1863. {
  1864. CNaryJoinLookaheadQueue * queue = new CNaryJoinLookaheadQueue(inputAllocator, helper, &_inputs.item(i), prev, rows + i);
  1865. inputs.append(*queue);
  1866. prev = queue;
  1867. }
  1868. isSpecialLeftJoin = false;
  1869. numActiveInputs = numInputs;
  1870. lastActiveInput = numInputs ? &inputs.tos() : NULL;
  1871. joinKind = (flags & IHThorNWayMergeJoinArg::MJFkindmask);
  1872. switch (joinKind)
  1873. {
  1874. case IHThorNWayMergeJoinArg::MJFleftonly:
  1875. case IHThorNWayMergeJoinArg::MJFleftouter:
  1876. if (helper.queryNonSteppedCompare() || globalCompare)
  1877. {
  1878. isSpecialLeftJoin = true;
  1879. if (numInputs)
  1880. inputs.item(0).trackUnmatched();
  1881. }
  1882. break;
  1883. case IHThorNWayMergeJoinArg::MJFmofn:
  1884. if (helper.queryNonSteppedCompare() || globalCompare)
  1885. throw MakeStringException(99, "MOFN JOIN with non stepped condition not yet supported");
  1886. break;
  1887. }
  1888. }
  1889. CJoinGenerator::~CJoinGenerator()
  1890. {
  1891. delete [] rows;
  1892. }
  1893. void CJoinGenerator::beforeProcessCandidates(const void * candidateRow, bool needToVerifyNext, const bool * matched)
  1894. {
  1895. if (needToVerifyNext)
  1896. {
  1897. numActiveInputs = 0;
  1898. CNaryJoinLookaheadQueue * prev = NULL;
  1899. ForEachItemIn(i, inputs)
  1900. {
  1901. CNaryJoinLookaheadQueue & cur = inputs.item(i);
  1902. if (cur.beforeProcessCandidates(candidateRow, needToVerifyNext))
  1903. {
  1904. cur.updateContext(prev, rows + numActiveInputs);
  1905. prev = &cur;
  1906. numActiveInputs++;
  1907. }
  1908. }
  1909. lastActiveInput = prev;
  1910. }
  1911. else
  1912. {
  1913. ForEachItemIn(i, inputs)
  1914. inputs.item(i).beforeProcessCandidates(candidateRow, needToVerifyNext);
  1915. }
  1916. state = JSfirst;
  1917. }
  1918. void CJoinGenerator::cleanupAllCandidates()
  1919. {
  1920. //Remove all pending candidates - only called if outer join optimization is enabled
  1921. //afterProcessCandidates() will already have been called.
  1922. ForEachItemIn(i, inputs)
  1923. inputs.item(i).clearPending();
  1924. }
  1925. void CJoinGenerator::afterProcessCandidates()
  1926. {
  1927. }
// Produce the next output row for the current candidate, or NULL once all output for it has
// been generated. Driven by the state member:
//   JSfirst / JSgathercandidates - gather the next set of candidate combinations
//   JShascandidate               - transform the current combination (rows[], numActiveInputs rows)
//   JSnextcandidate              - advance to the next combination
//   JSdone                       - no more combinations
// For isSpecialLeftJoin, unmatched rows collected by input 0 are also transformed (as a single
// row) and returned. Any transform that returns a size of 0 produces no row, and the loop continues.
const void * CJoinGenerator::nextOutputRow()
{
    RtlDynamicRowBuilder rowBuilder(outputAllocator, false);
    loop
    {
        // Unmatched left rows that are already waiting are returned before anything else.
        if (isSpecialLeftJoin)
        {
            CNaryJoinLookaheadQueue & left = inputs.item(0);
            loop
            {
                const void * unmatchedLeft = left.nextUnmatched();
                if (!unmatchedLeft)
                    break;
                rowBuilder.ensureRow();
                size32_t retSize = helper.transform(rowBuilder, 1, &unmatchedLeft);
                left.skipUnmatched();
                if (retSize)
                    return rowBuilder.finalizeRowClear(retSize);
            }
        }
        switch (state)
        {
        case JSdone:
            if (isSpecialLeftJoin)
            {
                // Read the rest of the left input and skip every queued row, so any remaining
                // unmatched rows get output on the next pass round the loop.
                CNaryJoinLookaheadQueue & left = inputs.item(0);
                left.readCandidateAll();
                if (left.flushUnmatched())
                    break; // round again
            }
            return NULL;
        case JShascandidate:
            {
                state = JSnextcandidate;
                //If is left only join, and has an additional equality criteria, then ignore matches.
                //If left only, and no extra equality - or only one dataset has matches, then all matches are real left only matches
                if (isSpecialLeftJoin && (joinKind == IHThorNWayMergeJoinArg::MJFleftonly) && (numActiveInputs != 1))
                    break;
                rowBuilder.ensureRow();
                size32_t retSize = helper.transform(rowBuilder, numActiveInputs, rows);
                if (retSize)
                    return rowBuilder.finalizeRowClear(retSize);
                break;
            }
        case JSnextcandidate:
            if (nextCandidate())
                state = JShascandidate;
            else
            {
                // nextCandidate() may already have moved the state to JSdone; keep that.
                if (state != JSdone)
                    state = JSgathercandidates;
            }
            break;
        case JSfirst:
        case JSgathercandidates:
            if (gatherNextCandidates())
                state = JShascandidate;
            else
                state = JSdone;
            break;
        default:
            throwUnexpected();
        }
    }
}
  1993. const void * CJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  1994. {
  1995. //A stupid version. We could possibly skip on the lowest value if we knew the fields were assigned from the lowest value in the input
  1996. //which would potentially save a lot of transforms.
  1997. //would also probably need the input to match the output.
  1998. loop
  1999. {
  2000. const void * next = nextOutputRow();
  2001. if (!next || stepCompare->docompare(next, seek, numFields) >= 0)
  2002. return next;
  2003. outputAllocator->releaseRow(next);
  2004. }
  2005. }
  2006. bool CJoinGenerator::firstSelection()
  2007. {
  2008. if (lastActiveInput->firstSelection())
  2009. {
  2010. if (globalCompare && !globalCompare->match(numActiveInputs, rows))
  2011. return nextSelection();
  2012. if (isSpecialLeftJoin)
  2013. inputs.item(0).noteMatched();
  2014. return true;
  2015. }
  2016. return false;
  2017. }
  2018. bool CJoinGenerator::nextSelection()
  2019. {
  2020. while (lastActiveInput->nextSelection())
  2021. {
  2022. if (!globalCompare || globalCompare->match(numActiveInputs, rows))
  2023. {
  2024. if (isSpecialLeftJoin)
  2025. inputs.item(0).noteMatched();
  2026. return true;
  2027. }
  2028. }
  2029. return false;
  2030. }
  2031. //---------------------------------------------------------------------------
  2032. CEqualityJoinGenerator::CEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2033. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2034. {
  2035. lowestInput = NULL;
  2036. }
  2037. void CEqualityJoinGenerator::afterProcessCandidates()
  2038. {
  2039. lowestInput = NULL;
  2040. CJoinGenerator::afterProcessCandidates();
  2041. }
  2042. bool CEqualityJoinGenerator::nextCandidate()
  2043. {
  2044. if (nextSelection())
  2045. return true;
  2046. selectNextLowestInput();
  2047. return false;
  2048. }
  2049. /*
  2050. o Walk the input which is guaranteed to be the lowest
  2051. o Once that is done throw away that record, and choose the next.
  2052. */
  2053. bool CEqualityJoinGenerator::doGatherNextCandidates()
  2054. {
  2055. ForEachItemIn(iInput, inputs)
  2056. {
  2057. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2058. if (&curInput != lowestInput)
  2059. curInput.setCandidateAll();
  2060. else
  2061. curInput.setCandidateLowest();
  2062. }
  2063. return firstSelection();
  2064. }
  2065. bool CEqualityJoinGenerator::gatherNextCandidates()
  2066. {
  2067. if (state == JSfirst)
  2068. {
  2069. prefetchAllCandidates();
  2070. selectLowestInput();
  2071. }
  2072. else if (lowestInput->empty())
  2073. return false;
  2074. loop
  2075. {
  2076. if (doGatherNextCandidates())
  2077. return true;
  2078. if (!selectNextLowestInput())
  2079. return false;
  2080. }
  2081. }
  2082. void CEqualityJoinGenerator::prefetchAllCandidates()
  2083. {
  2084. //could be done in parallel, but
  2085. ForEachItemIn(i, inputs)
  2086. {
  2087. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2088. curInput.readCandidateAll();
  2089. }
  2090. }
  2091. void CEqualityJoinGenerator::selectLowestInput()
  2092. {
  2093. ForEachItemIn(i, inputs)
  2094. {
  2095. CNaryJoinLookaheadQueue & curInput = inputs.item(i);
  2096. if (!curInput.empty())
  2097. {
  2098. lowestInput = &curInput;
  2099. return;
  2100. }
  2101. }
  2102. throwUnexpected();
  2103. }
  2104. bool CEqualityJoinGenerator::selectNextLowestInput()
  2105. {
  2106. lowestInput->skip();
  2107. if (lowestInput->empty())
  2108. {
  2109. state = JSdone;
  2110. return false;
  2111. }
  2112. return true;
  2113. }
  2114. //---------------------------------------------------------------------------
  2115. CSortedEqualityJoinGenerator::CSortedEqualityJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2116. CEqualityJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2117. {
  2118. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2119. lowestSpotter.initInputs();
  2120. }
  2121. CSortedEqualityJoinGenerator::~CSortedEqualityJoinGenerator()
  2122. {
  2123. lowestSpotter.cleanup();
  2124. }
  2125. void CSortedEqualityJoinGenerator::afterProcessCandidates()
  2126. {
  2127. lowestSpotter.reset();
  2128. CEqualityJoinGenerator::afterProcessCandidates();
  2129. }
  2130. void CSortedEqualityJoinGenerator::selectLowestInput()
  2131. {
  2132. unsigned iLowest = lowestSpotter.queryNextInput();
  2133. assertex(iLowest != NotFound);
  2134. lowestInput = &inputs.item(iLowest);
  2135. }
  2136. bool CSortedEqualityJoinGenerator::selectNextLowestInput()
  2137. {
  2138. lowestSpotter.skipRow();
  2139. if (lowestInput->empty())
  2140. {
  2141. state = JSdone;
  2142. return false;
  2143. }
  2144. CSortedEqualityJoinGenerator::selectLowestInput();
  2145. return true;
  2146. }
  2147. //---------------------------------------------------------------------------
  2148. CRangeJoinGenerator::CRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2149. CJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2150. {
  2151. maxRightBeforeLeft = helper.maxRightBeforeLeft();
  2152. maxLeftBeforeRight = helper.maxLeftBeforeRight();
  2153. }
  2154. //---------------------------------------------------------------------------
  2155. CAnchoredRangeJoinGenerator::CAnchoredRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2156. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs)
  2157. {
  2158. iLowest = maxRightBeforeLeft < 0 ? 0 : inputs.ordinality()-1;
  2159. lowestInput = &inputs.item(iLowest);
  2160. }
  2161. bool CAnchoredRangeJoinGenerator::nextCandidate()
  2162. {
  2163. if (nextSelection())
  2164. return true;
  2165. lowestInput->skip();
  2166. return false;
  2167. }
  2168. /*
  2169. o Walk the input which is guaranteed to be the lowest
  2170. o Once that is done throw away that record, and choose the next.
  2171. */
  2172. bool CAnchoredRangeJoinGenerator::doGatherNextCandidates()
  2173. {
  2174. const void * lowestRow = lowestInput->next();
  2175. if (!lowestRow)
  2176. return false;
  2177. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2178. ForEachItemIn(iInput, inputs)
  2179. {
  2180. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2181. if (iInput != iLowest)
  2182. {
  2183. __int64 maxLowestBeforeCur = maxDistanceAfterLowest(iInput);
  2184. assertex(maxLowestBeforeCur > 0);
  2185. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2186. if (!curInput.setCandidateRange(maxDistance, false))
  2187. return false;
  2188. }
  2189. }
  2190. lowestInput->setCandidateLowest();
  2191. return firstSelection();
  2192. }
  2193. const void * CAnchoredRangeJoinGenerator::nextOutputRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  2194. {
  2195. //Note: Skip any lower values that are less than seek value, but don't read any more
  2196. if (!lowestInput->checkExistsGE(seek, numFields))
  2197. return NULL;
  2198. return CRangeJoinGenerator::nextOutputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  2199. }
  2200. bool CAnchoredRangeJoinGenerator::nextMatchesAnyConsumed()
  2201. {
  2202. const void * lowestRow = lowestInput->next();
  2203. bool consumePending = false;
  2204. if (!lowestRow)
  2205. {
  2206. lowestRow = lowestInput->nextUnqueued();
  2207. if (!lowestRow)
  2208. return false;
  2209. consumePending = true;
  2210. }
  2211. //Throw any non-matching rows away, and return true if there are no other rows left.
  2212. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2213. ForEachItemIn(iInput, inputs)
  2214. {
  2215. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2216. if (iInput != iLowest)
  2217. {
  2218. //note: maxRightBeforeLeft is -minRightAfterLeft
  2219. __int64 minCurAfterLowest;
  2220. if (iInput < iLowest)
  2221. minCurAfterLowest = (-maxLeftBeforeRight) * (iLowest - iInput);
  2222. else
  2223. minCurAfterLowest = (-maxRightBeforeLeft) * (iInput - iLowest);
  2224. assertex(minCurAfterLowest >= 0);
  2225. if (!curInput.ensureCandidateExists(lowestDistance+minCurAfterLowest, lowestDistance + maxDistanceAfterLowest(iInput)))
  2226. return false;
  2227. }
  2228. }
  2229. //A potential match, so consume the potential start word and try again
  2230. if (consumePending)
  2231. lowestInput->consumeNextInput();
  2232. return true;
  2233. }
  2234. bool CAnchoredRangeJoinGenerator::gatherNextCandidates()
  2235. {
  2236. loop
  2237. {
  2238. if (!nextMatchesAnyConsumed())
  2239. return false;
  2240. if (doGatherNextCandidates())
  2241. return true;
  2242. lowestInput->skip();
  2243. }
  2244. }
  2245. //---------------------------------------------------------------------------
  2246. CProximityRangeJoinGenerator::CProximityRangeJoinGenerator(IEngineRowAllocator * _inputAllocator, IEngineRowAllocator * _outputAllocator, IHThorNWayMergeJoinArg & _helper, CSteppedInputLookaheadArray & _inputs) :
  2247. CRangeJoinGenerator(_inputAllocator, _outputAllocator, _helper, _inputs), lowestSpotter(inputs)
  2248. {
  2249. lowestSpotter.init(inputAllocator, helper.queryMergeCompare(), helper.querySteppingMeta()->queryCompare());
  2250. lowestSpotter.initInputs();
  2251. }
  2252. CProximityRangeJoinGenerator::~CProximityRangeJoinGenerator()
  2253. {
  2254. lowestSpotter.cleanup();
  2255. }
  2256. void CProximityRangeJoinGenerator::afterProcessCandidates()
  2257. {
  2258. lowestSpotter.reset();
  2259. CRangeJoinGenerator::afterProcessCandidates();
  2260. }
  2261. bool CProximityRangeJoinGenerator::nextCandidate()
  2262. {
  2263. if (nextSelection())
  2264. return true;
  2265. if (!lowestSpotter.skipNextLowest())
  2266. state = JSdone;
  2267. return false;
  2268. }
  2269. /*
  2270. First version.....
  2271. o Walk the input datasets in the order lowest first.
  2272. o Perform the cross product of that record with all others that could possibly match.
  2273. o Once that is done throw away that record, and choose the next.
  2274. o Abort as soon as any of the inputs contains no records within potential range.
  2275. */
  2276. bool CProximityRangeJoinGenerator::gatherNextCandidates(unsigned iLowest)
  2277. {
  2278. CNaryJoinLookaheadQueue & lowestInput = inputs.item(iLowest);
  2279. const void * lowestRow = lowestInput.next();
  2280. unsigned __int64 lowestDistance = helper.extractRangeValue(lowestRow);
  2281. ForEachItemIn(iInput, inputs)
  2282. {
  2283. CNaryJoinLookaheadQueue & curInput = inputs.item(iInput);
  2284. if (iInput != iLowest)
  2285. {
  2286. __int64 maxLowestBeforeCur;
  2287. if (iInput < iLowest)
  2288. maxLowestBeforeCur = maxRightBeforeLeft * (iLowest - iInput);
  2289. else
  2290. maxLowestBeforeCur = maxLeftBeforeRight * (iInput - iLowest);
  2291. assertex(maxLowestBeforeCur >= 0); // should have created an anchored varient if not true
  2292. // maxLowestBeforeCur = maxCurAfterLowest
  2293. unsigned __int64 maxDistance = lowestDistance + maxLowestBeforeCur;
  2294. if (!curInput.setCandidateRange(maxDistance, true))
  2295. return false;
  2296. }
  2297. else
  2298. curInput.setCandidateLowest();
  2299. }
  2300. return firstSelection();
  2301. }
  2302. bool CProximityRangeJoinGenerator::gatherNextCandidates()
  2303. {
  2304. loop
  2305. {
  2306. unsigned iLowest = lowestSpotter.queryNextInput();
  2307. assertex(iLowest != NotFound);
  2308. if (gatherNextCandidates(iLowest))
  2309. return true;
  2310. //It would be really nice to break out early if there were no more potential matches, but even if there
  2311. //is only one matching stream we need to keep walking the consumed records, because the later records
  2312. //may pull in the relevant related records, and we can't sensibly put back the consumed records
  2313. if (!lowestSpotter.skipNextLowest())
  2314. {
  2315. //No more records within this document => can't ever match
  2316. state = JSdone;
  2317. return false;
  2318. }
  2319. }
  2320. }