Bläddra i källkod

HPCC-10457 Streamed dataset support for Python

Get the threading right

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 år sedan
förälder
incheckning
4f36c59e95
1 ändrade filer med 66 tillägg och 33 borttagningar
  1. 66 33
      plugins/pyembed/pyembed.cpp

+ 66 - 33
plugins/pyembed/pyembed.cpp

@@ -715,17 +715,78 @@ protected:
     PointerArray iterStack;
 };
 
+// GILBlock ensures the we hold the Python "Global interpreter lock" for the appropriate duration
+
+class GILBlock
+{
+public:
+    GILBlock(PyThreadState * &_state) : state(_state)
+    {
+        PyEval_RestoreThread(state);
+    }
+    ~GILBlock()
+    {
+        state = PyEval_SaveThread();
+    }
+private:
+    PyThreadState * &state;
+};
+
+// A Python function that returns a dataset will return a PythonRowStream object that can be
+// interrogated to return each row of the result in turn
+
+class PythonRowStream : public CInterfaceOf<IRowStream>
+{
+public:
+    PythonRowStream(PythonThreadContext *_sharedCtx, PyObject *result, IEngineRowAllocator *_resultAllocator)
+    : sharedCtx(_sharedCtx), resultIterator(NULL)
+    {
+        // NOTE - the caller should already have the GIL lock before creating me
+        if (!result || result == Py_None)
+            typeError("list or generator", NULL);
+        if (!PyList_Check(result) && !PyGen_Check(result))  // MORE - should I remove this check, and just say if it is iterable, it's good?
+            typeError("list or generator", NULL);
+        resultIterator.setown(PyObject_GetIter(result));
+        checkPythonError();
+        resultAllocator.set(_resultAllocator);
+    }
+    virtual const void *nextRow()
+    {
+        GILBlock b(sharedCtx->threadState);
+        if (!resultIterator)
+            return NULL;
+        OwnedPyObject row = PyIter_Next(resultIterator);
+        if (!row)
+            return NULL;
+        RtlDynamicRowBuilder rowBuilder(resultAllocator);
+        PythonRowBuilder pyRowBuilder(row);
+        const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
+        assertex(typeInfo);
+        RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
+        size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, pyRowBuilder);
+        return rowBuilder.finalizeRowClear(len);
+    }
+    virtual void stop()
+    {
+        resultAllocator.clear();
+        resultIterator.clear();
+    }
+
+protected:
+    PythonThreadContext *sharedCtx;
+    Linked<IEngineRowAllocator> resultAllocator;
+    OwnedPyObject resultIterator;
+};
+
 // Each call to a Python function will use a new Python27EmbedFunctionContext object
 // This takes care of ensuring that the Python GIL is locked while we are executing python code,
 // and released when we are not
 
-class Python27EmbedContextBase : public CInterface, implements IEmbedFunctionContext, implements IRowStream
+class Python27EmbedContextBase : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
-    IMPLEMENT_IINTERFACE;
-
     Python27EmbedContextBase(PythonThreadContext *_sharedCtx)
-    : sharedCtx(_sharedCtx), resultIterator(NULL)
+    : sharedCtx(_sharedCtx)
     {
         PyEval_RestoreThread(sharedCtx->threadState);
         locals.setown(PyDict_New());
@@ -780,32 +841,7 @@ public:
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
-        assertex(result && result != Py_None);
-        if (!PyList_Check(result) && !PyGen_Check(result))  // MORE - should I remove this check, and just say if it is iterable, it's good?
-            typeError("list or generator", NULL);
-        resultIterator.setown(PyObject_GetIter(result));
-        checkPythonError();
-        resultAllocator.set(_resultAllocator);
-        return LINK(this);
-    }
-    virtual const void *nextRow()
-    {
-        assertex(resultAllocator);
-        assertex(resultIterator);
-        OwnedPyObject row = PyIter_Next(resultIterator);
-        if (!row)
-            return NULL;
-        RtlDynamicRowBuilder rowBuilder(resultAllocator);
-        PythonRowBuilder pyRowBuilder(row);
-        const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
-        assertex(typeInfo);
-        RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
-        size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, pyRowBuilder);
-        return rowBuilder.finalizeRowClear(len);
-    }
-    virtual void stop()
-    {
-        resultAllocator.clear();
+        return new PythonRowStream(sharedCtx, result, _resultAllocator);
     }
 
     virtual void bindBooleanParam(const char *name, bool val)
@@ -950,9 +986,6 @@ protected:
     OwnedPyObject globals;
     OwnedPyObject result;
     OwnedPyObject script;
-
-    Linked<IEngineRowAllocator> resultAllocator;
-    OwnedPyObject resultIterator;
 };
 
 class Python27EmbedScriptContext : public Python27EmbedContextBase