Ver código fonte

HPCC-8498 In roxiemem, check that pointer being linked/released is in the heap

Previously, linkRoxieRow and releaseRoxieRow would core when passed an
invalid pointer.

With this change, we treat any pointer that was not allocated in the Roxie
heap as (in effect) pointing to 'infinitely linked' data, which means we can
pass constant rows around without having to first clone them.

Moving the code out of the header has prevented gcc from inlining it, which
appears to make Roxie faster (probably because the code stays in the cache
more often). Replacing the NULL check with a test against the range of the
Roxie heap appears to have no significant impact on performance.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 anos atrás
pai
commit
9bb385c399
3 arquivos alterados com 85 adições e 75 exclusões
  1. 1 5
      ecl/hthor/hthorkey.cpp
  2. 72 0
      roxie/roxiemem/roxiemem.cpp
  3. 12 70
      roxie/roxiemem/roxiemem.hpp

+ 1 - 5
ecl/hthor/hthorkey.cpp

@@ -2103,8 +2103,7 @@ public:
         while (!stopped)
         {
             const void * row = getRow();
-            if(row)
-                releaseHThorRow(row);
+            releaseHThorRow(row);
         }
         clearQueue();
         waitForThreads();
@@ -3373,9 +3372,6 @@ public:
     {
         clearQueue();
         waitForThreads();
-        if (defaultRight)
-            releaseHThorRow(defaultRight);
-        defaultRight.getClear();
         rtlFree(activityRecordMetaBuff);
     }
 

+ 72 - 0
roxie/roxiemem/roxiemem.cpp

@@ -89,6 +89,7 @@ const unsigned maxLeakReport = 4;
 #endif
 
 static char *heapBase;
+static char *heapEnd;   // Equal to heapBase + (heapTotalPages * page size)
 static unsigned *heapBitmap;
 static unsigned heapBitmapSize;
 static unsigned heapTotalPages; // derived from heapBitmapSize - here for code clarity
@@ -151,6 +152,7 @@ static void initializeHeap(unsigned pages, unsigned largeBlockGranularity, ILarg
         HEAPERROR("RoxieMemMgr: Unable to create heap");
     }
 #endif
+    heapEnd = heapBase + memsize;
     heapBitmap = new unsigned [heapBitmapSize];
     memset(heapBitmap, 0xff, heapBitmapSize*sizeof(unsigned));
     heapLargeBlocks = 1;
@@ -177,6 +179,7 @@ extern void releaseRoxieHeap()
         free(heapBase);
 #endif
         heapBase = NULL;
+        heapEnd = NULL;
     }
 }
 
@@ -701,6 +704,72 @@ static inline unsigned getRealActivityId(unsigned allocatorId, const IRowAllocat
         return allocatorId & MAX_ACTIVITY_ID;
 }
 
+static inline bool isValidRoxiePtr(const void *_ptr)
+{
+    const char *ptr = (const char *) _ptr;
+    return ptr >= heapBase && ptr < heapEnd;
+}
+
+void HeapletBase::release(const void *ptr)
+{
+    if (isValidRoxiePtr(ptr))
+    {
+        HeapletBase *h = findBase(ptr);
+        h->noteReleased(ptr);
+    }
+}
+
+void HeapletBase::link(const void *ptr)
+{
+    if (isValidRoxiePtr(ptr))
+    {
+        HeapletBase *h = findBase(ptr);
+        h->noteLinked(ptr);
+    }
+}
+
+bool HeapletBase::isShared(const void *ptr)
+{
+    if (isValidRoxiePtr(ptr))
+    {
+        HeapletBase *h = findBase(ptr);
+        return h->_isShared(ptr);
+    }
+    if (ptr)
+        return true;  // Objects outside the Roxie heap are implicitly 'infinitely shared'
+    // isShared(NULL) is an error
+    throwUnexpected();
+}
+
+memsize_t HeapletBase::capacity(const void *ptr)
+{
+    if (isValidRoxiePtr(ptr))
+    {
+        HeapletBase *h = findBase(ptr);
+        return h->_capacity();
+    }
+    throwUnexpected();   // should never ask about capacity of anything but a row you allocated from Roxie heap
+}
+
+void HeapletBase::setDestructorFlag(const void *ptr)
+{
+    dbgassertex(isValidRoxiePtr(ptr));
+    HeapletBase *h = findBase(ptr);
+    h->_setDestructorFlag(ptr);
+}
+
+bool HeapletBase::hasDestructor(const void *ptr)
+{
+    if (isValidRoxiePtr(ptr))
+    {
+        HeapletBase *h = findBase(ptr);
+        return h->_hasDestructor(ptr);
+    }
+    else
+        return false;
+}
+
+
 class BigHeapletBase : public HeapletBase
 {
     friend class CChunkingHeap;
@@ -3608,6 +3677,7 @@ protected:
         HeapPreserver()
         {
             _heapBase = heapBase;
+            _heapEnd = heapEnd;
             _heapBitmap = heapBitmap;
             _heapBitmapSize = heapBitmapSize;
             _heapTotalPages = heapTotalPages;
@@ -3617,6 +3687,7 @@ protected:
         ~HeapPreserver()
         {
             heapBase = _heapBase;
+            heapEnd = _heapEnd;
             heapBitmap = _heapBitmap;
             heapBitmapSize = _heapBitmapSize;
             heapTotalPages = _heapTotalPages;
@@ -3624,6 +3695,7 @@ protected:
             heapAllocated = _heapAllocated;
         }
         char *_heapBase;
+        char *_heapEnd;
         unsigned *_heapBitmap;
         unsigned _heapBitmapSize;
         unsigned _heapTotalPages;

+ 12 - 70
roxie/roxiemem/roxiemem.hpp

@@ -124,82 +124,24 @@ public:
         return atomic_read(&count) < DEAD_PSEUDO_COUNT;        //only safe if Link() is called first
     }
 
-    static void release(const void *ptr)
-    {
-        if (ptr)
-        {
-            HeapletBase *h = findBase(ptr);
-            h->noteReleased(ptr);
-        }
-    }
-
-    static bool isShared(const void *ptr)
-    {
-        if (ptr)
-        {
-            HeapletBase *h = findBase(ptr);
-            return h->_isShared(ptr);
-        }
-        // isShared(NULL) or isShared on an object that shares a link-count is an error
-        throwUnexpected();
-    }
+    static void release(const void *ptr);
+    static bool isShared(const void *ptr);
+    static void link(const void *ptr);
+    static memsize_t capacity(const void *ptr);
 
-    static memsize_t capacity(const void *ptr)
-    {
-        if (ptr)
-        {
-            HeapletBase *h = findBase(ptr);
-            //MORE: If capacity was always the size stored in the first word of the block this could be non virtual
-            //and the whole function could be inline.
-            return h->_capacity();
-        }
-        throwUnexpected();
-    }
-
-    static void setDestructorFlag(const void *ptr)
-    {
-        dbgassertex(ptr);
-        HeapletBase *h = findBase(ptr);
-        h->_setDestructorFlag(ptr);
-    }
-
-    static bool hasDestructor(const void *ptr)
-    {
-        dbgassertex(ptr);
-        HeapletBase *h = findBase(ptr);
-        return h->_hasDestructor(ptr);
-    }
-
-    static unsigned getAllocatorId(const void *ptr)
-    {
-        dbgassertex(ptr);
-        HeapletBase *h = findBase(ptr);
-        unsigned id = h->_rawAllocatorId(ptr);
-        return (id & ACTIVITY_MASK);
-    }
-
-    static void releaseClear(const void *&ptr)
-    {
-        if (ptr)
-        {
-            release(ptr);
-            ptr = NULL;
-        }
-    }
+    static void setDestructorFlag(const void *ptr);
+    static bool hasDestructor(const void *ptr);
 
-    static void releaseClear(void *&ptr)
+    static inline void releaseClear(const void *&ptr)
     {
-        if (ptr)
-        {
-            release(ptr);
-            ptr = NULL;
-        }
+        release(ptr);
+        ptr = NULL;
     }
 
-    static void link(const void *ptr)
+    static inline void releaseClear(void *&ptr)
     {
-        HeapletBase *h = findBase(ptr);
-        h->noteLinked(ptr);
+        release(ptr);
+        ptr = NULL;
     }
 
     inline unsigned queryCount() const