|
@@ -698,6 +698,41 @@ bool ResourceGraphInfo::addCondition(IHqlExpression * condition)
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+bool ResourceGraphInfo::isSharedInput(IHqlExpression * expr)
|
|
|
+{
|
|
|
+ IHqlExpression * body = expr->queryBody();
|
|
|
+ if (unbalancedExternalSources.contains(*body))
|
|
|
+ return false;
|
|
|
+ if (queryResourceInfo(expr)->expandRatherThanSplit())
|
|
|
+ return false;
|
|
|
+ unsigned numUses = 0;
|
|
|
+ ForEachItemIn(i, balancedExternalSources)
|
|
|
+ {
|
|
|
+ if (body == &balancedExternalSources.item(i))
|
|
|
+ numUses++;
|
|
|
+ }
|
|
|
+ //NumUses could be zero if an input should be expanded, and that input is shared by another graph which also expands
|
|
|
+ //the input. E.g. project(meta(diskread).
|
|
|
+ return numUses > 1;
|
|
|
+}
|
|
|
+
|
|
|
+void ResourceGraphInfo::addSharedInput(IHqlExpression * expr, IHqlExpression * mapped)
|
|
|
+{
|
|
|
+ sharedInputs.append(*LINK(expr));
|
|
|
+ sharedInputs.append(*LINK(mapped));
|
|
|
+}
|
|
|
+
|
|
|
+IHqlExpression * ResourceGraphInfo::queryMappedSharedInput(IHqlExpression * expr)
|
|
|
+{
|
|
|
+ unsigned max = sharedInputs.ordinality();
|
|
|
+ for (unsigned i=0; i < max; i+= 2)
|
|
|
+ {
|
|
|
+ if (expr == &sharedInputs.item(i))
|
|
|
+ return &sharedInputs.item(i+1);
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
bool ResourceGraphInfo::allocateResources(const CResources & value, const CResources & limit)
|
|
|
{
|
|
|
if (resources.addExceeds(value, limit))
|
|
@@ -897,6 +932,12 @@ bool ResourceGraphInfo::mergeInSource(ResourceGraphInfo & other, const CResource
|
|
|
if (options->checkResources() && !allocateResources(other.resources, limit))
|
|
|
return false;
|
|
|
|
|
|
+ mergeGraph(other, isConditionalLink, mergeConditions);
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+void ResourceGraphInfo::mergeGraph(ResourceGraphInfo & other, bool isConditionalLink, bool mergeConditions)
|
|
|
+{
|
|
|
#ifdef TRACE_RESOURCING
|
|
|
DBGLOG("Merging%s source into%s sink", other.isUnconditional ? "" : " conditional", isUnconditional ? "" : " conditional");
|
|
|
other.display();
|
|
@@ -938,8 +979,22 @@ bool ResourceGraphInfo::mergeInSource(ResourceGraphInfo & other, const CResource
|
|
|
conditions.append(OLINK(other.conditions.item(i)));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
//sources and sinks are updated elsewhere...
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+bool ResourceGraphInfo::mergeInSibling(ResourceGraphInfo & other, const CResources & limit)
|
|
|
+{
|
|
|
+ if ((!isUnconditional || !other.isUnconditional) && !hasSameConditions(other))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (isDependentOn(other, false) || other.isDependentOn(*this, false))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (options->checkResources() && !allocateResources(other.resources, limit))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ mergeGraph(other, false, false);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -1323,9 +1378,9 @@ IHqlExpression * ResourcerInfo::createSpiller(IHqlExpression * transformed, bool
|
|
|
else
|
|
|
{
|
|
|
if (transformed->isDataset())
|
|
|
- split.setown(createDataset(no_split, LINK(transformed), createAttribute(balancedAtom)));
|
|
|
+ split.setown(createDataset(no_split, LINK(transformed), createComma(createAttribute(balancedAtom), createUniqueId())));
|
|
|
else
|
|
|
- split.setown(createRow(no_split, LINK(transformed), createAttribute(balancedAtom)));
|
|
|
+ split.setown(createRow(no_split, LINK(transformed), createComma(createAttribute(balancedAtom), createUniqueId())));
|
|
|
split.setown(cloneInheritedAnnotations(original, split));
|
|
|
}
|
|
|
|
|
@@ -1487,6 +1542,7 @@ bool ResourcerInfo::expandRatherThanSpill(bool noteOtherSpills)
|
|
|
case no_nohoist:
|
|
|
case no_section:
|
|
|
case no_sectioninput:
|
|
|
+ case no_dataset_alias:
|
|
|
expr = expr->queryChild(0);
|
|
|
break;
|
|
|
case no_newusertable:
|
|
@@ -1612,6 +1668,7 @@ bool ResourcerInfo::expandRatherThanSplit()
|
|
|
case no_compound_inline:
|
|
|
case no_section:
|
|
|
case no_sectioninput:
|
|
|
+ case no_dataset_alias:
|
|
|
break;
|
|
|
case no_select:
|
|
|
if (options->targetClusterType == RoxieCluster)
|
|
@@ -1786,6 +1843,8 @@ EclResourcer::EclResourcer(IErrorReceiver * _errors, IConstWorkUnit * _wu, Clust
|
|
|
options.expandSingleConstRow = true;
|
|
|
options.createSpillAsDataset = _translatorOptions.optimizeSpillProject && (targetClusterType != HThorCluster);
|
|
|
options.useLinkedRawIterator = _translatorOptions.useLinkedRawIterator;
|
|
|
+ options.combineSiblings = _translatorOptions.combineSiblingGraphs && (targetClusterType != HThorCluster) && (targetClusterType != RoxieCluster);
|
|
|
+ options.optimizeSharedInputs = _translatorOptions.optimizeSharedGraphInputs && options.combineSiblings;
|
|
|
}
|
|
|
|
|
|
EclResourcer::~EclResourcer()
|
|
@@ -2364,6 +2423,10 @@ protected:
|
|
|
|
|
|
bool isEvaluateable(IHqlExpression * ds, bool ignoreInline = false)
|
|
|
{
|
|
|
+ //Don't hoist an alias - it could create unnecessary duplicate spills - hoist its input
|
|
|
+ if (ds->getOperator() == no_dataset_alias)
|
|
|
+ return false;
|
|
|
+
|
|
|
//Not allowed to hoist
|
|
|
if (isContextDependent(ds, (conditionalDepth == 0), true))
|
|
|
return false;
|
|
@@ -2583,6 +2646,25 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
|
|
|
if (options.preventSteppedSplit)
|
|
|
insideNeverSplit = true;
|
|
|
break;
|
|
|
+ case no_compound_diskread:
|
|
|
+ case no_compound_disknormalize:
|
|
|
+ case no_compound_diskaggregate:
|
|
|
+ case no_compound_diskcount:
|
|
|
+ case no_compound_diskgroupaggregate:
|
|
|
+ case no_compound_indexread:
|
|
|
+ case no_compound_indexnormalize:
|
|
|
+ case no_compound_indexaggregate:
|
|
|
+ case no_compound_indexcount:
|
|
|
+ case no_compound_indexgroupaggregate:
|
|
|
+ case no_compound_childread:
|
|
|
+ case no_compound_childnormalize:
|
|
|
+ case no_compound_childaggregate:
|
|
|
+ case no_compound_childcount:
|
|
|
+ case no_compound_childgroupaggregate:
|
|
|
+ case no_compound_selectnew:
|
|
|
+ case no_compound_inline:
|
|
|
+ insideNeverSplit = true;
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
ITypeInfo * type = expr->queryType();
|
|
@@ -3453,18 +3535,26 @@ void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned which
|
|
|
if (!info)
|
|
|
return;
|
|
|
|
|
|
- if (info->currentSource == whichSource)
|
|
|
+ if (graph && info->graph && info->graph != graph)
|
|
|
{
|
|
|
- if (info->pathToSplitter != path)
|
|
|
- info->balanced = false;
|
|
|
+ if ((info->currentSource == whichSource) && (info->pathToSplitter != path))
|
|
|
+ graph->unbalancedExternalSources.append(*LINK(expr->queryBody()));
|
|
|
+ info->currentSource = whichSource;
|
|
|
+ info->pathToSplitter.set(path);
|
|
|
return;
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (info->currentSource == whichSource)
|
|
|
+ {
|
|
|
+ if (info->pathToSplitter != path)
|
|
|
+ info->balanced = false;
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (graph && info->graph && info->graph != graph)
|
|
|
- return;
|
|
|
-
|
|
|
- info->currentSource = whichSource;
|
|
|
- info->pathToSplitter.set(path);
|
|
|
+ info->currentSource = whichSource;
|
|
|
+ info->pathToSplitter.set(path);
|
|
|
+ }
|
|
|
|
|
|
if (info->containsActivity)
|
|
|
{
|
|
@@ -3557,6 +3647,65 @@ void EclResourcer::spotUnbalancedSplitters(HqlExprArray & exprs)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void EclResourcer::spotSharedInputs(IHqlExpression * expr, ResourceGraphInfo * graph)
|
|
|
+{
|
|
|
+ ResourcerInfo * info = queryResourceInfo(expr);
|
|
|
+ if (!info)
|
|
|
+ return;
|
|
|
+
|
|
|
+ if (info->graph && info->graph != graph)
|
|
|
+ {
|
|
|
+ IHqlExpression * body = expr->queryBody();
|
|
|
+ if (!graph->unbalancedExternalSources.contains(*body))
|
|
|
+ graph->balancedExternalSources.append(*LINK(body));
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (info->isSplit())
|
|
|
+ {
|
|
|
+ //overload currentSource to track if we have visited this splitter before. It cannot have value value NotFound up to now
|
|
|
+ if (info->currentSource == NotFound)
|
|
|
+ return;
|
|
|
+ info->currentSource = NotFound;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (info->containsActivity)
|
|
|
+ {
|
|
|
+ unsigned first = getFirstActivityArgument(expr);
|
|
|
+ unsigned num = getNumActivityArguments(expr);
|
|
|
+ unsigned last = first + num;
|
|
|
+ for (unsigned idx = first; idx < last; idx++)
|
|
|
+ {
|
|
|
+ spotSharedInputs(expr->queryChild(idx), graph);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void EclResourcer::spotSharedInputs()
|
|
|
+{
|
|
|
+ //Thor only handles one graph at a time, so only walk expressions within a single graph.
|
|
|
+ ForEachItemIn(i1, graphs)
|
|
|
+ {
|
|
|
+ ResourceGraphInfo & curGraph = graphs.item(i1);
|
|
|
+ HqlExprCopyArray visited;
|
|
|
+ ForEachItemIn(i2, curGraph.sinks)
|
|
|
+ {
|
|
|
+ ResourceGraphLink & cur = curGraph.sinks.item(i2);
|
|
|
+ IHqlExpression * curExpr = cur.sourceNode;
|
|
|
+ if (!visited.contains(*curExpr))
|
|
|
+ {
|
|
|
+ ResourcerInfo * info = queryResourceInfo(curExpr);
|
|
|
+ if (!info->isExternalSpill() && !info->expandRatherThanSpill(true))
|
|
|
+ {
|
|
|
+ spotSharedInputs(curExpr, &curGraph);
|
|
|
+ visited.append(*curExpr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//------------------------------------------------------------------------------------------
|
|
|
// PASS6: Merge sub graphs that can share resources and don't have dependencies
|
|
|
// MORE: Once sources are merged, should try merging between trees.
|
|
@@ -3638,7 +3787,7 @@ bool EclResourcer::queryMergeGraphLink(ResourceGraphLink & link)
|
|
|
}
|
|
|
|
|
|
|
|
|
-void EclResourcer::mergeSubGraphs(unsigned pass)
|
|
|
+unsigned EclResourcer::getMaxDepth() const
|
|
|
{
|
|
|
unsigned maxDepth = 0;
|
|
|
for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
|
|
@@ -3647,7 +3796,13 @@ void EclResourcer::mergeSubGraphs(unsigned pass)
|
|
|
if (depth > maxDepth)
|
|
|
maxDepth = depth;
|
|
|
}
|
|
|
+ return maxDepth;
|
|
|
+}
|
|
|
|
|
|
+
|
|
|
+void EclResourcer::mergeSubGraphs(unsigned pass)
|
|
|
+{
|
|
|
+ unsigned maxDepth = getMaxDepth();
|
|
|
for (unsigned curDepth = maxDepth+1; curDepth-- != 0;)
|
|
|
{
|
|
|
mergeAgain:
|
|
@@ -3721,11 +3876,48 @@ mergeAgain:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- ForEachItemInRev(idx2, graphs)
|
|
|
+void EclResourcer::mergeSiblings()
|
|
|
+{
|
|
|
+ unsigned maxDepth = getMaxDepth();
|
|
|
+ for (unsigned curDepth = maxDepth+1; curDepth-- != 0;)
|
|
|
{
|
|
|
- if (graphs.item(idx2).isDead)
|
|
|
- graphs.remove(idx2);
|
|
|
+ for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
|
|
|
+ {
|
|
|
+ ResourceGraphInfo & cur = graphs.item(idx);
|
|
|
+ if ((cur.getDepth() == curDepth) && !cur.isDead)
|
|
|
+ {
|
|
|
+ ForEachItemIn(idxSource, cur.sources)
|
|
|
+ {
|
|
|
+ ResourceGraphLink & curLink = cur.sources.item(idxSource);
|
|
|
+ ResourceGraphInfo * source = curLink.sourceGraph;
|
|
|
+ IHqlExpression * sourceNode = curLink.sourceNode;
|
|
|
+ ResourcerInfo * sourceInfo = queryResourceInfo(sourceNode);
|
|
|
+ if (sourceInfo->neverSplit || sourceInfo->expandRatherThanSplit())
|
|
|
+ continue;
|
|
|
+
|
|
|
+ for (unsigned iSink = 0; iSink < source->sinks.ordinality(); )
|
|
|
+ {
|
|
|
+ ResourceGraphLink & secondLink = source->sinks.item(iSink);
|
|
|
+ ResourceGraphInfo * sink = secondLink.sinkGraph;
|
|
|
+ if (sink && (sink != &cur) && !sink->isDead && sourceNode->queryBody() == secondLink.sourceNode->queryBody())
|
|
|
+ {
|
|
|
+ if (cur.mergeInSibling(*sink, *resourceLimit))
|
|
|
+ {
|
|
|
+ //NB: Following cannot remove sources below the current index.
|
|
|
+ replaceGraphReferences(sink, &cur);
|
|
|
+ sink->isDead = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ iSink++;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ iSink++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3733,6 +3925,15 @@ void EclResourcer::mergeSubGraphs()
|
|
|
{
|
|
|
for (unsigned pass=0; pass < 2; pass++)
|
|
|
mergeSubGraphs(pass);
|
|
|
+
|
|
|
+ if (options.combineSiblings)
|
|
|
+ mergeSiblings();
|
|
|
+
|
|
|
+ ForEachItemInRev(idx2, graphs)
|
|
|
+ {
|
|
|
+ if (graphs.item(idx2).isDead)
|
|
|
+ graphs.remove(idx2);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------------------
|
|
@@ -4140,25 +4341,23 @@ IHqlExpression * EclResourcer::createResourced(IHqlExpression * expr, ResourceGr
|
|
|
if (info->graph != ownerGraph)
|
|
|
{
|
|
|
assertex(!defineSideEffect);
|
|
|
+ bool isShared = options.optimizeSharedInputs && ownerGraph && ownerGraph->isSharedInput(expr);
|
|
|
+ if (isShared)
|
|
|
+ {
|
|
|
+ IHqlExpression * mapped = ownerGraph->queryMappedSharedInput(expr->queryBody());
|
|
|
+ if (mapped)
|
|
|
+ return LINK(mapped);
|
|
|
+ }
|
|
|
+
|
|
|
IHqlExpression * source;
|
|
|
if (info->expandRatherThanSpill(true))
|
|
|
{
|
|
|
- if (options.minimiseSpills)
|
|
|
- {
|
|
|
- OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandInParent, false);
|
|
|
- if (queryAddUniqueToActivity(resourced))
|
|
|
- source = appendUniqueAttr(resourced);
|
|
|
- else
|
|
|
- source = LINK(resourced);
|
|
|
- }
|
|
|
+ bool expandChildInParent = options.minimiseSpills ? expandInParent : true;
|
|
|
+ OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandChildInParent, false);
|
|
|
+ if (queryAddUniqueToActivity(resourced))
|
|
|
+ source = appendUniqueAttr(resourced);
|
|
|
else
|
|
|
- {
|
|
|
- OwnedHqlExpr child = doCreateResourced(expr, info->graph, true, false);
|
|
|
- if (queryAddUniqueToActivity(child))
|
|
|
- source = appendUniqueAttr(child);
|
|
|
- else
|
|
|
- source = LINK(child);
|
|
|
- }
|
|
|
+ source = LINK(resourced);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -4186,6 +4385,12 @@ IHqlExpression * EclResourcer::createResourced(IHqlExpression * expr, ResourceGr
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if (isShared)
|
|
|
+ {
|
|
|
+ source = createDatasetF(no_split, source, createAttribute(balancedAtom), createUniqueId(), NULL);
|
|
|
+ ownerGraph->addSharedInput(expr->queryBody(), source);
|
|
|
+ }
|
|
|
+
|
|
|
return source;
|
|
|
}
|
|
|
|
|
@@ -4420,6 +4625,8 @@ void EclResourcer::resourceGraph(HqlExprArray & exprs, HqlExprArray & transforme
|
|
|
trace();
|
|
|
#endif
|
|
|
spotUnbalancedSplitters(exprs);
|
|
|
+ if (options.optimizeSharedInputs)
|
|
|
+ spotSharedInputs();
|
|
|
|
|
|
if (spotThroughAggregate)
|
|
|
optimizeAggregates();
|
|
@@ -4472,15 +4679,16 @@ void expandLists(HqlExprArray & args, IHqlExpression * expr)
|
|
|
|
|
|
IHqlExpression * resourceThorGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize, IHqlExpression * graphIdExpr)
|
|
|
{
|
|
|
- EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
- if (graphIdExpr)
|
|
|
- resourcer.setNewChildQuery(graphIdExpr, 0);
|
|
|
-
|
|
|
- HqlExprArray exprs;
|
|
|
- expandLists(exprs, expr);
|
|
|
-
|
|
|
HqlExprArray transformed;
|
|
|
- resourcer.resourceGraph(exprs, transformed);
|
|
|
+ {
|
|
|
+ EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
+ if (graphIdExpr)
|
|
|
+ resourcer.setNewChildQuery(graphIdExpr, 0);
|
|
|
+
|
|
|
+ HqlExprArray exprs;
|
|
|
+ expandLists(exprs, expr);
|
|
|
+ resourcer.resourceGraph(exprs, transformed);
|
|
|
+ }
|
|
|
hoistNestedCompound(translator, transformed);
|
|
|
return createActionList(transformed);
|
|
|
}
|
|
@@ -4490,21 +4698,23 @@ static IHqlExpression * doResourceGraph(HqlCppTranslator & translator, HqlExprCo
|
|
|
ClusterType targetClusterType, unsigned clusterSize,
|
|
|
IHqlExpression * graphIdExpr, unsigned * numResults, bool isChild, bool useGraphResults)
|
|
|
{
|
|
|
- EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
- if (isChild)
|
|
|
- resourcer.setChildQuery(true);
|
|
|
- resourcer.setNewChildQuery(graphIdExpr, *numResults);
|
|
|
- resourcer.setUseGraphResults(useGraphResults);
|
|
|
+ HqlExprArray transformed;
|
|
|
+ {
|
|
|
+ EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
+ if (isChild)
|
|
|
+ resourcer.setChildQuery(true);
|
|
|
+ resourcer.setNewChildQuery(graphIdExpr, *numResults);
|
|
|
+ resourcer.setUseGraphResults(useGraphResults);
|
|
|
|
|
|
- if (activeRows)
|
|
|
- resourcer.tagActiveCursors(*activeRows);
|
|
|
+ if (activeRows)
|
|
|
+ resourcer.tagActiveCursors(*activeRows);
|
|
|
|
|
|
- HqlExprArray exprs;
|
|
|
- expandLists(exprs, expr);
|
|
|
+ HqlExprArray exprs;
|
|
|
+ expandLists(exprs, expr);
|
|
|
|
|
|
- HqlExprArray transformed;
|
|
|
- resourcer.resourceGraph(exprs, transformed);
|
|
|
- *numResults = resourcer.numGraphResults();
|
|
|
+ resourcer.resourceGraph(exprs, transformed);
|
|
|
+ *numResults = resourcer.numGraphResults();
|
|
|
+ }
|
|
|
hoistNestedCompound(translator, transformed);
|
|
|
return createActionList(transformed);
|
|
|
}
|
|
@@ -4528,13 +4738,15 @@ IHqlExpression * resourceLoopGraph(HqlCppTranslator & translator, HqlExprCopyArr
|
|
|
|
|
|
IHqlExpression * resourceRemoteGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize)
|
|
|
{
|
|
|
- EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
+ HqlExprArray transformed;
|
|
|
+ {
|
|
|
+ EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
|
|
|
|
|
|
- HqlExprArray exprs;
|
|
|
- expandLists(exprs, expr);
|
|
|
+ HqlExprArray exprs;
|
|
|
+ expandLists(exprs, expr);
|
|
|
|
|
|
- HqlExprArray transformed;
|
|
|
- resourcer.resourceRemoteGraph(exprs, transformed);
|
|
|
+ resourcer.resourceRemoteGraph(exprs, transformed);
|
|
|
+ }
|
|
|
hoistNestedCompound(translator, transformed);
|
|
|
return createActionList(transformed);
|
|
|
}
|
|
@@ -4555,12 +4767,30 @@ d) if (a, b(f1) +b(f2), c) needs to link b twice though!
|
|
|
|
|
|
*/
|
|
|
|
|
|
+/*
|
|
|
+This transformer converts spill activities to no_dataset/no_output, and also converts splitters of splitters into
|
|
|
+a single splitter.
|
|
|
+*/
|
|
|
+
|
|
|
class SpillActivityTransformer : public NewHqlTransformer
|
|
|
{
|
|
|
public:
|
|
|
SpillActivityTransformer();
|
|
|
|
|
|
+protected:
|
|
|
+ virtual void analyseExpr(IHqlExpression * expr);
|
|
|
virtual IHqlExpression * createTransformed(IHqlExpression * expr);
|
|
|
+
|
|
|
+ bool isUnbalanced(IHqlExpression * body)
|
|
|
+ {
|
|
|
+ ANewTransformInfo * info = queryTransformExtra(body);
|
|
|
+ return info->spareByte1 != 0;
|
|
|
+ }
|
|
|
+ void setUnbalanced(IHqlExpression * body)
|
|
|
+ {
|
|
|
+ ANewTransformInfo * info = queryTransformExtra(body);
|
|
|
+ info->spareByte1 = true;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
static HqlTransformerInfo spillActivityTransformerInfo("SpillActivityTransformer");
|
|
@@ -4569,6 +4799,30 @@ SpillActivityTransformer::SpillActivityTransformer()
|
|
|
{
|
|
|
}
|
|
|
|
|
|
+void SpillActivityTransformer::analyseExpr(IHqlExpression * expr)
|
|
|
+{
|
|
|
+ IHqlExpression * body = expr->queryBody();
|
|
|
+ if (alreadyVisited(body))
|
|
|
+ return;
|
|
|
+ if (body->getOperator() == no_split)
|
|
|
+ {
|
|
|
+ IHqlExpression * input = body->queryChild(0);
|
|
|
+ if (input->getOperator() == no_split)
|
|
|
+ {
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ IHqlExpression * cur = input->queryChild(0);
|
|
|
+ if (cur->getOperator() != no_split)
|
|
|
+ break;
|
|
|
+ input = cur;
|
|
|
+ }
|
|
|
+ if (!body->hasProperty(balancedAtom))
|
|
|
+ setUnbalanced(input->queryBody());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ NewHqlTransformer::analyseExpr(expr);
|
|
|
+}
|
|
|
+
|
|
|
IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * expr)
|
|
|
{
|
|
|
IHqlExpression * annotation = queryTransformAnnotation(expr);
|
|
@@ -4577,6 +4831,16 @@ IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * ex
|
|
|
|
|
|
switch (expr->getOperator())
|
|
|
{
|
|
|
+ case no_split:
|
|
|
+ {
|
|
|
+ IHqlExpression * input = expr->queryChild(0);
|
|
|
+ if (input->getOperator() == no_split)
|
|
|
+ return transform(input);
|
|
|
+ OwnedHqlExpr transformed = NewHqlTransformer::createTransformed(expr);
|
|
|
+ if (transformed->hasProperty(balancedAtom) && isUnbalanced(expr))
|
|
|
+ return removeProperty(transformed, balancedAtom);
|
|
|
+ return transformed.getClear();
|
|
|
+ }
|
|
|
case no_writespill:
|
|
|
{
|
|
|
HqlExprArray args;
|
|
@@ -4608,5 +4872,6 @@ IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * ex
|
|
|
IHqlExpression * convertSpillsToActivities(IHqlExpression * expr)
|
|
|
{
|
|
|
SpillActivityTransformer transformer;
|
|
|
+ transformer.analyse(expr, 0);
|
|
|
return transformer.transformRoot(expr);
|
|
|
}
|