jsort.cpp

  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
  25. #ifdef _USE_TBB
  26. #include "tbb/task.h"
  27. #include "tbb/task_scheduler_init.h"
  28. #include "tbb/parallel_sort.h"
  29. #endif
  30. #ifdef _DEBUG
  31. // #define PARANOID
  32. // #define DOUBLE_COMPARE
  33. //#define TESTPARSORT
  34. //#define MCMERGESTATS
  35. #endif
  36. //#define PARANOID_PARTITION
  37. //#define TRACE_PARTITION
  38. #define PARALLEL_GRANULARITY 1024
  39. static const unsigned numPartitionSamples = 3;
  40. static bool sortParallel(unsigned &numcpus)
  41. {
  42. static unsigned numCPUs = 0;
  43. if (numCPUs==0) {
  44. numCPUs = getAffinityCpus();
  45. }
  46. if ((numcpus==0)||(numcpus>numCPUs))
  47. numcpus = numCPUs;
  48. #ifdef TESTPARSORT
  49. numcpus = 2;
  50. return true; // to test
  51. #endif
  52. return (numcpus>1);
  53. }
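/*
 * Editorial note (not part of the original source): sortParallel() clamps the
 * caller's requested cpu count to the number of cpus the process is actually
 * affinitized to, and returns true only when more than one cpu is usable
 * (TESTPARSORT forces two).  The parallel sorts below use it as a gate, e.g.:
 *
 *     if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
 *         qsortvec(a, n, compare);             // serial fallback
 *         return;
 *     }
 *     cParQSort sorter(a, compare, numcpus-1); // numcpus-1 helper threads
 */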
  54. //define two variants of the same insertion function.
  55. #define COMPARE(search,position) compare(search,position)
  56. #define INSERT(position,search) memmove(position,search, width)
  57. void * binary_add(const void *newitem, const void *base,
  58. size32_t nmemb,
  59. size32_t width,
  60. int ( *compare)(const void *_e1, const void *_e2),
  61. bool * ItemAdded)
  62. {
  63. #include "jsort.inc"
  64. }
  65. #undef COMPARE
  66. #undef INSERT
  67. //---------------------------------------------------------------------------
  68. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  69. #define INSERT(position,search) *(const void * *)(position) = search
  70. #define NEVER_ADD
  71. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  72. size32_t nmemb,
  73. sortCompareFunction compare,
  74. bool * ItemAdded)
  75. {
  76. #define width sizeof(void*)
  77. #include "jsort.inc"
  78. #undef width
  79. }
  80. #undef NEVER_ADD
  81. #undef INSERT
  82. #undef COMPARE
  83. //---------------------------------------------------------------------------
  84. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  85. #define INSERT(position,search) *(const void * *)(position) = search
  86. #define NEVER_ADD
  87. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  88. size32_t nmemb,
  89. ICompare & compare,
  90. bool * ItemAdded)
  91. {
  92. #define width sizeof(void*)
  93. #include "jsort.inc"
  94. #undef width
  95. }
  96. #undef NEVER_ADD
  97. #undef INSERT
  98. #undef COMPARE
  99. //---------------------------------------------------------------------------
  100. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  101. #define INSERT(position,search) *(const void * *)(position) = search
  102. #define ALWAYS_ADD
  103. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  104. size32_t nmemb,
  105. sortCompareFunction compare)
  106. {
  107. #define width sizeof(void*)
  108. #include "jsort.inc"
  109. #undef width
  110. }
  111. #undef ALWAYS_ADD
  112. #undef INSERT
  113. #undef COMPARE
  114. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  115. #define INSERT(position,search) *(const void * *)(position) = search
  116. #define ALWAYS_ADD
  117. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  118. size32_t nmemb,
  119. ICompare const & compare)
  120. {
  121. #define width sizeof(void*)
  122. #include "jsort.inc"
  123. #undef width
  124. }
  125. #undef ALWAYS_ADD
  126. #undef INSERT
  127. #undef COMPARE
  128. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  129. #define INSERT(position,search) *(const void * *)(position) = search
  130. #define ALWAYS_ADD
  131. #define SEEK_LAST_MATCH
  132. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  133. size32_t nmemb,
  134. sortCompareFunction compare)
  135. {
  136. #define width sizeof(void*)
  137. #include "jsort.inc"
  138. #undef width
  139. }
  140. #undef SEEK_LAST_MATCH
  141. #undef ALWAYS_ADD
  142. #undef INSERT
  143. #undef COMPARE
  144. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  145. #define INSERT(position,search) *(const void * *)(position) = search
  146. #define ALWAYS_ADD
  147. #define SEEK_LAST_MATCH
  148. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  149. size32_t nmemb,
  150. ICompare const & compare)
  151. {
  152. #define width sizeof(void*)
  153. #include "jsort.inc"
  154. #undef width
  155. }
  156. #undef SEEK_LAST_MATCH
  157. #undef ALWAYS_ADD
  158. #undef INSERT
  159. #undef COMPARE
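/*
 * Editorial note (not part of the original source): the binary_add /
 * binary_vec_find / binary_vec_insert[_stable] variants above all share one
 * binary-search body, jsort.inc, configured through the COMPARE, INSERT,
 * width, NEVER_ADD/ALWAYS_ADD and SEEK_LAST_MATCH macros before each #include.
 * A hedged usage sketch for the stable insert (MyCompare is a hypothetical
 * ICompare implementation; vec is assumed to have room for one more entry):
 *
 *     MyCompare cmp;
 *     binary_vec_insert_stable(newRow, vec, numRows, cmp); // SEEK_LAST_MATCH keeps equal keys in arrival order
 */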
  160. //=========================================================================
  161. // optimized quicksort for array of pointers to fixed size objects
  162. typedef void * ELEMENT;
  163. typedef void ** _VECTOR; // bit messy but allow to be redefined later
  164. #define VECTOR _VECTOR
  165. static inline void swap(VECTOR a, VECTOR b) { ELEMENT t = *a; *a = *b; *b = t; }
  166. #define SWAP swap
  167. #define CMP(a,b) memcmp(*(a),*(b),es)
  168. #define MED3(a,b,c) med3a(a,b,c,es)
  169. #define RECURSE(a,b) qsortvec(a, b, es)
  170. static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
  171. {
  172. return CMP(a, b) < 0 ?
  173. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  174. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  175. }
  176. void qsortvec(void **a, size32_t n, size32_t es)
  177. #include "jsort2.inc"
  178. #undef CMP
  179. #undef MED3
  180. #undef RECURSE
  181. //---------------------------------------------------------------------------
  182. #define CMP(a,b) (compare(*(a),*(b)))
  183. #define MED3(a,b,c) med3c(a,b,c,compare)
  184. #define RECURSE(a,b) qsortvec(a, b, compare)
  185. static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
  186. {
  187. return CMP(a, b) < 0 ?
  188. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  189. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  190. }
  191. void qsortvec(void **a, size32_t n, sortCompareFunction compare)
  192. #include "jsort2.inc"
  193. #undef CMP
  194. #undef MED3
  195. #undef RECURSE
  196. //---------------------------------------------------------------------------
  197. // tbb versions of the quick sort to provide a useful base comparison
  198. class TbbCompareWrapper
  199. {
  200. public:
  201. TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
  202. bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
  203. const ICompare & compare;
  204. };
  205. class TbbCompareIndirectWrapper
  206. {
  207. public:
  208. TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
  209. bool operator()(void * * const & l, void * * const & r) const
  210. {
  211. int ret = compare.docompare(*l,*r);
  212. if (ret==0)
  213. {
  214. if (l < r)
  215. return true;
  216. else
  217. return false;
  218. }
  219. return (ret < 0);
  220. }
  221. const ICompare & compare;
  222. };
  223. void tbbqsortvec(void **a, size_t n, const ICompare & compare)
  224. {
  225. #ifdef _USE_TBB
  226. TbbCompareWrapper tbbcompare(compare);
  227. tbb::parallel_sort(a, a+n, tbbcompare);
  228. #else
  229. throwUnexpectedX("TBB quicksort not available");
  230. #endif
  231. }
  232. void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
  233. {
  234. #ifdef _USE_TBB
  235. void * * * rowsAsIndex = (void * * *)rows;
  236. memcpy(temp, rows, n * sizeof(void*));
  237. for(unsigned i=0; i<n; ++i)
  238. rowsAsIndex[i] = temp+i;
  239. TbbCompareIndirectWrapper tbbcompare(compare);
  240. tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
  241. //I'm sure this violates the aliasing rules...
  242. for(unsigned i=0; i<n; ++i)
  243. rows[i] = *rowsAsIndex[i];
  244. #else
  245. throwUnexpectedX("TBB quicksort not available");
  246. #endif
  247. }
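/*
 * Editorial note (not part of the original source): tbbqsortstable achieves a
 * stable sort by sorting pointers into the copied row array and letting
 * TbbCompareIndirectWrapper break ties on the pointer value, which encodes the
 * original order.  In outline:
 *
 *     temp : copy of the original row pointers (memcpy above)
 *     rows : reinterpreted as void*** and filled with &temp[0], &temp[1], ...
 *     parallel_sort compares **l vs **r, then l vs r on a tie
 *     rows[i] = *rowsAsIndex[i]  -> flatten back to plain row pointers
 */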
  248. //---------------------------------------------------------------------------
  249. #define CMP(a,b) (compare.docompare(*(a),*(b)))
  250. #define MED3(a,b,c) med3ic(a,b,c,compare)
  251. #define RECURSE(a,b) qsortvec(a, b, compare)
  252. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  253. {
  254. return CMP(a, b) < 0 ?
  255. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  256. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  257. }
  258. void qsortvec(void **a, size32_t n, const ICompare & compare)
  259. #include "jsort2.inc"
  260. // Parallel version (only 2 threads currently)
  261. class cParQSortBase
  262. {
  263. struct sJobItem
  264. {
  265. unsigned start;
  266. unsigned num;
  267. };
  268. NonReentrantSpinLock joblock;
  269. QueueOf<sJobItem,false> jobq;
  270. Semaphore jobqsem;
  271. unsigned waiting;
  272. unsigned numsubthreads;
  273. bool done;
  274. class cThread: public Thread
  275. {
  276. cParQSortBase *parent;
  277. public:
  278. cThread(cParQSortBase *_parent)
  279. : Thread("cParQSort")
  280. {
  281. parent = _parent;
  282. }
  283. int run()
  284. {
  285. parent->run();
  286. return 0;
  287. }
  288. } **threads;
  289. bool waitForWork(unsigned &s,unsigned &n)
  290. {
  291. NonReentrantSpinBlock block(joblock);
  292. while (!done) {
  293. sJobItem *qi = jobq.dequeue();
  294. if (qi) {
  295. s = qi->start;
  296. n = qi->num;
  297. delete qi;
  298. return true;
  299. }
300. if (waiting==numsubthreads) { // all the other threads are already waiting with no work left, so the whole sort is done - exit
  301. done = true;
  302. jobqsem.signal(waiting);
  303. break;
  304. }
  305. waiting++;
  306. NonReentrantSpinUnblock unblock(joblock);
  307. jobqsem.wait();
  308. }
  309. s = 0; // remove uninitialised variable warnings
  310. n = 0;
  311. return false;
  312. }
  313. public:
  314. cParQSortBase(unsigned _numsubthreads)
  315. {
  316. numsubthreads = _numsubthreads;
  317. done = false;
  318. waiting = 0;
  319. threads = new cThread*[numsubthreads];
  320. for (unsigned i=0;i<numsubthreads;i++)
  321. threads[i] = new cThread(this);
  322. }
  323. ~cParQSortBase()
  324. {
  325. for (unsigned i=0;i<numsubthreads;i++)
  326. threads[i]->Release();
  327. delete [] threads;
  328. }
  329. void start()
  330. {
  331. for (unsigned i=0;i<numsubthreads;i++)
  332. threads[i]->start();
  333. }
  334. void subsort(unsigned s, unsigned n)
  335. {
  336. do {
  337. sJobItem *qi;
  338. while (n>PARALLEL_GRANULARITY) {
  339. unsigned r1;
  340. unsigned r2;
  341. partition(s, n, r1, r2);
  342. unsigned n2 = n+s-r2;
  343. if (r1==s) {
  344. n = n2;
  345. s = r2;
  346. }
  347. else {
  348. if (n2!=0) {
  349. qi = new sJobItem;
  350. qi->num = n2;
  351. qi->start = r2;
  352. NonReentrantSpinBlock block(joblock);
  353. jobq.enqueue(qi);
  354. if (waiting) {
  355. jobqsem.signal(waiting);
  356. waiting = 0;
  357. }
  358. }
  359. n = r1-s;
  360. }
  361. }
  362. serialsort(s,n);
  363. NonReentrantSpinBlock block(joblock);
364. if (waiting==numsubthreads) { // this thread is idle and all the others are already waiting, so the sort is complete
  365. done = true;
  366. jobqsem.signal(waiting);
  367. break;
  368. }
  369. }
  370. while(waitForWork(s,n));
  371. }
  372. void run()
  373. {
  374. unsigned s;
  375. unsigned n;
  376. if (waitForWork(s,n))
  377. subsort(s,n);
  378. }
  379. void join()
  380. {
  381. for (unsigned i=0;i<numsubthreads;i++)
  382. threads[i]->join();
  383. }
  384. virtual void serialsort(unsigned from, unsigned len)=0;
  385. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  386. };
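/*
 * Editorial note (not part of the original source): cParQSortBase is a small
 * work-sharing scheduler.  Each participating thread repeatedly partitions its
 * current range; the right-hand part is queued as a new job (waking any
 * waiting workers) while the left-hand part is processed immediately, until a
 * range shrinks to PARALLEL_GRANULARITY elements and is handed to
 * serialsort().  The sort terminates when every thread is idle with an empty
 * job queue: "done" is set and the semaphore releases all waiters.
 */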
  387. #define DOPARTITION \
  388. VECTOR a = array+s; \
  389. VECTOR pm = a + (n / 2); \
  390. VECTOR pl = a; \
  391. VECTOR pn = a + (n - 1) ; \
  392. if (n > 40) { \
  393. unsigned d = (n / 8); \
  394. pl = MED3(pl, pl + d, pl + 2 * d); \
  395. pm = MED3(pm - d, pm, pm + d); \
  396. pn = MED3(pn - 2 * d, pn - d, pn); \
  397. } \
  398. pm = MED3(pl, pm, pn); \
  399. SWAP(a, pm); \
  400. VECTOR pa = a + 1; \
  401. VECTOR pb = pa; \
  402. VECTOR pc = a + (n - 1); \
  403. VECTOR pd = pc; \
  404. int r; \
  405. for (;;) { \
  406. while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
  407. if (r == 0) { \
  408. SWAP(pa, pb); \
  409. pa++; \
  410. } \
  411. pb++; \
  412. } \
  413. while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
  414. if (r == 0) { \
  415. SWAP(pc, pd); \
  416. pd--; \
  417. } \
  418. pc--; \
  419. } \
  420. if (pb > pc) \
  421. break; \
  422. SWAP(pb, pc); \
  423. pb++; \
  424. pc--; \
  425. } \
  426. pn = a + n; \
  427. r = MIN(pa - a, pb - pa); \
  428. VECTOR v1 = a; \
  429. VECTOR v2 = pb-r; \
  430. while (r) { \
  431. SWAP(v1,v2); v1++; v2++; r--; \
  432. }; \
  433. r = MIN(pd - pc, pn - pd - 1); \
  434. v1 = pb; \
  435. v2 = pn-r; \
  436. while (r) { \
  437. SWAP(v1,v2); v1++; v2++; r--; \
  438. }; \
  439. r1 = (pb-pa)+s; \
  440. r2 = n-(pd-pc)+s;
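/*
 * Editorial note (not part of the original source): DOPARTITION is the
 * Bentley/McIlroy style three-way partition used by both parallel quicksorts
 * below.  On return (indexes relative to the whole array), [s, r1) holds
 * elements below the pivot, [r1, r2) elements equal to it and [r2, s+n)
 * elements above it, so only the outer two ranges need further sorting.
 */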
  441. class cParQSort: public cParQSortBase
  442. {
  443. VECTOR array;
  444. const ICompare &compare;
  445. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  446. {
  447. DOPARTITION
  448. }
  449. void serialsort(unsigned from, unsigned len)
  450. {
  451. qsortvec(array+from,len,compare);
  452. }
  453. public:
  454. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  455. : cParQSortBase(_numsubthreads), compare(_compare)
  456. {
  457. array = _a;
  458. cParQSortBase::start();
  459. }
  460. };
  461. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  462. {
  463. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  464. qsortvec(a,n,compare);
  465. return;
  466. }
  467. cParQSort sorter(a,compare,numcpus-1);
  468. sorter.subsort(0,n);
  469. sorter.join();
  470. #ifdef TESTPARSORT
  471. for (unsigned i=1;i<n;i++)
  472. if (compare.docompare(a[i-1],a[i])>0)
  473. ERRLOG("parqsortvec failed %d",i);
  474. #endif
  475. }
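/*
 * Usage sketch (added for clarity; not part of the original source).  MyCompare
 * is a hypothetical ICompare implementation and rows an array of row pointers:
 *
 *     MyCompare cmp;
 *     parqsortvec(rows, numRows, cmp, 0);  // numcpus==0 => use all detected cpus
 *
 * The calling thread works alongside the numcpus-1 helper threads, and the
 * call returns only after join() completes, i.e. the array is fully sorted.
 */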
  476. #undef CMP
  477. #undef MED3
  478. #undef RECURSE
  479. //---------------------------------------------------------------------------
  480. #undef VECTOR
  481. #undef SWAP
  482. typedef void *** _IVECTOR;
  483. #define VECTOR _IVECTOR
  484. static inline void swapind(VECTOR a, VECTOR b) { void ** t = *a; *a = *b; *b = t; }
  485. #define SWAP swapind
  486. #define CMP(a,b) cmpicindstable(a,b,compare)
  487. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  488. {
  489. int ret = compare.docompare(**a,**b);
  490. if (ret==0)
  491. {
  492. if (*a>*b)
  493. ret = 1;
  494. else if (*a<*b)
  495. ret = -1;
  496. }
  497. return ret;
  498. }
  499. #define MED3(a,b,c) med3ic(a,b,c,compare)
  500. #define RECURSE(a,b) doqsortvecstable(a, b, compare)
  501. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  502. {
  503. return CMP(a, b) < 0 ?
  504. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  505. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  506. }
  507. static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
  508. #include "jsort2.inc"
  509. class cParQSortStable: public cParQSortBase
  510. {
  511. VECTOR array;
  512. const ICompare &compare;
  513. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  514. {
  515. DOPARTITION
  516. }
  517. void serialsort(unsigned from, unsigned len)
  518. {
  519. doqsortvecstable(array+from,len,compare);
  520. }
  521. public:
  522. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  523. : cParQSortBase(_numsubthreads),compare(_compare)
  524. {
  525. array = _a;
  526. cParQSortBase::start();
  527. }
  528. };
  529. #undef CMP
  530. #undef CMP1
  531. #undef MED3
  532. #undef RECURSE
  533. #undef VECTOR
  534. static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  535. {
  536. for(unsigned i=0; i<n; ++i)
  537. index[i] = rows+i;
  538. doqsortvecstable(index, n, compare);
  539. }
  540. void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
  541. {
  542. memcpy(temp, rows, n * sizeof(void*));
  543. qsortvecstable(temp, n, compare, (void * * *)rows);
  544. //I'm sure this violates the aliasing rules...
  545. void * * * rowsAsIndex = (void * * *)rows;
  546. for(unsigned i=0; i<n; ++i)
  547. rows[i] = *rowsAsIndex[i];
  548. }
  549. static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  550. {
  551. for(unsigned i=0; i<n; ++i)
  552. index[i] = rows+i;
  553. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  554. doqsortvecstable(index,n,compare);
  555. return;
  556. }
  557. cParQSortStable sorter(index,compare,numcpus-1);
  558. sorter.subsort(0,n);
  559. sorter.join();
  560. }
  561. void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
  562. {
  563. memcpy(temp, rows, n * sizeof(void*));
  564. parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);
  565. //I'm sure this violates the aliasing rules...
  566. void * * * rowsAsIndex = (void * * *)rows;
  567. for(size32_t i=0; i<n; ++i)
  568. rows[i] = *rowsAsIndex[i];
  569. }
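/*
 * Editorial note (not part of the original source): the "stable inplace"
 * variants reuse the caller's rows array as the index (an array of void**)
 * while temp keeps a copy of the original row pointers.  cmpicindstable breaks
 * comparison ties on the index address, preserving the original order of equal
 * rows, and the final loop flattens the sorted index back into row pointers:
 *
 *     memcpy(temp, rows, n * sizeof(void*));
 *     parqsortvecstable(temp, n, compare, (void ***)rows, numcpus);
 *     // then, for each i:  rows[i] = *rowsAsIndex[i];
 */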
  570. //-----------------------------------------------------------------------------------------------------------------------------
  571. inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
  572. {
  573. void * * tgt = result;
  574. loop
  575. {
  576. if (compare.docompare(*ret1, *ret2) <= 0)
  577. {
  578. *tgt++ = *ret1++;
  579. if (--n1 == 0)
  580. {
  581. //There must be at least one row in the right partition - copy any that remain
  582. do
  583. {
  584. *tgt++ = *ret2++;
  585. } while (--n2);
  586. return result;
  587. }
  588. }
  589. else
  590. {
  591. *tgt++ = *ret2++;
  592. if (--n2 == 0)
  593. {
  594. //There must be at least one row in the left partition - copy any that remain
  595. do
  596. {
  597. *tgt++ = *ret1++;
  598. } while (--n1);
  599. return result;
  600. }
  601. }
  602. }
  603. }
  604. inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
  605. {
  606. void * * tgt = result;
  607. while (n--)
  608. {
  609. if (compare.docompare(*ret1, *ret2) <= 0)
  610. {
  611. *tgt++ = *ret1++;
  612. if (--n1 == 0)
  613. {
  614. while (n--)
  615. {
  616. *tgt++ = *ret2++;
  617. }
  618. return result;
  619. }
  620. }
  621. else
  622. {
  623. *tgt++ = *ret2++;
  624. if (--n2 == 0)
  625. {
  626. while (n--)
  627. {
  628. *tgt++ = *ret1++;
  629. }
  630. return result;
  631. }
  632. }
  633. }
  634. return result;
  635. }
  636. inline void clonePartition(void * * result, size_t n, void * * src)
  637. {
  638. void * * tgt = result;
  639. while (n--)
  640. *tgt++ = *src++;
  641. }
  642. inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
  643. {
  644. void * * tgt = result+n1+n2-1;
  645. ret1 += (n1-1);
  646. ret2 += (n2-1);
  647. while (n--)
  648. {
  649. if (compare.docompare(*ret1, *ret2) >= 0)
  650. {
  651. *tgt-- = *ret1--;
  652. if (--n1 == 0)
  653. {
  654. while (n--)
  655. {
  656. *tgt-- = *ret2--;
  657. }
  658. return result;
  659. }
  660. }
  661. else
  662. {
  663. *tgt-- = *ret2--;
  664. if (--n2 == 0)
  665. {
  666. //There must be at least one row in the left partition - copy any that remain
  667. while (n--)
  668. {
  669. *tgt-- = *ret1--;
  670. }
  671. return result;
  672. }
  673. }
  674. }
  675. return result;
  676. }
  677. static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
  678. {
  679. void * * result = (depth & 1) ? tmp : rows;
  680. //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
  681. //and in performance testing it executed marginally more quickly
  682. if (n <= 2)
  683. {
  684. //Check for n == 1, but compare against 2 to avoid another comparison
  685. if (n < 2)
  686. {
  687. if (result != rows)
  688. result[0] = rows[0];
  689. }
  690. else
  691. {
  692. void * left = rows[0];
  693. void * right = rows[1];
  694. if (compare.docompare(left, right) <= 0)
  695. {
  696. result[0] = left;
  697. result[1] = right;
  698. }
  699. else
  700. {
  701. result[0] = right;
  702. result[1] = left;
  703. }
  704. }
  705. return result;
  706. }
  707. size_t n1 = (n+1)/2;
  708. size_t n2 = n - n1;
  709. void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
  710. void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
  711. dbgassertex(ret2 == ret1 + n1);
  712. dbgassertex(ret2 != result);
  713. return mergePartitions(compare, result, n1, ret1, n2, ret2);
  714. }
  715. void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
  716. {
  717. if (n <= 1)
  718. return;
  719. mergeSort(rows, n, compare, temp, 0);
  720. }
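/*
 * Editorial note (not part of the original source): mergeSort alternates its
 * output buffer by recursion depth ((depth & 1) ? tmp : rows), so each level
 * merges from the buffer its children wrote into, and the final merge at
 * depth 0 always lands back in rows - which is what lets msortvecstableinplace
 * present itself as in-place.  temp therefore needs room for n row pointers.
 * Usage mirrors the quicksort variants:
 *
 *     msortvecstableinplace(rows, n, cmp, temp);  // cmp: ICompare, temp: n void* scratch
 */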
  721. //=========================================================================
  722. //These constants are probably architecture and number of core dependent
  723. static const size_t singleThreadedMSortThreshold = 2000;
  724. static const size_t multiThreadedBlockThreshold = 64; // must be at least 2!
  725. #ifdef _USE_TBB
  726. using tbb::task;
  727. class TbbParallelMergeSorter
  728. {
  729. class SplitTask : public tbb::task
  730. {
  731. public:
  732. SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
  733. {
  734. }
  735. virtual task * execute()
  736. {
  737. if (next1->decrement_ref_count() == 0)
  738. spawn(*next1);
  739. if (next2->decrement_ref_count() == 0)
  740. return next2;
  741. return NULL;
  742. }
  743. protected:
  744. task * next1;
  745. task * next2;
  746. };
  747. class BisectTask : public tbb::task
  748. {
  749. public:
  750. BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
  751. : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth), next(_next)
  752. {
  753. }
  754. virtual task * execute()
  755. {
  756. loop
  757. {
  758. //On entry next is assumed to be used once by this function
  759. if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
  760. {
  761. //Create a new task rather than calling sort directly, so that the successor is set up correctly
  762. //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
  763. task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
  764. return sort;
  765. }
  766. void * * result = (depth & 1) ? temp : rows;
  767. void * * src = (depth & 1) ? rows : temp;
  768. size_t n1 = (n+1)/2;
  769. size_t n2 = n-n1;
  770. task * mergeTask;
  771. if (depth < sorter.parallelMergeDepth)
  772. {
  773. unsigned partitions = sorter.numPartitionCores() >> depth;
  774. if (partitions > 1)
  775. {
  776. PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
  777. for (unsigned i=0; i < partitions; i++)
  778. {
  779. MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
  780. mergeFwdTask->set_ref_count(1);
  781. MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
  782. mergeRevTask->set_ref_count(1);
  783. splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
  784. }
  785. next->decrement_ref_count();
  786. mergeTask = splitTask;
  787. }
  788. else
  789. {
  790. task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
  791. mergeFwdTask->set_ref_count(1);
  792. task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
  793. mergeRevTask->set_ref_count(1);
  794. mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
  795. }
  796. }
  797. else
  798. {
  799. mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
  800. }
  801. mergeTask->set_ref_count(2);
  802. task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
  803. spawn(*bisectRightTask);
  804. //recurse directly on the left side rather than creating a new task
  805. n = n1;
  806. depth = depth+1;
  807. next = mergeTask;
  808. }
  809. }
  810. protected:
  811. TbbParallelMergeSorter & sorter;
  812. void ** rows;
  813. void ** temp;
  814. task * next;
  815. size_t n;
  816. unsigned depth;
  817. };
  818. class SubSortTask : public tbb::task
  819. {
  820. public:
  821. SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
  822. : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth)
  823. {
  824. }
  825. virtual task * execute()
  826. {
  827. mergeSort(rows, n, sorter.compare, temp, depth);
  828. return NULL;
  829. }
  830. protected:
  831. TbbParallelMergeSorter & sorter;
  832. void ** rows;
  833. void ** temp;
  834. size_t n;
  835. unsigned depth;
  836. };
  837. class MergeTask : public tbb::task
  838. {
  839. public:
  840. MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
  841. : compare(_compare),result(_result), n1(_n1), src1(_src1), n2(_n2), src2(_src2), n(_n)
  842. {
  843. }
  844. virtual task * execute()
  845. {
  846. //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
  847. if (n1 == 0)
  848. {
  849. assertex(n <= n2);
  850. clonePartition(result, n, src2);
  851. }
  852. else if (n2 == 0)
  853. {
  854. assertex(n <= n1);
  855. clonePartition(result, n, src1);
  856. }
  857. else
  858. mergePartitions(compare, result, n1, src1, n2, src2, n);
  859. return NULL;
  860. }
  861. void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
  862. {
  863. src1 += deltaLeft;
  864. n1 = numLeft;
  865. src2 += deltaRight;
  866. n2 = numRight;
  867. result += (deltaLeft + deltaRight);
  868. n = num;
  869. }
  870. protected:
  871. const ICompare & compare;
  872. void * * result;
  873. void * * src1;
  874. void * * src2;
  875. size_t n1;
  876. size_t n2;
  877. size_t n;
  878. };
  879. class MergeRevTask : public MergeTask
  880. {
  881. public:
  882. MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
  883. : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
  884. {
  885. }
  886. virtual task * execute()
  887. {
  888. if (n1 == 0)
  889. {
  890. assertex(n <= n2);
  891. //This is a reverse merge, so copy n from the end of the input
  892. unsigned delta = n2 - n;
  893. clonePartition(result + delta, n, src2 + delta);
  894. }
  895. else if (n2 == 0)
  896. {
  897. assertex(n <= n1);
  898. unsigned delta = n1 - n;
  899. clonePartition(result + delta, n, src1 + delta);
  900. }
  901. else
  902. mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
  903. return NULL;
  904. }
  905. };
  906. class PartitionSplitTask : public tbb::task
  907. {
  908. public:
  909. PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
  910. : numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2), compare(_compare)
  911. {
  912. //These could be local variables in calculatePartitions(), but placed here to simplify cleanup. (Should consider using alloca)
  913. posLeft = new size_t[numPartitions+1];
  914. posRight = new size_t[numPartitions+1];
  915. tasks = new MergeTask *[numPartitions*2];
  916. for (unsigned i=0; i < numPartitions*2; i++)
  917. tasks[i] = NULL;
  918. }
  919. ~PartitionSplitTask()
  920. {
  921. delete [] posLeft;
  922. delete [] posRight;
  923. delete [] tasks;
  924. }
  925. void calculatePartitions()
  926. {
  927. #ifdef PARANOID_PARTITION
  928. {
  929. for (unsigned ix=1; ix<n1; ix++)
  930. if (compare.docompare(src1[ix-1], src1[ix]) > 0)
  931. DBGLOG("Failure left@%u", ix);
  932. }
  933. {
  934. for (unsigned ix=1; ix<n2; ix++)
  935. if (compare.docompare(src2[ix-1], src2[ix]) > 0)
  936. DBGLOG("Failure right@%u", ix);
  937. }
  938. #endif
  939. //If dividing into P parts, select S*P-1 even points from each side.
  940. unsigned numSamples = numPartitionSamples*numPartitions-1;
  941. QuantilePositionIterator iterLeft(n1, numSamples+1, false);
  942. QuantilePositionIterator iterRight(n2, numSamples+1, false);
  943. iterLeft.first();
  944. iterRight.first();
  945. size_t prevLeft = 0;
  946. size_t prevRight =0;
  947. posLeft[0] = 0;
  948. posRight[0] = 0;
  949. //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
  950. //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
  951. //=> pick samples [0, 2*numSamples, 4*numSamples ...]
  952. //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
  953. for (unsigned part = 1; part < numPartitions; part++)
  954. {
  955. unsigned numToSkip = numPartitionSamples*2;
  956. if (part == 1)
  957. numToSkip++;
  958. for (unsigned skip=numToSkip; skip-- != 0; )
  959. {
  960. size_t leftPos = iterLeft.get();
  961. size_t rightPos = iterRight.get();
  962. if (leftPos == n1)
  963. {
  964. if (skip == 0)
  965. {
  966. posLeft[part] = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
  967. posRight[part] = rightPos;
  968. }
  969. iterRight.next();
  970. }
  971. else if (rightPos == n2)
  972. {
  973. if (skip == 0)
  974. {
  975. posLeft[part] = leftPos;
  976. posRight[part] = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
  977. }
  978. iterLeft.next();
  979. }
  980. else
  981. {
  982. int c = compare.docompare(src1[leftPos], src2[rightPos]);
  983. if (skip == 0)
  984. {
  985. if (c <= 0)
  986. {
  987. //value in left is smallest. Find the position of the value <= the left value
  988. posLeft[part] = leftPos;
  989. posRight[part] = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
  990. }
  991. else
  992. {
  993. posLeft[part] = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
  994. posRight[part] = rightPos;
  995. }
  996. }
  997. if (c <= 0)
  998. {
  999. iterLeft.next();
  1000. prevLeft = leftPos;
  1001. }
  1002. else
  1003. {
  1004. iterRight.next();
  1005. prevRight = rightPos;
  1006. }
  1007. }
  1008. }
  1009. }
  1010. posLeft[numPartitions] = n1;
  1011. posRight[numPartitions] = n2;
  1012. #ifdef TRACE_PARTITION
  1013. DBGLOG("%d,%d -> {", (unsigned)n1, (unsigned)n2);
  1014. #endif
  1015. for (unsigned i= 0; i < numPartitions; i++)
  1016. {
  1017. size_t start = posLeft[i] + posRight[i];
  1018. size_t end = posLeft[i+1] + posRight[i+1];
  1019. size_t num = end - start;
  1020. size_t numFwd = num/2;
  1021. #ifdef TRACE_PARTITION
  1022. DBGLOG(" ([%d..%d],[%d..%d] %d,%d = %d)",
  1023. (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
  1024. (unsigned)start, (unsigned)end, (unsigned)num);
  1025. #endif
  1026. MergeTask & mergeFwdTask = *tasks[i*2];
  1027. MergeTask & mergeRevTask = *tasks[i*2+1];
  1028. mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
  1029. posRight[i], posRight[i+1]-posRight[i],
  1030. numFwd);
  1031. mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
  1032. posRight[i], posRight[i+1]-posRight[i],
  1033. num-numFwd);
  1034. }
  1035. }
  1036. virtual task * execute()
  1037. {
  1038. calculatePartitions();
  1039. for (unsigned i=0; i < numPartitions*2; i++)
  1040. {
  1041. if (tasks[i]->decrement_ref_count() == 0)
  1042. spawn(*tasks[i]);
  1043. }
  1044. return NULL;
  1045. }
  1046. void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
  1047. {
  1048. tasks[i*2] = fwd;
  1049. tasks[i*2+1] = rev;
  1050. }
  1051. protected:
  1052. size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
  1053. {
  1054. if (low == high)
  1055. return low;
  1056. while (high - low > 1)
  1057. {
  1058. size_t mid = (low + high) / 2;
  1059. if (compare.docompare(rows[mid], seek) < 0)
  1060. low = mid;
  1061. else
  1062. high = mid;
  1063. }
  1064. if (compare.docompare(rows[low], seek) < 0)
  1065. return low+1;
  1066. return low;
  1067. }
  1068. size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
  1069. {
  1070. if (low == high)
  1071. return low;
  1072. while (high - low > 1)
  1073. {
  1074. size_t mid = (low + high) / 2;
  1075. if (compare.docompare(rows[mid], seek) <= 0)
  1076. low = mid;
  1077. else
  1078. high = mid;
  1079. }
  1080. if (compare.docompare(rows[low], seek) <= 0)
  1081. return low+1;
  1082. return low;
  1083. }
  1084. protected:
  1085. const ICompare & compare;
  1086. unsigned numPartitions;
  1087. size_t n1;
  1088. size_t n2;
  1089. void * * src1;
  1090. void * * src2;
  1091. size_t * posLeft;
  1092. size_t * posRight;
  1093. MergeTask * * tasks;
  1094. };
  1095. public:
  1096. TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
  1097. {
  1098. //The following constants control the number of iterations to be performed in parallel.
1099. //The sort is split into more parts than there are cpus so that the effect of delays from one task tends to be evened out.
  1100. //The following constants should possibly be tuned on each platform. The following gave a good balance on a 2x8way xeon
  1101. const unsigned extraBisectDepth = 3;
  1102. const unsigned extraParallelMergeDepth = 3;
  1103. unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
  1104. unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
  1105. assertex(numCpus <= (1U << ln2NumCpus));
  1106. //Merge in parallel once it is likely to be beneficial
  1107. parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
1109. //Aim to execute in parallel until the width is 8*the maximum number of parallel tasks
  1109. singleThreadDepth = ln2NumCpus + extraBisectDepth;
  1110. partitionCores = numCpus / 2;
  1111. }
  1112. unsigned numPartitionCores() const { return partitionCores; }
  1113. void sortRoot(void ** rows, size_t n, void ** temp)
  1114. {
  1115. task * end = new (task::allocate_root()) tbb::empty_task();
  1116. end->set_ref_count(1+1);
  1117. task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
  1118. end->spawn(*task);
  1119. end->wait_for_all();
  1120. end->destroy(*end);
  1121. }
  1122. public:
  1123. const ICompare & compare;
  1124. unsigned singleThreadDepth;
  1125. unsigned parallelMergeDepth;
  1126. unsigned partitionCores;
  1127. void * * baseRows;
  1128. };
  1129. //-------------------------------------------------------------------------------------------------------------------
  1130. void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
  1131. {
  1132. if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
  1133. {
  1134. msortvecstableinplace(rows, n, compare, temp);
  1135. return;
  1136. }
  1137. TbbParallelMergeSorter sorter(rows, compare);
  1138. sorter.sortRoot(rows, n, temp);
  1139. }
  1140. #else
  1141. void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
  1142. {
  1143. parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
  1144. }
  1145. #endif
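/*
 * Editorial note (not part of the original source): when built with _USE_TBB,
 * parmsortvecstableinplace splits the array recursively with BisectTask until
 * blocks fall below multiThreadedBlockThreshold (or the depth limit), sorts
 * those blocks with the serial mergeSort via SubSortTask, and then merges
 * pairs of sorted blocks in parallel: PartitionSplitTask samples both inputs
 * to pick split points so each MergeTask/MergeRevTask merges a roughly equal
 * slice, working from opposite ends.  Without TBB the call falls back to the
 * stable parallel quicksort above.  Usage sketch:
 *
 *     parmsortvecstableinplace(rows, n, cmp, temp, 0); // pass ncpus==1 to force the single-threaded sort
 */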
  1146. //=========================================================================
  1147. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  1148. {
  1149. bool nochange = true;
  1150. while(1)
  1151. {
  1152. unsigned c = p*2 + 1;
  1153. if(c >= num)
  1154. return nochange;
  1155. if(c+1 < num)
  1156. {
  1157. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  1158. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  1159. ++c;
  1160. }
  1161. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  1162. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  1163. return nochange;
  1164. nochange = false;
  1165. unsigned r = heap[c];
  1166. heap[c] = heap[p];
  1167. heap[p] = r;
  1168. p = c;
  1169. }
  1170. }
  1171. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  1172. {
  1173. bool nochange = true;
  1174. while(c > 0)
  1175. {
  1176. unsigned p = (unsigned)(c-1)/2;
  1177. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  1178. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  1179. return nochange;
  1180. nochange = false;
  1181. unsigned r = heap[c];
  1182. heap[c] = heap[p];
  1183. heap[p] = r;
  1184. c = p;
  1185. }
  1186. return nochange;
  1187. }
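/*
 * Editorial note (not part of the original source): heap_push_down and
 * heap_push_up maintain a min-heap of row indexes, breaking comparison ties on
 * the index itself so merges built on top of them stay stable; both return
 * true when no swaps were needed.  A hedged heapify sketch (icmp is an
 * ICompare*; heap[] initially holds 0..num-1):
 *
 *     for (unsigned i = num; i-- > 0; )
 *         heap_push_down(i, num, heap, rows, icmp);
 */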
  1188. //=========================================================================
  1189. #include <assert.h>
  1190. #include <stdio.h>
  1191. #include <stdlib.h>
  1192. #include <fcntl.h>
  1193. #include <string.h>
  1194. #include <stddef.h>
  1195. #include <time.h>
  1196. #ifdef _WIN32
  1197. #include <io.h>
  1198. #include <sys\types.h>
  1199. #include <sys\stat.h>
  1200. #else
  1201. #include <sys/types.h>
  1202. #include <sys/stat.h>
  1203. #endif
  1204. #ifndef off_t
  1205. #define off_t __int64
  1206. #endif
  1207. typedef void ** VECTOR;
  1208. interface IMergeSorter
  1209. {
  1210. public:
  1211. virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
  1212. };
  1213. #define INSERTMAX 10000
  1214. #define BUFFSIZE 0x100000 // used for output buffer
  1215. //==================================================================================================
  1216. #ifdef DOUBLE_COMPARE
  1217. #define BuffCompare(a, b) ((MSbuffcomp(a,b,inbuffers,mergeheap,icmp)+MSbuffcomp(a,b,inbuffers,mergeheap,icmp))/2)
  1218. #else
  1219. #define BuffCompare(a, b) MSbuffcomp(a,b,inbuffers,mergeheap,icmp)
  1220. #endif
  1221. static inline int MSbuffcomp(unsigned a,unsigned b, void **inbuffers, unsigned *mergeheap, ICompare *icmp)
  1222. {
  1223. int ret = icmp->docompare(inbuffers[mergeheap[a]], inbuffers[mergeheap[b]]);
  1224. if (ret==0)
  1225. ret = (int)mergeheap[a]-mergeheap[b];
  1226. return ret;
  1227. }
  1228. class CRowStreamMerger
  1229. {
  1230. const void **pending;
  1231. size32_t *recsize;
  1232. unsigned *mergeheap;
  1233. unsigned activeInputs;
  1234. count_t recno;
  1235. const ICompare *icmp;
  1236. bool partdedup;
  1237. IRowProvider &provider;
  1238. MemoryAttr workingbuf;
  1239. #ifdef _DEBUG
  1240. bool *stopped;
  1241. MemoryAttr ma;
  1242. #endif
  1243. inline int buffCompare(unsigned a, unsigned b)
  1244. {
  1245. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  1246. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  1247. }
  1248. bool promote(unsigned p)
  1249. {
  1250. activeInputs--;
  1251. if(activeInputs == p)
  1252. return false;
  1253. mergeheap[p] = mergeheap[activeInputs];
  1254. return true;
  1255. }
  1256. inline bool siftDown(unsigned p)
  1257. {
  1258. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  1259. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  1260. bool nochange = true;
  1261. while(1)
  1262. {
  1263. unsigned c = p*2 + 1;
  1264. if(c >= activeInputs)
  1265. return nochange;
  1266. if(c+1 < activeInputs)
  1267. {
  1268. int childcmp = buffCompare(c+1, c);
  1269. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  1270. ++c;
  1271. }
  1272. int cmp = buffCompare(c, p);
  1273. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  1274. return nochange;
  1275. nochange = false;
  1276. unsigned r = mergeheap[c];
  1277. mergeheap[c] = mergeheap[p];
  1278. mergeheap[p] = r;
  1279. p = c;
  1280. }
  1281. }
  1282. void siftDownDedupTop()
  1283. {
  1284. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  1285. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  1286. if(activeInputs < 2)
  1287. return;
  1288. unsigned c = 1;
  1289. int childcmp = 1;
  1290. if(activeInputs >= 3)
  1291. {
  1292. childcmp = buffCompare(2, 1);
  1293. if(childcmp < 0)
  1294. c = 2;
  1295. }
  1296. int cmp = buffCompare(c, 0);
  1297. if(cmp > 0)
  1298. return;
  1299. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  1300. while(cmp <= 0)
  1301. {
  1302. if(cmp == 0)
  1303. {
  1304. if(mergeheap[c] < mergeheap[0])
  1305. {
  1306. unsigned r = mergeheap[c];
  1307. mergeheap[c] = mergeheap[0];
  1308. mergeheap[0] = r;
  1309. }
  1310. if(!pullInput(mergeheap[c]))
  1311. if(!promote(c))
  1312. break;
  1313. siftDown(c);
  1314. }
  1315. else
  1316. {
  1317. unsigned r = mergeheap[c];
  1318. mergeheap[c] = mergeheap[0];
  1319. mergeheap[0] = r;
  1320. if(siftDown(c))
  1321. break;
  1322. }
  1323. cmp = buffCompare(c, 0);
  1324. }
  1325. // the following loop ensures the uniqueness property holds on the other branch too
  1326. c = 3-c;
  1327. if(activeInputs <= c)
  1328. return;
  1329. while(childcmp == 0)
  1330. {
  1331. if(mergeheap[c] < mergeheap[0])
  1332. {
  1333. unsigned r = mergeheap[c];
  1334. mergeheap[c] = mergeheap[0];
  1335. mergeheap[0] = r;
  1336. }
  1337. if(!pullInput(mergeheap[c]))
  1338. if(!promote(c))
  1339. break;
  1340. siftDown(c);
  1341. childcmp = buffCompare(c, 0);
  1342. }
  1343. }
  1344. void init()
  1345. {
  1346. if (activeInputs>0) {
  1347. // setup heap property
  1348. if (activeInputs >= 2)
  1349. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  1350. siftDown(p);
  1351. if (partdedup)
  1352. siftDownDedupTop();
  1353. else
  1354. siftDown(0);
  1355. }
  1356. recno = 0;
  1357. }
  1358. inline count_t num() const { return recno; }
  1359. inline bool _next()
  1360. {
  1361. if (!activeInputs)
  1362. return false;
  1363. if (recno) {
  1364. if(!pullInput(mergeheap[0]))
  1365. if(!promote(0))
  1366. return false;
  1367. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  1368. if(partdedup)
  1369. siftDownDedupTop();
  1370. else
  1371. siftDown(0);
  1372. }
  1373. recno++;
  1374. return true;
  1375. }
  1376. inline bool eof() const { return activeInputs==0; }
  1377. bool pullInput(unsigned i)
  1378. {
  1379. if (pending[i]) {
  1380. assertex(partdedup);
  1381. provider.releaseRow(pending[i]);
  1382. }
  1383. pending[i] = provider.nextRow(i);
  1384. if (pending[i])
  1385. return true;
  1386. provider.stop(i);
  1387. #ifdef _DEBUG
  1388. assertex(!stopped[i]);
  1389. stopped[i] = true;
  1390. #endif
  1391. return false;
  1392. }
  1393. public:
  1394. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
  1395. : provider(_provider)
  1396. {
  1397. partdedup = _partdedup;
  1398. icmp = _icmp;
  1399. recsize = NULL;
  1400. activeInputs = 0;
  1401. #ifdef _DEBUG
  1402. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  1403. memset(stopped,0,numstreams*sizeof(bool));
  1404. #endif
  1405. unsigned i;
  1406. recsize = NULL;
  1407. if (numstreams) {
  1408. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  1409. pending = (const void **)buf;
  1410. mergeheap = (unsigned *)(pending+numstreams);
  1411. for (i=0;i<numstreams;i++) {
  1412. pending[i] = NULL;
  1413. if (pullInput(i))
  1414. mergeheap[activeInputs++] = i;
  1415. }
  1416. }
  1417. else {
  1418. pending = NULL;
  1419. mergeheap = NULL;
  1420. }
  1421. init();
  1422. }
  1423. void stop()
  1424. {
  1425. while (activeInputs) {
  1426. activeInputs--;
  1427. if (pending[mergeheap[activeInputs]]) {
  1428. provider.releaseRow(pending[mergeheap[activeInputs]]);
  1429. #ifdef _DEBUG
  1430. assertex(!stopped[mergeheap[activeInputs]]);
  1431. stopped[mergeheap[activeInputs]] = true;
  1432. #endif
  1433. provider.stop(mergeheap[activeInputs]);
  1434. }
  1435. }
  1436. pending = NULL;
  1437. mergeheap = NULL;
  1438. workingbuf.clear();
  1439. }
  1440. ~CRowStreamMerger()
  1441. {
  1442. stop();
  1443. }
  1444. inline const void * next()
  1445. {
  1446. if (!_next())
  1447. return NULL;
  1448. unsigned strm = mergeheap[0];
  1449. const void *row = pending[strm];
  1450. pending[strm] = NULL;
  1451. return row;
  1452. }
  1453. };
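/*
 * Editorial note (not part of the original source): CRowStreamMerger is a
 * pull-based k-way merge.  pending[] holds the current head row of each input,
 * mergeheap is a min-heap of input indexes ordered by those rows (ties broken
 * on the input index for stability), and next() hands out the smallest row,
 * refills that input from the IRowProvider and restores the heap - optionally
 * discarding rows that compare equal to the one being returned when partdedup
 * is set.
 */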
  1454. class CMergeRowStreams : public CInterface, implements IRowStream
  1455. {
  1456. protected:
  1457. CRowStreamMerger *merger;
  1458. bool eos;
  1459. class cProvider: public CInterface, implements IRowProvider
  1460. {
  1461. IArrayOf<IRowStream> ostreams;
  1462. IRowStream **streams;
  1463. Linked<IRowLinkCounter> linkcounter;
  1464. const void *nextRow(unsigned idx)
  1465. {
  1466. return streams[idx]->nextRow();
  1467. };
  1468. void linkRow(const void *row)
  1469. {
  1470. linkcounter->linkRow(row);
  1471. }
  1472. void releaseRow(const void *row)
  1473. {
  1474. linkcounter->releaseRow(row);
  1475. }
  1476. void stop(unsigned idx)
  1477. {
  1478. streams[idx]->stop();
  1479. }
  1480. public:
  1481. IMPLEMENT_IINTERFACE;
  1482. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  1483. : linkcounter(_linkcounter)
  1484. {
  1485. ostreams.ensure(numstreams);
  1486. unsigned n = 0;
  1487. while (n<numstreams)
  1488. ostreams.append(*LINK(_streams[n++]));
  1489. streams = ostreams.getArray();
  1490. }
  1491. } *streamprovider;
  1492. public:
  1493. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  1494. {
  1495. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  1496. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  1497. eos = _numstreams==0;
  1498. }
  1499. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  1500. {
  1501. streamprovider = NULL;
  1502. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  1503. eos = _numstreams==0;
  1504. }
  1505. ~CMergeRowStreams()
  1506. {
  1507. delete merger;
  1508. delete streamprovider;
  1509. }
  1510. IMPLEMENT_IINTERFACE;
  1511. void stop()
  1512. {
  1513. if (!eos) {
  1514. merger->stop();
  1515. eos = true;
  1516. }
  1517. }
  1518. const void *nextRow()
  1519. {
  1520. if (eos)
  1521. return NULL;
  1522. const void *r = merger->next();
  1523. if (!r) {
  1524. stop(); // think ok to stop early
  1525. return NULL;
  1526. }
  1527. return r;
  1528. }
  1529. };
  1530. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  1531. {
  1532. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  1533. }
  1534. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  1535. {
  1536. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  1537. }
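/*
 * Usage sketch (added for clarity; not part of the original source).  Given k
 * sorted IRowStream inputs, an ICompare and an IRowLinkCounter, a caller can
 * merge them like this (all variable names are hypothetical):
 *
 *     Owned<IRowStream> merged = createRowStreamMerger(k, streams, &cmp, false, linkCounter);
 *     for (;;)
 *     {
 *         const void *row = merged->nextRow();
 *         if (!row)
 *             break;
 *         // ... consume row, then release it via whatever row allocator is in use ...
 *     }
 */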