jsort2.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
  25. #include "jtask.hpp"
  // Optional diagnostic switches.  All of them are commented out, i.e. disabled.
  // Of these, only TESTPARSORT is referenced in this part of the file: when defined it makes
  // sortParallel() always return true and enables the post-sort order check in taskqsortvec().
  26. #ifdef _DEBUG
  27. // #define PARANOID
  28. //#define TESTPARSORT
  29. //#define MCMERGESTATS
  30. #endif
  31. //#define PARANOID_PARTITION
  32. //#define TRACE_PARTITION
  // Tuning constants for the task based sorts below:
  //  - PARALLEL_GRANULARITY: cTaskQSortBase::doSubSort() keeps partitioning while a range holds more
  //    than this many elements; smaller ranges go straight to the serial sort.
  //  - PARALLEL_THRESHOLD: inputs with this many elements or fewer are always sorted serially.
  33. #define PARALLEL_GRANULARITY 512 // Worth creating more tasks up to this point, should really base on number of threads and recursion depth
  34. #define PARALLEL_THRESHOLD 8096 // Threshold for it being worth sorting in parallel
  typedef void * ELEMENT;
  typedef void ** _VECTOR; // bit messy but allow to be redefined later
  #define VECTOR _VECTOR

  // Exchange the two row pointers held in the slots addressed by a and b.
  static inline void swap(VECTOR a, VECTOR b)
  {
      ELEMENT const held = *b;
      *b = *a;
      *a = held;
  }
  #define SWAP swap
  40. static bool sortParallel()
  41. {
  42. #ifdef TESTPARSORT
  43. return true; // to test
  44. #else
  45. unsigned numCPUs = getAffinityCpus();
  46. return (numCPUs>1);
  47. #endif
  48. }
  49. //---------------------------------------------------------------------------
  50. #define CMP(a,b) (compare.docompare(*(a),*(b)))
  51. #define MED3(a,b,c) med3ic(a,b,c,compare)
  52. #define RECURSE(a,b) qsortvec(a, b, compare)
  53. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  54. {
  55. return CMP(a, b) < 0 ?
  56. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  57. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  58. }
  59. class cTaskQSortBase
  60. {
  61. friend class CSubSortTask;
  62. ITaskScheduler & taskScheduler;
  63. Owned<CCompletionTask> finished;
  64. class CSubSortTask : public CTask
  65. {
  66. public:
  67. CSubSortTask(cTaskQSortBase * _parent, unsigned _start, unsigned _num) :
  68. CTask(0), parent(_parent), start(_start), num(_num)
  69. {
  70. }
  71. virtual CTask * execute() override
  72. {
  73. //MORE: Does this need a memory fence to ensure that writes from other threads are updated in the cache?
  74. parent->doSubSort(start, num);
  75. return checkNextTask();
  76. }
  77. protected:
  78. cTaskQSortBase * parent;
  79. unsigned start;
  80. unsigned num;
  81. };
  82. public:
  83. cTaskQSortBase() : taskScheduler(queryTaskScheduler()), finished(new CCompletionTask(1, queryTaskScheduler()))
  84. {
  85. }
  86. void sort(unsigned n)
  87. {
  88. enqueueSort(0, n);
  89. finished->decAndWait();
  90. }
  91. private:
  92. //MORE: Not really sure what this should do...
  93. void abort()
  94. {
  95. notifyPredDone(finished);
  96. }
  97. void doSubSort(unsigned s, unsigned n)
  98. {
  99. while (n > PARALLEL_GRANULARITY)
  100. {
  101. unsigned r1;
  102. unsigned r2;
  103. partition(s, n, r1, r2);
  104. unsigned n2 = n+s-r2;
  105. if (r1==s) {
  106. n = n2;
  107. s = r2;
  108. }
  109. else {
  110. if (n2!=0)
  111. enqueueSort(r2, n2);
  112. n = r1-s;
  113. }
  114. }
  115. serialsort(s,n);
  116. notifyPredDone(finished);
  117. }
  118. void enqueueSort(unsigned from, unsigned num)
  119. {
  120. CSubSortTask * task = new CSubSortTask(this, from, num);
  121. finished->addPred();
  122. enqueueOwnedTask(taskScheduler, task);
  123. }
  124. public:
  125. virtual void serialsort(unsigned from, unsigned len)=0;
  126. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  127. };
  /*
   * DOPARTITION - shared body for the partition(s, n, r1, r2) implementations.
   *
   * Expects array, s, n, r1, r2 and compare to be in scope, together with whichever VECTOR, SWAP, CMP and
   * MED3 definitions are current at the point of expansion.  It rearranges array[s .. s+n) around a pivot
   * into three runs:
   *     [s, r1)      elements that compare less than the pivot
   *     [r1, r2)     the pivot and the elements that compare equal to it
   *     [r2, s+n)    elements that compare greater than the pivot
   * r1 and r2 are offsets relative to array (not relative to s).
   */
  #define DOPARTITION \
      VECTOR base = array + s; \
      VECTOR candLow = base; \
      VECTOR candMid = base + (n / 2); \
      VECTOR candHigh = base + (n - 1); \
      /* for larger ranges each candidate becomes the median of three samples spaced n/8 apart */ \
      if (n > 40) { \
          unsigned step = (n / 8); \
          candLow = MED3(candLow, candLow + step, candLow + 2 * step); \
          candMid = MED3(candMid - step, candMid, candMid + step); \
          candHigh = MED3(candHigh - 2 * step, candHigh - step, candHigh); \
      } \
      candMid = MED3(candLow, candMid, candHigh); \
      /* park the chosen pivot in the first slot */ \
      SWAP(base, candMid); \
      VECTOR eqLow = base + 1; \
      VECTOR scanUp = eqLow; \
      VECTOR scanDown = base + (n - 1); \
      VECTOR eqHigh = scanDown; \
      int order; \
      while (true) { \
          /* walk up over elements <= pivot, collecting the equal ones at the low end */ \
          for (; scanUp <= scanDown && (order = CMP(scanUp, base)) <= 0; scanUp++) { \
              if (order == 0) { \
                  SWAP(eqLow, scanUp); \
                  eqLow++; \
              } \
          } \
          /* walk down over elements >= pivot, collecting the equal ones at the high end */ \
          for (; scanUp <= scanDown && (order = CMP(scanDown, base)) >= 0; scanDown--) { \
              if (order == 0) { \
                  SWAP(scanDown, eqHigh); \
                  eqHigh--; \
              } \
          } \
          if (scanUp > scanDown) \
              break; \
          /* both scans stopped on a misplaced element: exchange them and continue */ \
          SWAP(scanUp, scanDown); \
          scanUp++; \
          scanDown--; \
      } \
      VECTOR pastEnd = base + n; \
      /* move the low-end equal run so that it sits directly after the smaller elements */ \
      int moves = MIN(eqLow - base, scanUp - eqLow); \
      VECTOR swapLeft = base; \
      VECTOR swapRight = scanUp - moves; \
      for (; moves != 0; moves--) { \
          SWAP(swapLeft, swapRight); \
          swapLeft++; \
          swapRight++; \
      } \
      /* move the high-end equal run so that it sits directly before the larger elements */ \
      moves = MIN(eqHigh - scanDown, pastEnd - eqHigh - 1); \
      swapLeft = scanUp; \
      swapRight = pastEnd - moves; \
      for (; moves != 0; moves--) { \
          SWAP(swapLeft, swapRight); \
          swapLeft++; \
          swapRight++; \
      } \
      r1 = (scanUp - eqLow) + s; \
      r2 = n - (eqHigh - scanDown) + s;
  182. class cTaskQSort: public cTaskQSortBase
  183. {
  184. VECTOR array;
  185. const ICompare &compare;
  186. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  187. {
  188. DOPARTITION
  189. }
  190. void serialsort(unsigned from, unsigned len)
  191. {
  192. qsortvec(array+from,len,compare);
  193. }
  194. public:
  195. cTaskQSort(VECTOR _a,const ICompare &_compare)
  196. : compare(_compare)
  197. {
  198. array = _a;
  199. }
  200. };
  201. void taskqsortvec(void **a, size32_t n, const ICompare & compare)
  202. {
  203. if ((n<=PARALLEL_THRESHOLD)||!sortParallel()) {
  204. qsortvec(a,n,compare);
  205. return;
  206. }
  207. cTaskQSort sorter(a,compare);
  208. sorter.sort(n);
  209. #ifdef TESTPARSORT
  210. for (unsigned i=1;i<n;i++)
  211. if (compare.docompare(a[i-1],a[i])>0)
  212. IERRLOG("taskqsortvec failed %d",i);
  213. #endif
  214. }
  #undef CMP
  #undef MED3
  #undef RECURSE
  //---------------------------------------------------------------------------
  // Stable variant: the elements being sorted are pointers to row slots rather than the rows themselves.
  #undef VECTOR
  #undef SWAP
  typedef void *** _IVECTOR;
  #define VECTOR _IVECTOR

  // Exchange the two index entries (pointers to row slots) addressed by a and b.
  static inline void swapind(VECTOR a, VECTOR b)
  {
      void ** const held = *b;
      *b = *a;
      *a = held;
  }
  #define SWAP swapind
  #define CMP(a,b) cmpicindstable(a,b,compare)
  226. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  227. {
  228. int ret = compare.docompare(**a,**b);
  229. if (ret==0)
  230. {
  231. if (*a>*b)
  232. ret = 1;
  233. else if (*a<*b)
  234. ret = -1;
  235. }
  236. return ret;
  237. }
  238. #define MED3(a,b,c) med3ic(a,b,c,compare)
  239. #define RECURSE(a,b) doqsortvecstable(a, b, compare)
  240. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  241. {
  242. return CMP(a, b) < 0 ?
  243. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  244. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  245. }
  // Non-task sort of n index entries (pointers to row slots) starting at a.
  // The function body is not written here: it is supplied by the #include of jsort2.inc on the next line.
  // NOTE(review): the VECTOR, SWAP, CMP, MED3 and RECURSE macros defined above are presumably consumed by
  // that include - confirm against jsort2.inc.
  246. static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
  247. #include "jsort2.inc"
  248. class cTaskQSortStable: public cTaskQSortBase
  249. {
  250. VECTOR array;
  251. const ICompare &compare;
  252. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  253. {
  254. DOPARTITION
  255. }
  256. void serialsort(unsigned from, unsigned len)
  257. {
  258. doqsortvecstable(array+from,len,compare);
  259. }
  260. public:
  261. cTaskQSortStable(VECTOR _a,const ICompare &_compare)
  262. : cTaskQSortBase(),compare(_compare)
  263. {
  264. array = _a;
  265. }
  266. };
  267. #undef CMP
  268. #undef CMP1
  269. #undef MED3
  270. #undef RECURSE
  271. #undef VECTOR
  272. static void taskqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index)
  273. {
  274. for(unsigned i=0; i<n; ++i)
  275. index[i] = rows+i;
  276. if ((n<=PARALLEL_THRESHOLD)||!sortParallel()) {
  277. doqsortvecstable(index,n,compare);
  278. return;
  279. }
  280. cTaskQSortStable sorter(index,compare);
  281. sorter.sort(n);
  282. }
  283. void taskqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
  284. {
  285. memcpy(temp, rows, n * sizeof(void*));
  286. taskqsortvecstable(temp, n, compare, (void * * *)rows);
  287. //I'm sure this violates the aliasing rules...
  288. void * * * rowsAsIndex = (void * * *)rows;
  289. for(size32_t i=0; i<n; ++i)
  290. rows[i] = *rowsAsIndex[i];
  291. }