소스 검색

Merge pull request #8394 from ghalliday/issue15204

HPCC-15204 Ensure execution on another cluster modifies the distribution

Reviewed-By: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 년 전
부모
커밋
fcd0c6bec2
6개의 변경된 파일31개의 추가작업 그리고 20개의 파일을 삭제
  1. 2 0
      ecl/hql/hqlatoms.cpp
  2. 1 0
      ecl/hql/hqlatoms.hpp
  3. 20 15
      ecl/hql/hqlmeta.cpp
  4. 1 1
      ecl/hql/hqlmeta.hpp
  5. 2 2
      ecl/hqlcpp/hqlresource.cpp
  6. 5 2
      ecl/hqlcpp/hqlttcpp.cpp

+ 2 - 0
ecl/hql/hqlatoms.cpp

@@ -345,6 +345,7 @@ IAtom * refreshAtom;
 IAtom * _remote_Atom;
 IAtom * renameAtom;
 IAtom * repeatAtom;
+IAtom * resizeAtom;
 IAtom * _resourced_Atom;
 IAtom * responseAtom;
 IAtom * restartAtom;
@@ -787,6 +788,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
     MAKESYSATOM(remote);
     MAKEATOM(rename);
     MAKEATOM(repeat);
+    MAKEATOM(resize);
     MAKESYSATOM(resourced);
     MAKEATOM(response);
     MAKEATOM(restart);

+ 1 - 0
ecl/hql/hqlatoms.hpp

@@ -350,6 +350,7 @@ extern HQL_API IAtom * refreshAtom;
 extern HQL_API IAtom * _remote_Atom;
 extern HQL_API IAtom * renameAtom;
 extern HQL_API IAtom * repeatAtom;
+extern HQL_API IAtom * resizeAtom;
 extern HQL_API IAtom * _resourced_Atom;
 extern HQL_API IAtom * responseAtom;
 extern HQL_API IAtom * restartAtom;

+ 20 - 15
ecl/hql/hqlmeta.cpp

@@ -721,7 +721,7 @@ bool isSortedDistribution(IHqlExpression * distribution)
 
 bool isPersistDistribution(IHqlExpression * distribution)
 {
-    return isKnownDistribution(distribution) && (distribution->getOperator() == no_bxor);
+    return isKnownDistribution(distribution) && (distribution->queryName() == resizeAtom);
 }
 
 void extractMeta(CHqlMetaInfo & meta, IHqlExpression * expr)
@@ -1292,8 +1292,13 @@ static bool includesFieldsOutsideGrouping(IHqlExpression * distribution, const H
             return false;
         return includesFieldsOutsideGrouping(distribution->queryChild(0), groups);
     case no_attr:
-        //may be flags on hash32,trim etc.
-        return (distribution->queryName() == unknownAtom);
+    case no_attr_expr:
+        {
+            if (distribution->queryName() == unknownAtom)
+                return true;
+            //may be flags on hash32,trim or resize() etc. check children are ok
+            break;
+        }
     default:
         return true;
     }
@@ -1738,7 +1743,7 @@ IHqlExpression * createMatchingDistribution(IHqlExpression * expr, const HqlExpr
                         return NULL;
                 }
             }
-            else if (expr == cacheAnyAttribute)
+            else if ((expr == cacheAnyAttribute) || (name == resizeAtom))
                 return NULL;
             break;
         }
@@ -1828,7 +1833,7 @@ ITypeInfo * createDatasetType(ITypeInfo * recordType, bool isGrouped)
     return type.getClear();
 }
 
-static IHqlExpression * createPreserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, IHqlExpression * persistName)
+static IHqlExpression * createPreserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, bool clusterSizeMayChange)
 {
     CHqlMetaInfo meta;
     if (original->isDataset())
@@ -1838,16 +1843,16 @@ static IHqlExpression * createPreserveTableInfo(IHqlExpression * newTable, IHqlE
     IHqlExpression * localSort = meta.localUngroupedSortOrder;
     IHqlExpression * grouping = meta.grouping;
     IHqlExpression * groupSort = meta.groupSortOrder;
-    if (persistName && isKnownDistribution(distribution))
+    if (clusterSizeMayChange && isKnownDistribution(distribution))
     {
-        if (!distribution->isAttribute())
+        if (!distribution->isAttribute() || (distribution->queryName() == resizeAtom))
         {
-            //Cluster size may not match so generate a unique modifier.  Needs to modify enough distribute no longer a nop,
-            //but not too much to not get hoisted, or introduce extra dependencies.
-            //At the moment bxor with a sequence number since I can't see anyone ever doing that.
-            __int64 seq = getExpressionCRC(persistName);
-            OwnedHqlExpr uid = createConstant(distribution->queryType()->castFrom(true, seq));
-            distribution.setown(createValue(no_bxor, distribution->getType(), LINK(distribution), LINK(uid)));
+            //Cluster size may not match => the distribution should be modified so that it no longer matches the
+            //distribution of a DISTRIBUTE executed within the cluster, but still contains enough information to
+            //deduce that a self-join can be made local.
+            unsigned seq = getExpressionCRC(original);
+            OwnedHqlExpr uid = getSizetConstant(seq);
+            distribution.setown(createExprAttribute(resizeAtom, LINK(distribution), LINK(uid)));
         }
         else if (isSortDistribution(distribution))
         {
@@ -1896,9 +1901,9 @@ static IHqlExpression * optimizePreserveMeta(IHqlExpression * expr)
 }
 
 
-IHqlExpression * preserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, IHqlExpression * persistName)
+IHqlExpression * preserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, bool clusterSizeMayChange)
 {
-    OwnedHqlExpr preserved = createPreserveTableInfo(newTable, original, loseDistribution, persistName);
+    OwnedHqlExpr preserved = createPreserveTableInfo(newTable, original, loseDistribution, clusterSizeMayChange);
     return optimizePreserveMeta(preserved);
 }
 

+ 1 - 1
ecl/hql/hqlmeta.hpp

@@ -127,7 +127,7 @@ extern HQL_API IHqlExpression * convertSubSortToGroupedSort(IHqlExpression * exp
 
 extern HQL_API bool reorderMatchExistingLocalSort(HqlExprArray & sortedLeft, HqlExprArray & reorderedRight, IHqlExpression * dataset, const HqlExprArray & left, const HqlExprArray & right);
 
-extern HQL_API IHqlExpression * preserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, IHqlExpression * persistName);
+extern HQL_API IHqlExpression * preserveTableInfo(IHqlExpression * newTable, IHqlExpression * original, bool loseDistribution, bool clusterSizeMayChange);
 extern HQL_API bool isDistributedCoLocally(IHqlExpression * dataset1, IHqlExpression * dataset2, const HqlExprArray & sort1, const HqlExprArray & sort2);
 extern HQL_API IHqlExpression * createMatchingDistribution(IHqlExpression * expr, const HqlExprArray & oldSort, const HqlExprArray & newSort);
 

+ 2 - 2
ecl/hqlcpp/hqlresource.cpp

@@ -2671,7 +2671,7 @@ IHqlExpression * SpillerInfo::createSpilledRead(IHqlExpression * spillReason)
         loseDistribution = false;
     }
 
-    dataset.setown(preserveTableInfo(dataset, original, loseDistribution, NULL));
+    dataset.setown(preserveTableInfo(dataset, original, loseDistribution, false));
     return wrapRowOwn(dataset.getClear());
 }
 
@@ -6699,7 +6699,7 @@ IHqlExpression * SpillActivityTransformer::createTransformed(IHqlExpression * ex
             else
                 ret.setown(createDataset(readOp, args));
             const bool loseDistribution = false;
-            return preserveTableInfo(ret, ds, loseDistribution, NULL);
+            return preserveTableInfo(ret, ds, loseDistribution, false);
         }
     }
     return NewHqlTransformer::createTransformed(expr);

+ 5 - 2
ecl/hqlcpp/hqlttcpp.cpp

@@ -5353,7 +5353,10 @@ void GlobalAttributeInfo::doSplitGlobalDefinition(ITypeInfo * type, IHqlExpressi
             OwnedHqlExpr getValue = createDataset(no_table, args);
             //getValue.setown(cloneInheritedAnnotations(value, getValue));
             if (persistOp != no_stored)
-                getValue.setown(preserveTableInfo(getValue, value, false, (persistOp == no_persist) ? filename : NULL));
+            {
+                bool clusterSizeMayChange = (persistOp == no_persist) || (cluster != nullptr);
+                getValue.setown(preserveTableInfo(getValue, value, false, clusterSizeMayChange));
+            }
 
             //Note: getValue->queryType() != valueType because the dataset used for field resolution has changed...
             if (value->isDictionary())
@@ -5455,7 +5458,7 @@ void GlobalAttributeInfo::splitSmallDataset(IHqlExpression * value, SharedHqlExp
         OwnedHqlExpr wuRead = value->isDictionary() ? createDictionary(no_workunit_dataset, args) : createDataset(no_workunit_dataset, args);
         //wuRead.setown(cloneInheritedAnnotations(value, wuRead));
         if (persistOp != no_stored)
-            getOutput->setown(preserveTableInfo(wuRead, value, true, NULL));
+            getOutput->setown(preserveTableInfo(wuRead, value, true, false));
         else
             getOutput->set(wuRead);
     }