hqlresource.cpp 234 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlattr.hpp"
  17. #include "hqlmeta.hpp"
  18. #include "hqlutil.hpp"
  19. #include "hqlcpputil.hpp"
  20. #include "hqlthql.hpp"
  21. #include "hqlcatom.hpp"
  22. #include "hqlfold.hpp"
  23. #include "hqlcerrors.hpp"
  24. #include "hqltrans.ipp"
  25. #include "hqlpmap.hpp"
  26. #include "hqltcppc.ipp"
  27. #include "hqlttcpp.ipp"
  28. #include "hqlresource.ipp"
  29. #include "../../thorlcr/thorutil/thbufdef.hpp"
  // See comment at the end of the file for an overview of the process performed by these classes.

  // ---- Behaviour switches ---------------------------------------------------------------------
  // MINIMAL_CHANGES is tested with #ifdef later in this file (EclChildSplitPointLocator::isEfficientToHoistDataset).
  #define MINIMAL_CHANGES
  //#define TRACE_RESOURCING
  //#define VERIFY_RESOURCING
  //#define SPOT_UNCONDITIONAL_CONDITIONS

  // ---- Limits and defaults --------------------------------------------------------------------
  #define MAX_INLINE_COMMON_COUNT 5
  #define DEFAULT_MAX_SOCKETS 2000 // configurable by setting max_sockets in .ini
  #define DEFAULT_MAX_ACTIVITIES 100
  // Cluster size CResourceOptions assumes for Thor when the caller passes 0.
  #define FIXED_CLUSTER_SIZE 400
  #define MEM_Const_Minimal (1*1024*1024)         // 1MB
  #define DEFAULT_TOTAL_MEMORY ((1024*1024*1800))  // 1800MB
  41. static IHqlExpression * backtrackPseudoExpr;
  42. MODULE_INIT(INIT_PRIORITY_STANDARD)
  43. {
  44. backtrackPseudoExpr = createAttribute(retryAtom);
  45. return true;
  46. }
  47. MODULE_EXIT()
  48. {
  49. ::Release(backtrackPseudoExpr);
  50. }
  51. //---------------------------------------------------------------------------------------------------------------------
  52. inline bool isAffectedByResourcing(IHqlExpression * expr)
  53. {
  54. switch (expr->getOperator())
  55. {
  56. case no_record:
  57. case no_constant:
  58. case no_attr:
  59. return false;
  60. }
  61. return true;
  62. }
  63. static node_operator expandLists(node_operator op, HqlExprArray & args, IHqlExpression * expr);
  64. static void expandChildren(node_operator op, HqlExprArray & args, IHqlExpression * expr)
  65. {
  66. ForEachChild(idx, expr)
  67. expandLists(op, args, expr->queryChild(idx));
  68. }
  69. static node_operator expandLists(node_operator op, HqlExprArray & args, IHqlExpression * expr)
  70. {
  71. switch (expr->getOperator())
  72. {
  73. case no_sequential:
  74. case no_orderedactionlist:
  75. if (op != no_parallel)
  76. {
  77. expandChildren(no_sequential, args, expr);
  78. return no_sequential;
  79. }
  80. break;
  81. case no_comma:
  82. case no_compound:
  83. case no_actionlist:
  84. case no_parallel:
  85. if (op != no_sequential)
  86. {
  87. expandChildren(no_parallel, args, expr);
  88. return no_parallel;
  89. }
  90. break;
  91. case no_null:
  92. return op;
  93. }
  94. args.append(*LINK(expr));
  95. return op;
  96. }
  97. //---------------------------------------------------------------------------------------------------------------------
  98. CResourceOptions::CResourceOptions(ClusterType _targetClusterType, unsigned _clusterSize, const HqlCppOptions & _translatorOptions, UniqueSequenceCounter & _spillSequence)
  99. : spillSequence(_spillSequence)
  100. {
  101. filteredSpillThreshold = 0;
  102. minimizeSpillSize = 0;
  103. allowThroughSpill = false;
  104. allowThroughResult = false;
  105. cloneFilteredIndex = false;
  106. spillSharedConditionals = false;
  107. shareDontExpand = false;
  108. useGraphResults = false;
  109. noConditionalLinks = false;
  110. minimiseSpills = false;
  111. hoistResourced = false;
  112. isChildQuery = false;
  113. groupedChildIterators = false;
  114. allowSplitBetweenSubGraphs = false;
  115. preventKeyedSplit = false;
  116. preventSteppedSplit = false;
  117. minimizeSkewBeforeSpill = false;
  118. expandSingleConstRow = false;
  119. createSpillAsDataset = false;
  120. optimizeSharedInputs = false;
  121. combineSiblings = false;
  122. actionLinkInNewGraph = false;
  123. convertCompoundToExecuteWhen = false;
  124. useResultsForChildSpills = false;
  125. alwaysUseGraphResults = false;
  126. newBalancedSpotter = false;
  127. graphIdExpr = NULL;
  128. nextResult = 0;
  129. state.updateSequence = 0;
  130. targetClusterType = _targetClusterType;
  131. clusterSize = _clusterSize ? _clusterSize : FIXED_CLUSTER_SIZE;
  132. minimizeSpillSize = _translatorOptions.minimizeSpillSize;
  133. switch (targetClusterType)
  134. {
  135. case ThorLCRCluster:
  136. break;
  137. default:
  138. clusterSize = 1;
  139. break;
  140. }
  141. isChildQuery = false;
  142. filteredSpillThreshold = _translatorOptions.filteredReadSpillThreshold;
  143. allowThroughSpill = (targetClusterType != RoxieCluster) && (targetClusterType != ThorLCRCluster) && _translatorOptions.allowThroughSpill;
  144. allowThroughResult = (targetClusterType != RoxieCluster) && (targetClusterType != ThorLCRCluster);
  145. cloneFilteredIndex = (targetClusterType != RoxieCluster);
  146. spillSharedConditionals = (targetClusterType == RoxieCluster);
  147. shareDontExpand = (targetClusterType == RoxieCluster);
  148. graphIdExpr = NULL;
  149. //MORE The following doesn't always work - it gets sometimes confused about spill files - see latestheaderbuild for an example.
  150. //Try again once cloneConditionals is false for thor
  151. minimiseSpills = _translatorOptions.minimiseSpills;
  152. spillMultiCondition = _translatorOptions.spillMultiCondition;
  153. spotThroughAggregate = _translatorOptions.spotThroughAggregate && (targetClusterType != RoxieCluster) && (targetClusterType != ThorLCRCluster);
  154. noConditionalLinks = (targetClusterType == RoxieCluster) || ((targetClusterType != HThorCluster) && _translatorOptions.noConditionalLinks);
  155. hoistResourced = _translatorOptions.hoistResourced;
  156. alwaysUseGraphResults = _translatorOptions.alwaysUseGraphResults;
  157. useGraphResults = false; // modified by later call
  158. groupedChildIterators = _translatorOptions.groupedChildIterators;
  159. allowSplitBetweenSubGraphs = false;//(targetClusterType == RoxieCluster);
  160. preventKeyedSplit = _translatorOptions.preventKeyedSplit;
  161. preventSteppedSplit = _translatorOptions.preventSteppedSplit;
  162. minimizeSkewBeforeSpill = _translatorOptions.minimizeSkewBeforeSpill;
  163. expandSingleConstRow = true;
  164. createSpillAsDataset = _translatorOptions.optimizeSpillProject && (targetClusterType != HThorCluster);
  165. combineSiblings = _translatorOptions.combineSiblingGraphs && (targetClusterType != HThorCluster) && (targetClusterType != RoxieCluster);
  166. optimizeSharedInputs = _translatorOptions.optimizeSharedGraphInputs && combineSiblings;
  167. actionLinkInNewGraph = _translatorOptions.actionLinkInNewGraph || (targetClusterType == HThorCluster);
  168. convertCompoundToExecuteWhen = false;
  169. useResultsForChildSpills = _translatorOptions.useResultsForChildSpills;
  170. newBalancedSpotter = _translatorOptions.newBalancedSpotter;
  171. alwaysReuseGlobalSpills = _translatorOptions.alwaysReuseGlobalSpills;
  172. }
  173. void CResourceOptions::setChildQuery(bool value)
  174. {
  175. isChildQuery = value;
  176. }
  177. void CResourceOptions::setNewChildQuery(IHqlExpression * _graphIdExpr, unsigned _numResults)
  178. {
  179. graphIdExpr = _graphIdExpr;
  180. nextResult = _numResults;
  181. }
  182. bool CResourceOptions::useGraphResult(bool linkedFromChild)
  183. {
  184. if (useResultsForChildSpills && linkedFromChild)
  185. return true;
  186. if (alwaysUseGraphResults)
  187. return true;
  188. if (!useGraphResults)
  189. return false;
  190. if (linkedFromChild)
  191. return true;
  192. //Roxie converts spills into splitters, so best to retain them
  193. if (targetClusterType == RoxieCluster)
  194. return false;
  195. return true;
  196. }
  197. bool CResourceOptions::useGlobalResult(bool linkedFromChild)
  198. {
  199. return (linkedFromChild && !useGraphResult(linkedFromChild));
  200. }
  201. //---------------------------------------------------------------------------------------------------------------------
  202. inline bool projectSelectorDatasetToField(IHqlExpression * row)
  203. {
  204. return ((row->getOperator() == no_selectnth) && getFieldCount(row->queryRecord()) > 1);
  205. }
  206. static IHqlExpression * skipScalarWrappers(IHqlExpression * value)
  207. {
  208. for (;;)
  209. {
  210. node_operator op = value->getOperator();
  211. if ((op != no_globalscope) && (op != no_thisnode) && (op != no_evalonce))
  212. return value;
  213. value = value->queryChild(0);
  214. }
  215. }
  216. static HqlTransformerInfo eclHoistLocatorInfo("EclHoistLocator");
  217. class EclHoistLocator : public NewHqlTransformer
  218. {
  219. public:
  220. EclHoistLocator(ChildDependentArray & _matched) : NewHqlTransformer(eclHoistLocatorInfo), matched(_matched)
  221. {
  222. alwaysSingle = true;
  223. }
  224. void analyseChild(IHqlExpression * expr, bool _alwaysSingle)
  225. {
  226. alwaysSingle = _alwaysSingle;
  227. analyse(expr, 0);
  228. }
  229. void noteDataset(IHqlExpression * expr, IHqlExpression * hoisted, bool alwaysHoist)
  230. {
  231. unsigned match = matched.findOriginal(expr);
  232. if (match == NotFound)
  233. {
  234. if (!hoisted)
  235. hoisted = expr;
  236. CChildDependent & depend = * new CChildDependent(expr, hoisted, alwaysHoist, alwaysSingle, false);
  237. matched.append(depend);
  238. }
  239. else
  240. {
  241. CChildDependent & prev = matched.item(match);
  242. if (alwaysHoist && !prev.alwaysHoist)
  243. prev.alwaysHoist = true;
  244. if (alwaysSingle && !prev.isSingleNode)
  245. prev.isSingleNode = true;
  246. }
  247. }
  248. void noteScalar(IHqlExpression * expr, IHqlExpression * value)
  249. {
  250. unsigned match = matched.findOriginal(expr);
  251. if (match == NotFound)
  252. {
  253. value = skipScalarWrappers(value);
  254. OwnedHqlExpr hoisted;
  255. IHqlExpression * projected = NULL;
  256. if (value->getOperator() == no_select)
  257. {
  258. bool isNew;
  259. IHqlExpression * row = querySelectorDataset(value, isNew);
  260. if(isNew || row->isDatarow())
  261. {
  262. if (projectSelectorDatasetToField(row))
  263. projected = value;
  264. hoisted.set(row);
  265. }
  266. else
  267. {
  268. //very unusual - possibly a thisnode(myfield) hoisted from an allnodes
  269. //use a createrow of a single element in the default behavour below
  270. }
  271. }
  272. else if (value->getOperator() == no_createset)
  273. {
  274. IHqlExpression * ds = value->queryChild(0);
  275. IHqlExpression * selected = value->queryChild(1);
  276. OwnedHqlExpr field;
  277. //Project down to a single field.so implicit fields can still be optimized
  278. if (selected->getOperator() == no_select)
  279. field.set(selected->queryChild(1));
  280. else
  281. field.setown(createFieldFromValue(valueId, selected));
  282. OwnedHqlExpr record = createRecord(field);
  283. OwnedHqlExpr self = getSelf(record);
  284. OwnedHqlExpr assign = createAssign(createSelectExpr(LINK(self), LINK(field)), LINK(selected));
  285. OwnedHqlExpr transform = createValue(no_newtransform, makeTransformType(record->getType()), LINK(assign));
  286. hoisted.setown(createDataset(no_newusertable, LINK(ds), createComma(LINK(record), LINK(transform))));
  287. }
  288. if (!hoisted)
  289. {
  290. OwnedHqlExpr field = createField(valueId, value->getType(), NULL);
  291. OwnedHqlExpr record = createRecord(field);
  292. OwnedHqlExpr self = getSelf(record);
  293. OwnedHqlExpr assign = createAssign(createSelectExpr(LINK(self), LINK(field)), LINK(value));
  294. OwnedHqlExpr transform = createValue(no_transform, makeTransformType(record->getType()), LINK(assign));
  295. hoisted.setown(createRow(no_createrow, LINK(transform)));
  296. }
  297. CChildDependent & depend = * new CChildDependent(expr, hoisted, true, true, true);
  298. depend.projected = projected;
  299. matched.append(depend);
  300. }
  301. }
  302. protected:
  303. ChildDependentArray & matched;
  304. bool alwaysSingle;
  305. };
  306. class EclChildSplitPointLocator : public EclHoistLocator
  307. {
  308. public:
  309. EclChildSplitPointLocator(IHqlExpression * _original, HqlExprCopyArray & _selectors, ChildDependentArray & _matches, bool _groupedChildIterators)
  310. : EclHoistLocator(_matches), selectors(_selectors), groupedChildIterators(_groupedChildIterators)
  311. {
  312. original = _original;
  313. okToSelect = false;
  314. gathered = false;
  315. conditionalDepth = 0;
  316. executedOnce = false;
  317. switch (original->getOperator())
  318. {
  319. case no_call:
  320. case no_externalcall:
  321. case no_libraryscopeinstance:
  322. okToSelect = true;
  323. break;
  324. }
  325. }
  326. void findSplitPoints(IHqlExpression * expr, unsigned from, unsigned to, bool _alwaysSingle, bool _executedOnce)
  327. {
  328. alwaysSingle = _alwaysSingle;
  329. for (unsigned i=from; i < to; i++)
  330. {
  331. IHqlExpression * cur = expr->queryChild(i);
  332. executedOnce = _executedOnce || cur->isAttribute(); // assume attributes are only executed once.
  333. findSplitPoints(cur);
  334. }
  335. alwaysSingle = false;
  336. }
  337. protected:
  338. void findSplitPoints(IHqlExpression * expr)
  339. {
  340. //containsNonActiveDataset() would be nice - but that isn't percolated outside assigns etc.
  341. if (containsAnyDataset(expr) || containsMustHoist(expr) || !expr->isIndependentOfScope())
  342. {
  343. if (!gathered)
  344. {
  345. gatherAmbiguousSelectors(original);
  346. gathered = true;
  347. }
  348. analyse(expr, 0);
  349. }
  350. }
  351. bool queryHoistDataset(IHqlExpression * ds)
  352. {
  353. bool alwaysHoist = true;
  354. if (executedOnce)
  355. {
  356. if (conditionalDepth != 0)
  357. alwaysHoist = false;
  358. }
  359. return alwaysHoist;
  360. }
  361. bool queryNoteDataset(IHqlExpression * ds)
  362. {
  363. bool alwaysHoist = queryHoistDataset(ds);
  364. //MORE: It should be possible to remove this condition, but it causes problems with resourcing hsss.xhql amongst others -> disable for the moment
  365. if (alwaysHoist)
  366. noteDataset(ds, ds, alwaysHoist);
  367. return alwaysHoist;
  368. }
  369. virtual void analyseExpr(IHqlExpression * expr)
  370. {
  371. if (alreadyVisited(expr))
  372. return;
  373. node_operator op = expr->getOperator();
  374. switch (op)
  375. {
  376. case no_select:
  377. {
  378. bool isNew;
  379. IHqlExpression * ds = querySelectorDataset(expr, isNew);
  380. if (isNew)
  381. {
  382. if (isEvaluateable(ds))
  383. {
  384. //MORE: Following isn't a very nice test - stops implicit denormalize getting messed up
  385. if (expr->isDataset())
  386. break;
  387. //Debtable.....
  388. //Don't hoist counts on indexes or dataset - it may mean they are evaluated more frequently than need be.
  389. //If dependencies and root graphs are handled correctly this could be deleted.
  390. if (isCompoundAggregate(ds))
  391. break;
  392. if (!expr->isDatarow() && !expr->isDataset() && !expr->isDictionary())
  393. {
  394. if (queryHoistDataset(ds))
  395. {
  396. noteScalar(expr, expr);
  397. return;
  398. }
  399. }
  400. else
  401. {
  402. if (queryNoteDataset(ds))
  403. return;
  404. }
  405. }
  406. }
  407. break;
  408. }
  409. case no_createset:
  410. {
  411. IHqlExpression * ds = expr->queryChild(0);
  412. if (isEvaluateable(ds))
  413. {
  414. if (queryHoistDataset(ds))
  415. {
  416. noteScalar(expr, expr);
  417. //?? queryNoteDataset(ds);
  418. return;
  419. }
  420. }
  421. break;
  422. }
  423. case no_assign:
  424. {
  425. IHqlExpression * rhs = expr->queryChild(1);
  426. //if rhs is a new, evaluatable, dataset then we want to add it
  427. if ((rhs->isDataset() || rhs->isDictionary()) && isEvaluateable(rhs))
  428. {
  429. if (queryNoteDataset(rhs))
  430. return;
  431. }
  432. break;
  433. }
  434. case no_sizeof:
  435. case no_allnodes:
  436. case no_nohoist:
  437. case no_forcegraph:
  438. return;
  439. case no_globalscope:
  440. case no_evalonce:
  441. if (expr->hasAttribute(optAtom) && !expr->isIndependentOfScope())
  442. break;
  443. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary())
  444. noteDataset(expr, expr->queryChild(0), true);
  445. else
  446. noteScalar(expr, expr->queryChild(0));
  447. return;
  448. case no_thisnode:
  449. throwUnexpected();
  450. case no_getgraphresult:
  451. if (expr->hasAttribute(_streaming_Atom) || expr->hasAttribute(_distributed_Atom))
  452. {
  453. noteDataset(expr, expr, true);
  454. return;
  455. }
  456. break;
  457. case no_getgraphloopresult:
  458. noteDataset(expr, expr, true);
  459. return;
  460. case no_createdictionary:
  461. if (isEvaluateable(expr) && !isConstantDictionary(expr))
  462. noteDataset(expr, expr, true);
  463. return;
  464. case no_selectnth:
  465. if (expr->queryChild(1)->isConstant())
  466. {
  467. IHqlExpression * ds = expr->queryChild(0);
  468. switch (ds->getOperator())
  469. {
  470. case no_getgraphresult:
  471. if (!expr->hasAttribute(_streaming_Atom) && !expr->hasAttribute(_distributed_Atom))
  472. break;
  473. //fallthrough
  474. case no_getgraphloopresult:
  475. noteDataset(expr, expr, true);
  476. return;
  477. }
  478. }
  479. break;
  480. }
  481. bool wasOkToSelect = okToSelect;
  482. if (expr->isDataset())
  483. {
  484. switch (expr->getOperator())
  485. {
  486. case no_compound_diskread:
  487. case no_compound_disknormalize:
  488. case no_compound_diskaggregate:
  489. case no_compound_diskcount:
  490. case no_compound_diskgroupaggregate:
  491. case no_compound_indexread:
  492. case no_compound_indexnormalize:
  493. case no_compound_indexaggregate:
  494. case no_compound_indexcount:
  495. case no_compound_indexgroupaggregate:
  496. case no_compound_childread:
  497. case no_compound_childnormalize:
  498. case no_compound_childaggregate:
  499. case no_compound_childcount:
  500. case no_compound_childgroupaggregate:
  501. case no_compound_selectnew:
  502. case no_compound_inline:
  503. case no_newkeyindex:
  504. case no_keyindex:
  505. case no_table:
  506. okToSelect = false;
  507. break;
  508. }
  509. if (okToSelect && isEvaluateable(expr))
  510. {
  511. if (queryNoteDataset(expr))
  512. return;
  513. }
  514. }
  515. else
  516. okToSelect = true;
  517. switch (op)
  518. {
  519. case no_if:
  520. case no_choose:
  521. case no_chooseds:
  522. {
  523. IHqlExpression * cond = expr->queryChild(0);
  524. analyseExpr(cond);
  525. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary() || expr->isAction())
  526. conditionalDepth++;
  527. doAnalyseChildren(expr, 1);
  528. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary() || expr->isAction())
  529. conditionalDepth--;
  530. break;
  531. }
  532. case no_mapto:
  533. {
  534. analyseExpr(expr->queryChild(0));
  535. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary())
  536. conditionalDepth++;
  537. analyseExpr(expr->queryChild(1));
  538. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary())
  539. conditionalDepth--;
  540. break;
  541. }
  542. case no_attr_expr:
  543. //Ignore internal tracking attributes e.g., _selectors_Atom
  544. if (!isInternalAttributeName(expr->queryName()))
  545. {
  546. //Default action for no_attr_expr is to not walk children, but we need to here.
  547. bool wasExecutedOnce = executedOnce;
  548. executedOnce = true;
  549. analyseChildren(expr);
  550. executedOnce = wasExecutedOnce;
  551. }
  552. break;
  553. default:
  554. NewHqlTransformer::analyseExpr(expr);
  555. break;
  556. }
  557. okToSelect = wasOkToSelect;
  558. }
  559. bool isCompoundAggregate(IHqlExpression * ds)
  560. {
  561. return false;
  562. //Generates worse code unless we take into account whether or not the newDisk operation flags are enabled.
  563. if (!isTrivialSelectN(ds))
  564. return false;
  565. IHqlExpression * agg = ds->queryChild(0);
  566. if (isSimpleCountAggregate(agg, true))
  567. return true;
  568. return false;
  569. }
  570. void gatherAmbiguousSelectors(IHqlExpression * expr)
  571. {
  572. //Horrible code to try and cope with ambiguous left selectors.
  573. //o Tree is ambiguous so same child expression can occur in different contexts - so can't depend on the context it is found in to work out if can hoist
  574. //o If any selector that is hidden within child expressions matches one in scope then can't hoist it.
  575. //If the current expression creates a selector, then can't hoist anything that depends on it [only add to hidden if in selectors to reduce searching]
  576. //o Want to hoist as much as possible.
  577. if (selectors.empty())
  578. return;
  579. unsigned first = getFirstActivityArgument(expr);
  580. unsigned last = first + getNumActivityArguments(expr);
  581. unsigned max = expr->numChildren();
  582. unsigned i;
  583. HqlExprCopyArray hiddenSelectors;
  584. for (i = 0; i < first; i++)
  585. expr->queryChild(i)->gatherTablesUsed(&hiddenSelectors, NULL);
  586. for (i = last; i < max; i++)
  587. expr->queryChild(i)->gatherTablesUsed(&hiddenSelectors, NULL);
  588. ForEachItemIn(iSel, selectors)
  589. {
  590. IHqlExpression & cur = selectors.item(iSel);
  591. if (hiddenSelectors.contains(cur))
  592. ambiguousSelectors.append(cur);
  593. }
  594. switch (getChildDatasetType(expr))
  595. {
  596. case childdataset_datasetleft:
  597. case childdataset_left:
  598. {
  599. IHqlExpression * ds = expr->queryChild(0);
  600. IHqlExpression * selSeq = querySelSeq(expr);
  601. OwnedHqlExpr left = createSelector(no_left, ds, selSeq);
  602. if (selectors.contains(*left))
  603. ambiguousSelectors.append(*left);
  604. break;
  605. }
  606. case childdataset_same_left_right:
  607. case childdataset_top_left_right:
  608. case childdataset_nway_left_right:
  609. {
  610. IHqlExpression * ds = expr->queryChild(0);
  611. IHqlExpression * selSeq = querySelSeq(expr);
  612. OwnedHqlExpr left = createSelector(no_left, ds, selSeq);
  613. OwnedHqlExpr right = createSelector(no_right, ds, selSeq);
  614. if (selectors.contains(*left))
  615. ambiguousSelectors.append(*left);
  616. if (selectors.contains(*right))
  617. ambiguousSelectors.append(*right);
  618. break;
  619. }
  620. case childdataset_leftright:
  621. {
  622. IHqlExpression * leftDs = expr->queryChild(0);
  623. IHqlExpression * rightDs = expr->queryChild(1);
  624. IHqlExpression * selSeq = querySelSeq(expr);
  625. OwnedHqlExpr left = createSelector(no_left, leftDs, selSeq);
  626. OwnedHqlExpr right = createSelector(no_right, rightDs, selSeq);
  627. if (selectors.contains(*left))
  628. ambiguousSelectors.append(*left);
  629. if (selectors.contains(*right))
  630. ambiguousSelectors.append(*right);
  631. break;
  632. }
  633. }
  634. }
  635. bool isEvaluateable(IHqlExpression * ds, bool ignoreInline = false)
  636. {
  637. //Don't hoist an alias - it could create unnecessary duplicate spills - hoist its input
  638. if (ds->getOperator() == no_dataset_alias)
  639. return false;
  640. //Not allowed to hoist
  641. if (isContextDependent(ds, (conditionalDepth == 0), true))
  642. return false;
  643. //MORE: Needs more work for child queries - need a GroupedChildIterator activity
  644. if (isGrouped(ds) && selectors.ordinality() && !groupedChildIterators)
  645. return false;
  646. //Check datasets are available
  647. HqlExprCopyArray scopeUsed;
  648. ds->gatherTablesUsed(NULL, &scopeUsed);
  649. ForEachItemIn(i, scopeUsed)
  650. {
  651. IHqlExpression & cur = scopeUsed.item(i);
  652. if (!selectors.contains(cur))
  653. return false;
  654. if (ambiguousSelectors.contains(cur))
  655. return false;
  656. }
  657. if (!isEfficientToHoistDataset(ds, ignoreInline))
  658. return false;
  659. return true;
  660. }
  661. bool isEfficientToHoistDataset(IHqlExpression * ds, bool ignoreInline) const
  662. {
  663. //MORE: This whole function could do with some significant improvements. Whether it is inefficient to hoist
  664. //depends on at least the following...
  665. //a) cost of serializing v cost of re-evaluating (which can depend on the engine).
  666. //b) How many times it will be evaluated in the child context
  667. if (ds->getOperator() == no_createdictionary)
  668. return true;
  669. if (isInlineTrivialDataset(ds))
  670. return false;
  671. #ifdef MINIMAL_CHANGES
  672. if (!ignoreInline)
  673. {
  674. //Generally this appears to be better to hoist since it involves calling a transform.
  675. IHqlExpression * root = queryRoot(ds);
  676. if (root && root->getOperator() == no_dataset_from_transform)
  677. return true;
  678. if (canProcessInline(NULL, ds))
  679. return false;
  680. }
  681. #endif
  682. return true;
  683. }
  684. protected:
  685. IHqlExpression * original;
  686. HqlExprCopyArray & selectors;
  687. HqlExprCopyArray ambiguousSelectors;
  688. unsigned conditionalDepth;
  689. bool okToSelect;
  690. bool gathered;
  691. bool groupedChildIterators;
  692. bool executedOnce;
  693. };
  694. class EclThisNodeLocator : public EclHoistLocator
  695. {
  696. public:
  697. EclThisNodeLocator(ChildDependentArray & _matches)
  698. : EclHoistLocator(_matches)
  699. {
  700. allNodesDepth = 0;
  701. }
  702. protected:
  703. virtual void analyseExpr(IHqlExpression * expr)
  704. {
  705. //NB: This doesn't really work for no_thisnode occurring in multiple contexts. We should probably hoist it from everywhere if it is hoistable from anywhere,
  706. // although theoretically that gives us problems with ambiguous selectors.
  707. if (alreadyVisited(expr) || !containsThisNode(expr))
  708. return;
  709. node_operator op = expr->getOperator();
  710. switch (op)
  711. {
  712. case no_allnodes:
  713. allNodesDepth++;
  714. NewHqlTransformer::analyseExpr(expr);
  715. allNodesDepth--;
  716. return;
  717. case no_thisnode:
  718. if (allNodesDepth == 0)
  719. {
  720. if (expr->isDataset() || expr->isDatarow() || expr->isDictionary())
  721. noteDataset(expr, expr->queryChild(0), true);
  722. else
  723. noteScalar(expr, expr->queryChild(0));
  724. return;
  725. }
  726. allNodesDepth--;
  727. NewHqlTransformer::analyseExpr(expr);
  728. allNodesDepth++;
  729. return;
  730. }
  731. NewHqlTransformer::analyseExpr(expr);
  732. }
  733. protected:
  734. unsigned allNodesDepth;
  735. };
  736. static HqlTransformerInfo childDependentReplacerInfo("ChildDependentReplacer");
  737. class ChildDependentReplacer : public MergingHqlTransformer
  738. {
  739. public:
  740. ChildDependentReplacer(const HqlExprCopyArray & _childDependents, const HqlExprArray & _replacements)
  741. : MergingHqlTransformer(childDependentReplacerInfo), childDependents(_childDependents), replacements(_replacements)
  742. {
  743. }
  744. protected:
  745. virtual IHqlExpression * createTransformed(IHqlExpression * expr)
  746. {
  747. unsigned match = childDependents.find(*expr);
  748. if (match != NotFound)
  749. return LINK(&replacements.item(match));
  750. return MergingHqlTransformer::createTransformed(expr);
  751. }
  752. protected:
  753. const HqlExprCopyArray & childDependents;
  754. const HqlExprArray & replacements;
  755. };
  756. //---------------------------------------------------------------------------------------------------------------------
  757. bool canUseResultInChildQuery(IHqlExpression * expr)
  758. {
  759. return (expr->getOperator() == no_setgraphresult);
  760. }
  761. class ActivityInvariantInfo : public SpillerInfo
  762. {
  763. public:
  764. ActivityInvariantInfo(IHqlExpression * _original, CResourceOptions * _options) : SpillerInfo(_original, _options)
  765. {
  766. containsActivity = false;
  767. isActivity = false;
  768. visited = false;
  769. isAlreadyInScope = false;
  770. projectResult = true; // projected must also be non empty to actually project this dataset
  771. }
  772. void addProjected(IHqlExpression * next)
  773. {
  774. if (projectResult && !projected.contains(*next))
  775. projected.append(*LINK(next));
  776. }
  777. void clearProjected()
  778. {
  779. projectResult = false;
  780. projected.kill();
  781. }
  782. inline bool isResourcedActivity() const
  783. {
  784. return isActivity || containsActivity;
  785. }
  786. void noteUsedFromChild()
  787. {
  788. linkedFromChild = true;
  789. if (outputToUseForSpill && !canUseResultInChildQuery(outputToUseForSpill))
  790. outputToUseForSpill = NULL;
  791. }
  792. public:
  793. HqlExprArray projected;
  794. HqlExprAttr projectedExpr;
  795. ChildDependentArray childDependents;
  796. HqlExprAttr hoistedRead;
  797. bool containsActivity:1;
  798. bool isActivity:1;
  799. bool visited:1;
  800. bool isAlreadyInScope:1;
  801. bool projectResult:1;
  802. };
  803. //Spot expressions that are invariant within an activity, and so would be more efficient if only executed once.
  804. //MORE: Should there be an activity traversal base class?
  805. static HqlTransformerInfo activityInvariantHoisterInfo("ActivityInvariantHoister");
  806. class ActivityInvariantHoister : public NewHqlTransformer
  807. {
  808. public:
  809. ActivityInvariantHoister(CResourceOptions & _options)
  810. : NewHqlTransformer(activityInvariantHoisterInfo), options(_options)
  811. {
  812. }
  813. void tagActiveCursors(HqlExprCopyArray * activeRows);
  814. void transformRoot(const HqlExprArray & in, HqlExprArray & out);
  815. IHqlExpression * transformRoot(IHqlExpression * expr);
  816. protected:
  817. void gatherChildSplitPoints(IHqlExpression * expr, ActivityInvariantInfo * info, unsigned first, unsigned last);
  818. bool findSplitPoints(IHqlExpression * expr, bool isProjected);
  819. void extendSplitPoints();
  820. IHqlExpression * projectChildDependent(IHqlExpression * expr);
  821. void projectChildDependents();
  822. void findSplitPoints(const HqlExprArray & exprs);
  823. IHqlExpression * createTransformed(IHqlExpression * expr);
  824. IHqlExpression * replaceResourcedReferences(ActivityInvariantInfo * info, IHqlExpression * expr);
  825. void doTransformRoot(const HqlExprArray & in);
  826. void ensureHoisted(IHqlExpression * expr, bool isSingleNode);
  827. static inline bool isResourcedActivity(IHqlExpression * expr)
  828. {
  829. if (!expr)
  830. return false;
  831. ActivityInvariantInfo * extra = queryOptBodyInfo(expr);
  832. return extra && extra->isResourcedActivity();
  833. }
  834. bool isWorthForcingHoist(IHqlExpression * expr)
  835. {
  836. for (;;)
  837. {
  838. switch (expr->getOperator())
  839. {
  840. case no_selectnth:
  841. case no_filter:
  842. case no_newaggregate:
  843. if (isResourcedActivity(expr->queryChild(0)))
  844. return true;
  845. break;
  846. default:
  847. return false;
  848. }
  849. expr = expr->queryChild(0);
  850. }
  851. }
  852. //NewHqlTransformer
  853. virtual ANewTransformInfo * createTransformInfo(IHqlExpression * expr)
  854. {
  855. return new ActivityInvariantInfo(expr, &options);
  856. }
  857. ActivityInvariantInfo * queryBodyInfo(IHqlExpression * expr)
  858. {
  859. return static_cast<ActivityInvariantInfo *>(queryTransformExtra(expr->queryBody()));
  860. }
  861. static ActivityInvariantInfo * queryOptBodyInfo(IHqlExpression * expr)
  862. {
  863. return static_cast<ActivityInvariantInfo *>(expr->queryBody()->queryTransformExtra());
  864. }
private:
    //Output expressions: spilled writes appended by ensureHoisted() plus the transformed roots appended by
    //doTransformRoot(); swapped/returned by transformRoot().
    HqlExprArray result;
    //Resourcing options supplied to the constructor (held by reference, not copied).
    CResourceOptions & options;
    //Rows already in scope, registered by tagActiveCursors() and passed to EclChildSplitPointLocator.
    HqlExprCopyArray activeSelectors;
    //Every child dependent discovered by findSplitPoints(), in discovery order.
    ChildDependentArray allChildDependents;
};
  871. void ActivityInvariantHoister::doTransformRoot(const HqlExprArray & in)
  872. {
  873. findSplitPoints(in);
  874. //If no child dependents have been found then copy instead of transforming
  875. if (allChildDependents.ordinality() != 0)
  876. {
  877. ForEachItemIn(i, in)
  878. result.append(*doTransformRootExpr(&in.item(i)));
  879. }
  880. else
  881. {
  882. ForEachItemIn(i, in)
  883. result.append(*LINK(&in.item(i)));
  884. }
  885. }
  886. void ActivityInvariantHoister::tagActiveCursors(HqlExprCopyArray * activeRows)
  887. {
  888. if (!activeRows)
  889. return;
  890. ForEachItemIn(i, *activeRows)
  891. {
  892. IHqlExpression & cur = activeRows->item(i);
  893. activeSelectors.append(cur);
  894. queryBodyInfo(&cur)->isAlreadyInScope = true;
  895. }
  896. }
  897. void ActivityInvariantHoister::transformRoot(const HqlExprArray & in, HqlExprArray & out)
  898. {
  899. doTransformRoot(in);
  900. return out.swapWith(result);
  901. }
  902. IHqlExpression * ActivityInvariantHoister::transformRoot(IHqlExpression * expr)
  903. {
  904. HqlExprArray exprs;
  905. node_operator expandOp = options.isChildQuery ? no_any : no_parallel;
  906. node_operator createOp = expandLists(expandOp, exprs, expr);
  907. doTransformRoot(exprs);
  908. switch (createOp)
  909. {
  910. case no_sequential:
  911. case no_orderedactionlist:
  912. return createCompound(no_orderedactionlist, result);
  913. default:
  914. return createCompound(no_actionlist, result);
  915. }
  916. }
  917. void ActivityInvariantHoister::gatherChildSplitPoints(IHqlExpression * expr, ActivityInvariantInfo * info, unsigned first, unsigned last)
  918. {
  919. //NB: Don't call member functions to ensure correct nesting of transform mutexes.
  920. EclChildSplitPointLocator locator(expr, activeSelectors, info->childDependents, options.groupedChildIterators);
  921. unsigned max = expr->numChildren();
  922. //If child queries are supported then don't hoist the expressions if they might only be evaluated once
  923. //because they may be conditional
  924. bool alwaysOnce = false;
  925. switch (expr->getOperator())
  926. {
  927. case no_setresult:
  928. case no_selectnth:
  929. //set results, only done once=>don't hoist conditionals
  930. locator.findSplitPoints(expr, last, max, true, true);
  931. return;
  932. case no_createrow:
  933. case no_datasetfromrow:
  934. case no_projectrow:
  935. alwaysOnce = true;
  936. break;
  937. case no_loop:
  938. if ((options.targetClusterType != RoxieCluster) && !options.isChildQuery)
  939. {
  940. //This is ugly! The body is executed in parallel, so don't force that as a work unit result
  941. //It means some child query expressions within loops don't get forced into work unit writes
  942. //but that just means that the generated code will be not as good as it could be.
  943. const unsigned bodyArg = 4;
  944. locator.findSplitPoints(expr, 1, bodyArg, true, false);
  945. locator.findSplitPoints(expr, bodyArg, bodyArg+1, false, false);
  946. locator.findSplitPoints(expr, bodyArg+1, max, true, false);
  947. return;
  948. }
  949. break;
  950. }
  951. locator.findSplitPoints(expr, 0, first, true, true); // IF() conditions only evaluated once... => don't force
  952. locator.findSplitPoints(expr, last, max, true, alwaysOnce);
  953. }
//Analysis pass for one expression.  Records on its ActivityInvariantInfo whether it is an activity and
//whether it contains one; when options.hoistResourced is set it also gathers the child-query expressions
//that could be hoisted (info->childDependents) and appends them to allChildDependents.
//isProjected is true when the caller only needs some fields of expr; any unprojected use clears the
//projection (clearProjected()).
//Returns whether expr contains an activity.
bool ActivityInvariantHoister::findSplitPoints(IHqlExpression * expr, bool isProjected)
{
    ActivityInvariantInfo * info = queryBodyInfo(expr);
    if (info->visited)
    {
        if (!isProjected)
            info->clearProjected();
        //Already analysed: only non-activities that contain activities (and are not already in scope)
        //fall through and are walked again below.
        //NOTE(review): on such a revisit gatherChildSplitPoints() runs again and info->childDependents is
        //re-appended to allChildDependents - confirm that duplicates are harmless.
        if (info->isAlreadyInScope || info->isActivity || !info->containsActivity)
            return info->containsActivity;
    }
    else
    {
        info->visited = true;
        if (!isProjected)
            info->clearProjected();
        //Classification of this node; a few operators are deliberately not treated as activities.
        bool isActivity = true;
        switch (expr->getOperator())
        {
        case no_select:
            //either a select from a setresult or use of a child-dataset
            if (isNewSelector(expr))
            {
                info->containsActivity = findSplitPoints(expr->queryChild(0), false);
                assertex(queryBodyInfo(expr->queryChild(0))->isActivity);
            }
            if (expr->isDataset() || expr->isDatarow())
            {
                info->isActivity = true;
                info->containsActivity = true;
            }
            return info->containsActivity;
        case no_setgraphresult:
            {
                //Let the dataset being stored know about this potential spill target.
                IHqlExpression * dataset = expr->queryChild(0);
                ActivityInvariantInfo * childInfo = queryBodyInfo(dataset);
                childInfo->setPotentialSpillFile(expr);
                break;
            }
        case no_mapto:
            throwUnexpected();
        case no_activerow:
            info->isActivity = true;
            info->containsActivity = false;
            return false;
        case no_rowset: // don't resource this as an activity
            isActivity = false;
            break;
        case no_attr:
        case no_attr_expr:
        case no_attr_link:
        case no_getgraphloopresultset:
            info->isActivity = false;
            info->containsActivity = false;
            return false;
        case no_datasetlist:
            isActivity = false;
            break;
        case no_rowsetrange:
            {
                //Don't resource this as an activity if it is a function of the input graph rows,
                //however we do want to if it is coming from a dataset list.
                IHqlExpression * ds = expr->queryChild(0);
                //MORE: Should walk further down the tree to allow for nested rowsetranges etc.
                if (ds->getOperator() == no_rowset || ds->getOperator() == no_getgraphloopresultset)
                {
                    info->isActivity = false;
                    info->containsActivity = false;
                    return false;
                }
                isActivity = false;
                break;
            }
        }
        //Scalars (and untyped expressions) never contain activities.
        ITypeInfo * type = expr->queryType();
        if (!type || type->isScalar())
            return false;
        info->isActivity = isActivity;
        info->containsActivity = true;
    }
    //Arguments [first, last) are the activity inputs; the rest are candidates for child-query hoisting.
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    if (options.hoistResourced)
    {
        switch (expr->getOperator())
        {
        case no_allnodes:
            {
                //MORE: This needs to recursively walk and lift any contained no_selfnode, but don't go past another nested no_allnodes;
                EclThisNodeLocator locator(info->childDependents);
                locator.analyseChild(expr->queryChild(0), true);
                break;
            }
        case no_childquery:
            throwUnexpected();
        default:
            {
                for (unsigned idx=first; idx < last; idx++)
                {
                    IHqlExpression * cur = expr->queryChild(idx);
                    findSplitPoints(cur, false);
                }
                gatherChildSplitPoints(expr, info, first, last);
                break;
            }
        }
        //Propagate projection information to each hoisted expression, recurse into the ones that must
        //always be hoisted, and record every dependent globally.
        ForEachItemIn(i2, info->childDependents)
        {
            CChildDependent & cur = info->childDependents.item(i2);
            ActivityInvariantInfo * hoistedInfo = queryBodyInfo(cur.hoisted);
            if (cur.projected)
                hoistedInfo->addProjected(cur.projected);
            else
                hoistedInfo->clearProjected();
            if (cur.alwaysHoist)
                findSplitPoints(cur.hoisted, (cur.projected != NULL));
            allChildDependents.append(OLINK(cur));
        }
    }
    else
    {
        for (unsigned idx=first; idx < last; idx++)
            findSplitPoints(expr->queryChild(idx), false);
    }
    return info->containsActivity;
}
  1079. void ActivityInvariantHoister::extendSplitPoints()
  1080. {
  1081. //NB: findSplitPoints might call this array to be extended
  1082. for (unsigned i1=0; i1 < allChildDependents.ordinality(); i1++)
  1083. {
  1084. CChildDependent & cur = allChildDependents.item(i1);
  1085. if (!cur.alwaysHoist && isWorthForcingHoist(cur.hoisted))
  1086. findSplitPoints(cur.hoisted, (cur.projected != NULL));
  1087. }
  1088. }
  1089. IHqlExpression * ActivityInvariantHoister::projectChildDependent(IHqlExpression * expr)
  1090. {
  1091. ActivityInvariantInfo * info = queryBodyInfo(expr);
  1092. if (!info || !info->projectResult || info->projected.empty())
  1093. return expr;
  1094. if (info->projectedExpr)
  1095. return info->projectedExpr;
  1096. assertex(expr->getOperator() == no_selectnth);
  1097. IHqlExpression * row = expr;
  1098. assertex(projectSelectorDatasetToField(row));
  1099. unsigned totalFields = getFieldCount(row->queryRecord());
  1100. if (totalFields == info->projected.ordinality())
  1101. {
  1102. info->clearProjected();
  1103. return expr;
  1104. }
  1105. //Create a projection containing each of the fields that are used from the child queries.
  1106. IHqlExpression * ds = row->queryChild(0);
  1107. HqlExprArray fields;
  1108. HqlExprArray values;
  1109. ForEachItemIn(i, info->projected)
  1110. {
  1111. IHqlExpression * value = &info->projected.item(i);
  1112. IHqlExpression * field = value->queryChild(1);
  1113. LinkedHqlExpr projectedField = field;
  1114. //Check for a very unusual situation where the same field is projected from two different sub records
  1115. while (fields.contains(*projectedField))
  1116. projectedField.setown(cloneFieldMangleName(field)); // Generates a new mangled name each time
  1117. OwnedHqlExpr activeDs = createRow(no_activetable, LINK(ds->queryNormalizedSelector()));
  1118. fields.append(*LINK(projectedField));
  1119. values.append(*replaceSelector(value, row, activeDs));
  1120. }
  1121. OwnedHqlExpr projectedRecord = createRecord(fields);
  1122. OwnedHqlExpr self = getSelf(projectedRecord);
  1123. HqlExprArray assigns;
  1124. ForEachItemIn(i2, fields)
  1125. {
  1126. IHqlExpression * field = &fields.item(i2);
  1127. IHqlExpression * value = &values.item(i2);
  1128. assigns.append(*createAssign(createSelectExpr(LINK(self), LINK(field)), LINK(value)));
  1129. }
  1130. OwnedHqlExpr transform = createValue(no_newtransform, makeTransformType(projectedRecord->getType()), assigns);
  1131. OwnedHqlExpr projectedDs = createDataset(no_newusertable, LINK(ds), createComma(LINK(projectedRecord), LINK(transform)));
  1132. info->projectedExpr.setown(replaceChild(row, 0, projectedDs));
  1133. findSplitPoints(info->projectedExpr, false);
  1134. //Ensure child dependents that are no longer used are not processed - otherwise the usage counts will be wrong.
  1135. ForEachItemIn(ic, info->childDependents)
  1136. {
  1137. CChildDependent & cur = info->childDependents.item(ic);
  1138. //Clearing these ensures that isResourcedActivity returns false;
  1139. cur.hoisted.clear();
  1140. cur.projectedHoisted.clear();
  1141. }
  1142. return info->projectedExpr;
  1143. }
  1144. void ActivityInvariantHoister::projectChildDependents()
  1145. {
  1146. ForEachItemIn(i2, allChildDependents)
  1147. {
  1148. CChildDependent & cur = allChildDependents.item(i2);
  1149. if (isResourcedActivity(cur.hoisted))
  1150. cur.projectedHoisted.set(projectChildDependent(cur.hoisted));
  1151. }
  1152. }
  1153. void ActivityInvariantHoister::findSplitPoints(const HqlExprArray & exprs)
  1154. {
  1155. //Finding items that should be hoisted from child queries, and evaluated at this level is tricky:
  1156. //* After hoisting a child expression, that may in turn contain other child expressions.
  1157. //* Some child expressions are only worth hoisting if they are a simple function of something
  1158. // that will be evaluated here.
  1159. //* If we're creating a single project for each x[1] then that needs to be done after all the
  1160. // expressions to hoist have been found.
  1161. //* The usage counts have to correct - including
  1162. //* The select child dependents for a ds[n] need to be inherited by the project(ds)[n]
  1163. //=>
  1164. //First walk the expression tree gathering all the expressions that will be used.
  1165. //Project the expressions that need projecting.
  1166. ForEachItemIn(idx, exprs)
  1167. findSplitPoints(&exprs.item(idx), false);
  1168. extendSplitPoints();
  1169. projectChildDependents();
  1170. }
  1171. static IHqlExpression * getScalarReplacement(CChildDependent & cur, ActivityInvariantInfo * hoistedInfo, IHqlExpression * replacement)
  1172. {
  1173. //First skip any wrappers which are there to cause things to be hoisted.
  1174. IHqlExpression * value = skipScalarWrappers(cur.original);
  1175. //Now modify the spilled result depending on how the spilled result was created (see EclHoistLocator::noteScalar() above)
  1176. if (value->getOperator() == no_select)
  1177. {
  1178. bool isNew;
  1179. IHqlExpression * ds = querySelectorDataset(value, isNew);
  1180. if(isNew || ds->isDatarow())
  1181. {
  1182. if (cur.hoisted != cur.projectedHoisted)
  1183. {
  1184. assertex(cur.projected);
  1185. unsigned match = hoistedInfo->projected.find(*cur.projected);
  1186. assertex(match != NotFound);
  1187. IHqlExpression * projectedRecord = cur.projectedHoisted->queryRecord();
  1188. IHqlExpression * projectedField = projectedRecord->queryChild(match);
  1189. return createNewSelectExpr(LINK(replacement), LINK(projectedField));
  1190. }
  1191. return replaceSelectorDataset(value, replacement);
  1192. }
  1193. //Very unusual - can occur when a thisnode(somefield) is extracted from an allnodes.
  1194. //It will have gone through the default case in the noteScalar() code
  1195. }
  1196. else if (value->getOperator() == no_createset)
  1197. {
  1198. IHqlExpression * record = replacement->queryRecord();
  1199. IHqlExpression * field = record->queryChild(0);
  1200. return createValue(no_createset, cur.original->getType(), LINK(replacement), createSelectExpr(LINK(replacement->queryNormalizedSelector()), LINK(field)));
  1201. }
  1202. IHqlExpression * record = replacement->queryRecord();
  1203. return createNewSelectExpr(LINK(replacement), LINK(record->queryChild(0)));
  1204. }
  1205. IHqlExpression * ActivityInvariantHoister::replaceResourcedReferences(ActivityInvariantInfo * info, IHqlExpression * expr)
  1206. {
  1207. if (!isAffectedByResourcing(expr))
  1208. return LINK(expr);
  1209. //Process each element in a sort list independently. Otherwise (a very obscure bug)
  1210. //sort(ds, { ds, ds.x }) will incorrectly become SORT(ds, { ds', ds'.x })
  1211. if (expr->getOperator() == no_sortlist)
  1212. {
  1213. bool same = true;
  1214. HqlExprArray args;
  1215. args.ensure(expr->numChildren());
  1216. ForEachChild(i, expr)
  1217. {
  1218. IHqlExpression * cur = expr->queryChild(i);
  1219. IHqlExpression * mapped = replaceResourcedReferences(info, cur);
  1220. args.append(*mapped);
  1221. if (cur != mapped)
  1222. same = false;
  1223. }
  1224. return same ? LINK(expr) : expr->clone(args);
  1225. }
  1226. LinkedHqlExpr mapped = expr;
  1227. if (info && (info->childDependents.ordinality()))
  1228. {
  1229. HqlExprCopyArray originals;
  1230. HqlExprArray replacements;
  1231. ForEachItemIn(i, info->childDependents)
  1232. {
  1233. CChildDependent & cur = info->childDependents.item(i);
  1234. if (!isResourcedActivity(cur.projectedHoisted))
  1235. continue;
  1236. ActivityInvariantInfo * info = queryBodyInfo(cur.projectedHoisted);
  1237. LinkedHqlExpr replacement = info->hoistedRead;
  1238. assertex(replacement);
  1239. IHqlExpression * original = cur.original;
  1240. if (!original->isDataset() && !original->isDatarow() && !original->isDictionary())
  1241. replacement.setown(getScalarReplacement(cur, queryBodyInfo(cur.hoisted), replacement));
  1242. originals.append(*original);
  1243. replacements.append(*replacement.getClear());
  1244. }
  1245. if (originals.ordinality())
  1246. {
  1247. ChildDependentReplacer replacer(originals, replacements);
  1248. mapped.setown(replacer.transformRoot(mapped));
  1249. }
  1250. }
  1251. return mapped.getClear();
  1252. }
//NewHqlTransformer callback.  Ensures this node's child dependents are hoisted, then rebuilds the node:
//activity inputs [first, last) are transformed recursively, other arguments only have references to
//hoisted dependents replaced.  Nodes that contain no activity are handled by replaceResourcedReferences().
IHqlExpression * ActivityInvariantHoister::createTransformed(IHqlExpression * expr)
{
    ActivityInvariantInfo * info = queryBodyInfo(expr);
    if (!info || !info->containsActivity)
        return replaceResourcedReferences(info, expr);
    //NOTE(review): projectChildDependent() may have cleared cur.projectedHoisted; it is passed on unchecked here.
    ForEachItemIn(i2, info->childDependents)
    {
        CChildDependent & cur = info->childDependents.item(i2);
        ensureHoisted(cur.projectedHoisted, cur.isSingleNode);
    }
    node_operator op = expr->getOperator();
    HqlExprArray args;
    OwnedHqlExpr transformed;
    bool same = true;
    if (!info->isActivity)
    {
        //Not an activity itself (but contains one): transform every child.
        ForEachChild(idx, expr)
        {
            IHqlExpression * child = expr->queryChild(idx);
            IHqlExpression * resourced = transform(child);
            if (child != resourced)
                same = false;
            args.append(*resourced);
        }
    }
    else
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned last = first + getNumActivityArguments(expr);
        switch (op)
        {
        case no_if:
        case no_choose:
        case no_chooseds:
            {
                ForEachChild(idx, expr)
                {
                    IHqlExpression * child = expr->queryChild(idx);
                    IHqlExpression * resourced;
                    if ((idx < first) || (idx >= last))
                        resourced = replaceResourcedReferences(info, child);
                    else
                        resourced = transform(child);
                    if (child != resourced)
                        same = false;
                    args.append(*resourced);
                }
                break;
            }
        case no_case:
        case no_map:
            UNIMPLEMENTED;
        case no_keyed:
            return LINK(expr);
        case no_compound:
        case no_executewhen:
            same = transformChildren(expr, args);
            break;
        case no_select:
            {
                //If this isn't a new selector, then it must be <LEFT|RIGHT>.child-dataset, which will not be mapped
                //and the dataset will not have been resourced
                if (isNewSelector(expr))
                {
                    IHqlExpression * ds = expr->queryChild(0);
                    OwnedHqlExpr newDs = transform(ds);
                    if (ds != newDs)
                    {
                        args.append(*LINK(newDs));
                        unwindChildren(args, expr, 1);
                        //The selector must stay marked as "new" unless the new dataset is itself a select.
                        if (!expr->hasAttribute(newAtom) && (newDs->getOperator() != no_select))
                            args.append(*LINK(queryNewSelectAttrExpr()));
                        same = false;
                    }
                }
                break;
            }
        case no_join:
        case no_denormalize:
        case no_denormalizegroup:
            //Special handling for keyed joins is currently disabled (condition is hard-wired to false).
            if (false)//if (isKeyedJoin(expr))
            {
                args.append(*transform(expr->queryChild(0)));
                args.append(*LINK(expr->queryChild(1)));
                unsigned max = expr->numChildren();
                for (unsigned idx=2; idx < max; idx++)
                    args.append(*replaceResourcedReferences(info, expr->queryChild(idx)));
                same = false;
                break;
            }
            //fall through...
        default:
            {
                IHqlExpression * activeTable = NULL;
                // Check to see if the activity has a dataset which is in scope for the rest of its arguments.
                // If so we'll need to remap references from the children.
                if (hasActiveTopDataset(expr) && (first != last))
                    activeTable = expr->queryChild(0);
                ForEachChild(idx, expr)
                {
                    IHqlExpression * child = expr->queryChild(idx);
                    IHqlExpression * resourced;
                    if ((idx < first) || (idx >= last))
                    {
                        LinkedHqlExpr mapped = child;
                        if (activeTable && isAffectedByResourcing(child))
                        {
                            //NOTE(review): relies on child 0 already having been appended to args, i.e. first == 0
                            //whenever hasActiveTopDataset() is true - confirm.
                            IHqlExpression * activeTableTransformed = &args.item(0);
                            if (activeTable != activeTableTransformed)
                                mapped.setown(scopedReplaceSelector(child, activeTable, activeTableTransformed));
                        }
                        resourced = replaceResourcedReferences(info, mapped);
                    }
                    else
                        resourced = transform(child);
                    if (child != resourced)
                        same = false;
                    args.append(*resourced);
                }
            }
            break;
        }
    }
    //NOTE(review): nothing above assigns `transformed`, so this always builds the result from same/args.
    if (!transformed)
        transformed.setown(same ? LINK(expr) : expr->clone(args));
    return transformed.getClear();
}
  1380. void ActivityInvariantHoister::ensureHoisted(IHqlExpression * expr, bool isSingleNode)
  1381. {
  1382. ActivityInvariantInfo * info = queryBodyInfo(expr);
  1383. if (!info->hoistedRead)
  1384. {
  1385. if (isSingleNode)
  1386. info->noteUsedFromChild();
  1387. OwnedHqlExpr hoisted = transform(expr);
  1388. if (!info->queryOutputSpillFile())
  1389. {
  1390. //NOTE: ActivityInvariantHoister does not create no_readspill etc., because the transformation in
  1391. result.append(*info->createSpilledWrite(hoisted, false));
  1392. }
  1393. //Create the read activity which will be substituted where it was used.
  1394. OwnedHqlExpr hoistedRead = info->createSpilledRead(hoisted);
  1395. info->hoistedRead.setown(expr->cloneAllAnnotations(hoistedRead));
  1396. }
  1397. }
  1398. //---------------------------------------------------------------------------------------------------------------------
  1399. //=== The following provides information about how each kind of activity is resourced ====
  1400. static void setHashResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
  1401. {
  1402. unsigned memneeded = MEM_Const_Minimal+DISTRIBUTE_RESMEM(resources.clusterSize);
  1403. resources.set(RESslavememory, memneeded).set(REShashdist, 1);
  1404. }
//MORE: Use a single function to map an IHqlExpression to an activity kind.
//Fills `resources` with an estimate of the resources the activity expr needs (slave/master memory and
//sockets, heavyweight/hash-distribute slots).  Activities in child queries, and those hinted as
//lightweight, are always treated as lightweight.
void getResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
{
    //MORE: What effect should a child query have? Definitely the following, but what about other resourcing?
    if (options.isChildQuery || queryHint(expr, lightweightAtom))
    {
        resources.setLightweight();
        return;
    }
    bool isLocal = isLocalActivity(expr);
    bool isGrouped = isGroupedActivity(expr);
    //NB: a set(RESslavememory, ...) after setLightweight()/setHeavyweight() presumably replaces their default memory figure.
    switch (expr->getOperator())
    {
    case no_join:
    case no_selfjoin:
    case no_denormalize:
    case no_denormalizegroup:
        if (isKeyedJoin(expr) || expr->hasAttribute(_lightweight_Atom))
            resources.setLightweight();
        else if (expr->hasAttribute(lookupAtom))
        {
            if (expr->hasAttribute(fewAtom))
            {
                resources.setLightweight().set(RESslavememory, MEM_Const_Minimal+LOOKUPJOINL_SMART_BUFFER_SIZE);
                resources.setManyToMasterSockets(1);
            }
            else
            {
                resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+LOOKUPJOINL_SMART_BUFFER_SIZE);
                resources.setManyToMasterSockets(1);
            }
        }
        //smart and hash joins are currently resourced identically.
        else if (expr->hasAttribute(smartAtom))
        {
            resources.setHeavyweight();
            setHashResources(expr, resources, options);
        }
        else if (expr->hasAttribute(hashAtom))
        {
            resources.setHeavyweight();
            setHashResources(expr, resources, options);
        }
        else
        {
            //Default (sorted) join.
            resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+SORT_BUFFER_TOTAL+JOIN_SMART_BUFFER_SIZE);
            if (!isLocal)
            {
#ifndef SORT_USING_MP
                resources.setManyToManySockets(2);
#endif
            }
        }
        break;
    case no_dedup:
        if (isGrouped || (!expr->hasAttribute(allAtom) && !expr->hasAttribute(hashAtom)))
        {
            resources.setLightweight();
            if (!isGrouped && !isLocal)
            {
                resources.set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
                resources.setManyToMasterSockets(1);
            }
        }
        else if (isLocal)
        {
            resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
            //This can't be right....
            //resources.setManyToMasterSockets(1);
        }
        else
        {
            //hash dedup.
            resources.setHeavyweight();
            setHashResources(expr, resources, options);
        }
        break;
    case no_rollup:
        resources.setLightweight();
        if (!isGrouped && !isLocal)
        {
            resources.set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
            //MORE: Is this still correct?
            resources.setManyToMasterSockets(1);
        }
        break;
    case no_distribute:
    case no_keyeddistribute:
        resources.setLightweight();
        setHashResources(expr, resources, options);
        break;
    case no_subsort:
        if (expr->hasAttribute(manyAtom))
            resources.setHeavyweight();
        else
            resources.setLightweight();
        break;
    case no_sort:
        if (isGrouped)
        {
            if (expr->hasAttribute(manyAtom))
                resources.setHeavyweight();
            else
                resources.setLightweight();
        }
        else if (expr->hasAttribute(fewAtom) && isLocal)
            resources.setLightweight();
        else if (isLocal)
            resources.setHeavyweight();
        else
        {
            //Global sort.
            resources.setHeavyweight();
#ifndef SORT_USING_MP
            resources.setManyToManySockets(2);
#endif
        }
        break;
    case no_topn:
        resources.setLightweight();
        break;
    case no_pipe:
        //surely it should be something like this.
        resources.setLightweight().set(RESslavesocket, 1);
        break;
    case no_table:
        {
            IHqlExpression * mode = expr->queryChild(2);
            if (mode && mode->getOperator() == no_pipe)
            {
                resources.setLightweight().set(RESslavesocket, 1);
            }
            else
            {
                resources.setLightweight();
                if (expr->hasAttribute(_workflowPersist_Atom) && expr->hasAttribute(distributedAtom))
                    setHashResources(expr, resources, options); // may require a hash distribute
            }
            break;
        }
    case no_output:
        {
            IHqlExpression * filename = expr->queryChild(1);
            if (expr->hasAttribute(_spill_Atom))
            {
                //resources.setLightweight(); // assume no resources(!)
            }
            else if (filename && filename->getOperator() == no_pipe)
            {
                resources.setLightweight().set(RESslavesocket, 1);
            }
            else if (filename && !filename->isAttribute())
            {
                resources.setLightweight();
            }
            else
            {
                //No real filename: output to the workunit.
                resources.setLightweight().set(RESslavememory, WORKUNITWRITE_SMART_BUFFER_SIZE);
            }
            break;
        }
    case no_distribution:
        resources.setLightweight().set(RESmastersocket, 16).set(RESslavesocket, 1);
        break;
    case no_aggregate:
    case no_newaggregate:
        {
            IHqlExpression * grouping = queryRealChild(expr, 3);
            if (grouping)
            {
                //Is this really correct???
                resources.setLightweight();
                setHashResources(expr, resources, options);
            }
            else
            {
                resources.setLightweight();
                //if (!isGrouped)
                //    resources.set(RESmastersocket, 16).set(RESslavesocket, 1);
            }
        }
        break;
    case no_hqlproject:
        resources.setLightweight();
        //Add a flag onto count project to indicate it is a different variety.
        //NOTE(review): unlike most other cases this memory figure does not include MEM_Const_Minimal.
        if (expr->hasAttribute(_countProject_Atom) && !isLocal)
            resources.set(RESslavememory, COUNTPROJECT_SMART_BUFFER_SIZE);
        break;
    case no_enth:
        resources.setLightweight();
        if (!isLocal)
            resources.set(RESslavememory, CHOOSESETS_SMART_BUFFER_SIZE);
        break;
    case no_metaactivity:
        //NB: a meta activity without PULL is assigned no resources at all.
        if (expr->hasAttribute(pullAtom))
            resources.setLightweight().set(RESslavememory, PULL_SMART_BUFFER_SIZE);
        break;
    case no_setresult:
    case no_extractresult:
    case no_outputscalar:
        resources.setLightweight();//.set(RESmastersocket, 1).set(RESslavesocket, 1);
        break;
    case no_choosesets:
        resources.setLightweight();
        if (!isLocal || expr->hasAttribute(enthAtom) || expr->hasAttribute(lastAtom))
            resources.set(RESslavememory, CHOOSESETS_SMART_BUFFER_SIZE);
        break;
    case no_iterate:
        resources.setLightweight();
        if (!isGrouped && !isLocal)
            resources.setManyToMasterSockets(1).set(RESslavememory, ITERATE_SMART_BUFFER_SIZE);
        break;
    case no_choosen:
        resources.setLightweight().set(RESslavememory, FIRSTN_SMART_BUFFER_SIZE);
        break;
    case no_spill:
    case no_spillgraphresult:
        //assumed to take no resources;
        break;
    case no_addfiles:
    case no_merge:
        {
            //Funnel buffers scale with the number of children, subject to a minimum.
            resources.setLightweight();
            unsigned bufSize = FUNNEL_PERINPUT_BUFF_SIZE*expr->numChildren();
            if (bufSize < FUNNEL_MIN_BUFF_SIZE) bufSize = FUNNEL_MIN_BUFF_SIZE;
            resources.set(RESslavememory, MEM_Const_Minimal+bufSize);
            break;
        }
    case no_compound:
        //MORE: Should really be the total resources for the lhs... Really needs more thought.
        break;
    case no_libraryselect:
        //Do not allocate any resources for this, we don't want it to cause a spill under any circumstances
        break;
    case no_split:
        //Should really be included in the cost....
    default:
        resources.setLightweight();
        break;
    }
}
  1644. CResources & CResources::setLightweight()
  1645. {
  1646. return set(RESslavememory, 0x10000).set(RESactivities, 1);
  1647. }
  1648. CResources & CResources::setHeavyweight()
  1649. {
  1650. return set(RESslavememory, 0x100000).set(RESheavy, 1).set(RESactivities, 1);
  1651. }
  1652. //-------------------------------------------------------------------------------------------
  1653. const char * queryResourceName(ResourceType kind)
  1654. {
  1655. switch (kind)
  1656. {
  1657. case RESslavememory: return "Slave Memory";
  1658. case RESslavesocket: return "Slave Sockets";
  1659. case RESmastermemory: return "Master Memory";
  1660. case RESmastersocket: return "Master Sockets";
  1661. case REShashdist: return "Hash Distributes";
  1662. case RESheavy: return "Heavyweight";
  1663. case RESactivities: return "Activities";
  1664. }
  1665. return "Unknown";
  1666. }
  1667. inline ResourcerInfo * queryResourceInfo(IHqlExpression * expr) { return (ResourcerInfo *)expr->queryBody()->queryTransformExtra(); }
  1668. inline bool isResourcedActivity(IHqlExpression * expr)
  1669. {
  1670. if (!expr)
  1671. return false;
  1672. ResourcerInfo * extra = queryResourceInfo(expr);
  1673. return extra && extra->isResourcedActivity();
  1674. }
  1675. bool isWorthForcingHoist(IHqlExpression * expr)
  1676. {
  1677. for (;;)
  1678. {
  1679. switch (expr->getOperator())
  1680. {
  1681. case no_selectnth:
  1682. case no_filter:
  1683. case no_newaggregate:
  1684. if (isResourcedActivity(expr->queryChild(0)))
  1685. return true;
  1686. break;
  1687. default:
  1688. return false;
  1689. }
  1690. expr = expr->queryChild(0);
  1691. }
  1692. }
  1693. void CResources::add(const CResources & other)
  1694. {
  1695. for (unsigned i = 0; i < RESmax; i++)
  1696. resource[i] += other.resource[i];
  1697. }
  1698. bool CResources::addExceeds(const CResources & other, const CResources & limit) const
  1699. {
  1700. for (unsigned i = 0; i < RESmax; i++)
  1701. {
  1702. if (resource[i] + other.resource[i] > limit.resource[i])
  1703. {
  1704. //DBGLOG("Cannot merge because limit for %s exceeded (%u cf %u)", queryResourceName((ResourceType)i), resource[i] + other.resource[i], limit.resource[i]);
  1705. return true;
  1706. }
  1707. }
  1708. return false;
  1709. }
  1710. StringBuffer & CResources::getExceedsReason(StringBuffer & reasonText, const CResources & other, const CResources & limit) const
  1711. {
  1712. bool first = true;
  1713. for (unsigned i = 0; i < RESmax; i++)
  1714. {
  1715. if (resource[i] + other.resource[i] > limit.resource[i])
  1716. {
  1717. if (!first) reasonText.append(", ");
  1718. first = false;
  1719. reasonText.appendf("%s (%u>%u)", queryResourceName((ResourceType)i), resource[i] + other.resource[i], limit.resource[i]);
  1720. }
  1721. }
  1722. return reasonText;
  1723. }
  1724. bool CResources::exceeds(const CResources & limit) const
  1725. {
  1726. for (unsigned i = 0; i < RESmax; i++)
  1727. {
  1728. if (resource[i] > limit.resource[i])
  1729. return true;
  1730. }
  1731. return false;
  1732. }
  1733. void CResources::maximize(const CResources & other)
  1734. {
  1735. for (unsigned i = 0; i < RESmax; i++)
  1736. {
  1737. if (resource[i] < other.resource[i])
  1738. resource[i] = other.resource[i];
  1739. }
  1740. }
  1741. CResources & CResources::setManyToMasterSockets(unsigned numSockets)
  1742. {
  1743. set(RESslavesocket, numSockets);
  1744. set(RESmastersocket, numSockets * clusterSize);
  1745. return *this;
  1746. }
  1747. CResources & CResources::setManyToManySockets(unsigned numSockets)
  1748. {
  1749. return set(RESslavesocket, numSockets * clusterSize);
  1750. }
  1751. void CResources::sub(const CResources & other)
  1752. {
  1753. for (unsigned i = 0; i < RESmax; i++)
  1754. resource[i] -= other.resource[i];
  1755. }
  1756. //===========================================================================
  1757. bool isSimpleAggregateResult(IHqlExpression * expr)
  1758. {
  1759. //MORE: no_extractresult is really what is meant
  1760. if (expr->getOperator() != no_extractresult)
  1761. return false;
  1762. IHqlExpression * value = expr->queryChild(0);
  1763. if (value->getOperator() != no_datasetfromrow)
  1764. return false;
  1765. //MORE: This currently doesn't hoist selects from nested records, but not sure there is a syntax to do that either.
  1766. IHqlExpression * ds = value->queryChild(0);
  1767. if (!isSelectFirstRow(ds))
  1768. return false;
  1769. ds = ds->queryChild(0);
  1770. if (ds->getOperator() != no_newaggregate)
  1771. return false;
  1772. return true;
  1773. }
  1774. bool lightweightAndReducesDatasetSize(IHqlExpression * expr)
  1775. {
  1776. switch (expr->getOperator())
  1777. {
  1778. case no_hqlproject:
  1779. case no_newusertable:
  1780. return reducesRowSize(expr);
  1781. case no_dedup:
  1782. if (isGroupedActivity(expr) || (!expr->hasAttribute(allAtom) && !expr->hasAttribute(hashAtom)))
  1783. return true;
  1784. break;
  1785. case no_aggregate:
  1786. case no_newaggregate:
  1787. {
  1788. IHqlExpression * grouping = queryRealChild(expr, 3);
  1789. if (grouping)
  1790. return false;
  1791. return true;
  1792. }
  1793. case no_rollupgroup:
  1794. case no_rollup:
  1795. case no_choosen:
  1796. case no_choosesets:
  1797. case no_enth:
  1798. case no_sample:
  1799. case no_filter:
  1800. case no_limit:
  1801. case no_filtergroup:
  1802. return true;
  1803. case no_group:
  1804. //removing grouping will reduce size of the spill file.
  1805. if (!isGrouped(expr))
  1806. return true;
  1807. break;
  1808. }
  1809. return false;
  1810. }
  1811. bool heavyweightAndReducesSizeOrSkew(IHqlExpression * expr)
  1812. {
  1813. switch (expr->getOperator())
  1814. {
  1815. case no_aggregate:
  1816. case no_newaggregate:
  1817. {
  1818. //more; hash aggregate?
  1819. break;
  1820. }
  1821. case no_distribute:
  1822. return true;
  1823. }
  1824. return false;
  1825. }
  1826. //---------------------------------------------------------------------------
  1827. IHqlExpression * CResourceOptions::createResultSpillName()
  1828. {
  1829. return getSizetConstant(nextResult++);
  1830. }
  1831. IHqlExpression * CResourceOptions::createDiskSpillName()
  1832. {
  1833. StringBuffer s;
  1834. s.append("~spill::");
  1835. appendUniqueId(s, spillSequence.next());
  1836. return createConstant(s.str());
  1837. }
  1838. IHqlExpression * CResourceOptions::createGlobalSpillName()
  1839. {
  1840. StringBuffer valueText;
  1841. appendUniqueId(valueText.append("spill"), spillSequence.next());
  1842. return createConstant(valueText.str());
  1843. }
  1844. //---------------------------------------------------------------------------
  1845. IHqlExpression * appendUniqueAttr(IHqlExpression * expr)
  1846. {
  1847. return replaceOwnedAttribute(expr, createUniqueId());
  1848. }
  1849. bool queryAddUniqueToActivity(IHqlExpression * expr)
  1850. {
  1851. if (!isSourceActivity(expr))
  1852. return false;
  1853. switch (expr->getOperator())
  1854. {
  1855. case no_workunit_dataset:
  1856. case no_getgraphresult:
  1857. case no_getgraphloopresult:
  1858. case no_xmlproject:
  1859. case no_datasetfromrow:
  1860. case no_datasetfromdictionary:
  1861. case no_rows:
  1862. case no_allnodes:
  1863. case no_thisnode:
  1864. case no_select: // it can get lost, and that then causes inconsistent trees.
  1865. return false;
  1866. }
  1867. return true;
  1868. }
  1869. //---------------------------------------------------------------------------
  1870. ResourceGraphLink::ResourceGraphLink(ResourceGraphInfo * _sourceGraph, IHqlExpression * _sourceNode, ResourceGraphInfo * _sinkGraph, IHqlExpression * _sinkNode, LinkKind _linkKind)
  1871. {
  1872. assertex(_sourceGraph);
  1873. sourceGraph.set(_sourceGraph);
  1874. sourceNode.set(_sourceNode);
  1875. sinkGraph.set(_sinkGraph);
  1876. sinkNode.set(_sinkNode);
  1877. assertex(!sinkGraph || sinkNode);
  1878. linkKind = _linkKind;
  1879. trace("create");
  1880. }
  1881. ResourceGraphLink::~ResourceGraphLink()
  1882. {
  1883. trace("delete");
  1884. }
  1885. void ResourceGraphLink::changeSourceGraph(ResourceGraphInfo * newGraph)
  1886. {
  1887. sourceGraph->sinks.zap(*this);
  1888. sourceGraph.set(newGraph);
  1889. newGraph->sinks.append(*this);
  1890. trace("change source");
  1891. }
  1892. void ResourceGraphLink::changeSinkGraph(ResourceGraphInfo * newGraph)
  1893. {
  1894. sinkGraph->sources.zap(*this);
  1895. sinkGraph.set(newGraph);
  1896. newGraph->sources.append(*this);
  1897. trace("change sink");
  1898. }
  1899. bool ResourceGraphLink::isRedundantLink()
  1900. {
  1901. ResourcerInfo * info = queryResourceInfo(sourceNode);
  1902. return info->expandRatherThanSpill(false);
  1903. }
  1904. void ResourceGraphLink::trace(const char * name)
  1905. {
  1906. #ifdef TRACE_RESOURCING
  1907. PrintLog("%s: %lx source(%lx,%lx) sink(%lx,%lx) %s", name, this, sourceGraph.get(), sourceNode->queryBody(), sinkGraph.get(), sinkNode ? sinkNode->queryBody() : NULL,
  1908. linkKind == SequenceLink ? "sequence" : "");
  1909. #endif
  1910. }
  1911. ResourceGraphDependencyLink::ResourceGraphDependencyLink(ResourceGraphInfo * _sourceGraph, IHqlExpression * _sourceNode, ResourceGraphInfo * _sinkGraph, IHqlExpression * _sinkNode, IHqlExpression * _dependency)
  1912. : ResourceGraphLink(_sourceGraph, _sourceNode, _sinkGraph, _sinkNode, UnconditionalLink), dependency(_dependency)
  1913. {
  1914. }
  1915. void ResourceGraphDependencyLink::changeSourceGraph(ResourceGraphInfo * newGraph)
  1916. {
  1917. sourceGraph.set(newGraph);
  1918. trace("change source");
  1919. }
  1920. void ResourceGraphDependencyLink::changeSinkGraph(ResourceGraphInfo * newGraph)
  1921. {
  1922. sinkGraph.set(newGraph);
  1923. newGraph->dependsOn.append(*this);
  1924. trace("change sink");
  1925. }
  1926. //---------------------------------------------------------------------------
  1927. ResourceGraphInfo::ResourceGraphInfo(CResourceOptions * _options) : resources(_options->clusterSize)
  1928. {
  1929. options = _options;
  1930. depth = 0;
  1931. depthSequence = -1;
  1932. beenResourced = false;
  1933. isUnconditional = false;
  1934. mergedConditionSource = false;
  1935. hasConditionSource = false;
  1936. hasSequentialSource = false;
  1937. hasRootActivity = false;
  1938. isDead = false;
  1939. startedGeneratingResourced = false;
  1940. inheritedExpandedDependencies = false;
  1941. cachedDependent.other = NULL;
  1942. cachedDependent.ignoreSources = false;
  1943. cachedDependent.updateSequence = 0;
  1944. cachedDependent.value = false;
  1945. }
  1946. ResourceGraphInfo::~ResourceGraphInfo()
  1947. {
  1948. }
  1949. bool ResourceGraphInfo::addCondition(IHqlExpression * condition)
  1950. {
  1951. if (conditions.find(*condition) == NotFound)
  1952. {
  1953. conditions.append(*LINK(condition));
  1954. #ifdef SPOT_UNCONDITIONAL_CONDITIONS
  1955. IAtom * name = condition->queryName();
  1956. IAtom * invName = NULL;
  1957. if (name == trueAtom)
  1958. invName = falseAtom;
  1959. else if (name == falseAtom)
  1960. invName = trueAtom;
  1961. if (invName)
  1962. {
  1963. IHqlExpression * parent = condition->queryChild(1);
  1964. OwnedHqlExpr invTag = createAttribute(invName, LINK(condition->queryChild(0)), LINK(parent));
  1965. if (conditions.find(*invTag) != NotFound)
  1966. {
  1967. if (!parent)
  1968. return true;
  1969. return addCondition(parent);
  1970. }
  1971. }
  1972. #endif
  1973. }
  1974. return false;
  1975. }
  1976. bool ResourceGraphInfo::isSharedInput(IHqlExpression * expr)
  1977. {
  1978. IHqlExpression * body = expr->queryBody();
  1979. if (unbalancedExternalSources.contains(*body))
  1980. return false;
  1981. if (queryResourceInfo(expr)->expandRatherThanSplit())
  1982. return false;
  1983. unsigned numUses = 0;
  1984. ForEachItemIn(i, balancedExternalSources)
  1985. {
  1986. if (body == &balancedExternalSources.item(i))
  1987. numUses++;
  1988. }
  1989. //NumUses could be zero if an input should be expanded, and that input is shared by another graph which also expands
  1990. //the input. E.g. project(meta(diskread).
  1991. return numUses > 1;
  1992. }
  1993. void ResourceGraphInfo::addSharedInput(IHqlExpression * expr, IHqlExpression * mapped)
  1994. {
  1995. sharedInputs.append(*LINK(expr));
  1996. sharedInputs.append(*LINK(mapped));
  1997. }
  1998. IHqlExpression * ResourceGraphInfo::queryMappedSharedInput(IHqlExpression * expr)
  1999. {
  2000. unsigned max = sharedInputs.ordinality();
  2001. for (unsigned i=0; i < max; i+= 2)
  2002. {
  2003. if (expr == &sharedInputs.item(i))
  2004. return &sharedInputs.item(i+1);
  2005. }
  2006. return NULL;
  2007. }
  2008. bool ResourceGraphInfo::allocateResources(const CResources & value, const CResources & limit)
  2009. {
  2010. if (resources.addExceeds(value, limit))
  2011. return false;
  2012. resources.add(value);
  2013. return true;
  2014. }
  2015. bool ResourceGraphInfo::canMergeActionAsSibling(bool sequential) const
  2016. {
  2017. //sequential requires root actions in separate graphs
  2018. if (sequential && hasRootActivity)
  2019. return false;
  2020. //Dependent actions need to be in a separate graph if this flag is set.
  2021. if (options->actionLinkInNewGraph && !hasRootActivity)
  2022. return false;
  2023. return true;
  2024. }
  2025. bool ResourceGraphInfo::containsActiveSinks() const
  2026. {
  2027. ForEachItemIn(idx, sinks)
  2028. {
  2029. ResourcerInfo * info = queryResourceInfo(sinks.item(idx).sourceNode);
  2030. if (!info->expandRatherThanSpill(false))
  2031. return true;
  2032. }
  2033. return false;
  2034. }
  2035. bool ResourceGraphInfo::containsActionSink() const
  2036. {
  2037. ForEachItemIn(idx, sinks)
  2038. {
  2039. IHqlExpression * sink = sinks.item(idx).sourceNode;
  2040. if (sink->isAction())
  2041. return true;
  2042. }
  2043. return false;
  2044. }
  2045. void ResourceGraphInfo::display()
  2046. {
  2047. StringBuffer s;
  2048. s.appendf("graph %p src(", this);
  2049. ForEachItemIn(idxs, sources)
  2050. s.appendf("%p ", sources.item(idxs).sourceGraph.get());
  2051. s.append(") dep(");
  2052. ForEachItemIn(idxd, dependsOn)
  2053. s.appendf("%p ", dependsOn.item(idxd).sourceGraph.get());
  2054. s.append(")");
  2055. if (isUnconditional)
  2056. s.append(" <unconditional>");
  2057. DBGLOG("%s", s.str());
  2058. }
  2059. void ResourceGraphInfo::getMergeFailReason(StringBuffer & reasonText, ResourceGraphInfo * otherGraph, const CResources & limit)
  2060. {
  2061. resources.getExceedsReason(reasonText, otherGraph->resources, limit);
  2062. }
  2063. unsigned ResourceGraphInfo::getDepth()
  2064. {
  2065. //If no graphs have been merged since this was last called it is still valid.
  2066. if (depthSequence == options->state.updateSequence)
  2067. return depth;
  2068. depthSequence = options->state.updateSequence;
  2069. depth = 0;
  2070. ForEachItemIn(idx, dependsOn)
  2071. {
  2072. ResourceGraphInfo * source = dependsOn.item(idx).sourceGraph;
  2073. if (source->getDepth() >= depth)
  2074. depth = source->getDepth() + 1;
  2075. }
  2076. ForEachItemIn(idx2, sources)
  2077. {
  2078. ResourceGraphInfo * source = sources.item(idx2).sourceGraph;
  2079. if (source->getDepth() >= depth)
  2080. depth = source->getDepth() + 1;
  2081. }
  2082. return depth;
  2083. }
  2084. bool ResourceGraphInfo::hasSameConditions(ResourceGraphInfo & other)
  2085. {
  2086. if (conditions.ordinality() != other.conditions.ordinality())
  2087. return false;
  2088. ForEachItemIn(i, conditions)
  2089. if (other.conditions.find(conditions.item(i)) == NotFound)
  2090. return false;
  2091. return true;
  2092. }
  2093. bool ResourceGraphInfo::evalDependentOn(ResourceGraphInfo & other, bool ignoreSources)
  2094. {
  2095. ForEachItemIn(idx, dependsOn)
  2096. {
  2097. ResourceGraphInfo * cur = dependsOn.item(idx).sourceGraph;
  2098. if (cur == &other)
  2099. return true;
  2100. if (cur->isDependentOn(other, false))
  2101. return true;
  2102. }
  2103. ForEachItemIn(idx2, sources)
  2104. {
  2105. ResourceGraphInfo * cur = sources.item(idx2).sourceGraph;
  2106. if ((cur == &other) && !ignoreSources)
  2107. return true;
  2108. if (cur->isDependentOn(other, false))
  2109. return true;
  2110. }
  2111. return false;
  2112. }
  2113. bool ResourceGraphInfo::isDependentOn(ResourceGraphInfo & other, bool ignoreSources)
  2114. {
  2115. //Cache the last query so that traversal doesn't convert a graph into a tree walk
  2116. if ((cachedDependent.other == &other) && (cachedDependent.ignoreSources == ignoreSources) &&
  2117. (cachedDependent.updateSequence == options->state.updateSequence))
  2118. return cachedDependent.value;
  2119. if (getDepth() <= other.getDepth())
  2120. return false;
  2121. cachedDependent.other = &other;
  2122. cachedDependent.ignoreSources = ignoreSources;
  2123. cachedDependent.updateSequence = options->state.updateSequence;
  2124. cachedDependent.value = evalDependentOn(other, ignoreSources);
  2125. return cachedDependent.value;
  2126. }
  2127. bool ResourceGraphInfo::isVeryCheap()
  2128. {
  2129. if (sinks.ordinality() != 1)
  2130. return false;
  2131. IHqlExpression * sourceNode = sinks.item(0).sourceNode;
  2132. if (isSimpleAggregateResult(sourceNode))
  2133. return true;
  2134. // Not sure about the following....
  2135. // if (sourceNode->getOperator() == no_setresult)
  2136. // return true;
  2137. //Could be other examples...
  2138. return false;
  2139. }
  2140. bool ResourceGraphInfo::mergeInSource(ResourceGraphInfo & other, const CResources & limit, bool mergeInSpillOutput)
  2141. {
  2142. bool mergeConditions = mergeInSpillOutput;
  2143. if (!isUnconditional && !mergeInSpillOutput)
  2144. {
  2145. //if it is conditional and it is very cheap then it is still more efficient to merge
  2146. //rather than read from a spill file.
  2147. if (other.isUnconditional || !hasSameConditions(other))
  2148. {
  2149. if ((hasConditionSource || !isVeryCheap()) && (other.sinks.ordinality() != 1))
  2150. return false;
  2151. mergeConditions = true;
  2152. }
  2153. }
  2154. if (isDependentOn(other, true))
  2155. return false;
  2156. if (!options->canSplit())
  2157. {
  2158. //Don't merge two graphs that will cause a splitter to be created
  2159. //Either already used internally, or an output will be merged twice
  2160. HqlExprArray mergeNodes;
  2161. ForEachItemIn(idx, sources)
  2162. {
  2163. ResourceGraphLink & cur = sources.item(idx);
  2164. if (cur.sourceGraph == &other)
  2165. {
  2166. IHqlExpression * sourceNode = cur.sourceNode->queryBody();
  2167. ResourcerInfo * info = queryResourceInfo(sourceNode);
  2168. if ((info->numInternalUses() != 0) || (mergeNodes.find(*sourceNode) != NotFound) || info->preventMerge())
  2169. return false;
  2170. mergeNodes.append(*LINK(sourceNode));
  2171. }
  2172. }
  2173. }
  2174. if (options->checkResources() && !allocateResources(other.resources, limit))
  2175. return false;
  2176. if (hasSequentialSource && other.hasSequentialSource)
  2177. return false;
  2178. if (mergeConditions)
  2179. {
  2180. //Merging a conditional spill output into a child graph should not make that graph conditional
  2181. isUnconditional = other.isUnconditional;
  2182. }
  2183. mergeGraph(other, mergeConditions);
  2184. return true;
  2185. }
  2186. void ResourceGraphInfo::mergeGraph(ResourceGraphInfo & other, bool mergeConditions)
  2187. {
  2188. #ifdef TRACE_RESOURCING
  2189. DBGLOG("Merging%s source into%s sink", other.isUnconditional ? "" : " conditional", isUnconditional ? "" : " conditional");
  2190. other.display();
  2191. display();
  2192. PrintLog("Merge %p into %p", &other, this);
  2193. #endif
  2194. if (other.hasConditionSource)
  2195. hasConditionSource = true;
  2196. if (other.hasSequentialSource)
  2197. {
  2198. assertex(!hasSequentialSource);
  2199. hasSequentialSource = true;
  2200. }
  2201. //Recalculate the dependents, because sources of the source merged in may no longer be indirect
  2202. //although they may be via another path.
  2203. options->noteGraphsChanged();
  2204. //If was very cheap and merged into an unconditional graph, make sure this is now tagged as
  2205. //unconditional...
  2206. if (other.isUnconditional)
  2207. isUnconditional = true;
  2208. //We need to stop spills being an arm of a conditional branch - otherwise they won't get executed.
  2209. //so see if we have merged any conditional branches in
  2210. if (other.mergedConditionSource)
  2211. mergedConditionSource = true;
  2212. if (mergeConditions)
  2213. {
  2214. //Replace conditions with those of parent. Only called when a very simple graph is
  2215. //merged in, so replace conditions instead of merging
  2216. conditions.kill();
  2217. ForEachItemIn(i, other.conditions)
  2218. conditions.append(OLINK(other.conditions.item(i)));
  2219. }
  2220. //sources and sinks are updated elsewhere...
  2221. }
  2222. bool ResourceGraphInfo::mergeInSibling(ResourceGraphInfo & other, const CResources & limit)
  2223. {
  2224. if ((!isUnconditional || !other.isUnconditional) && !hasSameConditions(other))
  2225. return false;
  2226. if (hasSequentialSource && other.hasSequentialSource)
  2227. return false;
  2228. if (isDependentOn(other, false) || other.isDependentOn(*this, false))
  2229. return false;
  2230. if (options->checkResources() && !allocateResources(other.resources, limit))
  2231. return false;
  2232. mergeGraph(other, false);
  2233. return true;
  2234. }
  2235. void ResourceGraphInfo::removeResources(const CResources & value)
  2236. {
  2237. resources.sub(value);
  2238. }
  2239. //---------------------------------------------------------------------------
  2240. unsigned ChildDependentArray::findOriginal(IHqlExpression * expr)
  2241. {
  2242. ForEachItem(i)
  2243. {
  2244. if (item(i).original == expr)
  2245. return i;
  2246. }
  2247. return NotFound;
  2248. }
  2249. //---------------------------------------------------------------------------
  2250. static void appendCloneAttribute(HqlExprArray & args, IHqlExpression * expr, IAtom * name)
  2251. {
  2252. IHqlExpression * prop = expr->queryAttribute(name);
  2253. if (prop)
  2254. args.append(*LINK(prop));
  2255. }
  2256. SpillerInfo::SpillerInfo(IHqlExpression * _original, CResourceOptions * _options) : NewTransformInfo(_original), options(_options)
  2257. {
  2258. outputToUseForSpill = NULL;
  2259. linkedFromChild = false;
  2260. }
//Append the attributes that describe a spill file to args.
//isRead: true when building the read side of the spill, false for the write side.
//When an existing output is being reused (outputToUseForSpill) its __compressed__, jobTemp and
//_spill_ attributes are copied; that case is only valid for reads (asserted below).
//Otherwise __compressed__, _spill_ and _noReplicate_ are added (plus _noAccess_ for roxie).
//NOTE: the order of the appended attributes is preserved deliberately.
void SpillerInfo::addSpillFlags(HqlExprArray & args, bool isRead)
{
    if (isGrouped(original))
        args.append(*createAttribute(groupedAtom));
    if (outputToUseForSpill)
    {
        //Reusing an existing output - mirror its attributes rather than inventing new ones
        assertex(isRead);
        appendCloneAttribute(args, outputToUseForSpill, __compressed__Atom);
        appendCloneAttribute(args, outputToUseForSpill, jobTempAtom);
        appendCloneAttribute(args, outputToUseForSpill, _spill_Atom);
    }
    else
    {
        args.append(*createAttribute(__compressed__Atom));
        args.append(*createAttribute(_spill_Atom));
        args.append(*createAttribute(_noReplicate_Atom));
        if (options->targetRoxie())
            args.append(*createAttribute(_noAccess_Atom));
    }
    if (isRead)
    {
        args.append(*createAttribute(_noVirtual_Atom)); // Don't interpret virtual fields as virtual...
        //Rows, and datasets known to contain a single row, are flagged with rowAtom
        if (!original->isDataset() || hasSingleRow(original))
            args.append(*createAttribute(rowAtom)); // add a flag so the selectnth[1] can be removed later...
    }
}
  2287. IHqlExpression * SpillerInfo::createSpillName()
  2288. {
  2289. if (outputToUseForSpill)
  2290. {
  2291. switch (outputToUseForSpill->getOperator())
  2292. {
  2293. case no_setgraphresult:
  2294. return LINK(outputToUseForSpill->queryChild(2));
  2295. case no_output:
  2296. {
  2297. if (isFileOutput(outputToUseForSpill))
  2298. return LINK(outputToUseForSpill->queryChild(1));
  2299. IHqlExpression * nameAttr = outputToUseForSpill->queryAttribute(namedAtom);
  2300. assertex(nameAttr);
  2301. return LINK(nameAttr->queryChild(0));
  2302. }
  2303. default:
  2304. throwUnexpected();
  2305. }
  2306. }
  2307. if (!spillName)
  2308. {
  2309. if (useGraphResult())
  2310. spillName.setown(options->createResultSpillName());
  2311. else if (useGlobalResult())
  2312. spillName.setown(options->createGlobalSpillName());
  2313. else
  2314. spillName.setown(options->createDiskSpillName());
  2315. }
  2316. return LINK(spillName);
  2317. }
//Create the expression that reads back the spilled data.  The kind of read depends on where
//the spill is stored:
//  - graph result (useGraphResult):          no_getgraphresult, or no_readspill if spilledDataset is set
//  - global/workunit result (useGlobalResult): no_workunit_dataset
//  - otherwise a disk file:                  no_table, or no_readspill if spilledDataset is set
//spillReason (optional) is only attached to the disk read.
//The result is passed through preserveTableInfo() and then wrapRowOwn(), which selects the
//first row when the original was neither a dataset nor a dictionary.
//NOTE: argument order in args is significant for each operator.
IHqlExpression * SpillerInfo::createSpilledRead(IHqlExpression * spillReason)
{
    OwnedHqlExpr dataset;
    HqlExprArray args;
    IHqlExpression * record = original->queryRecord();
    //Cleared by the graph-result and disk branches; only the workunit-result read passes true.
    bool loseDistribution = true;
    if (useGraphResult())
    {
        //First argument is the common spill dataset if one was created by the write, else the record
        if (spilledDataset)
            args.append(*LINK(spilledDataset));
        else
            args.append(*LINK(record));
        args.append(*LINK(options->graphIdExpr));
        args.append(*createSpillName());
        if (isGrouped(original))
            args.append(*createAttribute(groupedAtom));
        if (!original->isDataset())
            args.append(*createAttribute(rowAtom));
        args.append(*createAttribute(_spill_Atom));
        IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
        if (recordCountAttr)
            args.append(*LINK(recordCountAttr));
        //Thor datasets outside child queries are tagged _distributed_
        if (options->targetThor() && original->isDataset() && !options->isChildQuery)
            args.append(*createAttribute(_distributed_Atom));
        node_operator readOp = spilledDataset ? no_readspill : no_getgraphresult;
        //Create the read with the same shape (row/dictionary/dataset) as the original
        if (original->isDatarow())
            dataset.setown(createRow(readOp, args));
        else if (original->isDictionary())
            dataset.setown(createDictionary(readOp, args));
        else
            dataset.setown(createDataset(readOp, args));
        loseDistribution = false;
    }
    else if (useGlobalResult())
    {
        //Read back a named workunit result
        args.append(*LINK(record));
        args.append(*createAttribute(nameAtom, createSpillName()));
        args.append(*createAttribute(sequenceAtom, getLocalSequenceNumber()));
        addSpillFlags(args, true);
        IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
        if (recordCountAttr)
            args.append(*LINK(recordCountAttr));
        if (original->isDictionary())
            dataset.setown(createDictionary(no_workunit_dataset, args));
        else
            dataset.setown(createDataset(no_workunit_dataset, args));
    }
    else
    {
        //Read back a spill file on disk.  The readspill form takes (dataset, name); the table form (name, record).
        if (spilledDataset)
        {
            args.append(*LINK(spilledDataset));
            args.append(*createSpillName());
        }
        else
        {
            args.append(*createSpillName());
            args.append(*LINK(record));
        }
        args.append(*createValue(no_thor));
        addSpillFlags(args, true);
        args.append(*createUniqueId());
        args.append(*createExprAttribute(_signed_Atom, createConstant("hpcc")));
        //Roxie child queries read the spill with _colocal_ and LOCAL attributes
        if (options->isChildQuery && options->targetRoxie())
        {
            args.append(*createAttribute(_colocal_Atom));
            args.append(*createLocalAttribute());
        }
        if (spillReason)
            args.append(*LINK(spillReason));
        if (spilledDataset)
            dataset.setown(createDataset(no_readspill, args));
        else
        {
            IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
            if (recordCountAttr)
                args.append(*LINK(recordCountAttr));
            dataset.setown(createDataset(no_table, args));
        }
        //A disk read always produces a dataset, so rebuild the dictionary if the original was one
        if (original->isDictionary() && !dataset->isDictionary())
            dataset.setown(createDictionary(no_createdictionary, LINK(dataset)));
        loseDistribution = false;
    }
    dataset.setown(preserveTableInfo(dataset, original, loseDistribution, false));
    return wrapRowOwn(dataset.getClear());
}
//Create the action that writes transformed (the value being spilled) to the spill location:
//  - graph result:   no_setgraphresult, or no_writespill of a no_commonspill dataset
//  - global result:  no_output with a NAMED attribute (workunit result)
//  - otherwise disk: no_output to a file, or no_writespill of a no_commonspill dataset
//When a no_commonspill dataset is created it is remembered in spilledDataset so that
//createSpilledRead() can refer to it.  allowCommonDataset = false suppresses that form.
//Must not be called when an existing output is being reused (outputToUseForSpill is set).
//NOTE: argument order in args is significant for each operator.
IHqlExpression * SpillerInfo::createSpilledWrite(IHqlExpression * transformed, bool allowCommonDataset)
{
    assertex(!outputToUseForSpill);
    HqlExprArray args;
    if (useGraphResult())
    {
        assertex(options->graphIdExpr);
        //The common spill dataset form is not used for values that are linked from a child query
        if (options->createSpillAsDataset && !linkedFromChild && allowCommonDataset)
        {
            IHqlExpression * value = LINK(transformed);
            if (value->isDatarow())
                value = createDatasetFromRow(value);
            spilledDataset.setown(createDataset(no_commonspill, value));
            args.append(*LINK(spilledDataset));
        }
        else
            args.append(*LINK(transformed));
        args.append(*LINK(options->graphIdExpr));
        args.append(*createSpillName());
        args.append(*createAttribute(_spill_Atom));
        if (linkedFromChild)
            args.append(*createAttribute(_accessedFromChild_Atom));
        args.append(*createAttribute(_graphLocal_Atom));
        if (spilledDataset)
            return createValue(no_writespill, makeVoidType(), args);
        return createValue(no_setgraphresult, makeVoidType(), args);
    }
    else if (useGlobalResult())
    {
        //Write a named workunit result
        args.append(*LINK(transformed));
        args.append(*createAttribute(sequenceAtom, getLocalSequenceNumber()));
        args.append(*createAttribute(namedAtom, createSpillName()));
        addSpillFlags(args, false);
        args.append(*createAttribute(_graphLocal_Atom));
        return createValue(no_output, makeVoidType(), args);
    }
    else
    {
        //Write a spill file to disk.  Dictionaries are written as the underlying dataset.
        LinkedHqlExpr dataset = LINK(transformed);
        if (dataset->isDictionary())
            dataset.setown(createDataset(no_datasetfromdictionary, dataset.getClear()));
        if (options->createSpillAsDataset && allowCommonDataset)
        {
            IHqlExpression * value = dataset.getClear();
            if (value->isDatarow())
                value = createDatasetFromRow(value);
            spilledDataset.setown(createDataset(no_commonspill, value));
            args.append(*LINK(spilledDataset));
        }
        else
            args.append(*dataset.getClear());
        args.append(*createSpillName());
        addSpillFlags(args, false);
        args.append(*createAttribute(_graphLocal_Atom));
        if (spilledDataset)
            return createValue(no_writespill, makeVoidType(), args);
        return createValue(no_output, makeVoidType(), args);
    }
}
  2463. void SpillerInfo::setPotentialSpillFile(IHqlExpression * expr)
  2464. {
  2465. if (!expr ||
  2466. ((canUseResultInChildQuery(expr) || !isUsedFromChild()) && (expr->getOperator() != no_setgraphresult)))
  2467. outputToUseForSpill = expr;
  2468. }
  2469. bool SpillerInfo::useGraphResult()
  2470. {
  2471. if (outputToUseForSpill)
  2472. return (outputToUseForSpill->getOperator() == no_setgraphresult);
  2473. return options->useGraphResult(linkedFromChild);
  2474. }
  2475. bool SpillerInfo::useGlobalResult()
  2476. {
  2477. if (outputToUseForSpill)
  2478. return isWorkunitOutput(outputToUseForSpill);
  2479. return options->useGlobalResult(linkedFromChild);
  2480. }
  2481. IHqlExpression * SpillerInfo::wrapRowOwn(IHqlExpression * expr)
  2482. {
  2483. if (!original->isDataset() && !original->isDictionary() && !expr->isDatarow())
  2484. expr = createRow(no_selectnth, expr, getSizetConstant(1));
  2485. return expr;
  2486. }
  2487. ResourcerInfo::ResourcerInfo(IHqlExpression * _original, CResourceOptions * _options) : SpillerInfo(_original, _options)
  2488. {
  2489. numUses = 0;
  2490. numExternalUses = 0;
  2491. gatheredDependencies = false;
  2492. containsActivity = false;
  2493. isActivity = false;
  2494. isRootActivity = false;
  2495. conditionSourceCount = 0;
  2496. pathToExpr = PathUnknown;
  2497. isAlreadyInScope = false;
  2498. isSpillPoint = false;
  2499. balanced = true;
  2500. currentSource = 0;
  2501. linkedFromChild = false;
  2502. forceHoist = false;
  2503. neverSplit = false;
  2504. isConditionalFilter = false;
  2505. projectResult = true; // projected must also be non empty to actually project this dataset
  2506. visited = false;
  2507. lastPass = 0;
  2508. balancedExternalUses = 0;
  2509. balancedInternalUses = 0;
  2510. balancedVisiting = false;
  2511. removedParallelPullers = false;
  2512. #ifdef TRACE_BALANCED
  2513. balanceId = 0;
  2514. #endif
  2515. }
  2516. void ResourcerInfo::setConditionSource(IHqlExpression * condition, bool isFirst)
  2517. {
  2518. if (isFirst)
  2519. {
  2520. conditionSourceCount++;
  2521. graph->hasConditionSource = true;
  2522. }
  2523. }
  2524. bool ResourcerInfo::addCondition(IHqlExpression * condition)
  2525. {
  2526. conditions.append(*LINK(condition));
  2527. return graph->addCondition(condition);
  2528. }
  2529. void ResourcerInfo::addProjected(IHqlExpression * next)
  2530. {
  2531. if (projectResult && !projected.contains(*next))
  2532. projected.append(*LINK(next));
  2533. }
  2534. void ResourcerInfo::clearProjected()
  2535. {
  2536. projectResult = false;
  2537. projected.kill();
  2538. }
  2539. bool ResourcerInfo::alwaysExpand()
  2540. {
  2541. return (original->getOperator() == no_mapto);
  2542. }
  2543. static IHqlExpression * stripTrueFalse(IHqlExpression * expr)
  2544. {
  2545. if (expr->queryName() == instanceAtom)
  2546. {
  2547. IHqlExpression * parent = expr->queryChild(2);
  2548. if (parent)
  2549. parent = stripTrueFalse(parent);
  2550. return createAttribute(instanceAtom, LINK(expr->queryChild(0)), LINK(expr->queryChild(1)), parent);
  2551. }
  2552. else
  2553. {
  2554. IHqlExpression * parent = expr->queryChild(1);
  2555. if (parent && parent->isAttribute())
  2556. parent = stripTrueFalse(parent);
  2557. return createAttribute(tempAtom, LINK(expr->queryChild(0)), parent);
  2558. }
  2559. }
  2560. unsigned ResourcerInfo::calcNumConditionalUses()
  2561. {
  2562. //for thor and hthor, where the conditional graph is cloned, the maximum number of times it can be read
  2563. //is 1 for a shared conditional graph, or once for each combination of conditions it is used as the
  2564. //input for. However if it is used more than once in a single branch of a condition it needs to be counted
  2565. //several times.
  2566. //It is always better to overestimate than underestimate. (But even better to get it right.)
  2567. HqlExprArray uniqueConditions;
  2568. UnsignedArray uniqueCounts;
  2569. ForEachItemIn(idx, conditions)
  2570. {
  2571. IHqlExpression & cur = conditions.item(idx);
  2572. OwnedHqlExpr unique = stripTrueFalse(&cur);
  2573. unsigned numOccurences = 0;
  2574. ForEachItemIn(j, conditions)
  2575. if (&conditions.item(j) == &cur)
  2576. numOccurences++;
  2577. unsigned match = uniqueConditions.find(*unique);
  2578. if (match == NotFound)
  2579. {
  2580. uniqueConditions.append(*unique.getClear());
  2581. uniqueCounts.append(numOccurences);
  2582. }
  2583. else
  2584. {
  2585. if (uniqueCounts.item(match) < numOccurences)
  2586. uniqueCounts.replace(numOccurences, match);
  2587. }
  2588. }
  2589. unsigned condUses = 0;
  2590. ForEachItemIn(k, uniqueCounts)
  2591. condUses += uniqueCounts.item(k);
  2592. return condUses;
  2593. }
  2594. // Each aggregate has form. Setresult(select(selectnth(aggregate...,1),field),name,seq)
  2595. IHqlExpression * ResourcerInfo::createAggregation(IHqlExpression * expr)
  2596. {
  2597. LinkedHqlExpr transformed = expr;
  2598. ForEachItemIn(idx, aggregates)
  2599. {
  2600. IHqlExpression & cur = aggregates.item(idx);
  2601. IHqlExpression * row2ds = cur.queryChild(0);
  2602. IHqlExpression * selectnth = row2ds->queryChild(0);
  2603. IHqlExpression * aggregate = selectnth->queryChild(0);
  2604. IHqlExpression * aggregateRecord = aggregate->queryChild(1);
  2605. assertex(aggregate->getOperator() == no_newaggregate);
  2606. HqlExprArray aggargs, setargs;
  2607. //Through aggregate has form (dataset, record, transform, list of set results);
  2608. aggargs.append(*LINK(transformed));
  2609. aggargs.append(*LINK(aggregateRecord));
  2610. IHqlExpression * mapped = replaceSelector(aggregate->queryChild(2), original, expr->queryNormalizedSelector());
  2611. aggargs.append(*mapped);
  2612. OwnedHqlExpr active = createDataset(no_anon, LINK(aggregateRecord), NULL);
  2613. OwnedHqlExpr mappedSelect = replaceSelector(cur.queryChild(1), row2ds, active);
  2614. setargs.append(*LINK(active));
  2615. setargs.append(*LINK(mappedSelect));
  2616. unwindChildren(setargs, &cur, 1);
  2617. aggargs.append(*createValue(no_extractresult, makeVoidType(), setargs));
  2618. transformed.setown(createDataset(no_throughaggregate, aggargs));
  2619. }
  2620. return transformed.getClear();
  2621. }
  2622. bool ResourcerInfo::okToSpillThrough()
  2623. {
  2624. bool isGraphResult = useGraphResult();
  2625. if (isGraphResult)
  2626. return options->allowThroughResult;
  2627. if (useGlobalResult())
  2628. return false;
  2629. return (options->allowThroughSpill && !options->createSpillAsDataset);
  2630. }
  2631. void ResourcerInfo::noteUsedFromChild(bool _forceHoist)
  2632. {
  2633. linkedFromChild = true;
  2634. outputToUseForSpill = NULL;
  2635. if (_forceHoist)
  2636. forceHoist = true;
  2637. }
  2638. unsigned ResourcerInfo::numInternalUses()
  2639. {
  2640. return numUses - numExternalUses - aggregates.ordinality();
  2641. }
  2642. void ResourcerInfo::resetBalanced()
  2643. {
  2644. // don't reset balanced since it may be set for an external result
  2645. balancedLinks.kill();
  2646. curBalanceLink = 0;
  2647. balancedVisiting = false;
  2648. balancedExternalUses = 0;
  2649. balancedInternalUses = 0;
  2650. removedParallelPullers = false;
  2651. }
  2652. bool ResourcerInfo::spillSharesSplitter()
  2653. {
  2654. if (outputToUseForSpill || useGraphResult() || useGlobalResult())
  2655. return false;
  2656. if (okToSpillThrough())
  2657. return false;
  2658. if (!isSplit() || !balanced)
  2659. return false;
  2660. return true;
  2661. }
  2662. void ResourcerInfo::setRootActivity()
  2663. {
  2664. isRootActivity = true;
  2665. }
//Create the activity that makes this expression's result available to other graphs.
//transformed - the transformed version of this expression.
//reuseSplitter - if true, transformed must already be a no_split (asserted) and is reused as the splitter.
//Returns a linked expression to use in place of transformed:
//  * transformed itself if outputToUseForSpill is set (presumably an existing output is reused for the spill);
//  * otherwise, if the spill cannot be written "through", a splitter whose spilled write is saved in
//    splitterOutput;
//  * otherwise a pass-through spill activity (no_spillgraphresult or no_spill).
IHqlExpression * ResourcerInfo::createSpiller(IHqlExpression * transformed, bool reuseSplitter)
{
    if (outputToUseForSpill)
        return LINK(transformed);

    if (!okToSpillThrough())
    {
        //Split the data and write one of the splitter's outputs; the splitter is returned to the caller.
        OwnedHqlExpr split;
        if (reuseSplitter)
        {
            assertex(transformed->getOperator() == no_split);
            split.set(transformed);
        }
        else
        {
            if (transformed->isDataset())
                split.setown(createDataset(no_split, LINK(transformed), createComma(createAttribute(balancedAtom), createUniqueId())));
            else
                split.setown(createRow(no_split, LINK(transformed), createComma(createAttribute(balancedAtom), createUniqueId())));
            split.setown(cloneInheritedAnnotations(original, split));
        }
        splitterOutput.setown(createSpilledWrite(split, true));
        return split.getClear();
    }

    //The spill activity passes its input through while writing it: a graph result or a spill file.
    HqlExprArray args;
    node_operator op;
    args.append(*LINK(transformed));
    if (useGraphResult())
    {
        op = no_spillgraphresult;
        args.append(*LINK(options->graphIdExpr));
        args.append(*createSpillName());
        args.append(*createAttribute(_spill_Atom));
    }
    else
    {
        op = no_spill;
        args.append(*createSpillName());
        addSpillFlags(args, false);
    }
    OwnedHqlExpr spill;
    if (original->isDatarow())
        spill.setown(createRow(op, args));
    else
        spill.setown(createDataset(op, args));
    return cloneInheritedAnnotations(original, spill);
}
  2712. IHqlExpression * ResourcerInfo::createSplitter(IHqlExpression * transformed)
  2713. {
  2714. if (transformed->getOperator() == no_libraryscopeinstance)
  2715. return LINK(transformed);
  2716. IHqlExpression * attr = createUniqueId();
  2717. if (balanced)
  2718. attr = createComma(attr, createAttribute(balancedAtom));
  2719. OwnedHqlExpr split;
  2720. if (transformed->isDataset())
  2721. split.setown(createDataset(no_split, LINK(transformed), attr));
  2722. else
  2723. split.setown(createRow(no_split, LINK(transformed), attr));
  2724. return cloneInheritedAnnotations(original, split);
  2725. }
  2726. IHqlExpression * ResourcerInfo::createTransformedExpr(IHqlExpression * expr)
  2727. {
  2728. LinkedHqlExpr transformed = expr;
  2729. if (aggregates.ordinality())
  2730. transformed.setown(createAggregation(transformed));
  2731. if (spillSharesSplitter())
  2732. {
  2733. transformed.setown(createSplitter(transformed));
  2734. if (isExternalSpill())
  2735. transformed.setown(createSpiller(transformed, true));
  2736. }
  2737. else
  2738. {
  2739. if (isExternalSpill())
  2740. transformed.setown(createSpiller(transformed, false));
  2741. if (isSplit())
  2742. transformed.setown(createSplitter(transformed));
  2743. else if (isSpilledWrite())
  2744. transformed.setown(createSpilledWrite(transformed, true));
  2745. }
  2746. return transformed.getClear();
  2747. }
//Decide whether an expression used by other graphs should be re-evaluated (expanded) where it is used instead
//of being spilled. Returns true to expand, false to spill.
//If options->shareDontExpand is set the decision is delegated to expandRatherThanSplit().
//noteOtherSpills - when true (and options->minimiseSpills is set and the expression is not linked from a child
//query) existing spills of this expression or of one of its inputs are taken into account to reduce the
//number of spills.
//The loop walks from the expression towards its source through activities that filter, project or merely
//annotate the data. It tracks whether the data has been filtered (and the combined filter likelihood) or
//otherwise processed.
bool ResourcerInfo::expandRatherThanSpill(bool noteOtherSpills)
{
    if (options->shareDontExpand)
        return expandRatherThanSplit();

    IHqlExpression * expr = original;
    if (!options->minimiseSpills || linkedFromChild)
        noteOtherSpills = false;

    if (noteOtherSpills)
    {
        //This expression is already written to a spill: expand only if that write has not been transformed yet
        ResourcerInfo * info = queryResourceInfo(expr);
        if (info && info->isSpilledWrite())
            return (info->queryTransformed() == NULL);
    }

    bool isFiltered = false;
    double filterLikelihood = 1.0;
    bool isProcessed = false;
    for (;;)
    {
        ResourcerInfo * info = queryResourceInfo(expr);
        if (info && info->neverSplit)
            return true;

        if (info && info->forceHoist)
            return false;

        if (!canDuplicateActivity(expr))
            return false;

        node_operator op = expr->getOperator();
        switch (op)
        {
        case no_table:
            {
                //This is only executed for hthor/thor. Roxie has used expandRatherThanSplit().
                //We need to balance the saving from reading reduced data in the other branches with the cost of
                //writing the spill file to disk.
                //NOTE(review): this threshold uses numExternalUses, whereas the cost formula below uses numUses.
                if (isFiltered && numExternalUses >= options->filteredSpillThreshold)
                    return false;
                IHqlExpression * mode = expr->queryChild(2);
                switch (mode->getOperator())
                {
                case no_thor: case no_flat:
                    //MORE: The following is possibly better - but roxie should be able to read from non spill data files in child queries fine
                    //if ((options->targetClusterType == RoxieCluster) && linkedFromChild)) return false;
                    break;
                default:
                    //Other file formats are always spilled
                    return false;
                }

                if (isFiltered)
                {
                    if (isKnownLikelihood(filterLikelihood))
                    {
                        // Calculation of when to spill/not spill:
                        // Where :
                        //    r = cost(read), w = cost(write), f = cost(filter),
                        //    n = number uses, p = probability of filter(likelihood)
                        //
                        // Cost of using spill files:
                        //    = r + f + pw + npr
                        //    = r + rp + npr (assuming w~=r and f~=0)
                        //
                        // Cost of not spilling (expanding)
                        //    = n(r+f)
                        //    = nr (assuming f~=0)
                        //
                        // Spill when "cost of using spill files" < "cost of not spilling (expanding)"
                        //    r + rp + npr < nr
                        // Which simplifies to :
                        //    p < (n - 1) / (n +1)
                        //NOTE(review): if numUses is unsigned and ever 0, numUses-1 wraps to a huge value and the
                        //expression is spilled - presumably callers guarantee numUses >= 1; confirm.
                        if (filterLikelihood < (double)(numUses-1)/(numUses+1))
                            return false;
                        return true;
                    }
                }
                return true;
            }
        case no_stepped:
            return true;
        case no_inlinetable:
            {
                IHqlExpression * transforms = expr->queryChild(0);
                //The inline table code means this should generate smaller code, and more efficient
                if (!isFiltered && !isProcessed && transforms->isConstant())
                    return true;

                //Only small inline tables are duplicated
                if (transforms->numChildren() > MAX_INLINE_COMMON_COUNT)
                    return false;
                return true;
            }
        case no_getresult:
        case no_temptable:
        case no_rows:
        case no_xmlproject:
        case no_workunit_dataset:
            return !isProcessed && !isFiltered;
        case no_getgraphresult:
            return !expr->hasAttribute(_streaming_Atom);        // we must not duplicate streaming inputs!
        case no_keyindex:
        case no_newkeyindex:
            if (!isFiltered)
                return true;
            return options->cloneFilteredIndex;
        case no_datasetfromrow:
            //Only when there is no input activity
            if (getNumActivityArguments(expr) == 0)
                return true;
            return false;
        case no_fail:
        case no_null:
            return !expr->isAction();
        //Annotation-like activities: continue with the input
        case no_assertsorted:
        case no_sorted:
        case no_grouped:
        case no_distributed:
        case no_preservemeta:
        case no_unordered:
        case no_nofold:
        case no_nohoist:
        case no_nocombine:
        case no_section:
        case no_sectioninput:
        case no_dataset_alias:
            expr = expr->queryChild(0);
            break;
        //Activities that modify the rows: continue with the input, noting the data has been processed
        case no_newusertable:
        case no_limit:
        case no_keyedlimit:
            expr = expr->queryChild(0);
            isProcessed = true;
            break;
        case no_hqlproject:
            if (expr->hasAttribute(_countProject_Atom) || expr->hasAttribute(prefetchAtom))
                return false;
            expr = expr->queryChild(0);
            isProcessed = true;
            break;
        //MORE: Not so sure about all the following, include them so behaviour doesn't change
        case no_compound_diskread:
        case no_compound_disknormalize:
        case no_compound_diskaggregate:
        case no_compound_diskcount:
        case no_compound_diskgroupaggregate:
        case no_compound_indexread:
        case no_compound_indexnormalize:
        case no_compound_indexaggregate:
        case no_compound_indexcount:
        case no_compound_indexgroupaggregate:
        case no_compound_childread:
        case no_compound_childnormalize:
        case no_compound_childaggregate:
        case no_compound_childcount:
        case no_compound_childgroupaggregate:
        case no_compound_selectnew:
        case no_compound_inline:
        case no_datasetfromdictionary:
            expr = expr->queryChild(0);
            break;
        case no_filter:
            {
                //Accumulate the overall likelihood of the (possibly several) filters seen so far
                if (isKnownLikelihood(filterLikelihood))
                {
                    double likelihood = queryActivityLikelihood(expr);
                    if (isKnownLikelihood(likelihood))
                        // Combine the likelihood of the 2 filter conditions
                        // N.B. this only works if the filter probability are independent
                        filterLikelihood *= likelihood;
                    else
                        // One of the filter probability is unknown, so the overall probability is unknown
                        setUnknownLikelihood(filterLikelihood);
                }

                isFiltered = true;
                expr = expr->queryChild(0);
                break;
            }
        case no_select:
            {
                if (options->targetClusterType == RoxieCluster)
                    return false;

                if (!isNewSelector(expr))
                    return true;
                expr = expr->queryChild(0);
                break;
            }
        default:
            return false;
        }

        //The following reduces the number of spills by taking into account other spills.
        if (noteOtherSpills)
        {
            ResourcerInfo * info = queryResourceInfo(expr);
            if (info)
            {
                if (info->isSpilledWrite())
                    return (info->queryTransformed() == NULL);
                //The input is already used by other graphs: expand unless the filter makes spilling worthwhile
                if (info->numExternalUses)
                {
                    if (isFiltered)
                    {
                        if (numExternalUses >= options->filteredSpillThreshold)
                            return false;
                        if (isKnownLikelihood(filterLikelihood) &&
                           (filterLikelihood < (double)(numUses-1)/(numUses+1)))
                            return false;
                    }
                    return true;
                }
            }
        }
    }
}
//Decide whether this expression should be duplicated in each of its consumers instead of being shared through
//a splitter. Returns true to duplicate (expand). Walks down through annotation-like and compound activities to
//the underlying source, which determines the answer.
bool ResourcerInfo::expandRatherThanSplit()
{
    //MORE: This doesn't really work - should do indexMatching first.
    //should only expand if one side that uses this is also filtered
    IHqlExpression * expr = original;
    for (;;)
    {
        ResourcerInfo * info = queryResourceInfo(expr);
        if (info && info->neverSplit)
            return true;

        switch (expr->getOperator())
        {
        case no_keyindex:
        case no_newkeyindex:
        case no_rowset:
        case no_getgraphloopresultset:
            return true;
        case no_null:
            return !expr->isAction();
        case no_inlinetable:
            //Optionally duplicate an inline table with a single constant row
            if (options->expandSingleConstRow && hasSingleRow(expr))
            {
                IHqlExpression * values = expr->queryChild(0);
                if (values->queryChild(0)->isConstant())
                    return true;
            }
            return false;
        case no_stepped:
        case no_rowsetrange:
        case no_rowsetindex:
            return true;
        //Walk through to the input of these activities
        case no_sorted:
        case no_grouped:
        case no_distributed:
        case no_preservemeta:
        case no_unordered:
        case no_compound_diskread:
        case no_compound_disknormalize:
        case no_compound_diskaggregate:
        case no_compound_diskcount:
        case no_compound_diskgroupaggregate:
        case no_compound_indexread:
        case no_compound_indexnormalize:
        case no_compound_indexaggregate:
        case no_compound_indexcount:
        case no_compound_indexgroupaggregate:
        case no_compound_childread:
        case no_compound_childnormalize:
        case no_compound_childaggregate:
        case no_compound_childcount:
        case no_compound_childgroupaggregate:
        case no_compound_selectnew:
        case no_compound_inline:
        case no_section:
        case no_sectioninput:
        case no_dataset_alias:
            break;
        case no_select:
            if (options->targetClusterType == RoxieCluster)
                return false;

            //Not a new selector: duplicate only if the selected field has a link-counted modifier
            if (!isNewSelector(expr))
            {
                if (!hasLinkCountedModifier(expr))
                    return false;
                return true;
            }
            break;
        case no_rows:
            //If executing in a child query then you'll have less thread contention if the iterator is duplicated
            //So should probably uncomment the following.
            //return true;
            return false;
        default:
            return false;
        }
        expr = expr->queryChild(0);
    }
}
  3031. bool ResourcerInfo::hasDependency() const
  3032. {
  3033. if (graph)
  3034. {
  3035. GraphLinkArray & graphLinks = graph->dependsOn;
  3036. ForEachItemIn(i, graphLinks)
  3037. {
  3038. ResourceGraphLink & link = graphLinks.item(i);
  3039. if (link.sinkNode == original)
  3040. return true;
  3041. }
  3042. }
  3043. else
  3044. {
  3045. //Really should save some information away... Err on side of caution
  3046. return true;
  3047. }
  3048. return false;
  3049. }
  3050. bool neverCommonUp(IHqlExpression * expr)
  3051. {
  3052. for (;;)
  3053. {
  3054. node_operator op = expr->getOperator();
  3055. switch (op)
  3056. {
  3057. case no_keyindex:
  3058. case no_newkeyindex:
  3059. return true;
  3060. case no_filter:
  3061. expr = expr->queryChild(0);
  3062. break;
  3063. default:
  3064. return false;
  3065. }
  3066. }
  3067. }
  3068. bool ResourcerInfo::neverCommonUp()
  3069. {
  3070. return ::neverCommonUp(original);
  3071. }
  3072. bool ResourcerInfo::isExternalSpill()
  3073. {
  3074. if (expandRatherThanSpill(true) || (numInternalUses() == 0))
  3075. return false;
  3076. return (numExternalUses != 0);
  3077. }
  3078. bool ResourcerInfo::isSplit()
  3079. {
  3080. return numSplitPaths() > 1;
  3081. }
  3082. unsigned ResourcerInfo::numSplitPaths()
  3083. {
  3084. unsigned internal = numInternalUses();
  3085. if ((internal == 0) || !options->allowSplitBetweenSubGraphs)
  3086. return internal;
  3087. //MORE
  3088. return internal;
  3089. }
  3090. bool ResourcerInfo::isSpilledWrite()
  3091. {
  3092. if (numInternalUses() == 0)
  3093. return true;
  3094. return false;
  3095. }
  3096. //---------------------------------------------------------------------------
  3097. EclResourcer::EclResourcer(IErrorReceiver & _errors, IConstWorkUnit * _wu, const HqlCppOptions & _translatorOptions, CResourceOptions & _options)
  3098. : options(_options)
  3099. {
  3100. wu.set(_wu);
  3101. errors = &_errors;
  3102. lockTransformMutex();
  3103. targetClusterType = options.targetClusterType;
  3104. insideNeverSplit = false;
  3105. insideSteppedNeverSplit = false;
  3106. sequential = false;
  3107. thisPass = 1;
  3108. unsigned totalMemory = _translatorOptions.resourceMaxMemory ? _translatorOptions.resourceMaxMemory : DEFAULT_TOTAL_MEMORY;
  3109. unsigned maxSockets = _translatorOptions.resourceMaxSockets ? _translatorOptions.resourceMaxSockets : DEFAULT_MAX_SOCKETS;
  3110. unsigned maxActivities = _translatorOptions.resourceMaxActivities ? _translatorOptions.resourceMaxActivities : DEFAULT_MAX_ACTIVITIES;
  3111. unsigned maxHeavy = _translatorOptions.resourceMaxHeavy;
  3112. unsigned maxDistribute = _translatorOptions.resourceMaxDistribute;
  3113. resourceLimit = new CResources(0);
  3114. resourceLimit->set(RESactivities, maxActivities);
  3115. switch (targetClusterType)
  3116. {
  3117. case ThorLCRCluster:
  3118. resourceLimit->set(RESheavy, maxHeavy).set(REShashdist, maxDistribute);
  3119. resourceLimit->set(RESmastersocket, maxSockets).set(RESslavememory,totalMemory);
  3120. resourceLimit->set(RESslavesocket, maxSockets).set(RESmastermemory, totalMemory);
  3121. break;
  3122. default:
  3123. resourceLimit->set(RESheavy, 0xffffffff).set(REShashdist, 0xffffffff);
  3124. resourceLimit->set(RESmastersocket, 0xffffffff).set(RESslavememory, 0xffffffff);
  3125. resourceLimit->set(RESslavesocket, 0xffffffff).set(RESmastermemory, 0xffffffff);
  3126. break;
  3127. }
  3128. if (_translatorOptions.unlimitedResources || options.unlimitedResources)
  3129. {
  3130. resourceLimit->set(RESheavy, 0xffffffff).set(REShashdist, 0xffffffff);
  3131. resourceLimit->set(RESmastersocket, 0xffffffff).set(RESslavememory,0xffffffff);
  3132. resourceLimit->set(RESslavesocket, 0xffffffff).set(RESmastermemory,0xffffffff);
  3133. }
  3134. spilled = false;
  3135. }
  3136. EclResourcer::~EclResourcer()
  3137. {
  3138. delete resourceLimit;
  3139. unlockTransformMutex();
  3140. }
  3141. void EclResourcer::changeGraph(IHqlExpression * expr, ResourceGraphInfo * newGraph)
  3142. {
  3143. ResourcerInfo * info = queryResourceInfo(expr);
  3144. info->graph.set(newGraph);
  3145. ForEachItemInRev(idx, links)
  3146. {
  3147. ResourceGraphLink & cur = links.item(idx);
  3148. if (cur.sourceNode == expr)
  3149. cur.changeSourceGraph(newGraph);
  3150. else if (cur.sinkNode == expr)
  3151. cur.changeSinkGraph(newGraph);
  3152. assertex(cur.sinkGraph != cur.sourceGraph);
  3153. }
  3154. }
  3155. ResourceGraphInfo * EclResourcer::createGraph()
  3156. {
  3157. ResourceGraphInfo * graph = new ResourceGraphInfo(&options);
  3158. graphs.append(*LINK(graph));
  3159. //PrintLog("Create graph %p", graph);
  3160. return graph;
  3161. }
  3162. void EclResourcer::connectGraphs(ResourceGraphInfo * sourceGraph, IHqlExpression * sourceNode, ResourceGraphInfo * sinkGraph, IHqlExpression * sinkNode, LinkKind kind)
  3163. {
  3164. ResourceGraphLink * link = new ResourceGraphLink(sourceGraph, sourceNode, sinkGraph, sinkNode, kind);
  3165. links.append(*link);
  3166. if (sourceGraph)
  3167. sourceGraph->sinks.append(*link);
  3168. if (sinkGraph)
  3169. sinkGraph->sources.append(*link);
  3170. }
  3171. ResourcerInfo * EclResourcer::queryCreateResourceInfo(IHqlExpression * expr)
  3172. {
  3173. IHqlExpression * body = expr->queryBody();
  3174. ResourcerInfo * info = (ResourcerInfo *)body->queryTransformExtra();
  3175. if (!info)
  3176. {
  3177. info = new ResourcerInfo(expr, &options);
  3178. body->setTransformExtraOwned(info);
  3179. }
  3180. return info;
  3181. }
  3182. void EclResourcer::replaceGraphReferences(IHqlExpression * expr, ResourceGraphInfo * oldGraph, ResourceGraphInfo * newGraph)
  3183. {
  3184. ResourcerInfo * info = queryResourceInfo(expr);
  3185. if (!info || !info->containsActivity)
  3186. return;
  3187. if (info->isActivity && info->graph != oldGraph)
  3188. return;
  3189. info->graph.set(newGraph);
  3190. unsigned first = getFirstActivityArgument(expr);
  3191. unsigned last = first + getNumActivityArguments(expr);
  3192. for (unsigned idx=first; idx < last; idx++)
  3193. replaceGraphReferences(expr->queryChild(idx), oldGraph, newGraph);
  3194. }
  3195. void EclResourcer::removeLink(ResourceGraphLink & link, bool keepExternalUses)
  3196. {
  3197. ResourcerInfo * info = (ResourcerInfo *)queryResourceInfo(link.sourceNode);
  3198. assertex(info && info->numExternalUses > 0);
  3199. if (!keepExternalUses)
  3200. info->numExternalUses--;
  3201. if (link.sinkGraph)
  3202. link.sinkGraph->sources.zap(link);
  3203. link.sourceGraph->sinks.zap(link);
  3204. links.zap(link);
  3205. }
  3206. void EclResourcer::replaceGraphReferences(ResourceGraphInfo * oldGraph, ResourceGraphInfo * newGraph)
  3207. {
  3208. ForEachItemIn(idx1, oldGraph->sinks)
  3209. {
  3210. ResourceGraphLink & sink = oldGraph->sinks.item(idx1);
  3211. replaceGraphReferences(sink.sourceNode, oldGraph, newGraph);
  3212. }
  3213. ForEachItemInRev(idx2, links)
  3214. {
  3215. ResourceGraphLink & cur = links.item(idx2);
  3216. if (cur.sourceGraph == oldGraph)
  3217. {
  3218. if (cur.sinkGraph == newGraph)
  3219. removeLink(cur, false);
  3220. else
  3221. cur.changeSourceGraph(newGraph);
  3222. }
  3223. else if (cur.sinkGraph == oldGraph)
  3224. {
  3225. if (cur.sourceGraph == newGraph)
  3226. removeLink(cur, false);
  3227. else
  3228. cur.changeSinkGraph(newGraph);
  3229. }
  3230. }
  3231. }
  3232. //------------------------------------------------------------------------------------------
// PASS1: Gather information about splitter locations.
  3234. void EclResourcer::tagActiveCursors(HqlExprCopyArray * activeRows)
  3235. {
  3236. if (!activeRows)
  3237. return;
  3238. ForEachItemIn(i, *activeRows)
  3239. {
  3240. IHqlExpression & cur = activeRows->item(i);
  3241. activeSelectors.append(cur);
  3242. queryCreateResourceInfo(&cur)->isAlreadyInScope = true;
  3243. }
  3244. }
//Could expr end up as part of a compound (stepped) index read? Walks through activities that may be merged
//into an index read and returns true if an index read (or a graph loop result) is reached. Returns false
//for any other activity, including compound disk/child reads.
static bool isPotentialCompoundSteppedIndexRead(IHqlExpression * expr)
{
    for (;;)
    {
        switch (expr->getOperator())
        {
        case no_compound_diskread:
        case no_compound_disknormalize:
        case no_compound_diskaggregate:
        case no_compound_diskcount:
        case no_compound_diskgroupaggregate:
        case no_compound_childread:
        case no_compound_childnormalize:
        case no_compound_childaggregate:
        case no_compound_childcount:
        case no_compound_childgroupaggregate:
        case no_compound_selectnew:
        case no_compound_inline:
            return false;
        case no_compound_indexread:
        case no_newkeyindex:
            return true;
        case no_getgraphloopresult:
            return true; // Could be an index read in another graph iteration, so don't combine
        //Activities that can be combined with an index read: continue with the input
        case no_keyedlimit:
        case no_preload:
        case no_filter:
        case no_hqlproject:
        case no_newusertable:
        case no_limit:
        case no_sorted:
        case no_preservemeta:
        case no_distributed:
        case no_unordered:
        case no_grouped:
        case no_stepped:
        case no_section:
        case no_sectioninput:
        case no_dataset_alias:
            break;
        case no_choosen:
            {
                //Only if the optional third argument can be duplicated
                IHqlExpression * arg2 = expr->queryChild(2);
                if (arg2 && !canDuplicateExpr(arg2))
                    return false;
                break;
            }
        default:
            return false;
        }
        expr = expr->queryChild(0);
    }
}
//First walk of PASS1: create resource info for expr and its activity inputs, classify each expression as an
//activity or not, and clear the projection information unless expr is being used projected.
//Returns whether expr contains an activity. Expressions already visited are only walked again if they contain
//an activity but are not themselves activities (and are not already in scope).
bool EclResourcer::findSplitPoints(IHqlExpression * expr, bool isProjected)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (info && info->visited)
    {
        if (!isProjected)
            info->clearProjected();

        if (info->isAlreadyInScope || info->isActivity || !info->containsActivity)
            return info->containsActivity;
    }
    else
    {
        info = queryCreateResourceInfo(expr);
        info->visited = true;
        if (!isProjected)
            info->clearProjected();

        bool isActivity = true;
        switch (expr->getOperator())
        {
        case no_select:
            //either a select from a setresult or use of a child-dataset
            if (isNewSelector(expr))
            {
                info->containsActivity = findSplitPoints(expr->queryChild(0), false);
                assertex(queryResourceInfo(expr->queryChild(0))->isActivity);
            }
            if (expr->isDataset() || expr->isDatarow())
            {
                info->isActivity = true;
                info->containsActivity = true;
            }
            return info->containsActivity;
        case no_mapto:
            throwUnexpected();
        case no_activerow:
            info->isActivity = true;
            info->containsActivity = false;
            return false;
        case no_rowset:         // don't resource this as an activity
            isActivity = false;
            break;
        case no_attr:
        case no_attr_expr:
        case no_attr_link:
        case no_getgraphloopresultset:
            info->isActivity = false;
            info->containsActivity = false;
            return false;
        case no_datasetlist:
            isActivity = false;
            break;
        case no_rowsetrange:
            {
                //Don't resource this as an activity if it is a function of the input graph rows,
                //however we do want to if it is coming from a dataset list.
                IHqlExpression * ds = expr->queryChild(0);
                //MORE: Should walk further down the tree to allow for nested rowsetranges etc.
                if (ds->getOperator() == no_rowset || ds->getOperator() == no_getgraphloopresultset)
                {
                    info->isActivity = false;
                    info->containsActivity = false;
                    return false;
                }
                isActivity = false;
                break;
            }
        }

        //Scalar (or untyped) expressions are not activities and are not walked
        ITypeInfo * type = expr->queryType();
        if (!type || type->isScalar())
            return false;

        info->isActivity = isActivity;
        info->containsActivity = true;
    }

    //Walk the activity inputs
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    for (unsigned idx=first; idx < last; idx++)
        findSplitPoints(expr->queryChild(idx), false);

    return info->containsActivity;
}
  3377. void EclResourcer::findSplitPoints(HqlExprArray & exprs)
  3378. {
  3379. //Finding items that should be hoisted from child queries, and evaluated at this level is tricky:
  3380. //* After hoisting a child expression, that may in turn contain other child expressions.
  3381. //* Some child expressions are only worth hoisting if they are a simple function of something
  3382. // that will be evaluated here.
  3383. //* If we're creating a single project for each x[1] then that needs to be done after all the
  3384. // expressions to hoist have been found.
  3385. //* The usage counts have to correct - including
  3386. //* The select child dependents for a ds[n] need to be inherited by the project(ds)[n]
  3387. //=>
  3388. //First walk the expression tree gathering all the expressions that will be used.
  3389. //Project the expressions that need projecting.
  3390. //Finally walk the tree again, this time calculating the correct usage counts.
  3391. //
  3392. //On reflection the hoisting and projecting could be implemented as a completely separate transformation.
  3393. //However, it shares a reasonable amount of the logic with the rest of the resourcing, so keep together
  3394. //for the moment.
  3395. ForEachItemIn(idx, exprs)
  3396. findSplitPoints(&exprs.item(idx), false);
  3397. deriveUsageCounts(exprs);
  3398. }
  3399. void EclResourcer::deriveUsageCounts(IHqlExpression * expr)
  3400. {
  3401. ResourcerInfo * info = queryResourceInfo(expr);
  3402. if (!info || !(info->containsActivity || info->isActivity))
  3403. return;
  3404. bool savedInsideNeverSplit = insideNeverSplit;
  3405. bool savedInsideSteppedNeverSplit = insideSteppedNeverSplit;
  3406. if (insideSteppedNeverSplit && info)
  3407. {
  3408. if (!isPotentialCompoundSteppedIndexRead(expr) && (expr->getOperator() != no_datasetlist) && expr->getOperator() != no_inlinetable)
  3409. insideSteppedNeverSplit = false;
  3410. }
  3411. if (info->numUses)
  3412. {
  3413. if (insideNeverSplit || insideSteppedNeverSplit)
  3414. info->neverSplit = true;
  3415. if (info->isAlreadyInScope || info->isActivity || !info->containsActivity)
  3416. {
  3417. info->numUses++;
  3418. return;
  3419. }
  3420. }
  3421. else
  3422. {
  3423. info->numUses++;
  3424. if (insideNeverSplit || insideSteppedNeverSplit)
  3425. info->neverSplit = true;
  3426. switch (expr->getOperator())
  3427. {
  3428. case no_select:
  3429. //either a select from a setresult or use of a child-dataset
  3430. if (isNewSelector(expr))
  3431. deriveUsageCounts(expr->queryChild(0));
  3432. return;
  3433. case no_activerow:
  3434. case no_attr:
  3435. case no_attr_expr:
  3436. case no_attr_link:
  3437. case no_rowset: // don't resource this as an activity
  3438. case no_getgraphloopresultset:
  3439. return;
  3440. case no_keyedlimit:
  3441. if (options.preventKeyedSplit)
  3442. insideNeverSplit = true;
  3443. break;
  3444. case no_filter:
  3445. if (options.preventKeyedSplit && filterIsKeyed(expr))
  3446. insideNeverSplit = true;
  3447. else
  3448. {
  3449. LinkedHqlExpr invariant;
  3450. OwnedHqlExpr cond = extractFilterConditions(invariant, expr, expr->queryNormalizedSelector(), false, false);
  3451. if (invariant)
  3452. info->isConditionalFilter = true;
  3453. }
  3454. break;
  3455. case no_hqlproject:
  3456. case no_newusertable:
  3457. case no_aggregate:
  3458. case no_newaggregate:
  3459. if (options.preventKeyedSplit && expr->hasAttribute(keyedAtom))
  3460. insideNeverSplit = true;
  3461. break;
  3462. case no_stepped:
  3463. case no_mergejoin:
  3464. case no_nwayjoin:
  3465. if (options.preventSteppedSplit)
  3466. insideSteppedNeverSplit = true;
  3467. break;
  3468. case no_compound_diskread:
  3469. case no_compound_disknormalize:
  3470. case no_compound_diskaggregate:
  3471. case no_compound_diskcount:
  3472. case no_compound_diskgroupaggregate:
  3473. case no_compound_indexread:
  3474. case no_compound_indexnormalize:
  3475. case no_compound_indexaggregate:
  3476. case no_compound_indexcount:
  3477. case no_compound_indexgroupaggregate:
  3478. case no_compound_childread:
  3479. case no_compound_childnormalize:
  3480. case no_compound_childaggregate:
  3481. case no_compound_childcount:
  3482. case no_compound_childgroupaggregate:
  3483. case no_compound_selectnew:
  3484. case no_compound_inline:
  3485. insideNeverSplit = true;
  3486. break;
  3487. }
  3488. ITypeInfo * type = expr->queryType();
  3489. if (!type || type->isScalar())
  3490. {
  3491. insideNeverSplit = savedInsideNeverSplit;
  3492. insideSteppedNeverSplit = savedInsideSteppedNeverSplit;
  3493. return;
  3494. }
  3495. }
  3496. unsigned first = getFirstActivityArgument(expr);
  3497. unsigned last = first + getNumActivityArguments(expr);
  3498. for (unsigned idx=first; idx < last; idx++)
  3499. deriveUsageCounts(expr->queryChild(idx));
  3500. insideNeverSplit = savedInsideNeverSplit;
  3501. insideSteppedNeverSplit = savedInsideSteppedNeverSplit;
  3502. }
  3503. void EclResourcer::deriveUsageCounts(const HqlExprArray & exprs)
  3504. {
  3505. ForEachItemIn(idx2, exprs)
  3506. deriveUsageCounts(&exprs.item(idx2));
  3507. }
  3508. //------------------------------------------------------------------------------------------
  3509. // PASS2: Actually create the subgraphs based on splitters.
//Build the initial subgraphs by walking down from expr.  An activity joins its owner's graph unless it has no
//owner, is used more than once, is reached via a link that is not UnconditionalLink, or a new graph is forced -
//in which case a new graph is created and connected to ownerGraph.  An activity that already belongs to a graph
//is only connected to the owner (and its numExternalUses incremented).
void EclResourcer::createInitialGraph(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerGraph, LinkKind linkKind, bool forceNewGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    //Link kind and new-graph flag used when walking this expression's activity arguments (adjusted by the switch below).
    LinkKind childLinkKind = UnconditionalLink;
    Linked<ResourceGraphInfo> thisGraph = ownerGraph;
    bool forceNewChildGraph = false;
    if (info->isActivity)
    {
        //Need to ensure no_libraryselects are not separated from the no_libraryscopeinstance
        //so ensure they are placed in the same graph.
        node_operator op = expr->getOperator();
        if (op == no_libraryselect)
        {
            ResourcerInfo * moduleInfo = queryResourceInfo(expr->queryChild(1));
            if (!info->graph && moduleInfo->graph)
                info->graph.set(moduleInfo->graph);
        }

        //Already allocated to a graph (an earlier use, or a library select joined to its instance's graph):
        //just connect it to the owner.
        if (info->graph)
        {
            connectGraphs(info->graph, expr, ownerGraph, owner, linkKind);
            info->numExternalUses++;
            return;
        }

        unsigned numUses = info->numUses;
        switch (op)
        {
        case no_libraryscopeinstance:
            //Treated as a single use, so multiple uses alone do not force a separate graph.
            numUses = 1;
            break;
        case no_libraryselect:
            forceNewGraph = true;
            break;
        }

        if (!ownerGraph || numUses > 1 || (linkKind != UnconditionalLink) || forceNewGraph)
        {
            thisGraph.setown(createGraph());
            connectGraphs(thisGraph, expr, ownerGraph, owner, linkKind);
            info->numExternalUses++;
            //Root graphs (no owner) created while 'sequential' is set are flagged unless graph-local.
            if (!ownerGraph && sequential)
            {
                if (!expr->hasAttribute(_graphLocal_Atom))
                    thisGraph->hasSequentialSource = true;
            }
        }

        info->graph.set(thisGraph);
        if (info->isRootActivity)
            thisGraph->hasRootActivity = true;

        switch (expr->getOperator())
        {
        case no_compound:
            //NB: First argument is forced into a separate graph
            if (options.convertCompoundToExecuteWhen)
            {
                createInitialGraph(expr->queryChild(0), expr, thisGraph, UnconditionalLink, true);
            }
            else
            {
                //Each action in the list becomes a root of its own graph (no owner graph).
                HqlExprArray actions;
                expr->queryChild(0)->unwindList(actions, no_actionlist);
                ForEachItemIn(i, actions)
                    createInitialGraph(&actions.item(i), expr, NULL, UnconditionalLink, true);
            }
            createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, false);
            return;
        case no_executewhen:
            {
                //The side-effect (child 1) always gets a new graph; the main argument only does for actions on hthor.
                bool newGraph = expr->isAction() && (options.targetClusterType == HThorCluster);
                createInitialGraph(expr->queryChild(0), expr, thisGraph, UnconditionalLink, newGraph);
                createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, true);
                return;
            }
        case no_keyindex:
        case no_newkeyindex:
            return;
        case no_parallel:
        case no_actionlist:
            {
                ForEachChild(i, expr)
                    createInitialGraph(expr->queryChild(i), expr, thisGraph, UnconditionalLink, options.actionLinkInNewGraph);
                return;
            }
        case no_if:
        case no_choose:
        case no_chooseds:
            //conditional nodes.
            //NOTE(review): the child links are created as UnconditionalLink here; the branch conditions are
            //tagged later, in PASS3 (markAsUnconditional/markCondition).
            childLinkKind = UnconditionalLink;
            thisGraph->mergedConditionSource = true;
            if (!options.noConditionalLinks || (expr->isAction() && options.actionLinkInNewGraph))
                forceNewChildGraph = true;
            break;
        case no_filter:
            //Filters with an invariant condition (see deriveUsageCounts) behave like a condition.
            if (info->isConditionalFilter)
            {
                thisGraph->mergedConditionSource = true;
                if (!options.noConditionalLinks)
                    forceNewChildGraph = true;
            }
            break;
//      case no_nonempty:
        case no_sequential:
        case no_orderedactionlist:
            {
                //Every action is connected with a SequenceLink (the first is handled identically to the rest).
                unsigned first = getFirstActivityArgument(expr);
                unsigned last = first + getNumActivityArguments(expr);
                createInitialGraph(expr->queryChild(first), expr, thisGraph, SequenceLink, options.actionLinkInNewGraph);
                for (unsigned idx = first+1; idx < last; idx++)
                    createInitialGraph(expr->queryChild(idx), expr, thisGraph, SequenceLink, options.actionLinkInNewGraph);
                return;
            }
        case no_case:
        case no_map:
            {
                //Expected to have been transformed away before resourcing.
                throwUnexpected();
            }
        case no_setgraphresult:
            {
                //The stored graph result may be reused instead of spilling the dataset.
                IHqlExpression * dataset = expr->queryChild(0);
                ResourcerInfo * childInfo = queryResourceInfo(dataset);
                childInfo->setPotentialSpillFile(expr);
                break;
            }
        case no_output:
            {
                //Tag the inputs to an output statement, so that if a spill was going to occur we read
                //from the output file instead of spilling.
                //Needs the grouping to be saved in the same way.  Could cope with compressed matching, but not
                //much point - since fairly unlikely.
                IHqlExpression * filename = queryRealChild(expr, 1);
                IHqlExpression * dataset = expr->queryChild(0);
                if (filename)
                {
                    //Only plain (not xml/json/csv) outputs to a constant filename with matching grouping.
                    if ((filename->getOperator() == no_constant) && !expr->hasAttribute(xmlAtom) && !expr->hasAttribute(jsonAtom) && !expr->hasAttribute(csvAtom))
                    {
                        if (expr->hasAttribute(groupedAtom) == isGrouped(dataset))
                        {
                            ResourcerInfo * childInfo = queryResourceInfo(dataset);
                            if (!isUpdatedConditionally(expr))
                                childInfo->setPotentialSpillFile(expr);
                        }
                    }
                }
                else
                {
                    //Output to an internal workunit result.
                    if (matchesConstantValue(queryAttributeChild(expr, sequenceAtom, 0), ResultSequenceInternal))
                    {
                        //Do not read from a spilled workunit result in thor unless it is a child query
                        //Otherwise the distribution is lost causing inefficient code.
                        if (options.alwaysReuseGlobalSpills || (options.targetClusterType != ThorLCRCluster) || options.isChildQuery)
                        {
                            IHqlExpression * dataset = expr->queryChild(0);
                            ResourcerInfo * childInfo = queryResourceInfo(dataset);
                            childInfo->setPotentialSpillFile(expr);
                        }
                    }
                }
                if (isUpdatedConditionally(expr))
                    thisGraph->mergedConditionSource = true;
                break;
            }
        case no_buildindex:
            if (isUpdatedConditionally(expr))
                thisGraph->mergedConditionSource = true;
            break;
        }
    }

    //Walk the activity arguments (non-activities pass straight through in the current graph).
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    for (unsigned idx = first; idx < last; idx++)
        createInitialGraph(expr->queryChild(idx), expr, thisGraph, childLinkKind, forceNewChildGraph);
}
  3682. void EclResourcer::createInitialGraphs(HqlExprArray & exprs)
  3683. {
  3684. ForEachItemIn(idx, exprs)
  3685. {
  3686. IHqlExpression * rootExpr = &exprs.item(idx);
  3687. ResourcerInfo * info = queryResourceInfo(rootExpr);
  3688. assertex(info->isActivity);
  3689. if (!rootExpr->hasAttribute(_lazy_Atom) && !rootExpr->hasAttribute(_graphLocal_Atom))
  3690. info->setRootActivity();
  3691. createInitialGraph(rootExpr, NULL, NULL, UnconditionalLink, false);
  3692. }
  3693. }
  3694. void EclResourcer::createInitialRemoteGraph(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerGraph, bool forceNewGraph)
  3695. {
  3696. ResourcerInfo * info = queryResourceInfo(expr);
  3697. if (!info || !info->containsActivity)
  3698. return;
  3699. Linked<ResourceGraphInfo> thisGraph = ownerGraph;
  3700. if (info->isActivity)
  3701. {
  3702. if (info->graph)
  3703. {
  3704. connectGraphs(info->graph, expr, ownerGraph, owner, UnconditionalLink);
  3705. info->numExternalUses++;
  3706. return;
  3707. }
  3708. if (!ownerGraph || forceNewGraph)
  3709. {
  3710. thisGraph.setown(createGraph());
  3711. connectGraphs(thisGraph, expr, ownerGraph, owner, UnconditionalLink);
  3712. info->numExternalUses++;
  3713. }
  3714. info->graph.set(thisGraph);
  3715. switch (expr->getOperator())
  3716. {
  3717. case no_compound:
  3718. if (options.convertCompoundToExecuteWhen)
  3719. createInitialRemoteGraph(expr->queryChild(0), expr, thisGraph, true);
  3720. else
  3721. createInitialRemoteGraph(expr->queryChild(0), expr, NULL, true);
  3722. createInitialRemoteGraph(expr->queryChild(1), expr, thisGraph, false);
  3723. return;
  3724. case no_executewhen:
  3725. createInitialRemoteGraph(expr->queryChild(0), expr, thisGraph, false);
  3726. createInitialRemoteGraph(expr->queryChild(1), expr, thisGraph, true);
  3727. return;
  3728. }
  3729. }
  3730. unsigned first = getFirstActivityArgument(expr);
  3731. unsigned last = first + getNumActivityArguments(expr);
  3732. for (unsigned idx = first; idx < last; idx++)
  3733. createInitialRemoteGraph(expr->queryChild(idx), expr, thisGraph, false);
  3734. }
  3735. void EclResourcer::createInitialRemoteGraphs(HqlExprArray & exprs)
  3736. {
  3737. ForEachItemIn(idx, exprs)
  3738. createInitialRemoteGraph(&exprs.item(idx), NULL, NULL, false);
  3739. }
  3740. //------------------------------------------------------------------------------------------
  3741. // PASS3: Tag graphs/links that are conditional or unconditional
  3742. void EclResourcer::markChildDependentsAsUnconditional(ResourcerInfo * info, IHqlExpression * condition)
  3743. {
  3744. ForEachItemIn(iDepend, info->dependsOn)
  3745. markAsUnconditional(info->dependsOn.item(iDepend).sourceNode, info->graph, condition);
  3746. }
//PASS3: walk down from expr recording whether each activity is reached unconditionally (condition == NULL) or
//only under a condition.  ownerGraph is the graph of the consuming activity; when an activity in a different
//graph is reached unconditionally, its graph is marked isUnconditional.  Unconditional roots of IF/CHOOSE/
//conditional filters are collected in rootConditions and their branches are processed later by markConditions().
void EclResourcer::markAsUnconditional(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, IHqlExpression * condition)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    //Not an activity itself - pass straight through to the activity arguments.
    if (!info->isActivity)
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned last = first + getNumActivityArguments(expr);
        for (unsigned idx=first; idx < last; idx++)
            markAsUnconditional(expr->queryChild(idx), ownerGraph, condition);
        return;
    }

    //NOTE(review): addCondition()'s semantics are not visible here; when it returns true the rest of this
    //walk treats the activity as reached unconditionally.
    if (condition)
        if (info->addCondition(condition))
            condition = NULL;

    //Already known to be unconditional - nothing more can change.
    if (info->pathToExpr == ResourcerInfo::PathUnconditional)
        return;

    //Already reached conditionally, and reached conditionally again.
    if ((info->pathToExpr == ResourcerInfo::PathConditional) && condition)
    {
        if (targetClusterType == RoxieCluster)
        {
            if (options.spillMultiCondition)
            {
                if (info->graph != ownerGraph)
                    info->graph->isUnconditional = true;
            }
            return;
        }
        else
        {
            if (info->graph != ownerGraph)
                return;
        }
    }

    bool wasConditional = (info->pathToExpr == ResourcerInfo::PathConditional);
    if (!condition)
    {
        info->pathToExpr = ResourcerInfo::PathUnconditional;
        if (info->graph != ownerGraph)
            info->graph->isUnconditional = true;
    }
    else
        info->pathToExpr = ResourcerInfo::PathConditional;

    node_operator op = expr->getOperator();
    switch (op)
    {
    case no_if:
    case no_choose:
    case no_chooseds:
        if (options.noConditionalLinks)
            break;
        if (condition)
            markCondition(expr, condition, wasConditional);
        else
        {
            //This list is processed in a second phase.
            if (rootConditions.find(*expr) == NotFound)
                rootConditions.append(*LINK(expr));
        }
        markChildDependentsAsUnconditional(info, condition);
        return;
    case no_filter:
        //Only filters with an invariant condition are treated as conditions.
        if (!info->isConditionalFilter || options.noConditionalLinks)
            break;
        if (condition)
            markCondition(expr, condition, wasConditional);
        else
        {
            //This list is processed in a second phase.
            if (rootConditions.find(*expr) == NotFound)
                rootConditions.append(*LINK(expr));
        }
        markChildDependentsAsUnconditional(info, condition);
        return;
    case no_compound:
        if (!options.convertCompoundToExecuteWhen)
        {
            //The compound's actions are marked unconditional (with no owner graph).
            HqlExprArray actions;
            expr->queryChild(0)->unwindList(actions, no_actionlist);
            ForEachItemIn(i, actions)
                markAsUnconditional(&actions.item(i), NULL, NULL);
            markAsUnconditional(expr->queryChild(1), ownerGraph, condition);
            return;
        }
        break;
    case no_sequential:
    case no_orderedactionlist:
//  case no_nonempty:
        if (!options.isChildQuery)
        {
            //The first action inherits this activity's condition; each later action is tagged with an
            //instanceAtom attribute (this expression, its index and the condition) and treated as a condition source.
            //NOTE(review): child0 is queryChild(0) rather than queryChild(first) - presumably first is always 0 here.
            unsigned first = getFirstActivityArgument(expr);
            unsigned last = first + getNumActivityArguments(expr);
            IHqlExpression * child0 = expr->queryChild(0);
            markAsUnconditional(child0, info->graph, condition);
            queryResourceInfo(child0)->graph->hasConditionSource = true; // force it to generate even if contains something very simple e.g., null action
            for (unsigned idx = first+1; idx < last; idx++)
            {
                OwnedHqlExpr tag = createAttribute(instanceAtom, LINK(expr), getSizetConstant(idx), LINK(condition));
                IHqlExpression * child = expr->queryChild(idx);
                markAsUnconditional(child, queryResourceInfo(child)->graph, tag);
                queryResourceInfo(child)->setConditionSource(tag, !wasConditional);
            }
            markChildDependentsAsUnconditional(info, condition);
            return;
        }
        break;
    case no_case:
    case no_map:
        UNIMPLEMENTED;
    }

    //Ordinary activity: its inputs inherit the same condition, with this activity's graph as their owner.
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    for (unsigned idx=first; idx < last; idx++)
        markAsUnconditional(expr->queryChild(idx), info->graph, condition);
    markChildDependentsAsUnconditional(info, condition);
}
  3864. void EclResourcer::markConditionBranch(unsigned childIndex, IHqlExpression * expr, IHqlExpression * condition, bool wasConditional)
  3865. {
  3866. IHqlExpression * child = queryRealChild(expr, childIndex);
  3867. if (child)
  3868. {
  3869. OwnedHqlExpr tag;
  3870. if (expr->getOperator() == no_if)
  3871. tag.setown(createAttribute(((childIndex==1) ? trueAtom : falseAtom), LINK(expr), LINK(condition)));
  3872. else
  3873. tag.setown(createAttribute(trueAtom, LINK(expr), LINK(condition), getSizetConstant(childIndex)));
  3874. markAsUnconditional(child, queryResourceInfo(child)->graph, tag);
  3875. queryResourceInfo(child)->setConditionSource(tag, !wasConditional);
  3876. }
  3877. }
  3878. void EclResourcer::markCondition(IHqlExpression * expr, IHqlExpression * condition, bool wasConditional)
  3879. {
  3880. if (expr->getOperator() == no_filter)
  3881. {
  3882. markConditionBranch(0, expr, condition, wasConditional);
  3883. }
  3884. else
  3885. {
  3886. ForEachChildFrom(i, expr, 1)
  3887. markConditionBranch(i, expr, condition, wasConditional);
  3888. }
  3889. }
  3890. void EclResourcer::markConditions(HqlExprArray & exprs)
  3891. {
  3892. ForEachItemIn(idx, exprs)
  3893. {
  3894. IHqlExpression & cur = exprs.item(idx);
  3895. if (!cur.hasAttribute(_graphLocal_Atom))
  3896. {
  3897. //MORE: What should lazy do when implemented?
  3898. markAsUnconditional(&cur, NULL, NULL);
  3899. }
  3900. }
  3901. //More efficient to process the conditional branches once all unconditional branches have been walked.
  3902. ForEachItemIn(idx2, rootConditions)
  3903. markCondition(&rootConditions.item(idx2), NULL, false);
  3904. }
  3905. //------------------------------------------------------------------------------------------
  3906. // PASS4: Split subgraphs based on resource requirements for activities
  3907. //This will need to be improved if we allow backtracking to get the best combination of activities to fit in the subgraph
  3908. void EclResourcer::createResourceSplit(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerNewGraph, ResourceGraphInfo * originalGraph)
  3909. {
  3910. ResourcerInfo * info = queryResourceInfo(expr);
  3911. info->graph.setown(createGraph());
  3912. info->graph->isUnconditional = originalGraph->isUnconditional;
  3913. changeGraph(expr, info->graph);
  3914. connectGraphs(info->graph, expr, ownerNewGraph, owner, UnconditionalLink);
  3915. info->numExternalUses++;
  3916. }
//Fill exprResources with the resources required by the activity expr, by delegating to the global
//::getResources() with this resourcer's options.
void EclResourcer::getResources(IHqlExpression * expr, CResources & exprResources)
{
    ::getResources(expr, exprResources, options);
}
  3921. bool EclResourcer::calculateResourceSpillPoints(IHqlExpression * expr, ResourceGraphInfo * graph, CResources & resourcesSoFar, bool hasGoodSpillPoint, bool canSpill)
  3922. {
  3923. ResourcerInfo * info = queryResourceInfo(expr);
  3924. if (!info || !info->containsActivity)
  3925. return true;
  3926. if (!info->isActivity)
  3927. {
  3928. unsigned first = getFirstActivityArgument(expr);
  3929. unsigned last = first + getNumActivityArguments(expr);
  3930. if (last - first == 1)
  3931. return calculateResourceSpillPoints(expr->queryChild(first), graph, resourcesSoFar, hasGoodSpillPoint, canSpill);
  3932. for (unsigned idx = first; idx < last; idx++)
  3933. calculateResourceSpillPoints(expr->queryChild(idx), graph, resourcesSoFar, false, canSpill);
  3934. return true;
  3935. }
  3936. if (info->graph != graph)
  3937. return true;
  3938. CResources exprResources(options.clusterSize);
  3939. getResources(expr, exprResources);
  3940. info->isSpillPoint = false;
  3941. Owned<CResources> curResources = LINK(&resourcesSoFar);
  3942. if (resourcesSoFar.addExceeds(exprResources, *resourceLimit))
  3943. {
  3944. if (hasGoodSpillPoint)
  3945. return false;
  3946. info->isSpillPoint = true;
  3947. spilled = true;
  3948. curResources.setown(new CResources(options.clusterSize));
  3949. if (exprResources.exceeds(*resourceLimit))
  3950. throwError2(HQLERR_CannotResourceActivity, getOpString(expr->getOperator()), options.clusterSize);
  3951. }
  3952. if (options.minimizeSkewBeforeSpill)
  3953. {
  3954. if (canSpill && heavyweightAndReducesSizeOrSkew(expr))
  3955. {
  3956. //if the input activity is going to cause us to run out of resources, then it is going to be better to split here than anywhere else
  3957. //this code may conceivably cause extra spills far away because the hash distributes are moved.
  3958. IHqlExpression * childExpr = expr->queryChild(0);
  3959. ResourcerInfo * childInfo = queryResourceInfo(childExpr);
  3960. if (childInfo->graph == graph)
  3961. {
  3962. CResources childResources(options.clusterSize);
  3963. getResources(childExpr, childResources);
  3964. childResources.add(exprResources);
  3965. if (curResources->addExceeds(childResources, *resourceLimit))
  3966. {
  3967. info->isSpillPoint = true;
  3968. spilled = true;
  3969. calculateResourceSpillPoints(childExpr, graph, exprResources, false, true);
  3970. return true;
  3971. }
  3972. }
  3973. //otherwise continue as normal.
  3974. }
  3975. }
  3976. curResources->add(exprResources);
  3977. unsigned first = getFirstActivityArgument(expr);
  3978. unsigned last = first + getNumActivityArguments(expr);
  3979. if (hasGoodSpillPoint)
  3980. {
  3981. if (exprResources.resource[RESheavy] || exprResources.resource[REShashdist] || last-first != 1)
  3982. hasGoodSpillPoint = false;
  3983. }
  3984. else if (!info->isSpillPoint && canSpill)
  3985. {
  3986. if (lightweightAndReducesDatasetSize(expr) || queryHint(expr, spillAtom))
  3987. {
  3988. CResources savedResources(*curResources);
  3989. if (!calculateResourceSpillPoints(expr->queryChild(0), graph, *curResources, true, true))
  3990. {
  3991. curResources->set(savedResources);
  3992. info->isSpillPoint = true;
  3993. spilled = true;
  3994. calculateResourceSpillPoints(expr->queryChild(0), graph, exprResources, false, true);
  3995. }
  3996. return true;
  3997. }
  3998. }
  3999. node_operator op = expr->getOperator();
  4000. if ((op == no_if) || (op == no_choose) || (op == no_chooseds))
  4001. {
  4002. //For conditions, spill on intersection of resources used, not union.
  4003. CResources savedResources(*curResources);
  4004. if (!calculateResourceSpillPoints(expr->queryChild(1), graph, *curResources, hasGoodSpillPoint, true))
  4005. return false;
  4006. ForEachChildFrom(i, expr, 2)
  4007. {
  4008. if (expr->queryChild(i)->isAttribute())
  4009. continue;
  4010. if (!calculateResourceSpillPoints(expr->queryChild(i), graph, savedResources, hasGoodSpillPoint, true))
  4011. return false;
  4012. curResources->maximize(savedResources);
  4013. }
  4014. }
  4015. else
  4016. {
  4017. for (unsigned idx = first; idx < last; idx++)
  4018. if (!calculateResourceSpillPoints(expr->queryChild(idx), graph, *curResources, hasGoodSpillPoint, true))
  4019. return false;
  4020. }
  4021. return true;
  4022. }
//PASS4 (second half): having marked the spill points, redistribute the activities of ownerOriginalGraph.
//A new graph is created at each spill point; other activities move into their consumer's (possibly new)
//graph.  Each activity's resources are then allocated to the graph it ends up in.
void EclResourcer::insertResourceSpillPoints(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerOriginalGraph, ResourceGraphInfo * ownerNewGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    //Not an activity itself - pass straight through to the activity arguments.
    if (!info->isActivity)
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned last = first + getNumActivityArguments(expr);
        for (unsigned idx = first; idx < last; idx++)
            insertResourceSpillPoints(expr->queryChild(idx), expr, ownerOriginalGraph, ownerNewGraph);
        return;
    }

    ResourceGraphInfo * originalGraph = info->graph;        //NB: Graph will never cease to exist, so don't need to link.
    //Only activities belonging to the graph being split are processed.
    if (originalGraph != ownerOriginalGraph)
        return;

    if (info->isSpillPoint)
        createResourceSplit(expr, owner, ownerNewGraph, originalGraph);
    else if (info->graph != ownerNewGraph)
        changeGraph(expr, ownerNewGraph);

    //Expected to fit, given the spill points chosen by calculateResourceSpillPoints() (asserted).
    CResources exprResources(options.clusterSize);
    getResources(expr, exprResources);
    bool ok = info->graph->allocateResources(exprResources, *resourceLimit);
    assertex(ok);

    node_operator op = expr->getOperator();
    if ((op == no_if) || (op == no_choose) || (op == no_chooseds))
    {
        //Conditions: each branch starts from the resources in use before the condition, and the graph ends up
        //with the maximum over all the branches.
        CResources savedResources(info->graph->resources);
        insertResourceSpillPoints(expr->queryChild(1), expr, originalGraph, info->graph);
        ForEachChildFrom(i, expr, 2)
        {
            if (expr->queryChild(i)->isAttribute())
                continue;
            CResources branchResources(info->graph->resources);
            info->graph->resources.set(savedResources);
            insertResourceSpillPoints(expr->queryChild(i), expr, originalGraph, info->graph);
            info->graph->resources.maximize(branchResources);
        }
    }
    else
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned last = first + getNumActivityArguments(expr);
        for (unsigned idx = first; idx < last; idx++)
            insertResourceSpillPoints(expr->queryChild(idx), expr, originalGraph, info->graph);
    }
}
  4070. void EclResourcer::resourceSubGraph(ResourceGraphInfo * graph)
  4071. {
  4072. if (graph->beenResourced)
  4073. return;
  4074. graph->beenResourced = true;
  4075. ForEachItemIn(idx, graph->sources)
  4076. resourceSubGraph(graph->sources.item(idx).sourceGraph);
  4077. IHqlExpression * sourceNode = graph->sinks.item(0).sourceNode->queryBody();
  4078. #ifdef _DEBUG
  4079. //Sanity check, ensure there is only a single sink for this graph.
  4080. //However because libraryselects are tightly bound to their library instance there may be multiple library selects.
  4081. //They won't affect the resourcing though, since they'll plug into the same library instance, and the selects use no resources.
  4082. ForEachItemIn(idx2, graph->sinks)
  4083. {
  4084. IHqlExpression * thisSink = graph->sinks.item(idx2).sourceNode->queryBody();
  4085. if (thisSink->getOperator() != no_libraryselect)
  4086. assertex(thisSink == sourceNode);
  4087. }
  4088. #endif
  4089. spilled = false;
  4090. CResources resources(options.clusterSize);
  4091. calculateResourceSpillPoints(sourceNode, graph, resources, false, false);
  4092. if (spilled)
  4093. insertResourceSpillPoints(sourceNode, NULL, graph, graph);
  4094. else
  4095. graph->resources.set(resources);
  4096. }
  4097. void EclResourcer::resourceSubGraphs(HqlExprArray & exprs)
  4098. {
  4099. ForEachItemIn(idx, graphs)
  4100. resourceSubGraph(&graphs.item(idx));
  4101. }
  4102. //------------------------------------------------------------------------------------------
// PASS5: Link subgraphs with dependency information so they don't get merged by accident.
//Parallel arrays describing every dependency source recorded so far: entry i of each array describes the same
//source - the key identifying what is produced, the graph containing the producer, and the producing activity.
struct DependencySourceInfo
{
    HqlExprArray search;                    // key, e.g. a fileAtom or resultAtom attribute
    CIArrayOf<ResourceGraphInfo> graphs;    // graph containing the producing activity
    HqlExprArray exprs;                     // the producing activity
};
//Gathers dependencies between subgraphs.  Activities that write a file or result register a dependency source;
//a later use of the same key from a different graph creates a ResourceGraphDependencyLink from the producer's
//graph, and a use from a different activity in the same graph is reported as a recursive-dependency warning.
class EclResourceDependencyGatherer
{
public:
    EclResourceDependencyGatherer(IErrorReceiver * _errors)
        : errors(_errors)
    {
    }

    void addChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr);
    bool addExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr);
    //Appends link without linking it - as the name suggests, the caller presumably passes ownership.
    void appendLinkOwn(ResourceGraphDependencyLink & link) { links.append(link); }
    const CIArrayOf<ResourceGraphLink> & queryLinks() const { return links; }

protected:
    void addDependencySource(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr);
    void addDependencyUse(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr);
    void addRefExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr);
    void doAddChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr);

protected:
    DependencySourceInfo dependencySource;      // producers recorded so far
    IErrorReceiver * errors;                    // receives the recursive-dependency warnings
    CIArrayOf<ResourceGraphLink> links;         // dependency links created between graphs
};
  4131. void EclResourceDependencyGatherer::addDependencySource(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr)
  4132. {
  4133. //MORE: Should we check this doesn't already exist?
  4134. dependencySource.search.append(*LINK(search));
  4135. dependencySource.graphs.append(*LINK(curGraph));
  4136. dependencySource.exprs.append(*LINK(expr));
  4137. }
  4138. void EclResourceDependencyGatherer::addDependencyUse(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr)
  4139. {
  4140. unsigned index = dependencySource.search.find(*search);
  4141. if (index != NotFound)
  4142. {
  4143. if (&dependencySource.graphs.item(index) == curGraph)
  4144. {
  4145. //Don't give a warning if get/set is within the same activity (e.g., within a local())
  4146. if (&dependencySource.exprs.item(index) != expr)
  4147. {
  4148. StringBuffer ecl;
  4149. getExprECL(search, ecl);
  4150. VStringBuffer msg(HQLWRN_RecursiveDependendencies_Text, ecl.str());
  4151. errors->reportError(HQLWRN_RecursiveDependendencies, msg.str(), str(codeGeneratorId), 0, 0, 0);
  4152. }
  4153. }
  4154. else
  4155. {
  4156. IHqlExpression * sourceExpr = &dependencySource.exprs.item(index);
  4157. ResourceGraphLink * link = new ResourceGraphDependencyLink(&dependencySource.graphs.item(index), sourceExpr, curGraph, expr, search);
  4158. links.append(*link);
  4159. }
  4160. }
  4161. }
  4162. void EclResourceDependencyGatherer::addRefExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr)
  4163. {
  4164. IHqlExpression * filename = queryTableFilename(expr);
  4165. if (filename)
  4166. {
  4167. OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(filename));
  4168. addDependencySource(value, curGraph, activityExpr);
  4169. }
  4170. }
  4171. bool EclResourceDependencyGatherer::addExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr)
  4172. {
  4173. switch (expr->getOperator())
  4174. {
  4175. case no_buildindex:
  4176. case no_output:
  4177. {
  4178. IHqlExpression * filename = queryRealChild(expr, 1);
  4179. if (filename)
  4180. {
  4181. switch (filename->getOperator())
  4182. {
  4183. case no_pipe:
  4184. // allWritten = true;
  4185. break;
  4186. default:
  4187. OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(filename));
  4188. addDependencySource(value, curGraph, activityExpr);
  4189. break;
  4190. }
  4191. }
  4192. else
  4193. {
  4194. IHqlExpression * seq = querySequence(expr);
  4195. assertex(seq && seq->queryValue());
  4196. IHqlExpression * name = queryResultName(expr);
  4197. OwnedHqlExpr value = createAttribute(resultAtom, LINK(seq), LINK(name));
  4198. addDependencySource(value, curGraph, activityExpr);
  4199. }
  4200. return true;
  4201. }
  4202. case no_keydiff:
  4203. {
  4204. addRefExprDependency(expr->queryChild(0), curGraph, activityExpr);
  4205. addRefExprDependency(expr->queryChild(1), curGraph, activityExpr);
  4206. OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(2)));
  4207. addDependencySource(value, curGraph, activityExpr);
  4208. return true;
  4209. }
  4210. case no_keypatch:
  4211. {
  4212. addRefExprDependency(expr->queryChild(0), curGraph, activityExpr);
  4213. OwnedHqlExpr patchName = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(1)));
  4214. addDependencyUse(patchName, curGraph, activityExpr);
  4215. OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(2)));
  4216. addDependencySource(value, curGraph, activityExpr);
  4217. return true;
  4218. }
  4219. case no_table:
  4220. {
  4221. IHqlExpression * filename = expr->queryChild(0);
  4222. OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(filename));
  4223. addDependencyUse(value, curGraph, activityExpr);
  4224. return !filename->isConstant();
  4225. }
  4226. case no_select:
  4227. return isNewSelector(expr);
  4228. case no_workunit_dataset:
  4229. {
  4230. IHqlExpression * sequence = queryAttributeChild(expr, sequenceAtom, 0);
  4231. IHqlExpression * name = queryAttributeChild(expr, nameAtom, 0);
  4232. OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
  4233. addDependencyUse(value, curGraph, activityExpr);
  4234. return false;
  4235. }
  4236. case no_getresult:
  4237. {
  4238. IHqlExpression * sequence = queryAttributeChild(expr, sequenceAtom, 0);
  4239. IHqlExpression * name = queryAttributeChild(expr, namedAtom, 0);
  4240. OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
  4241. addDependencyUse(value, curGraph, activityExpr);
  4242. return false;
  4243. }
  4244. case no_getgraphresult:
  4245. {
  4246. OwnedHqlExpr value = createAttribute(resultAtom, LINK(expr->queryChild(1)), LINK(expr->queryChild(2)));
  4247. addDependencyUse(value, curGraph, activityExpr);
  4248. return false;
  4249. }
  4250. case no_setgraphresult:
  4251. {
  4252. OwnedHqlExpr value = createAttribute(resultAtom, LINK(expr->queryChild(1)), LINK(expr->queryChild(2)));
  4253. addDependencySource(value, curGraph, activityExpr);
  4254. return true;
  4255. }
  4256. case no_ensureresult:
  4257. case no_setresult:
  4258. case no_extractresult:
  4259. {
  4260. IHqlExpression * sequence = queryAttributeChild(expr, sequenceAtom, 0);
  4261. IHqlExpression * name = queryAttributeChild(expr, namedAtom, 0);
  4262. OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
  4263. addDependencySource(value, curGraph, activityExpr);
  4264. return true;
  4265. }
  4266. case no_attr:
  4267. case no_attr_link:
  4268. case no_record:
  4269. case no_field:
  4270. return false; //no need to look any further
  4271. default:
  4272. return true;
  4273. }
  4274. }
  4275. void EclResourceDependencyGatherer::doAddChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
  4276. {
  4277. if (expr->queryTransformExtra())
  4278. return;
  4279. expr->setTransformExtraUnlinked(expr);
  4280. if (addExprDependency(expr, graph, activityExpr))
  4281. {
  4282. ForEachChild(idx, expr)
  4283. doAddChildDependencies(expr->queryChild(idx), graph, activityExpr);
  4284. }
  4285. }
  4286. void EclResourceDependencyGatherer::addChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
  4287. {
  4288. if (graph)
  4289. {
  4290. TransformMutexBlock block;
  4291. doAddChildDependencies(expr, graph, activityExpr);
  4292. }
  4293. }
  4294. //-----------------------------------------
  4295. void EclResourcer::addDependencies(EclResourceDependencyGatherer & gatherer, IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
  4296. {
  4297. ResourcerInfo * info = queryResourceInfo(expr);
  4298. if (!info)
  4299. return;
  4300. if (info->containsActivity)
  4301. {
  4302. if (info->isActivity)
  4303. {
  4304. if (info->gatheredDependencies)
  4305. return;
  4306. info->gatheredDependencies = true;
  4307. graph = info->graph;
  4308. activityExpr = expr;
  4309. }
  4310. if (gatherer.addExprDependency(expr, graph, activityExpr))
  4311. {
  4312. unsigned first = getFirstActivityArgument(expr);
  4313. unsigned last = first + getNumActivityArguments(expr);
  4314. ForEachChild(idx, expr)
  4315. {
  4316. if ((idx >= first) && (idx < last))
  4317. addDependencies(gatherer, expr->queryChild(idx), graph, activityExpr);
  4318. else
  4319. gatherer.addChildDependencies(expr->queryChild(idx), graph, activityExpr);
  4320. }
  4321. }
  4322. }
  4323. else
  4324. gatherer.addChildDependencies(expr, graph, activityExpr);
  4325. }
  4326. void EclResourcer::addDependencies(HqlExprArray & exprs)
  4327. {
  4328. EclResourceDependencyGatherer gatherer(errors);
  4329. ForEachItemIn(idx, exprs)
  4330. addDependencies(gatherer, &exprs.item(idx), NULL, NULL);
  4331. const CIArrayOf<ResourceGraphLink> & dependLinks = gatherer.queryLinks();
  4332. ForEachItemIn(i, dependLinks)
  4333. {
  4334. ResourceGraphLink & curLink = dependLinks.item(i);
  4335. //NOTE: queryResourceInfo() cannot be called inside gatherer because that uses the transform mutex
  4336. //for something different
  4337. ResourcerInfo * sinkInfo = queryResourceInfo(curLink.sinkNode);
  4338. sinkInfo->dependsOn.append(curLink);
  4339. curLink.sinkGraph->dependsOn.append(curLink);
  4340. links.append(OLINK(curLink));
  4341. }
  4342. }
  4343. //--------------------------------------------------------
  4344. void EclResourcer::oldSpotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph)
  4345. {
  4346. ResourcerInfo * info = queryResourceInfo(expr);
  4347. if (!info)
  4348. return;
  4349. if (graph && info->graph && info->graph != graph)
  4350. {
  4351. if ((info->currentSource == whichSource) && (info->pathToSplitter != path))
  4352. graph->unbalancedExternalSources.append(*LINK(expr->queryBody()));
  4353. info->currentSource = whichSource;
  4354. info->pathToSplitter.set(path);
  4355. return;
  4356. }
  4357. else
  4358. {
  4359. if (info->currentSource == whichSource)
  4360. {
  4361. if (info->pathToSplitter != path)
  4362. info->balanced = false;
  4363. return;
  4364. }
  4365. info->currentSource = whichSource;
  4366. info->pathToSplitter.set(path);
  4367. }
  4368. if (info->containsActivity)
  4369. {
  4370. unsigned first = getFirstActivityArgument(expr);
  4371. unsigned num = getNumActivityArguments(expr);
  4372. bool modify = false;
  4373. if (num > 1)
  4374. {
  4375. switch (expr->getOperator())
  4376. {
  4377. case no_addfiles:
  4378. if (isOrdered(expr) || isGrouped(expr))
  4379. modify = true;
  4380. break;
  4381. default:
  4382. modify = true;
  4383. break;
  4384. }
  4385. }
  4386. unsigned last = first + num;
  4387. for (unsigned idx = first; idx < last; idx++)
  4388. {
  4389. OwnedHqlExpr childPath = modify ? createAttribute(pathAtom, getSizetConstant(idx), LINK(path)) : LINK(path);
  4390. oldSpotUnbalancedSplitters(expr->queryChild(idx), whichSource, childPath, graph);
  4391. }
  4392. }
  4393. //Now check dependencies between graphs (for roxie)
  4394. if (!graph)
  4395. {
  4396. if (info->graph)
  4397. {
  4398. GraphLinkArray & graphLinks = info->graph->dependsOn;
  4399. ForEachItemIn(i, graphLinks)
  4400. {
  4401. ResourceGraphLink & link = graphLinks.item(i);
  4402. if (link.sinkNode == expr)
  4403. {
  4404. OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
  4405. oldSpotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
  4406. }
  4407. }
  4408. }
  4409. else
  4410. {
  4411. ForEachItemIn(i, links)
  4412. {
  4413. ResourceGraphLink & link = links.item(i);
  4414. if (link.sinkNode == expr)
  4415. {
  4416. OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
  4417. oldSpotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
  4418. }
  4419. }
  4420. }
  4421. }
  4422. }
  4423. void EclResourcer::oldSpotUnbalancedSplitters(HqlExprArray & exprs)
  4424. {
  4425. unsigned curSource = 1;
  4426. switch (targetClusterType)
  4427. {
  4428. case HThorCluster:
  4429. break;
  4430. case ThorLCRCluster:
  4431. {
  4432. //Thor only handles one graph at a time, so only walk expressions within a single graph.
  4433. ForEachItemIn(i1, graphs)
  4434. {
  4435. ResourceGraphInfo & curGraph = graphs.item(i1);
  4436. ForEachItemIn(i2, curGraph.sinks)
  4437. {
  4438. ResourceGraphLink & cur = curGraph.sinks.item(i2);
  4439. oldSpotUnbalancedSplitters(cur.sourceNode, curSource++, 0, &curGraph);
  4440. }
  4441. }
  4442. }
  4443. break;
  4444. case RoxieCluster:
  4445. {
  4446. //Roxie pulls all at once, so need to analyse globally.
  4447. ForEachItemIn(idx, exprs)
  4448. oldSpotUnbalancedSplitters(&exprs.item(idx), curSource++, 0, NULL);
  4449. break;
  4450. }
  4451. }
  4452. }
  4453. void EclResourcer::spotSharedInputs(IHqlExpression * expr, ResourceGraphInfo * graph)
  4454. {
  4455. ResourcerInfo * info = queryResourceInfo(expr);
  4456. if (!info)
  4457. return;
  4458. if (info->graph && info->graph != graph)
  4459. {
  4460. IHqlExpression * body = expr->queryBody();
  4461. if (!graph->unbalancedExternalSources.contains(*body))
  4462. graph->balancedExternalSources.append(*LINK(body));
  4463. return;
  4464. }
  4465. if (info->isSplit())
  4466. {
  4467. //overload currentSource to track if we have visited this splitter before. It cannot have value value NotFound up to now
  4468. if (info->currentSource == NotFound)
  4469. return;
  4470. info->currentSource = NotFound;
  4471. }
  4472. if (info->containsActivity)
  4473. {
  4474. unsigned first = getFirstActivityArgument(expr);
  4475. unsigned num = getNumActivityArguments(expr);
  4476. unsigned last = first + num;
  4477. for (unsigned idx = first; idx < last; idx++)
  4478. {
  4479. spotSharedInputs(expr->queryChild(idx), graph);
  4480. }
  4481. }
  4482. }
  4483. void EclResourcer::spotSharedInputs()
  4484. {
  4485. //Thor only handles one graph at a time, so only walk expressions within a single graph.
  4486. ForEachItemIn(i1, graphs)
  4487. {
  4488. ResourceGraphInfo & curGraph = graphs.item(i1);
  4489. HqlExprCopyArray visited;
  4490. ForEachItemIn(i2, curGraph.sinks)
  4491. {
  4492. ResourceGraphLink & cur = curGraph.sinks.item(i2);
  4493. IHqlExpression * curExpr = cur.sourceNode->queryBody();
  4494. if (!visited.contains(*curExpr))
  4495. {
  4496. ResourcerInfo * info = queryResourceInfo(curExpr);
  4497. if (!info->isExternalSpill() && !info->expandRatherThanSpill(true))
  4498. {
  4499. spotSharedInputs(curExpr, &curGraph);
  4500. visited.append(*curExpr);
  4501. }
  4502. }
  4503. }
  4504. }
  4505. }
  4506. //--------------------------------------------------------------------------------------------------------------------
  4507. /*
  4508. * Splitters can either have a limited or unlimited read-ahead. A splitter with unlimited read-ahead is likely to use
  4509. * more memory, and needs to be able to spill its read ahead buffer to disk. A splitter with limited read-ahead
  4510. * ("balanced") is likely to be more efficient - but it can also potentially cause deadlock.
  4511. *
  4512. * A balanced splitter can deadlock because each of its output activities is effectively dependent on the others, which
  4513. * can create dependency cycles in the graph.
  4514. *
  4515. * Say you have f(x,y) and g(x,y), if x and y are unbalanced splitters you have directed edges (f->x),(f->y),(g->x),(g->y)
  4516. * which contains no cycles. If x is balanced then you also have (x->f),(x->g) - still without any cycles, but if y is
  4517. * also balanced then you gain the edges (y->f,y->g) which creates a cycle. So either x or y need to become unbalanced.
  4518. *
  4519. * The previous implementation worked by tagging each splitter with the paths that were used to get there, and
  4520. * marking a splitter as unbalanced if there was more than one path from the same output. However that fails to catch the
  4521. * case where you have f(a,b) and g(a,b) which create more complicated cycles.
  4522. *
  4523. * This is essentially a graph traversal problem, where a balanced splitter makes each output dependent on each other
  4524. * (they effectively become bi-directional dependencies). Converting balanced splitters to unbalanced is equivalent to
  4525. * calculating the feedback arc set that will remove all the cycles. (Unfortunately calculating the minimal set is NP
  4526. * hard, but because we are dealing with a specialised graph we can avoid that.)
  4527. *
  4528. * The approach is as follows
  4529. * Loop through each output
  4530. * If not already visited (from another output), then perform a depth-first traversal of the graph for that output
  4531. *
  4532. * When traversing a node we mark it as being visited, and then walk through the other links.
 * If the node at the end of the current link is already being visited then we have found a cycle - so we need to introduce
 * an unbalanced splitter to remove some of the paths. That splitter must be on the path between the two
  4535. * visits to the nodes.
  4536. * - It could be the node we have just reached
  4537. * - It could be any other node on the route.
  4538. *
  4539. * We can tell which one it is because
  4540. * - if is is the node we have just reached, the visiting link must be from an output, and previous link from node to output
  4541. * - if on the path then exit path will be to an output, and entry path from an output.
  4542. *
  4543. * If not then there must be by definition another node on that path that satisfies that condition.
  4544. *
  4545. * Note: This may over-estimate the number of splitters that need to be marked as unbalanced, but in general it is fairly good.
  4546. */
  4547. CSplitterInfo::CSplitterInfo(EclResourcer & _resourcer, bool _preserveBalanced, bool _ignoreExternalDependencies)
  4548. : resourcer(_resourcer), preserveBalanced(_preserveBalanced), ignoreExternalDependencies(_ignoreExternalDependencies)
  4549. {
  4550. nextBalanceId = 0;
  4551. #ifdef TRACE_BALANCED
  4552. printf("digraph {\n");
  4553. #endif
  4554. }
  4555. CSplitterInfo::~CSplitterInfo()
  4556. {
  4557. if (preserveBalanced)
  4558. restoreBalanced();
  4559. #ifdef TRACE_BALANCED
  4560. printf("}\n");
  4561. #endif
  4562. }
  4563. void CSplitterInfo::addLink(IHqlExpression * source, IHqlExpression * sink, bool isExternal)
  4564. {
  4565. ResourcerInfo * sourceInfo = queryResourceInfo(source);
  4566. #ifdef TRACE_BALANCED
  4567. if (sourceInfo->balanceId == 0)
  4568. {
  4569. sourceInfo->balanceId = ++nextBalanceId;
  4570. printf("\tn%u [label=\"%u\"]; // %s\n", sourceInfo->balanceId, sourceInfo->balanceId, getOpString(source->getOperator()));
  4571. }
  4572. #endif
  4573. if (isExternal)
  4574. {
  4575. if (!externalSources.contains(*source))
  4576. {
  4577. externalSources.append(*source);
  4578. if (preserveBalanced)
  4579. wasBalanced.append(sourceInfo->balanced);
  4580. }
  4581. }
  4582. sourceInfo->balanced = true;
  4583. if (sink)
  4584. {
  4585. CSplitterLink * link = new CSplitterLink(source, sink);
  4586. ResourcerInfo * sinkInfo = queryResourceInfo(sink);
  4587. sourceInfo->balancedLinks.append(*link);
  4588. sinkInfo->balancedLinks.append(*LINK(link));
  4589. sourceInfo->balancedInternalUses++;
  4590. #ifdef TRACE_BALANCED
  4591. printf("\tn%u -> n%u;\n", sourceInfo->balanceId, sinkInfo->balanceId);
  4592. #endif
  4593. }
  4594. else
  4595. {
  4596. sinks.append(*source);
  4597. sourceInfo->balancedExternalUses++;
  4598. }
  4599. }
  4600. void CSplitterLink::mergeSinkLink(CSplitterLink & sinkLink)
  4601. {
  4602. IHqlExpression * newSink = sinkLink.querySink();
  4603. assertex(newSink);
  4604. assertex(sinkLink.hasSource(querySink()));
  4605. sink.set(newSink);
  4606. ResourcerInfo * sinkInfo = queryResourceInfo(newSink);
  4607. unsigned sinkPos = sinkInfo->balancedLinks.find(sinkLink);
  4608. sinkInfo->balancedLinks.replace(OLINK(*this), sinkPos);
  4609. }
  4610. bool CSplitterInfo::isSplitOrBranch(IHqlExpression * expr) const
  4611. {
  4612. unsigned num = getNumActivityArguments(expr);
  4613. if (num > 1)
  4614. return true;
  4615. //Is this potentially splitter? Better to have false positives...
  4616. ResourcerInfo * info = queryResourceInfo(expr);
  4617. assertex(info);
  4618. if (info->numUses > 1)
  4619. return true;
  4620. if (info->hasDependency())
  4621. return true;
  4622. return false;
  4623. }
  4624. bool CSplitterInfo::isBalancedSplitter(IHqlExpression * expr) const
  4625. {
  4626. ResourcerInfo * info = queryResourceInfo(expr);
  4627. if (!info->balanced)
  4628. return false;
  4629. unsigned numOutputs = info->balancedExternalUses + info->balancedInternalUses;
  4630. return (numOutputs > 1);
  4631. }
//Walk the activities below expr and build the splitter connection graph (via addLink()).
//Only nodes that may split or branch (see isSplitOrBranch()), and the roots, become nodes in that graph.
//Any intermediate activities are collapsed into the link between such a node and its nearest such ancestor.
//  expr         - the expression being visited
//  sink         - the nearest node above expr that was added to the connection graph, or NULL for a root
//  graph        - the subgraph being analysed, or NULL to analyse all graphs together (roxie)
//  isDependency - true if expr was reached through a dependency link rather than as an activity input
void CSplitterInfo::gatherPotentialSplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph, bool isDependency)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info)
        return;
    switch (expr->getOperator())
    {
    case no_null:
    case no_fail:
        //Any sources that never generate any rows are always fine as a splitter
        return;
    case no_attr:
    case no_attr_expr:
        //Anything that doesn't correspond to an activity should do nothing
        return;
    //MORE: A source that generates a single row, and subsequent rows are never read, is always fine.
    //      but if read as a dataset it could deadlock unless there was a 1 row read-ahead.
    }
    //checkAlreadyVisited() also marks info as visited in the current pass
    bool alreadyVisited = resourcer.checkAlreadyVisited(info);
    if (!alreadyVisited)
    {
        //First visit in this pass - reset the balancing information held in info
        info->resetBalanced();
#ifdef TRACE_BALANCED
        info->balanceId = 0;
#endif
    }
    //An input belonging to a different subgraph is an external source, and is not walked any further.
    //External sources reached only through a dependency are skipped if ignoreExternalDependencies is set.
    if (graph && info->graph && info->graph != graph)
    {
        if (!ignoreExternalDependencies || !isDependency)
            addLink(expr, sink, true);
        return;
    }
    if (isSplitOrBranch(expr) || !sink)
    {
        addLink(expr, sink, false);
        //The inputs of a node already in the connection graph have already been gathered
        if (alreadyVisited)
            return;
        //expr becomes the sink for everything below it
        sink = expr;
    }
    if (info->containsActivity)
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned num = getNumActivityArguments(expr);
        unsigned last = first + num;
        for (unsigned idx = first; idx < last; idx++)
            gatherPotentialSplitters(expr->queryChild(idx), sink, graph, false);
    }
    //MORE: dependencies should have onStart as their sink, and there should be a link between onStart and allSinks.
    //Now check dependencies between graphs (for roxie) and possibly within a graph for thor
    if (info->graph)
    {
        GraphLinkArray & graphLinks = info->graph->dependsOn;
        ForEachItemIn(i, graphLinks)
        {
            ResourceGraphLink & link = graphLinks.item(i);
            if (link.sinkNode == expr)
                gatherPotentialDependencySplitters(link.sourceNode, sink, graph);
        }
    }
    else
    {
        //No graph associated with expr - search all the resourcer's dependency links instead
        ForEachItemIn(i, resourcer.links)
        {
            ResourceGraphLink & link = resourcer.links.item(i);
            if (link.isDependency() && (link.sinkNode == expr))
            {
                gatherPotentialDependencySplitters(link.sourceNode, sink, graph);
            }
        }
    }
}
  4703. void CSplitterInfo::gatherPotentialDependencySplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph)
  4704. {
  4705. //Strictly speaking dependencies are executed *before* the activity.
  4706. //I experimented with making them dependent on a onStart pseudo expression, but that doesn't work because
  4707. //there are multiple onStart items (as the dependencies nest).
  4708. //There is still the outside possibility of deadlock between expressions and dependencies of items inside
  4709. //non-ordered concats, but they would only occur in roxie, and this flag is currently ignored by roxie.
  4710. gatherPotentialSplitters(expr, sink, graph, true);
  4711. }
  4712. void CSplitterInfo::restoreBalanced()
  4713. {
  4714. ForEachItemIn(i, externalSources)
  4715. {
  4716. IHqlExpression & cur = externalSources.item(i);
  4717. ResourcerInfo * info = queryResourceInfo(&cur);
  4718. info->balanced = wasBalanced.item(i);
  4719. }
  4720. }
//Called when the depth-first walk of the connection graph follows link to reach expr.
//Returns NULL if there is nothing to fix.  Otherwise it returns either
//  - expr->queryBody(): expr is the balanced splitter that should become unbalanced, or
//  - backtrackPseudoExpr: a cycle was found, and walkPotentialSplitterLinks() on the way back up the
//    current path must decide which splitter to unbalance.
IHqlExpression * EclResourcer::walkPotentialSplitters(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink & link)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    //If already visited all the links, then no need to do any more.
    if (info->finishedWalkingSplitters())
        return NULL;
    //Are we currently in the process of visiting this node?
    //(balancedVisiting is set by walkPotentialSplitterLinks() when it starts walking expr's links => we have looped)
    if (info->balancedVisiting)
    {
        //If this is an unbalanced splitter, then any loops will be found when walking from there
        if (!info->balanced)
            return nullptr;
#ifdef TRACE_BALANCED
        printf("//Follow %u->%u has problems....\n", queryResourceInfo(link.queryOther(expr))->balanceId, info->balanceId);
#endif
        if (link.hasSource(expr))
        {
            //Walking up the tree, and the current node is being visited => it is a node that needs to become unbalanced - if it is a balanced splitter
            if (connections.isBalancedSplitter(expr))
            {
                //Only if the link expr's own walk is currently following (see curBalanceLink) is also one of its outputs
                const CSplitterLink * originalLink = info->queryCurrentLink();
                if (originalLink->hasSource(expr))
                    return expr->queryBody();
            }
            //Must be a node with multiple inputs, walked from one input, and visited another.
            return backtrackPseudoExpr;
        }
        else
        {
            //found a loop-> need to do something about it.
            return backtrackPseudoExpr;
        }
    }
#ifdef TRACE_BALANCED
    printf("//Follow %u->%u\n", queryResourceInfo(link.queryOther(expr))->balanceId, info->balanceId);
#endif
    //Not seen before (on this path) - walk all of its other links
    return walkPotentialSplitterLinks(connections, expr, &link);
}
//Walk every link of expr except link (the one it was reached by; NULL for a root), recursing into the node
//at the other end.  If a cycle is reported that should be broken at expr, expr is marked as unbalanced and
//the same link is walked again.  Otherwise the problem is returned to the caller.
//Returns NULL once every link has been walked without an unresolved problem.
IHqlExpression * EclResourcer::walkPotentialSplitterLinks(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink * link)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    info->balancedVisiting = true;
    //This may iterate through all links again - but will return quickly if already visited
    for (unsigned i=0; i < info->balancedLinks.ordinality(); )
    {
        const CSplitterLink & cur = info->balancedLinks.item(i);
        if (&cur != link) // don't walk the link we reached here by
        {
            //Only follow links in unbalanced splitters if this is an input, or walking from inputs to outputs
            if (info->balanced || cur.hasSink(expr) || (link && link->hasSink(expr)))
            {
                //Record the link being followed so a re-visit can inspect it (queryCurrentLink())
                info->curBalanceLink = i;
                IHqlExpression * problem = walkPotentialSplitters(connections, cur.queryOther(expr), cur);
                if (problem)
                {
                    bool forceUnbalanced = false;
                    if (problem == backtrackPseudoExpr)
                    {
                        //both links are to outputs
                        if (link && link->hasSource(expr) && cur.hasSource(expr))
                        {
                            assertex(connections.isBalancedSplitter(expr));
                            forceUnbalanced = true;
                        }
                    }
                    else
                        forceUnbalanced = (expr->queryBody() == problem);
                    if (!forceUnbalanced)
                    {
                        //No longer visiting - we'll come back here again later
                        info->balancedVisiting = false;
                        return problem;
                    }
#ifdef TRACE_BALANCED
                    printf("\tn%u [color=red];\n", info->balanceId);
                    printf("//%u marked as unbalanced\n", info->balanceId);
#endif
                    info->balanced = false;
                    //walk the link again (i is deliberately not incremented)
                    continue;
                }
            }
        }
        i++;
    }
    //Mark every link as walked (presumably what finishedWalkingSplitters() tests)
    info->curBalanceLink = info->balancedLinks.ordinality();
    return NULL;
}
  4809. bool EclResourcer::removePassThrough(CSplitterInfo & connections, ResourcerInfo & info)
  4810. {
  4811. if (info.balancedLinks.ordinality() != 2)
  4812. return false;
  4813. CSplitterLink & link0 = info.balancedLinks.item(0);
  4814. CSplitterLink & link1 = info.balancedLinks.item(1);
  4815. CSplitterLink * sourceLink;
  4816. CSplitterLink * sinkLink;
  4817. if (link0.hasSource(info.original) && link1.hasSink(info.original))
  4818. {
  4819. sourceLink = &link1;
  4820. sinkLink = &link0;
  4821. }
  4822. else if (link0.hasSink(info.original) && link1.hasSource(info.original))
  4823. {
  4824. sourceLink = &link0;
  4825. sinkLink = &link1;
  4826. }
  4827. else
  4828. return false;
  4829. if (!sinkLink->querySink())
  4830. return false;
  4831. #ifdef TRACE_BALANCED
  4832. printf("//remove node %u since now pass-through\n", info.balanceId);
  4833. #endif
  4834. sourceLink->mergeSinkLink(*sinkLink);
  4835. return true;
  4836. }
  4837. bool EclResourcer::allInputsPulledIndependently(IHqlExpression * expr) const
  4838. {
  4839. switch (expr->getOperator())
  4840. {
  4841. case no_addfiles:
  4842. if (isOrdered(expr) || isGrouped(expr))
  4843. return false;
  4844. if (!expr->hasAttribute(orderedAtom) && options.isChildQuery)
  4845. return false;
  4846. return true;
  4847. case no_parallel:
  4848. //MORE; This can probably return true - and generate fewer unbalanced splitters.
  4849. break;
  4850. }
  4851. return false;
  4852. }
//For each output link of info's node whose sink pulls all its inputs independently (see
//allInputsPulledIndependently()), remove any further links between the same source and sink.
//Any output sink that is a pass-through (one input link, one output link) is bypassed via
//removePassThrough().  Repeat until nothing else is bypassed.
void EclResourcer::removeDuplicateIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info)
{
    IHqlExpression * expr = info.original;
    for (;;)
    {
        bool again = false;
        for (unsigned i=0; i < info.balancedLinks.ordinality(); i++)
        {
            CSplitterLink & cur = info.balancedLinks.item(i);
            if (cur.hasSource(expr))
            {
                IHqlExpression * sink = cur.queryOther(expr);
                assertex(sink);
                ResourcerInfo & sinkInfo = *queryResourceInfo(sink);
                if (allInputsPulledIndependently(sink))
                {
                    unsigned numRemoved = 0;
                    //Walk backwards so removing entries does not disturb the indexes still to be checked
                    for (unsigned j=info.balancedLinks.ordinality()-1; j > i; j--)
                    {
                        CSplitterLink & next = info.balancedLinks.item(j);
                        if (next.hasSource(expr) && next.hasSink(sink))
                        {
                            //Remove the duplicate from both ends
                            info.balancedLinks.remove(j);
                            sinkInfo.balancedLinks.zap(next);
                            numRemoved++;
                        }
                    }
#ifdef TRACE_BALANCED
                    if (numRemoved)
                        printf("//removed %u duplicate links from %u to %u\n", numRemoved, info.balanceId, sinkInfo.balanceId);
#endif
                }
                //Removing duplicate links may have turned the sink item into a pass-through.
                //Replace references to the sink activity with references to its sink
                //to possibly allow more to be removed.
                if (removePassThrough(connections, sinkInfo))
                {
#ifdef TRACE_BALANCED
                    printf("//remove %u now pass-through\n", sinkInfo.balanceId);
#endif
                    again = true;
                }
            }
        }
        if (!again)
            break;
    }
}
  4901. void EclResourcer::optimizeIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info)
  4902. {
  4903. if (info.removedParallelPullers)
  4904. return;
  4905. info.removedParallelPullers = true;
  4906. removeDuplicateIndependentLinks(connections, info);
  4907. //Recurse over inputs to this activity (each call may remove links)
  4908. for (unsigned i=0; i < info.balancedLinks.ordinality(); i++)
  4909. {
  4910. CSplitterLink & cur = info.balancedLinks.item(i);
  4911. if (cur.hasSink(info.original))
  4912. optimizeIndependentLinks(connections, *queryResourceInfo(cur.querySource()));
  4913. }
  4914. }
  4915. void EclResourcer::optimizeConditionalLinks(CSplitterInfo & connections)
  4916. {
  4917. //MORE: IF() can be special cased. If it creates two identical links then one of them can be removed
  4918. //Implement by post processing the links and removing duplicates
  4919. ForEachItemIn(i, connections.sinks)
  4920. {
  4921. IHqlExpression & cur = connections.sinks.item(i);
  4922. ResourcerInfo * info = queryResourceInfo(&cur);
  4923. optimizeIndependentLinks(connections, *info);
  4924. }
  4925. }
  4926. void EclResourcer::walkPotentialSplitters(CSplitterInfo & connections)
  4927. {
  4928. ForEachItemIn(i, connections.sinks)
  4929. {
  4930. IHqlExpression & cur = connections.sinks.item(i);
  4931. ResourcerInfo * info = queryResourceInfo(&cur);
  4932. if (!info->finishedWalkingSplitters())
  4933. {
  4934. IHqlExpression * problem = walkPotentialSplitterLinks(connections, &cur, NULL);
  4935. assertex(!problem);
  4936. }
  4937. }
  4938. }
  4939. void EclResourcer::extractSharedInputs(CSplitterInfo & connections, ResourceGraphInfo & graph)
  4940. {
  4941. ForEachItemIn(i, connections.externalSources)
  4942. {
  4943. IHqlExpression & cur = connections.externalSources.item(i);
  4944. ResourcerInfo * info = queryResourceInfo(&cur);
  4945. if (connections.isBalancedSplitter(&cur))
  4946. {
  4947. //Add two entries for compatibility with old code.
  4948. graph.balancedExternalSources.append(*LINK(cur.queryBody()));
  4949. graph.balancedExternalSources.append(*LINK(cur.queryBody()));
  4950. }
  4951. }
  4952. }
  4953. void EclResourcer::spotUnbalancedSplitters(const HqlExprArray & exprs)
  4954. {
  4955. switch (targetClusterType)
  4956. {
  4957. case HThorCluster:
  4958. break;
  4959. case ThorLCRCluster:
  4960. {
  4961. //Thor only handles one graph at a time, so only walk expressions within a single graph.
  4962. ForEachItemIn(i1, graphs)
  4963. {
  4964. ResourceGraphInfo & curGraph = graphs.item(i1);
  4965. CSplitterInfo info(*this, true, true);
  4966. nextPass();
  4967. ForEachItemIn(i2, curGraph.sinks)
  4968. {
  4969. ResourceGraphLink & cur = curGraph.sinks.item(i2);
  4970. info.gatherPotentialSplitters(cur.sourceNode, NULL, &curGraph, false);
  4971. }
  4972. optimizeConditionalLinks(info);
  4973. walkPotentialSplitters(info);
  4974. extractSharedInputs(info, curGraph);
  4975. }
  4976. }
  4977. break;
  4978. case RoxieCluster:
  4979. {
  4980. //Roxie pulls all at once, so need to analyse globally.
  4981. CSplitterInfo info(*this, false, false);
  4982. nextPass();
  4983. ForEachItemIn(i2, exprs)
  4984. info.gatherPotentialSplitters(&exprs.item(i2), NULL, NULL, false);
  4985. optimizeConditionalLinks(info);
  4986. walkPotentialSplitters(info);
  4987. //no splitters from reading at start of a subgraph
  4988. break;
  4989. }
  4990. }
  4991. }
  4992. //------------------------------------------------------------------------------------------
  4993. // PASS6: Merge sub graphs that can share resources and don't have dependencies
  4994. // MORE: Once sources are merged, should try merging between trees.
  4995. static bool conditionsMatch(const HqlExprArray & left, const HqlExprArray & right)
  4996. {
  4997. if (left.ordinality() != right.ordinality())
  4998. return false;
  4999. ForEachItemIn(i, left)
  5000. {
  5001. if (!left.contains(right.item(i)) || !right.contains(left.item(i)))
  5002. return false;
  5003. }
  5004. return true;
  5005. }
//Decide whether the source graph of `link` may be merged into the graph that reads it (link.sinkGraph).
//Only links of kind UnconditionalLink are ever merged - every other link kind returns false.
//For an unconditional link the merge is refused (false) only when:
// - some sink of the source graph that is read by a graph (cur.sinkGraph set) has an action as its source node, or
// - the source is read conditionally inside link.sinkGraph while also being read from a different graph.
//Several cheap checks (options.noConditionalLinks, no merged condition source in the sink graph,
//only one sink in the source graph) accept the merge early.
bool EclResourcer::queryMergeGraphLink(ResourceGraphLink & link)
{
    if (link.linkKind == UnconditionalLink)
    {
        //Don't combine any dependencies
        //(an action produced by the source graph that is read by a graph blocks the merge)
        const GraphLinkArray & sinks = link.sourceGraph->sinks;
        ForEachItemIn(i1, sinks)
        {
            ResourceGraphLink & cur = sinks.item(i1);
            if (cur.sinkGraph && cur.sourceNode->isAction())
                return false;
        }
        //Roxie pulls all subgraphs at same time, so no problem with conditional links since handled at run time.
        if (options.noConditionalLinks)
            return true;
        //No conditionals in the sink graph=>will be executed just as frequently
        if (!link.sinkGraph->mergedConditionSource)
            return true;
        //Is this the only place this source graph is used? If so, always fine to merge
        if (sinks.ordinality() == 1)
            return true;
        //1) if context the source graph is being merged into is unconditional, then it is ok [ could have conditional and unconditional paths to same graph]
        //2) if context is conditional, then we don't really want to do it unless the conditions on all sinks are identical, and the only links occur between these two graphs.
        // (situation occurs with spill fed into two branches of a join).
        bool isConditionalInSinkGraph = false;    // some read of the source from within link.sinkGraph is conditional
        bool accessedFromManyGraphs = false;      // some read of the source comes from a graph other than link.sinkGraph
        ForEachItemIn(i, sinks)
        {
            ResourceGraphLink & cur = sinks.item(i);
            if (cur.sinkNode)
            {
                if (cur.sinkGraph != link.sinkGraph)
                    accessedFromManyGraphs = true;
                else
                {
                    //Only need to find one conditional read inside the sink graph.
                    if (!isConditionalInSinkGraph)
                    {
                        ResourcerInfo * sinkInfo = queryResourceInfo(cur.sinkNode);
                        //If this is conditional, don't merge if there is a link to another graph
                        if ((!cur.isDependency() && sinkInfo->isConditionExpr()) ||
                            //if (sinkInfo->isConditionExpr() ||
                            (!sinkInfo->isUnconditional() && sinkInfo->conditions.ordinality()))
                            isConditionalInSinkGraph = true;
                    }
                }
            }
        }
        if (isConditionalInSinkGraph && accessedFromManyGraphs)
            return false;
        return true;
    }
    return false;
}
  5059. bool EclResourcer::checkAlreadyVisited(ResourcerInfo * info)
  5060. {
  5061. if (info->lastPass == thisPass)
  5062. return true;
  5063. info->lastPass = thisPass;
  5064. return false;
  5065. }
  5066. unsigned EclResourcer::getMaxDepth() const
  5067. {
  5068. unsigned maxDepth = 0;
  5069. for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
  5070. {
  5071. unsigned depth = graphs.item(idx).getDepth();
  5072. if (depth > maxDepth)
  5073. maxDepth = depth;
  5074. }
  5075. return maxDepth;
  5076. }
//One merge pass over all graphs, working from the deepest depth (getMaxDepth()) down to depth 0.
//For each live graph `cur` at the current depth, try to merge each of its source graphs into it.
//pass == 0 only considers sources for which expandRatherThanSpill(false) is false; any other pass
//value only considers sources for which it is true.
//A successful merge redirects references from the source graph to `cur` and marks the source dead.
void EclResourcer::mergeSubGraphs(unsigned pass)
{
    unsigned maxDepth = getMaxDepth();
    for (unsigned curDepth = maxDepth+1; curDepth-- != 0;)
    {
    //Restart point: rescan all graphs at (a possibly increased) curDepth after a merge changed depths.
    mergeAgain:
        for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
        {
            ResourceGraphInfo & cur = graphs.item(idx);
            if ((cur.getDepth() == curDepth) && !cur.isDead)
            {
                //Repeat the scan of cur's sources while a merge succeeded and cur stayed at curDepth.
                bool tryAgain;
                do
                {
                    tryAgain = false;
                    //NOTE: idxSource is only advanced when no merge happens; presumably a successful merge
                    //removes or redirects the link at idxSource - confirm in replaceGraphReferences().
                    for (unsigned idxSource = 0; idxSource < cur.sources.ordinality(); /*incremented in loop*/)
                    {
                        ResourceGraphLink & curLink = cur.sources.item(idxSource);
                        ResourceGraphInfo * source = curLink.sourceGraph;
                        IHqlExpression * sourceNode = curLink.sourceNode;
                        bool tryToMerge;
                        bool expandSourceInPlace = queryResourceInfo(sourceNode)->expandRatherThanSpill(false);
                        if (pass == 0)
                            tryToMerge = !expandSourceInPlace;
                        else
                            tryToMerge = expandSourceInPlace;
                        if (tryToMerge)
                        {
                            bool ok = true;
                            bool mergeInSpillOutput = false;
                            ResourcerInfo * sourceResourceInfo = queryResourceInfo(sourceNode);
                            IHqlExpression * sourceSpillOutput = sourceResourceInfo->queryOutputSpillFile();
                            if (sourceSpillOutput)
                            {
                                //On hthor a source with an output spill file is only merged through the link
                                //that reads that spill output.
                                if (targetClusterType == HThorCluster)
                                {
                                    if (curLink.sinkNode != sourceSpillOutput)
                                        ok = false;
                                }
                                //If a dataset is being spilled to be read from a child expression etc. it is going to
                                //be more efficient to use that spilled expression => try and force it to merge
                                if (curLink.sinkNode == sourceSpillOutput)
                                    mergeInSpillOutput = true;
                            }
                            //In sequential mode a source graph containing an action sink is never merged here.
                            if (sequential && source->containsActionSink())
                                ok = false;
                            //Remember the source depth before merging so depth changes can be detected.
                            unsigned curSourceDepth = source->getDepth();
                            //MORE: Merging identical conditionals?
                            if (ok && queryMergeGraphLink(curLink) &&
                                !sourceResourceInfo->expandRatherThanSplit() &&
                                cur.mergeInSource(*source, *resourceLimit, mergeInSpillOutput))
                            {
                                //NB: Following cannot remove sources below the current index.
                                replaceGraphReferences(source, &cur);
                                source->isDead = true;
#ifdef VERIFY_RESOURCING
                                checkRecursion(&cur);
#endif
                                unsigned newDepth = cur.getDepth();
                                //Unusual: The source we are merging with has just increased in depth, so any
                                //dependents have also increased in depth. Need to try again at different depth
                                //to see if one of those will merge in.
                                if (newDepth > curSourceDepth)
                                {
                                    curDepth += (newDepth - curSourceDepth);
                                    goto mergeAgain;
                                }
                                //depth of this element has changed, so don't check to see if it merges with any other
                                //sources on this iteration.
                                if (newDepth != curDepth)
                                {
                                    tryAgain = false;
                                    break;
                                }
                                tryAgain = true;
                            }
                            else
                                idxSource++;
                        }
                        else
                            idxSource++;
                    }
                } while (tryAgain);
            }
        }
    }
}
//Merge "sibling" graphs: other live graphs that read the same source node (compared by queryBody())
//as one of cur's sources. Works from the deepest depth (getMaxDepth()) down to depth 0.
//Each sibling successfully merged via cur.mergeInSibling() is redirected to cur and marked dead.
void EclResourcer::mergeSiblings()
{
    unsigned maxDepth = getMaxDepth();
    for (unsigned curDepth = maxDepth+1; curDepth-- != 0;)
    {
        for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
        {
            ResourceGraphInfo & cur = graphs.item(idx);
            if ((cur.getDepth() == curDepth) && !cur.isDead)
            {
                //Graphs containing action sinks are skipped unless canMergeActionAsSibling() allows them.
                if (cur.containsActionSink())
                {
                    if (!cur.canMergeActionAsSibling(sequential))
                        continue;
                }
                ForEachItemIn(idxSource, cur.sources)
                {
                    ResourceGraphLink & curLink = cur.sources.item(idxSource);
                    ResourceGraphInfo * source = curLink.sourceGraph;
                    IHqlExpression * sourceNode = curLink.sourceNode;
                    ResourcerInfo * sourceInfo = queryResourceInfo(sourceNode);
                    //Sources marked neverSplit, or expanded rather than split, are not shared between siblings.
                    if (sourceInfo->neverSplit || sourceInfo->expandRatherThanSplit())
                        continue;
                    //Scan every other reader of the same source graph; iSink is only advanced when no merge happens.
                    for (unsigned iSink = 0; iSink < source->sinks.ordinality(); )
                    {
                        ResourceGraphLink & secondLink = source->sinks.item(iSink);
                        ResourceGraphInfo * sink = secondLink.sinkGraph;
                        bool ok = false;
                        //Candidate: a different, live graph that reads exactly the same source node.
                        if (sink && (sink != &cur) && !sink->isDead && sourceNode->queryBody() == secondLink.sourceNode->queryBody())
                        {
                            ok = true;
                            if (sequential && !sink->canMergeActionAsSibling(sequential))
                                ok = false;
                            if (ok && cur.mergeInSibling(*sink, *resourceLimit))
                            {
                                //NB: Following cannot remove sources below the current index.
                                replaceGraphReferences(sink, &cur);
                                sink->isDead = true;
                            }
                            else
                                iSink++;
                        }
                        else
                            iSink++;
                    }
                }
            }
        }
    }
}
  5214. void EclResourcer::mergeSubGraphs()
  5215. {
  5216. for (unsigned pass=0; pass < 2; pass++)
  5217. mergeSubGraphs(pass);
  5218. if (options.combineSiblings)
  5219. mergeSiblings();
  5220. ForEachItemInRev(idx2, graphs)
  5221. {
  5222. if (graphs.item(idx2).isDead)
  5223. graphs.remove(idx2);
  5224. }
  5225. //If there is a link setting info->queryOutputSpillFile() which hasn't been merged, then need to clear
  5226. //otherwise you will get an internal error
  5227. ForEachItemIn(iLink, links)
  5228. {
  5229. ResourceGraphLink & curLink = links.item(iLink);
  5230. ResourcerInfo * sourceInfo = queryResourceInfo(curLink.sourceNode);
  5231. IHqlExpression * outputFile = sourceInfo->queryOutputSpillFile();
  5232. if (outputFile && outputFile == curLink.sinkNode)
  5233. sourceInfo->setPotentialSpillFile(NULL);
  5234. }
  5235. }
  5236. //------------------------------------------------------------------------------------------
  5237. // PASS7: Optimize aggregates off of splitters into through aggregates.
  5238. bool EclResourcer::optimizeAggregate(IHqlExpression * expr)
  5239. {
  5240. if (!isSimpleAggregateResult(expr))
  5241. return false;
  5242. IHqlExpression * row2ds = expr->queryChild(0); // no_datasetfromrow
  5243. IHqlExpression * selectNth = row2ds->queryChild(0);
  5244. ResourcerInfo * row2dsInfo = queryResourceInfo(row2ds);
  5245. //If more than one set result for the same aggregate don't merge the aggregation because
  5246. //it messes up the internal count. Should really fix it and do multiple stores in the
  5247. //through aggregate.
  5248. if (row2dsInfo->numInternalUses() > 1)
  5249. return false;
  5250. //Be careful not to lose any spills...
  5251. if (row2dsInfo->numExternalUses)
  5252. return false;
  5253. ResourcerInfo * selectNthInfo = queryResourceInfo(selectNth);
  5254. if (!selectNthInfo || selectNthInfo->numExternalUses)
  5255. return false;
  5256. IHqlExpression * aggregate = selectNth->queryChild(0); // no_newaggregate
  5257. IHqlExpression * parent = aggregate->queryChild(0);
  5258. ResourcerInfo * info = queryResourceInfo(parent);
  5259. if (info->numInternalUses() <= 1)
  5260. return false;
  5261. //ok, we can go ahead and merge.
  5262. info->aggregates.append(*LINK(expr));
  5263. // info->numExternalUses--;
  5264. // info->numUses--;
  5265. return true;
  5266. }
  5267. void EclResourcer::optimizeAggregates()
  5268. {
  5269. for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
  5270. {
  5271. ResourceGraphInfo & cur = graphs.item(idx);
  5272. for (unsigned idxSink = 0; idxSink < cur.sinks.ordinality(); /*incremented in loop*/)
  5273. {
  5274. ResourceGraphLink & link = cur.sinks.item(idxSink);
  5275. if ((link.sinkGraph == NULL) && optimizeAggregate(link.sourceNode))
  5276. cur.sinks.remove(idxSink);
  5277. else
  5278. idxSink++;
  5279. }
  5280. }
  5281. }
  5282. //------------------------------------------------------------------------------------------
  5283. // PASS8: Improve efficiency by merging the split points slightly
  5284. IHqlExpression * EclResourcer::findPredecessor(IHqlExpression * expr, IHqlExpression * search, IHqlExpression * prev)
  5285. {
  5286. if (expr == search)
  5287. return prev;
  5288. ResourcerInfo * info = queryResourceInfo(expr);
  5289. if (info && info->containsActivity)
  5290. {
  5291. unsigned first = getFirstActivityArgument(expr);
  5292. unsigned last = first + getNumActivityArguments(expr);
  5293. for (unsigned idx=first; idx < last; idx++)
  5294. {
  5295. IHqlExpression * match = findPredecessor(expr->queryChild(idx), search, expr);
  5296. if (match)
  5297. return match;
  5298. }
  5299. }
  5300. return NULL;
  5301. }
  5302. IHqlExpression * EclResourcer::findPredecessor(ResourcerInfo * search)
  5303. {
  5304. ForEachItemIn(idx, links)
  5305. {
  5306. ResourceGraphLink & cur = links.item(idx);
  5307. if (cur.sourceGraph == search->graph)
  5308. {
  5309. IHqlExpression * match = findPredecessor(cur.sourceNode, search->original, NULL);
  5310. if (match)
  5311. return match;
  5312. }
  5313. }
  5314. return NULL;
  5315. }
//PASS8 (only when options.minimizeSpillSize != 0): for each unconditional link to another graph, repeatedly
//move the reading activity (cur.sinkNode) into the source graph while:
// - lightweightAndReducesDatasetSize() holds for it;
// - the source is an external spill with at most options.minimizeSpillSize external uses;
// - the reading activity has exactly one internal use.
//The link is updated in place so that it then runs from the moved activity to its own reader.
void EclResourcer::moveExternalSpillPoints()
{
    if (options.minimizeSpillSize == 0)
        return;
    //If we have an external spill point where all the external outputs reduce their data significantly
    //(either via a project or a filter), then it might be worth including those activities in the main
    //graph and, if all external children reduce data, then it may be best to filter there.
    ForEachItemIn(idx, links)
    {
        ResourceGraphLink & cur = links.item(idx);
        if ((cur.linkKind == UnconditionalLink) && cur.sinkGraph)
        {
            while (lightweightAndReducesDatasetSize(cur.sinkNode))
            {
                ResourcerInfo * sourceInfo = queryResourceInfo(cur.sourceNode);
                if (!sourceInfo->isExternalSpill() || (sourceInfo->numExternalUses > options.minimizeSpillSize))
                    break;
                ResourcerInfo * sinkInfo = queryResourceInfo(cur.sinkNode);
                if (sinkInfo->numInternalUses() != 1)
                    break;
                //The activity that reads cur.sinkNode becomes the new sink of this link.
                //NOTE(review): findPredecessor() returns NULL when no reader of sinkInfo->original is found from
                //the source nodes of links leaving its graph; sinkNode then becomes NULL and is passed to
                //lightweightAndReducesDatasetSize() on the next iteration - confirm that case is handled.
                IHqlExpression * sinkPred = findPredecessor(sinkInfo);
                //Move the reading activity into the source graph and shift the external use onto it.
                sinkInfo->graph.set(cur.sourceGraph);
                sourceInfo->numExternalUses--;
                sinkInfo->numExternalUses++;
                cur.sourceNode.set(cur.sinkNode);
                cur.sinkNode.set(sinkPred);
            }
        }
    }
}
  5346. //------------------------------------------------------------------------------------------
  5347. // PASS9: Create a new expression tree representing the information
//Build the expression that replaces the hoisted scalar cur.original, given `replacement`, the
//expression the hoisted value is now read from. The form depends on the shape of the original value:
// - a select from a new/row selector: select the (possibly projected) field from `replacement`;
// - a CREATESET: rebuild the CREATESET over `replacement`, selecting its first field;
// - anything else: select the first field of `replacement`'s record.
static IHqlExpression * getScalarReplacement(CChildDependent & cur, ResourcerInfo * hoistedInfo, IHqlExpression * replacement)
{
    //First skip any wrappers which are there to cause things to be hoisted.
    IHqlExpression * value = skipScalarWrappers(cur.original);
    //Now modify the spilled result depending on how the spilled result was created (see EclHoistLocator::noteScalar() above)
    if (value->getOperator() == no_select)
    {
        bool isNew;
        IHqlExpression * ds = querySelectorDataset(value, isNew);
        if(isNew || ds->isDatarow())
        {
            //The hoisted value was projected: select the field at the same position as cur.projected
            //occupies within hoistedInfo->projected.
            if (cur.hoisted != cur.projectedHoisted)
            {
                assertex(cur.projected);
                unsigned match = hoistedInfo->projected.find(*cur.projected);
                assertex(match != NotFound);
                IHqlExpression * projectedRecord = cur.projectedHoisted->queryRecord();
                IHqlExpression * projectedField = projectedRecord->queryChild(match);
                return createNewSelectExpr(LINK(replacement), LINK(projectedField));
            }
            return replaceSelectorDataset(value, replacement);
        }
        //Very unusual - can occur when a thisnode(somefield) is extracted from an allnodes.
        //It will have gone through the default case in the noteScalar() code
    }
    else if (value->getOperator() == no_createset)
    {
        IHqlExpression * record = replacement->queryRecord();
        IHqlExpression * field = record->queryChild(0);
        return createValue(no_createset, cur.original->getType(), LINK(replacement), createSelectExpr(LINK(replacement->queryNormalizedSelector()), LINK(field)));
    }
    //Default: select the first field of the replacement's record.
    IHqlExpression * record = replacement->queryRecord();
    return createNewSelectExpr(LINK(replacement), LINK(record->queryChild(0)));
}
//Build the resourced form of the single activity `expr`.
//Activity arguments (children with index in [first, last)) are resourced recursively with createResourced().
//Other children are kept, except that (in the default case) references to an active top dataset are
//remapped onto its resourced form. Several operators are handled specially in the switch below.
//Unless expandInParent is set, the result is then wrapped:
// - a non-action is passed through info->createTransformedExpr();
// - an action becomes a no_definesideeffect with a unique id when defineSideEffect is set.
IHqlExpression * EclResourcer::doCreateResourced(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, bool expandInParent, bool defineSideEffect)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    node_operator op = expr->getOperator();
    HqlExprArray args;
    bool same = true;
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    OwnedHqlExpr transformed;
    switch (op)
    {
    case no_if:
    case no_choose:
    case no_chooseds:
        {
            //Resource only the activity arguments; no active-dataset remapping is needed here.
            ForEachChild(idx, expr)
            {
                IHqlExpression * child = expr->queryChild(idx);
                IHqlExpression * resourced;
                if ((idx < first) || (idx >= last))
                    resourced = LINK(child);
                else
                    resourced = createResourced(child, ownerGraph, expandInParent, false);
                if (child != resourced)
                    same = false;
                args.append(*resourced);
            }
            break;
        }
    case no_case:
    case no_map:
        //Not supported by the resourcer (UNIMPLEMENTED).
        UNIMPLEMENTED;
    case no_keyed:
        //Returned unchanged, without the createTransformedExpr()/side-effect wrapping below.
        return LINK(expr);
    case no_compound:
        if (options.convertCompoundToExecuteWhen)
        {
            //NB: Arguments to no_executewhen are the reverse of no_compound.
            args.append(*createResourced(expr->queryChild(1), ownerGraph, expandInParent, false));
            args.append(*createResourced(expr->queryChild(0), ownerGraph, expandInParent, false));
            transformed.setown(createDataset(no_executewhen, args));
        }
        else
            //Otherwise the compound is replaced by its resourced second argument alone.
            transformed.setown(createResourced(expr->queryChild(1), ownerGraph, expandInParent, false));
        break;
    case no_executewhen:
        {
            args.append(*createResourced(expr->queryChild(0), ownerGraph, expandInParent, false));
            args.append(*createResourced(expr->queryChild(1), ownerGraph, expandInParent, false));
            //The side-effect argument must already have been turned into a call of a defined side effect.
            assertex(args.item(1).getOperator() == no_callsideeffect);
            unwindChildren(args, expr, 2);
            same = false;
            break;
        }
    case no_select:
        {
            //If this isn't a new selector, then it must be <LEFT|RIGHT>.child-dataset, which will not be mapped
            //and the dataset will not have been resourced
            if (isNewSelector(expr))
            {
                IHqlExpression * ds = expr->queryChild(0);
                OwnedHqlExpr newDs = createResourced(ds, ownerGraph, expandInParent, false);
                if (ds != newDs)
                {
                    args.append(*LINK(newDs));
                    unwindChildren(args, expr, 1);
                    //Mark the rebuilt select as a new selector unless it already is, or the new dataset is itself a select.
                    if (!expr->hasAttribute(newAtom) && (newDs->getOperator() != no_select))
                        args.append(*LINK(queryNewSelectAttrExpr()));
                    same = false;
                }
            }
            break;
        }
    default:
        {
            IHqlExpression * activeTable = NULL;
            // Check to see if the activity has a dataset which is in scope for the rest of its arguments.
            // If so we'll need to remap references from the children.
            if (hasActiveTopDataset(expr) && (first != last))
                activeTable = expr->queryChild(0);
            ForEachChild(idx, expr)
            {
                IHqlExpression * child = expr->queryChild(idx);
                IHqlExpression * resourced;
                if ((idx < first) || (idx >= last))
                {
                    LinkedHqlExpr mapped = child;
                    if (activeTable && isAffectedByResourcing(child))
                    {
                        //args.item(0) is the already-resourced form of the active dataset (child 0).
                        IHqlExpression * activeTableTransformed = &args.item(0);
                        if (activeTable != activeTableTransformed)
                            mapped.setown(scopedReplaceSelector(child, activeTable, activeTableTransformed));
                    }
                    resourced = mapped.getClear();
                }
                else
                    resourced = createResourced(child, ownerGraph, expandInParent, false);
                if (child != resourced)
                    same = false;
                args.append(*resourced);
            }
        }
        break;
    }
    if (!transformed)
        transformed.setown(same ? LINK(expr) : expr->clone(args));
    if (!expandInParent)
    {
        if (!transformed->isAction())
            transformed.setown(info->createTransformedExpr(transformed));
        else if (defineSideEffect)
            transformed.setown(createValue(no_definesideeffect, LINK(transformed), createUniqueId()));
    }
    return transformed.getClear();
}
  5497. /*
  5498. Need to be careful because result should not reuse the same expression tree unless that element is a splitter.
  5499. createResourced()
  5500. {
  5501. if (!isActivity)
  5502. if (!containsActivity)
  5503. replace any refs to the activeTable with whatever it has been mapped to
  5504. else
  5505. recurse
  5506. else if (!isDefinedInSameGraph)
  5507. expand/create reader
  5508. set active table
  5509. else isSplitter and alreadyGeneratedForThisGraph
  5510. return previous result
  5511. else
  5512. create transformed
  5513. }
  5514. */
  5515. void EclResourcer::doCheckRecursion(ResourceGraphInfo * graph, PointerArray & visited)
  5516. {
  5517. visited.append(graph);
  5518. ForEachItemIn(idxD, graph->dependsOn)
  5519. checkRecursion(graph->dependsOn.item(idxD).sourceGraph, visited);
  5520. ForEachItemIn(idxS, graph->sources)
  5521. checkRecursion(graph->sources.item(idxS).sourceGraph, visited);
  5522. visited.pop();
  5523. }
  5524. void EclResourcer::checkRecursion(ResourceGraphInfo * graph, PointerArray & visited)
  5525. {
  5526. if (visited.find(graph) != NotFound)
  5527. throwUnexpected();
  5528. doCheckRecursion(graph, visited);
  5529. }
  5530. void EclResourcer::checkRecursion(ResourceGraphInfo * graph)
  5531. {
  5532. PointerArray visited;
  5533. doCheckRecursion(graph, visited);
  5534. }
//Return the resourced version of `expr` as it should appear inside ownerGraph.
// - Expressions with no activities below them are returned unchanged.
// - Non-activity expressions that contain activities are rebuilt from their resourced children.
// - Activities belonging to a different graph are expanded inline, read back from a spill, or
//   referenced via the action / side effect already generated for them.
// - Activities in ownerGraph are built by doCreateResourced(). When not expanding in the parent, the
//   result is recorded with info->setTransformed(), and it is reused later when info->isSplit().
IHqlExpression * EclResourcer::createResourced(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, bool expandInParent, bool defineSideEffect)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return LINK(expr);
    if (!info->isActivity)
    {
        //Not an activity itself, but contains activities: resource each child and rebuild if any changed.
        assertex(!defineSideEffect);
        HqlExprArray args;
        bool same = true;
        ForEachChild(idx, expr)
        {
            IHqlExpression * cur = expr->queryChild(idx);
            IHqlExpression * curResourced = createResourced(cur, ownerGraph, expandInParent, false);
            args.append(*curResourced);
            if (cur != curResourced)
                same = false;
        }
        if (same)
            return LINK(expr);
        return expr->clone(args);
    }
    if (info->graph != ownerGraph)
    {
        //The activity belongs to another graph, so ownerGraph needs some way of reading its result.
        assertex(!defineSideEffect);
        bool isShared = options.optimizeSharedInputs && ownerGraph && ownerGraph->isSharedInput(expr);
        if (isShared)
        {
            //Reuse the input already created for this shared input within ownerGraph, if any.
            IHqlExpression * mapped = ownerGraph->queryMappedSharedInput(expr->queryBody());
            if (mapped)
                return LINK(mapped);
        }
        IHqlExpression * source;
        if (info->expandRatherThanSpill(true))
        {
            //Rebuild the activity inline. With minimiseSpills the caller's expandInParent is propagated,
            //otherwise the activity is always built with expandInParent = true.
            bool expandChildInParent = options.minimiseSpills ? expandInParent : true;
            OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandChildInParent, false);
            if (queryAddUniqueToActivity(resourced))
                source = appendUniqueAttr(resourced);
            else
                source = LINK(resourced);
        }
        else
        {
            if (!expr->isAction())
            {
                //Read the value back from its spill. When resources are being checked, record why the
                //graphs could not be merged as a _spillReason_ attribute.
                OwnedHqlExpr reason;
                if (ownerGraph && options.checkResources())
                {
                    StringBuffer reasonText;
                    ownerGraph->getMergeFailReason(reasonText, info->graph, *resourceLimit);
                    if (reasonText.length())
                    {
                        reasonText.insert(0, "Resource limit spill: ");
                        reason.setown(createAttribute(_spillReason_Atom, createConstant(reasonText.str())));
                    }
                }
                source = info->createSpilledRead(reason);
            }
            else
            {
                //An action generated elsewhere as a no_definesideeffect is referenced by a no_callsideeffect
                //of its uid; otherwise the transformed action itself is used.
                IHqlExpression * transformed = info->queryTransformed();
                if (transformed->getOperator() == no_definesideeffect)
                {
                    IHqlExpression * uid = transformed->queryAttribute(_uid_Atom);
                    assertex(uid);
                    source = createValue(no_callsideeffect, makeVoidType(), LINK(uid));
                }
                else
                    source = LINK(transformed);
            }
        }
        if (isShared)
        {
            //Wrap a shared (non-action) input in a balanced no_split and remember it for later readers.
            if (!source->isAction())
            {
                if (source->isDataset())
                    source = createDatasetF(no_split, source, createAttribute(balancedAtom), createUniqueId(), NULL);
                else
                    source = createRowF(no_split, source, createAttribute(balancedAtom), createUniqueId(), NULL);
                ownerGraph->addSharedInput(expr->queryBody(), source);
            }
        }
        return source;
    }
    //A splitter in this graph that has already been generated is shared rather than rebuilt.
    if (!expandInParent && info->queryTransformed() && info->isSplit())
    {
        return LINK(info->queryTransformed());
    }
    OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandInParent, defineSideEffect);
    if (queryAddUniqueToActivity(resourced))// && !resourced->hasAttribute(_internal_Atom))
        resourced.setown(appendUniqueAttr(resourced));
    if (!expandInParent)
    {
        info->setTransformed(resourced);
    }
    return resourced.getClear();
}
//Generate the no_subgraph expression for `graph` (at most once) and append it to `transformed`.
//Graphs it depends on or reads from are generated first, so they precede it in `transformed`.
//Graphs are skipped if any of these hold:
// - the graph is dead;
// - the graph has already been generated;
// - the graph has no active sinks and no condition source.
//A graph that produces no outputs is marked dead. Re-entering a graph that is still being generated
//throws HQLWRN_RecursiveDependendencies.
void EclResourcer::createResourced(ResourceGraphInfo * graph, HqlExprArray & transformed)
{
    if (graph->createdGraph || graph->isDead)
        return;
    if (!graph->containsActiveSinks() && (!graph->hasConditionSource))
        return;
#ifdef VERIFY_RESOURCING
    checkRecursion(graph);
#endif
    // DBGLOG("Prepare to CreateResourced(%p)", graph);
    if (graph->startedGeneratingResourced)
        throwError1(HQLWRN_RecursiveDependendencies, "");
    graph->startedGeneratingResourced = true;
    ForEachItemIn(idxD, graph->dependsOn)
        createResourced(graph->dependsOn.item(idxD).sourceGraph, transformed);
    ForEachItemIn(idxS, graph->sources)
        createResourced(graph->sources.item(idxS).sourceGraph, transformed);
    // DBGLOG("Create resourced %p", graph);
    //First generate the graphs for all the unconditional sinks
    //NOTE(review): the loop below visits every sink that has not been transformed yet, whatever its conditionality.
    HqlExprArray args;
    ForEachItemIn(idx, graph->sinks)
    {
        ResourceGraphLink & sink = graph->sinks.item(idx);
        IHqlExpression * sinkNode = sink.sourceNode;
        ResourcerInfo * info = queryResourceInfo(sinkNode);
        //If graph is unconditional, any condition sinks are forced to be generated (and spilt)
        if (!info->queryTransformed())
        {
            // if it is a spiller, then it will be generated from another sink
            if (!info->isExternalSpill())
            {
                //Actions read by another graph are defined as side effects so that graph can call them.
                IHqlExpression * resourced = createResourced(sinkNode, graph, false, sinkNode->isAction() && sink.sinkGraph);
                assertex(info->queryTransformed());
                args.append(*resourced);
            }
        }
    }
    //Add each sink's splitter output (once) as an output of the subgraph.
    ForEachItemIn(i2, graph->sinks)
    {
        ResourceGraphLink & sink = graph->sinks.item(i2);
        IHqlExpression * sinkNode = sink.sourceNode;
        ResourcerInfo * info = queryResourceInfo(sinkNode);
        IHqlExpression * splitter = info->splitterOutput;
        if (splitter && !args.contains(*splitter))
            args.append(*LINK(splitter));
    }
    if (args.ordinality() == 0)
        graph->isDead = true;
    else
    {
        if (options.useGraphResults || options.alwaysUseGraphResults)
            args.append(*createAttribute(childAtom));
        graph->createdGraph.setown(createValue(no_subgraph, makeVoidType(), args));
        transformed.append(*LINK(graph->createdGraph));
    }
}
//For every redundant source link of thisGraph (isRedundantLink()), copy the dependencies of the linked
//source graph onto thisGraph, after first letting that source graph inherit its own. This keeps the
//generation order correct once redundant links are removed. Each graph is processed at most once.
void EclResourcer::inheritRedundantDependencies(ResourceGraphInfo * thisGraph)
{
    if (thisGraph->inheritedExpandedDependencies)
        return;
    thisGraph->inheritedExpandedDependencies = true;
    ForEachItemIn(idx, thisGraph->sources)
    {
        ResourceGraphLink & cur = thisGraph->sources.item(idx);
        if (cur.isRedundantLink())
        {
            inheritRedundantDependencies(cur.sourceGraph);
            ForEachItemIn(i, cur.sourceGraph->dependsOn)
            {
                ResourceGraphLink & curDepend = cur.sourceGraph->dependsOn.item(i);
                //New dependency: same source as curDepend, but sinking into thisGraph at cur.sinkNode.
                //NOTE(review): the same object is appended to both dependsOn and links; presumably only
                //`links` owns it - confirm the array types.
                ResourceGraphLink * link = new ResourceGraphDependencyLink(curDepend.sourceGraph, curDepend.sourceNode, thisGraph, cur.sinkNode, cur.queryDependency());
                thisGraph->dependsOn.append(*link);
                links.append(*link);
            }
        }
    }
}
  5710. void EclResourcer::createResourced(HqlExprArray & transformed)
  5711. {
  5712. //Before removing null links (e.g., where the source graph is expanded inline), need to make sure
  5713. //dependencies are cloned, otherwise graphs can be generated in the wrong order
  5714. ForEachItemIn(idx1, graphs)
  5715. inheritRedundantDependencies(&graphs.item(idx1));
  5716. ForEachItemInRev(idx2, links)
  5717. {
  5718. ResourceGraphLink & cur = links.item(idx2);
  5719. if (cur.isRedundantLink())
  5720. removeLink(cur, true);
  5721. }
  5722. ForEachItemIn(idx3, graphs)
  5723. createResourced(&graphs.item(idx3), transformed);
  5724. }
  5725. static int compareGraphDepth(CInterface * const * _l, CInterface * const * _r)
  5726. {
  5727. ResourceGraphInfo * l = (ResourceGraphInfo *)*_l;
  5728. ResourceGraphInfo * r = (ResourceGraphInfo *)*_r;
  5729. return l->getDepth() - r->getDepth();
  5730. }
  5731. static int compareLinkDepth(CInterface * const * _l, CInterface * const * _r)
  5732. {
  5733. ResourceGraphLink * l = (ResourceGraphLink *)*_l;
  5734. ResourceGraphLink * r = (ResourceGraphLink *)*_r;
  5735. int diff = l->sourceGraph->getDepth() - r->sourceGraph->getDepth();
  5736. if (diff) return diff;
  5737. if (l->sinkGraph)
  5738. if (r->sinkGraph)
  5739. return l->sinkGraph->getDepth() - r->sinkGraph->getDepth();
  5740. else
  5741. return -1;
  5742. else
  5743. if (r->sinkGraph)
  5744. return +1;
  5745. else
  5746. return 0;
  5747. }
//Append a human-readable dump of the resourcer state to `out`:
// - every graph sorted by depth (with its sources, sinks and the dependencies it creates);
// - every link sorted by compareLinkDepth.
//Graph numbers in the link section are indexes into the depth-sorted graph list. Used by trace().
void EclResourcer::display(StringBuffer & out)
{
    CIArrayOf<ResourceGraphInfo> sortedGraphs;
    ForEachItemIn(j1, graphs)
        sortedGraphs.append(OLINK(graphs.item(j1)));
    sortedGraphs.sort(compareGraphDepth);
    out.append("Graphs:\n");
    ForEachItemIn(i, sortedGraphs)
    {
        ResourceGraphInfo & cur = sortedGraphs.item(i);
        out.appendf("%d: depth(%d) uncond(%d) cond(%d) %s {%p}\n", i, cur.getDepth(), cur.isUnconditional, cur.hasConditionSource, cur.isDead ? "dead" : "", &cur);
        //For each source link, show the node inside this graph that reads it (the link's sinkNode).
        ForEachItemIn(j, cur.sources)
        {
            ResourceGraphLink & link = cur.sources.item(j);
            out.appendf(" Source: %p %s\n", link.sinkNode.get(), getOpString(link.sinkNode->getOperator()));
        }
        ForEachItemIn(k, cur.sinks)
        {
            ResourceGraphLink & link = cur.sinks.item(k);
            IHqlExpression * sourceNode = link.sourceNode;
            ResourcerInfo * sourceInfo = queryResourceInfo(sourceNode);
            out.appendf(" Sink: %p %s cond(%d,%d) int(%d) ext(%d)\n", sourceNode, getOpString(sourceNode->getOperator()), sourceInfo->conditions.ordinality(), sourceInfo->conditionSourceCount, sourceInfo->numInternalUses(), sourceInfo->numExternalUses);
        }
        //Dependencies created by links whose source is this graph.
        ForEachItemIn(i3, links)
        {
            ResourceGraphLink & curLink = links.item(i3);
            if ((curLink.sourceGraph == &cur) && curLink.queryDependency())
            {
                StringBuffer s;
                toECL(curLink.queryDependency(), s);
                out.appendf(" Creates: %s\n", s.str());
            }
        }
    }
    out.append("Links:\n");
    CIArrayOf<ResourceGraphLink> sortedLinks;
    ForEachItemIn(j2, links)
        sortedLinks.append(OLINK(links.item(j2)));
    sortedLinks.sort(compareLinkDepth);
    ForEachItemIn(i2, sortedLinks)
    {
        ResourceGraphLink & link = sortedLinks.item(i2);
        unsigned len = out.length();
        out.appendf(" Source: %d %s", (unsigned)sortedGraphs.find(*link.sourceGraph), getOpString(link.sourceNode->getOperator()));
        if (link.sinkNode)
        {
            //Align the sink column 30 characters after the start of this line.
            out.padTo(len+30);
            out.appendf(" Sink: %d %s", (unsigned)sortedGraphs.find(*link.sinkGraph), getOpString(link.sinkNode->getOperator()));
        }
        if (link.linkKind == SequenceLink)
            out.append(" <sequence>");
        if (link.isDependency())
            out.append(" <dependency>");
        out.newline();
    }
}
  5804. void EclResourcer::trace()
  5805. {
  5806. StringBuffer s;
  5807. display(s);
  5808. DBGLOG("%s", s.str());
  5809. }
  5810. //---------------------------------------------------------------------------
//Resource one level of a query into subgraphs, appending the generated no_subgraph expressions to
//`transformed`. The top-level items of `expr` are collected with expandLists():
// - using no_any for child queries;
// - using no_parallel otherwise.
//A sequential action list switches the resourcer into sequential mode.
//The passes below run in a fixed order.
void EclResourcer::resourceGraph(IHqlExpression * expr, HqlExprArray & transformed)
{
    if (isSequentialActionList(expr))
        setSequential(true);
    HqlExprArray exprs;
    node_operator expandOp = options.isChildQuery ? no_any : no_parallel;
    expandLists(expandOp, exprs, expr);
    //NB: This only resources a single level of queries. SubQueries should be resourced in a separate
    //pass so that commonality between different activities/subgraphs isn't introduced/messed up.
    findSplitPoints(exprs);
    createInitialGraphs(exprs);
    addDependencies(exprs);
    markConditions(exprs);
    //Resource-limit driven splitting only happens when resources are being checked.
    if (options.checkResources())
        resourceSubGraphs(exprs);
#ifdef TRACE_RESOURCING
    trace();
#endif
    mergeSubGraphs();
#ifdef TRACE_RESOURCING
    trace();
#endif
    if (!options.newBalancedSpotter)
    {
        oldSpotUnbalancedSplitters(exprs);
        if (options.optimizeSharedInputs)
            spotSharedInputs();
    }
    else
        spotUnbalancedSplitters(exprs);
    if (options.spotThroughAggregate)
        optimizeAggregates();
    moveExternalSpillPoints();
    createResourced(transformed);
}
//Reduced variant of resourceGraph(), presumably used for remote graphs. It builds the initial graphs with
//createInitialRemoteGraphs() and performs none of the following passes:
// - resource-limit splitting;
// - splitter spotting;
// - through-aggregate optimisation;
// - spill-point movement.
//NOTE(review): markConditions() runs before addDependencies() here, the reverse of resourceGraph();
//confirm the order is intentional.
void EclResourcer::resourceRemoteGraph(IHqlExpression * expr, HqlExprArray & transformed)
{
    HqlExprArray exprs;
    expandLists(no_any, exprs, expr);
    //NB: This only resources a single level of queries. SubQueries should be resourced in a separate
    //pass so that commonality between different activities/subgraphs isn't introduced/messed up.
    findSplitPoints(exprs);
    createInitialRemoteGraphs(exprs);
    markConditions(exprs);
    addDependencies(exprs);
#ifdef TRACE_RESOURCING
    trace();
#endif
    mergeSubGraphs();
#ifdef TRACE_RESOURCING
    trace();
#endif
    createResourced(transformed);
}
  5865. //---------------------------------------------------------------------------
  5866. IHqlExpression * resourceThorGraph(HqlCppTranslator & translator, IHqlExpression * _expr, ClusterType targetClusterType, unsigned clusterSize, IHqlExpression * graphIdExpr)
  5867. {
  5868. CResourceOptions options(targetClusterType, clusterSize, translator.queryOptions(), translator.querySpillSequence());
  5869. if (graphIdExpr)
  5870. options.setNewChildQuery(graphIdExpr, 0);
  5871. LinkedHqlExpr expr = _expr;
  5872. {
  5873. ActivityInvariantHoister hoister(options);
  5874. HqlExprArray hoisted;
  5875. expr.setown(hoister.transformRoot(expr));
  5876. translator.traceExpression("AfterInvariant Child", expr);
  5877. }
  5878. HqlExprArray transformed;
  5879. {
  5880. EclResourcer resourcer(translator.queryErrorProcessor(), translator.wu(), translator.queryOptions(), options);
  5881. resourcer.resourceGraph(expr, transformed);
  5882. }
  5883. hoistNestedCompound(translator, transformed);
  5884. return createActionList(transformed);
  5885. }
  5886. static IHqlExpression * doResourceGraph(HqlCppTranslator & translator, HqlExprCopyArray * activeRows, IHqlExpression * _expr,
  5887. ClusterType targetClusterType, unsigned clusterSize,
  5888. IHqlExpression * graphIdExpr, unsigned numResults, bool isChild, bool useGraphResults, bool unlimitedResources)
  5889. {
  5890. LinkedHqlExpr expr = _expr;
  5891. HqlExprArray transformed;
  5892. unsigned totalResults;
  5893. CResourceOptions options(targetClusterType, clusterSize, translator.queryOptions(), translator.querySpillSequence());
  5894. if (isChild)
  5895. options.setChildQuery(true);
  5896. if (unlimitedResources)
  5897. options.unlimitedResources = true;
  5898. options.setNewChildQuery(graphIdExpr, numResults);
  5899. options.setUseGraphResults(useGraphResults);
  5900. {
  5901. ActivityInvariantHoister hoister(options);
  5902. hoister.tagActiveCursors(activeRows);
  5903. HqlExprArray hoisted;
  5904. expr.setown(hoister.transformRoot(expr));
  5905. translator.traceExpression("AfterInvariant Child", expr);
  5906. }
  5907. {
  5908. EclResourcer resourcer(translator.queryErrorProcessor(), translator.wu(), translator.queryOptions(), options);
  5909. resourcer.tagActiveCursors(activeRows);
  5910. resourcer.resourceGraph(expr, transformed);
  5911. totalResults = resourcer.numGraphResults();
  5912. }
  5913. hoistNestedCompound(translator, transformed);
  5914. if (totalResults == 0)
  5915. totalResults = 1;
  5916. transformed.append(*createAttribute(numResultsAtom, getSizetConstant(totalResults)));
  5917. transformed.append(*LINK(graphIdExpr));
  5918. if (isSequentialActionList(expr))
  5919. transformed.append(*createAttribute(sequentialAtom));
  5920. return createValue(no_subgraph, makeVoidType(), transformed);
  5921. }
  5922. IHqlExpression * resourceLibraryGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize, IHqlExpression * graphIdExpr, unsigned numResults)
  5923. {
  5924. return doResourceGraph(translator, NULL, expr, targetClusterType, clusterSize, graphIdExpr, numResults, false, true, false); //?? what value for isChild (e.g., thor library call). Need to gen twice?
  5925. }
  5926. IHqlExpression * resourceNewChildGraph(HqlCppTranslator & translator, HqlExprCopyArray & activeRows, IHqlExpression * expr, ClusterType targetClusterType, IHqlExpression * graphIdExpr, unsigned numResults)
  5927. {
  5928. return doResourceGraph(translator, &activeRows, expr, targetClusterType, 0, graphIdExpr, numResults, true, true, false);
  5929. }
  5930. IHqlExpression * resourceLoopGraph(HqlCppTranslator & translator, HqlExprCopyArray & activeRows, IHqlExpression * expr, ClusterType targetClusterType, IHqlExpression * graphIdExpr, unsigned numResults, bool insideChildQuery, bool unlimitedResources)
  5931. {
  5932. return doResourceGraph(translator, &activeRows, expr, targetClusterType, 0, graphIdExpr, numResults, insideChildQuery, true, unlimitedResources);
  5933. }
  5934. IHqlExpression * resourceRemoteGraph(HqlCppTranslator & translator, IHqlExpression * _expr, ClusterType targetClusterType, unsigned clusterSize)
  5935. {
  5936. CResourceOptions options(targetClusterType, clusterSize, translator.queryOptions(), translator.querySpillSequence());
  5937. LinkedHqlExpr expr = _expr;
  5938. {
  5939. ActivityInvariantHoister hoister(options);
  5940. HqlExprArray hoisted;
  5941. expr.setown(hoister.transformRoot(expr));
  5942. translator.traceExpression("AfterInvariant Child", expr);
  5943. }
  5944. HqlExprArray transformed;
  5945. {
  5946. EclResourcer resourcer(translator.queryErrorProcessor(), translator.wu(), translator.queryOptions(), options);
  5947. resourcer.resourceRemoteGraph(expr, transformed);
  5948. }
  5949. hoistNestedCompound(translator, transformed);
  5950. return createActionList(transformed);
  5951. }
/*
Conditions:
  Conditions are awkward to handle.  The tree is processed in two passes.  The first pass tags everything
  that must be evaluated, and saves a list of condition statements to process later.
  The second pass tags the conditionals:
  a) All left and right branches of a condition are tagged. [conditionSourceCount]
  b) All conditional expressions are tagged with the conditions they are evaluated for.
     [If the condition lists match, it should be possible to merge the graphs.]
  c) The spill count for a node should ignore the number of links from conditional graphs,
     but should add the number of conditions.
  d) Note that if (a, b(f1) + b(f2), c) still needs to link b twice!
*/
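/*
This transformer converts spill activities into regular activities:
  - spill writes become no_output (or no_setgraphresult when graph results are used),
  - spill reads become no_table (or no_getgraphresult when graph results are used),
  - common-spill nodes are removed and replaced by their input.
It also collapses splitters of splitters into a single splitter.
*/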
  5968. class SpillActivityTransformer : public NewHqlTransformer
  5969. {
  5970. public:
  5971. SpillActivityTransformer(bool _createGraphResults);
  5972. protected:
  5973. virtual void analyseExpr(IHqlExpression * expr);
  5974. virtual IHqlExpression * createTransformed(IHqlExpression * expr);
  5975. bool isUnbalanced(IHqlExpression * body)
  5976. {
  5977. ANewTransformInfo * info = queryTransformExtra(body);
  5978. return info->spareByte1 != 0;
  5979. }
  5980. void setUnbalanced(IHqlExpression * body)
  5981. {
  5982. ANewTransformInfo * info = queryTransformExtra(body);
  5983. info->spareByte1 = true;
  5984. }
  5985. protected:
  5986. bool createGraphResults;
  5987. };
  5988. static HqlTransformerInfo spillActivityTransformerInfo("SpillActivityTransformer");
  5989. SpillActivityTransformer::SpillActivityTransformer(bool _createGraphResults)
  5990. : NewHqlTransformer(spillActivityTransformerInfo), createGraphResults(_createGraphResults)
  5991. {
  5992. }
  5993. void SpillActivityTransformer::analyseExpr(IHqlExpression * expr)
  5994. {
  5995. IHqlExpression * body = expr->queryBody();
  5996. if (alreadyVisited(body))
  5997. return;
  5998. //If splitters are commoned up ensure unbalanced splitters stay unbalanced.
  5999. if ((body->getOperator() == no_split) && !body->hasAttribute(balancedAtom))
  6000. {
  6001. IHqlExpression * splitter = NULL;
  6002. IHqlExpression * cur = body->queryChild(0);
  6003. for (;;)
  6004. {
  6005. node_operator op = cur->getOperator();
  6006. if (op == no_split)
  6007. splitter = cur;
  6008. else if (op != no_commonspill)
  6009. break;
  6010. cur = cur->queryChild(0);
  6011. }
  6012. if (splitter)
  6013. setUnbalanced(splitter->queryBody());
  6014. }
  6015. NewHqlTransformer::analyseExpr(expr);
  6016. }
/*
 * Transform pass:
 *  - no_split:       if the transformed input is itself a splitter, that input replaces this splitter.
 *                    Otherwise the splitter is rebuilt by the base class, and its balanced attribute is
 *                    removed if analyseExpr() flagged it as unbalanced.
 *  - no_writespill:  becomes no_setgraphresult (createGraphResults) or no_output, with the transformed arguments.
 *  - no_commonspill: replaced by its transformed input.
 *  - no_readspill:   becomes no_getgraphresult (createGraphResults) or no_table; the result is a row,
 *                    dictionary or dataset to match the original, and table info is preserved from the
 *                    transformed input.
 * All other operators are handled by NewHqlTransformer::createTransformed().
 */
IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * expr)
{
    // If queryTransformAnnotation() produces a result (presumably when expr is an annotation), use it.
    IHqlExpression * annotation = queryTransformAnnotation(expr);
    if (annotation)
        return annotation;

    switch (expr->getOperator())
    {
    case no_split:
        {
            OwnedHqlExpr input = transform(expr->queryChild(0));
            // A splitter of a splitter collapses onto the inner splitter.
            if (input->getOperator() == no_split)
                return input.getClear();
            OwnedHqlExpr transformed = NewHqlTransformer::createTransformed(expr);
            if (transformed->hasAttribute(balancedAtom) && isUnbalanced(expr))
                return removeAttribute(transformed, balancedAtom);
            return transformed.getClear();
        }
    case no_writespill:
        {
            HqlExprArray args;
            transformChildren(expr, args);
            if (createGraphResults)
                return createValue(no_setgraphresult, makeVoidType(), args);
            return createValue(no_output, makeVoidType(), args);
        }
    case no_commonspill:
        return transform(expr->queryChild(0));
    case no_readspill:
        {
            OwnedHqlExpr ds = transform(expr->queryChild(0));
            node_operator readOp = createGraphResults ? no_getgraphresult : no_table;
            HqlExprArray args;
            // The two read operators take child 1 (presumably the filename/result sequence) and the
            // record in opposite orders:
            //   no_table:          (child1, record, ...)
            //   no_getgraphresult: (record, child1, ...)
            if (!createGraphResults)
                args.append(*transform(expr->queryChild(1)));
            args.append(*LINK(ds->queryRecord()));
            if (createGraphResults)
                args.append(*transform(expr->queryChild(1)));
            // Remaining children are passed through after being transformed.
            ForEachChildFrom(i, expr, 2)
            {
                IHqlExpression * cur = expr->queryChild(i);
                args.append(*transform(cur));
            }
            // Carry across any record count information from the spill read.
            IHqlExpression * recordCountAttr = queryRecordCountInfo(expr);
            if (recordCountAttr)
                args.append(*LINK(recordCountAttr));

            OwnedHqlExpr ret;
            if (expr->isDatarow())
                ret.setown(createRow(readOp, args));
            else if (expr->isDictionary())
                ret.setown(createDictionary(readOp, args));
            else
                ret.setown(createDataset(readOp, args));
            const bool loseDistribution = false;
            return preserveTableInfo(ret, ds, loseDistribution, false);
        }
    }
    return NewHqlTransformer::createTransformed(expr);
}
  6075. IHqlExpression * convertSpillsToActivities(IHqlExpression * expr, bool createGraphResults)
  6076. {
  6077. SpillActivityTransformer transformer(createGraphResults);
  6078. transformer.analyse(expr, 0);
  6079. return transformer.transformRoot(expr);
  6080. }
/*
The classes in this file are responsible for converting a logical declarative graph into an execution graph.
This process requires multiple stages:
a) Spot which dataset expressions should be evaluated once outside the activities they are contained in, and create a shared result.
b) Spot scalar expressions that should be evaluated once outside the activities they are contained in, and create a shared result.
c) Combine setresult activities that are:
   1) always unconditionally executed, or
   2) linked by dependencies, where the extra cost of evaluation outweighs the cost of saving the temporary, or
   3) only used as dependents from the same set of activities, or
   4) only used within the same set of conditions (a superset of (3), but possibly harder to achieve).
d) Spot global expressions that will be evaluated in more than one setresult (or other activity that is only
   executed once - e.g., IF) and create a shared result.
e) Repeat (c) again.
f) Split the logical graph into subgraphs.  For Thor the subgraphs have the following requirements:
   1) Dependencies are in separate subgraphs from the activities that need them.
   2) Lazy results are in separate subgraphs from non-lazy results.
      **When lazy results are implemented, check whether this really is still a requirement, or whether Thor
        child graphs can be executed more like Roxie.
   3) Conditional subgraphs are not combined.
   4) Each subgraph is within the appropriate resource limits.
g) Optimize the spills to reduce the number of fields spilled to disk and read from disk.
h) Create splitters within subgraphs, and for Thor mark whether they are balanced or not (i.e., can they deadlock if they don't read ahead?).
i) Convert spills into diskread/diskwrite (or result read/write) activities.

Previously these were all done in a single transformation, but they have now been split into separate stages for simplicity.
Because they are implemented by different stages, it is necessary to keep track of which outputs and results
are required, and which are primarily dependencies within the graph.  (Note that external results that are lazily
executed may also be marked as lazy.)  The following attributes are used:
attr(_lazy_Atom, optCondition) - this external result is evaluated lazily.  The condition is optional.
attr(_graphLocal_Atom) - this result is only evaluated within the graph.  (May want to avoid saving it via the workunit.)
attr(_update_Atom) - a dependency of something with an UPDATE flag, so it may not be evaluated.  (Not currently used.)
*/