Browse Source

HPCC-16625 Add options to control lifetime of global variables in embedded Python

See Jira ticket for more details.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 years ago
parent
commit
38f8a46131

+ 17 - 0
common/thorhelper/enginecontext.hpp

@@ -21,10 +21,27 @@
 #include "jsocket.hpp"
 #include "dacoven.hpp"
 
+// Signature of a termination callback: it is called with the key it was registered under.
+typedef void (* QueryTermCallback)(const char *queryId);
+
+// Couples a termination callback with the key to pass to it. The callback is invoked
+// from the destructor, i.e. at the point this object is destroyed.
+class TerminationCallbackInfo : public CInterface
+{
+public:
+    TerminationCallbackInfo(QueryTermCallback onTerminate, const char *key)
+      : callback(onTerminate), id(key)
+    {
+    }
+    ~TerminationCallbackInfo()
+    {
+        // Destruction is the termination event - notify the registered function
+        callback(id);
+    }
+protected:
+    QueryTermCallback callback;   // function to call on destruction
+    StringAttr id;                // key handed back to the callback
+};
+
 interface IEngineContext
 {
     virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode) = 0;
     virtual bool allowDaliAccess() const = 0;
+    // Appends an identifier for the current scope to result and returns result.
+    // isShared selects which variant of the identifier is wanted, for engines that
+    // distinguish between a shared and a non-shared scope.
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const = 0;
+    // Registers callback to be called with key when the scope selected by isShared is
+    // finished with. An engine may ignore the request if its scope never ends early.
+    // NOTE(review): the exact moment the callback fires is engine-specific - confirm per engine.
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const = 0;
 };
 
 #endif // ENGINECONTEXT_HPP

+ 33 - 2
ecl/eclagent/eclagent.ipp

@@ -33,6 +33,7 @@
 #include <stdexcept> 
 #include "thorplugin.hpp"
 #include "thorcommon.hpp"
+#include "enginecontext.hpp"
 
 #define MAX_EDGEDATA_LENGTH 30000
 #define MAX_HEX_SIZE 500
@@ -329,7 +330,7 @@ public:
 
 
 class CHThorDebugContext;
-class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, public CInterface
+class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, implements IEngineContext, public CInterface
 {
 private:
     friend class EclAgentWorkflowMachine;
@@ -503,7 +504,7 @@ public:
     virtual IOrderedOutputSerializer * queryOutputSerializer() { return outputSerializer; }
     virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace);
     virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace);
-    virtual IEngineContext *queryEngineContext() { return NULL; }
+    virtual IEngineContext *queryEngineContext() { return this; }
     virtual char *getDaliServers();
 
     unsigned __int64 queryStopAfter() { return stopAfter; }
@@ -512,6 +513,36 @@ public:
     {
         return queryNullSectionTimer();
     }
+// IEngineContext
+    // Obtains num unique ids via ::getGlobalUniqueIds, using _foreignNode when one is
+    // supplied and non-null, otherwise the Dali server reported by getDaliServers().
+    // Returns 0 when num is 0 or when no Dali server is available.
+    virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
+    {
+        if (!num)
+            return 0;
+        SocketEndpoint target;
+        const bool callerSuppliedNode = _foreignNode && !_foreignNode->isNull();
+        if (callerSuppliedNode)
+            target.set(*_foreignNode);
+        else
+        {
+            char *daliServers = getDaliServers();
+            if (!daliServers)
+                return 0;
+            target.set(daliServers);
+            free(daliServers);  // string returned by getDaliServers() is released here once used
+        }
+        return ::getGlobalUniqueIds(num, &target);
+    }
+    // eclagent always reports Dali access as allowed.
+    virtual bool allowDaliAccess() const
+    {
+        return true;
+    }
+    // Appends the fixed id "workunit" to result and returns it. isShared is ignored:
+    // eclagent makes no distinction between global, workunit and query scopes.
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        return result.append("workunit");
+    }
+
+    // Intentionally does nothing - the callback is never recorded or invoked here.
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        // The scope lasts until the exe terminates, so there is nothing to unregister
+    }
+
 
 //New workflow interface
     virtual void setWorkflowCondition(bool value) { if(workflow) workflow->setCondition(value); }

+ 3 - 0
plugins/pyembed/CMakeLists.txt

@@ -42,6 +42,9 @@ if(PYEMBED)
             ./../../rtl/include
             ./../../rtl/nbcd
             ./../../common/deftype
+            ./../../common/thorhelper
+            ./../../dali/base
+            ./../../system/mp
             ./../../roxie/roxiemem
             ./../../system/jlib)
 

+ 121 - 9
plugins/pyembed/pyembed.cpp

@@ -29,6 +29,7 @@
 #include "rtlfield_imp.hpp"
 #include "nbcd.hpp"
 #include "roxiemem.hpp"
+#include "enginecontext.hpp"
 
 #ifdef _WIN32
 #define EXPORT __declspec(dllexport)
@@ -104,6 +105,19 @@ public:
     inline X **ref()                 { return &ptr; }
 };
 
+
+// Reports a plugin error through rtlFail and never returns. The text is the printf-style
+// expansion of the format and arguments, prefixed with "pyembed: " (added exactly once).
+__declspec(noreturn) static void failx(const char *msg, ...) __attribute__((format(printf, 1, 2), noreturn));
+
+static void failx(const char *message, ...)
+{
+    static const char prefix[] = "pyembed: ";
+    va_list args;
+    va_start(args,message);
+    StringBuffer msg;
+    // checkPythonError() passes a format that already begins with the prefix; adding it
+    // unconditionally would report "pyembed: pyembed: ...", so only add it when missing.
+    if (strncmp(message, prefix, sizeof(prefix)-1) != 0)
+        msg.append(prefix);
+    msg.valist_appendf(message,args);
+    va_end(args);
+    rtlFail(0, msg.str());
+}
+
 // call checkPythonError to throw an exception if Python error state is set
 
 static void checkPythonError()
@@ -115,8 +129,7 @@ static void checkPythonError()
         PyErr_Fetch(pType.ref(), pValue.ref(), pTraceBack.ref());
         OwnedPyObject valStr = PyObject_Str(pValue);
         PyErr_Clear();
-        VStringBuffer errMessage("pyembed: %s", PyString_AsString(valStr));
-        rtlFail(0, errMessage.str());
+        failx("pyembed: %s", PyString_AsString(valStr));
     }
 }
 
@@ -277,6 +290,7 @@ public:
         const char *argv[] = { nullptr };
         PySys_SetArgvEx(0, (char **) argv, 0);
         PyEval_InitThreads();
+        preservedScopes.setown(PyDict_New());
         tstate = PyEval_SaveThread();
         initialized = true;
     }
@@ -292,6 +306,7 @@ public:
             namedtuple.clear();
             namedtupleTypes.clear();
             compiledScripts.clear();
+            preservedScopes.clear();
             Py_Finalize();
         }
         if (pythonLibrary)
@@ -407,6 +422,31 @@ public:
         }
         return code.getClear();
     }
+    // Returns the dictionary stored in preservedScopes under key, creating and storing an
+    // empty one if there is no entry yet. isNew is set to true only when this call created
+    // the dictionary. Any Python error raised while creating or storing a dictionary is
+    // reported via checkPythonError() rather than being silently ignored.
+    PyObject *getNamedScope(const char *key, bool &isNew)
+    {
+        if (!preservedScopes)
+        {
+            preservedScopes.setown(PyDict_New());
+            checkPythonError();  // PyDict_New returns NULL with an error set if it fails
+        }
+        OwnedPyObject scope;
+        scope.set(PyDict_GetItemString(preservedScopes, key));  // PyDict_GetItemString yields a borrowed reference
+        isNew = !scope;
+        if (isNew)
+        {
+            scope.setown(PyDict_New());
+            checkPythonError();
+            if (PyDict_SetItemString(preservedScopes, key, scope) != 0)
+                checkPythonError();
+        }
+        return scope.getClear();
+    }
+    // Removes the entry stored under key from preservedScopes, if that dictionary exists.
+    void releaseNamedScope(const char *key)
+    {
+        if (!preservedScopes)
+            return;
+        PyDict_DelItemString(preservedScopes, key);
+        // The key is expected to be present; if it was not, discard the resulting Python error
+        PyErr_Clear();
+    }
+    static void unregister(const char *key);
 protected:
     static StringBuffer &wrapPythonText(StringBuffer &out, const char *in, const char *params)
     {
@@ -427,6 +467,7 @@ protected:
     OwnedPyObject namedtuple;      // collections.namedtuple
     OwnedPyObject namedtupleTypes; // dictionary of return values from namedtuple()
     OwnedPyObject compiledScripts; // dictionary of previously compiled scripts
+    OwnedPyObject preservedScopes; // dictionary of preserved scopes
 } globalState;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -1199,6 +1240,13 @@ private:
     PyThreadState * &state;
 };
 
+// Termination callback passed to the engine by setScopes(): releases the named scope
+// stored under key. A GILBlock on the calling thread's state is held for the release.
+void Python27GlobalState::unregister(const char *key)
+{
+    checkThreadContext();
+    GILBlock gilHeld(threadContext->threadState);
+    globalState.releaseNamedScope(key);
+}
+
 // A Python function that returns a dataset will return a PythonRowStream object that can be
 // interrogated to return each row of the result in turn
 
@@ -1261,9 +1309,71 @@ public:
     : sharedCtx(_sharedCtx)
     {
         PyEval_RestoreThread(sharedCtx->threadState);
+    }
+
+    // Creates the locals dictionary and selects the globals dictionary for this context.
+    // _options is a comma-separated list of name=value settings:
+    //   globalscope=<name>            - name appended (after ':') to the key of the shared globals
+    //   persist=global|workunit|query - 'global' uses the fixed key "global"; 'workunit' and
+    //                                   'query' use the id returned by the engine context
+    // When the resulting key is non-empty the globals come from the named scopes held in
+    // globalState, and a newly created scope is registered with the engine (when one is in
+    // use) for release on termination. Otherwise a fresh private dictionary is used.
+    // Fails via failx on an unrecognized option or persist mode, on a persist mode that needs
+    // an engine context when none is available, or if persist is given more than once.
+    void setScopes(ICodeContext *codeCtx, const char *_options)
+    {
         locals.setown(PyDict_New());
-        globals.setown(PyDict_New());
-        PyDict_SetItemString(globals, "__builtins__", PyEval_GetBuiltins());  // required for import to work
+        StringArray options;
+        options.appendList(_options, ",");
+        StringBuffer scopeKey;
+        const char *scopeKey2 = nullptr;   // points into options, which outlives its use below
+        bool persistSeen = false;
+        bool wuidScope = false;
+        IEngineContext *engine = nullptr;
+        ForEachItemIn(idx, options)
+        {
+            const char *opt = options.item(idx);
+            const char *val = strchr(opt, '=');
+            if (val)
+            {
+                StringBuffer optName(val-opt, opt);
+                val++;
+                if (strieq(optName, "globalscope"))
+                    scopeKey2 = val;
+                else if (strieq(optName, "persist"))
+                {
+                    // Track repeats explicitly: scopeKey is only filled in here for 'global',
+                    // so its length cannot be used to detect a repeated workunit/query mode.
+                    if (persistSeen)
+                        failx("persist option specified more than once");
+                    persistSeen = true;
+                    if (strieq(val, "global"))
+                        scopeKey.append("global");
+                    else if (strieq(val, "workunit"))
+                    {
+                        engine = codeCtx->queryEngineContext();
+                        wuidScope = true;
+                        if (!engine)
+                            failx("Persist mode 'workunit' not supported here");
+                    }
+                    else if (strieq(val, "query"))
+                    {
+                        engine = codeCtx->queryEngineContext();
+                        wuidScope = false;
+                        if (!engine)
+                            failx("Persist mode 'query' not supported here");
+                    }
+                    else
+                        failx("Unrecognized persist mode %s", val);
+                }
+                else
+                    failx("Unrecognized option %s", optName.str());
+            }
+            else
+                failx("Unrecognized option %s", opt);
+        }
+        if (engine)
+            engine->getQueryId(scopeKey, wuidScope);
+        if (scopeKey2)
+            scopeKey.append(':').append(scopeKey2);
+        if (scopeKey.length())
+        {
+            bool isNew;
+            globals.setown(globalState.getNamedScope(scopeKey, isNew));
+            // Only the call that created the scope asks the engine to release it
+            if (isNew && engine)
+                engine->onTermination(Python27GlobalState::unregister, scopeKey.str(), wuidScope);
+        }
+        else
+            globals.setown(PyDict_New());
+        PyDict_SetItemString(globals, "__builtins__",  PyEval_GetBuiltins());  // required for import to work
     }
     ~Python27EmbedContextBase()
     {
@@ -1497,7 +1607,7 @@ protected:
 class Python27EmbedScriptContext : public Python27EmbedContextBase
 {
 public:
-    Python27EmbedScriptContext(PythonThreadContext *_sharedCtx, const char *options)
+    Python27EmbedScriptContext(PythonThreadContext *_sharedCtx)
     : Python27EmbedContextBase(_sharedCtx)
     {
     }
@@ -1515,7 +1625,6 @@ public:
     {
     }
 
-
     virtual void importFunction(size32_t lenChars, const char *text)
     {
         throwUnexpected();
@@ -1555,7 +1664,7 @@ protected:
 class Python27EmbedImportContext : public Python27EmbedContextBase
 {
 public:
-    Python27EmbedImportContext(PythonThreadContext *_sharedCtx, const char *options)
+    Python27EmbedImportContext(PythonThreadContext *_sharedCtx)
     : Python27EmbedContextBase(_sharedCtx)
     {
         argcount = 0;
@@ -1611,10 +1720,13 @@ public:
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     {
         checkThreadContext();
+        // Build the import or script context first, then apply the options via setScopes().
+        // NOTE(review): holding the context in Owned<> presumably ensures it is released if
+        // setScopes() fails on a bad option - confirm.
+        Owned<Python27EmbedContextBase> ret;
         if (flags & EFimport)
-            return new Python27EmbedImportContext(threadContext, options);
+            ret.setown(new Python27EmbedImportContext(threadContext));
         else
-            return new Python27EmbedScriptContext(threadContext, options);
+            ret.setown(new Python27EmbedScriptContext(threadContext));
+        ret->setScopes(ctx, options);
+        return ret.getClear();
     }
     virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options)
     {

+ 52 - 1
roxie/ccd/ccdcontext.cpp

@@ -36,6 +36,7 @@
 #include "ccdsnmp.hpp"
 #include "ccdstate.hpp"
 #include "roxiehelper.hpp"
+#include "enginecontext.hpp"
 
 using roxiemem::IRowManager;
 
@@ -2602,7 +2603,7 @@ public:
 
 };
 
-class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext
+class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext, implements IEngineContext
 {
     const IQueryFactory *serverQueryFactory = nullptr;
     IHpccProtocolResponse *protocol = nullptr;
@@ -3030,6 +3031,56 @@ public:
     {
         return protocol;
     }
+    // The server context implements IEngineContext itself, so it hands back this object.
+    virtual IEngineContext *queryEngineContext()
+    {
+        return this;
+    }
+
+    // Obtains num unique ids via ::getGlobalUniqueIds, using _foreignNode when one is
+    // supplied and non-null, otherwise the Dali address reported by the helper returned
+    // from ::connectToDali(). Returns 0 when num is 0 or no Dali helper is available.
+    virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
+    {
+        if (!num)
+            return 0;
+        SocketEndpoint target;
+        const bool callerSuppliedNode = _foreignNode && !_foreignNode->isNull();
+        if (callerSuppliedNode)
+            target.set(*_foreignNode);
+        else
+        {
+            Owned<IRoxieDaliHelper> daliHelper = ::connectToDali();
+            if (!daliHelper)
+                return 0;
+            StringBuffer daliAddress;
+            daliHelper->getDaliIp(daliAddress);
+            target.set(daliAddress.str());
+        }
+        return ::getGlobalUniqueIds(num, &target);
+    }
+    // Dali access is allowed exactly when ::connectToDali() yields a helper.
+    virtual bool allowDaliAccess() const
+    {
+        Owned<IRoxieDaliHelper> daliHelper = ::connectToDali();
+        if (!daliHelper)
+            return false;
+        return true;
+    }
+    // Appends the scope id to result and returns it:
+    //   - with a workunit: its wuid (used for both shared and non-shared variants)
+    //   - shared, no workunit: 'Q' followed by the query factory's hash
+    //   - otherwise: the log prefix of this context's logger
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        if (workUnit)
+            return result.append(workUnit->queryWuid());
+        if (isShared)
+            return result.append('Q').append(factory->queryHash());
+        logctx.getLogPrefix(result);
+        return result;
+    }
+
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
+
+    // Wraps callback/key in a TerminationCallbackInfo and stores it either with the query
+    // factory (shared) or in this context's own callbacks list (non-shared).
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        TerminationCallbackInfo *info = new TerminationCallbackInfo(callback, key);
+        if (!isShared)
+        {
+            CriticalBlock block(callbacksCrit);
+            callbacks.append(*info);
+            return;
+        }
+        factory->onTermination(info);
+    }
 
     virtual void setResultBool(const char *name, unsigned sequence, bool value)
     {

+ 7 - 0
roxie/ccd/ccdquery.cpp

@@ -500,6 +500,8 @@ protected:
     static SpinLock queriesCrit;
     static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
 
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
 public:
     static CriticalSection queryCreateLock;
 
@@ -1536,6 +1538,11 @@ protected:
         }
     }
 
+    // Adds info to this factory's callbacks list, under callbacksCrit.
+    virtual void onTermination(TerminationCallbackInfo *info) const override
+    {
+        CriticalBlock block(callbacksCrit);
+        callbacks.append(*info);
+    }
 };
 
 CriticalSection CQueryFactory::queryCreateLock;

+ 2 - 0
roxie/ccd/ccdquery.hpp

@@ -32,6 +32,7 @@
 #include "thorcommon.ipp"
 #include "roxierow.hpp"
 #include "package.h"
+#include "enginecontext.hpp"
 
 class TranslatorArray : public CInterface
 {
@@ -177,6 +178,7 @@ interface IQueryFactory : extends IInterface
     virtual void getQueryInfo(StringBuffer &result, bool full, IArrayOf<IQueryFactory> *slaveQueries,const IRoxieContextLogger &logctx) const = 0;
     virtual bool isDynamic() const = 0;
     virtual void checkSuspended() const = 0;
+    virtual void onTermination(TerminationCallbackInfo *info) const= 0;
 };
 
 class ActivityArray : public CInterface

+ 29 - 0
testing/regress/ecl/embedpy.ecl

@@ -128,3 +128,32 @@ s2b :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (COUNTER/2)+1));
 s1c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING) COUNTER + '1')));
 s2c :=DATASET(250000, TRANSFORM({ integer a }, SELF.a := (integer) ((STRING)(COUNTER/2) + '1')));
  SUM(NOFOLD(s1c + s2c), a);
+
+// Stores b = a + 1 as a Python global in the named scope 'yo' and returns a unchanged.
+unsigned persistscope1(unsigned a) := EMBED(Python: globalscope('yo'),persist('workunit'))
+  global b
+  b = a + 1
+  return a
+ENDEMBED;
+
+// Reads the global b left behind in scope 'yo' and returns a + b.
+unsigned usepersistscope1(unsigned a) := EMBED(Python: globalscope('yo'),persist('workunit'))
+  global b
+  return a + b
+ENDEMBED;
+
+// Same pattern as persistscope1, but in the separate named scope 'yi' with b = a + 11.
+unsigned persistscope2(unsigned a) := EMBED(Python: globalscope('yi'),persist('workunit'))
+  global b
+  b = a + 11
+  return a
+ENDEMBED;
+
+// Reads the global b left behind in scope 'yi' and returns a + b.
+unsigned usepersistscope2(unsigned a) := EMBED(Python: globalscope('yi'),persist('workunit'))
+  global b
+  return a + b
+ENDEMBED;
+
+// Run in order so both scopes are populated before they are read.
+// Expected results: 1, 1, 3 (1 + 2 from 'yo') and 13 (1 + 12 from 'yi').
+sequential(
+  persistscope1(1),
+  persistscope2(1),
+  usepersistscope1(1),
+  usepersistscope2(1)
+);

+ 12 - 0
testing/regress/ecl/key/embedpy.xml

@@ -67,3 +67,15 @@
 <Dataset name='Result 23'>
  <Row><Result_23>328126500000</Result_23></Row>
 </Dataset>
+<Dataset name='Result 24'>
+ <Row><Result_24>1</Result_24></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><Result_25>1</Result_25></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>3</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><Result_27>13</Result_27></Row>
+</Dataset>

+ 12 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1322,6 +1322,8 @@ class CThorCodeContextSlave : public CThorCodeContextBase, implements IEngineCon
 {
     mptag_t mptag;
     Owned<IDistributedFileTransaction> superfiletransaction;
+    mutable CIArrayOf<TerminationCallbackInfo> callbacks;
+    mutable CriticalSection callbacksCrit;
 
     void invalidSetResult(const char * name, unsigned seq)
     {
@@ -1433,6 +1435,16 @@ public:
         // NB. includes access to foreign Dalis.
         return jobChannel.queryJob().getOptBool("slaveDaliClient");
     }
+    // Appends the job's wuid to result and returns it; isShared makes no difference here.
+    virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
+    {
+        result.append(jobChannel.queryJob().queryWuid());
+        return result;
+    }
+    // Wraps callback/key in a TerminationCallbackInfo and adds it to this context's
+    // callbacks list under callbacksCrit; isShared is not consulted.
+    virtual void onTermination(QueryTermCallback callback, const char *key, bool isShared) const
+    {
+        TerminationCallbackInfo *info = new TerminationCallbackInfo(callback, key);
+        CriticalBlock block(callbacksCrit);
+        callbacks.append(*info);
+    }
 };
 
 class CThorCodeContextSlaveSharedMem : public CThorCodeContextSlave