Browse Source

HPCC-24214 Resolved eclagent threading issues for parallel graphs in hthor

Removed global variable 'activeGraph' in EclAgentWorkflowMachine
Added an argument to all HThorActivities to pass in the corresponding graph

Signed-off-by: nhalliday <nathanhallidaywork@gmail.com>
nhalliday 5 năm trước cách đây
mục cha
commit
bc60b7b381

+ 2 - 1
ecl/eclagent/agentctx.hpp

@@ -35,6 +35,7 @@
 #define WRN_MismatchCompressInfo            5405
 #define WRN_RemoteReadFailure               5406
 
+class EclGraph;
 struct IHThorGraphResult : extends IInterface
 {
     virtual void addRowOwn(const void * row) = 0;
@@ -100,7 +101,7 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual void reloadWorkUnit() = 0;
 
     virtual char *resolveName(const char *in, char *out, unsigned outlen) = 0;
-    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type) = 0;
+    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type, EclGraph & graph) = 0;
     virtual void addWuException(const char * text, unsigned code, unsigned severity, char const * source) = 0;
 
     virtual IHThorGraphResults * executeLibraryGraph(const char * libraryName, unsigned expectedInterfaceHash, unsigned activityId, const char * embeddedGraphName, const byte * parentExtract) = 0;

+ 8 - 4
ecl/eclagent/eclagent.cpp

@@ -706,8 +706,12 @@ void EclAgent::reloadWorkUnit()
 
 void EclAgent::abort()
 {
-    if (activeGraph)
-        activeGraph->abort();
+    //for each active graph, abort()
+    CriticalBlock thisBlock(activeGraphCritSec);
+    ForEachItemIn(i,activeGraphs)
+    {
+        activeGraphs.item(i)->abort();
+    }
 }
 
 RecordTranslationMode EclAgent::getLayoutTranslationMode() const
@@ -1357,7 +1361,7 @@ char * EclAgent::resolveName(const char *in, char *out, unsigned outlen)
     return NULL;
 }
 
-void EclAgent::logFileAccess(IDistributedFile * file, char const * component, char const * type)
+void EclAgent::logFileAccess(IDistributedFile * file, char const * component, char const * type, EclGraph & activeGraph)
 {
     const char * cluster = clusterNames.item(clusterNames.length()-1);
     LOG(MCauditInfo,
@@ -1368,7 +1372,7 @@ void EclAgent::logFileAccess(IDistributedFile * file, char const * component, ch
         ensureText(userid.get()),
         file->queryLogicalName(),
         wuid.get(),
-        activeGraph ? activeGraph->queryGraphName() : "");
+        activeGraph.queryGraphName());
 }
 
 bool EclAgent::expandLogicalName(StringBuffer & fullname, const char * logicalName)

+ 8 - 4
ecl/eclagent/eclagent.ipp

@@ -197,9 +197,9 @@ public:
     {
         return ctx->resolveName(in, out, outlen);
     }
-    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type)
+    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type, EclGraph & graph)
     {
-        ctx->logFileAccess(file, component, type);
+        ctx->logFileAccess(file, component, type, graph);
     }
     virtual void addWuException(const char * text, unsigned code, unsigned severity, char const * source)
     {
@@ -385,7 +385,8 @@ private:
     unsigned int clusterWidth;
     Owned<IDistributedFileTransaction> superfiletransaction;
     mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
-    Owned<EclGraph> activeGraph;
+    CriticalSection activeGraphCritSec;
+    PointerArrayOf<EclGraph> activeGraphs;
     Owned<CHThorDebugContext> debugContext;
     Owned<IProbeManager> probeManager;
     StringAttr allowedPipeProgs;
@@ -425,6 +426,9 @@ private:
     virtual bool getWorkunitResultFilename(StringBuffer & diskFilename, const char * wuid, const char * name, int seq);
     virtual IDebuggableContext *queryDebugContext() const { return debugContext; };
 
+    //protected by critical section
+    EclGraph * addGraph(const char * graphName);
+    void removeGraph(EclGraph * g);
     EclGraph * loadGraph(const char * graphName, IConstWorkUnit * wu, ILoadedDllEntry * dll, bool isLibrary);
     virtual bool forceNewDiskReadActivity() const { return useNewDiskReadActivity; }
 
@@ -584,7 +588,7 @@ public:
     virtual void getEventExtra(size32_t & outLen, char * & outStr, const char * tag);
     //virtual void logException(IEclException *e);
     virtual char *resolveName(const char *in, char *out, unsigned outlen);
-    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type);
+    virtual void logFileAccess(IDistributedFile * file, char const * component, char const * type, EclGraph & graph);
     virtual ILocalOrDistributedFile  *resolveLFN(const char *logicalName, const char *errorTxt, bool optional, bool noteRead, bool write, StringBuffer * expandedlfn, bool isPrivilegedUser);
 
     virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract);

+ 139 - 130
ecl/eclagent/eclgraph.cpp

@@ -48,177 +48,178 @@ using roxiemem::OwnedRoxieString;
 
 static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityId, unsigned subgraphId, unsigned graphId, ThorActivityKind kind, bool isLocal, bool isGrouped, IHThorArg & arg, IPropertyTree * node, EclGraphElement * graphElement)
 {
+    EclGraph & graph = graphElement->subgraph->parent;
     switch (kind)
     {
     case TAKdiskwrite:
     case TAKspillwrite:
-        return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind);
+        return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
     case TAKsort:
-        return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind);
+        return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind, graph);
     case TAKdedup:
-        return createGroupDedupActivity(agent, activityId, subgraphId, (IHThorDedupArg &)arg, kind);
+        return createGroupDedupActivity(agent, activityId, subgraphId, (IHThorDedupArg &)arg, kind, graph);
     case TAKfilter:
-        return createFilterActivity(agent, activityId, subgraphId, (IHThorFilterArg &)arg, kind);
+        return createFilterActivity(agent, activityId, subgraphId, (IHThorFilterArg &)arg, kind, graph);
     case TAKproject:
-        return createProjectActivity(agent, activityId, subgraphId, (IHThorProjectArg &)arg, kind);
+        return createProjectActivity(agent, activityId, subgraphId, (IHThorProjectArg &)arg, kind, graph);
     case TAKprefetchproject:
-        return createPrefetchProjectActivity(agent, activityId, subgraphId, (IHThorPrefetchProjectArg &)arg, kind);
+        return createPrefetchProjectActivity(agent, activityId, subgraphId, (IHThorPrefetchProjectArg &)arg, kind, graph);
     case TAKfilterproject :
-        return createFilterProjectActivity(agent, activityId, subgraphId, (IHThorFilterProjectArg &)arg, kind);
+        return createFilterProjectActivity(agent, activityId, subgraphId, (IHThorFilterProjectArg &)arg, kind, graph);
     case TAKrollup:
-        return createRollupActivity(agent, activityId, subgraphId, (IHThorRollupArg &)arg, kind);
+        return createRollupActivity(agent, activityId, subgraphId, (IHThorRollupArg &)arg, kind, graph);
     case TAKiterate:
-        return createIterateActivity(agent, activityId, subgraphId, (IHThorIterateArg &)arg, kind);
+        return createIterateActivity(agent, activityId, subgraphId, (IHThorIterateArg &)arg, kind, graph);
     case TAKaggregate:
     case TAKexistsaggregate:
     case TAKcountaggregate:
-        return createAggregateActivity(agent, activityId, subgraphId, (IHThorAggregateArg &)arg, kind);
+        return createAggregateActivity(agent, activityId, subgraphId, (IHThorAggregateArg &)arg, kind, graph);
     case TAKhashaggregate:
-        return createHashAggregateActivity(agent, activityId, subgraphId, (IHThorHashAggregateArg &)arg, kind, isGrouped);
+        return createHashAggregateActivity(agent, activityId, subgraphId, (IHThorHashAggregateArg &)arg, kind, graph, isGrouped);
     case TAKfirstn:
-        return createFirstNActivity(agent, activityId, subgraphId, (IHThorFirstNArg &)arg, kind);
+        return createFirstNActivity(agent, activityId, subgraphId, (IHThorFirstNArg &)arg, kind, graph);
     case TAKsample:
-        return createSampleActivity(agent, activityId, subgraphId, (IHThorSampleArg &)arg, kind);
+        return createSampleActivity(agent, activityId, subgraphId, (IHThorSampleArg &)arg, kind, graph);
     case TAKdegroup:
-        return createDegroupActivity(agent, activityId, subgraphId, (IHThorDegroupArg &)arg, kind);
+        return createDegroupActivity(agent, activityId, subgraphId, (IHThorDegroupArg &)arg, kind, graph);
     case TAKjoin:
     case TAKjoinlight:
     case TAKdenormalize:
     case TAKdenormalizegroup:
-        return createJoinActivity(agent, activityId, subgraphId, (IHThorJoinArg &)arg, kind);
+        return createJoinActivity(agent, activityId, subgraphId, (IHThorJoinArg &)arg, kind, graph);
     case TAKselfjoin:
     case TAKselfjoinlight:
-        return createSelfJoinActivity(agent, activityId, subgraphId, (IHThorJoinArg &)arg, kind);
+        return createSelfJoinActivity(agent, activityId, subgraphId, (IHThorJoinArg &)arg, kind, graph);
     case TAKkeyedjoin:
     case TAKkeyeddenormalize:
     case TAKkeyeddenormalizegroup:
-        return createKeyedJoinActivity(agent, activityId, subgraphId, (IHThorKeyedJoinArg &)arg, kind, node);
+        return createKeyedJoinActivity(agent, activityId, subgraphId, (IHThorKeyedJoinArg &)arg, kind, graph, node);
     case TAKlookupjoin:
     case TAKlookupdenormalize:
     case TAKlookupdenormalizegroup:
     case TAKsmartjoin:
     case TAKsmartdenormalize:
     case TAKsmartdenormalizegroup:
-        return createLookupJoinActivity(agent, activityId, subgraphId, (IHThorHashJoinArg &)arg, kind);
+        return createLookupJoinActivity(agent, activityId, subgraphId, (IHThorHashJoinArg &)arg, kind, graph);
     case TAKalljoin:
     case TAKalldenormalize:
     case TAKalldenormalizegroup:
-        return createAllJoinActivity(agent, activityId, subgraphId, (IHThorAllJoinArg &)arg, kind);
+        return createAllJoinActivity(agent, activityId, subgraphId, (IHThorAllJoinArg &)arg, kind, graph);
     case TAKgroup:
-        return createGroupActivity(agent, activityId, subgraphId, (IHThorGroupArg &)arg, kind);
+        return createGroupActivity(agent, activityId, subgraphId, (IHThorGroupArg &)arg, kind, graph);
     case TAKworkunitwrite:
-        return createWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorWorkUnitWriteArg &)arg, kind);
+        return createWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorWorkUnitWriteArg &)arg, kind, graph);
     case TAKdictionaryworkunitwrite:
-        return createDictionaryWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorDictionaryWorkUnitWriteArg &)arg, kind);
+        return createDictionaryWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorDictionaryWorkUnitWriteArg &)arg, kind, graph);
     case TAKfunnel:
-        return createConcatActivity(agent, activityId, subgraphId, (IHThorFunnelArg &)arg, kind);
+        return createConcatActivity(agent, activityId, subgraphId, (IHThorFunnelArg &)arg, kind, graph);
     case TAKapply:
-        return createApplyActivity(agent, activityId, subgraphId, (IHThorApplyArg &)arg, kind);
+        return createApplyActivity(agent, activityId, subgraphId, (IHThorApplyArg &)arg, kind, graph);
     case TAKinlinetable:
-        return createInlineTableActivity(agent, activityId, subgraphId, (IHThorInlineTableArg &)arg, kind);
+        return createInlineTableActivity(agent, activityId, subgraphId, (IHThorInlineTableArg &)arg, kind, graph);
     case TAKnormalize:
-        return createNormalizeActivity(agent, activityId, subgraphId, (IHThorNormalizeArg &)arg, kind);
+        return createNormalizeActivity(agent, activityId, subgraphId, (IHThorNormalizeArg &)arg, kind, graph);
     case TAKnormalizechild:
-        return createNormalizeChildActivity(agent, activityId, subgraphId, (IHThorNormalizeChildArg &)arg, kind);
+        return createNormalizeChildActivity(agent, activityId, subgraphId, (IHThorNormalizeChildArg &)arg, kind, graph);
     case TAKnormalizelinkedchild:
-        return createNormalizeLinkedChildActivity(agent, activityId, subgraphId, (IHThorNormalizeLinkedChildArg &)arg, kind);
+        return createNormalizeLinkedChildActivity(agent, activityId, subgraphId, (IHThorNormalizeLinkedChildArg &)arg, kind, graph);
     case TAKremoteresult:
-        return createRemoteResultActivity(agent, activityId, subgraphId, (IHThorRemoteResultArg &)arg, kind);
+        return createRemoteResultActivity(agent, activityId, subgraphId, (IHThorRemoteResultArg &)arg, kind, graph);
     case TAKselectn:
-        return createSelectNActivity(agent, activityId, subgraphId, (IHThorSelectNArg &)arg, kind);
+        return createSelectNActivity(agent, activityId, subgraphId, (IHThorSelectNArg &)arg, kind, graph);
     case TAKif:
-        return createIfActivity(agent, activityId, subgraphId, (IHThorIfArg &)arg, kind);
+        return createIfActivity(agent, activityId, subgraphId, (IHThorIfArg &)arg, kind, graph);
     case TAKchildif:
-        return createChildIfActivity(agent, activityId, subgraphId, (IHThorIfArg &)arg, kind);
+        return createChildIfActivity(agent, activityId, subgraphId, (IHThorIfArg &)arg, kind, graph);
     case TAKchildcase:
-        return createCaseActivity(agent, activityId, subgraphId, (IHThorCaseArg &)arg, kind);
+        return createCaseActivity(agent, activityId, subgraphId, (IHThorCaseArg &)arg, kind, graph);
     case TAKnull:
-        return createNullActivity(agent, activityId, subgraphId, (IHThorNullArg &)arg, kind);
+        return createNullActivity(agent, activityId, subgraphId, (IHThorNullArg &)arg, kind, graph);
     case TAKdistribution:
-        return createDistributionActivity(agent, activityId, subgraphId, (IHThorDistributionArg &)arg, kind);
+        return createDistributionActivity(agent, activityId, subgraphId, (IHThorDistributionArg &)arg, kind, graph);
     case TAKchoosesets:
-        return createChooseSetsActivity(agent, activityId, subgraphId, (IHThorChooseSetsArg &)arg, kind);
+        return createChooseSetsActivity(agent, activityId, subgraphId, (IHThorChooseSetsArg &)arg, kind, graph);
     case TAKpiperead:
-        return createPipeReadActivity(agent, activityId, subgraphId, (IHThorPipeReadArg &)arg, kind);
+        return createPipeReadActivity(agent, activityId, subgraphId, (IHThorPipeReadArg &)arg, kind, graph);
     case TAKpipewrite:
-        return createPipeWriteActivity(agent, activityId, subgraphId, (IHThorPipeWriteArg &)arg, kind);
+        return createPipeWriteActivity(agent, activityId, subgraphId, (IHThorPipeWriteArg &)arg, kind, graph);
     case TAKcsvwrite:
-        return createCsvWriteActivity(agent, activityId, subgraphId, (IHThorCsvWriteArg &)arg, kind);
+        return createCsvWriteActivity(agent, activityId, subgraphId, (IHThorCsvWriteArg &)arg, kind, graph);
     case TAKxmlwrite:
     case TAKjsonwrite:
-        return createXmlWriteActivity(agent, activityId, subgraphId, (IHThorXmlWriteArg &)arg, kind);
+        return createXmlWriteActivity(agent, activityId, subgraphId, (IHThorXmlWriteArg &)arg, kind, graph);
     case TAKpipethrough:
-        return createPipeThroughActivity(agent, activityId, subgraphId, (IHThorPipeThroughArg &)arg, kind);
+        return createPipeThroughActivity(agent, activityId, subgraphId, (IHThorPipeThroughArg &)arg, kind, graph);
     case TAKchoosesetsenth:
-        return createChooseSetsEnthActivity(agent, activityId, subgraphId, (IHThorChooseSetsExArg &)arg, kind);
+        return createChooseSetsEnthActivity(agent, activityId, subgraphId, (IHThorChooseSetsExArg &)arg, kind, graph);
     case TAKchoosesetslast:
-        return createChooseSetsLastActivity(agent, activityId, subgraphId, (IHThorChooseSetsExArg &)arg, kind);
+        return createChooseSetsLastActivity(agent, activityId, subgraphId, (IHThorChooseSetsExArg &)arg, kind, graph);
     case TAKfetch:
-        return createFetchActivity(agent, activityId, subgraphId, (IHThorFetchArg &)arg, kind, node);
+        return createFetchActivity(agent, activityId, subgraphId, (IHThorFetchArg &)arg, kind, graph, node);
     case TAKcsvfetch:
-        return createCsvFetchActivity(agent, activityId, subgraphId, (IHThorCsvFetchArg &)arg, kind, node);
+        return createCsvFetchActivity(agent, activityId, subgraphId, (IHThorCsvFetchArg &)arg, kind, graph, node);
     case TAKworkunitread:
-        return createWorkunitReadActivity(agent, activityId, subgraphId, (IHThorWorkunitReadArg &)arg, kind);
+        return createWorkunitReadActivity(agent, activityId, subgraphId, (IHThorWorkunitReadArg &)arg, kind, graph);
     case TAKspill:
-        return createSpillActivity(agent, activityId, subgraphId, (IHThorSpillArg &)arg, kind);
+        return createSpillActivity(agent, activityId, subgraphId, (IHThorSpillArg &)arg, kind, graph);
     case TAKlimit:
-        return createLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind);
+        return createLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind, graph);
     case TAKskiplimit:
-        return createSkipLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind);
+        return createSkipLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind, graph);
     case TAKcatch:
-        return createCatchActivity(agent, activityId, subgraphId, (IHThorCatchArg &)arg, kind);
+        return createCatchActivity(agent, activityId, subgraphId, (IHThorCatchArg &)arg, kind, graph);
     case TAKskipcatch:
     case TAKcreaterowcatch:
-        return createSkipCatchActivity(agent, activityId, subgraphId, (IHThorCatchArg &)arg, kind);
+        return createSkipCatchActivity(agent, activityId, subgraphId, (IHThorCatchArg &)arg, kind, graph);
     case TAKcountproject:
-        return createCountProjectActivity(agent, activityId, subgraphId, (IHThorCountProjectArg &)arg, kind);
+        return createCountProjectActivity(agent, activityId, subgraphId, (IHThorCountProjectArg &)arg, kind, graph);
     case TAKindexwrite:
-        return createIndexWriteActivity(agent, activityId, subgraphId, (IHThorIndexWriteArg &)arg, kind);
+        return createIndexWriteActivity(agent, activityId, subgraphId, (IHThorIndexWriteArg &)arg, kind, graph);
     case TAKparse:
-        return createParseActivity(agent, activityId, subgraphId, (IHThorParseArg &)arg, kind);
+        return createParseActivity(agent, activityId, subgraphId, (IHThorParseArg &)arg, kind, graph);
     case TAKsideeffect:
-        return createSideEffectActivity(agent, activityId, subgraphId, (IHThorSideEffectArg &)arg, kind);
+        return createSideEffectActivity(agent, activityId, subgraphId, (IHThorSideEffectArg &)arg, kind, graph);
     case TAKsimpleaction:
-        return createActionActivity(agent, activityId, subgraphId, (IHThorActionArg &)arg, kind);
+        return createActionActivity(agent, activityId, subgraphId, (IHThorActionArg &)arg, kind, graph);
     case TAKenth:
-        return createEnthActivity(agent, activityId, subgraphId, (IHThorEnthArg &)arg, kind);
+        return createEnthActivity(agent, activityId, subgraphId, (IHThorEnthArg &)arg, kind, graph);
     case TAKtopn:
-        return createTopNActivity(agent, activityId, subgraphId, (IHThorTopNArg &)arg, kind);
+        return createTopNActivity(agent, activityId, subgraphId, (IHThorTopNArg &)arg, kind, graph);
     case TAKxmlparse:
-        return createXmlParseActivity(agent, activityId, subgraphId, (IHThorXmlParseArg &)arg, kind);
+        return createXmlParseActivity(agent, activityId, subgraphId, (IHThorXmlParseArg &)arg, kind, graph);
     case TAKxmlfetch:
     case TAKjsonfetch:
-        return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind, node);
+        return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind, graph, node);
     case TAKmerge:
-        return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);
+        return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind, graph);
     case TAKhttp_rowdataset:
-        return createHttpRowCallActivity(agent, activityId, subgraphId, (IHThorHttpCallArg &)arg, kind);
+        return createHttpRowCallActivity(agent, activityId, subgraphId, (IHThorHttpCallArg &)arg, kind, graph);
     case TAKsoap_rowdataset:
-        return createSoapRowCallActivity(agent, activityId, subgraphId, (IHThorSoapCallArg &)arg, kind);
+        return createSoapRowCallActivity(agent, activityId, subgraphId, (IHThorSoapCallArg &)arg, kind, graph);
     case TAKsoap_rowaction:
-        return createSoapRowActionActivity(agent, activityId, subgraphId, (IHThorSoapActionArg &)arg, kind);
+        return createSoapRowActionActivity(agent, activityId, subgraphId, (IHThorSoapActionArg &)arg, kind, graph);
     case TAKsoap_datasetdataset:
-        return createSoapDatasetCallActivity(agent, activityId, subgraphId, (IHThorSoapCallArg &)arg, kind);
+        return createSoapDatasetCallActivity(agent, activityId, subgraphId, (IHThorSoapCallArg &)arg, kind, graph);
     case TAKsoap_datasetaction:
-        return createSoapDatasetActionActivity(agent, activityId, subgraphId, (IHThorSoapActionArg &)arg, kind);
+        return createSoapDatasetActionActivity(agent, activityId, subgraphId, (IHThorSoapActionArg &)arg, kind, graph);
     case TAKchilditerator:
-        return createChildIteratorActivity(agent, activityId, subgraphId, (IHThorChildIteratorArg &)arg, kind);
+        return createChildIteratorActivity(agent, activityId, subgraphId, (IHThorChildIteratorArg &)arg, kind, graph);
     case TAKlinkedrawiterator:
-        return createLinkedRawIteratorActivity(agent, activityId, subgraphId, (IHThorLinkedRawIteratorArg &)arg, kind);
+        return createLinkedRawIteratorActivity(agent, activityId, subgraphId, (IHThorLinkedRawIteratorArg &)arg, kind, graph);
     case TAKrowresult:
-        return createRowResultActivity(agent, activityId, subgraphId, (IHThorRowResultArg &)arg, kind);
+        return createRowResultActivity(agent, activityId, subgraphId, (IHThorRowResultArg &)arg, kind, graph);
     case TAKdatasetresult:
-        return createDatasetResultActivity(agent, activityId, subgraphId, (IHThorDatasetResultArg &)arg, kind);
+        return createDatasetResultActivity(agent, activityId, subgraphId, (IHThorDatasetResultArg &)arg, kind, graph);
     case TAKwhen_dataset:
     case TAKwhen_action:
-        return createWhenActionActivity(agent, activityId, subgraphId, (IHThorWhenActionArg &)arg, kind, graphElement);
+        return createWhenActionActivity(agent, activityId, subgraphId, (IHThorWhenActionArg &)arg, kind, graph, graphElement);
     case TAKsequential:
     case TAKparallel:
     case TAKemptyaction:
     case TAKifaction:
-        return createDummyActivity(agent, activityId, subgraphId, arg, kind);
+        return createDummyActivity(agent, activityId, subgraphId, arg, kind, graph);
     case TAKhashdedup:
-        return createHashDedupActivity(agent, activityId, subgraphId, (IHThorHashDedupArg &)arg, kind);
+        return createHashDedupActivity(agent, activityId, subgraphId, (IHThorHashDedupArg &)arg, kind, graph);
     case TAKhashdenormalize:
     case TAKhashdistribute:
     case TAKhashdistributemerge:
@@ -228,106 +229,106 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKsplit:
         throwUnexpected();  // Code generator should have removed or transformed
     case TAKchildnormalize:
-        return createChildNormalizeActivity(agent, activityId, subgraphId, (IHThorChildNormalizeArg &)arg, kind);
+        return createChildNormalizeActivity(agent, activityId, subgraphId, (IHThorChildNormalizeArg &)arg, kind, graph);
     case TAKchildaggregate:
-        return createChildAggregateActivity(agent, activityId, subgraphId, (IHThorChildAggregateArg &)arg, kind);
+        return createChildAggregateActivity(agent, activityId, subgraphId, (IHThorChildAggregateArg &)arg, kind, graph);
     case TAKchildgroupaggregate:
-        return createChildGroupAggregateActivity(agent, activityId, subgraphId, (IHThorChildGroupAggregateArg &)arg, kind);
+        return createChildGroupAggregateActivity(agent, activityId, subgraphId, (IHThorChildGroupAggregateArg &)arg, kind, graph);
     case TAKchildthroughnormalize:
-        return createChildThroughNormalizeActivity(agent, activityId, subgraphId, (IHThorChildThroughNormalizeArg &)arg, kind);
+        return createChildThroughNormalizeActivity(agent, activityId, subgraphId, (IHThorChildThroughNormalizeArg &)arg, kind, graph);
     case TAKdiskread:
     case TAKspillread:
-        return createDiskReadActivity(agent, activityId, subgraphId, (IHThorDiskReadArg &)arg, kind, node);
+        return createDiskReadActivity(agent, activityId, subgraphId, (IHThorDiskReadArg &)arg, kind, graph, node);
     case TAKnewdiskread:
-        return createNewDiskReadActivity(agent, activityId, subgraphId, (IHThorNewDiskReadArg &)arg, kind, node);
+        return createNewDiskReadActivity(agent, activityId, subgraphId, (IHThorNewDiskReadArg &)arg, kind, graph, node);
     case TAKdisknormalize:
-        return createDiskNormalizeActivity(agent, activityId, subgraphId, (IHThorDiskNormalizeArg &)arg, kind, node);
+        return createDiskNormalizeActivity(agent, activityId, subgraphId, (IHThorDiskNormalizeArg &)arg, kind, graph, node);
     case TAKdiskaggregate:
-        return createDiskAggregateActivity(agent, activityId, subgraphId, (IHThorDiskAggregateArg &)arg, kind, node);
+        return createDiskAggregateActivity(agent, activityId, subgraphId, (IHThorDiskAggregateArg &)arg, kind, graph, node);
     case TAKdiskcount:
-        return createDiskCountActivity(agent, activityId, subgraphId, (IHThorDiskCountArg &)arg, kind, node);
+        return createDiskCountActivity(agent, activityId, subgraphId, (IHThorDiskCountArg &)arg, kind, graph, node);
     case TAKdiskgroupaggregate:
-        return createDiskGroupAggregateActivity(agent, activityId, subgraphId, (IHThorDiskGroupAggregateArg &)arg, kind, node);
+        return createDiskGroupAggregateActivity(agent, activityId, subgraphId, (IHThorDiskGroupAggregateArg &)arg, kind, graph, node);
     case TAKindexread:
-        return createIndexReadActivity(agent, activityId, subgraphId, (IHThorIndexReadArg &)arg, kind, node);
+        return createIndexReadActivity(agent, activityId, subgraphId, (IHThorIndexReadArg &)arg, kind, graph, node);
     case TAKindexnormalize:
-        return createIndexNormalizeActivity(agent, activityId, subgraphId, (IHThorIndexNormalizeArg &)arg, kind, node);
+        return createIndexNormalizeActivity(agent, activityId, subgraphId, (IHThorIndexNormalizeArg &)arg, kind, graph, node);
     case TAKindexaggregate:
-        return createIndexAggregateActivity(agent, activityId, subgraphId, (IHThorIndexAggregateArg &)arg, kind, node);
+        return createIndexAggregateActivity(agent, activityId, subgraphId, (IHThorIndexAggregateArg &)arg, kind, graph, node);
     case TAKindexcount:
-        return createIndexCountActivity(agent, activityId, subgraphId, (IHThorIndexCountArg &)arg, kind, node);
+        return createIndexCountActivity(agent, activityId, subgraphId, (IHThorIndexCountArg &)arg, kind, graph, node);
     case TAKindexgroupaggregate:
     case TAKindexgroupexists:
     case TAKindexgroupcount:
-        return createIndexGroupAggregateActivity(agent, activityId, subgraphId, (IHThorIndexGroupAggregateArg &)arg, kind, node);
+        return createIndexGroupAggregateActivity(agent, activityId, subgraphId, (IHThorIndexGroupAggregateArg &)arg, kind, graph, node);
     case TAKchilddataset:
     case TAKthroughaggregate:
         UNIMPLEMENTED;
     case TAKcsvread:
-        return createCsvReadActivity(agent, activityId, subgraphId, (IHThorCsvReadArg &)arg, kind, node);
+        return createCsvReadActivity(agent, activityId, subgraphId, (IHThorCsvReadArg &)arg, kind, graph, node);
     case TAKxmlread:
     case TAKjsonread:
-        return createXmlReadActivity(agent, activityId, subgraphId, (IHThorXmlReadArg &)arg, kind, node);
+        return createXmlReadActivity(agent, activityId, subgraphId, (IHThorXmlReadArg &)arg, kind, graph, node);
     case TAKlocalresultread:
-        return createLocalResultReadActivity(agent, activityId, subgraphId, (IHThorLocalResultReadArg &)arg, kind, node->getPropInt("att[@name='_graphId']/@value"));
+        return createLocalResultReadActivity(agent, activityId, subgraphId, (IHThorLocalResultReadArg &)arg, kind, graph, node->getPropInt("att[@name='_graphId']/@value"));
     case TAKlocalresultwrite:
-        return createLocalResultWriteActivity(agent, activityId, subgraphId, (IHThorLocalResultWriteArg &)arg, kind, graphId);
+        return createLocalResultWriteActivity(agent, activityId, subgraphId, (IHThorLocalResultWriteArg &)arg, kind, graph, graphId);
     case TAKdictionaryresultwrite:
-        return createDictionaryResultWriteActivity(agent, activityId, subgraphId, (IHThorDictionaryResultWriteArg &)arg, kind, graphId);
+        return createDictionaryResultWriteActivity(agent, activityId, subgraphId, (IHThorDictionaryResultWriteArg &)arg, kind, graph, graphId);
     case TAKlocalresultspill:
-        return createLocalResultSpillActivity(agent, activityId, subgraphId, (IHThorLocalResultSpillArg &)arg, kind, graphId);
+        return createLocalResultSpillActivity(agent, activityId, subgraphId, (IHThorLocalResultSpillArg &)arg, kind, graph, graphId);
     case TAKcombine:
-        return createCombineActivity(agent, activityId, subgraphId, (IHThorCombineArg &)arg, kind);
+        return createCombineActivity(agent, activityId, subgraphId, (IHThorCombineArg &)arg, kind, graph);
     case TAKcombinegroup:
-        return createCombineGroupActivity(agent, activityId, subgraphId, (IHThorCombineGroupArg &)arg, kind);
+        return createCombineGroupActivity(agent, activityId, subgraphId, (IHThorCombineGroupArg &)arg, kind, graph);
     case TAKregroup:
-        return createRegroupActivity(agent, activityId, subgraphId, (IHThorRegroupArg &)arg, kind);
+        return createRegroupActivity(agent, activityId, subgraphId, (IHThorRegroupArg &)arg, kind, graph);
     case TAKrollupgroup:
-        return createRollupGroupActivity(agent, activityId, subgraphId, (IHThorRollupGroupArg &)arg, kind);
+        return createRollupGroupActivity(agent, activityId, subgraphId, (IHThorRollupGroupArg &)arg, kind, graph);
     case TAKfiltergroup:
-        return createFilterGroupActivity(agent, activityId, subgraphId, (IHThorFilterGroupArg &)arg, kind);
+        return createFilterGroupActivity(agent, activityId, subgraphId, (IHThorFilterGroupArg &)arg, kind, graph);
     case TAKloopcount:
     case TAKlooprow:
     case TAKloopdataset:
-        return createLoopActivity(agent, activityId, subgraphId, (IHThorLoopArg &)arg, kind);
+        return createLoopActivity(agent, activityId, subgraphId, (IHThorLoopArg &)arg, kind, graph);
     case TAKgraphloop:
-        return createGraphLoopActivity(agent, activityId, subgraphId, (IHThorGraphLoopArg &)arg, kind);
+        return createGraphLoopActivity(agent, activityId, subgraphId, (IHThorGraphLoopArg &)arg, kind, graph);
     case TAKgraphloopresultread:
-        return createGraphLoopResultReadActivity(agent, activityId, subgraphId, (IHThorGraphLoopResultReadArg &)arg, kind, graphId);
+        return createGraphLoopResultReadActivity(agent, activityId, subgraphId, (IHThorGraphLoopResultReadArg &)arg, kind, graph, graphId);
     case TAKgraphloopresultwrite:
-        return createGraphLoopResultWriteActivity(agent, activityId, subgraphId, (IHThorGraphLoopResultWriteArg &)arg, kind, graphId);
+        return createGraphLoopResultWriteActivity(agent, activityId, subgraphId, (IHThorGraphLoopResultWriteArg &)arg, kind, graph, graphId);
     case TAKprocess:
-        return createProcessActivity(agent, activityId, subgraphId, (IHThorProcessArg &)arg, kind);
+        return createProcessActivity(agent, activityId, subgraphId, (IHThorProcessArg &)arg, kind, graph);
     case TAKlibrarycall:
-        return createLibraryCallActivity(agent, activityId, subgraphId, (IHThorLibraryCallArg &)arg, kind, node);
+        return createLibraryCallActivity(agent, activityId, subgraphId, (IHThorLibraryCallArg &)arg, kind, graph, node);
     case TAKsorted:
-        return createSortedActivity(agent, activityId, subgraphId, (IHThorSortedArg &)arg, kind);
+        return createSortedActivity(agent, activityId, subgraphId, (IHThorSortedArg &)arg, kind, graph);
     case TAKtrace:
-        return createTraceActivity(agent, activityId, subgraphId, (IHThorTraceArg &)arg, kind);
+        return createTraceActivity(agent, activityId, subgraphId, (IHThorTraceArg &)arg, kind, graph);
     case TAKgrouped:
-        return createGroupedActivity(agent, activityId, subgraphId, (IHThorGroupedArg &)arg, kind);
+        return createGroupedActivity(agent, activityId, subgraphId, (IHThorGroupedArg &)arg, kind, graph);
     case TAKnwayjoin:
-        return createNWayJoinActivity(agent, activityId, subgraphId, (IHThorNWayMergeJoinArg &)arg, kind);
+        return createNWayJoinActivity(agent, activityId, subgraphId, (IHThorNWayMergeJoinArg &)arg, kind, graph);
     case TAKnwaymerge:
-        return createNWayMergeActivity(agent, activityId, subgraphId, (IHThorNWayMergeArg &)arg, kind);
+        return createNWayMergeActivity(agent, activityId, subgraphId, (IHThorNWayMergeArg &)arg, kind, graph);
     case TAKnwaymergejoin:
-        return createNWayMergeJoinActivity(agent, activityId, subgraphId, (IHThorNWayMergeJoinArg &)arg, kind);
+        return createNWayMergeJoinActivity(agent, activityId, subgraphId, (IHThorNWayMergeJoinArg &)arg, kind, graph);
     case TAKnwayinput:
-        return createNWayInputActivity(agent, activityId, subgraphId, (IHThorNWayInputArg &)arg, kind);
+        return createNWayInputActivity(agent, activityId, subgraphId, (IHThorNWayInputArg &)arg, kind, graph);
     case TAKnwayselect:
-        return createNWaySelectActivity(agent, activityId, subgraphId, (IHThorNWaySelectArg &)arg, kind);
+        return createNWaySelectActivity(agent, activityId, subgraphId, (IHThorNWaySelectArg &)arg, kind, graph);
     case TAKnwaygraphloopresultread:
-        return createNWayGraphLoopResultReadActivity(agent, activityId, subgraphId, (IHThorNWayGraphLoopResultReadArg &)arg, kind, graphId);
+        return createNWayGraphLoopResultReadActivity(agent, activityId, subgraphId, (IHThorNWayGraphLoopResultReadArg &)arg, kind, graph, graphId);
     case TAKnonempty:
-        return createNonEmptyActivity(agent, activityId, subgraphId, (IHThorNonEmptyArg &)arg, kind);
+        return createNonEmptyActivity(agent, activityId, subgraphId, (IHThorNonEmptyArg &)arg, kind, graph);
     case TAKcreaterowlimit:
-        return createOnFailLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind);
+        return createOnFailLimitActivity(agent, activityId, subgraphId, (IHThorLimitArg &)arg, kind, graph);
     case TAKexternalsource:
     case TAKexternalsink:
     case TAKexternalprocess:
-        return createExternalActivity(agent, activityId, subgraphId, (IHThorExternalArg &)arg, kind, node);
+        return createExternalActivity(agent, activityId, subgraphId, (IHThorExternalArg &)arg, kind, graph, node);
     case TAKstreamediterator:
-        return createStreamedIteratorActivity(agent, activityId, subgraphId, (IHThorStreamedIteratorArg &)arg, kind);
+        return createStreamedIteratorActivity(agent, activityId, subgraphId, (IHThorStreamedIteratorArg &)arg, kind, graph);
     }
     throw MakeStringException(-1, "UNIMPLEMENTED activity '%s'(kind=%d) at %s(%d)", activityKindStr(kind), kind, sanitizeSourceFile(__FILE__), __LINE__);
 }
@@ -1568,7 +1569,18 @@ void EclAgent::updateWULogfile()
         }
     }
 }
-
+EclGraph * EclAgent::addGraph(const char * graphName)
+{
+    CriticalBlock thisBlock(activeGraphCritSec);
+    EclGraph * graphPtr = loadGraph(graphName, queryWorkUnit(), dll, false);
+    activeGraphs.append(graphPtr);
+    return graphPtr;
+}
+void EclAgent::removeGraph(EclGraph * g)
+{
+    CriticalBlock thisBlock(activeGraphCritSec);
+    activeGraphs.zap(g);
+}
 void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
 {
     assertex(parentExtractSize == 0);
@@ -1580,6 +1592,7 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
     }
     else
     {
+        Owned<EclGraph> activeGraph;
         try
         {
             PROGLOG("Executing hthor graph %s", graphName);
@@ -1590,8 +1603,7 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
                 probeManager.setown(createDebugManager(debugContext, graphName));
                 debugContext->checkBreakpoint(DebugStateGraphCreate, NULL, graphName);
             }
-
-            activeGraph.setown(loadGraph(graphName, queryWorkUnit(), dll, false));
+            activeGraph.setown(addGraph(graphName));
             unsigned guillotineTimeout = queryWorkUnit()->getDebugValueInt("maxRunTime", 0);
             if (guillotineTimeout)
                 abortmonitor->setGuillotineTimeout(guillotineTimeout);
@@ -1605,7 +1617,7 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
                 abortmonitor->setGuillotineTimeout(0);
             if (debugContext)
                 debugContext->checkBreakpoint(DebugStateGraphEnd, NULL, graphName);
-            activeGraph.clear();
+            removeGraph(activeGraph.get());
             if (debugContext)
             {
                 if (isAborting)
@@ -1615,19 +1627,19 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
         catch (WorkflowException *e)
         {
             EXCLOG(e,"EclAgent::executeGraph");
-            activeGraph.clear();
+            removeGraph(activeGraph.get());
             throw;
         }
         catch (IException *e)
         {
             EXCLOG(e,"EclAgent::executeGraph");
-            activeGraph.clear();
+            removeGraph(activeGraph.get());
             throw;
         }
         catch (...)
         {
             PROGLOG("EclAgent::executeGraph unknown exception");
-            activeGraph.clear();
+            removeGraph(activeGraph.get());
             throw;
         }
     }
@@ -1635,8 +1647,8 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
 
 IHThorGraphResults * EclAgent::executeLibraryGraph(const char * libraryName, unsigned expectedInterfaceHash, unsigned activityId, const char * embeddedGraphName, const byte * parentExtract)
 {
-    Linked<EclGraph> savedGraph = activeGraph.get();
-
+    //Linked<EclGraph> savedGraph = activeGraph.get();
+    Owned<EclGraph> activeGraph;
     try
     {
         EclAgentQueryLibrary * library = loadEclLibrary(libraryName, expectedInterfaceHash, embeddedGraphName);
@@ -1645,13 +1657,10 @@ IHThorGraphResults * EclAgent::executeLibraryGraph(const char * libraryName, uns
 
         activeGraph.set(library->graph);
         activeGraph->executeLibrary(parentExtract, libraryResults);
-        activeGraph.set(savedGraph);
-
         return libraryResults.getClear();
     }
     catch (...)
     {
-        activeGraph.set(savedGraph);
         throw;
     }
 }

Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 175 - 175
ecl/hthor/hthor.cpp


+ 121 - 121
ecl/hthor/hthor.hpp

@@ -88,127 +88,127 @@ struct IHThorActivity : implements IActivityBase
 };
 
 
-extern HTHOR_API IHThorActivity *createDiskWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createIterateActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorIterateArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createFilterActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createFilterGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterGroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorAggregateArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createRollupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRollupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorProjectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createPrefetchProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPrefetchProjectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createFilterProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterProjectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createGroupDedupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDedupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createHashDedupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorHashDedupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createGroupSortActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSortArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorJoinArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSelfJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorJoinArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createLookupJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorHashJoinArg & arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createAllJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorAllJoinArg & arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorWorkUnitWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createDictionaryWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryWorkUnitWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createFirstNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFirstNArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createInlineTableActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorInlineTableArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createConcatActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFunnelArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createApplyActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorApplyArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSampleActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSampleArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createDegroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDegroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNormalizeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNormalizeChildActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeChildArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNormalizeLinkedChildActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeLinkedChildArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createDistributionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDistributionArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createRemoteResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRemoteResultArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChooseSetsActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChooseSetsLastActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChooseSetsEnthActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createWorkunitReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorWorkunitReadArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createPipeReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeReadArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createPipeWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCsvWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCsvWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createXmlWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createPipeThroughActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeThroughArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChildIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &arg, ThorActivityKind kind, bool _isGroupedHashAggregate);
-extern HTHOR_API IHThorActivity *createNullActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNullArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSideEffectActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSideEffectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorActionArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSelectNActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSelectNArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSpillActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSpillArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSkipLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCatchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCatchArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSkipCatchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCatchArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createOnFailLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCountProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCountProjectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createIndexWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorIndexWriteArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createParseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorParseArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createEnthActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorEnthArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createTopNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTopNArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createXmlParseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlParseArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createMergeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorMergeArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createHttpRowCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHttpCallArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSoapRowCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapCallArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSoapRowActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapActionArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSoapDatasetCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapCallArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createSoapDatasetActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapActionArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChildIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChildIteratorArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createRowResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRowResultArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createDatasetResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDatasetResultArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createDummyActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createWhenActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &arg, ThorActivityKind kind, EclGraphElement * _graphElement);
-extern HTHOR_API IHThorActivity *createChildNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildNormalizeArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChildAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildAggregateArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChildGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildGroupAggregateArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createChildThroughNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildThroughNormalizeArg &arg, ThorActivityKind kind);
-
-extern HTHOR_API IHThorActivity *createDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &arg, ThorActivityKind kind, IPropertyTree *node);
-extern HTHOR_API IHThorActivity *createDiskNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskNormalizeArg &arg, ThorActivityKind kind, IPropertyTree *node);
-extern HTHOR_API IHThorActivity *createDiskAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskAggregateArg &arg, ThorActivityKind kind, IPropertyTree *node);
-extern HTHOR_API IHThorActivity *createDiskCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskCountArg &arg, ThorActivityKind kind, IPropertyTree *node);
-extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskGroupAggregateArg &arg, ThorActivityKind kind, IPropertyTree *node);
-
-extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, IPropertyTree *node);
-
-extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createCsvReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCsvReadArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createXmlReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlReadArg &arg, ThorActivityKind kind, IPropertyTree *_node);
-extern HTHOR_API IHThorActivity *createLocalResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultReadArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createLocalResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultWriteArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createLocalResultSpillActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultSpillArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createDictionaryResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryResultWriteArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createCombineActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCombineArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createRollupGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRollupGroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createRegroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRegroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCombineGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCombineGroupArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createLoopActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLoopArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createGraphLoopActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createCaseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCaseArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createLinkedRawIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLinkedRawIteratorArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createProcessActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorProcessArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createLibraryCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLibraryCallArg &arg, ThorActivityKind kind, IPropertyTree * node);
-extern HTHOR_API IHThorActivity *createGraphLoopResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopResultReadArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createGraphLoopResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopResultWriteArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createSortedActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSortedArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createTraceActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTraceArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createGroupedActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGroupedArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNWayInputActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayInputArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNWayGraphLoopResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayGraphLoopResultReadArg &arg, ThorActivityKind kind, __int64 graphId);
-extern HTHOR_API IHThorActivity *createNWayMergeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNWaySelectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWaySelectArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createNonEmptyActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNonEmptyArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createStreamedIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorStreamedIteratorArg &arg, ThorActivityKind kind);
-extern HTHOR_API IHThorActivity *createExternalActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &arg, ThorActivityKind kind, IPropertyTree * graphNode);
+extern HTHOR_API IHThorActivity *createDiskWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createIterateActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorIterateArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createFilterActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createFilterGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterGroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createRollupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRollupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorProjectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createPrefetchProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPrefetchProjectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createFilterProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFilterProjectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createGroupDedupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDedupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createHashDedupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorHashDedupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createGroupSortActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSortArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorJoinArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSelfJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorJoinArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createLookupJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorHashJoinArg & arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createAllJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorAllJoinArg & arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorWorkUnitWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createDictionaryWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryWorkUnitWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createFirstNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFirstNArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createInlineTableActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorInlineTableArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createConcatActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFunnelArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createApplyActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorApplyArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSampleActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSampleArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createDegroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDegroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNormalizeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNormalizeChildActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeChildArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNormalizeLinkedChildActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNormalizeLinkedChildArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createDistributionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDistributionArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createRemoteResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRemoteResultArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChooseSetsActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChooseSetsLastActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChooseSetsEnthActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createWorkunitReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorWorkunitReadArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createPipeReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeReadArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createPipeWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCsvWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCsvWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createXmlWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createPipeThroughActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorPipeThroughArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChildIfActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIfArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createHashAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHashAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph, bool _isGroupedHashAggregate);
+extern HTHOR_API IHThorActivity *createNullActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNullArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSideEffectActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSideEffectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorActionArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSelectNActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSelectNArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSpillActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSpillArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSkipLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCatchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCatchArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSkipCatchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCatchArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createOnFailLimitActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLimitArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCountProjectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCountProjectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createIndexWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorIndexWriteArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createParseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorParseArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createEnthActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorEnthArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createTopNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTopNArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createXmlParseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlParseArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createMergeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorMergeArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createHttpRowCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorHttpCallArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSoapRowCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapCallArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSoapRowActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapActionArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSoapDatasetCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapCallArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createSoapDatasetActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorSoapActionArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChildIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorChildIteratorArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createRowResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRowResultArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createDatasetResultActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDatasetResultArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createDummyActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createWhenActionActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &arg, ThorActivityKind kind, EclGraph & _graph, EclGraphElement * _graphElement);
+extern HTHOR_API IHThorActivity *createChildNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChildAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChildGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildGroupAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createChildThroughNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildThroughNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph);
+
+extern HTHOR_API IHThorActivity *createDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+extern HTHOR_API IHThorActivity *createDiskNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+extern HTHOR_API IHThorActivity *createDiskAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+extern HTHOR_API IHThorActivity *createDiskCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskCountArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskGroupAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+
+extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
+
+extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createCsvReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCsvReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createXmlReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorXmlReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
+extern HTHOR_API IHThorActivity *createLocalResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultReadArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createLocalResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createLocalResultSpillActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLocalResultSpillArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createDictionaryResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryResultWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createCombineActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCombineArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createRollupGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRollupGroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createRegroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorRegroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCombineGroupActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCombineGroupArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createLoopActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLoopArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createGraphLoopActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createCaseActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCaseArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createLinkedRawIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorLinkedRawIteratorArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createProcessActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorProcessArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createLibraryCallActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorLibraryCallArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);
+extern HTHOR_API IHThorActivity *createGraphLoopResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopResultReadArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createGraphLoopResultWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGraphLoopResultWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createSortedActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorSortedArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createTraceActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTraceArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createGroupedActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorGroupedArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNWayInputActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayInputArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNWayGraphLoopResultReadActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayGraphLoopResultReadArg &arg, ThorActivityKind kind, EclGraph & _graph, __int64 graphId);
+extern HTHOR_API IHThorActivity *createNWayMergeActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & arg, ThorActivityKind kin, EclGraph & _graphd);
+extern HTHOR_API IHThorActivity *createNWaySelectActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNWaySelectArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createNonEmptyActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorNonEmptyArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createStreamedIteratorActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorStreamedIteratorArg &arg, ThorActivityKind kind, EclGraph & _graph);
+extern HTHOR_API IHThorActivity *createExternalActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorExternalArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * graphNode);
 
 #define OwnedHThorRowArray OwnedRowArray
 class HTHOR_API IHThorException : public IException

Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 127 - 125
ecl/hthor/hthor.ipp


+ 53 - 53
ecl/hthor/hthorkey.cpp

@@ -148,7 +148,7 @@ void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
 class CHThorNullAggregateActivity : public CHThorNullActivity
 {
 public:
-    CHThorNullAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, IHThorCompoundAggregateExtra &_extra, ThorActivityKind _kind) : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind), helper(_extra) {}
+    CHThorNullAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, IHThorCompoundAggregateExtra &_extra, ThorActivityKind _kind, EclGraph & _graph) : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_extra) {}
 
     //interface IHThorInput
     virtual void ready();
@@ -190,8 +190,8 @@ const void *CHThorNullAggregateActivity::nextRow()
 class CHThorNullCountActivity : public CHThorNullActivity
 {
 public:
-    CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind)
-        : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind), finished(false) {}
+    CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind, EclGraph & _graph)
+        : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind, _graph), finished(false) {}
 
     //interface IHThorInput
     virtual void ready();
@@ -233,7 +233,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
 {
 
 public:
-    CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
     ~CHThorIndexReadActivityBase();
 
     virtual void ready();
@@ -350,8 +350,8 @@ protected:
 
 };
 
-CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-    : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
+CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+    : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
 {
     nextPartNumber = 0;
 
@@ -401,7 +401,7 @@ void CHThorIndexReadActivityBase::resolveIndexFilename()
     }
     else
     {
-        agent.logFileAccess(df, "HThor", "READ");
+        agent.logFileAccess(df, "HThor", "READ", graph);
         enterSingletonSuperfiles(df);
 
         singlePart = false;
@@ -790,7 +790,7 @@ class CHThorIndexReadActivity : public CHThorIndexReadActivityBase
 {
 
 public:
-    CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
     ~CHThorIndexReadActivity();
 
     //interface IHThorInput
@@ -823,8 +823,8 @@ protected:
     bool keyedLimitRowCreated;
 };
 
-CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg)
+CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
 {
     limitTransformExtra = &helper;
     steppedExtra = helper.querySteppingExtra();
@@ -1116,9 +1116,9 @@ IInputSteppingMeta * CHThorIndexReadActivity::querySteppingMeta()
 }
 
 
-extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //-------------------------------------------------------------------------------------------------------------
@@ -1128,7 +1128,7 @@ class CHThorIndexNormalizeActivity : public CHThorIndexReadActivityBase
 {
 
 public:
-    CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
     ~CHThorIndexNormalizeActivity();
 
     virtual void ready();
@@ -1151,7 +1151,7 @@ protected:
 };
 
 
-CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
+CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), outBuilder(NULL)
 {
     limitTransformExtra = &helper;
     keyedLimit = (unsigned __int64)-1;
@@ -1290,9 +1290,9 @@ const void * CHThorIndexNormalizeActivity::createNextRow()
 
 }
 
-extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //-------------------------------------------------------------------------------------------------------------
@@ -1302,7 +1302,7 @@ class CHThorIndexAggregateActivity : public CHThorIndexReadActivityBase
 {
 
 public:
-    CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
     ~CHThorIndexAggregateActivity();
 
     //interface IHThorInput
@@ -1322,8 +1322,8 @@ protected:
 };
 
 
-CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
+CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), outBuilder(NULL)
 {
 }
 
@@ -1396,9 +1396,9 @@ const void *CHThorIndexAggregateActivity::nextRow()
 }
 
 
-extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //-------------------------------------------------------------------------------------------------------------
@@ -1411,7 +1411,7 @@ class CHThorIndexCountActivity : public CHThorIndexReadActivityBase
     unsigned __int64 rowLimit = (unsigned __int64)-1;
 
 public:
-    CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
 
     //interface IHThorInput
     virtual void ready();
@@ -1427,8 +1427,8 @@ protected:
 };
 
 
-CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg)
+CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+    : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
 {
     choosenLimit = (unsigned __int64)-1;
     finished = false;
@@ -1541,9 +1541,9 @@ const void *CHThorIndexCountActivity::nextRow()
 }
 
 
-extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //-------------------------------------------------------------------------------------------------------------
@@ -1552,7 +1552,7 @@ class CHThorIndexGroupAggregateActivity : public CHThorIndexReadActivityBase, im
 {
 
 public:
-    CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node);
+    CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
     IMPLEMENT_IINTERFACE
 
     //interface IHThorInput
@@ -1573,7 +1573,7 @@ protected:
 };
 
 
-CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _node), helper(_arg), aggregated(_arg, _arg)
+CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg), aggregated(_arg, _arg)
 {
     eof = false;
     gathered = false;
@@ -1641,9 +1641,9 @@ const void *CHThorIndexGroupAggregateActivity::nextRow()
 }
 
 
-extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //-------------------------------------------------------------------------------------------------------------
@@ -2113,8 +2113,8 @@ class CHThorThreadedActivityBase : public CHThorActivityBase, implements IThread
     };
 
 public:
-    CHThorThreadedActivityBase (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, IPropertyTree *_node)
-        : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), fetch(_fetch)
+    CHThorThreadedActivityBase (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node)
+        : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), fetch(_fetch)
     {
         exception = NULL;
         rowLimit = 0;
@@ -2252,7 +2252,7 @@ public:
             if(dFile)
             {
                 verifyFetchFormatCrc(dFile);
-                agent.logFileAccess(dFile, "HThor", "READ");
+                agent.logFileAccess(dFile, "HThor", "READ", graph);
                 initParts(dFile);
             }
             else
@@ -2273,8 +2273,8 @@ protected:
 class CHThorFetchActivityBase : public CHThorThreadedActivityBase, public IFetchHandlerFactory<SimpleFetchPartHandlerBase>
 {
 public:
-    CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, IPropertyTree *_node)
-      : CHThorThreadedActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize, _node)
+    CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node)
+      : CHThorThreadedActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, _graph, diskSize, _node)
     {
         pendingSeq = 0;
         signalSeq = 0;
@@ -2426,8 +2426,8 @@ protected:
 class CHThorFlatFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
 {
 public:
-    CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, IPropertyTree *_node, MemoryAttr &encryptionkey)
-        : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize, _node), helper(_arg)
+    CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, EclGraph & _graph, IRecordSize *diskSize, IPropertyTree *_node, MemoryAttr &encryptionkey)
+        : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, _graph, diskSize, _node), helper(_arg)
     {}
 
     ~CHThorFlatFetchActivity()
@@ -2539,14 +2539,14 @@ protected:
     IHThorFetchArg & helper;
 };
 
-extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
     size32_t kl;
     void *k;
     arg.getFileEncryptKey(kl,k);
     MemoryAttr encryptionkey;
     encryptionkey.setOwn(kl,k);
-    return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, arg.queryDiskRecordSize(), _node, encryptionkey);
+    return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, _graph, arg.queryDiskRecordSize(), _node, encryptionkey);
 }
 
 //------------------------------------------------------------------------------------------
@@ -2554,8 +2554,8 @@ extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsi
 class CHThorCsvFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
 {
 public:
-    CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-        : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL, _node), helper(_arg)
+    CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+        : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, NULL, _node), helper(_arg)
     {
         //MORE: I have no idea what should be passed for recordSize in the line above, either something that reads a fixed size, or
         //reads a record based on the csv information
@@ -2575,7 +2575,7 @@ public:
             separators = options.queryProp("@csvSeparate");
             terminators = options.queryProp("@csvTerminate");
             escapes = options.queryProp("@csvEscape");
-            agent.logFileAccess(dFile, "HThor", "READ");
+            agent.logFileAccess(dFile, "HThor", "READ", graph);
         }
         else
         {
@@ -2643,9 +2643,9 @@ protected:
     IHThorCsvFetchArg & helper;
 };
 
-extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //------------------------------------------------------------------------------------------
@@ -2722,8 +2722,8 @@ protected:
 class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
 {
 public:
-    CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind, IPropertyTree *_node)
-        : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL, _node), helper(_arg)
+    CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+        : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, NULL, _node), helper(_arg)
     {
     }
 
@@ -2782,9 +2782,9 @@ protected:
     IHThorXmlFetchArg & helper;
 };
 
-extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }
 
 //------------------------------------------------------------------------------------------
@@ -3462,8 +3462,8 @@ class CHThorKeyedJoinActivity  : public CHThorThreadedActivityBase, implements I
     RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
     bool isCodeSigned = false;
 public:
-    CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
-        : CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _arg.queryDiskRecordSize(), _node), helper(_arg)
+    CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
+        : CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
     {
         atomic_set(&prefiltered, 0);
         atomic_set(&postfiltered, 0);
@@ -4028,7 +4028,7 @@ public:
                 lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
             else
                 lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
-            agent.logFileAccess(dFile, "HThor", "READ");
+            agent.logFileAccess(dFile, "HThor", "READ", graph);
         }
         else
         {
@@ -4236,7 +4236,7 @@ protected:
     }
 };
 
-extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind, IPropertyTree *_node)
+extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
 {
-    return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind, _node);
+    return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind, _graph, _node);
 }

+ 14 - 14
ecl/hthor/hthorstep.cpp

@@ -72,7 +72,7 @@ void CHThorSteppedInput::resetEOF()
 
 //---------------------------------------------------------------------------
 
-CHThorNaryActivity::CHThorNaryActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind) : CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind)
+CHThorNaryActivity::CHThorNaryActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorMultiInputActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph)
 {
 }
 
@@ -116,7 +116,7 @@ void CHThorNaryActivity::updateProgress(IStatisticGatherer &progress) const
 
 //---------------------------------------------------------------------------
 
-CHThorNWayMergeActivity::CHThorNWayMergeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &_arg, ThorActivityKind _kind) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
+CHThorNWayMergeActivity::CHThorNWayMergeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &_arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
 {
     merger.init(helper.queryCompare(), helper.dedup(), helper.querySteppingMeta()->queryCompare());
 }
@@ -175,7 +175,7 @@ const void * CHThorNWayMergeActivity::nextRowGE(const void * seek, unsigned numF
 
 //---------------------------------------------------------------------------
 
-CHThorMergeJoinBaseActivity::CHThorMergeJoinBaseActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, CMergeJoinProcessor & _processor) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), processor(_processor)
+CHThorMergeJoinBaseActivity::CHThorMergeJoinBaseActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph, CMergeJoinProcessor & _processor) : CHThorNaryActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), processor(_processor)
 {
 }
 
@@ -239,57 +239,57 @@ const void * CHThorMergeJoinBaseActivity::nextRowGE(const void * seek, unsigned
 
 //---------------------------------------------------------------------------
 
-CHThorAndMergeJoinActivity::CHThorAndMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, andProcessor), andProcessor(_arg)
+CHThorAndMergeJoinActivity::CHThorAndMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, andProcessor), andProcessor(_arg)
 {
 }
 
 
 //---------------------------------------------------------------------------
 
-CHThorAndLeftMergeJoinActivity::CHThorAndLeftMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, andLeftProcessor), andLeftProcessor(_arg)
+CHThorAndLeftMergeJoinActivity::CHThorAndLeftMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, andLeftProcessor), andLeftProcessor(_arg)
 {
 }
 
 
 //---------------------------------------------------------------------------
 
-CHThorMofNMergeJoinActivity::CHThorMofNMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, mofNProcessor), mofNProcessor(_arg)
+CHThorMofNMergeJoinActivity::CHThorMofNMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, mofNProcessor), mofNProcessor(_arg)
 {
 }
 
 
 //---------------------------------------------------------------------------
 
-CHThorProximityJoinActivity::CHThorProximityJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, proximityProcessor), proximityProcessor(_arg)
+CHThorProximityJoinActivity::CHThorProximityJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorMergeJoinBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, proximityProcessor), proximityProcessor(_arg)
 {
 }
 
 
 //---------------------------------------------------------------------------
 
-extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind)
+extern HTHOR_API IHThorActivity *createNWayMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph)
 {
     unsigned flags = _arg.getJoinFlags();
     if (flags & IHThorNWayMergeJoinArg::MJFhasrange)
-        return new CHThorProximityJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind);
+        return new CHThorProximityJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph);
 
     switch (flags & IHThorNWayMergeJoinArg::MJFkindmask)
     {
     case IHThorNWayMergeJoinArg::MJFinner:
-        return new CHThorAndMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind);
+        return new CHThorAndMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph);
     case IHThorNWayMergeJoinArg::MJFleftonly:
     case IHThorNWayMergeJoinArg::MJFleftouter:
-        return new CHThorAndLeftMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind);
+        return new CHThorAndLeftMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph);
     case IHThorNWayMergeJoinArg::MJFmofn:
-        return new CHThorMofNMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind);
+        return new CHThorMofNMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph);
     }
     UNIMPLEMENTED;
 }
 
 
-extern HTHOR_API IHThorActivity *createNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind)
+extern HTHOR_API IHThorActivity *createNWayJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph)
 {
-    return createNWayMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind);
+    return createNWayMergeJoinActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph);
 }
 
 

+ 7 - 7
ecl/hthor/hthorstep.ipp

@@ -46,7 +46,7 @@ protected:
     InputArrayType expandedInputs;
 
 public:
-    CHThorNaryActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, ThorActivityKind _kind);
+    CHThorNaryActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, ThorActivityKind _kind, EclGraph & _graph);
 
     //interface IHThorInput
     virtual void stop();
@@ -61,7 +61,7 @@ public:
 class CHThorNWayMergeActivity : public CHThorNaryActivity
 {
 public:
-    CHThorNWayMergeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &_arg, ThorActivityKind _kind);
+    CHThorNWayMergeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeArg &_arg, ThorActivityKind _kind, EclGraph & _graph);
     ~CHThorNWayMergeActivity();
 
     virtual void ready();
@@ -81,7 +81,7 @@ protected:
 class CHThorMergeJoinBaseActivity : public CHThorNaryActivity
 {
 public:
-    CHThorMergeJoinBaseActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, CMergeJoinProcessor & _processor);
+    CHThorMergeJoinBaseActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph, CMergeJoinProcessor & _processor);
 
     //interface IHThorInput
     virtual void ready();
@@ -104,7 +104,7 @@ protected:
 class CHThorAndMergeJoinActivity : public CHThorMergeJoinBaseActivity
 {
 public:
-    CHThorAndMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind);
+    CHThorAndMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph);
 
 protected:
     CAndMergeJoinProcessor andProcessor;
@@ -114,7 +114,7 @@ protected:
 class CHThorAndLeftMergeJoinActivity : public CHThorMergeJoinBaseActivity
 {
 public:
-    CHThorAndLeftMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind);
+    CHThorAndLeftMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph);
 
 protected:
     CAndLeftMergeJoinProcessor andLeftProcessor;
@@ -123,7 +123,7 @@ protected:
 class CHThorMofNMergeJoinActivity : public CHThorMergeJoinBaseActivity
 {
 public:
-    CHThorMofNMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind);
+    CHThorMofNMergeJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph);
 
 protected:
     CMofNMergeJoinProcessor mofNProcessor;
@@ -133,7 +133,7 @@ protected:
 class CHThorProximityJoinActivity : public CHThorMergeJoinBaseActivity
 {
 public:
-    CHThorProximityJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind);
+    CHThorProximityJoinActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorNWayMergeJoinArg & _arg, ThorActivityKind _kind, EclGraph & _graph);
 
 protected:
     CProximityJoinProcessor proximityProcessor;