浏览代码

Merge branch 'candidate-6.2.22' into candidate-6.4.2

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父节点
当前提交
1c4c6ffc9a
共有 1 个文件被更改,包括 104 次插入2 次删除
  1. 104 2
      roxie/roxiemem/roxiemem.cpp

+ 104 - 2
roxie/roxiemem/roxiemem.cpp

@@ -4300,9 +4300,17 @@ public:
         }
         }
 
 
         void *ret = allocate(newsize, activityId, maxSpillCost);
         void *ret = allocate(newsize, activityId, maxSpillCost);
-        memcpy(ret, original, copysize);
         memsize_t newCapacity = HeapletBase::capacity(ret);
         memsize_t newCapacity = HeapletBase::capacity(ret);
-        callback.atomicUpdate(newCapacity, ret);
+
+        //Copying data must lock for the duration (otherwise another thread modifying the data may leave it out of sync)
+        callback.lock();
+
+        memcpy(ret, original, copysize);
+
+        //previously locked => update the pointer and then unlock
+        callback.update(newCapacity, ret);
+        callback.unlock();
+
         HeapletBase::release(original);
         HeapletBase::release(original);
     }
     }
 
 
@@ -6209,6 +6217,7 @@ class RoxieMemTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testAll);
         CPPUNIT_TEST(testAll);
         CPPUNIT_TEST(testRecursiveCallbacks);
         CPPUNIT_TEST(testRecursiveCallbacks);
         CPPUNIT_TEST(testResize);
         CPPUNIT_TEST(testResize);
+        CPPUNIT_TEST(testResizeLock);
         //MORE: The following currently leak pages, so should go last
         //MORE: The following currently leak pages, so should go last
         CPPUNIT_TEST(testDatamanager);
         CPPUNIT_TEST(testDatamanager);
         CPPUNIT_TEST(testCleanup);
         CPPUNIT_TEST(testCleanup);
@@ -7801,6 +7810,99 @@ protected:
 
 
         CPPUNIT_ASSERT_EQUAL(rowManager->numPagesAfterCleanup(true), 0U);
         CPPUNIT_ASSERT_EQUAL(rowManager->numPagesAfterCleanup(true), 0U);
     }
     }
+    class ResizeCallback : public IRowResizeCallback
+    {
+    public:
+        virtual void lock() { cs.enter(); }
+        virtual void unlock() { cs.leave(); }
+        virtual void update(memsize_t capacity, void * ptr) { max = capacity; rows = (const void * *)ptr; }
+        virtual void atomicUpdate(memsize_t capacity, void * ptr) { lock(); update(capacity, ptr); unlock(); }
+
+        bool failed = false;
+        memsize_t max = 0;
+        std::atomic<memsize_t> count = {0};
+        const void * * rows = nullptr;
+        CriticalSection cs;
+    };
+    const static unsigned maxAlloc = 1000;
+    const static unsigned step = 10;
+
+    class ResizeThread : public Thread
+    {
+    public:
+        ResizeThread(ResizeCallback & _callback, IRowManager * _rm) : Thread("resizeThread"), callback(_callback), rm(_rm)
+        {
+        }
+
+        int run()
+        {
+            for (unsigned i=0; i < maxAlloc; i++)
+            {
+                if (i % step == 0)
+                    rm->resizeRow(callback.rows, i * sizeof(void *), (i+step)*sizeof(void*), 0, 0, callback);
+
+                callback.rows[i] = rm->allocate(20, 0);
+                callback.count = i+1;
+            }
+            return 0;
+        }
+    protected:
+        ResizeCallback & callback;
+        IRowManager * rm;
+    };
+
+    class ReleaseThread : public Thread
+    {
+    public:
+        ReleaseThread(ResizeCallback & _callback, IRowManager * _rm) : Thread("releaseThread"), callback(_callback), rm(_rm)
+        {
+        }
+
+        int run()
+        {
+            try
+            {
+                for (;;)
+                {
+                    unsigned limit = callback.count;
+                    callback.lock();
+                    for (unsigned i=0; i < limit; i++)
+                    {
+                        if (callback.rows[i])
+                            ReleaseClearRoxieRow(callback.rows[i]);
+                    }
+                    callback.unlock();
+                    if (limit == maxAlloc)
+                        return 0;
+                }
+            }
+            catch (IException * e)
+            {
+                DBGLOG(e);
+                callback.unlock();
+                callback.failed = true;
+                return 0;
+            }
+        }
+    protected:
+        ResizeCallback & callback;
+        IRowManager * rm;
+    };
+
+    void testResizeLock()
+    {
+        ResizeCallback callback;
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
+        callback.rows = (const void * *)rowManager->allocate(1, 0);
+        Owned<ReleaseThread> releaser = new ReleaseThread(callback, rowManager);
+        Owned<ResizeThread> resizer = new ResizeThread(callback, rowManager);
+        releaser->start();
+        resizer->start();
+        resizer->join();
+        releaser->join();
+        CPPUNIT_ASSERT(!callback.failed);
+    }
+
 };
 };
 
 
 class CSimpleRowResizeCallback : public CVariableRowResizeCallback
 class CSimpleRowResizeCallback : public CVariableRowResizeCallback