瀏覽代碼

HPCC-13036 Add support for embed activities

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 7 年之前
父節點
當前提交
c0d0e2f175
共有 43 個文件被更改,包括 1185 次插入62 次删除
  1. 2 0
      ecl/hql/hqlatoms.cpp
  2. 1 0
      ecl/hql/hqlatoms.hpp
  3. 13 1
      ecl/hql/hqlattr.cpp
  4. 93 5
      ecl/hql/hqlexpr.cpp
  5. 7 0
      ecl/hql/hqlexpr.hpp
  6. 4 0
      ecl/hql/hqlutil.cpp
  7. 2 1
      ecl/hql/hqlutil.hpp
  8. 2 0
      ecl/hqlcpp/hqlcerrors.hpp
  9. 1 0
      ecl/hqlcpp/hqlcpp.ipp
  10. 3 0
      ecl/hqlcpp/hqlcppds.cpp
  11. 1 0
      ecl/hqlcpp/hqlcppsys.ecl
  12. 104 1
      ecl/hqlcpp/hqlhtcpp.cpp
  13. 3 0
      ecl/hqlcpp/hqlinline.cpp
  14. 2 0
      ecl/hqlcpp/hqliproj.cpp
  15. 27 0
      ecl/hqlcpp/hqlttcpp.cpp
  16. 14 0
      ecl/hqlcpp/hqlwcpp.cpp
  17. 13 21
      ecl/hthor/hthor.cpp
  18. 24 7
      ecl/hthor/hthor.ipp
  19. 74 0
      ecl/regress/embedactivity1err.ecl
  20. 4 0
      roxie/ccd/ccdquery.cpp
  21. 142 3
      roxie/ccd/ccdserver.cpp
  22. 1 0
      roxie/ccd/ccdserver.hpp
  23. 15 0
      rtl/eclrtl/eclhelper_base.cpp
  24. 2 2
      rtl/eclrtl/rtlnewkey.hpp
  25. 41 20
      rtl/include/eclhelper.hpp
  26. 14 0
      rtl/include/eclhelper_base.hpp
  27. 85 0
      testing/regress/ecl/embedactivity1.ecl
  28. 33 0
      testing/regress/ecl/embedactivity2.ecl
  29. 62 0
      testing/regress/ecl/embedactivity3.ecl
  30. 77 0
      testing/regress/ecl/embedactivity4.ecl
  31. 42 0
      testing/regress/ecl/key/embedactivity1.xml
  32. 1 0
      testing/regress/ecl/key/embedactivity2.xml
  33. 6 0
      testing/regress/ecl/key/embedactivity3.xml
  34. 18 0
      testing/regress/ecl/key/embedactivity4.xml
  35. 1 0
      thorlcr/activities/activitymasters_lcr.cmake
  36. 1 0
      thorlcr/activities/activityslaves_lcr.cmake
  37. 27 0
      thorlcr/activities/external/thexternal.cpp
  38. 25 0
      thorlcr/activities/external/thexternal.ipp
  39. 175 0
      thorlcr/activities/external/thexternalslave.cpp
  40. 3 0
      thorlcr/graph/thgraph.cpp
  41. 7 1
      thorlcr/graph/thgraphslave.cpp
  42. 6 0
      thorlcr/master/thactivitymaster.cpp
  43. 7 0
      thorlcr/slave/slave.cpp

+ 2 - 0
ecl/hql/hqlatoms.cpp

@@ -77,6 +77,7 @@ IAtom * actionAtom;
 IAtom * activeAtom;
 IAtom * activeFailureAtom;
 IAtom * activeNlpAtom;
+IAtom * activityAtom;
 IAtom * afterAtom;
 IAtom * algorithmAtom;
 IAtom * _aliased_Atom;
@@ -538,6 +539,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
     MAKEATOM(active);
     MAKEATOM(activeFailure);
     MAKEATOM(activeNlp);
+    MAKEATOM(activity);
     MAKEATOM(after);
     MAKEATOM(algorithm);
     MAKESYSATOM(aliased);

+ 1 - 0
ecl/hql/hqlatoms.hpp

@@ -81,6 +81,7 @@ extern HQL_API IAtom * actionAtom;
 extern HQL_API IAtom * activeAtom;
 extern HQL_API IAtom * activeFailureAtom;
 extern HQL_API IAtom * activeNlpAtom;
+extern HQL_API IAtom * activityAtom;
 extern HQL_API IAtom * afterAtom;
 extern HQL_API IAtom * algorithmAtom;
 extern HQL_API IAtom * _aliased_Atom;

+ 13 - 1
ecl/hql/hqlattr.cpp

@@ -1706,6 +1706,15 @@ bool isLocalActivity(IHqlExpression * expr)
         return false;
     case no_compound:
         return isLocalActivity(expr->queryChild(1));
+    case no_call:
+    case no_externalcall:
+        if (callIsActivity(expr))
+        {
+            //Can only be deduced by substituting the parameters into the body and seeing if the local attribute has a constant value
+            //currently assume false.  It may need improving when support for global embed activities is added.
+            return false;
+        }
+        return false;
     case no_compound_diskread:
     case no_compound_disknormalize:
     case no_compound_diskaggregate:
@@ -1904,7 +1913,10 @@ bool localChangesActivityAction(IHqlExpression * expr)
     case no_nwaymerge:
     case no_selfjoin:
     case no_joincount:
-        return !isKeyedJoin(expr);          // Keyed joins always 
+        return !isKeyedJoin(expr);          // Keyed joins always
+    case no_call:
+    case no_externalcall:
+        return callIsActivity(expr);
     }
     return false;
 }

+ 93 - 5
ecl/hql/hqlexpr.cpp

@@ -2264,7 +2264,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
     case no_httpcall:
     case no_soapcall:
     case no_newsoapcall:
-    case no_externalcall:                       // None in the sense it is generally used for.
     case no_alias:
     case no_id2blob:
     case no_embedbody:
@@ -2274,7 +2273,6 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
     case no_param:
     case no_typetransfer:
     case no_translated:
-    case no_call:
     case no_rows:
     case no_external:
     case no_delayedselect:
@@ -2450,6 +2448,14 @@ childDatasetType getChildDatasetType(IHqlExpression * expr)
         return childdataset_left;
     case no_chooseds:
         return childdataset_many_noscope;
+    case no_call:
+        if (functionCallIsActivity(expr))
+            return childdataset_many_noscope;
+        return childdataset_none;
+    case no_externalcall:
+        if (externalCallIsActivity(expr))
+            return childdataset_many_noscope;
+        return childdataset_none;
     case no_merge:
     case no_regroup:
     case no_cogroup:
@@ -2698,7 +2704,6 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
     case no_httpcall:
     case no_soapcall:
     case no_newsoapcall:
-    case no_externalcall:                       // None in the sense it is generally used for.
     case no_alias:
     case no_id2blob:
     case no_embedbody:
@@ -2706,7 +2711,6 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
     case no_datasetfromdictionary:
     case no_param:
     case no_translated:
-    case no_call:
     case no_rows:
     case no_external:
     case no_rowsetindex:
@@ -2767,6 +2771,14 @@ inline unsigned doGetNumChildTables(IHqlExpression * dataset)
     case no_quoted:
     case no_variable:
         return 0;
+    case no_call:
+        if (functionCallIsActivity(dataset))
+            return numStreamInputs(dataset->queryBody()->queryFunctionDefinition());
+        return 0;
+    case no_externalcall:
+        if (externalCallIsActivity(dataset))
+            return numStreamInputs(dataset->queryExternalDefinition());
+        return 0;
     case no_mapto:
     case no_compound:
         return 0;       // a lie.
@@ -8398,6 +8410,81 @@ bool functionBodyUsesContext(IHqlExpression * body)
     }
 }
 
+bool functionBodyIsActivity(IHqlExpression * body)
+{
+    switch (body->getOperator())
+    {
+    case no_external:
+        return body->hasAttribute(activityAtom);
+    case no_outofline:
+    case no_funcdef:
+        return functionBodyIsActivity(body->queryChild(0));
+    case no_embedbody:
+        return body->hasAttribute(activityAtom);
+    default:
+        return false;
+    }
+}
+
+bool functionCallIsActivity(IHqlExpression * call)
+{
+    dbgassertex(call->getOperator() == no_call);
+    return functionBodyIsActivity(call->queryBody()->queryFunctionDefinition());
+}
+
+bool externalCallIsActivity(IHqlExpression * call)
+{
+    dbgassertex(call->getOperator() == no_externalcall);
+    return functionBodyIsActivity(call->queryExternalDefinition());
+}
+
+IHqlExpression * queryFuncdef(IHqlExpression * call)
+{
+    switch (call->getOperator())
+    {
+    case no_call:
+        return call->queryBody()->queryFunctionDefinition();
+    case no_externalcall:
+        return call->queryExternalDefinition();
+    default:
+        throwUnexpected();
+    }
+}
+
+bool callIsActivity(IHqlExpression * call)
+{
+    return functionBodyIsActivity(queryFuncdef(call));
+}
+
+bool isStreamingDataset(IHqlExpression * param)
+{
+    dbgassertex(param->getOperator() == no_param);
+    ITypeInfo * paramType = param->queryType();
+    switch (paramType->getTypeCode())
+    {
+    case type_table:
+    case type_groupedtable:
+        if (hasStreamedModifier(paramType))
+            return true;
+        break;
+    }
+    return false;
+}
+
+unsigned numStreamInputs(IHqlExpression * funcdef)
+{
+    dbgassertex(funcdef->getOperator() == no_funcdef);
+    IHqlExpression * formals = funcdef->queryChild(1);
+    unsigned numStreams = 0;
+    ForEachChild(i, formals)
+    {
+        if (!isStreamingDataset(formals->queryChild(i)))
+            break;
+        numStreams++;
+    }
+    return numStreams;
+}
+
 IHqlExpression * createFunctionDefinition(IIdAtom * id, HqlExprArray & args)
 {
     IHqlExpression * body = &args.item(0);
@@ -12438,7 +12525,6 @@ IHqlExpression * expandOutOfLineFunctionCall(IHqlExpression * expr)
     CallExpansionContext ctx;
     ctx.functionCache = &functionCache;
     ctx.forceOutOfLineExpansion = true;
-    assertex(expr->getOperator() == no_call);
     if (ctx.expandFunctionCall(expr))
         return expandFunctionCallPreserveAnnotation(ctx, expr);
     return LINK(expr);
@@ -12931,6 +13017,8 @@ IHqlExpression * createExternalFuncdefFromInternal(IHqlExpression * funcdef)
         attrs.append(*createAttribute(contextSensitiveAtom));
     if (functionBodyUsesContext(body))
         attrs.append(*LINK(cachedContextAttribute));
+    if (functionBodyIsActivity(body))
+        attrs.append(*createAttribute(activityAtom));
 
     IHqlExpression *child = body->queryChild(0);
     if (child && child->getOperator()==no_embedbody)

+ 7 - 0
ecl/hql/hqlexpr.hpp

@@ -1889,6 +1889,13 @@ extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, IHqlExp
 extern HQL_API IHqlExpression * createFunctionDefinition(IIdAtom * name, HqlExprArray & args);
 extern HQL_API IHqlExpression * queryNonDelayedBaseAttribute(IHqlExpression * expr);
 extern HQL_API bool functionBodyUsesContext(IHqlExpression * body);
+extern HQL_API bool functionBodyIsActivity(IHqlExpression * body);
+extern HQL_API bool functionCallIsActivity(IHqlExpression * call);
+extern HQL_API bool externalCallIsActivity(IHqlExpression * call);
+extern HQL_API bool callIsActivity(IHqlExpression * call); // can be no_call or no_externalcall
+extern HQL_API IHqlExpression * queryFuncdef(IHqlExpression * call);
+extern HQL_API bool isStreamingDataset(IHqlExpression * param);
+extern HQL_API unsigned numStreamInputs(IHqlExpression * funcdef);
 
 #define NO_AGGREGATE        \
          no_count:          \

+ 4 - 0
ecl/hql/hqlutil.cpp

@@ -7809,6 +7809,8 @@ public:
             mangled.append("P18IGlobalCodeContext");
         else if (body->hasAttribute(userMatchFunctionAtom))
             mangled.append("P12IMatchWalker");
+        if (functionBodyIsActivity(body))
+            mangled.append("P20IThorActivityContext");
 
         mangled.append(mangledReturnParameters);
 
@@ -8079,6 +8081,8 @@ public:
             mangled.append("PVIGlobalCodeContext@@");
         else if (body->hasAttribute(userMatchFunctionAtom))
             mangled.append("PVIMatchWalker@@");
+        if (functionBodyIsActivity(body))
+            mangled.append("PVIThorActivityContext@@");
 
         if (mangledReturnParameters.length())
             mangled.append(mangledReturnParameters);

+ 2 - 1
ecl/hql/hqlutil.hpp

@@ -252,7 +252,8 @@ extern HQL_API bool isTimed(IHqlExpression * expr);
 
 inline bool isInternalEmbedAttr(IAtom *name)
 {
-    return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom;
+    return name == languageAtom || name == projectedAtom || name == streamedAtom || name == _linkCounted_Atom || 
+           name == importAtom || name==foldAtom || name==timeAtom || name==prebindAtom || name == activityAtom;
 }
 
 

+ 2 - 0
ecl/hqlcpp/hqlcerrors.hpp

@@ -221,6 +221,7 @@
 #define HQLERR_ExpectedFileLhsFetch             4209
 #define HQLERR_IncompatibleKeyedSubString       4210
 #define HQLERR_NonNullChildDSDefault            4211
+#define HQLERR_AttributeXMustBeConstant         4212
 
 //Warnings....
 #define HQLWRN_PersistDataNotLikely             4500
@@ -523,6 +524,7 @@
 #define HQLERR_ExpectedFileLhsFetch_Text        "The first argument of FETCH must be a disk file (had %s)"
 #define HQLERR_IncompatibleKeyedSubString_Text  "Cannot use two different KEYED substring filters for field %s in key %s"
 #define HQLERR_NonNullChildDSDefault_Text       "Non-null child dataset may not be used as default value (target field '%s')"
+#define HQLERR_AttributeXMustBeConstant_Text    "Attribute %s must be set to a constant value"
 
 //Warnings.
 #define HQLWRN_CannotRecreateDistribution_Text  "Cannot recreate the distribution for a persistent dataset"

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -1488,6 +1488,7 @@ public:
     ABoundActivity * doBuildActivityDistribute(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityDistribution(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
     ABoundActivity * doBuildActivitySectionInput(BuildCtx & ctx, IHqlExpression * expr);
+    ABoundActivity * doBuildActivityEmbed(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
     ABoundActivity * doBuildActivityEnth(BuildCtx & ctx, IHqlExpression * expr);
     ABoundActivity * doBuildActivityExecuteWhen(BuildCtx & ctx, IHqlExpression * expr, bool isRoot);
     ABoundActivity * doBuildActivityForceLocal(BuildCtx & ctx, IHqlExpression * expr);

+ 3 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -2370,6 +2370,9 @@ void HqlCppTranslator::doBuildDataset(BuildCtx & ctx, IHqlExpression * expr, CHq
             buildTempExpr(ctx, expr, tgt, format);
             return;
         }
+        break;
+    case no_quoted:
+        throwUnexpectedX("Translated expression passed to doBuildDataset()");
     }
 
     if (expr->isDictionary())

+ 1 - 0
ecl/hqlcpp/hqlcppsys.ecl

@@ -790,6 +790,7 @@ const char * cppSystemText[]  = {
     "   boolean newMemorySpillSplitArg(unsigned4 usageCount, const varstring name, boolean meta) : include, pseudoentrypoint='new CLibraryMemorySpillSplitArg';",
     "   boolean newWorkUnitReadArg(const varstring _name, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitReadArg';",
     "   boolean newWorkUnitWriteArg(const varstring _name, unsigned4 _flags, boolean _meta) : include, pseudoentrypoint='new CLibraryWorkUnitWriteArg';",
+    "   CThorExternalArg(unsigned4 _numInputs) : include;",
 
     "   destructMetaMember(row _x) : omethod,entrypoint='destruct';",
     "   walkIndirectMetaMember(row _x, boolean _visitor) : omethod,entrypoint='walkIndirectMembers';",

+ 104 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -2351,6 +2351,21 @@ void ActivityInstance::buildPrefix()
 
 void ActivityInstance::buildSuffix()
 {
+    if (!implementationClassName && constructorArgs)
+    {
+        StringBuffer baseClassName;
+        baseClassName.append("CThor").append(activityArgName).append("Arg");
+        IIdAtom * baseClassId = createIdAtom(baseClassName);
+        OwnedHqlExpr call = translator.bindFunctionCall(baseClassId, constructorArgs);
+
+        StringBuffer s;
+        s.append(className).append("() : ");
+        translator.generateExprCpp(s, call);
+        s.append(" {}");
+
+        classctx.addQuoted(s);
+    }
+
     if (numChildQueries)
         addAttributeInt(WaNumChildQueries, numChildQueries);
 
@@ -6490,7 +6505,9 @@ ABoundActivity * HqlCppTranslator::buildActivity(BuildCtx & ctx, IHqlExpression
                 //Items in this list need to also be in the list inside doBuildActivityChildDataset
             case no_call:
             case no_externalcall:
-                if (expr->isAction())
+                if (callIsActivity(expr))
+                    result = doBuildActivityEmbed(ctx, expr, isRoot);
+                else if (expr->isAction())
                     result = doBuildActivityAction(ctx, expr, isRoot);
                 else if (expr->isDatarow())
                     result = doBuildActivityCreateRow(ctx, expr, false);
@@ -8864,6 +8881,92 @@ ABoundActivity * HqlCppTranslator::doBuildActivityRemote(BuildCtx & ctx, IHqlExp
 }
 
 
+//---------------------------------------------------------------------------------------------------------------------
+
+ABoundActivity * HqlCppTranslator::doBuildActivityEmbed(BuildCtx & ctx, IHqlExpression * expr, bool isRoot)
+{
+    CIArray bound;
+    IHqlExpression * funcdef = queryFuncdef(expr);
+    IHqlExpression * formals = funcdef->queryChild(1);
+    ForEachChild(iInput, formals)
+    {
+        IHqlExpression * input = formals->queryChild(iInput);
+        if (!isStreamingDataset(input))
+            break;
+
+        IHqlExpression * actual = expr->queryChild(iInput);
+        assertex(actual);
+        bound.append(*buildCachedActivity(ctx, actual));
+    }
+
+    ThorActivityKind kind;
+    if (expr->isDataset())
+        kind = bound.empty() ? TAKexternalsource : TAKexternalprocess;
+    else
+        kind = TAKexternalsink;
+
+    Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, kind, expr, "External");
+
+    //Substitute the parameters into the external call/embed definition, so that any attributes that depend on the arguments
+    //are expanded.
+    OwnedHqlExpr expandedCall = expandOutOfLineFunctionCall(expr);
+
+    //The setting for the local attribute allows the localness to be configured for the activity
+    IHqlExpression * localAttr = expandedCall->queryAttribute(localAtom);
+    if (localAttr)
+    {
+        IHqlExpression * value = localAttr->queryChild(0);
+        if (value)
+            instance->isLocal = getBoolValue(value, false);
+        else
+            instance->isLocal = true;
+    }
+
+    buildActivityFramework(instance, isRoot);
+    OwnedHqlExpr numInputsExpr = getSizetConstant(bound.ordinality());
+    instance->addConstructorParameter(numInputsExpr);
+
+    buildInstancePrefix(instance);
+
+    HqlExprArray actuals;
+    ForEachChild(i, formals)
+    {
+        if (i < bound.ordinality())
+        {
+            IHqlExpression * input = formals->queryChild(i);
+            StringBuffer argument;
+            argument.append("inputs[").append(i).append("]");
+            OwnedHqlExpr ds = createQuoted(argument, input->getType());
+            actuals.append(*createTranslated(ds));
+        }
+        else
+        {
+            IHqlExpression * actual = expr->queryChild(i);
+            actuals.append(*LINK(actual));
+        }
+    }
+    OwnedHqlExpr newCall = expr->clone(actuals);
+
+    if (expr->isDataset())
+    {
+        MemberFunction func(*this, instance->startctx, "virtual IRowStream * createOutput(IThorActivityContext * activityContext) override");
+        buildReturn(func.ctx, newCall);
+    }
+    else
+    {
+        MemberFunction func(*this, instance->startctx, "virtual void execute(IThorActivityContext * activityContext) override");
+        buildStmt(func.ctx, newCall);
+    }
+
+    buildInstanceSuffix(instance);
+
+    ForEachItemIn(idx2, bound)
+        buildConnectInputOutput(ctx, instance, (ABoundActivity *)&bound.item(idx2), 0, idx2);
+
+    return instance->getBoundActivity();
+}
+
+
 //---------------------------------------------------------------------------
 // no_update
 

+ 3 - 0
ecl/hqlcpp/hqlinline.cpp

@@ -112,6 +112,9 @@ static unsigned calcInlineFlags(BuildCtx * ctx, IHqlExpression * expr)
         return getInlineFlags(ctx, expr->queryChild(0));
     case no_call:
     case no_externalcall:               // no so sure about this - should possibly be assignable only. (also no_call above)
+        if (callIsActivity(expr))
+            return 0;
+        //fallthrough
     case no_getresult:
         if (isStreamed(expr))
             return RETiterate;

+ 2 - 0
ecl/hqlcpp/hqliproj.cpp

@@ -2173,6 +2173,8 @@ ProjectExprKind ImplicitProjectTransformer::getProjectExprKind(IHqlExpression *
         return SinkActivity;
     case no_call:
     case no_externalcall:
+        if (callIsActivity(expr) && (getNumChildTables(expr) != 0))
+            return FixedInputActivity;
         if (hasActivityType(expr))
         {
             if (isProjectableCall(expr))

+ 27 - 0
ecl/hqlcpp/hqlttcpp.cpp

@@ -987,6 +987,8 @@ YesNoOption HqlThorBoundaryTransformer::calcNormalizeThor(IHqlExpression * expr)
 
     case no_call:
         {
+            if (functionCallIsActivity(expr))
+                return OptionYes;
             YesNoOption bodyOption = normalizeThor(expr->queryBody()->queryFunctionDefinition());
             //do Something with it
             break;
@@ -997,6 +999,8 @@ YesNoOption HqlThorBoundaryTransformer::calcNormalizeThor(IHqlExpression * expr)
             IHqlExpression * funcDef = func->queryChild(0);
             if (funcDef->hasAttribute(gctxmethodAtom) || funcDef->hasAttribute(globalContextAtom))
                 return OptionNo;
+            if (externalCallIsActivity(expr))
+                return OptionYes;
 //          if (funcDef->hasAttribute(graphAtom))
 //              return OptionYes;
             if (!resourceConditionalActions && expr->isAction())
@@ -2215,6 +2219,8 @@ IHqlExpression * ThorHqlTransformer::createTransformed(IHqlExpression * expr)
                 if (expr->isDatarow())
                     args.append(*createAttribute(allocatorAtom));
             }
+            inheritAttribute(args, expr, activityAtom);
+
             OwnedHqlExpr body = createWrapper(no_outofline, expr->queryType(), args);
             HqlExprArray newFormals;
             if (expr->hasAttribute(languageAtom))
@@ -13966,6 +13972,10 @@ public:
         case no_choosen:
             checkChoosen(expr);
             break;
+        case no_call:
+            if (callIsActivity(expr))
+                checkEmbedActivity(expr);
+            break;
         }
         QuickHqlTransformer::doAnalyse(expr);
     }
@@ -13974,6 +13984,7 @@ protected:
     void checkBloom(IHqlExpression * expr);
     void checkJoin(IHqlExpression * expr);
     void checkChoosen(IHqlExpression * expr);
+    void checkEmbedActivity(IHqlExpression * expr);
     void reportError(int errNo, const char * format, ...) __attribute__((format(printf, 3, 4)));
     void reportWarning(WarnErrorCategory category, int warnNo, const char * format, ...) __attribute__((format(printf, 4, 5)));
 protected:
@@ -14056,6 +14067,22 @@ void SemanticErrorChecker::checkChoosen(IHqlExpression * expr)
         reportWarning(CategoryUnusual, WRN_CHOOSEN_ALL,"Use CHOOSEN(dataset, ALL) to remove implicit choosen.  CHOOSEN(dataset, 0) now returns no records.");
 }
 
+void SemanticErrorChecker::checkEmbedActivity(IHqlExpression * call)
+{
+    //Substitute the parameters into the external call/embed definition, so that any attributes that depend on the arguments
+    //are expanded.
+    OwnedHqlExpr expandedCall = expandOutOfLineFunctionCall(call);
+
+    //The setting for the local attribute allows the localness to be configured for the activity
+    IHqlExpression * localAttr = expandedCall->queryAttribute(localAtom);
+    if (localAttr)
+    {
+        IHqlExpression * value = localAttr->queryChild(0);
+        if (value && !value->isConstant())
+            reportError(ECODETEXT(HQLERR_AttributeXMustBeConstant), "LOCAL");
+    }
+}
+
 void SemanticErrorChecker::reportError(int errNo, const char * format, ...)
 {
     ECLlocation location;

+ 14 - 0
ecl/hqlcpp/hqlwcpp.cpp

@@ -702,6 +702,13 @@ bool HqlCppWriter::generateFunctionPrototype(IHqlExpression * funcdef, const cha
         out.append("IMatchWalker * results");
         firstParam = false;
     }
+    if (functionBodyIsActivity(body))
+    {
+        if (!firstParam)
+            out.append(",");
+        out.append("IThorActivityContext * activity");
+        firstParam = false;
+    }
 
     if (returnParameters.length())
     {
@@ -1220,6 +1227,13 @@ StringBuffer & HqlCppWriter::generateExprCpp(IHqlExpression * expr)
                     out.append("gctx");
                     needComma = true;
                 }
+                if (functionBodyIsActivity(funcdef))
+                {
+                    if (needComma)
+                        out.append(",");
+                    out.append("activityContext");
+                    needComma = true;
+                }
                 for (unsigned index = firstArg; index < numArgs; index++)
                 {
                     IHqlExpression * cur = expr->queryChild(index);

+ 13 - 21
ecl/hthor/hthor.cpp

@@ -10178,24 +10178,23 @@ void CHThorStreamedIteratorActivity::stop()
 //=====================================================================================================
 
 CHThorExternalActivity::CHThorExternalActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &_arg, ThorActivityKind _kind, IPropertyTree * _graphNode) 
-: CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), graphNode(_graphNode)
+: CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), graphNode(_graphNode), activityContext(1, 0)
 {
 }
 
+void CHThorExternalActivity::setInput(unsigned index, IHThorInput *_input)
+{
+    CHThorMultiInputActivity::setInput(index, _input);
+    CHThorInputAdaptor * adaptedInput = new CHThorInputAdaptor(_input);
+    inputAdaptors.append(*adaptedInput);
+    helper.setInput(index, adaptedInput);
+}
+
 void CHThorExternalActivity::ready()
 {
     CHThorMultiInputActivity::ready();
-    //must be called after onStart()
-    processor.setown(helper.createProcessor());
-    processor->onCreate(agent.queryCodeContext(), graphNode);
-    ForEachItemIn(idx, inputs)
-    {
-        Owned<CHThorInputAdaptor> adaptedInput = new CHThorInputAdaptor(inputs.item(idx));
-        processor->addInput(idx, adaptedInput);
-    }
-    processor->start();
-    if (outputMeta.getMinRecordSize() > 0)
-        rows.setown(processor->createOutput(0));
+    if (kind != TAKexternalsink)
+        rows.setown(helper.createOutput(&activityContext));
 }
 
 const void *CHThorExternalActivity::nextRow()
@@ -10210,14 +10209,7 @@ const void *CHThorExternalActivity::nextRow()
 void CHThorExternalActivity::execute()
 {
     assertex(!rows);
-    processor->execute();
-}
-
-void CHThorExternalActivity::reset()
-{
-    rows.clear();
-    processor->reset();
-    processor.clear();
+    helper.execute(&activityContext);
 }
 
 void CHThorExternalActivity::stop()
@@ -10227,7 +10219,7 @@ void CHThorExternalActivity::stop()
         rows->stop();
         rows.clear();
     }
-    processor->stop();
+    CHThorMultiInputActivity::stop();
 }
 
 

+ 24 - 7
ecl/hthor/hthor.ipp

@@ -2878,24 +2878,41 @@ protected:
     IHThorInput * input;    // not currently a linkable interface
 };
 
+class SingleNodeActivityContext : public IThorActivityContext
+{
+public:
+    SingleNodeActivityContext(unsigned _numStrands, unsigned _curStrand) : strands(_numStrands), curStrand(_curStrand) { assertex(curStrand < strands); }
+
+    virtual bool isLocal() const override { return false; }
+    virtual unsigned numSlaves() const override { return 1; }
+    virtual unsigned numStrands() const override { return strands; }
+    virtual unsigned querySlave() const override { return 0; }
+    virtual unsigned queryStrand() const override { return curStrand; }
+protected:
+    unsigned strands;
+    unsigned curStrand;
+};
+
+
 class CHThorExternalActivity : public CHThorMultiInputActivity
 {
     IHThorExternalArg &helper;
-    Owned<IThorExternalRowProcessor> processor;
     Owned<IRowStream> rows;
     Linked<IPropertyTree> graphNode;
+    SingleNodeActivityContext activityContext;
+    IArrayOf<CHThorInputAdaptor> inputAdaptors;
 public:
     CHThorExternalActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &_arg, ThorActivityKind _kind, IPropertyTree * _graphNode);
 
-    virtual void ready();
-    virtual void stop();
-    virtual void reset();
+    virtual void ready() override;
+    virtual void stop() override;
 
-    virtual void execute();
+    virtual void execute() override;
 
-    virtual const void *nextRow();
+    virtual const void *nextRow() override;
+    virtual void setInput(unsigned index, IHThorInput *_input) override;
 
-    virtual bool isGrouped()                { return outputMeta.isGrouped(); }
+    virtual bool isGrouped() override { return outputMeta.isGrouped(); }
 };
 
 

+ 74 - 0
ecl/regress/embedactivity1err.ecl

@@ -0,0 +1,74 @@
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+streamed dataset(r) myDataset(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := EMBED(C++ : activity, local(isLocal), parallel(numParallel))
+static const char * const names[] = {"Gavin","John","Bart"};
+static const unsigned numNames = (unsigned)(sizeof(names) / sizeof(names[0]));
+#body
+    class MyStreamInlineDataset : public RtlCInterface, implements IRowStream
+    {
+    public:
+        MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, unsigned _first, unsigned _last)
+        : resultAllocator(_resultAllocator), first(_first), last(_last)
+        {
+            current = first;
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void *nextRow() override
+        {
+            if (current >= last)
+                return nullptr;
+
+            unsigned id = current++;
+            unsigned curName = id % numNames;
+            const char * name = names[curName];
+            size32_t lenName = strlen(name);
+
+            RtlDynamicRowBuilder rowBuilder(resultAllocator);
+            unsigned len = sizeof(__int64) + sizeof(size32_t) + lenName;
+            byte * row = rowBuilder.ensureCapacity(len, NULL);
+            *(__uint64 *)(row) = id;
+            *(size32_t *)(row + sizeof(__uint64)) = lenName;
+            memcpy(row+sizeof(__uint64)+sizeof(size32_t), name, lenName);
+            return rowBuilder.finalizeRowClear(len);
+        }
+        virtual void stop() override
+        {
+            current = (unsigned)-1;
+        }
+
+
+    protected:
+        Linked<IEngineRowAllocator> resultAllocator;
+        unsigned current;
+        unsigned first;
+        unsigned last;
+    };
+
+    unsigned numRows = numrows;
+    unsigned numSlaves = activity->numSlaves();
+    unsigned numParallel = numSlaves * activity->numStrands();
+    unsigned rowsPerPart = (numRows + numParallel - 1) / numParallel;
+    unsigned thisSlave = activity->querySlave();
+    unsigned thisIndex = thisSlave * activity->numStrands() + activity->queryStrand();
+    unsigned first = thisIndex * rowsPerPart;
+    unsigned last = first + rowsPerPart;
+    if (first > numRows)
+        first = numRows;
+    if (last > numRows)
+        last = numRows;
+
+    return new MyStreamInlineDataset(_resultAllocator, first, last);
+ENDEMBED;
+
+
+reallyLocal := true : stored('reallyLocal');
+
+//Global activity - fixed number of rows
+output(myDataset(10));
+
+//Local version of the activity
+output(count(myDataset(10, isLocal := reallyLocal)) = CLUSTERSIZE * 10);

+ 4 - 0
roxie/ccd/ccdquery.cpp

@@ -860,6 +860,10 @@ protected:
             return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKdistribution:
             return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
+        case TAKexternalprocess:
+        case TAKexternalsink:
+        case TAKexternalsource:
+            return createRoxieServerExternalActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
 
         // These are not required in Roxie for the time being - code generator should trap them
         case TAKchilddataset:

+ 142 - 3
roxie/ccd/ccdserver.cpp

@@ -2681,14 +2681,24 @@ public:
         sourceIdxArray[idx] = _sourceIdx;
     }
 
-    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override
+    virtual void connectInputStreams(bool consumerOrdered)
     {
         //There could be situations (e.g., NONEMPTY), where you might want to strand the activity.
-        for (unsigned i = 0; i < numInputs; i++)
-            streamArray[i] = connectSingleStream(ctx, inputArray[i], sourceIdxArray[i], junctionArray[i], consumerOrdered);
+        connectSingleInputStreams(consumerOrdered);
+        CRoxieServerActivity::connectInputStreams(consumerOrdered);
+    }
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override
+    {
         return CRoxieServerActivity::getOutputStreams(ctx, idx, streams, NULL, consumerOrdered, nullptr); // The input basesclass does not have an input
     }
 
+protected:
+    void connectSingleInputStreams(bool consumerOrdered)
+    {
+        for (unsigned i = 0; i < numInputs; i++)
+            streamArray[i] = connectSingleStream(ctx, inputArray[i], sourceIdxArray[i], junctionArray[i], consumerOrdered);
+    }
 };
 
 //=================================================================================
@@ -13609,6 +13619,135 @@ IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(unsigned _
 
 //=================================================================================
 
+class SingleNodeActivityContext : public IThorActivityContext
+{
+public:
+    SingleNodeActivityContext(unsigned _numStrands, unsigned _curStrand) : strands(_numStrands), curStrand(_curStrand) { assertex(curStrand < strands); }
+
+    virtual bool isLocal() const override { return false; }
+    virtual unsigned numSlaves() const override { return 1; }
+    virtual unsigned numStrands() const override { return strands; }
+    virtual unsigned querySlave() const override { return 0; }
+    virtual unsigned queryStrand() const override { return curStrand; }
+protected:
+    unsigned strands;
+    unsigned curStrand;
+};
+
+class CRoxieServerExternalActivity : public CRoxieServerMultiInputActivity
+{
+    Owned<IRowStream> rows;
+    SingleNodeActivityContext activityContext;
+
+public:
+    CRoxieServerExternalActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
+        : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), activityContext(1, 0)
+    {
+    }
+
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override
+    {
+        Owned<IStrandJunction> junction = CRoxieServerMultiInputActivity::getOutputStreams(ctx, idx, streams, consumerOptions, consumerOrdered, orderedCallbacks);
+        associateInputsWithHelper();
+        return junction.getClear();
+    }
+
+    virtual void connectInputStreams(bool consumerOrdered)
+    {
+        CRoxieServerMultiInputActivity::connectInputStreams(consumerOrdered);
+        associateInputsWithHelper();
+    }
+
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused);
+        if (factory->getKind() != TAKexternalsink)
+        {
+            IHThorExternalArg & helper = static_cast<IHThorExternalArg &>(basehelper);
+            rows.setown(helper.createOutput(&activityContext));
+        }
+    }
+
+    virtual void stop()
+    {
+        if (rows)
+        {
+            rows->stop();
+            rows.clear();
+        }
+        CRoxieServerMultiInputBaseActivity::stop();
+    }
+
+    virtual void reset()
+    {
+        rows.clear();
+    }
+
+    virtual const void * nextRow()
+    {
+        ActivityTimer t(totalCycles, timeActivities);
+        assertex(rows);
+        const void * next = rows->nextRow();
+        if (next)
+            processed++;
+        return next;
+    }
+
+    virtual void execute(unsigned parentExtractSize, const byte * parentExtract)
+    {
+        try
+        {
+            start(parentExtractSize, parentExtract, false);
+            assertex(!rows);
+            IHThorExternalArg & helper = static_cast<IHThorExternalArg &>(basehelper);
+            helper.execute(&activityContext);
+            stop();
+        }
+        catch (IException * E)
+        {
+            ctx->notifyAbort(E);
+            abort();
+            throw;
+        }
+    }
+
+protected:
+    void associateInputsWithHelper()
+    {
+        IHThorExternalArg & helper = static_cast<IHThorExternalArg &>(basehelper);
+        for (unsigned i = 0; i < numInputs; i++)
+            helper.setInput(i, streamArray[i]);
+    }
+};
+
+class CRoxieServerExternalActivityFactory : public CRoxieServerMultiInputFactory
+{
+    bool isRoot;
+public:
+    CRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot)
+        : CRoxieServerMultiInputFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode), isRoot(_isRoot)
+    {
+    }
+
+    virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
+    {
+        return new CRoxieServerExternalActivity(_ctx, this, _probeManager, numInputs());
+    }
+
+    virtual bool isSink() const
+    {
+        return (kind == TAKexternalsink) && isRoot;
+    }
+};
+
+IRoxieServerActivityFactory *createRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot)
+{
+    return new CRoxieServerExternalActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode, _isRoot);
+}
+
+//=================================================================================
+
 class CRoxieServerMergeActivity : public CRoxieServerMultiInputActivity
 {
     IHThorMergeArg &helper;

+ 1 - 0
roxie/ccd/ccdserver.hpp

@@ -437,6 +437,7 @@ extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(u
 extern IRoxieServerActivityFactory *createRoxieServerDistributionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerPullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerTraceActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerExternalActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
 
 extern void throwRemoteException(IMessageUnpackCursor *extra);
 

+ 15 - 0
rtl/eclrtl/eclhelper_base.cpp

@@ -801,3 +801,18 @@ unsigned CThorTraceArg::getSample() { return 0; }
 unsigned CThorTraceArg::getSkip() { return 0; }
 const char *CThorTraceArg::getName() { return NULL; }
 
+
+//CThorExternalArg
+CThorExternalArg::CThorExternalArg(unsigned _numInputs)
+{
+    inputs = new IRowStream *[_numInputs];
+    for (unsigned i=0; i < _numInputs; i++)
+        inputs[i] = nullptr;
+}
+
+CThorExternalArg::~CThorExternalArg() { delete [] inputs; }
+
+IRowStream * CThorExternalArg::createOutput(IThorActivityContext * activityContext) { rtlFailUnexpected(); }
+void CThorExternalArg::execute(IThorActivityContext * activityContext) { rtlFailUnexpected(); }
+void CThorExternalArg::setInput(unsigned whichInput, IRowStream * input) { inputs[whichInput] = input; }
+

+ 2 - 2
rtl/eclrtl/rtlnewkey.hpp

@@ -91,7 +91,7 @@ public:
     {
         if (nextSeekIsGT())
         {
-            assertex(nextUnmatchedRange == -1U);
+            assertex(nextUnmatchedRange == (unsigned)-1);
             assertex(numMatched != filters.ordinality());
             unsigned i;
             int c = 0;
@@ -147,7 +147,7 @@ public:
     unsigned numFilterFields() const { return filters.ordinality(); }
     const RtlRow & queryRow() const { return currentRow; }
     bool setRowForward(const byte * row);
-    bool nextSeekIsGT() const { return (nextUnmatchedRange == -1U); }
+    bool nextSeekIsGT() const { return (nextUnmatchedRange == (unsigned)-1); }
     bool noMoreMatches() const { return eos; }
 
 protected:

+ 41 - 20
rtl/include/eclhelper.hpp

@@ -2716,26 +2716,6 @@ struct IHThorStreamedIteratorArg : public IHThorArg
 };
 
 
-
-interface IPropertyTree;
-interface IThorExternalRowProcessor : public IInterface
-{
-    virtual void onCreate(ICodeContext * ctx, IPropertyTree * graph) = 0;
-    virtual void addInput(unsigned idx, ITypedRowStream * input) = 0;
-    virtual IRowStream * createOutput(unsigned idx) = 0;
-    virtual void start() = 0;
-    virtual void execute() = 0;
-    virtual void stop() = 0;
-    virtual void reset() = 0;
-    virtual void onDestroy() = 0;
-};
-
-
-struct IHThorExternalArg : public IHThorArg
-{
-    virtual IThorExternalRowProcessor * createProcessor() = 0;
-};
-
 //------------------------- Dictionary stuff -------------------------
 
 interface IHThorHashLookupInfo
@@ -2772,6 +2752,47 @@ struct IHThorTraceArg : public IHThorArg
     virtual const char *getName() = 0;
 };
 
+//This interface is passed as an implicit parameter to the embed activity factory.  It allows the activity to determine
+//if it is being executed in a child query, is stranded and other useful information.
+interface IThorActivityContext
+{
+public:
+    virtual bool isLocal() const = 0;
+    virtual unsigned numSlaves() const = 0;
+    virtual unsigned numStrands() const = 0;
+    virtual unsigned querySlave() const = 0; // 0 based 0..numSlaves-1
+    virtual unsigned queryStrand() const = 0; // 0 based 0..numStrands-1
+};
+
+//MORE: How does is this extended to support onStart/onCreate
+//MORE: How is this extended to allow multiple outputs
+interface IHThorExternalArg : public IHThorArg
+{
+    virtual IRowStream * createOutput(IThorActivityContext * activityContext) = 0;
+    virtual void execute(IThorActivityContext * activityContext) = 0;
+    virtual void setInput(unsigned whichInput, IRowStream * input) = 0;
+};
+
+/*
+interface IPropertyTree;
+interface IThorExternalRowProcessor : public IInterface
+{
+    virtual void onCreate(ICodeContext * ctx, IPropertyTree * graph) = 0;
+    virtual void addInput(unsigned idx, ITypedRowStream * input) = 0;
+    virtual IRowStream * createOutput(unsigned idx) = 0;
+    virtual void start() = 0;
+    virtual void execute() = 0;
+    virtual void stop() = 0;
+    virtual void reset() = 0;
+    virtual void onDestroy() = 0;
+};
+
+
+struct IHThorExternalArg : public IHThorArg
+{
+    virtual IThorExternalRowProcessor * createProcessor() = 0;
+};
+*/
 
 //------------------------- Other stuff -------------------------
 

+ 14 - 0
rtl/include/eclhelper_base.hpp

@@ -1115,6 +1115,20 @@ class ECLRTL_API CThorStreamedIteratorArg : public CThorArgOf<IHThorStreamedIter
 {
 };
 
+class ECLRTL_API CThorExternalArg : public CThorArgOf<IHThorExternalArg>
+{
+public:
+    CThorExternalArg(unsigned _numInputs);
+    virtual ~CThorExternalArg();
+    virtual IRowStream * createOutput(IThorActivityContext * activityContext) override;
+    virtual void execute(IThorActivityContext * activityContext) override;
+    virtual void setInput(unsigned whichInput, IRowStream * input) override;
+
+protected:
+    IRowStream * * inputs;
+};
+
+
 //-- Full implementations of selective activities that don't ever require any access to the context.
 
 class ECLRTL_API CLibraryNullArg : public CThorNullArg

+ 85 - 0
testing/regress/ecl/embedactivity1.ecl

@@ -0,0 +1,85 @@
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+streamed dataset(r) myDataset(unsigned numRows, boolean isLocal = false, unsigned numParallel = 0) := EMBED(C++ : activity, local(isLocal), parallel(numParallel))
+static const char * const names[] = {"Gavin","John","Bart"};
+static const unsigned numNames = (unsigned)(sizeof(names) / sizeof(names[0]));
+#body
+    class MyStreamInlineDataset : public RtlCInterface, implements IRowStream
+    {
+    public:
+        MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, unsigned _first, unsigned _last)
+        : resultAllocator(_resultAllocator), first(_first), last(_last)
+        {
+            current = first;
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void *nextRow() override
+        {
+            if (current >= last)
+                return nullptr;
+
+            unsigned id = current++;
+            unsigned curName = id % numNames;
+            const char * name = names[curName];
+            size32_t lenName = strlen(name);
+
+            RtlDynamicRowBuilder rowBuilder(resultAllocator);
+            unsigned len = sizeof(__int64) + sizeof(size32_t) + lenName;
+            byte * row = rowBuilder.ensureCapacity(len, NULL);
+            *(__uint64 *)(row) = id;
+            *(size32_t *)(row + sizeof(__uint64)) = lenName;
+            memcpy(row+sizeof(__uint64)+sizeof(size32_t), name, lenName);
+            return rowBuilder.finalizeRowClear(len);
+        }
+        virtual void stop() override
+        {
+            current = (unsigned)-1;
+        }
+
+
+    protected:
+        Linked<IEngineRowAllocator> resultAllocator;
+        unsigned current;
+        unsigned first;
+        unsigned last;
+    };
+
+    unsigned numRows = numrows;
+    unsigned numSlaves = activity->numSlaves();
+    unsigned numParallel = numSlaves * activity->numStrands();
+    unsigned rowsPerPart = (numRows + numParallel - 1) / numParallel;
+    unsigned thisSlave = activity->querySlave();
+    unsigned thisIndex = thisSlave * activity->numStrands() + activity->queryStrand();
+    unsigned first = thisIndex * rowsPerPart;
+    unsigned last = first + rowsPerPart;
+    if (first > numRows)
+        first = numRows;
+    if (last > numRows)
+        last = numRows;
+
+    return new MyStreamInlineDataset(_resultAllocator, first, last);
+ENDEMBED;
+
+
+//Global activity - fixed number of rows
+output(myDataset(10));
+//Local version of the activity 
+output(count(myDataset(10, isLocal := true)) = CLUSTERSIZE * 10);
+
+//Check that stranding (if implemented) still generates unique records
+output(COUNT(DEDUP(myDataset(1000, numParallel := 5), id, ALL)));
+
+r2 := RECORD
+    UNSIGNED id;
+    DATASET(r) child;
+END;
+
+//Check that the activity can also be executed in a child query
+output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := myDataset(COUNTER))));
+
+//Test stranding inside a child query
+output(DATASET(10, TRANSFORM(r2, SELF.id := COUNTER; SELF.child := myDataset(COUNTER, NumParallel := 3))));

+ 33 - 0
testing/regress/ecl/embedactivity2.ecl

@@ -0,0 +1,33 @@
+r := RECORD
+    UNSIGNED id;
+    STRING name;
+END;
+
+traceDataset(streamed dataset(r) ds, boolean isLocal = false) := EMBED(C++ : activity, local(isLocal))
+#include <stdio.h>
+#body
+    for(;;)
+    {
+        const byte * next = (const byte *)ds->nextRow();
+        if (!next)
+        {
+            next = (const byte *)ds->nextRow();
+            if (!next)
+                return;
+        }
+
+        unsigned __int64 id = *(__uint64 *)(next);
+        size32_t lenName = *(size32_t *)(next + sizeof(__uint64));
+        const char * name = (char *)(next + sizeof(__uint64) + sizeof(size32_t));
+
+        printf("id(%u) name(%.*s)\n", (unsigned)id, lenName, name);
+        rtlReleaseRow(next);
+   }
+
+ENDEMBED;
+
+ds := DATASET([
+    {1,'GCH'},{2,'RKC'},{3,'Count Dracular'}, {4, 'Boris'}
+    ], r);
+
+traceDataset(ds);

+ 62 - 0
testing/regress/ecl/embedactivity3.ecl

@@ -0,0 +1,62 @@
+r := RECORD
+    UNSIGNED value;
+END;
+
+//This function takes two streamed inputs, and outputs the result of two values from the left multiply together and added to a row from the right
+
+streamed dataset(r) myDataset(streamed dataset(r) ds1, streamed dataset(r) ds2) := EMBED(C++ : activity)
+#include <stdio.h>
+#body
+    class MyStreamInlineDataset : public RtlCInterface, implements IRowStream
+    {
+    public:
+        MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, IRowStream * _ds1, IRowStream * _ds2)
+        : resultAllocator(_resultAllocator), ds1(_ds1), ds2(_ds2)
+        {
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void *nextRow() override
+        {
+            const byte * next1a = (const byte *)ds1->nextRow();
+            if (!next1a)
+                return nullptr;
+            const byte * next1b = (const byte *)ds1->nextRow();
+            const byte * next2 = (const byte *)ds2->nextRow();
+            if (!next1b || !next2)
+                rtlFailUnexpected();
+
+            unsigned __int64 value1a = *(const unsigned __int64 *)next1a;
+            unsigned __int64 value1b = *(const unsigned __int64 *)next1b;
+            unsigned __int64 value2 = *(const unsigned __int64 *)next2;
+            rtlReleaseRow(next1a);
+            rtlReleaseRow(next1b);
+            rtlReleaseRow(next2);
+            
+            unsigned __int64 result = value1a * value1b + value2;
+            RtlDynamicRowBuilder rowBuilder(resultAllocator);
+            byte * row = rowBuilder.getSelf();
+            *(__uint64 *)(row) = result;
+            return rowBuilder.finalizeRowClear(sizeof(unsigned __int64));
+        }
+        virtual void stop() override
+        {
+            ds1->stop();
+            ds2->stop();
+        }
+
+
+    protected:
+        Linked<IEngineRowAllocator> resultAllocator;
+        IRowStream * ds1;
+        IRowStream * ds2;
+    };
+
+    return new MyStreamInlineDataset(_resultAllocator, ds1, ds2);
+ENDEMBED;
+
+
+ds1 := DATASET([1,3,4,5,9,10,1,1], r);
+ds2 := DATASET([0,3,9,-1], r);
+
+output(myDataset(ds1, ds2));

+ 77 - 0
testing/regress/ecl/embedactivity4.ecl

@@ -0,0 +1,77 @@
+r := RECORD
+    UNSIGNED value;
+END;
+
+//This function takes four streamed inputs, and outputs the result of ds1*ds2+ds3*ds4
+
+streamed dataset(r) myDataset(streamed dataset(r) ds1, streamed dataset(r) ds2, streamed dataset(r) ds3, streamed dataset(r) ds4) := EMBED(C++ : activity)
+#include <stdio.h>
+#body
+    class MyStreamInlineDataset : public RtlCInterface, implements IRowStream
+    {
+    public:
+        MyStreamInlineDataset(IEngineRowAllocator * _resultAllocator, IRowStream * _ds1, IRowStream * _ds2, IRowStream * _ds3, IRowStream * _ds4)
+        : resultAllocator(_resultAllocator), ds1(_ds1), ds2(_ds2), ds3(_ds3), ds4(_ds4)
+        {
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void *nextRow() override
+        {
+            const byte * next1 = (const byte *)ds1->nextRow();
+            if (!next1)
+                return nullptr;
+            const byte * next2 = (const byte *)ds2->nextRow();
+            const byte * next3 = (const byte *)ds3->nextRow();
+            const byte * next4 = (const byte *)ds4->nextRow();
+            if (!next2 || !next3 || !next4)
+                rtlFailUnexpected();
+
+            unsigned __int64 value1 = *(const unsigned __int64 *)next1;
+            unsigned __int64 value2 = *(const unsigned __int64 *)next2;
+            unsigned __int64 value3 = *(const unsigned __int64 *)next3;
+            unsigned __int64 value4 = *(const unsigned __int64 *)next4;
+            rtlReleaseRow(next1);
+            rtlReleaseRow(next2);
+            rtlReleaseRow(next3);
+            rtlReleaseRow(next4);
+            
+            unsigned __int64 result = value1 * value2 + value3 * value4;
+            RtlDynamicRowBuilder rowBuilder(resultAllocator);
+            byte * row = rowBuilder.getSelf();
+            *(__uint64 *)(row) = result;
+            return rowBuilder.finalizeRowClear(sizeof(unsigned __int64));
+        }
+        virtual void stop() override
+        {
+            ds1->stop();
+            ds2->stop();
+            ds3->stop();
+            ds4->stop();
+        }
+
+    protected:
+        Linked<IEngineRowAllocator> resultAllocator;
+        IRowStream * ds1;
+        IRowStream * ds2;
+        IRowStream * ds3;
+        IRowStream * ds4;
+    };
+
+    return new MyStreamInlineDataset(_resultAllocator, ds1, ds2, ds3, ds4);
+ENDEMBED;
+
+
+ds1 := DATASET([1,3,4,5], r);
+ds2 := DATASET([1,2,3,4], r);
+ds3 := DATASET([0,1,1,2], r);
+ds4 := DATASET([9,8,7,6], r);
+
+//Global operation
+output(myDataset(ds1, ds2, ds3, ds4));
+
+//Execute within a child query
+mkDs(unsigned num, unsigned base) := DATASET(num, TRANSFORM(r, SELF.value := COUNTER+base));
+dsx := mkDs(10, 0);
+dsy := PROJECT(dsx, TRANSFORM(r, SELF.value := SUM(myDataset(mkDs(LEFT.value, 0), mkDs(LEFT.value, 1), mkDs(LEFT.value, 2), mkDs(LEFT.value, 3)), value)));
+output(dsy);

+ 42 - 0
testing/regress/ecl/key/embedactivity1.xml

@@ -0,0 +1,42 @@
+<Dataset name='Result 1'>
+ <Row><id>0</id><name>Gavin</name></Row>
+ <Row><id>1</id><name>John</name></Row>
+ <Row><id>2</id><name>Bart</name></Row>
+ <Row><id>3</id><name>Gavin</name></Row>
+ <Row><id>4</id><name>John</name></Row>
+ <Row><id>5</id><name>Bart</name></Row>
+ <Row><id>6</id><name>Gavin</name></Row>
+ <Row><id>7</id><name>John</name></Row>
+ <Row><id>8</id><name>Bart</name></Row>
+ <Row><id>9</id><name>Gavin</name></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>true</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>1000</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row><Row><id>8</id><name>Bart</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row><Row><id>8</id><name>Bart</name></Row><Row><id>9</id><name>Gavin</name></Row></child></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><id>1</id><child><Row><id>0</id><name>Gavin</name></Row></child></Row>
+ <Row><id>2</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row></child></Row>
+ <Row><id>3</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row></child></Row>
+ <Row><id>4</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row></child></Row>
+ <Row><id>5</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row></child></Row>
+ <Row><id>6</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row></child></Row>
+ <Row><id>7</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row></child></Row>
+ <Row><id>8</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row></child></Row>
+ <Row><id>9</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row><Row><id>8</id><name>Bart</name></Row></child></Row>
+ <Row><id>10</id><child><Row><id>0</id><name>Gavin</name></Row><Row><id>1</id><name>John</name></Row><Row><id>2</id><name>Bart</name></Row><Row><id>3</id><name>Gavin</name></Row><Row><id>4</id><name>John</name></Row><Row><id>5</id><name>Bart</name></Row><Row><id>6</id><name>Gavin</name></Row><Row><id>7</id><name>John</name></Row><Row><id>8</id><name>Bart</name></Row><Row><id>9</id><name>Gavin</name></Row></child></Row>
+</Dataset>

+ 1 - 0
testing/regress/ecl/key/embedactivity2.xml

@@ -0,0 +1 @@
+

+ 6 - 0
testing/regress/ecl/key/embedactivity3.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><value>3</value></Row>
+ <Row><value>23</value></Row>
+ <Row><value>99</value></Row>
+ <Row><value>0</value></Row>
+</Dataset>

+ 18 - 0
testing/regress/ecl/key/embedactivity4.xml

@@ -0,0 +1,18 @@
+<Dataset name='Result 1'>
+ <Row><value>1</value></Row>
+ <Row><value>14</value></Row>
+ <Row><value>19</value></Row>
+ <Row><value>32</value></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><value>14</value></Row>
+ <Row><value>40</value></Row>
+ <Row><value>82</value></Row>
+ <Row><value>144</value></Row>
+ <Row><value>230</value></Row>
+ <Row><value>344</value></Row>
+ <Row><value>490</value></Row>
+ <Row><value>672</value></Row>
+ <Row><value>894</value></Row>
+ <Row><value>1160</value></Row>
+</Dataset>

+ 1 - 0
thorlcr/activities/activitymasters_lcr.cmake

@@ -36,6 +36,7 @@ set (    SRCS
          diskwrite/thdiskwrite.cpp 
          distribution/thdistribution.cpp 
          enth/thenth.cpp 
+         external/thexternal.cpp
          fetch/thfetch.cpp 
          filter/thfilter.cpp 
          firstn/thfirstn.cpp 

+ 1 - 0
thorlcr/activities/activityslaves_lcr.cmake

@@ -37,6 +37,7 @@ set (    SRCS
          diskwrite/thdwslave.cpp 
          distribution/thdistributionslave.cpp 
          enth/thenthslave.cpp 
+         external/thexternalslave.cpp
          fetch/thfetchslave.cpp 
          filter/thfilterslave.cpp 
          firstn/thfirstnslave.cpp 

+ 27 - 0
thorlcr/activities/external/thexternal.cpp

@@ -0,0 +1,27 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "dasess.hpp"
+#include "dadfs.hpp"
+#include "thbufdef.hpp"
+#include "thexception.hpp"
+#include "thexternal.ipp"
+
+CActivityBase *createExternalActivityMaster(CMasterGraphElement *container)
+{
+    return new CMasterActivity(container);
+}

+ 25 - 0
thorlcr/activities/external/thexternal.ipp

@@ -0,0 +1,25 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef _THEXTERNAL_IPP
+#define _THEXTERNAL_IPP
+
+#include "thactivitymaster.ipp"
+
+CActivityBase *createExternalActivityMaster(CMasterGraphElement *info);
+
+#endif

+ 175 - 0
thorlcr/activities/external/thexternalslave.cpp

@@ -0,0 +1,175 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "jlib.hpp"
+#include "jset.hpp"
+#include "jqueue.tpp"
+#include "commonext.hpp"
+
+#include "thormisc.hpp"
+#include "thexception.hpp"
+#include "thbufdef.hpp"
+#include "thalloc.hpp"
+#include "eclrtl_imp.hpp"
+
+#include "slave.ipp"
+#include "thactivityutil.ipp"
+
+/////
+///////////////////
+//
+// CExternalSlaveActivity
+//
+
+class CNodeActivityContext : public IThorActivityContext
+{
+public:
+    CNodeActivityContext(bool _local, unsigned _numSlaves, unsigned _curSlave)
+    : local(_local), slaves(_local ? 1 : _numSlaves), curSlave(_local ? 0 : _curSlave)
+    {
+        assertex(curSlave < slaves);
+    }
+
+    virtual bool isLocal() const override { return local; }
+    virtual unsigned numSlaves() const override { return slaves; }
+    virtual unsigned numStrands() const override { return 1; }
+    virtual unsigned querySlave() const override { return curSlave; }
+    virtual unsigned queryStrand() const override { return 0; }
+protected:
+    unsigned slaves;
+    unsigned curSlave;
+    bool local;
+};
+
+class CExternalSlaveActivity : public CSlaveActivity
+{
+    IHThorExternalArg *helper;
+    Owned<IRowStream> rows;
+    CNodeActivityContext activityContext;
+    bool grouped;
+
+public:
+    CExternalSlaveActivity(CGraphElementBase *_container)
+        : CSlaveActivity(_container),
+          activityContext(container.queryLocalData() || container.queryOwner().isLocalChild(), container.queryCodeContext()->getNodes(), container.queryCodeContext()->getNodeNum())
+    {
+        grouped = container.queryGrouped();
+        helper = (IHThorExternalArg *) queryHelper();
+        setRequireInitData(false);
+        appendOutputLinked(this);
+    }
+    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    {
+        CSlaveActivity::setInputStream(index, _input, consumerOrdered);
+        helper->setInput(index, _input.stream);
+    }
+    virtual void start() override
+    {
+        //Cannot call base base start because that will error if > 1 input
+        startAllInputs();
+        dataLinkStart();
+        rows.setown(helper->createOutput(&activityContext));
+    }
+    virtual void stop() override
+    {
+        if (rows)
+        {
+            rows->stop();
+            rows.clear();
+        }
+        stopAllInputs();
+        dataLinkStop();
+    }
+    CATCH_NEXTROW()
+    {
+        assertex(rows);
+        ActivityTimer t(totalCycles, timeActivities);
+        OwnedConstThorRow row = rows->nextRow();
+        if (row)
+            dataLinkIncrement();
+        return row.getClear();
+    }
+    virtual bool isGrouped() const override
+    {
+        return grouped;
+    }
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    {
+        initMetaInfo(info);
+    }
+};
+
+
+CActivityBase *createExternalSlave(CGraphElementBase *container)
+{
+    return new CExternalSlaveActivity(container);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class CExternalSinkSlaveActivity : public ProcessSlaveActivity
+{
+    IHThorExternalArg *helper;
+    CNodeActivityContext activityContext;
+
+public:
+    CExternalSinkSlaveActivity(CGraphElementBase *_container)
+        : ProcessSlaveActivity(_container),
+          activityContext(container.queryLocalData() || container.queryOwner().isLocalChild(), container.queryCodeContext()->getNodes(), container.queryCodeContext()->getNodeNum())
+    {
+        helper = (IHThorExternalArg *) queryHelper();
+        setRequireInitData(false);
+        appendOutputLinked(this);
+    }
+    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    {
+        CSlaveActivity::setInputStream(index, _input, consumerOrdered);
+        helper->setInput(index, _input.stream);
+    }
+    virtual void start() override
+    {
+        startAllInputs();
+    }
+    virtual void process() override
+    {
+        start();
+        processed = THORDATALINK_STARTED;
+        try
+        {
+            helper->execute(&activityContext);
+        }
+        catch(CATCHALL)
+        {
+            ActPrintLog("APPLY: exception");
+            throw;
+        }
+    }
+    virtual void endProcess() override
+    {
+        if ((processed & THORDATALINK_STARTED) && !(processed & THORDATALINK_STOPPED))
+        {
+            stopAllInputs();
+            processed |= THORDATALINK_STOPPED;
+        }
+    }
+};
+
+
+CActivityBase *createExternalSinkSlave(CGraphElementBase *container)
+{
+    return new CExternalSinkSlaveActivity(container);
+}

+ 3 - 0
thorlcr/graph/thgraph.cpp

@@ -852,6 +852,9 @@ bool isGlobalActivity(CGraphElementBase &container)
         case TAKgraphloop:
         case TAKparallelgraphloop:
         case TAKloopdataset:
+        case TAKexternalsink:
+        case TAKexternalsource:
+        case TAKexternalprocess:
             return false;
 // dependent on local/grouped
         case TAKkeyeddistribute:

+ 7 - 1
thorlcr/graph/thgraphslave.cpp

@@ -274,9 +274,15 @@ void CSlaveActivity::start()
 
 void CSlaveActivity::startAllInputs()
 {
+    ActivityTimer s(totalCycles, timeActivities);
     ForEachItemIn(i, inputs)
     {
-        startInput(i);
+        try { startInput(i); }
+        catch (CATCHALL)
+        {
+            ActPrintLog("External(%" ACTPF "d): Error staring input %d", container.queryId(), i);
+            throw;
+        }
     }
 }
 

+ 6 - 0
thorlcr/master/thactivitymaster.cpp

@@ -47,6 +47,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 #include "diskwrite/thdiskwrite.ipp"
 #include "distribution/thdistribution.ipp"
 #include "enth/thenth.ipp"
+#include "external/thexternal.ipp"
 #include "filter/thfilter.ipp"
 #include "firstn/thfirstn.ipp"
 #include "funnel/thfunnel.ipp"
@@ -393,6 +394,11 @@ public:
             case TAKwhen_dataset:
                 ret = createWhenActivityMaster(this);
                 break;
+            case TAKexternalprocess:
+            case TAKexternalsink:
+            case TAKexternalsource:
+                ret = createExternalActivityMaster(this);
+                break;
             default:
                 throw MakeActivityException(this, TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind));
         }

+ 7 - 0
thorlcr/slave/slave.cpp

@@ -269,6 +269,8 @@ CActivityBase *createDictionaryWorkunitWriteSlave(CGraphElementBase *container);
 CActivityBase *createDictionaryResultWriteSlave(CGraphElementBase *container);
 CActivityBase *createTraceSlave(CGraphElementBase *container);
 CActivityBase *createIfActionSlave(CGraphElementBase *container);
+CActivityBase *createExternalSlave(CGraphElementBase *container);
+CActivityBase *createExternalSinkSlave(CGraphElementBase *container);
 
 
 class CGenericSlaveGraphElement : public CSlaveGraphElement
@@ -761,6 +763,11 @@ public:
             case TAKstreamediterator:
                 ret = createStreamedIteratorSlave(this);
                 break;
+            case TAKexternalprocess:
+            case TAKexternalsink:
+            case TAKexternalsource:
+                ret = createExternalSlave(this);
+                break;
             default:
                 throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind));
         }