thorstep2.ipp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef THORSTEP2_IPP_INCL
  14. #define THORSTEP2_IPP_INCL
  15. #include "jqueue.tpp"
  16. #include "thorcommon.ipp"
  17. #include "thorstep.ipp"
  18. #define IGNORE_SEEK_TO_SELF
  // Number of SkipDistance samples held per OrderedInput: it sizes the
  // OrderedInput::distances[] array declared later in this header.
  19. enum { MaxDistanceSamples = 3 };
  20. //---------------------------------------------------------------------------
  21. class SkipDistance
  22. {
  23. public:
  24. SkipDistance() { field = 0; distance = 0; };
  25. //-1 less signficant, 0 same, +1 more signficant
  26. int compare(const SkipDistance & other) const
  27. {
  28. //More how can we incorporate some concept of the cost?
  29. //Much better to pull from a simple input than another complex join, but how can you quantify.
  30. //e.g., a read from a small inline table, may be much better than smart stepping on an index!
  31. //The lower the field, the more significant
  32. if (field < other.field)
  33. return +1;
  34. if (field > other.field)
  35. return -1;
  36. if (field >= DISTANCE_EXACT_MATCH)
  37. {
  38. //skipDistance not signficant.
  39. //More also take cost into account?)
  40. return 0;
  41. }
  42. //Higher the skip distance the better
  43. if (distance < other.distance)
  44. return -1;
  45. if (distance > other.distance)
  46. return +1;
  47. return 0;
  48. }
  49. inline void set(unsigned _field, unsigned __int64 _distance) { field = _field; distance = _distance; }
  50. inline void set(const SkipDistance & other) { field = other.field; distance = other.distance; }
  51. public:
  52. unsigned __int64 distance;
  53. unsigned field;
  54. };
  55. class OrderedInput : public CInterface
  56. {
  57. public:
  58. OrderedInput(CSteppedInputLookahead & _input, unsigned _originalIndex, bool hasDistance);
  59. //MORE: Possibly better if wasCompleteMatch not passed, and queries instead.
  60. inline const void * next(const void * seek, unsigned numFields, bool seekToSameLocation, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  61. {
  62. #ifdef _DEBUG
  63. assertex(seek);
  64. #endif
  65. const void * next = input->next(seek, numFields, wasCompleteMatch, stepExtra);
  66. matched = wasCompleteMatch;
  67. if (!next)
  68. return NULL;
  69. if (optimizeOrder)
  70. {
  71. if (seek && stepExtra.returnMismatches()) // only update distance when postfilter can be short circuited.
  72. updateSequentialDistance(seek, next, numFields, seekToSameLocation, matched);
  73. }
  74. return next;
  75. }
  76. inline void skip()
  77. {
  78. matched = false;
  79. input->skip();
  80. }
  81. inline const void * consumeAndSkip()
  82. {
  83. matched = false;
  84. return input->consume();
  85. }
  86. inline bool canOptimizeOrder() { return optimizeOrder; }
  87. inline unsigned getNumSamples() { return numSamples; }
  88. inline bool hasPriority() const { return (stepFlags & SSFhaspriority) != 0; }
  89. inline bool isJoin() const { return (stepFlags & SSFisjoin) != 0; }
  90. inline double getPriority() { return priority; }
  91. inline bool readsRowsRemotely() { return input->readsRowsRemotely(); }
  92. inline void setAlwaysReadExact() { input->setAlwaysReadExact(); }
  93. inline void setReadAhead(bool _value) { input->setReadAhead(_value); }
  94. inline void stopOptimizeOrder() { optimizeOrder = false; }
  95. inline IMultipleStepSeekInfo * createMutipleReadWrapper() { return input->createMutipleReadWrapper(); }
  96. inline void createMultipleSeekWrapper(IMultipleStepSeekInfo * wrapper) { input->createMultipleSeekWrapper(wrapper); }
  97. //Could possibly improve by averaging <n> distances. But would be more complicated, and may not be any better.
  98. bool skipsFasterThan(const OrderedInput & other) const
  99. {
  100. return medianDistance.compare(other.medianDistance) > 0;
  101. }
  102. void updateSequentialDistance(const void * seek, const void * next, unsigned numFields, bool seekToSameLocation, bool nowMatched);
  103. protected:
  104. double priority;
  105. Owned<CSteppedInputLookahead> input;
  106. unsigned numSamples;
  107. unsigned nextDistanceIndex;
  108. SkipDistance distances[MaxDistanceSamples];
  109. SkipDistance medianDistance;
  110. IDistanceCalculator * distanceCalculator;
  111. unsigned skipCost;
  112. unsigned stepFlags;
  113. bool optimizeOrder;
  114. public:
  115. const unsigned originalIndex;
  116. bool matched;
  117. };
  118. //NOTE: The lifetime of this class is withing a start()/stop() of the containing activity.
  119. class THORHELPER_API CSteppedConjunctionOptimizer : implements ISteppedConjunctionCollector
  120. {
  121. public:
  122. CSteppedConjunctionOptimizer(IEngineRowAllocator * _inputAllocator, IHThorNWayMergeJoinArg & _arg, ISteppedInput * _root);
  123. ~CSteppedConjunctionOptimizer();
  124. virtual void addInput(CSteppedInputLookahead & _input);
  125. virtual void addPseudoInput(CSteppedInputLookahead & _input);
  126. virtual void addJoin(ISteppedJoin & _input);
  127. //interface IHThorInput
  128. void afterProcessing();
  129. void beforeProcessing();
  130. const void * next();
  131. const void * nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra);
  132. //Once it reorders the inputs dynamically, the following will be better
  133. bool worthCombining();
  134. protected:
  135. inline bool hasCandidates() const { return equalityRow != NULL; }
  136. inline bool optimizePostFilter(unsigned nextInput) { return (nextInput != 0) || !inputsHaveMedian; }
  137. void getInputOrderText(StringBuffer & s);
  138. void finishCandidates();
  139. void updateOrder(unsigned whichInput);
  140. private:
  141. protected:
  142. IHThorNWayMergeJoinArg & helper;
  143. Linked<IEngineRowAllocator> inputAllocator;
  144. ISteppingMeta * mergeSteppingMeta;
  145. IRangeCompare * stepCompare;
  146. ICompare * equalCompare;
  147. ICompareEq * partitionCompare;
  148. ISteppedInput * rootActivity; // linking would create a cycle
  149. void * equalityRow;
  150. void * prevEqualityRow;
  151. const void * lowestSeekRow;
  152. void * prevPartitionRow;
  153. unsigned numEqualFields;
  154. unsigned numInputs;
  155. unsigned numPriorityInputs;
  156. unsigned numOptimizeInputs;
  157. unsigned maxOptimizeInput;
  158. unsigned seekMatchTicker; // not really a counter - used to select which input is read next
  159. CSteppedInputLookaheadArray inputs;
  160. CSteppedInputLookaheadArray pseudoInputs;
  161. CIArrayOf<OrderedInput> orderedInputs;
  162. IArrayOf<ISteppedJoin> joins;
  163. bool eof;
  164. bool inputsHaveMedian;
  165. bool inputHasPostfilter;
  166. bool inputIsDistributed;
  167. protected:
  168. virtual bool findCandidates(const void * seekValue, unsigned numSeekFields);
  169. private:
  170. unsigned nextToMatch() const;
  171. };
  // Free helpers operating on OrderedInput collections; both are only declared
  // here and defined elsewhere.
  // NOTE(review): judging by the name and parameters this works on the ordered
  // inputs after the first numPriorityInputs entries - confirm against the implementation.
  172. void associateRemoteInputs(CIArrayOf<OrderedInput> & orderedInputs, unsigned numPriorityInputs);
  // NOTE(review): the signature (two CInterface * const * arguments, int result)
  // looks like a sort comparator for arrays of OrderedInput - confirm at the call site.
  173. int compareInitialInputOrder(CInterface * const * _left, CInterface * const * _right);
  174. #endif