Pārlūkot izejas kodu

Merge pull request #15772 from ghalliday/taskManager

HPCC-10212 Implement a task manager and use for a new parallel sort

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 gadi atpakaļ
vecāks
revīzija
0c2821ef6b

+ 42 - 0
common/thorhelper/roxiehelper.cpp

@@ -710,6 +710,23 @@ public:
     }
     }
 };
 };
 
 
+class CParallelTaskQuickSortAlgorithm : public CInplaceSortAlgorithm
+{
+public:
+    CParallelTaskQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
+
+    virtual void prepare(IEngineRowStream *input)
+    {
+        curIndex = 0;
+        if (input->nextGroup(sorted))
+        {
+            cycle_t startCycles = get_cycles_now();
+            taskqsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
+            elapsedCycles += (get_cycles_now() - startCycles);
+        }
+    }
+};
+
 class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
 class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
 {
 {
 public:
 public:
@@ -772,6 +789,17 @@ public:
     }
     }
 };
 };
 
 
+class CParallelTaskStableQuickSortAlgorithm : public CStableInplaceSortAlgorithm
+{
+public:
+    CParallelTaskStableQuickSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp)
+    {
+        taskqsortvecstableinplace(rows, numRows, *compare, temp);
+    }
+};
+
 class CMergeSortAlgorithm : public CStableInplaceSortAlgorithm
 class CMergeSortAlgorithm : public CStableInplaceSortAlgorithm
 {
 {
 public:
 public:
@@ -1227,6 +1255,11 @@ extern ISortAlgorithm *createParallelQuickSortAlgorithm(ICompare *_compare)
     return new CParallelQuickSortAlgorithm(_compare);
     return new CParallelQuickSortAlgorithm(_compare);
 }
 }
 
 
+extern ISortAlgorithm *createParallelTaskQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CParallelTaskQuickSortAlgorithm(_compare);
+}
+
 extern ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare)
 extern ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare)
 {
 {
     return new CStableQuickSortAlgorithm(_compare);
     return new CStableQuickSortAlgorithm(_compare);
@@ -1237,6 +1270,11 @@ extern ISortAlgorithm *createParallelStableQuickSortAlgorithm(ICompare *_compare
     return new CParallelStableQuickSortAlgorithm(_compare);
     return new CParallelStableQuickSortAlgorithm(_compare);
 }
 }
 
 
+extern ISortAlgorithm *createParallelTaskStableQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CParallelTaskStableQuickSortAlgorithm(_compare);
+}
+
 extern ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare)
 extern ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare)
 {
 {
     return new CTbbQuickSortAlgorithm(_compare);
     return new CTbbQuickSortAlgorithm(_compare);
@@ -1281,6 +1319,10 @@ extern ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm _algorithm, ICompa
         return createParallelQuickSortAlgorithm(_compare);
         return createParallelQuickSortAlgorithm(_compare);
     case parallelStableQuickSortAlgorithm:
     case parallelStableQuickSortAlgorithm:
         return createParallelStableQuickSortAlgorithm(_compare);
         return createParallelStableQuickSortAlgorithm(_compare);
+    case parallelTaskQuickSortAlgorithm:
+        return createParallelTaskQuickSortAlgorithm(_compare);
+    case parallelTaskStableQuickSortAlgorithm:
+        return createParallelTaskStableQuickSortAlgorithm(_compare);
     case spillingQuickSortAlgorithm:
     case spillingQuickSortAlgorithm:
     case stableSpillingQuickSortAlgorithm:
     case stableSpillingQuickSortAlgorithm:
         return createSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _algorithm==stableSpillingQuickSortAlgorithm);
         return createSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _algorithm==stableSpillingQuickSortAlgorithm);

+ 4 - 0
common/thorhelper/roxiehelper.hpp

@@ -309,6 +309,8 @@ typedef enum {
     tbbStableQuickSortAlgorithm,        // stable version of tbbQuickSortAlgorithm
     tbbStableQuickSortAlgorithm,        // stable version of tbbQuickSortAlgorithm
     parallelQuickSortAlgorithm,         // parallel version of the internal quicksort implementation (for comparison)
     parallelQuickSortAlgorithm,         // parallel version of the internal quicksort implementation (for comparison)
     parallelStableQuickSortAlgorithm,   // stable version of parallelQuickSortAlgorithm
     parallelStableQuickSortAlgorithm,   // stable version of parallelQuickSortAlgorithm
+    parallelTaskQuickSortAlgorithm,      // task based parallel version of the internal quicksort implementation (for comparison)
+    parallelTaskStableQuickSortAlgorithm, // task based stable version of parallelTaskQuickSortAlgorithm
     unknownSortAlgorithm
     unknownSortAlgorithm
 } RoxieSortAlgorithm;
 } RoxieSortAlgorithm;
 
 
@@ -331,6 +333,8 @@ extern THORHELPER_API ISortAlgorithm *createMergeSortAlgorithm(ICompare *_compar
 extern THORHELPER_API ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare);
 extern THORHELPER_API ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createParallelTaskQuickSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createParallelTaskStableQuickSortAlgorithm(ICompare *_compare);
 
 
 extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
 extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
 
 

+ 29 - 0
ecl/hthor/hthor.cpp

@@ -4056,6 +4056,13 @@ void CHThorGroupSortActivity::createSorter()
         else
         else
             sorter.setown(new CParallelQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
             sorter.setown(new CParallelQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
     }
     }
+    else if(stricmp(algoname, "taskquicksort") == 0)
+    {
+        if((flags & TAFstable) != 0)
+            sorter.setown(new CParallelTaskStableQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep, this));
+        else
+            sorter.setown(new CParallelTaskQuickSorter(helper.queryCompare(), queryRowManager(), InitialSortElements, CommitStep));
+    }
     else if(stricmp(algoname, "mergesort") == 0)
     else if(stricmp(algoname, "mergesort") == 0)
     {
     {
         if((flags & TAFparallel) != 0)
         if((flags & TAFparallel) != 0)
@@ -4224,6 +4231,17 @@ void CParallelQuickSorter::performSort()
     }
     }
 }
 }
 
 
+void CParallelTaskQuickSorter::performSort()
+{
+    size32_t numRows = rowsToSort.numCommitted();
+    if (numRows)
+    {
+        const void * * rows = rowsToSort.getBlock(numRows);
+        taskqsortvec((void * *)rows, numRows, *compare);
+        finger = 0;
+    }
+}
+
 // StableQuick sort
 // StableQuick sort
 
 
 bool CStableSorter::addRow(const void * next)
 bool CStableSorter::addRow(const void * next)
@@ -4289,6 +4307,17 @@ void CParallelStableQuickSorter::performSort()
     }
     }
 }
 }
 
 
+void CParallelTaskStableQuickSorter::performSort()
+{
+    size32_t numRows = rowsToSort.numCommitted();
+    if (numRows)
+    {
+        const void * * rows = rowsToSort.getBlock(numRows);
+        taskqsortvecstableinplace((void * *)rows, numRows, *compare, (void * *)index);
+        finger = 0;
+    }
+}
+
 // StableMerge sort
 // StableMerge sort
 
 
 void CStableMergeSorter::performSort()
 void CStableMergeSorter::performSort()

+ 15 - 0
ecl/hthor/hthor.ipp

@@ -1169,6 +1169,13 @@ public:
     virtual void performSort();
     virtual void performSort();
 };
 };
 
 
+class CParallelTaskQuickSorter : public CSimpleSorterBase
+{
+public:
+    CParallelTaskQuickSorter(ICompare * _compare, roxiemem::IRowManager * _rowManager, size32_t _initialSize, size32_t _commitDelta) : CSimpleSorterBase(_compare, _rowManager, _initialSize, _commitDelta) {}
+    virtual void performSort();
+};
+
 class CStableSorter : public CSimpleSorterBase
 class CStableSorter : public CSimpleSorterBase
 {
 {
 public:
 public:
@@ -1202,6 +1209,14 @@ public:
     virtual void performSort();
     virtual void performSort();
 };
 };
 
 
+class CParallelTaskStableQuickSorter : public CStableSorter
+{
+public:
+    CParallelTaskStableQuickSorter(ICompare * _compare, roxiemem::IRowManager * _rowManager, size32_t _initialSize, size32_t _commitDelta, roxiemem::IBufferedRowCallback * _rowCB) : CStableSorter(_compare, _rowManager, _initialSize, _commitDelta, _rowCB){}
+
+    virtual void performSort();
+};
+
 class CStableMergeSorter : public CStableSorter
 class CStableMergeSorter : public CStableSorter
 {
 {
 public:
 public:

+ 9 - 0
roxie/ccd/ccdserver.cpp

@@ -8318,6 +8318,15 @@ public:
             default: throwUnexpected();
             default: throwUnexpected();
             }
             }
         }
         }
+        else if (stricmp(algorithmName, "taskquicksort")==0)
+        {
+            switch (sortFlags & TAFstable)
+            {
+            case 0: sortAlgorithm = parallelTaskQuickSortAlgorithm; break;
+            case TAFstable: sortAlgorithm = parallelTaskStableQuickSortAlgorithm; break;
+            default: throwUnexpected();
+            }
+        }
         else if (stricmp(algorithmName, "heapsort")==0)
         else if (stricmp(algorithmName, "heapsort")==0)
             sortAlgorithm = heapSortAlgorithm; // NOTE - we do allow UNSTABLE('heapsort') in order to facilitate runtime selection. Also explicit selection of heapsort overrides request to spill
             sortAlgorithm = heapSortAlgorithm; // NOTE - we do allow UNSTABLE('heapsort') in order to facilitate runtime selection. Also explicit selection of heapsort overrides request to spill
         else if (stricmp(algorithmName, "mergesort")==0)
         else if (stricmp(algorithmName, "mergesort")==0)

+ 1 - 116
system/jhtree/jhutil.hpp

@@ -19,124 +19,9 @@
 #define JHUTIL_HPP
 #define JHUTIL_HPP
 
 
 #include "jlib.hpp"
 #include "jlib.hpp"
-#include "jqueue.tpp"
+#include "jqueue.hpp"
 #include "jhtree.hpp"
 #include "jhtree.hpp"
 
 
-//Implementation of a queue using a doubly linked list.  Should possibly move to jlib if it would be generally useful.
-//Currently assumes next and prev fields in the element can be used to maintain the list
-template <class ELEMENT>
-class DListOf
-{
-    typedef DListOf<ELEMENT> SELF;
-    ELEMENT * pHead = nullptr;
-    ELEMENT * pTail = nullptr;
-    unsigned numEntries = 0;
-
-public:
-    void enqueueHead(ELEMENT * element)
-    {
-        assertex(!element->next && !element->prev);
-        if (pHead)
-        {
-            pHead->prev = element;
-            element->next = pHead;
-            pHead = element;
-        }
-        else
-        {
-            pHead = pTail = element;
-        }
-        numEntries++;
-    }
-    void enqueue(ELEMENT * element)
-    {
-        assertex(!element->next && !element->prev);
-        if (pTail)
-        {
-            pTail->next = element;
-            element->prev = pTail;
-            pTail = element;
-        }
-        else
-        {
-            pHead = pTail = element;
-        }
-        numEntries++;
-    }
-    ELEMENT *head() const { return pHead; }
-    ELEMENT *tail() const { return pTail; }
-    void remove(ELEMENT *element)
-    {
-        ELEMENT * next = element->next;
-        ELEMENT * prev = element->prev;
-        assertex(prev || next || element == pHead);
-        if (element == pHead)
-            pHead = next;
-        if (element == pTail)
-            pTail = prev;
-        if (next)
-            next->prev = prev;
-        if (prev)
-            prev->next = next;
-        element->next = nullptr;
-        element->prev = nullptr;
-        numEntries--;
-    }
-    ELEMENT *dequeue()
-    {
-        if (!pHead)
-            return nullptr;
-        ELEMENT * element = pHead;
-        ELEMENT * next = element->next;
-        pHead = next;
-        if (element == pTail)
-            pTail = nullptr;
-        if (next)
-            next->prev = nullptr;
-        element->next = nullptr;
-        numEntries--;
-        return element;
-    }
-    ELEMENT *dequeueTail()
-    {
-        if (!pTail)
-            return nullptr;
-        ELEMENT * element = pTail;
-        ELEMENT * prev = element->prev;
-        pTail = prev;
-        if (element == pHead)
-            pHead = nullptr;
-        if (prev)
-            prev->next = nullptr;
-        element->prev = nullptr;
-        numEntries--;
-        return element;
-    }
-    void dequeue(ELEMENT *element)
-    {
-        remove(element);
-    }
-    inline unsigned ordinality() const { return numEntries; }
-    //Check that the linked list is self-consistent.  For debugging potential issues.
-    void validate() const
-    {
-        ELEMENT * prev = nullptr;
-        ELEMENT * cur = pHead;
-        unsigned count = 0;
-        while (cur)
-        {
-            assertex(cur->prev == prev);
-            prev = cur;
-            cur = cur->next;
-            count++;
-        }
-        assertex(prev == pTail);
-        assertex(count == numEntries);
-    }
-};
-
-
-
 // TABLE should be SuperHashTable derivative to contain MAPPING's
 // TABLE should be SuperHashTable derivative to contain MAPPING's
 // MAPPING should be something that constructs with (KEY, ENTRY) and impl. query returning ref. to ENTRY
 // MAPPING should be something that constructs with (KEY, ENTRY) and impl. query returning ref. to ENTRY
 template <class KEY, class ENTRY, class MAPPING, class TABLE>
 template <class KEY, class ENTRY, class MAPPING, class TABLE>

+ 3 - 0
system/jlib/CMakeLists.txt

@@ -87,10 +87,12 @@ set (    SRCS
          jsmartsock.cpp
          jsmartsock.cpp
          jsocket.cpp
          jsocket.cpp
          jsort.cpp
          jsort.cpp
+         jsort2.cpp
          jstats.cpp
          jstats.cpp
          jstream.cpp
          jstream.cpp
          jstring.cpp
          jstring.cpp
          jsuperhash.cpp
          jsuperhash.cpp
+         jtask.cpp
          jthread.cpp
          jthread.cpp
          jtime.cpp
          jtime.cpp
          junicode.cpp
          junicode.cpp
@@ -170,6 +172,7 @@ set (    INCLUDES
         jstream.ipp
         jstream.ipp
         jstring.hpp
         jstring.hpp
         jsuperhash.hpp
         jsuperhash.hpp
+        jtask.hpp
         jthread.hpp
         jthread.hpp
         jtime.hpp
         jtime.hpp
         jtime.ipp
         jtime.ipp

+ 115 - 0
system/jlib/jqueue.hpp

@@ -406,4 +406,119 @@ protected:
     std::atomic<state_t> state;
     std::atomic<state_t> state;
 };
 };
 
 
+//Implementation of a queue using a doubly linked list.
+//Currently assumes next and prev fields in the element can be used to maintain the list
+template <class ELEMENT>
+class DListOf
+{
+    typedef DListOf<ELEMENT> SELF;
+    ELEMENT * pHead = nullptr;
+    ELEMENT * pTail = nullptr;
+    unsigned numEntries = 0;
+
+public:
+    void enqueueHead(ELEMENT * element)
+    {
+        assertex(!element->next && !element->prev);
+        if (pHead)
+        {
+            pHead->prev = element;
+            element->next = pHead;
+            pHead = element;
+        }
+        else
+        {
+            pHead = pTail = element;
+        }
+        numEntries++;
+    }
+    void enqueue(ELEMENT * element)
+    {
+        assertex(!element->next && !element->prev);
+        if (pTail)
+        {
+            pTail->next = element;
+            element->prev = pTail;
+            pTail = element;
+        }
+        else
+        {
+            pHead = pTail = element;
+        }
+        numEntries++;
+    }
+    ELEMENT *head() const { return pHead; }
+    ELEMENT *tail() const { return pTail; }
+    void remove(ELEMENT *element)
+    {
+        ELEMENT * next = element->next;
+        ELEMENT * prev = element->prev;
+        assertex(prev || next || element == pHead);
+        if (element == pHead)
+            pHead = next;
+        if (element == pTail)
+            pTail = prev;
+        if (next)
+            next->prev = prev;
+        if (prev)
+            prev->next = next;
+        element->next = nullptr;
+        element->prev = nullptr;
+        numEntries--;
+    }
+    ELEMENT *dequeue()
+    {
+        if (!pHead)
+            return nullptr;
+        ELEMENT * element = pHead;
+        ELEMENT * next = element->next;
+        pHead = next;
+        if (element == pTail)
+            pTail = nullptr;
+        if (next)
+            next->prev = nullptr;
+        element->next = nullptr;
+        numEntries--;
+        return element;
+    }
+    ELEMENT *dequeueTail()
+    {
+        if (!pTail)
+            return nullptr;
+        ELEMENT * element = pTail;
+        ELEMENT * prev = element->prev;
+        pTail = prev;
+        if (element == pHead)
+            pHead = nullptr;
+        if (prev)
+            prev->next = nullptr;
+        element->prev = nullptr;
+        numEntries--;
+        return element;
+    }
+    void dequeue(ELEMENT *element)
+    {
+        remove(element);
+    }
+    bool isEmpty() const { return numEntries == 0; }
+    inline unsigned ordinality() const { return numEntries; }
+    //Check that the linked list is self-consistent.  For debugging potential issues.
+    void validate() const
+    {
+        ELEMENT * prev = nullptr;
+        ELEMENT * cur = pHead;
+        unsigned count = 0;
+        while (cur)
+        {
+            assertex(cur->prev == prev);
+            prev = cur;
+            cur = cur->next;
+            count++;
+        }
+        assertex(prev == pTail);
+        assertex(count == numEntries);
+    }
+};
+
+
 #endif
 #endif

+ 3 - 0
system/jlib/jsort.hpp

@@ -81,6 +81,9 @@ extern jlib_decl void parqsortvec(void **a, size32_t n, const ICompare & compare
 extern jlib_decl void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus=0); // runs in parallel on multi-core
 extern jlib_decl void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned ncpus=0); // runs in parallel on multi-core
 
 
 
 
+extern jlib_decl void taskqsortvec(void **a, size32_t n, const ICompare & compare); // runs in parallel on multi-core
+extern jlib_decl void taskqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp);
+
 // we define the heap property that no element c should be smaller than its parent (unsigned)(c-1)/2
 // we define the heap property that no element c should be smaller than its parent (unsigned)(c-1)/2
 // heap stores indexes into the data in rows, so compare->docompare is called with arguments rows[heap[i]]
 // heap stores indexes into the data in rows, so compare->docompare is called with arguments rows[heap[i]]
 // these functions are stable
 // these functions are stable

+ 351 - 0
system/jlib/jsort2.cpp

@@ -0,0 +1,351 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include <string.h>
+#include <limits.h>
+#include "jsort.hpp"
+#include "jio.hpp"
+#include "jmisc.hpp"
+#include "jexcept.hpp"
+#include "jfile.hpp"
+#include "jthread.hpp"
+#include "jqueue.tpp"
+#include "jset.hpp"
+#include "jutil.hpp"
+#include "jtask.hpp"
+
+#ifdef _DEBUG
+// #define PARANOID
+//#define TESTPARSORT
+//#define MCMERGESTATS
+#endif
+
+//#define PARANOID_PARTITION
+//#define TRACE_PARTITION
+
+#define PARALLEL_GRANULARITY 512            // Worth creating more tasks up to this point, should really base on number of threads and recursion depth
+#define PARALLEL_THRESHOLD  8096            // Threshold for it being worth sorting in parallel
+
+typedef void *  ELEMENT;
+typedef void ** _VECTOR;   // bit messy but allow to be redefined later
+#define VECTOR _VECTOR
+
+static inline void swap(VECTOR a, VECTOR b)  { ELEMENT t = *a;  *a = *b; *b = t; }
+#define SWAP swap
+
+static bool sortParallel()
+{
+#ifdef TESTPARSORT
+    return true;        // to test
+#else
+    unsigned numCPUs = getAffinityCpus();
+    return (numCPUs>1);
+#endif
+}
+
+//---------------------------------------------------------------------------
+
+#define CMP(a,b)         (compare.docompare(*(a),*(b)))
+#define MED3(a,b,c)      med3ic(a,b,c,compare)
+#define RECURSE(a,b)     qsortvec(a, b, compare)
+static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
+{
+  return CMP(a, b) < 0 ?
+      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
+    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
+}
+
+class cTaskQSortBase
+{
+    friend class CSubSortTask;
+
+    ITaskScheduler & taskScheduler;
+    Owned<CCompletionTask> finished;
+
+    class CSubSortTask : public CTask
+    {
+    public:
+        CSubSortTask(cTaskQSortBase * _parent, unsigned _start, unsigned _num) :
+             CTask(0), parent(_parent), start(_start), num(_num)
+        {
+        }
+
+        virtual CTask * execute() override
+        {
+            //MORE: Does this need a memory fence to ensure that writes from other threads are updated in the cache?
+            parent->doSubSort(start, num);
+            return checkNextTask();
+        }
+
+    protected:
+        cTaskQSortBase * parent;
+        unsigned start;
+        unsigned num;
+    };
+
+public:
+
+    cTaskQSortBase() : taskScheduler(queryTaskScheduler()), finished(new CCompletionTask(1, queryTaskScheduler()))
+    {
+    }
+
+    void sort(unsigned n)
+    {
+        enqueueSort(0, n);
+        finished->decAndWait();
+    }
+
+private:
+    //MORE: Not really sure what this should do...
+    void abort()
+    {
+        notifyPredDone(finished);
+    }
+
+    void doSubSort(unsigned s, unsigned n)
+    {
+        while (n > PARALLEL_GRANULARITY)
+        {
+            unsigned r1;
+            unsigned r2;
+            partition(s, n, r1, r2);
+            unsigned n2 = n+s-r2;
+            if (r1==s) {
+                n = n2;
+                s = r2;
+            }
+            else {
+                if (n2!=0)
+                    enqueueSort(r2, n2);
+                n = r1-s;
+            }
+        }
+        serialsort(s,n);
+        notifyPredDone(finished);
+    }
+
+    void enqueueSort(unsigned from, unsigned num)
+    {
+        CSubSortTask * task = new CSubSortTask(this, from, num);
+        finished->addPred();
+        enqueueOwnedTask(taskScheduler, task);
+    }
+
+public:
+    virtual void serialsort(unsigned from, unsigned len)=0;
+    virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
+};
+
+#define DOPARTITION                                 \
+        VECTOR a = array+s;                         \
+        VECTOR pm = a + (n / 2);                    \
+        VECTOR pl = a;                              \
+        VECTOR pn = a + (n - 1) ;                   \
+        if (n > 40) {                               \
+            unsigned d = (n / 8);                   \
+            pl = MED3(pl, pl + d, pl + 2 * d);      \
+            pm = MED3(pm - d, pm, pm + d);          \
+            pn = MED3(pn - 2 * d, pn - d, pn);      \
+        }                                           \
+        pm = MED3(pl, pm, pn);                      \
+        SWAP(a, pm);                                \
+        VECTOR pa = a + 1;                          \
+        VECTOR pb = pa;                             \
+        VECTOR pc = a + (n - 1);                    \
+        VECTOR pd = pc;                             \
+        int r;                                      \
+        for (;;) {                                  \
+            while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
+                if (r == 0) {                       \
+                    SWAP(pa, pb);                   \
+                    pa++;                           \
+                }                                   \
+                pb++;                               \
+            }                                       \
+            while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
+                if (r == 0) {                       \
+                    SWAP(pc, pd);                   \
+                    pd--;                           \
+                }                                   \
+                pc--;                               \
+            }                                       \
+            if (pb > pc)                            \
+                break;                              \
+            SWAP(pb, pc);                           \
+            pb++;                                   \
+            pc--;                                   \
+        }                                           \
+        pn = a + n;                                 \
+        r = MIN(pa - a, pb - pa);                   \
+        VECTOR v1 = a;                              \
+        VECTOR v2 = pb-r;                           \
+        while (r) {                                 \
+            SWAP(v1,v2); v1++; v2++; r--;           \
+        };                                          \
+        r = MIN(pd - pc, pn - pd - 1);              \
+        v1 = pb;                                    \
+        v2 = pn-r;                                  \
+        while (r) {                                 \
+            SWAP(v1,v2); v1++; v2++; r--;           \
+        };                                          \
+        r1 = (pb-pa)+s;                             \
+        r2 = n-(pd-pc)+s;
+
+
+class cTaskQSort: public cTaskQSortBase
+{
+    VECTOR array;
+    const ICompare &compare;
+
+    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
+    {
+        DOPARTITION
+    }
+
+    void serialsort(unsigned from, unsigned len)
+    {
+        qsortvec(array+from,len,compare);
+    }
+
+public:
+
+    cTaskQSort(VECTOR _a,const ICompare &_compare)
+        : compare(_compare)
+    {
+        array = _a;
+    }
+
+};
+
+
+void taskqsortvec(void **a, size32_t n, const ICompare & compare)
+{
+    if ((n<=PARALLEL_THRESHOLD)||!sortParallel()) {
+        qsortvec(a,n,compare);
+        return;
+    }
+    cTaskQSort sorter(a,compare);
+    sorter.sort(n);
+
+#ifdef TESTPARSORT
+    for (unsigned i=1;i<n;i++)
+        if (compare.docompare(a[i-1],a[i])>0)
+            IERRLOG("taskqsortvec failed %d",i);
+#endif
+
+}
+
+
+#undef CMP
+#undef MED3
+#undef RECURSE
+
+//---------------------------------------------------------------------------
+
+#undef VECTOR
+#undef SWAP
+typedef void *** _IVECTOR;
+#define VECTOR _IVECTOR
+static inline void swapind(VECTOR a, VECTOR b)  { void ** t = *a;  *a = *b; *b = t; }
+#define SWAP swapind
+
+
+#define CMP(a,b)         cmpicindstable(a,b,compare)
+
+static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
+{
+    int ret = compare.docompare(**a,**b);
+    if (ret==0)
+    {
+        if (*a>*b)
+            ret = 1;
+        else if (*a<*b)
+            ret = -1;
+    }
+    return ret;
+}
+
+#define MED3(a,b,c)      med3ic(a,b,c,compare)
+#define RECURSE(a,b)     doqsortvecstable(a, b, compare)
+static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
+{
+  return CMP(a, b) < 0 ?
+      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
+    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
+}
+
+
+static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
+#include "jsort2.inc"
+
+class cTaskQSortStable: public cTaskQSortBase
+{
+    VECTOR array;
+    const ICompare &compare;
+
+    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
+    {
+        DOPARTITION
+    }
+
+    void serialsort(unsigned from, unsigned len)
+    {
+        doqsortvecstable(array+from,len,compare);
+    }
+
+public:
+
+    cTaskQSortStable(VECTOR _a,const ICompare &_compare)
+        : cTaskQSortBase(),compare(_compare)
+    {
+        array = _a;
+    }
+};
+
+
+
+#undef CMP
+#undef CMP1
+#undef MED3
+#undef RECURSE
+#undef VECTOR
+
+static void taskqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index)
+{
+    for(unsigned i=0; i<n; ++i)
+        index[i] = rows+i;
+    if ((n<=PARALLEL_THRESHOLD)||!sortParallel()) {
+        doqsortvecstable(index,n,compare);
+        return;
+    }
+    cTaskQSortStable sorter(index,compare);
+    sorter.sort(n);
+}
+
+
+void taskqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
+{
+    memcpy(temp, rows, n * sizeof(void*));
+
+    taskqsortvecstable(temp, n, compare, (void * * *)rows);
+
+    // NOTE(review): reading 'rows' through a void*** (rowsAsIndex) after taskqsortvecstable wrote void** pointers into it likely violates strict-aliasing rules — confirm, or gather via memcpy into a separate buffer
+    void * * * rowsAsIndex = (void * * *)rows;
+    for(size32_t i=0; i<n; ++i)
+        rows[i] = *rowsAsIndex[i];
+}

+ 520 - 0
system/jlib/jtask.cpp

@@ -0,0 +1,520 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include <string.h>
+#include <limits.h>
+#include "jtask.hpp"
+#include "jlog.hpp"
+#include "jqueue.hpp"
+
+// Lazily-created singleton schedulers - see queryTaskScheduler() and
+// queryIOTaskScheduler() at the end of this file.
+static std::atomic<ITaskScheduler *> taskScheduler{nullptr};
+static std::atomic<ITaskScheduler *> iotaskScheduler{nullptr};
+static CriticalSection singletonCs;
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    return true;
+}
+MODULE_EXIT()
+{
+    // Release any singletons that were created on demand.
+    ::Release(taskScheduler.load());
+    ::Release(iotaskScheduler.load());
+}
+
+// NOTE(review): 'ATaskProcessors' (trailing 's') appears to be an unused typo
+// for ATaskProcessor, which is already declared in jtask.hpp - confirm and remove.
+class ATaskProcessors;
+// The worker object executing tasks on this thread; nullptr on non-worker threads.
+static thread_local ATaskProcessor * activeTaskProcessor = nullptr;
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Record 'e' as this task's failure unless one is already set - only the first
+// exception wins the compare-exchange and is linked; later calls leave the
+// caller owning 'e' (callers release it themselves, e.g. CFunctionTask::execute).
+void CTask::setException(IException * e)
+{
+    IException * expected = nullptr;
+    if (exception.compare_exchange_strong(expected, e))
+        e->Link();
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Called by the owning thread when its own work is done: counts this thread as
+// a completed predecessor and, unless it was the last one, blocks until the
+// remaining child tasks run execute() and signal the semaphore.  Any exception
+// captured from a child is rethrown (linked) here.
+void CCompletionTask::decAndWait()
+{
+    if (notePredDone())
+    {
+        //This is the last predecessor - skip signalling the semaphore and then waiting for it
+        //common if no child tasks have been created...
+    }
+    else
+        sem.wait();
+
+    if (exception)
+        throw LINK(exception.load());
+}
+
+// Run 'func' as a child task whose completion decAndWait() will wait for.
+// The hasException() check is best-effort: a failure racing with this call may
+// still spawn one more child, which is harmless (CFunctionTask re-checks).
+void CCompletionTask::spawn(std::function<void ()> func)
+{
+    // Avoid spawning a new child task if a different child task has failed
+    if (!hasException())
+    {
+        CTask * task = new CFunctionTask(func, this);
+        enqueueOwnedTask(scheduler, task);
+    }
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Wrap a std::function as a task with no predecessors; registers itself as a
+// predecessor of '_successor' (via CPredecessorTask).
+CFunctionTask::CFunctionTask(std::function<void ()> _func, CTask * _successor)
+: CPredecessorTask(0, _successor), func(_func)
+{
+}
+
+// Run the wrapped function, trapping any IException into the successor so it
+// can be rethrown later (e.g. by CCompletionTask::decAndWait).  Returns the
+// successor if this was its last outstanding predecessor.
+CTask * CFunctionTask::execute()
+{
+    //Avoid starting a new subtask if one of the subtasks has already failed
+    if (!successor->hasException())
+    {
+        try
+        {
+            func();
+        }
+        catch (IException * e)
+        {
+            successor->setException(e);
+            e->Release();
+        }
+    }
+    return checkNextTask();
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Round v up to the next power of 2 (returns v unchanged if already a power
+// of 2; returns 0 for v == 0).  The bit-smearing shifts require the 64-bit
+// argument asserted below.
+static inline unsigned __int64 nextPowerOf2(unsigned __int64 v)
+{
+    assert(sizeof(v)==8);
+    v--;
+    v |= v >> 1;
+    v |= v >> 2;
+    v |= v >> 4;
+    v |= v >> 8;
+    v |= v >> 16;
+    v |= v >> 32;
+    v++;
+    return v;
+}
+
+
+/*
+This class aims to implement a lock free stack - with operations to push and pop items from the stack, but also
+allow other threads to steal items from top of the stack.
+
+The aim is for push and pop to be as efficient as possible (so that recursive tasks that are using all threads)
+progress as efficiently as possible.
+
+There is a nasty ABA race condition
+- one thread pushes an item
+- another thread starts to steal it.
+- the first thread pops that item, and then pushes another one.
+- the stealing thread then completes the steal, but with the old task pointer
+
+To avoid this, the pop code increments both start and end if the last item is removed, so that the steal will be
+forced to retry.  (branch savedTaskManager contains a more complex solution adding a sequence number)
+*/
+
+class CasTaskStack
+{
+public:
+    using seq_type = unsigned int;        // half-width start/end sequence counter
+    using pair_type = unsigned __int64;   // (start,end) packed into one atomic word
+
+protected:
+    static constexpr unsigned shift = sizeof(seq_type) * 8;
+
+    //Pack (start,end) into a single value.  End is in the top so it can be incremented (and decremented) without unpacking
+    static seq_type getStart(pair_type value) { return (seq_type)(value); }
+    static seq_type getEnd(pair_type value) { return (seq_type)(value >> shift); }
+    static constexpr pair_type makePair(seq_type start, seq_type end) { return (((pair_type)end) << shift) | start; }
+public:
+    // Capacity is rounded up to a power of 2 so wrapping indices can be masked.
+    CasTaskStack(seq_type _numElements) : elementMask(nextPowerOf2(_numElements)-1)
+    {
+        tasks = new std::atomic<CTask *>[numElements()];
+    }
+    // Drain and release any tasks still queued.  popNextTask() cannot throw in
+    // practice; the catch(...) is belt-and-braces so the destructor never does.
+    // NOTE(review): printf in a destructor - DBGLOG would match jlib convention.
+    ~CasTaskStack()
+    {
+        try
+        {
+            for (;;)
+            {
+                CTask * task = popNextTask();
+                if (!task)
+                    break;
+                task->Release();
+            }
+            delete [] tasks;
+        }
+        catch (...)
+        {
+            printf("Error popping task\n");
+        }
+    }
+
+    // Push onto the end of the stack.  Only ever called by the owning worker
+    // thread, so end cannot change underneath us - store then bump end.
+    void pushTask(CTask * ownedTask)
+    {
+        pair_type cur = startEnd.load(std::memory_order_acquire);
+        seq_type start = getStart(cur);
+        seq_type end = getEnd(cur);
+        if (end - start > elementMask)
+            outOfMemory();
+
+        set(end, ownedTask);
+        //increment end atomically - wrapping causes no problem, nothing else can modify end at the same time
+        pair_type incEnd = makePair(0, 1);
+        startEnd.fetch_add(incEnd, std::memory_order_acq_rel);
+    }
+
+    // Pop from the end of the stack (LIFO).  Only called by the owning worker
+    // thread; returns nullptr when empty.  Competes with stealTask() via the
+    // compare-exchange on the packed (start,end) word.
+    CTask * popNextTask()
+    {
+        pair_type cur = startEnd.load(std::memory_order_acquire);
+        //Optimize: Can calculate end here and load task, and avoid reloading later, since nothing else can modify it
+        seq_type end = getEnd(cur);
+
+        //Fast path should be popping an item from a non-empty list => best to load even if the list proves to be empty
+        CTask * task = get(end-1);
+        for (;;)
+        {
+            seq_type start = getStart(cur);
+            if (start == end)
+                return nullptr;
+
+            pair_type next = makePair(start, end-1);
+            //Avoid ABA problem mentioned above by incrementing start and end if this is the last element
+            if (start == end -1)
+                next = makePair(end, end);
+
+            //or...  next = cur - makePair(0, 1);
+            if (startEnd.compare_exchange_weak(cur, next, std::memory_order_acq_rel))
+                return task;
+        }
+    }
+
+    //Tasks are stolen from the start of the list
+    // May be called concurrently from any number of other worker threads;
+    // returns nullptr when empty or when the owner raced us to the last item.
+    CTask * stealTask()
+    {
+        pair_type cur = startEnd.load(std::memory_order_acquire);
+        for (;;)
+        {
+            seq_type start = getStart(cur);
+            seq_type end = getEnd(cur);
+            if (start == end)
+                return nullptr;
+            CTask * task = get(start);
+            pair_type next = makePair(start+1, end);
+            if (startEnd.compare_exchange_weak(cur, next, std::memory_order_acq_rel))
+                return task;
+        }
+    }
+
+    // Called when the fixed-size ring is full.  Currently fatal (UNIMPLEMENTED);
+    // the comments below sketch how the array could be grown instead.
+    // NOTE(review): printf before UNIMPLEMENTED_X is redundant - the exception
+    // carries the same text.
+    void outOfMemory() __attribute__((noinline))
+    {
+        //Fail if no room to add the item - could expand at this point, by cloning it into another array, and replacing tasks
+        //create a new array double the size
+        //copy current array into both sides of the new array
+        //atomically update tasks
+        //atomically update elementMask
+        //or store number of elements inside the tasks array object and only have a single atomic
+        printf("TaskStack::outOfMemory\n");
+        UNIMPLEMENTED_X("TaskStack::outOfMemory");
+
+    }
+    size_t numElements() const { return elementMask + 1; }
+
+    // Slot accessors - indices wrap via elementMask (capacity is a power of 2).
+    CTask * get(seq_type index) const
+    {
+        return tasks[index & elementMask].load(std::memory_order_acquire);
+    }
+    void set(seq_type index, CTask * task) const
+    {
+        tasks[index & elementMask].store(task, std::memory_order_release);
+    }
+
+protected:
+    seq_type elementMask;                 // capacity-1 (capacity is a power of 2)
+    std::atomic<pair_type> startEnd{0};   // packed (start,end) sequence numbers
+    std::atomic<CTask *> * tasks;         // ring buffer of owned task pointers
+};
+
+static_assert(sizeof(CasTaskStack::pair_type) == 2 * sizeof(CasTaskStack::seq_type), "pair_type and seq_type are inconsistent");
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Mark one predecessor of 'successor' complete; if that makes it ready, queue
+// it on the current worker's local stack.
+// NOTE(review): both overloads dereference activeTaskProcessor unconditionally,
+// so they must only be called from a task-processor thread - confirm callers.
+void notifyPredDone(Owned<CTask> && successor)
+{
+    if (successor->notePredDone())
+        activeTaskProcessor->enqueueOwnedChildTask(successor.getClear());
+}
+
+void notifyPredDone(CTask * successor)
+{
+    if (successor->notePredDone())
+        activeTaskProcessor->enqueueOwnedChildTask(LINK(successor));
+}
+
+// Queue a ready task: prefer the calling worker's private stack (no locking);
+// calls from non-worker threads go to the scheduler's global queue.
+void enqueueOwnedTask(ITaskScheduler & scheduler, CTask * ownedTask)
+{
+    if (activeTaskProcessor)
+        activeTaskProcessor->enqueueOwnedChildTask(ownedTask);
+    else
+        scheduler.enqueueOwnedTask(ownedTask);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class TaskScheduler;
+// A worker thread owned by a TaskScheduler.  Each processor keeps a private
+// stack of child tasks (LIFO for locality); idle processors steal from the
+// front of other processors' stacks via the scheduler.
+class CTaskProcessor final : public ATaskProcessor
+{
+    using TaskStack = CasTaskStack;
+public:
+    CTaskProcessor(TaskScheduler * _scheduler, unsigned _id);
+
+    virtual void enqueueOwnedChildTask(CTask * ownedTask) override;
+    virtual int run();
+
+    bool isAborting() const { return abort; }
+    void stopProcessing() { abort = true; }
+    CTask * stealTask() { return tasks.stealTask(); }
+
+protected:
+    void doRun();
+
+private:
+    TaskScheduler * scheduler;  // owning scheduler (outlives this processor)
+    TaskStack tasks;            // private work stack, stealable by peers
+    unsigned id;                // index within scheduler->processors
+    std::atomic_bool abort{false};
+};
+
+// Owns a fixed pool of CTaskProcessor worker threads plus a mutex-protected
+// global queue for tasks submitted from non-worker threads.
+class TaskScheduler final : public CInterfaceOf<ITaskScheduler>
+{
+    friend class CTaskProcessor;
+public:
+    TaskScheduler(unsigned _numThreads);
+    ~TaskScheduler();
+
+    // Add a ready task to the global queue, waking a processor if any are idle.
+    virtual void enqueueOwnedTask(CTask * ownedTask) override final
+    {
+        assertex(!ownedTask || ownedTask->isReady());
+        {
+            CriticalBlock block(cs);
+            queue.enqueue(ownedTask);
+        }
+        if (processorsWaiting)
+            avail.signal();
+    }
+
+    virtual unsigned numProcessors() const override { return numThreads; }
+
+
+protected:
+    // Return a new task for a thread processor which has run out of local work.
+    // Returns nullptr only when the scheduler is aborting.
+    CTask * getNextTask(unsigned id)
+    {
+        //Implement in 3 phases
+        //a) Check if anything is on the global queue or could be stolen from another processor
+        //b) Indicate there is a thread waiting and then repeat (a)
+        //c) wait for a semaphore to indicate something has been added and then repeat (a)
+        //
+        // The aim is to:
+        // * Avoid semaphore signals and waits if the system is busy
+        // * Ensure the processors sleep if there is no work to do
+        //
+        // It is possible/likely that the semaphore will be signalled too many times - if a thread is waiting and
+        // several work items are added, but the only negative side-effect of that is that processors will spin around the check loop
+        // when there is no more work to be done.
+
+        CTask * task = nullptr;
+        bool waiting = false;
+        for (;;)
+        {
+            if (aborting)
+                break;
+
+            //First check to see if there is a global queue
+            if (!queue.isEmpty())
+            {
+                CriticalBlock block(cs);
+                task = queue.dequeue();
+                if (task)
+                    break;
+            }
+
+            //Nothing there - now see if we can steal a child from all processors except ourself.
+            unsigned nextTarget = id;
+            for (unsigned i=1; i < numThreads; i++)
+            {
+                nextTarget++;
+                if (nextTarget == numThreads)
+                    nextTarget = 0;
+                task = processors[nextTarget]->stealTask();
+                if (task)
+                {
+                    if (waiting)
+                        processorsWaiting--;
+                    return task;
+                }
+            }
+
+            //Nothing was found - probably another processor added a child but then processed it before
+            //anyone stole it.
+            if (waiting)
+                avail.wait();
+            else
+            {
+                waiting = true;
+                processorsWaiting++;
+            }
+        }
+
+        if (waiting)
+            processorsWaiting--;
+        return task;
+
+    }
+
+    // Called whenever a worker pushes a child task - wake an idle processor so
+    // it can try to steal it.
+    // NOTE(review): spelling - 'noteChildEqueued' should be noteChildEnqueued
+    // (rename would also touch CTaskProcessor::enqueueOwnedChildTask).
+    void noteChildEqueued()
+    {
+        if (processorsWaiting != 0)
+            avail.signal();
+    }
+
+protected:
+    unsigned numThreads = 0;
+    std::atomic<unsigned> processorsWaiting{0};  // idle workers blocked (or about to block) on avail
+    CTaskProcessor * * processors = nullptr;     // owned array of numThreads workers
+    Semaphore avail;
+    CriticalSection cs;                          // protects queue
+    DListOf<CTask> queue;                        // global queue (uses CTask::next/prev)
+    std::atomic_bool aborting{false};
+};
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Capacity of each processor's private task stack (rounded up to a power of 2
+// by CasTaskStack); overflow is currently fatal - see CasTaskStack::outOfMemory.
+static constexpr unsigned maxChildTasks = 1024;
+
+CTaskProcessor::CTaskProcessor(TaskScheduler * _scheduler, unsigned _id)
+: scheduler(_scheduler), tasks(maxChildTasks), id(_id)
+{
+}
+
+// Push a child task onto this worker's private stack and let the scheduler
+// wake an idle peer that might steal it.
+void CTaskProcessor::enqueueOwnedChildTask(CTask * ownedTask)
+{
+    tasks.pushTask(ownedTask);
+    scheduler->noteChildEqueued();
+}
+
+// Main worker loop: fetch a task from the scheduler (global queue or steal),
+// then keep executing - following each task's returned successor, or popping
+// from the private stack - until nothing local remains, then repeat.
+void CTaskProcessor::doRun()
+{
+    for (;;)
+    {
+        CTask * next = scheduler->getNextTask(id);
+
+        for (;;)
+        {
+            if (abort)
+            {
+                ::Release(next);
+                return;
+            }
+
+            try
+            {
+                CTask * follow = next->execute();
+                //Not sure this should even be special cased - more sensible would be a loop within execute if you want to rerun
+                if (likely(follow != next))
+                {
+                    next->Release();
+                    if (!follow)
+                    {
+                        // Pop the next child task from the processors' private list.
+                        follow = tasks.popNextTask();
+                        if (!follow)
+                            break;  // i.e. check for an item on the global task list
+                    }
+                }
+                next = follow;
+            }
+            // NOTE(review): if execute() throws, 'next' is neither released nor
+            // replaced, so the same task is re-executed on the next iteration -
+            // confirm tasks are expected to trap their own exceptions
+            // (CFunctionTask does).
+            catch (IException * e)
+            {
+                EXCLOG(e);
+                e->Release();
+            }
+        }
+    }
+}
+
+// Thread entry point: publish this processor in the thread-local so helpers
+// (notifyPredDone/enqueueOwnedTask) route work to the private stack.
+int CTaskProcessor::run()
+{
+    activeTaskProcessor = this;
+    doRun();
+    activeTaskProcessor = nullptr;
+    return 0;
+}
+
+// Return the worker executing on the current thread (nullptr if not a worker).
+ATaskProcessor * queryCurrentTaskProcessor()
+{
+    return activeTaskProcessor;
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Create and start the fixed pool of worker threads.
+TaskScheduler::TaskScheduler(unsigned _numThreads) : numThreads(_numThreads)
+{
+    processors = new CTaskProcessor * [numThreads];
+    for (unsigned i = 0; i < numThreads; i++)
+        processors[i] = new CTaskProcessor(this, i);
+    for (unsigned i2 = 0; i2 < numThreads; i2++)
+        processors[i2]->start();
+}
+
+TaskScheduler::~TaskScheduler()
+{
+    aborting = true;
+
+    //Indicate to all schedulers they should terminate
+    for (unsigned i1 = 0; i1 < numThreads; i1++)
+        processors[i1]->stopProcessing();
+
+    //Wake every processor blocked in getNextTask() so it notices the abort
+    //flags and terminates (getNextTask then returns nullptr).
+    avail.signal(numThreads);
+    for (unsigned i3 = 0; i3 < numThreads; i3++)
+    {
+        processors[i3]->join();
+        delete processors[i3];
+    }
+    delete [] processors;
+}
+
+// Singleton scheduler for cpu-bound tasks - one worker per affinity cpu.
+extern jlib_decl ITaskScheduler & queryTaskScheduler()
+{
+    return *querySingleton(taskScheduler, singletonCs, [] { return new TaskScheduler(getAffinityCpus()); });
+}
+
+// Singleton scheduler for tasks that may block briefly on io - oversubscribed
+// at two workers per cpu so blocked threads do not starve the system.
+extern jlib_decl ITaskScheduler & queryIOTaskScheduler()
+{
+    return *querySingleton(iotaskScheduler, singletonCs, [] { return new TaskScheduler(getAffinityCpus() * 2); });
+}
+
+//---------------------------------------------------------------------------------------------------------------------

+ 173 - 0
system/jlib/jtask.hpp

@@ -0,0 +1,173 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef JTASK_HPP
+#define JTASK_HPP
+
+#include <atomic>
+#include "jiface.hpp"
+#include "jthread.hpp"
+#include "jqueue.hpp"
+
+interface ITaskScheduler;
+
+// Base class for a schedulable unit of work.  A task becomes ready to run once
+// its predecessor count reaches zero; execute() may return a follow-on task.
+class jlib_decl CTask : public CInterface
+{
+    friend class TaskQueue;
+    friend class DListOf<CTask>;
+
+public:
+    CTask(unsigned _numPred) : numPredecessors(_numPred) {}
+
+    //Return the next task to execute
+    virtual CTask * execute() = 0;
+
+    bool isReady() const { return numPredecessors == 0; }
+    void addPred()
+    {
+        numPredecessors.fetch_add(1);
+    }
+    // Return true if this is now available to execute.
+    // NB: fetch_add(-1) on an unsigned atomic wraps, i.e. behaves as fetch_sub(1).
+    bool notePredDone()
+    {
+        return numPredecessors.fetch_add(-1) == 1;
+    }
+
+    // Default "what next" hook - no successor.  Non-virtual; CPredecessorTask
+    // provides its own version which shadows this one.
+    CTask * checkNextTask()
+    {
+        return nullptr;
+    }
+
+    //Set an exception (if one has not already been set), which will be thrown after waiting is complete
+    void setException(IException * e);
+    bool hasException() const { return exception != nullptr; }
+
+protected:
+    // Intrusive links used by DListOf<CTask> for the scheduler's global queue.
+    CTask * next = nullptr;
+    CTask * prev = nullptr;
+    std::atomic<unsigned> numPredecessors;     // tasks that must finish before this is ready
+    std::atomic<IException *> exception{nullptr}; // first failure recorded (owned link)
+};
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// A pool of worker threads that executes ready tasks.  Tasks passed to
+// enqueueOwnedTask must be ready (no outstanding predecessors) and ownership
+// transfers to the scheduler.
+interface ITaskScheduler : public IInterface
+{
+public:
+    virtual void enqueueOwnedTask(CTask * ownedTask) = 0;
+    virtual unsigned numProcessors() const = 0;
+};
+
+// Functions to provide schedulers for tasks with different characteristics.
+//    queryTaskScheduler()
+//          - for tasks that should be non-blocking and reasonably fine-grained.  Number of active tasks never exceeds the number of cores.
+//    queryIOTaskScheduler()
+//          - for tasks that could be blocked by io, but not for long periods.  Number of active tasks may be higher than number of cores.
+extern jlib_decl ITaskScheduler & queryTaskScheduler();
+extern jlib_decl ITaskScheduler & queryIOTaskScheduler();
+
+// Future - a scheduler for periodic tasks might be useful
+
+//---------------------------------------------------------------------------------------------------------------------
+
+//MORE: This can probably be private within the cpp file (and enqueue can become non-virtual).
+// Abstract worker thread; enqueueOwnedChildTask pushes onto the worker's
+// private (stealable) task stack.
+class jlib_decl ATaskProcessor  : public Thread
+{
+public:
+    virtual void enqueueOwnedChildTask(CTask * ownedTask) = 0;
+};
+
+extern jlib_decl ATaskProcessor * queryCurrentTaskProcessor();
+
+//---------------------------------------------------------------------------------------------------------------------
+
+extern jlib_decl void notifyPredDone(CTask * successor);
+extern jlib_decl void notifyPredDone(Owned<CTask> && successor);
+extern jlib_decl void enqueueOwnedTask(ITaskScheduler & scheduler, CTask * ownedTask);
+
+//---------------------------------------------------------------------------------------------------------------------
+// Helper task implementations
+//---------------------------------------------------------------------------------------------------------------------
+
+// A task with a successor, which automatically manages the predecessor count for the successor task
+// return checkNextTask() from the execute method of the task when it is complete.
+class jlib_decl CPredecessorTask : public CTask
+{
+public:
+    // Registers this task as a predecessor of '_successor' (if supplied).
+    CPredecessorTask(unsigned _numPred, CTask * _successor) : CTask(_numPred), successor(_successor)
+    {
+        if (successor)
+            successor->addPred();
+    }
+
+    // Mark this task complete; if it was the successor's last outstanding
+    // predecessor, hand the successor back as the next task to execute.
+    // NB: shadows (does not override) the non-virtual CTask::checkNextTask.
+    CTask * checkNextTask()
+    {
+        if (successor)
+        {
+            if (successor->notePredDone())
+                return successor.getClear();
+        }
+        return nullptr;
+    }
+
+protected:
+    Linked<CTask> successor; // may be cleared once this task is complete
+};
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// A helpful utility class which can be used as a successor for other tasks, and will signal a semaphore once all
+// the preceeding tasks have completed.  Allows a sort or similar with nested tasks to wait until all work is complete.
+// NB: Always allocate this on the heap, otherwise it can go out of scope before execute() returns causing chaos!
+class jlib_decl CCompletionTask final : public CTask
+{
+public:
+    CCompletionTask(unsigned _numPred, ITaskScheduler & _scheduler) : CTask(_numPred), scheduler(_scheduler) {}
+    ~CCompletionTask() { ::Release(exception.load()); }
+
+    // Executed when the last predecessor completes - releases the thread
+    // blocked in decAndWait().
+    virtual CTask * execute() override
+    {
+        sem.signal();
+        return nullptr;
+    }
+
+    // Execute a function as a child task - decAndWait() will wait for completion
+    void spawn(std::function<void ()> func);
+
+    //Called when main thread has completed - decrements the predecessor count, and waits for completion
+    void decAndWait();
+
+protected:
+    ITaskScheduler & scheduler; // scheduler used for tasks created by spawn()
+    Semaphore sem;              // signalled by execute() once all children finish
+};
+
+// A class used by CCompletionTask to implement spawn
+// Wraps a std::function as a task; exceptions thrown by the function are
+// captured into the successor (see the cpp implementation of execute()).
+class jlib_decl CFunctionTask final : public CPredecessorTask
+{
+public:
+    CFunctionTask(std::function<void ()> _func, CTask * _successor);
+
+    virtual CTask * execute() override;
+
+protected:
+    std::function<void ()> func;  // the work to run; copied at construction
+};
+
+
+#endif

+ 1 - 2
testing/regress/ecl/sortnorm.ecl

@@ -8,8 +8,7 @@
 
 import ^ as root;
 algo := #IFDEFINED(root.algo, 'quicksort');
-
-numRows := 100000;
+numRows := #IFDEFINED(root.numRows, 100000);
 
 ds := DATASET(numRows, TRANSFORM({unsigned id}, SELF.id := HASH32(COUNTER)));
 s1 := sort(ds, id, local, stable(algo));