|
@@ -1939,7 +1939,10 @@ IHqlExpression * ThorHqlTransformer::createTransformed(IHqlExpression * expr)
|
|
|
case no_selfjoin:
|
|
|
case no_denormalize:
|
|
|
case no_denormalizegroup:
|
|
|
- normalized = normalizeJoinOrDenormalize(transformed);
|
|
|
+ if (transformed->hasAttribute(groupAtom))
|
|
|
+ normalized = normalizeJoinAndGroup(transformed);
|
|
|
+ else
|
|
|
+ normalized = normalizeJoinOrDenormalize(transformed);
|
|
|
break;
|
|
|
case no_cosort:
|
|
|
case no_sort:
|
|
@@ -2595,6 +2598,96 @@ static IHqlExpression * createDistributedInput(IHqlExpression * ds, const HqlExp
|
|
|
return createDistributedInput(ds, sortlist, internal);
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+
|
|
|
+Perform the following transformation:
|
|
|
+
|
|
|
+R := JOIN(l, r, LEFT.key = RIGHT.key AND fuzzy(LEFT,RIGHT), t(LEFT,RIGHT), GROUP(LEFT.id1, LEFT.id2), ATMOST(optional))
|
|
|
+
|
|
|
+DL := DISTRIBUTE(L, HASH(key));
|
|
|
+DR := DISTRIBUTE(R, HASH(key));
|
|
|
+SL := SORT(DL, id, LOCAL); // Later replace this with a LEFTSORT() attribute on the join (so can optimize self join)
|
|
|
+//If it is a self join, SR == SL
|
|
|
+JR := JOIN(SL, DR, LEFT.key = RIGHT.key, t(LEFT,RIGHT), LOOKUP MANY, LOCAL);
|
|
|
+DJ := DISTRIBUTE(J, HASH(leftid1, leftid2), MERGE(leftid1, leftid2));
|
|
|
+R := GROUP(DJ, leftid1, leftid2, LOCAL);
|
|
|
+
|
|
|
+*/
|
|
|
+
|
|
|
+IHqlExpression * ThorHqlTransformer::normalizeJoinAndGroup(IHqlExpression * expr)
|
|
|
+{
|
|
|
+ IHqlExpression * oldLeft = expr->queryChild(0);
|
|
|
+ IHqlExpression * oldRight = expr->queryChild(1);
|
|
|
+ LinkedHqlExpr newLeft = oldLeft;
|
|
|
+ LinkedHqlExpr newRight = oldRight;
|
|
|
+ IHqlExpression * groupOrder = queryAttributeChild(expr, groupAtom, 0);
|
|
|
+ node_operator op = expr->getOperator();
|
|
|
+
|
|
|
+ bool hasLocal = isLocalActivity(expr);
|
|
|
+ bool alwaysLocal = !translator.targetThor();
|
|
|
+ if (!hasLocal && !alwaysLocal)
|
|
|
+ {
|
|
|
+ JoinSortInfo joinInfo;
|
|
|
+ joinInfo.findJoinSortOrders(expr, false);
|
|
|
+
|
|
|
+ OwnedHqlExpr leftList = createValueSafe(no_sortlist, makeSortListType(NULL), joinInfo.queryLeftReq());
|
|
|
+ OwnedHqlExpr mappedLeftList = replaceSelector(leftList, queryActiveTableSelector(), newLeft->queryNormalizedSelector());
|
|
|
+ OwnedHqlExpr hashLeft = createValue(no_hash32, makeIntType(4, false), mappedLeftList.getClear());
|
|
|
+ newLeft.setown(createDataset(no_distribute, LINK(newLeft), LINK(hashLeft)));
|
|
|
+
|
|
|
+ if (oldRight == oldLeft)
|
|
|
+ newRight.set(newLeft);
|
|
|
+ else if (op != no_selfjoin)
|
|
|
+ {
|
|
|
+ OwnedHqlExpr rightList = createValueSafe(no_sortlist, makeSortListType(NULL), joinInfo.queryRightReq());
|
|
|
+ OwnedHqlExpr mappedRightList = replaceSelector(rightList, queryActiveTableSelector(), newRight->queryNormalizedSelector());
|
|
|
+ OwnedHqlExpr hashRight = createValue(no_hash32, makeIntType(4, false), mappedRightList.getClear());
|
|
|
+ newRight.setown(createDataset(no_distribute, LINK(newRight), LINK(hashRight)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ OwnedHqlExpr newLocalAttr = alwaysLocal ? NULL : createLocalAttribute();
|
|
|
+
|
|
|
+ //Sort the left hand dataset into grouping order.
|
|
|
+ assertex(groupOrder);
|
|
|
+ OwnedHqlExpr left = createSelector(no_left, expr->queryChild(0), querySelSeq(expr));
|
|
|
+ OwnedHqlExpr leftSortOrder = replaceSelector(groupOrder, left, newLeft);
|
|
|
+ newLeft.setown(createDatasetF(no_sort, newLeft.getClear(), LINK(leftSortOrder), LINK(newLocalAttr), NULL));
|
|
|
+
|
|
|
+ if (oldRight == oldLeft)
|
|
|
+ newRight.set(newLeft);
|
|
|
+
|
|
|
+ //Now create the modified join
|
|
|
+ HqlExprArray joinArgs;
|
|
|
+ joinArgs.append(*LINK(newLeft));
|
|
|
+ joinArgs.append(*LINK(newRight));
|
|
|
+ unwindChildren(joinArgs, expr, 2);
|
|
|
+ removeProperty(joinArgs, groupAtom);
|
|
|
+ if (!hasLocal && !alwaysLocal)
|
|
|
+ joinArgs.append(*createLocalAttribute());
|
|
|
+ OwnedHqlExpr newJoin = expr->clone(joinArgs);
|
|
|
+
|
|
|
+ //Now need to map the fields from the input dataset to the join output
|
|
|
+ NewProjectMapper2 mapper;
|
|
|
+ mapper.setMapping(newJoin->queryChild(3));
|
|
|
+ bool matchedAll = true;
|
|
|
+ OwnedHqlExpr mappedOrder = mapper.collapseFields(groupOrder, left, newJoin->queryNormalizedSelector(), left, &matchedAll);
|
|
|
+ assertex(matchedAll); // This is checked in the parser, so shouldn't be triggered here.
|
|
|
+
|
|
|
+ //Distribute the result
|
|
|
+ LinkedHqlExpr distributed = newJoin;
|
|
|
+ if (!hasLocal && !alwaysLocal)
|
|
|
+ {
|
|
|
+ OwnedHqlExpr hashOut = createValue(no_hash32, makeIntType(4, false), LINK(mappedOrder));
|
|
|
+ OwnedHqlExpr mergeOut = createExprAttribute(mergeAtom, LINK(mappedOrder));
|
|
|
+ distributed.setown(createDatasetF(no_distribute, LINK(newJoin), hashOut.getClear(), mergeOut.getClear(), NULL));
|
|
|
+ }
|
|
|
+
|
|
|
+ //And finally group it.
|
|
|
+ return createDatasetF(no_group, LINK(distributed), LINK(mappedOrder), LINK(newLocalAttr), NULL);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression * expr)
|
|
|
{
|
|
|
IHqlExpression * leftDs = expr->queryChild(0);
|
|
@@ -4892,7 +4985,7 @@ void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpressio
|
|
|
extraOutputAttr.setown(createComma(LINK(expr->queryAttribute(expireAtom)), LINK(expr->queryAttribute(clusterAtom))));
|
|
|
numPersistInstances = multiplePersistInstances ? -1 : 0;
|
|
|
if (expr->hasAttribute(multipleAtom))
|
|
|
- numPersistInstances = getIntValue(queryAttributeChild(expr, multipleAtom, 0), -1);
|
|
|
+ numPersistInstances = (int)getIntValue(queryAttributeChild(expr, multipleAtom, 0), -1);
|
|
|
else if (expr->hasAttribute(singleAtom))
|
|
|
numPersistInstances = 0;
|
|
|
|