Explorar el Código

Merge pull request #7316 from ghalliday/issue13534

HPCC-13534 Improve support for distributed streaming functions

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 10 años
padre
commit
e1247d96e3

+ 19 - 0
ecl/hql/hqlexpr.cpp

@@ -15675,6 +15675,25 @@ ITypeInfo * createRecordType(IHqlExpression * record)
 }
 
 
+// Looks up the attribute `name` on the implementation behind a function definition.
+// funcdef must be a no_funcdef node; its first child is the function's definition.
+//  - no_external definition: the result of queryAttribute(name) on that node.
+//  - no_outofline wrapping a no_embedbody: the result of queryAttribute(name) on the embed body.
+//  - anything else: NULL.
+IHqlExpression * queryFunctionAttribute(IHqlExpression * funcdef, IAtom * name)
+{
+    dbgassertex(funcdef->getOperator() == no_funcdef);
+    IHqlExpression * definition = funcdef->queryChild(0);
+    switch (definition->getOperator())
+    {
+    case no_outofline:
+        {
+            // Only an out-of-line node that directly wraps an embed body is inspected.
+            IHqlExpression * wrapped = definition->queryChild(0);
+            return (wrapped->getOperator() == no_embedbody) ? wrapped->queryAttribute(name) : NULL;
+        }
+    case no_external:
+        return definition->queryAttribute(name);
+    }
+    return NULL;
+}
+
 ITypeInfo * getSumAggType(ITypeInfo * argType)
 {
     type_t tc = argType->getTypeCode();

+ 1 - 0
ecl/hql/hqlexpr.hpp

@@ -1624,6 +1624,7 @@ extern HQL_API IIdAtom * queryPatternName(IHqlExpression * expr);
 extern HQL_API IHqlExpression * closeAndLink(IHqlExpression * expr);
 extern HQL_API IHqlExpression * createAbstractRecord(IHqlExpression * record);
 extern HQL_API IHqlExpression * createSortList(HqlExprArray & elements);
+extern HQL_API IHqlExpression * queryFunctionAttribute(IHqlExpression * funcdef, IAtom * name);
 
 // Same as expr->queryChild() except it doesn't return attributes.
 inline IHqlExpression * queryRealChild(IHqlExpression * expr, unsigned i)

+ 15 - 0
ecl/hql/hqlgram.y

@@ -160,6 +160,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   COUNTER
   COVARIANCE
   CPPBODY
+  TOK_CPP
   CRC
   CRON
   CSV
@@ -1065,6 +1066,12 @@ embedBody
                             else
                                 $$.setExpr(parser->processEmbedBody($2, embedText, language, NULL), $1);
                         }
+    | embedCppPrefix CPPBODY
+                        {
+                            OwnedHqlExpr attrs = $1.getExpr();
+                            OwnedHqlExpr embedText = $2.getExpr();
+                            $$.setExpr(parser->processEmbedBody($2, embedText, NULL, attrs), $1);
+                        }
     | EMBED '(' abstractModule ',' expression ')'
                         {
                             parser->normalizeExpression($5, type_stringorunicode, true);
@@ -1091,6 +1098,14 @@ embedPrefix
                         }
     ;
 
+embedCppPrefix  /* Prefix of an inline C++ embed: EMBED '(' C++ attribs ')'; the rule's value is the attribute list. */
+    : EMBED '(' TOK_CPP attribs ')'
+                        {
+                            parser->getLexer()->enterEmbeddedMode();  // NOTE(review): presumably makes the lexer return the following text as the CPPBODY token that the embedBody rule expects - confirm
+                            $$.setExpr($4.getExpr(), $1);  // pass the parsed attribs ($4) through, positioned at the EMBED token ($1)
+                        }
+    ;
+
 compoundAttribute
     : startCompoundAttribute optDefinitions returnAction ';' END
                         {

+ 1 - 0
ecl/hql/hqlgram2.cpp

@@ -10441,6 +10441,7 @@ static void getTokenText(StringBuffer & msg, int token)
     case COUNTER: msg.append("COUNTER"); break;
     case COVARIANCE: msg.append("COVARIANCE"); break;
     case CPPBODY: msg.append("BEGINC++"); break;
+    case TOK_CPP: msg.append("C++"); break;
     case CRC: msg.append("HASHCRC"); break;
     case CRON: msg.append("CRON"); break;
     case CSV: msg.append("CSV"); break;

+ 1 - 0
ecl/hql/hqllex.l

@@ -632,6 +632,7 @@ BLOB                { RETURNSYM(BLOB); }
 BNOT                { RETURNSYM(BNOT); }
 BUILD               { RETURNSYM(BUILD); }
 BUILDINDEX          { RETURNSYM(BUILD); }
+"C++"               { RETURNSYM(TOK_CPP); }
 CARDINALITY         { RETURNSYM(CARDINALITY); }
 CASE                { RETURNSYM(CASE); }
 CATCH               { RETURNSYM(TOK_CATCH); }

+ 28 - 3
ecl/hqlcpp/hqlhtcpp.cpp

@@ -13636,15 +13636,31 @@ ABoundActivity * HqlCppTranslator::doBuildActivityAggregate(BuildCtx & ctx, IHql
 
 //---------------------------------------------------------------------------
 
+// Reports whether expr is a call (no_call or no_externalcall) to a function whose
+// definition carries the `distributed` attribute, as found by queryFunctionAttribute().
+// Any other operator, or a call with no resolvable definition, yields false.
+static bool isDistributedFunctionCall(IHqlExpression * expr)
+{
+    IHqlExpression * definition = NULL;
+    switch (expr->getOperator())
+    {
+    case no_call:
+        definition = expr->queryBody()->queryFunctionDefinition();
+        break;
+    case no_externalcall:
+        definition = expr->queryBody()->queryExternalDefinition();
+        break;
+    default:
+        return false;
+    }
+
+    if (!definition)
+        return false;
+    return queryFunctionAttribute(definition, distributedAtom) != NULL;
+}
+
 ABoundActivity * HqlCppTranslator::doBuildActivityChildDataset(BuildCtx & ctx, IHqlExpression * expr)
 {
     if (options.mainRowsAreLinkCounted || isGrouped(expr))
         return doBuildActivityLinkedRawChildDataset(ctx, expr);
 
-
     StringBuffer s;
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKchilditerator, expr, "ChildIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
 
     buildInstancePrefix(instance);
@@ -13698,6 +13714,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityStreamedCall(BuildCtx & ctx, I
     StringBuffer s;
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKstreamediterator, expr, "StreamedIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
 
     buildInstancePrefix(instance);
@@ -13717,6 +13735,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityLinkedRawChildDataset(BuildCtx
     StringBuffer s;
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKlinkedrawiterator, expr, "LinkedRawIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
 
     buildInstancePrefix(instance);
@@ -18653,8 +18673,6 @@ static bool needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_nohoist:
     case no_actionlist:
     case no_orderedactionlist:
-    case no_externalcall:
-    case no_call:
     case no_compound_fetch:
     case no_addfiles:
     case no_nonempty:
@@ -18709,6 +18727,13 @@ static bool needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_extractresult:
         return needsRealThor(expr->queryChild(0), flags);
 
+    case no_call:
+    case no_externalcall:
+        if (isDistributedFunctionCall(expr))
+            return true;
+        //MORE: check for streamed inputs.
+        break;
+
     case no_fetch:
         return needsRealThor(expr->queryChild(1), flags);
 

+ 134 - 0
ecl/regress/issue12921.ecl

@@ -0,0 +1,134 @@
+#option ('spanMultipleCpp', false);
+
+outRecord := RECORD
+    STRING2  x;
+    STRING10 name;
+    STRING1  term;
+    STRING2  nl;
+END;
+
+myService := SERVICE
+    streamed dataset(outRecord) testRead(const varstring name) : distributed;
+    testWrite(streamed dataset(outRecord) out);
+    testWrite3(streamed dataset(outRecord) out1, streamed dataset(outRecord) out2, streamed dataset(outRecord) out3);
+END;
+
+// Streams rows of outRecord from the file part "<name>.<nodeNum>_of_<numNodes>".
+// Declared with the `distributed` attribute; the part to open is chosen from ctx->getNodeNum().
+streamed dataset(outRecord) doRead(const varstring name) := EMBED(C++ : distributed)
+    #include "platform.h"
+    #include "jiface.hpp"
+    #include "jfile.hpp"
+    #include "jstring.hpp"
+
+    // Row stream that deserializes rows from `source` until it reports end-of-file.
+    // `source` is left for a derived class to supply; with no source the stream is empty.
+    class StreamReader : public CInterfaceOf<IRowStream>
+    {
+    public:
+        // The constructor must carry the class's own name (it was previously spelt FileReader).
+        StreamReader(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator) : resultAllocator(_resultAllocator)
+        {
+            deserializer.setown(resultAllocator->createDiskDeserializer(_ctx));
+        }
+
+        virtual const void * nextRow()
+        {
+            if (!source || source->isEof())
+                return NULL;
+
+            RtlDynamicRowBuilder builder(resultAllocator);
+            size32_t size = deserializer->deserialize(builder, *source);
+            return builder.finalizeRowClear(size);
+        }
+
+        virtual void stop()
+        {
+        }
+
+    protected:
+        // Protected rather than private so that a derived reader can install `source`.
+        Linked<IOutputRowDeserializer> deserializer;
+        Linked<IEngineRowAllocator> resultAllocator;
+        Linked<IRowDeserializerSource> source;
+    };
+
+    // StreamReader whose rows come from an already opened file.
+    class FileReader : public StreamReader
+    {
+    public:
+        // `in` is initialised from the argument before it is used to build the source;
+        // previously the (still null) member was passed instead of `_in`.
+        FileReader(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator, IFileIO * _in) : StreamReader(_ctx, _resultAllocator), in(_in)
+        {
+            // NOTE(review): createSeralizerSource is not declared in this example - confirm the intended factory name.
+            source.setown(createSeralizerSource(in));
+        }
+
+    private:
+        Linked<IFileIO> in;
+    };
+
+    #body
+    unsigned numParts = ctx->getNodes();
+    unsigned whichPart = ctx->getNodeNum();
+    StringBuffer filename;
+    filename.append(name).append(".").append(whichPart).append("_of_").append(numParts);
+    Owned<IFile> out = createIFile(filename);
+    Owned<IFileIO> io = out->open(IFOread);
+    return new FileReader(ctx, _resultAllocator, io);
+ENDEMBED;
+
+
+
+doWrite(streamed dataset ds, const varstring name) := BEGINC++  // Drains the streamed input ds and releases every row; writing the rows to the opened file is still unimplemented (see the placeholder comments in the body).
+    #include "platform.h"
+    #include "jiface.hpp"
+    #include "jfile.hpp"
+    #include "jstring.hpp"
+
+    #body
+    unsigned numParts = ctx->getNodes();
+    unsigned whichPart = ctx->getNodeNum();
+    StringBuffer filename;
+    filename.append(name).append(".").append(whichPart).append("_of_").append(numParts);  // builds "<name>.<whichPart>_of_<numParts>"
+    Owned<IFile> out = createIFile(filename);
+    Owned<IFileIO> io = out->open(IFOcreate);  // opened for creation, but nothing is written through it yet
+    //create a buffered io stream
+    //create a serializer
+    for(;;)
+    {
+        const void * next = ds->nextRow();
+        if (!next)
+        {
+            next = ds->nextRow();  // one NULL is retried; NOTE(review): presumably a single NULL marks an end of group and two in a row the end of the stream - confirm
+            if (!next)
+                break;
+        }
+        //serialize row... to buffered stream
+        rtlReleaseRow(next);  // every row obtained from nextRow() is released here
+    }
+ENDC++;
+
+doRead('C:\\temp\\simple');
+
+ds := DATASET(20, TRANSFORM(outRecord, SELF.name := (string)HASH32(counter); SELF.x := (string2)COUNTER; SELF.nl := '\r\n'; SELF.term := '!'));
+
+sds := SORT(NOFOLD(ds), name);
+
+doWrite(sds, 'C:\\temp\\simple2');
+
+
+allNodesDs := DATASET(1, TRANSFORM({ unsigned id }, SELF.id := 0), LOCAL);
+streamedDs := NORMALIZE(allNodesDs, doRead('C:\\temp\\simple2'), TRANSFORM(RIGHT));
+output(streamedDs);
+
+/*
+Problems
+- no way to specify a function that returns a dataset with a user supplied format
+  * We could probably use macros to solve the problems for external services.
+- code for streaming output is poor.
+  * Need to introduce a new user-output activity
+- no way to coordinate between instances on different nodes.
+  * Needs more thought.  Might be required if input dataset required partitioning
+- no way that a dataset can be specified as local/executed on all
+  * Probably want a new syntax.  DATASET(function, LOCAL/DISTRIBUTED??).
+- no way to represent a filtered join against an external dataset
+  * A prefetch project almost provides what you need.  We should introduce a new syntax that allows
+    joins against datasources where the filter is pushed to the source.  There are other situations where
+    this might help - e.g., remote filtering when reading from other thors, filtering on the disk controller etc.
+*/
+
+myService.testWrite(sds);
+myService.testWrite3(sds, sds, sds(name != 'gavin'));
+output(myService.testRead('x'));

+ 9 - 0
testing/regress/ecl/key/streamread.xml

@@ -0,0 +1,9 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>true</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>true</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>true</Result_3></Row>
+</Dataset>

+ 83 - 0
testing/regress/ecl/streamread.ecl

@@ -0,0 +1,83 @@
+outRecord := RECORD
+    STRING10 name;
+    unsigned1  id;
+END;
+
+streamed dataset(outRecord) doRead(const varstring name) := EMBED(C++ : distributed)  // Returns a row stream over the four constant rows below; `name` is not used. The check after the definition expects CLUSTERSIZE * 4 rows in total.
+
+    const char * rows[] = {  // each literal is 11 bytes of row data: a 10 character name (STRING10) followed by a 1 byte id (unsigned1)
+        "Gavin     \x01",
+        "Simon     \x02",
+        "Charlotte \x09",
+        "TheEnd    \x00" };
+
+    class StreamCreator : public IRowStream, public RtlCInterface  // row stream that hands out one copy of each entry in rows[], in order
+    {
+    public:
+        StreamCreator(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator) : resultAllocator(_resultAllocator)
+        {
+            idx = 0;
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void * nextRow()
+        {
+            if (idx >= sizeof(rows)/sizeof(*rows))  // every row has been returned: NULL from here on
+                return NULL;
+
+            RtlDynamicRowBuilder builder(resultAllocator);
+            memcpy(builder.getSelf(), rows[idx++], 11);  // copy the 11 byte row image and advance to the next entry
+            return builder.finalizeRowClear(11);
+        }
+
+        virtual void stop()
+        {
+        }
+
+    private:
+        Linked<IEngineRowAllocator> resultAllocator;
+        unsigned idx;  // index of the next entry of rows[] to return
+    };
+
+    #body
+    return new StreamCreator(ctx, _resultAllocator);
+ENDEMBED;
+
+ds := doRead('C:\\temp\\simple');
+
+count(ds) = CLUSTERSIZE * 4;
+
+
+linkcounted dataset(outRecord) doReadRows(const varstring name) := EMBED(C++ : distributed)  // Returns the four constant rows below as a link counted dataset; `name` is not used.
+
+    static const char * rows2[] = {  // 11 bytes of row data per literal: STRING10 name followed by a 1 byte id
+        "Gavin     \x01",
+        "Simon     \x02",
+        "Charlotte \x09",
+        "TheEnd    \x00" };
+
+    #body
+    //Can return constant allocations as roxie rows
+    __countResult = 4;  // number of entries in rows2
+    __result = (byte * *)rows2;  // the static array itself is handed back; nothing is allocated or copied
+ENDEMBED;
+
+
+dsRows := doReadRows('C:\\temp\\simple');
+
+count(dsRows) = CLUSTERSIZE * 4;
+
+dataset(outRecord) doReadBlock(const varstring name) := EMBED(C++ : distributed)  // Returns the four rows as one contiguous 44 byte block (4 rows x 11 bytes); `name` is not used.
+
+    static const char * rows3 = "Gavin     \x01Simon     \002Charlotte \x09TheEnd    \x00";  // the second id is written in octal (\002) because "\x02C" would be read as the single hex escape \x2C
+
+    #body
+    __lenResult = 44;  // size of the block in bytes
+    __result = rtlMalloc(44);
+    memcpy(__result, rows3, 44);  // the result is a freshly allocated copy of the constant block
+ENDEMBED;
+
+
+dsBlock := doReadBlock('C:\\temp\\simple');
+
+count(dsBlock) = CLUSTERSIZE * 4;

+ 2 - 3
thorlcr/activities/iterate/thiterateslave.cpp

@@ -401,7 +401,7 @@ class CStreamedIteratorSlaveActivity : public CSlaveActivity, public CThorDataLi
 {
     IHThorStreamedIteratorArg *helper;
     Owned<IRowStream> rows;
-    bool eof, isLocal;
+    bool eof;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -414,11 +414,10 @@ public:
     {
         appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorStreamedIteratorArg *> (queryHelper());
-        isLocal = false;
     }
     virtual void start()
     {
-        isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
+        bool isLocal = container.queryLocalData() || container.queryOwner().isLocalChild();
         eof = isLocal ? false : !firstNode();
         if (!eof)
             rows.setown(helper->createInput());