thorstep2.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jlog.hpp"
  15. #include "jqueue.tpp"
  16. #include "jexcept.hpp"
  17. #include "thorcommon.hpp"
  18. #include "thorstep2.ipp"
  19. //------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  20. const static SmartStepExtra knownLowestFrequencyTermStepExtra(SSEFreadAhead, NULL);
  21. const static SmartStepExtra knownOtherFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  22. const static SmartStepExtra presumedLowestFrequencyTermStepExtra(SSEFreturnMismatches|SSEFreadAhead, NULL);
  23. const static SmartStepExtra presumedLowestFrequencyTermReseekStepExtra(SSEFreadAhead, NULL);
  24. const static SmartStepExtra presumedOtherFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  25. const static SmartStepExtra unknownFrequencyTermStepExtra(SSEFreturnMismatches, NULL);
  26. //#define TRACE_JOIN_OPTIMIZATION
  27. /*
  28. It is REALLY important that we pick the best input for skipping on, and there are a few issues:
  29. a) We don't want occasional jumps in a more significant join component - e.g., part / source from (part, source, doc)
  30. to pick an otherwise infrequent term
  31. b) Ordering within the data (e.g., by state) can mean that one input may occasionally skip a long way
  32. c) It is really important to know what the best input is - because we want to post filter that input as heavily as possible
  33. d) It hardly matters which other input is chosen in combination with the best input, because they are all likely to end up
  34. (i) not matching unless they are highly correlated
  35. (ii) not skipping anything in the least frequent index.
  36. e) Spending a little bit of time working out the best input will pay dividends
  37. So the latest approach is as follows:
  38. 1) Use the median skip distance from the last 3 values - that should prevent insignificant join components skewing the results
  39. 2) Make sure all inputs have been read at least 3 times before we allow the best input to be post filtered.
  40. 3) Iterate through each of the other less significant inputs in turn, so that if the best input changes, or we got it wrong, or an input
  41. occasionally skips a long way (e.g. state) it will get corrected/used.
  42. */
  43. int compareInitialInputOrder(CInterface * const * _left, CInterface * const * _right)
  44. {
  45. OrderedInput * left = static_cast<OrderedInput *>(*_left);
  46. OrderedInput * right = static_cast<OrderedInput *>(*_right);
  47. if (left->hasPriority())
  48. {
  49. if (!right->hasPriority())
  50. return -1;
  51. if (left->getPriority() < right->getPriority())
  52. return -1;
  53. if (left->getPriority() > right->getPriority())
  54. return +1;
  55. return (int)(left->originalIndex - right->originalIndex);
  56. }
  57. else
  58. {
  59. if (right->hasPriority())
  60. return +1;
  61. }
  62. if (left->canOptimizeOrder())
  63. {
  64. if (!right->canOptimizeOrder())
  65. return -1;
  66. }
  67. else
  68. {
  69. if (right->canOptimizeOrder())
  70. return +1;
  71. }
  72. return (int)(left->originalIndex - right->originalIndex);
  73. }
  74. OrderedInput::OrderedInput(CSteppedInputLookahead & _input, unsigned _originalIndex, bool hasDistance) : originalIndex(_originalIndex)
  75. {
  76. input.set(&_input);
  77. matched = false;
  78. skipCost = 1; // This should be enhanced...
  79. numSamples = 0;
  80. nextDistanceIndex = 0;
  81. distanceCalculator = NULL;
  82. stepFlags = input->getStepFlags();
  83. priority = input->getPriority();
  84. if (hasDistance)
  85. distanceCalculator = _input.queryDistance();
  86. optimizeOrder = !hasPriority() && (distanceCalculator && input->canSmartStep());
  87. // make it low initially to ensure that all inputs are sought to get some kind of idea of how good they are
  88. medianDistance.set(0,0);
  89. }
  90. void OrderedInput::updateSequentialDistance(const void * seek, const void * next, unsigned numFields, bool seekToSameLocation, bool nowMatched)
  91. {
  92. //Note: With large numbers of matches in all documents, the ordered inputs will remain stable,
  93. //because the first input will have a better distance than the exact matches.
  94. assertex(distanceCalculator);
  95. SkipDistance thisDistance;
  96. thisDistance.field = distanceCalculator->getDistance(thisDistance.distance, seek, next, numFields);
  97. #ifdef IGNORE_SEEK_TO_SELF
  98. //If the previous seek was a nextGE() that returned a row which didn't match the criteia, that then matched
  99. //a different index, and we then get a subsequent call which does match property on the same record
  100. //then we shouldn't include this is the skipping distance.
  101. if (thisDistance.field == DISTANCE_EXACT_MATCH)
  102. {
  103. assertex(nowMatched);
  104. if (nowMatched && seekToSameLocation)
  105. return;
  106. }
  107. #endif
  108. distances[nextDistanceIndex].set(thisDistance);
  109. if (nextDistanceIndex == MaxDistanceSamples-1)
  110. nextDistanceIndex = 0;
  111. else
  112. nextDistanceIndex++;
  113. if (numSamples < MaxDistanceSamples)
  114. numSamples++;
  115. if (numSamples < MaxDistanceSamples)
  116. {
  117. //choose the last to start with - fairly arbitrary.
  118. medianDistance.set(thisDistance);
  119. }
  120. else
  121. {
  122. int c01 = distances[0].compare(distances[1]);
  123. unsigned median;
  124. if (c01 == 0)
  125. {
  126. //Same => the median must be the same as either of them.
  127. median = 0;
  128. }
  129. else
  130. {
  131. int c02 = distances[0].compare(distances[2]);
  132. if (c01 < 0)
  133. {
  134. if (c02 >= 0)
  135. {
  136. //c <= a < b
  137. median = 0;
  138. }
  139. else
  140. {
  141. //a < b, a < c => smallest of b,c
  142. int c12 = distances[1].compare(distances[2]);
  143. median = c12 <= 0 ? 1 : 2;
  144. }
  145. }
  146. else
  147. {
  148. if (c02 <= 0)
  149. {
  150. // c >= a > b
  151. median = 0;
  152. }
  153. else
  154. {
  155. // a > b, a > c => median is largest if b,c
  156. int c12 = distances[1].compare(distances[2]);
  157. median = c12 >= 0 ? 1 : 2;
  158. }
  159. }
  160. }
  161. #ifdef TRACE_JOIN_OPTIMIZATION
  162. if (medianDistance.compare(distances[median]) != 0)
  163. DBGLOG("Median for input %d changed from %d:%" I64F "d to %d:%" I64F "d",
  164. originalIndex, medianDistance.field, medianDistance.distance,
  165. distances[median].field, distances[median].distance);
  166. #endif
  167. medianDistance.set(distances[median]);
  168. }
  169. }
  170. //------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  171. /*
  172. We walk the tree commoning up all conjunctions that share at least one equal field.
  173. numEqualFields = intersection of all conjunctions that are being combined
  174. numMergeFields = merge fields of root conjunction
  175. NB: numEqualFields <= numMergeFields;
  176. //create INewSteppedInput wrappers for non-conjunction inputs
  177. //the XConjunctionOptimizers implement the stepped input otherwise.
  178. */
  179. CSteppedConjunctionOptimizer::CSteppedConjunctionOptimizer(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _arg, ISteppedInput * _root) : helper(_arg), inputAllocator(_inputAllocator)
  180. {
  181. mergeSteppingMeta = helper.querySteppingMeta();
  182. assertex(mergeSteppingMeta);
  183. stepCompare = mergeSteppingMeta->queryCompare();
  184. equalCompare = helper.queryEqualCompare();
  185. numEqualFields = helper.numEqualFields();
  186. numInputs = 0;
  187. numOptimizeInputs = 0;
  188. numPriorityInputs = 0;
  189. equalityRow = NULL;
  190. prevEqualityRow = NULL;
  191. prevPartitionRow = NULL;
  192. partitionCompare = NULL;
  193. if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhaspartition)
  194. partitionCompare = helper.queryPartitionCompareEq();
  195. rootActivity = _root;
  196. lowestSeekRow = NULL;
  197. seekMatchTicker = 0;
  198. inputsHaveMedian = false; // only relevant if (numOptimizeInputs != 0)
  199. inputHasPostfilter = false;
  200. inputIsDistributed = false;
  201. eof = false;
  202. maxOptimizeInput = 0;
  203. }
  204. CSteppedConjunctionOptimizer::~CSteppedConjunctionOptimizer()
  205. {
  206. afterProcessing();
  207. }
  208. void CSteppedConjunctionOptimizer::addInput(CSteppedInputLookahead & _input)
  209. {
  210. inputs.append(OLINK(_input));
  211. if (_input.hasPostFilter())
  212. inputHasPostfilter = true;
  213. if (_input.hasPriority())
  214. numPriorityInputs++;
  215. if (_input.readsRowsRemotely())
  216. inputIsDistributed = true;
  217. numInputs++;
  218. }
  219. void CSteppedConjunctionOptimizer::addPseudoInput(CSteppedInputLookahead & _input)
  220. {
  221. pseudoInputs.append(OLINK(_input));
  222. }
  223. void CSteppedConjunctionOptimizer::addJoin(ISteppedJoin & _join)
  224. {
  225. unsigned thisEqualFields = _join.getNumEqualFields();
  226. assertex(thisEqualFields);
  227. if (numEqualFields > thisEqualFields)
  228. numEqualFields = thisEqualFields;
  229. joins.append(OLINK(_join));
  230. }
  231. void CSteppedConjunctionOptimizer::afterProcessing()
  232. {
  233. if (hasCandidates())
  234. finishCandidates();
  235. inputs.kill();
  236. orderedInputs.kill();
  237. if (prevEqualityRow)
  238. {
  239. inputAllocator->releaseRow(prevEqualityRow);
  240. prevEqualityRow = NULL;
  241. }
  242. if (lowestSeekRow)
  243. {
  244. inputAllocator->releaseRow(lowestSeekRow);
  245. lowestSeekRow = NULL;
  246. }
  247. if (prevPartitionRow)
  248. {
  249. inputAllocator->releaseRow(prevPartitionRow);
  250. prevPartitionRow = NULL;
  251. }
  252. }
  253. void associateRemoteInputs(CIArrayOf<OrderedInput> & orderedInputs, unsigned numPriorityInputs)
  254. {
  255. //If we know for sure the primary input, then tag it as worth reading ahead - otherwise it will be dynamically set later.
  256. if (numPriorityInputs > 0)
  257. {
  258. OrderedInput & input0 = orderedInputs.item(0);
  259. //Only read ahead etc. if this is a real index - not if it is a join.
  260. if (!input0.isJoin())
  261. {
  262. input0.setReadAhead(true);
  263. input0.setAlwaysReadExact();
  264. }
  265. }
  266. //Work out the last input of known priority which is read remotely.
  267. unsigned maxPriorityRemote = numPriorityInputs;
  268. while ((maxPriorityRemote >= 2) && !orderedInputs.item(maxPriorityRemote-1).readsRowsRemotely())
  269. maxPriorityRemote--;
  270. //If the second ordered input is known to be read remotely, then we want to send multiple seek requests at the same time.
  271. //MORE: Maybe we should consider doing this to all other inputs if only one priority input is known.
  272. if (maxPriorityRemote >= 2)
  273. {
  274. for (unsigned i=1; i < maxPriorityRemote; i++)
  275. {
  276. IMultipleStepSeekInfo * seekInfo = orderedInputs.item(i-1).createMutipleReadWrapper();
  277. orderedInputs.item(i).createMultipleSeekWrapper(seekInfo);
  278. }
  279. }
  280. }
  281. void CSteppedConjunctionOptimizer::beforeProcessing()
  282. {
  283. //NB: This function is only called once, after we have decided it is worth processing.
  284. assertex(!eof); // just check it isn't called more than once
  285. assertex(numInputs);
  286. bool hasDistance = (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasdistance) != 0;
  287. for (unsigned i3 = 0; i3 < numInputs; i3++)
  288. {
  289. OrderedInput & next = *new OrderedInput(inputs.item(i3), i3, hasDistance);
  290. orderedInputs.append(next);
  291. if (next.canOptimizeOrder())
  292. numOptimizeInputs++;
  293. }
  294. //Sort so that inputs are ordered (priority-inputs, optimizable, non-optimizable)
  295. orderedInputs.sort(compareInitialInputOrder);
  296. //If only a single re-orderable input, treat it as unorderable.
  297. if (numOptimizeInputs == 1)
  298. {
  299. assertex(orderedInputs.item(numPriorityInputs).canOptimizeOrder());
  300. orderedInputs.item(numPriorityInputs).stopOptimizeOrder();
  301. numOptimizeInputs = 0;
  302. }
  303. maxOptimizeInput = numPriorityInputs + numOptimizeInputs;
  304. associateRemoteInputs(orderedInputs, numPriorityInputs);
  305. //MORE: If some inputs have known priority, and other remote inputs don't, then we could consider
  306. // connecting the unknown inputs to the last known inputs.
  307. ForEachItemIn(i4, joins)
  308. joins.item(i4).markRestrictedJoin(numEqualFields);
  309. assertex(helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow); // Don't support (very) old workunits that don't define this..
  310. if (helper.getJoinFlags() & IHThorNWayMergeJoinArg::MJFhasclearlow)
  311. {
  312. RtlDynamicRowBuilder rowBuilder(inputAllocator);
  313. size32_t size = helper.createLowInputRow(rowBuilder);
  314. lowestSeekRow = rowBuilder.finalizeRowClear(size);
  315. }
  316. }
  317. void CSteppedConjunctionOptimizer::finishCandidates()
  318. {
  319. rootActivity->resetEOF();
  320. ForEachItemIn(i1, inputs)
  321. inputs.item(i1).setRestriction(NULL, 0);
  322. ForEachItemIn(i2, joins)
  323. joins.item(i2).stopRestrictedJoin();
  324. ForEachItemIn(i3, pseudoInputs)
  325. pseudoInputs.item(i3).clearPending();
  326. if (prevEqualityRow)
  327. inputAllocator->releaseRow(prevEqualityRow);
  328. prevEqualityRow = equalityRow;
  329. equalityRow = NULL;
  330. }
  331. const void * CSteppedConjunctionOptimizer::next()
  332. {
  333. if (!hasCandidates() && !findCandidates(NULL, 0))
  334. return NULL;
  335. for (;;)
  336. {
  337. const void * next = rootActivity->nextInputRow();
  338. if (next)
  339. return next;
  340. finishCandidates();
  341. if (!findCandidates(NULL, 0))
  342. return NULL;
  343. }
  344. }
  345. const void * CSteppedConjunctionOptimizer::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  346. {
  347. //First check the next row from the candidates, it may be ok.
  348. if (hasCandidates())
  349. {
  350. if (numFields <= numEqualFields)
  351. {
  352. if (stepCompare->docompare(seek, equalityRow, numFields) == 0)
  353. return next();
  354. }
  355. else
  356. {
  357. //This would be pretty unusual. - proximity(or(x and y), z) might trigger it.
  358. if (stepCompare->docompare(seek, equalityRow, numEqualFields) == 0)
  359. {
  360. const void * nextRow = rootActivity->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
  361. if (nextRow)
  362. return nextRow;
  363. }
  364. }
  365. finishCandidates();
  366. }
  367. if (!findCandidates(seek, numFields))
  368. return NULL;
  369. //MORE: If isCompleteMatch is provided, and seek row doesn't match returned, then shouldn't do complex join processing.
  370. return next();
  371. }
  372. /*
  373. I considered using a (doubly) linked list for storing the ordered inputs as an alternative to an expanding array,
  374. but decided the array was simplest, and probably more efficient.
  375. A linked list would require 6 pointer modifications regardless of the number of places moved by the input, the
  376. expanding array requires (#places moved +1) pointers moved, which is likely to be smaller, and traversing the array is more
  377. efficient.
  378. */
  379. void CSteppedConjunctionOptimizer::getInputOrderText(StringBuffer & s)
  380. {
  381. ForEachItemIn(i, orderedInputs)
  382. {
  383. if (i) s.append(" ");
  384. s.append(orderedInputs.item(i).originalIndex);
  385. }
  386. }
  387. void CSteppedConjunctionOptimizer::updateOrder(unsigned whichInput)
  388. {
  389. //Change the position of this input in the ordering list, based on the distance skipped.
  390. OrderedInput & input = orderedInputs.item(whichInput);
  391. if ((whichInput > numPriorityInputs) && input.skipsFasterThan(orderedInputs.item(whichInput-1)))
  392. {
  393. unsigned prev = whichInput-1;
  394. while ((prev > numPriorityInputs) && input.skipsFasterThan(orderedInputs.item(prev-1)))
  395. prev--;
  396. //If this is probably now the lowest frequmcy input, enable read ahead for that input
  397. if ((prev == 0) && inputsHaveMedian)
  398. {
  399. orderedInputs.item(0).setReadAhead(false);
  400. input.setReadAhead(true);
  401. }
  402. orderedInputs.rotateR(prev, whichInput);
  403. #ifdef TRACE_JOIN_OPTIMIZATION
  404. StringBuffer ordered;
  405. getInputOrderText(ordered);
  406. DBGLOG("Input %d promoted from %d to %d [%s]", input.originalIndex, whichInput, prev, ordered.str());
  407. #endif
  408. }
  409. else if ((whichInput+1 < maxOptimizeInput) && orderedInputs.item(whichInput+1).skipsFasterThan(input))
  410. {
  411. unsigned next = whichInput+1;
  412. while ((next+1 < maxOptimizeInput) && orderedInputs.item(next+1).skipsFasterThan(input))
  413. next++;
  414. //If this is no longer probably the lowest frequmcy input, enable read ahead for other input
  415. if ((whichInput == 0) && inputsHaveMedian)
  416. {
  417. orderedInputs.item(1).setReadAhead(true);
  418. input.setReadAhead(false);
  419. }
  420. orderedInputs.rotateL(whichInput, next);
  421. #ifdef TRACE_JOIN_OPTIMIZATION
  422. StringBuffer ordered;
  423. getInputOrderText(ordered);
  424. DBGLOG("Input %d demoted from %d to %d [%s]", input.originalIndex, whichInput, next, ordered.str());
  425. #endif
  426. }
  427. }
  428. //This is the key function, once we have a candidate match we are very likely to generate some records.
  429. bool CSteppedConjunctionOptimizer::findCandidates(const void * seekValue, unsigned numSeekFields)
  430. {
  431. if (eof)
  432. return false;
  433. for (unsigned i=0; i < numInputs; i++)
  434. orderedInputs.item(i).matched = false;
  435. const void * equalValue;
  436. if (!seekValue)
  437. {
  438. //Always use next() with some buffer if possible - so that the early termination without post filtering
  439. //can be done (&matchedCompletely)
  440. numSeekFields = numEqualFields;
  441. if (prevEqualityRow)
  442. equalValue = prevEqualityRow;
  443. else
  444. equalValue = lowestSeekRow;
  445. }
  446. else
  447. equalValue = seekValue;
  448. PreservedRow savedRow(inputAllocator);
  449. unsigned matchCount = 0;
  450. unsigned inputProvidingSeek = (unsigned)NotFound;
  451. while (matchCount != numInputs)
  452. {
  453. unsigned nextInput = nextToMatch();
  454. OrderedInput & curInput = orderedInputs.item(nextInput);
  455. bool canOptimizeInputOrder = curInput.canOptimizeOrder();
  456. const SmartStepExtra * extra;
  457. if (!canOptimizeInputOrder)
  458. extra = (nextInput == 0) ? &knownLowestFrequencyTermStepExtra : &knownOtherFrequencyTermStepExtra;
  459. else if (inputsHaveMedian)
  460. extra = (nextInput == 0) ? &presumedLowestFrequencyTermStepExtra : &presumedOtherFrequencyTermStepExtra;
  461. else
  462. extra = &unknownFrequencyTermStepExtra;
  463. unsigned curOriginalInput = curInput.originalIndex;
  464. bool isReSeek = (inputProvidingSeek == curOriginalInput);
  465. bool matchedCompletely = true;
  466. const void * nextRow = curInput.next(equalValue, numSeekFields, isReSeek, matchedCompletely, *extra);
  467. //If we're seeking the most promising row, and it returned a mismatch, and even after seeking it is the best term for seeking,
  468. //then force the input to return a real result. (Since this sign. reduces the number of lookups in the second input)
  469. if (!matchedCompletely && (nextInput == 0) && inputsHaveMedian && nextRow)
  470. {
  471. if (curInput.skipsFasterThan(orderedInputs.item(1)))
  472. {
  473. matchedCompletely = true;
  474. nextRow = curInput.next(equalValue, numSeekFields, true, matchedCompletely, presumedLowestFrequencyTermReseekStepExtra);
  475. assertex(matchedCompletely);
  476. }
  477. }
  478. if (!nextRow)
  479. {
  480. eof = true;
  481. return false;
  482. }
  483. //Allow a partition option to indicate when the data is likely to change distribution
  484. //and for it to be worth us re-calculating the medians for all inputs
  485. if (partitionCompare && (numOptimizeInputs != 0) && (nextInput == 0))
  486. {
  487. if (prevPartitionRow)
  488. {
  489. if (!partitionCompare->match(nextRow, prevPartitionRow))
  490. {
  491. if (numPriorityInputs == 0)
  492. orderedInputs.item(0).setReadAhead(false);
  493. void * linked = inputAllocator->linkRow(nextRow);
  494. inputAllocator->releaseRow(prevPartitionRow);
  495. prevPartitionRow = linked;
  496. inputsHaveMedian = false;
  497. seekMatchTicker = 0;
  498. }
  499. }
  500. else
  501. prevPartitionRow = inputAllocator->linkRow(nextRow);
  502. }
  503. int c = 0;
  504. if (equalValue)
  505. {
  506. c = stepCompare->docompare(nextRow, equalValue, numSeekFields);
  507. if (canOptimizeInputOrder)
  508. {
  509. if (inputsHaveMedian)
  510. {
  511. //Only increment the seek counter if not the first input - since that is always picked if
  512. //not matched,
  513. if (nextInput != 0)
  514. seekMatchTicker++;
  515. }
  516. else
  517. {
  518. seekMatchTicker++;
  519. if (!inputsHaveMedian && (seekMatchTicker == numOptimizeInputs*MaxDistanceSamples))
  520. {
  521. inputsHaveMedian = true;
  522. if (numPriorityInputs == 0)
  523. orderedInputs.item(0).setReadAhead(true);
  524. }
  525. }
  526. }
  527. }
  528. assertex(c >= 0);
  529. if (c > 0)
  530. {
  531. if (matchCount)
  532. {
  533. //value didn't match => skip all the previously matched entries.
  534. for (unsigned i=0; i < numInputs; i++)
  535. {
  536. OrderedInput & cur = orderedInputs.item(i);
  537. if ((i != nextInput) && cur.matched)
  538. {
  539. cur.skip();
  540. if (--matchCount == 0)
  541. break;
  542. }
  543. }
  544. }
  545. if (!matchedCompletely)
  546. {
  547. //Need to preserve nextRow, otherwise it will be gone after we skip
  548. nextRow = curInput.consumeAndSkip();
  549. savedRow.setown(nextRow);
  550. }
  551. else
  552. {
  553. matchCount++;
  554. }
  555. inputProvidingSeek = curOriginalInput;
  556. }
  557. else
  558. {
  559. assertex(matchedCompletely); // should only be false if the stepped conditions fail to match
  560. matchCount++;
  561. }
  562. equalValue = nextRow; // update equalRow, partly because we shouldn't assume that seekValue can be linked
  563. if (canOptimizeInputOrder)
  564. updateOrder(nextInput);
  565. numSeekFields = numEqualFields;
  566. }
  567. //Set up the merger with the appropriate inputs. NB: inputs, not orderedInputs, and prime the initial rows to avoid extra comparisons
  568. //with the candidate.
  569. //clone one of the rows
  570. equalityRow = inputAllocator->linkRow(equalValue);
  571. ForEachItemIn(i1, inputs)
  572. inputs.item(i1).setRestriction(equalityRow, numEqualFields);
  573. ForEachItemIn(i2, joins)
  574. joins.item(i2).startRestrictedJoin(equalityRow, numEqualFields);
  575. return true;
  576. }
  577. unsigned CSteppedConjunctionOptimizer::nextToMatch() const
  578. {
  579. //A heuristic for picking the next input to pull.
  580. //If each of the distance-capable inputs has been matched 3 times then
  581. // pick the 1st ordered unmatched input
  582. //else
  583. // pick the first input that hasn't been read enough times yet.
  584. if (numOptimizeInputs != 0)
  585. {
  586. if (!inputsHaveMedian)
  587. {
  588. unsigned requiredSamples = seekMatchTicker / numOptimizeInputs;
  589. //First try and find an unmatched optimizable input
  590. for (unsigned i=0; i < numInputs; i++)
  591. {
  592. OrderedInput & cur = orderedInputs.item(i);
  593. if (!cur.matched && cur.canOptimizeOrder())
  594. {
  595. if (cur.getNumSamples() == requiredSamples)
  596. return i;
  597. }
  598. }
  599. //fall back to trying an input that can't be optimized
  600. }
  601. }
  602. for (unsigned i=0; i < numInputs; i++)
  603. {
  604. if (!orderedInputs.item(i).matched)
  605. return i;
  606. }
  607. throwUnexpected();
  608. }
  609. bool CSteppedConjunctionOptimizer::worthCombining()
  610. {
  611. unsigned flags = helper.getJoinFlags();
  612. if (flags & MJFneveroptimize)
  613. return false;
  614. if (flags & MJFalwaysoptimize)
  615. return true;
  616. return joins.ordinality() > 1 ||
  617. inputs.ordinality() > 2 ||
  618. inputHasPostfilter ||
  619. (numPriorityInputs > 0) ||
  620. inputIsDistributed;
  621. }
  622. /*
  623. How do you tell which is the best?
  624. a) cost = total # of skipping index reads. (possibly different values for within block, and random)
  625. b) benefit = total # of docs skipped (between requested and received records)
  626. c) would need some kind of running average.
  627. The following is an outline of the approach:
  628. *) The root conjunction walks its children to get a list of all conjunction inputs, and a separate list of conjunctions.
  629. [Note, LEFT ONLY and MofN won't add all their inputs. MofN may need pseudo inputs]
  630. *) The criteria for joining is the intersection of the conjunction equality conditions. (i.e. lowest number of equality fields)
  631. *) All these inputs are skipped to the appropriate places, purely as a pre-execution optimization.
  632. In particular left only and the complications of the proximity are not addressed.
  633. *) The non-root conjunctions would be marked to indicate that they didn't need to do this.
  634. *) A simple AND with non-conjunction inputs would similarly not do it. (Although it may want to because of the order optimization).
  635. *) Once a match is found, a restriction is set on all raw inputs on the range of values peek() can return.
  636. *) All joins are reset (if an eof flag/merger or something needs resetting), and tagged to indicate the number of equality fields being filtered
  637. externally.
  638. *) find/nextCandidates() is executed as expected on the input rows. When it fails, it bails out early if the outer restriction matches this.
  639. *) When next() fails, it:
  640. a) clears all the restrictions.
  641. b) calculates the next stepping record.
  642. c) does a seekGE() on that new stepping record.
  643. d) If it can't calculate one, does next() on the best input until a different row is obtained + uses that to seek.
  644. *) You could also do this initial conjunction processing for the proximity as well (it would further restrict by segment). However you may end
  645. up having to save and restore the equality restriction on the inputs, and it is unlikely to gain much.
  646. **** Does the failure to expand M of N's inputs cause problems? May want an option to return after the first failed input I guess.
  647. Could effectively be managed by creating I(0)..I(m-1) pseudo inputs, where input I(i) ensures there are i+1 matches.
  648. Internally the M of N would optimize its input order.
  649. - initial version doesn't bother.
  650. * The cost/benefit for an input would be stored in the input so could be shared.
  651. Sentence:
  652. ~~~~~~~~
  653. Sentence needs a range of values returned from the LHS to implement, unless represented in ECL as a ternary operator.
  654. IThorSteppingInput
  655. {
  656. virtual void * peek(void * & upper); // next record?
  657. virtual void * peekGE(seek, #fields, void * &upper); // what would the next item be - minimum for those fields, but not guaranteed.
  658. }
  659. a) Proximity requires a transform or other complex cross product to generate the rows.
  660. Other operators:
  661. Sentence:
  662. (a op b) not contain <s>
  663. lhs->peek(low, high);
  664. rhs->peekGE(low);
  665. if (
  666. Container:
  667. <x>f()</x>
  668. lhs = <x></x>
  669. rhs = f()
  670. lhs->peek(lowL, highL);
  671. rhs->peekGE(lowL, lowR, highR);
  672. if (!in range)
  673. if (lowR < lo)
  674. lhs->peekLT(lowR);?
  675. else
  676. lhs=->
  677. */