Procházet zdrojové kódy

HPCC-15130 Reduce jlib dependencies

Move tbb requirement to thorhelper

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 9 roky
rodič
revize
ef14fd321f

+ 7 - 0
common/thorhelper/CMakeLists.txt

@@ -64,6 +64,8 @@ set (    SRCS
          roxierow.hpp
          roxierow.hpp
          thorralgo.ipp
          thorralgo.ipp
          thorrparse.ipp
          thorrparse.ipp
+         thorsort.cpp
+         thorsort.hpp
          
          
          roxiedebug.hpp
          roxiedebug.hpp
          roxiedebug.ipp
          roxiedebug.ipp
@@ -88,6 +90,7 @@ include_directories (
          ./../../rtl/include 
          ./../../rtl/include 
          ./../../roxie/roxiemem
          ./../../roxie/roxiemem
          ./../../testing/unittests
          ./../../testing/unittests
+         ${TBB_INCLUDE_DIR}
     )
     )
 
 
 ADD_DEFINITIONS( -DTHORHELPER_EXPORTS -D_USRDLL )
 ADD_DEFINITIONS( -DTHORHELPER_EXPORTS -D_USRDLL )
@@ -107,6 +110,10 @@ if (USE_NUMA)
  target_link_libraries ( thorhelper numa )
  target_link_libraries ( thorhelper numa )
 endif ()
 endif ()
 
 
+if (${USE_TBB} )
+   target_link_libraries ( thorhelper ${TBB_LIBRARIES})
+endif()
+
 
 
 IF (USE_OPENSSL)
 IF (USE_OPENSSL)
     target_link_libraries ( thorhelper 
     target_link_libraries ( thorhelper 

+ 1 - 0
common/thorhelper/roxiehelper.cpp

@@ -17,6 +17,7 @@
 
 
 #include "jexcept.hpp"
 #include "jexcept.hpp"
 #include "thorherror.h"
 #include "thorherror.h"
+#include "thorsort.hpp"
 #include "roxiehelper.hpp"
 #include "roxiehelper.hpp"
 #include "roxielmj.hpp"
 #include "roxielmj.hpp"
 #include "roxierow.hpp"
 #include "roxierow.hpp"

+ 713 - 0
common/thorhelper/thorsort.cpp

@@ -0,0 +1,713 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "thorsort.hpp"
+#include "jset.hpp"
+
+#ifdef _USE_TBB
+#include "tbb/task.h"
+#include "tbb/task_scheduler_init.h"
+#include "tbb/parallel_sort.h"
+#endif
+
+#ifdef _DEBUG
+// #define PARANOID
+//#define TESTPARSORT
+//#define MCMERGESTATS
+#endif
+
+//#define PARANOID_PARTITION
+//#define TRACE_PARTITION
+
+#define PARALLEL_GRANULARITY 1024
+
+//---------------------------------------------------------------------------
+// tbb versions of the quick sort to provide a useful base comparison
+
+//Adapts an ICompare to the "less than" predicate form passed to tbb::parallel_sort
+class TbbCompareWrapper
+{
+public:
+    TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
+
+    //Returns true when row l orders strictly before row r
+    bool operator()(void * const & l, void * const & r) const
+    {
+        const int order = compare.docompare(l, r);
+        return order < 0;
+    }
+
+    const ICompare & compare;
+};
+
+
+//"Less than" predicate over pointers to row slots.  Rows that compare equal are ordered by the
+//address of the slot they are held in, which keeps equal rows in their original slot order.
+class TbbCompareIndirectWrapper
+{
+public:
+    TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
+
+    bool operator()(void * * const & l, void * * const & r) const
+    {
+        const int order = compare.docompare(*l,*r);
+        if (order != 0)
+            return (order < 0);
+        //Equal rows - fall back to comparing the slot addresses
+        return (l < r);
+    }
+
+    const ICompare & compare;
+};
+
+
+//Sort the n row pointers at a with tbb::parallel_sort (not stable).
+//When built without _USE_TBB this reports an error via throwUnexpectedX instead.
+void tbbqsortvec(void **a, size_t n, const ICompare & compare)
+{
+#ifndef _USE_TBB
+    throwUnexpectedX("TBB quicksort not available");
+#else
+    TbbCompareWrapper lessThan(compare);
+    void * * first = a;
+    void * * last = a + n;
+    tbb::parallel_sort(first, last, lessThan);
+#endif
+}
+
+//Stable sort of n rows using tbb::parallel_sort.
+//  rows - the row pointers to sort.  The array is reused to hold the index while sorting, and
+//         contains the sorted row pointers on return.
+//  temp - working storage for n pointers; receives a copy of the original row pointers.
+//Stability comes from sorting pointers into temp and breaking ties on the slot address.
+//When built without _USE_TBB this reports an error via throwUnexpectedX instead.
+void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
+{
+#ifdef _USE_TBB
+    void * * * rowsAsIndex = (void * * *)rows;
+    memcpy(temp, rows, n * sizeof(void*));
+
+    //NB: The index must be a size_t - an unsigned index wraps and never reaches n once n >= 2^32
+    for(size_t i=0; i<n; ++i)
+        rowsAsIndex[i] = temp+i;
+
+    TbbCompareIndirectWrapper tbbcompare(compare);
+    tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
+
+    //I'm sure this violates the aliasing rules...
+    //Each slot is read (as an index entry) before it is overwritten with the row it refers to
+    for(size_t i=0; i<n; ++i)
+        rows[i] = *rowsAsIndex[i];
+#else
+    throwUnexpectedX("TBB quicksort not available");
+#endif
+}
+
+//-----------------------------------------------------------------------------------------------------------------------------
+
+//Merge two sorted runs (n1 rows at ret1, n2 rows at ret2) into result, and return result.
+//Both runs must contain at least one row.  When rows compare equal the row from the first run is
+//output first.
+inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
+{
+    void * * out = result;
+    loop
+    {
+        const bool takeFirst = (compare.docompare(*ret1, *ret2) <= 0);
+        if (takeFirst)
+        {
+            *out++ = *ret1++;
+            if (--n1 != 0)
+                continue;
+
+            //First run exhausted.  The second run still has at least one row - copy what is left
+            do
+            {
+                *out++ = *ret2++;
+            } while (--n2);
+            return result;
+        }
+
+        *out++ = *ret2++;
+        if (--n2 != 0)
+            continue;
+
+        //Second run exhausted.  The first run still has at least one row - copy what is left
+        do
+        {
+            *out++ = *ret1++;
+        } while (--n1);
+        return result;
+    }
+}
+
+//Merge the first n rows of two sorted runs (n1 rows at ret1, n2 rows at ret2) into result, and
+//return result.  Both runs must contain at least one row if n is non zero.  When rows compare
+//equal the row from the first run is output first.
+inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
+{
+    void * * out = result;
+    while (n != 0)
+    {
+        --n;    // n is now the number of rows still required after the one output below
+        if (compare.docompare(*ret1, *ret2) <= 0)
+        {
+            *out++ = *ret1++;
+            if (--n1 == 0)
+            {
+                //First run exhausted - all remaining rows come from the second run
+                for (; n != 0; --n)
+                    *out++ = *ret2++;
+                return result;
+            }
+        }
+        else
+        {
+            *out++ = *ret2++;
+            if (--n2 == 0)
+            {
+                //Second run exhausted - all remaining rows come from the first run
+                for (; n != 0; --n)
+                    *out++ = *ret1++;
+                return result;
+            }
+        }
+    }
+    return result;
+}
+
+//Copy n row pointers from src to result
+inline void clonePartition(void * * result, size_t n, void * * src)
+{
+    for (size_t i = 0; i < n; i++)
+        result[i] = src[i];
+}
+
+//Merge the last n rows of two sorted runs (n1 rows at ret1, n2 rows at ret2), working backwards.
+//result is the start of the complete (n1+n2 row) output - only its final n entries are written.
+//Both runs must contain at least one row.  When rows compare equal the row from the first run
+//(ret1) is placed later in the output.  Returns result.
+inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
+{
+    //Start at the last entry of the output and of each input
+    void * * out = result+n1+n2-1;
+    ret1 += (n1-1);
+    ret2 += (n2-1);
+    while (n != 0)
+    {
+        --n;    // n is now the number of rows still required after the one output below
+        if (compare.docompare(*ret1, *ret2) >= 0)
+        {
+            *out-- = *ret1--;
+            if (--n1 == 0)
+            {
+                //First run exhausted - all remaining rows come from the second run
+                for (; n != 0; --n)
+                    *out-- = *ret2--;
+                return result;
+            }
+        }
+        else
+        {
+            *out-- = *ret2--;
+            if (--n2 == 0)
+            {
+                //Second run exhausted - all remaining rows come from the first run
+                for (; n != 0; --n)
+                    *out-- = *ret1--;
+                return result;
+            }
+        }
+    }
+    return result;
+}
+
+//Recursive stable merge sort of the n rows at rows, using tmp as a second buffer of the same size.
+//The output buffer alternates with the recursion depth: even depths leave their result in rows,
+//odd depths leave it in tmp.  Returns the buffer that holds the sorted rows for this call.
+static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
+{
+    void * * result = (depth & 1) ? tmp : rows;
+
+    //Runs of one or two rows are processed directly.  This could be coded to perform an "optimal"
+    //3 element compare, but this is much simpler, and in performance testing it was marginally quicker.
+    if (n < 2)
+    {
+        if (result != rows)
+            result[0] = rows[0];
+        return result;
+    }
+    if (n == 2)
+    {
+        void * first = rows[0];
+        void * second = rows[1];
+        const bool inOrder = (compare.docompare(first, second) <= 0);
+        result[0] = inOrder ? first : second;
+        result[1] = inOrder ? second : first;
+        return result;
+    }
+
+    //Sort each half one level deeper - so both end up in the other buffer - then merge into result
+    const size_t n1 = (n+1)/2;
+    const size_t n2 = n - n1;
+    void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
+    void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
+    dbgassertex(ret2 == ret1 + n1);
+    dbgassertex(ret2 != result);
+    return mergePartitions(compare, result, n1, ret1, n2, ret2);
+}
+
+
+//Single threaded stable merge sort of the n rows at rows.  temp is used as working storage and is
+//indexed in step with rows, so it needs room for n pointers.  The sorted rows are left in rows.
+void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
+{
+    //Zero or one row is already sorted
+    if (n > 1)
+        mergeSort(rows, n, compare, temp, 0);
+}
+
+//=========================================================================
+
+#ifdef _USE_TBB
+static const unsigned numPartitionSamples = 3;              // sample points taken per partition when splitting a merge
+//These constants are probably architecture and number of core dependent
+static const size_t singleThreadedMSortThreshold = 2000;    // at or below this many rows the sort is done single threaded
+static const size_t multiThreadedBlockThreshold = 64;       // must be at least 2!  Blocks this small are sorted by a single task
+
+using tbb::task;
+class TbbParallelMergeSorter
+{
+    class SplitTask : public tbb::task       // Task that releases two successor tasks when it executes
+    {
+    public:
+        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
+        {
+        }
+
+        virtual task * execute()
+        {
+            if (next1->decrement_ref_count() == 0)      // nothing else outstanding for next1 => it can be started
+                spawn(*next1);
+            if (next2->decrement_ref_count() == 0)
+                return next2;                           // returned rather than spawned - presumably so it is run next by this thread (TODO confirm)
+            return NULL;
+        }
+    protected:
+        task * next1;
+        task * next2;
+    };
+
+    class BisectTask : public tbb::task       // Repeatedly halves a range, creating the tasks to sort each half and merge the results
+    {
+    public:
+        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
+        : sorter(_sorter), rows(_rows), temp(_temp), next(_next), n(_n), depth(_depth)
+        {
+        }
+        virtual task * execute()
+        {
+            loop
+            {
+                //On entry next is assumed to be used once by this function
+                if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
+                {
+                    //Create a new task rather than calling sort directly, so that the successor is set up correctly
+                    //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
+                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
+                    return sort;
+                }
+
+                void * * result = (depth & 1) ? temp : rows;    // buffer that receives this level's merged output
+                void * * src = (depth & 1) ? rows : temp;       // the other buffer - where the two halves sorted at depth+1 are left
+                size_t n1 = (n+1)/2;
+                size_t n2 = n-n1;
+                task * mergeTask;
+                if (depth < sorter.parallelMergeDepth)
+                {
+                    unsigned partitions = sorter.numPartitionCores() >> depth;  // halves with each level of recursion
+                    if (partitions > 1)
+                    {
+                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
+                        for (unsigned i=0; i < partitions; i++)
+                        {
+                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);  // range is set later by PartitionSplitTask::calculatePartitions()
+                            mergeFwdTask->set_ref_count(1);
+                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
+                            mergeRevTask->set_ref_count(1);
+                            splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
+                        }
+                        next->decrement_ref_count();    // NOTE(review): presumably balances the single use of next assumed on entry, since the merge tasks each added their own - confirm
+                        mergeTask = splitTask;
+                    }
+                    else
+                    {
+                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);     // forward merge outputs the first n1 rows
+                        mergeFwdTask->set_ref_count(1);
+                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);               // reverse merge outputs the last n2 rows
+                        mergeRevTask->set_ref_count(1);
+                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
+                    }
+                }
+                else
+                {
+                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
+                }
+
+                mergeTask->set_ref_count(2);    // one for each of the two halves processed below
+                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
+                spawn(*bisectRightTask);
+
+                //recurse directly on the left side rather than creating a new task
+                n = n1;                         // rows and temp are unchanged - the left half starts at the same position
+                depth = depth+1;
+                next = mergeTask;
+            }
+        }
+    protected:
+        TbbParallelMergeSorter & sorter;
+        void ** rows;
+        void ** temp;
+        task * next;
+        size_t n;
+        unsigned depth;
+    };
+
+
+    //Task that sorts a block of rows on a single thread using mergeSort()
+    class SubSortTask : public tbb::task
+    {
+    public:
+        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
+        : sorter(_sorter), rows(_rows), temp(_temp), n(_n), depth(_depth)
+        {
+        }
+
+        virtual task * execute()
+        {
+            //depth determines which of rows/temp receives the sorted output; the returned buffer is not needed here
+            const ICompare & rowCompare = sorter.compare;
+            mergeSort(rows, n, rowCompare, temp, depth);
+            return NULL;
+        }
+
+    protected:
+        TbbParallelMergeSorter & sorter;
+        void ** rows;
+        void ** temp;
+        size_t n;
+        unsigned depth;
+    };
+
+
+    //Task that merges the first n rows of two sorted inputs (n1 rows at src1, n2 rows at src2) into result
+    class MergeTask : public tbb::task
+    {
+    public:
+        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
+        : compare(_compare),result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
+        {
+        }
+
+        virtual task * execute()
+        {
+            if ((n1 != 0) && (n2 != 0))
+            {
+                mergePartitions(compare, result, n1, src1, n2, src2, n);
+                return NULL;
+            }
+
+            //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
+            //In that case the output is a straight copy from the other input.
+            if (n1 == 0)
+            {
+                assertex(n <= n2);
+                clonePartition(result, n, src2);
+            }
+            else
+            {
+                assertex(n <= n1);
+                clonePartition(result, n, src1);
+            }
+            return NULL;
+        }
+
+        //Restrict the merge to a sub range of each input: skip deltaLeft/deltaRight rows from the
+        //start of the left/right inputs (and the same number in total from the output), then
+        //merge num rows from the following numLeft and numRight rows.
+        void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
+        {
+            result += (deltaLeft + deltaRight);
+            src1 += deltaLeft;
+            src2 += deltaRight;
+            n1 = numLeft;
+            n2 = numRight;
+            n = num;
+        }
+
+    protected:
+        const ICompare & compare;
+        void * * result;
+        void * * src1;
+        void * * src2;
+        size_t n1;
+        size_t n2;
+        size_t n;
+    };
+
+    //Task that merges the last n rows of two sorted inputs, working backwards from the end of the output.
+    //It uses the same members as MergeTask, so adjustRange() applies in the same way.
+    class MergeRevTask : public MergeTask
+    {
+    public:
+        MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
+        : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
+        {
+        }
+
+        virtual task * execute()
+        {
+            //NB: The offsets below must be size_t - an unsigned would truncate the difference of two size_t values
+            if (n1 == 0)
+            {
+                assertex(n <= n2);
+                //This is a reverse merge, so copy n from the end of the input
+                size_t delta = n2 - n;
+                clonePartition(result + delta, n, src2 + delta);
+            }
+            else if (n2 == 0)
+            {
+                assertex(n <= n1);
+                size_t delta = n1 - n;
+                clonePartition(result + delta, n, src1 + delta);
+            }
+            else
+            {
+                //The inputs are passed in the opposite order so that when rows compare equal the row from src2 is placed last
+                mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
+            }
+            return NULL;
+        }
+    };
+
+    class PartitionSplitTask : public tbb::task
+    {
+    public:
+        //Record the two sorted inputs and allocate the working arrays: numPartitions+1 boundary
+        //positions for each input, and a (forward, reverse) pair of merge tasks per partition.
+        PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
+            : compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
+        {
+            //These could be local variables in calculatePartitions(), but placed here to simplify cleanup.  (Should consider using alloca)
+            const unsigned numBoundaries = numPartitions+1;
+            posLeft = new size_t[numBoundaries];
+            posRight = new size_t[numBoundaries];
+            //Value initialisation leaves every task pointer NULL until setTasks() is called
+            tasks = new MergeTask *[numPartitions*2]();
+        }
+
+        //Frees the working arrays.  The merge tasks they refer to are not deleted here.
+        ~PartitionSplitTask()
+        {
+            delete [] tasks;
+            delete [] posRight;
+            delete [] posLeft;
+        }
+
+        void calculatePartitions()      // Choose the boundaries of each partition in both inputs, and give each pair of merge tasks its range
+        {
+#ifdef PARANOID_PARTITION
+            {
+                for (unsigned ix=1; ix<n1; ix++)
+                    if (compare.docompare(src1[ix-1], src1[ix]) > 0)
+                        DBGLOG("Failure left@%u", ix);
+            }
+            {
+                for (unsigned ix=1; ix<n2; ix++)
+                    if (compare.docompare(src2[ix-1], src2[ix]) > 0)
+                        DBGLOG("Failure right@%u", ix);
+            }
+#endif
+            //If dividing into P parts, select S*P-1 even points from each side.
+            unsigned numSamples = numPartitionSamples*numPartitions-1;
+            QuantilePositionIterator iterLeft(n1, numSamples+1, false);
+            QuantilePositionIterator iterRight(n2, numSamples+1, false);
+            iterLeft.first();
+            iterRight.first();
+
+            size_t prevLeft = 0;
+            size_t prevRight =0;
+            posLeft[0] = 0;         // the first partition starts at the beginning of both inputs
+            posRight[0] = 0;
+
+            //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
+            //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
+            //=> pick samples [0, 2*numSamples, 4*numSamples ...]
+            //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
+            for (unsigned part = 1; part < numPartitions; part++)
+            {
+                unsigned numToSkip = numPartitionSamples*2;
+                if (part == 1)
+                    numToSkip++;    // one extra step for the first boundary
+                for (unsigned skip=numToSkip; skip-- != 0; )
+                {
+                    size_t leftPos = iterLeft.get();
+                    size_t rightPos = iterRight.get();
+                    int c;
+                    if (leftPos == n1)
+                        c = +1;     // left position is at the end of the input - step through the right samples
+                    else if (rightPos == n2)
+                        c = -1;     // right position is at the end of the input - step through the left samples
+                    else
+                        c = compare.docompare(src1[leftPos], src2[rightPos]);
+
+                    if (skip == 0)  // last step for this partition => record the boundary
+                    {
+                        if (c <= 0)
+                        {
+                            //value in left is smallest.  Find the first position in the right with a value >= the left value
+                            posLeft[part] = leftPos;
+                            size_t matchRight = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
+                            posRight[part] = matchRight;
+                            prevRight = matchRight;  // potentially reduce the search range next time
+                        }
+                        else
+                        {
+                            size_t matchLeft = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);    // first position in the left with a value > the right value
+                            posLeft[part] = matchLeft;
+                            posRight[part] = rightPos;
+                            prevLeft = matchLeft;  // potentially reduce the search range next time
+                        }
+                    }
+                    if (c <= 0)     // advance whichever side supplied the smaller sample
+                    {
+                        iterLeft.next();
+                        prevLeft = leftPos;
+                    }
+                    else
+                    {
+                        iterRight.next();
+                        prevRight = rightPos;
+                    }
+                }
+            }
+
+            posLeft[numPartitions] = n1;    // the last partition ends at the end of both inputs
+            posRight[numPartitions] = n2;
+#ifdef TRACE_PARTITION
+            DBGLOG("%d,%d -> {", (unsigned)n1, (unsigned)n2);
+#endif
+            for (unsigned i= 0; i < numPartitions; i++)
+            {
+                size_t start = posLeft[i] + posRight[i];
+                size_t end = posLeft[i+1] + posRight[i+1];
+                size_t num = end - start;       // total number of rows output for this partition
+                size_t numFwd = num/2;          // the forward task outputs this many rows, the reverse task the remainder
+#ifdef TRACE_PARTITION
+                DBGLOG("  ([%d..%d],[%d..%d] %d,%d = %d)",
+                        (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
+                        (unsigned)start, (unsigned)end, (unsigned)num);
+#endif
+
+                MergeTask & mergeFwdTask = *tasks[i*2];
+                MergeTask & mergeRevTask = *tasks[i*2+1];
+                mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
+                                      posRight[i], posRight[i+1]-posRight[i],
+                                      numFwd);
+                mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
+                                      posRight[i], posRight[i+1]-posRight[i],
+                                      num-numFwd);
+            }
+        }
+
+        //Work out the partition boundaries, then release each merge task - spawning any that have
+        //nothing else outstanding.
+        virtual task * execute()
+        {
+            calculatePartitions();
+
+            const unsigned numTasks = numPartitions*2;
+            for (unsigned idx=0; idx < numTasks; idx++)
+            {
+                MergeTask * cur = tasks[idx];
+                if (cur->decrement_ref_count() == 0)
+                    spawn(*cur);
+            }
+            return NULL;
+        }
+
+        //Record the forward and reverse merge tasks for partition i.  They are stored as adjacent entries.
+        void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
+        {
+            MergeTask * * slot = tasks + i*2;
+            slot[0] = fwd;
+            slot[1] = rev;
+        }
+
+    protected:
+        //Binary chop over rows between low and high (assumes rows is ordered by compare).
+        //Returns the position of the first row that is >= seek.  The row at high is never compared,
+        //so high is returned if all the rows before it are < seek.
+        size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
+        {
+            if (low != high)
+            {
+                //Narrow the range until a single candidate row remains at low
+                while (high - low > 1)
+                {
+                    const size_t mid = low + (high - low) / 2;
+                    const bool before = (compare.docompare(rows[mid], seek) < 0);
+                    if (before)
+                        low = mid;
+                    else
+                        high = mid;
+                }
+                if (compare.docompare(rows[low], seek) < 0)
+                    low++;
+            }
+            return low;
+        }
+
+        //Binary chop over rows between low and high (assumes rows is ordered by compare).
+        //Returns the position of the first row that is > seek.  The row at high is never compared,
+        //so high is returned if all the rows before it are <= seek.
+        size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
+        {
+            if (low != high)
+            {
+                //Narrow the range until a single candidate row remains at low
+                while (high - low > 1)
+                {
+                    const size_t mid = low + (high - low) / 2;
+                    const bool notAfter = (compare.docompare(rows[mid], seek) <= 0);
+                    if (notAfter)
+                        low = mid;
+                    else
+                        high = mid;
+                }
+                if (compare.docompare(rows[low], seek) <= 0)
+                    low++;
+            }
+            return low;
+        }
+
+    protected:
+        const ICompare & compare;
+        unsigned numPartitions;
+        size_t n1;
+        size_t n2;
+        void * * src1;
+        void * * src2;
+        size_t * posLeft;
+        size_t * posRight;
+        MergeTask * * tasks;
+    };
+
+public:
+    //Derive the depth limits and partition count used by the tasks from the default number of TBB threads
+    TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
+    {
+        //The following constants control the number of iterations to be performed in parallel.
+        //The sort is split into more parts than there are cpus so that the effect of delays from one task tend to be evened out.
+        //The following constants should possibly be tuned on each platform.  The following gave a good balance on a 2x8way xeon
+        const unsigned extraBisectDepth = 3;
+        const unsigned extraParallelMergeDepth = 3;
+
+        const unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
+        unsigned ln2NumCpus = 0;
+        if (numCpus > 1)
+            ln2NumCpus = getMostSignificantBit(numCpus-1);
+        assertex(numCpus <= (1U << ln2NumCpus));
+
+        //Half the cpus are used when partitioning a merge
+        partitionCores = numCpus / 2;
+
+        //Aim to execute in parallel until the width is 8*the maximum number of parallel task
+        singleThreadDepth = ln2NumCpus + extraBisectDepth;
+
+        //Merge in parallel once it is likely to be beneficial
+        parallelMergeDepth = ln2NumCpus + extraParallelMergeDepth;
+    }
+
+    //Number of partitions a merge at depth 0 is split into (halved for each level below that by BisectTask)
+    unsigned numPartitionCores() const
+    {
+        return partitionCores;
+    }
+
+    //Sort the n rows at rows, using temp as working storage, and wait for all the tasks to complete
+    void sortRoot(void ** rows, size_t n, void ** temp)
+    {
+        //"done" does no work - it is the task that the whole sort ultimately feeds into.
+        //Its reference count is 2 (1+1): presumably one for the root bisect task and one for wait_for_all() - TODO confirm
+        task * done = new (task::allocate_root()) tbb::empty_task();
+        done->set_ref_count(1+1);
+
+        task * root = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, done);
+        done->spawn(*root);
+        done->wait_for_all();
+        done->destroy(*done);
+    }
+
+public:
+    const ICompare & compare;
+    unsigned singleThreadDepth;
+    unsigned parallelMergeDepth;
+    unsigned partitionCores;
+    void * * baseRows;
+};
+
+//-------------------------------------------------------------------------------------------------------------------
+//Stable merge sort of the n rows at rows, using temp as working storage.
+//Small inputs (or ncpus == 1) are sorted on the calling thread; otherwise the TBB task based
+//sorter is used.  Other values of ncpus have no effect on the number of threads used.
+void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+{
+    const bool useParallel = (n > singleThreadedMSortThreshold) && (ncpus != 1);
+    if (!useParallel)
+    {
+        msortvecstableinplace(rows, n, compare, temp);
+        return;
+    }
+
+    TbbParallelMergeSorter parallelSorter(rows, compare);
+    parallelSorter.sortRoot(rows, n, temp);
+}
+#else
+void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)      // build without _USE_TBB: defer to the parallel quick sort
+{
+    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);     // NB: the row count is narrowed to size32_t
+}
+#endif

+ 41 - 0
common/thorhelper/thorsort.hpp

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+
+#ifndef THORSORT_HPP
+#define THORSORT_HPP
+
+#include "thorhelper.hpp"
+#include "jsort.hpp"
+
+extern THORHELPER_API void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp);
+extern THORHELPER_API void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
+
+//Stable in-place parallel sort of n rows.  Built with _USE_TBB this uses the merge sort
+//(parmsortvecstableinplace); otherwise it uses the parallel quick sort (parqsortvecstableinplace).
+inline void parsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** stableTablePtr, unsigned maxCores=0)
+{
+#ifndef _USE_TBB
+    //NB: the quick sort takes a size32_t row count, so n is narrowed here
+    parqsortvecstableinplace(rows, (size32_t)n, compare, stableTablePtr, maxCores);
+#else
+    parmsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
+#endif
+}
+
+extern THORHELPER_API void tbbqsortvec(void **a, size_t n, const ICompare & compare);
+extern THORHELPER_API void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp);
+
+#endif

+ 1 - 0
ecl/hthor/hthor.cpp

@@ -40,6 +40,7 @@
 #include "dasess.hpp"
 #include "dasess.hpp"
 #include "dadfs.hpp"
 #include "dadfs.hpp"
 #include "thorfile.hpp"
 #include "thorfile.hpp"
+#include "thorsort.hpp"
 #include "thorparse.ipp"
 #include "thorparse.ipp"
 #include "thorxmlwrite.hpp"
 #include "thorxmlwrite.hpp"
 #include "jsmartsock.hpp"
 #include "jsmartsock.hpp"

+ 1 - 0
roxie/ccd/ccdserver.cpp

@@ -28,6 +28,7 @@
 #include "thorxmlwrite.hpp"
 #include "thorxmlwrite.hpp"
 #include "thorsoapcall.hpp"
 #include "thorsoapcall.hpp"
 #include "thorcommon.ipp"
 #include "thorcommon.ipp"
+#include "thorsort.hpp"
 #include "jlzw.hpp"
 #include "jlzw.hpp"
 #include "javahash.hpp"
 #include "javahash.hpp"
 #include "javahash.tpp"
 #include "javahash.tpp"

+ 5 - 0
roxie/roxiemem/CMakeLists.txt

@@ -46,6 +46,11 @@ HPCC_ADD_LIBRARY( roxiemem SHARED ${SRCS} ${INCLUDES})
 if (NOT PLUGIN)
 if (NOT PLUGIN)
   install ( TARGETS roxiemem RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
   install ( TARGETS roxiemem RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
 endif()
 endif()
+
+if (${USE_TBB} )
+   target_link_libraries ( roxiemem ${TBB_LIBRARIES})
+endif()
+
 target_link_libraries ( roxiemem
 target_link_libraries ( roxiemem
          jlib
          jlib
          ${CPPUNIT_LIBRARIES}
          ${CPPUNIT_LIBRARIES}

+ 0 - 5
system/jlib/CMakeLists.txt

@@ -179,7 +179,6 @@ include_directories (
          ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file 
          ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file 
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
          ${CMAKE_BINARY_DIR}/oss
-         ${TBB_INCLUDE_DIR}
     )
     )
 
 
 ADD_DEFINITIONS( -DLOGMSGCOMPONENT=1 -D_USRDLL -DJLIB_EXPORTS )
 ADD_DEFINITIONS( -DLOGMSGCOMPONENT=1 -D_USRDLL -DJLIB_EXPORTS )
@@ -190,10 +189,6 @@ target_link_libraries ( jlib
         lz4
         lz4
        )
        )
 
 
-if ( ${USE_TBB} )
-   target_link_libraries ( jlib ${TBB_LIBRARIES})
-endif()
-
 if ( ${HAVE_LIBDL} )
 if ( ${HAVE_LIBDL} )
 target_link_libraries ( jlib dl)
 target_link_libraries ( jlib dl)
 endif ( ${HAVE_LIBDL} )
 endif ( ${HAVE_LIBDL} )

+ 0 - 686
system/jlib/jsort.cpp

@@ -28,12 +28,6 @@
 #include "jset.hpp"
 #include "jset.hpp"
 #include "jutil.hpp"
 #include "jutil.hpp"
 
 
-#ifdef _USE_TBB
-#include "tbb/task.h"
-#include "tbb/task_scheduler_init.h"
-#include "tbb/parallel_sort.h"
-#endif
-
 #ifdef _DEBUG
 #ifdef _DEBUG
 // #define PARANOID
 // #define PARANOID
 //#define TESTPARSORT
 //#define TESTPARSORT
@@ -244,71 +238,6 @@ void qsortvec(void **a, size32_t n, sortCompareFunction compare)
 #undef MED3
 #undef MED3
 #undef RECURSE
 #undef RECURSE
 
 
-//---------------------------------------------------------------------------
-// tbb versions of the quick sort to provide a useful base comparison
-
-class TbbCompareWrapper
-{
-public:
-    TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
-    bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
-    const ICompare & compare;
-};
-
-
-class TbbCompareIndirectWrapper
-{
-public:
-    TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
-    bool operator()(void * * const & l, void * * const & r) const
-    {
-        int ret = compare.docompare(*l,*r);
-        if (ret==0)
-        {
-            if (l < r)
-                return true;
-            else
-                return false;
-        }
-        return (ret < 0);
-    }
-    const ICompare & compare;
-};
-
-
-void tbbqsortvec(void **a, size_t n, const ICompare & compare)
-{
-#ifdef _USE_TBB
-    TbbCompareWrapper tbbcompare(compare);
-    tbb::parallel_sort(a, a+n, tbbcompare);
-#else
-    throwUnexpectedX("TBB quicksort not available");
-#endif
-}
-
-void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
-{
-#ifdef _USE_TBB
-    void * * * rowsAsIndex = (void * * *)rows;
-    memcpy(temp, rows, n * sizeof(void*));
-
-    for(unsigned i=0; i<n; ++i)
-        rowsAsIndex[i] = temp+i;
-
-    TbbCompareIndirectWrapper tbbcompare(compare);
-    tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
-
-    //I'm sure this violates the aliasing rules...
-    for(unsigned i=0; i<n; ++i)
-        rows[i] = *rowsAsIndex[i];
-#else
-    throwUnexpectedX("TBB quicksort not available");
-#endif
-}
-
-
-//---------------------------------------------------------------------------
-
 #define CMP(a,b)         (compare.docompare(*(a),*(b)))
 #define CMP(a,b)         (compare.docompare(*(a),*(b)))
 #define MED3(a,b,c)      med3ic(a,b,c,compare)
 #define MED3(a,b,c)      med3ic(a,b,c,compare)
 #define RECURSE(a,b)     qsortvec(a, b, compare)
 #define RECURSE(a,b)     qsortvec(a, b, compare)
@@ -692,621 +621,6 @@ void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare
         rows[i] = *rowsAsIndex[i];
 }
 
-
-//-----------------------------------------------------------------------------------------------------------------------------
-
-inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
-{
-    void * * tgt = result;
-    loop
-    {
-       if (compare.docompare(*ret1, *ret2) <= 0)
-       {
-           *tgt++ = *ret1++;
-           if (--n1 == 0)
-           {
-               //There must be at least one row in the right partition - copy any that remain
-               do
-               {
-                   *tgt++ = *ret2++;
-               } while (--n2);
-               return result;
-           }
-       }
-       else
-       {
-           *tgt++ = *ret2++;
-           if (--n2 == 0)
-           {
-               //There must be at least one row in the left partition - copy any that remain
-               do
-               {
-                   *tgt++ = *ret1++;
-               } while (--n1);
-               return result;
-           }
-       }
-    }
-}
-
-inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
-{
-    void * * tgt = result;
-    while (n--)
-    {
-       if (compare.docompare(*ret1, *ret2) <= 0)
-       {
-           *tgt++ = *ret1++;
-           if (--n1 == 0)
-           {
-               while (n--)
-               {
-                   *tgt++ = *ret2++;
-               }
-               return result;
-           }
-       }
-       else
-       {
-           *tgt++ = *ret2++;
-           if (--n2 == 0)
-           {
-               while (n--)
-               {
-                   *tgt++ = *ret1++;
-               }
-               return result;
-           }
-       }
-    }
-    return result;
-}
-
-inline void clonePartition(void * * result, size_t n, void * * src)
-{
-    void * * tgt = result;
-    while (n--)
-       *tgt++ = *src++;
-}
-
-inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
-{
-    void * * tgt = result+n1+n2-1;
-    ret1 += (n1-1);
-    ret2 += (n2-1);
-    while (n--)
-    {
-       if (compare.docompare(*ret1, *ret2) >= 0)
-       {
-           *tgt-- = *ret1--;
-           if (--n1 == 0)
-           {
-               while (n--)
-               {
-                   *tgt-- = *ret2--;
-               }
-               return result;
-           }
-       }
-       else
-       {
-           *tgt-- = *ret2--;
-           if (--n2 == 0)
-           {
-               //The second input is exhausted - copy however many of the requested rows remain from the first
-               while (n--)
-               {
-                   *tgt-- = *ret1--;
-               }
-               return result;
-           }
-       }
-    }
-    return result;
-}
-
-static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
-{
-    void * * result = (depth & 1) ? tmp : rows;
-    //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
-    //and in performance testing it executed marginally more quickly
-    if (n <= 2)
-    {
-        //Check for n == 1, but compare against 2 to avoid another comparison
-        if (n < 2)
-        {
-            if (result != rows)
-                result[0] = rows[0];
-        }
-        else
-        {
-            void * left = rows[0];
-            void * right = rows[1];
-            if (compare.docompare(left, right) <= 0)
-            {
-                result[0] = left;
-                result[1] = right;
-            }
-            else
-            {
-                result[0] = right;
-                result[1] = left;
-            }
-        }
-        return result;
-    }
-
-    size_t n1 = (n+1)/2;
-    size_t n2 = n - n1;
-    void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
-    void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
-    dbgassertex(ret2 == ret1 + n1);
-    dbgassertex(ret2 != result);
-    return mergePartitions(compare, result, n1, ret1, n2, ret2);
-}
-
-
-void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
-{
-    if (n <= 1)
-        return;
-    mergeSort(rows, n, compare, temp, 0);
-}
-
-//=========================================================================
-
-#ifdef _USE_TBB
-static const unsigned numPartitionSamples = 3;
-//These constants are probably architecture and number of core dependent
-static const size_t singleThreadedMSortThreshold = 2000;
-static const size_t multiThreadedBlockThreshold = 64;       // must be at least 2!
-
-using tbb::task;
-class TbbParallelMergeSorter
-{
-    class SplitTask : public tbb::task
-    {
-    public:
-        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
-        {
-        }
-
-        virtual task * execute()
-        {
-            if (next1->decrement_ref_count() == 0)
-                spawn(*next1);
-            if (next2->decrement_ref_count() == 0)
-                return next2;
-            return NULL;
-        }
-    protected:
-        task * next1;
-        task * next2;
-    };
-
-    class BisectTask : public tbb::task
-    {
-    public:
-        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
-        : sorter(_sorter), rows(_rows), temp(_temp), next(_next), n(_n), depth(_depth)
-        {
-        }
-        virtual task * execute()
-        {
-            loop
-            {
-                //On entry next is assumed to be used once by this function
-                if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
-                {
-                    //Create a new task rather than calling sort directly, so that the successor is set up correctly
-                    //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
-                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
-                    return sort;
-                }
-
-                void * * result = (depth & 1) ? temp : rows;
-                void * * src = (depth & 1) ? rows : temp;
-                size_t n1 = (n+1)/2;
-                size_t n2 = n-n1;
-                task * mergeTask;
-                if (depth < sorter.parallelMergeDepth)
-                {
-                    unsigned partitions = sorter.numPartitionCores() >> depth;
-                    if (partitions > 1)
-                    {
-                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
-                        for (unsigned i=0; i < partitions; i++)
-                        {
-                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
-                            mergeFwdTask->set_ref_count(1);
-                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
-                            mergeRevTask->set_ref_count(1);
-                            splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
-                        }
-                        next->decrement_ref_count();
-                        mergeTask = splitTask;
-                    }
-                    else
-                    {
-                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
-                        mergeFwdTask->set_ref_count(1);
-                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
-                        mergeRevTask->set_ref_count(1);
-                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
-                    }
-                }
-                else
-                {
-                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
-                }
-
-                mergeTask->set_ref_count(2);
-                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
-                spawn(*bisectRightTask);
-
-                //recurse directly on the left side rather than creating a new task
-                n = n1;
-                depth = depth+1;
-                next = mergeTask;
-            }
-        }
-    protected:
-        TbbParallelMergeSorter & sorter;
-        void ** rows;
-        void ** temp;
-        task * next;
-        size_t n;
-        unsigned depth;
-    };
-
-
-    class SubSortTask : public tbb::task
-    {
-    public:
-        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
-        : sorter(_sorter), rows(_rows), temp(_temp), n(_n), depth(_depth)
-        {
-        }
-
-        virtual task * execute()
-        {
-            mergeSort(rows, n, sorter.compare, temp, depth);
-            return NULL;
-        }
-    protected:
-        TbbParallelMergeSorter & sorter;
-        void ** rows;
-        void ** temp;
-        size_t n;
-        unsigned depth;
-    };
-
-
-    class MergeTask : public tbb::task
-    {
-    public:
-        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
-        : compare(_compare),result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
-        {
-        }
-
-        virtual task * execute()
-        {
-            //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
-            if (n1 == 0)
-            {
-                assertex(n <= n2);
-                clonePartition(result, n, src2);
-            }
-            else if (n2 == 0)
-            {
-                assertex(n <= n1);
-                clonePartition(result, n, src1);
-            }
-            else
-                mergePartitions(compare, result, n1, src1, n2, src2, n);
-            return NULL;
-        }
-
-        void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
-        {
-            src1 += deltaLeft;
-            n1 = numLeft;
-            src2 += deltaRight;
-            n2 = numRight;
-            result += (deltaLeft + deltaRight);
-            n = num;
-        }
-
-    protected:
-        const ICompare & compare;
-        void * * result;
-        void * * src1;
-        void * * src2;
-        size_t n1;
-        size_t n2;
-        size_t n;
-    };
-
-    class MergeRevTask : public MergeTask
-    {
-    public:
-        MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
-        : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
-        {
-        }
-
-        virtual task * execute()
-        {
-            if (n1 == 0)
-            {
-                assertex(n <= n2);
-                //This is a reverse merge, so copy n from the end of the input
-                unsigned delta = n2 - n;
-                clonePartition(result + delta, n, src2 + delta);
-            }
-            else if (n2 == 0)
-            {
-                assertex(n <= n1);
-                unsigned delta = n1 - n;
-                clonePartition(result + delta, n, src1 + delta);
-            }
-            else
-                mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
-            return NULL;
-        }
-    };
-
-    class PartitionSplitTask : public tbb::task
-    {
-    public:
-        PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
-            : compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
-        {
-            //These could be local variables in calculatePartitions(), but placed here to simplify cleanup.  (Should consider using alloca)
-            posLeft = new size_t[numPartitions+1];
-            posRight = new size_t[numPartitions+1];
-            tasks = new MergeTask *[numPartitions*2];
-            for (unsigned i=0; i < numPartitions*2; i++)
-                tasks[i] = NULL;
-        }
-        ~PartitionSplitTask()
-        {
-            delete [] posLeft;
-            delete [] posRight;
-            delete [] tasks;
-        }
-
-        void calculatePartitions()
-        {
-#ifdef PARANOID_PARTITION
-            {
-                for (unsigned ix=1; ix<n1; ix++)
-                    if (compare.docompare(src1[ix-1], src1[ix]) > 0)
-                        DBGLOG("Failure left@%u", ix);
-            }
-            {
-                for (unsigned ix=1; ix<n2; ix++)
-                    if (compare.docompare(src2[ix-1], src2[ix]) > 0)
-                        DBGLOG("Failure right@%u", ix);
-            }
-#endif
-            //If dividing into P parts, select S*P-1 even points from each side.
-            unsigned numSamples = numPartitionSamples*numPartitions-1;
-            QuantilePositionIterator iterLeft(n1, numSamples+1, false);
-            QuantilePositionIterator iterRight(n2, numSamples+1, false);
-            iterLeft.first();
-            iterRight.first();
-
-            size_t prevLeft = 0;
-            size_t prevRight =0;
-            posLeft[0] = 0;
-            posRight[0] = 0;
-
-            //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
-            //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
-            //=> pick samples [0, 2*numSamples, 4*numSamples ...]
-            //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
-            for (unsigned part = 1; part < numPartitions; part++)
-            {
-                unsigned numToSkip = numPartitionSamples*2;
-                if (part == 1)
-                    numToSkip++;
-                for (unsigned skip=numToSkip; skip-- != 0; )
-                {
-                    size_t leftPos = iterLeft.get();
-                    size_t rightPos = iterRight.get();
-                    int c;
-                    if (leftPos == n1)
-                        c = +1;
-                    else if (rightPos == n2)
-                        c = -1;
-                    else
-                        c = compare.docompare(src1[leftPos], src2[rightPos]);
-
-                    if (skip == 0)
-                    {
-                        if (c <= 0)
-                        {
-                            //value in left is smallest.  Find the position of the first value in the right >= the left value
-                            posLeft[part] = leftPos;
-                            size_t matchRight = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
-                            posRight[part] = matchRight;
-                            prevRight = matchRight;  // potentially reduce the search range next time
-                        }
-                        else
-                        {
-                            size_t matchLeft = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
-                            posLeft[part] = matchLeft;
-                            posRight[part] = rightPos;
-                            prevLeft = matchLeft;  // potentially reduce the search range next time
-                        }
-                    }
-                    if (c <= 0)
-                    {
-                        iterLeft.next();
-                        prevLeft = leftPos;
-                    }
-                    else
-                    {
-                        iterRight.next();
-                        prevRight = rightPos;
-                    }
-                }
-            }
-
-            posLeft[numPartitions] = n1;
-            posRight[numPartitions] = n2;
-#ifdef TRACE_PARTITION
-            DBGLOG("%d,%d -> {", (unsigned)n1, (unsigned)n2);
-#endif
-            for (unsigned i= 0; i < numPartitions; i++)
-            {
-                size_t start = posLeft[i] + posRight[i];
-                size_t end = posLeft[i+1] + posRight[i+1];
-                size_t num = end - start;
-                size_t numFwd = num/2;
-#ifdef TRACE_PARTITION
-                DBGLOG("  ([%d..%d],[%d..%d] %d,%d = %d)",
-                        (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
-                        (unsigned)start, (unsigned)end, (unsigned)num);
-#endif
-
-                MergeTask & mergeFwdTask = *tasks[i*2];
-                MergeTask & mergeRevTask = *tasks[i*2+1];
-                mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
-                                      posRight[i], posRight[i+1]-posRight[i],
-                                      numFwd);
-                mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
-                                      posRight[i], posRight[i+1]-posRight[i],
-                                      num-numFwd);
-            }
-        }
-
-        virtual task * execute()
-        {
-            calculatePartitions();
-            for (unsigned i=0; i < numPartitions*2; i++)
-            {
-                if (tasks[i]->decrement_ref_count() == 0)
-                    spawn(*tasks[i]);
-            }
-            return NULL;
-        }
-
-        void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
-        {
-            tasks[i*2] = fwd;
-            tasks[i*2+1] = rev;
-        }
-
-    protected:
-        size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
-        {
-            if (low == high)
-                return low;
-            while (high - low > 1)
-            {
-                size_t mid = low + (high - low) / 2;
-                if (compare.docompare(rows[mid], seek) < 0)
-                    low = mid;
-                else
-                    high = mid;
-            }
-            if (compare.docompare(rows[low], seek) < 0)
-                return low+1;
-            return low;
-        }
-
-        size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
-        {
-            if (low == high)
-                return low;
-            while (high - low > 1)
-            {
-                size_t mid = low + (high - low) / 2;
-                if (compare.docompare(rows[mid], seek) <= 0)
-                    low = mid;
-                else
-                    high = mid;
-            }
-            if (compare.docompare(rows[low], seek) <= 0)
-                return low+1;
-            return low;
-        }
-
-    protected:
-        const ICompare & compare;
-        unsigned numPartitions;
-        size_t n1;
-        size_t n2;
-        void * * src1;
-        void * * src2;
-        size_t * posLeft;
-        size_t * posRight;
-        MergeTask * * tasks;
-    };
-
-public:
-    TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
-    {
-        //The following constants control the number of iterations to be performed in parallel.
-        //The sort is split into more parts than there are cpus so that the effect of delays from one task tends to be evened out.
-        //The following constants should possibly be tuned on each platform.  The following gave a good balance on a 2x8way xeon
-        const unsigned extraBisectDepth = 3;
-        const unsigned extraParallelMergeDepth = 3;
-
-        unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
-        unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
-        assertex(numCpus <= (1U << ln2NumCpus));
-
-        //Merge in parallel once it is likely to be beneficial
-        parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
-
-        //Aim to execute in parallel until the width is 8*the maximum number of parallel tasks
-        singleThreadDepth = ln2NumCpus + extraBisectDepth;
-        partitionCores = numCpus / 2;
-    }
-
-    unsigned numPartitionCores() const { return partitionCores; }
-
-    void sortRoot(void ** rows, size_t n, void ** temp)
-    {
-        task * end = new (task::allocate_root()) tbb::empty_task();
-        end->set_ref_count(1+1);
-        task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
-        end->spawn(*task);
-        end->wait_for_all();
-        end->destroy(*end);
-    }
-
-public:
-    const ICompare & compare;
-    unsigned singleThreadDepth;
-    unsigned parallelMergeDepth;
-    unsigned partitionCores;
-    void * * baseRows;
-};
-
-//-------------------------------------------------------------------------------------------------------------------
-void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
-{
-    if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
-    {
-        msortvecstableinplace(rows, n, compare, temp);
-        return;
-    }
-
-    TbbParallelMergeSorter sorter(rows, compare);
-    sorter.sortRoot(rows, n, temp);
-}
-#else
-void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
-{
-    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
-}
-#endif
-
 //=========================================================================
 
 bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)

+ 0 - 19
system/jlib/jsort.hpp

@@ -76,9 +76,6 @@ extern jlib_decl void qsortvec(void **a, size32_t n, const ICompare & compare1,
 
 // Call with n rows of data in rows, index an (uninitialized) array of size n. The function will fill index with a stably sorted index into rows.
 extern jlib_decl void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
-extern jlib_decl void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp);
-extern jlib_decl void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
-
 
 extern jlib_decl void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned ncpus=0); // runs in parallel on multi-core
 extern jlib_decl void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus=0); // runs in parallel on multi-core
@@ -93,25 +90,9 @@ extern jlib_decl bool heap_push_down(unsigned p, unsigned num, unsigned * heap,
 // assuming that all elements <c form a heap, this function pushes c up to its correct position; it returns true if no change is made
 extern jlib_decl bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare);
 
-
-
-inline void parsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** stableTablePtr, unsigned maxCores=0)
-{
-#ifdef _USE_TBB
-    parmsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
-#else
-    parqsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
-#endif
-}
-
-extern jlib_decl void tbbqsortvec(void **a, size_t n, const ICompare & compare);
-extern jlib_decl void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp);
-
-
 extern jlib_decl IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp, bool partdedup=false);
 extern jlib_decl IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp, bool partdedup, IRowLinkCounter *linkcounter);
 
-
 class ISortedRowProvider
 {
 public:

+ 0 - 62
system/jlib/jsort3.inc

@@ -1,62 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-
-// skeleton for heapsort - define macros for ELEM and LT to use
-
-//void heapsort(ELEM *elems, unsigned n)
-{
-    if (n==0)
-        return;
-    ELEM tmp;
-    unsigned k;
-    for (k=0;k<n;k++) {
-        tmp = elems[k];
-        unsigned i = k;
-        while (i>0) {
-            unsigned j = (i-1)/2;
-            if (LT(elems[j],tmp)) {
-                elems[i] = elems[j];
-                i = j;
-            }
-            else
-                break;
-        }
-        elems[i] = tmp;
-    }
-    for (k=n-1; k>0; k--) {
-        tmp = elems[k];
-        elems[k] = elems[0]; 
-        unsigned j=0;
-        loop {
-            unsigned p=2*(j+1);
-            unsigned i;
-            if (p>k)
-                break;
-            if ((p!=k)&&LT(elems[p-1],elems[p]))
-                i = p;
-            else
-                i = p-1;
-            if (LT(tmp,elems[i]))
-                elems[j] = elems[i];
-            else
-                break;
-            j = i;
-        }
-        elems[j] = tmp;
-    }
-}

+ 1 - 0
thorlcr/thorutil/thmem.cpp

@@ -27,6 +27,7 @@
 #include "thbufdef.hpp"
 #include "thor.hpp"
 #include "thormisc.hpp"
+#include "thorsort.hpp"
 #include "eclhelper.hpp"
 #include "dautils.hpp"
 #include "daclient.hpp"