浏览代码

HPCC-14000 Better fix for incorrect balanced splitters

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 年之前
父节点
当前提交
94496b1820
共有 3 个文件被更改,包括 142 次插入14 次删除
  1. 1 1
      ecl/hqlcpp/hqlcpp.cpp
  2. 131 12
      ecl/hqlcpp/hqlresource.cpp
  3. 10 1
      ecl/hqlcpp/hqlresource.ipp

+ 1 - 1
ecl/hqlcpp/hqlcpp.cpp

@@ -1752,7 +1752,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.alwaysUseGraphResults,"alwaysUseGraphResults",false),
         DebugOption(options.noConditionalLinks,"noConditionalLinks",false),
         DebugOption(options.reportAssertFilenameTail,"reportAssertFilenameTail",false),        
-        DebugOption(options.newBalancedSpotter,"newBalancedSpotter",false),
+        DebugOption(options.newBalancedSpotter,"newBalancedSpotter",true),
         DebugOption(options.keyedJoinPreservesOrder,"keyedJoinPreservesOrder",true),
         DebugOption(options.expandSelectCreateRow,"expandSelectCreateRow",false),
         DebugOption(options.obfuscateOutput,"obfuscateOutput",false),

+ 131 - 12
ecl/hqlcpp/hqlresource.cpp

@@ -2791,6 +2791,9 @@ ResourcerInfo::ResourcerInfo(IHqlExpression * _original, CResourceOptions * _opt
     visited = false;
     lastPass = 0;
     balancedExternalUses = 0;
+    balancedInternalUses = 0;
+    balancedVisiting = false;
+    removedParallelPullers = false;
 #ifdef TRACE_BALANCED
     balanceId = 0;
 #endif
@@ -2948,6 +2951,8 @@ void ResourcerInfo::resetBalanced()
     curBalanceLink = 0;
     balancedVisiting = false;
     balancedExternalUses = 0;
+    balancedInternalUses = 0;
+    removedParallelPullers = false;
 }
 
 bool ResourcerInfo::spillSharesSplitter()
@@ -5027,6 +5032,7 @@ void CSplitterInfo::addLink(IHqlExpression * source, IHqlExpression * sink, bool
 
         sourceInfo->balancedLinks.append(*link);
         sinkInfo->balancedLinks.append(*LINK(link));
+        sourceInfo->balancedInternalUses++;
 
 #ifdef TRACE_BALANCED
         printf("\tn%u -> n%u;\n", sourceInfo->balanceId, sinkInfo->balanceId);
@@ -5039,7 +5045,18 @@ void CSplitterInfo::addLink(IHqlExpression * source, IHqlExpression * sink, bool
     }
 }
 
-bool CSplitterInfo::allInputsPulledIndependently(IHqlExpression * expr) const
+void CSplitterLink::mergeSinkLink(CSplitterLink & sinkLink)
+{
+    IHqlExpression * newSink = sinkLink.querySink();
+    assertex(newSink);
+    assertex(sinkLink.hasSource(querySink()));
+    sink.set(newSink);
+    ResourcerInfo * sinkInfo = queryResourceInfo(newSink);
+    unsigned sinkPos = sinkInfo->balancedLinks.find(sinkLink);
+    sinkInfo->balancedLinks.replace(OLINK(*this), sinkPos);
+}
+
+bool CSplitterInfo::allInputsPulledIndependently(IHqlExpression * expr)
 {
     switch (expr->getOperator())
     {
@@ -5077,13 +5094,7 @@ bool CSplitterInfo::isBalancedSplitter(IHqlExpression * expr) const
     ResourcerInfo * info = queryResourceInfo(expr);
     if (!info->balanced)
         return false;
-    unsigned numOutputs = info->balancedExternalUses;
-    ForEachItemIn(i, info->balancedLinks)
-    {
-        CSplitterLink & cur = info->balancedLinks.item(i);
-        if (cur.hasSource(expr))
-            numOutputs++;
-    }
+    unsigned numOutputs = info->balancedExternalUses + info->balancedInternalUses;
     return (numOutputs > 1);
 }
 
@@ -5126,10 +5137,7 @@ void CSplitterInfo::gatherPotentialSplitters(IHqlExpression * expr, IHqlExpressi
         if (alreadyVisited)
             return;
 
-        if (allInputsPulledIndependently(expr))
-            sink = NULL;
-        else
-            sink = expr;
+        sink = expr;
     }
 
     if (info->containsActivity)
@@ -5275,10 +5283,121 @@ IHqlExpression * EclResourcer::walkPotentialSplitterLinks(CSplitterInfo & connec
     return NULL;
 }
 
+bool EclResourcer::removePassThrough(CSplitterInfo & connections, ResourcerInfo & info)
+{
+    if (info.balancedLinks.ordinality() != 2)
+        return false;
+
+    CSplitterLink & link0 = info.balancedLinks.item(0);
+    CSplitterLink & link1 = info.balancedLinks.item(1);
+
+    CSplitterLink * sourceLink;
+    CSplitterLink * sinkLink;
+    if (link0.hasSource(info.original) && link1.hasSink(info.original))
+    {
+        sourceLink = &link1;
+        sinkLink = &link0;
+    }
+    else if (link0.hasSink(info.original) && link1.hasSource(info.original))
+    {
+        sourceLink = &link0;
+        sinkLink = &link1;
+    }
+    else
+        return false;
+
+    if (!sinkLink->querySink())
+        return false;
+
+#ifdef TRACE_BALANCED
+    printf("//remove node %u since now pass-through\n", info.balanceId);
+#endif
+
+    sourceLink->mergeSinkLink(*sinkLink);
+    return true;
+}
+
+void EclResourcer::removeDuplicateIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info)
+{
+    IHqlExpression * expr = info.original;
+    loop
+    {
+        bool again = false;
+        for (unsigned i=0; i < info.balancedLinks.ordinality(); i++)
+        {
+            CSplitterLink & cur = info.balancedLinks.item(i);
+            if (cur.hasSource(expr))
+            {
+                IHqlExpression * sink = cur.queryOther(expr);
+                assertex(sink);
+                ResourcerInfo & sinkInfo = *queryResourceInfo(sink);
+                if (CSplitterInfo::allInputsPulledIndependently(sink))
+                {
+                    unsigned numRemoved = 0;
+                    for (unsigned j=info.balancedLinks.ordinality()-1; j > i; j--)
+                    {
+                        CSplitterLink & next = info.balancedLinks.item(j);
+                        if (next.hasSource(expr) && next.hasSink(sink))
+                        {
+                            info.balancedLinks.remove(j);
+                            sinkInfo.balancedLinks.zap(next);
+                            numRemoved++;
+                        }
+                    }
+
+#ifdef TRACE_BALANCED
+                    if (numRemoved)
+                        printf("//removed %u duplicate links from %u to %u\n", numRemoved, info.balanceId, sinkInfo.balanceId);
+#endif
+
+                }
+
+                //Removing duplicate links has turned the source item into a pass-through.
+                //Replace references to the sink activity with references to its sink
+                //to possibly allow more to be removed.
+                if (removePassThrough(connections, sinkInfo))
+                {
+#ifdef TRACE_BALANCED
+                    printf("//remove %u now pass-through\n", sinkInfo.balanceId);
+#endif
+                    again = true;
+                }
+            }
+        }
+        if (!again)
+            break;
+    }
+}
+
+
+void EclResourcer::optimizeIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info)
+{
+    if (info.removedParallelPullers)
+        return;
+    info.removedParallelPullers = true;
+
+    removeDuplicateIndependentLinks(connections, info);
+
+    //Recurse over inputs to this activity (each call may remove links)
+    for (unsigned i=0; i < info.balancedLinks.ordinality(); i++)
+    {
+        CSplitterLink & cur = info.balancedLinks.item(i);
+        if (cur.hasSink(info.original))
+            optimizeIndependentLinks(connections, *queryResourceInfo(cur.querySource()));
+    }
+}
+
+
 void EclResourcer::optimizeConditionalLinks(CSplitterInfo & connections)
 {
     //MORE: IF() can be special cased.  If it creates two identical links then one of them can be removed
     //Implement by post processing the links and removing duplicates
+    ForEachItemIn(i, connections.sinks)
+    {
+        IHqlExpression & cur = connections.sinks.item(i);
+        ResourcerInfo * info = queryResourceInfo(&cur);
+        optimizeIndependentLinks(connections, *info);
+    }
 }
 
 void EclResourcer::walkPotentialSplitters(CSplitterInfo & connections)

+ 10 - 1
ecl/hqlcpp/hqlresource.ipp

@@ -276,6 +276,10 @@ public:
 
     bool hasSink(IHqlExpression * expr) const { return sink == expr->queryBody(); }
     bool hasSource(IHqlExpression * expr) const { return source == expr->queryBody(); }
+    IHqlExpression * querySource() const { return source; }
+    IHqlExpression * querySink() const { return sink; }
+
+    void mergeSinkLink(CSplitterLink & sinkLink);
 
 private:
     LinkedHqlExpr source;
@@ -290,7 +294,7 @@ public:
     ~CSplitterInfo();
 
     void addLink(IHqlExpression * source, IHqlExpression * sink, bool isExternal);
-    bool allInputsPulledIndependently(IHqlExpression * expr) const;
+    static bool allInputsPulledIndependently(IHqlExpression * expr);
     void gatherPotentialSplitters(IHqlExpression * expr, IHqlExpression * sink, ResourceGraphInfo * graph, bool isDependency);
     bool isSplitOrBranch(IHqlExpression * expr) const;
     bool isBalancedSplitter(IHqlExpression * expr) const;
@@ -430,6 +434,7 @@ public:
     unsigned curBalanceLink;
     unsigned lastPass;
     unsigned balancedExternalUses;
+    unsigned balancedInternalUses;
 #ifdef TRACE_BALANCED
     unsigned balanceId;
 #endif
@@ -447,6 +452,7 @@ public:
     bool projectResult:1;
     bool visited:1;
     bool balancedVisiting:1;
+    bool removedParallelPullers:1;
 };
 
 class EclResourceDependencyGatherer;
@@ -520,6 +526,9 @@ protected:
     void spotSharedInputs(IHqlExpression * expr, ResourceGraphInfo * graph);
     void spotSharedInputs();
 
+    bool removePassThrough(CSplitterInfo & connections, ResourcerInfo & info);
+    void removeDuplicateIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info);
+    void optimizeIndependentLinks(CSplitterInfo & connections, ResourcerInfo & info);
     void optimizeConditionalLinks(CSplitterInfo & connections);
     IHqlExpression * walkPotentialSplitterLinks(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink * link);
     IHqlExpression * walkPotentialSplitters(CSplitterInfo & connections, IHqlExpression * expr, const CSplitterLink & link);