Prechádzať zdrojové kódy

Merge pull request #3724 from jakesmith/cached-allocators

HPCC-8278 Cache memory allocators

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 rokov pred
rodič
commit
950ef3e1c7

+ 3 - 1
roxie/roxiemem/roxiemem.cpp

@@ -4076,9 +4076,11 @@ protected:
         ASSERT(RoxieRowCapacity(alloc3) == capacity);
         ReleaseRoxieRow(alloc3);
     }
-    class CountingRowAllocatorCache : public IRowAllocatorCache
+    class CountingRowAllocatorCache : public CSimpleInterface, public IRowAllocatorCache
     {
     public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
         CountingRowAllocatorCache() { atomic_set(&counter, 0); }
         virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
         virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }

+ 1 - 1
roxie/roxiemem/roxiemem.hpp

@@ -63,7 +63,7 @@
 
 namespace roxiemem {
 
-interface IRowAllocatorCache
+interface IRowAllocatorCache : extends IInterface
 {
     virtual unsigned getActivityId(unsigned cacheId) const = 0;
     virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const = 0;

+ 198 - 1
roxie/roxiemem/roxierow.cpp

@@ -341,6 +341,158 @@ IEngineRowAllocator * createCrcRoxieRowAllocator(roxiemem::IRowManager & rowMana
         return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(rowManager, meta, activityId, allocatorId, flags);
 }
 
+#pragma pack(push,1) // hashing on members, so ensure contiguous
+struct AllocatorKey
+{
+    IOutputMetaData *meta;
+    unsigned activityId;
+    AllocatorKey(IOutputMetaData *_meta, unsigned &_activityId) : meta(_meta), activityId(_activityId) { }
+    bool operator==(AllocatorKey const &other) const
+    {
+        return (meta == other.meta) && (activityId == other.activityId);
+    }
+};
+#pragma pack(pop)
+
+class CAllocatorCacheItem : public OwningHTMapping<IEngineRowAllocator, AllocatorKey>
+{
+    Linked<IOutputMetaData> meta;
+public:
+    CAllocatorCacheItem(IEngineRowAllocator *allocator, AllocatorKey &key)
+        : OwningHTMapping<IEngineRowAllocator, AllocatorKey>(*allocator, key)
+    {
+        meta.set(key.meta);
+    }
+};
+
+class CAllocatorCache : public CSimpleInterface, implements IRowAllocatorMetaActIdCache
+{
+    OwningSimpleHashTableOf<CAllocatorCacheItem, AllocatorKey> cache;
+    IArrayOf<IEngineRowAllocator> allAllocators;
+    mutable SpinLock allAllocatorsLock;
+    Owned<roxiemem::IRowManager> rowManager;
+    roxiemem::RoxieHeapFlags flags;
+    IRowAllocatorMetaActIdCacheCallback *callback;
+
+    inline IEngineRowAllocator *_lookup(IOutputMetaData *meta, unsigned activityId) const
+    {
+        AllocatorKey key(meta, activityId);
+        CAllocatorCacheItem *container = cache.find(key);
+        if (!container)
+            return NULL;
+        return &container->queryElement();
+    }
+    inline bool _add(IEngineRowAllocator *allocator, IOutputMetaData *meta, unsigned activityId)
+    {
+        AllocatorKey key(meta, activityId);
+        CAllocatorCacheItem *container = new CAllocatorCacheItem(allocator, key);
+        return cache.replace(*container);
+    }
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+    CAllocatorCache(IRowAllocatorMetaActIdCacheCallback *_callback) : callback(_callback)
+    {
+    }
+// IRowAllocatorMetaActIdCache
+    inline IEngineRowAllocator *lookup(IOutputMetaData *meta, unsigned activityId) const
+    {
+        SpinBlock b(allAllocatorsLock);
+        return _lookup(meta, activityId);
+    }
+    inline bool add(IEngineRowAllocator *allocator, IOutputMetaData *meta, unsigned activityId)
+    {
+        SpinBlock b(allAllocatorsLock);
+        return _add(allocator, meta, activityId);
+    }
+    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId)
+    {
+        SpinBlock b(allAllocatorsLock);
+        IEngineRowAllocator *ret = _lookup(meta, activityId);
+        if (ret)
+            return LINK(ret);
+        assertex(allAllocators.ordinality() < ALLOCATORID_MASK);
+        ret = callback->createAllocator(meta, activityId, allAllocators.ordinality());
+        _add(LINK(ret), meta, activityId);
+        allAllocators.append(*LINK(ret));
+        return ret;
+    }
+    virtual bool remove(IOutputMetaData *meta, unsigned activityId)
+    {
+        SpinBlock b(allAllocatorsLock);
+        AllocatorKey key(meta, activityId);
+        return cache.remove(&key);
+    }
+    virtual void clear()
+    {
+        SpinBlock b(allAllocatorsLock);
+        cache.kill();
+        allAllocators.kill();
+    }
+    virtual unsigned items() const
+    {
+        return allAllocators.ordinality();
+    }
+// roxiemem::IRowAllocatorCache
+    virtual unsigned getActivityId(unsigned cacheId) const
+    {
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
+        SpinBlock b(allAllocatorsLock);
+        if (allAllocators.isItem(allocatorIndex))
+            return allAllocators.item(allocatorIndex).queryActivityId();
+        else
+        {
+            //assert(false);
+            return 12345678; // Used for tracing, better than a crash...
+        }
+    }
+    virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
+    {
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
+        SpinBlock b(allAllocatorsLock);
+        if (allAllocators.isItem(allocatorIndex))
+            return allAllocators.item(allocatorIndex).getId(out);
+        else
+        {
+            assert(false);
+            return out.append("unknown"); // Used for tracing, better than a crash...
+        }
+    }
+    virtual void onDestroy(unsigned cacheId, void *row) const
+    {
+        IEngineRowAllocator *allocator;
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
+        {
+            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
+            if (allAllocators.isItem(allocatorIndex))
+                allocator = &allAllocators.item(allocatorIndex);
+            else
+            {
+                assert(false);
+                return;
+            }
+        }
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Give an error, but don't throw an exception!
+        }
+        allocator->queryOutputMeta()->destruct((byte *) row);
+    }
+    virtual void checkValid(unsigned cacheId, const void *row) const
+    {
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Throw an exception?
+        }
+    }
+};
+
+IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback)
+{
+    return new CAllocatorCache(callback);
+}
+
+
 
 #ifdef _USE_CPPUNIT
 #include <cppunit/extensions/HelperMacros.h>
@@ -354,6 +506,7 @@ class RoxieRowAllocatorTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testSetup);
         CPPUNIT_TEST(testChecking);
         CPPUNIT_TEST(testCleanup);
+        CPPUNIT_TEST(testAllocatorCache);
     CPPUNIT_TEST_SUITE_END();
     const IContextLogger &logctx;
 
@@ -367,9 +520,11 @@ public:
     }
 
 protected:
-    class CheckingRowAllocatorCache : public IRowAllocatorCache
+    class CheckingRowAllocatorCache : public CSimpleInterface, public IRowAllocatorCache
     {
     public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
         CheckingRowAllocatorCache() { numFailures = 0; }
         virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
         virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }
@@ -488,6 +643,48 @@ protected:
         }
     }
 
+    void testAllocatorCache()
+    {
+        IArrayOf<IOutputMetaData> metas;
+        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL);
+        class CAllocatorCallback : implements IRowAllocatorMetaActIdCacheCallback
+        {
+            IRowManager *rm;
+        public:
+            CAllocatorCallback(IRowManager *_rm) : rm(_rm)
+            {
+            }
+            virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId) const
+            {
+                return createRoxieRowAllocator(*rm, meta, activityId, cacheId, roxiemem::RHFnone);
+            }
+        } callback(rm);
+        Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);
+        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
+        {
+            DummyOutputMeta *meta = new DummyOutputMeta(fixedSize, fixedSize);
+            metas.append(*meta);
+
+            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
+            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId);
+        }
+        ASSERT(allocatorCache->items() == 64);
+        Owned<IEngineRowAllocator> allocator = allocatorCache->lookup(&metas.item(0), 1);
+        ASSERT(NULL != allocator);
+        ASSERT(allocatorCache->remove(&metas.item(0), 1));
+        ASSERT(allocatorCache->add(allocator.getClear(), &metas.item(0), 1));
+
+        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
+        {
+            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
+            IOutputMetaData *meta = &metas.item(fixedSize-1); // from 1st round
+            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId);
+        }
+        ASSERT(allocatorCache->items() == 64);
+
+        metas.kill();
+        allocatorCache.clear();
+    }
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieRowAllocatorTests );

+ 17 - 0
roxie/roxiemem/roxierow.hpp

@@ -37,6 +37,23 @@
 extern roxiemem_decl IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags flags);
 extern roxiemem_decl IEngineRowAllocator * createCrcRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags);
 
+interface IRowAllocatorMetaActIdCache : extends roxiemem::IRowAllocatorCache
+{
+    virtual bool add(IEngineRowAllocator *allocator, IOutputMetaData *meta, unsigned activityId) = 0;
+    virtual bool remove(IOutputMetaData *meta, unsigned activityId) = 0;
+    virtual IEngineRowAllocator *lookup(IOutputMetaData *meta, unsigned activityId) const = 0;
+    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId) = 0;
+    virtual void clear() = 0;
+    virtual unsigned items() const = 0;
+};
+
+interface IRowAllocatorMetaActIdCacheCallback
+{
+    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId) const = 0;
+};
+
+extern roxiemem_decl IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback);
+
 extern roxiemem_decl bool isRowCheckValid(unsigned allocatorId, const void * row);
 
 //Inline call which avoids the call if no row checking is enabled.

+ 11 - 4
system/jlib/jsuperhash.hpp

@@ -347,7 +347,7 @@ public:
     virtual void onRemove(void *et) { ((ET *)et)->Release(); }
 };
 
-// template mapping object for base type to arbitary object
+// template mapping object for base type to arbitrary object
 template <class ET, class FP>
 class HTMapping : public CInterface
 {
@@ -363,11 +363,18 @@ public:
 
 // template mapping object for base type to IInterface object
 template <class ET, class FP>
-class LinkedHTMapping : public HTMapping<ET, FP>
+class OwningHTMapping : public HTMapping<ET, FP>
 {
 public:
-    LinkedHTMapping(ET &et, FP &fp) : HTMapping<ET, FP>(et, fp) { this->et.Link(); }
-    ~LinkedHTMapping() { this->et.Release(); }
+    OwningHTMapping(ET &et, FP &fp) : HTMapping<ET, FP>(et, fp) { }
+    ~OwningHTMapping() { this->et.Release(); }
+};
+
+template <class ET, class FP>
+class LinkedHTMapping : public OwningHTMapping<ET, FP>
+{
+public:
+    LinkedHTMapping(ET &et, FP &fp) : OwningHTMapping<ET, FP>(et, fp) { this->et.Link(); }
 };
 
 // template mapping object for string to arbitrary object

+ 57 - 72
thorlcr/thorutil/thmem.cpp

@@ -1705,39 +1705,80 @@ ILargeMemLimitNotify *createMultiThorResourceMutex(const char *grpname,CSDSServe
     return new cMultiThorResourceMutex(grpname,_status);
 }
 
+#pragma pack(push,1) // hashing on members, so ensure contiguous
+struct ThorAllocatorKey
+{
+    IOutputMetaData *meta;
+    unsigned activityId;
+    ThorAllocatorKey(IOutputMetaData *_meta, unsigned &_activityId) : meta(_meta), activityId(_activityId) { }
+    bool operator==(ThorAllocatorKey const &other) const
+    {
+        return (meta == other.meta) && (activityId == other.activityId);
+    }
+};
+#pragma pack(pop)
 
-class CThorAllocator : public CSimpleInterface, implements roxiemem::IRowAllocatorCache, implements IRtlRowCallback, implements IThorAllocator
+class CThorAllocatorCacheItem : public OwningHTMapping<IEngineRowAllocator, ThorAllocatorKey>
+{
+    Linked<IOutputMetaData> meta;
+public:
+    CThorAllocatorCacheItem(IEngineRowAllocator *allocator, ThorAllocatorKey &key)
+        : OwningHTMapping<IEngineRowAllocator, ThorAllocatorKey>(*allocator, key)
+    {
+        meta.set(key.meta);
+    }
+};
+
+class CThorAllocatorCache : private OwningSimpleHashTableOf<CThorAllocatorCacheItem, ThorAllocatorKey>
+{
+public:
+    inline IEngineRowAllocator *find(IOutputMetaData *meta, unsigned activityId) const
+    {
+        ThorAllocatorKey key(meta, activityId);
+        CThorAllocatorCacheItem *container = OwningSimpleHashTableOf::find(key);
+        if (!container)
+            return NULL;
+        return &container->queryElement();
+    }
+    inline bool add(IEngineRowAllocator *allocator, IOutputMetaData *meta, unsigned activityId)
+    {
+        ThorAllocatorKey key(meta, activityId);
+        CThorAllocatorCacheItem *container = new CThorAllocatorCacheItem(allocator, key);
+        return replace(*container);
+    }
+};
+class CThorAllocator : public CSimpleInterface, implements IRtlRowCallback, implements IThorAllocator, implements IRowAllocatorMetaActIdCacheCallback
 {
 protected:
-    mutable IArrayOf<IEngineRowAllocator> allAllocators;
-    mutable SpinLock allAllocatorsLock;
+    mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
     Owned<roxiemem::IRowManager> rowManager;
     roxiemem::RoxieHeapFlags flags;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
     CThorAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags _flags) : flags(_flags)
     {
-        rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), this, false));
+        allocatorMetaCache.setown(createRowAllocatorCache(this));
+        rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), allocatorMetaCache, false));
         rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
         rtlSetReleaseRowHook(this);
     }
     ~CThorAllocator()
     {
         rowManager.clear();
-        allAllocators.kill();
+        allocatorMetaCache.clear();
         rtlSetReleaseRowHook(NULL); // nothing should use it beyond this point anyway
     }
-
+// roxiemem::IRowAllocatorMetaActIdCacheCallback
+    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id) const
+    {
+        return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
+    }
 // IThorAllocator
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
     {
-        // MORE - may need to do some caching/commoning up here otherwise GRAPH in a child query may use too many
-        SpinBlock b(allAllocatorsLock);
-        IEngineRowAllocator *ret = createRoxieRowAllocator(*rowManager, meta, activityId, allAllocators.ordinality(), flags);
-        LINK(ret);
-        allAllocators.append(*ret);
-        return ret;
+        return allocatorMetaCache->ensure(meta, activityId);
     }
     virtual roxiemem::IRowManager *queryRowManager() const
     {
@@ -1746,58 +1787,6 @@ public:
     virtual roxiemem::RoxieHeapFlags queryFlags() const { return flags; }
     virtual bool queryCrc() const { return false; }
 
-// IRowAllocatorCache
-    virtual unsigned getActivityId(unsigned cacheId) const
-    {
-        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
-        SpinBlock b(allAllocatorsLock);
-        if (allAllocators.isItem(allocatorIndex))
-            return allAllocators.item(allocatorIndex).queryActivityId();
-        else
-        {
-            //assert(false);
-            return 12345678; // Used for tracing, better than a crash...
-        }
-    }
-    virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
-    {
-        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
-        SpinBlock b(allAllocatorsLock);
-        if (allAllocators.isItem(allocatorIndex))
-            return allAllocators.item(allocatorIndex).getId(out);
-        else
-        {
-            assert(false);
-            return out.append("unknown"); // Used for tracing, better than a crash...
-        }
-    }
-    virtual void onDestroy(unsigned cacheId, void *row) const
-    {
-        IEngineRowAllocator *allocator;
-        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
-        {
-            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
-            if (allAllocators.isItem(allocatorIndex))
-                allocator = &allAllocators.item(allocatorIndex);
-            else
-            {
-                assert(false);
-                return;
-            }
-        }
-        if (!RoxieRowCheckValid(cacheId, row))
-        {
-            //MORE: Give an error, but don't throw an exception!
-        }
-        allocator->queryOutputMeta()->destruct((byte *) row);
-    }
-    virtual void checkValid(unsigned cacheId, const void *row) const
-    {
-        if (!RoxieRowCheckValid(cacheId, row))
-        {
-            //MORE: Throw an exception?
-        }
-    }
 // IRtlRowCallback
     virtual void releaseRow(const void * row) const
     {
@@ -1838,16 +1827,12 @@ public:
     {
     }
 // IThorAllocator
-    virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
+    virtual bool queryCrc() const { return true; }
+// roxiemem::IRowAllocatorMetaActIdCacheCallback
+    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId) const
     {
-        // MORE - may need to do some caching/commoning up here otherwise GRAPH in a child query may use too many
-        SpinBlock b(allAllocatorsLock);
-        IEngineRowAllocator *ret = createCrcRoxieRowAllocator(*rowManager, meta, activityId, allAllocators.ordinality(), flags);
-        LINK(ret);
-        allAllocators.append(*ret);
-        return ret;
+        return createCrcRoxieRowAllocator(*rowManager, meta, activityId, cacheId, flags);
     }
-    virtual bool queryCrc() const { return true; }
 };