浏览代码

HPCC-8237 Add options to stress test callback usage in roxiemem

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 12 年之前
父节点
当前提交
8a0140f7cf
共有 5 个文件被更改,包括 230 次插入49 次删除
  1. 198 47
      roxie/roxiemem/roxiemem.cpp
  2. 17 2
      roxie/roxiemem/roxiemem.hpp
  3. 5 0
      system/jlib/jthread.cpp
  4. 1 0
      system/jlib/jthread.hpp
  5. 9 0
      thorlcr/thorutil/thmem.cpp

+ 198 - 47
roxie/roxiemem/roxiemem.cpp

@@ -1876,10 +1876,15 @@ protected:
 
 class BufferedRowCallbackManager
 {
-    class ReleaseBufferThread : public Thread
+    friend class ReleaseBufferThread;
+
+    class BackgroundReleaseBufferThread : public Thread
     {
     public:
-        ReleaseBufferThread(BufferedRowCallbackManager * _owner) : Thread("ReleaseBufferThread"), owner(_owner) {};
+        BackgroundReleaseBufferThread(BufferedRowCallbackManager * _owner)
+        : Thread("BackgroundReleaseBufferThread"), owner(_owner)
+        {
+        }
 
         virtual int run()
         {
@@ -1891,12 +1896,73 @@ class BufferedRowCallbackManager
         BufferedRowCallbackManager * owner;
     };
 
+    class ReleaseBufferThread : public Thread
+    {
+    public:
+        ReleaseBufferThread(BufferedRowCallbackManager & _owner)
+        : Thread("ReleaseBufferThread"), owner(_owner), abort(false)
+        {
+        }
+
+        virtual int run()
+        {
+            loop
+            {
+                goSem.wait();
+                if (abort)
+                    break;
+                args.result = owner.releaseBuffersNow(args.critical, false, 0);
+                doneSem.signal();
+            }
+            return 0;
+        }
+
+        bool releaseBuffers(const bool critical)
+        {
+            if (isCurrentThread())
+                return owner.releaseBuffersNow(critical, false, 0);
+
+            bool ok;
+            {
+                CriticalBlock block(cs);
+                args.critical = critical;
+                goSem.signal();
+                doneSem.wait();
+                ok = args.result;
+            }
+            return ok;
+        }
+
+        void stop()
+        {
+            //This should not be called while any active heap operations could be occuring
+            CriticalBlock block(cs);
+            abort = true;
+            goSem.signal();
+        }
+
+    private:
+        BufferedRowCallbackManager & owner;
+        CriticalSection cs;
+        Semaphore goSem;
+        Semaphore doneSem;
+        struct
+        {
+            bool critical;
+            bool result;
+        } args;
+        bool abort;
+    };
+
 public:
     BufferedRowCallbackManager(IRowManager * _owner) : owner(_owner)
     {
         atomic_set(&releasingBuffers, 0);
         atomic_set(&releaseSeq, 0);
         abortBufferThread = false;
+        minCallbackThreshold = 1;
+        releaseWhenModifyCallback = false;
+        releaseWhenModifyCallbackCritical = false;
     }
     ~BufferedRowCallbackManager()
     {
@@ -1909,6 +1975,9 @@ public:
 
     void addRowBuffer(IBufferedRowCallback * callback)
     {
+        if (releaseWhenModifyCallback)
+            releaseBuffers(releaseWhenModifyCallbackCritical, false, 0);
+
         CriticalBlock block(callbackCrit);
         //Assuming a small number so perform an insertion sort.
         unsigned max = rowBufferCallbacks.ordinality();
@@ -1926,6 +1995,9 @@ public:
 
     void removeRowBuffer(IBufferedRowCallback * callback)
     {
+        if (releaseWhenModifyCallback)
+            releaseBuffers(releaseWhenModifyCallbackCritical, false, 0);
+
         CriticalBlock block(callbackCrit);
         rowBufferCallbacks.zap(callback);
         updateCallbackInfo();
@@ -1975,6 +2047,90 @@ public:
         }
     }
 
+    //Release buffers will ensure that the rows are attempted to be cleaned up before returning
+    bool releaseBuffers(const bool critical, bool checkSequence, unsigned prevReleaseSeq)
+    {
+        if (!releaseBuffersThread)
+            return releaseBuffersNow(critical, checkSequence, prevReleaseSeq);
+        return releaseBuffersThread->releaseBuffers(critical);
+    }
+
+    void runReleaseBufferThread()
+    {
+        loop
+        {
+            releaseBuffersSem.wait();
+            if (abortBufferThread)
+                break;
+            releaseBuffersNow(false, false, 0);
+            atomic_set(&releasingBuffers, 0);
+        }
+    }
+
+    void releaseBuffersInBackground()
+    {
+        if (atomic_cas(&releasingBuffers, 1, 0))
+        {
+            assertex(backgroundReleaseBuffersThread);
+            releaseBuffersSem.signal();
+        }
+    }
+
+    void startReleaseBufferThread()
+    {
+        if (!backgroundReleaseBuffersThread)
+        {
+            backgroundReleaseBuffersThread.setown(new BackgroundReleaseBufferThread(this));
+            backgroundReleaseBuffersThread->start();
+        }
+    }
+
+    void stopReleaseBufferThread()
+    {
+        if (backgroundReleaseBuffersThread)
+        {
+            abortBufferThread = true;
+            releaseBuffersSem.signal();
+            backgroundReleaseBuffersThread->join();
+            backgroundReleaseBuffersThread.clear();
+            abortBufferThread = false;
+        }
+    }
+
+    void setCallbackOnThread(bool value)
+    {
+        //May crash if called in parallel with any ongoing memory operations.
+        if (value)
+        {
+            if (!releaseBuffersThread)
+            {
+                releaseBuffersThread.setown(new ReleaseBufferThread(*this));
+                releaseBuffersThread->start();
+            }
+        }
+        else
+        {
+            if (releaseBuffersThread)
+            {
+                releaseBuffersThread->stop();
+                releaseBuffersThread->join();
+                releaseBuffersThread.clear();
+            }
+        }
+    }
+
+    void setMemoryCallbackThreshold(unsigned value)
+    {
+        minCallbackThreshold = value;
+    }
+
+    virtual void setReleaseWhenModifyCallback(bool value, bool critical)
+    {
+        releaseWhenModifyCallback = value;
+        releaseWhenModifyCallbackCritical = critical;
+    }
+
+protected:
     bool doReleaseBuffers(const bool critical, const unsigned minSuccess)
     {
         const unsigned numCallbacks = rowBufferCallbacks.ordinality();
@@ -2018,8 +2174,9 @@ public:
     }
 
     //Release buffers will ensure that the rows are attempted to be cleaned up before returning
-    bool releaseBuffers(const bool critical, const unsigned minSuccess, bool checkSequence, unsigned prevReleaseSeq)
+    bool releaseBuffersNow(const bool critical, bool checkSequence, unsigned prevReleaseSeq)
     {
+        const unsigned minSuccess = minCallbackThreshold;
         CriticalBlock block(callbackCrit);
         //While we were waiting check if something freed some memory up
         //if so try again otherwise we might end up calling more callbacks than we need to.
@@ -2046,59 +2203,21 @@ public:
         return false;
     }
 
-    void runReleaseBufferThread()
-    {
-        loop
-        {
-            releaseBuffersSem.wait();
-            if (abortBufferThread)
-                break;
-            releaseBuffers(false, 1, false, 0);
-            atomic_set(&releasingBuffers, 0);
-        }
-    }
-
-    void releaseBuffersInBackground()
-    {
-        if (atomic_cas(&releasingBuffers, 1, 0))
-        {
-            assertex(releaseBuffersThread);
-            releaseBuffersSem.signal();
-        }
-    }
-
-    void startReleaseBufferThread()
-    {
-        if (!releaseBuffersThread)
-        {
-            releaseBuffersThread.setown(new ReleaseBufferThread(this));
-            releaseBuffersThread->start();
-        }
-    }
-
-    void stopReleaseBufferThread()
-    {
-        if (releaseBuffersThread)
-        {
-            abortBufferThread = true;
-            releaseBuffersSem.signal();
-            releaseBuffersThread->join();
-            releaseBuffersThread.clear();
-            abortBufferThread = false;
-        }
-    }
-
 protected:
     CriticalSection callbackCrit;
     Semaphore releaseBuffersSem;
     PointerArrayOf<IBufferedRowCallback> rowBufferCallbacks;
     PointerArrayOf<IBufferedRowCallback> activeCallbacks;
+    Owned<BackgroundReleaseBufferThread> backgroundReleaseBuffersThread;
     Owned<ReleaseBufferThread> releaseBuffersThread;
     UnsignedArray callbackRanges;  // the maximum index of the callbacks for the nth priority
     UnsignedArray nextCallbacks;  // the next call back to try and free for the nth priority
     IRowManager * owner;
     atomic_t releasingBuffers;  // boolean if pre-emptive releasing thread is active
     atomic_t releaseSeq;
+    unsigned minCallbackThreshold;
+    bool releaseWhenModifyCallback;
+    bool releaseWhenModifyCallbackCritical;
     volatile bool abortBufferThread;
 };
 
@@ -2159,6 +2278,8 @@ class CChunkingRowManager : public CInterface, implements IRowManager
     unsigned __int64 cyclesCheckInterval; // How often we need to check timelimit
     bool ignoreLeaks;
     bool trackMemoryByActivity;
+    bool minimizeFootprint;
+    bool minimizeFootprintCritical;
 
     inline unsigned getActivityId(unsigned rawId) const
     {
@@ -2193,6 +2314,8 @@ public:
         activeBuffs = NULL;
         dataBuffPages = 0;
         ignoreLeaks = _ignoreLeaks;
+        minimizeFootprint = false;
+        minimizeFootprintCritical = false;
 #ifdef _DEBUG
         trackMemoryByActivity = true; 
 #else
@@ -2486,6 +2609,27 @@ public:
             logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::setMemoryLimit new memlimit=%"I64F"u pageLimit=%u spillLimit=%u rowMgr=%p", (unsigned __int64) bytes, pageLimit, spillPageLimit, this);
     }
 
+    virtual void setMemoryCallbackThreshold(unsigned value)
+    {
+        callbacks.setMemoryCallbackThreshold(value);
+    }
+
+    virtual void setCallbackOnThread(bool value)
+    {
+        callbacks.setCallbackOnThread(value);
+    }
+
+    virtual void setMinimizeFootprint(bool value, bool critical)
+    {
+        minimizeFootprint = value;
+        minimizeFootprintCritical = critical;
+    }
+
+    virtual void setReleaseWhenModifyCallback(bool value, bool critical)
+    {
+        callbacks.setReleaseWhenModifyCallback(value, critical);
+    }
+
     virtual void resizeRow(memsize_t &capacity, void * & ptr, memsize_t copysize, memsize_t newsize, unsigned activityId)
     {
         void * const original = ptr;
@@ -2640,6 +2784,9 @@ public:
     {
         unsigned totalPages;
         releaseEmptyPages(false);
+        if (minimizeFootprint)
+            callbacks.releaseBuffers(minimizeFootprintCritical, false, 0);
+
         loop
         {
             unsigned lastReleaseSeq = callbacks.getReleaseSeq();
@@ -2670,7 +2817,7 @@ public:
             //The following reduces the nubmer of times the callback is called, but I'm not sure how this affects
             //performance.  I think better if a single free is likely to free up some memory, and worse if not.
             const bool skipReleaseIfAnotherThreadReleases = true;
-            if (!callbacks.releaseBuffers(true, 1, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
+            if (!callbacks.releaseBuffers(true, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
             {
                 //Check if a background thread has freed up some memory.  That can be checked by a comparing value of a counter
                 //which is incremented each time releaseBuffers is successful.
@@ -2709,7 +2856,7 @@ public:
 
     bool releaseCallbackMemory(bool critical)
     {
-        return callbacks.releaseBuffers(critical, 1, false, 0);
+        return callbacks.releaseBuffers(critical, false, 0);
     }
 
 protected:
@@ -4699,6 +4846,10 @@ protected:
         CountingRowAllocatorCache rowCache;
         Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, &rowCache);
         rowManager->setMemoryLimit(pages * numCasThreads * HEAP_ALIGNMENT_SIZE, spillPages * numCasThreads * HEAP_ALIGNMENT_SIZE);
+        rowManager->setMemoryCallbackThreshold((unsigned)-1);
+        rowManager->setCallbackOnThread(true);
+        rowManager->setMinimizeFootprint(true, true);
+        rowManager->setReleaseWhenModifyCallback(true, true);
 
         Semaphore sem;
         CasAllocatorThread * threads[numCasThreads];

+ 17 - 2
roxie/roxiemem/roxiemem.hpp

@@ -411,14 +411,12 @@ interface IRowManager : extends IInterface
     virtual void resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback) = 0;
     virtual void resizeRow(memsize_t & capacity, void * & original, memsize_t copysize, memsize_t newsize, unsigned activityId) = 0;
     virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize, unsigned activityId) = 0;
-    virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;
     virtual unsigned allocated() = 0;
     virtual unsigned numPagesAfterCleanup(bool forceFreeAll) = 0; // calls releaseEmptyPages() then returns
     virtual bool releaseEmptyPages(bool forceFreeAll) = 0; // ensures any empty pages are freed back to the heap
     virtual unsigned getMemoryUsage() = 0;
     virtual bool attachDataBuff(DataBuffer *dataBuff) = 0 ;
     virtual void noteDataBuffReleased(DataBuffer *dataBuff) = 0 ;
-    virtual void setActivityTracking(bool val) = 0;
     virtual void reportLeaks() = 0;
     virtual void checkHeap() = 0;
     virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags) = 0;
@@ -427,6 +425,21 @@ interface IRowManager : extends IInterface
     virtual void removeRowBuffer(IBufferedRowCallback * callback) = 0;
     virtual memsize_t getExpectedCapacity(memsize_t size, unsigned heapFlags) = 0; // what is the expected capacity for a given size allocation
     virtual memsize_t getExpectedFootprint(memsize_t size, unsigned heapFlags) = 0; // how much memory will a given size allocation actually use.
+
+//Allow various options to be configured
+    virtual void setActivityTracking(bool val) = 0;
+    virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;  // First size is max memory, second is the limit which will trigger a background thread to reduce memory
+
+    //set the number of callbacks that successfully free some memory before deciding it is good enough.
+    //Default is 1, use -1 to free all possible memory whenever an out of memory occurs
+    virtual void setMemoryCallbackThreshold(unsigned value) = 0;
+
+    //If set release callbacks are called on separate threads - it maximises the likelihood of hitting a deadlock.
+    virtual void setCallbackOnThread(bool value) = 0;
+    //If enabled, callbacks will be called whenever a new block of memory needs to be allocated
+    virtual void setMinimizeFootprint(bool value, bool critical) = 0;
+    //If set, and changes to the callback list always triggers the callbacks to be called.
+    virtual void setReleaseWhenModifyCallback(bool value, bool critical) = 0;
 };
 
 extern roxiemem_decl void setDataAlignmentSize(unsigned size);
@@ -463,6 +476,8 @@ extern roxiemem_decl unsigned getHeapPercentAllocated();
 extern roxiemem_decl unsigned getDataBufferPages();
 extern roxiemem_decl unsigned getDataBuffersActive();
 
+//Various options to stress the memory
+
 extern roxiemem_decl unsigned memTraceLevel;
 extern roxiemem_decl memsize_t memTraceSizeLimit;
 

+ 5 - 0
system/jlib/jthread.cpp

@@ -223,6 +223,11 @@ void Thread::adjustNiceLevel()
 #endif
 }
 
+bool Thread::isCurrentThread() const
+{
+    return GetCurrentThreadId() == threadid;
+}
+
 // _nicelevel ranges from -20 to 19, the higher the nice level, the less cpu time the thread will get.
 void Thread::setNice(char _nicelevel)
 {

+ 1 - 0
system/jlib/jthread.hpp

@@ -104,6 +104,7 @@ public:
     ~Thread();
 
     void adjustPriority(char delta);
+    bool isCurrentThread() const;
     void setNice(char nicelevel);
     void setStackSize(size32_t size);               // required stack size in bytes - called before start() (obviously)
     const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }

+ 9 - 0
thorlcr/thorutil/thmem.cpp

@@ -1750,6 +1750,15 @@ public:
         allocatorMetaCache.setown(createRowAllocatorCache(this));
         rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), allocatorMetaCache, false));
         rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
+        const bool paranoid = false;
+        if (paranoid)
+        {
+            //you probably want to test these options individually
+            rowManager->setMemoryCallbackThreshold((unsigned)-1);
+            rowManager->setCallbackOnThread(true);
+            rowManager->setMinimizeFootprint(true, true);
+            rowManager->setReleaseWhenModifyCallback(true, true);
+        }
     }
     ~CThorAllocator()
     {