瀏覽代碼

Merge branch 'candidate-6.2.0' into candidate-6.4.0

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 年之前
父節點
當前提交
116fab0239

+ 17 - 0
common/thorhelper/enginecontext.hpp

@@ -21,10 +21,27 @@
 #include "jsocket.hpp"
 #include "dacoven.hpp"
 
+typedef void (* QueryTermCallback)(const char *queryId);
+
+class TerminationCallbackInfo : public CInterface
+{
+public:
+    TerminationCallbackInfo(QueryTermCallback _callback, const char *_id) : callback(_callback), id(_id) {}
+    ~TerminationCallbackInfo()
+    {
+        callback(id);
+    }
+protected:
+    QueryTermCallback callback;
+    StringAttr id;
+};
+
 interface IEngineContext
 {
     virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode) = 0;
     virtual bool allowDaliAccess() const = 0;
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const = 0;
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const = 0;
 };
 
 #endif // ENGINECONTEXT_HPP

+ 14 - 0
common/workunit/workunit.cpp

@@ -1750,6 +1750,7 @@ public:
     virtual IProperties *queryResultXmlns();
     virtual IStringVal& getResultFieldOpt(const char *name, IStringVal &str) const;
     virtual void getSchema(IArrayOf<ITypeInfo> &types, StringAttrArray &names, IStringVal * ecl=NULL) const;
+    virtual void        getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const;
 
     // interface IWUResult
     virtual void        setResultStatus(WUResultStatus status);
@@ -1784,6 +1785,7 @@ public:
     virtual void        setResultRow(unsigned len, const void * data);
     virtual void        setResultXmlns(const char *prefix, const char *uri);
     virtual void        setResultFieldOpt(const char *name, const char *value);
+    virtual void        setResultWriteLocation(const char * _graph, unsigned _activityId);
 
     virtual IPropertyTree *queryPTree() { return p; }
 };
@@ -7950,6 +7952,12 @@ IStringVal& CLocalWUResult::getResultFieldOpt(const char *name, IStringVal &str)
     return str;
 }
 
+void CLocalWUResult::getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const
+{
+    _graph.set(p->queryProp("@graph"));
+    _activityId = p->getPropInt("@activity", 0);
+}
+
 void CLocalWUResult::setResultStatus(WUResultStatus status)
 {
     setEnum(p, "@status", status, resultStatuses);
@@ -7993,6 +8001,12 @@ void CLocalWUResult::setResultFieldOpt(const char *name, const char *value)
     format->setProp(xpath, value);
 }
 
+void CLocalWUResult::setResultWriteLocation(const char * _graph, unsigned _activityId)
+{
+    p->setProp("@graph", _graph);
+    p->setPropInt("@activity", _activityId);
+}
+
 void CLocalWUResult::setResultScalar(bool isScalar)
 {
     p->setPropInt("@isScalar", (int) isScalar);

+ 2 - 0
common/workunit/workunit.hpp

@@ -291,6 +291,7 @@ interface IConstWUResult : extends IInterface
     virtual const IProperties *queryResultXmlns() = 0;
     virtual IStringVal &getResultFieldOpt(const char *name, IStringVal &str) const = 0;
     virtual void getSchema(IArrayOf<ITypeInfo> &types, StringAttrArray &names, IStringVal * eclText) const = 0;
+    virtual void getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const = 0;
 };
 
 
@@ -328,6 +329,7 @@ interface IWUResult : extends IConstWUResult
     virtual void setResultRow(unsigned len, const void * data) = 0;
     virtual void setResultXmlns(const char *prefix, const char *uri) = 0;
     virtual void setResultFieldOpt(const char *name, const char *value)=0;
+    virtual void setResultWriteLocation(const char * _graph, unsigned _activityId) = 0;
 
     virtual IPropertyTree *queryPTree() = 0;
 };

+ 4 - 2
ecl/eclagent/eclagent.cpp

@@ -4200,7 +4200,7 @@ public:
     {
     }
 
-    IRoxieProbe *createProbe(IInputBase *in, IEngineRowStream *stream, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration)
+    IRoxieProbe *createProbe(IInputBase *in, IActivityBase *sourceAct, IActivityBase *targetAct, unsigned sourceIdx, unsigned targetIdx, unsigned iteration) override
     {
         CriticalBlock b(crit);
         unsigned channel = debugContext->queryChannel();
@@ -4208,7 +4208,9 @@ public:
         unsigned targetId = targetAct->queryId();
         DebugActivityRecord *sourceActRecord = noteActivity(sourceAct, iteration, channel, debugContext->querySequence());
         DebugActivityRecord *targetActRecord = noteActivity(targetAct, iteration, channel, debugContext->querySequence());
-        DebugProbe *probe = new DebugProbe(dynamic_cast<IHThorInput*>(in), stream, sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
+        IHThorInput *hin = dynamic_cast<IHThorInput*>(in);
+        assertex(hin);
+        DebugProbe *probe = new DebugProbe(hin, &hin->queryStream(), sourceId, sourceIdx, sourceActRecord, targetId, targetIdx, targetActRecord, iteration, channel, debugContext);
     #ifdef _DEBUG
         DBGLOG("Creating probe for edge id %s in graphManager %p", probe->queryEdgeId(), this);
     #endif

+ 33 - 2
ecl/eclagent/eclagent.ipp

@@ -33,6 +33,7 @@
 #include <stdexcept> 
 #include "thorplugin.hpp"
 #include "thorcommon.hpp"
+#include "enginecontext.hpp"
 
 #define MAX_EDGEDATA_LENGTH 30000
 #define MAX_HEX_SIZE 500
@@ -329,7 +330,7 @@ public:
 
 
 class CHThorDebugContext;
-class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, public CInterface
+class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, implements IEngineContext, public CInterface
 {
 private:
     friend class EclAgentWorkflowMachine;
@@ -503,7 +504,7 @@ public:
     virtual IOrderedOutputSerializer * queryOutputSerializer() { return outputSerializer; }
     virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace);
     virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace);
-    virtual IEngineContext *queryEngineContext() { return NULL; }
+    virtual IEngineContext *queryEngineContext() { return this; }
     virtual char *getDaliServers();
 
     unsigned __int64 queryStopAfter() { return stopAfter; }
@@ -512,6 +513,36 @@ public:
     {
         return queryNullSectionTimer();
     }
+// IEngineContext
+    virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
+    {
+        if (num==0)
+            return 0;
+        SocketEndpoint foreignNode;
+        if (_foreignNode && !_foreignNode->isNull())
+            foreignNode.set(*_foreignNode);
+        else
+        {
+            const char *dali = getDaliServers();
+            if (!dali)
+                return 0;
+            foreignNode.set(dali);
+            free((char *) dali);
+        }
+        return ::getGlobalUniqueIds(num, &foreignNode);
+    }
+    virtual bool allowDaliAccess() const  { return true; }
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        result.append("workunit"); // No distinction between global, workunit and query scopes for eclagent
+        return result;
+    }
+
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        // No need to unregister, since scope lasts until exe terminates
+    }
+
 
 //New workflow interface
     virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }

+ 18 - 8
ecl/hqlcpp/hqlhtcpp.cpp

@@ -5369,16 +5369,26 @@ void HqlCppTranslator::buildSetResultInfo(BuildCtx & ctx, IHqlExpression * origi
             }
         }
 
-        IHqlExpression * format =  originalExpr->queryAttribute(storedFieldFormatAtom);
-        if (format)
+        if (result)
         {
-            ForEachChild(i, format)
+            ActivityInstance * activity = queryCurrentActivity(ctx);
+            if (activity)
             {
-                StringBuffer name;
-                StringBuffer value;
-                OwnedHqlExpr folded = foldHqlExpression(format->queryChild(i));
-                getHintNameValue(folded, name, value);
-                result->setResultFieldOpt(name, value);
+                const char * graphName = activeGraph->name;
+                result->setResultWriteLocation(graphName, activity->activityId);
+            }
+
+            IHqlExpression * format = originalExpr->queryAttribute(storedFieldFormatAtom);
+            if (format)
+            {
+                ForEachChild(i, format)
+                {
+                    StringBuffer name;
+                    StringBuffer value;
+                    OwnedHqlExpr folded = foldHqlExpression(format->queryChild(i));
+                    getHintNameValue(folded, name, value);
+                    result->setResultFieldOpt(name, value);
+                }
             }
         }
     }

+ 3 - 3
ecllibrary/std/BLAS.ecl

@@ -133,7 +133,7 @@ EXPORT Types.matrix_t
    * A = L  * L**T,  if UPLO = 'L',
    * where U is an upper triangular matrix and L is lower triangular.
    * This is the unblocked version of the algorithm, calling Level 2 BLAS.
-   * @param tri indocate whether upper or lower triangle is used
+   * @param tri indicate whether upper or lower triangle is used
    * @param r number of rows/columns in the square matrix
    * @param A the square matrix
    * @param clear clears the unused triangle
@@ -234,10 +234,10 @@ EXPORT Types.matrix_t
    * @return the triangle
    */
   EXPORT Types.matrix_t
-         Extract_Tri(Types.dimension_t m, Types.dimension_t n,
+         extract_tri(Types.dimension_t m, Types.dimension_t n,
                      Types.Triangle tri, Types.Diagonal dt,
                      Types.matrix_t a)
-       := LIB_ECLBLAS.ECLBLAS.Extract_Tri(m, n, tri, dt, a);
+       := LIB_ECLBLAS.ECLBLAS.extract_tri(m, n, tri, dt, a);
 
   /**
    * Generate a diagonal matrix.

+ 15 - 0
ecllibrary/teststd/BLAS/Test_extract_tri.ecl

@@ -0,0 +1,15 @@
+IMPORT Std.BLAS AS BLAS;
+IMPORT BLAS.Types;
+Diagonal := Types.Diagonal;
+Triangle := Types.Triangle;
+
+SET OF REAL8 init1 := [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0];
+
+EXPORT Test_extract_tri := MODULE
+  EXPORT TestRuntime := MODULE
+    EXPORT test01 := ASSERT(BLAS.extract_tri(3, 3, Triangle.Upper, Diagonal.UnitTri, init1)   =[1.0,0.0,0.0,4.0,1.0,0.0,7.0,8.0,1.0]);
+    EXPORT test02 := ASSERT(BLAS.extract_tri(3, 3, Triangle.Upper, Diagonal.NotUnitTri, init1)=[1.0,0.0,0.0,4.0,5.0,0.0,7.0,8.0,9.0]);
+    EXPORT test03 := ASSERT(BLAS.extract_tri(3, 3, Triangle.Lower, Diagonal.UnitTri, init1)   =[1.0,2.0,3.0,0.0,1.0,6.0,0.0,0.0,1.0]);
+    EXPORT test04 := ASSERT(BLAS.extract_tri(3, 3, Triangle.Lower, Diagonal.NotUnitTri, init1)=[1.0,2.0,3.0,0.0,5.0,6.0,0.0,0.0,9.0]);
+  END;
+END;

+ 1 - 1
esp/services/esdl_svc_engine/esdl_svc_engine.cpp

@@ -19,7 +19,7 @@
 #include "esdl_svc_engine.hpp"
 #include "params2xml.hpp"
 
-void CEsdlSvcEngine::init(IPropertyTree *cfg, const char *process, const char *service)
+void CEsdlSvcEngine::init(const IPropertyTree *cfg, const char *process, const char *service)
 {
     EsdlServiceImpl::init(cfg, process, service);
 

+ 4 - 4
esp/services/esdl_svc_engine/esdl_svc_engine.hpp

@@ -35,10 +35,10 @@ public:
 
     ~CEsdlSvcEngine();
 
-    virtual void init(IPropertyTree *cfg, const char *process, const char *service);
-    void generateTransactionId(IEspContext & context, StringBuffer & trxid);
-    virtual IPropertyTree *createTargetContext(IEspContext &context, IPropertyTree *tgtcfg, IEsdlDefService &srvdef, IEsdlDefMethod &mthdef, IPropertyTree *req_pt);
-    virtual void esdl_log(IEspContext &context, IEsdlDefService &srvdef, IEsdlDefMethod &mthdef, IPropertyTree *tgtcfg, IPropertyTree *tgtctx, IPropertyTree *req_pt, const char *xmlresp, const char *logdata, unsigned int timetaken);
+    virtual void init(const IPropertyTree *cfg, const char *process, const char *service) override;
+    void generateTransactionId(IEspContext & context, StringBuffer & trxid) override;
+    virtual IPropertyTree *createTargetContext(IEspContext &context, IPropertyTree *tgtcfg, IEsdlDefService &srvdef, IEsdlDefMethod &mthdef, IPropertyTree *req_pt) override;
+    virtual void esdl_log(IEspContext &context, IEsdlDefService &srvdef, IEsdlDefMethod &mthdef, IPropertyTree *tgtcfg, IPropertyTree *tgtctx, IPropertyTree *req_pt, const char *xmlresp, const char *logdata, unsigned int timetaken) override;
 };
 
 class CEsdlSvcEngineSoapBindingEx : public EsdlBindingImpl

+ 42 - 23
plugins/cassandra/cassandrawu.cpp

@@ -830,7 +830,7 @@ struct CassandraTableInfo
 };
 
 static const int majorVersion = 1;  // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
-static const int minorVersion = 1;  // If this is less that the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
+static const int minorVersion = 2;  // If this is less that the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
                                     // If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
                                     // Make sure to increment this if any column is ever added below
 
@@ -1077,7 +1077,9 @@ static const ChildTableInfo wuGraphMetasTable =
         {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},  /* This is the number of rows in value */ \
         {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},              \
         {"logicalName", "text", "logicalName", stringColumnMapper},        /* either this or value will be present once result status is "calculated" */ \
-        {"value", "blob", "Value", blobColumnMapper}
+        {"value", "blob", "Value", blobColumnMapper}, \
+        {"graph", "text", "@graph", stringColumnMapper}, \
+        {"activity", "int", "@activity", intColumnMapper}
 
 static const CassandraXmlMapping wuResultsMappings [] =
 {
@@ -3052,36 +3054,53 @@ public:
         cluster.setOptions(options);
         if (!cluster.queryKeySpace())
             cluster.setKeySpace("hpcc");
-        cluster.connect();
-        Owned<IPTree> versionInfo = getVersionInfo();
-        if (versionInfo)
+        try
         {
-            int major = versionInfo->getPropInt("@major", 0);
-            int minor = versionInfo->getPropInt("@minor", 0);
-            if (major && minor)
+            cluster.connect();
+            Owned<IPTree> versionInfo = getVersionInfo();
+            if (versionInfo)
             {
-                // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
-                if (major != majorVersion)
-                    throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
-                if (minor != minorVersion)
+                int major = versionInfo->getPropInt("@major", 0);
+                int minor = versionInfo->getPropInt("@minor", 0);
+                if (major && minor)
                 {
-                    if (minor < minorVersion)
+                    // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
+                    if (major != majorVersion)
+                        throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
+                    if (minor != minorVersion)
                     {
-                        DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
-                        switch (minor)
+                        if (minor < minorVersion)
                         {
-                        // Add code here to create any columns that we need to to get from version "minor" to expected layout
+                            DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
+                            switch (minor)
+                            {
+                            case 1:
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
+                                break;
+                            }
+                            createVersionTable(true);
                         }
+                        else
+                            DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
                     }
-                    else
-                        DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
                 }
             }
+            else
+            {
+                DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
+                cluster.disconnect();
+            }
         }
-        else
+        catch (IException *E)
         {
+            EXCLOG(E);
+            E->Release();
             DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
-            cluster.disconnect();
         }
         cacheRetirer.start();
         LINK(_dll);  // Yes, this leaks. Not really sure how to avoid that.
@@ -3707,7 +3726,7 @@ public:
         executeSimpleCommand(s, create);
         s.set(NULL);
         cluster.connect();
-        createVersionTable();
+        createVersionTable(false);
         ensureTable(querySession(), workunitsMappings);
         ensureTable(querySession(), searchMappings);
         ensureTable(querySession(), uniqueSearchMappings);
@@ -3732,12 +3751,12 @@ public:
         return cluster.prepareStatement(query, traceLevel>=2);
     }
 private:
-    void createVersionTable()
+    void createVersionTable(bool force)
     {
         StringBuffer schema;
         executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
         Owned<IPTree> oldVersion = getVersionInfo();
-        if (!oldVersion)
+        if (force || !oldVersion)
         {
             VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
             CassandraBatch versionBatch(cass_batch_new(CASS_BATCH_TYPE_LOGGED));

+ 1 - 1
plugins/eclblas/CMakeLists.txt

@@ -37,7 +37,7 @@ if(USE_CBLAS)
         dsyrk.cpp
         dtrsm.cpp
         eclblas.cpp
-        Extract_Tri.cpp
+        extract_tri.cpp
         make_diag.cpp)
 
     include_directories(

+ 4 - 0
plugins/eclblas/dasum.cpp

@@ -20,9 +20,13 @@
 
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL double dasum(uint32_t m, bool isAllX, size32_t lenX, const void * x,
                           uint32_t incx, uint32_t skipped) {
   const double* X = ((const double*)x) + skipped;
   double rslt = cblas_dasum(m, X, incx);
   return rslt;
 }
+
+}

+ 3 - 0
plugins/eclblas/daxpy.cpp

@@ -18,6 +18,7 @@
 // Vector add, alpha X  +  Y
 #include "eclblas.hpp"
 
+namespace eclblas {
 
 ECLBLAS_CALL void daxpy(bool & __isAllResult, size32_t & __lenResult,
                         void * & __result, uint32_t n, double alpha,
@@ -33,3 +34,5 @@ ECLBLAS_CALL void daxpy(bool & __isAllResult, size32_t & __lenResult,
   cblas_daxpy(n, alpha, X, incx, Y, incy);
   __result = (void*) result;
 }
+
+}

+ 5 - 1
plugins/eclblas/dgemm.cpp

@@ -20,6 +20,8 @@
 //transpose A and B.  beta defaults to zero, and C to empty
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void dgemm(bool & __isAllResult, size32_t & __lenResult,
                         void * & __result, bool transposeA, bool transposeB,
                         uint32_t m, uint32_t n, uint32_t k,
@@ -42,4 +44,6 @@ ECLBLAS_CALL void dgemm(bool & __isAllResult, size32_t & __lenResult,
               (const double *) B, ldb,
               beta, result, ldc);
   __result = (void *) result;
-  }
+}
+
+}

+ 4 - 0
plugins/eclblas/dgetf2.cpp

@@ -24,6 +24,8 @@
 #include <math.h>
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void dgetf2(bool & __isAllResult, size32_t & __lenResult,
                          void * & __result, uint32_t m, uint32_t n,
                          bool isAllA, size32_t lenA, const void* a) {
@@ -59,3 +61,5 @@ ECLBLAS_CALL void dgetf2(bool & __isAllResult, size32_t & __lenResult,
   }
   __result = (void*) new_a;
 }
+
+}

+ 4 - 0
plugins/eclblas/dpotf2.cpp

@@ -26,6 +26,8 @@
 #include "eclblas.hpp"
 #include <math.h>
 
+namespace eclblas {
+
 ECLBLAS_CALL void dpotf2(bool & __isAllResult, size32_t & __lenResult,
                          void * & __result, uint8_t tri, uint32_t r,
                          bool isAllA, size32_t lenA, const void * A,
@@ -74,3 +76,5 @@ ECLBLAS_CALL void dpotf2(bool & __isAllResult, size32_t & __lenResult,
   }
   __result = (void*) new_a;
 }
+
+}

+ 4 - 0
plugins/eclblas/dscal.cpp

@@ -18,6 +18,8 @@
 
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void dscal(bool & __isAllResult, size32_t & __lenResult,
                         void * & __result, uint32_t n, double alpha,
                         bool isAllX, size32_t lenX, const void * x,
@@ -29,3 +31,5 @@ ECLBLAS_CALL void dscal(bool & __isAllResult, size32_t & __lenResult,
   __isAllResult = false;
   __lenResult = lenX;
 }
+
+}

+ 4 - 0
plugins/eclblas/dsyrk.cpp

@@ -19,6 +19,8 @@
 //the update is upper or lower.  C is N by N
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void dsyrk(bool & __isAllResult, size32_t & __lenResult,
                         void * &__result, uint8_t tri, bool transposeA,
                         uint32_t n, uint32_t k, double alpha, bool isAllA,
@@ -45,3 +47,5 @@ ECLBLAS_CALL void dsyrk(bool & __isAllResult, size32_t & __lenResult,
               n, k, alpha, (const double *)a, lda, beta, new_c, n);
   __result = (void*) new_c;
 }
+
+}

+ 4 - 0
plugins/eclblas/dtrsm.cpp

@@ -19,6 +19,8 @@
 //
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void dtrsm(bool & __isAllResult, size32_t & __lenResult,
                         void * & __result, uint8_t side, uint8_t tri,
                         bool transposeA, uint8_t diag, uint32_t m,
@@ -38,3 +40,5 @@ ECLBLAS_CALL void dtrsm(bool & __isAllResult, size32_t & __lenResult,
               m, n, alpha, (const double *)a, lda, new_b, ldb);
   __result = (void*) new_b;
 }
+
+}

+ 43 - 46
plugins/eclblas/eclblas.hpp

@@ -45,57 +45,54 @@ extern "C" {
 #include <cblas.h>
 }
 
-extern "C" {
-  ECLBLAS_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
-
-  ECLBLAS_CALL double dasum(uint32_t m, bool isAllX, size32_t lenX, const void * x,
-                            uint32_t incx, uint32_t skipped);
-
-  ECLBLAS_CALL void daxpy(bool & __isAllResult, size32_t & __lenResult,
-                          void * & __result, uint32_t n, double alpha,
-                          bool isAllX, size32_t lenX, const void * x, uint32_t incx,
-                          bool isAllY, size32_t lenY, const void * y, uint32_t incy,
-                          uint32_t x_skipped, uint32_t y_skipped);
-
-  ECLBLAS_CALL void dgemm(bool & __isAll_Result, size32_t & __lenResult,
-                          void * & __result, bool transposeA, bool transposeB,
-                          uint32_t m, uint32_t n, uint32_t k,
-                          double alpha, bool isAllA, size32_t lenA, const void* A,
-                          bool isAllB, size32_t lenB, const void* B, double beta,
-                          bool isAllC, size32_t lenC, const void* C);
-
-  ECLBLAS_CALL void dgetf2(bool & __isAllResult, size32_t & __lenResult,
-                           void * & result, uint32_t m, uint32_t n,
-                           bool isAllA, size32_t lenA, const void* a);
-
-  ECLBLAS_CALL void dpotf2(bool & __isAllResult, size32_t & __lenResult,
-                           void * & __result, uint8_t tri, uint32_t r,
-                           bool isAllA, size32_t lenA, const void * A,
-                           bool clear);
-  ECLBLAS_CALL void dscal(bool & __isAllResult, size32_t & __lenResult,
-                          void * & __result, uint32_t n, double alpha,
-                          bool isAllX, size32_t lenX, const void * X,
+extern "C" ECLBLAS_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
+
+namespace eclblas {
+
+ECLBLAS_CALL double dasum(uint32_t m, bool isAllX, size32_t lenX, const void * x,
                           uint32_t incx, uint32_t skipped);
-  ECLBLAS_CALL void dsyrk(bool & __isAllResult, size32_t & __lenResult,
-                          void * &__result, uint8_t tri, bool transposeA,
-                          uint32_t N, uint32_t k, double alpha, bool isAllA,
-                          size32_t lenA, const void * a, double beta,
-                          bool isAllC, size32_t lenC, const void * c,
-                          bool clear);
-  ECLBLAS_CALL void dtrsm(bool & __isAllResult, size32_t & __lenResult,
+ECLBLAS_CALL void daxpy(bool & __isAllResult, size32_t & __lenResult,
+                        void * & __result, uint32_t n, double alpha,
+                        bool isAllX, size32_t lenX, const void * x, uint32_t incx,
+                        bool isAllY, size32_t lenY, const void * y, uint32_t incy,
+                        uint32_t x_skipped, uint32_t y_skipped);
+ECLBLAS_CALL void dgemm(bool & __isAll_Result, size32_t & __lenResult,
+                        void * & __result, bool transposeA, bool transposeB,
+                        uint32_t m, uint32_t n, uint32_t k,
+                        double alpha, bool isAllA, size32_t lenA, const void* A,
+                        bool isAllB, size32_t lenB, const void* B, double beta,
+                        bool isAllC, size32_t lenC, const void* C);
+ECLBLAS_CALL void dgetf2(bool & __isAllResult, size32_t & __lenResult,
+                         void * & result, uint32_t m, uint32_t n,
+                         bool isAllA, size32_t lenA, const void* a);
+ECLBLAS_CALL void dpotf2(bool & __isAllResult, size32_t & __lenResult,
+                         void * & __result, uint8_t tri, uint32_t r,
+                         bool isAllA, size32_t lenA, const void * A,
+                         bool clear);
+ECLBLAS_CALL void dscal(bool & __isAllResult, size32_t & __lenResult,
+                        void * & __result, uint32_t n, double alpha,
+                        bool isAllX, size32_t lenX, const void * X,
+                        uint32_t incx, uint32_t skipped);
+ECLBLAS_CALL void dsyrk(bool & __isAllResult, size32_t & __lenResult,
+                        void * &__result, uint8_t tri, bool transposeA,
+                        uint32_t N, uint32_t k, double alpha, bool isAllA,
+                        size32_t lenA, const void * a, double beta,
+                        bool isAllC, size32_t lenC, const void * c,
+                        bool clear);
+ECLBLAS_CALL void dtrsm(bool & __isAllResult, size32_t & __lenResult,
                         void * & __result, uint8_t side, uint8_t tri,
                         bool transposeA, uint8_t diag, uint32_t m,
                         uint32_t n, uint32_t lda, double alpha, bool isAllA,
                         size32_t lenA, const void * a, bool isAllB, size32_t lenB,
                         const void * b);
-  ECLBLAS_CALL void Extract_Tri(bool & __isAllResult, size32_t & __lenResult,
-                                void * & __result, uint32_t m, uint32_t n, uint8_t tri,
-                                uint8_t dt, bool isAllA, size32_t lenA,
-                                const void * a);
-  ECLBLAS_CALL void make_diag(bool & __isAllResult, size32_t & __lenResult,
-                              void * & __result, size32_t m, double v,
-                              bool isAllX, size32_t lenX, const void * x);
-}
-
+ECLBLAS_CALL void extract_tri(bool & __isAllResult, size32_t & __lenResult,
+                              void * & __result, uint32_t m, uint32_t n, uint8_t tri,
+                              uint8_t dt, bool isAllA, size32_t lenA,
+                              const void * a);
+ECLBLAS_CALL void make_diag(bool & __isAllResult, size32_t & __lenResult,
+                            void * & __result, size32_t m, double v,
+                            bool isAllX, size32_t lenX, const void * x);
+
+} // namespace
 
 #endif

+ 5 - 1
plugins/eclblas/Extract_Tri.cpp

@@ -19,7 +19,9 @@
 //
 #include "eclblas.hpp"
 
-ECLBLAS_CALL void Extract_Tri(bool & __isAllResult, size32_t & __lenResult,
+namespace eclblas {
+
+ECLBLAS_CALL void extract_tri(bool & __isAllResult, size32_t & __lenResult,
                               void * & __result, uint32_t m, uint32_t n, uint8_t tri,
                               uint8_t dt, bool isAllA, size32_t lenA, const void * a){
   int cells = m * n;
@@ -44,3 +46,5 @@ ECLBLAS_CALL void Extract_Tri(bool & __isAllResult, size32_t & __lenResult,
   }
   __result = (void*) new_a;
 }
+
+}

+ 12 - 16
plugins/eclblas/lib_eclblas.ecllib

@@ -23,32 +23,28 @@ EXPORT blas_Diagonal     := ENUM(UNSIGNED1, UnitTri=1, NotUnitTri=2);
 EXPORT blas_Side         := ENUM(UNSIGNED1, Ax=1, xA=2);
 
 
-EXPORT eclblas := SERVICE : plugin('eclblasplugin'), library('eclblas')
+EXPORT eclblas := SERVICE : plugin('eclblasplugin'), library('eclblas'), namespace('eclblas'), CPP, PURE
   REAL8 dasum(blas_dimension_t m, const blas_matrix_t x, blas_dimension_t incx,
-                blas_dimension_t skipped=0) : C, PURE;
+                blas_dimension_t skipped=0);
   SET OF REAL8 daxpy(blas_dimension_t N, blas_value_t alpha, const blas_matrix_t X,
                  blas_dimension_t incX, blas_matrix_t Y, blas_dimension_t incY,
-                 blas_dimension_t x_skipped=0, blas_dimension_t y_skipped=0)
-                 : C, PURE;
+                 blas_dimension_t x_skipped=0, blas_dimension_t y_skipped=0);
   SET OF REAL8 dgemm(BOOLEAN transposeA, BOOLEAN transposeB,
                  blas_dimension_t M, blas_dimension_t N, blas_dimension_t K,
                  blas_value_t alpha, const blas_matrix_t A, const blas_matrix_t B,
-                 blas_value_t beta=0.0, const blas_matrix_t C=[]) : C, PURE;
-  SET OF REAL8 dgetf2(blas_dimension_t m, blas_dimension_t n, const blas_matrix_t a)
-                 : C, PURE;
-  SET OF REAL8 dpotf2(blas_Triangle tri, blas_dimension_t r, blas_matrix_t A,
-                  BOOLEAN clear=TRUE) : C, PURE;
+                 blas_value_t beta=0.0, const blas_matrix_t C=[]);
+  SET OF REAL8 dgetf2(blas_dimension_t m, blas_dimension_t n, const blas_matrix_t a);
+  SET OF REAL8 dpotf2(blas_Triangle tri, blas_dimension_t r, blas_matrix_t A, BOOLEAN clear=TRUE);
   SET OF REAL8 dscal(blas_dimension_t N, blas_value_t alpha, const blas_matrix_t X,
-                 blas_dimension_t incX, blas_dimension_t skipped=0) : C, PURE;
+                 blas_dimension_t incX, blas_dimension_t skipped=0);
   SET OF REAL8 dsyrk(blas_Triangle tri, BOOLEAN transposeA, blas_dimension_t N,
                  blas_dimension_t K, blas_value_t alpha, const blas_matrix_t A,
-                 blas_value_t beta, const blas_matrix_t C, BOOLEAN clear=FALSE)
-                 : C, PURE;
+                 blas_value_t beta, const blas_matrix_t C, BOOLEAN clear=FALSE);
   SET OF REAL8 dtrsm(blas_Side side, blas_Triangle tri,
                  BOOLEAN transposeA, blas_Diagonal diag,
                  blas_dimension_t M, blas_dimension_t N,  blas_dimension_t lda,
-                 blas_value_t alpha, const blas_matrix_t A, const blas_matrix_t B) : C, PURE;
-  SET OF REAL8 Extract_Tri(blas_dimension_t m, blas_dimension_t n, blas_Triangle tri,
-                       blas_Diagonal dt, const blas_matrix_t a) : C, PURE;
-  SET OF REAL8 make_diag(blas_dimension_t m, blas_value_t v=1.0, const blas_matrix_t X=[]) : C, PURE;
+                 blas_value_t alpha, const blas_matrix_t A, const blas_matrix_t B);
+  SET OF REAL8 extract_tri(blas_dimension_t m, blas_dimension_t n, blas_Triangle tri,
+                       blas_Diagonal dt, const blas_matrix_t a);
+  SET OF REAL8 make_diag(blas_dimension_t m, blas_value_t v=1.0, const blas_matrix_t X=[]);
 END;

+ 4 - 0
plugins/eclblas/make_diag.cpp

@@ -18,6 +18,8 @@
 //If the vector is present the diagonal is the product
 #include "eclblas.hpp"
 
+namespace eclblas {
+
 ECLBLAS_CALL void make_diag(bool & __isAllResult, size32_t & __lenResult,
                             void * & __result, uint32_t m, double v,
                             bool isAllX, size32_t lenX, const void * x) {
@@ -39,3 +41,5 @@ ECLBLAS_CALL void make_diag(bool & __isAllResult, size32_t & __lenResult,
   }
   __result = (void*) diag;
 }
+
+}

+ 3 - 0
plugins/pyembed/CMakeLists.txt

@@ -42,6 +42,9 @@ if(PYEMBED)
             ./../../rtl/include
             ./../../rtl/nbcd
             ./../../common/deftype
+            ./../../common/thorhelper
+            ./../../dali/base
+            ./../../system/mp
             ./../../roxie/roxiemem
             ./../../system/jlib)
 

+ 121 - 9
plugins/pyembed/pyembed.cpp

@@ -29,6 +29,7 @@
 #include "rtlfield_imp.hpp"
 #include "nbcd.hpp"
 #include "roxiemem.hpp"
+#include "enginecontext.hpp"
 
 static const char * compatibleVersions[] = {
     "Python2.7 Embed Helper 1.0.0",
@@ -98,6 +99,19 @@ public:
     inline X **ref()                 { return &ptr; }
 };
 
+
+__declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
+
+static void failx(const char *message, ...)
+{
+    va_list args;
+    va_start(args,message);
+    StringBuffer msg;
+    msg.append("pyembed: ").valist_appendf(message,args);
+    va_end(args);
+    rtlFail(0, msg.str());
+}
+
 // call checkPythonError to throw an exception if Python error state is set
 
 static void checkPythonError()
@@ -109,8 +123,7 @@ static void checkPythonError()
         PyErr_Fetch(pType.ref(), pValue.ref(), pTraceBack.ref());
         OwnedPyObject valStr = PyObject_Str(pValue);
         PyErr_Clear();
-        VStringBuffer errMessage("pyembed: %s", PyString_AsString(valStr));
-        rtlFail(0, errMessage.str());
+        failx("pyembed: %s", PyString_AsString(valStr));
     }
 }
 
@@ -271,6 +284,7 @@ public:
         const char *argv[] = { nullptr };
         PySys_SetArgvEx(0, (char **) argv, 0);
         PyEval_InitThreads();
+        preservedScopes.setown(PyDict_New());
         tstate = PyEval_SaveThread();
         initialized = true;
     }
@@ -286,6 +300,7 @@ public:
             namedtuple.clear();
             namedtupleTypes.clear();
             compiledScripts.clear();
+            preservedScopes.clear();
             Py_Finalize();
         }
         if (pythonLibrary)
@@ -401,6 +416,31 @@ public:
         }
         return code.getClear();
     }
+    PyObject *getNamedScope(const char *key, bool &isNew)
+    {
+        if (!preservedScopes)
+            preservedScopes.setown(PyDict_New());
+        OwnedPyObject scope;
+        scope.set(PyDict_GetItemString(preservedScopes, key));
+        if (!scope)
+        {
+            scope.setown(PyDict_New());
+            PyDict_SetItemString(preservedScopes, key, scope);
+            isNew = true;
+        }
+        else
+            isNew = false;
+        return scope.getClear();
+    }
+    void releaseNamedScope(const char *key)
+    {
+        if (preservedScopes)
+        {
+            PyDict_DelItemString(preservedScopes, key);
+            PyErr_Clear();  // Should be present, but ignore the error if it is not
+        }
+    }
+    static void unregister(const char *key);
 protected:
     static StringBuffer &wrapPythonText(StringBuffer &out, const char *in, const char *params)
     {
@@ -421,6 +461,7 @@ protected:
     OwnedPyObject namedtuple;      // collections.namedtuple
     OwnedPyObject namedtupleTypes; // dictionary of return values from namedtuple()
     OwnedPyObject compiledScripts; // dictionary of previously compiled scripts
+    OwnedPyObject preservedScopes; // dictionary of preserved scopes
 } globalState;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -1193,6 +1234,13 @@ private:
     PyThreadState * &state;
 };
 
+void Python27GlobalState::unregister(const char *key)
+{
+    checkThreadContext();
+    GILBlock b(threadContext->threadState);
+    globalState.releaseNamedScope(key);
+}
+
 // A Python function that returns a dataset will return a PythonRowStream object that can be
 // interrogated to return each row of the result in turn
 
@@ -1255,9 +1303,71 @@ public:
     : sharedCtx(_sharedCtx)
     {
         PyEval_RestoreThread(sharedCtx->threadState);
+    }
+
+    void setScopes(ICodeContext *codeCtx, const char *_options)
+    {
         locals.setown(PyDict_New());
-        globals.setown(PyDict_New());
-        PyDict_SetItemString(globals, "__builtins__", PyEval_GetBuiltins());  // required for import to work
+        StringArray options;
+        options.appendList(_options, ",");
+        StringBuffer scopeKey;
+        const char *scopeKey2 = nullptr;
+        bool registerCallback = false;
+        bool wuidScope = false;
+        IEngineContext *engine = nullptr;
+        ForEachItemIn(idx, options)
+        {
+            const char *opt = options.item(idx);
+            const char *val = strchr(opt, '=');
+            if (val)
+            {
+                StringBuffer optName(val-opt, opt);
+                val++;
+                if (strieq(optName, "globalscope"))
+                    scopeKey2 = val;
+                else if (strieq(optName, "persist"))
+                {
+                    if (scopeKey.length())
+                        failx("persist option specified more than once");
+                    if (strieq(val, "global"))
+                        scopeKey.append("global");
+                    else if (strieq(val, "workunit"))
+                    {
+                        engine = codeCtx->queryEngineContext();
+                        wuidScope = true;
+                        if (!engine)
+                            failx("Persist mode 'workunit' not supported here");
+                    }
+                    else if (strieq(val, "query"))
+                    {
+                        engine = codeCtx->queryEngineContext();
+                        wuidScope = false;
+                        if (!engine)
+                            failx("Persist mode 'query' not supported here");
+                    }
+                    else
+                        failx("Unrecognized persist mode %s", val);
+                }
+                else
+                    failx("Unrecognized option %s", optName.str());
+            }
+            else
+                failx("Unrecognized option %s", opt);
+        }
+        if (engine)
+            engine->getQueryId(scopeKey, wuidScope);
+        if (scopeKey2)
+            scopeKey.append(':').append(scopeKey2);
+        if (scopeKey.length())
+        {
+            bool isNew;
+            globals.setown(globalState.getNamedScope(scopeKey, isNew));
+            if (isNew && engine)
+                engine->onTermination(Python27GlobalState::unregister, scopeKey.str(), wuidScope);
+        }
+        else
+            globals.setown(PyDict_New());
+        PyDict_SetItemString(globals, "__builtins__",  PyEval_GetBuiltins());  // required for import to work
     }
     ~Python27EmbedContextBase()
     {
@@ -1491,7 +1601,7 @@ protected:
 class Python27EmbedScriptContext : public Python27EmbedContextBase
 {
 public:
-    Python27EmbedScriptContext(PythonThreadContext *_sharedCtx, const char *options)
+    Python27EmbedScriptContext(PythonThreadContext *_sharedCtx)
     : Python27EmbedContextBase(_sharedCtx)
     {
     }
@@ -1509,7 +1619,6 @@ public:
     {
     }
 
-
     virtual void importFunction(size32_t lenChars, const char *text)
     {
         throwUnexpected();
@@ -1549,7 +1658,7 @@ protected:
 class Python27EmbedImportContext : public Python27EmbedContextBase
 {
 public:
-    Python27EmbedImportContext(PythonThreadContext *_sharedCtx, const char *options)
+    Python27EmbedImportContext(PythonThreadContext *_sharedCtx)
     : Python27EmbedContextBase(_sharedCtx)
     {
         argcount = 0;
@@ -1605,10 +1714,13 @@ public:
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     {
         checkThreadContext();
+        Owned<Python27EmbedContextBase> ret;
         if (flags & EFimport)
-            return new Python27EmbedImportContext(threadContext, options);
+            ret.setown(new Python27EmbedImportContext(threadContext));
         else
-            return new Python27EmbedScriptContext(threadContext, options);
+            ret.setown(new Python27EmbedScriptContext(threadContext));
+        ret->setScopes(ctx, options);
+        return ret.getClear();
     }
     virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
     {

+ 6 - 0
plugins/timelib/timelib.cpp

@@ -667,6 +667,9 @@ TIMELIB_API unsigned int TIMELIB_CALL tlAdjustTime(unsigned int time, short hour
     unsigned int    result = 0;
 
     memset(&timeInfo, 0, sizeof(timeInfo));
+#ifdef __APPLE__
+    timeInfo.tm_year = 2;
+#endif
 
     tlInsertTimeIntoTimeStruct(&timeInfo, time);
 
@@ -689,6 +692,9 @@ TIMELIB_API unsigned int TIMELIB_CALL tlAdjustTimeBySeconds(unsigned int time, i
     unsigned int    result = 0;
 
     memset(&timeInfo, 0, sizeof(timeInfo));
+#ifdef __APPLE__
+    timeInfo.tm_year = 2;
+#endif
 
     tlInsertTimeIntoTimeStruct(&timeInfo, time);
     timeInfo.tm_sec += seconds_delta;

+ 52 - 1
roxie/ccd/ccdcontext.cpp

@@ -36,6 +36,7 @@
 #include "ccdsnmp.hpp"
 #include "ccdstate.hpp"
 #include "roxiehelper.hpp"
+#include "enginecontext.hpp"
 
 using roxiemem::IRowManager;
 
@@ -2602,7 +2603,7 @@ public:
 
 };
 
-class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext
+class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext, implements IEngineContext
 {
     const IQueryFactory *serverQueryFactory = nullptr;
     IHpccProtocolResponse *protocol = nullptr;
@@ -3030,6 +3031,56 @@ public:
     {
         return protocol;
     }
+    virtual IEngineContext *queryEngineContext() { return this; }
+
+    virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
+    {
+        if (num==0)
+            return 0;
+        SocketEndpoint foreignNode;
+        if (_foreignNode && !_foreignNode->isNull())
+            foreignNode.set(*_foreignNode);
+        else
+        {
+            Owned<IRoxieDaliHelper> dali = ::connectToDali();
+            if (!dali)
+                return 0;
+            StringBuffer daliIp;
+            dali->getDaliIp(daliIp);
+            foreignNode.set(daliIp.str());
+        }
+        return ::getGlobalUniqueIds(num, &foreignNode);
+    }
+    virtual bool allowDaliAccess() const
+    {
+        Owned<IRoxieDaliHelper> dali = ::connectToDali();
+        return dali != nullptr;
+    }
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        if (workUnit)
+            result.append(workUnit->queryWuid()); // In workunit mode, this works for both shared and non-shared variants
+        else if (isShared)
+            result.append('Q').append(factory->queryHash());
+        else
+            logctx.getLogPrefix(result);
+        return result;
+    }
+
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
+
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        TerminationCallbackInfo *term(new TerminationCallbackInfo(callback, key));
+        if (isShared)
+            factory->onTermination(term);
+        else
+        {
+            CriticalBlock b(callbacksCrit);
+            callbacks.append(*term);
+        }
+    }
 
     virtual void setResultBool(const char *name, unsigned sequence, bool value)
     {

+ 7 - 0
roxie/ccd/ccdquery.cpp

@@ -500,6 +500,8 @@ protected:
     static SpinLock queriesCrit;
     static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
 
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
 public:
     static CriticalSection queryCreateLock;
 
@@ -1536,6 +1538,11 @@ protected:
         }
     }
 
+    virtual void onTermination(TerminationCallbackInfo *info) const override
+    {
+        CriticalBlock b(callbacksCrit);
+        callbacks.append(*info);
+    }
 };
 
 CriticalSection CQueryFactory::queryCreateLock;

+ 2 - 0
roxie/ccd/ccdquery.hpp

@@ -32,6 +32,7 @@
 #include "thorcommon.ipp"
 #include "roxierow.hpp"
 #include "package.h"
+#include "enginecontext.hpp"
 
 class TranslatorArray : public CInterface
 {
@@ -177,6 +178,7 @@ interface IQueryFactory : extends IInterface
     virtual void getQueryInfo(StringBuffer &result, bool full, IArrayOf<IQueryFactory> *slaveQueries,const IRoxieContextLogger &logctx) const = 0;
     virtual bool isDynamic() const = 0;
     virtual void checkSuspended() const = 0;
+    virtual void onTermination(TerminationCallbackInfo *info) const= 0;
 };
 
 class ActivityArray : public CInterface

+ 6 - 6
system/jlib/jsocket.cpp

@@ -6351,22 +6351,22 @@ int wait_multiple(bool isRead,               //IN   true if wait read, false it
 #ifdef _DEBUG
         DBGLOG("%s",dbgSB.str());
 #endif
-#ifndef _USE_SELECT
-        delete [] fds;
-#endif
     }
     else if (res == SOCKET_ERROR)
     {
         res = 0; // dont return negative on failure
         int err = ERRNO();
-#ifndef _USE_SELECT
-        delete [] fds;
-#endif
         if (err != JSE_INTR)
         {
+#ifndef _USE_SELECT
+            delete [] fds;
+#endif
             throw MakeStringException(-1,"wait_multiple::select/poll error %d", err);
         }
     }
+#ifndef _USE_SELECT
+    delete [] fds;
+#endif
     return res;
 }
 

+ 29 - 0
testing/regress/ecl/embedpy.ecl

@@ -128,3 +128,32 @@ s2b :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (COUNTER/2)+1));
 s1c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING) COUNTER + '1')));
 s2c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING)(COUNTER/2) + '1')));
  SUM(NOFOLD(s1c + s2c), a);
+
+unsigned persistscope1(unsigned a) := EMBED(Python: globalscope('yo'),persist('workunit'))
+  global b
+  b = a + 1
+  return a
+ENDEMBED;
+
+unsigned usepersistscope1(unsigned a) := EMBED(Python: globalscope('yo'),persist('workunit'))
+  global b
+  return a + b
+ENDEMBED;
+
+unsigned persistscope2(unsigned a) := EMBED(Python: globalscope('yi'),persist('workunit'))
+  global b
+  b = a + 11
+  return a
+ENDEMBED;
+
+unsigned usepersistscope2(unsigned a) := EMBED(Python: globalscope('yi'),persist('workunit'))
+  global b
+  return a + b
+ENDEMBED;
+
+sequential(
+  persistscope1(1),
+  persistscope2(1),
+  usepersistscope1(1),
+  usepersistscope2(1)
+);

+ 12 - 0
testing/regress/ecl/key/embedpy.xml

@@ -67,3 +67,15 @@
 <Dataset name='Result 23'>
  <Row><Result_23>328126500000</Result_23></Row>
 </Dataset>
+<Dataset name='Result 24'>
+ <Row><Result_24>1</Result_24></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><Result_25>1</Result_25></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>3</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><Result_27>13</Result_27></Row>
+</Dataset>

+ 12 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1322,6 +1322,8 @@ class CThorCodeContextSlave : public CThorCodeContextBase, implements IEngineCon
 {
     mptag_t mptag;
     Owned<IDistributedFileTransaction> superfiletransaction;
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
 
     void invalidSetResult(const char * name, unsigned seq)
     {
@@ -1433,6 +1435,16 @@ public:
         // NB. includes access to foreign Dalis.
         return jobChannel.queryJob().getOptBool("slaveDaliClient");
     }
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        return result.append(jobChannel.queryJob().queryWuid());
+    }
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        TerminationCallbackInfo *term(new TerminationCallbackInfo(callback, key));
+        CriticalBlock b(callbacksCrit);
+        callbacks.append(*term);
+    }
 };
 
 class CThorCodeContextSlaveSharedMem : public CThorCodeContextSlave