// thorsort.cpp
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #include "platform.h"
  14. #include "thorsort.hpp"
  15. #include "jset.hpp"
  16. #include "jlog.hpp"
  17. #include "errorlist.h"
  18. #include <exception>
  19. #ifdef _USE_TBB
  20. #include "tbb/task.h"
  21. #include "tbb/task_scheduler_init.h"
  22. #include "tbb/parallel_sort.h"
  23. #endif
  24. #ifdef _DEBUG
  25. // #define PARANOID
  26. //#define TESTPARSORT
  27. //#define MCMERGESTATS
  28. #endif
  29. //#define PARANOID_PARTITION
  30. //#define TRACE_PARTITION
  31. #define PARALLEL_GRANULARITY 1024
  32. //---------------------------------------------------------------------------
  33. // tbb versions of the quick sort to provide a useful base comparison
  34. class TbbCompareWrapper
  35. {
  36. public:
  37. TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
  38. bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
  39. const ICompare & compare;
  40. };
  41. class TbbCompareIndirectWrapper
  42. {
  43. public:
  44. TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
  45. bool operator()(void * * const & l, void * * const & r) const
  46. {
  47. int ret = compare.docompare(*l,*r);
  48. if (ret==0)
  49. {
  50. if (l < r)
  51. return true;
  52. else
  53. return false;
  54. }
  55. return (ret < 0);
  56. }
  57. const ICompare & compare;
  58. };
// Sort an array of n row pointers using TBB's parallel quicksort.
// NOT stable - equal rows may appear in any order.
// Throws if TBB support was not compiled in; any std::exception escaping
// TBB is converted into a jlib string exception.
void tbbqsortvec(void **a, size_t n, const ICompare & compare)
{
    try
    {
#ifdef _USE_TBB
        TbbCompareWrapper tbbcompare(compare);
        tbb::parallel_sort(a, a + n, tbbcompare);
#else
        throwUnexpectedX("TBB quicksort not available");
#endif
    }
    catch (const std::exception & e)
    {
        throw makeStringExceptionV(ERRORID_UNKNOWN, "TBB exception: %s", e.what());
    }
}
  75. void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
  76. {
  77. try
  78. {
  79. #ifdef _USE_TBB
  80. void * * * rowsAsIndex = (void * * *)rows;
  81. memcpy_iflen(temp, rows, n * sizeof(void*));
  82. for(unsigned i=0; i<n; ++i)
  83. rowsAsIndex[i] = temp+i;
  84. TbbCompareIndirectWrapper tbbcompare(compare);
  85. tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
  86. //I'm sure this violates the aliasing rules...
  87. for(unsigned i=0; i<n; ++i)
  88. rows[i] = *rowsAsIndex[i];
  89. #else
  90. throwUnexpectedX("TBB quicksort not available");
  91. #endif
  92. }
  93. catch (const std::exception & e)
  94. {
  95. throw makeStringExceptionV(ERRORID_UNKNOWN, "TBB exception: %s", e.what());
  96. }
  97. }
//-----------------------------------------------------------------------------------------------------------------------------

// Merge two sorted partitions ret1[0..n1) and ret2[0..n2) into result.
// Preconditions: n1 != 0 and n2 != 0 (both streams are dereferenced before any
// count is checked).  Equal rows are taken from the left partition first, so
// the merge is stable.  Returns result.
inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
{
    void * * tgt = result;
    for (;;)
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            *tgt++ = *ret1++;
            if (--n1 == 0)
            {
                //There must be at least one row in the right partition - copy any that remain
                do
                {
                    *tgt++ = *ret2++;
                } while (--n2);
                return result;
            }
        }
        else
        {
            *tgt++ = *ret2++;
            if (--n2 == 0)
            {
                //There must be at least one row in the left partition - copy any that remain
                do
                {
                    *tgt++ = *ret1++;
                } while (--n1);
                return result;
            }
        }
    }
}
// Merge only the FIRST n rows of the combination of the sorted partitions
// ret1[0..n1) and ret2[0..n2) into result.  Used when a merge is divided
// between tasks: a forward task produces the first n rows while a reverse
// task (mergePartitionsRev) produces the remainder.
// Preconditions: n1 != 0 and n2 != 0 on entry; n <= n1 + n2.
// Stable: equal rows are taken from the left partition first.  Returns result.
inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result;
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            *tgt++ = *ret1++;
            if (--n1 == 0)
            {
                //Left partition exhausted - the rest of the quota comes from the right
                while (n--)
                {
                    *tgt++ = *ret2++;
                }
                return result;
            }
        }
        else
        {
            *tgt++ = *ret2++;
            if (--n2 == 0)
            {
                //Right partition exhausted - the rest of the quota comes from the left
                while (n--)
                {
                    *tgt++ = *ret1++;
                }
                return result;
            }
        }
    }
    return result;
}
  164. inline void clonePartition(void * * result, size_t n, void * * src)
  165. {
  166. void * * tgt = result;
  167. while (n--)
  168. *tgt++ = *src++;
  169. }
// Produce the LAST n rows of the merge of the sorted partitions ret1[0..n1)
// and ret2[0..n2), working backwards from the highest elements.  result must
// point at the start of the full n1+n2 row output area; rows are written to
// its tail.  Preconditions: n1 != 0 and n2 != 0 on entry; n <= n1 + n2.
// When walking backwards an equal pair emits the RIGHT row first, which is
// exactly the order the stable forward merge would have produced.
inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result+n1+n2-1;     // last slot of the merged output
    ret1 += (n1-1);                    // highest remaining row of each input
    ret2 += (n2-1);
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            //left <= right => right is the larger, so it is emitted first from the back
            *tgt-- = *ret2--;
            if (--n2 == 0)
            {
                //There must be at least one row in the left partition - copy any that remain
                while (n--)
                {
                    *tgt-- = *ret1--;
                }
                return result;
            }
        }
        else
        {
            *tgt-- = *ret1--;
            if (--n1 == 0)
            {
                //Left partition exhausted - the rest of the quota comes from the right
                while (n--)
                {
                    *tgt-- = *ret2--;
                }
                return result;
            }
        }
    }
    return result;
}
// Recursive, stable, single-threaded merge sort of rows[0..n).
// tmp is a scratch array of at least n pointers.  The output buffer alternates
// with recursion depth (even depth -> rows, odd depth -> tmp) so the two
// half-sorts always deliver their results contiguously in the buffer the
// parent merges from, and a call at even depth leaves the answer in rows.
// Returns a pointer to the buffer holding the sorted run.
static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
{
    void * * result = (depth & 1) ? tmp : rows;
    //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
    //and in performance testing it executed marginally more quickly
    if (n <= 2)
    {
        //Check for n == 1, but compare against 2 to avoid another comparison
        if (n < 2)
        {
            if (result != rows)
                result[0] = rows[0];
        }
        else
        {
            // two rows: emit in order, keeping equal rows in input order (stable)
            void * left = rows[0];
            void * right = rows[1];
            if (compare.docompare(left, right) <= 0)
            {
                result[0] = left;
                result[1] = right;
            }
            else
            {
                result[0] = right;
                result[1] = left;
            }
        }
        return result;
    }

    size_t n1 = (n+1)/2;
    size_t n2 = n - n1;
    //Each half sorts into the opposite buffer, so the two runs are adjacent there
    void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
    void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
    dbgassertex(ret2 == ret1 + n1);
    dbgassertex(ret2 != result);
    return mergePartitions(compare, result, n1, ret1, n2, ret2);
}
  243. void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
  244. {
  245. if (n <= 1)
  246. return;
  247. mergeSort(rows, n, compare, temp, 0);
  248. }
  249. //=========================================================================
  250. #ifdef _USE_TBB
// Number of sample points taken per partition when choosing merge split positions.
static const unsigned numPartitionSamples = 3;
//These constants are probably architecture and number of core dependent
static const size_t singleThreadedMSortThreshold = 2000;    // below this the serial merge sort is used
static const size_t multiThreadedBlockThreshold = 64; // must be at least 2!
using tbb::task;
  256. class TbbParallelMergeSorter
  257. {
  258. class SplitTask : public tbb::task
  259. {
  260. public:
  261. SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
  262. {
  263. }
  264. virtual task * execute()
  265. {
  266. if (next1->decrement_ref_count() == 0)
  267. spawn(*next1);
  268. if (next2->decrement_ref_count() == 0)
  269. return next2;
  270. return NULL;
  271. }
  272. protected:
  273. task * next1;
  274. task * next2;
  275. };
  276. class BisectTask : public tbb::task
  277. {
  278. public:
  279. BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
  280. : sorter(_sorter), rows(_rows), temp(_temp), next(_next), n(_n), depth(_depth)
  281. {
  282. }
  283. virtual task * execute()
  284. {
  285. for (;;)
  286. {
  287. //On entry next is assumed to be used once by this function
  288. if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
  289. {
  290. //Create a new task rather than calling sort directly, so that the successor is set up correctly
  291. //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
  292. task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
  293. return sort;
  294. }
  295. void * * result = (depth & 1) ? temp : rows;
  296. void * * src = (depth & 1) ? rows : temp;
  297. size_t n1 = (n+1)/2;
  298. size_t n2 = n-n1;
  299. task * mergeTask;
  300. if (depth < sorter.parallelMergeDepth)
  301. {
  302. unsigned partitions = sorter.numPartitionCores() >> depth;
  303. if (partitions > 1)
  304. {
  305. PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
  306. for (unsigned i=0; i < partitions; i++)
  307. {
  308. MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
  309. mergeFwdTask->set_ref_count(1);
  310. MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
  311. mergeRevTask->set_ref_count(1);
  312. splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
  313. }
  314. next->decrement_ref_count();
  315. mergeTask = splitTask;
  316. }
  317. else
  318. {
  319. task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
  320. mergeFwdTask->set_ref_count(1);
  321. task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
  322. mergeRevTask->set_ref_count(1);
  323. mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
  324. }
  325. }
  326. else
  327. {
  328. mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
  329. }
  330. mergeTask->set_ref_count(2);
  331. task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
  332. spawn(*bisectRightTask);
  333. //recurse directly on the left side rather than creating a new task
  334. n = n1;
  335. depth = depth+1;
  336. next = mergeTask;
  337. }
  338. }
  339. protected:
  340. TbbParallelMergeSorter & sorter;
  341. void ** rows;
  342. void ** temp;
  343. task * next;
  344. size_t n;
  345. unsigned depth;
  346. };
  347. class SubSortTask : public tbb::task
  348. {
  349. public:
  350. SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
  351. : sorter(_sorter), rows(_rows), temp(_temp), n(_n), depth(_depth)
  352. {
  353. }
  354. virtual task * execute()
  355. {
  356. mergeSort(rows, n, sorter.compare, temp, depth);
  357. return NULL;
  358. }
  359. protected:
  360. TbbParallelMergeSorter & sorter;
  361. void ** rows;
  362. void ** temp;
  363. size_t n;
  364. unsigned depth;
  365. };
  366. class MergeTask : public tbb::task
  367. {
  368. public:
  369. MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
  370. : compare(_compare),result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
  371. {
  372. }
  373. virtual task * execute()
  374. {
  375. //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
  376. if (n1 == 0)
  377. {
  378. assertex(n <= n2);
  379. clonePartition(result, n, src2);
  380. }
  381. else if (n2 == 0)
  382. {
  383. assertex(n <= n1);
  384. clonePartition(result, n, src1);
  385. }
  386. else
  387. mergePartitions(compare, result, n1, src1, n2, src2, n);
  388. return NULL;
  389. }
  390. void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
  391. {
  392. src1 += deltaLeft;
  393. n1 = numLeft;
  394. src2 += deltaRight;
  395. n2 = numRight;
  396. result += (deltaLeft + deltaRight);
  397. n = num;
  398. }
  399. protected:
  400. const ICompare & compare;
  401. void * * result;
  402. void * * src1;
  403. void * * src2;
  404. size_t n1;
  405. size_t n2;
  406. size_t n;
  407. };
  408. class MergeRevTask : public MergeTask
  409. {
  410. public:
  411. MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
  412. : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
  413. {
  414. }
  415. virtual task * execute()
  416. {
  417. if (n1 == 0)
  418. {
  419. assertex(n <= n2);
  420. //This is a reverse merge, so copy n from the end of the input
  421. unsigned delta = n2 - n;
  422. clonePartition(result + delta, n, src2 + delta);
  423. }
  424. else if (n2 == 0)
  425. {
  426. assertex(n <= n1);
  427. unsigned delta = n1 - n;
  428. clonePartition(result + delta, n, src1 + delta);
  429. }
  430. else
  431. mergePartitionsRev(compare, result, n1, src1, n2, src2, n);
  432. return NULL;
  433. }
  434. };
  435. class PartitionSplitTask : public tbb::task
  436. {
  437. public:
  438. PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
  439. : compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
  440. {
  441. //These could be local variables in calculatePartitions(), but placed here to simplify cleanup. (Should consider using alloca)
  442. posLeft = new size_t[numPartitions+1];
  443. posRight = new size_t[numPartitions+1];
  444. tasks = new MergeTask *[numPartitions*2];
  445. for (unsigned i=0; i < numPartitions*2; i++)
  446. tasks[i] = NULL;
  447. }
  448. ~PartitionSplitTask()
  449. {
  450. delete [] posLeft;
  451. delete [] posRight;
  452. delete [] tasks;
  453. }
  454. void calculatePartitions()
  455. {
  456. #ifdef PARANOID_PARTITION
  457. {
  458. for (unsigned ix=1; ix<n1; ix++)
  459. if (compare.docompare(src1[ix-1], src1[ix]) > 0)
  460. DBGLOG("Failure left@%u", ix);
  461. }
  462. {
  463. for (unsigned ix=1; ix<n2; ix++)
  464. if (compare.docompare(src2[ix-1], src2[ix]) > 0)
  465. DBGLOG("Failure right@%u", ix);
  466. }
  467. #endif
  468. //If dividing into P parts, select S*P-1 even points from each side.
  469. unsigned numSamples = numPartitionSamples*numPartitions-1;
  470. QuantilePositionIterator iterLeft(n1, numSamples+1, false);
  471. QuantilePositionIterator iterRight(n2, numSamples+1, false);
  472. iterLeft.first();
  473. iterRight.first();
  474. size_t prevLeft = 0;
  475. size_t prevRight =0;
  476. posLeft[0] = 0;
  477. posRight[0] = 0;
  478. //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
  479. //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
  480. //=> pick samples [0, 2*numSamples, 4*numSamples ...]
  481. //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
  482. for (unsigned part = 1; part < numPartitions; part++)
  483. {
  484. unsigned numToSkip = numPartitionSamples*2;
  485. if (part == 1)
  486. numToSkip++;
  487. for (unsigned skip=numToSkip; skip-- != 0; )
  488. {
  489. size_t leftPos = iterLeft.get();
  490. size_t rightPos = iterRight.get();
  491. int c;
  492. if (leftPos == n1)
  493. c = +1;
  494. else if (rightPos == n2)
  495. c = -1;
  496. else
  497. c = compare.docompare(src1[leftPos], src2[rightPos]);
  498. if (skip == 0)
  499. {
  500. if (c <= 0)
  501. {
  502. //value in left is smallest. Find the position of the first value >= the left value
  503. //do not include it since there may be more left matches to merge first
  504. posLeft[part] = leftPos;
  505. size_t matchRight = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
  506. posRight[part] = matchRight;
  507. prevRight = matchRight; // potentially reduce the search range next time
  508. }
  509. else
  510. {
  511. size_t matchLeft = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
  512. posLeft[part] = matchLeft;
  513. posRight[part] = rightPos;
  514. prevLeft = matchLeft; // potentially reduce the search range next time
  515. }
  516. }
  517. if (c <= 0)
  518. {
  519. iterLeft.next();
  520. prevLeft = leftPos;
  521. }
  522. else
  523. {
  524. iterRight.next();
  525. prevRight = rightPos;
  526. }
  527. }
  528. }
  529. posLeft[numPartitions] = n1;
  530. posRight[numPartitions] = n2;
  531. #ifdef TRACE_PARTITION
  532. DBGLOG("%d,%d -> {", (unsigned)n1, (unsigned)n2);
  533. #endif
  534. for (unsigned i= 0; i < numPartitions; i++)
  535. {
  536. size_t start = posLeft[i] + posRight[i];
  537. size_t end = posLeft[i+1] + posRight[i+1];
  538. size_t num = end - start;
  539. size_t numFwd = (num+1)/2;
  540. #ifdef TRACE_PARTITION
  541. DBGLOG(" ([%d..%d],[%d..%d] %d,%d = %d)",
  542. (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
  543. (unsigned)start, (unsigned)end, (unsigned)num);
  544. #endif
  545. MergeTask & mergeFwdTask = *tasks[i*2];
  546. MergeTask & mergeRevTask = *tasks[i*2+1];
  547. mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
  548. posRight[i], posRight[i+1]-posRight[i],
  549. numFwd);
  550. mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
  551. posRight[i], posRight[i+1]-posRight[i],
  552. num-numFwd);
  553. }
  554. }
  555. virtual task * execute()
  556. {
  557. calculatePartitions();
  558. for (unsigned i=0; i < numPartitions*2; i++)
  559. {
  560. if (tasks[i]->decrement_ref_count() == 0)
  561. spawn(*tasks[i]);
  562. }
  563. return NULL;
  564. }
  565. void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
  566. {
  567. tasks[i*2] = fwd;
  568. tasks[i*2+1] = rev;
  569. }
  570. protected:
  571. size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
  572. {
  573. if (low == high)
  574. return low;
  575. while (high - low > 1)
  576. {
  577. size_t mid = low + (high - low) / 2;
  578. if (compare.docompare(rows[mid], seek) < 0)
  579. low = mid;
  580. else
  581. high = mid;
  582. }
  583. if (compare.docompare(rows[low], seek) < 0)
  584. return low+1;
  585. return low;
  586. }
  587. size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
  588. {
  589. if (low == high)
  590. return low;
  591. while (high - low > 1)
  592. {
  593. size_t mid = low + (high - low) / 2;
  594. if (compare.docompare(rows[mid], seek) <= 0)
  595. low = mid;
  596. else
  597. high = mid;
  598. }
  599. if (compare.docompare(rows[low], seek) <= 0)
  600. return low+1;
  601. return low;
  602. }
  603. protected:
  604. const ICompare & compare;
  605. unsigned numPartitions;
  606. size_t n1;
  607. size_t n2;
  608. void * * src1;
  609. void * * src2;
  610. size_t * posLeft;
  611. size_t * posRight;
  612. MergeTask * * tasks;
  613. };
  614. public:
  615. TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
  616. {
  617. //The following constants control the number of iterations to be performed in parallel.
  618. //The sort is split into more parts than there are cpus so that the effect of delays from one task tend to be evened out.
  619. //The following constants should possibly be tuned on each platform. The following gave a good balance on a 2x8way xeon
  620. const unsigned extraBisectDepth = 3;
  621. const unsigned extraParallelMergeDepth = 3;
  622. unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
  623. unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
  624. assertex(numCpus <= (1U << ln2NumCpus));
  625. //Merge in parallel once it is likely to be beneficial
  626. parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
  627. //Aim to execute in parallel until the width is 8*the maximum number of parallel task
  628. singleThreadDepth = ln2NumCpus + extraBisectDepth;
  629. partitionCores = numCpus / 2;
  630. }
  631. unsigned numPartitionCores() const { return partitionCores; }
  632. void sortRoot(void ** rows, size_t n, void ** temp)
  633. {
  634. task * end = new (task::allocate_root()) tbb::empty_task();
  635. end->set_ref_count(1+1);
  636. task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
  637. end->spawn(*task);
  638. end->wait_for_all();
  639. end->destroy(*end);
  640. }
  641. public:
  642. const ICompare & compare;
  643. unsigned singleThreadDepth;
  644. unsigned parallelMergeDepth;
  645. unsigned partitionCores;
  646. void * * baseRows;
  647. };
  648. //-------------------------------------------------------------------------------------------------------------------
  649. void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
  650. {
  651. if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
  652. {
  653. msortvecstableinplace(rows, n, compare, temp);
  654. return;
  655. }
  656. try
  657. {
  658. TbbParallelMergeSorter sorter(rows, compare);
  659. sorter.sortRoot(rows, n, temp);
  660. }
  661. catch (const std::exception & e)
  662. {
  663. throw makeStringExceptionV(ERRORID_UNKNOWN, "TBB exception: %s", e.what());
  664. }
  665. }
  666. #else
// Without TBB, fall back to the parallel stable quicksort implementation.
// NOTE(review): n is truncated to size32_t here - looks like the fallback
// cannot handle more than 4G rows; confirm against parqsortvecstableinplace.
void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
{
    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
}
  671. #endif