Explorar o código

Merge pull request #13750 from jakesmith/hpcc-15886-thor-func-timing

HPCC-15886 Add function timing support to Thor

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday %!s(int64=5) %!d(string=hai) anos
pai
achega
a0559d0c24

+ 2 - 3
thorlcr/activities/wuidread/thwuidread.cpp

@@ -47,12 +47,11 @@ static bool getWorkunitResultFilename(CGraphElementBase &container, StringBuffer
 {
     try
     {
-        ICodeContextExt &codeContext = *QUERYINTERFACE(container.queryCodeContext(), ICodeContextExt);
         Owned<IConstWUResult> result;
         if (wuid)
-            result.setown(codeContext.getExternalResult(wuid, stepname, sequence));
+            result.setown(container.queryCodeContext()->getExternalResult(wuid, stepname, sequence));
         else
-            result.setown(codeContext.getResultForGet(stepname, sequence));
+            result.setown(container.queryCodeContext()->getResultForGet(stepname, sequence));
         if (!result)
             throw MakeThorException(TE_FailedToRetrieveWorkunitValue, "Failed to find value %s:%d in workunit %s", stepname?stepname:"(null)", sequence, wuid?wuid:"(null)");
 

+ 54 - 12
thorlcr/graph/thgraph.cpp

@@ -28,6 +28,8 @@
 #include "rtlformat.hpp"
 #include "thorsoapcall.hpp"
 #include "thorport.hpp"
+#include "roxiehelper.hpp"
+
 
 
 PointerArray createFuncs;
@@ -320,7 +322,47 @@ CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CG
     return container;
 }
 
-CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml) : owner(&_owner)
+///////////////////////////////////
+CActivityCodeContext::CActivityCodeContext()
+{
+}
+
+IThorChildGraph * CActivityCodeContext::resolveChildQuery(__int64 gid, IHThorArg * colocal)
+{
+    return parent->getChildGraph((graph_id)gid);
+}
+
+IEclGraphResults * CActivityCodeContext::resolveLocalQuery(__int64 gid)
+{
+    if (gid == containerGraph->queryGraphId())
+        return containerGraph;
+    else
+        return ctx->resolveLocalQuery(gid);
+}
+
+unsigned CActivityCodeContext::getGraphLoopCounter() const
+{
+    return containerGraph->queryLoopCounter();           // only called if value is valid
+}
+
+ISectionTimer * CActivityCodeContext::registerTimer(unsigned activityId, const char * name)
+{
+    if (!stats) // if master context and local CQ, there is no activity instance, and hence no setStats call
+        return queryNullSectionTimer();
+    CriticalBlock b(contextCrit);
+    auto it = functionTimers.find(name);
+    if (it != functionTimers.end())
+        return it->second;
+
+    ISectionTimer *timer = ThorSectionTimer::createTimer(*stats, name);    
+    functionTimers.insert({name, timer}); // owns
+    return timer;
+}
+
+///////////////////////////////////
+
+CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml, CGraphBase *_resultsGraph)
+    : owner(&_owner), resultsGraph(_resultsGraph)
 {
     xgmml.setown(createPTreeFromIPT(&_xgmml));
     eclText.set(xgmml->queryProp("att[@name=\"ecl\"]/@value"));
@@ -331,7 +373,6 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
     isLocalData = xgmml->getPropBool("att[@name=\"local\"]/@value", false); // local execute + local data access only
     isLocal = isLocalData || coLocal; // local execute
     isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
-    resultsGraph = NULL;
     ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
     onCreateCalled = prepared = haveCreateCtx = nullAct = false;
     onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
@@ -349,6 +390,11 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
     if (0 == maxCores)
         maxCores = queryJob().queryMaxDefaultActivityCores();
     baseHelper.setown(helperFactory());
+
+    CGraphBase *graphContainer = resultsGraph;
+    if (!graphContainer)
+        graphContainer = owner;
+    activityCodeContext.setContext(owner, graphContainer, &queryJobChannel().queryCodeContext());
 }
 
 CGraphElementBase::~CGraphElementBase()
@@ -775,13 +821,14 @@ void CGraphElementBase::createActivity()
     if (activity)
         return;
     activity.setown(factory());
+    activityCodeContext.setStats(&activity->queryStats());
     if (isSink())
         owner->addActiveSink(*this);
 }
 
-ICodeContext *CGraphElementBase::queryCodeContext()
+ICodeContextExt *CGraphElementBase::queryCodeContext()
 {
-    return queryOwner().queryCodeContext();
+    return &activityCodeContext;
 }
 
 /////
@@ -1886,12 +1933,6 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
     parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
 
     graphResultsContainer = resultsGraph;
-    CGraphBase *graphContainer = this;
-    if (resultsGraph)
-        graphContainer = resultsGraph; // JCSMORE is this right?
-
-    graphCodeContext.setContext(this, graphContainer, (ICodeContextExt *)&jobChannel.queryCodeContext());
-
 
     unsigned numResults = xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0);
     if (numResults)
@@ -2959,7 +3000,7 @@ void CJobChannel::wait()
         graphExecutor->wait();
 }
 
-ICodeContext &CJobChannel::queryCodeContext() const
+ICodeContextExt &CJobChannel::queryCodeContext() const
 {
     return *codeCtx;
 }
@@ -3140,7 +3181,8 @@ IThorResource &queryThor()
 //
 //
 
-CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
+CActivityBase::CActivityBase(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
+    : container(*_container), timeActivities(_container->queryJob().queryTimeActivities()), stats(statsMapping)
 {
     mpTag = TAG_NULL;
     abortSoon = receiving = cancelledReceive = initialized = reInit = false;

+ 122 - 122
thorlcr/graph/thgraph.hpp

@@ -31,6 +31,9 @@
 #define DEFAULT_MAX_ACTINITWAITTIME_MINS (2*60) // 2hrs
 #define DEFAULT_MAXLFN_BLOCKTIME_MINS 25 // 25 mins
 
+#include <unordered_map>
+#include <string>
+
 #include "jlib.hpp"
 #include "jarray.hpp"
 #include "jexcept.hpp"
@@ -43,6 +46,7 @@
 #include "mptag.hpp"
 
 #include "roxiemem.hpp"
+#include "thorstats.hpp"
 #include "thormisc.hpp"
 #include "workunit.hpp"
 #include "thorcommon.hpp"
@@ -258,6 +262,116 @@ typedef ArrayIIteratorOf<const CGraphArrayCopy, CGraphBase, IThorGraphIterator>
 
 typedef IIteratorOf<CGraphStub> IThorGraphStubIterator;
 
+
+
+class CActivityCodeContext : implements ICodeContextExt
+{
+    ICodeContextExt *ctx = nullptr;
+    CGraphBase *containerGraph = nullptr;
+    CGraphBase *parent = nullptr;
+    CRuntimeStatisticCollection *stats = nullptr;
+    mutable CriticalSection contextCrit;
+    std::unordered_map<std::string, Owned<ISectionTimer>> functionTimers;
+
+public:
+    CActivityCodeContext();
+    void setContext(CGraphBase *_parent, CGraphBase *_containerGraph, ICodeContextExt *_ctx)
+    {
+        parent = _parent;
+        containerGraph = _containerGraph;
+        ctx = _ctx;
+    }
+    void setStats(CRuntimeStatisticCollection *_stats) { stats = _stats; }
+    virtual const char *loadResource(unsigned id) { return ctx->loadResource(id); }
+    virtual void setResultBool(const char *name, unsigned sequence, bool value) { ctx->setResultBool(name, sequence, value); }
+    virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultData(name, sequence, len, data); }
+    virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val); }
+    virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { ctx->setResultInt(name, sequence, value, size); }
+    virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultRaw(name, sequence, len, data); }
+    virtual void setResultReal(const char * stepname, unsigned sequence, double value) { ctx->setResultReal(stepname, sequence, value); }
+    virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { ctx->setResultSet(name, sequence, isAll, len, data, transformer); }
+    virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { ctx->setResultString(name, sequence, len, str); }
+    virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { ctx->setResultUInt(name, sequence, value, size); }
+    virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { ctx->setResultUnicode(name, sequence, len, str); }
+    virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { ctx->setResultVarString(name, sequence, value); }
+    virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { ctx->setResultVarUnicode(name, sequence, value); }
+    virtual bool getResultBool(const char * name, unsigned sequence) { return ctx->getResultBool(name, sequence); }
+    virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { ctx->getResultData(tlen, tgt, name, sequence); }
+    virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence); }
+    virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
+    virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
+    virtual __int64 getResultInt(const char * name, unsigned sequence) { return ctx->getResultInt(name, sequence); }
+    virtual double getResultReal(const char * name, unsigned sequence) { return ctx->getResultReal(name, sequence); }
+    virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { ctx->getResultString(tlen, tgt, name, sequence); }
+    virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { ctx->getResultStringF(tlen, tgt, name, sequence); }
+    virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { ctx->getResultUnicode(tlen, tgt, name, sequence); }
+    virtual char *getResultVarString(const char * name, unsigned sequence) { return ctx->getResultVarString(name, sequence); }
+    virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { return ctx->getResultVarUnicode(name, sequence); }
+    virtual unsigned getResultHash(const char * name, unsigned sequence) { return ctx->getResultHash(name, sequence); }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { return ctx->getExternalResultHash(wuid, name, sequence); }
+    virtual const char *cloneVString(const char *str) const { return ctx->cloneVString(str); }
+    virtual const char *cloneVString(size32_t len, const char *str) const { return ctx->cloneVString(len, str); }
+    virtual char *getWuid() { return ctx->getWuid(); }
+    virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer); }
+    virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract); }
+    virtual char * getExpandLogicalName(const char * logicalName) { return ctx->getExpandLogicalName(logicalName); }
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { ctx->addWuException(text, code, severity, source); }
+    virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort); }
+    virtual IUserDescriptor *queryUserDescriptor() { return ctx->queryUserDescriptor(); }
+    virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { return ctx->getDatasetHash(name, hash); }
+    virtual unsigned getNodes() { return ctx->getNodes(); }
+    virtual unsigned getNodeNum() { return ctx->getNodeNum(); }
+    virtual char *getFilePart(const char *logicalPart, bool create) { return ctx->getFilePart(logicalPart, create); }
+    virtual unsigned __int64 getFileOffset(const char *logicalPart) { return ctx->getFileOffset(logicalPart); }
+    virtual IDistributedFileTransaction *querySuperFileTransaction() { return ctx->querySuperFileTransaction(); }
+    virtual char *getJobName() { return ctx->getJobName(); }
+    virtual char *getJobOwner() { return ctx->getJobOwner(); }
+    virtual char *getClusterName() { return ctx->getClusterName(); }
+    virtual char *getGroupName() { return ctx->getGroupName(); }
+    virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { return ctx->queryIndexMetaData(lfn, xpath); }
+    virtual unsigned getPriority() const { return ctx->getPriority(); }
+    virtual char *getPlatform() { return ctx->getPlatform(); }
+    virtual char *getEnv(const char *name, const char *defaultValue) const { return ctx->getEnv(name, defaultValue); }
+    virtual char *getOS() { return ctx->getOS(); }
+    virtual IThorChildGraph * resolveChildQuery(__int64 gid, IHThorArg * colocal);
+    virtual IEclGraphResults * resolveLocalQuery(__int64 gid);
+    virtual char *getEnv(const char *name, const char *defaultValue) { return ctx->getEnv(name, defaultValue); }
+    virtual unsigned logString(const char * text) const { return ctx->logString(text); }
+    virtual const IContextLogger &queryContextLogger() const { return ctx->queryContextLogger(); }
+    virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { return ctx->getRowAllocator(meta, activityId); }
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const { return ctx->getRowAllocatorEx(meta, activityId, heapFlags); }
+    virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override { ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer); }
+    virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt,IEngineRowAllocator * _rowAllocator,  const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override { ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher); }
+
+    virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { convertRowToXML(lenResult, result, info, row, flags); }
+    virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { convertRowToJSON(lenResult, result, info, row, flags); }
+    virtual IDebuggableContext *queryDebugContext() const override { return ctx->queryDebugContext(); }
+    virtual unsigned getGraphLoopCounter() const;
+    virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) { return ctx->getExternalResult(wuid, name, sequence); }
+    virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) { return ctx->getResultForGet(name, sequence); }
+    virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
+    {
+        return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
+    }
+    virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
+    {
+        return ctx->fromJson(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
+    }
+    virtual IEngineContext *queryEngineContext()
+    {
+        return ctx->queryEngineContext();
+    }
+    virtual char *getDaliServers()
+    {
+        return ctx->getDaliServers();
+    }
+    virtual IWorkUnit *updateWorkUnit() const { return ctx->updateWorkUnit(); }
+    virtual ISectionTimer * registerTimer(unsigned activityId, const char * name);
+
+    virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char * source) override { ctx->addWuExceptionEx(text, code, severity, audience, source); }
+};
+
+
 class CJobBase;
 class CJobChannel;
 class graph_decl CGraphElementBase : public CInterface, implements IInterface
@@ -278,6 +392,7 @@ protected:
     bool haveCreateCtx;
     unsigned maxCores;
     bool isCodeSigned = false;
+    CActivityCodeContext activityCodeContext;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -292,7 +407,7 @@ public:
     unsigned whichBranch;
     Owned<IBitSet> sentActInitData;
 
-    CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml);
+    CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml, CGraphBase *resultsGraph);
     ~CGraphElementBase();
 
     void doconnect();
@@ -374,7 +489,7 @@ public:
     virtual CActivityBase *factory(ThorActivityKind kind) { assertex(false); return NULL; }
     virtual CActivityBase *factory() { return factory(getKind()); }
     virtual CActivityBase *factorySet(ThorActivityKind kind) { CActivityBase *_activity = factory(kind); activity.setown(_activity); return _activity; }
-    virtual ICodeContext *queryCodeContext();
+    virtual ICodeContextExt *queryCodeContext();
     virtual bool activityIsCodeSigned() const { return isCodeSigned; }
 };
 
@@ -476,122 +591,6 @@ class graph_decl CGraphBase : public CGraphStub, implements IEclGraphResults
 
     void clean();
 
-    class CGraphCodeContext : implements ICodeContextExt
-    {
-        ICodeContextExt *ctx = nullptr;
-        CGraphBase *containerGraph = nullptr;
-        CGraphBase *parent = nullptr;
-    public:
-        CGraphCodeContext() { }
-        void setContext(CGraphBase *_parent, CGraphBase *_containerGraph, ICodeContextExt *_ctx)
-        {
-            parent = _parent;
-            containerGraph = _containerGraph;
-            ctx = _ctx;
-        }
-        virtual const char *loadResource(unsigned id) { return ctx->loadResource(id); }
-        virtual void setResultBool(const char *name, unsigned sequence, bool value) { ctx->setResultBool(name, sequence, value); }
-        virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultData(name, sequence, len, data); }
-        virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val); }
-        virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { ctx->setResultInt(name, sequence, value, size); }
-        virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultRaw(name, sequence, len, data); }
-        virtual void setResultReal(const char * stepname, unsigned sequence, double value) { ctx->setResultReal(stepname, sequence, value); }
-        virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { ctx->setResultSet(name, sequence, isAll, len, data, transformer); }
-        virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { ctx->setResultString(name, sequence, len, str); }
-        virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { ctx->setResultUInt(name, sequence, value, size); }
-        virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { ctx->setResultUnicode(name, sequence, len, str); }
-        virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { ctx->setResultVarString(name, sequence, value); }
-        virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { ctx->setResultVarUnicode(name, sequence, value); }
-        virtual bool getResultBool(const char * name, unsigned sequence) { return ctx->getResultBool(name, sequence); }
-        virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { ctx->getResultData(tlen, tgt, name, sequence); }
-        virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence); }
-        virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
-        virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
-        virtual __int64 getResultInt(const char * name, unsigned sequence) { return ctx->getResultInt(name, sequence); }
-        virtual double getResultReal(const char * name, unsigned sequence) { return ctx->getResultReal(name, sequence); }
-        virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { ctx->getResultString(tlen, tgt, name, sequence); }
-        virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { ctx->getResultStringF(tlen, tgt, name, sequence); }
-        virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { ctx->getResultUnicode(tlen, tgt, name, sequence); }
-        virtual char *getResultVarString(const char * name, unsigned sequence) { return ctx->getResultVarString(name, sequence); }
-        virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { return ctx->getResultVarUnicode(name, sequence); }
-        virtual unsigned getResultHash(const char * name, unsigned sequence) { return ctx->getResultHash(name, sequence); }
-        virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { return ctx->getExternalResultHash(wuid, name, sequence); }
-        virtual const char *cloneVString(const char *str) const { return ctx->cloneVString(str); }
-        virtual const char *cloneVString(size32_t len, const char *str) const { return ctx->cloneVString(len, str); }
-        virtual char *getWuid() { return ctx->getWuid(); }
-        virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer); }
-        virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract); }
-        virtual char * getExpandLogicalName(const char * logicalName) { return ctx->getExpandLogicalName(logicalName); }
-        virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { ctx->addWuException(text, code, severity, source); }
-        virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort); }
-        virtual IUserDescriptor *queryUserDescriptor() { return ctx->queryUserDescriptor(); }
-        virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { return ctx->getDatasetHash(name, hash); }
-        virtual unsigned getNodes() { return ctx->getNodes(); }
-        virtual unsigned getNodeNum() { return ctx->getNodeNum(); }
-        virtual char *getFilePart(const char *logicalPart, bool create) { return ctx->getFilePart(logicalPart, create); }
-        virtual unsigned __int64 getFileOffset(const char *logicalPart) { return ctx->getFileOffset(logicalPart); }
-        virtual IDistributedFileTransaction *querySuperFileTransaction() { return ctx->querySuperFileTransaction(); }
-        virtual char *getJobName() { return ctx->getJobName(); }
-        virtual char *getJobOwner() { return ctx->getJobOwner(); }
-        virtual char *getClusterName() { return ctx->getClusterName(); }
-        virtual char *getGroupName() { return ctx->getGroupName(); }
-        virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { return ctx->queryIndexMetaData(lfn, xpath); }
-        virtual unsigned getPriority() const { return ctx->getPriority(); }
-        virtual char *getPlatform() { return ctx->getPlatform(); }
-        virtual char *getEnv(const char *name, const char *defaultValue) const { return ctx->getEnv(name, defaultValue); }
-        virtual char *getOS() { return ctx->getOS(); }
-        virtual IThorChildGraph * resolveChildQuery(__int64 gid, IHThorArg * colocal)
-        {
-            return parent->getChildGraph((graph_id)gid);
-        }
-        virtual IEclGraphResults * resolveLocalQuery(__int64 gid)
-        {
-            if (gid == containerGraph->queryGraphId())
-                return containerGraph;
-            else
-                return ctx->resolveLocalQuery(gid);
-        }
-        virtual char *getEnv(const char *name, const char *defaultValue) { return ctx->getEnv(name, defaultValue); }
-        virtual unsigned logString(const char * text) const { return ctx->logString(text); }
-        virtual const IContextLogger &queryContextLogger() const { return ctx->queryContextLogger(); }
-        virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { return ctx->getRowAllocator(meta, activityId); }
-        virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const { return ctx->getRowAllocatorEx(meta, activityId, heapFlags); }
-        virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override { ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer); }
-        virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt,IEngineRowAllocator * _rowAllocator,  const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override { ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher); }
-
-        virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { convertRowToXML(lenResult, result, info, row, flags); }
-        virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { convertRowToJSON(lenResult, result, info, row, flags); }
-        virtual IDebuggableContext *queryDebugContext() const override { return ctx->queryDebugContext(); }
-        virtual unsigned getGraphLoopCounter() const
-        {
-            return containerGraph->queryLoopCounter();           // only called if value is valid
-        }
-        virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) { return ctx->getExternalResult(wuid, name, sequence); }
-        virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) { return ctx->getResultForGet(name, sequence); }
-        virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
-        {
-            return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
-        }
-        virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
-        {
-            return ctx->fromJson(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
-        }
-        virtual IEngineContext *queryEngineContext()
-        {
-            return ctx->queryEngineContext();
-        }
-        virtual char *getDaliServers()
-        {
-            return ctx->getDaliServers();
-        }
-        virtual IWorkUnit *updateWorkUnit() const { return ctx->updateWorkUnit(); }
-        virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
-        {
-            return ctx->registerTimer(activityId, name);
-        }
-        virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char * source) override { ctx->addWuExceptionEx(text, code, severity, audience, source); }
-   } graphCodeContext;
-
 protected:
     Owned<IThorGraphResults> localResults, graphLoopResults;
     CGraphBase *owner, *parent, *graphResultsContainer;
@@ -664,7 +663,6 @@ public:
         sz = parentExtractSz;
         return (const byte *)parentExtractMb.toByteArray();
     }
-    virtual ICodeContext *queryCodeContext() { return &graphCodeContext; }
     void setLoopCounter(unsigned _counter) { counter = _counter; }
     unsigned queryLoopCounter() const { return counter; }
     virtual void setComplete(bool tf=true) { complete=tf; }
@@ -1028,7 +1026,7 @@ public:
         return LINK(allGraphs.find(gid));
     }
 
-    ICodeContext &queryCodeContext() const;
+    ICodeContextExt &queryCodeContext() const;
     ICodeContext &querySharedMemCodeContext() const;
     IThorResult *getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId);
     IThorAllocator *queryThorAllocator() const { return thorAllocator; }
@@ -1079,10 +1077,12 @@ protected:
     bool timeActivities; // purely for access efficiency
     bool receiving, cancelledReceive, initialized, reInit;
     Owned<IThorGraphResults> ownedResults; // NB: probably only to be used by loop results
+    CRuntimeStatisticCollection stats;
 
 public:
-    CActivityBase(CGraphElementBase *container);
+    CActivityBase(CGraphElementBase *container, const StatisticsMapping &statsMapping);
     ~CActivityBase();
+    CRuntimeStatisticCollection &queryStats() { return stats; }
     inline activity_id queryId() const { return container.queryId(); }
     CGraphElementBase &queryContainer() const { return container; }
     CJobBase &queryJob() const { return container.queryJob(); }
@@ -1145,7 +1145,7 @@ public:
     virtual IOutputRowDeserializer * queryRowDeserializer(); 
     virtual IOutputMetaData *queryRowMetaData() { return baseHelper->queryOutputMeta(); }
     virtual unsigned queryActivityId() const { return (unsigned)queryId(); }
-    virtual ICodeContext *queryCodeContext() { return container.queryCodeContext(); }
+    virtual ICodeContextExt *queryCodeContext() { return container.queryCodeContext(); }
     virtual roxiemem::IRowManager *queryRowManager() const { return queryJobChannel().queryRowManager(); }
 
     StringBuffer &getOpt(const char *prop, StringBuffer &out) const { return container.getOpt(prop, out); }

+ 2 - 2
thorlcr/graph/thgraphmaster.cpp

@@ -363,7 +363,7 @@ void CSlaveMessageHandler::threadmain()
 //////////////////////
 
 CMasterActivity::CMasterActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
-    : CActivityBase(_container), threaded("CMasterActivity", this), statsCollection(statsMapping)
+    : CActivityBase(_container, statsMapping), threaded("CMasterActivity", this), statsCollection(statsMapping)
 {
     notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
@@ -550,7 +550,7 @@ void CMasterActivity::done()
 // CMasterGraphElement impl.
 //
 
-CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml) : CGraphElementBase(_owner, _xgmml)
+CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml) : CGraphElementBase(_owner, _xgmml, nullptr)
 {
 }
 

+ 2 - 7
thorlcr/graph/thgraphslave.cpp

@@ -127,7 +127,7 @@ bool CThorInput::isFastThrough() const
 // 
 
 CSlaveActivity::CSlaveActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
-    : CActivityBase(_container), stats(statsMapping), CEdgeProgress(this)
+    : CActivityBase(_container, statsMapping), CEdgeProgress(this)
 {
     data = NULL;
 }
@@ -558,7 +558,6 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
     // JCS->GH - should these be serialized as cycles, and a different mapping used on master?
     stats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
     stats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
-
     stats.serialize(mb);
     ForEachItemIn(i, outputs)
     {
@@ -1248,13 +1247,9 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
             {
                 CGraphElementBase &element = iter->query();
                 CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
-                unsigned pos = mb.length();
                 mb.append(activity.queryContainer().queryId());
                 activity.serializeStats(mb);
-                if (pos == mb.length()-sizeof(activity_id))
-                    mb.rewrite(pos);
-                else
-                    ++count;
+                ++count;
             }
             mb.writeDirect(cPos, sizeof(count), &count);
         }

+ 1 - 2
thorlcr/graph/thgraphslave.hpp

@@ -203,7 +203,6 @@ protected:
     CThorInputArray inputs;
     IPointerArrayOf<IThorDataLink> outputs;
     IPointerArrayOf<IEngineRowStream> outputStreams;
-    CRuntimeStatisticCollection stats;
 
     IThorDataLink *input = nullptr;
     bool inputStopped = false;
@@ -420,7 +419,7 @@ public:
 class graphslave_decl CSlaveGraphElement : public CGraphElementBase
 {
 public:
-    CSlaveGraphElement(CGraphBase &owner, IPropertyTree &xgmml) : CGraphElementBase(owner, xgmml)
+    CSlaveGraphElement(CGraphBase &owner, IPropertyTree &xgmml, CGraphBase *resultsGraph) : CGraphElementBase(owner, xgmml, resultsGraph)
     {
     }
 };

+ 2 - 2
thorlcr/slave/slave.cpp

@@ -278,7 +278,7 @@ class CGenericSlaveGraphElement : public CSlaveGraphElement
     Owned<CActivityBase> nullActivity;
     CriticalSection nullActivityCs;
 public:
-    CGenericSlaveGraphElement(CGraphBase &_owner, IPropertyTree &xgmml) : CSlaveGraphElement(_owner, xgmml)
+    CGenericSlaveGraphElement(CGraphBase &_owner, IPropertyTree &xgmml, CGraphBase *resultsGraph) : CSlaveGraphElement(_owner, xgmml, resultsGraph)
     {
         wuidread2diskread = false;
         switch (getKind())
@@ -784,7 +784,7 @@ public:
 
 activityslaves_decl CGraphElementBase *createSlaveContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph)
 {
-    return new CGenericSlaveGraphElement(owner, xgmml);
+    return new CGenericSlaveGraphElement(owner, xgmml, resultsGraph);
 }
 
 activityslaves_decl IThorRowInterfaces *queryRowInterfaces(IThorDataLink *link) { return link?link->queryFromActivity():NULL; }