Browse Source

HPCC-13708 Streamed datasets from Python not thread-safe

There are two issues - we need to ensure that we get the Python
interpreter lock before releasing the iterator object, and we should not
assume that we are operating on the same thread as the one that originally
created the iterator.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
27aa151881
3 changed files with 40 additions and 13 deletions
  1. 28 12
      plugins/pyembed/pyembed.cpp
  2. 7 1
      testing/regress/ecl/key/streame.xml
  3. 5 0
      testing/regress/ecl/streame.ecl

+ 28 - 12
plugins/pyembed/pyembed.cpp

@@ -454,6 +454,17 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     return true;
 }
 
+static void checkThreadContext()
+{
+    if (!threadContext)
+    {
+        if (!globalState.isInitialized())
+            rtlFail(0, "Python not initialized");
+        threadContext = new PythonThreadContext;
+        threadHookChain = addThreadTermFunc(releaseContext);
+    }
+}
+
 PyObject *PythonThreadContext::getNamedTupleType(const RtlTypeInfo *type)
 {
     if (!lru || (type!=lrutype))
@@ -1178,8 +1189,8 @@ private:
 class PythonRowStream : public CInterfaceOf<IRowStream>
 {
 public:
-    PythonRowStream(PythonThreadContext *_sharedCtx, PyObject *result, IEngineRowAllocator *_resultAllocator)
-    : sharedCtx(_sharedCtx), resultIterator(NULL)
+    PythonRowStream(PyObject *result, IEngineRowAllocator *_resultAllocator)
+    : resultIterator(NULL)
     {
         // NOTE - the caller should already have the GIL lock before creating me
         if (!result || result == Py_None)
@@ -1188,9 +1199,19 @@ public:
         checkPythonError();
         resultAllocator.set(_resultAllocator);
     }
+    ~PythonRowStream()
+    {
+        if (resultIterator)
+        {
+            checkThreadContext();
+            GILBlock b(threadContext->threadState);
+            resultIterator.clear();
+        }
+    }
     virtual const void *nextRow()
     {
-        GILBlock b(sharedCtx->threadState);
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         if (!resultIterator)
             return NULL;
         OwnedPyObject row = PyIter_Next(resultIterator);
@@ -1202,12 +1223,13 @@ public:
     }
     virtual void stop()
     {
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         resultAllocator.clear();
         resultIterator.clear();
     }
 
 protected:
-    PythonThreadContext *sharedCtx;
     Linked<IEngineRowAllocator> resultAllocator;
     OwnedPyObject resultIterator;
 };
@@ -1275,7 +1297,7 @@ public:
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
-        return new PythonRowStream(sharedCtx, result, _resultAllocator);
+        return new PythonRowStream(result, _resultAllocator);
     }
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     {
@@ -1544,13 +1566,7 @@ public:
     }
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     {
-        if (!threadContext)
-        {
-            if (!globalState.isInitialized())
-                rtlFail(0, "Python not initialized");
-            threadContext = new PythonThreadContext;
-            threadHookChain = addThreadTermFunc(releaseContext);
-        }
+        checkThreadContext();
         if (flags & EFimport)
             return new Python27EmbedImportContext(threadContext, options);
         else

+ 7 - 1
testing/regress/ecl/key/streame.xml

@@ -15,5 +15,11 @@
  <Row><name>Generate:</name><value>9</value></Row>
 </Dataset>
 <Dataset name='Result 3'>
- <Row><Result_3>Yo</Result_3></Row>
+ <Row><Result_3>500</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>499</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>Yo</Result_5></Row>
 </Dataset>

+ 5 - 0
testing/regress/ecl/streame.ecl

@@ -65,6 +65,11 @@ ENDEMBED;
 output(streamedNames(d'AA', u'là'));
 output (testGenerator(10));
 
+// Test what happens when two threads pull from a generator
+c := testGenerator(1000);
+count(c(value < 500));
+count(c(value > 500));
+
 // Test Python code returning named tuples
 childrec tnamed(string s) := EMBED(Python)
   import collections;