Przeglądaj źródła

HPCC-10457 Streamed dataset support for Python

Add initial namedtuple support

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 lat temu
rodzic
commit
ce0a014683
3 zmienionych plików z 124 dodań i 42 usunięć
  1. 63 41
      plugins/pyembed/pyembed.cpp
  2. 1 1
      testing/ecl/streame.ecl
  3. 60 0
      testing/ecl/streame2.ecl

+ 63 - 41
plugins/pyembed/pyembed.cpp

@@ -581,90 +581,83 @@ class PythonRowBuilder : public CInterfaceOf<IFieldSource>
 {
 public:
     PythonRowBuilder(PyObject *_row)
-    : iter(NULL), elem(_row)
+    : iter(NULL), elem(NULL), pushback(_row), named(false)  // The row starts out in 'pushback', so the first nextField() call moves it into elem
     {
     }
     virtual bool getBooleanResult(const RtlFieldInfo *field)  // Advance to the element for 'field', then return pyembed::getBooleanResult() for it
     {
-        bool ret = pyembed::getBooleanResult(field, elem);
-        nextField();
-        return ret;
+        nextField(field);
+        return pyembed::getBooleanResult(field, elem);
     }
     virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)  // Advance to the element for 'field', then let pyembed::getDataResult() fill len/result from it
     {
+        nextField(field);
         pyembed::getDataResult(field, elem, len, result);
-        nextField();
     }
     virtual double getRealResult(const RtlFieldInfo *field)  // Advance to the element for 'field', then return pyembed::getRealResult() for it
     {
-        double ret = pyembed::getRealResult(field, elem);
-        nextField();
-        return ret;
+        nextField(field);
+        return pyembed::getRealResult(field, elem);
     }
     virtual __int64 getSignedResult(const RtlFieldInfo *field)  // Advance to the element for 'field', then return pyembed::getSignedResult() for it
     {
-        __int64 ret = pyembed::getSignedResult(field, elem);
-        nextField();
-        return ret;
+        nextField(field);
+        return pyembed::getSignedResult(field, elem);
     }
     virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)  // Advance to the element for 'field', then return pyembed::getUnsignedResult() for it
     {
-        unsigned __int64 ret = pyembed::getUnsignedResult(field, elem);
-        nextField();
-        return ret;
+        nextField(field);
+        return pyembed::getUnsignedResult(field, elem);
     }
     virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)  // Advance to the element for 'field', then let pyembed::getStringResult() fill chars/result from it
     {
+        nextField(field);
         pyembed::getStringResult(field, elem, chars, result);
-        nextField();
     }
     virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)  // Advance to the element for 'field', then let pyembed::getUTF8Result() fill chars/result from it
     {
+        nextField(field);
         pyembed::getUTF8Result(field, elem, chars, result);
-        nextField();
     }
     virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)  // Advance to the element for 'field', then let pyembed::getUnicodeResult() fill chars/result from it
     {
+        nextField(field);
         pyembed::getUnicodeResult(field, elem, chars, result);
-        nextField();
     }
     virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)  // Advance to the element for 'field' and store it in 'value' as a real number
     {
+        nextField(field);
         double ret = pyembed::getRealResult(field, elem);
         value.setReal(ret);  // NOTE(review): the value goes via a double, so precision is presumably limited to that of double - confirm this is acceptable
-        nextField();
     }
 
     virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)  // Fetch the next element, check it is a Python list or set, and start iterating over it
     {
+        nextField(field);
         isAll = false;  // No concept of an 'all' set in Python
         assertex(elem && elem != Py_None);
         if (!PyList_Check(elem) && !PySet_Check(elem))
             typeError("list or set", field);
-        iterStack.append(iter.getClear());
-        iter.setown(PyObject_GetIter(elem));
-        nextField();
+        push();  // Save the current iteration state and make elem the object being iterated
     }
     virtual bool processNextSet(const RtlFieldInfo * field)  // Returns true if another element is available; it is kept in 'pushback' for the next nextField() call
     {
-        return elem != NULL;
+        nextField(NULL);  // NULL field: never resolved by name, so the value comes from pushback or the iterator
+        pushback.setown(elem.getClear());
+        return pushback != NULL;
     }
     virtual void processBeginDataset(const RtlFieldInfo * field)  // Fetch the next element, require it to be a Python list, and start iterating over it
     {
-        if (PyList_Check(elem))
-        {
-            iterStack.append(iter.getClear());
-            iter.setown(PyObject_GetIter(elem));
-            nextField();
-        }
-        else
+        nextField(field);
+        if (!PyList_Check(elem))
             typeError("list", field);  // presumably typeError() throws, otherwise push() below would run on a non-list - confirm
+        push();
     }
     virtual void processBeginRow(const RtlFieldInfo * field)
     {
         // Expect to see a tuple here, or possibly (if the ECL record has a single field), an arbitrary scalar object
         // If it's a tuple, we push it onto our stack as the active object
-        iterStack.append(iter.getClear());
+        nextField(NULL);  // MORE - should it be passing field?
         if (!PyTuple_Check(elem))
         {
             if (countFields(field->type->queryFields())==1)
@@ -676,40 +669,69 @@ public:
             else
                 typeError("tuple", field);
         }
-        iter.setown(PyObject_GetIter(elem));
-        nextField();
+        push();
     }
     virtual bool processNextRow(const RtlFieldInfo * field)  // Returns true if another row object is available; it is kept in 'pushback' for the next nextField() call
     {
-        return elem != NULL;
+        nextField(NULL);  // NULL field: never resolved by name, so the value comes from pushback or the iterator
+        pushback.setown(elem.getClear());
+        return pushback != NULL;
     }
     virtual void processEndSet(const RtlFieldInfo * field)  // Leave the set: restore the iteration state saved by push()
     {
-        iter.setown((PyObject *) iterStack.pop());
-        nextField();
+        pop();
     }
     virtual void processEndDataset(const RtlFieldInfo * field)  // Leave the dataset: restore the iteration state saved by push()
     {
-        iter.setown((PyObject *) iterStack.pop());
-        nextField();
+        pop();
     }
     virtual void processEndRow(const RtlFieldInfo * field)  // Leave the row: restore the iteration state saved by push()
     {
-        iter.setown((PyObject *) iterStack.pop());
-        nextField();
+        pop();
     }
 protected:
-    void nextField()
+    void pop()  // Restore the iterator, parent object and 'named' flag saved by the matching push()
     {
-        if (iter)
+        iter.setown((PyObject *) iterStack.pop());
+        parent.setown((PyObject *) parentStack.pop());
+        named = namedStack.pop();
+        elem.clear();  // There is no current element after leaving a nested level
+    }
+    void push()  // Save the current iteration state on the stacks, then make elem the object being iterated
+    {
+        iterStack.append(iter.getClear());
+        parentStack.append(parent.getClear());
+        namedStack.append(named);
+        parent.set(elem);
+        iter.setown(PyObject_GetIter(elem));  // NOTE(review): a NULL return from PyObject_GetIter is not checked here - confirm every caller guarantees elem is iterable
+        named = isNamedTuple(elem);
+    }
+    bool isNamedTuple(PyObject *obj)  // Treats any object whose type has a "_fields" attribute as a named tuple
+    {
+        return PyObject_HasAttrString((PyObject *) obj->ob_type, "_fields");
+    }
+    void nextField(const RtlFieldInfo * field)  // Load the next value into elem: pushback first, else by attribute name (named tuples), else from the iterator
+    {
+        if (pushback)
+            elem.setown(pushback.getClear());
+        else if (field && named) // If it's named tuple, expect to always resolve fields by name, not position
+        {
+            elem.setown(PyObject_GetAttrString(parent, field->name->str()));
+        }
+        else if (iter)
             elem.setown(PyIter_Next(iter));  // PyIter_Next returns NULL when the iterator is exhausted (or on error)
         else
             elem = NULL;
         checkPythonError();  // presumably raises if one of the Python calls above set an error - confirm
     }
     OwnedPyObject iter;  // Iterator over the object currently being walked
+    OwnedPyObject pushback;  // Element fetched ahead of time; consumed by the next nextField() call
     OwnedPyObject elem;  // Most recently fetched element
+    OwnedPyObject parent;  // Object currently being iterated; used for attribute lookup when 'named' is set
+    bool named;  // True if 'parent' looks like a named tuple (see isNamedTuple)
     PointerArray iterStack;  // 'iter' values saved by push() for the enclosing levels
+    PointerArray parentStack;  // 'parent' values saved by push()
+    BoolArray namedStack;  // 'named' values saved by push()
 };
 
 // GILBlock ensures that we hold the Python "Global interpreter lock" for the appropriate duration

+ 1 - 1
testing/ecl/streame.ecl

@@ -48,7 +48,7 @@ ENDEMBED;
 
 dataset(namesRecord) streamedNames(data d, utf8 u) := EMBED(Python)
   return [  \
-     ("Gavin", "Halliday", [("a", 1)], [("aa", 11)], ("b", 2), 250, -1,  U'là',  U'là',  U'là', 0x01000000, d, False, {"1","2"}), \
+     ("Gavin", "Halliday", [("a", 1),("b", 2),("c", 3)], [("aa", 11)], ("aaa", 111), 250, -1,  U'là',  U'là',  U'là', 0x01000000, d, False, {"1","2"}), \
      ("John", "Smith", [], [], ("c", 3), 250, -1,  U'là',  U'là',  u, 0x02000000, d, True, []) \
      ]
 ENDEMBED;  // Returns a Python list of two tuples whose elements include nested lists, nested tuples and a set literal

+ 60 - 0
testing/ecl/streame2.ecl

@@ -0,0 +1,60 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+IMPORT Python;
+
+childrec := RECORD  // Two fields: string 'name' and unsigned 'value'; the '=>' presumably separates key from payload fields - confirm
+   string name => unsigned value;
+END;
+
+namerec := RECORD  // Single-field record with one string 'name'
+   string name;
+END;
+
+// Test use of Python generator object for lazy evaluation: the embedded code yields ("Generate:", num) tuples for num = 0 .. lim-1
+
+dataset(childrec) testGenerator(unsigned lim) := EMBED(Python)
+  num = 0
+  while num < lim:
+    yield ("Generate:", num)
+    num += 1
+ENDEMBED;
+
+// Test use of Python named tuple: returns two ChildRec namedtuples whose field order (value, name) differs from childrec's (name, value); 'lim' is not used
+
+dataset(childrec) testNamedTuple(unsigned lim) := EMBED(Python)
+  import collections
+  ChildRec = collections.namedtuple("childrec", "value, name") # Note - order is reverse of childrec - but works as we get fields by name
+  c1 = ChildRec(1, "name1")
+  c2 = ChildRec(name="name2", value=2)
+  return [ c1, c2 ]
+ENDEMBED;
+
+// Test 'missing tuple' case: each row is a bare string rather than a tuple. Note that in Python ('1') is just the string '1' (a 1-tuple would be ('1',)), so both functions below return the same list
+
+dataset(namerec) testMissingTuple1(unsigned lim) := EMBED(Python)
+  return [ '1', '2', '3' ]
+ENDEMBED;
+
+dataset(namerec) testMissingTuple2(unsigned lim) := EMBED(Python)
+  return [ ('1'), ('2'), ('3') ]
+ENDEMBED;
+
+//output (testGenerator(10));
+output (testNamedTuple(10));  // Only the named-tuple test is enabled; the other three outputs are commented out
+//output (testMissingTuple1(10));
+//output (testMissingTuple2(10));