浏览代码

HPCC-10457 Streamed dataset support for Python

Map dataset input parameters to Python generators

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父节点
当前提交
6b9762c852
共有 1 个文件被更改,包括 107 次插入和 5 次删除
  1. 107 5
      plugins/pyembed/pyembed.cpp

+ 107 - 5
plugins/pyembed/pyembed.cpp

@@ -927,7 +927,7 @@ public:
     }
     PyObject *getTuple(const RtlTypeInfo *type)
     {
-        OwnedPyObject mynamedtupletype = sharedCtx->getNamedTupleType(type);
+        OwnedPyObject mynamedtupletype = sharedCtx ? sharedCtx->getNamedTupleType(type) : globalState.getNamedTupleType(type);
         OwnedPyObject argsTuple = PyList_AsTuple(args);
         OwnedPyObject mynamedtuple = PyObject_CallObject(mynamedtupletype, argsTuple);  // Creates a namedtuple from the supplied tuple
         checkPythonError();
@@ -959,6 +959,110 @@ protected:
 };
 
 
+//----------------------------------------------------------------------
+
+// Wrap an IRowStream into a Python generator
+
+// Python object layout for an iterator over an ECL dataset.
+// The object header comes first so that a pointer to this struct can be used as a PyObject *.
+struct ECLDatasetIterator
+{
+    PyObject_HEAD                 // No trailing semicolon - the macro supplies its own terminator
+    const RtlTypeInfo *typeInfo;  // Not linked (or linkable)
+    IRowStream * val;             // Linked; NULL once the stream has been stopped and released
+};
+
+// tp_iter slot (__iter__): the object acts as its own iterator, so the
+// result is simply another reference to the object that was passed in.
+PyObject* ECLDatasetIterator_iter(PyObject *self)
+{
+    Py_INCREF(self);  // the returned pointer is a new reference
+    return self;
+}
+
+// tp_dealloc slot: if the wrapped stream has not already been given up,
+// stop it and drop our link to it, then return the object's memory via tp_free.
+void ECLDatasetIterator_dealloc(PyObject *self)
+{
+    ECLDatasetIterator *iter = (ECLDatasetIterator *)self;
+    if (IRowStream *stream = iter->val)
+    {
+        // Still holding the stream - iteration was abandoned before reaching EOF
+        stream->stop();
+        ::Release(stream);
+        iter->val = NULL;
+    }
+    self->ob_type->tp_free(self);
+}
+
+// tp_iternext slot (next()): fetch the next row from the wrapped stream and
+// return it as a Python named tuple. Once the stream reports no more rows it is
+// stopped and released, and this (and every later) call sets StopIteration and
+// returns NULL.
+PyObject* ECLDatasetIterator_iternext(PyObject *self)
+{
+    ECLDatasetIterator *p = (ECLDatasetIterator *)self;
+    if (p->val)
+    {
+        const byte *nextRow = (const byte *) p->val->nextRow();
+        if (!nextRow)
+        {
+            // No more rows - give up the stream now rather than waiting for dealloc
+            p->val->stop();
+            ::Release(p->val);
+            p->val = NULL;
+        }
+        else
+        {
+            // The row handed back by nextRow() belongs to us and must be released once
+            // its fields have been turned into Python objects; previously it was leaked.
+            // The guard also covers the case where process()/getTuple() throw.
+            // NOTE(review): relies on rtlReleaseRow (eclrtl.hpp) being in scope, and on the
+            // tuple builder not retaining pointers into the row - confirm.
+            struct RowReleaser
+            {
+                const void *row;
+                ~RowReleaser() { rtlReleaseRow(row); }
+            } releaser = { nextRow };
+            RtlFieldStrInfo dummyField("<row>", NULL, p->typeInfo);
+            PythonNamedTupleBuilder tupleBuilder(NULL, &dummyField);
+            p->typeInfo->process(nextRow, nextRow, &dummyField, tupleBuilder);
+            return tupleBuilder.getTuple(p->typeInfo);
+        }
+    }
+    // If we get here, it's EOF
+    PyErr_SetNone(PyExc_StopIteration);
+    return NULL;
+}
+
+static PyTypeObject ECLDatasetIteratorType =  // Type object for ECLDatasetIterator; slots are initialised positionally, so order matters
+{
+    PyObject_HEAD_INIT(NULL)
+    0,                         /*ob_size*/
+    "ECLDatasetIterator._MyIter",      /*tp_name*/
+    sizeof(ECLDatasetIterator),       /*tp_basicsize: size of one instance*/
+    0,                         /*tp_itemsize*/
+    ECLDatasetIterator_dealloc,        /*tp_dealloc: stops and releases the wrapped stream*/
+    0,                         /*tp_print*/
+    0,                         /*tp_getattr*/
+    0,                         /*tp_setattr*/
+    0,                         /*tp_compare*/
+    0,                         /*tp_repr*/
+    0,                         /*tp_as_number*/
+    0,                         /*tp_as_sequence*/
+    0,                         /*tp_as_mapping*/
+    0,                         /*tp_hash */
+    0,                         /*tp_call*/
+    0,                         /*tp_str*/
+    0,                         /*tp_getattro*/
+    0,                         /*tp_setattro*/
+    0,                         /*tp_as_buffer*/
+    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_ITER,  /* tp_flags: tell python to use tp_iter and tp_iternext fields. */
+    "ECL dataset iterator object.",           /* tp_doc */
+    0,  /* tp_traverse */
+    0,  /* tp_clear */
+    0,  /* tp_richcompare */
+    0,  /* tp_weaklistoffset */
+    ECLDatasetIterator_iter,  /* tp_iter: __iter__() method - returns the object itself */
+    ECLDatasetIterator_iternext  /* tp_iternext: next() method - returns the next row, or NULL with StopIteration at EOF */
+};
+
+// Create a Python iterator object that yields the rows of _val as named tuples.
+//   _typeInfo - describes the row layout; not linked, so it must outlive the iterator
+//   _val      - row stream to wrap; the caller passes in a link which this function takes over
+// Returns a new reference to the iterator. On any failure the stream link is released
+// and the error is reported via checkPythonError()/rtlFail() rather than returning NULL.
+static PyObject *createECLDatasetIterator(const RtlTypeInfo *_typeInfo, IRowStream * _val)
+{
+    ECLDatasetIteratorType.tp_new = PyType_GenericNew;
+    ECLDatasetIterator *p = NULL;
+    if (PyType_Ready(&ECLDatasetIteratorType) >= 0)
+        p = PyObject_New(ECLDatasetIterator, &ECLDatasetIteratorType);
+    if (!p)
+    {
+        // Type initialisation and allocation failures are handled identically. We own the
+        // link on _val, so drop it before reporting - otherwise the stream would leak.
+        ::Release(_val);
+        checkPythonError();
+        rtlFail(0, "pyembed: failed to create dataset iterator");
+    }
+    p->typeInfo = _typeInfo;
+    p->val = _val;
+    return (PyObject *)p;
+}
+
+//-----------------------------------------------------
+
 // GILBlock ensures that we hold the Python "Global interpreter lock" for the appropriate duration
 
 class GILBlock
@@ -988,9 +1092,7 @@ public:
         // NOTE - the caller should already have the GIL lock before creating me
         if (!result || result == Py_None)
             typeError("list or generator", NULL);
-        if (!PyList_Check(result) && !PyGen_Check(result))  // MORE - should I remove this check, and just say if it is iterable, it's good?
-            typeError("list or generator", NULL);
-        resultIterator.setown(PyObject_GetIter(result));
+        resultIterator.setown(PyObject_GetIter(result));   // We allow anything that is iterable to be returned for a row stream
         checkPythonError();
         resultAllocator.set(_resultAllocator);
     }
@@ -1237,7 +1339,7 @@ public:
     }
     virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
     {
-        UNIMPLEMENTED;
+        addArg(name, createECLDatasetIterator(metaVal.queryTypeInfo(), LINK(val)));
     }
 
 protected: