瀏覽代碼

HPCC-21062 Add ECL syntax for DISTRIBUTE(ds, ALL)

Also implement much of the underlying code to support

    DISTRIBUTE(ds, f(LEFT, TARGETNODE))

to allow distribution to a set of nodes

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 6 年之前
父節點
當前提交
4d02d8bc4f

+ 1 - 0
common/thorhelper/commonext.cpp

@@ -210,6 +210,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKselfdenormalizegroup] = "selfdenormalizegroup";
     kindArray[TAKspillread] = "spillread" ;
     kindArray[TAKspillwrite] = "spillwrite" ;
+    kindArray[TAKnwaydistribute] = "nwaydistribute";
 
 //Non standard
     kindArray[TAKsubgraph] = "subgraph";

+ 1 - 0
common/thorhelper/thorcommon.cpp

@@ -818,6 +818,7 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKquantile:               return "Quantile";
     case TAKspillread:              return "Spill Read";
     case TAKspillwrite:             return "Spill Write";
+    case TAKnwaydistribute:         return "Nway Distribute";
     }
     throwUnexpected();
 }

+ 5 - 1
ecl/hql/hqlattr.cpp

@@ -306,6 +306,7 @@ unsigned getOperatorMetaFlags(node_operator op)
     case no_selectfields:
     case no_addfiles:
     case no_distribute:
+    case no_nwaydistribute:
     case no_normalize:
     case no_distributed:
     case no_preservemeta:
@@ -633,7 +634,7 @@ unsigned getOperatorMetaFlags(node_operator op)
 
     case no_unused6:
     case no_unused13: case no_unused14: case no_unused15:
-    case no_unused33: case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
+    case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
     case no_unused40: case no_unused41: case no_unused42: case no_unused43: case no_unused44: case no_unused45: case no_unused46: case no_unused47: case no_unused48: case no_unused49:
     case no_unused50: case no_unused52:
     case no_unused80:
@@ -1643,6 +1644,7 @@ bool isLocalActivity(IHqlExpression * expr)
     switch (expr->getOperator())
     {
     case no_distribute:
+    case no_nwaydistribute:
     case no_keyeddistribute:
     case no_if:
     case no_chooseds:
@@ -1772,6 +1774,7 @@ bool isGroupedActivity(IHqlExpression * expr)
     case no_group:
     case no_enth:
     case no_distribute:
+    case no_nwaydistribute:
     case no_fetch:
     case no_keyeddistribute:
     case no_merge:
@@ -2606,6 +2609,7 @@ IHqlExpression * calcRowInformation(IHqlExpression * expr)
             return getRecordCountInfo(ds);
         }
     case no_allnodes:
+    case no_nwaydistribute:
         {
             retrieveRowInformation(info, ds);
             info.scaleRange(RCclusterSizeEstimate);

+ 6 - 1
ecl/hql/hqlexpr.cpp

@@ -1596,6 +1596,7 @@ const char *getOpString(node_operator op)
     case no_distribute: return "DISTRIBUTE";
     case no_distributed: return "DISTRIBUTED";
     case no_keyeddistribute: return "DISTRIBUTE";
+    case no_nwaydistribute: return "DISTRIBUTE";
 
     case no_rank: return "RANK";
     case no_ranked: return "RANKED";
@@ -1954,7 +1955,7 @@ const char *getOpString(node_operator op)
 
     case no_unused6:
     case no_unused13: case no_unused14: case no_unused15:
-    case no_unused33: case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
+    case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
     case no_unused40: case no_unused41: case no_unused42: case no_unused43: case no_unused44: case no_unused45: case no_unused46: case no_unused47: case no_unused48: case no_unused49:
     case no_unused50: case no_unused52:
     case no_unused80:
@@ -2315,6 +2316,7 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
     case no_grouped:
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_cosort:
     case no_keyed:
@@ -2593,6 +2595,7 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
     case no_dedup:
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_preservemeta:
     case no_enth:
@@ -2897,6 +2900,7 @@ bool definesColumnList(IHqlExpression * dataset)
     case no_dedup:
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_preservemeta:
     case no_enth:
@@ -6806,6 +6810,7 @@ void CHqlDataset::cacheParent()
     // distributing:
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_preservemeta:
     // fewer records

+ 1 - 1
ecl/hql/hqlexpr.hpp

@@ -357,7 +357,7 @@ enum node_operator : unsigned short {
         no_likely,
         no_unlikely,
         no_inline,
-    no_unused33,
+        no_nwaydistribute,
     no_unused34,
     no_unused35,
     no_unused36,

+ 9 - 0
ecl/hql/hqlfold.cpp

@@ -4354,6 +4354,14 @@ IHqlExpression * NullFolderMixin::foldNullDataset(IHqlExpression * expr)
                 return removeParentNode(expr);
             break;
         }
+    case no_nwaydistribute:
+    {
+        if (isNull(child))
+            return replaceWithNull(expr);
+        if (isFail(child))
+            return removeParentNode(expr);
+        break;
+    }
     case no_sort:
     case no_subsort:
     case no_sorted:
@@ -6680,6 +6688,7 @@ HqlConstantPercolator * CExprFolderTransformer::gatherConstants(IHqlExpression *
     case no_assertgrouped:
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_preservemeta:
     case no_assertdistributed:

+ 18 - 2
ecl/hql/hqlgram.y

@@ -8244,8 +8244,24 @@ simpleDataSet
                         }
     | DISTRIBUTE '(' startTopFilter startDistributeAttrs ',' expression optDistributeAttrs ')' endTopFilter
                         {
-                            parser->normalizeExpression($6, type_numeric, false);
-                            $$.setExpr(createDataset(no_distribute, $3.getExpr(), createComma($6.getExpr(), $7.getExpr())));
+                            IHqlExpression *criterion = $6.queryExpr();
+                            node_operator op = no_distribute;
+                            if (criterion->isBoolean())
+                            {
+                                op = no_nwaydistribute;
+                            }
+                            else if (criterion->getOperator() == no_all)
+                            {
+                                //Special case DISTRIBUTE(ds, ALL) as DISTRIBUTE(ds, true) - i.e. distribute to all nodes.
+                                $6.clear();
+                                $6.setExpr(createConstant(true));
+                                op = no_nwaydistribute;
+                            }
+                            else
+                            {
+                                parser->normalizeExpression($6, type_numeric, false);
+                            }
+                            $$.setExpr(createDataset(op, $3.getExpr(), createComma($6.getExpr(), $7.getExpr())));
                             $$.setPosition($1);
                         }
     | DISTRIBUTE '(' startTopFilter startDistributeAttrs optDistributeAttrs ')' endTopFilter

+ 1 - 1
ecl/hql/hqlir.cpp

@@ -290,7 +290,7 @@ const char * getOperatorIRText(node_operator op)
     EXPAND_CASE(no,likely);
     EXPAND_CASE(no,unlikely);
     EXPAND_CASE(no,inline);
-    EXPAND_CASE(no,unused33);
+    EXPAND_CASE(no,nwaydistribute);
     EXPAND_CASE(no,unused34);
     EXPAND_CASE(no,unused35);
     EXPAND_CASE(no,unused36);

+ 3 - 0
ecl/hql/hqlmeta.cpp

@@ -2466,6 +2466,8 @@ void calculateDatasetMeta(CHqlMetaInfo & meta, IHqlExpression * expr)
             }
             break;
         }
+    case no_nwaydistribute:
+        break;
     case no_unordered:
         {
             extractMeta(meta, dataset);
@@ -3410,6 +3412,7 @@ ITypeInfo * calculateDatasetType(node_operator op, const HqlExprArray & parms)
     }
     case no_distribute:
     case no_distributed:
+    case no_nwaydistribute:
     case no_unordered:
     case no_assertdistributed:
         newRecordType.set(recordType);

+ 17 - 0
ecl/hql/hqlopt.cpp

@@ -3082,6 +3082,7 @@ IHqlExpression * CTreeOptimizer::doCreateTransformed(IHqlExpression * transforme
             case no_distributed:
             case no_unordered:
             case no_distribute:
+            case no_nwaydistribute:
             case no_group:
             case no_grouped:
             case no_keyeddistribute:
@@ -3409,6 +3410,7 @@ IHqlExpression * CTreeOptimizer::doCreateTransformed(IHqlExpression * transforme
                     break;
                 return moveProjectionOverSimple(transformed, true, false);
             case no_distribute:
+            case no_nwaydistribute:
                 //Cannot move a count project over anything that changes the order of the records.
                 if (transformedCountProject)
                     break;
@@ -3607,6 +3609,7 @@ IHqlExpression * CTreeOptimizer::doCreateTransformed(IHqlExpression * transforme
             case no_preload:
                 return swapNodeWithChild(transformed);
             case no_distribute:
+            case no_nwaydistribute:
             case no_sort:
             case no_subsort:
                 if ((options & HOOminimizeNetworkAndMemory) && increasesRowSize(transformed))
@@ -3797,6 +3800,20 @@ IHqlExpression * CTreeOptimizer::doCreateTransformed(IHqlExpression * transforme
             }
             break;
         }
+    case no_nwaydistribute:
+        {
+            //DISTRIBUTE(DISTRIBUTE(ds, hash(x)), f()) - the original distribution will be lost, so can be removed.
+            switch(child->getOperator())
+            {
+            case no_distributed:
+            case no_distribute:
+            case no_keyeddistribute:
+            case no_sort:
+            case no_subsort:
+                return removeChildNode(transformed);
+            }
+            break;
+        }
     case no_distributed:
         {
             switch(child->getOperator())

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -1537,6 +1537,7 @@ public:
     ABoundActivity * doBuildActivityLoop(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityMerge(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityNonEmpty(BuildCtx & ctx, IHqlExpression * expr);
+    ABoundActivity * doBuildActivityNWayDistribute(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityNWayMerge(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityNWayMergeJoin(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityNormalize(BuildCtx & ctx, IHqlExpression * expr);

+ 58 - 0
ecl/hqlcpp/hqlhtcpp.cpp

@@ -6146,6 +6146,7 @@ double HqlCppTranslator::getComplexity(IHqlExpression * expr, ClusterType cluste
             complexity = 5;
         break;
     case no_distribute:
+    case no_nwaydistribute:
         if (isThorCluster(cluster))
             complexity = 5;
         break;
@@ -6515,6 +6516,9 @@ ABoundActivity * HqlCppTranslator::buildActivity(BuildCtx & ctx, IHqlExpression
             case no_assertdistributed:
                 result = doBuildActivityDistribute(ctx, expr);
                 break;
+            case no_nwaydistribute:
+                result = doBuildActivityNWayDistribute(ctx, expr);
+                break;
             case no_keyeddistribute:
                 result = doBuildActivityKeyedDistribute(ctx, expr);
                 break;
@@ -14616,6 +14620,60 @@ ABoundActivity * HqlCppTranslator::doBuildActivityDistribute(BuildCtx & ctx, IHq
     }
 }
 
+ABoundActivity * HqlCppTranslator::doBuildActivityNWayDistribute(BuildCtx & ctx, IHqlExpression * expr)
+{
+    // Child 0 is the dataset being distributed, child 1 the boolean criterion.
+    // A criterion that is the constant true is the DISTRIBUTE(ds, ALL) form.
+    IHqlExpression * input = expr->queryChild(0);
+    IHqlExpression * criterion = expr->queryChild(1);
+    const bool toAllNodes = matchesBoolean(criterion, true);
+
+    const bool notOnThorGraph = !targetThor() || insideChildQuery(ctx);
+    if (notOnThorGraph)
+    {
+        if (!toAllNodes)
+        {
+            // NWAY - DISTRIBUTE within hthor or roxie is equivalent to a filter with
+            // TARGETCHANNEL substituted with 1, followed by an ungroup.
+            // MORE: Implement when DISTRIBUTE(ds, bool) is being implemented
+            UNIMPLEMENTED_X("DISTRIBUTE(NWAY)");
+        }
+        else
+        {
+            // No activity is generated for the distribute itself: the input is used
+            // directly, with an ungroup added only when the input is grouped.
+            if (!isGrouped(input))
+                return buildCachedActivity(ctx, input);
+
+            Owned<ABoundActivity> boundInput = buildCachedActivity(ctx, input);
+            return doBuildActivityUngroup(ctx, expr, boundInput);
+        }
+    }
+
+    // Skip over an explicit ungroup and read from its input directly.
+    IHqlExpression * source = isUngroup(input) ? input->queryChild(0) : input;
+    Owned<ABoundActivity> boundSource = buildCachedActivity(ctx, source);
+
+    // Generate the helper class for the DISTRIBUTE, ALL/SET activity.
+    Owned<ActivityInstance> instance(new ActivityInstance(*this, ctx, TAKnwaydistribute, expr, "NWayDistribute"));
+    if (toAllNodes)
+        instance->graphLabel.set("Distribute All");
+    buildActivityFramework(instance);
+    buildInstancePrefix(instance);
+
+    if (toAllNodes)
+    {
+        // getFlags() is only overridden when there is a flag to report.
+        const char * flagText = "SDFisall";
+        doBuildUnsignedFunction(instance->classctx, "getFlags", flagText);
+    }
+    else
+    {
+        UNIMPLEMENTED_X("DISTRIBUTE(NWAY)");
+        // Come back to this when DISTRIBUTE(ds, bool) is being implemented
+        doBuildBoolFunction(instance->startctx, "include", criterion);
+    }
+
+    buildInstanceSuffix(instance);
+    buildConnectInputOutput(ctx, instance, boundSource, 0, 0);
+
+    return instance->getBoundActivity();
+}
+
 //---------------------------------------------------------------------------
 // no_rollup
 

+ 1 - 0
ecl/hqlcpp/hqliproj.cpp

@@ -858,6 +858,7 @@ static unsigned getActivityCost(IHqlExpression * expr, ClusterType targetCluster
                 break;
             case no_keyeddistribute:
             case no_distribute:
+            case no_nwaydistribute:
             case no_cosort:
                 return CostNetworkCopy;
             case no_topn:

+ 1 - 0
ecl/hqlcpp/hqlresource.cpp

@@ -1665,6 +1665,7 @@ void getResources(IHqlExpression * expr, CResources & resources, const CResource
         break;
     case no_distribute:
     case no_keyeddistribute:
+    case no_nwaydistribute:
         resources.setLightweight();
         setHashResources(expr, resources, options);
         break;

+ 5 - 0
rtl/eclrtl/eclhelper_base.cpp

@@ -479,6 +479,11 @@ double CThorHashDistributeArg::getSkew() { return 0; }           // 0=default
 double CThorHashDistributeArg::getTargetSkew() { return 0; }           // 0=default
 ICompare * CThorHashDistributeArg::queryMergeCompare() { return NULL; }
 
+//CThorNWayDistributeArg - default implementations
+
+unsigned CThorNWayDistributeArg::getFlags()
+{
+    //No flags are set by default
+    return 0;
+}
+
+bool CThorNWayDistributeArg::include(const byte * left, unsigned targetNode)
+{
+    //Default to include all
+    return true;
+}
+
 //CThorHashDedupArg
 
 unsigned CThorHashDedupArg::getFlags() { return HDFkeepleft; }

+ 14 - 0
rtl/include/eclhelper.hpp

@@ -1041,6 +1041,7 @@ enum ThorActivityKind
     TAKjsonfetch,
     TAKspillread,
     TAKspillwrite,
+    TAKnwaydistribute,
 
     TAKlast
 };
@@ -1895,6 +1896,19 @@ struct IHThorHashDistributeArg : public IHThorArg
     virtual ICompare * queryMergeCompare()=0;       // iff TAKhasdistributemerge
 };
 
+enum    // flag bits tested against IHThorNWayDistributeArg::getFlags()
+{
+    SDFisall   = 0x0001,    // checked by isAll(); the code generator emits it for DISTRIBUTE(ds, ALL)
+};
+
+
+struct IHThorNWayDistributeArg : public IHThorArg    // helper interface whose getFlags()/include() are generated for TAKnwaydistribute activities
+{
+    virtual unsigned   getFlags()=0;    // bitmask of SDF* values
+    virtual bool       include(const byte * left, unsigned targetNode) = 0;    // NOTE(review): presumably whether row 'left' is sent to node 'targetNode' - confirm when the engines implement it
+    inline bool        isAll() { return (getFlags() & SDFisall) != 0; }    // true when the SDFisall bit is set in getFlags()
+};
+
 struct IHThorHashDedupArg : public IHThorArg
 {
     virtual ICompare * queryCompare()=0;

+ 6 - 0
rtl/include/eclhelper_base.hpp

@@ -623,6 +623,12 @@ class ECLRTL_API CThorHashDistributeArg : public CThorArgOf<IHThorHashDistribute
     virtual ICompare * queryMergeCompare() override;
 };
 
+class ECLRTL_API CThorNWayDistributeArg : public CThorArgOf<IHThorNWayDistributeArg>    // base class supplying defaults for IHThorNWayDistributeArg (bodies in eclhelper_base.cpp)
+{
+    virtual unsigned getFlags() override;    // default returns 0 (no flags)
+    virtual bool     include(const byte * left, unsigned targetNode) override;    // default returns true
+};
+
 class ECLRTL_API CThorHashDedupArg : public CThorArgOf<IHThorHashDedupArg>
 {
     virtual unsigned getFlags() override;

+ 56 - 0
testing/regress/ecl/distriball1.ecl

@@ -0,0 +1,56 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//Some of these tests need to be verified by looking at the generated code
+//MORE: Remove these lines when the code has been implemented in the engines.
+//nothor
+//noroxie
+//nohthor
+
+unsigned numRows := 1000;
+
+ids := dataset(numRows, transform({unsigned id}, SELF.id := COUNTER), DISTRIBUTED);
+
+//Check that every row is duplicated onto all of the nodes
+d1 := distribute(ids, ALL);
+o1 := output(count(d1) - numRows * CLUSTERSIZE);
+
+//Check that consecutive DISTRIBUTE(ds, ALL)s are not combined - each one multiplies the row count by CLUSTERSIZE
+d2 := distribute(d1, ALL);
+o2 := output(count(d2) - numRows * CLUSTERSIZE * CLUSTERSIZE);
+
+//Check that a hash distribute does not remove a preceding distribute, ALL
+d3a := distribute(d1, hash(id));
+d3b := DISTRIBUTED(NOFOLD(d3a), hash(id), ASSERT);
+o3 := output(count(d3b) - numRows * CLUSTERSIZE);
+
+//Check that distribute, ALL gives the correct count when it follows a hash distribute (which the optimizer can remove)
+d4a := distribute(ids, hash(id));
+d4b := DISTRIBUTE(d4a, ALL);
+o4 := output(count(d4b) - numRows * CLUSTERSIZE);
+
+//Distribute, ALL on an empty dataset is an empty dataset
+d5a := distribute(ids(false), ALL);
+o5 := output(count(d5a));
+
+SEQUENTIAL(
+    o1,
+    o2,
+    o3,
+    o4,
+    o5,
+    );

+ 62 - 0
testing/regress/ecl/distriball2.ecl

@@ -0,0 +1,62 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//Some of these tests need to be verified by looking at the generated code
+//MORE: Remove these lines when the code has been implemented in the engines.
+//nothor
+//noroxie
+//nohthor
+
+//MORE: Include a much larger scale version of this test in the performance suite.
+
+unsigned numRows := 1000;
+
+ids := dataset(numRows, transform({unsigned id}, SELF.id := COUNTER), DISTRIBUTED);
+//Same ids, opposite nodes
+ids2 := dataset(numRows, transform({unsigned id}, SELF.id := numRows - (COUNTER-1)), DISTRIBUTED);
+
+//Same ids, ids duplicated twice
+ids3 := dataset(numRows*2, transform({unsigned id}, SELF.id := 1 + (numRows - COUNTER) % numRows), DISTRIBUTED);
+
+j1 := JOIN(ids, ids2, LEFT.id = RIGHT.id, LOOKUP);
+o1 := output(count(j1) - numRows);
+
+//Check that distribute all followed by a local join produces the same result
+d2 := distribute(ids2, ALL);
+j2 := JOIN(ids, d2, LEFT.id = RIGHT.id, LOCAL, LOOKUP);
+o2 := output(count(j2) - numRows);
+
+j3 := JOIN(ids, ids3, LEFT.id = RIGHT.id, LOOKUP, MANY);
+o3 := output(count(j3) - numRows*2);
+
+//Check that distribute all followed by a local join produces the same result
+d4 := distribute(ids3, ALL);
+j4 := JOIN(ids, d4, LEFT.id = RIGHT.id, LOCAL, LOOKUP, MANY);
+o4 := output(count(j4) - numRows*2);
+
+//Now check if multiple local lookup joins are resourced into the same graph
+j5a := JOIN(j4, d4, LEFT.id = RIGHT.id, LOCAL, LOOKUP, MANY);
+j5b := JOIN(j5a, d4, LEFT.id = RIGHT.id, LOCAL, LOOKUP, MANY);
+o5 := output(count(j5b) - numRows*2*2*2);
+
+SEQUENTIAL(
+    o1,
+    o2,
+    o3,
+    o4,
+    o5,
+    );

+ 15 - 0
testing/regress/ecl/key/distriball1.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>0</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>0</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>0</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>0</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>0</Result_5></Row>
+</Dataset>

+ 15 - 0
testing/regress/ecl/key/distriball2.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>0</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>0</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>0</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>0</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>0</Result_5></Row>
+</Dataset>