Browse Source

HPCC-14469 Implement partitioned merging for final stage of merge sort

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 years ago
parent
commit
55a4d8ace5

+ 63 - 0
common/thorhelper/roxiehelper.cpp

@@ -621,6 +621,8 @@ extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const I
 class CSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
 {
 public:
+    CSortAlgorithm() { elapsedCycles = 0; }
+
     virtual void getSortedGroup(ConstPointerArray & result)
     {
         loop
@@ -631,6 +633,17 @@ public:
             result.append(row);
         }
     }
+
+    virtual cycle_t getElapsedCycles(bool reset)
+    {
+        cycle_t ret = elapsedCycles;
+        if (reset)
+            elapsedCycles = 0;
+        return ret;
+    }
+
+protected:
+    cycle_t elapsedCycles;
 };
 
 class CInplaceSortAlgorithm : public CSortAlgorithm
@@ -676,7 +689,28 @@ public:
     {
         curIndex = 0;
         if (input->nextGroup(sorted))
+        {
+            cycle_t startCycles = get_cycles_now();
             qsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
+            elapsedCycles += (get_cycles_now() - startCycles);
+        }
+    }
+};
+
+class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
+{
+public:
+    CTbbQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
+
+    virtual void prepare(IInputBase *input)
+    {
+        curIndex = 0;
+        if (input->nextGroup(sorted))
+        {
+            cycle_t startCycles = get_cycles_now();
+            tbbqsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
+            elapsedCycles += (get_cycles_now() - startCycles);
+        }
     }
 };
 
@@ -696,7 +730,9 @@ public:
             void **rows = const_cast<void * *>(sorted.getArray());
             MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
             void **temp = (void **) tempAttr.bufferBase();
+            cycle_t startCycles = get_cycles_now();
             sortRows(rows, numRows, temp);
+            elapsedCycles += (get_cycles_now() - startCycles);
         }
     }
 };
@@ -734,6 +770,17 @@ public:
     }
 };
 
+class CTbbStableQuickSortAlgorithm : public CStableInplaceSortAlgorithm
+{
+public:
+    CTbbStableQuickSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp)
+    {
+        tbbqsortstable(rows, numRows, *compare, temp);
+    }
+};
+
 #define INSERTION_SORT_BLOCKSIZE 1024
 
 class SortedBlock : public CInterface, implements IInterface
@@ -1277,6 +1324,7 @@ protected:
         unsigned numRows = rowsToSort.numCommitted();
         if (numRows)
         {
+            cycle_t startCycles = get_cycles_now();
             void ** rows = const_cast<void * *>(rowsToSort.getBlock(numRows));
             //MORE: Should this be parallel?  Should that be dependent on whether it is grouped?  Should be a hint.
             if (stable)
@@ -1287,6 +1335,7 @@ protected:
             }
             else
                 sortRows(rows, numRows, *compare, NULL);
+            elapsedCycles += (get_cycles_now() - startCycles);
         }
     }
     bool spillRows()
@@ -1372,6 +1421,16 @@ extern ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare)
     return new CStableQuickSortAlgorithm(_compare);
 }
 
+extern ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CTbbQuickSortAlgorithm(_compare);
+}
+
+extern ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CTbbStableQuickSortAlgorithm(_compare);
+}
+
 extern ISortAlgorithm *createInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId)
 {
     return new CInsertionSortAlgorithm(_compare, _rowManager, _activityId);
@@ -1420,6 +1479,10 @@ extern ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm _algorithm, ICompa
         return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, false);
     case spillingParallelMergeSortAlgorithm:
         return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, true);
+    case tbbQuickSortAlgorithm:
+        return createTbbQuickSortAlgorithm(_compare);
+    case tbbStableQuickSortAlgorithm:
+        return createTbbStableQuickSortAlgorithm(_compare);
     default:
         break;
     }

+ 4 - 0
common/thorhelper/roxiehelper.hpp

@@ -103,6 +103,7 @@ typedef enum { heapSortAlgorithm, insertionSortAlgorithm,
               quickSortAlgorithm, stableQuickSortAlgorithm, spillingQuickSortAlgorithm, stableSpillingQuickSortAlgorithm,
               mergeSortAlgorithm, spillingMergeSortAlgorithm,
               parallelMergeSortAlgorithm, spillingParallelMergeSortAlgorithm,
+              tbbQuickSortAlgorithm, tbbStableQuickSortAlgorithm,
               unknownSortAlgorithm } RoxieSortAlgorithm;
 
 interface ISortAlgorithm : extends IInterface
@@ -111,6 +112,7 @@ interface ISortAlgorithm : extends IInterface
     virtual const void *next() = 0;
     virtual void reset() = 0;
     virtual void getSortedGroup(ConstPointerArray & result) = 0;
+    virtual cycle_t getElapsedCycles(bool reset) = 0;
 };
 
 extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);
@@ -120,6 +122,8 @@ extern THORHELPER_API ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare
 extern THORHELPER_API ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable);
 extern THORHELPER_API ISortAlgorithm *createMergeSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare);
 
 extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
 

+ 4 - 0
roxie/ccd/ccdserver.cpp

@@ -7381,6 +7381,10 @@ public:
             sortAlgorithm = (sortFlags & TAFspill) ? spillingMergeSortAlgorithm : mergeSortAlgorithm;
         else if (stricmp(algorithmName, "parmergesort")==0)
             sortAlgorithm = (sortFlags & TAFspill) ? spillingParallelMergeSortAlgorithm : parallelMergeSortAlgorithm;
+        else if (stricmp(algorithmName, "tbbqsort")==0)
+            sortAlgorithm = tbbQuickSortAlgorithm;
+        else if (stricmp(algorithmName, "tbbstableqsort")==0)
+            sortAlgorithm = tbbStableQuickSortAlgorithm;
         else
         {
             if (*algorithmName)

+ 362 - 24
system/jlib/jsort.cpp

@@ -26,10 +26,12 @@
 #include "jthread.hpp"
 #include "jqueue.tpp"
 #include "jset.hpp"
+#include "jutil.hpp"
 
 #ifdef _USE_TBB
 #include "tbb/task.h"
 #include "tbb/task_scheduler_init.h"
+#include "tbb/parallel_sort.h"
 #endif
 
 #ifdef _DEBUG
@@ -39,7 +41,10 @@
 //#define MCMERGESTATS
 #endif
 
+//#define TRACE_PARTITION
+
 #define PARALLEL_GRANULARITY 1024
+static const unsigned numPartitionSamples = 3;
 
 static bool sortParallel(unsigned &numcpus)
 {
@@ -241,6 +246,69 @@ void qsortvec(void **a, size32_t n, sortCompareFunction compare)
 #undef RECURSE
 
 //---------------------------------------------------------------------------
+// tbb versions of the quick sort to provide a useful base comparison
+
+class TbbCompareWrapper
+{
+public:
+    TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
+    bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
+    const ICompare & compare;
+};
+
+
+class TbbCompareIndirectWrapper
+{
+public:
+    TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
+    bool operator()(void * * const & l, void * * const & r) const
+    {
+        int ret = compare.docompare(*l,*r);
+        if (ret==0)
+        {
+            if (l < r)
+                return true;
+            else
+                return false;
+        }
+        return (ret < 0);
+    }
+    const ICompare & compare;
+};
+
+
+void tbbqsortvec(void **a, size_t n, const ICompare & compare)
+{
+#ifdef _USE_TBB
+    TbbCompareWrapper tbbcompare(compare);
+    tbb::parallel_sort(a, a+n, tbbcompare);
+#else
+    throwUnexpectedX("TBB quicksort not available");
+#endif
+}
+
+void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
+{
+#ifdef _USE_TBB
+    void * * * rowsAsIndex = (void * * *)rows;
+    memcpy(temp, rows, n * sizeof(void*));
+
+    for(unsigned i=0; i<n; ++i)
+        rowsAsIndex[i] = temp+i;
+
+    TbbCompareIndirectWrapper tbbcompare(compare);
+    tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
+
+    //I'm sure this violates the aliasing rules...
+    for(unsigned i=0; i<n; ++i)
+        rows[i] = *rowsAsIndex[i];
+#else
+    throwUnexpectedX("TBB quicksort not available");
+#endif
+}
+
+
+//---------------------------------------------------------------------------
 
 #define CMP(a,b)         (compare.docompare(*(a),*(b)))
 #define MED3(a,b,c)      med3ic(a,b,c,compare)
@@ -621,14 +689,14 @@ void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare
 
     //I'm sure this violates the aliasing rules...
     void * * * rowsAsIndex = (void * * *)rows;
-    for(unsigned i=0; i<n; ++i)
+    for(size32_t i=0; i<n; ++i)
         rows[i] = *rowsAsIndex[i];
 }
 
 
 //-----------------------------------------------------------------------------------------------------------------------------
 
-inline void * * mergePartitions(const ICompare & compare, void * * result, unsigned n1, void * * ret1, unsigned n2, void * * ret2)
+inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
 {
     void * * tgt = result;
     loop
@@ -695,6 +763,13 @@ inline void * * mergePartitions(const ICompare & compare, void * * result, size_
     return result;
 }
 
+inline void clonePartition(void * * result, size_t n, void * * src)
+{
+    void * * tgt = result;
+    while (n--)
+       *tgt++ = *src++;
+}
+
 inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
 {
     void * * tgt = result+n1+n2-1;
@@ -731,7 +806,7 @@ inline void * * mergePartitionsRev(const ICompare & compare, void * * result, si
     return result;
 }
 
-static void * * mergeSort(void ** rows, size32_t n, const ICompare & compare, void ** tmp, unsigned depth)
+static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
 {
     void * * result = (depth & 1) ? tmp : rows;
     //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
@@ -762,8 +837,8 @@ static void * * mergeSort(void ** rows, size32_t n, const ICompare & compare, vo
         return result;
     }
 
-    unsigned n1 = (n+1)/2;
-    unsigned n2 = n - n1;
+    size_t n1 = (n+1)/2;
+    size_t n2 = n - n1;
     void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
     void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
     dbgassertex(ret2 == ret1 + n1);
@@ -772,7 +847,7 @@ static void * * mergeSort(void ** rows, size32_t n, const ICompare & compare, vo
 }
 
 
-void msortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
+void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
 {
     if (n <= 1)
         return;
@@ -812,7 +887,7 @@ class TbbParallelMergeSorter
     class BisectTask : public tbb::task
     {
     public:
-        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size32_t _n, void ** _temp, unsigned _depth, task * _next)
+        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
         : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth), next(_next)
         {
         }
@@ -831,16 +906,34 @@ class TbbParallelMergeSorter
 
                 void * * result = (depth & 1) ? temp : rows;
                 void * * src = (depth & 1) ? rows : temp;
-                unsigned n1 = (n+1)/2;
-                unsigned n2 = n-n1;
+                size_t n1 = (n+1)/2;
+                size_t n2 = n-n1;
                 task * mergeTask;
                 if (depth < sorter.parallelMergeDepth)
                 {
-                    task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
-                    mergeFwdTask->set_ref_count(1);
-                    task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
-                    mergeRevTask->set_ref_count(1);
-                    mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
+                    unsigned partitions = sorter.numPartitionCores() >> depth;
+                    if (partitions > 1)
+                    {
+                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
+                        for (unsigned i=0; i < partitions; i++)
+                        {
+                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
+                            mergeFwdTask->set_ref_count(1);
+                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
+                            mergeRevTask->set_ref_count(1);
+                            splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
+                        }
+                        next->decrement_ref_count();
+                        mergeTask = splitTask;
+                    }
+                    else
+                    {
+                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
+                        mergeFwdTask->set_ref_count(1);
+                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
+                        mergeRevTask->set_ref_count(1);
+                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
+                    }
                 }
                 else
                 {
@@ -862,7 +955,7 @@ class TbbParallelMergeSorter
         void ** rows;
         void ** temp;
         task * next;
-        size32_t n;
+        size_t n;
         unsigned depth;
     };
 
@@ -870,7 +963,7 @@ class TbbParallelMergeSorter
     class SubSortTask : public tbb::task
     {
     public:
-        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size32_t _n, void ** _temp, unsigned _depth)
+        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
         : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth)
         {
         }
@@ -884,7 +977,7 @@ class TbbParallelMergeSorter
         TbbParallelMergeSorter & sorter;
         void ** rows;
         void ** temp;
-        size32_t n;
+        size_t n;
         unsigned depth;
     };
 
@@ -892,17 +985,39 @@ class TbbParallelMergeSorter
     class MergeTask : public tbb::task
     {
     public:
-        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size32_t _n)
+        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
         : compare(_compare),result(_result), n1(_n1), src1(_src1), n2(_n2), src2(_src2), n(_n)
         {
         }
 
         virtual task * execute()
         {
-            mergePartitions(compare, result, n1, src1, n2, src2, n);
+            //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
+            if (n1 == 0)
+            {
+                assertex(n <= n2);
+                clonePartition(result, n, src2);
+            }
+            else if (n2 == 0)
+            {
+                assertex(n <= n1);
+                clonePartition(result, n, src1);
+            }
+            else
+                mergePartitions(compare, result, n1, src1, n2, src2, n);
             return NULL;
         }
 
+        void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
+        {
+            src1 += deltaLeft;
+            n1 = numLeft;
+            src2 += deltaRight;
+            n2 = numRight;
+            result += (deltaLeft + deltaRight);
+            n = num;
+        }
+
     protected:
         const ICompare & compare;
         void * * result;
@@ -923,9 +1038,227 @@ class TbbParallelMergeSorter
 
         virtual task * execute()
         {
-            mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
+            if (n1 == 0)
+            {
+                assertex(n <= n2);
+                //This is a reverse merge, so copy n from the end of the input
+                unsigned delta = n2 - n;
+                clonePartition(result + delta, n, src2 + delta);
+            }
+            else if (n2 == 0)
+            {
+                assertex(n <= n1);
+                unsigned delta = n1 - n;
+                clonePartition(result + delta, n, src1 + delta);
+            }
+            else
+                mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
+            return NULL;
+        }
+    };
+
+    class PartitionSplitTask : public tbb::task
+    {
+    public:
+        PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
+            : numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2), compare(_compare)
+        {
+            //These could be local variables in calculatePartitions(), but placed here to simplify cleanup.  (Should consider using alloca)
+            posLeft = new size_t[numPartitions+1];
+            posRight = new size_t[numPartitions+1];
+            tasks = new MergeTask *[numPartitions*2];
+            for (unsigned i=0; i < numPartitions*2; i++)
+                tasks[i] = NULL;
+        }
+        ~PartitionSplitTask()
+        {
+            delete [] posLeft;
+            delete [] posRight;
+            delete [] tasks;
+        }
+
+        void calculatePartitions()
+        {
+#ifdef PARANOID
+            {
+                for (unsigned ix=1; ix<n1; ix++)
+                    if (compare.docompare(src1[ix-1], src1[ix]) > 0)
+                        printf("Failure left@%u\n", ix);
+            }
+            if (false)
+            {
+                for (unsigned ix=1; ix<n2; ix++)
+                    if (compare.docompare(src2[ix-1], src2[ix]) > 0)
+                        printf("Failure right@%u\n", ix);
+            }
+#endif
+            //If dividing into P parts, select S*P-1 even points from each side.
+            unsigned numSamples = numPartitionSamples*numPartitions-1;
+            QuantilePositionIterator iterLeft(n1, numSamples+1, false);
+            QuantilePositionIterator iterRight(n2, numSamples+1, false);
+            iterLeft.first();
+            iterRight.first();
+
+            size_t prevLeft = 0;
+            size_t prevRight =0;
+            posLeft[0] = 0;
+            posRight[0] = 0;
+
+            //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
+            //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
+            //=> pick samples [0, 2*numSamples, 4*numSamples ...]
+            //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
+            for (unsigned part = 1; part < numPartitions; part++)
+            {
+                unsigned numToSkip = numPartitionSamples*2;
+                if (part == 1)
+                    numToSkip++;
+                for (unsigned skip=numToSkip; skip-- != 0; )
+                {
+                    size_t leftPos = iterLeft.get();
+                    size_t rightPos = iterRight.get();
+                    if (leftPos == n1)
+                    {
+                        if (skip == 0)
+                        {
+                            posLeft[part] = leftPos;
+                            posRight[part] = rightPos;
+                        }
+                        iterRight.next();
+                    }
+                    else if (rightPos == n2)
+                    {
+                        if (skip == 0)
+                        {
+                            posLeft[part] = leftPos;
+                            posRight[part] = rightPos;
+                        }
+                        iterLeft.next();
+                    }
+                    else
+                    {
+                        int c = compare.docompare(src1[leftPos], src2[rightPos]);
+                        if (skip == 0)
+                        {
+                            if (c <= 0)
+                            {
+                                //value in left is smallest.  Find the position of the value <= the left value
+                                posLeft[part] = leftPos;
+                                posRight[part] = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
+                            }
+                            else
+                            {
+                                posLeft[part] = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
+                                posRight[part] = rightPos;
+                            }
+                        }
+                        if (c <= 0)
+                        {
+                            iterLeft.next();
+                            prevLeft = leftPos;
+                        }
+                        else
+                        {
+                            iterRight.next();
+                            prevRight = rightPos;
+                        }
+                    }
+                }
+            }
+
+            posLeft[numPartitions] = n1;
+            posRight[numPartitions] = n2;
+#ifdef TRACE_PARTITION
+            printf("%d,%d -> {", (unsigned)n1, (unsigned)n2);
+#endif
+            for (unsigned i= 0; i < numPartitions; i++)
+            {
+                size_t start = posLeft[i] + posRight[i];
+                size_t end = posLeft[i+1] + posRight[i+1];
+                size_t num = end - start;
+                size_t numFwd = num/2;
+#ifdef TRACE_PARTITION
+                printf("([%d..%d],[%d..%d] %d,%d = %d)\n",
+                        (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
+                        (unsigned)start, (unsigned)end, (unsigned)num);
+#endif
+
+                MergeTask & mergeFwdTask = *tasks[i*2];
+                MergeTask & mergeRevTask = *tasks[i*2+1];
+                mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
+                                      posRight[i], posRight[i+1]-posRight[i],
+                                      numFwd);
+                mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
+                                      posRight[i], posRight[i+1]-posRight[i],
+                                      num-numFwd);
+            }
+#ifdef TRACE_PARTITION
+            printf("}\n");
+#endif
+        }
+
+        virtual task * execute()
+        {
+            calculatePartitions();
+            for (unsigned i=0; i < numPartitions*2; i++)
+            {
+                if (tasks[i]->decrement_ref_count() == 0)
+                    spawn(*tasks[i]);
+            }
             return NULL;
         }
+
+        void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
+        {
+            tasks[i*2] = fwd;
+            tasks[i*2+1] = rev;
+        }
+
+    protected:
+        size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
+        {
+            if (low == high)
+                return low;
+            while (high - low > 1)
+            {
+                size_t mid = (low + high) / 2;
+                if (compare.docompare(rows[mid], seek) < 0)
+                    low = mid;
+                else
+                    high = mid;
+            }
+            if (compare.docompare(rows[low], seek) < 0)
+                return low+1;
+            return low;
+        }
+
+        size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
+        {
+            if (low == high)
+                return low;
+            while (high - low > 1)
+            {
+                size_t mid = (low + high) / 2;
+                if (compare.docompare(rows[mid], seek) <= 0)
+                    low = mid;
+                else
+                    high = mid;
+            }
+            if (compare.docompare(rows[low], seek) <= 0)
+                return low+1;
+            return low;
+        }
+
+    protected:
+        const ICompare & compare;
+        unsigned numPartitions;
+        size_t n1;
+        size_t n2;
+        void * * src1;
+        void * * src2;
+        size_t * posLeft;
+        size_t * posRight;
+        MergeTask * * tasks;
     };
 
 public:
@@ -943,11 +1276,15 @@ public:
 
         //Merge in parallel once it is likely to be beneficial
         parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
+
         //Aim to execute in parallel until the width is 8*the maximum number of parallel task
         singleThreadDepth = ln2NumCpus + extraBisectDepth;
+        partitionCores = numCpus / 2;
     }
 
-    void sortRoot(void ** rows, size32_t n, void ** temp)
+    unsigned numPartitionCores() const { return partitionCores; }
+
+    void sortRoot(void ** rows, size_t n, void ** temp)
     {
         task * end = new (task::allocate_root()) tbb::empty_task();
         end->set_ref_count(1+1);
@@ -961,11 +1298,12 @@ public:
     const ICompare & compare;
     unsigned singleThreadDepth;
     unsigned parallelMergeDepth;
+    unsigned partitionCores;
     void * * baseRows;
 };
 
 //-------------------------------------------------------------------------------------------------------------------
-void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
 {
     if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
     {
@@ -977,9 +1315,9 @@ void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare
     sorter.sortRoot(rows, n, temp);
 }
 #else
-void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
 {
-    parqsortvecstableinplace(rows, n, compare, temp, ncpus);
+    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
 }
 #endif
 

+ 5 - 3
system/jlib/jsort.hpp

@@ -76,8 +76,8 @@ extern jlib_decl void qsortvec(void **a, size32_t n, const ICompare & compare1,
 
 // Call with n rows of data in rows, index an (uninitialized) array of size n. The function will fill index with a stably sorted index into rows.
 extern jlib_decl void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
-extern jlib_decl void msortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
-extern jlib_decl void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
+extern jlib_decl void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp);
+extern jlib_decl void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
 
 
 extern jlib_decl void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned ncpus=0); // runs in parallel on multi-core
@@ -95,7 +95,7 @@ extern jlib_decl bool heap_push_up(unsigned c, unsigned * heap, const void ** ro
 
 
 
-inline void parsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** stableTablePtr, unsigned maxCores=0)
+inline void parsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** stableTablePtr, unsigned maxCores=0)
 {
 #ifdef _USE_TBB
     parmsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
@@ -104,6 +104,8 @@ inline void parsortvecstableinplace(void ** rows, size32_t n, const ICompare & c
 #endif
 }
 
+extern jlib_decl void tbbqsortvec(void **a, size_t n, const ICompare & compare);
+extern jlib_decl void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp);
 
 
 extern jlib_decl IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp, bool partdedup=false);

+ 93 - 0
system/jlib/jutil.hpp

@@ -336,5 +336,98 @@ struct mapEnums { int val; const char *str; };
 
 extern jlib_decl const char *getEnumText(int value, const mapEnums *map);
 
+class jlib_decl QuantilePositionIterator
+{
+public:
+    QuantilePositionIterator(size_t _numRows, unsigned _numDivisions, bool roundUp)
+    : numRows(_numRows), numDivisions(_numDivisions)
+    {
+        assertex(numDivisions);
+        step = numRows / numDivisions;
+        stepDelta = (unsigned)(numRows % numDivisions);
+        initialDelta = roundUp ? (numDivisions)/2 : (numDivisions-1)/2;
+        first();
+    }
+    bool first()
+    {
+        curRow = 0;
+        curDelta = initialDelta;
+        curQuantile = 0;
+        return true;
+    }
+    bool next()
+    {
+        if (curQuantile >= numDivisions)
+            return false;
+        curQuantile++;
+        curRow += step;
+        curDelta += stepDelta;
+        if (curDelta >= numDivisions)
+        {
+            curRow++;
+            curDelta -= numDivisions;
+        }
+        assertex(curRow <= numRows);
+        return true;
+    }
+    size_t get() { return curRow; }
+
+protected:
+    size_t numRows;
+    size_t curRow;
+    size_t step;
+    unsigned numDivisions;
+    unsigned stepDelta;
+    unsigned curQuantile;
+    unsigned curDelta;
+    unsigned initialDelta;
+};
+
+
+class jlib_decl QuantileFilterIterator
+{
+public:
+    QuantileFilterIterator(size_t _numRows, unsigned _numDivisions, bool roundUp)
+    : numRows(_numRows), numDivisions(_numDivisions)
+    {
+        assertex(numDivisions);
+        initialDelta = roundUp ? (numDivisions-1)/2 : (numDivisions)/2;
+        first();
+    }
+    bool first()
+    {
+        curRow = 0;
+        curDelta = initialDelta;
+        curQuantile = 0;
+        isQuantile = true;
+        return true;
+    }
+    bool next()
+    {
+        if (curRow > numRows)
+            return false;
+        curRow++;
+        curDelta += numDivisions;
+        isQuantile = false;
+        if (curDelta >= numRows)
+        {
+            curDelta -= numRows;
+            isQuantile = true;
+        }
+        return true;
+    }
+    size_t get() { return isQuantile; }
+
+protected:
+    size_t numRows;
+    size_t curRow;
+    size_t step;
+    size_t curDelta;
+    unsigned numDivisions;
+    unsigned curQuantile;
+    unsigned initialDelta;
+    bool isQuantile;
+};
+
 #endif
 

+ 90 - 0
testing/unittests/jlibtests.cpp

@@ -527,4 +527,94 @@ CPPUNIT_TEST_SUITE_REGISTRATION( JlibStringBufferTest );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibStringBufferTest, "JlibStringBufferTest" );
 
 
+/* =========================================================== */
+
+static const unsigned split4_2[] = {0, 2, 4 };
+static const unsigned split100_2[] = {0, 50, 100  };
+static const unsigned split100_10[] = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100  };
+static const unsigned split7_10[] = {0,1,1,2,3,3,4,5,6,6,7 };
+static const unsigned split10_3[] = {0,3,7,10 };
+static const unsigned split58_10[] = {0,6,12,17,23,29,35,41,46,52,58 };
+static const unsigned split9_2T[] = { 0,5,9 };
+static const unsigned split9_2F[] = { 0,4,9 };
+static const unsigned split15_3[] = { 0,5,10,15 };
+
+class JlibQuantileTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE( JlibQuantileTest );
+        CPPUNIT_TEST(testQuantile);
+        CPPUNIT_TEST(testRandom);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    JlibQuantileTest()
+    {
+    }
+
+    void testQuantilePos(unsigned numItems, unsigned numDivisions, bool roundUp, const unsigned * expected)
+    {
+        if (numDivisions == 0)
+            return;
+
+        QuantilePositionIterator iter(numItems, numDivisions, roundUp);
+        QuantileFilterIterator filter(numItems, numDivisions, roundUp);
+
+        unsigned prevPos = 0;
+        iter.first();
+        for (unsigned i=0; i <= numDivisions; i++)
+        {
+            //Check the values from the quantile iterator match those that are expected
+            unsigned pos = (unsigned)iter.get();
+#if 0
+            printf("(%d,%d) %d=%d\n", numItems, numDivisions, i, pos);
+#endif
+            if (expected)
+                CPPUNIT_ASSERT_EQUAL(expected[i], pos);
+
+            //Check that the quantile filter correctly returns true and false for subsequent calls.
+            while (prevPos < pos)
+            {
+                CPPUNIT_ASSERT(!filter.get());
+                filter.next();
+                prevPos++;
+            }
+
+            if (prevPos == pos)
+            {
+                CPPUNIT_ASSERT(filter.get());
+                filter.next();
+                prevPos++;
+            }
+            iter.next();
+        }
+    }
+
+    void testQuantile()
+    {
+        testQuantilePos(4, 2, false, split4_2);
+        testQuantilePos(100, 2, false, split100_2);
+        testQuantilePos(100, 10, false, split100_10);
+        testQuantilePos(7, 10, false, split7_10);
+        testQuantilePos(10, 3, false, split10_3);
+        testQuantilePos(10, 3, true, split10_3);
+        testQuantilePos(58, 10, false, split58_10);
+        //testQuantilePos(9, 2, true, split9_2T);
+        testQuantilePos(9, 2, false, split9_2F);
+        testQuantilePos(15, 3, false, split15_3);
+        testQuantilePos(1231, 57, false, NULL);
+        testQuantilePos(1, 63, false, NULL);
+        testQuantilePos(10001, 17, false, NULL);
+    }
+    void testRandom()
+    {
+        //test various random combinations to ensure the results are consistent.
+        for (unsigned i=0; i < 10; i++)
+            testQuantilePos(random() % 1000000, random() % 10000, true, NULL);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( JlibQuantileTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibQuantileTest, "JlibQuantileTest" );
+
+
 #endif // _USE_CPPUNIT