Pārlūkot izejas kodu

Merge pull request #15828 from ghalliday/mergeSort

HPCC-27237 Improvements to task manager and re-implement task merge sort

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 gadi atpakaļ
vecāks
revīzija
b04030243f

+ 91 - 78
common/thorhelper/thorsort.cpp

@@ -21,10 +21,9 @@
 #include "jlog.hpp"
 #include "errorlist.h"
 #include <exception>
+#include "jtask.hpp"
 
 #ifdef _USE_TBB
-#include "tbb/task.h"
-#include "tbb/task_scheduler_init.h"
 #include "tbb/parallel_sort.h"
 #endif
 
@@ -277,126 +276,129 @@ void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, voi
 
 //=========================================================================
 
-#ifdef _USE_TBB
 static const unsigned numPartitionSamples = 3;
 //These constants are probably architecture and number of core dependent
 static const size_t singleThreadedMSortThreshold = 2000;
 static const size_t multiThreadedBlockThreshold = 64;       // must be at least 2!
 
-using tbb::task;
-class TbbParallelMergeSorter
+class ParallelMergeSorter
 {
-    class SplitTask : public tbb::task
+    class SplitTask : public CTask
     {
     public:
-        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
+        SplitTask(CTask * _succ1, CTask * _succ2) : CTask(1), succ1(_succ1), succ2(_succ2)
         {
         }
 
-        virtual task * execute()
+        virtual CTask * execute()
         {
-            if (next1->decrement_ref_count() == 0)
-                spawn(*next1);
-            if (next2->decrement_ref_count() == 0)
-                return next2;
+            //MORE: Nothing shares either succ1 or succ2, so we could initialize predecessor count to 0, and unconditionally schedule
+            if (succ1->notePredDone())
+                spawnOwnedChildTask(*succ1);
+            if (succ2->notePredDone())
+                return succ2;
             return NULL;
         }
     protected:
-        task * next1;
-        task * next2;
+        CTask * succ1;
+        CTask * succ2;
     };
 
-    class BisectTask : public tbb::task
+    class BisectTask : public CTask
     {
     public:
-        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
-        : sorter(_sorter), rows(_rows), temp(_temp), next(_next), n(_n), depth(_depth)
+        BisectTask(ParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, CTask * _succ)
+        : CTask(0), sorter(_sorter), rows(_rows), temp(_temp), successor(_succ), n(_n), depth(_depth)
         {
         }
-        virtual task * execute()
+        virtual CTask * execute()
         {
             for (;;)
             {
                 //On entry next is assumed to be used once by this function
                 if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
                 {
-                    //Create a new task rather than calling sort directly, so that the successor is set up correctly
-                    //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
-                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
-                    return sort;
+                    mergeSort(rows, n, sorter.compare, temp, depth);
+                    if (successor->notePredDone())
+                        return successor;
+                    return nullptr;
                 }
 
                 void * * result = (depth & 1) ? temp : rows;
                 void * * src = (depth & 1) ? rows : temp;
                 size_t n1 = (n+1)/2;
                 size_t n2 = n-n1;
-                task * mergeTask;
+                CTask * mergeTask;
                 if (depth < sorter.parallelMergeDepth)
                 {
                     unsigned partitions = sorter.numPartitionCores() >> depth;
+
+                    //Following will create 2 * partition merge tasks, one of which will inherit the already incremented predecessor count
                     if (partitions > 1)
                     {
-                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
+                        successor->incPred(2 * partitions - 1);
+                        PartitionSplitTask * splitTask = new PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
                         for (unsigned i=0; i < partitions; i++)
                         {
-                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
-                            mergeFwdTask->set_ref_count(1);
-                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
-                            mergeRevTask->set_ref_count(1);
+                            MergeTask * mergeFwdTask = new MergeTask(sorter.compare, *successor, result, n1, src, n2, src+n1, 0);
+                            MergeTask * mergeRevTask = new MergeRevTask(sorter.compare, *successor, result, n1, src, n2, src+n1, 0);
                             splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
                         }
-                        next->decrement_ref_count();
                         mergeTask = splitTask;
                     }
                     else
                     {
-                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
-                        mergeFwdTask->set_ref_count(1);
-                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
-                        mergeRevTask->set_ref_count(1);
-                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
+                        successor->incPred(1);
+                        CTask * mergeFwdTask = new MergeTask(sorter.compare, *successor, result, n1, src, n2, src+n1, n1);
+                        CTask * mergeRevTask = new MergeRevTask(sorter.compare, *successor, result, n1, src, n2, src+n1, n2);
+                        mergeTask = new SplitTask(mergeFwdTask, mergeRevTask);
                     }
                 }
                 else
                 {
-                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
+                    mergeTask = new MergeTask(sorter.compare, *successor, result, n1, src, n2, src+n1, n);
                 }
 
-                mergeTask->set_ref_count(2);
-                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
-                spawn(*bisectRightTask);
+                mergeTask->setNumPredecessors(2);
+
+                CTask * bisectRightTask = new BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
+                enqueueOwnedTask(sorter.scheduler, *bisectRightTask);
 
                 //recurse directly on the left side rather than creating a new task
                 n = n1;
                 depth = depth+1;
-                next = mergeTask;
+                successor = mergeTask;
             }
         }
     protected:
-        TbbParallelMergeSorter & sorter;
+        ParallelMergeSorter & sorter;
         void ** rows;
         void ** temp;
-        task * next;
+        CTask * successor;
         size_t n;
         unsigned depth;
     };
 
 
-    class SubSortTask : public tbb::task
+    class SubSortTask : public CTask
     {
     public:
-        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
-        : sorter(_sorter), rows(_rows), temp(_temp), n(_n), depth(_depth)
+        SubSortTask(ParallelMergeSorter & _sorter, CTask & _successor, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
+        : CTask(0), sorter(_sorter), successor(_successor), rows(_rows), temp(_temp), n(_n), depth(_depth)
         {
         }
 
-        virtual task * execute()
+        virtual CTask * execute()
         {
             mergeSort(rows, n, sorter.compare, temp, depth);
+            if (successor.notePredDone())
+                return &successor;
             return NULL;
         }
+
     protected:
-        TbbParallelMergeSorter & sorter;
+        ParallelMergeSorter & sorter;
+        CTask & successor;
         void ** rows;
         void ** temp;
         size_t n;
@@ -404,15 +406,15 @@ class TbbParallelMergeSorter
     };
 
 
-    class MergeTask : public tbb::task
+    class MergeTask : public CTask
     {
     public:
-        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
-        : compare(_compare),result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
+        MergeTask(const ICompare & _compare, CTask & _successor, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
+        : CTask(1), compare(_compare), successor(_successor), result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
         {
         }
 
-        virtual task * execute()
+        virtual CTask * execute()
         {
             //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
             if (n1 == 0)
@@ -427,7 +429,7 @@ class TbbParallelMergeSorter
             }
             else
                 mergePartitions(compare, result, n1, src1, n2, src2, n);
-            return NULL;
+            return checkNextTask();
         }
 
         void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
@@ -440,8 +442,16 @@ class TbbParallelMergeSorter
             n = num;
         }
 
+        CTask * checkNextTask()
+        {
+            if (successor.notePredDone())
+                return &successor;
+            return nullptr;
+        }
+
     protected:
         const ICompare & compare;
+        CTask & successor;
         void * * result;
         void * * src1;
         void * * src2;
@@ -453,12 +463,12 @@ class TbbParallelMergeSorter
     class MergeRevTask : public MergeTask
     {
     public:
-        MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
-        : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
+        MergeRevTask(const ICompare & _compare, CTask & _successor, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
+        : MergeTask(_compare, _successor, _result, _n1, _src1, _n2, _src2, _n)
         {
         }
 
-        virtual task * execute()
+        virtual CTask * execute()
         {
             if (n1 == 0)
             {
@@ -475,15 +485,16 @@ class TbbParallelMergeSorter
             }
             else
                 mergePartitionsRev(compare, result, n1, src1, n2, src2, n);
-            return NULL;
+
+            return checkNextTask();
         }
     };
 
-    class PartitionSplitTask : public tbb::task
+    class PartitionSplitTask : public CTask
     {
     public:
         PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
-            : compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
+            : CTask(0), compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
         {
             //These could be local variables in calculatePartitions(), but placed here to simplify cleanup.  (Should consider using alloca)
             posLeft = new size_t[numPartitions+1];
@@ -606,13 +617,15 @@ class TbbParallelMergeSorter
             }
         }
 
-        virtual task * execute()
+        virtual CTask * execute()
         {
             calculatePartitions();
             for (unsigned i=0; i < numPartitions*2; i++)
             {
-                if (tasks[i]->decrement_ref_count() == 0)
-                    spawn(*tasks[i]);
+                //NOTE: These tasks only have a single successor, so simpler to set #pred to 0, and unconditionally spawn them
+                //(and use continuation to return the 1st)
+                if (tasks[i]->notePredDone())
+                    spawnOwnedChildTask(*tasks[i]);
             }
             return NULL;
         }
@@ -671,7 +684,8 @@ class TbbParallelMergeSorter
     };
 
 public:
-    TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
+    ParallelMergeSorter(void * * _rows, const ICompare & _compare)
+    : scheduler(queryTaskScheduler()), compare(_compare), baseRows(_rows)
     {
         //The following constants control the number of iterations to be performed in parallel.
         //The sort is split into more parts than there are cpus so that the effect of delays from one task tend to be evened out.
@@ -679,7 +693,7 @@ public:
         const unsigned extraBisectDepth = 3;
         const unsigned extraParallelMergeDepth = 3;
 
-        unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
+        unsigned numCpus = scheduler.numProcessors();
         unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
         assertex(numCpus <= (1U << ln2NumCpus));
 
@@ -688,23 +702,28 @@ public:
 
         //Aim to execute in parallel until the width is 8*the maximum number of parallel task
         singleThreadDepth = ln2NumCpus + extraBisectDepth;
-        partitionCores = numCpus / 2;
+        partitionCores = std::max(numCpus / 2, 1U);
     }
 
     unsigned numPartitionCores() const { return partitionCores; }
 
     void sortRoot(void ** rows, size_t n, void ** temp)
     {
-        task * end = new (task::allocate_root()) tbb::empty_task();
-        end->set_ref_count(1+1);
-        task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
-        end->spawn(*task);
-        end->wait_for_all();
-        end->destroy(*end);
+        unsigned  numPred = 1 + 1; // the initial bisect task and the wait for the result.
+        Owned<CCompletionTask> end = new CCompletionTask(scheduler, numPred);
+
+        // Rely on scheduling to release the link counts for child tasks (including this completion task) to minimize atomic operations
+        end->setMinimalLinking();
+
+        //MORE: This bisection could be done in a single pass, rather than creating separate tasks - although
+        //it is hard to tell what would be the most efficient...  This ensures they are spread over all cpus.
+        scheduler.enqueueOwnedTask(*new BisectTask(*this, rows, n, temp, 0, end));
+        end->decAndWait();
     }
 
 public:
     const ICompare & compare;
+    ITaskScheduler & scheduler;
     unsigned singleThreadDepth;
     unsigned parallelMergeDepth;
     unsigned partitionCores;
@@ -712,9 +731,9 @@ public:
 };
 
 //-------------------------------------------------------------------------------------------------------------------
-void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
+void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
 {
-    if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
+    if ((n <= singleThreadedMSortThreshold) || queryTaskScheduler().numProcessors() == 1)
     {
         msortvecstableinplace(rows, n, compare, temp);
         return;
@@ -722,7 +741,7 @@ void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare,
 
     try
     {
-        TbbParallelMergeSorter sorter(rows, compare);
+        ParallelMergeSorter sorter(rows, compare);
         sorter.sortRoot(rows, n, temp);
     }
     catch (const std::exception & e)
@@ -730,9 +749,3 @@ void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare,
         throw makeStringExceptionV(ERRORID_UNKNOWN, "TBB exception: %s", e.what());
     }
 }
-#else
-void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
-{
-    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
-}
-#endif

+ 5 - 3
common/thorhelper/thorsort.hpp

@@ -23,13 +23,15 @@
 #include "thorhelper.hpp"
 #include "jsort.hpp"
 
+//#define DEFAULT_MERGE_SORT
+
 extern THORHELPER_API void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp);
-extern THORHELPER_API void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus=0);
+extern THORHELPER_API void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp);
 
 inline void parsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** stableTablePtr, unsigned maxCores=0)
 {
-#ifdef _USE_TBB
-    parmsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
+#ifdef DEFAULT_MERGE_SORT
+    parmsortvecstableinplace(rows, n, compare, stableTablePtr);
 #else
     parqsortvecstableinplace(rows, n, compare, stableTablePtr, maxCores);
 #endif

+ 4 - 4
system/jlib/jsort2.cpp

@@ -100,7 +100,7 @@ class cTaskQSortBase
 
 public:
 
-    cTaskQSortBase() : taskScheduler(queryTaskScheduler()), finished(new CCompletionTask(1, queryTaskScheduler()))
+    cTaskQSortBase() : taskScheduler(queryTaskScheduler()), finished(new CCompletionTask(queryTaskScheduler()))
     {
     }
 
@@ -114,7 +114,7 @@ private:
     //MORE: Not really sure what this should do...
     void abort()
     {
-        notifyPredDone(finished);
+        notifyPredDone(*finished);
     }
 
     void doSubSort(unsigned s, unsigned n)
@@ -136,14 +136,14 @@ private:
             }
         }
         serialsort(s,n);
-        notifyPredDone(finished);
+        notifyPredDone(*finished);
     }
 
     void enqueueSort(unsigned from, unsigned num)
     {
         CSubSortTask * task = new CSubSortTask(this, from, num);
         finished->addPred();
-        enqueueOwnedTask(taskScheduler, task);
+        enqueueOwnedTask(taskScheduler, *task);
     }
 
 public:

+ 142 - 101
system/jlib/jtask.cpp

@@ -18,10 +18,13 @@
 #include "platform.h"
 #include <string.h>
 #include <limits.h>
+#include <algorithm>
 #include "jtask.hpp"
 #include "jlog.hpp"
 #include "jqueue.hpp"
 
+//#define TRACE_TASKS
+
 static std::atomic<ITaskScheduler *> taskScheduler{nullptr};
 static std::atomic<ITaskScheduler *> iotaskScheduler{nullptr};
 static CriticalSection singletonCs;
@@ -36,69 +39,9 @@ MODULE_EXIT()
     ::Release(iotaskScheduler.load());
 }
 
-class ATaskProcessors;
-static thread_local ATaskProcessor * activeTaskProcessor = nullptr;
-
-//---------------------------------------------------------------------------------------------------------------------
-
-void CTask::setException(IException * e)
-{
-    IException * expected = nullptr;
-    if (exception.compare_exchange_strong(expected, e))
-        e->Link();
-}
-
-//---------------------------------------------------------------------------------------------------------------------
-
-void CCompletionTask::decAndWait()
-{
-    if (notePredDone())
-    {
-        //This is the last predecessor - skip signalling the semaphore and then waiting for it
-        //common if no child tasks have been created...
-    }
-    else
-        sem.wait();
-
-    if (exception)
-        throw LINK(exception.load());
-}
-
-void CCompletionTask::spawn(std::function<void ()> func)
-{
-    // Avoid spawning a new child task if a different child task has failed
-    if (!hasException())
-    {
-        CTask * task = new CFunctionTask(func, this);
-        enqueueOwnedTask(scheduler, task);
-    }
-}
-
-
-//---------------------------------------------------------------------------------------------------------------------
-
-CFunctionTask::CFunctionTask(std::function<void ()> _func, CTask * _successor)
-: CPredecessorTask(0, _successor), func(_func)
-{
-}
-
-CTask * CFunctionTask::execute()
-{
-    //Avoid starting a new subtask if one of the subtasks has already failed
-    if (!successor->hasException())
-    {
-        try
-        {
-            func();
-        }
-        catch (IException * e)
-        {
-            successor->setException(e);
-            e->Release();
-        }
-    }
-    return checkNextTask();
-}
+class CTaskProcessor;
+class TaskScheduler;
+static thread_local CTaskProcessor * activeTaskProcessor = nullptr;
 
 //---------------------------------------------------------------------------------------------------------------------
 
@@ -260,38 +203,16 @@ static_assert(sizeof(CasTaskStack::pair_type) == 2 * sizeof(CasTaskStack::seq_ty
 
 //---------------------------------------------------------------------------------------------------------------------
 
-void notifyPredDone(Owned<CTask> && successor)
-{
-    if (successor->notePredDone())
-        activeTaskProcessor->enqueueOwnedChildTask(successor.getClear());
-}
-
-void notifyPredDone(CTask * successor)
-{
-    if (successor->notePredDone())
-        activeTaskProcessor->enqueueOwnedChildTask(LINK(successor));
-}
-
-void enqueueOwnedTask(ITaskScheduler & scheduler, CTask * ownedTask)
-{
-    if (activeTaskProcessor)
-        activeTaskProcessor->enqueueOwnedChildTask(ownedTask);
-    else
-        scheduler.enqueueOwnedTask(ownedTask);
-}
-
-//---------------------------------------------------------------------------------------------------------------------
-
-class TaskScheduler;
-class CTaskProcessor final : public ATaskProcessor
+class CTaskProcessor final : public Thread
 {
     using TaskStack = CasTaskStack;
 public:
     CTaskProcessor(TaskScheduler * _scheduler, unsigned _id);
 
-    virtual void enqueueOwnedChildTask(CTask * ownedTask) override;
+// Thread
     virtual int run();
 
+    void enqueueOwnedChildTask(CTask & ownedTask);
     bool isAborting() const { return abort; }
     void stopProcessing() { abort = true; }
     CTask * stealTask() { return tasks.stealTask(); }
@@ -313,12 +234,12 @@ public:
     TaskScheduler(unsigned _numThreads);
     ~TaskScheduler();
 
-    virtual void enqueueOwnedTask(CTask * ownedTask) override final
+    virtual void enqueueOwnedTask(CTask & ownedTask) override final
     {
-        assertex(!ownedTask || ownedTask->isReady());
+        assertex(ownedTask.isReady());
         {
             CriticalBlock block(cs);
-            queue.enqueue(ownedTask);
+            queue.enqueue(&ownedTask);
         }
         if (processorsWaiting)
             avail.signal();
@@ -370,6 +291,9 @@ protected:
                 task = processors[nextTarget]->stealTask();
                 if (task)
                 {
+#ifdef TRACE_TASKS
+                    printf("Stolen for %u on %u\n", id, sched_getcpu());
+#endif
                     if (waiting)
                         processorsWaiting--;
                     return task;
@@ -379,7 +303,15 @@ protected:
             //Nothing was found - probably another processor added a child but then processed it before
             //anyone stole it.
             if (waiting)
+            {
+#ifdef TRACE_TASKS
+                printf("Pause %u on %u\n", id, sched_getcpu());
+#endif
                 avail.wait();
+#ifdef TRACE_TASKS
+                printf("Restart %u on %u\n", id, sched_getcpu());
+#endif
+            }
             else
             {
                 waiting = true;
@@ -387,6 +319,10 @@ protected:
             }
         }
 
+#ifdef TRACE_TASKS
+        printf("Task for %u on %u\n", id, sched_getcpu());
+#endif
+
         if (waiting)
             processorsWaiting--;
         return task;
@@ -419,9 +355,9 @@ CTaskProcessor::CTaskProcessor(TaskScheduler * _scheduler, unsigned _id)
 {
 }
 
-void CTaskProcessor::enqueueOwnedChildTask(CTask * ownedTask)
+void CTaskProcessor::enqueueOwnedChildTask(CTask & ownedTask)
 {
-    tasks.pushTask(ownedTask);
+    tasks.pushTask(&ownedTask);
     scheduler->noteChildEqueued();
 }
 
@@ -473,12 +409,112 @@ int CTaskProcessor::run()
     return 0;
 }
 
-ATaskProcessor * queryCurrentTaskProcessor()
+//---------------------------------------------------------------------------------------------------------------------
+
+void notifyPredDone(Owned<CTask> && successor)
+{
+    if (successor->notePredDone())
+        activeTaskProcessor->enqueueOwnedChildTask(*successor.getClear());
+}
+
+void notifyPredDone(CTask & successor)
+{
+    if (successor.notePredDone())
+        activeTaskProcessor->enqueueOwnedChildTask(OLINK(successor));
+}
+
+void enqueueOwnedTask(ITaskScheduler & scheduler, CTask & ownedTask)
+{
+    if (activeTaskProcessor)
+        activeTaskProcessor->enqueueOwnedChildTask(ownedTask);
+    else
+        scheduler.enqueueOwnedTask(ownedTask);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+void CTask::spawnOwnedChildTask(CTask & ownedTask)
+{
+    assertex(activeTaskProcessor);
+    activeTaskProcessor->enqueueOwnedChildTask(ownedTask);
+}
+
+void CTask::setException(IException * e)
+{
+    IException * expected = nullptr;
+    if (exception.compare_exchange_strong(expected, e))
+        e->Link();
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+void CCompletionTask::decAndWait()
+{
+    if (notePredDone())
+    {
+        //This is the last predecessor - skip signalling the semaphore and then waiting for it
+        //common if no child tasks have been created...
+        if (!tasksLinkedOnSchedule)
+            Release();
+    }
+    else
+        sem.wait();
+
+    if (exception)
+        throw LINK(exception.load());
+}
+
+void CCompletionTask::setMinimalLinking()
+{
+    Link();     // This will be release either when the task is scheduled, or when waiting for completion.
+    tasksLinkedOnSchedule = false;
+}
+
+void CCompletionTask::spawn(std::function<void ()> func, bool ignoreParallelExceptions)
+{
+    // Avoid spawning a new child task if a different child task has failed
+    if (ignoreParallelExceptions || !hasException())
+    {
+        CTask * task = new CFunctionTask(func, this, ignoreParallelExceptions);
+        enqueueOwnedTask(scheduler, *task);
+    }
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+CFunctionTask::CFunctionTask(std::function<void ()> _func, CTask * _successor, bool _ignoreParallelExceptions)
+: CPredecessorTask(0, _successor), func(_func), ignoreParallelExceptions(_ignoreParallelExceptions)
+{
+}
+
+CTask * CFunctionTask::execute()
 {
-    return activeTaskProcessor;
+    //Avoid starting a new subtask if one of the subtasks has already failed
+    if (ignoreParallelExceptions || !successor->hasException())
+    {
+        try
+        {
+            func();
+        }
+        catch (IException * e)
+        {
+            successor->setException(e);
+            e->Release();
+        }
+    }
+    return checkNextTask();
 }
 
 //---------------------------------------------------------------------------------------------------------------------
+static unsigned maxTasks = (unsigned)-1;
+static unsigned maxIOTasks = (unsigned)-1;
+
+void setTaskLimits(unsigned _maxCpuTasks, unsigned _maxIOTasks)
+{
+    maxTasks = _maxCpuTasks;
+    maxIOTasks = _maxIOTasks;
+}
 
 TaskScheduler::TaskScheduler(unsigned _numThreads) : numThreads(_numThreads)
 {
@@ -497,24 +533,29 @@ TaskScheduler::~TaskScheduler()
     for (unsigned i1 = 0; i1 < numThreads; i1++)
         processors[i1]->stopProcessing();
 
-    //Add null entries to the task queue so the processors can terminate
+    //Start all the processors
     avail.signal(numThreads);
+
+    //Join all processors before deleting - just in case something tries to steal from a deleted processor
+    //(only possible if this is terminating while tasks are currently being processed)
+    for (unsigned i2 = 0; i2 < numThreads; i2++)
+        processors[i2]->join();
+
     for (unsigned i3 = 0; i3 < numThreads; i3++)
-    {
-        processors[i3]->join();
         delete processors[i3];
-    }
     delete [] processors;
 }
 
 extern jlib_decl ITaskScheduler & queryTaskScheduler()
 {
-    return *querySingleton(taskScheduler, singletonCs, [] { return new TaskScheduler(getAffinityCpus()); });
+    unsigned numTasks = std::min(maxTasks, getAffinityCpus());
+    return *querySingleton(taskScheduler, singletonCs, [ numTasks ] { return new TaskScheduler(numTasks); });
 }
 
 extern jlib_decl ITaskScheduler & queryIOTaskScheduler()
 {
-    return *querySingleton(iotaskScheduler, singletonCs, [] { return new TaskScheduler(getAffinityCpus() * 2); });
+    unsigned numTasks = std::min(maxIOTasks, getAffinityCpus() * 2);
+    return *querySingleton(iotaskScheduler, singletonCs, [ numTasks ] { return new TaskScheduler(numTasks); });
 }
 
 //---------------------------------------------------------------------------------------------------------------------

+ 113 - 20
system/jlib/jtask.hpp

@@ -23,6 +23,72 @@
 #include "jthread.hpp"
 #include "jqueue.hpp"
 
+
+/*
+
+This file defines multiple taskSchedulers that can be used to execute code in parallel without the cost of starting up
+new threads, and avoiding over-committing the number of cores to process tasks.
+
+It is currently aimed at non-blocking tasks, but the hope is to also use it for IO based tasks (which may over-commit to a certain degree).
+
+There are two common ways of using it.
+
+1) Lambda style functions.
+
+Create a completion task.  When you want to execute some code in parallel use the spawn() function, and at the end wait for all tasks to complete.
+
+Owned<CCompletionTask> completed = new CCompletionTask[(queryTaskScheduler())];
+...
+completed->spawn([n]() { printf("%u\n", n); });
+...
+completed->decAndWait();
+
+
+2) Create task classes, and schedule them when they are ready to run
+
+
+Owned<CCompletionTask> completed = new CCompletionTask[(queryTaskScheduler())];
+...
+CTask * task = new XTask(completed);
+queryTaskScheduler().enqueueOwnedTask(*task);
+...
+completed->decAndWait();
+
+
+Link counting
+-------------
+
+- Simple successor linking:
+  The simplest approach is for all tasks to LINK their successor tasks.  When a successor task is ready to be scheduled
+  it is LINKed and scheduled.  The link counts are reduced when the predecessor task is destroyed, and then released
+  again when the task completes.
+  However, this means two effective link counts are held for a task, one for the number of predecessors and another
+  for the lifetime.
+
+  The classes used to implement the lambda tasks use this approach.
+
+- Avoiding successor linking:
+  A task is only ever scheduled by a single predecessor. If all created tasks will eventually be
+  executed there is no need to link/release the successor tasks.  The link count will be 1 from when it was created.
+  When it is executed it will be decremented and cleaned up.
+
+  If this approach is used, you must call setMinimalLinking() on the CCompletionTask - this increments the link count
+  ready for decrementing when the task is scheduled, and also avoids starting a task if there are no child tasks
+  waiting to complete.
+
+  The parallel merge sort uses this approach.
+
+Exceptions
+----------
+Any code using tasks needs to be careful that it will complete if exceptions are reported.  The task base class has
+helper functions for thread-safely recording exceptions for later throwing.
+
+The lambda style functions automatically forward any exception to the completion task, which will rethrow the first
+exception once all functions have completed.  By default new functions will not be executed once another function
+has thrown an exception, but this can be overridden.
+
+*/
+
 interface ITaskScheduler;
 
 class jlib_decl CTask : public CInterface
@@ -32,6 +98,7 @@ class jlib_decl CTask : public CInterface
 
 public:
     CTask(unsigned _numPred) : numPredecessors(_numPred) {}
+    ~CTask() { ::Release(exception.load()); }
 
     //Return the next task to execute
     virtual CTask * execute() = 0;
@@ -41,6 +108,10 @@ public:
     {
         numPredecessors.fetch_add(1);
     }
+    void incPred(unsigned numExtra)
+    {
+        numPredecessors.fetch_add(numExtra);
+    }
     // Return true if this is now available to execute.
     bool notePredDone()
     {
@@ -52,9 +123,18 @@ public:
         return nullptr;
     }
 
-    //Set an exception (if one has not already been set), which will be thrown after waiting is complete
-    void setException(IException * e);
+    void setNumPredecessors(unsigned _numPred)
+    {
+        numPredecessors.store(_numPred);
+    }
+
+    // Called within an executing task to start a child task - will be pushed onto the local task queue
+    void spawnOwnedChildTask(CTask & ownedTask);
+
+// Exception management helper functions
     bool hasException() const { return exception != nullptr; }
+    IException * queryException() const { return exception.load(); }
+    void setException(IException * e);      //Set an exception (if one has not already been set)
 
 protected:
     CTask * next = nullptr;
@@ -69,7 +149,7 @@ protected:
 interface ITaskScheduler : public IInterface
 {
 public:
-    virtual void enqueueOwnedTask(CTask * ownedTask) = 0;
+    virtual void enqueueOwnedTask(CTask & ownedTask) = 0;
     virtual unsigned numProcessors() const = 0;
 };
 
@@ -85,20 +165,9 @@ extern jlib_decl ITaskScheduler & queryIOTaskScheduler();
 
 //---------------------------------------------------------------------------------------------------------------------
 
-//MORE: This can probably be private within the cpp file (and enqueue can become non-virtual).
-class jlib_decl ATaskProcessor  : public Thread
-{
-public:
-    virtual void enqueueOwnedChildTask(CTask * ownedTask) = 0;
-};
-
-extern jlib_decl ATaskProcessor * queryCurrentTaskProcessor();
-
-//---------------------------------------------------------------------------------------------------------------------
-
-extern jlib_decl void notifyPredDone(CTask * successor);
+extern jlib_decl void notifyPredDone(CTask & successor);
 extern jlib_decl void notifyPredDone(Owned<CTask> && successor);
-extern jlib_decl void enqueueOwnedTask(ITaskScheduler & scheduler, CTask * ownedTask);
+extern jlib_decl void enqueueOwnedTask(ITaskScheduler & scheduler, CTask & ownedTask);
 
 //---------------------------------------------------------------------------------------------------------------------
 // Helper task implementations
@@ -137,8 +206,8 @@ protected:
 class jlib_decl CCompletionTask final : public CTask
 {
 public:
-    CCompletionTask(unsigned _numPred, ITaskScheduler & _scheduler) : CTask(_numPred), scheduler(_scheduler) {}
-    ~CCompletionTask() { ::Release(exception.load()); }
+    CCompletionTask(ITaskScheduler & _scheduler, unsigned _numPred=1) : CTask(_numPred), scheduler(_scheduler) {}
+    CCompletionTask() : CTask(1), scheduler(queryTaskScheduler()) {}
 
     virtual CTask * execute() override
     {
@@ -147,27 +216,51 @@ public:
     }
 
     // Execute a function as a child task - decAndWait() will wait for completion
-    void spawn(std::function<void ()> func);
+    void spawn(std::function<void ()> func, bool ignoreParallelExceptions = false);
 
     //Called when main thread has completed - decrements the predecessor count, and waits for completion
     void decAndWait();
 
+    //Called when tasks are not linked before scheduling
+    void setMinimalLinking();
+
 protected:
     ITaskScheduler & scheduler;
     Semaphore sem;
+    bool tasksLinkedOnSchedule{true};
 };
 
 // A class used by CCompletionTask to implement spawn
 class jlib_decl CFunctionTask final : public CPredecessorTask
 {
 public:
-    CFunctionTask(std::function<void ()> _func, CTask * _successor);
+    CFunctionTask(std::function<void ()> _func, CTask * _successor, bool _ignoreParallelExceptions);
 
     virtual CTask * execute() override;
 
 protected:
     std::function<void ()> func;
+    bool ignoreParallelExceptions;
 };
 
+//Implementation of asyncFor that uses the task library
+template <typename AsyncFunc>
+inline void taskAsyncFor(unsigned num, ITaskScheduler & scheduler, AsyncFunc func)
+{
+    if (num != 1)
+    {
+        Owned<CCompletionTask> completed = new CCompletionTask(scheduler);
+        for (unsigned i=0; i < num; i++)
+            completed->spawn([i, func]() { func(i); });
+
+        completed->decAndWait();
+    }
+    else
+        func(0);
+}
+
+static constexpr unsigned UnlimitedTasks = (unsigned)-1;
+// Allow the number of parallel tasks to be restricted.  By default it will be set to (#cores, 2* #cores)
+void jlib_decl setTaskLimits(unsigned _maxCpuTasks, unsigned _maxIOTasks);
 
 #endif

+ 18 - 0
system/jlib/jthread.cpp

@@ -26,6 +26,8 @@
 #include "jregexp.hpp"
 #include "jlog.ipp"
 #include "jisem.hpp"
+#include "jtask.hpp"
+
 #include <assert.h>
 #ifdef _WIN32
 #include <process.h>
@@ -841,6 +843,22 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
         throw e;
 }
 
+void CAsyncFor::TaskFor(unsigned num, ITaskScheduler & scheduler)
+{
+    if (num <= 1)
+    {
+        if (num == 1)
+            Do(0);
+        return;
+    }
+
+    Owned<CCompletionTask> completed = new CCompletionTask(scheduler);
+    for (unsigned i=0; i < num; i++)
+        completed->spawn([i, this]() { Do(i); });
+
+    completed->decAndWait();
+}
+
 //---------------------------------------------------------------------------------------------------------------------
 
 class CSimpleFunctionThread : public Thread

+ 2 - 1
system/jlib/jthread.hpp

@@ -198,11 +198,12 @@ public:
 
 // Asynchronous 'for' utility class
 // see HRPCUTIL.CPP for example of usage
-
+interface ITaskScheduler;
 class jlib_decl CAsyncFor
 {
 public:
     void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false);
+    void TaskFor(unsigned num, ITaskScheduler & scheduler);
     virtual void Do(unsigned idx=0)=0;
 };