jtask.hpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef JTASK_HPP
  14. #define JTASK_HPP
  15. #include <atomic>
  16. #include "jiface.hpp"
  17. #include "jthread.hpp"
  18. #include "jqueue.hpp"
  19. interface ITaskScheduler;
  20. class jlib_decl CTask : public CInterface
  21. {
  22. friend class TaskQueue;
  23. friend class DListOf<CTask>;
  24. public:
  25. CTask(unsigned _numPred) : numPredecessors(_numPred) {}
  26. //Return the next task to execute
  27. virtual CTask * execute() = 0;
  28. bool isReady() const { return numPredecessors == 0; }
  29. void addPred()
  30. {
  31. numPredecessors.fetch_add(1);
  32. }
  33. // Return true if this is now available to execute.
  34. bool notePredDone()
  35. {
  36. return numPredecessors.fetch_add(-1) == 1;
  37. }
  38. CTask * checkNextTask()
  39. {
  40. return nullptr;
  41. }
  42. //Set an exception (if one has not already been set), which will be thrown after waiting is complete
  43. void setException(IException * e);
  44. bool hasException() const { return exception != nullptr; }
  45. protected:
  46. CTask * next = nullptr;
  47. CTask * prev = nullptr;
  48. std::atomic<unsigned> numPredecessors;
  49. std::atomic<IException *> exception{nullptr};
  50. };
  51. //---------------------------------------------------------------------------------------------------------------------
  52. interface ITaskScheduler : public IInterface
  53. {
  54. public:
  55. virtual void enqueueOwnedTask(CTask * ownedTask) = 0;
  56. virtual unsigned numProcessors() const = 0;
  57. };
  58. // Functions to provide schedulers for tasks with different characteristics.
  59. // queryTaskScheduler()
  60. // - for tasks that should be non-blocking and reasonably fine-grained. Number of active tasks never exceeds the number of cores.
  61. // queryIOTaskScheduler()
  62. // - for tasks that could be blocked by io, but not for long periods. Number of active tasks may be higher than number of cores.
  63. extern jlib_decl ITaskScheduler & queryTaskScheduler();
  64. extern jlib_decl ITaskScheduler & queryIOTaskScheduler();
  65. // Future - a scheduler for periodic tasks might be useful
  66. //---------------------------------------------------------------------------------------------------------------------
  67. //MORE: This can probably be private within the cpp file (and enqueue can become non-virtual).
  68. class jlib_decl ATaskProcessor : public Thread
  69. {
  70. public:
  71. virtual void enqueueOwnedChildTask(CTask * ownedTask) = 0;
  72. };
  73. extern jlib_decl ATaskProcessor * queryCurrentTaskProcessor();
  74. //---------------------------------------------------------------------------------------------------------------------
  75. extern jlib_decl void notifyPredDone(CTask * successor);
  76. extern jlib_decl void notifyPredDone(Owned<CTask> && successor);
  77. extern jlib_decl void enqueueOwnedTask(ITaskScheduler & scheduler, CTask * ownedTask);
  78. //---------------------------------------------------------------------------------------------------------------------
  79. // Helper task implementations
  80. //---------------------------------------------------------------------------------------------------------------------
  81. // A task with a successor, which automatically manages the predecessor count for the successor task
  82. // return checkNextTask() from the execute method of the task when it is complete.
  83. class jlib_decl CPredecessorTask : public CTask
  84. {
  85. public:
  86. CPredecessorTask(unsigned _numPred, CTask * _successor) : CTask(_numPred), successor(_successor)
  87. {
  88. if (successor)
  89. successor->addPred();
  90. }
  91. CTask * checkNextTask()
  92. {
  93. if (successor)
  94. {
  95. if (successor->notePredDone())
  96. return successor.getClear();
  97. }
  98. return nullptr;
  99. }
  100. protected:
  101. Linked<CTask> successor; // may be cleared once this task is complete
  102. };
  103. //---------------------------------------------------------------------------------------------------------------------
  104. // A helpful utility class which can be used as a successor for other tasks, and will signal a semaphore once all
  105. // the preceeding tasks have completed. Allows a sort or similar with nested tasks to wait until all work is complete.
  106. // NB: Always allocate this on the heap, otherwise it can go out of scope before execute() returns causing chaos!
  107. class jlib_decl CCompletionTask final : public CTask
  108. {
  109. public:
  110. CCompletionTask(unsigned _numPred, ITaskScheduler & _scheduler) : CTask(_numPred), scheduler(_scheduler) {}
  111. ~CCompletionTask() { ::Release(exception.load()); }
  112. virtual CTask * execute() override
  113. {
  114. sem.signal();
  115. return nullptr;
  116. }
  117. // Execute a function as a child task - decAndWait() will wait for completion
  118. void spawn(std::function<void ()> func);
  119. //Called when main thread has completed - decrements the predecessor count, and waits for completion
  120. void decAndWait();
  121. protected:
  122. ITaskScheduler & scheduler;
  123. Semaphore sem;
  124. };
  125. // A class used by CCompletionTask to implement spawn
  126. class jlib_decl CFunctionTask final : public CPredecessorTask
  127. {
  128. public:
  129. CFunctionTask(std::function<void ()> _func, CTask * _successor);
  130. virtual CTask * execute() override;
  131. protected:
  132. std::function<void ()> func;
  133. };
  134. #endif