Pārlūkot izejas kodu

Merge pull request #8593 from jakesmith/hpcc-15489

HPCC-15489 Fix crash with child datasets and multiple channels

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 gadi atpakaļ
vecāks
revīzija
368e345c9f

+ 8 - 18
common/thorhelper/roxierow.cpp

@@ -386,9 +386,9 @@ IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * c
 struct AllocatorKey
 {
     IOutputMetaData *meta;
-    unsigned activityId;
+    unsigned __int64 activityId;
     roxiemem::RoxieHeapFlags flags;
-    AllocatorKey(IOutputMetaData *_meta, unsigned &_activityId, roxiemem::RoxieHeapFlags _flags)
+    AllocatorKey(IOutputMetaData *_meta, unsigned __int64 &_activityId, roxiemem::RoxieHeapFlags _flags)
         : meta(_meta), activityId(_activityId), flags(_flags)
     {
     }
@@ -412,7 +412,7 @@ public:
     unsigned queryAllocatorId() const { return allocatorId; }
 };
 
-class CAllocatorCache : public CSimpleInterface, implements IRowAllocatorMetaActIdCache
+class CAllocatorCache : public CSimpleInterfaceOf<IRowAllocatorMetaActIdCache>
 {
     OwningSimpleHashTableOf<CAllocatorCacheItem, AllocatorKey> cache;
     IArrayOf<IEngineRowAllocator> allAllocators;
@@ -420,39 +420,29 @@ class CAllocatorCache : public CSimpleInterface, implements IRowAllocatorMetaAct
     Owned<roxiemem::IRowManager> rowManager;
     IRowAllocatorMetaActIdCacheCallback *callback;
 
-    inline CAllocatorCacheItem *_lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+    inline CAllocatorCacheItem *lookup(IOutputMetaData *meta, unsigned __int64 activityId, roxiemem::RoxieHeapFlags flags) const
     {
         AllocatorKey key(meta, activityId, flags);
         return cache.find(key);
     }
 public:
-    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-
     CAllocatorCache(IRowAllocatorMetaActIdCacheCallback *_callback) : callback(_callback)
     {
     }
 // IRowAllocatorMetaActIdCache
-    inline IEngineRowAllocator *lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
-    {
-        SpinBlock b(allAllocatorsLock);
-        CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
-        if (!container)
-            return NULL;
-        return &container->queryElement();
-    }
-    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags)
+    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned __int64 activityId, roxiemem::RoxieHeapFlags flags) override
     {
         SpinBlock b(allAllocatorsLock);
         loop
         {
-            CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
+            CAllocatorCacheItem *container = lookup(meta, activityId, flags);
             if (container)
             {
                 if (0 == (roxiemem::RHFunique & flags))
                     return LINK(&container->queryElement());
                 // if in cache but unique, reuse allocatorId
                 SpinUnblock b(allAllocatorsLock);
-                return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
+                return callback->createAllocator(this, meta, (unsigned)(activityId & ACTIVITY_MASK), container->queryAllocatorId(), flags);
             }
             // NB: a RHFunique allocator, will cause 1st to be added to 'allAllocators'
             // subsequent requests for the same type of unique allocator, will share same allocatorId
@@ -463,7 +453,7 @@ public:
             IEngineRowAllocator *ret;
             {
                 SpinUnblock b(allAllocatorsLock);
-                ret = callback->createAllocator(this, meta, activityId, allocatorId, flags);
+                ret = callback->createAllocator(this, meta, (unsigned)(activityId & ACTIVITY_MASK), allocatorId, flags);
                 assertex(ret);
             }
             if (allocatorId == allAllocators.ordinality())

+ 1 - 1
common/thorhelper/roxierow.hpp

@@ -25,7 +25,7 @@
 
 interface IRowAllocatorMetaActIdCache : extends roxiemem::IRowAllocatorCache
 {
-    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) = 0;
+    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned __int64 activityId, roxiemem::RoxieHeapFlags flags) = 0;
     virtual unsigned items() const = 0;
 };
 

+ 8 - 5
thorlcr/thorutil/thmem.cpp

@@ -2289,6 +2289,7 @@ protected:
     roxiemem::RoxieHeapFlags defaultFlags;
     IContextLogger *logctx;
     unsigned numChannels;
+    unsigned __int64 channelBits = 0;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -2315,19 +2316,21 @@ public:
             rowManager->setReleaseWhenModifyCallback(true, true);
         }
     }
-    CThorAllocator(IThorAllocator &sharedAllocator, unsigned channel)
+    CThorAllocator(CThorAllocator &sharedAllocator, unsigned channel)
     {
-        allocatorMetaCache.setown(createRowAllocatorCache(this));
+        allocatorMetaCache.set(sharedAllocator.queryAllocateCache());
         rowManager.set(sharedAllocator.queryRowManager()->querySlaveRowManager(channel));
         defaultFlags = sharedAllocator.queryFlags();
         logctx = sharedAllocator.queryLoggingContext();
         numChannels = 0;
+        channelBits = ((unsigned __int64)(channel+1)) << 32; // channel bits occupy top 4 bytes;
     }
     ~CThorAllocator()
     {
         rowManager.clear();
         allocatorMetaCache.clear();
     }
+    IRowAllocatorMetaActIdCache *queryAllocateCache() const { return allocatorMetaCache; }
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
     virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
     {
@@ -2336,11 +2339,11 @@ public:
 // IThorAllocator
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, activity_id activityId, roxiemem::RoxieHeapFlags flags) const
     {
-        return allocatorMetaCache->ensure(meta, activityId, flags);
+        return allocatorMetaCache->ensure(meta, ((unsigned __int64)activityId) | channelBits, flags);
     }
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, activity_id activityId) const
     {
-        return allocatorMetaCache->ensure(meta, activityId, defaultFlags);
+        return allocatorMetaCache->ensure(meta, ((unsigned __int64)activityId) | channelBits, defaultFlags);
     }
     virtual roxiemem::IRowManager *queryRowManager() const
     {
@@ -2364,7 +2367,7 @@ public:
         : CThorAllocator(memLimitMB, sharedMemLimitMB, numChannels, memorySpillAtPercentage, logctx, flags)
     {
     }
-    CThorCrcCheckingAllocator(IThorAllocator &sharedAllocator, unsigned channel) : CThorAllocator(sharedAllocator, channel)
+    CThorCrcCheckingAllocator(CThorCrcCheckingAllocator &sharedAllocator, unsigned channel) : CThorAllocator(sharedAllocator, channel)
     {
     }
 // IThorAllocator