浏览代码

HPCC-10455 Support streamed dataset results from embedded languages

Initial POC implementation in Python plugin

Also fixing HPCC-10457 Streamed dataset support for Python

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父节点
当前提交
2e2f5a1fc0

+ 4 - 1
common/thorhelper/roxierow.cpp

@@ -201,7 +201,10 @@ public:
     {
         return meta.createInternalDeserializer(ctx, activityId);
     }
-
+    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type)
+    {
+        return this; // MORE - wrong!
+    }
 protected:
     inline byte * * doReallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
     {

+ 8 - 0
common/thorhelper/thorcommon.ipp

@@ -143,6 +143,10 @@ public:
         self = NULL;
         reserved = 0;
     }
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return NULL;
+    }
 
 protected:
     virtual byte * createSelf()
@@ -181,6 +185,10 @@ public:
         return self;
     }
 
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return builder->queryAllocator();
+    }
 protected:
     size32_t offset;
     Linked<ARowBuilder> builder;

+ 1 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -2909,6 +2909,7 @@ public:
     virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) { throwUnexpected(); }
     virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx = NULL) { throwUnexpected(); }
     virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx) { throwUnexpected(); }
+    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type) { return this; }
 };
 
 //Use a (constant) transform to map selectors of the form queryActiveTableSelector().field

+ 4 - 2
plugins/pyembed/pyembed.cpp

@@ -672,8 +672,10 @@ public:
         }
         else if (countFields(field->type->queryFields())==1)
         {
-            // iter is NULL;
-            // NOTE - we don't call nextField here. There is a single field. Debatable whether supporting this is helpful?
+            // Python doesn't seem to support the concept of a tuple containing a single element.
+            // If we are expecting a single field in our row, then the 'tuple' layer will be missing
+            // NOTE - we don't call nextField here, and iter is set to null by the getClear() above.
+            // elem will be valid for one field only
         }
         else
         {

+ 8 - 1
roxie/ccd/ccdactivities.cpp

@@ -622,6 +622,10 @@ public:
     }
     IMPLEMENT_IINTERFACE
 
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return dynamicBuilder.queryAllocator();
+    }
     virtual byte * createSelf()
     {
         if (useDynamic)
@@ -714,7 +718,10 @@ public:
     {
         useDynamic = meta.isVariableSize();
     }
-
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return dynamicBuilder.queryAllocator();
+    }
     virtual byte * createSelf()
     {
         if (useDynamic)

+ 12 - 0
rtl/eclrtl/rtlds_imp.hpp

@@ -227,6 +227,10 @@ public:
             maxLength = 0;
         }
     }
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return rowAllocator;
+    }
     inline RtlDynamicRowBuilder(IEngineRowAllocator * _rowAllocator, bool createInitial) : rowAllocator(_rowAllocator) 
     {
         if (rowAllocator && createInitial)
@@ -284,6 +288,10 @@ public:
     }
 
     virtual byte * ensureCapacity(size32_t required, const char * fieldName);
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return NULL;
+    }
 
     inline void clear() { self = NULL; maxLength = 0; }
     inline void set(size32_t _maxLength, void * _self) { self = static_cast<byte *>(_self); maxLength = _maxLength; }
@@ -310,6 +318,10 @@ public:
         self = container.ensureCapacity(offset+required+suffix, fieldName) + offset;
         return self;
     }
+    virtual IEngineRowAllocator *queryAllocator() const
+    {
+        return container.queryAllocator();
+    }
 
 protected:
     virtual byte * createSelf()

+ 24 - 10
rtl/eclrtl/rtlfield.cpp

@@ -75,13 +75,13 @@ size32_t RtlTypeInfoBase::toXML(const byte * self, const byte * selfrow, const R
     rtlFailUnexpected();
     return 0;
 }
-/*
+
 size32_t RtlTypeInfoBase::build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const
 {
     rtlFailUnexpected();
     return 0;
 }
-*/
+
 const char * RtlTypeInfoBase::queryLocale() const 
 {
     return NULL; 
@@ -189,7 +189,8 @@ size32_t RtlSwapIntTypeInfo::build(ARowBuilder &builder, size32_t offset, const
 {
     builder.ensureCapacity(length+offset, field->name->str());
     __int64 val = isUnsigned() ? (__int64) source.getUnsignedResult(field) : source.getSignedResult(field);
-    rtlWriteSwapInt(builder.getSelf() + offset, val, length);
+    // NOTE - we assume that the value returned from the source is already a swapped int
+    rtlWriteInt(builder.getSelf() + offset, val, length);
     offset += length;
     return offset;
 }
@@ -266,9 +267,14 @@ size32_t RtlStringTypeInfo::build(ARowBuilder &builder, size32_t offset, const R
         builder.ensureCapacity(offset+size+sizeof(size32_t), field->name->str());
         byte *dest = builder.getSelf()+offset;
         rtlWriteInt4(dest, size);
+#if 0
+        // NOTE - you might argue that we should convert the incoming data to EBCDIC. But it seems more useful to
+        // define the semantics as being that the IFieldSource should return EBCDIC if you have declared the matching field as EBCDIC
+        // (otherwise, why did you bother?)
         if (isEbcdic())
-            rtlStrToEStr(size, (char *) dest+sizeof(size32_t), size, (char *)value);  // slightly debatable - might expect incoming result to already be in ebcdic?
+            rtlStrToEStr(size, (char *) dest+sizeof(size32_t), size, (char *)value);
         else
+#endif
             memcpy(dest+sizeof(size32_t), value, size);
         offset += size+sizeof(size32_t);
     }
@@ -276,9 +282,12 @@ size32_t RtlStringTypeInfo::build(ARowBuilder &builder, size32_t offset, const R
     {
         builder.ensureCapacity(offset+length, field->name->str());
         byte *dest = builder.getSelf()+offset;
+#if 0
+        // See above...
         if (isEbcdic())
             rtlStrToEStr(length, (char *) dest, size, (char *) value);
         else
+#endif
             rtlStrToStr(length, dest, size, value);
         offset += length;
     }
@@ -1088,15 +1097,20 @@ size32_t RtlDatasetTypeInfo::build(ARowBuilder &builder, size32_t offset, const
         // a 32-bit record count, and a pointer to an array of record pointers
         size32_t sizeInBytes = sizeof(size32_t) + sizeof(void *);
         builder.ensureCapacity(offset+sizeInBytes, field->name->str());
-        size32_t newOffset = offset + sizeInBytes;
         size32_t numRows = 0;
-
-        // MORE - read all the child rows
-
+        IEngineRowAllocator *childAllocator = builder.queryAllocator()->createChildRowAllocator(child);
+        byte **childRows = NULL;
+        RtlFieldStrInfo dummyField("<nested row>", NULL, child);
+        while (source.processNextRow(field))
+        {
+            RtlDynamicRowBuilder childBuilder(childAllocator);
+            size32_t childLen = child->build(childBuilder, 0, &dummyField, source);
+            childRows = childAllocator->appendRowOwn(childRows, ++numRows, (void *) childBuilder.finalizeRowClear(childLen));
+        }
         // Go back in and patch the count, remembering it may have moved
         rtlWriteInt4(builder.getSelf()+offset, numRows);
-        // * ( void * * ) (builder.getSelf()+offset+sizeof(size32_t)) = rows;
-        offset = newOffset;
+        * ( const void * * ) (builder.getSelf()+offset+sizeof(size32_t)) = childRows;
+        offset += sizeInBytes;
     }
     else
     {

+ 4 - 4
rtl/eclrtl/rtlfield_imp.hpp

@@ -28,7 +28,7 @@ struct ECLRTL_API RtlTypeInfoBase : public RtlTypeInfo
     virtual size32_t size(const byte * self, const byte * selfrow) const;
     virtual size32_t process(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IFieldProcessor & target) const;
     virtual size32_t toXML(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IXmlWriter & target) const;
-    // virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
+    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
 
 
     virtual const char * queryLocale() const;
@@ -153,8 +153,8 @@ struct ECLRTL_API RtlUnicodeTypeInfo : public RtlTypeInfoBase
 public:
     inline RtlUnicodeTypeInfo(unsigned _fieldType, unsigned _length, const char * _locale) : RtlTypeInfoBase(_fieldType, _length), locale(_locale) {}
 
-    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual size32_t size(const byte * self, const byte * selfrow) const;
+    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual size32_t process(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IFieldProcessor & target) const;
     virtual size32_t toXML(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IXmlWriter & target) const;
 
@@ -169,8 +169,8 @@ struct ECLRTL_API RtlVarUnicodeTypeInfo : public RtlTypeInfoBase
 public:
     inline RtlVarUnicodeTypeInfo(unsigned _fieldType, unsigned _length, const char * _locale) : RtlTypeInfoBase(_fieldType, _length), locale(_locale) {}
 
-    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual size32_t size(const byte * self, const byte * selfrow) const;
+    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual size32_t process(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IFieldProcessor & target) const;
     virtual size32_t toXML(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IXmlWriter & target) const;
 
@@ -202,9 +202,9 @@ struct ECLRTL_API RtlRecordTypeInfo : public RtlTypeInfoBase
     const RtlFieldInfo * const * fields;                // null terminated
 
     virtual size32_t size(const byte * self, const byte * selfrow) const;
+    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual size32_t process(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IFieldProcessor & target) const;
     virtual size32_t toXML(const byte * self, const byte * selfrow, const RtlFieldInfo * field, IXmlWriter & target) const;
-    virtual size32_t build(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, IFieldSource &source) const;
     virtual const RtlFieldInfo * const * queryFields() const { return fields; }
 };
 

+ 4 - 0
rtl/include/eclhelper.hpp

@@ -103,9 +103,12 @@ interface INaryCompareEq
     virtual bool match(unsigned _num, const void * * _rows) const = 0;
 };
 
+interface IEngineRowAllocator;
+
 interface IRowBuilder : public IInterface
 {
     virtual byte * ensureCapacity(size32_t required, const char * fieldName) = 0;
+    virtual IEngineRowAllocator *queryAllocator() const = 0;
 protected:
     virtual byte * createSelf() = 0;
     virtual void reportMissingRow() const = 0;
@@ -254,6 +257,7 @@ interface IEngineRowAllocator : extends IInterface
     virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) = 0;
     virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx = NULL) = 0;
     virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx) = 0;
+    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *childtype) = 0;
 };
 
 interface IRowSerializerTarget

+ 7 - 7
testing/ecl/streame.ecl

@@ -20,17 +20,19 @@ IMPORT Python;
 childrec := RECORD
    string name => unsigned value;
 END;
+
 namesRecord := RECORD
     STRING name1;
     STRING10 name2;
     LINKCOUNTED DATASET(childrec) childnames;
-    DICTIONARY(childrec) childdict;
+//    DICTIONARY(childrec) childdict;
+    childrec r;
     unsigned1 val1;
     integer1   val2;
     UTF8 u1;
     UNICODE u2;
     UNICODE8 u3;
-    BIG_ENDIAN unsigned6 val3;
+    BIG_ENDIAN unsigned4 val3;
     DATA d;
     BOOLEAN b;
     SET OF STRING ss1;
@@ -46,10 +48,8 @@ ENDEMBED;
 
 dataset(namesRecord) streamedNames(data d, utf8 u) := EMBED(Python)
   return [  \
-     ("Gavin", "Halliday", [("a", 1)], 250, -1,  U'là',  U'là',  U'là', 1234566, d, False, {"1","2"}), \
-     ("John", "Smith", [], 250, -1,  U'là',  U'là',  u, 1234566, d, True, [])]
+     ("Gavin", "Halliday", [("a", 1)], ("b", 2), 250, -1,  U'là',  U'là',  U'là', 0x01000000, d, False, {"1","2"}), \
+     ("John", "Smith", [], ("c", 3), 250, -1,  U'là',  U'là',  u, 0x02000000, d, True, [])]
 ENDEMBED;
 
-//output(linkedNames('AA'));
-//output(blockedNames('AA'));
-output(streamedNames(d'AA', u'là'));
+output(streamedNames(d'AA', u'là'));