Pārlūkot izejas kodu

Merge pull request #4427 from ghalliday/issue9248

HPCC-9248 Option to expand hash distribute and match existing distribution

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 gadi atpakaļ
vecāks
revīzija
dc78c95580

+ 64 - 2
ecl/hql/hqlmeta.cpp

@@ -1471,7 +1471,7 @@ bool matchDedupDistribution(IHqlExpression * distn, const HqlExprArray & equalit
  b) All references to fields from the dataset must match the join element
 */
 
-static bool checkDistributedCoLocally(IHqlExpression * distribute1, IHqlExpression * distribute2, HqlExprArray & sort1, HqlExprArray & sort2)
+static bool checkDistributedCoLocally(IHqlExpression * distribute1, IHqlExpression * distribute2, const HqlExprArray & sort1, const HqlExprArray & sort2)
 {
     unsigned match1 = sort1.find(*distribute1->queryBody());
     unsigned match2 = sort2.find(*distribute2->queryBody());
@@ -1508,6 +1508,68 @@ static bool checkDistributedCoLocally(IHqlExpression * distribute1, IHqlExpressi
 }
 
 
+//Convert a function of fields referenced in oldSort, to fields referenced in newSort.  Returns the converted expression, or NULL if expr contains anything that cannot be mapped.
+IHqlExpression * createMatchingDistribution(IHqlExpression * expr, const HqlExprArray & oldSort, const HqlExprArray & newSort)
+{
+    unsigned match = oldSort.find(*expr->queryBody()); //is the body of expr itself one of the old sort elements?
+    if (match != NotFound)
+        return LINK(&newSort.item(match)); //yes - substitute the element at the same position in newSort
+
+    node_operator op = expr->getOperator();
+    switch (op)
+    {
+    case no_hash:
+    case no_hash32:
+    case no_hash64:
+    case no_hashmd5:
+    case no_add:
+    case no_xor:
+    case no_bxor:
+    case no_sortlist:
+    case no_cast:
+    case no_implicitcast:
+    case no_negate:
+    case no_trim:
+        break; //accepted operator - continue to the child mapping after the switch
+    case no_field:
+    case no_select:
+    case no_sortpartition:
+        return NULL; //was not matched against oldSort above, so there is nothing in newSort to replace it with
+    case no_constant:
+        break; //not rejected - continues to the common code after the switch
+    case no_attr:
+    case no_attr_expr:
+        if (expr->queryName() == internalAtom)
+        {
+            //HASH,internal - only valid if the types of the old and new sorts match exactly
+            ForEachItemIn(i, oldSort)
+            {
+                if (oldSort.item(i).queryType() != newSort.item(i).queryType()) //NOTE(review): indexes newSort with oldSort's index - assumes both arrays have the same length; confirm callers guarantee this
+                    return NULL;
+            }
+        }
+        break; //other attributes (and internal with matching types) continue to the common code below
+    default:
+        return NULL; //any operator not listed above is treated as unmappable
+    }
+
+    unsigned max = expr->numChildren();
+    if (max == 0)
+        return LINK(expr); //no children to map - the expression is returned unchanged
+
+    HqlExprArray args;
+    args.ensure(max); //presumably reserves space for max entries - confirm against the array class
+    ForEachChild(i, expr)
+    {
+        IHqlExpression * mapped = createMatchingDistribution(expr->queryChild(i), oldSort, newSort); //map each child recursively
+        if (!mapped)
+            return NULL; //a single unmappable child makes the whole expression unmappable
+        args.append(*mapped);
+    }
+    return expr->clone(args); //rebuild this node from the mapped children
+}
+
+
 static IHqlExpression * queryColocalDataset(IHqlExpression * expr)
 {
     loop
@@ -1524,7 +1586,7 @@ static IHqlExpression * queryColocalDataset(IHqlExpression * expr)
 }
 
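 //Check if the distribution functions are essentially identical, except for the places where they reference fields from the corresponding sort list (sort1 for dataset1, sort2 for dataset2)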
-bool isDistributedCoLocally(IHqlExpression * dataset1, IHqlExpression * dataset2, HqlExprArray & sort1, HqlExprArray & sort2)
+bool isDistributedCoLocally(IHqlExpression * dataset1, IHqlExpression * dataset2, const HqlExprArray & sort1, const HqlExprArray & sort2)
 {
     IHqlExpression * distribute1 = queryDistribution(dataset1);
     IHqlExpression * distribute2 = queryDistribution(dataset2);

+ 2 - 1
ecl/hql/hqlmeta.hpp

@@ -84,7 +84,8 @@ extern HQL_API IHqlExpression * convertSubSortToGroupedSort(IHqlExpression * exp
 extern HQL_API bool reorderMatchExistingLocalSort(HqlExprArray & sortedLeft, HqlExprArray & reorderedRight, IHqlExpression * dataset, const HqlExprArray & left, const HqlExprArray & right);
 
 extern HQL_API IHqlExpression * preserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, IHqlExpression * persistName);
-extern HQL_API bool isDistributedCoLocally(IHqlExpression * dataset1, IHqlExpression * dataset2, HqlExprArray & sort1, HqlExprArray & sort2);
+extern HQL_API bool isDistributedCoLocally(IHqlExpression * dataset1, IHqlExpression * dataset2, const HqlExprArray & sort1, const HqlExprArray & sort2);
+extern HQL_API IHqlExpression * createMatchingDistribution(IHqlExpression * expr, const HqlExprArray & oldSort, const HqlExprArray & newSort);
 
 inline IHqlExpression * queryRemoveOmitted(IHqlExpression * expr)
 {

+ 2 - 2
ecl/hqlcpp/hqlcpp.cpp

@@ -1615,7 +1615,6 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.resourceMaxHeavy, "resourceMaxHeavy", 1),
         DebugOption(options.resourceMaxDistribute, "resourceMaxDistribute", 2),
         DebugOption(options.unlimitedResources,"unlimitedResources", false),
-        DebugOption(options.resourceUseMpForDistribute,"resourceUseMpForDistribute", false),
         DebugOption(options.filteredReadSpillThreshold, "filteredReadSpillThreshold", 999),
         DebugOption(options.allowThroughSpill,"allowThroughSpill", true),
         DebugOption(options.minimiseSpills,"minimiseSpills", false),
@@ -1702,6 +1701,8 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.removeXpathFromOutput,"removeXpathFromOutput",false),
         DebugOption(options.canLinkConstantRows,"canLinkConstantRows",true),
         DebugOption(options.checkAmbiguousRollupCondition,"checkAmbiguousRollupCondition",true),
+        DebugOption(options.matchExistingDistributionForJoin,"matchExistingDistributionForJoin",true),
+        DebugOption(options.expandHashJoin,"expandHashJoin",false),
     };
 
     //get options values from workunit
@@ -1738,7 +1739,6 @@ void HqlCppTranslator::cacheOptions()
 
     //The following cases handle options whose default values are dependent on other options.  
     //Or where one debug options sets more than one option
-    options.hasResourceUseMpForDistribute = wu()->hasDebugValue("resourceUseMpForDistribute");
     if (options.spanMultipleCpp)
     {
         options.activitiesPerCpp = wu()->getDebugValueInt("activitiesPerCpp", DEFAULT_ACTIVITIES_PER_CPP);

+ 2 - 2
ecl/hqlcpp/hqlcpp.ipp

@@ -637,8 +637,6 @@ struct HqlCppOptions
     bool                globalAutoHoist;
     bool                expandRepeatAnyAsDfa;
     bool                unlimitedResources;
-    bool                hasResourceUseMpForDistribute;
-    bool                resourceUseMpForDistribute;
     bool                allowThroughSpill;
     bool                minimiseSpills;
     bool                spillMultiCondition;
@@ -717,6 +715,8 @@ struct HqlCppOptions
     bool                canLinkConstantRows;
     bool                checkAmbiguousRollupCondition;
     bool                paranoidCheckSelects;
+    bool                matchExistingDistributionForJoin;
+    bool                expandHashJoin;
 };
 
 //Any information gathered while processing the query should be moved into here, rather than cluttering up the translator class

+ 2 - 15
ecl/hqlcpp/hqlresource.cpp

@@ -51,16 +51,8 @@
 
 static void setHashResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
 { //after this change the body only reads and updates resources; expr and options are not referenced
-    if (options.useMpForDistribute)
-    {
-        unsigned memneeded = MEM_Const_Minimal+resources.clusterSize*4*DISTRIBUTE_SINGLE_BUFFER_SIZE+DISTRIBUTE_PULL_BUFFER_SIZE;
-        resources.set(RESslavememory, memneeded).set(REShashdist, 1);
-    }
-    else
-    {
-        resources.set(RESslavememory, MEM_Const_Minimal+DISTRIBUTE_PULL_BUFFER_SIZE).set(REShashdist, 1);
-        resources.setManyToManySockets(2);
-    }
+    unsigned memneeded = MEM_Const_Minimal+resources.clusterSize*4*DISTRIBUTE_SINGLE_BUFFER_SIZE+DISTRIBUTE_PULL_BUFFER_SIZE; //same formula as the former useMpForDistribute branch - grows linearly with resources.clusterSize
+    resources.set(RESslavememory, memneeded).set(REShashdist, 1); //records the memory requirement and one REShashdist; the former setManyToManySockets(2) path is gone
 }
 
 
@@ -1782,11 +1774,6 @@ EclResourcer::EclResourcer(IErrorReceiver * _errors, IConstWorkUnit * _wu, Clust
     }
 
 
-    options.useMpForDistribute = (targetClusterType == ThorLCRCluster);
-    
-    if (_translatorOptions.hasResourceUseMpForDistribute)
-        options.useMpForDistribute = _translatorOptions.resourceUseMpForDistribute;
-
     options.isChildQuery = false;
     options.targetClusterType = targetClusterType;
     options.filteredSpillThreshold = _translatorOptions.filteredReadSpillThreshold;

+ 0 - 1
ecl/hqlcpp/hqlresource.ipp

@@ -59,7 +59,6 @@ public:
     bool     preventKeyedSplit;
     bool     preventSteppedSplit;
     bool     minimizeSkewBeforeSpill;
-    bool     useMpForDistribute;
     bool     expandSingleConstRow;
     bool     createSpillAsDataset;
     bool     optimizeSharedInputs;

+ 54 - 12
ecl/hqlcpp/hqlttcpp.cpp

@@ -2579,6 +2579,21 @@ bool ThorHqlTransformer::isLightweightJoinCandidate(IHqlExpression * expr, bool
     return false;
 }
 
+static IHqlExpression * createDistributedInput(IHqlExpression * ds, IHqlExpression * sortlist, bool internal)
+{ //wraps ds in a no_distribute whose distribution expression is a no_hash32 of sortlist
+    //could use a more optimal hash function since comparing against self, so fields are same type
+    IHqlExpression * internalExpr = internal ?  createAttribute(internalAtom) : NULL; //the internal attribute is only added to the hash when requested
+    OwnedHqlExpr activeDist = createValue(no_hash32, LINK(unsignedType), LINK(sortlist), internalExpr);
+    OwnedHqlExpr dist = replaceSelector(activeDist, queryActiveTableSelector(), ds); //rebind the hash from the active table selector to ds
+    return createDataset(no_distribute, LINK(ds), LINK(dist)); //ds and dist are linked for the new node; the locals above release their own references
+}
+
+static IHqlExpression * createDistributedInput(IHqlExpression * ds, const HqlExprArray & sorts, bool internal)
+{ //overload taking the sort elements as an array
+    OwnedHqlExpr sortlist = createValueSafe(no_sortlist, makeSortListType(NULL), sorts); //pack the elements into a single no_sortlist expression
+    return createDistributedInput(ds, sortlist, internal); //delegate to the sortlist overload
+}
+
 IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression * expr)
 {
     IHqlExpression * leftDs = expr->queryChild(0);
@@ -2608,7 +2623,6 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
         HqlExprArray args;
         unwindChildren(args, expr);
         removeProperty(args, hashAtom);
-//      args.append(*createAttribute(_normalized_Atom));
         return expr->clone(args);
     }
 
@@ -2682,13 +2696,34 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
         if (isDistributedCoLocally(leftDs, rightDs, leftSorts, rightSorts))
             return appendOwnedOperand(expr, createLocalAttribute());
 
-        //MORE: If left side (assumed to be the largest) is already distributed, it would be more efficient
-        //to redistribute the rhs by a matching hash function (or use cosort), and then join locally.
-        //Be careful about the persist scaling factors though.
-        if (!isPersistDistribution(queryDistribution(leftDs)) && isPartitionedForGroup(leftDs, leftSorts, true))
+        if (options.matchExistingDistributionForJoin)
+        {
+            //Should this exclude lookup joins??
+            //On balance it is probably worthwhile since it means that only 1/clustersize data is on each node.
+
+            //If left side (assumed to be the largest) is already distributed, it would be more efficient
+            //to redistribute the rhs by a matching hash function (or use cosort), and then join locally.
+            //Be careful about the persist scaling factors though.
+            //SORT partitions should be supported once they are persisted by the system
+            IHqlExpression * leftDistribution = queryDistribution(leftDs);
+            if (!isPersistDistribution(leftDistribution) && !isSortedDistribution(leftDistribution) && isPartitionedForGroup(leftDs, leftSorts, true))
+            {
+                //MORE: May need a flag to stop this - to prevent issues with skew.
+                OwnedHqlExpr newHash = createMatchingDistribution(leftDistribution, leftSorts, rightSorts);
+                if (newHash)
+                {
+                    OwnedHqlExpr dist = replaceSelector(newHash, queryActiveTableSelector(), rightDs);
+                    OwnedHqlExpr newRhs = createDataset(no_distribute, LINK(rightDs), LINK(dist));
+                    OwnedHqlExpr newJoin = replaceChild(expr, 1, newRhs);
+                    return appendOwnedOperand(newJoin, createLocalAttribute());
+                }
+            }
+        }
+        else
         {
-            DBGLOG("MORE: Potential for distributed join optimization");
-            //MORE: May need a flag to stop this - to prevent issues with skew.
+            IHqlExpression * leftDistribution = queryDistribution(leftDs);
+            if (!isPersistDistribution(leftDistribution) && !isSortedDistribution(leftDistribution) && isPartitionedForGroup(leftDs, leftSorts, true))
+                DBGLOG("MORE: Potential for distributed join optimization");
         }
     }
 
@@ -2826,11 +2861,7 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
             //Only likely to catch this partition test if isLimitedSubstringJoin true, otherwise caught above
             if (!isPartitionedForGroup(leftDs, sortlist, true))
             {
-                //could use a more optimal hash function since comparing against self, so fields are same type
-                OwnedHqlExpr activeDist = createValue(no_hash32, LINK(unsignedType), LINK(sortlist), createAttribute(internalAtom));
-                //OwnedHqlExpr activeDist = createValue(no_hash, LINK(unsignedType), LINK(sortlist));
-                OwnedHqlExpr dist = replaceSelector(activeDist, queryActiveTableSelector(), leftDs);
-                distribute.setown(createDataset(no_distribute, LINK(leftDs), LINK(dist)));
+                distribute.setown(createDistributedInput(leftDs, sortlist, true));
                 distribute.setown(cloneInheritedAnnotations(expr, distribute));
             }
             else
@@ -2845,6 +2876,17 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
         }
     }
 
+    if (options.expandHashJoin && isThorCluster(targetClusterType) && expr->hasProperty(hashAtom) && !isLimitedSubstringJoin)
+    {
+        HqlExprArray args;
+        args.append(*createDistributedInput(leftDs, leftSorts, false));
+        args.append(*createDistributedInput(rightDs, rightSorts, false));
+        unwindChildren(args, expr, 2);
+        removeProperty(args, hashAtom);
+        args.append(*createLocalAttribute());
+        return expr->clone(args);
+    }
+
     if (isThorCluster(targetClusterType) && isLocal && options.implicitJoinSubSort)
     {
         IHqlExpression * noSortAttr = expr->queryProperty(noSortAtom);