Selaa lähdekoodia

Merge pull request #1313 from ghalliday/roxiemem-refactor

Roxiemem refactor

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 vuotta sitten
vanhempi
commit
15152877a8

+ 2 - 1
common/roxiehelper/CMakeLists.txt

@@ -44,7 +44,8 @@ include_directories (            ./../../system/security/securesocket
 
 ADD_DEFINITIONS( -DROXIEHELPER_EXPORTS -D_USRDLL -DMODULE_PRIORITY=5 )
 
-set ( SRCS roxiehelper.cpp roxiedebug.cpp roxiehelper.hpp roxiedebug.hpp roxielmj.hpp)
+set ( SRCS roxiehelper.cpp roxiedebug.cpp roxierow.cpp
+           roxiehelper.hpp roxiedebug.hpp roxierow.hpp roxielmj.hpp)
 
 HPCC_ADD_LIBRARY( roxiehelper SHARED ${SRCS} )
 install ( TARGETS roxiehelper DESTINATION ${OSSDIR}/lib )

+ 250 - 0
common/roxiehelper/roxierow.cpp

@@ -0,0 +1,250 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+#include "jexcept.hpp"
+#include "roxierow.hpp"
+#include "thorcommon.ipp"
+
+class RoxieEngineRowAllocatorBase : public CInterface, implements IEngineRowAllocator
+{
+public:
+    RoxieEngineRowAllocatorBase(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
+        : rowManager(_rowManager), meta(_meta)
+    {
+        activityId = _activityId;
+        allocatorId = _allocatorId;
+    }
+
+    IMPLEMENT_IINTERFACE
+
+//interface IEngineRowsetAllocator
+    virtual byte * * createRowset(unsigned count)
+    {
+        if (count == 0)
+            return NULL;
+        return (byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
+    }
+
+    virtual void releaseRowset(unsigned count, byte * * rowset)
+    {
+        rtlReleaseRowset(count, rowset);
+    }
+
+    virtual byte * * linkRowset(byte * * rowset)
+    {
+        return rtlLinkRowset(rowset);
+    }
+
+    virtual byte * * appendRowOwn(byte * * rowset, unsigned newRowCount, void * row)
+    {
+        byte * * expanded = doReallocRows(rowset, newRowCount-1, newRowCount);
+        expanded[newRowCount-1] = (byte *)row;
+        return expanded;
+    }
+
+    virtual byte * * reallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
+    {
+        //New rows (if any) aren't cleared....
+        return doReallocRows(rowset, oldRowCount, newRowCount);
+    }
+
+    virtual void releaseRow(const void * row)
+    {
+        ReleaseRoxieRow(row);
+    }
+
+    virtual void * linkRow(const void * row)
+    {
+        LinkRoxieRow(row);
+        return const_cast<void *>(row);
+    }
+
+    virtual IOutputMetaData * queryOutputMeta()
+    {
+        return meta.queryOriginal();
+    }
+    virtual unsigned queryActivityId()
+    {
+        return activityId;
+    }
+    virtual StringBuffer &getId(StringBuffer &idStr)
+    {
+        return idStr.append(activityId); // MORE - may want more context info in here
+    }
+    virtual IOutputRowSerializer *createRowSerializer(ICodeContext *ctx)
+    {
+        return meta.createRowSerializer(ctx, activityId);
+    }
+    virtual IOutputRowDeserializer *createRowDeserializer(ICodeContext *ctx)
+    {
+        return meta.createRowDeserializer(ctx, activityId);
+    }
+
+protected:
+    inline byte * * doReallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
+    {
+        if (!rowset)
+            return createRowset(newRowCount);
+
+        //This would be more efficient if previous capacity was stored by the caller - or if capacity() is more efficient
+        if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
+            return rowset;
+
+        size32_t capacity;
+        return (byte * *)rowManager.resizeRow(rowset, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
+    }
+
+protected:
+    roxiemem::IRowManager & rowManager;
+    const CachedOutputMetaData meta;
+    unsigned activityId;
+    unsigned allocatorId;
+};
+
+//General purpose row allocator here for reference - should be removed once spcialised versions are created
+class RoxieEngineRowAllocator : public RoxieEngineRowAllocatorBase
+{
+public:
+    RoxieEngineRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
+        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
+    {
+    }
+
+    virtual void * createRow()
+    {
+        size32_t allocSize = meta.getInitialSize();
+        return rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
+    }
+
+    virtual void * createRow(size32_t & allocatedSize)
+    {
+        const size32_t allocSize = meta.getInitialSize();
+        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
+        //more: allocate could return the allocated size, but that would penalise the fixed row case
+        allocatedSize = RoxieRowCapacity(ret);
+        return ret;
+    }
+
+    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
+    {
+        return rowManager.resizeRow(row, size, newSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED, size);
+    }
+
+    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
+    {
+        unsigned id = allocatorId | ACTIVITY_FLAG_ISREGISTERED;
+        if (meta.needsDestruct()) id |= ACTIVITY_FLAG_NEEDSDESTRUCTOR;
+        return rowManager.finalizeRow(row, oldSize, finalSize, id);
+    }
+};
+
+class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
+{
+public:
+    RoxieEngineFixedRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool packed)
+        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
+    {
+        unsigned flags = packed ? roxiemem::RHFpacked : roxiemem::RHFnone;
+        if (meta.needsDestruct())
+            flags |= roxiemem::RHFhasdestructor;
+        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize(), allocatorId | ACTIVITY_FLAG_ISREGISTERED, (roxiemem::RoxieHeapFlags)flags));
+    }
+
+    virtual void * createRow()
+    {
+        return heap->allocate();
+    }
+
+    virtual void * createRow(size32_t & allocatedSize)
+    {
+        allocatedSize = meta.getFixedSize();
+        return heap->allocate();
+    }
+
+    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
+    {
+        throwUnexpected();
+        return NULL;
+    }
+
+    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
+    {
+        if (!meta.needsDestruct())
+            return row;
+        return heap->finalizeRow(row);
+    }
+
+protected:
+    Owned<roxiemem::IFixedRowHeap> heap;
+};
+
+class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
+{
+public:
+    RoxieEngineVariableRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool _packed)
+        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId), packed(_packed)
+    {
+        unsigned flags = packed ? roxiemem::RHFpacked : roxiemem::RHFnone;
+        if (meta.needsDestruct())
+            flags |= roxiemem::RHFhasdestructor;
+        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED, (roxiemem::RoxieHeapFlags)flags));
+    }
+
+    virtual void * createRow()
+    {
+        size32_t allocSize = meta.getInitialSize();
+        size32_t capacity;
+        return heap->allocate(allocSize, capacity);
+    }
+
+    virtual void * createRow(size32_t & allocatedSize)
+    {
+        const size32_t allocSize = meta.getInitialSize();
+        return heap->allocate(allocSize, allocatedSize);
+    }
+
+    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
+    {
+        return heap->resizeRow(row, size, newSize, size);
+    }
+
+    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
+    {
+        if (!meta.needsDestruct() && !packed)
+            return row;
+        return heap->finalizeRow(row, oldSize, finalSize);
+    }
+
+protected:
+    Owned<roxiemem::IVariableRowHeap> heap;
+    bool packed;    // may not be needed - depends on implementation
+};
+
+
+IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, bool packed)
+{
+#if 0
+    //old code
+    return new RoxieEngineRowAllocator(rowManager, meta, activityId, allocatorId);
+#else
+    if (meta->getFixedSize() != 0)
+        return new RoxieEngineFixedRowAllocator(rowManager, meta, activityId, allocatorId, packed);
+    else
+        return new RoxieEngineVariableRowAllocator(rowManager, meta, activityId, allocatorId, packed);
+#endif
+}

+ 37 - 0
common/roxiehelper/roxierow.hpp

@@ -0,0 +1,37 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+#ifndef _ROXIEROW_INCL
+#define _ROXIEROW_INCL
+
+#ifdef _WIN32
+ #ifdef ROXIEHELPER_EXPORTS
+  #define ROXIEHELPER_API __declspec(dllexport)
+ #else
+  #define ROXIEHELPER_API __declspec(dllimport)
+ #endif
+#else
+ #define ROXIEHELPER_API
+#endif
+
+#include "roxiemem.hpp"
+#include "eclhelper.hpp"
+
+extern ROXIEHELPER_API IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool packed);
+
+#endif

+ 2 - 2
common/thorhelper/thorcommon.cpp

@@ -500,7 +500,7 @@ void CThorDemoRowSerializer::endNested(size32_t sizePos)
 
 
 
-IOutputRowSerializer * CachedOutputMetaData::createRowSerializer(ICodeContext * ctx, unsigned activityId)
+IOutputRowSerializer * CachedOutputMetaData::createRowSerializer(ICodeContext * ctx, unsigned activityId) const
 {
     if (metaFlags & (MDFhasserialize|MDFneedserialize))
         return meta->createRowSerializer(ctx, activityId);
@@ -510,7 +510,7 @@ IOutputRowSerializer * CachedOutputMetaData::createRowSerializer(ICodeContext *
 }
 
 
-IOutputRowDeserializer * CachedOutputMetaData::createRowDeserializer(ICodeContext * ctx, unsigned activityId)
+IOutputRowDeserializer * CachedOutputMetaData::createRowDeserializer(ICodeContext * ctx, unsigned activityId) const
 {
     if (metaFlags & (MDFhasserialize|MDFneedserialize))
         return meta->createRowDeserializer(ctx, activityId);

+ 4 - 4
common/thorhelper/thorcommon.ipp

@@ -124,8 +124,8 @@ public:
             meta->destruct(self);
     }
 
-    IOutputRowSerializer * createRowSerializer(ICodeContext * ctx, unsigned activityId);
-    IOutputRowDeserializer * createRowDeserializer(ICodeContext * ctx, unsigned activityId);
+    IOutputRowSerializer * createRowSerializer(ICodeContext * ctx, unsigned activityId) const;
+    IOutputRowDeserializer * createRowDeserializer(ICodeContext * ctx, unsigned activityId) const;
 
     inline IOutputMetaData * querySerializedMeta() const
     {
@@ -665,7 +665,7 @@ protected:
 class THORHELPER_API CSimpleVariableRowSerializer : public CInterface, implements IOutputRowSerializer
 {
 public:
-    CSimpleVariableRowSerializer(CachedOutputMetaData * _meta) : meta(_meta) {}
+    CSimpleVariableRowSerializer(const CachedOutputMetaData * _meta) : meta(_meta) {}
     IMPLEMENT_IINTERFACE
 
     virtual void serialize(IRowSerializerTarget & out, const byte * self)
@@ -674,7 +674,7 @@ public:
     }
 
 protected:
-    CachedOutputMetaData * meta;        // assume lifetime is shorter than this meta
+    const CachedOutputMetaData * meta;        // assume lifetime is shorter than this meta
 };
 
 //This should never be created in practice - need to use a streamer. Pseudocode below for illustration purposes only

+ 2 - 1
ecl/hthor/hthor.cpp

@@ -26,6 +26,7 @@
 #include "jlzw.hpp"
 #include "jisem.hpp"
 #include "roxiedebug.hpp"
+#include "roxierow.hpp"
 #include "eclhelper.hpp"
 #include "workunit.hpp"
 #include "jfile.hpp"
@@ -9720,7 +9721,7 @@ IHThorException * makeHThorException(ThorActivityKind kind, unsigned activityId,
 
 extern HTHOR_API IEngineRowAllocator * createHThorRowAllocator(IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
 {
-    return new HThorEngineRowAllocator(_rowManager, _meta, _activityId, _allocatorId);  
+    return createRoxieRowAllocator(_rowManager, _meta, _activityId, _allocatorId, false);
 }
 
 static class RowCallbackHook : implements IRtlRowCallback

+ 0 - 137
ecl/hthor/hthor.ipp

@@ -42,10 +42,6 @@
 #include "rtlds_imp.hpp"
 #include "rtlread_imp.hpp"
 
-#ifdef _DEBUG                   
-#define _CLEAR_ALLOCATED_ROW    
-#endif                          
-
 roxiemem::IRowManager * queryRowManager();
 #define releaseHThorRow(row) ReleaseRoxieRow(row)
 #define linkHThorRow(row) LinkRoxieRow(row)
@@ -58,139 +54,6 @@ typedef roxiemem::OwnedRoxieRow OwnedRow;
 byte * * linkHThorRowset(byte * * rowset);
 void releaseHThorRowset(unsigned count, byte * * rowset);
 
-
-//HThorEngineRowAllocator Copied from RoxieEngineRowAllocator in ccdquery.hpp
-class HThorEngineRowAllocator : public CInterface, implements IEngineRowAllocator
-{
-public:
-    HThorEngineRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
-        : rowManager(_rowManager), meta(_meta) 
-    {
-        activityId = _activityId;
-        allocatorId = _allocatorId;
-    }
-    IMPLEMENT_IINTERFACE
-
-//interface IEngineRowsetAllocator
-
-    virtual byte * * createRowset(unsigned count)
-    {
-        if (count == 0)
-            return NULL;
-        return (byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-    }
-
-    virtual void releaseRowset(unsigned count, byte * * rowset)
-    {
-        rtlReleaseRowset(count, rowset);
-    }
-
-    virtual byte * * linkRowset(byte * * rowset)
-    {
-        return rtlLinkRowset(rowset);
-    }
-
-    virtual byte * * appendRowOwn(byte * * rowset, unsigned newRowCount, void * row)
-    {
-        if (!rowset)
-            rowset = createRowset(newRowCount);
-        else
-        {
-            size32_t capacity;
-            rowset = (byte * *)rowManager.resizeRow(rowset, (newRowCount-1) * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        }
-        rowset[newRowCount-1] = (byte *)row;
-        return rowset;
-    }
-
-    virtual byte * * reallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
-    {
-        if (!rowset)
-            rowset = createRowset(newRowCount);
-        else
-        {
-            size32_t capacity;
-            rowset = (byte * *)rowManager.resizeRow(rowset, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        }
-        //New rows (if any) aren't cleared....
-        return rowset;
-    }
-
-//interface IEngineAnyRowAllocator
-    virtual void * createRow()
-    {
-        const size32_t allocSize = meta.getInitialSize();
-        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-#ifdef _CLEAR_ALLOCATED_ROW
-        memset(ret, 0xcc, allocSize); 
-#endif
-        return ret;
-    }
-
-    virtual void * createRow(size32_t & allocatedSize)
-    {
-        const size32_t allocSize = meta.getInitialSize();
-        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-#ifdef _CLEAR_ALLOCATED_ROW
-        memset(ret, 0xcc, allocSize); 
-#endif
-        allocatedSize = allocSize;
-        return ret;
-    }
-
-    virtual void releaseRow(const void * row)
-    {
-        ReleaseRoxieRow(row);
-    }
-
-    virtual void * linkRow(const void * row)
-    {
-        LinkRoxieRow(row);
-        return const_cast<void *>(row);
-    }
-
-    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
-    {
-        size32_t capacity;
-        void * ret = rowManager.resizeRow(row, size, newSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        size = capacity;
-        return ret;
-    }
-
-    virtual void * finalizeRow(size32_t newSize, void * row, size32_t oldSize)
-    {
-        unsigned id = allocatorId | ACTIVITY_FLAG_ISREGISTERED;
-        if (meta.needsDestruct()) id |= ACTIVITY_FLAG_NEEDSDESTRUCTOR;
-        return rowManager.finalizeRow(row, oldSize, newSize, id);
-    }
-
-    virtual IOutputMetaData * queryOutputMeta()
-    {
-        return meta.queryOriginal();
-    }
-    virtual unsigned queryActivityId()
-    {
-        return activityId;
-    }
-    virtual StringBuffer &getId(StringBuffer &idStr)
-    {
-        return idStr.append(activityId); // MORE - may want more context info in here
-    }
-    virtual IOutputRowSerializer *createRowSerializer(ICodeContext *ctx)
-    {
-        return meta.createRowSerializer(ctx, activityId);
-    }
-    virtual IOutputRowDeserializer *createRowDeserializer(ICodeContext *ctx)
-    {
-        return meta.createRowDeserializer(ctx, activityId);
-    }
-protected:
-    roxiemem::IRowManager & rowManager;
-    CachedOutputMetaData meta;
-    unsigned activityId;
-    unsigned allocatorId;
-};
-
 //---------------------------------------------------------------------------
 
 class CHThorException : public CInterface, implements IHThorException

+ 1 - 135
roxie/ccd/ccdquery.hpp

@@ -30,10 +30,7 @@
 #include "thorcommon.hpp"
 #include "ccddali.hpp"
 #include "thorcommon.ipp"
-
-#ifdef _DEBUG
-#define _CLEAR_ALLOCATED_ROW
-#endif
+#include "roxierow.hpp"
 
 class TranslatorArray : public CInterface, implements IInterface
 {
@@ -210,137 +207,6 @@ public:
 };
 
 
-class RoxieEngineRowAllocator : public CInterface, implements IEngineRowAllocator
-{
-public:
-    RoxieEngineRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
-        : rowManager(_rowManager), meta(_meta) 
-    {
-        activityId = _activityId;
-        allocatorId = _allocatorId;
-    }
-
-    IMPLEMENT_IINTERFACE
-
-//interface IEngineRowsetAllocator
-    virtual byte * * createRowset(unsigned count)
-    {
-        if (count == 0)
-            return NULL;
-        return (byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-    }
-
-    virtual void releaseRowset(unsigned count, byte * * rowset)
-    {
-        rtlReleaseRowset(count, rowset);
-    }
-
-    virtual byte * * linkRowset(byte * * rowset)
-    {
-        return rtlLinkRowset(rowset);
-    }
-
-    virtual byte * * appendRowOwn(byte * * rowset, unsigned newRowCount, void * row)
-    {
-        if (!rowset)
-            rowset = createRowset(newRowCount);
-        else
-        {
-            size32_t capacity;
-            rowset = (byte * *)rowManager.resizeRow(rowset, (newRowCount-1) * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        }
-        rowset[newRowCount-1] = (byte *)row;
-        return rowset;
-    }
-
-    virtual byte * * reallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
-    {
-        if (!rowset)
-            rowset = createRowset(newRowCount);
-        else
-        {
-            size32_t capacity;
-            rowset = (byte * *)rowManager.resizeRow(rowset, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        }
-        //New rows (if any) aren't cleared....
-        return rowset;
-    }
-
-//interface IEngineAnyRowAllocator
-    virtual void * createRow()
-    {
-        size32_t allocSize = meta.getInitialSize();
-        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-#ifdef _CLEAR_ALLOCATED_ROW
-        memset(ret, 0xcc, allocSize); 
-#endif
-        return ret;
-    }
-
-    virtual void * createRow(size32_t & allocatedSize)
-    {
-        const size32_t allocSize = meta.getInitialSize();
-        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-#ifdef _CLEAR_ALLOCATED_ROW
-        memset(ret, 0xcc, allocSize); 
-#endif
-        allocatedSize = allocSize;
-        return ret;
-    }
-
-    virtual void releaseRow(const void * row)
-    {
-        ReleaseRoxieRow(row);
-    }
-
-    virtual void * linkRow(const void * row)
-    {
-        LinkRoxieRow(row);
-        return const_cast<void *>(row);
-    }
-
-    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
-    {
-        size32_t capacity;
-        void * ret = rowManager.resizeRow(row, size, newSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
-        size = capacity;
-        return ret;
-    }
-
-    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
-    {
-        unsigned id = allocatorId | ACTIVITY_FLAG_ISREGISTERED;
-        if (meta.needsDestruct()) id |= ACTIVITY_FLAG_NEEDSDESTRUCTOR;
-        return rowManager.finalizeRow(row, oldSize, finalSize, id);
-    }
-
-    virtual IOutputMetaData * queryOutputMeta()
-    {
-        return meta.queryOriginal();
-    }
-    virtual unsigned queryActivityId()
-    {
-        return activityId;
-    }
-    virtual StringBuffer &getId(StringBuffer &idStr)
-    {
-        return idStr.append(activityId); // MORE - may want more context info in here
-    }
-    virtual IOutputRowSerializer *createRowSerializer(ICodeContext *ctx)
-    {
-        return meta.createRowSerializer(ctx, activityId);
-    }
-    virtual IOutputRowDeserializer *createRowDeserializer(ICodeContext *ctx)
-    {
-        return meta.createRowDeserializer(ctx, activityId);
-    }
-protected:
-    roxiemem::IRowManager & rowManager;
-    CachedOutputMetaData meta;
-    unsigned activityId;
-    unsigned allocatorId;
-};
-
 interface IQueryDll : public IInterface
 {
     virtual HelperFactory *getFactory(const char *name) const = 0;

+ 5 - 7
roxie/ccd/ccdqueue.cpp

@@ -34,11 +34,6 @@
 #include <cppunit/extensions/HelperMacros.h>
 #endif
 
-#ifdef _DEBUG
-#define allocate(a) allocate(a, __LINE__)
-#define clone(a,b) clone(a, b, __LINE__)
-#endif
-
 CriticalSection ibytiCrit; // CAUTION - not safe to use spinlocks as real-time thread accesses
 CriticalSection queueCrit;
 unsigned channels[MAX_CLUSTER_SIZE];
@@ -2232,9 +2227,12 @@ public:
         if (pos==datalen)
             return NULL;
         assertex(pos + length <= datalen);
-        void *ret = ((char *) data) + pos;
+        void * cur = ((char *) data) + pos;
         pos += length;
-        return rowManager->clone(length, ret);
+        void * ret = rowManager->allocate(length, 0);
+        memcpy(ret, cur, length);
+        //No need for finalize since only contains plain data.
+        return ret;
     }
 };
 

+ 3 - 1
roxie/ccd/ccdserver.cpp

@@ -66,6 +66,8 @@ namespace ccdserver_hqlhelper
 #include "nbcd.hpp"
 #include "roxiehelper.hpp"
 #include "roxielmj.hpp"
+#include "roxierow.hpp"
+
 #include "thorplugin.hpp"
 #include "keybuild.hpp"
 
@@ -28063,7 +28065,7 @@ public:
     {
         // MORE - may need to do some caching/commoning up here otherwise GRAPH in a child query may use too many
         SpinBlock b(allAllocatorsLock);
-        IEngineRowAllocator * ret = new RoxieEngineRowAllocator(*rowManager, meta, activityId, allAllocators.ordinality());
+        IEngineRowAllocator * ret = createRoxieRowAllocator(*rowManager, meta, activityId, allAllocators.ordinality(), false);
         LINK(ret);
         allAllocators.append(*ret);
         return ret;

+ 0 - 4
roxie/ccd/ccdstate.hpp

@@ -31,10 +31,6 @@
 #include "ccddali.hpp"
 #include "thorcommon.ipp"
 
-#ifdef _DEBUG
-#define _CLEAR_ALLOCATED_ROW
-#endif
-
 interface IFilePartMap : public IInterface
 {
     virtual bool IsShared() const = 0;

+ 111 - 1
roxie/roxiemem/roxiemem.cpp

@@ -24,6 +24,10 @@
 #include <sys/mman.h>
 #endif
 
+#ifdef _DEBUG
+#define _CLEAR_ALLOCATED_ROW
+#endif
+
 namespace roxiemem {
 
 #define USE_MADVISE_ON_FREE     // avoid linux swapping 'freed' pages to disk
@@ -725,6 +729,15 @@ public:
                 ret += sizeof(unsigned);
                 atomic_set((atomic_t *) ret, 1);
                 ret += sizeof(atomic_t);
+#ifdef _CLEAR_ALLOCATED_ROW
+                //MORE: This should be in the header, or a member of the class
+#ifdef CHECKING_HEAP
+                const unsigned chunkOverhead = sizeof(atomic_t) + sizeof(unsigned) + sizeof(unsigned);
+#else
+                const unsigned chunkOverhead = sizeof(atomic_t) + sizeof(unsigned);
+#endif
+                memset(ret, 0xcc, size-chunkOverhead);
+#endif
                 return ret;
             }
         }
@@ -897,6 +910,9 @@ public:
     {
         atomic_inc(&count);
         assertex(size == hugeSize);
+#ifdef _CLEAR_ALLOCATED_ROW
+        memset(&data, 0xcc, size);
+#endif
         return &data;
     }
 
@@ -998,8 +1014,55 @@ public:
 
 //================================================================================
 //
+class CRoxieFixedRowHeap : implements IFixedRowHeap, public CInterface
+{
+public:
+    CRoxieFixedRowHeap(CChunkingRowManager * _rowManager, size32_t _fixedSize, unsigned _activityId, RoxieHeapFlags _flags)
+        : rowManager(_rowManager), fixedSize(_fixedSize), activityId(_activityId), flags(_flags)
+    {
+    }
+    IMPLEMENT_IINTERFACE
+
+    virtual void *allocate();
+    virtual void *finalizeRow(void *final);
+
+protected:
+    CChunkingRowManager * rowManager;       // Lifetime of rowManager is guaranteed to be longer
+    size32_t fixedSize;
+    unsigned activityId;
+    RoxieHeapFlags flags;
+};
+
+
+//================================================================================
+//
+class CRoxieVariableRowHeap : implements IVariableRowHeap, public CInterface
+{
+public:
+    CRoxieVariableRowHeap(CChunkingRowManager * _rowManager, unsigned _activityId, RoxieHeapFlags _flags)
+        : rowManager(_rowManager), activityId(_activityId), flags(_flags)
+    {
+    }
+    IMPLEMENT_IINTERFACE
+
+    virtual void *allocate(size32_t size, size32_t & capacity);
+    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity);
+    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize);
+
+protected:
+    CChunkingRowManager * rowManager;       // Lifetime of rowManager is guaranteed to be longer
+    unsigned activityId;
+    RoxieHeapFlags flags;
+};
+
+
+//================================================================================
+//
 class CChunkingRowManager : public CInterface, implements IRowManager
 {
+    friend class CRoxieFixedRowHeap;
+    friend class CRoxieVariableRowHeap;
+
     BigHeapletBase active;
     CriticalSection crit;
     unsigned pageLimit;
@@ -1427,7 +1490,7 @@ public:
         }
     }
 
-    virtual void *finalizeRow(void * original, unsigned initialSize, unsigned finalSize, unsigned activityId)
+    virtual void *finalizeRow(void * original, size32_t initialSize, size32_t finalSize, unsigned activityId)
     {
         assertex(finalSize);
         if (isCheckingHeap)
@@ -1514,9 +1577,56 @@ public:
             logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::noteDataBuffReleased dataBuffs=%u dataBuffPages=%u possibleGoers=%u dataBuff=%p rowMgr=%p", 
                     dataBuffs, dataBuffPages, possibleGoers, dataBuff, this);
     }
+
+    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, RoxieHeapFlags flags)
+    {
+        //Although the activityId is passed here, there is nothing to stop multiple RowHeaps sharing the same ChunkAllocator
+        return new CRoxieFixedRowHeap(this, fixedSize, activityId, flags);
+    }
+
+    virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, RoxieHeapFlags flags)
+    {
+        //Although the activityId is passed here, there is nothing to stop multiple RowHeaps sharing the same ChunkAllocator
+        return new CRoxieVariableRowHeap(this, activityId, flags);
+    }
 };
 
 
+void * CRoxieFixedRowHeap::allocate()
+{
+    return rowManager->allocate(fixedSize, activityId);
+}
+
+void * CRoxieFixedRowHeap::finalizeRow(void *final)
+{
+    //MORE: Checking heap checks for not shared.
+    if (flags & RHFhasdestructor)
+        HeapletBase::setDestructorFlag(final);
+    return final;
+}
+
+void * CRoxieVariableRowHeap::allocate(size32_t size, size32_t & capacity)
+{
+    void * ret = rowManager->allocate(size, activityId);
+    capacity = RoxieRowCapacity(ret);
+    return ret;
+}
+
+void * CRoxieVariableRowHeap::resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity)
+{
+    return rowManager->resizeRow(original, oldsize, newsize, activityId, capacity);
+}
+
+void * CRoxieVariableRowHeap::finalizeRow(void *final, size32_t originalSize, size32_t finalSize)
+{
+    return rowManager->finalizeRow(final, originalSize, finalSize, activityId);
+    //If never shrink the following should be sufficient.
+    //MORE: Checking heap checks for not shared.
+    if (flags & RHFhasdestructor)
+        HeapletBase::setDestructorFlag(final);
+    return final;
+}
+
 //================================================================================
 // Buffer manager - blocked 
 

+ 29 - 3
roxie/roxiemem/roxiemem.hpp

@@ -133,6 +133,8 @@ public:
         if (ptr)
         {
             HeapletBase *h = findBase(ptr);
+            //MORE: If capacity was always the size stored in the first word of the block this could be non virtual
+            //and the whole function could be inline.
             return h->_capacity();
         }
         throwUnexpected();
@@ -143,6 +145,7 @@ public:
         if (ptr)
         {
             HeapletBase *h = findBase(ptr);
+            //MORE: Should this test be debug only?
             if (h->flags & NOTE_RELEASES)
                 h->_setDestructorFlag(ptr);
             else
@@ -268,6 +271,7 @@ public:
 #define ReleaseRoxieRow(row) roxiemem::HeapletBase::release(row)
 #define ReleaseClearRoxieRow(row) roxiemem::HeapletBase::releaseClear(row)
 #define LinkRoxieRow(row) roxiemem::HeapletBase::link(row)
+#define RoxieRowCapacity(row)  roxiemem::HeapletBase::capacity(row)
 
 class OwnedRoxieRow;
 class OwnedConstRoxieRow
@@ -343,13 +347,32 @@ private:
 
 
 
+interface IFixedRowHeap : extends IInterface
+{
+    virtual void *allocate() = 0;
+    virtual void *finalizeRow(void *final) = 0;
+};
+
+interface IVariableRowHeap : extends IInterface
+{
+    virtual void *allocate(size32_t size, size32_t & capacity) = 0;
+    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity) = 0;
+    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize) = 0;
+};
+
+enum RoxieHeapFlags
+{
+    RHFnone             = 0x0000,
+    RHFpacked           = 0x0001,
+    RHFhasdestructor    = 0x0002,
+};
+
 // Variable size aggregated link-counted Roxie (etc) row manager
 interface IRowManager : extends IInterface
 {
-    virtual void *allocate(size32_t size, unsigned activityId=0) = 0;
-    virtual void *clone(size32_t size, const void *source, unsigned activityId=0) = 0;
+    virtual void *allocate(size32_t size, unsigned activityId) = 0;
     virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, unsigned activityId, size32_t &capacity) = 0;
-    virtual void *finalizeRow(void *final, unsigned originalSize, unsigned finalSize, unsigned activityId) = 0;
+    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize, unsigned activityId) = 0;
     virtual void setMemoryLimit(memsize_t size) = 0;
     virtual unsigned allocated() = 0;
     virtual unsigned pages() = 0;
@@ -361,6 +384,9 @@ interface IRowManager : extends IInterface
     virtual void reportLeaks() = 0;
     virtual unsigned maxSimpleBlock() = 0;
     virtual void checkHeap() = 0;
+    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, RoxieHeapFlags flags) = 0;
+    virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, RoxieHeapFlags flags) = 0;            // should this be passed the initial size?
+
     // Set the chunk sizes to use. Note that these sizes are before adjusting for per-row overhead
     virtual void setChunkSizes(const UnsignedArray &) = 0;
 };

+ 1 - 1
roxie/udplib/udpmsgpk.cpp

@@ -358,7 +358,7 @@ public:
                 LinkRoxieRow(res);
                 return res;
             }   
-            char *currResLoc = (char*)rowMgr->allocate(length);
+            char *currResLoc = (char*)rowMgr->allocate(length, 0);
             res = currResLoc;
             while (length && dataBuff) 
             {

+ 11 - 6
rtl/eclrtl/rtlds.cpp

@@ -272,13 +272,17 @@ byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fiel
     {
         if (!self)
             create();
-        void * next = rowAllocator->resizeRow(required, self, maxLength);
-        if (!next)
+
+        if (required > maxLength)
         {
-            rtlReportFieldOverflow(required, maxLength, fieldName);
-            return NULL;
-        }   
-        self = static_cast<byte *>(next);
+            void * next = rowAllocator->resizeRow(required, self, maxLength);
+            if (!next)
+            {
+                rtlReportFieldOverflow(required, maxLength, fieldName);
+                return NULL;
+            }
+            self = static_cast<byte *>(next);
+        }
     }
     return self;
 }
@@ -483,6 +487,7 @@ byte * * RtlLinkedDatasetBuilder::linkrows()
 void RtlLinkedDatasetBuilder::expand(size32_t required)
 {
     assertex(required < choosenLimit);
+    //MORE: Next factoring change this so it passes this logic over to the row allocator
     size32_t newMax = max ? max : 4;
     while (newMax < required)
     {