Browse Source

HPCC-21482 Allow embedded activities to return non streamed datasets

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 6 năm trước cách đây
mục cha
commit
f93c284608

+ 7 - 0
common/deftype/deftype.cpp

@@ -3327,6 +3327,13 @@ ITypeInfo * replaceChildType(ITypeInfo * type, ITypeInfo * newChild)
     case type_rule:
         newType.setown(makeRuleType(LINK(newChild)));
         break;
+    case type_function:
+    {
+        IFunctionTypeExtra * extra = dynamic_cast<IFunctionTypeExtra *>(type);
+        assertex(extra);
+        newType.setown(makeFunctionType(LINK(newChild), LINK(extra->queryParameters()), LINK(extra->queryDefaults()), LINK(extra->queryAttributes())));
+        break;
+    }
     default:
         throwUnexpected();
     }

+ 2 - 1
ecl/hqlcpp/hqlcpp.cpp

@@ -6118,7 +6118,8 @@ void HqlCppTranslator::doBuildCall(BuildCtx & ctx, const CHqlBoundTarget * tgt,
             }
             const CHqlBoundTarget * curTarget;
             if (tgt && !tgt->isFixedSize() && 
-                (hasLinkCountedModifier(targetType) == hasLinkCountedModifier(retType)))
+                (hasLinkCountedModifier(targetType) == hasLinkCountedModifier(retType)) &&
+                !hasStreamedModifier(targetType))
             {
                 doneAssign = true;
                 curTarget = tgt;

+ 11 - 3
ecl/hqlcpp/hqlcppds.cpp

@@ -2760,7 +2760,7 @@ void HqlCppTranslator::buildDatasetAssign(BuildCtx & ctx, const CHqlBoundTarget
     case no_translated:
         {
             bool sourceOutOfLine = isArrayRowset(exprType);
-            if (sourceOutOfLine != targetOutOfLine && !hasStreamedModifier(exprType))
+            if (sourceOutOfLine != targetOutOfLine && !hasStreamedModifier(exprType) && !hasStreamedModifier(to))
             {
                 IAtom * serializeFormat = internalAtom; // The format of serialized expressions in memory must match the internal serialization format
                 OwnedITypeInfo serializedSourceType = getSerializedForm(exprType, serializeFormat);
@@ -2810,7 +2810,7 @@ void HqlCppTranslator::buildDatasetAssign(BuildCtx & ctx, const CHqlBoundTarget
                 IIdAtom * func = NULL;
                 if (!isArrayRowset(to))
                 {
-                    if (!isArrayRowset(exprType))
+                    if (!isArrayRowset(exprType) && !hasStreamedModifier(to))
                         func = dataset2DatasetXId;
                 }
                 else if (hasLinkCountedModifier(to))
@@ -2847,9 +2847,17 @@ void HqlCppTranslator::buildDatasetAssign(BuildCtx & ctx, const CHqlBoundTarget
 
                 if (func)
                 {
+                    OwnedHqlExpr function = needFunction(func);
+                    ITypeInfo * funcType = function->queryType();
+                    ITypeInfo * funcDsType = funcType->queryChildType();
+                    ITypeInfo * funcRowType = funcDsType->queryChildType();
+                    ITypeInfo * toRecordType = to->queryChildType()->queryChildType();
+
+                    Owned<ITypeInfo> newRowType = replaceChildType(funcRowType, toRecordType);
+                    Owned<ITypeInfo> newType = replaceChildType(funcDsType, newRowType);
                     HqlExprArray args;
                     args.append(*LINK(expr));
-                    OwnedHqlExpr call = bindFunctionCall(func, args);
+                    OwnedHqlExpr call = bindFunctionCall(func, args, newType);
                     buildExprAssign(ctx, target, call);
                     return;
                 }

+ 2 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -8983,7 +8983,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityEmbed(BuildCtx & ctx, IHqlExpr
     if (expr->isDataset())
     {
         MemberFunction func(*this, instance->startctx, "virtual IRowStream * createOutput(IThorActivityContext * activityContext) override");
-        buildReturn(func.ctx, newCall);
+        OwnedITypeInfo streamedType = setStreamedAttr(newCall->queryType(), true);
+        buildReturn(func.ctx, newCall, streamedType);
     }
     else
     {

+ 29 - 0
testing/regress/ecl/embedactivity5.ecl

@@ -0,0 +1,29 @@
+r := RECORD
+    UNSIGNED value;
+END;
+
+//This function returns the sum of the squares of the inputs from the dataset as a single non streamed row
+
+dataset(r) myDataset(streamed dataset(r) ds) := EMBED(C++ : activity)
+#include <stdio.h>
+#body
+    unsigned __int64 sum = 0;
+    for (;;)
+    {
+        const byte * next = (const byte *)ds->nextRow();
+        if (!next)
+            break;
+        unsigned __int64 value = *(const unsigned __int64 *)next;
+        rtlReleaseRow(next);
+        sum += value * value;
+    }
+
+    __lenResult = sizeof(unsigned __int64);
+    __result = rtlMalloc(__lenResult);
+    *(unsigned __int64 *)__result = sum;
+ENDEMBED;
+
+
+ds1 := DATASET([1,3,4,5,9,10,1,1], r, distributed);
+
+output(TABLE(myDataset(ds1), { unsigned value := SUM(GROUP, value); }));

+ 31 - 0
testing/regress/ecl/embedactivity5b.ecl

@@ -0,0 +1,31 @@
+r := RECORD
+    UNSIGNED value;
+END;
+
+//This function returns the sum of the squares of the inputs from the dataset as a single non streamed row
+
+linkcounted dataset(r) myDataset(streamed dataset(r) ds) := EMBED(C++ : activity)
+#include <stdio.h>
+#body
+    unsigned __int64 sum = 0;
+    for (;;)
+    {
+        const byte * next = (const byte *)ds->nextRow();
+        if (!next)
+            break;
+        unsigned __int64 value = *(const unsigned __int64 *)next;
+        rtlReleaseRow(next);
+        sum += value * value;
+    }
+
+    byte * row = (byte *)_resultAllocator->createRow();
+    *(unsigned __int64 *)row = sum;
+    __countResult = 1;
+    __result = _resultAllocator->createRowset(1);
+    *__result = row;
+ENDEMBED;
+
+
+ds1 := DATASET([1,3,4,5,9,10,1,1], r, distributed);
+
+output(TABLE(myDataset(ds1), { unsigned value := SUM(GROUP, value); }));

+ 3 - 0
testing/regress/ecl/key/embedactivity5.xml

@@ -0,0 +1,3 @@
+<Dataset name='Result 1'>
+ <Row><value>234</value></Row>
+</Dataset>

+ 3 - 0
testing/regress/ecl/key/embedactivity5b.xml

@@ -0,0 +1,3 @@
+<Dataset name='Result 1'>
+ <Row><value>234</value></Row>
+</Dataset>