Browse Source

HPCC-10456 Codegen support for streamed datasets from embedded languages

Add queryChildMeta() and createChildRowAllocator(), to allow child datasets to
be created from the type info.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 11 years ago
parent
commit
609e1e4f78

+ 43 - 18
common/thorhelper/roxierow.cpp

@@ -122,8 +122,8 @@ bool isRowCheckValid(unsigned allocatorId, const void * row)
 class RoxieEngineRowAllocatorBase : public CInterface, implements IEngineRowAllocator
 {
 public:
-    RoxieEngineRowAllocatorBase(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
-        : rowManager(_rowManager), meta(_meta)
+    RoxieEngineRowAllocatorBase(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _createFlags)
+        : cache(_cache), rowManager(_rowManager), meta(_meta), createFlags(_createFlags)
     {
         activityId = _activityId;
         allocatorId = _allocatorId;
@@ -203,8 +203,26 @@ public:
     }
     virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type)
     {
-        return this; // MORE - wrong!
+        CriticalBlock block(cs); // Not very likely but better be safe
+        if (children.empty())
+        {
+            for (unsigned i =0;;i++)
+            {
+                IOutputMetaData * childMeta = meta.queryChildMeta(i);
+                if (!childMeta)
+                    break;
+                children.append(*cache->ensure(childMeta, activityId, createFlags));
+            }
+        }
+        ForEachItemIn(i, children)
+        {
+            IEngineRowAllocator & cur = children.item(i);
+            if (cur.queryOutputMeta()->queryTypeInfo() == type)
+                return LINK(&cur);
+        }
+        return NULL;
     }
+
 protected:
     inline byte * * doReallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
     {
@@ -237,18 +255,24 @@ protected:
     }
 
 protected:
+    static CriticalSection cs; // Very unlikely to have contention, so share between all allocators
+    IRowAllocatorMetaActIdCache * cache;
     roxiemem::IRowManager & rowManager;
     const CachedOutputMetaData meta;
     unsigned activityId;
     unsigned allocatorId;
+    roxiemem::RoxieHeapFlags createFlags;
+    IArrayOf<IEngineRowAllocator> children;
 };
 
+CriticalSection RoxieEngineRowAllocatorBase::cs;
+
 template <class CHECKER>
 class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
 {
 public:
-    RoxieEngineFixedRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
-        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
+    RoxieEngineFixedRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
+        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
     {
         unsigned flags = _flags;
         if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
@@ -289,8 +313,8 @@ template <class CHECKER>
 class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
 {
 public:
-    RoxieEngineVariableRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
-        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
+    RoxieEngineVariableRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
+        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
     {
         unsigned flags = _flags;
         if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
@@ -342,20 +366,20 @@ protected:
 };
 
 
-IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
+IEngineRowAllocator * createRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
 {
     if (meta->getFixedSize() != 0)
-        return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(rowManager, meta, activityId, allocatorId, flags);
+        return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
     else
-        return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(rowManager, meta, activityId, allocatorId, flags);
+        return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
 }
 
-IEngineRowAllocator * createCrcRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
+IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
 {
     if (meta->getFixedSize() != 0)
-        return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(rowManager, meta, activityId, allocatorId, flags);
+        return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
     else
-        return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(rowManager, meta, activityId, allocatorId, flags);
+        return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
 }
 
 #pragma pack(push,1) // hashing on members, so ensure contiguous
@@ -428,7 +452,7 @@ public:
                     return LINK(&container->queryElement());
                 // if in cache but unique, reuse allocatorId
                 SpinUnblock b(allAllocatorsLock);
-                return callback->createAllocator(meta, activityId, container->queryAllocatorId(), flags);
+                return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
             }
             // NB: a RHFunique allocator, will cause 1st to be added to 'allAllocators'
             // subsequent requests for the same type of unique allocator, will share same allocatorId
@@ -439,7 +463,7 @@ public:
             IEngineRowAllocator *ret;
             {
                 SpinUnblock b(allAllocatorsLock);
-                ret = callback->createAllocator(meta, activityId, allocatorId, flags);
+                ret = callback->createAllocator(this, meta, activityId, allocatorId, flags);
                 assertex(ret);
             }
             if (allocatorId == allAllocators.ordinality())
@@ -617,6 +641,7 @@ protected:
         virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
         virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
         virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
+        virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 
         size32_t minSize;
         size32_t fixedSize;
@@ -626,7 +651,7 @@ protected:
     {
         CheckingRowAllocatorCache cache;
         Owned<IRowManager> rm = createRowManager(0, NULL, logctx, &cache);
-        Owned<IEngineRowAllocator> alloc = checking ? createCrcRoxieRowAllocator(*rm, meta, 0, 0, flags) : createRoxieRowAllocator(*rm, meta, 0, 0, flags);
+        Owned<IEngineRowAllocator> alloc = checking ? createCrcRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags) : createRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags);
 
         for (unsigned size=low; size <= high; size++)
         {
@@ -710,9 +735,9 @@ protected:
             CAllocatorCallback(IRowManager *_rm) : rm(_rm)
             {
             }
-            virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
+            virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
             {
-                return createRoxieRowAllocator(*rm, meta, activityId, cacheId, flags);
+                return createRoxieRowAllocator(cache, *rm, meta, activityId, cacheId, flags);
             }
         } callback(rm);
         Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);

+ 4 - 4
common/thorhelper/roxierow.hpp

@@ -23,9 +23,6 @@
 #include "eclhelper.hpp"
 
 
-extern THORHELPER_API IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags flags);
-extern THORHELPER_API IEngineRowAllocator * createCrcRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags);
-
 interface IRowAllocatorMetaActIdCache : extends roxiemem::IRowAllocatorCache
 {
     virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) = 0;
@@ -34,7 +31,7 @@ interface IRowAllocatorMetaActIdCache : extends roxiemem::IRowAllocatorCache
 
 interface IRowAllocatorMetaActIdCacheCallback
 {
-    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const = 0;
+    virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const = 0;
 };
 
 extern THORHELPER_API IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback);
@@ -63,4 +60,7 @@ public:
     }
 };
 
+extern THORHELPER_API IEngineRowAllocator * createRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags flags);
+extern THORHELPER_API IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags);
+
 #endif

+ 14 - 2
common/thorhelper/thorcommon.ipp

@@ -99,6 +99,10 @@ public:
             return meta->querySerializedDiskMeta();
         return meta;
     }
+    inline IOutputMetaData * queryChildMeta(unsigned i) const
+    {
+        return meta->queryChildMeta(i);
+    }
 
 //cast operators.
     inline IOutputMetaData * queryOriginal() const          { return meta; }
@@ -402,7 +406,11 @@ public:
     {
         original->walkIndirectMembers(self+offset, visitor);
     }
-        
+    virtual IOutputMetaData * queryChildMeta(unsigned i)
+    {
+        return original->queryChildMeta(i);
+    }
+
 protected:
     size32_t offset;
     IOutputMetaData *original;
@@ -533,7 +541,11 @@ public:
     {
         original->walkIndirectMembers(self, visitor);
     }
-        
+    virtual IOutputMetaData * queryChildMeta(unsigned i)
+    {
+        return original->queryChildMeta(i);
+    }
+
 protected:
     size32_t offset;
     Linked<IOutputMetaData> original;

+ 3 - 2
ecl/eclagent/eclagent.ipp

@@ -641,9 +641,9 @@ public:
     virtual void updateWULogfile();
 
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
-    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
+    virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
     {
-        return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
+        return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
     }
 };
 
@@ -671,6 +671,7 @@ public:
     virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 };
 
 class EclBoundLoopGraph : public CInterface, implements IHThorBoundLoopGraph

+ 8 - 4
ecl/hql/hqlgram2.cpp

@@ -867,6 +867,7 @@ IHqlExpression * HqlGram::processEmbedBody(const attribute & errpos, IHqlExpress
     {
         type.setown(setStreamedAttr(type, true));
         type.setown(setLinkCountedAttr(type, true));
+        //MORE: Recursively set link counted on the records.
     }
 
     IHqlExpression * record = queryOriginalRecord(type);
@@ -2421,10 +2422,6 @@ void HqlGram::addDatasetField(const attribute &errpos, IIdAtom * name, ITypeInfo
             attrs = createComma(LINK(childAttrs->queryChild(0)), attrs);
     }
 
-    //An explicitly link counted dataset type should ensure the field is linkcounted
-    if (isLinkedRowset(fieldType) && !queryAttributeInList(_linkCounted_Atom, attrs))
-        attrs = createComma(attrs, getLinkCountedAttr());
-
     Owned<ITypeInfo> dsType = LINK(fieldType);
     if (value)
     {
@@ -2452,6 +2449,10 @@ void HqlGram::addDatasetField(const attribute &errpos, IIdAtom * name, ITypeInfo
     if (!attrs)
         attrs = extractAttrsFromExpr(value);
 
+    //An explicitly link counted dataset type should ensure the field is linkcounted
+    if (isLinkedRowset(dsType) && !queryAttributeInList(_linkCounted_Atom, attrs))
+        attrs = createComma(attrs, getLinkCountedAttr());
+
     IHqlExpression *newField = createField(name, LINK(dsType), value, attrs);
     addToActiveRecord(newField);
 }
@@ -2480,6 +2481,9 @@ void HqlGram::addDictionaryField(const attribute &errpos, IIdAtom * name, ITypeI
     if (!attrs)
         attrs = extractAttrsFromExpr(value);
 
+    if (isLinkedRowset(dictType) && !queryAttributeInList(_linkCounted_Atom, attrs))
+        attrs = createComma(attrs, getLinkCountedAttr());
+
     IHqlExpression *newField = createField(name, dictType.getClear(), value, attrs);
     addToActiveRecord(newField);
 }

+ 1 - 1
ecl/hqlcpp/hqlcppds.cpp

@@ -2909,7 +2909,7 @@ public:
     virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) { throwUnexpected(); }
     virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx = NULL) { throwUnexpected(); }
     virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx) { throwUnexpected(); }
-    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type) { return this; }
+    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type) { throwUnexpected(); }
 };
 
 //Use a (constant) transform to map selectors of the form queryActiveTableSelector().field

+ 81 - 0
ecl/hqlcpp/hqlhtcpp.cpp

@@ -4320,6 +4320,70 @@ protected:
 
 
 
+class MetaChildMetaCallback
+{
+public:
+    MetaChildMetaCallback(HqlCppTranslator & _translator, IHqlStmt * _switchStmt)
+        : translator(_translator), switchStmt(_switchStmt)
+    {
+        nextIndex = 0;
+    }
+
+    void walkRecord(BuildCtx & ctx, IHqlExpression * record)
+    {
+        ForEachChild(i, record)
+        {
+            IHqlExpression * cur = record->queryChild(i);
+            switch (cur->getOperator())
+            {
+            case no_record:
+                walkRecord(ctx, cur);
+                break;
+            case no_ifblock:
+                walkRecord(ctx, cur->queryChild(1));
+                break;
+            case no_field:
+                {
+                    ITypeInfo * type = cur->queryType();
+                    switch (type->getTypeCode())
+                    {
+                    case type_row:
+                        walkRecord(ctx, queryRecord(cur));
+                        break;
+                    case type_dictionary:
+                    case type_table:
+                    case type_groupedtable:
+                        {
+                            IHqlExpression * record = cur->queryRecord()->queryBody();
+                            if (!visited.contains(*record))
+                            {
+                                BuildCtx condctx(ctx);
+                                OwnedHqlExpr branch = getSizetConstant(nextIndex++);
+                                OwnedHqlExpr childMeta = translator.buildMetaParameter(record);
+                                OwnedHqlExpr ret = createValue(no_address, makeBoolType(), LINK(childMeta));
+
+                                condctx.addCase(switchStmt, branch);
+                                condctx.addReturn(ret);
+                                visited.append(*record);
+                            }
+                            break;
+                        }
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+protected:
+    HqlCppTranslator & translator;
+    IHqlStmt * switchStmt;
+    HqlExprCopyArray visited;
+    unsigned nextIndex;
+};
+
+
+
 void HqlCppTranslator::generateMetaRecordSerialize(BuildCtx & ctx, IHqlExpression * record, const char * diskSerializerName, const char * diskDeserializerName, const char * internalSerializerName, const char * internalDeserializerName, const char * prefetcherName)
 {
     OwnedHqlExpr dataset = createDataset(no_null, LINK(record));
@@ -4343,6 +4407,23 @@ void HqlCppTranslator::generateMetaRecordSerialize(BuildCtx & ctx, IHqlExpressio
         builder.walkRecord(walkctx, dataset, record);
     }
 
+    if (recordRequiresDestructor(record))
+    {
+        BuildCtx childmetactx(ctx);
+        OwnedHqlExpr switchVar = createVariable("i", makeIntType(4, false));
+        IHqlStmt * child = childmetactx.addQuotedCompound("virtual IOutputMetaData * queryChildMeta(unsigned i)");
+
+        BuildCtx switchctx(childmetactx);
+        IHqlStmt * switchStmt = switchctx.addSwitch(switchVar);
+        unsigned prevChildren = calcTotalChildren(child);
+        MetaChildMetaCallback builder(*this, switchStmt);
+        builder.walkRecord(childmetactx, record);
+        if (prevChildren != calcTotalChildren(child))
+            childmetactx.addReturn(queryQuotedNullExpr());
+        else
+            child->setIncluded(false);
+    }
+
     if (diskSerializerName && *diskSerializerName)
     {
         BuildCtx serializectx(ctx);

+ 1 - 5
ecl/hthor/hthor.cpp

@@ -9320,6 +9320,7 @@ public:
     virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 };
 
 //=====================================================================================================
@@ -10043,8 +10044,3 @@ IHThorException * makeHThorException(ThorActivityKind kind, unsigned activityId,
 {
     return new CHThorException(exc, extra, kind, activityId, subgraphId);
 }
-
-extern HTHOR_API IEngineRowAllocator * createHThorRowAllocator(IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
-{
-    return createRoxieRowAllocator(_rowManager, _meta, _activityId, _allocatorId, roxiemem::RHFnone);
-}

+ 0 - 3
ecl/hthor/hthor.hpp

@@ -214,7 +214,4 @@ extern HTHOR_API IHThorException * makeHThorException(ThorActivityKind kind, uns
 extern HTHOR_API IHThorException * makeHThorException(ThorActivityKind kind, unsigned activityId, unsigned subgraphId, IException * exc);
 extern HTHOR_API IHThorException * makeHThorException(ThorActivityKind kind, unsigned activityId, unsigned subgraphId, IException * exc, char const * extra);
 
-extern HTHOR_API IEngineRowAllocator * createHThorRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId);
-
-
 #endif // HTHOR_INCL

+ 4 - 4
roxie/ccd/ccdcontext.cpp

@@ -1256,12 +1256,12 @@ public:
     }
 
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
-    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
+    virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
     {
         if (checkingHeap)
-            return createCrcRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
+            return createCrcRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
         else
-            return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
+            return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
     }
 
     virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
@@ -2320,7 +2320,7 @@ public:
             size32_t oldCount;
             rowset_t oldData;
             resultStore.queryResult(oldId, oldCount, oldData);
-            Owned<IEngineRowAllocator> allocator = createRoxieRowAllocator(*rowManager, meta, 0, 0, roxiemem::RHFnone);
+            Owned<IEngineRowAllocator> allocator = getRowAllocator(meta, 0);
             RtlLinkedDatasetBuilder builder(allocator);
             builder.appendRows(oldCount, oldData);
             builder.appendRows(count, data);

+ 2 - 0
roxie/ccd/ccdserver.cpp

@@ -14525,6 +14525,7 @@ public:
     virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 };
 
 class CRoxieServerLoopActivityFactory : public CRoxieServerActivityFactory
@@ -25922,6 +25923,7 @@ public:
     virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 } testMeta;
 
 class TestInput : public CInterface, implements IRoxieInput

+ 3 - 1
rtl/include/eclhelper.hpp

@@ -108,10 +108,11 @@ interface IEngineRowAllocator;
 interface IRowBuilder : public IInterface
 {
     virtual byte * ensureCapacity(size32_t required, const char * fieldName) = 0;
-    virtual IEngineRowAllocator *queryAllocator() const = 0;
 protected:
     virtual byte * createSelf() = 0;
     virtual void reportMissingRow() const = 0;
+public:
+    virtual IEngineRowAllocator *queryAllocator() const = 0;
 };
 
 class ARowBuilder : public IRowBuilder
@@ -448,6 +449,7 @@ interface IOutputMetaData : public IRecordSize
 
     virtual void process(const byte * self, IFieldProcessor & target, unsigned from, unsigned to) {}            // from and to are *hints* for the range of fields to call through with
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) = 0;
+    virtual IOutputMetaData * queryChildMeta(unsigned i) = 0;
 };
 
 

+ 1 - 0
rtl/include/eclhelper_base.hpp

@@ -174,6 +174,7 @@ public:
         return createDiskDeserializer(ctx, activityId);
     }
     virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) { }
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
 
 protected:
     //This is the prefetch function that is actually generated by the code generator

+ 12 - 4
thorlcr/thorutil/thmem.cpp

@@ -1954,9 +1954,9 @@ public:
         allocatorMetaCache.clear();
     }
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
-    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
+    virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
     {
-        return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
+        return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
     }
 // IThorAllocator
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
@@ -1985,9 +1985,9 @@ public:
 // IThorAllocator
     virtual bool queryCrc() const { return true; }
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
-    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
+    virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
     {
-        return createCrcRoxieRowAllocator(*rowManager, meta, activityId, cacheId, flags);
+        return createCrcRoxieRowAllocator(cache, *rowManager, meta, activityId, cacheId, flags);
     }
 };
 
@@ -2159,6 +2159,10 @@ public:
         //GH: I think this is what it should do, please check
         visitor.visitRow(*(const byte **)(self+extraSz)); 
     }
+    virtual IOutputMetaData * queryChildMeta(unsigned i)
+    {
+        return childMeta->queryChildMeta(i);
+    }
 };
 
 IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz)
@@ -2303,6 +2307,10 @@ public:
     {
         meta->walkIndirectMembers(self, visitor);
     }
+    virtual IOutputMetaData * queryChildMeta(unsigned i)
+    {
+        return meta->queryChildMeta(i);
+    }
 };
 
 IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz)