소스 검색

HPCC-19493 Add access to activity context for other embedded languages

Python, Cassandra, Javascript, MySQL and SQLite support.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 년 전
부모
커밋
7e65c10cc1

+ 1 - 1
ecl/hql/hqlfold.cpp

@@ -1662,7 +1662,7 @@ IHqlExpression * foldEmbeddedCall(IHqlExpression* expr, unsigned foldOptions, IT
 
     Owned<IEmbedContext> __plugin = (IEmbedContext *) plugin->getIntValue();  // We declared as int since ecl has no pointer type - not sure what the clean fix is here...
     DummyContext dummyContext;
-    Owned<IEmbedFunctionContext> __ctx = __plugin->createFunctionContextEx(&dummyContext,flags,optionsStr.str());
+    Owned<IEmbedFunctionContext> __ctx = __plugin->createFunctionContextEx(&dummyContext,nullptr,flags,optionsStr.str());
 
     IValue *query = body->queryChild(0)->queryValue();
     assertex(query);

+ 2 - 1
ecl/hql/hqlutil.hpp

@@ -254,7 +254,8 @@ extern HQL_API bool isTimed(IHqlExpression * expr);
 inline bool isInternalEmbedAttr(IAtom *name)
 {
     return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || 
-           name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom || name == activityAtom;
+           name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom ||
+           name == activityAtom || name == localAtom || name == parallelAtom;
 }
 
 

+ 6 - 0
ecl/hqlcpp/hqlcpp.cpp

@@ -11971,6 +11971,12 @@ void HqlCppTranslator::buildScriptFunctionDefinition(BuildCtx &ctx, IHqlExpressi
     buildAssignToTemp(funcctx, pluginPtr, getPlugin);
     StringBuffer createParam;
     createParam.append("Owned<IEmbedFunctionContext> __ctx = __plugin->createFunctionContextEx(ctx,");
+
+    if (functionBodyIsActivity(bodyCode))
+        createParam.append("activity,");
+    else
+        createParam.append("nullptr,");
+
     createParam.append(isImport ? "EFimport" : "EFembed");
     if (returnType->getTypeCode()==type_void)
         createParam.append("|EFnoreturn");

+ 4 - 4
plugins/Rembed/Rembed.cpp

@@ -1369,17 +1369,17 @@ private:
 class REmbedContext: public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         Owned<REmbedFunctionContext> ret =  new REmbedFunctionContext(*queryGlobalState()->R);
         ret->setScopes(ctx, options);
         return ret.getClear();
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 12 - 12
plugins/cassandra/cassandraembed.cpp

@@ -1300,8 +1300,8 @@ protected:
 class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
-    CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
-      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
+    CassandraEmbedFunctionContext(const IContextLogger &_logctx, const IThorActivityContext *_activityCtx, unsigned _flags, const char *options)
+      : logctx(_logctx), activityCtx(_activityCtx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
     {
         StringArray opts;
         opts.appendList(options, ",");
@@ -1761,9 +1761,8 @@ public:
     virtual void compileEmbeddedScript(size32_t chars, const char *_script)
     {
         // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
-        size32_t len = rtlUtf8Size(chars, _script);
-        queryString.set(_script, len);
-        const char *script = queryString.get(); // Now null terminated
+        rtlSubstituteActivityContext(queryString, activityCtx, chars, _script);
+        const char *script = queryString.str(); // Now null terminated
         if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
         {
             for (;;)
@@ -1803,7 +1802,7 @@ public:
         {
             StringBuffer msg;
             E->errorMessage(msg);
-            msg.appendf(" (processing query %s)", queryString.get());
+            msg.appendf(" (processing query %s)", queryString.str());
             throw makeStringException(E->errorCode(), msg);
         }
     }
@@ -1908,10 +1907,11 @@ protected:
     Owned<CassandraStatementInfo> stmtInfo;
     Owned<CassandraDatasetBinder> inputStream;
     const IContextLogger &logctx;
+    const IThorActivityContext *activityCtx;
     unsigned flags;
     unsigned nextParam;
     unsigned numParams;
-    StringAttr queryString;
+    StringBuffer queryString;
     CassBatchType batchMode;
     unsigned pageSize;
 };
@@ -1919,18 +1919,18 @@ protected:
 class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         if (flags & EFimport)
             UNSUPPORTED("IMPORT");
         else
-            return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), flags, options);
+            return new CassandraEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), activityCtx, flags, options);
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 4 - 4
plugins/couchbase/couchbaseembed.cpp

@@ -1493,12 +1493,12 @@ namespace couchbaseembed
     class CouchbaseEmbedContext : public CInterfaceOf<IEmbedContext>
     {
     public:
-        virtual IEmbedFunctionContext * createFunctionContext(unsigned flags, const char *options)
+        virtual IEmbedFunctionContext * createFunctionContext(unsigned flags, const char *options) override
         {
-            return createFunctionContextEx(NULL, flags, options);
+            return createFunctionContextEx(nullptr, nullptr, flags, options);
         }
 
-        virtual IEmbedFunctionContext * createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+        virtual IEmbedFunctionContext * createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
         {
             if (flags & EFimport)
             {
@@ -1510,7 +1510,7 @@ namespace couchbaseembed
                 return new CouchbaseEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), options, flags);
         }
 
-        virtual IEmbedServiceContext * createServiceContext(const char *service, unsigned flags, const char *options)
+        virtual IEmbedServiceContext * createServiceContext(const char *service, unsigned flags, const char *options) override
         {
             throwUnexpected();
             return nullptr;

+ 4 - 4
plugins/javaembed/javaembed.cpp

@@ -3228,16 +3228,16 @@ protected:
 class JavaEmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         assertex(flags & EFimport);
         return new JavaEmbedImportContext(queryContext(), NULL, options);
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         Owned<JavaEmbedServiceContext> serviceContext = new JavaEmbedServiceContext(queryContext(), service, options);
         serviceContext->init();

+ 18 - 8
plugins/mysql/mysqlembed.cpp

@@ -1385,8 +1385,8 @@ static void initializeMySqlThread()
 class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
-    MySQLEmbedFunctionContext(const char *options)
-      : nextParam(0)
+    MySQLEmbedFunctionContext(const IThorActivityContext *_ctx, const char *options)
+      : nextParam(0), activityCtx(_ctx)
     {
         initializeMySqlThread();
         conn.setown(MySQLConnection::findCachedConnection(options, false));
@@ -1594,7 +1594,16 @@ public:
     }
     virtual void compileEmbeddedScript(size32_t chars, const char *script)
     {
-        size32_t len = rtlUtf8Size(chars, script);
+        StringBuffer scriptStr;
+        size32_t len;
+        if (activityCtx)
+        {
+            rtlSubstituteActivityContext(scriptStr, activityCtx, chars, script);
+            script = scriptStr.str();
+            len = scriptStr.length();
+        }
+        else
+            len = rtlUtf8Size(chars, script);
         for (;;)
         {
             Owned<MySQLStatement> stmt  = new MySQLStatement(mysql_stmt_init(*conn));
@@ -1655,24 +1664,25 @@ protected:
     Owned<MySQLConnection> conn;
     Owned<MySQLPreparedStatement> stmtInfo;
     Owned<MySQLDatasetBinder> inputStream;
+    const IThorActivityContext *activityCtx;
     int nextParam;
 };
 
 class MySQLEmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         if (flags & EFimport)
             UNSUPPORTED("IMPORT");
         else
-            return new MySQLEmbedFunctionContext(options);
+            return new MySQLEmbedFunctionContext(activityCtx, options);
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 63 - 19
plugins/py3embed/py3embed.cpp

@@ -339,7 +339,18 @@ public:
             threadstate->frame = NULL;
     }
 
-    PyObject *getNamedTupleType(const RtlTypeInfo *type)
+
+    PyObject *getActivityContextTupleType()
+    {
+        // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
+        // and if we add our own lock we are liable to deadlock as the code within Py_CompileStringFlags may
+        // temporarily release then re-acquire the GIL.
+        if (!activityContextTupleType)
+            activityContextTupleType.setown(getNamedTupleType("isLocal,numSlaves,numStrands,slave,strand"));
+        return activityContextTupleType.get();
+    }
+
+    PyObject *getNamedTupleType(const char *names)
     {
         // It seems the customized namedtuple types leak, and they are slow to create, so take care to reuse
         // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
@@ -356,20 +367,7 @@ public:
             assertex(PyCallable_Check(namedtuple));
         }
 
-        const RtlFieldInfo * const *fields = type->queryFields();
-        if (!fields && type->queryChildType())
-            fields = type->queryChildType()->queryFields();
-        assertex(fields);
-        StringBuffer names;
-        while (*fields)
-        {
-            const RtlFieldInfo *field = *fields;
-            if (names.length())
-                names.append(',');
-            names.append(field->name);
-            fields++;
-        }
-        OwnedPyObject pnames = PyUnicode_FromString(names.str());
+        OwnedPyObject pnames = PyUnicode_FromString(names);
         OwnedPyObject mynamedtupletype;
         checkPythonError();
         mynamedtupletype.set(PyDict_GetItem(namedtupleTypes, pnames));   // NOTE - returns borrowed reference
@@ -388,6 +386,25 @@ public:
         assertex(PyCallable_Check(mynamedtupletype));
         return mynamedtupletype.getClear();
     }
+
+    PyObject *getNamedTupleType(const RtlTypeInfo *type)
+    {
+        const RtlFieldInfo * const *fields = type->queryFields();
+        if (!fields && type->queryChildType())
+            fields = type->queryChildType()->queryFields();
+        assertex(fields);
+        StringBuffer names;
+        while (*fields)
+        {
+            const RtlFieldInfo *field = *fields;
+            if (names.length())
+                names.append(',');
+            names.append(field->name);
+            fields++;
+        }
+        return getNamedTupleType(names.str());
+    }
+
     PyObject *compileScript(const char *text, const char *parameters)
     {
         // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
@@ -480,6 +497,7 @@ protected:
     OwnedPyObject namedtupleTypes; // dictionary of return values from namedtuple()
     OwnedPyObject compiledScripts; // dictionary of previously compiled scripts
     OwnedPyObject preservedScopes; // dictionary of preserved scopes
+    OwnedPyObject activityContextTupleType; // type used for activity context
 } globalState;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -1285,10 +1303,12 @@ public:
         if (!resultIterator)
             return NULL;
         OwnedPyObject row = PyIter_Next(resultIterator);
+        checkPythonError();
         if (!row)
             return NULL;
         RtlDynamicRowBuilder rowBuilder(resultAllocator);
         size32_t len = py3embed::getRowResult(row, rowBuilder);
+        checkPythonError();
         return rowBuilder.finalizeRowClear(len);
     }
     virtual void stop()
@@ -1317,6 +1337,23 @@ public:
         PyEval_RestoreThread(sharedCtx->threadState);
     }
 
+    virtual void setActivityOptions(const IThorActivityContext *ctx)
+    {
+        OwnedPyObject mynamedtupletype = globalState.getActivityContextTupleType();
+        OwnedPyObject args = PyTuple_New(5);
+        OwnedPyObject isLocal;
+        isLocal.set(ctx->isLocal() ? Py_True : Py_False);
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 0, isLocal.getClear());
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 1, PyLong_FromLong(ctx->numSlaves()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 2, PyLong_FromLong(ctx->numStrands()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 3, PyLong_FromLong(ctx->querySlave()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 4, PyLong_FromLong(ctx->queryStrand()));
+        OwnedPyObject activityTuple = PyObject_CallObject(mynamedtupletype, args);  // Creates a namedtuple from the supplied tuple
+        checkPythonError();
+        PyDict_SetItemString(locals, "__activity__", activityTuple.getClear());
+        checkPythonError();
+    }
+
     void setScopes(ICodeContext *codeCtx, const char *_options)
     {
         locals.setown(PyDict_New());
@@ -1639,6 +1676,11 @@ public:
     {
         script.setown(sharedCtx->compileEmbeddedScript(lenChars, utf, argstring));
     }
+    virtual void setActivityOptions(const IThorActivityContext *ctx) override
+    {
+        Python3xEmbedContextBase::setActivityOptions(ctx);
+        argstring.append("__activity__");
+    }
 
     virtual void callFunction()
     {
@@ -1719,11 +1761,11 @@ private:
 class Python3xEmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         checkThreadContext();
         Owned<Python3xEmbedContextBase> ret;
@@ -1732,9 +1774,11 @@ public:
         else
             ret.setown(new Python3xEmbedScriptContext(threadContext));
         ret->setScopes(ctx, options);
+        if (activityCtx)
+            ret->setActivityOptions(activityCtx);
         return ret.getClear();
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 66 - 19
plugins/pyembed/pyembed.cpp

@@ -336,7 +336,18 @@ public:
             threadstate->frame = NULL;
     }
 
-    PyObject *getNamedTupleType(const RtlTypeInfo *type)
+
+    PyObject *getActivityContextTupleType()
+    {
+        // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
+        // and if we add our own lock we are liable to deadlock as the code within Py_CompileStringFlags may
+        // temporarily release then re-acquire the GIL.
+        if (!activityContextTupleType)
+            activityContextTupleType.setown(getNamedTupleType("isLocal,numSlaves,numStrands,slave,strand"));
+        return activityContextTupleType.get();
+    }
+
+    PyObject *getNamedTupleType(const char *names)
     {
         // It seems the customized namedtuple types leak, and they are slow to create, so take care to reuse
         // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
@@ -353,26 +364,15 @@ public:
             assertex(PyCallable_Check(namedtuple));
         }
 
-        const RtlFieldInfo * const *fields = type->queryFields();
-        if (!fields && type->queryChildType())
-            fields = type->queryChildType()->queryFields();
-        assertex(fields);
-        StringBuffer names;
-        while (*fields)
-        {
-            const RtlFieldInfo *field = *fields;
-            if (names.length())
-                names.append(',');
-            names.append(field->name);
-            fields++;
-        }
-        OwnedPyObject pnames = PyString_FromString(names.str());
+        OwnedPyObject pnames = PyString_FromString(names);
         OwnedPyObject mynamedtupletype;
+        checkPythonError();
         mynamedtupletype.set(PyDict_GetItem(namedtupleTypes, pnames));   // NOTE - returns borrowed reference
         if (!mynamedtupletype)
         {
             OwnedPyObject recname = PyString_FromString("namerec");     // MORE - do we care what the name is?
             OwnedPyObject ntargs = PyTuple_Pack(2, recname.get(), pnames.get());
+            checkPythonError();
             OwnedPyX<PyFrameObject> frame = pushDummyFrame();
             mynamedtupletype.setown(PyObject_CallObject(namedtuple, ntargs));
             popDummyFrame(frame);
@@ -383,6 +383,25 @@ public:
         assertex(PyCallable_Check(mynamedtupletype));
         return mynamedtupletype.getClear();
     }
+
+    PyObject *getNamedTupleType(const RtlTypeInfo *type)
+    {
+        const RtlFieldInfo * const *fields = type->queryFields();
+        if (!fields && type->queryChildType())
+            fields = type->queryChildType()->queryFields();
+        assertex(fields);
+        StringBuffer names;
+        while (*fields)
+        {
+            const RtlFieldInfo *field = *fields;
+            if (names.length())
+                names.append(',');
+            names.append(field->name);
+            fields++;
+        }
+        return getNamedTupleType(names.str());
+    }
+
     PyObject *compileScript(const char *text, const char *parameters)
     {
         // Note - we do not need (and must not have) a lock protecting this. It is protected by the Python GIL,
@@ -475,6 +494,7 @@ protected:
     OwnedPyObject namedtupleTypes; // dictionary of return values from namedtuple()
     OwnedPyObject compiledScripts; // dictionary of previously compiled scripts
     OwnedPyObject preservedScopes; // dictionary of preserved scopes
+    OwnedPyObject activityContextTupleType; // type used for activity context
 } globalState;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -1260,10 +1280,12 @@ public:
         if (!resultIterator)
             return NULL;
         OwnedPyObject row = PyIter_Next(resultIterator);
+        checkPythonError();
         if (!row)
             return NULL;
         RtlDynamicRowBuilder rowBuilder(resultAllocator);
         size32_t len = py2embed::getRowResult(row, rowBuilder);
+        checkPythonError();
         return rowBuilder.finalizeRowClear(len);
     }
     virtual void stop()
@@ -1292,6 +1314,23 @@ public:
         PyEval_RestoreThread(sharedCtx->threadState);
     }
 
+    virtual void setActivityOptions(const IThorActivityContext *ctx)
+    {
+        OwnedPyObject mynamedtupletype = globalState.getActivityContextTupleType();
+        OwnedPyObject args = PyTuple_New(5);
+        OwnedPyObject isLocal;
+        isLocal.set(ctx->isLocal() ? Py_True : Py_False);
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 0, isLocal.getClear());
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 1, PyInt_FromLong(ctx->numSlaves()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 2, PyInt_FromLong(ctx->numStrands()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 3, PyInt_FromLong(ctx->querySlave()));
+        PyTuple_SET_ITEM((PyTupleObject *) args.get(), 4, PyInt_FromLong(ctx->queryStrand()));
+        OwnedPyObject activityTuple = PyObject_CallObject(mynamedtupletype, args);  // Creates a namedtuple from the supplied tuple
+        checkPythonError();
+        PyDict_SetItemString(locals, "__activity__", activityTuple.getClear());
+        checkPythonError();
+    }
+
     void setScopes(ICodeContext *codeCtx, const char *_options)
     {
         locals.setown(PyDict_New());
@@ -1614,6 +1653,12 @@ public:
     {
         script.setown(sharedCtx->compileEmbeddedScript(lenChars, utf, argstring));
     }
+    virtual void setActivityOptions(const IThorActivityContext *ctx) override
+    {
+        Python27EmbedContextBase::setActivityOptions(ctx);
+        argstring.append("__activity__");
+    }
+
 
     virtual void callFunction()
     {
@@ -1694,11 +1739,11 @@ private:
 class Python27EmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         checkThreadContext();
         Owned<Python27EmbedContextBase> ret;
@@ -1707,9 +1752,11 @@ public:
         else
             ret.setown(new Python27EmbedScriptContext(threadContext));
         ret->setScopes(ctx, options);
+        if (activityCtx)
+            ret->setActivityOptions(activityCtx);
         return ret.getClear();
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 18 - 8
plugins/sqlite3/sqlite3.cpp

@@ -357,8 +357,8 @@ protected:
 class SqLite3EmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
-    SqLite3EmbedFunctionContext(unsigned _flags, const char *options)
-    : flags(_flags), db(NULL)
+    SqLite3EmbedFunctionContext(const IThorActivityContext *_activityCtx, unsigned _flags, const char *options)
+    : activityCtx(_activityCtx), flags(_flags), db(NULL)
     {
         const char *dbname = NULL;
         StringArray opts;
@@ -541,7 +541,16 @@ public:
     }
     virtual void compileEmbeddedScript(size32_t chars, const char *script)
     {
-        size32_t len = rtlUtf8Size(chars, script);
+        StringBuffer scriptStr;
+        size32_t len;
+        if (activityCtx)
+        {
+            rtlSubstituteActivityContext(scriptStr, activityCtx, chars, script);
+            script = scriptStr.str();
+            len = scriptStr.length();
+        }
+        else
+            len = rtlUtf8Size(chars, script);
         int rc = sqlite3_prepare_v2(db, script, len, stmt.ref(), NULL);
         checkSqliteError(rc);
     }
@@ -578,24 +587,25 @@ protected:
     }
     OwnedStatement stmt;
     sqlite3 *db;
+    const IThorActivityContext *activityCtx;
     unsigned flags;
 };
 
 class SqLite3EmbedContext : public CInterfaceOf<IEmbedContext>
 {
 public:
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
         if (flags & EFimport)
             UNSUPPORTED("IMPORT");
         else
-            return new SqLite3EmbedFunctionContext(flags, options);
+            return new SqLite3EmbedFunctionContext(activityCtx, flags, options);
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 20 - 4
plugins/v8embed/v8embed.cpp

@@ -454,6 +454,10 @@ public:
         isolate->Exit();
         isolate->Dispose();
     }
+    void setActivityContext(const IThorActivityContext *_activityCtx)
+    {
+        activityCtx = _activityCtx;
+    }
     virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name)
     {
         return NULL;
@@ -897,6 +901,16 @@ public:
         assertex (!script.IsEmpty());
         v8::HandleScope handle_scope;
         v8::TryCatch tryCatch;
+        if (activityCtx)
+        {
+            v8::Handle<v8::Object> jsActivityCtx = v8::Object::New();
+            jsActivityCtx->Set(v8::String::New("isLocal"), v8::Boolean::New(activityCtx->isLocal()));
+            jsActivityCtx->Set(v8::String::New("numSlaves"), v8::Integer::NewFromUnsigned(activityCtx->numSlaves()));
+            jsActivityCtx->Set(v8::String::New("numStrands"), v8::Integer::NewFromUnsigned(activityCtx->numStrands()));
+            jsActivityCtx->Set(v8::String::New("slave"), v8::Integer::NewFromUnsigned(activityCtx->querySlave()));
+            jsActivityCtx->Set(v8::String::New("strand"), v8::Integer::NewFromUnsigned(activityCtx->queryStrand()));
+            context->Global()->Set(v8::String::New("__activity__"), jsActivityCtx);
+        }
         result = v8::Persistent<v8::Value>::New(script->Run());
         v8::Handle<v8::Value> exception = tryCatch.Exception();
         if (!exception.IsEmpty())
@@ -907,6 +921,7 @@ public:
     }
 
 protected:
+    const IThorActivityContext *activityCtx = nullptr;
     v8::Isolate *isolate;
     v8::Persistent<v8::Context> context;
     v8::Persistent<v8::Script> script;
@@ -936,11 +951,11 @@ public:
     V8JavascriptEmbedContext()
     {
     }
-    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override
     {
-        return createFunctionContextEx(NULL, flags, options);
+        return createFunctionContextEx(nullptr, nullptr, flags, options);
     }
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityContext, unsigned flags, const char *options) override
     {
         if (flags & EFimport)
             UNSUPPORTED("IMPORT");
@@ -949,9 +964,10 @@ public:
             theFunctionContext = new V8JavascriptEmbedFunctionContext;
             threadHookChain = addThreadTermFunc(releaseContext);
         }
+        theFunctionContext->setActivityContext(activityContext);
         return LINK(theFunctionContext);
     }
-    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
+    virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
         throwUnexpected();
     }

+ 16 - 0
rtl/eclrtl/eclrtl.cpp

@@ -27,6 +27,7 @@
 #include "junicode.hpp"
 #include "eclrtl.hpp"
 #include "rtlbcd.hpp"
+#include "eclhelper.hpp"
 #include "eclrtl_imp.hpp"
 #ifdef _USE_ICU
 #include "unicode/uchar.h"
@@ -5803,6 +5804,21 @@ void rtlSubstituteEmbeddedScript(size32_t &__lenResult, char * &__result, size32
     __result = result.detach();
 }
 
+void rtlSubstituteActivityContext(StringBuffer &result, const IThorActivityContext *ctx, size32_t scriptChars, const char *script)
+{
+    result.append(rtlUtf8Size(scriptChars, script), script);
+    if (ctx)
+    {
+        char buf[20];
+        result.replaceStringNoCase("__activity__.isLocal", ctx->isLocal() ? "TRUE" : "FALSE");
+        result.replaceStringNoCase("__activity__.numSlaves", itoa(ctx->numSlaves(), buf, 10));
+        result.replaceStringNoCase("__activity__.numStrands", itoa(ctx->numStrands(), buf, 10));
+        result.replaceStringNoCase("__activity__.slave", itoa(ctx->querySlave(), buf, 10));
+        result.replaceStringNoCase("__activity__.strand", itoa(ctx->queryStrand(), buf, 10));
+    }
+}
+
+
 //---------------------------------------------------------------------------
 
 void rtlRowBuilder::forceAvailable(size32_t size)

+ 3 - 1
rtl/eclrtl/eclrtl.hpp

@@ -852,10 +852,11 @@ interface IEmbedServiceContext : extends IInterface
 enum EmbedFlags { EFembed = 1, EFimport = 2, EFnoreturn = 4, EFnoparams = 8 }; // For createFunctionContext flags
 
 interface ICodeContext;
+interface IThorActivityContext;
 interface IEmbedContext : extends IInterface
 {
     virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) = 0; // legacy
-    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options) = 0;
+    virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext * activityCtx, unsigned flags, const char *options) = 0;
     virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) = 0;
     // MORE - add syntax checked here!
 };
@@ -863,5 +864,6 @@ interface IEmbedContext : extends IInterface
 typedef IEmbedContext * (* GetEmbedContextFunction)();
 
 ECLRTL_API void rtlSubstituteEmbeddedScript(size32_t &__lenResult, char * &__result, size32_t scriptChars, const char *script, size32_t outFieldsChars, const char *outFields, size32_t searchChars, const char *search);
+ECLRTL_API void rtlSubstituteActivityContext(StringBuffer &, const IThorActivityContext *ctx, size32_t scriptChars, const char *script);
 
 #endif

+ 40 - 0
system/jlib/jstring.cpp

@@ -947,6 +947,33 @@ StringBuffer &replaceString(StringBuffer & result, size32_t lenSource, const cha
     return result;
 }
 
+StringBuffer &replaceStringNoCase(StringBuffer & result, size32_t lenSource, const char *source, size32_t lenOldStr, const char* oldStr, size32_t lenNewStr, const char* newStr)
+{
+    if (lenSource)
+    {
+        size32_t left = lenSource;
+        while (left >= lenOldStr)
+        {
+            if (memicmp(source, oldStr, lenOldStr)==0)
+            {
+                result.append(lenNewStr, newStr);
+                source += lenOldStr;
+                left -= lenOldStr;
+            }
+            else
+            {
+                result.append(*source);
+                source++;
+                left--;
+            }
+        }
+
+        // there are no more possible replacements, make sure we keep the end of the original buffer
+        result.append(left, source);
+    }
+    return result;
+}
+
 // this method will replace all occurrences of "oldStr" with "newStr"
 StringBuffer & StringBuffer::replaceString(const char* oldStr, const char* newStr)
 {
@@ -961,6 +988,19 @@ StringBuffer & StringBuffer::replaceString(const char* oldStr, const char* newSt
     return *this;
 }
 
+StringBuffer & StringBuffer::replaceStringNoCase(const char* oldStr, const char* newStr)
+{
+    if (curLen)
+    {
+        StringBuffer temp;
+        size32_t oldlen = oldStr ? strlen(oldStr) : 0;
+        size32_t newlen = newStr ? strlen(newStr) : 0;
+        ::replaceStringNoCase(temp, curLen, buffer, oldlen, oldStr, newlen, newStr);
+        swapWith(temp);
+    }
+    return *this;
+}
+
 StringBuffer & StringBuffer::stripChar(char oldChar)
 {
     size32_t delta = 0;

+ 1 - 0
system/jlib/jstring.hpp

@@ -117,6 +117,7 @@ public:
     StringBuffer &  toUpperCase();
     StringBuffer &  replace(char oldChar, char newChar);
     StringBuffer &  replaceString(const char* oldStr, const char* newStr);
+    StringBuffer &  replaceStringNoCase(const char* oldStr, const char* newStr);
     char *          reserve(size32_t size);
     char *          reserveTruncate(size32_t size);
     void            setown(StringBuffer &other);

+ 51 - 0
testing/regress/ecl/embedjs.ecl

@@ -101,6 +101,36 @@ val[2] = t;
 val;
 ENDEMBED;
 
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+m(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := MODULE
+  EXPORT streamed dataset(r) myDataset(unsigned numRows = numRows) := EMBED(javascript : activity, local(isLocal), parallel(numParallel))
+    var numSlaves = __activity__.numSlaves;
+    var numParallel = numSlaves * __activity__.numStrands;
+    var rowsPerPart = (numRows + numParallel - 1) / numParallel;
+    var thisSlave = __activity__.slave;
+    var thisIndex = thisSlave * __activity__.numStrands + __activity__.strand;
+    var first = thisIndex * rowsPerPart;
+    var last = first + rowsPerPart;
+    if (first > numRows)
+        first = numRows;
+    if (last > numRows)
+        last = numRows;
+  
+    var names = [ "Gavin", "Richard", "John", "Bart" ];
+    var ds = [ ];
+    while (first < last)
+    {
+        ds.push( { id : first, name: names[first % 4] });
+        first += 1;
+    }
+    ds;
+  ENDEMBED;
+END;
+
 add1(10);
 add2('Hello');
 add3('World');
@@ -136,3 +166,24 @@ SUM(NOFOLD(s1b + s2b), a);
 s1c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING) COUNTER + '1')));
 s2c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING)(COUNTER/2) + '1')));
 SUM(NOFOLD(s1c + s2c), a);
+
+// Test embed activity
+
+//Global activity - fixed number of rows
+output(m(10).myDataset());
+//Local version of the activity 
+output(count(m(10, isLocal := true).myDataset()) = CLUSTERSIZE * 10);
+
+//Check that stranding (if implemented) still generates unique records
+output(COUNT(DEDUP(m(1000, numParallel := 5).myDataset(), id, ALL)));
+
+r2 := RECORD
+    UNSIGNED id;
+    DATASET(r) child;
+END;
+
+//Check that the activity can also be executed in a child query
+output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER).myDataset())));
+
+//Test stranding inside a child query
+output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER, NumParallel := 3).myDataset())));

+ 42 - 0
testing/regress/ecl/key/embedjs.xml

@@ -64,3 +64,45 @@
 <Dataset name='Result 22'>
  <Row><Result_22>328126500000</Result_22></Row>
 </Dataset>
+<Dataset name='Result 23'>
+ <Row><id>0</id><name>Gavin</name></Row>
+ <Row><id>1</id><name>Richard</name></Row>
+ <Row><id>2</id><name>John</name></Row>
+ <Row><id>3</id><name>Bart</name></Row>
+ <Row><id>4</id><name>Gavin</name></Row>
+ <Row><id>5</id><name>Richard</name></Row>
+ <Row><id>6</id><name>John</name></Row>
+ <Row><id>7</id><name>Bart</name></Row>
+ <Row><id>8</id><name>Gavin</name></Row>
+ <Row><id>9</id><name>Richard</name></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><Result_24>true</Result_24></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><Result_25>1000</Result_25></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>

+ 21 - 15
testing/regress/ecl/key/mysqlembed.xml

@@ -8,55 +8,61 @@
  <Row><Result_2>2</Result_2></Row>
 </Dataset>
 <Dataset name='Result 3'>
+ <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
+ <Row><name>name2</name><bval>66</bval><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2><dt>2015-12-25 01:23:45</dt></Row>
+ <Row><name>nulls</name><bval>-1</bval><value>99999</value><boolval>true</boolval><r8>99.98999999999999</r8><r4>999.989990234375</r4><d>393939393939</d><ddd>9.99</ddd><u1>9999 ß</u1><u2>9999 ßßß</u2><dt>1963-11-22 12:30:00</dt></Row>
+ <Row><name>utf8test</name><bval>65</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>2019-02-01 23:59:59</dt></Row>
+</Dataset>
+<Dataset name='Result 4'>
  <Row><name>name1</name></Row>
  <Row><name>name2</name></Row>
  <Row><name>nulls</name></Row>
  <Row><name>utf8test</name></Row>
 </Dataset>
-<Dataset name='Result 4'>
- <Row><Result_4>name1</Result_4></Row>
-</Dataset>
 <Dataset name='Result 5'>
- <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
+ <Row><Result_5>name1</Result_5></Row>
 </Dataset>
 <Dataset name='Result 6'>
- <Row><Result_6>utf8test</Result_6></Row>
+ <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
 </Dataset>
 <Dataset name='Result 7'>
- <Row><name>utf8test</name><bval>65</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>2019-02-01 23:59:59</dt></Row>
+ <Row><Result_7>utf8test</Result_7></Row>
 </Dataset>
 <Dataset name='Result 8'>
- <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
- <Row><name>name2</name><bval>66</bval><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2><dt>2015-12-25 01:23:45</dt></Row>
+ <Row><name>utf8test</name><bval>65</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>2019-02-01 23:59:59</dt></Row>
 </Dataset>
 <Dataset name='Result 9'>
- <Row><Result_9>4</Result_9></Row>
+ <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
+ <Row><name>name2</name><bval>66</bval><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2><dt>2015-12-25 01:23:45</dt></Row>
 </Dataset>
 <Dataset name='Result 10'>
- <Row><Result_10>true</Result_10></Row>
+ <Row><Result_10>4</Result_10></Row>
 </Dataset>
 <Dataset name='Result 11'>
- <Row><Result_11>5.6</Result_11></Row>
+ <Row><Result_11>true</Result_11></Row>
 </Dataset>
 <Dataset name='Result 12'>
- <Row><Result_12>7.800000190734863</Result_12></Row>
+ <Row><Result_12>5.6</Result_12></Row>
 </Dataset>
 <Dataset name='Result 13'>
- <Row><Result_13>6161353561613535</Result_13></Row>
+ <Row><Result_13>7.800000190734863</Result_13></Row>
 </Dataset>
 <Dataset name='Result 14'>
- <Row><Result_14>Straße</Result_14></Row>
+ <Row><Result_14>6161353561613535</Result_14></Row>
 </Dataset>
 <Dataset name='Result 15'>
  <Row><Result_15>Straße  </Result_15></Row>
 </Dataset>
 <Dataset name='Result 16'>
+ <Row><Result_16>Straße  </Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
  <Row><dt1>19631122123000</dt1><dt2>1963-11-22 12:30:00</dt2></Row>
  <Row><dt1>20151225012345</dt1><dt2>2015-12-25 01:23:45</dt2></Row>
  <Row><dt1>0</dt1><dt2>                   </dt2></Row>
  <Row><dt1>20190201235959</dt1><dt2>2019-02-01 23:59:59</dt2></Row>
 </Dataset>
-<Dataset name='Result 17'>
+<Dataset name='Result 18'>
  <Row><name>name1</name><bval>16737</bval><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2><dt>1963-11-22 12:30:00</dt></Row>
  <Row><name>name2</name><bval>66</bval><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2><dt>2015-12-25 01:23:45</dt></Row>
 </Dataset>

+ 42 - 0
testing/regress/ecl/key/py2embedactivity.xml

@@ -0,0 +1,42 @@
+<Dataset name='Result 1'>
+ <Row><id>0</id><name>Gavin</name></Row>
+ <Row><id>1</id><name>Richard</name></Row>
+ <Row><id>2</id><name>John</name></Row>
+ <Row><id>3</id><name>Bart</name></Row>
+ <Row><id>4</id><name>Gavin</name></Row>
+ <Row><id>5</id><name>Richard</name></Row>
+ <Row><id>6</id><name>John</name></Row>
+ <Row><id>7</id><name>Bart</name></Row>
+ <Row><id>8</id><name>Gavin</name></Row>
+ <Row><id>9</id><name>Richard</name></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>true</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>1000</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>

+ 42 - 0
testing/regress/ecl/key/py3embedactivity.xml

@@ -0,0 +1,42 @@
+<Dataset name='Result 1'>
+ <Row><id>0</id><name>Gavin</name></Row>
+ <Row><id>1</id><name>Richard</name></Row>
+ <Row><id>2</id><name>John</name></Row>
+ <Row><id>3</id><name>Bart</name></Row>
+ <Row><id>4</id><name>Gavin</name></Row>
+ <Row><id>5</id><name>Richard</name></Row>
+ <Row><id>6</id><name>John</name></Row>
+ <Row><id>7</id><name>Bart</name></Row>
+ <Row><id>8</id><name>Gavin</name></Row>
+ <Row><id>9</id><name>Richard</name></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>true</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>1000</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>Richard</name></Row><Row><id>2</id><name>John</name></Row><Row><id>3</id><name>Bart</name></Row><Row><id>4</id><name>Gavin</name></Row><Row><id>5</id><name>Richard</name></Row><Row><id>6</id><name>John</name></Row><Row><id>7</id><name>Bart</name></Row><Row><id>8</id><name>Gavin</name></Row><Row><id>9</id><name>Richard</name></Row></child></Row>
+</Dataset>

+ 7 - 0
testing/regress/ecl/mysqlembed.ecl

@@ -80,6 +80,12 @@ dataset(childrec) testMySQLDS2() := EMBED(mysql : server(myServer),user(myUser),
   SELECT [] from tbl1 where u1='Straße';
 ENDEMBED;
 
+// NOTE - while this works as a test case you really don't want to be using the modulo in this way to pull a dataset in parallel, as it won't be able to be indexed efficiently in MySQL
+
+streamed dataset(childrec) testMySQLDS2a() := EMBED(mysql : server(myServer),user(myUser),database(myDB),PROJECTED('[]'), activity, local(true))
+  SELECT [] from tbl1 where (NOT __ACTIVITY__.ISLOCAL) OR (value % __activity__.numSlaves = __activity__.slave);
+ENDEMBED;
+
 ds3query := u'SELECT ** FROM tbl1;' : STORED('ds3query');
 
 dataset(childrec) testMySQLDS3() := EMBED(mysql, ds3query : server(myServer),user(myUser),database(myDB),PROJECTED(u'**'));
@@ -165,6 +171,7 @@ sequential (
   PARALLEL (
   OUTPUT(testMySQLDS()),
   COUNT(testMySQLDS2()),
+  OUTPUT(testMySQLDS2a()),
   OUTPUT(testMySQLDS3(), {name}),
   OUTPUT(testMySQLRow().name),
   OUTPUT(testMySQLParms('name1', 1, true, 1.2, 3.4, D'aa55aa55', U'Straße', U'Straße')),

+ 69 - 0
testing/regress/ecl/py2embedactivity.ecl

@@ -0,0 +1,69 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=embedded
+//class=python2
+
+IMPORT Python;
+
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+m(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := MODULE
+  EXPORT streamed dataset(r) myDataset(unsigned numRows = numRows) := EMBED(Python : activity, local(isLocal), parallel(numParallel))
+    numSlaves = __activity__.numSlaves
+    numParallel = numSlaves * __activity__.numStrands
+    rowsPerPart = (numRows + numParallel - 1) / numParallel
+    thisSlave = __activity__.slave
+    thisIndex = thisSlave * __activity__.numStrands + __activity__.strand
+    first = thisIndex * rowsPerPart
+    last = first + rowsPerPart
+    if first > numRows:
+      first = numRows
+    if last > numRows:
+      last = numRows
+
+    names = [ "Gavin", "Richard", "John", "Bart" ]
+    while first < last:
+        yield (first, names[first % 4 ])
+        first += 1
+  ENDEMBED;
+END;
+
+r2 := RECORD
+    UNSIGNED id;
+    DATASET(r) child;
+END;
+
+sequential(
+  //Global activity - fixed number of rows
+  output(m(10).myDataset());
+
+  //Local version of the activity 
+  output(count(m(10, isLocal := true).myDataset()) = CLUSTERSIZE * 10);
+
+  //Check that stranding (if implemented) still generates unique records
+  output(COUNT(DEDUP(m(1000, numParallel := 5).myDataset(), id, ALL)));
+
+  //Check that the activity can also be executed in a child query
+  output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER).myDataset())));
+
+  //Test stranding inside a child query
+  output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER, NumParallel := 3).myDataset())));
+);

+ 69 - 0
testing/regress/ecl/py3embedactivity.ecl

@@ -0,0 +1,69 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=embedded
+//class=python3
+
+IMPORT Python3;
+
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+m(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := MODULE
+  EXPORT streamed dataset(r) myDataset(unsigned numRows = numRows) := EMBED(Python3 : activity, local(isLocal), parallel(numParallel))
+    numSlaves = __activity__.numSlaves
+    numParallel = numSlaves * __activity__.numStrands
+    rowsPerPart = (numRows + numParallel - 1) // numParallel
+    thisSlave = __activity__.slave
+    thisIndex = thisSlave * __activity__.numStrands + __activity__.strand
+    first = thisIndex * rowsPerPart
+    last = first + rowsPerPart
+    if first > numRows:
+      first = numRows
+    if last > numRows:
+      last = numRows
+
+    names = [ "Gavin", "Richard", "John", "Bart" ]
+    while first < last:
+        yield (first, names[first % 4 ])
+        first += 1
+  ENDEMBED;
+END;
+
+r2 := RECORD
+    UNSIGNED id;
+    DATASET(r) child;
+END;
+
+sequential(
+  //Global activity - fixed number of rows
+  output(m(10).myDataset());
+
+  //Local version of the activity 
+  output(count(m(10, isLocal := true).myDataset()) = CLUSTERSIZE * 10);
+
+  //Check that stranding (if implemented) still generates unique records
+  output(COUNT(DEDUP(m(1000, numParallel := 5).myDataset(), id, ALL)));
+
+  //Check that the activity can also be executed in a child query
+  output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER).myDataset())));
+
+  //Test stranding inside a child query
+  output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := m(COUNTER, NumParallel := 3).myDataset())));
+);

+ 1 - 1
testing/regress/ecl/sqlite.ecl

@@ -29,7 +29,7 @@ IMPORT SqLite3;
 
 // This is the record structure in ECL that will correspond to the rows in the SQLite dataset
 // Note that the default values specified in the fields will be used when a NULL value is being
-// returned from MySQL
+// returned from SQLite
 
 childrec := RECORD
    string name,