jsort.cpp

/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
#include "platform.h"
#include <string.h>
#include <limits.h>
#include "jsort.hpp"
#include "jio.hpp"
#include "jmisc.hpp"
#include "jexcept.hpp"
#include "jfile.hpp"
#include "jthread.hpp"
#include "jqueue.tpp"
#include "jset.hpp"
#include "jutil.hpp"

#ifdef _USE_TBB
#include "tbb/task.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_sort.h"
#endif

#ifdef _DEBUG
// #define PARANOID
// #define DOUBLE_COMPARE
//#define TESTPARSORT
//#define MCMERGESTATS
#endif

//#define TRACE_PARTITION

#define PARALLEL_GRANULARITY 1024

static const unsigned numPartitionSamples = 3;
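// PARALLEL_GRANULARITY is the smallest range the parallel quicksorts will split further;
// anything at or below that size is sorted serially.  numPartitionSamples controls how many
// sample points per partition the parallel merge uses when dividing a merge between cores.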
static bool sortParallel(unsigned &numcpus)
{
    static unsigned numCPUs = 0;
    if (numCPUs==0) {
        numCPUs = getAffinityCpus();
    }
    if ((numcpus==0)||(numcpus>numCPUs))
        numcpus = numCPUs;
#ifdef TESTPARSORT
    numcpus = 2;
    return true;        // to test
#endif
    return (numcpus>1);
}
//define variants of the same binary-insertion function by redefining COMPARE/INSERT before including jsort.inc
#define COMPARE(search,position)    compare(search,position)
#define INSERT(position,search)     memmove(position,search, width)

void * binary_add(const void *newitem, const void *base,
                  size32_t nmemb,
                  size32_t width,
                  int ( *compare)(const void *_e1, const void *_e2),
                  bool * ItemAdded)
{
#include "jsort.inc"
}

#undef COMPARE
#undef INSERT

//---------------------------------------------------------------------------

#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define NEVER_ADD

extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        sortCompareFunction compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef NEVER_ADD
#undef INSERT
#undef COMPARE

//---------------------------------------------------------------------------

#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define NEVER_ADD

extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        ICompare & compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef NEVER_ADD
#undef INSERT
#undef COMPARE

//---------------------------------------------------------------------------

#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD

extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE

#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD

extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE

#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH

extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE

#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH

extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}

#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
//=========================================================================

// optimized quicksort for array of pointers to fixed size objects

typedef void *  ELEMENT;
typedef void ** _VECTOR;   // bit messy but allow to be redefined later
#define VECTOR _VECTOR

static inline void swap(VECTOR a, VECTOR b)  { ELEMENT t = *a;  *a = *b; *b = t; }
#define SWAP swap

#define CMP(a,b)         memcmp(*(a),*(b),es)
#define MED3(a,b,c)      med3a(a,b,c,es)
#define RECURSE(a,b)     qsortvec(a, b, es)
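// med3a returns whichever of its three arguments holds the median value (compared with
// memcmp over es bytes); the quicksort body included from jsort2.inc uses it to pick pivots.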
static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
{
  return CMP(a, b) < 0 ?
      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

void qsortvec(void **a, size32_t n, size32_t es)
#include "jsort2.inc"

#undef CMP
#undef MED3
#undef RECURSE

//---------------------------------------------------------------------------

#define CMP(a,b)         (compare(*(a),*(b)))
#define MED3(a,b,c)      med3c(a,b,c,compare)
#define RECURSE(a,b)     qsortvec(a, b, compare)

static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
{
  return CMP(a, b) < 0 ?
      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

void qsortvec(void **a, size32_t n, sortCompareFunction compare)
#include "jsort2.inc"

#undef CMP
#undef MED3
#undef RECURSE

//---------------------------------------------------------------------------
// tbb versions of the quick sort to provide a useful base comparison

class TbbCompareWrapper
{
public:
    TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
    bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
    const ICompare & compare;
};


class TbbCompareIndirectWrapper
{
public:
    TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
    bool operator()(void * * const & l, void * * const & r) const
    {
        int ret = compare.docompare(*l,*r);
        if (ret==0)
        {
            if (l < r)
                return true;
            else
                return false;
        }
        return (ret < 0);
    }
    const ICompare & compare;
};


void tbbqsortvec(void **a, size_t n, const ICompare & compare)
{
#ifdef _USE_TBB
    TbbCompareWrapper tbbcompare(compare);
    tbb::parallel_sort(a, a+n, tbbcompare);
#else
    throwUnexpectedX("TBB quicksort not available");
#endif
}

void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
{
#ifdef _USE_TBB
    void * * * rowsAsIndex = (void * * *)rows;
    memcpy(temp, rows, n * sizeof(void*));

    for(unsigned i=0; i<n; ++i)
        rowsAsIndex[i] = temp+i;

    TbbCompareIndirectWrapper tbbcompare(compare);
    tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);

    //I'm sure this violates the aliasing rules...
    for(unsigned i=0; i<n; ++i)
        rows[i] = *rowsAsIndex[i];
#else
    throwUnexpectedX("TBB quicksort not available");
#endif
}
//---------------------------------------------------------------------------

#define CMP(a,b)         (compare.docompare(*(a),*(b)))
#define MED3(a,b,c)      med3ic(a,b,c,compare)
#define RECURSE(a,b)     qsortvec(a, b, compare)

static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
  return CMP(a, b) < 0 ?
      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

void qsortvec(void **a, size32_t n, const ICompare & compare)
#include "jsort2.inc"


// Parallel version (only 2 threads currently)

class cParQSortBase
{
    struct sJobItem
    {
        unsigned start;
        unsigned num;
    };

    NonReentrantSpinLock joblock;
    QueueOf<sJobItem,false> jobq;
    Semaphore jobqsem;
    unsigned waiting;
    unsigned numsubthreads;
    bool done;

    class cThread: public Thread
    {
        cParQSortBase *parent;
    public:
        cThread(cParQSortBase *_parent)
            : Thread("cParQSort")
        {
            parent = _parent;
        }
        int run()
        {
            parent->run();
            return 0;
        }
    } **threads;

    bool waitForWork(unsigned &s,unsigned &n)
    {
        NonReentrantSpinBlock block(joblock);
        while (!done) {
            sJobItem *qi = jobq.dequeue();
            if (qi) {
                s = qi->start;
                n = qi->num;
                delete qi;
                return true;
            }
            if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
                done = true;
                jobqsem.signal(waiting);
                break;
            }
            waiting++;
            NonReentrantSpinUnblock unblock(joblock);
            jobqsem.wait();
        }
        s = 0; // remove uninitialised variable warnings
        n = 0;
        return false;
    }

public:

    cParQSortBase(unsigned _numsubthreads)
    {
        numsubthreads = _numsubthreads;
        done = false;
        waiting = 0;
        threads = new cThread*[numsubthreads];
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i] = new cThread(this);
    }

    ~cParQSortBase()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->Release();
        delete [] threads;
    }

    void start()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->start();
    }

    void subsort(unsigned s, unsigned n)
    {
        do {
            sJobItem *qi;
            while (n>PARALLEL_GRANULARITY) {
                unsigned r1;
                unsigned r2;
                partition(s, n, r1, r2);
                unsigned n2 = n+s-r2;
                if (r1==s) {
                    n = n2;
                    s = r2;
                }
                else {
                    if (n2!=0) {
                        qi = new sJobItem;
                        qi->num = n2;
                        qi->start = r2;
                        NonReentrantSpinBlock block(joblock);
                        jobq.enqueue(qi);
                        if (waiting) {
                            jobqsem.signal(waiting);
                            waiting = 0;
                        }
                    }
                    n = r1-s;
                }
            }
            serialsort(s,n);
            NonReentrantSpinBlock block(joblock);
            if (waiting==numsubthreads) { // well we are done so are rest
                done = true;
                jobqsem.signal(waiting);
                break;
            }
        }
        while(waitForWork(s,n));
    }

    void run()
    {
        unsigned s;
        unsigned n;
        if (waitForWork(s,n))
            subsort(s,n);
    }

    void join()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->join();
    }

    virtual void serialsort(unsigned from, unsigned len)=0;
    virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
};
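// The subclasses below plug a concrete partition/serialsort into cParQSortBase: each worker
// thread repeatedly splits ranges larger than PARALLEL_GRANULARITY, queues one half on jobq
// for an idle thread to pick up, keeps the other half, and serial-sorts anything smaller.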
#define DOPARTITION                                 \
        VECTOR a = array+s;                         \
        VECTOR pm = a + (n / 2);                    \
        VECTOR pl = a;                              \
        VECTOR pn = a + (n - 1) ;                   \
        if (n > 40) {                               \
            unsigned d = (n / 8);                   \
            pl = MED3(pl, pl + d, pl + 2 * d);      \
            pm = MED3(pm - d, pm, pm + d);          \
            pn = MED3(pn - 2 * d, pn - d, pn);      \
        }                                           \
        pm = MED3(pl, pm, pn);                      \
        SWAP(a, pm);                                \
        VECTOR pa = a + 1;                          \
        VECTOR pb = pa;                             \
        VECTOR pc = a + (n - 1);                    \
        VECTOR pd = pc;                             \
        int r;                                      \
        for (;;) {                                  \
            while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
                if (r == 0) {                       \
                    SWAP(pa, pb);                   \
                    pa++;                           \
                }                                   \
                pb++;                               \
            }                                       \
            while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
                if (r == 0) {                       \
                    SWAP(pc, pd);                   \
                    pd--;                           \
                }                                   \
                pc--;                               \
            }                                       \
            if (pb > pc)                            \
                break;                              \
            SWAP(pb, pc);                           \
            pb++;                                   \
            pc--;                                   \
        }                                           \
        pn = a + n;                                 \
        r = MIN(pa - a, pb - pa);                   \
        VECTOR v1 = a;                              \
        VECTOR v2 = pb-r;                           \
        while (r) {                                 \
            SWAP(v1,v2); v1++; v2++; r--;           \
        };                                          \
        r = MIN(pd - pc, pn - pd - 1);              \
        v1 = pb;                                    \
        v2 = pn-r;                                  \
        while (r) {                                 \
            SWAP(v1,v2); v1++; v2++; r--;           \
        };                                          \
        r1 = (pb-pa)+s;                             \
        r2 = n-(pd-pc)+s;
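// DOPARTITION is a Bentley/McIlroy-style three-way partition: a median-of-three pivot
// (ninther for n > 40), keys equal to the pivot collected at both ends and swapped into the
// middle, leaving [s,r1) < pivot, [r1,r2) == pivot and [r2,s+n) > pivot.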
class cParQSort: public cParQSortBase
{
    VECTOR array;
    const ICompare &compare;

    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
    {
        DOPARTITION
    }

    void serialsort(unsigned from, unsigned len)
    {
        qsortvec(array+from,len,compare);
    }

public:
    cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
        : cParQSortBase(_numsubthreads), compare(_compare)
    {
        array = _a;
        cParQSortBase::start();
    }
};


void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
{
    if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
        qsortvec(a,n,compare);
        return;
    }
    cParQSort sorter(a,compare,numcpus-1);
    sorter.subsort(0,n);
    sorter.join();
#ifdef TESTPARSORT
    for (unsigned i=1;i<n;i++)
        if (compare.docompare(a[i-1],a[i])>0)
            ERRLOG("parqsortvec failed %d",i);
#endif
}

#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------

#undef VECTOR
#undef SWAP

typedef void *** _IVECTOR;
#define VECTOR _IVECTOR

static inline void swapind(VECTOR a, VECTOR b)  { void ** t = *a;  *a = *b; *b = t; }
#define SWAP swapind

#define CMP(a,b)         cmpicindstable(a,b,compare)

static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
{
    int ret = compare.docompare(**a,**b);
    if (ret==0)
    {
        if (*a>*b)
            ret = 1;
        else if (*a<*b)
            ret = -1;
    }
    return ret;
}

#define MED3(a,b,c)      med3ic(a,b,c,compare)
#define RECURSE(a,b)     doqsortvecstable(a, b, compare)

static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
  return CMP(a, b) < 0 ?
      (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
    : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
#include "jsort2.inc"

class cParQSortStable: public cParQSortBase
{
    VECTOR array;
    const ICompare &compare;

    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
    {
        DOPARTITION
    }

    void serialsort(unsigned from, unsigned len)
    {
        doqsortvecstable(array+from,len,compare);
    }

public:
    cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
        : cParQSortBase(_numsubthreads),compare(_compare)
    {
        array = _a;
        cParQSortBase::start();
    }
};

#undef CMP
#undef CMP1
#undef MED3
#undef RECURSE
#undef VECTOR
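// The stable variants sort an index of pointers into the row array rather than the rows
// themselves; equal rows compare by their original address (cmpicindstable), which preserves
// input order, and the ...inplace wrappers then rewrite the row array from the sorted index.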
static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
{
    for(unsigned i=0; i<n; ++i)
        index[i] = rows+i;
    doqsortvecstable(index, n, compare);
}

void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
{
    memcpy(temp, rows, n * sizeof(void*));

    qsortvecstable(temp, n, compare, (void * * *)rows);

    //I'm sure this violates the aliasing rules...
    void * * * rowsAsIndex = (void * * *)rows;
    for(unsigned i=0; i<n; ++i)
        rows[i] = *rowsAsIndex[i];
}

static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
{
    for(unsigned i=0; i<n; ++i)
        index[i] = rows+i;
    if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
        doqsortvecstable(index,n,compare);
        return;
    }
    cParQSortStable sorter(index,compare,numcpus-1);
    sorter.subsort(0,n);
    sorter.join();
}

void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
{
    memcpy(temp, rows, n * sizeof(void*));

    parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);

    //I'm sure this violates the aliasing rules...
    void * * * rowsAsIndex = (void * * *)rows;
    for(size32_t i=0; i<n; ++i)
        rows[i] = *rowsAsIndex[i];
}
//-----------------------------------------------------------------------------------------------------------------------------

inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
{
    void * * tgt = result;
    loop
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            *tgt++ = *ret1++;
            if (--n1 == 0)
            {
                //There must be at least one row in the right partition - copy any that remain
                do
                {
                    *tgt++ = *ret2++;
                } while (--n2);
                return result;
            }
        }
        else
        {
            *tgt++ = *ret2++;
            if (--n2 == 0)
            {
                //There must be at least one row in the left partition - copy any that remain
                do
                {
                    *tgt++ = *ret1++;
                } while (--n1);
                return result;
            }
        }
    }
}

inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result;
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            *tgt++ = *ret1++;
            if (--n1 == 0)
            {
                while (n--)
                {
                    *tgt++ = *ret2++;
                }
                return result;
            }
        }
        else
        {
            *tgt++ = *ret2++;
            if (--n2 == 0)
            {
                while (n--)
                {
                    *tgt++ = *ret1++;
                }
                return result;
            }
        }
    }
    return result;
}

inline void clonePartition(void * * result, size_t n, void * * src)
{
    void * * tgt = result;
    while (n--)
        *tgt++ = *src++;
}

inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result+n1+n2-1;
    ret1 += (n1-1);
    ret2 += (n2-1);
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) >= 0)
        {
            *tgt-- = *ret1--;
            if (--n1 == 0)
            {
                while (n--)
                {
                    *tgt-- = *ret2--;
                }
                return result;
            }
        }
        else
        {
            *tgt-- = *ret2--;
            if (--n2 == 0)
            {
                //There must be at least one row in the left partition - copy any that remain
                while (n--)
                {
                    *tgt-- = *ret1--;
                }
                return result;
            }
        }
    }
    return result;
}
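// mergeSort recursively splits rows in half; the destination buffer alternates between rows
// and temp with recursion depth (depth & 1), so each level's merge reads two sorted halves
// from one buffer and writes the merged result into the other without extra copying.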
static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
{
    void * * result = (depth & 1) ? tmp : rows;
    //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
    //and in performance testing it executed marginally more quickly
    if (n <= 2)
    {
        //Check for n == 1, but compare against 2 to avoid another comparison
        if (n < 2)
        {
            if (result != rows)
                result[0] = rows[0];
        }
        else
        {
            void * left = rows[0];
            void * right = rows[1];
            if (compare.docompare(left, right) <= 0)
            {
                result[0] = left;
                result[1] = right;
            }
            else
            {
                result[0] = right;
                result[1] = left;
            }
        }
        return result;
    }

    size_t n1 = (n+1)/2;
    size_t n2 = n - n1;
    void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
    void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
    dbgassertex(ret2 == ret1 + n1);
    dbgassertex(ret2 != result);
    return mergePartitions(compare, result, n1, ret1, n2, ret2);
}
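// Serial entry point: stable merge sort of n row pointers; temp must be able to hold n
// pointers and is used as the scratch buffer for the odd-depth merge passes.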
void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
{
    if (n <= 1)
        return;
    mergeSort(rows, n, compare, temp, 0);
}

//=========================================================================

//These constants are probably dependent on the architecture and the number of cores
static const size_t singleThreadedMSortThreshold = 2000;
static const size_t multiThreadedBlockThreshold = 64;       // must be at least 2!

#ifdef _USE_TBB
using tbb::task;
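// TbbParallelMergeSorter builds a task graph on top of TBB: BisectTask recursively splits the
// range, SubSortTask serially merge-sorts blocks once they are small or deep enough,
// MergeTask/MergeRevTask merge two sorted halves working from opposite ends, and
// PartitionSplitTask samples both halves so a large merge can be divided between cores.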
class TbbParallelMergeSorter
{
    class SplitTask : public tbb::task
    {
    public:
        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
        {
        }

        virtual task * execute()
        {
            if (next1->decrement_ref_count() == 0)
                spawn(*next1);
            if (next2->decrement_ref_count() == 0)
                return next2;
            return NULL;
        }

    protected:
        task * next1;
        task * next2;
    };

    class BisectTask : public tbb::task
    {
    public:
        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
        : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth), next(_next)
        {
        }
        virtual task * execute()
        {
            loop
            {
                //On entry next is assumed to be used once by this function
                if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
                {
                    //Create a new task rather than calling sort directly, so that the successor is set up correctly
                    //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
                    return sort;
                }

                void * * result = (depth & 1) ? temp : rows;
                void * * src = (depth & 1) ? rows : temp;
                size_t n1 = (n+1)/2;
                size_t n2 = n-n1;
                task * mergeTask;
                if (depth < sorter.parallelMergeDepth)
                {
                    unsigned partitions = sorter.numPartitionCores() >> depth;
                    if (partitions > 1)
                    {
                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
                        for (unsigned i=0; i < partitions; i++)
                        {
                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
                            mergeFwdTask->set_ref_count(1);
                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
                            mergeRevTask->set_ref_count(1);
                            splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
                        }
                        next->decrement_ref_count();
                        mergeTask = splitTask;
                    }
                    else
                    {
                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
                        mergeFwdTask->set_ref_count(1);
                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
                        mergeRevTask->set_ref_count(1);
                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
                    }
                }
                else
                {
                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
                }

                mergeTask->set_ref_count(2);
                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
                spawn(*bisectRightTask);

                //recurse directly on the left side rather than creating a new task
                n = n1;
                depth = depth+1;
                next = mergeTask;
            }
        }
    protected:
        TbbParallelMergeSorter & sorter;
        void ** rows;
        void ** temp;
        task * next;
        size_t n;
        unsigned depth;
    };
    class SubSortTask : public tbb::task
    {
    public:
        SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
        : sorter(_sorter), rows(_rows), n(_n), temp(_temp), depth(_depth)
        {
        }

        virtual task * execute()
        {
            mergeSort(rows, n, sorter.compare, temp, depth);
            return NULL;
        }
    protected:
        TbbParallelMergeSorter & sorter;
        void ** rows;
        void ** temp;
        size_t n;
        unsigned depth;
    };

    class MergeTask : public tbb::task
    {
    public:
        MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
        : compare(_compare),result(_result), n1(_n1), src1(_src1), n2(_n2), src2(_src2), n(_n)
        {
        }

        virtual task * execute()
        {
            //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
            if (n1 == 0)
            {
                assertex(n <= n2);
                clonePartition(result, n, src2);
            }
            else if (n2 == 0)
            {
                assertex(n <= n1);
                clonePartition(result, n, src1);
            }
            else
                mergePartitions(compare, result, n1, src1, n2, src2, n);
            return NULL;
        }

        void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
        {
            src1 += deltaLeft;
            n1 = numLeft;
            src2 += deltaRight;
            n2 = numRight;
            result += (deltaLeft + deltaRight);
            n = num;
        }

    protected:
        const ICompare & compare;
        void * * result;
        void * * src1;
        void * * src2;
        size_t n1;
        size_t n2;
        size_t n;
    };

    class MergeRevTask : public MergeTask
    {
    public:
        MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
        : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
        {
        }

        virtual task * execute()
        {
            if (n1 == 0)
            {
                assertex(n <= n2);
                //This is a reverse merge, so copy n from the end of the input
                unsigned delta = n2 - n;
                clonePartition(result + delta, n, src2 + delta);
            }
            else if (n2 == 0)
            {
                assertex(n <= n1);
                unsigned delta = n1 - n;
                clonePartition(result + delta, n, src1 + delta);
            }
            else
                mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
            return NULL;
        }
    };
    class PartitionSplitTask : public tbb::task
    {
    public:
        PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
        : numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2), compare(_compare)
        {
            //These could be local variables in calculatePartitions(), but placed here to simplify cleanup.  (Should consider using alloca)
            posLeft = new size_t[numPartitions+1];
            posRight = new size_t[numPartitions+1];
            tasks = new MergeTask *[numPartitions*2];
            for (unsigned i=0; i < numPartitions*2; i++)
                tasks[i] = NULL;
        }
        ~PartitionSplitTask()
        {
            delete [] posLeft;
            delete [] posRight;
            delete [] tasks;
        }

        void calculatePartitions()
        {
#ifdef PARANOID
            {
                for (unsigned ix=1; ix<n1; ix++)
                    if (compare.docompare(src1[ix-1], src1[ix]) > 0)
                        printf("Failure left@%u\n", ix);
            }
            if (false)
            {
                for (unsigned ix=1; ix<n2; ix++)
                    if (compare.docompare(src2[ix-1], src2[ix]) > 0)
                        printf("Failure right@%u\n", ix);
            }
#endif
            //If dividing into P parts, select S*P-1 even points from each side.
            unsigned numSamples = numPartitionSamples*numPartitions-1;
            QuantilePositionIterator iterLeft(n1, numSamples+1, false);
            QuantilePositionIterator iterRight(n2, numSamples+1, false);
            iterLeft.first();
            iterRight.first();

            size_t prevLeft = 0;
            size_t prevRight = 0;
            posLeft[0] = 0;
            posRight[0] = 0;

            //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
            //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
            //=> pick samples [0, 2*numSamples, 4*numSamples ...]
            //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
            for (unsigned part = 1; part < numPartitions; part++)
            {
                unsigned numToSkip = numPartitionSamples*2;
                if (part == 1)
                    numToSkip++;
                for (unsigned skip=numToSkip; skip-- != 0; )
                {
                    size_t leftPos = iterLeft.get();
                    size_t rightPos = iterRight.get();
                    if (leftPos == n1)
                    {
                        if (skip == 0)
                        {
                            posLeft[part] = leftPos;
                            posRight[part] = rightPos;
                        }
                        iterRight.next();
                    }
                    else if (rightPos == n2)
                    {
                        if (skip == 0)
                        {
                            posLeft[part] = leftPos;
                            posRight[part] = rightPos;
                        }
                        iterLeft.next();
                    }
                    else
                    {
                        int c = compare.docompare(src1[leftPos], src2[rightPos]);
                        if (skip == 0)
                        {
                            if (c <= 0)
                            {
                                //value in left is smallest.  Find the position of the value <= the left value
                                posLeft[part] = leftPos;
                                posRight[part] = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
                            }
                            else
                            {
                                posLeft[part] = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
                                posRight[part] = rightPos;
                            }
                        }
                        if (c <= 0)
                        {
                            iterLeft.next();
                            prevLeft = leftPos;
                        }
                        else
                        {
                            iterRight.next();
                            prevRight = rightPos;
                        }
                    }
                }
            }
            posLeft[numPartitions] = n1;
            posRight[numPartitions] = n2;

#ifdef TRACE_PARTITION
            printf("%d,%d -> {", (unsigned)n1, (unsigned)n2);
#endif
            for (unsigned i= 0; i < numPartitions; i++)
            {
                size_t start = posLeft[i] + posRight[i];
                size_t end = posLeft[i+1] + posRight[i+1];
                size_t num = end - start;
                size_t numFwd = num/2;
#ifdef TRACE_PARTITION
                printf("([%d..%d],[%d..%d] %d,%d = %d)\n",
                        (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
                        (unsigned)start, (unsigned)end, (unsigned)num);
#endif

                MergeTask & mergeFwdTask = *tasks[i*2];
                MergeTask & mergeRevTask = *tasks[i*2+1];
                mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
                                         posRight[i], posRight[i+1]-posRight[i],
                                         numFwd);
                mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
                                         posRight[i], posRight[i+1]-posRight[i],
                                         num-numFwd);
            }
#ifdef TRACE_PARTITION
            printf("}\n");
#endif
        }

        virtual task * execute()
        {
            calculatePartitions();
            for (unsigned i=0; i < numPartitions*2; i++)
            {
                if (tasks[i]->decrement_ref_count() == 0)
                    spawn(*tasks[i]);
            }
            return NULL;
        }

        void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
        {
            tasks[i*2] = fwd;
            tasks[i*2+1] = rev;
        }

    protected:
        size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
        {
            if (low == high)
                return low;
            while (high - low > 1)
            {
                size_t mid = (low + high) / 2;
                if (compare.docompare(rows[mid], seek) < 0)
                    low = mid;
                else
                    high = mid;
            }
            if (compare.docompare(rows[low], seek) < 0)
                return low+1;
            return low;
        }

        size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
        {
            if (low == high)
                return low;
            while (high - low > 1)
            {
                size_t mid = (low + high) / 2;
                if (compare.docompare(rows[mid], seek) <= 0)
                    low = mid;
                else
                    high = mid;
            }
            if (compare.docompare(rows[low], seek) <= 0)
                return low+1;
            return low;
        }

    protected:
        const ICompare & compare;
        unsigned numPartitions;
        size_t n1;
        size_t n2;
        void * * src1;
        void * * src2;
        size_t * posLeft;
        size_t * posRight;
        MergeTask * * tasks;
    };
public:
    TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
    {
        //The following constants control the number of iterations to be performed in parallel.
        //The sort is split into more parts than there are cpus so that the effect of delays from one task tend to be evened out.
        //The following constants should possibly be tuned on each platform.  The following gave a good balance on a 2x8way xeon
        const unsigned extraBisectDepth = 3;
        const unsigned extraParallelMergeDepth = 3;

        unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
        unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
        assertex(numCpus <= (1U << ln2NumCpus));

        //Merge in parallel once it is likely to be beneficial
        parallelMergeDepth = ln2NumCpus + extraParallelMergeDepth;
        //Aim to execute in parallel until the width is 8*the maximum number of parallel task
        singleThreadDepth = ln2NumCpus + extraBisectDepth;
        partitionCores = numCpus / 2;
    }

    unsigned numPartitionCores() const { return partitionCores; }

    void sortRoot(void ** rows, size_t n, void ** temp)
    {
        task * end = new (task::allocate_root()) tbb::empty_task();
        end->set_ref_count(1+1);
        task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
        end->spawn(*task);
        end->wait_for_all();
        end->destroy(*end);
    }

public:
    const ICompare & compare;
    unsigned singleThreadDepth;
    unsigned parallelMergeDepth;
    unsigned partitionCores;
    void * * baseRows;
};
//-------------------------------------------------------------------------------------------------------------------

void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
{
    if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
    {
        msortvecstableinplace(rows, n, compare, temp);
        return;
    }

    TbbParallelMergeSorter sorter(rows, compare);
    sorter.sortRoot(rows, n, temp);
}
#else
void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
{
    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
}
#endif

//=========================================================================
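// Binary heap helpers: heap[] holds indexes into rows ordered as a min-heap by compare, with
// equal rows ordered by index (lower index first) so the ordering behaves stably; both
// functions return true when the entry was already in position and nothing was moved.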
bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
{
    bool nochange = true;
    while(1)
    {
        unsigned c = p*2 + 1;
        if(c >= num)
            return nochange;
        if(c+1 < num)
        {
            int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
            if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
                ++c;
        }
        int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
        if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
            return nochange;
        nochange = false;
        unsigned r = heap[c];
        heap[c] = heap[p];
        heap[p] = r;
        p = c;
    }
}

bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
{
    bool nochange = true;
    while(c > 0)
    {
        unsigned p = (unsigned)(c-1)/2;
        int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
        if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
            return nochange;
        nochange = false;
        unsigned r = heap[c];
        heap[c] = heap[p];
        heap[p] = r;
        c = p;
    }
    return nochange;
}
//=========================================================================

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <stddef.h>
#include <time.h>

#ifdef _WIN32
#include <io.h>
#include <sys\types.h>
#include <sys\stat.h>
#else
#include <sys/types.h>
#include <sys/stat.h>
#endif

#ifndef off_t
#define off_t __int64
#endif

typedef void ** VECTOR;

interface IMergeSorter
{
public:
    virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
};

#define INSERTMAX 10000
#define BUFFSIZE 0x100000   // used for output buffer

//==================================================================================================

#ifdef DOUBLE_COMPARE
#define BuffCompare(a, b) ((MSbuffcomp(a,b,inbuffers,mergeheap,icmp)+MSbuffcomp(a,b,inbuffers,mergeheap,icmp))/2)
#else
#define BuffCompare(a, b) MSbuffcomp(a,b,inbuffers,mergeheap,icmp)
#endif

static inline int MSbuffcomp(unsigned a,unsigned b, void **inbuffers, unsigned *mergeheap, ICompare *icmp)
{
    int ret = icmp->docompare(inbuffers[mergeheap[a]], inbuffers[mergeheap[b]]);
    if (ret==0)
        ret = (int)mergeheap[a]-mergeheap[b];
    return ret;
}
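// CRowStreamMerger performs an N-way merge: mergeheap[] is a min-heap of input stream numbers
// keyed by each stream's pending row, pullInput() refills a slot from the IRowProvider, and
// when partdedup is set siftDownDedupTop() releases pending rows that compare equal to the
// current head instead of returning them.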
class CRowStreamMerger
{
    const void **pending;
    size32_t *recsize;
    unsigned *mergeheap;
    unsigned activeInputs;
    count_t recno;
    const ICompare *icmp;
    bool partdedup;

    IRowProvider &provider;
    MemoryAttr workingbuf;
#ifdef _DEBUG
    bool *stopped;
    MemoryAttr ma;
#endif

    inline int buffCompare(unsigned a, unsigned b)
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
        return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
    }

    bool promote(unsigned p)
    {
        activeInputs--;
        if(activeInputs == p)
            return false;
        mergeheap[p] = mergeheap[activeInputs];
        return true;
    }

    inline bool siftDown(unsigned p)
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
        // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
        bool nochange = true;
        while(1)
        {
            unsigned c = p*2 + 1;
            if(c >= activeInputs)
                return nochange;
            if(c+1 < activeInputs)
            {
                int childcmp = buffCompare(c+1, c);
                if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
                    ++c;
            }
            int cmp = buffCompare(c, p);
            if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
                return nochange;
            nochange = false;
            unsigned r = mergeheap[c];
            mergeheap[c] = mergeheap[p];
            mergeheap[p] = r;
            p = c;
        }
    }

    void siftDownDedupTop()
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
        // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
        if(activeInputs < 2)
            return;
        unsigned c = 1;
        int childcmp = 1;
        if(activeInputs >= 3)
        {
            childcmp = buffCompare(2, 1);
            if(childcmp < 0)
                c = 2;
        }
        int cmp = buffCompare(c, 0);
        if(cmp > 0)
            return;

        // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
        while(cmp <= 0)
        {
            if(cmp == 0)
            {
                if(mergeheap[c] < mergeheap[0])
                {
                    unsigned r = mergeheap[c];
                    mergeheap[c] = mergeheap[0];
                    mergeheap[0] = r;
                }
                if(!pullInput(mergeheap[c]))
                    if(!promote(c))
                        break;
                siftDown(c);
            }
            else
            {
                unsigned r = mergeheap[c];
                mergeheap[c] = mergeheap[0];
                mergeheap[0] = r;
                if(siftDown(c))
                    break;
            }
            cmp = buffCompare(c, 0);
        }

        // the following loop ensures the uniqueness property holds on the other branch too
        c = 3-c;
        if(activeInputs <= c)
            return;
        while(childcmp == 0)
        {
            if(mergeheap[c] < mergeheap[0])
            {
                unsigned r = mergeheap[c];
                mergeheap[c] = mergeheap[0];
                mergeheap[0] = r;
            }
            if(!pullInput(mergeheap[c]))
                if(!promote(c))
                    break;
            siftDown(c);
            childcmp = buffCompare(c, 0);
        }
    }

    void init()
    {
        if (activeInputs>0) {
            // setup heap property
            if (activeInputs >= 2)
                for(unsigned p = (activeInputs-2)/2; p > 0; --p)
                    siftDown(p);
            if (partdedup)
                siftDownDedupTop();
            else
                siftDown(0);
        }
        recno = 0;
    }

    inline count_t num() const { return recno; }

    inline bool _next()
    {
        if (!activeInputs)
            return false;
        if (recno) {
            if(!pullInput(mergeheap[0]))
                if(!promote(0))
                    return false;
            // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
            if(partdedup)
                siftDownDedupTop();
            else
                siftDown(0);
        }
        recno++;
        return true;
    }

    inline bool eof() const { return activeInputs==0; }

    bool pullInput(unsigned i)
    {
        if (pending[i]) {
            assertex(partdedup);
            provider.releaseRow(pending[i]);
        }
        pending[i] = provider.nextRow(i);
        if (pending[i])
            return true;
        provider.stop(i);
#ifdef _DEBUG
        assertex(!stopped[i]);
        stopped[i] = true;
#endif
        return false;
    }
public:
    CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
        : provider(_provider)
    {
        partdedup = _partdedup;
        icmp = _icmp;
        recsize = NULL;
        activeInputs = 0;
#ifdef _DEBUG
        stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
        memset(stopped,0,numstreams*sizeof(bool));
#endif
        unsigned i;
        recsize = NULL;
        if (numstreams) {
            byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
            pending = (const void **)buf;
            mergeheap = (unsigned *)(pending+numstreams);
            for (i=0;i<numstreams;i++) {
                pending[i] = NULL;
                if (pullInput(i))
                    mergeheap[activeInputs++] = i;
            }
        }
        else {
            pending = NULL;
            mergeheap = NULL;
        }
        init();
    }

    void stop()
    {
        while (activeInputs) {
            activeInputs--;
            if (pending[mergeheap[activeInputs]]) {
                provider.releaseRow(pending[mergeheap[activeInputs]]);
#ifdef _DEBUG
                assertex(!stopped[mergeheap[activeInputs]]);
                stopped[mergeheap[activeInputs]] = true;
#endif
                provider.stop(mergeheap[activeInputs]);
            }
        }
        pending = NULL;
        mergeheap = NULL;
        workingbuf.clear();
    }

    ~CRowStreamMerger()
    {
        stop();
    }

    inline const void * next()
    {
        if (!_next())
            return NULL;
        unsigned strm = mergeheap[0];
        const void *row = pending[strm];
        pending[strm] = NULL;
        return row;
    }
};
class CMergeRowStreams : public CInterface, implements IRowStream
{
protected:
    CRowStreamMerger *merger;
    bool eos;

    class cProvider: public CInterface, implements IRowProvider
    {
        IArrayOf<IRowStream> ostreams;
        IRowStream **streams;
        Linked<IRowLinkCounter> linkcounter;
        const void *nextRow(unsigned idx)
        {
            return streams[idx]->nextRow();
        };
        void linkRow(const void *row)
        {
            linkcounter->linkRow(row);
        }
        void releaseRow(const void *row)
        {
            linkcounter->releaseRow(row);
        }
        void stop(unsigned idx)
        {
            streams[idx]->stop();
        }
    public:
        IMPLEMENT_IINTERFACE;
        cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
            : linkcounter(_linkcounter)
        {
            ostreams.ensure(numstreams);
            unsigned n = 0;
            while (n<numstreams)
                ostreams.append(*LINK(_streams[n++]));
            streams = ostreams.getArray();
        }
    } *streamprovider;

public:
    CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
    {
        streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
        merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
        eos = _numstreams==0;
    }

    CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
    {
        streamprovider = NULL;
        merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
        eos = _numstreams==0;
    }

    ~CMergeRowStreams()
    {
        delete merger;
        delete streamprovider;
    }

    IMPLEMENT_IINTERFACE;

    void stop()
    {
        if (!eos) {
            merger->stop();
            eos = true;
        }
    }

    const void *nextRow()
    {
        if (eos)
            return NULL;
        const void *r = merger->next();
        if (!r) {
            stop(); // think ok to stop early
            return NULL;
        }
        return r;
    }
};


IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
{
    return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
}

IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
{
    return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
}