/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "platform.h"
#include "jlib.hpp"
#include "hqlexpr.hpp"
#include "hqlattr.hpp"
#include "hqlmeta.hpp"
#include "hqlutil.hpp"
#include "hqlcpputil.hpp"
#include "hqlthql.hpp"
#include "hqlcatom.hpp"
#include "hqlfold.hpp"
#include "hqlcerrors.hpp"
#include "hqltrans.ipp"
#include "hqlpmap.hpp"
#include "hqltcppc.ipp"
#include "hqlttcpp.ipp"
#include "hqlresource.ipp"
#include "../../thorlcr/thorutil/thbufdef.hpp"
#define MINIMAL_CHANGES
#define MAX_INLINE_COMMON_COUNT 5
//#define TRACE_RESOURCING
//#define VERIFY_RESOURCING
//#define SPOT_UNCONDITIONAL_CONDITIONS
#define DEFAULT_MAX_SOCKETS 2000 // configurable by setting max_sockets in .ini
#define DEFAULT_TOTAL_MEMORY ((1024*1024*1800)-DEFAULT_LARGEMEM_BUFFER_SIZE)
#define FIXED_CLUSTER_SIZE 400
#define MEM_Const_Minimal (1*1024*1024)
#define DEFAULT_MAX_ACTIVITIES 100
//=== The following provides information about how each kind of activity is resourced ====
// Records the resources needed by a hash-distribute style activity in 'resources'.
// 'expr' is not inspected; the estimate depends only on options.useMpForDistribute
// and the cluster size held in 'resources'.
static void setHashResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
{
    if (!options.useMpForDistribute)
    {
        // Fixed-size pull buffer, plus a many-to-many socket allocation.
        resources.set(RESslavememory, MEM_Const_Minimal+DISTRIBUTE_PULL_BUFFER_SIZE).set(REShashdist, 1);
        resources.setManyToManySockets(2);
        return;
    }

    // Memory requirement grows with the cluster size; no sockets are reserved on this path.
    unsigned memneeded = MEM_Const_Minimal+resources.clusterSize*4*DISTRIBUTE_SINGLE_BUFFER_SIZE+DISTRIBUTE_PULL_BUFFER_SIZE;
    resources.set(RESslavememory, memneeded).set(REShashdist, 1);
}
//MORE: Use a single function to map an hqlexpression to an activity kind.
// Fills 'resources' with the estimated requirements of the activity 'expr'.
// Child queries, and expressions hinted as lightweight, are always resourced as lightweight.
// Otherwise the estimate is chosen by the expression's operator and, for several operators,
// by whether the activity is local and/or grouped and by attributes present on the expression.
// Some operators (spills, compound, library select, and a meta activity without the pull attribute) leave 'resources' untouched.
void getResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
{
//MORE: What effect should a child query have? Definitely the following, but what about other resourcing?
if (options.isChildQuery || queryHint(expr, lightweightAtom))
{
resources.setLightweight();
return;
}
bool isLocal = isLocalActivity(expr);
bool isGrouped = isGroupedActivity(expr);
switch (expr->getOperator())
{
case no_join:
case no_selfjoin:
case no_denormalize:
case no_denormalizegroup:
//Keyed/lightweight joins are cheap; lookup, hash and the remaining (sort-buffer based) joins are costed separately.
if (isKeyedJoin(expr) || expr->hasProperty(_lightweight_Atom))
resources.setLightweight();
else if (expr->hasProperty(lookupAtom))
{
//The two branches differ only in lightweight versus heavyweight.
if (expr->hasProperty(fewAtom))
{
resources.setLightweight().set(RESslavememory, MEM_Const_Minimal+LOOKUPJOINL_SMART_BUFFER_SIZE);
resources.setManyToMasterSockets(1);
}
else
{
resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+LOOKUPJOINL_SMART_BUFFER_SIZE);
resources.setManyToMasterSockets(1);
}
}
else if (expr->hasProperty(hashAtom))
{
resources.setHeavyweight();
setHashResources(expr, resources, options);
}
else
{
resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+SORT_BUFFER_TOTAL+JOINR_SMART_BUFFER_SIZE);
if (!isLocal)
{
#ifndef SORT_USING_MP
resources.setManyToManySockets(2);
#endif
}
}
break;
case no_dedup:
//Grouped dedups, and dedups with neither the all nor the hash attribute, are lightweight.
if (isGrouped || (!expr->hasProperty(allAtom) && !expr->hasProperty(hashAtom)))
{
resources.setLightweight();
if (!isGrouped && !isLocal)
{
resources.set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
resources.setManyToMasterSockets(1);
}
}
else if (isLocal)
{
resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
//This can't be right....
//resources.setManyToMasterSockets(1);
}
else
{
//hash dedup.
resources.setHeavyweight();
setHashResources(expr, resources, options);
}
break;
case no_rollup:
resources.setLightweight();
if (!isGrouped && !isLocal)
{
resources.set(RESslavememory, MEM_Const_Minimal+DEDUP_SMART_BUFFER_SIZE);
resources.setManyToMasterSockets(1);
}
break;
case no_distribute:
case no_keyeddistribute:
resources.setLightweight();
setHashResources(expr, resources, options);
break;
case no_sort:
if (isGrouped)
{
if (expr->hasProperty(manyAtom))
resources.setHeavyweight();
else
resources.setLightweight();
}
else if (expr->hasProperty(fewAtom) && isLocal)
resources.setLightweight();
else if (isLocal)
resources.setHeavyweight();
else
{
//Global sort: heavyweight, with sockets reserved unless SORT_USING_MP is defined.
resources.setHeavyweight();
#ifndef SORT_USING_MP
resources.setManyToManySockets(2);
#endif
}
break;
case no_topn:
resources.setLightweight();
break;
case no_pipe:
//surely it should be something like this.
resources.setLightweight().set(RESslavesocket, 1);
break;
case no_table:
{
//The third child is tested to see whether the table is read through a pipe.
IHqlExpression * mode = expr->queryChild(2);
if (mode && mode->getOperator() == no_pipe)
{
resources.setLightweight().set(RESslavesocket, 1);
}
else
{
resources.setLightweight();
if (expr->hasProperty(_workflowPersist_Atom) && expr->hasProperty(distributedAtom))
setHashResources(expr, resources, options); // may require a hash distribute
}
break;
}
case no_output:
{
IHqlExpression * filename = expr->queryChild(1);
if (expr->hasProperty(_spill_Atom))
{
//resources.setLightweight(); // assume no resources(!)
}
else if (filename && filename->getOperator() == no_pipe)
{
resources.setLightweight().set(RESslavesocket, 1);
}
else if (filename && !filename->isAttribute())
{
resources.setLightweight();
}
else
{
//No filename (or only an attribute in that position): reserve the workunit write buffer.
resources.setLightweight().set(RESslavememory, WORKUNITWRITE_SMART_BUFFER_SIZE);
}
break;
}
case no_distribution:
resources.setLightweight().set(RESmastersocket, 16).set(RESslavesocket, 1);
break;
case no_aggregate:
case no_newaggregate:
{
IHqlExpression * grouping = queryRealChild(expr, 3);
if (grouping)
{
//Is this really correct???
resources.setLightweight();
setHashResources(expr, resources, options);
}
else
{
resources.setLightweight();
//if (!isGrouped)
// resources.set(RESmastersocket, 16).set(RESslavesocket, 1);
}
}
break;
case no_hqlproject:
resources.setLightweight();
//Add a flag onto count project to indicate it is a different variety.
if (expr->hasProperty(_countProject_Atom) && !isLocal)
resources.set(RESslavememory, COUNTPROJECT_SMART_BUFFER_SIZE);
break;
case no_enth:
resources.setLightweight();
if (!isLocal)
resources.set(RESslavememory, CHOOSESETS_SMART_BUFFER_SIZE);
break;
case no_metaactivity:
//Only a meta activity with the pull attribute is given any resources.
if (expr->hasProperty(pullAtom))
resources.setLightweight().set(RESslavememory, PULL_SMART_BUFFER_SIZE);
break;
case no_setresult:
case no_extractresult:
case no_outputscalar:
resources.setLightweight();//.set(RESmastersocket, 1).set(RESslavesocket, 1);
break;
case no_choosesets:
resources.setLightweight();
if (!isLocal || expr->hasProperty(enthAtom) || expr->hasProperty(lastAtom))
resources.set(RESslavememory, CHOOSESETS_SMART_BUFFER_SIZE);
break;
case no_iterate:
resources.setLightweight();
if (!isGrouped && !isLocal)
resources.setManyToMasterSockets(1).set(RESslavememory, ITERATE_SMART_BUFFER_SIZE);
break;
case no_choosen:
resources.setLightweight().set(RESslavememory, FIRSTN_SMART_BUFFER_SIZE);
break;
case no_spill:
case no_spillgraphresult:
//assumed to take no resources;
break;
case no_addfiles:
case no_merge:
{
//Buffer space is proportional to the number of children, with a lower bound of FUNNEL_MIN_BUFF_SIZE.
resources.setLightweight();
unsigned bufSize = FUNNEL_PERINPUT_BUFF_SIZE*expr->numChildren();
if (bufSize < FUNNEL_MIN_BUFF_SIZE) bufSize = FUNNEL_MIN_BUFF_SIZE;
resources.set(RESslavememory, MEM_Const_Minimal+bufSize);
break;
}
case no_compound:
//MORE: Should really be the total resources for the lhs... Really needs more thought.
break;
case no_libraryselect:
//Do not allocate any resources for this, we don't want it to cause a spill under any circumstances
break;
case no_split:
//Should really be included in the cost....
//Deliberately falls through to the default lightweight allocation.
default:
resources.setLightweight();
break;
}
}
// Marks the resources as those of a lightweight activity: a small slave memory
// allowance (0x10000) and a single activity. Returns the result of the chained set() calls.
CResources & CResources::setLightweight()
{
    CResources & updated = set(RESslavememory, 0x10000);
    return updated.set(RESactivities, 1);
}
// Marks the resources as those of a heavyweight activity: a larger slave memory
// allowance (0x100000), one heavyweight slot and a single activity.
CResources & CResources::setHeavyweight()
{
    CResources & withMemory = set(RESslavememory, 0x100000);
    CResources & withHeavy = withMemory.set(RESheavy, 1);
    return withHeavy.set(RESactivities, 1);
}
//-------------------------------------------------------------------------------------------
// Returns a human readable name for a resource kind, or "Unknown" for any value
// that is not one of the kinds listed below.
const char * queryResourceName(ResourceType kind)
{
    const char * name = "Unknown";
    switch (kind)
    {
    case RESslavememory:
        name = "Slave Memory";
        break;
    case RESslavesocket:
        name = "Slave Sockets";
        break;
    case RESmastermemory:
        name = "Master Memory";
        break;
    case RESmastersocket:
        name = "Master Sockets";
        break;
    case REShashdist:
        name = "Hash Distributes";
        break;
    case RESheavy:
        name = "Heavyweight";
        break;
    case RESactivities:
        name = "Activities";
        break;
    }
    return name;
}
// Returns the transform extra attached to the body of 'expr', viewed as a ResourcerInfo.
// The result is whatever queryTransformExtra() returns, so it may be NULL.
inline ResourcerInfo * queryResourceInfo(IHqlExpression * expr)
{
    IHqlExpression * body = expr->queryBody();
    return (ResourcerInfo *)body->queryTransformExtra();
}
// True when 'expr' has resourcing information attached and that information
// says it either is an activity or contains one.
inline bool isResourcedActivity(IHqlExpression * expr)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info)
        return false;
    return (info->isActivity || info->containsActivity);
}
// Walks down the first-child chain of 'expr' for as long as the nodes are
// selectnth/filter/newaggregate operators. Returns true as soon as one of those
// nodes has a resourced activity as its first child, and false when any other
// operator is reached.
bool isWorthForcingHoist(IHqlExpression * expr)
{
    for (IHqlExpression * cur = expr; ; cur = cur->queryChild(0))
    {
        switch (cur->getOperator())
        {
        case no_selectnth:
        case no_filter:
        case no_newaggregate:
            break;
        default:
            return false;
        }

        if (isResourcedActivity(cur->queryChild(0)))
            return true;
    }
}
// Adds each resource count in 'other' to the corresponding count held here.
void CResources::add(const CResources & other)
{
    unsigned kind = 0;
    while (kind < RESmax)
    {
        resource[kind] += other.resource[kind];
        ++kind;
    }
}
// Returns true if, for any resource kind, the sum of this object's count and
// 'other's count is greater than the matching entry in 'limit'. Nothing is modified.
bool CResources::addExceeds(const CResources & other, const CResources & limit) const
{
    unsigned kind = 0;
    while (kind < RESmax)
    {
        // The first kind over its limit is enough to reject the combination.
        if (resource[kind] + other.resource[kind] > limit.resource[kind])
            return true;
        ++kind;
    }
    return false;
}
// Appends to 'reasonText' a comma separated list of every resource kind for which
// this object's count plus 'other's count is greater than 'limit', in the form
// "<name> (<combined>><limit>)". Returns 'reasonText'.
StringBuffer & CResources::getExceedsReason(StringBuffer & reasonText, const CResources & other, const CResources & limit) const
{
    unsigned numReported = 0;
    for (unsigned kind = 0; kind < RESmax; kind++)
    {
        if (!(resource[kind] + other.resource[kind] > limit.resource[kind]))
            continue;

        // Separate the second and subsequent entries from their predecessor.
        if (numReported++ != 0)
            reasonText.append(", ");
        reasonText.appendf("%s (%d>%d)", queryResourceName((ResourceType)kind), resource[kind] + other.resource[kind], limit.resource[kind]);
    }
    return reasonText;
}
// Returns true if any resource count held here is greater than the matching entry in 'limit'.
bool CResources::exceeds(const CResources & limit) const
{
    unsigned kind = 0;
    while (kind < RESmax)
    {
        if (resource[kind] > limit.resource[kind])
            return true;
        ++kind;
    }
    return false;
}
// Raises each resource count held here to the corresponding count in 'other'
// whenever 'other' holds the larger value.
void CResources::maximize(const CResources & other)
{
    unsigned kind = 0;
    while (kind < RESmax)
    {
        if (other.resource[kind] > resource[kind])
            resource[kind] = other.resource[kind];
        ++kind;
    }
}
// Sets the slave socket count to 'numSockets' and the master socket count to
// 'numSockets' multiplied by the cluster size. Returns this object.
CResources & CResources::setManyToMasterSockets(unsigned numSockets)
{
    CResources & self = *this;
    self.set(RESslavesocket, numSockets);
    self.set(RESmastersocket, numSockets * clusterSize);
    return self;
}
// Sets the slave socket count to 'numSockets' multiplied by the cluster size.
// The master socket count is not touched.
CResources & CResources::setManyToManySockets(unsigned numSockets)
{
    CResources & updated = set(RESslavesocket, numSockets * clusterSize);
    return updated;
}
// Subtracts each resource count in 'other' from the corresponding count held here.
// No check is made that the result stays non-negative.
void CResources::sub(const CResources & other)
{
    unsigned kind = 0;
    while (kind < RESmax)
    {
        resource[kind] -= other.resource[kind];
        ++kind;
    }
}
//===========================================================================
// Returns false for records, constants and attributes; true for every other operator.
inline bool isAffectedByResourcing(IHqlExpression * expr)
{
    const node_operator op = expr->getOperator();
    const bool isUnaffected = (op == no_record) || (op == no_constant) || (op == no_attr);
    return !isUnaffected;
}
// Recognises the shape
//    no_extractresult(no_datasetfromrow(<select first row>(no_newaggregate ...)) ...)
// and returns true only when every level matches.
//MORE: This currently doesn't hoist selects from nested records, but not sure there is a syntax to do that either.
bool isSimpleAggregateResult(IHqlExpression * expr)
{
    if (expr->getOperator() == no_extractresult)
    {
        IHqlExpression * value = expr->queryChild(0);
        if (value->getOperator() == no_datasetfromrow)
        {
            IHqlExpression * firstRow = value->queryChild(0);
            if (isSelectFirstRow(firstRow))
            {
                IHqlExpression * aggregate = firstRow->queryChild(0);
                return (aggregate->getOperator() == no_newaggregate);
            }
        }
    }
    return false;
}
// Returns true for the operators listed below that are treated as cheap and as
// reducing the amount of data. Any operator not listed returns false.
bool lightweightAndReducesDatasetSize(IHqlExpression * expr)
{
    switch (expr->getOperator())
    {
    case no_hqlproject:
    case no_newusertable:
        // Projects only qualify when they reduce the row size.
        return reducesRowSize(expr);

    case no_dedup:
        // Grouped dedups, or dedups with neither the all nor the hash attribute.
        return isGroupedActivity(expr) || !(expr->hasProperty(allAtom) || expr->hasProperty(hashAtom));

    case no_aggregate:
    case no_newaggregate:
        // Only aggregates without a grouping child qualify.
        return (queryRealChild(expr, 3) == NULL);

    case no_rollupgroup:
    case no_rollup:
    case no_choosen:
    case no_choosesets:
    case no_enth:
    case no_sample:
    case no_filter:
    case no_limit:
    case no_filtergroup:
        return true;

    case no_group:
        //removing grouping will reduce size of the spill file.
        return (expr->queryType()->queryGroupInfo() == NULL);

    default:
        return false;
    }
}
// Currently true only for no_distribute. Aggregates are explicitly not included
// (more; hash aggregate?), and every other operator returns false.
bool heavyweightAndReducesSizeOrSkew(IHqlExpression * expr)
{
    return (expr->getOperator() == no_distribute);
}
//---------------------------------------------------------------------------
// Creates an expression naming a new spill.
//  - For graph results the name is the next result number (nextResult is post-incremented).
//  - Otherwise it is "~spill::" followed by a unique id, optionally followed by "_" and
//    either the filename mangler text or, when there is none, a no_wuid expression
//    concatenated at run time.
IHqlExpression * CResourceOptions::createSpillName(bool isGraphResult)
{
    if (isGraphResult)
        return getSizetConstant(nextResult++);

    StringBuffer spillName;
    getUniqueId(spillName.append("~spill::"));

    if (mangleSpillNameWithWuid)
    {
        spillName.append("_");
        if (!filenameMangler)
        {
            // No fixed mangler text: append the workunit id via a concat expression.
            ITypeInfo * type = makeStringType(UNKNOWN_LENGTH, NULL, NULL);
            return createValue(no_concat, type, createConstant(spillName.str()), createValue(no_wuid, LINK(type)));
        }
        spillName.append(filenameMangler);
    }
    return createConstant(spillName.str());
}
//---------------------------------------------------------------------------
// True when 'expr' is a no_keyindex or no_newkeyindex expression.
static bool isIndex(IHqlExpression * expr)
{
    const node_operator op = expr->getOperator();
    return (op == no_keyindex) || (op == no_newkeyindex);
}
// Returns 'expr' with a freshly created unique id passed to replaceOwnedProperty().
IHqlExpression * appendUniqueAttr(IHqlExpression * expr)
{
    IHqlExpression * uniqueId = createUniqueId();
    return replaceOwnedProperty(expr, uniqueId);
}
// Returns true for source activities other than the operators excluded below.
// Anything that is not a source activity returns false.
bool queryAddUniqueToActivity(IHqlExpression * expr)
{
    if (isSourceActivity(expr))
    {
        switch (expr->getOperator())
        {
        case no_workunit_dataset:
        case no_getgraphresult:
        case no_getgraphloopresult:
        case no_xmlproject:
        case no_datasetfromrow:
        case no_rows:
        case no_allnodes:
        case no_thisnode:
        case no_select: // it can get lost, and that then causes inconsistent trees.
            return false;
        default:
            return true;
        }
    }
    return false;
}
//---------------------------------------------------------------------------
// Records a link from (_sourceGraph, _sourceNode) to (_sinkGraph, _sinkNode).
// A source graph is mandatory; a sink graph may be absent, but when one is
// supplied a sink node must be supplied with it.
ResourceGraphLink::ResourceGraphLink(ResourceGraphInfo * _sourceGraph, IHqlExpression * _sourceNode, ResourceGraphInfo * _sinkGraph, IHqlExpression * _sinkNode, LinkKind _linkKind)
{
    assertex(_sourceGraph);
    linkKind = _linkKind;

    sourceGraph.set(_sourceGraph);
    sourceNode.set(_sourceNode);

    sinkGraph.set(_sinkGraph);
    sinkNode.set(_sinkNode);
    assertex(!sinkGraph || sinkNode);

    trace("create");
}
// Only emits a trace entry (when tracing is compiled in).
ResourceGraphLink::~ResourceGraphLink()
{
    trace("delete");
}
// Moves this link so that it originates from 'newGraph': it is removed from the
// old source graph's sinks list and appended to the new graph's sinks list.
void ResourceGraphLink::changeSourceGraph(ResourceGraphInfo * newGraph)
{
    ResourceGraphLink & self = *this;

    // Detach from the old graph before the reference to it is replaced.
    sourceGraph->sinks.zap(self);
    sourceGraph.set(newGraph);
    newGraph->sinks.append(self);

    trace("change source");
}
// Moves this link so that it feeds 'newGraph': it is removed from the old sink
// graph's sources list and appended to the new graph's sources list.
void ResourceGraphLink::changeSinkGraph(ResourceGraphInfo * newGraph)
{
    ResourceGraphLink & self = *this;

    // Detach from the old graph before the reference to it is replaced.
    sinkGraph->sources.zap(self);
    sinkGraph.set(newGraph);
    newGraph->sources.append(self);

    trace("change sink");
}
// A link is redundant when its source node would be expanded rather than spilled.
bool ResourceGraphLink::isRedundantLink()
{
    return queryResourceInfo(sourceNode)->expandRatherThanSpill(false);
}
// Logs the link's endpoints and kind, prefixed by 'name'.
// Compiles to an empty function unless TRACE_RESOURCING is defined.
void ResourceGraphLink::trace(const char * name)
{
#ifdef TRACE_RESOURCING
    const char * kindText = "";
    if (linkKind == ConditionalLink)
        kindText = "conditional";
    else if (linkKind == SequenceLink)
        kindText = "sequence";

    PrintLog("%s: %lx source(%lx,%lx) sink(%lx,%lx) %s", name, this, sourceGraph.get(), sourceNode->queryBody(), sinkGraph.get(), sinkNode ? sinkNode->queryBody() : NULL,
             kindText);
#endif
}
// A dependency link is constructed as an unconditional ResourceGraphLink.
ResourceGraphDependencyLink::ResourceGraphDependencyLink(ResourceGraphInfo * _sourceGraph, IHqlExpression * _sourceNode, ResourceGraphInfo * _sinkGraph, IHqlExpression * _sinkNode)
    : ResourceGraphLink(_sourceGraph, _sourceNode, _sinkGraph, _sinkNode, UnconditionalLink)
{
}
// Re-points the dependency at a new source graph. Unlike the base class version
// shown earlier in this file, no sinks list is updated here.
void ResourceGraphDependencyLink::changeSourceGraph(ResourceGraphInfo * newGraph)
{
    sourceGraph.set(newGraph);
    trace("change source");
}
// Re-points the dependency at a new sink graph and appends it to that graph's
// dependsOn list. The previous sink graph's lists are not modified here.
void ResourceGraphDependencyLink::changeSinkGraph(ResourceGraphInfo * newGraph)
{
    ResourceGraphDependencyLink & self = *this;
    sinkGraph.set(newGraph);
    newGraph->dependsOn.append(self);
    trace("change sink");
}
//---------------------------------------------------------------------------
// Creates an empty graph: resources are sized from the options' cluster size,
// depth is zero and every state flag is cleared.
ResourceGraphInfo::ResourceGraphInfo(CResourceOptions * _options) : resources(_options->clusterSize)
{
    options = _options;
    depth = 0;

    // Lifecycle flags
    isDead = false;
    beenResourced = false;
    startedGeneratingResourced = false;

    // Condition tracking
    isUnconditional = false;
    hasConditionSource = false;
    mergedConditionSource = false;

    // Dependency caching
    gatheredDependants = false;
    inheritedExpandedDependencies = false;
}
// Nothing to release explicitly; members clean up after themselves.
ResourceGraphInfo::~ResourceGraphInfo()
{
}
// Adds 'condition' to this graph's condition list if it is not already present.
// Without SPOT_UNCONDITIONAL_CONDITIONS the result is always false. With it, true is
// returned when both the true and false forms of a top-level condition have been added
// (recursing to the parent condition when there is one).
bool ResourceGraphInfo::addCondition(IHqlExpression * condition)
{
    if (conditions.find(*condition) != NotFound)
        return false;

    conditions.append(*LINK(condition));
#ifdef SPOT_UNCONDITIONAL_CONDITIONS
    _ATOM tagName = condition->queryName();
    _ATOM inverseName = NULL;
    if (tagName == trueAtom)
        inverseName = falseAtom;
    else if (tagName == falseAtom)
        inverseName = trueAtom;

    if (inverseName)
    {
        // Look for the opposite branch of the same condition.
        IHqlExpression * parent = condition->queryChild(1);
        OwnedHqlExpr inverseTag = createAttribute(inverseName, LINK(condition->queryChild(0)), LINK(parent));
        if (conditions.find(*inverseTag) != NotFound)
            return parent ? addCondition(parent) : true;
    }
#endif
    return false;
}
// Adds 'value' to this graph's resources provided the total would stay within 'limit'.
// Returns true when the resources were added, false (with nothing changed) otherwise.
bool ResourceGraphInfo::allocateResources(const CResources & value, const CResources & limit)
{
    const bool fits = !resources.addExceeds(value, limit);
    if (fits)
        resources.add(value);
    return fits;
}
bool ResourceGraphInfo::containsActiveSinks()
{
ForEachItemIn(idx, sinks)
{
ResourcerInfo * info = queryResourceInfo(sinks.item(idx).sourceNode);
if (!info->expandRatherThanSpill(false))
return true;
}
return false;
}
void ResourceGraphInfo::display()
{
StringBuffer s;
s.appendf("graph %p src(", this);
ForEachItemIn(idxs, sources)
s.appendf("%p ", sources.item(idxs).sourceGraph.get());
s.append(") dep(");
ForEachItemIn(idxd, dependsOn)
s.appendf("%p ", dependsOn.item(idxd).sourceGraph.get());
s.append(")");
if (isUnconditional)
s.append(" ");
DBGLOG("%s", s.str());
}
// Appends to 'target' (without duplicates) every graph this graph relies on:
// the graphs it depends on, the graphs it reads from, and its cached indirect sources.
void ResourceGraphInfo::expandDependants(ResourceGraphArray & target)
{
    ForEachItemIn(depIdx, dependsOn)
    {
        ResourceGraphInfo * graph = dependsOn.item(depIdx).sourceGraph;
        if (target.find(*graph) != NotFound)
            continue;
        target.append(*LINK(graph));
    }

    ForEachItemIn(srcIdx, sources)
    {
        // Sources are included whether or not they would be expanded rather than spilled.
        ResourceGraphInfo * graph = sources.item(srcIdx).sourceGraph;
        if (target.find(*graph) != NotFound)
            continue;
        target.append(*LINK(graph));
    }

    ForEachItemIn(indirectIdx, indirectSources)
    {
        ResourceGraphInfo & graph = indirectSources.item(indirectIdx);
        if (target.find(graph) != NotFound)
            continue;
        target.append(OLINK(graph));
    }
}
void ResourceGraphInfo::gatherDependants(bool recalculate)
{
if (gatheredDependants && !recalculate)
return;
gatheredDependants = true;
depth = 0;
indirectSources.kill();
ForEachItemIn(idx, dependsOn)
{
ResourceGraphInfo * source = dependsOn.item(idx).sourceGraph;
source->gatherDependants(false);
source->expandDependants(indirectSources);
if (source->depth >= depth)
depth = source->depth + 1;
}
ForEachItemIn(idx2, sources)
{
ResourceGraphInfo * source = sources.item(idx2).sourceGraph;
source->gatherDependants(false);
source->expandDependants(indirectSources);
if (source->getDepth() >= depth)
depth = source->getDepth() + 1;
}
}
// Appends to 'reasonText' the resources that would exceed 'limit' if 'otherGraph'
// were merged with this graph.
void ResourceGraphInfo::getMergeFailReason(StringBuffer & reasonText, ResourceGraphInfo * otherGraph, const CResources & limit)
{
    const CResources & otherResources = otherGraph->resources;
    resources.getExceedsReason(reasonText, otherResources, limit);
}
// Invalidates the cached dependency information when 'oldGraph' is one of the
// cached indirect sources. 'newGraph' is not used here.
void ResourceGraphInfo::replaceReferences(ResourceGraphInfo * oldGraph, ResourceGraphInfo * newGraph)
{
    const bool referencesOld = (indirectSources.find(*oldGraph) != NotFound);
    if (referencesOld)
        gatheredDependants = false;
}
// Returns the depth of this graph, gathering dependency information first if
// that has not already been done.
unsigned ResourceGraphInfo::getDepth()
{
    gatherDependants(false);
    return depth;
}
// True when both graphs hold the same number of conditions and every condition
// of this graph is also found in 'other'.
bool ResourceGraphInfo::hasSameConditions(ResourceGraphInfo & other)
{
    if (conditions.ordinality() != other.conditions.ordinality())
        return false;

    ForEachItemIn(condIdx, conditions)
    {
        IHqlExpression & mine = conditions.item(condIdx);
        if (other.conditions.find(mine) == NotFound)
            return false;
    }
    return true;
}
// Returns true when this graph relies on 'other' through a dependency or an
// indirect source. When 'allowDirect' is false, having 'other' as a direct
// source also counts.
bool ResourceGraphInfo::isDependentOn(ResourceGraphInfo & other, bool allowDirect)
{
    gatherDependants(false);

    ResourceGraphInfo * wanted = &other;
    ForEachItemIn(depIdx, dependsOn)
    {
        ResourceGraphInfo * cur = dependsOn.item(depIdx).sourceGraph;
        if (cur == wanted)
            return true;
    }

    ForEachItemIn(indirectIdx, indirectSources)
    {
        if (&indirectSources.item(indirectIdx) == wanted)
            return true;
    }

    if (allowDirect)
        return false;

    ForEachItemIn(srcIdx, sources)
    {
        if (sources.item(srcIdx).sourceGraph == wanted)
            return true;
    }
    return false;
}
bool ResourceGraphInfo::isVeryCheap()
{
if (sinks.ordinality() != 1)
return false;
IHqlExpression * sourceNode = sinks.item(0).sourceNode;
if (isSimpleAggregateResult(sourceNode))
return true;
// Not sure about the following....
// if (sourceNode->getOperator() == no_setresult)
// return true;
//Could be other examples...
return false;
}
// Attempts to merge the graph 'other' into this graph, updating this graph's
// resources, flags and (possibly) conditions. Returns true if the merge is accepted.
// The merge is refused (false) when:
//  - this graph is conditional, 'other' is unconditional or has different conditions, and the
//    cheapness test below fails;
//  - this graph depends on 'other' via a dependency or an indirect source;
//  - splitting is not allowed and merging would need a splitter;
//  - resource checking is enabled and the combined resources would exceed 'limit'.
// The sources and sinks lists themselves are not modified here.
bool ResourceGraphInfo::mergeInSource(ResourceGraphInfo & other, const CResources & limit, bool isConditionalLink)
{
bool mergeConditions = false;
if (!isUnconditional)
{
//if it is conditional and it is very cheap then it is still more efficient to merge
//rather than read from a spill file.
if (other.isUnconditional || !hasSameConditions(other))
{
if ((hasConditionSource || !isVeryCheap()) && (other.sinks.ordinality() != 1))
return false;
mergeConditions = true;
}
}
//Direct source links are allowed here; only dependencies and indirect sources block the merge.
if (isDependentOn(other, true))
return false;
if (!options->canSplit())
{
//Don't merge two graphs that will cause a splitter to be created
//Either already used internally, or an output will be merged twice
HqlExprArray mergeNodes;
ForEachItemIn(idx, sources)
{
ResourceGraphLink & cur = sources.item(idx);
if (cur.sourceGraph == &other)
{
IHqlExpression * sourceNode = cur.sourceNode->queryBody();
ResourcerInfo * info = queryResourceInfo(sourceNode);
if ((info->numInternalUses() != 0) || (mergeNodes.find(*sourceNode) != NotFound) || info->preventMerge())
return false;
mergeNodes.append(*LINK(sourceNode));
}
}
}
//This is the last check; allocateResources() only adds the resources when they fit.
if (options->checkResources() && !allocateResources(other.resources, limit))
return false;
#ifdef TRACE_RESOURCING
DBGLOG("Merging%s source into%s sink", other.isUnconditional ? "" : " conditional", isUnconditional ? "" : " conditional");
other.display();
display();
PrintLog("Merge %p into %p", &other, this);
#endif
if (other.hasConditionSource)
hasConditionSource = true;
//Recalculate the dependents, because sources of the source merged in may no longer be indirect
//although they may be via another path.
gatheredDependants = false;
//Any of my sources will already have the correct dependencies, however,
//anything that is a sink of the source has just gained some dependencies.
ForEachItemIn(idx, other.sinks)
{
ResourceGraphInfo * graph = other.sinks.item(idx).sinkGraph;
if (graph)
graph->gatheredDependants = false;
}
//If was very cheap and merged into an unconditional graph, make sure this is now tagged as
//unconditional...
if (other.isUnconditional)
isUnconditional = true;
//We need to stop spills being an arm of a conditional branch - otherwise they won't get executed.
//so see if we have merged any conditional branches in
if (isConditionalLink || other.mergedConditionSource)
mergedConditionSource = true;
if (mergeConditions)
{
//Replace conditions with those of parent. Only called when a very simple graph is
//merged in, so replace conditions instead of merging
conditions.kill();
ForEachItemIn(i, other.conditions)
conditions.append(OLINK(other.conditions.item(i)));
}
//sources and sinks are updated elsewhere...
return true;
}
// Subtracts 'value' from the resources allocated to this graph.
void ResourceGraphInfo::removeResources(const CResources & value)
{
    resources.sub(value);
}
//---------------------------------------------------------------------------
// Creates the resourcing information for '_original' with all counters at zero,
// no transformed expression or spill output, and the flags in their initial state
// ('balanced' starts true, every other flag false).
ResourcerInfo::ResourcerInfo(IHqlExpression * _original, CResourceOptions * _options)
{
    original.set(_original);
    options = _options;

    // Counters
    numUses = 0;
    numExternalUses = 0;
    conditionSourceCount = 0;
    currentSource = 0;

    // Pointers / results not yet known
    transformed = NULL;
    outputToUseForSpill = NULL;
    pathToExpr = PathUnknown;

    // Flags
    gatheredDependencies = false;
    containsActivity = false;
    isActivity = false;
    isAlreadyInScope = false;
    isSpillPoint = false;
    linkedFromChild = false;
    neverSplit = false;
    balanced = true;
}
// When 'isFirst' is set, counts another condition source for this node and flags
// its graph as having a condition source. 'condition' itself is not used.
void ResourcerInfo::setConditionSource(IHqlExpression * condition, bool isFirst)
{
    if (!isFirst)
        return;

    conditionSourceCount++;
    graph->hasConditionSource = true;
}
// Records 'condition' against this node (duplicates are kept) and passes it on
// to the owning graph, returning the graph's result.
bool ResourcerInfo::addCondition(IHqlExpression * condition)
{
    conditions.append(*LINK(condition));
    const bool graphResult = graph->addCondition(condition);
    return graphResult;
}
// Appends to 'args' the attributes that describe a spill of this node.
// 'isRead' selects between the attributes for reading the spill back and those
// for writing it. When an existing output is reused for the spill only reads are legal.
void ResourcerInfo::addSpillFlags(HqlExprArray & args, bool isRead)
{
    if (original->queryType()->queryGroupInfo())
        args.append(*createAttribute(groupedAtom));

    if (!outputToUseForSpill)
    {
        // A spill file created by the resourcer.
        args.append(*createAttribute(__compressed__Atom));
        args.append(*createAttribute(_spill_Atom));
        args.append(*createAttribute(_noReplicate_Atom));
        if (options->targetRoxie())
            args.append(*createAttribute(_noAccess_Atom));
    }
    else
    {
        // Reusing an existing output: inherit its compression setting.
        assertex(isRead);
        if (outputToUseForSpill->hasProperty(__compressed__Atom))
            args.append(*createAttribute(__compressed__Atom));
    }

    if (!isRead)
    {
        // Note: this local deliberately hides the member of the same name.
        unsigned numUses = numExternalUses;
        if (false) // options->cloneConditionals)
        {
            //Remove all the links from conditional graphs...
            ForEachItemIn(sinkIdx, graph->sinks)
            {
                ResourceGraphLink & cur = graph->sinks.item(sinkIdx);
                if ((cur.sourceNode == original) && (cur.linkKind == ConditionalLink || !cur.sinkGraph->isUnconditional))
                    numUses--;
            }
            numUses += calcNumConditionalUses();
        }
        args.append(*createAttribute(_tempCount_Atom, getSizetConstant(numUses)));
        return;
    }

    args.append(*createAttribute(_noVirtual_Atom)); // Don't interpret virtual fields as virtual...
    if (!original->isDataset() || hasSingleRow(original))
        args.append(*createAttribute(rowAtom)); // add a flag so the selectnth[1] can be removed later...
}
bool ResourcerInfo::alwaysExpand()
{
return (original->getOperator() == no_mapto);
}
// Returns a new attribute equivalent to the condition tag 'expr' with the
// true/false distinction removed:
//  - an instance attribute is rebuilt with its parent (third child) stripped recursively;
//  - any other tag becomes a 'temp' attribute of its first child, with an attribute
//    parent (second child) stripped recursively.
static IHqlExpression * stripTrueFalse(IHqlExpression * expr)
{
    if (expr->queryName() != instanceAtom)
    {
        // NOTE(review): a non-NULL parent that is not an attribute is passed on as-is,
        // without the LINK() used for the other children - confirm that cannot occur.
        IHqlExpression * tagParent = expr->queryChild(1);
        if (tagParent && tagParent->isAttribute())
            tagParent = stripTrueFalse(tagParent);
        return createAttribute(tempAtom, LINK(expr->queryChild(0)), tagParent);
    }

    IHqlExpression * instanceParent = expr->queryChild(2);
    if (instanceParent)
        instanceParent = stripTrueFalse(instanceParent);
    return createAttribute(instanceAtom, LINK(expr->queryChild(0)), LINK(expr->queryChild(1)), instanceParent);
}
unsigned ResourcerInfo::calcNumConditionalUses()
{
//for thor and hthor, where the conditional graph is cloned, the maximum number of times it can be read
//is 1 for a shared conditional graph, or once for each combination of conditions it is used as the
//input for. However if it is used more than once in a single branch of a condition it needs to be counted
//several times.
//It is always better to overestimate than underestimate. (But even better to get it right.)
HqlExprArray uniqueConditions;
UnsignedArray uniqueCounts;
ForEachItemIn(idx, conditions)
{
IHqlExpression & cur = conditions.item(idx);
OwnedHqlExpr unique = stripTrueFalse(&cur);
unsigned numOccurences = 0;
ForEachItemIn(j, conditions)
if (&conditions.item(j) == &cur)
numOccurences++;
unsigned match = uniqueConditions.find(*unique);
if (match == NotFound)
{
uniqueConditions.append(*unique.getClear());
uniqueCounts.append(numOccurences);
}
else
{
if (uniqueCounts.item(match) < numOccurences)
uniqueCounts.replace(numOccurences, match);
}
}
unsigned condUses = 0;
ForEachItemIn(k, uniqueCounts)
condUses += uniqueCounts.item(k);
return condUses;
}
// Wraps 'expr' in one no_throughaggregate per entry in 'aggregates' and returns the result.
// Each aggregate has form. Setresult(select(selectnth(aggregate...,1),field),name,seq)
IHqlExpression * ResourcerInfo::createAggregation(IHqlExpression * expr)
{
    LinkedHqlExpr result = expr;
    ForEachItemIn(idx, aggregates)
    {
        IHqlExpression & cur = aggregates.item(idx);

        // Unpick the nested structure to reach the aggregate and its record.
        IHqlExpression * row2ds = cur.queryChild(0);
        IHqlExpression * selectnth = row2ds->queryChild(0);
        IHqlExpression * aggregate = selectnth->queryChild(0);
        IHqlExpression * aggregateRecord = aggregate->queryChild(1);
        assertex(aggregate->getOperator() == no_newaggregate);

        //Through aggregate has form (dataset, record, transform, list of set results);
        HqlExprArray aggargs;
        aggargs.append(*LINK(result));
        aggargs.append(*LINK(aggregateRecord));
        IHqlExpression * mappedTransform = replaceSelector(aggregate->queryChild(2), original, expr->queryNormalizedSelector());
        aggargs.append(*mappedTransform);

        // Re-express the selected value in terms of an anonymous dataset of the aggregate record.
        OwnedHqlExpr active = createDataset(no_anon, LINK(aggregateRecord), NULL);
        OwnedHqlExpr mappedSelect = replaceSelector(cur.queryChild(1), row2ds, active);
        HqlExprArray setargs;
        setargs.append(*LINK(active));
        setargs.append(*LINK(mappedSelect));
        unwindChildren(setargs, &cur, 1);
        aggargs.append(*createValue(no_extractresult, makeVoidType(), setargs));

        result.setown(createDataset(no_throughaggregate, aggargs));
    }
    return result.getClear();
}
// Graph results are used only when enabled in the options, and then only when
// the node is linked from a child or the target cluster is hthor.
bool ResourcerInfo::useGraphResult()
{
    if (!options->useGraphResults)
        return false;
    return linkedFromChild || (options->targetClusterType == HThorCluster);
}
// A global result is used for nodes linked from a child that cannot use a graph result.
bool ResourcerInfo::useGlobalResult()
{
    if (!linkedFromChild)
        return false;
    return !useGraphResult();
}
// Returns a linked expression for the name of this node's spill.
// When an existing output is reused its second child supplies the name; otherwise the
// name is created on first use, cached in spillName, and reused on later calls.
IHqlExpression * ResourcerInfo::createSpillName()
{
    if (outputToUseForSpill)
        return LINK(outputToUseForSpill->queryChild(1));

    if (!spillName)
    {
        if (useGraphResult())
        {
            spillName.setown(options->createSpillName(true));
        }
        else if (!useGlobalResult())
        {
            spillName.setown(options->createSpillName(false));
        }
        else
        {
            // Global results are named "spill" followed by a unique id.
            StringBuffer valueText;
            getUniqueId(valueText.append("spill"));
            spillName.setown(createConstant(valueText.str()));
        }
    }
    return LINK(spillName);
}
// Creates the expression that reads back the spilled form of this node.
// Three forms are generated, depending on how the spill is stored:
//  - a graph result  (no_getgraphresult),
//  - a global result (no_workunit_dataset),
//  - a spill file    (no_readspill when a spilled dataset exists, otherwise no_table).
// 'spillReason' is optional and is only appended in the spill file case.
// The result is passed through preserveTableInfo() and wrapRowOwn() before being returned.
IHqlExpression * ResourcerInfo::createSpilledRead(IHqlExpression * spillReason)
{
OwnedHqlExpr dataset;
HqlExprArray args;
IHqlExpression * record = original->queryRecord();
//Cleared only in the spill file case below; passed on to preserveTableInfo().
bool loseDistribution = true;
if (useGraphResult())
{
args.append(*LINK(record));
args.append(*LINK(options->graphIdExpr));
args.append(*createSpillName());
if (isGrouped(original))
args.append(*createAttribute(groupedAtom));
if (!original->isDataset())
args.append(*createAttribute(rowAtom));
args.append(*createAttribute(_spill_Atom));
IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
if (recordCountAttr)
args.append(*LINK(recordCountAttr));
dataset.setown(createDataset(no_getgraphresult, args));
}
else if (useGlobalResult())
{
args.append(*LINK(record));
args.append(*createAttribute(nameAtom, createSpillName()));
args.append(*createAttribute(sequenceAtom, getLocalSequenceNumber()));
addSpillFlags(args, true);
IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
if (recordCountAttr)
args.append(*LINK(recordCountAttr));
dataset.setown(createDataset(no_workunit_dataset, args));
}
else
{
//The order of the first two arguments differs between no_readspill and no_table.
if (spilledDataset)
{
args.append(*LINK(spilledDataset));
args.append(*createSpillName());
}
else
{
args.append(*createSpillName());
args.append(*LINK(record));
}
args.append(*createValue(no_thor));
addSpillFlags(args, true);
args.append(*createUniqueId());
if (options->isChildQuery && options->targetRoxie())
{
args.append(*createAttribute(_colocal_Atom));
args.append(*createLocalAttribute());
}
if (spillReason)
args.append(*LINK(spillReason));
if (spilledDataset)
dataset.setown(createDataset(no_readspill, args));
else
{
IHqlExpression * recordCountAttr = queryRecordCountInfo(original);
if (recordCountAttr)
args.append(*LINK(recordCountAttr));
dataset.setown(createDataset(no_table, args));
}
loseDistribution = false;
}
dataset.setown(preserveTableInfo(dataset, original, loseDistribution, NULL));
return wrapRowOwn(dataset.getClear());
}
// Creates the action that writes 'transformed' to this node's spill.
// Depending on how the spill is stored this is a no_setgraphresult, a named no_output
// (global result), a no_writespill (when spills are created as datasets) or a plain
// no_output to the spill file. Must not be called when an existing output is reused.
IHqlExpression * ResourcerInfo::createSpilledWrite(IHqlExpression * transformed)
{
    assertex(!outputToUseForSpill);
    HqlExprArray args;

    if (useGraphResult())
    {
        assertex(options->graphIdExpr);
        args.append(*LINK(transformed));
        args.append(*LINK(options->graphIdExpr));
        args.append(*createSpillName());
        args.append(*createAttribute(_spill_Atom));
        return createValue(no_setgraphresult, makeVoidType(), args);
    }

    if (useGlobalResult())
    {
        args.append(*LINK(transformed));
        args.append(*createAttribute(sequenceAtom, getLocalSequenceNumber()));
        args.append(*createAttribute(namedAtom, createSpillName()));
        addSpillFlags(args, false);
        return createValue(no_output, makeVoidType(), args);
    }

    if (!options->createSpillAsDataset)
    {
        // Ordinary spill file written by an output activity.
        args.append(*LINK(transformed));
        args.append(*createSpillName());
        addSpillFlags(args, false);
        return createValue(no_output, makeVoidType(), args);
    }

    // Spill represented as a dataset: rows are converted to datasets first, and the
    // no_commonspill dataset is remembered in spilledDataset.
    IHqlExpression * value = LINK(transformed);
    if (value->isDatarow())
        value = createDatasetFromRow(value);
    spilledDataset.setown(createDataset(no_commonspill, value));
    args.append(*LINK(spilledDataset));
    args.append(*createSpillName());
    addSpillFlags(args, false);
    return createValue(no_writespill, makeVoidType(), args);
}
bool ResourcerInfo::okToSpillThrough()
{
bool isGraphResult = useGraphResult();
if (isGraphResult)
return options->allowThroughResult;
if (useGlobalResult())
return false;
return (options->allowThroughSpill && !options->createSpillAsDataset);
}
//Returns true when the write of the spill can be attached to the splitter that is already
//being generated for this node (see createTransformedExpr/createSpiller).
bool ResourcerInfo::spillSharesSplitter()
{
    const bool writtenElsewhere = outputToUseForSpill || useGraphResult() || useGlobalResult();
    if (writtenElsewhere)
        return false;

    if (okToSpillThrough())
        return false;

    //Only a balanced splitter can be shared
    return isSplit() && balanced;
}
//Create the expression that spills "transformed".
//o If an existing output is used for the spill nothing needs to be added.
//o If a through spill is allowed a no_spill/no_spillgraphresult is wrapped around the input.
//o Otherwise a balanced no_split is used (reusing "transformed" if reuseSplitter is set), and the
//  write of the spill is saved in splitterOutput.
IHqlExpression * ResourcerInfo::createSpiller(IHqlExpression * transformed, bool reuseSplitter)
{
    if (outputToUseForSpill)
        return LINK(transformed);

    if (okToSpillThrough())
    {
        HqlExprArray operands;
        node_operator spillOp;
        operands.append(*LINK(transformed));
        if (useGraphResult())
        {
            spillOp = no_spillgraphresult;
            operands.append(*LINK(options->graphIdExpr));
            operands.append(*createSpillName());
            operands.append(*createAttribute(_spill_Atom));
        }
        else
        {
            spillOp = no_spill;
            operands.append(*createSpillName());
            addSpillFlags(operands, false);
        }

        OwnedHqlExpr throughSpill;
        if (original->isDatarow())
            throughSpill.setown(createRow(spillOp, operands));
        else
            throughSpill.setown(createDataset(spillOp, operands));
        return cloneInheritedAnnotations(original, throughSpill);
    }

    OwnedHqlExpr splitter;
    if (reuseSplitter)
    {
        //The caller has already created the splitter
        assertex(transformed->getOperator() == no_split);
        splitter.set(transformed);
    }
    else
    {
        if (transformed->isDataset())
            splitter.setown(createDataset(no_split, LINK(transformed), createAttribute(balancedAtom)));
        else
            splitter.setown(createRow(no_split, LINK(transformed), createAttribute(balancedAtom)));
        splitter.setown(cloneInheritedAnnotations(original, splitter));
    }

    splitterOutput.setown(createSpilledWrite(splitter));
    return splitter.getClear();
}
//Wrap "transformed" in a no_split with a unique id (and a balanced attribute if required).
//Library scope instances are returned unchanged.
IHqlExpression * ResourcerInfo::createSplitter(IHqlExpression * transformed)
{
    if (transformed->getOperator() == no_libraryscopeinstance)
        return LINK(transformed);

    IHqlExpression * splitAttrs = createUniqueId();
    if (balanced)
        splitAttrs = createComma(splitAttrs, createAttribute(balancedAtom));

    OwnedHqlExpr splitter;
    if (transformed->isDataset())
        splitter.setown(createDataset(no_split, LINK(transformed), splitAttrs));
    else
        splitter.setown(createRow(no_split, LINK(transformed), splitAttrs));

    return cloneInheritedAnnotations(original, splitter);
}
//Add any aggregation, spill, splitter and spilled write that this node requires around expr.
IHqlExpression * ResourcerInfo::createTransformedExpr(IHqlExpression * expr)
{
    LinkedHqlExpr result = expr;
    if (aggregates.ordinality())
        result.setown(createAggregation(result));

    if (spillSharesSplitter())
    {
        //Create the splitter first, and then reuse it as the source of the spill
        result.setown(createSplitter(result));
        if (isExternalSpill())
        {
            result.setown(createSpiller(result, true));
        }
    }
    else
    {
        if (isExternalSpill())
        {
            result.setown(createSpiller(result, false));
        }

        if (isSplit())
        {
            result.setown(createSplitter(result));
        }
        else if (isSpilledWrite())
        {
            result.setown(createSpilledWrite(result));
        }
    }
    return result.getClear();
}
//Decide whether this node should be duplicated in each place it is used (return true) instead of
//being spilled (return false).
//The code walks down from the original expression through child 0 of operators that are considered
//transparent/cheap, until it finds an operator that determines the answer.
//noteOtherSpills - if true (and enabled by the options) take spills of the nodes being walked into account.
bool ResourcerInfo::expandRatherThanSpill(bool noteOtherSpills)
{
//When sharing is preferred to expanding, use the same rules as for splitting
if (options->shareDontExpand)
return expandRatherThanSplit();
IHqlExpression * expr = original;
//Other spills are only taken into account if minimiseSpills is set and this node is not linked from a child
if (!options->minimiseSpills || linkedFromChild)
noteOtherSpills = false;
if (noteOtherSpills)
{
ResourcerInfo * info = queryResourceInfo(expr);
if (info && info->isSpilledWrite())
return (info->transformed == NULL);
}
//isFiltered - a no_filter has been walked through
//isProcessed - a project/usertable/limit has been walked through
bool isFiltered = false;
bool isProcessed = false;
loop
{
ResourcerInfo * info = queryResourceInfo(expr);
//Nodes that must never be split are always expanded
if (info && info->neverSplit)
return true;
node_operator op = expr->getOperator();
switch (op)
{
case no_table:
{
//This is only executed for hthor/thor. Roxie has used expandRatherThanSplit().
//We need to balance the saving from reading reduced data in the other branches with the cost of
//writing the spill file to disk.
if (isFiltered && (numExternalUses >= options->filteredSpillThreshold))
return false;
IHqlExpression * mode = expr->queryChild(2);
switch (mode->getOperator())
{
case no_thor: case no_flat:
//MORE: The following is possibly better - but roxie should be able to read from non spill data files in child queries fine
//if ((options->targetClusterType == RoxieCluster) && linkedFromChild)) return false;
return true;
default:
return false;
}
}
case no_stepped:
return true;
case no_inlinetable:
{
IHqlExpression * transforms = expr->queryChild(0);
//The inline table code means this should generate smaller code, and more efficient
if (!isFiltered && !isProcessed && transforms->isConstant())
return true;
//Otherwise only expand if the table has no more than MAX_INLINE_COMMON_COUNT transforms
if (transforms->numChildren() > MAX_INLINE_COMMON_COUNT)
return false;
return true;
}
case no_getresult:
case no_temptable:
case no_rows:
case no_xmlproject:
case no_workunit_dataset:
//Only expanded if used directly - not if filtered or projected
return !isProcessed && !isFiltered;
case no_getgraphresult:
return !expr->hasProperty(_streaming_Atom); // we must not duplicate streaming inputs!
case no_keyindex:
case no_newkeyindex:
if (!isFiltered)
return true;
return options->cloneFilteredIndex;
case no_datasetfromrow:
//Expand if the row does not come from another activity
if (getNumActivityArguments(expr) == 0)
return true;
return false;
case no_fail:
case no_null:
return !expr->isAction();
//Operators that are walked through without affecting the decision
case no_assertsorted:
case no_sorted:
case no_grouped:
case no_distributed:
case no_preservemeta:
case no_nofold:
case no_nohoist:
case no_section:
case no_sectioninput:
expr = expr->queryChild(0);
break;
//Operators that are walked through, but are noted as processing the data
case no_newusertable:
case no_limit:
case no_keyedlimit:
expr = expr->queryChild(0);
isProcessed = true;
break;
case no_hqlproject:
//Projects with a counter or prefetch are never expanded
if (expr->hasProperty(_countProject_Atom) || expr->hasProperty(prefetchAtom))
return false;
expr = expr->queryChild(0);
isProcessed = true;
break;
//MORE: Not so sure about all the following, include them so behaviour doesn't change
case no_compound_diskread:
case no_compound_disknormalize:
case no_compound_diskaggregate:
case no_compound_diskcount:
case no_compound_diskgroupaggregate:
case no_compound_indexread:
case no_compound_indexnormalize:
case no_compound_indexaggregate:
case no_compound_indexcount:
case no_compound_indexgroupaggregate:
case no_compound_childread:
case no_compound_childnormalize:
case no_compound_childaggregate:
case no_compound_childcount:
case no_compound_childgroupaggregate:
case no_compound_selectnew:
case no_compound_inline:
expr = expr->queryChild(0);
break;
case no_filter:
isFiltered = true;
expr = expr->queryChild(0);
break;
case no_select:
{
if (options->targetClusterType == RoxieCluster)
return false;
//A select without the new attribute is always expanded, otherwise continue with the dataset selected from
if (!expr->hasProperty(newAtom))
return true;
expr = expr->queryChild(0);
break;
}
default:
return false;
}
//The following reduces the number of spills by taking into account other spills.
if (noteOtherSpills)
{
ResourcerInfo * info = queryResourceInfo(expr);
if (info)
{
if (info->isSpilledWrite())
return (info->transformed == NULL);
if (info->numExternalUses)
{
if (isFiltered && (numExternalUses >= options->filteredSpillThreshold))
return false;
return true;
}
}
}
}
}
//Decide whether this node should be duplicated in each place it is used (return true) instead of
//being shared via a splitter (return false).
//Walks down child 0 of the original expression through the operators listed in the switch
//until an operator that determines the answer is found.
bool ResourcerInfo::expandRatherThanSplit()
{
//MORE: This doesn't really work - should do indexMatching first.
//should only expand if one side that uses this is also filtered
IHqlExpression * expr = original;
loop
{
ResourcerInfo * info = queryResourceInfo(expr);
//Nodes that must never be split are always expanded
if (info && info->neverSplit)
return true;
switch (expr->getOperator())
{
case no_keyindex:
case no_newkeyindex:
case no_rowset:
case no_getgraphloopresultset:
return true;
case no_null:
return !expr->isAction();
case no_inlinetable:
//Only expand single row inline tables whose first child is constant (if the option is enabled)
if (options->expandSingleConstRow && hasSingleRow(expr))
{
IHqlExpression * values = expr->queryChild(0);
if (values->queryChild(0)->isConstant())
return true;
}
return false;
case no_stepped:
case no_rowsetrange:
case no_rowsetindex:
return true;
//The following are walked through - the decision depends on their first child
case no_sorted:
case no_grouped:
case no_distributed:
case no_preservemeta:
case no_compound_diskread:
case no_compound_disknormalize:
case no_compound_diskaggregate:
case no_compound_diskcount:
case no_compound_diskgroupaggregate:
case no_compound_indexread:
case no_compound_indexnormalize:
case no_compound_indexaggregate:
case no_compound_indexcount:
case no_compound_indexgroupaggregate:
case no_compound_childread:
case no_compound_childnormalize:
case no_compound_childaggregate:
case no_compound_childcount:
case no_compound_childgroupaggregate:
case no_compound_selectnew:
case no_compound_inline:
case no_section:
case no_sectioninput:
break;
case no_select:
if (options->targetClusterType == RoxieCluster)
return false;
//A select without the new attribute is always expanded, otherwise continue with the dataset selected from
if (!expr->hasProperty(newAtom))
return true;
break;
default:
return false;
}
expr = expr->queryChild(0);
}
}
//Returns true if expr is a key index, optionally wrapped in any number of filters.
bool neverCommonUp(IHqlExpression * expr)
{
    //Skip over any filters applied to the dataset
    while (expr->getOperator() == no_filter)
        expr = expr->queryChild(0);

    const node_operator rootOp = expr->getOperator();
    return (rootOp == no_keyindex) || (rootOp == no_newkeyindex);
}
//Applies the global neverCommonUp() test to the original expression for this node.
bool ResourcerInfo::neverCommonUp()
{
return ::neverCommonUp(original);
}
//Is a spill required for uses of this node outside its own graph?
bool ResourcerInfo::isExternalSpill()
{
    //Nothing is spilled if the node is expanded where it is used
    if (expandRatherThanSpill(true))
        return false;

    //No internal uses => handled as a spilled write instead (see isSpilledWrite)
    if (numInternalUses() == 0)
        return false;

    return (numExternalUses != 0);
}
//A splitter is needed when there is more than one split path - see numSplitPaths().
bool ResourcerInfo::isSplit()
{
return numSplitPaths() > 1;
}
unsigned ResourcerInfo::numSplitPaths()
{
unsigned internal = numInternalUses();
if ((internal == 0) || !options->allowSplitBetweenSubGraphs)
return internal;
//MORE
return internal;
}
//True if the node has no internal uses.
bool ResourcerInfo::isSpilledWrite()
{
    return (numInternalUses() == 0);
}
//If the original expression is not a dataset, select row 1 from expr so a row is returned.
//The expr argument is passed straight to createRow() or returned unchanged.
IHqlExpression * ResourcerInfo::wrapRowOwn(IHqlExpression * expr)
{
    if (original->isDataset())
        return expr;
    return createRow(no_selectnth, expr, getSizetConstant(1));
}
//---------------------------------------------------------------------------
//Initialise the resourcer: lock the transform mutex (released in the destructor), set up the
//resource limits for the target cluster, and derive the resourcing options from the translator options.
//_clusterSize - 0 means use FIXED_CLUSTER_SIZE (and it is forced to 1 for non thor targets).
EclResourcer::EclResourcer(IErrorReceiver * _errors, IConstWorkUnit * _wu, ClusterType _targetClusterType, unsigned _clusterSize, const HqlCppOptions & _translatorOptions)
{
wu.set(_wu);
errors = _errors;
lockTransformMutex();
targetClusterType = _targetClusterType;
clusterSize = _clusterSize ? _clusterSize : FIXED_CLUSTER_SIZE;
insideNeverSplit = false;
options.mangleSpillNameWithWuid = false;
options.minimizeSpillSize = _translatorOptions.minimizeSpillSize;
//The wuid is only used to mangle spill filenames when it is known not to change
bool wuidIsConstant = (targetClusterType == RoxieCluster) || !wu->getCloneable();
if (wuidIsConstant)
{
SCMStringBuffer wuNameText;
wu->getWuid(wuNameText);
options.filenameMangler.set(wuNameText.str());
}
//A value of zero for any of the following translator options means "use the default"
unsigned totalMemory = _translatorOptions.resourceMaxMemory ? _translatorOptions.resourceMaxMemory : DEFAULT_TOTAL_MEMORY;
unsigned maxSockets = _translatorOptions.resourceMaxSockets ? _translatorOptions.resourceMaxSockets : DEFAULT_MAX_SOCKETS;
unsigned maxActivities = _translatorOptions.resourceMaxActivities ? _translatorOptions.resourceMaxActivities : DEFAULT_MAX_ACTIVITIES;
unsigned maxHeavy = _translatorOptions.resourceMaxHeavy;
unsigned maxDistribute = _translatorOptions.resourceMaxDistribute;
resourceLimit = new CResources(0);
resourceLimit->set(RESactivities, maxActivities);
switch (targetClusterType)
{
case ThorCluster:
case ThorLCRCluster:
//Thor targets are limited by the configured resources
resourceLimit->set(RESheavy, maxHeavy).set(REShashdist, maxDistribute);
resourceLimit->set(RESmastersocket, maxSockets).set(RESslavememory,totalMemory);
resourceLimit->set(RESslavesocket, maxSockets).set(RESmastermemory, totalMemory);
break;
default:
//Other targets only have the number of activities limited, and are treated as a single node
resourceLimit->set(RESheavy, 0xffffffff).set(REShashdist, 0xffffffff);
resourceLimit->set(RESmastersocket, 0xffffffff).set(RESslavememory, 0xffffffff);
resourceLimit->set(RESslavesocket, 0xffffffff).set(RESmastermemory, 0xffffffff);
clusterSize = 1;
break;
}
//unlimitedResources removes all limits except the number of activities
if (_translatorOptions.unlimitedResources)
{
resourceLimit->set(RESheavy, 0xffffffff).set(REShashdist, 0xffffffff);
resourceLimit->set(RESmastersocket, 0xffffffff).set(RESslavememory,0xffffffff);
resourceLimit->set(RESslavesocket, 0xffffffff).set(RESmastermemory,0xffffffff);
}
//The compile time default for useMpForDistribute can be overridden by a translator option
#ifdef MPHASHDISTRIBUTE
options.useMpForDistribute = true;
#else
options.useMpForDistribute = false;
#endif
if (_translatorOptions.hasResourceUseMpForDistribute)
options.useMpForDistribute = _translatorOptions.resourceUseMpForDistribute;
options.isChildQuery = false;
options.targetClusterType = targetClusterType;
options.filteredSpillThreshold = _translatorOptions.filteredReadSpillThreshold;
options.allowThroughSpill = _translatorOptions.allowThroughSpill;
options.allowThroughResult = (targetClusterType != RoxieCluster) && (targetClusterType != ThorLCRCluster);
options.cloneFilteredIndex = (targetClusterType != RoxieCluster);
options.spillSharedConditionals = (targetClusterType == RoxieCluster);
options.shareDontExpand = (targetClusterType == RoxieCluster);
options.graphIdExpr = NULL;
//MORE The following doesn't always work - it gets sometimes confused about spill files - see latestheaderbuild for an example.
//Try again once cloneConditionals is false for thor
options.minimiseSpills = _translatorOptions.minimiseSpills;
spillMultiCondition = _translatorOptions.spillMultiCondition;
spotThroughAggregate = _translatorOptions.spotThroughAggregate && (targetClusterType != RoxieCluster) && (targetClusterType != ThorLCRCluster);
options.noConditionalLinks = (targetClusterType == RoxieCluster);
options.hoistResourced = _translatorOptions.hoistResourced;
options.useGraphResults = false; // modified by later call
options.groupedChildIterators = _translatorOptions.groupedChildIterators;
options.allowSplitBetweenSubGraphs = false;//(targetClusterType == RoxieCluster);
options.supportsChildQueries = (targetClusterType != ThorCluster);
options.clusterSize = clusterSize;
options.preventKeyedSplit = _translatorOptions.preventKeyedSplit;
options.preventSteppedSplit = _translatorOptions.preventSteppedSplit;
options.minimizeSkewBeforeSpill = _translatorOptions.minimizeSkewBeforeSpill;
options.expandSingleConstRow = true;
//NB: createSpillAsDataset is cleared again by setChildQuery(true)
options.createSpillAsDataset = _translatorOptions.optimizeSpillProject && (targetClusterType != HThorCluster);
}
//Free the resource limits and release the transform mutex that was locked in the constructor.
EclResourcer::~EclResourcer()
{
delete resourceLimit;
unlockTransformMutex();
}
//Record whether a child query is being resourced.  Enabling it also disables createSpillAsDataset;
//disabling it does not restore that option.
void EclResourcer::setChildQuery(bool value)
{
    options.isChildQuery = value;
    if (!value)
        return;

    options.createSpillAsDataset = false;
}
//Record the graph id expression and the number of the next result in the resourcing options.
//graphIdExpr is stored as a plain pointer - it is not linked here.
void EclResourcer::setNewChildQuery(IHqlExpression * graphIdExpr, unsigned numResults)
{
options.graphIdExpr = graphIdExpr;
options.nextResult = numResults;
}
//Move expr into newGraph, and update each link that starts or ends at expr to match.
void EclResourcer::changeGraph(IHqlExpression * expr, ResourceGraphInfo * newGraph)
{
    ResourcerInfo * exprInfo = queryResourceInfo(expr);
    exprInfo->graph.set(newGraph);

    ForEachItemInRev(linkIdx, links)
    {
        ResourceGraphLink & link = links.item(linkIdx);
        if (link.sourceNode == expr)
        {
            link.changeSourceGraph(newGraph);
        }
        else if (link.sinkNode == expr)
        {
            link.changeSinkGraph(newGraph);
        }
        //Every link is checked - a link must never join a graph to itself
        assertex(link.sinkGraph != link.sourceGraph);
    }
}
//Create a new graph and add a linked reference to it to the list of graphs.
//The pointer returned is the newly allocated object.
ResourceGraphInfo * EclResourcer::createGraph()
{
    ResourceGraphInfo * newGraph = new ResourceGraphInfo(&options);
    //PrintLog("Create graph %p", graph);
    graphs.append(*LINK(newGraph));
    return newGraph;
}
//Create a link between two graphs, add it to the global list of links, and register it with
//whichever of the source and sink graphs are supplied (either may be NULL).
void EclResourcer::connectGraphs(ResourceGraphInfo * sourceGraph, IHqlExpression * sourceNode, ResourceGraphInfo * sinkGraph, IHqlExpression * sinkNode, LinkKind kind)
{
    ResourceGraphLink * newLink = new ResourceGraphLink(sourceGraph, sourceNode, sinkGraph, sinkNode, kind);
    links.append(*newLink);

    if (sourceGraph)
    {
        sourceGraph->sinks.append(*newLink);
    }
    if (sinkGraph)
    {
        sinkGraph->sources.append(*newLink);
    }
}
//Return the resourcing information attached to the body of expr, creating it if it does not exist yet.
ResourcerInfo * EclResourcer::queryCreateResourceInfo(IHqlExpression * expr)
{
    IHqlExpression * body = expr->queryBody();
    ResourcerInfo * existing = (ResourcerInfo *)body->queryTransformExtra();
    if (existing)
        return existing;

    //NB: The information is created from expr, but stored on its body
    ResourcerInfo * created = new ResourcerInfo(expr, &options);
    body->setTransformExtraOwned(created);
    return created;
}
//Recursively walk the activity arguments of expr, moving nodes to newGraph.
//The walk stops at nodes with no resourcing information, nodes that contain no activities, and at
//activities that belong to a graph other than oldGraph.
void EclResourcer::replaceGraphReferences(IHqlExpression * expr, ResourceGraphInfo * oldGraph, ResourceGraphInfo * newGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info)
        return;
    if (!info->containsActivity)
        return;
    if (info->isActivity && info->graph != oldGraph)
        return;

    info->graph.set(newGraph);

    const unsigned firstArg = getFirstActivityArgument(expr);
    const unsigned endArg = firstArg + getNumActivityArguments(expr);
    for (unsigned argIdx = firstArg; argIdx < endArg; argIdx++)
    {
        replaceGraphReferences(expr->queryChild(argIdx), oldGraph, newGraph);
    }
}
//Remove a link between two graphs.
//link             - the link to remove.  It is removed from the global list of links last, so the
//                   reference must not be used by the caller afterwards.
//keepExternalUses - if false the number of external uses of the source node is decremented.
//The source node must have resourcing information with at least one external use.
void EclResourcer::removeLink(ResourceGraphLink & link, bool keepExternalUses)
{
    ResourcerInfo * info = queryResourceInfo(link.sourceNode);
    assertex(info && info->numExternalUses > 0);
    if (!keepExternalUses)
        info->numExternalUses--;

    //connectGraphs() allows either graph to be NULL, so neither can be assumed to exist here.
    if (link.sinkGraph)
        link.sinkGraph->sources.zap(link);
    if (link.sourceGraph)
        link.sourceGraph->sinks.zap(link);
    links.zap(link);
}
//Replace all references to oldGraph with newGraph: in the nodes that feed the sinks of oldGraph,
//in the links between graphs, and in the other graphs.
void EclResourcer::replaceGraphReferences(ResourceGraphInfo * oldGraph, ResourceGraphInfo * newGraph)
{
    //Move the nodes that are the sources of links leaving the old graph
    ForEachItemIn(sinkIdx, oldGraph->sinks)
    {
        ResourceGraphLink & sinkLink = oldGraph->sinks.item(sinkIdx);
        replaceGraphReferences(sinkLink.sourceNode, oldGraph, newGraph);
    }

    //Links that would now connect newGraph to itself are removed, the others are redirected.
    //Processed in reverse since removeLink() removes entries from links.
    ForEachItemInRev(linkIdx, links)
    {
        ResourceGraphLink & link = links.item(linkIdx);
        if (link.sourceGraph == oldGraph)
        {
            if (link.sinkGraph == newGraph)
                removeLink(link, false);
            else
                link.changeSourceGraph(newGraph);
        }
        else if (link.sinkGraph == oldGraph)
        {
            if (link.sourceGraph == newGraph)
                removeLink(link, false);
            else
                link.changeSinkGraph(newGraph);
        }
    }

    ForEachItemIn(graphIdx, graphs)
    {
        graphs.item(graphIdx).replaceReferences(oldGraph, newGraph);
    }
}
//------------------------------------------------------------------------------------------
// PASS1: Gather information about splitter locations.
//Add each of the active rows to the list of active selectors, and mark its resourcing
//information (created if necessary) as already being in scope.
void EclResourcer::tagActiveCursors(HqlExprCopyArray & activeRows)
{
    ForEachItemIn(rowIdx, activeRows)
    {
        IHqlExpression * row = &activeRows.item(rowIdx);
        activeSelectors.append(*row);

        ResourcerInfo * rowInfo = queryCreateResourceInfo(row);
        rowInfo->isAlreadyInScope = true;
    }
}
//True if row is a no_selectnth whose record contains more than one field.
inline bool projectSelectorDatasetToField(IHqlExpression * row)
{
    if (row->getOperator() != no_selectnth)
        return false;
    return getFieldCount(row->queryRecord()) > 1;
}
static HqlTransformerInfo eclHoistLocatorInfo("EclHoistLocator");
//Base class for the transformers that locate expressions to hoist.
//Matches are recorded in three parallel arrays supplied by the caller:
//  originalMatched    - the expression that was found (not linked)
//  matched            - the dataset/row expression to evaluate in its place
//  alwaysHoistMatches - whether the match must always be hoisted
class EclHoistLocator : public NewHqlTransformer
{
public:
EclHoistLocator(HqlExprCopyArray & _originalMatches, HqlExprArray & _matches, BoolArray & _alwaysHoistMatches)
: NewHqlTransformer(eclHoistLocatorInfo), originalMatched(_originalMatches), matched(_matches), alwaysHoistMatches(_alwaysHoistMatches)
{
}
//Record that the dataset/row expr should be hoisted.
//hoisted     - the expression to evaluate instead, NULL means use expr.
//alwaysHoist - if expr has already been noted the flag is only ever upgraded from false to true.
void noteDataset(IHqlExpression * expr, IHqlExpression * hoisted, bool alwaysHoist)
{
unsigned match = originalMatched.find(*expr);
if (match == NotFound)
{
if (!hoisted)
hoisted = expr;
originalMatched.append(*expr);
matched.append(*LINK(hoisted));
alwaysHoistMatches.append(alwaysHoist);
}
else if (alwaysHoist && !alwaysHoistMatches.item(match))
alwaysHoistMatches.replace(true, match);
}
//Record that the scalar expr (with value "value") should be hoisted.  Since only datasets and rows
//are recorded, a row or dataset that the value can be extracted from is created.
//Scalars are always marked as always hoisted.  Nothing is done if expr has already been noted.
void noteScalar(IHqlExpression * expr, IHqlExpression * value)
{
if (!originalMatched.contains(*expr))
{
OwnedHqlExpr hoisted;
if (value->getOperator() == no_select)
{
//A field selected from a new dataset or a row: hoist the row the field is selected from
bool isNew;
IHqlExpression * row = querySelectorDataset(value, isNew);
if(isNew || row->isDatarow())
{
if (projectSelectorDatasetToField(row))
{
IHqlExpression * ds = row->queryChild(0);
//Project down to a single field.so implicit fields can still be optimized
IHqlExpression * field = value->queryChild(1);
OwnedHqlExpr record = createRecord(field);
OwnedHqlExpr self = getSelf(record);
OwnedHqlExpr activeDs = createRow(no_activetable, LINK(ds->queryNormalizedSelector()));
OwnedHqlExpr selected = replaceSelector(value, row, activeDs);
OwnedHqlExpr assign = createAssign(createSelectExpr(LINK(self), LINK(field)), selected.getClear());
OwnedHqlExpr transform = createValue(no_newtransform, makeTransformType(record->getType()), LINK(assign));
OwnedHqlExpr projectedDs = createDataset(no_newusertable, LINK(ds), createComma(LINK(record), LINK(transform)));
hoisted.setown(replaceChild(row, 0, projectedDs));
}
if (!hoisted)
hoisted.set(row);
}
}
else if (value->getOperator() == no_createset)
{
//A set created from a dataset: hoist a projection of the dataset that only contains the value
IHqlExpression * ds = value->queryChild(0);
IHqlExpression * selected = value->queryChild(1);
OwnedHqlExpr field;
//Project down to a single field.so implicit fields can still be optimized
if (selected->getOperator() == no_select)
field.set(selected->queryChild(1));
else
field.setown(createField(valueAtom, selected->getType(), NULL));
OwnedHqlExpr record = createRecord(field);
OwnedHqlExpr self = getSelf(record);
OwnedHqlExpr assign = createAssign(createSelectExpr(LINK(self), LINK(field)), LINK(selected));
OwnedHqlExpr transform = createValue(no_newtransform, makeTransformType(record->getType()), LINK(assign));
hoisted.setown(createDataset(no_newusertable, LINK(ds), createComma(LINK(record), LINK(transform))));
}
//Otherwise wrap the value in a row containing a single field
if (!hoisted)
{
OwnedHqlExpr field = createField(valueAtom, value->getType(), NULL);
OwnedHqlExpr record = createRecord(field);
OwnedHqlExpr self = getSelf(record);
OwnedHqlExpr assign = createAssign(createSelectExpr(LINK(self), LINK(field)), LINK(value));
OwnedHqlExpr transform = createValue(no_transform, makeTransformType(record->getType()), LINK(assign));
hoisted.setown(createRow(no_createrow, LINK(transform)));
}
originalMatched.append(*expr);
matched.append(*hoisted.getClear());
alwaysHoistMatches.append(true);
}
}
protected:
HqlExprCopyArray & originalMatched;
HqlExprArray & matched;
BoolArray & alwaysHoistMatches;
};
//Walks the non-dataset arguments of an activity looking for datasets and scalars that can be
//evaluated outside the activity, and records them using the EclHoistLocator base class.
//  _original  - the activity whose arguments are being searched
//  _selectors - the selectors that are currently in scope
class EclChildSplitPointLocator : public EclHoistLocator
{
public:
EclChildSplitPointLocator(IHqlExpression * _original, HqlExprCopyArray & _selectors, HqlExprCopyArray & _originalMatches, HqlExprArray & _matches, BoolArray & _alwaysHoistMatches, bool _groupedChildIterators, bool _supportsChildQueries)
: EclHoistLocator(_originalMatches, _matches, _alwaysHoistMatches), selectors(_selectors), groupedChildIterators(_groupedChildIterators), supportsChildQueries(_supportsChildQueries)
{
original = _original;
okToSelect = false;
gathered = false;
conditionalDepth = 0;
executedOnce = false;
//Datasets passed directly as arguments to calls and library instances can be hoisted
switch (original->getOperator())
{
case no_call:
case no_externalcall:
case no_libraryscopeinstance:
okToSelect = true;
break;
}
}
//Search children [from..to) of expr for expressions to hoist.
//_executedOnce - true if the children are only evaluated once (attributes are always assumed to be).
void findSplitPoints(IHqlExpression * expr, unsigned from, unsigned to, bool _executedOnce)
{
for (unsigned i=from; i < to; i++)
{
IHqlExpression * cur = expr->queryChild(i);
executedOnce = _executedOnce || cur->isAttribute(); // assume attributes are only executed once.
findSplitPoints(cur);
}
}
protected:
//Analyse a single child expression - only if it could contain something to hoist.
void findSplitPoints(IHqlExpression * expr)
{
//containsNonActiveDataset() would be nice - but that isn't percolated outside assigns etc.
if (containsAnyDataset(expr) || containsMustHoist(expr))
{
//The ambiguous selectors are only calculated once, and only if they are needed
if (!gathered)
{
gatherAmbiguousSelectors(original);
gathered = true;
}
analyse(expr, 0);
}
}
//Returns true if ds must always be hoisted.  It is only false for expressions that are executed once,
//inside a conditional, and either child queries are supported or ds can be processed inline.
bool queryHoistDataset(IHqlExpression * ds)
{
bool alwaysHoist = true;
if (executedOnce)
{
if (conditionalDepth != 0)
{
if (supportsChildQueries || canProcessInline(NULL, ds))
alwaysHoist = false;
}
}
return alwaysHoist;
}
//Note ds as a dataset to hoist if it must always be hoisted.  Returns whether it was noted.
bool queryNoteDataset(IHqlExpression * ds)
{
bool alwaysHoist = queryHoistDataset(ds);
//MORE: It should be possible to remove this condition, but it causes problems with resourcing hsss.xhql amongst others -> disable for the moment
if (alwaysHoist)
noteDataset(ds, ds, alwaysHoist);
return alwaysHoist;
}
//The main walker.  Returning without calling the base class analyseExpr() prevents the children
//of the expression from being searched.
virtual void analyseExpr(IHqlExpression * expr)
{
if (alreadyVisited(expr))
return;
node_operator op = expr->getOperator();
switch (op)
{
case no_select:
if (expr->hasProperty(newAtom))
{
IHqlExpression * ds = expr->queryChild(0);
if (isEvaluateable(ds))
{
//MORE: Following isn't a very nice test - stops implicit denormalize getting messed up
if (expr->isDataset())
break;
//Debtable.....
//Don't hoist counts on indexes or dataset - it may mean they are evaluated more frequently than need be.
//If dependencies and root graphs are handled correctly this could be deleted.
if (isCompoundAggregate(ds))
break;
if (!expr->isDatarow() && !expr->isDataset())
{
if (queryHoistDataset(ds))
{
noteScalar(expr, expr);
return;
}
}
else
{
if (queryNoteDataset(ds))
return;
}
}
}
break;
case no_createset:
{
IHqlExpression * ds = expr->queryChild(0);
if (isEvaluateable(ds))
{
if (queryHoistDataset(ds))
{
noteScalar(expr, expr);
//?? queryNoteDataset(ds);
return;
}
}
break;
}
case no_assign:
{
IHqlExpression * rhs = expr->queryChild(1);
//if rhs is a new, evaluatable, dataset then we want to add it
if (rhs->isDataset() && isEvaluateable(rhs))
{
if (queryNoteDataset(rhs))
return;
}
break;
}
//Never look inside the following
case no_sizeof:
case no_countfile:
case no_allnodes:
case no_nohoist:
return;
//The following are always hoisted
case no_globalscope:
case no_evalonce:
if (expr->isDataset() || expr->isDatarow())
noteDataset(expr, expr->queryChild(0), true);
else
noteScalar(expr, expr->queryChild(0));
return;
case no_thisnode:
throwUnexpected();
case no_getgraphresult:
if (expr->hasProperty(_streaming_Atom))
{
noteDataset(expr, expr, true);
return;
}
break;
case no_getgraphloopresult:
noteDataset(expr, expr, true);
return;
case no_selectnth:
if (expr->queryChild(1)->isConstant())
{
IHqlExpression * ds = expr->queryChild(0);
switch (ds->getOperator())
{
case no_getgraphresult:
//NOTE(review): this tests the attribute on the no_selectnth (expr) rather than on the no_getgraphresult (ds),
//whereas the no_getgraphresult case above tests the graph result itself - confirm which is intended.
if (!expr->hasProperty(_streaming_Atom))
break;
//fallthrough
case no_getgraphloopresult:
noteDataset(expr, expr, true);
return;
}
}
break;
}
//okToSelect is modified while the children are walked, and restored before returning
bool wasOkToSelect = okToSelect;
if (expr->isDataset())
{
//Datasets beneath the following source operators are not hoisted
switch (expr->getOperator())
{
case no_compound_diskread:
case no_compound_disknormalize:
case no_compound_diskaggregate:
case no_compound_diskcount:
case no_compound_diskgroupaggregate:
case no_compound_indexread:
case no_compound_indexnormalize:
case no_compound_indexaggregate:
case no_compound_indexcount:
case no_compound_indexgroupaggregate:
case no_compound_childread:
case no_compound_childnormalize:
case no_compound_childaggregate:
case no_compound_childcount:
case no_compound_childgroupaggregate:
case no_compound_selectnew:
case no_compound_inline:
case no_newkeyindex:
case no_keyindex:
case no_table:
okToSelect = false;
break;
}
if (okToSelect && isEvaluateable(expr))
{
if (queryNoteDataset(expr))
return;
}
}
else
okToSelect = true;
switch (op)
{
case no_if:
{
IHqlExpression * cond = expr->queryChild(0);
// bool condCanBeEvaluated = isEvaluateable(cond, true);
analyseExpr(cond);
//Only the branches of dataset/row conditions are counted as conditional
if (expr->isDataset() || expr->isDatarow())
conditionalDepth++;
doAnalyseChildren(expr, 1);
if (expr->isDataset() || expr->isDatarow())
conditionalDepth--;
break;
}
case no_mapto:
{
analyseExpr(expr->queryChild(0));
if (expr->isDataset() || expr->isDatarow())
conditionalDepth++;
analyseExpr(expr->queryChild(1));
if (expr->isDataset() || expr->isDatarow())
conditionalDepth--;
break;
}
default:
NewHqlTransformer::analyseExpr(expr);
break;
}
okToSelect = wasOkToSelect;
}
//NB: Currently disabled - the function always returns false, and the code following the first
//return is unreachable.
bool isCompoundAggregate(IHqlExpression * ds)
{
return false;
//Generates worse code unless we take into account whether or not the newDisk operation flags are enabled.
if (!isTrivialSelectN(ds))
return false;
IHqlExpression * agg = ds->queryChild(0);
if (isSimpleCountAggregate(agg, true))
return true;
return false;
}
//Calculate which of the selectors in scope are ambiguous within expr.  Expressions that use an
//ambiguous selector are not hoisted (see isEvaluateable).  Does nothing if ENSURE_SELSEQ_UID is defined.
void gatherAmbiguousSelectors(IHqlExpression * expr)
{
#ifndef ENSURE_SELSEQ_UID
//Horrible code to try and cope with ambiguous left selectors.
//o Tree is ambiguous so same child expression can occur in different contexts - so can't depend on the context it is found in to work out if can hoist
//o If any selector that is hidden within child expressions matches one in scope then can't hoist it.
//If the current expression creates a selector, then can't hoist anything that depends on it [only add to hidden if in selectors to reduce searching]
//o Want to hoist as much as possible.
if (selectors.empty())
return;
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
unsigned max = expr->numChildren();
unsigned i;
//Gather the selectors that are hidden within the non-activity arguments
HqlExprCopyArray hiddenSelectors;
for (i = 0; i < first; i++)
expr->queryChild(i)->gatherTablesUsed(&hiddenSelectors, NULL);
for (i = last; i < max; i++)
expr->queryChild(i)->gatherTablesUsed(&hiddenSelectors, NULL);
ForEachItemIn(iSel, selectors)
{
IHqlExpression & cur = selectors.item(iSel);
if (hiddenSelectors.contains(cur))
ambiguousSelectors.append(cur);
}
//Any LEFT/RIGHT selectors created by this expression are ambiguous if they are already in scope
switch (getChildDatasetType(expr))
{
case childdataset_datasetleft:
case childdataset_left:
{
IHqlExpression * ds = expr->queryChild(0);
IHqlExpression * selSeq = querySelSeq(expr);
OwnedHqlExpr left = createSelector(no_left, ds, selSeq);
if (selectors.contains(*left))
ambiguousSelectors.append(*left);
break;
}
case childdataset_same_left_right:
case childdataset_top_left_right:
case childdataset_nway_left_right:
{
IHqlExpression * ds = expr->queryChild(0);
IHqlExpression * selSeq = querySelSeq(expr);
OwnedHqlExpr left = createSelector(no_left, ds, selSeq);
OwnedHqlExpr right = createSelector(no_right, ds, selSeq);
if (selectors.contains(*left))
ambiguousSelectors.append(*left);
if (selectors.contains(*right))
ambiguousSelectors.append(*right);
break;
}
case childdataset_leftright:
{
IHqlExpression * leftDs = expr->queryChild(0);
IHqlExpression * rightDs = expr->queryChild(1);
IHqlExpression * selSeq = querySelSeq(expr);
OwnedHqlExpr left = createSelector(no_left, leftDs, selSeq);
OwnedHqlExpr right = createSelector(no_right, rightDs, selSeq);
if (selectors.contains(*left))
ambiguousSelectors.append(*left);
if (selectors.contains(*right))
ambiguousSelectors.append(*right);
break;
}
}
#endif
}
//Can ds be evaluated outside the activity, and is it worth doing so?
//ignoreInline - if true, datasets that can be processed inline are still considered.
bool isEvaluateable(IHqlExpression * ds, bool ignoreInline = false)
{
//Not allowed to hoist
if (isContextDependent(ds, (conditionalDepth == 0), true))
return false;
//MORE: Needs more work for child queries - need a GroupedChildIterator activity
if (isGrouped(ds) && selectors.ordinality() && !groupedChildIterators)
return false;
//Check datasets are available
HqlExprCopyArray scopeUsed;
ds->gatherTablesUsed(NULL, &scopeUsed);
ForEachItemIn(i, scopeUsed)
{
IHqlExpression & cur = scopeUsed.item(i);
if (!selectors.contains(cur))
return false;
if (ambiguousSelectors.contains(cur))
return false;
}
#ifdef MINIMAL_CHANGES
if (isInlineTrivialDataset(ds) || (!ignoreInline && canProcessInline(NULL, ds)))
// if (isInlineTrivialDataset(ds))// || (!ignoreInline && canProcessInline(NULL, ds)))
return false;
#else
//Not worth hoisting...
if (isTrivialDataset(ds))
return false;
#endif
return true;
}
protected:
IHqlExpression * original; // The activity being searched
HqlExprCopyArray & selectors; // The selectors that are in scope
HqlExprCopyArray ambiguousSelectors; // Calculated by gatherAmbiguousSelectors()
unsigned conditionalDepth; // How many dataset/row conditionals is the current expression nested within?
bool okToSelect; // Can a dataset found at this point be hoisted?
bool gathered; // Has gatherAmbiguousSelectors() been called?
bool groupedChildIterators;
bool executedOnce; // Is the argument being walked only evaluated once?
bool supportsChildQueries;
};
//Search the non-activity arguments of expr (those before first, and from last onwards) for
//expressions that should be hoisted, recording them in info and alwaysHoistChild.
void EclResourcer::gatherChildSplitPoints(IHqlExpression * expr, BoolArray & alwaysHoistChild, ResourcerInfo * info, unsigned first, unsigned last)
{
    //NB: Don't call member functions to ensure correct nesting of transform mutexes.
    EclChildSplitPointLocator locator(expr, activeSelectors, info->originalChildDependents, info->childDependents, alwaysHoistChild, options.groupedChildIterators, options.supportsChildQueries);
    const unsigned numChildren = expr->numChildren();

    //If child queries are supported then don't hoist the expressions if they might only be evaluated once
    //because they may be conditional
    const node_operator op = expr->getOperator();
    if ((op == no_setresult) || (op == no_selectnth))
    {
        //set results, only done once=>don't hoist conditionals
        locator.findSplitPoints(expr, last, numChildren, true);
        return;
    }

    // IF() conditions only evaluated once... => don't force
    locator.findSplitPoints(expr, 0, first, true);
    locator.findSplitPoints(expr, last, numChildren, false);
}
class EclThisNodeLocator : public EclHoistLocator
{
public:
EclThisNodeLocator(HqlExprCopyArray & _originalMatches, HqlExprArray & _matches, BoolArray & _alwaysHoistMatches) : EclHoistLocator(_originalMatches, _matches, _alwaysHoistMatches)
{
allNodesDepth = 0;
}
protected:
virtual void analyseExpr(IHqlExpression * expr)
{
//NB: This doesn't really work for no_thisnode occurring in multiple contexts. We should probably hoist it from everywhere if it is hoistable from anywhere,
// although theoretically that gives us problems with ambiguous selectors.
if (alreadyVisited(expr) || !containsThisNode(expr))
return;
node_operator op = expr->getOperator();
switch (op)
{
case no_allnodes:
allNodesDepth++;
NewHqlTransformer::analyseExpr(expr);
allNodesDepth--;
return;
case no_thisnode:
if (allNodesDepth == 0)
{
if (expr->isDataset() || expr->isDatarow())
noteDataset(expr, expr->queryChild(0), true);
else
noteScalar(expr, expr->queryChild(0));
return;
}
allNodesDepth--;
NewHqlTransformer::analyseExpr(expr);
allNodesDepth++;
return;
}
NewHqlTransformer::analyseExpr(expr);
}
protected:
unsigned allNodesDepth;
};
//PASS1 walker.  Visits expr, creating (or revisiting) its ResourcerInfo, incrementing its use count,
//classifying it as an activity or not, and recursing into its activity arguments.  When
//options.hoistResourced is set it also gathers child dependents and either walks them immediately
//(always hoisted) or queues them on conditionalChildren for extendSplitPoints().
//Returns info->containsActivity for expr; false for attributes, row sets, scalars and untyped expressions.
//The member insideNeverSplit may be set while walking beneath keyed/stepped operators; every path that
//modifies it restores the value it had on entry before returning.
bool EclResourcer::findSplitPoints(IHqlExpression * expr)
{
ResourcerInfo * info = queryResourceInfo(expr);
bool savedInsideNeverSplit = insideNeverSplit;
//Already visited at least once (the use count is non-zero)
if (info && info->numUses)
{
if (insideNeverSplit)
info->neverSplit = true;
//NOTE(review): this branch is only entered when info->numUses is non-zero, so the
//(info->numUses == 0) test below cannot succeed as written - confirm the intended condition.
if (info->isAlreadyInScope && (info->numUses == 0) && expr->isDatarow())
{
// A row is already bound to a temporary
info->isActivity = true;
info->containsActivity = true;
info->numUses++;
//More: May need to force child activities to not be resourced (e.g., if somehow visited via another path)
//info->preserve = true;
return info->containsActivity;
}
//Nothing further to walk for these - just count the extra use.
if (info->isAlreadyInScope || info->isActivity || !info->containsActivity)
{
info->numUses++;
return info->containsActivity;
}
//Otherwise (a non-activity that contains activities) fall through and walk the children again.
}
else
{
//First visit: create the info and classify the expression by operator.
info = queryCreateResourceInfo(expr);
info->numUses++;
if (insideNeverSplit)
info->neverSplit = true;
bool isActivity = true;
switch (expr->getOperator())
{
case no_select:
//either a select from a setresult or use of a child-dataset
if (expr->hasProperty(newAtom))
{
info->containsActivity = findSplitPoints(expr->queryChild(0));
assertex(queryResourceInfo(expr->queryChild(0))->isActivity);
}
if (expr->isDataset() || expr->isDatarow())
{
info->isActivity = true;
info->containsActivity = true;
}
return info->containsActivity;
case no_mapto:
//NOTE(review): presumably throwUnexpected() does not return, which would make the two
//statements that follow unreachable - confirm.
throwUnexpected();
info->containsActivity = findSplitPoints(expr->queryChild(1));
return info->containsActivity;
case no_activerow:
info->isActivity = true;
info->containsActivity = false;
return false;
case no_attr:
case no_attr_expr:
case no_attr_link:
case no_rowset: // don't resource this as an activity
case no_getgraphloopresultset:
info->isActivity = false;
info->containsActivity = false;
return false;
case no_datasetlist:
isActivity = false;
break;
case no_rowsetrange:
{
//Don't resource this as an activity if it is a function of the input graph rows,
//however we do want to if it is coming from a dataset list.
IHqlExpression * ds = expr->queryChild(0);
//MORE: Should walk further down the tree to allow for nested rowsetranges etc.
if (ds->getOperator() == no_rowset || ds->getOperator() == no_getgraphloopresultset)
{
info->isActivity = false;
info->containsActivity = false;
return false;
}
isActivity = false;
break;
}
//The remaining cases only decide whether everything beneath this node is tagged neverSplit.
case no_keyedlimit:
if (options.preventKeyedSplit)
insideNeverSplit = true;
break;
case no_filter:
if (options.preventKeyedSplit && filterIsKeyed(expr))
insideNeverSplit = true;
break;
case no_hqlproject:
case no_newusertable:
case no_aggregate:
case no_newaggregate:
if (options.preventKeyedSplit && expr->hasProperty(keyedAtom))
insideNeverSplit = true;
break;
case no_stepped:
case no_mergejoin:
case no_nwayjoin:
if (options.preventSteppedSplit)
insideNeverSplit = true;
break;
}
//Scalars (and untyped expressions) are never activities - undo any change to insideNeverSplit.
ITypeInfo * type = expr->queryType();
if (!type || type->isScalar())
{
insideNeverSplit = savedInsideNeverSplit;
return false;
}
info->isActivity = isActivity;
info->containsActivity = true;
}
//Walk the activity arguments [first, last) of this expression.
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
if (options.hoistResourced)
{
//alwaysHoistChild is filled in parallel with info->childDependents by the locators below.
BoolArray alwaysHoistChild;
switch (expr->getOperator())
{
case no_allnodes:
{
//MORE: This needs to recursively walk and lift any contained no_selfnode, but don't go past another nested no_allnodes;
EclThisNodeLocator locator(info->originalChildDependents, info->childDependents, alwaysHoistChild);
locator.analyse(expr->queryChild(0), 0);
break;
}
default:
{
for (unsigned idx=first; idx < last; idx++)
{
IHqlExpression * cur = expr->queryChild(idx);
findSplitPoints(cur);
}
insideNeverSplit = savedInsideNeverSplit;
gatherChildSplitPoints(expr, alwaysHoistChild, info, first, last);
break;
}
}
//Hoisted child dependents are walked with insideNeverSplit cleared.
insideNeverSplit = false;
ForEachItemIn(i2, info->childDependents)
{
IHqlExpression & cur = info->childDependents.item(i2);
if (alwaysHoistChild.item(i2))
findSplitPoints(&cur);
else
//Deferred: extendSplitPoints() decides later whether these are worth hoisting.
conditionalChildren.append(cur);
}
}
else
{
for (unsigned idx=first; idx < last; idx++)
findSplitPoints(expr->queryChild(idx));
}
insideNeverSplit = savedInsideNeverSplit;
return info->containsActivity;
}
//PASS1 entry point: walk every root expression, then process the child expressions whose
//hoisting was deferred while walking (see extendSplitPoints()).
void EclResourcer::findSplitPoints(HqlExprArray & exprs)
{
    ForEachItemIn(exprIdx, exprs)
    {
        IHqlExpression & root = exprs.item(exprIdx);
        findSplitPoints(&root);
    }
    extendSplitPoints();
}
void EclResourcer::extendSplitPoints()
{
for (unsigned i=0; i < conditionalChildren.ordinality(); i++)
{
IHqlExpression & cur = conditionalChildren.item(i);
if (isWorthForcingHoist(&cur))
findSplitPoints(&cur);
}
}
//------------------------------------------------------------------------------------------
// PASS2: Actually create the subgraphs based on splitters.
//PASS2 walker: assign expr (and recursively its activity arguments and resourced child dependents)
//to a subgraph.
//  expr          - expression being placed
//  owner         - the expression that uses expr (NULL for a root)
//  ownerGraph    - the graph owner was placed in (NULL for a root)
//  linkKind      - kind of link to create between expr's graph and ownerGraph
//  forceNewGraph - if true an activity is always given a new graph
//An activity gets a new graph if it has no owner graph, is used more than once, is reached through
//anything other than an unconditional link, or forceNewGraph is set; otherwise it joins ownerGraph.
//If the activity already has a graph, only a link to the owner graph is added.
void EclResourcer::createInitialGraph(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerGraph, LinkKind linkKind, bool forceNewGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    LinkKind childLinkKind = UnconditionalLink;
    //Explicit template argument: the graph type is ResourceGraphInfo (ownerGraph/createGraph()).
    Linked<ResourceGraphInfo> thisGraph = ownerGraph;
    bool forceNewChildGraph = false;
    if (info->isActivity)
    {
        //Need to ensure no_libraryselects are not separated from the no_libraryscopeinstance
        //so ensure they are placed in the same graph.
        node_operator op = expr->getOperator();
        if (op == no_libraryselect)
        {
            ResourcerInfo * moduleInfo = queryResourceInfo(expr->queryChild(1));
            if (!info->graph && moduleInfo->graph)
                info->graph.set(moduleInfo->graph);
        }

        //Already placed (visited via another path, or inherited above) => just link to the owner.
        if (info->graph)
        {
            connectGraphs(info->graph, expr, ownerGraph, owner, linkKind);
            info->numExternalUses++;
            return;
        }

        unsigned numUses = info->numUses;
        switch (op)
        {
        case no_libraryscopeinstance:
            //Multiple uses of a library instance do not force it into its own graph
            numUses = 1;
            break;
        case no_libraryselect:
            forceNewGraph = true;
            break;
        }

        if (!ownerGraph || numUses > 1 || (linkKind != UnconditionalLink) || forceNewGraph)
        {
            thisGraph.setown(createGraph());
            connectGraphs(thisGraph, expr, ownerGraph, owner, linkKind);
            info->numExternalUses++;
        }
        info->graph.set(thisGraph);

        //Operators whose children are not all placed with the default (same graph, unconditional) rule
        switch (expr->getOperator())
        {
        case no_compound:
            //NB: First argument is forced into a separate graph
            createInitialGraph(expr->queryChild(0), expr, NULL, UnconditionalLink, true);
            createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, false);
            return;
        case no_executewhen:
            createInitialGraph(expr->queryChild(0), expr, thisGraph, UnconditionalLink, false);
            createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, true);
            return;
        case no_keyindex:
        case no_newkeyindex:
            //No children are walked for index definitions
            return;
        case no_parallel:
            {
                ForEachChild(i, expr)
                    createInitialGraph(expr->queryChild(i), expr, thisGraph, UnconditionalLink, true);
                return;
            }
        case no_if:
            //conditional nodes, the child branches are marked as conditional
            //NOTE(review): the link kind assigned here is UnconditionalLink despite the comment above - confirm intent.
            childLinkKind = UnconditionalLink;
            thisGraph->mergedConditionSource = true;
            if (!options.noConditionalLinks || expr->isAction())
                forceNewChildGraph = true;
            break;
//      case no_nonempty:
        case no_sequential:
            {
                //Every argument goes into its own graph, joined by sequence links
                unsigned first = getFirstActivityArgument(expr);
                unsigned last = first + getNumActivityArguments(expr);
                createInitialGraph(expr->queryChild(first), expr, thisGraph, SequenceLink, true);
                for (unsigned idx = first+1; idx < last; idx++)
                    createInitialGraph(expr->queryChild(idx), expr, thisGraph, SequenceLink, true);
                return;
            }
        case no_case:
        case no_map:
            {
                throwUnexpected();
            }
        case no_output:
            {
                //Tag the inputs to an output statement, so that if a spill was going to occur we read
                //from the output file instead of spilling.
                //Needs the grouping to be saved in the same way.  Could cope with compressed matching, but not
                //much point - since fairly unlikely.
                IHqlExpression * filename = expr->queryChild(1);
                if (filename && (filename->getOperator() == no_constant) && !expr->hasProperty(xmlAtom) && !expr->hasProperty(csvAtom))
                {
                    IHqlExpression * dataset = expr->queryChild(0);
                    if (expr->hasProperty(groupedAtom) == (dataset->queryType()->queryGroupInfo() != NULL))
                    {
                        //NOTE(review): filenameText is filled in but not read again within this function.
                        StringBuffer filenameText;
                        filename->queryValue()->getStringValue(filenameText);
                        ResourcerInfo * childInfo = queryResourceInfo(dataset);
                        if (!childInfo->linkedFromChild && !isUpdatedConditionally(expr))
                            childInfo->outputToUseForSpill = expr;
                    }
                }
                if (isUpdatedConditionally(expr))
                    thisGraph->mergedConditionSource = true;
                break;
            }
        case no_buildindex:
            if (isUpdatedConditionally(expr))
                thisGraph->mergedConditionSource = true;
            break;
        }
    }

    //Default: place each activity argument relative to this graph...
    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    for (unsigned idx = first; idx < last; idx++)
        createInitialGraph(expr->queryChild(idx), expr, thisGraph, childLinkKind, forceNewChildGraph);

    //...and each resourced child dependent in a graph of its own, sequenced relative to this one.
    ForEachItemIn(i2, info->childDependents)
    {
        IHqlExpression & cur = info->childDependents.item(i2);
        if (isResourcedActivity(&cur))
            createInitialGraph(&cur, expr, thisGraph, SequenceLink, true);
    }
}
//PASS2 entry point: place every root expression.  Roots have no owner and no owner graph.
void EclResourcer::createInitialGraphs(HqlExprArray & exprs)
{
    ForEachItemIn(exprIdx, exprs)
    {
        IHqlExpression & root = exprs.item(exprIdx);
        createInitialGraph(&root, NULL, NULL, UnconditionalLink, false);
    }
}
//Variant of createInitialGraph() used by createInitialRemoteGraphs(): all links are unconditional
//and use counts are ignored, so an activity only gets a new graph when it has no owner graph or
//forceNewGraph is set.
//  expr          - expression being placed
//  owner         - the expression that uses expr (NULL for a root)
//  ownerGraph    - the graph owner was placed in (NULL for a root)
//  forceNewGraph - if true an activity is always given a new graph
void EclResourcer::createInitialRemoteGraph(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerGraph, bool forceNewGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    //Explicit template argument: the graph type is ResourceGraphInfo (ownerGraph/createGraph()).
    Linked<ResourceGraphInfo> thisGraph = ownerGraph;
    if (info->isActivity)
    {
        //Already placed => only a link to the owner graph is needed.
        if (info->graph)
        {
            connectGraphs(info->graph, expr, ownerGraph, owner, UnconditionalLink);
            info->numExternalUses++;
            return;
        }

        if (!ownerGraph || forceNewGraph)
        {
            thisGraph.setown(createGraph());
            connectGraphs(thisGraph, expr, ownerGraph, owner, UnconditionalLink);
            info->numExternalUses++;
        }
        info->graph.set(thisGraph);

        switch (expr->getOperator())
        {
        case no_compound:
            //First argument is forced into a separate, unowned graph
            createInitialRemoteGraph(expr->queryChild(0), expr, NULL, true);
            createInitialRemoteGraph(expr->queryChild(1), expr, thisGraph, false);
            return;
        case no_executewhen:
            createInitialRemoteGraph(expr->queryChild(0), expr, thisGraph, false);
            createInitialRemoteGraph(expr->queryChild(1), expr, thisGraph, true);
            return;
        }
    }

    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    for (unsigned idx = first; idx < last; idx++)
        createInitialRemoteGraph(expr->queryChild(idx), expr, thisGraph, false);
}
//Place every root expression using createInitialRemoteGraph().  Roots have no owner or owner graph.
void EclResourcer::createInitialRemoteGraphs(HqlExprArray & exprs)
{
    ForEachItemIn(exprIdx, exprs)
    {
        IHqlExpression & root = exprs.item(exprIdx);
        createInitialRemoteGraph(&root, NULL, NULL, false);
    }
}
//------------------------------------------------------------------------------------------
// PASS3: Tag graphs/links that are conditional or unconditional
//Apply markAsUnconditional() to each resourced child dependent of info, passing on condition.
//Child dependents only exist when options.hoistResourced is enabled, so nothing is done otherwise.
void EclResourcer::markChildDependentsAsUnconditional(ResourcerInfo * info, IHqlExpression * condition)
{
    if (!options.hoistResourced)
        return;

    ForEachItemIn(depIdx, info->childDependents)
    {
        IHqlExpression * dependent = &info->childDependents.item(depIdx);
        if (!isResourcedActivity(dependent))
            continue;
        //No owner graph is supplied for hoisted dependents
        markAsUnconditional(dependent, NULL, condition);
    }
}
//PASS3 walker: record whether expr is reached unconditionally (condition == NULL) or only under
//condition, updating info->pathToExpr and the isUnconditional flag of its graph, then recurse into
//its activity arguments and child dependents.
//  expr       - expression being tagged
//  ownerGraph - graph of the expression this was reached from (NULL for roots/child dependents)
//  condition  - attribute describing the condition this path is evaluated under, or NULL
void EclResourcer::markAsUnconditional(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, IHqlExpression * condition)
{
ResourcerInfo * info = queryResourceInfo(expr);
if (!info || !info->containsActivity)
return;
//Non-activities are transparent: pass the same owner graph and condition on to the arguments.
if (!info->isActivity)
{
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
for (unsigned idx=first; idx < last; idx++)
markAsUnconditional(expr->queryChild(idx), ownerGraph, condition);
return;
}
//If addCondition() returns true the rest of this call proceeds as for an unconditional path.
if (condition)
if (info->addCondition(condition))
condition = NULL;
//Already known to be unconditional - nothing can change.
if (info->pathToExpr == ResourcerInfo::PathUnconditional)
return;
//Previously reached conditionally, and reached again under a (different) condition.
if ((info->pathToExpr == ResourcerInfo::PathConditional) && condition)
{
if (targetClusterType == RoxieCluster)
{
if (spillMultiCondition)
{
if (info->graph != ownerGraph)
info->graph->isUnconditional = true;
}
return;
}
else
{
//Only continue walking when the expression lives in the same graph as its owner.
if (info->graph != ownerGraph)
return;
}
}
bool wasConditional = (info->pathToExpr == ResourcerInfo::PathConditional);
if (!condition)
{
info->pathToExpr = ResourcerInfo::PathUnconditional;
if (info->graph != ownerGraph)
info->graph->isUnconditional = true;
}
else
info->pathToExpr = ResourcerInfo::PathConditional;
node_operator op = expr->getOperator();
switch (op)
{
case no_if:
//With noConditionalLinks the branches are walked like any other arguments (below).
if (options.noConditionalLinks)
break;
if (condition)
markCondition(expr, condition, wasConditional);
else
{
//This list is processed in a second phase.
if (rootConditions.find(*expr) == NotFound)
rootConditions.append(*LINK(expr));
}
markChildDependentsAsUnconditional(info, condition);
return;
case no_sequential:
// case no_nonempty:
if (!options.isChildQuery)
{
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
//The first action inherits the current condition; later actions are tagged with a
//per-position instance attribute which becomes their condition source.
IHqlExpression * child0 = expr->queryChild(0);
markAsUnconditional(child0, info->graph, condition);
queryResourceInfo(child0)->graph->hasConditionSource = true; // force it to generate even if contains something very simple e.g., null action
for (unsigned idx = first+1; idx < last; idx++)
{
OwnedHqlExpr tag = createAttribute(instanceAtom, LINK(expr), getSizetConstant(idx), LINK(condition));
IHqlExpression * child = expr->queryChild(idx);
markAsUnconditional(child, queryResourceInfo(child)->graph, tag);
queryResourceInfo(child)->setConditionSource(tag, !wasConditional);
}
markChildDependentsAsUnconditional(info, condition);
return;
}
break;
case no_case:
case no_map:
UNIMPLEMENTED;
}
//Default: the arguments and child dependents are reached under the same condition as expr.
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
for (unsigned idx=first; idx < last; idx++)
markAsUnconditional(expr->queryChild(idx), info->graph, condition);
markChildDependentsAsUnconditional(info, condition);
}
void EclResourcer::markConditionBranch(unsigned childIndex, IHqlExpression * expr, IHqlExpression * condition, bool wasConditional)
{
IHqlExpression * child = expr->queryChild(childIndex);
if (child)
{
OwnedHqlExpr tag = createAttribute(((childIndex==1) ? trueAtom : falseAtom), LINK(expr), LINK(condition));
markAsUnconditional(child, queryResourceInfo(child)->graph, tag);
queryResourceInfo(child)->setConditionSource(tag, !wasConditional);
}
}
//Tag both branches of a conditional expression: child 1 (true branch) then child 2 (false branch).
void EclResourcer::markCondition(IHqlExpression * expr, IHqlExpression * condition, bool wasConditional)
{
    for (unsigned branch = 1; branch <= 2; branch++)
        markConditionBranch(branch, expr, condition, wasConditional);
}
//PASS3 entry point.  Phase one walks every root as unconditional, collecting the unconditional
//no_if expressions in rootConditions; phase two then tags the branches of each of those.
void EclResourcer::markConditions(HqlExprArray & exprs)
{
    ForEachItemIn(rootIdx, exprs)
    {
        IHqlExpression & root = exprs.item(rootIdx);
        markAsUnconditional(&root, NULL, NULL);
    }

    ForEachItemIn(condIdx, rootConditions)
    {
        IHqlExpression & rootCondition = rootConditions.item(condIdx);
        markCondition(&rootCondition, NULL, false);
    }
}
//------------------------------------------------------------------------------------------
// PASS4: Split subgraphs based on resource requirements for activities
//This will need to be improved if we allow backtracking to get the best combination of activities to fit in the subgraph
//Move expr into a newly created graph, and link that graph to the graph its owner now lives in.
//The new graph inherits the isUnconditional flag of the graph expr was originally in.
void EclResourcer::createResourceSplit(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerNewGraph, ResourceGraphInfo * originalGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);

    info->graph.setown(createGraph());
    ResourceGraphInfo * splitGraph = info->graph;
    splitGraph->isUnconditional = originalGraph->isUnconditional;

    changeGraph(expr, splitGraph);
    connectGraphs(splitGraph, expr, ownerNewGraph, owner, UnconditionalLink);
    info->numExternalUses++;
}
//Member convenience wrapper: forwards to the file-level ::getResources() using this resourcer's options.
void EclResourcer::getResources(IHqlExpression * expr, CResources & exprResources)
{
    ::getResources(expr, exprResources, options);
}
//PASS4 walker: accumulate the resources used by the activities of a graph, and flag
//info->isSpillPoint (and the member spilled) wherever adding an activity would exceed resourceLimit.
//  expr              - expression being examined
//  graph             - the graph being resourced; activities in other graphs are ignored
//  resourcesSoFar    - running total for the current section of the graph (updated in place unless a spill starts a new total)
//  hasGoodSpillPoint - true while walking below a candidate "good" spill point (see below)
//  canSpill          - whether the heuristics that choose a better spill point may be applied to expr
//Returns false only when hasGoodSpillPoint was passed in and the limit was exceeded beneath it;
//the caller that offered the good spill point then spills there instead.  Otherwise returns true.
//Throws HQLERR_CannotResourceActivity if a single activity exceeds the limit on its own.
bool EclResourcer::calculateResourceSpillPoints(IHqlExpression * expr, ResourceGraphInfo * graph, CResources & resourcesSoFar, bool hasGoodSpillPoint, bool canSpill)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return true;

    if (!info->isActivity)
    {
        unsigned first = getFirstActivityArgument(expr);
        unsigned last = first + getNumActivityArguments(expr);
        //A single argument is completely transparent; with several, no good spill point is passed down.
        if (last - first == 1)
            return calculateResourceSpillPoints(expr->queryChild(first), graph, resourcesSoFar, hasGoodSpillPoint, canSpill);
        for (unsigned idx = first; idx < last; idx++)
            calculateResourceSpillPoints(expr->queryChild(idx), graph, resourcesSoFar, false, canSpill);
        return true;
    }

    if (info->graph != graph)
        return true;

    CResources exprResources(clusterSize);
    getResources(expr, exprResources);

    info->isSpillPoint = false;
    //curResources is the total that children are added to: normally the caller's total, but a fresh
    //one if this activity becomes a spill point.  (Explicit template argument for Owned.)
    Owned<CResources> curResources = LINK(&resourcesSoFar);
    if (resourcesSoFar.addExceeds(exprResources, *resourceLimit))
    {
        //Let the caller spill at the good point above us rather than here.
        if (hasGoodSpillPoint)
            return false;
        info->isSpillPoint = true;
        spilled = true;
        curResources.setown(new CResources(clusterSize));
        if (exprResources.exceeds(*resourceLimit))
            throwError2(HQLERR_CannotResourceActivity, getOpString(expr->getOperator()), clusterSize);
    }

    if (options.minimizeSkewBeforeSpill)
    {
        if (canSpill && heavyweightAndReducesSizeOrSkew(expr))
        {
            //if the input activity is going to cause us to run out of resources, then it is going to be better to split here than anywhere else
            //this code may conceivably cause extra spills far away because the hash distributes are moved.
            IHqlExpression * childExpr = expr->queryChild(0);
            ResourcerInfo * childInfo = queryResourceInfo(childExpr);
            if (childInfo->graph == graph)
            {
                CResources childResources(clusterSize);
                getResources(childExpr, childResources);
                childResources.add(exprResources);
                if (curResources->addExceeds(childResources, *resourceLimit))
                {
                    info->isSpillPoint = true;
                    spilled = true;
                    //The child starts a new section whose total begins with this activity's resources.
                    calculateResourceSpillPoints(childExpr, graph, exprResources, false, true);
                    return true;
                }
            }
            //otherwise continue as normal.
        }
    }

    curResources->add(exprResources);

    unsigned first = getFirstActivityArgument(expr);
    unsigned last = first + getNumActivityArguments(expr);
    if (hasGoodSpillPoint)
    {
        //Heavy or hash-distributing activities, and anything without exactly one input, end the
        //region that the good spill point covers.
        if (exprResources.resource[RESheavy] || exprResources.resource[REShashdist] || last-first != 1)
            hasGoodSpillPoint = false;
    }
    else if (!info->isSpillPoint && canSpill)
    {
        if (lightweightAndReducesDatasetSize(expr) || queryHint(expr, spillAtom))
        {
            //This activity is a good place to spill: trial-walk the input, and if the limit is hit
            //beneath it, roll back the totals and spill here instead.
            CResources savedResources(*curResources);
            if (!calculateResourceSpillPoints(expr->queryChild(0), graph, *curResources, true, true))
            {
                curResources->set(savedResources);
                info->isSpillPoint = true;
                spilled = true;
                calculateResourceSpillPoints(expr->queryChild(0), graph, exprResources, false, true);
            }
            return true;
        }
    }

    if (expr->getOperator() == no_if)
    {
        //For conditions, spill on intersection of resources used, not union.
        CResources savedResources(*curResources);
        if (!calculateResourceSpillPoints(expr->queryChild(1), graph, *curResources, hasGoodSpillPoint, true))
            return false;
        if (expr->queryChild(2))
        {
            if (!calculateResourceSpillPoints(expr->queryChild(2), graph, savedResources, hasGoodSpillPoint, true))
                return false;
            curResources->maximize(savedResources);
        }
    }
    else
    {
        for (unsigned idx = first; idx < last; idx++)
            if (!calculateResourceSpillPoints(expr->queryChild(idx), graph, *curResources, hasGoodSpillPoint, true))
                return false;
    }
    return true;
}
//Second half of PASS4: act on the isSpillPoint flags set by calculateResourceSpillPoints().
//Activities flagged as spill points are moved into a new graph; the others follow their owner into
//whichever graph it ended up in.  The resources of each activity are allocated to its final graph.
//  expr               - expression being processed
//  owner              - expression that uses expr
//  ownerOriginalGraph - graph the owner was in before any splitting
//  ownerNewGraph      - graph the owner is in now
void EclResourcer::insertResourceSpillPoints(IHqlExpression * expr, IHqlExpression * owner, ResourceGraphInfo * ownerOriginalGraph, ResourceGraphInfo * ownerNewGraph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return;

    if (!info->isActivity)
    {
        //Transparent node: the arguments are treated as if used by this expression in the owner's graphs
        const unsigned firstArg = getFirstActivityArgument(expr);
        const unsigned endArg = firstArg + getNumActivityArguments(expr);
        for (unsigned arg = firstArg; arg < endArg; arg++)
            insertResourceSpillPoints(expr->queryChild(arg), expr, ownerOriginalGraph, ownerNewGraph);
        return;
    }

    ResourceGraphInfo * originalGraph = info->graph;  //NB: Graph will never cease to exist, so don't need to link.
    if (originalGraph != ownerOriginalGraph)
        return;

    if (info->isSpillPoint)
        createResourceSplit(expr, owner, ownerNewGraph, originalGraph);
    else if (info->graph != ownerNewGraph)
        changeGraph(expr, ownerNewGraph);

    CResources exprResources(clusterSize);
    getResources(expr, exprResources);
    bool ok = info->graph->allocateResources(exprResources, *resourceLimit);
    assertex(ok);

    if (expr->getOperator() != no_if)
    {
        const unsigned firstArg = getFirstActivityArgument(expr);
        const unsigned endArg = firstArg + getNumActivityArguments(expr);
        for (unsigned arg = firstArg; arg < endArg; arg++)
            insertResourceSpillPoints(expr->queryChild(arg), expr, originalGraph, info->graph);
        return;
    }

    //Conditional: each branch starts from the resources used before the branches, and the graph
    //ends up with the maximum of the two branch totals rather than their sum.
    CResources savedResources(info->graph->resources);
    insertResourceSpillPoints(expr->queryChild(1), expr, originalGraph, info->graph);
    if (expr->queryChild(2))
    {
        CResources branchResources(info->graph->resources);
        info->graph->resources.set(savedResources);
        insertResourceSpillPoints(expr->queryChild(2), expr, originalGraph, info->graph);
        info->graph->resources.maximize(branchResources);
    }
}
//Resource a single graph (once only).  The graphs it reads from are resourced first, then spill
//points are calculated starting from the graph's sink; if any were needed the graph is split,
//otherwise the calculated total is simply recorded against the graph.
void EclResourcer::resourceSubGraph(ResourceGraphInfo * graph)
{
    if (graph->beenResourced)
        return;
    graph->beenResourced = true;

    ForEachItemIn(srcIdx, graph->sources)
        resourceSubGraph(graph->sources.item(srcIdx).sourceGraph);

    IHqlExpression * sourceNode = graph->sinks.item(0).sourceNode->queryBody();
#ifdef _DEBUG
    //Sanity check, ensure there is only a single sink for this graph.
    //However because libraryselects are tightly bound to their library instance there may be multiple library selects.
    //They won't affect the resourcing though, since they'll plug into the same library instance, and the selects use no resources.
    ForEachItemIn(sinkIdx, graph->sinks)
    {
        IHqlExpression * thisSink = graph->sinks.item(sinkIdx).sourceNode->queryBody();
        if (thisSink->getOperator() != no_libraryselect)
            assertex(thisSink == sourceNode);
    }
#endif

    spilled = false;
    CResources resources(clusterSize);
    calculateResourceSpillPoints(sourceNode, graph, resources, false, false);

    if (!spilled)
    {
        graph->resources.set(resources);
        return;
    }
    insertResourceSpillPoints(sourceNode, NULL, graph, graph);
}
//PASS4 entry point: resource every graph created so far.
//NB: The exprs parameter is not used - the member list of graphs is walked instead.
void EclResourcer::resourceSubGraphs(HqlExprArray & exprs)
{
    ForEachItemIn(graphIdx, graphs)
    {
        ResourceGraphInfo & curGraph = graphs.item(graphIdx);
        resourceSubGraph(&curGraph);
    }
}
//------------------------------------------------------------------------------------------
// PASS5: Link subgraphs with dependency information so they don't get merged by accident.
//Record that the activity expr, which lives in curGraph, produces the item identified by search
//(a file or result attribute).  The three dependencySource arrays are kept in step: entry n of
//each describes the same source.
void EclResourcer::addDependencySource(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr)
{
    //MORE: Should we check this doesn't already exist?
    dependencySource.search.append(*LINK(search));      // what is written
    dependencySource.graphs.append(*LINK(curGraph));    // the graph that writes it
    dependencySource.exprs.append(*LINK(expr));         // the activity that writes it
}
//Record that the activity expr in curGraph reads the item identified by search.  If a source for
//that item has been registered in a different graph, a dependency link between the two graphs is
//created.  If the source is in the same graph but a different activity, an error is reported.
//Items with no registered source are ignored.
void EclResourcer::addDependencyUse(IHqlExpression * search, ResourceGraphInfo * curGraph, IHqlExpression * expr)
{
    unsigned match = dependencySource.search.find(*search);
    if (match == NotFound)
        return;

    if (&dependencySource.graphs.item(match) != curGraph)
    {
        ResourceGraphLink * link = new ResourceGraphDependencyLink(&dependencySource.graphs.item(match), &dependencySource.exprs.item(match), curGraph, expr);
        curGraph->dependsOn.append(*link);
        links.append(*link);
        return;
    }

    //Don't give a warning if get/set is within the same activity (e.g., within a local())
    if (&dependencySource.exprs.item(match) == expr)
        return;

    //Reported as an error rather than a warning:
    //errors->reportWarning(HQLWRN_RecursiveDependendencies, HQLWRN_RecursiveDependendencies_Text, *codeGeneratorAtom, 0, 0, 0);
    errors->reportError(HQLWRN_RecursiveDependendencies, HQLWRN_RecursiveDependendencies_Text, *codeGeneratorAtom, 0, 0, 0);
}
//If expr refers to a table with a filename, register activityExpr (in curGraph) as a dependency
//source for that file.  Expressions without a table filename are ignored.
void EclResourcer::addRefExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr)
{
    IHqlExpression * filename = queryTableFilename(expr);
    if (!filename)
        return;

    OwnedHqlExpr fileTag = createAttribute(fileAtom, getNormalizedFilename(filename));
    addDependencySource(fileTag, curGraph, activityExpr);
}
//Register any dependency source (something written) or dependency use (something read) implied
//by the operator of expr.  Files are identified by a fileAtom attribute wrapping the normalized
//filename; results by a resultAtom attribute wrapping the sequence and name.
//  expr         - expression being examined
//  curGraph     - graph containing the enclosing activity
//  activityExpr - the enclosing activity the dependency is recorded against
//Returns true if the caller should continue walking the children of expr, false if not.
bool EclResourcer::addExprDependency(IHqlExpression * expr, ResourceGraphInfo * curGraph, IHqlExpression * activityExpr)
{
switch (expr->getOperator())
{
case no_buildindex:
case no_output:
{
//Writes either a file (child 1 present) or a workunit result (no filename).
IHqlExpression * filename = queryRealChild(expr, 1);
if (filename)
{
switch (filename->getOperator())
{
case no_pipe:
//Output to a pipe - nothing is registered
// allWritten = true;
break;
default:
OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(filename));
addDependencySource(value, curGraph, activityExpr);
break;
}
}
else
{
IHqlExpression * seq = querySequence(expr);
assertex(seq && seq->queryValue());
IHqlExpression * name = queryResultName(expr);
OwnedHqlExpr value = createAttribute(resultAtom, LINK(seq), LINK(name));
addDependencySource(value, curGraph, activityExpr);
}
return true;
}
case no_keydiff:
{
//Children 0 and 1 are registered via addRefExprDependency; child 2 is the file that is written.
addRefExprDependency(expr->queryChild(0), curGraph, activityExpr);
addRefExprDependency(expr->queryChild(1), curGraph, activityExpr);
OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(2)));
addDependencySource(value, curGraph, activityExpr);
return true;
}
case no_keypatch:
{
//Child 1 (the patch file) is read; child 2 is the file that is written.
addRefExprDependency(expr->queryChild(0), curGraph, activityExpr);
OwnedHqlExpr patchName = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(1)));
addDependencyUse(patchName, curGraph, activityExpr);
OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(expr->queryChild(2)));
addDependencySource(value, curGraph, activityExpr);
return true;
}
case no_table:
{
//Reads a file.  Children only need walking if the filename is not constant.
IHqlExpression * filename = expr->queryChild(0);
OwnedHqlExpr value = createAttribute(fileAtom, getNormalizedFilename(filename));
addDependencyUse(value, curGraph, activityExpr);
return !filename->isConstant();
}
case no_select:
//Only selects marked with the new attribute are walked further
return expr->hasProperty(newAtom);
case no_workunit_dataset:
{
//Reads a result.  NB: the name is taken from the nameAtom attribute here, namedAtom for the result operators below.
IHqlExpression * sequence = queryPropertyChild(expr, sequenceAtom, 0);
IHqlExpression * name = queryPropertyChild(expr, nameAtom, 0);
OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
addDependencyUse(value, curGraph, activityExpr);
return false;
}
case no_getresult:
{
IHqlExpression * sequence = queryPropertyChild(expr, sequenceAtom, 0);
IHqlExpression * name = queryPropertyChild(expr, namedAtom, 0);
OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
addDependencyUse(value, curGraph, activityExpr);
return false;
}
case no_getgraphresult:
{
//Graph results are identified by children 1 and 2 of the expression
OwnedHqlExpr value = createAttribute(resultAtom, LINK(expr->queryChild(1)), LINK(expr->queryChild(2)));
addDependencyUse(value, curGraph, activityExpr);
return false;
}
case no_setgraphresult:
{
OwnedHqlExpr value = createAttribute(resultAtom, LINK(expr->queryChild(1)), LINK(expr->queryChild(2)));
addDependencySource(value, curGraph, activityExpr);
return true;
}
case no_ensureresult:
case no_setresult:
case no_extractresult:
{
//Writes a result
IHqlExpression * sequence = queryPropertyChild(expr, sequenceAtom, 0);
IHqlExpression * name = queryPropertyChild(expr, namedAtom, 0);
OwnedHqlExpr value = createAttribute(resultAtom, LINK(sequence), LINK(name));
addDependencySource(value, curGraph, activityExpr);
return true;
}
case no_attr:
case no_attr_link:
case no_record:
case no_field:
return false; //no need to look any further
default:
return true;
}
}
//Recursive worker for addChildDependencies().  The transform extra of each expression is used as a
//visited marker so that each expression is processed once per walk; children are only visited when
//addExprDependency() indicates there is something further to look at.
void EclResourcer::doAddChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
{
    if (expr->queryTransformExtra())
        return;     // already visited in this walk
    expr->setTransformExtraUnlinked(expr);

    if (!addExprDependency(expr, graph, activityExpr))
        return;

    ForEachChild(childIdx, expr)
        doAddChildDependencies(expr->queryChild(childIdx), graph, activityExpr);
}
//Gather the dependencies of a non-activity expression tree against graph/activityExpr.
//Nothing is done if there is no graph to record the dependencies against.  The walk is bracketed
//by lockTransformMutex()/unlockTransformMutex() because it uses the transform extra as a marker.
void EclResourcer::addChildDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
{
    if (!graph)
        return;

    lockTransformMutex();
    doAddChildDependencies(expr, graph, activityExpr);
    unlockTransformMutex();
}
//PASS5 walker: gather the file/result dependencies of expr and everything beneath it.
//  expr         - expression being walked
//  graph        - graph of the enclosing activity (replaced by expr's own graph if expr is an activity)
//  activityExpr - the enclosing activity (replaced by expr if expr is an activity)
//Activities are only processed once (info->gatheredDependencies).  Activity arguments are walked
//recursively; all other children are walked with addChildDependencies().  Finally a dependency
//link is added from each resourced child dependent to the current graph.
void EclResourcer::addDependencies(IHqlExpression * expr, ResourceGraphInfo * graph, IHqlExpression * activityExpr)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (info && info->containsActivity)
    {
        if (info->isActivity)
        {
            if (info->gatheredDependencies)
                return;
            info->gatheredDependencies = true;
            graph = info->graph;
            activityExpr = expr;
        }

        if (addExprDependency(expr, graph, activityExpr))
        {
            unsigned first = getFirstActivityArgument(expr);
            unsigned last = first + getNumActivityArguments(expr);
            ForEachChild(idx, expr)
            {
                if ((idx >= first) && (idx < last))
                    addDependencies(expr->queryChild(idx), graph, activityExpr);
                else
                    addChildDependencies(expr->queryChild(idx), graph, activityExpr);
            }
        }
    }
    else
        addChildDependencies(expr, graph, activityExpr);

    //An expression that has no resourcing information cannot have any child dependents.
    //(Previously info was dereferenced here even though it is allowed to be NULL above.)
    if (!info)
        return;

    ForEachItemIn(i, info->childDependents)
    {
        IHqlExpression & cur = info->childDependents.item(i);
        if (isResourcedActivity(&cur))
        {
            addDependencies(&cur, NULL, NULL);
            ResourcerInfo * sourceInfo = queryResourceInfo(&cur);
            sourceInfo->noteUsedFromChild();
            ResourceGraphLink * link = new ResourceGraphDependencyLink(sourceInfo->graph, &cur, graph, expr);
            graph->dependsOn.append(*link);
            links.append(*link);
        }
    }
}
//PASS5 entry point: gather dependencies for every root expression.  Roots start with no enclosing
//graph or activity.
void EclResourcer::addDependencies(HqlExprArray & exprs)
{
    ForEachItemIn(exprIdx, exprs)
    {
        IHqlExpression & root = exprs.item(exprIdx);
        addDependencies(&root, NULL, NULL);
    }
}
//Walk down from a sink looking for expressions that can be reached by more than one route.
//  expr        - expression being visited
//  whichSource - unique number identifying the sink the walk started from
//  path        - attribute describing the route taken from the sink (changes at multi-input nodes)
//  graph       - if non-NULL, the walk is restricted to expressions within this graph
//If expr has already been reached from the same sink by a different path it is flagged as
//not balanced.
void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph)
{
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info)
        return;

    if (info->currentSource == whichSource)
    {
        //Second visit from the same sink - unbalanced if the route was different.
        if (info->pathToSplitter != path)
            info->balanced = false;
        return;
    }

    if (graph && info->graph && info->graph != graph)
        return;

    info->currentSource = whichSource;
    info->pathToSplitter.set(path);

    if (info->containsActivity)
    {
        unsigned firstArg = getFirstActivityArgument(expr);
        unsigned numArgs = getNumActivityArguments(expr);

        //The path is only extended at nodes where the input that was followed matters.
        bool extendPath = false;
        if (numArgs > 1)
        {
            if (expr->getOperator() == no_addfiles)
            {
                if (expr->hasProperty(_ordered_Atom) || expr->hasProperty(_orderedPull_Atom) || isGrouped(expr))
                    extendPath = true;
            }
            else
                extendPath = true;
        }

        unsigned endArg = firstArg + numArgs;
        for (unsigned arg = firstArg; arg < endArg; arg++)
        {
            OwnedHqlExpr childPath = extendPath ? createAttribute(pathAtom, getSizetConstant(arg), LINK(path)) : LINK(path);
            spotUnbalancedSplitters(expr->queryChild(arg), whichSource, childPath, graph);
        }
    }

    //Now check dependencies between graphs (for roxie)
    if (graph)
        return;

    if (info->graph)
    {
        //Follow the dependency links of this expression's graph that end at this expression
        GraphLinkArray & graphLinks = info->graph->dependsOn;
        ForEachItemIn(linkIdx, graphLinks)
        {
            ResourceGraphLink & link = graphLinks.item(linkIdx);
            if (link.sinkNode != expr)
                continue;
            OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
            spotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
        }
    }
    else
    {
        //No graph for this expression - search all the links instead
        ForEachItemIn(linkIdx, links)
        {
            ResourceGraphLink & link = links.item(linkIdx);
            if (link.sinkNode != expr)
                continue;
            OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
            spotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
        }
    }
}
//Entry point for spotting unbalanced splitters.  How the walk is performed depends on the target:
//nothing is done for hthor, thor is analysed one graph at a time starting from each sink of each
//graph, and roxie is analysed globally starting from each root expression.  Every starting point
//is given its own source number.
void EclResourcer::spotUnbalancedSplitters(HqlExprArray & exprs)
{
    unsigned nextSource = 1;
    if ((targetClusterType == ThorCluster) || (targetClusterType == ThorLCRCluster))
    {
        //Thor only handles one graph at a time, so only walk expressions within a single graph.
        ForEachItemIn(graphIdx, graphs)
        {
            ResourceGraphInfo & curGraph = graphs.item(graphIdx);
            ForEachItemIn(sinkIdx, curGraph.sinks)
            {
                ResourceGraphLink & sink = curGraph.sinks.item(sinkIdx);
                spotUnbalancedSplitters(sink.sourceNode, nextSource++, 0, &curGraph);
            }
        }
    }
    else if (targetClusterType == RoxieCluster)
    {
        //Roxie pulls all at once, so need to analyse globally.
        ForEachItemIn(exprIdx, exprs)
            spotUnbalancedSplitters(&exprs.item(exprIdx), nextSource++, 0, NULL);
    }
    //HThorCluster (and anything else): nothing to do
}
//------------------------------------------------------------------------------------------
// PASS6: Merge sub graphs that can share resources and don't have dependencies
// MORE: Once sources are merged, should try merging between trees.
static bool conditionsMatch(const HqlExprArray & left, const HqlExprArray & right)
{
if (left.ordinality() != right.ordinality())
return false;
ForEachItemIn(i, left)
{
if (!left.contains(right.item(i)) || !right.contains(left.item(i)))
return false;
}
return true;
}
//Decide whether the source graph of link may be merged into its sink graph.
//Only unconditional links are candidates.  Returns false if any sink of the source graph that has
//a sink graph is fed by an action, or if the source would become conditional within the sink graph
//while also being used from other graphs.
bool EclResourcer::queryMergeGraphLink(ResourceGraphLink & link)
{
    if (link.linkKind != UnconditionalLink)
        return false;

    //Don't combine any dependencies
    const GraphLinkArray & sinks = link.sourceGraph->sinks;
    ForEachItemIn(actionIdx, sinks)
    {
        ResourceGraphLink & cur = sinks.item(actionIdx);
        if (cur.sinkGraph && cur.sourceNode->isAction())
            return false;
    }

    //Roxie pulls all subgraphs at same time, so no problem with conditional links since handled at run time.
    if (options.noConditionalLinks)
        return true;

    //No conditionals in the sink graph=>will be executed just as frequently
    if (!link.sinkGraph->mergedConditionSource)
        return true;

    //Is this the only place this source graph is used?  If so, always fine to merge
    if (sinks.ordinality() == 1)
        return true;

    //1) if context the source graph is being merged into is unconditional, then it is ok [ could have conditional and unconditional paths to same graph]
    //2) if context is conditional, then we don't really want to do it unless the conditions on all sinks are identical, and the only links occur between these two graphs.
    //   (situation occurs with spill fed into two branches of a join).
    bool isConditionalInSinkGraph = false;
    bool accessedFromManyGraphs = false;
    ForEachItemIn(sinkIdx, sinks)
    {
        ResourceGraphLink & cur = sinks.item(sinkIdx);
        if (!cur.sinkNode)
            continue;

        if (cur.sinkGraph != link.sinkGraph)
        {
            accessedFromManyGraphs = true;
            continue;
        }

        //Once one conditional use has been found there is no need to examine the others
        if (isConditionalInSinkGraph)
            continue;

        ResourcerInfo * sinkInfo = queryResourceInfo(cur.sinkNode);
        //If this is conditional, don't merge if there is a link to another graph
        if (!cur.isDependency() && sinkInfo->isConditionExpr())
            isConditionalInSinkGraph = true;
        else if (!sinkInfo->isUnconditional() && sinkInfo->conditions.ordinality())
            isConditionalInSinkGraph = true;
    }

    return !(isConditionalInSinkGraph && accessedFromManyGraphs);
}
// Attempts to merge source subgraphs into the subgraphs that consume them, walking the graphs
// from the greatest depth down to depth 0.
// pass: when 0, only links whose source node reports expandRatherThanSpill(false) == false are
//       considered; for any other value only links where it reports true are considered.
// Graphs that have been merged away are flagged isDead while walking and are removed from
// 'graphs' once the walk has finished.
void EclResourcer::mergeSubGraphs(unsigned pass)
{
//Find the greatest depth of any graph so the walk can start at the deepest level.
unsigned maxDepth = 0;
for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
{
unsigned depth = graphs.item(idx).getDepth();
if (depth > maxDepth)
maxDepth = depth;
}
//curDepth takes the values maxDepth, maxDepth-1, ..., 0 (the decrement happens in the loop test).
for (unsigned curDepth = maxDepth+1; curDepth-- != 0;)
{
//Re-entry point used when a merge has pushed graphs to a greater depth (see the goto below).
mergeAgain:
for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
{
ResourceGraphInfo & cur = graphs.item(idx);
if ((cur.getDepth() == curDepth) && !cur.isDead)
{
//Keep rescanning the sources of this graph for as long as a merge succeeded and left
//the graph at the current depth.
bool tryAgain;
do
{
tryAgain = false;
for (unsigned idxSource = 0; idxSource < cur.sources.ordinality(); /*incremented in loop*/)
{
ResourceGraphLink & curLink = cur.sources.item(idxSource);
ResourceGraphInfo * source = curLink.sourceGraph;
IHqlExpression * sourceNode = curLink.sourceNode;
//Select the links that belong to this pass.
bool tryToMerge;
bool expandSourceInPlace = queryResourceInfo(sourceNode)->expandRatherThanSpill(false);
if (pass == 0)
tryToMerge = !expandSourceInPlace;
else
tryToMerge = expandSourceInPlace;
if (tryToMerge)
{
bool ok = true;
ResourcerInfo * sourceResourceInfo = queryResourceInfo(sourceNode);
//On an hthor target, when the source has an outputToUseForSpill, only the link into
//that particular output is allowed to merge.
if (sourceResourceInfo->outputToUseForSpill && (targetClusterType == HThorCluster))
{
if (curLink.sinkNode != sourceResourceInfo->outputToUseForSpill)
ok = false;
}
//Remember the depth of the source before the merge so a change can be detected afterwards.
unsigned curSourceDepth = source->getDepth();
//MORE: Merging identical conditionals?
if (ok && queryMergeGraphLink(curLink) &&
!sourceResourceInfo->expandRatherThanSplit() &&
cur.mergeInSource(*source, *resourceLimit, (curLink.linkKind == ConditionalLink)))
{
//NB: Following cannot remove sources below the current index.
replaceGraphReferences(source, &cur);
source->isDead = true;
#ifdef VERIFY_RESOURCING
checkRecursion(&cur);
#endif
unsigned newDepth = cur.getDepth();
//Unusual: The source we are merging with has just increased in depth, so any
//dependents have also increased in depth. Need to try again at different depth
//to see if one of those will merge in.
if (newDepth > curSourceDepth)
{
//Raise curDepth by the increase and restart the scan over all graphs; the goto
//bypasses the decrement in the outer loop's test.
curDepth += (newDepth - curSourceDepth);
goto mergeAgain;
}
//depth of this element has changed, so don't check to see if it merges with any other
//sources on this iteration.
if (newDepth != curDepth)
{
tryAgain = false;
break;
}
//Merge succeeded at the same depth: idxSource is deliberately not advanced here.
tryAgain = true;
}
else
idxSource++;
}
else
idxSource++;
}
} while (tryAgain);
}
}
}
//Remove the graphs that were merged into others; iterating in reverse keeps the
//remaining indexes valid while removing.
ForEachItemInRev(idx2, graphs)
{
if (graphs.item(idx2).isDead)
graphs.remove(idx2);
}
}
// Runs both merge passes in order: pass 0 followed by pass 1.
void EclResourcer::mergeSubGraphs()
{
    unsigned pass = 0;
    while (pass < 2)
    {
        mergeSubGraphs(pass);
        pass++;
    }
}
//------------------------------------------------------------------------------------------
// PASS7: Optimize aggregates off of splitters into through aggregates.
// Tries to register 'expr' (a simple aggregate result) on the resource info of the aggregate's
// input, so it is recorded in that input's 'aggregates' list.
// Returns true if the expression was appended to the list, false if any precondition failed.
bool EclResourcer::optimizeAggregate(IHqlExpression * expr)
{
    //expr must be a simple aggregate result (a no_extractresult) with no child dependents
    if (!isSimpleAggregateResult(expr) || queryResourceInfo(expr)->childDependents.ordinality())
        return false;

    IHqlExpression * datasetFromRow = expr->queryChild(0);      // no_datasetfromrow
    IHqlExpression * nthSelect = datasetFromRow->queryChild(0);
    ResourcerInfo * datasetFromRowInfo = queryResourceInfo(datasetFromRow);

    //If more than one set result for the same aggregate don't merge the aggregation because
    //it messes up the internal count. Should really fix it and do multiple stores in the
    //through aggregate.
    //Also be careful not to lose any spills...
    if ((datasetFromRowInfo->numInternalUses() > 1) || datasetFromRowInfo->numExternalUses)
        return false;

    if (queryResourceInfo(nthSelect)->numExternalUses)
        return false;

    IHqlExpression * aggregateExpr = nthSelect->queryChild(0);  // no_newaggregate
    ResourcerInfo * inputInfo = queryResourceInfo(aggregateExpr->queryChild(0));
    //Only worthwhile when the aggregate's input is used more than once internally.
    if (inputInfo->numInternalUses() <= 1)
        return false;

    //ok, we can go ahead and merge.
    inputInfo->aggregates.append(*LINK(expr));
    return true;
}
void EclResourcer::optimizeAggregates()
{
for (unsigned idx = 0; idx < graphs.ordinality(); idx++)
{
ResourceGraphInfo & cur = graphs.item(idx);
for (unsigned idxSink = 0; idxSink < cur.sinks.ordinality(); /*incremented in loop*/)
{
ResourceGraphLink & link = cur.sinks.item(idxSink);
if ((link.sinkGraph == NULL) && optimizeAggregate(link.sourceNode))
cur.sinks.remove(idxSink);
else
idxSink++;
}
}
}
//------------------------------------------------------------------------------------------
// PASS8: Improve efficiency by merging the split points slightly
// Searches the activity arguments below 'expr' for 'search'.
// Returns the expression whose activity argument is 'search' ('prev' when expr itself is the
// match), or NULL when 'search' is not reached.
IHqlExpression * EclResourcer::findPredecessor(IHqlExpression * expr, IHqlExpression * search, IHqlExpression * prev)
{
    if (expr == search)
        return prev;

    //Only descend through expressions that have resource info and contain an activity.
    ResourcerInfo * info = queryResourceInfo(expr);
    if (!info || !info->containsActivity)
        return NULL;

    const unsigned firstArg = getFirstActivityArgument(expr);
    const unsigned endArg = firstArg + getNumActivityArguments(expr);
    for (unsigned argIdx = firstArg; argIdx < endArg; argIdx++)
    {
        IHqlExpression * found = findPredecessor(expr->queryChild(argIdx), search, expr);
        if (found)
            return found;
    }
    return NULL;
}
// Looks through every link whose source graph is the graph of 'search' and returns the first
// predecessor of search->original found beneath that link's source node, or NULL if none.
IHqlExpression * EclResourcer::findPredecessor(ResourcerInfo * search)
{
    IHqlExpression * target = search->original;
    ForEachItemIn(linkIdx, links)
    {
        ResourceGraphLink & candidate = links.item(linkIdx);
        if (candidate.sourceGraph == search->graph)
        {
            IHqlExpression * found = findPredecessor(candidate.sourceNode, target, NULL);
            if (found)
                return found;
        }
    }
    return NULL;
}
// Moves external spill points past sink activities that satisfy lightweightAndReducesDatasetSize().
// Does nothing when options.minimizeSpillSize is 0.
void EclResourcer::moveExternalSpillPoints()
{
if (options.minimizeSpillSize == 0)
return;
//if we have a external spill point where all the external outputs reduce their data significantly
//either via a project or a filter, then it might be worth including those activities in the main
//graph and ,if all external children reduce data, then may be best to filter
ForEachItemIn(idx, links)
{
ResourceGraphLink & cur = links.item(idx);
//Only unconditional links that feed another graph are candidates.
if ((cur.linkKind == UnconditionalLink) && cur.sinkGraph)
{
//Each iteration moves the link one activity further into the sink graph, so the
//condition is re-evaluated against the new sink node.
while (lightweightAndReducesDatasetSize(cur.sinkNode))
{
ResourcerInfo * sourceInfo = queryResourceInfo(cur.sourceNode);
//Stop unless the source is an external spill with no more than minimizeSpillSize external uses.
if (!sourceInfo->isExternalSpill() || (sourceInfo->numExternalUses > options.minimizeSpillSize))
break;
ResourcerInfo * sinkInfo = queryResourceInfo(cur.sinkNode);
//Stop unless the sink node has exactly one internal use.
if (sinkInfo->numInternalUses() != 1)
break;
//The predecessor must be located before the sink node is reassigned to the source graph,
//because the lookup matches links against sinkInfo->graph.
//NOTE(review): findPredecessor() can return NULL, which would leave cur.sinkNode NULL when the
//loop condition is next evaluated - confirm lightweightAndReducesDatasetSize() tolerates that.
IHqlExpression * sinkPred = findPredecessor(sinkInfo);
//Transfer the sink node to the source graph and move one external use from the old
//source node to it.
sinkInfo->graph.set(cur.sourceGraph);
sourceInfo->numExternalUses--;
sinkInfo->numExternalUses++;
//The link now runs from the old sink node to that node's predecessor.
cur.sourceNode.set(cur.sinkNode);
cur.sinkNode.set(sinkPred);
}
}
}
}
//------------------------------------------------------------------------------------------
// PASS9: Create a new expression tree representing the information
static HqlTransformerInfo childDependentReplacerInfo("ChildDependentReplacer");

// Transformer that substitutes every expression found in 'childDependents' with the expression
// held at the same index in 'replacements'. Both arrays are held by reference.
class ChildDependentReplacer : public MergingHqlTransformer
{
public:
    ChildDependentReplacer(const HqlExprCopyArray & _childDependents, const HqlExprArray & _replacements)
        : MergingHqlTransformer(childDependentReplacerInfo),
          childDependents(_childDependents),
          replacements(_replacements)
    {
    }

protected:
    virtual IHqlExpression * createTransformed(IHqlExpression * expr)
    {
        const unsigned position = childDependents.find(*expr);
        if (position == NotFound)
            return MergingHqlTransformer::createTransformed(expr);

        //A child dependent: hand back a new link to its replacement.
        return LINK(&replacements.item(position));
    }

protected:
    const HqlExprCopyArray & childDependents;
    const HqlExprArray & replacements;
};
// Builds the expression that extracts the scalar value of 'original' from 'replacement'.
// Returns a new expression owned by the caller.
static IHqlExpression * getScalarReplacement(IHqlExpression * original, IHqlExpression * replacement)
{
    //First skip any wrappers which are there to cause things to be hoisted.
    IHqlExpression * unwrapped = original;
    bool isWrapper = true;
    while (isWrapper)
    {
        switch (unwrapped->getOperator())
        {
        case no_globalscope:
        case no_thisnode:
        case no_evalonce:
            unwrapped = unwrapped->queryChild(0);
            break;
        default:
            isWrapper = false;
            break;
        }
    }

    //Now modify the spilled result depending on how the spilled result was created (see EclHoistLocator::noteScalar() above)
    const node_operator valueOp = unwrapped->getOperator();
    if (valueOp == no_createset)
    {
        IHqlExpression * setRecord = replacement->queryRecord();
        IHqlExpression * setField = setRecord->queryChild(0);
        return createValue(no_createset, original->getType(), LINK(replacement), createSelectExpr(LINK(replacement->queryNormalizedSelector()), LINK(setField)));
    }

    if (valueOp == no_select)
    {
        IHqlExpression * selectedField = unwrapped->queryChild(1);
        bool isNew;
        IHqlExpression * selectorDataset = querySelectorDataset(unwrapped, isNew);
        if (isNew || selectorDataset->isDatarow())
        {
            if (projectSelectorDatasetToField(selectorDataset))
                return createNewSelectExpr(LINK(replacement), LINK(selectedField));
            return replaceSelectorDataset(unwrapped, replacement);
        }
        //Otherwise fall through to the default handling below.
    }

    //Default: select the first entry of the replacement's record.
    IHqlExpression * replacementRecord = replacement->queryRecord();
    return createNewSelectExpr(LINK(replacement), LINK(replacementRecord->queryChild(0)));
}
// Returns 'expr' with each of info's original child dependents replaced by its resourced
// equivalent. The expression is returned unchanged when it is not affected by resourcing, or
// when 'info' is NULL or has no child dependents.
IHqlExpression * EclResourcer::replaceResourcedReferences(ResourcerInfo * info, IHqlExpression * expr)
{
    if (!isAffectedByResourcing(expr))
        return LINK(expr);

    LinkedHqlExpr result = expr;
    const bool hasChildDependents = info && info->childDependents.ordinality();
    if (hasChildDependents)
    {
        //Build one replacement for each original child dependent, at the same index.
        HqlExprArray substitutes;
        ForEachItemIn(depIdx, info->originalChildDependents)
        {
            IHqlExpression & dependent = info->childDependents.item(depIdx);
            LinkedHqlExpr substitute = &dependent;
            if (isResourcedActivity(&dependent))
                substitute.setown(createResourced(&dependent, NULL, false, false));

            //When the original is neither a dataset nor a datarow, the scalar value must be
            //extracted from the replacement.
            IHqlExpression * originalDependent = &info->originalChildDependents.item(depIdx);
            const bool isScalar = !originalDependent->isDataset() && !originalDependent->isDatarow();
            if (isScalar)
                substitute.setown(getScalarReplacement(originalDependent, substitute));

            substitutes.append(*substitute.getClear());
        }

        ChildDependentReplacer replacer(info->originalChildDependents, substitutes);
        result.setown(replacer.transformRoot(result));
    }
    return result.getClear();
}
// Creates the resourced form of the activity 'expr' by resourcing its children.
// expr:             the activity expression to transform.
// ownerGraph:       graph passed on when resourcing the activity arguments.
// expandInParent:   passed on to child calls; when true the final wrapping step below is skipped.
// defineSideEffect: when true (and expandInParent is false) an action result is wrapped in
//                   no_definesideeffect with a new unique id.
// Returns a new expression owned by the caller.
IHqlExpression * EclResourcer::doCreateResourced(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, bool expandInParent, bool defineSideEffect)
{
ResourcerInfo * info = queryResourceInfo(expr);
node_operator op = expr->getOperator();
HqlExprArray args;
//'same' remains true while every child processed so far is unchanged, so expr can be reused.
bool same = true;
//Children in the range [first, last) are the activity arguments of expr.
unsigned first = getFirstActivityArgument(expr);
unsigned last = first + getNumActivityArguments(expr);
OwnedHqlExpr transformed;
switch (op)
{
case no_if:
{
//Activity arguments are resourced; every other child only has resourced references replaced.
ForEachChild(idx, expr)
{
IHqlExpression * child = expr->queryChild(idx);
IHqlExpression * resourced;
if ((idx < first) || (idx >= last))
resourced = replaceResourcedReferences(info, child);
else
resourced = createResourced(child, ownerGraph, expandInParent, false);
if (child != resourced)
same = false;
args.append(*resourced);
}
break;
}
case no_case:
case no_map:
UNIMPLEMENTED;
case no_keyed:
//Returned unchanged.
return LINK(expr);
case no_compound:
//The result is the resourced form of the second child only.
transformed.setown(createResourced(expr->queryChild(1), ownerGraph, expandInParent, false));
break;
case no_executewhen:
{
//Both the first and second children are resourced; the second must have become a no_callsideeffect.
args.append(*createResourced(expr->queryChild(0), ownerGraph, expandInParent, false));
args.append(*createResourced(expr->queryChild(1), ownerGraph, expandInParent, false));
assertex(args.item(1).getOperator() == no_callsideeffect);
//presumably appends the remaining children from index 2 onwards unchanged - TODO confirm
unwindChildren(args, expr, 2);
same = false;
break;
}
case no_join:
case no_denormalize:
case no_denormalizegroup:
//This branch is disabled by the constant condition; these operators use the default handling.
if (false)//if (isKeyedJoin(expr))
{
args.append(*createResourced(expr->queryChild(0), ownerGraph, expandInParent, false));
args.append(*LINK(expr->queryChild(1)));
unsigned max = expr->numChildren();
for (unsigned idx=2; idx < max; idx++)
args.append(*replaceResourcedReferences(info, expr->queryChild(idx)));
same = false;
break;
}
//fall through...
default:
{
IHqlExpression * activeTable = NULL;
// Check to see if the activity has a dataset which is in scope for the rest of its arguments.
// If so we'll need to remap references from the children.
if (hasActiveTopDataset(expr) && (first != last))
activeTable = expr->queryChild(0);
ForEachChild(idx, expr)
{
IHqlExpression * child = expr->queryChild(idx);
IHqlExpression * resourced;
if ((idx < first) || (idx >= last))
{
LinkedHqlExpr mapped = child;
if (activeTable && isAffectedByResourcing(child))
{
//args.item(0) is the already transformed form of the first child; references to the
//original active table are remapped to it if it has changed.
IHqlExpression * activeTableTransformed = &args.item(0);
if (activeTable != activeTableTransformed)
mapped.setown(scopedReplaceSelector(child, activeTable, activeTableTransformed));
}
resourced = replaceResourcedReferences(info, mapped);
}
else
resourced = createResourced(child, ownerGraph, expandInParent, false);
if (child != resourced)
same = false;
args.append(*resourced);
}
}
break;
}
//Unless a case above already produced the result, reuse expr if nothing changed, else clone it
//with the new children.
if (!transformed)
transformed.setown(same ? LINK(expr) : expr->clone(args));
if (!expandInParent)
{
//Non-actions are passed through info->createTransformedExpr(); actions are only wrapped when
//a side effect definition was requested.
if (!transformed->isAction())
transformed.setown(info->createTransformedExpr(transformed));
else if (defineSideEffect)
transformed.setown(createValue(no_definesideeffect, LINK(transformed), createUniqueId()));
}
return transformed.getClear();
}
/*
Need to be careful because result should not reuse the same expression tree unless that element is a splitter.
createResourced()
{
if (!isActivity)
if (!containsActivity)
replace any refs to the activeTable with whatever it has been mapped to
else
recurse
else if (!isDefinedInSameGraph)
expand/create reader
set active table
else isSplitter and alreadyGeneratedForThisGraph
return previous result
else
create transformed
}
*/
// Records 'graph' in 'visited' while the graphs it depends on and its source graphs are checked,
// then removes it again once that has completed.
void EclResourcer::doCheckRecursion(ResourceGraphInfo * graph, PointerArray & visited)
{
    visited.append(graph);

    //Graphs this one depends on
    ForEachItemIn(dependIdx, graph->dependsOn)
    {
        checkRecursion(graph->dependsOn.item(dependIdx).sourceGraph, visited);
    }

    //Graphs that feed this one
    ForEachItemIn(sourceIdx, graph->sources)
    {
        checkRecursion(graph->sources.item(sourceIdx).sourceGraph, visited);
    }

    visited.pop();
}
// Raises an unexpected-condition error if 'graph' is already present in 'visited'; otherwise
// continues the check through the graphs that 'graph' refers to.
void EclResourcer::checkRecursion(ResourceGraphInfo * graph, PointerArray & visited)
{
    const bool alreadyVisited = (visited.find(graph) != NotFound);
    if (alreadyVisited)
        throwUnexpected();

    doCheckRecursion(graph, visited);
}
// Entry point for the recursion check: starts from 'graph' with an empty visited list.
void EclResourcer::checkRecursion(ResourceGraphInfo * graph)
{
    PointerArray activePath;
    doCheckRecursion(graph, activePath);
}
// Returns the resourced form of 'expr' as seen from 'ownerGraph'.
// - Expressions without resource info, or that contain no activity, only have resourced
//   references replaced.
// - Non-activity expressions that contain activities are rebuilt from their resourced children.
// - Activities belonging to a different graph are either expanded in place or replaced by a read
//   of the spilled data / a call to the side effect.
// - Activities in the owner graph are transformed, and the result is recorded in
//   info->transformed unless expandInParent is set.
// defineSideEffect must be false for everything except an activity in the owner graph.
// Returns a new expression owned by the caller.
IHqlExpression * EclResourcer::createResourced(IHqlExpression * expr, ResourceGraphInfo * ownerGraph, bool expandInParent, bool defineSideEffect)
{
ResourcerInfo * info = queryResourceInfo(expr);
if (!info || !info->containsActivity)
return replaceResourcedReferences(info, expr);
if (!info->isActivity)
{
//Not an activity itself, but contains one: recurse into every child and clone only if
//something changed.
assertex(!defineSideEffect);
HqlExprArray args;
bool same = true;
ForEachChild(idx, expr)
{
IHqlExpression * cur = expr->queryChild(idx);
IHqlExpression * curResourced = createResourced(cur, ownerGraph, expandInParent, false);
args.append(*curResourced);
if (cur != curResourced)
same = false;
}
if (same)
return LINK(expr);
return expr->clone(args);
}
if (info->graph != ownerGraph)
{
//The activity is defined in a different graph from the one being generated.
assertex(!defineSideEffect);
IHqlExpression * source;
if (info->expandRatherThanSpill(true))
{
//Expand the activity instead of spilling it. The two branches differ only in the graph and
//expandInParent arguments passed to doCreateResourced().
if (options.minimiseSpills)
{
OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandInParent, false);
if (queryAddUniqueToActivity(resourced))
source = appendUniqueAttr(resourced);
else
source = LINK(resourced);
}
else
{
OwnedHqlExpr child = doCreateResourced(expr, info->graph, true, false);
if (queryAddUniqueToActivity(child))
source = appendUniqueAttr(child);
else
source = LINK(child);
}
}
else
{
if (!expr->isAction())
{
//Non-action: read the spilled data, optionally annotated with the reason the graphs
//could not be merged.
OwnedHqlExpr reason;
if (ownerGraph && options.checkResources())
{
StringBuffer reasonText;
ownerGraph->getMergeFailReason(reasonText, info->graph, *resourceLimit);
if (reasonText.length())
{
reasonText.insert(0, "Resource limit spill: ");
reason.setown(createAttribute(_spillReason_Atom, createConstant(reasonText.str())));
}
}
source = info->createSpilledRead(reason);
}
else
{
//Action: refer to it through a no_callsideeffect carrying the uid property of its
//transformed expression.
//NOTE(review): info->transformed is dereferenced without a check - presumably the action
//has always been generated by this point; confirm.
IHqlExpression * uid = info->transformed->queryProperty(_uid_Atom);
source = createValue(no_callsideeffect, makeVoidType(), LINK(uid));
//source = LINK(info->transformed);
}
}
return source;
}
//An already generated split point is reused rather than generated again.
if (!expandInParent && info->transformed && info->isSplit())
{
return LINK(info->transformed);
}
OwnedHqlExpr resourced = doCreateResourced(expr, ownerGraph, expandInParent, defineSideEffect);
if (queryAddUniqueToActivity(resourced))// && !resourced->hasProperty(_internal_Atom))
resourced.setown(appendUniqueAttr(resourced));
//Record the result so later references can find it.
if (!expandInParent)
{
info->transformed = resourced;
}
return resourced.getClear();
}
// Generates the no_subgraph expression for 'graph' and appends it to 'transformed', after first
// generating every graph it depends on and every graph that feeds it.
// Nothing is generated if the graph has already been created, is dead, or has neither active
// sinks nor a condition source.
// Throws HQLWRN_RecursiveDependendencies if the graph is reached again while it is still being generated.
void EclResourcer::createResourced(ResourceGraphInfo * graph, HqlExprArray & transformed)
{
if (graph->createdGraph || graph->isDead)
return;
if (!graph->containsActiveSinks() && (!graph->hasConditionSource))
return;
#ifdef VERIFY_RESOURCING
checkRecursion(graph);
#endif
// DBGLOG("Prepare to CreateResourced(%p)", graph);
//The flag is set before recursing so that reaching this graph again is detected.
if (graph->startedGeneratingResourced)
throwError(HQLWRN_RecursiveDependendencies);
graph->startedGeneratingResourced = true;
//Generate the graphs this one depends on, then the graphs that feed it.
ForEachItemIn(idxD, graph->dependsOn)
createResourced(graph->dependsOn.item(idxD).sourceGraph, transformed);
ForEachItemIn(idxS, graph->sources)
createResourced(graph->sources.item(idxS).sourceGraph, transformed);
// DBGLOG("Create resourced %p", graph);
//First generate the graphs for all the unconditional sinks
HqlExprArray args;
ForEachItemIn(idx, graph->sinks)
{
ResourceGraphLink & sink = graph->sinks.item(idx);
IHqlExpression * sinkNode = sink.sourceNode;
ResourcerInfo * info = queryResourceInfo(sinkNode);
//If graph is unconditional, any condition sinks are forced to be generated (and spilt)
if (!info->transformed)
{
// if it is a spiller, then it will be generated from another sink
if (!info->isExternalSpill())
{
//A side effect definition is requested for an action that is linked to a sink graph.
IHqlExpression * resourced = createResourced(sinkNode, graph, false, sinkNode->isAction() && sink.sinkGraph);
assertex(info->transformed);
args.append(*resourced);
}
}
}
//Add the splitter output of each sink, if it has one and it is not already included.
ForEachItemIn(i2, graph->sinks)
{
ResourceGraphLink & sink = graph->sinks.item(i2);
IHqlExpression * sinkNode = sink.sourceNode;
ResourcerInfo * info = queryResourceInfo(sinkNode);
IHqlExpression * splitter = info->splitterOutput;
if (splitter && !args.contains(*splitter))
args.append(*LINK(splitter));
}
//A graph that generated nothing is marked dead rather than creating an empty subgraph.
if (args.ordinality() == 0)
graph->isDead = true;
else
{
//The child attribute is added when graph results are being used.
if (options.useGraphResults)
args.append(*createAttribute(childAtom));
graph->createdGraph.setown(createValue(no_subgraph, makeVoidType(), args));
transformed.append(*LINK(graph->createdGraph));
}
}
// For each redundant source link of 'thisGraph', clones the dependencies of that link's source
// graph onto 'thisGraph'. Each graph is processed at most once, guarded by its
// inheritedExpandedDependencies flag.
void EclResourcer::inheritRedundantDependencies(ResourceGraphInfo * thisGraph)
{
    if (thisGraph->inheritedExpandedDependencies)
        return;
    thisGraph->inheritedExpandedDependencies = true;

    ForEachItemIn(sourceIdx, thisGraph->sources)
    {
        ResourceGraphLink & sourceLink = thisGraph->sources.item(sourceIdx);
        if (!sourceLink.isRedundantLink())
            continue;

        //Make sure the source graph has inherited its own dependencies before they are copied.
        inheritRedundantDependencies(sourceLink.sourceGraph);

        ForEachItemIn(dependIdx, sourceLink.sourceGraph->dependsOn)
        {
            ResourceGraphLink & inherited = sourceLink.sourceGraph->dependsOn.item(dependIdx);
            ResourceGraphLink * cloned = new ResourceGraphDependencyLink(inherited.sourceGraph, inherited.sourceNode, thisGraph, sourceLink.sinkNode);
            //The new link is recorded both on this graph and in the global list of links.
            thisGraph->dependsOn.append(*cloned);
            links.append(*cloned);
        }
    }
}
// Generates the resourced subgraphs for every graph, appending them to 'transformed'.
void EclResourcer::createResourced(HqlExprArray & transformed)
{
    //Before removing null links (e.g., where the source graph is expanded inline), need to make sure
    //dependencies are cloned, otherwise graphs can be generated in the wrong order
    ForEachItemIn(inheritIdx, graphs)
    {
        inheritRedundantDependencies(&graphs.item(inheritIdx));
    }

    //Remove the redundant links, walking backwards through the list.
    ForEachItemInRev(linkIdx, links)
    {
        ResourceGraphLink & candidate = links.item(linkIdx);
        if (candidate.isRedundantLink())
            removeLink(candidate, true);
    }

    ForEachItemIn(graphIdx, graphs)
    {
        createResourced(&graphs.item(graphIdx), transformed);
    }
}
// Sort callback: orders graphs by ascending depth.
static int compareGraphDepth(CInterface * * _l, CInterface * * _r)
{
    ResourceGraphInfo * lhs = (ResourceGraphInfo *)*_l;
    ResourceGraphInfo * rhs = (ResourceGraphInfo *)*_r;

    return lhs->getDepth() - rhs->getDepth();
}
// Sort callback: orders links by the depth of their source graph, then by the depth of their
// sink graph. When only one of the links has a sink graph, that link sorts first.
static int compareLinkDepth(CInterface * * _l, CInterface * * _r)
{
    ResourceGraphLink * lhs = (ResourceGraphLink *)*_l;
    ResourceGraphLink * rhs = (ResourceGraphLink *)*_r;

    int sourceDiff = lhs->sourceGraph->getDepth() - rhs->sourceGraph->getDepth();
    if (sourceDiff)
        return sourceDiff;

    if (!lhs->sinkGraph)
        return rhs->sinkGraph ? +1 : 0;
    if (!rhs->sinkGraph)
        return -1;
    return lhs->sinkGraph->getDepth() - rhs->sinkGraph->getDepth();
}
// Appends a textual description of the graphs and links to 'out'.
// Graphs are listed in order of depth together with their sources, sinks and any dependency
// values they create; links are then listed ordered by compareLinkDepth.
void EclResourcer::display(StringBuffer & out)
{
//Work on a sorted copy so that the order of 'graphs' itself is left unchanged.
//NOTE(review): CIArrayOf is written without template arguments here - confirm the element type.
CIArrayOf sortedGraphs;
ForEachItemIn(j1, graphs)
sortedGraphs.append(OLINK(graphs.item(j1)));
sortedGraphs.sort(compareGraphDepth);
out.append("Graphs:\n");
ForEachItemIn(i, sortedGraphs)
{
ResourceGraphInfo & cur = sortedGraphs.item(i);
//One summary line per graph: index, depth, flags, whether it is dead, and its address.
out.appendf("%d: depth(%d) uncond(%d) cond(%d) %s {%p}\n", i, cur.getDepth(), cur.isUnconditional, cur.hasConditionSource, cur.isDead ? "dead" : "", &cur);
//For each incoming link the node reported is the link's sink node.
ForEachItemIn(j, cur.sources)
{
ResourceGraphLink & link = cur.sources.item(j);
out.appendf(" Source: %p %s\n", link.sinkNode.get(), getOpString(link.sinkNode->getOperator()));
}
//For each outgoing link the node reported is the link's source node, with its condition and use counts.
ForEachItemIn(k, cur.sinks)
{
ResourceGraphLink & link = cur.sinks.item(k);
IHqlExpression * sourceNode = link.sourceNode;
ResourcerInfo * sourceInfo = queryResourceInfo(sourceNode);
out.appendf(" Sink: %p %s cond(%d,%d) int(%d) ext(%d)\n", sourceNode, getOpString(sourceNode->getOperator()), sourceInfo->conditions.ordinality(), sourceInfo->conditionSourceCount, sourceInfo->numInternalUses(), sourceInfo->numExternalUses);
}
//List the ECL of each dependencySource.search entry whose graph is this graph.
ForEachItemIn(i3, dependencySource.graphs)
{
if (&dependencySource.graphs.item(i3) == &cur)
{
StringBuffer s;
toECL(&dependencySource.search.item(i3), s);
out.appendf(" Creates: %s\n", s.str());
}
}
}
out.append("Links:\n");
//NOTE(review): CIArrayOf is written without template arguments here - confirm the element type.
CIArrayOf sortedLinks;
ForEachItemIn(j2, links)
sortedLinks.append(OLINK(links.item(j2)));
sortedLinks.sort(compareLinkDepth);
ForEachItemIn(i2, sortedLinks)
{
ResourceGraphLink & link = sortedLinks.item(i2);
//Remember where this line starts so the sink description can be padded to a fixed column.
unsigned len = out.length();
//Graphs are identified by their index within sortedGraphs.
out.appendf(" Source: %d %s", sortedGraphs.find(*link.sourceGraph), getOpString(link.sourceNode->getOperator()));
if (link.sinkNode)
{
out.padTo(len+30);
out.appendf(" Sink: %d %s", sortedGraphs.find(*link.sinkGraph), getOpString(link.sinkNode->getOperator()));
}
//Conditional and sequence links both append the same single space here.
if (link.linkKind == ConditionalLink)
out.append(" ");
else if (link.linkKind == SequenceLink)
out.append(" ");
out.newline();
}
}
void EclResourcer::trace()
{
StringBuffer s;
display(s);
DBGLOG("%s", s.str());
}
//---------------------------------------------------------------------------
// Resources the expressions in 'exprs', appending the generated subgraphs to 'transformed'.
// The steps below run in a fixed order.
void EclResourcer::resourceGraph(HqlExprArray & exprs, HqlExprArray & transformed)
{
//NB: This only resources a single level of queries. SubQueries should be resourced in a separate
//pass so that commonality between different activities/subgraphs isn't introduced/messed up.
findSplitPoints(exprs);
createInitialGraphs(exprs);
markConditions(exprs);
//Subgraphs are only resourced when resource checking is enabled.
if (options.checkResources())
resourceSubGraphs(exprs);
addDependencies(exprs);
//With TRACE_RESOURCING defined the state is logged both before and after merging.
#ifdef TRACE_RESOURCING
trace();
#endif
mergeSubGraphs();
#ifdef TRACE_RESOURCING
trace();
#endif
spotUnbalancedSplitters(exprs);
//Aggregates are only optimized when spotThroughAggregate is set.
if (spotThroughAggregate)
optimizeAggregates();
moveExternalSpillPoints();
createResourced(transformed);
}
// Resources the expressions in 'exprs' as a remote graph, appending the generated subgraphs to
// 'transformed'. Compared with resourceGraph() the initial graphs come from
// createInitialRemoteGraphs(), and the subgraph resourcing, unbalanced splitter, aggregate and
// spill point steps are not run.
void EclResourcer::resourceRemoteGraph(HqlExprArray & exprs, HqlExprArray & transformed)
{
//NB: This only resources a single level of queries. SubQueries should be resourced in a separate
//pass so that commonality between different activities/subgraphs isn't introduced/messed up.
findSplitPoints(exprs);
createInitialRemoteGraphs(exprs);
markConditions(exprs);
addDependencies(exprs);
//With TRACE_RESOURCING defined the state is logged both before and after merging.
#ifdef TRACE_RESOURCING
trace();
#endif
mergeSubGraphs();
#ifdef TRACE_RESOURCING
trace();
#endif
createResourced(transformed);
}
//---------------------------------------------------------------------------
// Flattens 'expr' into 'args': comma, compound, parallel and action list nodes are expanded
// recursively into their children; any other expression is appended (linked) as it stands.
void expandLists(HqlExprArray & args, IHqlExpression * expr)
{
    const node_operator op = expr->getOperator();
    const bool isListNode = (op == no_comma) || (op == no_compound) ||
                            (op == no_parallel) || (op == no_actionlist);
    if (!isListNode)
    {
        args.append(*LINK(expr));
        return;
    }

    // for the moment, expand root parallel nodes, it generates much better code.
    // I should really come up with a better way of implementing sequential/parallel.
    ForEachChild(childIdx, expr)
        expandLists(args, expr->queryChild(childIdx));
}
// Resources 'expr' for the given target cluster and returns the resulting subgraphs as a single
// action list. When 'graphIdExpr' is supplied it is registered as a new child query with an
// initial result count of 0.
IHqlExpression * resourceThorGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize, IHqlExpression * graphIdExpr)
{
    EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());
    if (graphIdExpr)
    {
        resourcer.setNewChildQuery(graphIdExpr, 0);
    }

    //Flatten the root expression into the list of expressions to resource.
    HqlExprArray roots;
    expandLists(roots, expr);

    HqlExprArray subgraphs;
    resourcer.resourceGraph(roots, subgraphs);
    hoistNestedCompound(translator, subgraphs);
    return createActionList(subgraphs);
}
// Shared implementation for the library, child and loop graph entry points.
// activeRows:  optional; when supplied its entries are tagged as active cursors.
// numResults:  in/out; supplies the initial number of graph results and receives the number
//              reported by the resourcer after resourcing.
// Returns the resourced subgraphs as a single action list.
static IHqlExpression * doResourceGraph(HqlCppTranslator & translator, HqlExprCopyArray * activeRows, IHqlExpression * expr,
                                        ClusterType targetClusterType, unsigned clusterSize,
                                        IHqlExpression * graphIdExpr, unsigned * numResults, bool isChild, bool useGraphResults)
{
    EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());

    //Configure the resourcer before any resourcing takes place.
    if (isChild)
    {
        resourcer.setChildQuery(true);
    }
    resourcer.setNewChildQuery(graphIdExpr, *numResults);
    resourcer.setUseGraphResults(useGraphResults);
    if (activeRows)
    {
        resourcer.tagActiveCursors(*activeRows);
    }

    HqlExprArray roots;
    expandLists(roots, expr);

    HqlExprArray subgraphs;
    resourcer.resourceGraph(roots, subgraphs);
    *numResults = resourcer.numGraphResults();

    hoistNestedCompound(translator, subgraphs);
    return createActionList(subgraphs);
}
// Resources a library graph: no active rows, not treated as a child query, graph results enabled.
IHqlExpression * resourceLibraryGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize, IHqlExpression * graphIdExpr, unsigned * numResults)
{
    //?? what value for isChild (e.g., thor library call). Need to gen twice?
    const bool isChild = false;
    const bool useGraphResults = true;
    return doResourceGraph(translator, NULL, expr, targetClusterType, clusterSize, graphIdExpr, numResults, isChild, useGraphResults);
}
// Resources a child graph: the active rows are tagged, the cluster size is 0, and the graph is
// treated as a child query with graph results enabled.
IHqlExpression * resourceNewChildGraph(HqlCppTranslator & translator, HqlExprCopyArray & activeRows, IHqlExpression * expr, ClusterType targetClusterType, IHqlExpression * graphIdExpr, unsigned * numResults)
{
    const unsigned clusterSize = 0;
    const bool isChild = true;
    const bool useGraphResults = true;
    return doResourceGraph(translator, &activeRows, expr, targetClusterType, clusterSize, graphIdExpr, numResults, isChild, useGraphResults);
}
// Resources a loop graph: the active rows are tagged, the cluster size is 0, graph results are
// enabled, and 'insideChildGraph' decides whether it is treated as a child query.
IHqlExpression * resourceLoopGraph(HqlCppTranslator & translator, HqlExprCopyArray & activeRows, IHqlExpression * expr, ClusterType targetClusterType, IHqlExpression * graphIdExpr, unsigned * numResults, bool insideChildGraph)
{
    const unsigned clusterSize = 0;
    const bool useGraphResults = true;
    return doResourceGraph(translator, &activeRows, expr, targetClusterType, clusterSize, graphIdExpr, numResults, insideChildGraph, useGraphResults);
}
// Resources 'expr' as a remote graph and returns the resulting subgraphs as a single action list.
IHqlExpression * resourceRemoteGraph(HqlCppTranslator & translator, IHqlExpression * expr, ClusterType targetClusterType, unsigned clusterSize)
{
    EclResourcer resourcer(translator.queryErrors(), translator.wu(), targetClusterType, clusterSize, translator.queryOptions());

    //Flatten the root expression into the list of expressions to resource.
    HqlExprArray roots;
    expandLists(roots, expr);

    HqlExprArray subgraphs;
    resourcer.resourceRemoteGraph(roots, subgraphs);
    hoistNestedCompound(translator, subgraphs);
    return createActionList(subgraphs);
}
/*
Conditions:
They are nasty. We process the tree in two passes. First we tag anything which must be evaluated, and
save a list of condition statements to process later.
Second pass we tag conditionals.
a) all left and right branches of a condition are tagged. [conditionSourceCount]
b) all conditional expressions are tagged with the conditions they are evaluated for.
[if the condition lists match then it should be possible to merge the graphs]
c) The spill count for a node should ignore the number of links from conditional graphs,
but should add the number of conditions.
d) if (a, b(f1) +b(f2), c) needs to link b twice though!
*/
// Transformer that converts the spill operators (no_writespill, no_commonspill, no_readspill)
// into ordinary activities; see createTransformed() for the individual mappings.
class SpillActivityTransformer : public NewHqlTransformer
{
public:
SpillActivityTransformer();
// Returns the converted form of 'expr'.
virtual IHqlExpression * createTransformed(IHqlExpression * expr);
};
// Transformer information passed to the base class by every SpillActivityTransformer.
static HqlTransformerInfo spillActivityTransformerInfo("SpillActivityTransformer");
// Constructs the transformer; it holds no state beyond that of the base class.
SpillActivityTransformer::SpillActivityTransformer()
: NewHqlTransformer(spillActivityTransformerInfo)
{
}
// Converts a spill operator into an ordinary activity:
//   no_writespill  -> no_output built from the transformed children
//   no_commonspill -> the transformed first child
//   no_readspill   -> no_table built from the transformed second child, the record of the
//                     transformed first child, the remaining transformed children and, if
//                     present, the record count information
// Everything else is handled by the base class.
IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * expr)
{
    IHqlExpression * annotated = queryTransformAnnotation(expr);
    if (annotated)
        return annotated;

    const node_operator op = expr->getOperator();
    if (op == no_writespill)
    {
        HqlExprArray outputArgs;
        transformChildren(expr, outputArgs);
        return createValue(no_output, makeVoidType(), outputArgs);
    }

    if (op == no_commonspill)
        return transform(expr->queryChild(0));

    if (op == no_readspill)
    {
        //The dataset is only needed for its record; the table is built from the second child.
        OwnedHqlExpr dataset = transform(expr->queryChild(0));
        HqlExprArray tableArgs;
        tableArgs.append(*transform(expr->queryChild(1)));
        tableArgs.append(*LINK(dataset->queryRecord()));
        ForEachChildFrom(childIdx, expr, 2)
        {
            tableArgs.append(*transform(expr->queryChild(childIdx)));
        }

        IHqlExpression * recordCountAttr = queryRecordCountInfo(expr);
        if (recordCountAttr)
            tableArgs.append(*LINK(recordCountAttr));
        return createDataset(no_table, tableArgs);
    }

    return NewHqlTransformer::createTransformed(expr);
}
// Returns 'expr' after it has been processed by a SpillActivityTransformer.
IHqlExpression * convertSpillsToActivities(IHqlExpression * expr)
{
    SpillActivityTransformer spillConverter;
    return spillConverter.transformRoot(expr);
}