Explorar el Código

Merge pull request #7455 from ghalliday/issue13710

HPCC-13710 Multi threaded implementation of merge sort using tbb

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 10 años
padre
commit
2f5a50cdf2

+ 59 - 0
cmake_modules/FindTBB.cmake

@@ -0,0 +1,59 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+
+# - Try to find the TBB (Threading Building Blocks) library
+# Once done this will define
+#
+#  TBB_FOUND - system has the TBB library
+#  TBB_INCLUDE_DIR - the TBB include directory
+#  TBB_LIBRARIES - The libraries needed to use TBB
+
+IF (NOT TBB_FOUND)
+  SET (tbb_lib "tbb")
+
+  IF (NOT "${EXTERNALS_DIRECTORY}" STREQUAL "")
+    IF (WIN32)
+      IF (${ARCH64BIT} EQUAL 1)
+        SET (osdir "Win64")
+      ELSE()
+        SET (osdir "Win32")
+      ENDIF()
+      SET (tbbver "1.2.8")
+    ELSE()
+      SET (osdir "unknown")
+      SET (tbbver "unknown")
+    ENDIF()
+    IF (NOT ("${osdir}" STREQUAL "unknown"))
+      FIND_PATH (TBB_INCLUDE_DIR NAMES tbb/tbb.h PATHS "${EXTERNALS_DIRECTORY}/tbb/${tbbver}/include" NO_DEFAULT_PATH)
+      FIND_LIBRARY (TBB_LIBRARIES NAMES ${tbb_lib} PATHS "${EXTERNALS_DIRECTORY}/tbb/${tbbver}/lib/${osdir}" NO_DEFAULT_PATH)
+    ENDIF()
+  ENDIF()
+
+  if (USE_NATIVE_LIBRARIES)
+    # if we didn't find in externals, look in system include path
+    FIND_PATH (TBB_INCLUDE_DIR NAMES tbb/tbb.h)
+    FIND_LIBRARY (TBB_LIBRARIES NAMES ${tbb_lib})
+  endif()
+
+  include(FindPackageHandleStandardArgs)
+  find_package_handle_standard_args(TBB DEFAULT_MSG
+    TBB_LIBRARIES
+    TBB_INCLUDE_DIR
+  )
+
+  MARK_AS_ADVANCED(TBB_INCLUDE_DIR TBB_LIBRARIES)
+ENDIF()

+ 17 - 0
cmake_modules/commonSetup.cmake

@@ -92,6 +92,12 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
   option(USE_MEMCACHED "Enable Memcached support" ON)
   option(USE_REDIS "Enable Redis support" ON)
 
+  if (APPLE OR WIN32)
+      option(USE_TBB "Enable Threading Building Block support" OFF)
+  else()
+      option(USE_TBB "Enable Threading Building Block support" ON)
+  endif()
+
   option(USE_OPTIONAL "Automatically disable requested features with missing dependencies" ON)
 
   if ( USE_PYTHON OR USE_V8 OR USE_JNI OR USE_RINSIDE OR USE_SQLITE3 OR USE_MYSQL OR USE_CASSANDRA OR USE_MEMCACHED OR USE_REDIS)
@@ -690,6 +696,17 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
         add_definitions (-D_NO_APR)
       endif(USE_APR)
 
+      if(USE_TBB)
+        find_package(TBB)
+        if (TBB_FOUND)
+          add_definitions (-D_USE_TBB)
+        else()
+          message(FATAL_ERROR "TBB requested but package not found")
+        endif()
+      else()
+        set(TBB_INCLUDE_DIR "")
+      endif(USE_TBB)
+
   ENDIF()
   ###########################################################################
   ###

+ 1 - 1
cmake_modules/dependencies/lenny.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.34.1, libicu38, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, sudo, openssh-client, openssh-server, expect, libarchive, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.34.1, libicu38, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, sudo, openssh-client, openssh-server, expect, libarchive, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/lucid.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.40.0, libicu42, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.40.0, libicu42, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/natty.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.42.0, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.42.0, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/oneiric.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.46.1, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.46.1, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/precise.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.46.1, libicu48, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.46.1, libicu48, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/quantal.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/raring.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip, libtbb2")

+ 1 - 1
cmake_modules/dependencies/saucy.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.53.0, libicu48, libxalan-c111, libxerces-c3.1, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.53.0, libicu48, libxalan-c111, libxerces-c3.1, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/squeeze.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.42.0, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.42.0, libicu44, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive1, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/trusty.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.54.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.54.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, zip, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/utopic.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.55.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.55.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, python, libtbb2")

+ 1 - 1
cmake_modules/dependencies/vivid.cmake

@@ -1 +1 @@
-set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.55.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, python")
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.55.0, libicu52, libxslt1.1, libxml2, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive13, rsync, libapr1, libaprutil1, python, libtbb2")

+ 4 - 0
ecl/hql/hqlgram.y

@@ -11384,6 +11384,10 @@ sortItem
                             $$.setExpr(createExprAttribute(unstableAtom, $3.getExpr()));
                             $$.setPosition($1);
                         }
+    | PARALLEL
+                        {
+                            $$.setExpr(createAttribute(parallelAtom), $1);
+                        }
     | prefetchAttribute
     | expandedSortListByReference
     ;

+ 1 - 0
ecl/hql/hqlgram2.cpp

@@ -5693,6 +5693,7 @@ IHqlExpression * HqlGram::processSortList(const attribute & errpos, node_operato
                     if (attr == assertAtom) ok = true;
                     if (attr == stableAtom) ok = true;
                     if (attr == unstableAtom) ok = true;
+                    if (attr == parallelAtom) ok = true;
                     break;
                 case no_nwaymerge:
                     if (attr == localAtom) ok = true;

+ 2 - 0
ecl/hqlcpp/hqlhtcpp.cpp

@@ -16473,6 +16473,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySort(BuildCtx & ctx, IHqlExpre
         flags.append("|TAFspill");
     if (!method || method->isConstant())
         flags.append("|TAFconstant");
+    if (expr->hasAttribute(parallelAtom))
+        flags.append("|TAFparallel");
 
     if (method)
         doBuildVarStringFunction(instance->startctx, "getAlgorithm", method);

+ 23 - 1
ecl/hthor/hthor.cpp

@@ -3801,21 +3801,32 @@ void CHThorGroupSortActivity::createSorter()
         return;
     }
     if(stricmp(algoname, "quicksort") == 0)
+    {
         if((flags & TAFstable) != 0)
             sorter.setown(new CStableQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
         else
             sorter.setown(new CQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
+    }
     else if(stricmp(algoname, "parquicksort") == 0)
         sorter.setown(new CParallelStableQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
     else if(stricmp(algoname, "mergesort") == 0)
-        sorter.setown(new CStableMergeSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
+    {
+        if((flags & TAFparallel) != 0)
+            sorter.setown(new CParallelStableMergeSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
+        else
+            sorter.setown(new CStableMergeSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
+    }
+    else if(stricmp(algoname, "parmergesort") == 0)
+        sorter.setown(new CParallelStableMergeSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
     else if(stricmp(algoname, "heapsort") == 0)
         sorter.setown(new CHeapSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
     else if(stricmp(algoname, "insertionsort") == 0)
+    {
         if((flags & TAFstable) != 0)
             sorter.setown(new CStableInsertionSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
         else
             sorter.setown(new CInsertionSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
+    }
     else
     {
         StringBuffer sb;
@@ -4025,6 +4036,17 @@ void CStableMergeSorter::performSort()
     }
 }
 
+void CParallelStableMergeSorter::performSort()
+{
+    size32_t numRows = rowsToSort.numCommitted();
+    if (numRows)
+    {
+        const void * * rows = rowsToSort.getBlock(numRows);
+        parmsortvecstableinplace((void * *)rows, numRows, *compare, (void * *)index);
+        finger = 0;
+    }
+}
+
 // Heap sort
 
 void CHeapSorter::performSort()

+ 8 - 0
ecl/hthor/hthor.ipp

@@ -1164,6 +1164,14 @@ public:
     virtual void performSort();
 };
 
+class CParallelStableMergeSorter : public CStableSorter
+{
+public:
+    CParallelStableMergeSorter(ICompare * _compare, roxiemem::IRowManager * _rowManager, size32_t _initialSize, size32_t _commitDelta, roxiemem::IBufferedRowCallback * _rowCB) : CStableSorter(_compare, _rowManager, _initialSize, _commitDelta, _rowCB){}
+
+    virtual void performSort();
+};
+
 class CHeapSorter :  public CSimpleSorterBase
 {
 public:

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -1575,6 +1575,7 @@ enum
     TAFstable           = 0x0002,
     TAFunstable         = 0x0004,
     TAFspill            = 0x0008,
+    TAFparallel         = 0x0010,
 };
 
 struct IHThorSortArg : public IHThorArg

+ 6 - 1
system/jlib/CMakeLists.txt

@@ -71,7 +71,7 @@ set (    SRCS
          jstats.cpp 
          jstream.cpp 
          jstring.cpp 
-         jsuperhash.cpp 
+         jsuperhash.cpp
          jthread.cpp 
          jtime.cpp 
          junicode.cpp 
@@ -162,6 +162,7 @@ include_directories (
          ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file 
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
+         ${TBB_INCLUDE_DIR}
     )
 
 ADD_DEFINITIONS( -DLOGMSGCOMPONENT=1 -D_USRDLL -DJLIB_EXPORTS )
@@ -172,6 +173,10 @@ target_link_libraries ( jlib
         lzma
        )
 
+if ( ${USE_TBB} )
+   target_link_libraries ( jlib ${TBB_LIBRARIES})
+endif()
+
 if ( ${HAVE_LIBDL} )
 target_link_libraries ( jlib dl)
 endif ( ${HAVE_LIBDL} )

+ 316 - 36
system/jlib/jsort.cpp

@@ -15,7 +15,6 @@
     limitations under the License.
 ############################################################################## */
 
-
 #include "platform.h"
 #include <string.h>
 #include <limits.h>
@@ -26,6 +25,12 @@
 #include "jfile.hpp"
 #include "jthread.hpp"
 #include "jqueue.tpp"
+#include "jset.hpp"
+
+#ifdef _USE_TBB
+#include "tbb/task.h"
+#include "tbb/task_scheduler_init.h"
+#endif
 
 #ifdef _DEBUG
 // #define PARANOID
@@ -623,6 +628,109 @@ void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare
 
 //-----------------------------------------------------------------------------------------------------------------------------
 
+inline void * * mergePartitions(const ICompare & compare, void * * result, unsigned n1, void * * ret1, unsigned n2, void * * ret2)
+{
+    void * * tgt = result;
+    loop
+    {
+       if (compare.docompare(*ret1, *ret2) <= 0)
+       {
+           *tgt++ = *ret1++;
+           if (--n1 == 0)
+           {
+               //There must be at least one row in the right partition - copy any that remain
+               do
+               {
+                   *tgt++ = *ret2++;
+               } while (--n2);
+               return result;
+           }
+       }
+       else
+       {
+           *tgt++ = *ret2++;
+           if (--n2 == 0)
+           {
+               //There must be at least one row in the left partition - copy any that remain
+               do
+               {
+                   *tgt++ = *ret1++;
+               } while (--n1);
+               return result;
+           }
+       }
+    }
+}
+
+inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
+{
+    void * * tgt = result;
+    while (n--)
+    {
+       if (compare.docompare(*ret1, *ret2) <= 0)
+       {
+           *tgt++ = *ret1++;
+           if (--n1 == 0)
+           {
+               while (n--)
+               {
+                   *tgt++ = *ret2++;
+               }
+               return result;
+           }
+       }
+       else
+       {
+           *tgt++ = *ret2++;
+           if (--n2 == 0)
+           {
+               while (n--)
+               {
+                   *tgt++ = *ret1++;
+               }
+               return result;
+           }
+       }
+    }
+    return result;
+}
+
+inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
+{
+    void * * tgt = result+n1+n2-1;
+    ret1 += (n1-1);
+    ret2 += (n2-1);
+    while (n--)
+    {
+       if (compare.docompare(*ret1, *ret2) >= 0)
+       {
+           *tgt-- = *ret1--;
+           if (--n1 == 0)
+           {
+               while (n--)
+               {
+                   *tgt-- = *ret2--;
+               }
+               return result;
+           }
+       }
+       else
+       {
+           *tgt-- = *ret2--;
+           if (--n2 == 0)
+           {
+               //ret2 is exhausted - copy the rest of this call's quota of n rows from ret1
+               while (n--)
+               {
+                   *tgt-- = *ret1--;
+               }
+               return result;
+           }
+       }
+    }
+    return result;
+}
+
 static void * * mergeSort(void ** rows, size32_t n, const ICompare & compare, void ** tmp, unsigned depth)
 {
     void * * result = (depth & 1) ? tmp : rows;
@@ -660,37 +768,7 @@ static void * * mergeSort(void ** rows, size32_t n, const ICompare & compare, vo
     void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
     dbgassertex(ret2 == ret1 + n1);
     dbgassertex(ret2 != result);
-    void * * tgt = result;
-
-    loop
-    {
-       if (compare.docompare(*ret1, *ret2) <= 0)
-       {
-           *tgt++ = *ret1++;
-           if (--n1 == 0)
-           {
-               //There must be at least one row in the right partition - copy any that remain
-               do
-               {
-                   *tgt++ = *ret2++;
-               } while (--n2);
-               return result;
-           }
-       }
-       else
-       {
-           *tgt++ = *ret2++;
-           if (--n2 == 0)
-           {
-               //There must be at least one row in the left partition - copy any that remain
-               do
-               {
-                   *tgt++ = *ret1++;
-               } while (--n1);
-               return result;
-           }
-       }
-    }
+    return mergePartitions(compare, result, n1, ret1, n2, ret2);
 }
 
 
@@ -703,6 +781,210 @@ void msortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, v
 
 //=========================================================================
 
+//These constants probably depend on the architecture and the number of cores
+static const size_t singleThreadedMSortThreshold = 2000;
+static const size_t multiThreadedBlockThreshold = 64;       // must be at least 2!
+
+#ifdef _USE_TBB
+using tbb::task;
+class TbbParallelMergeSorter
+{
+    class SplitTask : public tbb::task
+    {
+    public:
+        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
+        {
+        }
+
+        virtual task * execute()
+        {
+            if (next1->decrement_ref_count() == 0)
+                spawn(*next1);
+            if (next2->decrement_ref_count() == 0)
+                return next2;
+            return NULL;
+        }
+    protected:
+        task * next1;
+        task * next2;
+    };
+
+    class BisectTask : public tbb::task
+    {
+    public:
+        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size32_t _n, void ** _temp, unsigned _depth, task * _next)
+        : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth), next(_next)
+        {
+        }
+        virtual task * execute()
+        {
+            loop
+            {
+                //On entry next is assumed to be used once by this function
+                if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
+                {
+                    //Create a new task rather than calling sort directly, so that the successor is set up correctly.
+                    //An alternative would be to sort here and then "if (next->decrement_ref_count() == 0) return next;"
+                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
+                    return sort;
+                }
+
+                void * * result = (depth & 1) ? temp : rows;
+                void * * src = (depth & 1) ? rows : temp;
+                unsigned n1 = (n+1)/2;
+                unsigned n2 = n-n1;
+                task * mergeTask;
+                if (depth < sorter.parallelMergeDepth)
+                {
+                    task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
+                    mergeFwdTask->set_ref_count(1);
+                    task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
+                    mergeRevTask->set_ref_count(1);
+                    mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
+                }
+                else
+                {
+                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
+                }
+
+                mergeTask->set_ref_count(2);
+                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
+                spawn(*bisectRightTask);
+
+                //recurse directly on the left side rather than creating a new task
+                n = n1;
+                depth = depth+1;
+                next = mergeTask;
+            }
+        }
+    protected:
+        TbbParallelMergeSorter & sorter;
+        void ** rows;
+        void ** temp;
+        task * next;
+        size32_t n;
+        unsigned depth;
+    };
+
+
+    class SubSortTask : public tbb::task
+    {
+    public:
+        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size32_t _n, void ** _temp, unsigned _depth)
+        : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth)
+        {
+        }
+
+        virtual task * execute()
+        {
+            mergeSort(rows, n, sorter.compare, temp, depth);
+            return NULL;
+        }
+    protected:
+        TbbParallelMergeSorter & sorter;
+        void ** rows;
+        void ** temp;
+        size32_t n;
+        unsigned depth;
+    };
+
+
+    class MergeTask : public tbb::task
+    {
+    public:
+        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size32_t _n)
+        : compare(_compare),result(_result), n1(_n1), src1(_src1), n2(_n2), src2(_src2), n(_n)
+        {
+        }
+
+        virtual task * execute()
+        {
+            mergePartitions(compare, result, n1, src1, n2, src2, n);
+            return NULL;
+        }
+
+    protected:
+        const ICompare & compare;
+        void * * result;
+        void * * src1;
+        void * * src2;
+        size_t n1;
+        size_t n2;
+        size_t n;
+    };
+
+    class MergeRevTask : public MergeTask
+    {
+    public:
+        MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
+        : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
+        {
+        }
+
+        virtual task * execute()
+        {
+            mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
+            return NULL;
+        }
+    };
+
+public:
+    TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
+    {
+        //The following constants control the number of iterations to be performed in parallel.
+        //The sort is split into more parts than there are cpus so that the effect of delays from one task tends to be evened out.
+        //The following constants should possibly be tuned on each platform.  The following gave a good balance on a 2x8way xeon
+        const unsigned extraBisectDepth = 3;
+        const unsigned extraParallelMergeDepth = 3;
+
+        unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
+        unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
+        assertex(numCpus <= (1U << ln2NumCpus));
+
+        //Merge in parallel once it is likely to be beneficial
+        parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
+        //Aim to execute in parallel until the width is 8*the maximum number of parallel tasks
+        singleThreadDepth = ln2NumCpus + extraBisectDepth;
+    }
+
+    void sortRoot(void ** rows, size32_t n, void ** temp)
+    {
+        task * end = new (task::allocate_root()) tbb::empty_task();
+        end->set_ref_count(1+1);
+        task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
+        end->spawn(*task);
+        end->wait_for_all();
+        end->destroy(*end);
+    }
+
+public:
+    const ICompare & compare;
+    unsigned singleThreadDepth;
+    unsigned parallelMergeDepth;
+    void * * baseRows;
+};
+
+//-------------------------------------------------------------------------------------------------------------------
+void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+{
+    if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
+    {
+        msortvecstableinplace(rows, n, compare, temp);
+        return;
+    }
+
+    TbbParallelMergeSorter sorter(rows, compare);
+    sorter.sortRoot(rows, n, temp);
+}
+#else
+void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+{
+    parqsortvecstableinplace(rows, n, compare, temp, ncpus);
+}
+#endif
+
+//=========================================================================
+
 bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
 {
     bool nochange = true;
@@ -810,7 +1092,7 @@ class CRowStreamMerger
     unsigned *mergeheap;
     unsigned activeInputs; 
     count_t recno;
-    ICompare *icmp;
+    const ICompare *icmp;
     bool partdedup;
 
     IRowProvider &provider;
@@ -982,7 +1264,7 @@ class CRowStreamMerger
     }
 
 public:
-    CRowStreamMerger(IRowProvider &_provider,unsigned numstreams,ICompare *_icmp,bool _partdedup=false)
+    CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
         : provider(_provider)
     {
         partdedup = _partdedup;
@@ -1147,5 +1429,3 @@ IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICo
 {
     return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
 }
-
-

+ 1 - 0
system/jlib/jsort.hpp

@@ -77,6 +77,7 @@ extern jlib_decl void qsortvec(void **a, size32_t n, const ICompare & compare1,
 // Call with n rows of data in rows, index an (uninitialized) array of size n. The function will fill index with a stably sorted index into rows.
 extern jlib_decl void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
 extern jlib_decl void msortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
+extern jlib_decl void parmsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
 
 
 extern jlib_decl void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned ncpus=0); // runs in parallel on multi-core