浏览代码

HPCC-10606 Add STREAMED flag to JOIN to prevent implicit keyed or lookup

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 10 年之前
父节点
当前提交
31af6a9f1e

+ 1 - 1
ecl/hql/hqlexpr.cpp

@@ -14392,7 +14392,7 @@ bool isKeyedJoin(IHqlExpression * expr)
     node_operator op = expr->getOperator();
     if ((op == no_join) || (op == no_joincount) || (op == no_denormalize) || (op == no_denormalizegroup))
     {
-        if (expr->hasAttribute(allAtom) || expr->hasAttribute(lookupAtom) || expr->hasAttribute(smartAtom))
+        if (expr->hasAttribute(allAtom) || expr->hasAttribute(lookupAtom) || expr->hasAttribute(smartAtom) || expr->hasAttribute(streamedAtom))
             return false;
         if (expr->hasAttribute(keyedAtom) || containsAssertKeyed(expr->queryChild(2)))
             return true;

+ 1 - 0
ecl/hql/hqlgram.y

@@ -10258,6 +10258,7 @@ JoinFlag
                             $$.setExpr(createExprAttribute(unstableAtom));
                             $$.setPosition($1);
                         }
+    | STREAMED          {   $$.setExpr(createAttribute(streamedAtom)); $$.setPosition($1); }
     ;
 
 

+ 12 - 2
ecl/hql/hqlgram2.cpp

@@ -7725,13 +7725,14 @@ void HqlGram::checkJoinFlags(const attribute &err, IHqlExpression * join)
     bool isLookup = join->hasAttribute(lookupAtom);
     bool isSmart = join->hasAttribute(smartAtom);
     bool isAll = join->hasAttribute(allAtom);
+    bool isStreamed = join->hasAttribute(streamedAtom);
     IHqlExpression * rowLimit = join->queryAttribute(rowLimitAtom);
     IHqlExpression * keyed = join->queryAttribute(keyedAtom);
 
     if (keyed)
     {
-        if (isAll || isLookup || isSmart)
-            reportError(ERR_KEYEDINDEXINVALID, err, "LOOKUP/ALL/SMART is not compatible with KEYED");
+        if (isAll || isLookup || isSmart || isStreamed)
+            reportError(ERR_KEYEDINDEXINVALID, err, "LOOKUP/ALL/SMART/STREAMED is not compatible with KEYED");
     }
     else if (isLookup)
     {
@@ -7740,11 +7741,20 @@ void HqlGram::checkJoinFlags(const attribute &err, IHqlExpression * join)
             reportWarning(CategorySyntax, ERR_KEYEDINDEXINVALID, err.pos, "ALL is not compatible with LOOKUP");
         if (isSmart)
             reportError(ERR_KEYEDINDEXINVALID, err.pos, "SMART is not compatible with LOOKUP");
+        if (isStreamed)
+            reportError(ERR_KEYEDINDEXINVALID, err.pos, "STREAMED is not compatible with LOOKUP");
     }
     else if (isSmart)
     {
         if (isAll)
             reportError(ERR_KEYEDINDEXINVALID, err, "ALL is not compatible with KEYED");
+        if (isStreamed)
+            reportError(ERR_KEYEDINDEXINVALID, err.pos, "STREAMED is not compatible with SMART");
+    }
+    else if (isAll)
+    {
+        if (isStreamed)
+            reportError(ERR_KEYEDINDEXINVALID, err.pos, "STREAMED is not compatible with ALL");
     }
 
     if (keyed)

+ 5 - 3
ecl/hql/hqlmeta.cpp

@@ -2190,7 +2190,8 @@ void calculateDatasetMeta(CHqlMetaInfo & meta, IHqlExpression * expr)
             bool isAllJoin = expr->queryAttribute(allAtom) != NULL;
             bool isHashJoin = expr->queryAttribute(hashAtom) != NULL;
             bool isSmartJoin = expr->queryAttribute(smartAtom) != NULL;
-            bool isKeyedJoin = !isAllJoin && !isLookupJoin && !isSmartJoin && (expr->queryAttribute(keyedAtom) || isKey(expr->queryChild(1)));
+            bool isStreamedJoin = expr->queryAttribute(streamedAtom) != NULL;
+            bool isKeyedJoin = !isAllJoin && !isLookupJoin && !isSmartJoin && !isStreamedJoin && (expr->queryAttribute(keyedAtom) || isKey(expr->queryChild(1)));
             bool isLocal = (expr->queryAttribute(localAtom) != NULL);
             bool fo = expr->queryAttribute(fullonlyAtom) || expr->queryAttribute(fullouterAtom);
             bool createDefaultLeft = fo || expr->queryAttribute(rightonlyAtom) || expr->queryAttribute(rightouterAtom);
@@ -3100,9 +3101,10 @@ ITypeInfo * calculateDatasetType(node_operator op, const HqlExprArray & parms)
     case no_joincount:
         {
             bool isLookupJoin = queryAttribute(lookupAtom, parms) != NULL;
-            bool isSmartJoin = queryAttribute(smartAtom, parms) != NULL;
             bool isAllJoin = queryAttribute(allAtom, parms) != NULL;
-            bool isKeyedJoin = !isAllJoin && !isLookupJoin && !isSmartJoin && (queryAttribute(keyedAtom, parms) || isKey(&parms.item(1)));
+            bool isSmartJoin = queryAttribute(smartAtom, parms) != NULL;
+            bool isStreamedJoin = queryAttribute(streamedAtom, parms) != NULL;
+            bool isKeyedJoin = !isAllJoin && !isLookupJoin && !isSmartJoin && !isStreamedJoin && (queryAttribute(keyedAtom, parms) || isKey(&parms.item(1)));
 
             recordArg = 3;
             if (queryAttribute(groupAtom, parms))

+ 4 - 2
ecl/hqlcpp/hqlttcpp.cpp

@@ -2756,7 +2756,8 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
         return removeAttribute(expr, hashAtom);
 
     //Check to see if this join should be done as a keyed join...
-    if (!expr->hasAttribute(lookupAtom) && !expr->hasAttribute(smartAtom) && !expr->hasAttribute(allAtom))
+    if (!expr->hasAttribute(lookupAtom) && !expr->hasAttribute(smartAtom) &&
+        !expr->hasAttribute(allAtom) && !expr->hasAttribute(streamedAtom))
     {
         if (rightDs->getOperator() == no_filter)
         {
@@ -2924,7 +2925,8 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
     }
 
     //Sort,Sort->join is O(NlnN) lookup join using a hash table is O(N) =>convert for hthor/roxie
-    if (!isThorCluster(targetClusterType) && !expr->hasAttribute(_normalized_Atom) && !expr->hasAttribute(smartAtom))
+    if (!isThorCluster(targetClusterType) &&
+        !expr->hasAttribute(_normalized_Atom) && !expr->hasAttribute(smartAtom) && !expr->hasAttribute(streamedAtom))
     {
         bool createLookup = false;
         if ((op == no_join) && options.convertJoinToLookup)

+ 39 - 0
ecl/regress/issue10606.ecl

@@ -0,0 +1,39 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+indexRecord :=
+        RECORD
+string14            surname;
+string20            forename
+=>
+string20            alias;
+unsigned8           filepos{virtual(fileposition)};
+        END;
+
+nameKey := INDEX(indexRecord, 'sequence.idx');
+
+
+inRecord := RECORD
+    string            surname;
+    string            forename;
+END;
+
+inDs := DATASET('in', inRecord, THOR);
+
+j := JOIN(inDs, nameKey, LEFT.surname = RIGHT.surname AND LEFT.forename = RIGHT.forename, STREAMED);
+output(j);
+

+ 37 - 0
ecl/regress/issue10606b.ecl

@@ -0,0 +1,37 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+indexRecord := RECORD
+    string14 surname;
+    string20 forename
+    =>
+    string20 alias;
+    unsigned8 filepos{virtual(fileposition)};
+END;
+
+nameKey := INDEX(indexRecord, 'sequence.idx');
+
+inRecord := RECORD
+    string14 surname;
+    string20 forename;
+END;
+
+inDs := DATASET('in', inRecord, THOR);
+
+j := JOIN(inDs, nameKey, LEFT.surname = RIGHT.surname AND LEFT.forename = RIGHT.forename, STREAMED);
+output(j);
+

+ 37 - 0
ecl/regress/issue10606c.ecl

@@ -0,0 +1,37 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+indexRecord := RECORD
+    string14 surname;
+    string20 forename
+    =>
+    string20 alias;
+    unsigned8 filepos{virtual(fileposition)};
+END;
+
+nameKey := INDEX(indexRecord, 'sequence.idx');
+
+inRecord := RECORD
+    string14 surname;
+    string20 forename;
+END;
+
+inDs := DATASET('in', inRecord, THOR);
+
+j := JOIN(inDs, nameKey, LEFT.surname = RIGHT.surname, STREAMED);
+output(j);
+

+ 37 - 0
ecl/regress/issue10606d.ecl

@@ -0,0 +1,37 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+indexRecord := RECORD
+    string14 surname
+    =>
+    string20 forename;
+    string20 alias;
+    unsigned8 filepos{virtual(fileposition)};
+END;
+
+nameKey := INDEX(indexRecord, 'sequence.idx');
+
+inRecord := RECORD
+    string14 surname;
+    string20 forename;
+END;
+
+inDs := DATASET('in', inRecord, THOR);
+
+j := JOIN(inDs, nameKey, LEFT.surname = RIGHT.surname AND LEFT.forename = RIGHT.forename, STREAMED);
+output(j);
+