Browse Source

HPCC-11400 Fix issues with balanced splitters

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 11 years ago
parent
commit
7aaa09237a

+ 2 - 0
ecl/hqlcpp/hqlcatom.cpp

@@ -53,6 +53,7 @@ IAtom * eclAtom;
 IAtom * endAtom;
 IAtom * ensureCapacityAtom;
 IAtom * fileAtom;
+IAtom * funnelAtom;
 IAtom * forceAllCheckAtom;
 IAtom * goAtom;
 IAtom * guardAtom;
@@ -1440,6 +1441,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEATOM(ensureCapacity);
     MAKEATOM(file);
     MAKEATOM(forceAllCheck);
+    MAKEATOM(funnel);
     MAKEATOM(go);
     MAKEATOM(guard);
     MAKEATOM(helper);

+ 1 - 0
ecl/hqlcpp/hqlcatom.hpp

@@ -54,6 +54,7 @@ extern IAtom * endAtom;
 extern IAtom * ensureCapacityAtom;
 extern IAtom * fileAtom;
 extern IAtom * forceAllCheckAtom;
+extern IAtom * funnelAtom;
 extern IAtom * goAtom;
 extern IAtom * guardAtom;
 extern IAtom * helperAtom;

+ 1 - 0
ecl/hqlcpp/hqlcpp.cpp

@@ -1728,6 +1728,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.useResultsForChildSpills,"useResultsForChildSpills",false),
         DebugOption(options.alwaysUseGraphResults,"alwaysUseGraphResults",false),
         DebugOption(options.reportAssertFilenameTail,"reportAssertFilenameTail",false),        
+        DebugOption(options.newBalancedSpotter,"newBalancedSpotter",true),
     };
 
     //get options values from workunit

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -734,6 +734,7 @@ struct HqlCppOptions
     bool                useResultsForChildSpills;
     bool                alwaysUseGraphResults;
     bool                reportAssertFilenameTail;
+    bool                newBalancedSpotter;
 };
 
 //Any information gathered while processing the query should be moved into here, rather than cluttering up the translator class

+ 490 - 11
ecl/hqlcpp/hqlresource.cpp

@@ -47,6 +47,19 @@
 #define MEM_Const_Minimal (1*1024*1024)
 #define DEFAULT_MAX_ACTIVITIES  100
 
+static IHqlExpression * backtrackPseudoExpr; // sentinel returned by the splitter walk when a cycle was found but the current node was not chosen as the one to unbalance
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    backtrackPseudoExpr = createAttribute(retryAtom); // only ever compared by pointer in this file (problem == backtrackPseudoExpr)
+    return true;
+}
+MODULE_EXIT()
+{
+    ::Release(backtrackPseudoExpr); // releases the expression created in MODULE_INIT
+}
+
+
 //=== The following provides information about how each kind of activity is resourced ====
 
 static void setHashResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
@@ -995,7 +1008,6 @@ void ResourceGraphInfo::removeResources(const CResources & value)
 }
 
 
-
 //---------------------------------------------------------------------------
 
 unsigned ChildDependentArray::findOriginal(IHqlExpression * expr)
@@ -1040,6 +1052,11 @@ ResourcerInfo::ResourcerInfo(IHqlExpression * _original, CResourceOptions * _opt
     isConditionalFilter = false;
     projectResult = true; // projected must also be non empty to actually project this dataset
     visited = false;
+    lastPass = 0;
+    balancedExternalUses = 0;
+#ifdef TRACE_BALANCED
+    balanceId = 0;
+#endif
 }
 
 void ResourcerInfo::setConditionSource(IHqlExpression * condition, bool isFirst)            
@@ -1427,6 +1444,15 @@ unsigned ResourcerInfo::numInternalUses()
     return numUses - numExternalUses - aggregates.ordinality();
 }
 
+void ResourcerInfo::resetBalanced() // clears the per-pass splitter-walk state: links, walk position, visiting flag and external use count
+{
+    // don't reset balanced since it may be set for an external result
+    balancedLinks.kill();
+    curBalanceLink = 0; // walk position restarts at the first link
+    balancedVisiting = false;
+    balancedExternalUses = 0;
+}
+
 bool ResourcerInfo::spillSharesSplitter()
 {
     if (outputToUseForSpill || useGraphResult() || useGlobalResult())
@@ -1776,6 +1802,26 @@ bool ResourcerInfo::expandRatherThanSplit()
     }
 }
 
+bool ResourcerInfo::hasDependency() const
+{ // true if a link in graph->dependsOn has this node (original) as its sink; also true when no graph is assigned
+    if (graph)
+    {
+        GraphLinkArray & graphLinks = graph->dependsOn;
+        ForEachItemIn(i, graphLinks)
+        {
+            ResourceGraphLink & link = graphLinks.item(i);
+            if (link.sinkNode == original)
+                return true;
+        }
+    }
+    else
+    {
+        //Really should save some information away...  Err on side of caution
+        return true; // nothing to inspect without a graph, so a dependency is reported conservatively
+    }
+    return false;
+}
+
 bool neverCommonUp(IHqlExpression * expr)
 {
     loop
@@ -1851,6 +1897,7 @@ EclResourcer::EclResourcer(IErrorReceiver & _errors, IConstWorkUnit * _wu, Clust
     insideSteppedNeverSplit = false;
     sequential = false;
     options.minimizeSpillSize = _translatorOptions.minimizeSpillSize;
+    thisPass = 1;
 
     unsigned totalMemory = _translatorOptions.resourceMaxMemory ? _translatorOptions.resourceMaxMemory : DEFAULT_TOTAL_MEMORY;
     unsigned maxSockets = _translatorOptions.resourceMaxSockets ? _translatorOptions.resourceMaxSockets : DEFAULT_MAX_SOCKETS;
@@ -1914,6 +1961,7 @@ EclResourcer::EclResourcer(IErrorReceiver & _errors, IConstWorkUnit * _wu, Clust
     options.actionLinkInNewGraph = _translatorOptions.actionLinkInNewGraph || (targetClusterType == HThorCluster);
     options.convertCompoundToExecuteWhen = false;
     options.useResultsForChildSpills = _translatorOptions.useResultsForChildSpills;
+    options.newBalancedSpotter = _translatorOptions.newBalancedSpotter;
     spilled = false;
 }
 
@@ -3968,7 +4016,7 @@ void EclResourcer::addDependencies(HqlExprArray & exprs)
         addDependencies(&exprs.item(idx), NULL, NULL);
 }
 
-void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph)
+void EclResourcer::oldSpotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph)
 {
     ResourcerInfo * info = queryResourceInfo(expr);
     if (!info)
@@ -4018,7 +4066,8 @@ void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned which
         for (unsigned idx = first; idx < last; idx++)
         {
             OwnedHqlExpr childPath = modify ? createAttribute(pathAtom, getSizetConstant(idx), LINK(path)) : LINK(path);
-            spotUnbalancedSplitters(expr->queryChild(idx), whichSource, childPath, graph);
+
+            oldSpotUnbalancedSplitters(expr->queryChild(idx), whichSource, childPath, graph);
         }
     }
 
@@ -4035,7 +4084,7 @@ void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned which
                 if (link.sinkNode == expr)
                 {
                     OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
-                    spotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
+                    oldSpotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
                 }
             }
         }
@@ -4047,14 +4096,14 @@ void EclResourcer::spotUnbalancedSplitters(IHqlExpression * expr, unsigned which
                 if (link.sinkNode == expr)
                 {
                     OwnedHqlExpr childPath = createAttribute(dependencyAtom, LINK(link.sourceNode), LINK(path));
-                    spotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
+                    oldSpotUnbalancedSplitters(link.sourceNode, whichSource, childPath, graph);
                 }
             }
         }
     }
 }
 
-void EclResourcer::spotUnbalancedSplitters(HqlExprArray & exprs)
+void EclResourcer::oldSpotUnbalancedSplitters(HqlExprArray & exprs)
 {
     unsigned curSource = 1;
     switch (targetClusterType)
@@ -4070,7 +4119,7 @@ void EclResourcer::spotUnbalancedSplitters(HqlExprArray & exprs)
                 ForEachItemIn(i2, curGraph.sinks)
                 {
                     ResourceGraphLink & cur = curGraph.sinks.item(i2);
-                    spotUnbalancedSplitters(cur.sourceNode, curSource++, 0, &curGraph);
+                    oldSpotUnbalancedSplitters(cur.sourceNode, curSource++, 0, &curGraph);
                 }
             }
         }
@@ -4079,7 +4128,7 @@ void EclResourcer::spotUnbalancedSplitters(HqlExprArray & exprs)
         {
             //Roxie pulls all at once, so need to analyse globally.
             ForEachItemIn(idx, exprs)
-                spotUnbalancedSplitters(&exprs.item(idx), curSource++, 0, NULL);
+                oldSpotUnbalancedSplitters(&exprs.item(idx), curSource++, 0, NULL);
             break;
         }
     }
@@ -4144,6 +4193,422 @@ void EclResourcer::spotSharedInputs()
     }
 }
 
+//--------------------------------------------------------------------------------------------------------------------
+
+/*
+ * Splitters can either have a limited or unlimited read-ahead.  A splitter with unlimited read-ahead is likely to use
+ * more memory, and needs to be able to spill its read ahead buffer to disk.  A splitter with limited read-ahead
+ * ("balanced") is likely to be more efficient - but it can also potentially cause deadlock.
+ *
+ * A balanced splitter can deadlock because each of its output activities is effectively dependent on the others, which
+ * can create dependency cycles in the graph.
+ *
+ * Say you have f(x,y) and g(x,y), if x and y are unbalanced splitters you have directed edges (f->x),(f->y),(g->x),(g->y)
+ * which contain no cycles.  If x is balanced then you also have (x->f),(x->g) - still without any cycles, but if y is
+ * also balanced then you gain the edges (y->f,y->g) which creates a cycle.  So either x or y need to become unbalanced.
+ *
+ * The previous implementation worked by tagging each splitter with the paths that were used to get there, and
+ * marking a splitter as unbalanced if there was more than one path from the same output.  However that fails to catch the
+ * case where you have f(a,b) and g(a,b) which create more complicated cycles.
+ *
+ * This is essentially a graph traversal problem, where a balanced splitter makes each output dependent on each other
+ * (they effectively become bi-directional dependencies).  Converting balanced splitters to unbalanced is equivalent to
+ * calculating the feedback arc set that will remove all the cycles.  (Unfortunately calculating the minimal set is NP
+ * hard, but because we are dealing with a specialised graph we can avoid that.)
+ *
+ * The approach is as follows
+ *   Loop through each output
+ *     If not already visited (from another output), then perform a depth-first traversal of the graph for that output
+ *
+ * When traversing a node we mark it as being visited, and then walk through the other links.
+ *    If node at the end of the current link is being visited then we have found a cycle - so we need to introduce
+ *       an unbalanced splitter to remove some of the paths.  That splitter must be on the path between the two
+ *       visits to the nodes.
+ *       - It could be the node we have just reached
+ *       - It could be any other node on the route.
+ *
+ *    We can tell which one it is because
+ *    - if it is the node we have just reached, the visiting link must be from an output, and the previous link from node to output
+ *    - if on the path then exit path will be to an output, and entry path from an output.
+ *
+ *    If not then there must be by definition another node on that path that satisfies that condition.
+ *
+ * Note: This may over-estimate the number of splitters that need to be marked as unbalanced, but in general it is fairly good.
+ */
+
+
+CSplitterInfo::CSplitterInfo(EclResourcer & _resourcer, bool _preserveBalanced, bool _ignoreExternalDependencies)
+: resourcer(_resourcer), preserveBalanced(_preserveBalanced), ignoreExternalDependencies(_ignoreExternalDependencies)
+{ // stores the resourcer and the two option flags; the arrays start empty
+    nextBalanceId = 0; // ids are handed out by pre-increment in addLink, so the first traced node is 1
+#ifdef TRACE_BALANCED
+    printf("digraph {\n");
+#endif
+}
+
+CSplitterInfo::~CSplitterInfo()
+{ // undoes the temporary changes to external sources' balanced flags when preserveBalanced was requested
+    if (preserveBalanced)
+        restoreBalanced();
+#ifdef TRACE_BALANCED
+    printf("}\n");
+#endif
+}
+
+
+void CSplitterInfo::addLink(IHqlExpression * source, IHqlExpression * sink, bool isExternal)
+{ // records source as provisionally balanced and either links it to sink or, when sink is NULL, registers it as a root for the walk
+    ResourcerInfo * sourceInfo = queryResourceInfo(source);
+#ifdef TRACE_BALANCED
+    if (sourceInfo->balanceId == 0)
+    {
+        sourceInfo->balanceId = ++nextBalanceId;
+        printf("\tn%u [label=\"%u\"]; // %s\n", sourceInfo->balanceId, sourceInfo->balanceId, getOpString(source->getOperator()));
+    }
+#endif
+    if (isExternal)
+    {
+        if (!externalSources.contains(*source)) // each external source is recorded once
+        {
+            externalSources.append(*source);
+            if (preserveBalanced)
+                wasBalanced.append(sourceInfo->balanced); // same index as the externalSources entry; read back by restoreBalanced()
+        }
+    }
+
+    sourceInfo->balanced = true; // provisional: walkPotentialSplitterLinks clears it for nodes that must become unbalanced
+
+    if (sink)
+    {
+        CSplitterLink * link = new CSplitterLink(source, sink);
+        ResourcerInfo * sinkInfo = queryResourceInfo(sink);
+
+        sourceInfo->balancedLinks.append(*link); // the same link object is stored on both end points
+        sinkInfo->balancedLinks.append(*LINK(link)); // LINK adds the reference held by the second array
+
+#ifdef TRACE_BALANCED
+        printf("\tn%u -> n%u;\n", sourceInfo->balanceId, sinkInfo->balanceId);
+#endif
+    }
+    else
+    {
+        sinks.append(*source); // no consumer to link to: this node becomes a starting point for walkPotentialSplitters()
+        sourceInfo->balancedExternalUses++; // counted as an output by isBalancedSplitter()
+    }
+}
+
+bool CSplitterInfo::allInputsPulledIndependently(IHqlExpression * expr) const
+{ // true only for a no_addfiles that is neither ordered, ordered-pull nor grouped; every other operator returns false
+    switch (expr->getOperator())
+    {
+    case no_addfiles:
+        if (expr->hasAttribute(_ordered_Atom) || expr->hasAttribute(_orderedPull_Atom) || isGrouped(expr))
+            return false;
+        return true;
+    case no_parallel:
+        //MORE; This can probably return true - and generate fewer unbalanced splitters.
+        break; // currently falls through to the conservative "false" below
+    }
+    return false;
+}
+
+bool CSplitterInfo::isSplitOrBranch(IHqlExpression * expr) const
+{ // true if expr has more than one activity argument, is used more than once, or has a dependency
+    unsigned num = getNumActivityArguments(expr);
+    if (num > 1)
+        return true;
+
+    //Is this potentially a splitter?  Better to have false positives...
+    ResourcerInfo * info = queryResourceInfo(expr);
+    assertex(info);
+    if (info->numUses > 1)
+        return true;
+
+    if (info->hasDependency()) // note: hasDependency() also returns true when the node has no graph assigned
+        return true;
+
+    return false;
+}
+
+bool CSplitterInfo::isBalancedSplitter(IHqlExpression * expr) const
+{ // true if expr is still flagged balanced and has more than one output (external uses plus links on which it is the source)
+    ResourcerInfo * info = queryResourceInfo(expr); // NOTE(review): dereferenced without a check, unlike isSplitOrBranch() which asserts - callers must pass a node with resource info
+    if (!info->balanced)
+        return false;
+    unsigned numOutputs = info->balancedExternalUses;
+    ForEachItemIn(i, info->balancedLinks)
+    {
+        CSplitterLink & cur = info->balancedLinks.item(i);
+        if (cur.hasSource(expr)) // links where expr is the sink are inputs and are not counted
+            numOutputs++;
+    }
+    return (numOutputs > 1);
+}
+
+
+void CSplitterInfo::gatherPotentialSplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph, bool isDependency)
+{ // recursively walks expr, its activity inputs and its dependencies, recording links via addLink(); sink is the node links are attached to (NULL for a root) and a non-NULL graph confines the walk to that subgraph
+    ResourcerInfo * info = queryResourceInfo(expr);
+    if (!info)
+        return;
+
+    switch (expr->getOperator())
+    {
+    case no_null:
+    case no_fail:
+        //Any sources that never generate any rows are always fine as a splitter
+        return;
+    //MORE: A source that generates a single row, and subsequent rows are never read, is always fine.
+    //      but if read as a dataset it could deadlock unless there was a 1 row read-ahead.
+
+    }
+    bool alreadyVisited = resourcer.checkAlreadyVisited(info); // also stamps the node with the current pass
+    if (!alreadyVisited)
+    {
+        info->resetBalanced(); // first visit in this pass: discard links and walk state from earlier passes
+#ifdef TRACE_BALANCED
+        info->balanceId = 0;
+#endif
+    }
+
+    if (graph && info->graph && info->graph != graph) // node belongs to a different subgraph
+    {
+        if (!ignoreExternalDependencies || !isDependency) // external nodes reached through a dependency are skipped when requested
+            addLink(expr, sink, true);
+        return; // never walk into another subgraph
+    }
+
+    if (isSplitOrBranch(expr) || !sink) // roots and potential split/branch points become nodes in the link graph
+    {
+        addLink(expr, sink, false);
+        if (alreadyVisited)
+            return; // inputs and dependencies were already gathered on the first visit
+
+        if (allInputsPulledIndependently(expr))
+            sink = NULL; // each input is then registered as a root instead of being linked to expr
+        else
+            sink = expr;
+    }
+
+    if (info->containsActivity)
+    {
+        unsigned first = getFirstActivityArgument(expr);
+        unsigned num = getNumActivityArguments(expr);
+        unsigned last = first + num;
+        for (unsigned idx = first; idx < last; idx++)
+            gatherPotentialSplitters(expr->queryChild(idx), sink, graph, false);
+    }
+
+    //MORE: dependencies should have onStart as their sink, and there should be a link between onStart and allSinks.
+    //Now check dependencies between graphs (for roxie) and possibly within a graph for thor
+    if (info->graph)
+    {
+        GraphLinkArray & graphLinks = info->graph->dependsOn;
+        ForEachItemIn(i, graphLinks)
+        {
+            ResourceGraphLink & link = graphLinks.item(i);
+            if (link.sinkNode == expr)
+                gatherPotentialDependencySplitters(link.sourceNode, sink, graph);
+        }
+    }
+    else
+    {
+        ForEachItemIn(i, resourcer.links) // no graph assigned: scan the resourcer's full link list for dependencies on expr
+        {
+            ResourceGraphLink & link = resourcer.links.item(i);
+            if (link.isDependency() && (link.sinkNode == expr))
+            {
+                gatherPotentialDependencySplitters(link.sourceNode, sink, graph);
+            }
+        }
+    }
+}
+
+void CSplitterInfo::gatherPotentialDependencySplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph)
+{ // gathers a dependency in the same way as an input, but with isDependency set
+    //Strictly speaking dependencies are executed *before* the activity.
+    //I experimented with making them dependent on a onStart pseudo expression, but that doesn't work because
+    //there are multiple onStart items (as the dependencies nest).
+    //There is still the outside possibility of deadlock between expressions and dependencies of items inside
+    //non-ordered concats, but they would only occur in roxie, and this flag is currently ignored by roxie.
+    gatherPotentialSplitters(expr, sink, graph, true);
+}
+
+void CSplitterInfo::restoreBalanced()
+{ // puts back the balanced flag each external source had before addLink() changed it
+    ForEachItemIn(i, externalSources)
+    {
+        IHqlExpression & cur = externalSources.item(i);
+        ResourcerInfo * info = queryResourceInfo(&cur);
+        info->balanced = wasBalanced.item(i); // wasBalanced only has matching entries when preserveBalanced was set; the destructor calls this only in that case
+    }
+}
+
+IHqlExpression * EclResourcer::walkPotentialSplitters(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink & link)
+{ // follows link into expr; returns NULL if nothing needs changing, the body of the node to make unbalanced, or backtrackPseudoExpr when a cycle was found but this node was not selected
+    ResourcerInfo * info = queryResourceInfo(expr);
+
+    //If already visited all the links, then no need to do any more.
+    if (info->finishedWalkingSplitters())
+        return NULL;
+
+    //Are we currently in the process of visiting this node?
+    if (info->balancedVisiting)
+    {
+#ifdef TRACE_BALANCED
+        printf("//Follow %u->%u has problems....\n", queryResourceInfo(link.queryOther(expr))->balanceId, info->balanceId);
+#endif
+        if (link.hasSource(expr)) // arrived on a link whose source is expr
+        {
+            //Walking up the tree, and the current node is being visited => it is a node that needs to become unbalanced - if it is a balanced splitter
+            if (connections.isBalancedSplitter(expr))
+            {
+                const CSplitterLink * originalLink = info->queryCurrentLink(); // the link expr was following when the walk came back to it
+                if (originalLink->hasSource(expr)) // expr is the source of both the link it left by and the link it was re-entered by
+                    return expr->queryBody();
+            }
+
+            //Must be a node with multiple inputs, walked from one input, and visited another.
+            return backtrackPseudoExpr;
+        }
+        else
+        {
+            //found a loop-> need to do something about it.
+            return backtrackPseudoExpr;
+        }
+    }
+
+#ifdef TRACE_BALANCED
+    printf("//Follow %u->%u\n", queryResourceInfo(link.queryOther(expr))->balanceId, info->balanceId);
+#endif
+    return walkPotentialSplitterLinks(connections, expr, &link); // not yet visited: continue the depth-first walk from expr
+}
+
+IHqlExpression * EclResourcer::walkPotentialSplitterLinks(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink * link)
+{ // walks every link of expr except the arrival link (NULL for a root); returns NULL once all links are walked, otherwise the unresolved problem for the caller to handle
+    ResourcerInfo * info = queryResourceInfo(expr);
+    info->balancedVisiting = true;
+
+    //This may iterate through all links again - but will return quickly if already visited
+    for (unsigned i=0; i < info->balancedLinks.ordinality(); i++)
+    {
+        const CSplitterLink & cur = info->balancedLinks.item(i);
+        if (&cur != link) // don't walk the link we reached here by
+        {
+            if (info->balanced || cur.hasSink(expr)) // once marked unbalanced, only links on which expr is the sink are followed
+            {
+                info->curBalanceLink = i; // recorded so queryCurrentLink() reports the link being followed
+                IHqlExpression * problem = walkPotentialSplitters(connections, cur.queryOther(expr), cur);
+                if (problem)
+                {
+                    bool forceUnbalanced = false;
+                    if (problem == backtrackPseudoExpr)
+                    {
+                        //both links are to outputs
+                        if (link && link->hasSource(expr) && cur.hasSource(expr))
+                        {
+                            assertex(connections.isBalancedSplitter(expr));
+                            forceUnbalanced = true;
+                        }
+                    }
+                    else
+                        forceUnbalanced = (expr->queryBody() == problem); // a specific node was named: act only if it is this one
+
+                    if (!forceUnbalanced)
+                    {
+                        //No longer visiting - we'll come back here again later
+                        info->balancedVisiting = false;
+                        return problem; // curBalanceLink stays at i, so the node is not reported as finished
+                    }
+#ifdef TRACE_BALANCED
+                    printf("\tn%u [color=red];\n", info->balanceId);
+                    printf("//%u marked as unbalanced\n", info->balanceId);
+#endif
+                    info->balanced = false; // problem resolved here; the loop continues with the remaining links
+                }
+            }
+        }
+    }
+    info->curBalanceLink = info->balancedLinks.ordinality(); // makes finishedWalkingSplitters() return true
+    return NULL;
+}
+
+void EclResourcer::optimizeConditionalLinks(CSplitterInfo & connections)
+{ // placeholder: currently does nothing and connections is unused
+    //MORE: IF() can be special cased.  If it creates two identical links then one of them can be removed
+    //Implement by post processing the links and removing duplicates
+}
+
+void EclResourcer::walkPotentialSplitters(CSplitterInfo & connections)
+{ // starts a walk from every gathered sink that has not already been fully walked
+    ForEachItemIn(i, connections.sinks)
+    {
+        IHqlExpression & cur = connections.sinks.item(i);
+        ResourcerInfo * info = queryResourceInfo(&cur);
+        if (!info->finishedWalkingSplitters()) // skip sinks already completed while walking from an earlier sink
+        {
+            IHqlExpression * problem = walkPotentialSplitterLinks(connections, &cur, NULL); // NULL: a root has no arrival link
+            assertex(!problem); // every problem is expected to be resolved before the walk returns to a root
+        }
+    }
+}
+
+void EclResourcer::extractSharedInputs(CSplitterInfo & connections, ResourceGraphInfo & graph)
+{ // appends each external source that is still a balanced splitter to graph.balancedExternalSources
+    ForEachItemIn(i, connections.externalSources)
+    {
+        IHqlExpression & cur = connections.externalSources.item(i);
+        ResourcerInfo * info = queryResourceInfo(&cur); // NOTE(review): info is never used in this function
+        if (connections.isBalancedSplitter(&cur))
+        {
+            //Add two entries for compatibility with old code.
+            graph.balancedExternalSources.append(*LINK(cur.queryBody()));
+            graph.balancedExternalSources.append(*LINK(cur.queryBody()));
+        }
+    }
+}
+
+void EclResourcer::spotUnbalancedSplitters(const HqlExprArray & exprs)
+{ // gathers splitter links and walks them to clear the balanced flag where needed: per subgraph for thor, once over exprs for roxie
+    switch (targetClusterType)
+    {
+    case HThorCluster:
+        break; // no analysis is performed for hthor
+    case ThorLCRCluster:
+        {
+            //Thor only handles one graph at a time, so only walk expressions within a single graph.
+            ForEachItemIn(i1, graphs)
+            {
+                ResourceGraphInfo & curGraph = graphs.item(i1);
+                CSplitterInfo info(*this, true, true); // preserveBalanced and ignoreExternalDependencies both enabled
+                nextPass(); // new pass number so checkAlreadyVisited() treats every node as unvisited
+                ForEachItemIn(i2, curGraph.sinks)
+                {
+                    ResourceGraphLink & cur = curGraph.sinks.item(i2);
+                    info.gatherPotentialSplitters(cur.sourceNode, NULL, &curGraph, false);
+                }
+
+                optimizeConditionalLinks(info);
+                walkPotentialSplitters(info);
+                extractSharedInputs(info, curGraph); // runs before info's destructor restores the external sources' balanced flags
+            }
+        }
+        break;
+    case RoxieCluster:
+        {
+            //Roxie pulls all at once, so need to analyse globally.
+            CSplitterInfo info(*this, false, false); // preserveBalanced and ignoreExternalDependencies both disabled
+            nextPass();
+            ForEachItemIn(i2, exprs)
+                info.gatherPotentialSplitters(&exprs.item(i2), NULL, NULL, false); // NULL graph: the walk is not confined to one subgraph
+
+            optimizeConditionalLinks(info);
+            walkPotentialSplitters(info);
+            //no splitters from reading at start of a subgraph
+            break;
+        }
+    }
+}
 //------------------------------------------------------------------------------------------
 // PASS6: Merge sub graphs that can share resources and don't have dependencies
 // MORE: Once sources are merged, should try merging between trees.
@@ -4225,6 +4690,14 @@ bool EclResourcer::queryMergeGraphLink(ResourceGraphLink & link)
 }
 
 
+bool EclResourcer::checkAlreadyVisited(ResourcerInfo * info)
+{ // returns whether info was already seen in the current pass, stamping it with thisPass if it was not
+    if (info->lastPass == thisPass)
+        return true;
+    info->lastPass = thisPass; // later calls in the same pass now return true
+    return false;
+}
+
 unsigned EclResourcer::getMaxDepth() const
 {
     unsigned maxDepth = 0;
@@ -5154,9 +5627,15 @@ void EclResourcer::resourceGraph(IHqlExpression * expr, HqlExprArray & transform
 #ifdef TRACE_RESOURCING
     trace();
 #endif
-    spotUnbalancedSplitters(exprs);
-    if (options.optimizeSharedInputs)
-        spotSharedInputs();
+
+    if (!options.newBalancedSpotter)
+    {
+        oldSpotUnbalancedSplitters(exprs);
+        if (options.optimizeSharedInputs)
+            spotSharedInputs();
+    }
+    else
+        spotUnbalancedSplitters(exprs);
 
     if (spotThroughAggregate)
         optimizeAggregates();

+ 103 - 17
ecl/hqlcpp/hqlresource.ipp

@@ -19,6 +19,8 @@
 
 #include "hqlresource.hpp"
 
+//#define TRACE_BALANCED
+
 enum ResourceType { 
 //Slave specific
     RESslavememory,
@@ -67,6 +69,7 @@ public:
     bool     convertCompoundToExecuteWhen;
     bool     useResultsForChildSpills;
     bool     alwaysUseGraphResults;
+    bool     newBalancedSpotter;
 
     IHqlExpression * graphIdExpr;
     unsigned nextResult;
@@ -236,13 +239,65 @@ public:
     unsigned findOriginal(IHqlExpression * expr);
 };
 
-class ResourcerInfo : public CInterface, public IInterface
+class CSplitterLink : public CInterface // an edge between two expressions; both ends are stored as expression bodies (queryBody())
+{
+public:
+    CSplitterLink(IHqlExpression * _source, IHqlExpression * _sink) : source(_source->queryBody()), sink(_sink->queryBody()) // both arguments are dereferenced, so neither may be NULL
+    {
+    }
+
+    IHqlExpression * queryOther(IHqlExpression * expr) const // returns the opposite end: sink if expr is the source, otherwise source
+    {
+        if (expr->queryBody() == source)
+            return sink;
+        else
+            return source;
+    }
+
+    bool hasSink(IHqlExpression * expr) const { return sink == expr->queryBody(); } // true if expr's body is the sink end
+    bool hasSource(IHqlExpression * expr) const { return source == expr->queryBody(); } // true if expr's body is the source end
+
+private:
+    LinkedHqlExpr source;
+    LinkedHqlExpr sink;
+};
+
+class EclResourcer;
+class CSplitterInfo // collects the links, roots and external sources gathered for one balanced-splitter analysis
+{
+public:
+    CSplitterInfo(EclResourcer & _resourcer, bool _preserveBalanced, bool _ignoreExternalDependencies);
+    ~CSplitterInfo(); // calls restoreBalanced() when preserveBalanced is set
+
+    void addLink(IHqlExpression * source, IHqlExpression * sink, bool isExternal); // sink may be NULL, in which case source is recorded in sinks
+    bool allInputsPulledIndependently(IHqlExpression * expr) const;
+    void gatherPotentialSplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph, bool isDependency);
+    bool isSplitOrBranch(IHqlExpression * expr) const;
+    bool isBalancedSplitter(IHqlExpression * expr) const;
+    void restoreBalanced();
+
+private:
+    CSplitterInfo(); // declared private; no definition appears in this change - presumably to prevent default construction
+
+    void gatherPotentialDependencySplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph);
+
+public:
+    EclResourcer & resourcer;
+    HqlExprCopyArray externalSources; // nodes passed to addLink() with isExternal set, each recorded once
+    HqlExprCopyArray sinks; // nodes passed to addLink() with a NULL sink; starting points for EclResourcer::walkPotentialSplitters()
+    HqlExprCopyArray dependents; // NOTE(review): not referenced by any code shown in this change
+    BoolArray wasBalanced; // previous balanced flag per externalSources entry, recorded only when preserveBalanced is set
+    unsigned nextBalanceId; // last id handed out for TRACE_BALANCED output
+    bool preserveBalanced; // restore the balanced flag of external sources on destruction
+    bool ignoreExternalDependencies; // skip nodes in other subgraphs that were reached through a dependency
+};
+
+class ResourcerInfo : public CInterfaceOf<IInterface>
 {
 public:
     enum { PathUnknown, PathConditional, PathUnconditional };
 
     ResourcerInfo(IHqlExpression * _original, CResourceOptions * _options);
-    IMPLEMENT_IINTERFACE
 
     IHqlExpression * createSpilledRead(IHqlExpression * spillReason);
     IHqlExpression * createTransformedExpr(IHqlExpression * expr);
@@ -254,6 +309,7 @@ public:
     void clearProjected();
     bool expandRatherThanSpill(bool noteOtherSpills);
     bool expandRatherThanSplit();
+    bool hasDependency() const;
     bool isExternalSpill();
     bool neverCommonUp();
     bool isSplit();
@@ -262,6 +318,7 @@ public:
     void noteUsedFromChild(bool _forceHoist);
     unsigned numInternalUses();
     unsigned numSplitPaths();
+    void resetBalanced();
     void setConditionSource(IHqlExpression * condition, bool isFirst);
 
     //hthor - don't merge anything to a global result because we don't allow splitters.
@@ -288,6 +345,16 @@ public:
         return isActivity || containsActivity;
     }
     void setRootActivity();
+    bool finishedWalkingSplitters() const
+    {
+        return (curBalanceLink == balancedLinks.ordinality());
+    }
+    const CSplitterLink * queryCurrentLink() const
+    {
+        if (balancedLinks.isItem(curBalanceLink))
+            return &balancedLinks.item(curBalanceLink);
+        return NULL;
+    }
 
 protected:
     bool spillSharesSplitter();
@@ -318,25 +385,33 @@ public:
     HqlExprAttr splitterOutput;
     HqlExprArray projected;
     HqlExprAttr projectedExpr;
+    CIArrayOf<CSplitterLink> balancedLinks;
 
     unsigned numUses;
     unsigned numExternalUses;
     unsigned conditionSourceCount;
     unsigned currentSource;
+    unsigned curBalanceLink;
+    unsigned lastPass;
+    unsigned balancedExternalUses;
+#ifdef TRACE_BALANCED
+    unsigned balanceId;
+#endif
     byte pathToExpr;
-    bool containsActivity;
-    bool isActivity;
-    bool isRootActivity;
-    bool gatheredDependencies;
-    bool isSpillPoint;
-    bool balanced;
-    bool isAlreadyInScope;
-    bool linkedFromChild;
-    bool forceHoist;
-    bool neverSplit;
-    bool isConditionalFilter;
-    bool projectResult;
-    bool visited;
+    bool containsActivity:1;
+    bool isActivity:1;
+    bool isRootActivity:1;
+    bool gatheredDependencies:1;
+    bool isSpillPoint:1;
+    bool balanced:1;
+    bool isAlreadyInScope:1;
+    bool linkedFromChild:1;
+    bool forceHoist:1;
+    bool neverSplit:1;
+    bool isConditionalFilter:1;
+    bool projectResult:1;
+    bool visited:1;
+    bool balancedVisiting:1;
 };
 
 struct DependencySourceInfo
@@ -350,6 +425,7 @@ struct DependencySourceInfo
 class EclResourcer
 {
     friend class SelectHoistTransformer;
+    friend class CSplitterInfo;
 public:
     EclResourcer(IErrorReceiver & _errors, IConstWorkUnit * _wu, ClusterType _targetClusterType, unsigned _clusterSize, const HqlCppOptions & _translatorOptions);
     ~EclResourcer();
@@ -425,13 +501,20 @@ protected:
     void mergeSiblings();
 
 //Pass 6b
-    void spotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph);
-    void spotUnbalancedSplitters(HqlExprArray & exprs);
+    void oldSpotUnbalancedSplitters(IHqlExpression * expr, unsigned whichSource, IHqlExpression * path, ResourceGraphInfo * graph);
+    void oldSpotUnbalancedSplitters(HqlExprArray & exprs);
 
 //Pass 6c
     void spotSharedInputs(IHqlExpression * expr, ResourceGraphInfo * graph);
     void spotSharedInputs();
 
+    void optimizeConditionalLinks(CSplitterInfo & connections);
+    IHqlExpression * walkPotentialSplitterLinks(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink * link);
+    IHqlExpression * walkPotentialSplitters(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink & link);
+    void walkPotentialSplitters(CSplitterInfo & connections);
+    void extractSharedInputs(CSplitterInfo & connections, ResourceGraphInfo & graph);
+    void spotUnbalancedSplitters(const HqlExprArray & exprs);
+
 //Pass 7
     bool optimizeAggregate(IHqlExpression * expr);
     void optimizeAggregates();
@@ -456,6 +539,8 @@ protected:
     void checkRecursion(ResourceGraphInfo * graph, PointerArray & visited);
     void checkRecursion(ResourceGraphInfo * graph);
     unsigned getMaxDepth() const;
+    bool checkAlreadyVisited(ResourcerInfo * info);
+    void nextPass() { thisPass++; }
 
 protected:
     Owned<IConstWorkUnit> wu;
@@ -466,6 +551,7 @@ protected:
     unsigned clusterSize;
     CResources * resourceLimit;
     IErrorReceiver * errors;
+    unsigned thisPass;
     bool spilled;
     bool spillMultiCondition;
     bool spotThroughAggregate;