Browse Source

HPCC-13690 Implement synchronous parallel row release

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 10 năm trước cách đây
mục cha
commit
c91b9dfb3f

+ 6 - 11
common/thorhelper/roxiehelper.cpp

@@ -33,16 +33,13 @@ unsigned traceLevel = 0;
 //OwnedRowArray
 void OwnedRowArray::clear()
 {
-    ForEachItemIn(idx, buff)
-        ReleaseRoxieRow(buff.item(idx));
+    roxiemem::ReleaseRoxieRowArray(buff.ordinality(), buff.getArray());
     buff.kill();
 }
 
 void OwnedRowArray::clearPart(aindex_t from, aindex_t to)
 {
-    aindex_t idx;
-    for(idx = from; idx < to; idx++)
-        ReleaseRoxieRow(buff.item(idx));
+    roxiemem::ReleaseRoxieRowRange(buff.getArray(), from, to);
     buff.removen(from, to-from);
 }
 
@@ -655,8 +652,7 @@ public:
 
     virtual void reset()
     {
-        while (sorted.isItem(curIndex))
-            ReleaseRoxieRow(sorted.item(curIndex++));
+        roxiemem::ReleaseRoxieRowRange(sorted.getArray(), curIndex, sorted.ordinality());
         curIndex = 0;
         sorted.kill();
     }
@@ -756,8 +752,7 @@ public:
 
     ~SortedBlock()
     {
-        while (pos < length)
-            ReleaseRoxieRow(rows[pos++]);
+        roxiemem::ReleaseRoxieRowRange(rows, pos, length);
         ReleaseRoxieRow(rows);
     }
 
@@ -1107,14 +1102,14 @@ public:
         eof = false;
         if (inputAlreadySorted)
         {
-            while (sorted.isItem(curIndex))
-                ReleaseRoxieRow(sorted.item(curIndex++));
+            roxiemem::ReleaseRoxieRowRange(sorted.getArray(), curIndex, sorted.ordinality());
             sorted.kill();
         }
         else
         {
             roxiemem::ReleaseRoxieRows(sorted);
         }
+        curIndex = 0;
         inputAlreadySorted = true;
         sequences.kill();
     }

+ 4 - 4
ecl/hthor/hthor.cpp

@@ -26,6 +26,7 @@
 #include "jisem.hpp"
 #include "roxiedebug.hpp"
 #include "roxierow.hpp"
+#include "roxiemem.hpp"
 #include "eclhelper.hpp"
 #include "workunit.hpp"
 #include "jfile.hpp"
@@ -7085,8 +7086,7 @@ CHThorTopNActivity::CHThorTopNActivity(IAgentContext & _agent, unsigned _activit
 
 CHThorTopNActivity::~CHThorTopNActivity()
 {
-    while(curIndex < sortedCount)
-        ReleaseRoxieRow(sorted[curIndex++]);
+    roxiemem::ReleaseRoxieRowRange(sorted, curIndex, sortedCount);
     free(sorted);
 }
 
@@ -7105,10 +7105,10 @@ void CHThorTopNActivity::ready()
 void CHThorTopNActivity::done()
 {
     CHThorSimpleActivityBase::done();
-    while(curIndex < sortedCount)
-        ReleaseRoxieRow(sorted[curIndex++]);
+    roxiemem::ReleaseRoxieRowRange(sorted, curIndex, sortedCount);
     free(sorted);
     sorted = NULL;
+    curIndex = 0;
 }
 
 const void * CHThorTopNActivity::nextInGroup()

+ 0 - 1
plugins/fileservices/fileservices.cpp

@@ -21,7 +21,6 @@
 #include "platform.h"
 #include "fileservices.hpp"
 #include "workunit.hpp"
-#include "agentctx.hpp"
 #include "jio.hpp"
 #include "jmisc.hpp"
 #include "daft.hpp"

+ 22 - 31
roxie/ccd/ccdserver.cpp

@@ -4377,9 +4377,9 @@ public:
 
     virtual void onReset()
     {
-        while (buff.isItem(index))
-            ReleaseRoxieRow(buff.item(index++));
+        roxiemem::ReleaseRoxieRowRange(buff.getArray(), index, buff.ordinality());
         buff.kill();
+        index = 0;
         pulled = false;
         exception.clear();
         CRemoteResultAdaptor::onReset();
@@ -9110,9 +9110,9 @@ public:
 
     inline void releaseGathered()
     {
-        while (gathered.isItem(curIndex))
-            ReleaseRoxieRow(gathered.item(curIndex++));
+        roxiemem::ReleaseRoxieRowRange(gathered.getArray(), curIndex, gathered.ordinality());
         gathered.kill();
+        curIndex = 0;
     }
 
     virtual const void * nextInGroup()
@@ -9607,9 +9607,9 @@ public:
         setCounts = NULL;
         free(limits);
         limits = NULL;
-        while (gathered.isItem(curIndex))
-            ReleaseRoxieRow(gathered.item(curIndex++));
+        roxiemem::ReleaseRoxieRowRange(gathered.getArray(), curIndex, gathered.ordinality());
         gathered.kill();
+        curIndex = 0;
         CRoxieServerActivity::reset();
     }
 
@@ -16899,9 +16899,7 @@ private:
 
         ~DedupLookupTable()
         {
-            unsigned i;
-            for(i=0; i<size; i++)
-                ReleaseRoxieRow(table[i]);
+            roxiemem::ReleaseRoxieRowArray(size, table);
             free(table);
         }
 
@@ -16966,9 +16964,7 @@ private:
 
         ~FewLookupTable()
         {
-            unsigned i;
-            for(i=0; i<size; i++)
-                ReleaseRoxieRow(table[i]);
+            roxiemem::ReleaseRoxieRowArray(size, table);
             free(table);
         }
 
@@ -17064,10 +17060,7 @@ private:
 
         ~ManyLookupTable()
         {
-            ForEachItemIn(idx, rowtable)
-            {
-                ReleaseRoxieRow(rowtable.item(idx));
-            }
+            roxiemem::ReleaseRoxieRows(rowtable);
             free(table);
         }
 
@@ -17243,8 +17236,7 @@ public:
         }
         catch (...)
         {
-            ForEachItemIn(idx, rightset)
-                ReleaseRoxieRow(rightset.item(idx));
+            roxiemem::ReleaseRoxieRows(rightset);
             throw;
         }
     };
@@ -18093,11 +18085,12 @@ public:
     {
         if (sorted)
         {
-            while(curIndex < sortedCount)
-                ReleaseRoxieRow(sorted[curIndex++]);
+            roxiemem::ReleaseRoxieRowRange(sorted, curIndex, sortedCount);
+            curIndex = 0;
+            sortedCount = 0;
             ReleaseRoxieRow(sorted);
+            sorted = NULL;
         }
-        sorted = NULL;
         CRoxieServerLateStartActivity::reset();
     }
 
@@ -18324,9 +18317,9 @@ public:
 
     virtual void reset()
     {
-        while (buff.isItem(index))
-            ReleaseRoxieRow(buff.item(index++));
+        roxiemem::ReleaseRoxieRowRange(buff.getArray(), index, buff.ordinality());
         buff.kill();
+        index = 0;
         started = false;
         CRoxieServerLimitActivity::reset();
     }
@@ -18512,9 +18505,9 @@ public:
 
     virtual void reset()
     {
-        while (buff.isItem(index))
-            ReleaseRoxieRow(buff.item(index++));
+        roxiemem::ReleaseRoxieRowRange(buff.getArray(), index, buff.ordinality());
         buff.kill();
+        index = 0;
         started = false;
         CRoxieServerActivity::reset();
     }
@@ -20304,8 +20297,7 @@ public:
 
     virtual void reset()
     {
-        while (readrows.isItem(readIndex))
-            ReleaseRoxieRow(readrows.item(readIndex++));
+        roxiemem::ReleaseRoxieRowRange(readrows.getArray(), readIndex, readrows.ordinality());
         readrows.kill();
         readAheadDone = false;
         readIndex = 0;
@@ -20331,9 +20323,9 @@ public:
                     if (preprocessed > rowLimit)
                     {
                         ReleaseRoxieRow(row);
-                        while (readrows.isItem(readIndex))
-                            ReleaseRoxieRow(readrows.item(readIndex++));
+                        roxiemem::ReleaseRoxieRowRange(readrows.getArray(), readIndex, readrows.ordinality());
                         readrows.kill();
+                        readIndex = 0;
                         eof = true;
                         if (ctx->queryDebugContext())
                             ctx->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
@@ -23527,8 +23519,7 @@ public:
         if (left)
         {
             ReleaseRoxieRow(left);
-            ForEachItemIn(idx, rows)
-                ReleaseRoxieRow(rows.item(idx));
+            roxiemem::ReleaseRoxieRowArray(rows.ordinality(), (const void * *)rows.getArray());
             rows.kill();
         }
     }

+ 291 - 12
roxie/roxiemem/roxiemem.cpp

@@ -20,6 +20,10 @@
 #include "jlog.hpp"
 #include "jset.hpp"
 #include <new>
+#ifdef _USE_TBB
+#include "tbb/task.h"
+#include "tbb/task_scheduler_init.h"
+#endif
 
 #ifndef _WIN32
 #include <sys/mman.h>
@@ -41,6 +45,11 @@ namespace roxiemem {
 
 #define NOTIFY_UNUSED_PAGES_ON_FREE     // avoid linux swapping 'freed' pages to disk
 
+//The following constants should probably be tuned depending on the architecture - see Tuning Test at the end of the file.
+#define DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY       2048        // default
+#define RELEASE_THRESHOLD_SCALING                       64     // rather high, may reduce if memory manager restructured
+#define DEFAULT_PARALLEL_SYNC_RELEASE_THRESHOLD         (DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY * RELEASE_THRESHOLD_SCALING)
+
 unsigned memTraceLevel = 1;
 memsize_t memTraceSizeLimit = 0;
 
@@ -1023,12 +1032,10 @@ void HeapletBase::releaseRowset(unsigned count, byte * * rowset)
     if (rowset)
     {
         //MORE: There is a small window of simultaneous releases that could lead to the children being leaked
+        //This should really implemented as a destructor, possibly of a special activity number
         if (!isShared(rowset))
-        {
-            byte * * finger = rowset;
-            while (count--)
-                release(*finger++);
-        }
+            ReleaseRoxieRowArray(count, (const void * *)rowset);
+
         release(rowset);
     }
 }
@@ -1099,6 +1106,111 @@ bool HeapletBase::hasDestructor(const void *ptr)
         return false;
 }
 
+//================================================================================
+
+#ifdef _USE_CPPUNIT
+static size_t parallelSyncReleaseGranularity = DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY;
+static size_t parallelSyncReleaseThreshold = DEFAULT_PARALLEL_SYNC_RELEASE_THRESHOLD;
+
+//Only used by the tuning unit tests, should possibly use a different flag to fix them as constant
+static void setParallelSyncReleaseGranularity(size_t granularity, unsigned scaling)
+{
+    parallelSyncReleaseGranularity = granularity;
+    parallelSyncReleaseThreshold = granularity * scaling;
+}
+#else
+const static size_t parallelSyncReleaseGranularity = DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY;
+const static size_t parallelSyncReleaseThreshold = DEFAULT_PARALLEL_SYNC_RELEASE_THRESHOLD;
+#endif
+
+
+inline void inlineReleaseRoxieRowArray(size_t count, const void * * rows)
+{
+    for (size_t i = 0; i < count; i++)
+        ReleaseRoxieRow(rows[i]);
+}
+
+#ifdef PARALLEL_SYNC_RELEASE
+class sync_releaser_task : public tbb::task
+{
+public:
+    sync_releaser_task(size_t _num, const void * * _rows) : num(_num), rows(_rows) {}
+
+    virtual task * execute()
+    {
+        inlineReleaseRoxieRowArray(num, rows);
+        return NULL;
+    }
+
+public:
+    size_t num;
+    const void * * rows;
+};
+
+void ParallelReleaseRoxieRowArray(size_t count, const void * * rows)
+{
+    //This is quicker than creating a task list and then spawning all.
+    tbb::task * completed_task = new (tbb::task::allocate_root()) tbb::empty_task();
+    unsigned numTasks = (unsigned)((count + parallelSyncReleaseGranularity -1) / parallelSyncReleaseGranularity);
+    completed_task->set_ref_count(1+numTasks);
+
+    //Arrange the blocks that are freed from the array so that if the rows are allocated in order, adjacent blocks
+    //are not released at the same time, therefore reducing contention.
+    const unsigned stride = 8;
+    for (unsigned j= 0; j < stride; j++)
+    {
+        size_t i = j * parallelSyncReleaseGranularity;
+        for (; i < count; i += parallelSyncReleaseGranularity * stride)
+        {
+            size_t remain = count - i;
+            size_t blockRows = (remain > parallelSyncReleaseGranularity) ? parallelSyncReleaseGranularity : remain;
+            tbb::task * next = new (completed_task->allocate_child()) sync_releaser_task(blockRows, rows + i);
+            next->spawn(*next); // static member in tbb 3.0
+        }
+    }
+
+    completed_task->wait_for_all();
+    completed_task->destroy(*completed_task);       // static member in tbb 3.0
+}
+#endif
+
+
+void ReleaseRoxieRowArray(size_t count, const void * * rows)
+{
+#ifdef PARALLEL_SYNC_RELEASE
+    if (count >= parallelSyncReleaseThreshold)
+    {
+        ParallelReleaseRoxieRowArray(count, rows);
+        return;
+    }
+#endif
+
+    inlineReleaseRoxieRowArray(count, rows);
+}
+
+//Not implemented as ReleaseRoxieRowArray(to - from, rows + from) to avoid "from > to" wrapping issues
+void roxiemem_decl ReleaseRoxieRowRange(const void * * rows, size_t from, size_t to)
+{
+#ifdef PARALLEL_SYNC_RELEASE
+    if ((from < to) && ((to - from) >= parallelSyncReleaseThreshold))
+    {
+        ParallelReleaseRoxieRowArray(to-from, rows+from);
+        return;
+    }
+#endif
+    for (size_t i = from; i < to; i++)
+        ReleaseRoxieRow(rows[i]);
+}
+
+
+void ReleaseRoxieRows(ConstPointerArray &rows)
+{
+    ReleaseRoxieRowArray(rows.ordinality(), rows.getArray());
+    rows.kill();
+}
+
+//================================================================================
+
 class CHeap;
 static void noteEmptyPage(CHeap * heap);
 class Heaplet : public HeapletBase
@@ -3254,6 +3366,7 @@ void initAllocSizeMappings(const unsigned * sizes)
     numDirectBuckets = bucket+1;
 }
 
+
 //---------------------------------------------------------------------------------------------------------------------
 
 
@@ -4935,8 +5048,7 @@ public:
         if (numCommitted == 0)
             return false;
         const void * * committed = rows.getBlock(numCommitted);
-        for (unsigned i=0; i < numCommitted; i++)
-            ReleaseRoxieRow(committed[i]);
+        ReleaseRoxieRowArray(numCommitted, committed);
         rows.noteSpilled(numCommitted);
         return true;
     }
@@ -4973,18 +5085,16 @@ public:
 
     void addRow(const void * row)
     {
-        rows.append((void *)row);
+        rows.append(row);
     }
 
     void kill()
     {
-        ForEachItemIn(i, rows)
-            ReleaseRoxieRow(rows.item(i));
-        rows.kill();
+        ReleaseRoxieRows(rows);
     }
 
 protected:
-    PointerArray rows;
+    ConstPointerArray rows;
     unsigned cost;
 };
 
@@ -6797,10 +6907,179 @@ protected:
     }
 };
 
+
+//#define RUN_SINGLE_TEST
+
+static const memsize_t tuningMemorySize = I64C(0x100000000);
+static const unsigned tuningAllocSize = 64;
+static const memsize_t numTuningRows = tuningMemorySize / (tuningAllocSize * 2);
+static const memsize_t numTuningIters = 5;
+static const size_t minGranularity = 256;
+static const size_t maxGranularity = 0x2000;
+static const size_t defaultGranularity = DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY;
+
+static int compareTiming(const void * pLeft, const void * pRight)
+{
+    const unsigned * left = (const unsigned *)pLeft;
+    const unsigned * right = (const unsigned *)pRight;
+    return (*left < *right) ? -1 : (*left > *right) ? +1 : 0;
+}
+
+
+class RoxieMemTuningTests : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE( RoxieMemTuningTests );
+    CPPUNIT_TEST(testSetup);
+#ifdef RUN_SINGLE_TEST
+    CPPUNIT_TEST(testOne);
+#else
+    CPPUNIT_TEST(testSyncOrderRelease);
+    CPPUNIT_TEST(testSyncShuffleRelease);
+#endif
+    CPPUNIT_TEST(testCleanup);
+    CPPUNIT_TEST_SUITE_END();
+    const IContextLogger &logctx;
+
+public:
+    RoxieMemTuningTests() : logctx(queryDummyContextLogger())
+    {
+    }
+
+    ~RoxieMemTuningTests()
+    {
+    }
+
+protected:
+    void testSetup()
+    {
+        setTotalMemoryLimit(true, true, true, tuningMemorySize, 0, NULL, NULL);
+    }
+
+    void testCleanup()
+    {
+        releaseRoxieHeap();
+    }
+
+    void testOne()
+    {
+        unsigned sequential = 0;
+        unsigned numTestRows = 0x10000;
+        testSync(numTestRows, DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY, true, sequential);
+    }
+
+    void testSyncOrderRelease()
+    {
+        testSyncRows(false);
+        testSyncRelease(numTuningRows / 0x100, false);
+        testSyncRelease(numTuningRows / 0x10, false);
+        testSyncRelease(numTuningRows, false);
+    }
+    void testSyncShuffleRelease()
+    {
+        testSyncRows(true);
+        testSyncRelease(numTuningRows / 256, true);
+        testSyncRelease(numTuningRows / 16, true);
+        testSyncRelease(numTuningRows, true);
+    }
+    void testSyncRelease(size_t numRows, bool shuffle)
+    {
+        size_t granularity = minGranularity;
+
+        //First timing is not done in parallel
+        unsigned sequential = testSync(numRows, numTuningRows, shuffle, 0);
+        while ((granularity < maxGranularity) && (granularity * 2< numRows))
+        {
+            testSync(numRows, granularity, shuffle, sequential);
+            testSync(numRows, granularity+granularity/2, shuffle, sequential);
+            granularity <<= 1;
+        }
+    }
+
+    void testSyncRows(bool shuffle)
+    {
+        unsigned numRows = numTuningRows;
+        while (numRows >= defaultGranularity)
+        {
+            unsigned sequential = testSync(numRows, numRows+1, shuffle, 0);
+            testSync(numRows, defaultGranularity, shuffle, sequential);
+            numRows = numRows / 2;
+        }
+    }
+
+    unsigned testSync(size_t numRows, size_t granularity, bool shuffle, unsigned sequential)
+    {
+        setParallelSyncReleaseGranularity(granularity, 2);
+        unsigned times[numTuningIters];
+        for (unsigned iter=0; iter < numTuningIters; iter++)
+            times[iter] = testSingleSync(numRows, granularity, shuffle);
+
+        unsigned median = reportSummary("Sync", numRows, granularity, shuffle, sequential, times);
+        return median;
+    }
+
+    unsigned reportSummary(const char * type, size_t numRows, size_t granularity, bool shuffle, unsigned sequential, unsigned * times)
+    {
+        //Calculate the median
+        qsort(times, numTuningIters, sizeof(*times), compareTiming);
+        //Give an estimate of the range - exclude the min and max values
+        unsigned low = 0;
+        unsigned high = 0;
+        unsigned median = times[numTuningIters/2];
+        if (numTuningIters >= 5)
+        {
+            high = (times[numTuningIters-2] - median);
+            low = median - times[1];
+        }
+        double percent = sequential ? ((double)median * 100) / sequential : 100.0;
+        const char * compare = (sequential ? ((median < sequential) ? "<" : ">") : " ");
+        printf("%s %s %s (%u) took %u(-%u..+%u) us {%.2f%%} for %" I64F "u rows\n", type, shuffle ? "Shuffle" : "Ordered", compare, (unsigned)granularity, median, low, high, percent, (unsigned __int64)numRows);
+        return median;
+    }
+
+    unsigned testSingleSync(size_t numRows, size_t granularity, bool shuffle)
+    {
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
+        ConstPointerArray rows;
+        createRows(rowManager, numRows, rows, shuffle);
+        cycle_t start = get_cycles_now();
+        ReleaseRoxieRowArray(rows.ordinality(), rows.getArray());
+        unsigned microsecs = cycle_to_microsec(get_cycles_now() - start);
+        return microsecs;
+    }
+
+
+    void createRows(IRowManager * rowManager, size_t numRows, ConstPointerArray & target, bool shuffle)
+    {
+        Owned<IFixedRowHeap> heap = rowManager->createFixedRowHeap(tuningAllocSize, 0, RHFpacked, 0);
+        target.ensure(numTuningRows);
+        for (size_t i = 0; i < numRows; i++)
+            target.append(heap->allocate());
+
+        if (shuffle)
+        {
+            Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
+            random->seed(123456789);
+            unsigned i = target.ordinality();
+            while (i > 1)
+            {
+                unsigned j = random->next() % i;
+                i--;
+                target.swap(i, j);
+            }
+        }
+    }
+
+};
+
+
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemTests );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemTests, "RoxieMemTests" );
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemStressTests );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemStressTests, "RoxieMemStressTests" );
+
+CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemTuningTests );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemTuningTests, "RoxieMemTuningTests" );
+
 #ifdef __64BIT__
 //CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemHugeTests );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemHugeTests, "RoxieMemHugeTests" );

+ 9 - 6
roxie/roxiemem/roxiemem.hpp

@@ -17,12 +17,18 @@
 
 #ifndef _ROXIEMEM_INCL
 #define _ROXIEMEM_INCL
+#include "platform.h"
 #include "jlib.hpp"
 #include "jlog.hpp"
 #include "jdebug.hpp"
 #include "jstats.h"
 #include "errorlist.h"
 
+#if defined(_USE_TBB)
+//Release blocks of rows in parallel - always likely to improve performance
+#define PARALLEL_SYNC_RELEASE
+#endif
+
 #ifdef _WIN32
  #ifdef ROXIEMEM_EXPORTS
   #define roxiemem_decl __declspec(dllexport)
@@ -299,12 +305,9 @@ public:
 #define RoxieRowAllocatorId(row) roxiemem::HeapletBase::getAllocatorId(row)
 #define RoxieRowIsShared(row)  roxiemem::HeapletBase::isShared(row)
 
-inline void ReleaseRoxieRows(ConstPointerArray &data)
-{
-    ForEachItemIn(idx, data)
-        ReleaseRoxieRow(data.item(idx));
-    data.kill();
-}
+void roxiemem_decl ReleaseRoxieRowArray(size_t count, const void * * rows);
+void roxiemem_decl ReleaseRoxieRowRange(const void * * rows, size_t from, size_t to);
+void roxiemem_decl ReleaseRoxieRows(ConstPointerArray &rows);
 
 
 class OwnedRoxieRow;

+ 2 - 4
roxie/roxiemem/roxierowbuff.cpp

@@ -107,8 +107,7 @@ void RoxieOutputRowArray::flush()
 
 void RoxieOutputRowArray::clearRows()
 {
-    for (rowidx_t i = firstRow; i < numRows; i++)
-        ReleaseRoxieRow(rows[i]);
+    roxiemem::ReleaseRoxieRowRange(rows, firstRow, numRows);
     firstRow = 0;
     numRows = 0;
     commitRows = 0;
@@ -205,8 +204,7 @@ void RoxieSimpleInputRowArray::kill()
 {
     if (rows)
     {
-        for (rowidx_t i = firstRow; i < numRows; i++)
-            ReleaseRoxieRow(rows[i]);
+        roxiemem::ReleaseRoxieRowRange(rows, firstRow, numRows);
         firstRow = 0;
         numRows = 0;
         ReleaseRoxieRow(rows);

+ 1 - 5
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2261,11 +2261,7 @@ class CLookupHT : public CHTBase
     }
     void releaseHTRows()
     {
-        for (rowidx_t r=0; r<htSize; r++)
-        {
-            if (ht[r])
-                ReleaseThorRow(ht[r]);
-        }
+        roxiemem::ReleaseRoxieRowArray(htSize, ht);
     }
 public:
     CLookupHT()

+ 1 - 4
thorlcr/thorutil/thbuf.cpp

@@ -1751,10 +1751,7 @@ public:
     }
     ~CRowMultiWriterReader()
     {
-        while (rowPos < rowsToRead)
-        {
-            ReleaseThorRow(readRows[rowPos++]);
-        }
+        roxiemem::ReleaseRoxieRowRange(readRows, rowPos, rowsToRead);
         ReleaseThorRow(readRows);
     }
     void writerStopped()

+ 5 - 12
thorlcr/thorutil/thmem.cpp

@@ -366,10 +366,7 @@ public:
     ~CSpillableStream()
     {
         spillStream.clear();
-        while (pos < numReadRows)
-        {
-            ReleaseThorRow(readRows[pos++]);
-        }
+        roxiemem::ReleaseRoxieRowRange(readRows, pos, numReadRows);
         ReleaseThorRow(readRows);
     }
 
@@ -583,8 +580,7 @@ inline bool CThorExpandingRowArray::_resize(rowidx_t requiredRows, unsigned maxS
         {
             if (numRows > requiredRows)
             {
-                for (rowidx_t i = requiredRows; i < numRows; i++)
-                    ReleaseThorRow(rows[i]);
+                roxiemem::ReleaseRoxieRowRange(rows, requiredRows, numRows);
                 numRows = requiredRows;
             }
         }
@@ -662,8 +658,7 @@ void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, Sta
 
 void CThorExpandingRowArray::clearRows()
 {
-    for (rowidx_t i = 0; i < numRows; i++)
-        ReleaseThorRow(rows[i]);
+    roxiemem::ReleaseRoxieRowArray(numRows, rows);
     numRows = 0;
 }
 
@@ -790,8 +785,7 @@ void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)
     if (rows)
     {
         rowidx_t end = start+n;
-        for (rowidx_t i = start; i < end; i++)
-            ReleaseThorRow(rows[i]);
+        roxiemem::ReleaseRoxieRowRange(rows, start, end);
         //firstRow = 0;
         const void **from = rows+start;
         memmove(from, from+n, (numRows-end) * sizeof(void *));
@@ -1238,8 +1232,7 @@ CThorSpillableRowArray::~CThorSpillableRowArray()
 
 void CThorSpillableRowArray::clearRows()
 {
-    for (rowidx_t i = firstRow; i < numRows; i++)
-        ReleaseThorRow(rows[i]);
+    roxiemem::ReleaseRoxieRowRange(rows, firstRow, numRows);
     numRows = 0;
     firstRow = 0;
     commitRows = 0;