Browse Source

Merge pull request #11235 from ghalliday/issue17548

HPCC-17548 Fix problems with streamed dataset parameters

Reviewed-By: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
054035e002

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -1195,6 +1195,7 @@ public:
     IHqlCppDatasetBuilder * createLimitedDatasetBuilder(IHqlExpression * record, IHqlExpression * maxCount);
     IHqlCppDatasetBuilder * createLinkedDatasetBuilder(IHqlExpression * record, IHqlExpression * choosenLimit = NULL);
     IHqlCppDatasetBuilder * createLinkedDictionaryBuilder(IHqlExpression * record);
+    IHqlCppDatasetBuilder * createStreamedDatasetBuilder(IHqlExpression * record);
     IReferenceSelector * createSelfSelect(BuildCtx & ctx, IReferenceSelector * target, IHqlExpression * expr, IHqlExpression * rootSelector);
     IReferenceSelector * createReferenceSelector(BoundRow * cursor, IHqlExpression * path);
     IReferenceSelector * createReferenceSelector(BoundRow * cursor);

+ 5 - 1
ecl/hqlcpp/hqlcppds.cpp

@@ -2807,7 +2807,11 @@ void HqlCppTranslator::buildDatasetAssign(BuildCtx & ctx, const CHqlBoundTarget
 
     IHqlExpression * record = ::queryRecord(to);
     Owned<IHqlCppDatasetBuilder> builder;
-    if (targetOutOfLine)
+    if (hasStreamedModifier(to))
+    {
+        builder.setown(createStreamedDatasetBuilder(record));
+    }
+    else if (targetOutOfLine)
     {
         if (isDictionaryType(target.queryType()))
         {

+ 43 - 0
ecl/hqlcpp/hqlcset.cpp

@@ -2024,6 +2024,44 @@ void LinkedDictionaryBuilder::buildDeclare(BuildCtx & ctx)
 }
 
 
+StreamedDatasetBuilder::StreamedDatasetBuilder(HqlCppTranslator & _translator, IHqlExpression * _record) : LinkedDatasetBuilderBase(_translator, _record)
+{
+}
+
+void StreamedDatasetBuilder::buildDeclare(BuildCtx & ctx)
+{
+    StringBuffer decl, allocatorName;
+
+    OwnedHqlExpr curActivityId = translator.getCurrentActivityId(ctx);
+    translator.ensureRowAllocator(allocatorName, ctx, record, curActivityId);
+
+    decl.append("RtlStreamedDatasetBuilder ").append(instanceName).append("(");
+    decl.append(allocatorName);
+    decl.append(");");
+
+    ctx.addQuoted(decl);
+}
+
+void StreamedDatasetBuilder::buildFinish(BuildCtx & ctx, const CHqlBoundTarget & target)
+{
+    //more: should I do this by really calling a function?
+    StringBuffer s;
+    assertex(hasWrapperModifier(target.queryType()));
+    assertex(hasStreamedModifier(target.queryType()));
+    translator.generateExprCpp(s.clear(), target.expr);
+    s.append(".setown(").append(instanceName).append(".createDataset());");
+    ctx.addQuoted(s);
+}
+
+
+void StreamedDatasetBuilder::buildFinish(BuildCtx & ctx, CHqlBoundExpr & bound)
+{
+    StringBuffer s;
+    s.append(instanceName).append(".createdDataset()");
+    bound.expr.setown(createQuoted(s.str(), setStreamedAttr(dataset->queryType(), true)));
+}
+
+
 //---------------------------------------------------------------------------
 
 SetBuilder::SetBuilder(HqlCppTranslator & _translator, ITypeInfo * fieldType, IHqlExpression * _allVar) : translator(_translator)
@@ -2117,6 +2155,11 @@ IHqlCppDatasetBuilder * HqlCppTranslator::createLinkedDictionaryBuilder(IHqlExpr
     return new LinkedDictionaryBuilder(*this, record);
 }
 
+IHqlCppDatasetBuilder * HqlCppTranslator::createStreamedDatasetBuilder(IHqlExpression * record)
+{
+    return new StreamedDatasetBuilder(*this, record);
+}
+
 IHqlCppDatasetBuilder * HqlCppTranslator::createSingleRowTempDatasetBuilder(IHqlExpression * record, BoundRow * row)
 {
     return new SingleRowTempDatasetBuilder(*this, record, row);

+ 10 - 0
ecl/hqlcpp/hqlcset.ipp

@@ -320,6 +320,16 @@ protected:
     LinkedHqlExpr choosenLimit;
 };
 
+class StreamedDatasetBuilder : public LinkedDatasetBuilderBase
+{
+public:
+    StreamedDatasetBuilder(HqlCppTranslator & _translator, IHqlExpression * _record);
+
+    virtual void buildDeclare(BuildCtx & ctx);
+    virtual void buildFinish(BuildCtx & ctx, const CHqlBoundTarget & target);
+    virtual void buildFinish(BuildCtx & ctx, CHqlBoundExpr & bound);
+};
+
 class LinkedDictionaryBuilder : public LinkedDatasetBuilderBase
 {
 public:

+ 18 - 0
ecl/regress/issue17548.ecl

@@ -0,0 +1,18 @@
+
+outRecord := RECORD
+    STRING10 name;
+    unsigned1  id;
+END;
+
+dsx := DATASET([{'One', 10},{'Two', 2},{'Three',0}], outRecord);
+output(dsx,,'myfile',overwrite);
+
+ds := DATASET('myfile', outRecord, THOR, OPT);
+ds2 := DATASET([{'Fred', 10},{'George', 2},{'Harry',0}], outRecord) : independent(few);
+
+streamed dataset(outRecord) doRead(streamed dataset(outRecord) inds) := EMBED(C++ : distributed,time)
+    return LINK(inds);
+ENDEMBED;
+
+output(doread(ds));
+output(doread(ds2));

+ 13 - 0
rtl/eclrtl/rtlds.cpp

@@ -397,6 +397,19 @@ byte * RtlLinkedDatasetBuilder::createRow()
 }
 
 
+//------------------------------------------------------------------------------------
+
+RtlStreamedDatasetBuilder::RtlStreamedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit)
+    : RtlLinkedDatasetBuilder(_rowAllocator, _choosenLimit)
+{
+}
+
+IRowStream * RtlStreamedDatasetBuilder::createDataset()
+{
+    return createRowStream(getcount(), queryrows());
+}
+
+
 //cloned from thorcommon.cpp
 class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
 {

+ 8 - 0
rtl/eclrtl/rtlds_imp.hpp

@@ -385,6 +385,14 @@ protected:
     size32_t choosenLimit;
 };
 
+class ECLRTL_API RtlStreamedDatasetBuilder : public RtlLinkedDatasetBuilder
+{
+public:
+    RtlStreamedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit=-1);
+
+    virtual IRowStream * createDataset();
+};
+
 class ECLRTL_API RtlLinkedDictionaryBuilder
 {
 public: