jsort.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128
  /*##############################################################################
  HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #ifdef _DEBUG
  24. // #define PARANOID
  25. // #define DOUBLE_COMPARE
  26. //#define TESTPARSORT
  27. //#define MCMERGESTATS
  28. #endif
  29. #define PARALLEL_GRANULARITY 1024
  30. static bool sortParallel(unsigned &numcpus)
  31. {
  32. static unsigned numCPUs = 0;
  33. if (numCPUs==0) {
  34. numCPUs = getAffinityCpus();
  35. }
  36. if ((numcpus==0)||(numcpus>numCPUs))
  37. numcpus = numCPUs;
  38. #ifdef TESTPARSORT
  39. numcpus = 2;
  40. return true; // to test
  41. #endif
  42. return (numcpus>1);
  43. }
  44. //define two variants of the same insertion function.
  45. #define COMPARE(search,position) compare(search,position)
  46. #define INSERT(position,search) memmove(position,search, width)
  47. void * binary_add(const void *newitem, const void *base,
  48. size32_t nmemb,
  49. size32_t width,
  50. int ( *compare)(const void *_e1, const void *_e2),
  51. bool * ItemAdded)
  52. {
  53. #include "jsort.inc"
  54. }
  55. #undef COMPARE
  56. #undef INSERT
  57. //---------------------------------------------------------------------------
  58. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  59. #define INSERT(position,search) *(const void * *)(position) = search
  60. #define NEVER_ADD
  61. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  62. size32_t nmemb,
  63. sortCompareFunction compare,
  64. bool * ItemAdded)
  65. {
  66. #define width sizeof(void*)
  67. #include "jsort.inc"
  68. #undef width
  69. }
  70. #undef NEVER_ADD
  71. #undef INSERT
  72. #undef COMPARE
  73. //---------------------------------------------------------------------------
  74. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  75. #define INSERT(position,search) *(const void * *)(position) = search
  76. #define NEVER_ADD
  77. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  78. size32_t nmemb,
  79. ICompare & compare,
  80. bool * ItemAdded)
  81. {
  82. #define width sizeof(void*)
  83. #include "jsort.inc"
  84. #undef width
  85. }
  86. #undef NEVER_ADD
  87. #undef INSERT
  88. #undef COMPARE
  89. //---------------------------------------------------------------------------
  90. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  91. #define INSERT(position,search) *(const void * *)(position) = search
  92. #define ALWAYS_ADD
  93. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  94. size32_t nmemb,
  95. sortCompareFunction compare)
  96. {
  97. #define width sizeof(void*)
  98. #include "jsort.inc"
  99. #undef width
  100. }
  101. #undef ALWAYS_ADD
  102. #undef INSERT
  103. #undef COMPARE
  104. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  105. #define INSERT(position,search) *(const void * *)(position) = search
  106. #define ALWAYS_ADD
  107. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  108. size32_t nmemb,
  109. ICompare const & compare)
  110. {
  111. #define width sizeof(void*)
  112. #include "jsort.inc"
  113. #undef width
  114. }
  115. #undef ALWAYS_ADD
  116. #undef INSERT
  117. #undef COMPARE
  118. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  119. #define INSERT(position,search) *(const void * *)(position) = search
  120. #define ALWAYS_ADD
  121. #define SEEK_LAST_MATCH
  122. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  123. size32_t nmemb,
  124. sortCompareFunction compare)
  125. {
  126. #define width sizeof(void*)
  127. #include "jsort.inc"
  128. #undef width
  129. }
  130. #undef SEEK_LAST_MATCH
  131. #undef ALWAYS_ADD
  132. #undef INSERT
  133. #undef COMPARE
  134. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  135. #define INSERT(position,search) *(const void * *)(position) = search
  136. #define ALWAYS_ADD
  137. #define SEEK_LAST_MATCH
  138. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  139. size32_t nmemb,
  140. ICompare const & compare)
  141. {
  142. #define width sizeof(void*)
  143. #include "jsort.inc"
  144. #undef width
  145. }
  146. #undef SEEK_LAST_MATCH
  147. #undef ALWAYS_ADD
  148. #undef INSERT
  149. #undef COMPARE
  150. //=========================================================================
  151. // optimized quicksort for array of pointers to fixed size objects
  152. typedef void * ELEMENT;
  153. typedef void ** _VECTOR; // bit messy but allow to be redefined later
  154. #define VECTOR _VECTOR
  155. static inline void swap(VECTOR a, VECTOR b) { ELEMENT t = *a; *a = *b; *b = t; }
  156. #define SWAP swap
  157. #define CMP(a,b) memcmp(*(a),*(b),es)
  158. #define MED3(a,b,c) med3a(a,b,c,es)
  159. #define RECURSE(a,b) qsortvec(a, b, es)
  160. static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
  161. {
  162. return CMP(a, b) < 0 ?
  163. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  164. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  165. }
  166. void qsortvec(void **a, size32_t n, size32_t es)
  167. #include "jsort2.inc"
  168. #undef CMP
  169. #undef MED3
  170. #undef RECURSE
  171. //---------------------------------------------------------------------------
  172. #define CMP(a,b) (compare(*(a),*(b)))
  173. #define MED3(a,b,c) med3c(a,b,c,compare)
  174. #define RECURSE(a,b) qsortvec(a, b, compare)
  175. static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
  176. {
  177. return CMP(a, b) < 0 ?
  178. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  179. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  180. }
  181. void qsortvec(void **a, size32_t n, sortCompareFunction compare)
  182. #include "jsort2.inc"
  183. #undef CMP
  184. #undef MED3
  185. #undef RECURSE
  186. //---------------------------------------------------------------------------
  187. #define CMP(a,b) (compare.docompare(*(a),*(b)))
  188. #define MED3(a,b,c) med3ic(a,b,c,compare)
  189. #define RECURSE(a,b) qsortvec(a, b, compare)
  190. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  191. {
  192. return CMP(a, b) < 0 ?
  193. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  194. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  195. }
  196. void qsortvec(void **a, size32_t n, const ICompare & compare)
  197. #include "jsort2.inc"
  198. // Parallel version (only 2 threads currently)
  199. class cParQSortBase
  200. {
  201. struct sJobItem
  202. {
  203. unsigned start;
  204. unsigned num;
  205. };
  206. NonReentrantSpinLock joblock;
  207. QueueOf<sJobItem,false> jobq;
  208. Semaphore jobqsem;
  209. unsigned waiting;
  210. unsigned numsubthreads;
  211. bool done;
  212. class cThread: public Thread
  213. {
  214. cParQSortBase *parent;
  215. public:
  216. cThread(cParQSortBase *_parent)
  217. : Thread("cParQSort")
  218. {
  219. parent = _parent;
  220. }
  221. int run()
  222. {
  223. parent->run();
  224. return 0;
  225. }
  226. } **threads;
  227. bool waitForWork(unsigned &s,unsigned &n)
  228. {
  229. NonReentrantSpinBlock block(joblock);
  230. while (!done) {
  231. sJobItem *qi = jobq.dequeue();
  232. if (qi) {
  233. s = qi->start;
  234. n = qi->num;
  235. delete qi;
  236. return true;
  237. }
  238. if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
  239. done = true;
  240. jobqsem.signal(waiting);
  241. break;
  242. }
  243. waiting++;
  244. NonReentrantSpinUnblock unblock(joblock);
  245. jobqsem.wait();
  246. }
  247. s = 0; // remove uninitialised variable warnings
  248. n = 0;
  249. return false;
  250. }
  251. public:
  252. cParQSortBase(unsigned _numsubthreads)
  253. {
  254. numsubthreads = _numsubthreads;
  255. done = false;
  256. waiting = 0;
  257. threads = new cThread*[numsubthreads];
  258. for (unsigned i=0;i<numsubthreads;i++)
  259. threads[i] = new cThread(this);
  260. }
  261. ~cParQSortBase()
  262. {
  263. for (unsigned i=0;i<numsubthreads;i++)
  264. threads[i]->Release();
  265. delete [] threads;
  266. }
  267. void start()
  268. {
  269. for (unsigned i=0;i<numsubthreads;i++)
  270. threads[i]->start();
  271. }
  272. void subsort(unsigned s, unsigned n)
  273. {
  274. do {
  275. sJobItem *qi;
  276. while (n>PARALLEL_GRANULARITY) {
  277. unsigned r1;
  278. unsigned r2;
  279. partition(s, n, r1, r2);
  280. unsigned n2 = n+s-r2;
  281. if (r1==s) {
  282. n = n2;
  283. s = r2;
  284. }
  285. else {
  286. if (n2!=0) {
  287. qi = new sJobItem;
  288. qi->num = n2;
  289. qi->start = r2;
  290. NonReentrantSpinBlock block(joblock);
  291. jobq.enqueue(qi);
  292. if (waiting) {
  293. jobqsem.signal(waiting);
  294. waiting = 0;
  295. }
  296. }
  297. n = r1-s;
  298. }
  299. }
  300. serialsort(s,n);
  301. NonReentrantSpinBlock block(joblock);
  302. if (waiting==numsubthreads) { // well we are done so are rest
  303. done = true;
  304. jobqsem.signal(waiting);
  305. break;
  306. }
  307. }
  308. while(waitForWork(s,n));
  309. }
  310. void run()
  311. {
  312. unsigned s;
  313. unsigned n;
  314. if (waitForWork(s,n))
  315. subsort(s,n);
  316. }
  317. void join()
  318. {
  319. for (unsigned i=0;i<numsubthreads;i++)
  320. threads[i]->join();
  321. }
  322. virtual void serialsort(unsigned from, unsigned len)=0;
  323. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  324. };
  // DOPARTITION expands to the body of a cParQSort*::partition(s, n, r1, r2)
  // override. It uses the enclosing class's 'array' member, together with the
  // VECTOR type and the CMP / MED3 / SWAP macros in force where it is expanded.
  //
  // It partitions array[s, s+n) three ways around a median-of-three pivot
  // (for n > 40, the median of three medians-of-three). It follows the scheme
  // of Bentley & McIlroy's "Engineering a Sort Function": keys equal to the
  // pivot are first parked at both ends and then swapped into the middle.
  // On exit:
  //   [s,  r1)   compare less than the pivot
  //   [r1, r2)   compare equal to the pivot
  //   [r2, s+n)  compare greater than the pivot
  #define DOPARTITION \
      VECTOR base = array + s; \
      VECTOR mid = base + (n / 2); \
      VECTOR lo = base; \
      VECTOR hi = base + (n - 1); \
      if (n > 40) { \
          unsigned step = (n / 8); \
          lo = MED3(lo, lo + step, lo + 2 * step); \
          mid = MED3(mid - step, mid, mid + step); \
          hi = MED3(hi - 2 * step, hi - step, hi); \
      } \
      mid = MED3(lo, mid, hi); \
      SWAP(base, mid); \
      VECTOR eqLeft = base + 1; \
      VECTOR scanLeft = eqLeft; \
      VECTOR scanRight = base + (n - 1); \
      VECTOR eqRight = scanRight; \
      int cmp; \
      for (;;) { \
          while (scanLeft <= scanRight && (cmp = CMP(scanLeft, base)) <= 0) { \
              if (cmp == 0) { \
                  SWAP(eqLeft, scanLeft); \
                  eqLeft++; \
              } \
              scanLeft++; \
          } \
          while (scanLeft <= scanRight && (cmp = CMP(scanRight, base)) >= 0) { \
              if (cmp == 0) { \
                  SWAP(scanRight, eqRight); \
                  eqRight--; \
              } \
              scanRight--; \
          } \
          if (scanLeft > scanRight) \
              break; \
          SWAP(scanLeft, scanRight); \
          scanLeft++; \
          scanRight--; \
      } \
      VECTOR end = base + n; \
      int count = MIN(eqLeft - base, scanLeft - eqLeft); \
      for (VECTOR x = base, y = scanLeft - count; count; count--, x++, y++) \
          SWAP(x, y); \
      count = MIN(eqRight - scanRight, end - eqRight - 1); \
      for (VECTOR x = scanLeft, y = end - count; count; count--, x++, y++) \
          SWAP(x, y); \
      r1 = (scanLeft - eqLeft) + s; \
      r2 = n - (eqRight - scanRight) + s;
  379. class cParQSort: public cParQSortBase
  380. {
  381. VECTOR array;
  382. const ICompare &compare;
  383. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  384. {
  385. DOPARTITION
  386. }
  387. void serialsort(unsigned from, unsigned len)
  388. {
  389. qsortvec(array+from,len,compare);
  390. }
  391. public:
  392. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  393. : cParQSortBase(_numsubthreads), compare(_compare)
  394. {
  395. array = _a;
  396. cParQSortBase::start();
  397. }
  398. };
  399. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  400. {
  401. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  402. qsortvec(a,n,compare);
  403. return;
  404. }
  405. cParQSort sorter(a,compare,numcpus-1);
  406. sorter.subsort(0,n);
  407. sorter.join();
  408. #ifdef TESTPARSORT
  409. for (unsigned i=1;i<n;i++)
  410. if (compare.docompare(a[i-1],a[i])>0)
  411. ERRLOG("parqsortvec failed %d",i);
  412. #endif
  413. }
  414. #undef CMP
  415. #undef MED3
  416. #undef RECURSE
  417. //---------------------------------------------------------------------------
  418. #define CMP(a,b) cmpic2(a,b,compare1,compare2)
  419. #define MED3(a,b,c) med3ic2(a,b,c,compare1,compare2)
  420. #define RECURSE(a,b) qsortvec(a, b, compare1, compare2)
  421. static inline int cmpic2(VECTOR a, VECTOR b, const ICompare & compare1, const ICompare & compare2)
  422. {
  423. int ret = compare1.docompare(*(a),*(b));
  424. if (ret==0)
  425. ret = compare2.docompare(*(a),*(b));
  426. return ret;
  427. }
  428. static inline VECTOR med3ic2(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare1, const ICompare & compare2)
  429. {
  430. return CMP(a, b) < 0 ?
  431. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  432. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  433. }
  434. void qsortvec(void **a, size32_t n, const ICompare & compare1, const ICompare & compare2)
  435. #include "jsort2.inc"
  436. class cParQSort2: public cParQSortBase
  437. {
  438. VECTOR array;
  439. const ICompare &compare1;
  440. const ICompare &compare2;
  441. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  442. {
  443. DOPARTITION
  444. }
  445. void serialsort(unsigned from, unsigned len)
  446. {
  447. qsortvec(array+from,len,compare1,compare2);
  448. }
  449. public:
  450. cParQSort2(VECTOR _a,const ICompare &_compare1,const ICompare &_compare2, unsigned _numsubthreads)
  451. : cParQSortBase(_numsubthreads),compare1(_compare1), compare2(_compare2)
  452. {
  453. array = _a;
  454. cParQSortBase::start();
  455. }
  456. };
  457. void parqsortvec(void **a, size32_t n, const ICompare & compare1, const ICompare & compare2,unsigned numcpus)
  458. {
  459. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  460. qsortvec(a,n,compare1,compare2);
  461. return;
  462. }
  463. cParQSort2 sorter(a,compare1,compare2,numcpus-1);
  464. sorter.subsort(0,n);
  465. sorter.join();
  466. #ifdef TESTPARSORT
  467. for (unsigned i=1;i<n;i++) {
  468. int cmp = compare1.docompare(a[i-1],a[i]);
  469. if (cmp>0)
  470. ERRLOG("parqsortvec(2,1) failed %d",i);
  471. else if ((cmp==0)&&(compare2.docompare(a[i-1],a[i])>0))
  472. ERRLOG("parqsortvec(2,2) failed %d",i);
  473. }
  474. #endif
  475. }
  476. #undef CMP
  477. #undef MED3
  478. #undef RECURSE
  479. //---------------------------------------------------------------------------
  480. #undef VECTOR
  481. #undef SWAP
  482. typedef void *** _IVECTOR;
  483. #define VECTOR _IVECTOR
  484. static inline void swapind(VECTOR a, VECTOR b) { void ** t = *a; *a = *b; *b = t; }
  485. #define SWAP swapind
  486. #define CMP(a,b) cmpicindstable(a,b,compare)
  487. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  488. {
  489. int ret = compare.docompare(**a,**b);
  490. if (ret==0)
  491. if (*a>*b)
  492. ret = 1;
  493. else if (*a<*b)
  494. ret = -1;
  495. return ret;
  496. }
  497. #define MED3(a,b,c) med3ic(a,b,c,compare)
  498. #define RECURSE(a,b) doqsortvecstable(a, b, compare)
  499. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  500. {
  501. return CMP(a, b) < 0 ?
  502. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  503. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  504. }
  505. static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
  506. #include "jsort2.inc"
  507. class cParQSortStable: public cParQSortBase
  508. {
  509. VECTOR array;
  510. const ICompare &compare;
  511. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  512. {
  513. DOPARTITION
  514. }
  515. void serialsort(unsigned from, unsigned len)
  516. {
  517. doqsortvecstable(array+from,len,compare);
  518. }
  519. public:
  520. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  521. : cParQSortBase(_numsubthreads),compare(_compare)
  522. {
  523. array = _a;
  524. cParQSortBase::start();
  525. }
  526. };
  527. #undef CMP
  528. #undef CMP1
  529. #undef MED3
  530. #undef RECURSE
  531. #undef VECTOR
  532. void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  533. {
  534. for(unsigned i=0; i<n; ++i)
  535. index[i] = rows+i;
  536. doqsortvecstable(index, n, compare);
  537. }
  538. void parqsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  539. {
  540. for(unsigned i=0; i<n; ++i)
  541. index[i] = rows+i;
  542. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  543. doqsortvecstable(index,n,compare);
  544. return;
  545. }
  546. cParQSortStable sorter(index,compare,numcpus-1);
  547. sorter.subsort(0,n);
  548. sorter.join();
  549. }
  550. //=========================================================================
  551. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  552. {
  553. bool nochange = true;
  554. while(1)
  555. {
  556. unsigned c = p*2 + 1;
  557. if(c >= num)
  558. return nochange;
  559. if(c+1 < num)
  560. {
  561. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  562. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  563. ++c;
  564. }
  565. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  566. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  567. return nochange;
  568. nochange = false;
  569. unsigned r = heap[c];
  570. heap[c] = heap[p];
  571. heap[p] = r;
  572. p = c;
  573. }
  574. }
  575. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  576. {
  577. bool nochange = true;
  578. while(c > 0)
  579. {
  580. unsigned p = (unsigned)(c-1)/2;
  581. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  582. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  583. return nochange;
  584. nochange = false;
  585. unsigned r = heap[c];
  586. heap[c] = heap[p];
  587. heap[p] = r;
  588. c = p;
  589. }
  590. return nochange;
  591. }
  592. //=========================================================================
  593. #include <assert.h>
  594. #include <stdio.h>
  595. #include <stdlib.h>
  596. #include <fcntl.h>
  597. #include <string.h>
  598. #include <stddef.h>
  599. #include <time.h>
  600. #ifdef _WIN32
  601. #include <io.h>
  602. #include <sys\types.h>
  603. #include <sys\stat.h>
  604. #else
  605. #include <sys/types.h>
  606. #include <sys/stat.h>
  607. #endif
  608. #ifndef off_t
  609. #define off_t __int64
  610. #endif
  611. typedef void ** VECTOR;
  612. interface IMergeSorter
  613. {
  614. public:
  615. virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
  616. };
  617. #define INSERTMAX 10000
  618. #define BUFFSIZE 0x100000 // used for output buffer
  619. //==================================================================================================
  620. #ifdef DOUBLE_COMPARE
  621. #define BuffCompare(a, b) ((MSbuffcomp(a,b,inbuffers,mergeheap,icmp)+MSbuffcomp(a,b,inbuffers,mergeheap,icmp))/2)
  622. #else
  623. #define BuffCompare(a, b) MSbuffcomp(a,b,inbuffers,mergeheap,icmp)
  624. #endif
  625. static inline int MSbuffcomp(unsigned a,unsigned b, void **inbuffers, unsigned *mergeheap, ICompare *icmp)
  626. {
  627. int ret = icmp->docompare(inbuffers[mergeheap[a]], inbuffers[mergeheap[b]]);
  628. if (ret==0)
  629. ret = (int)mergeheap[a]-mergeheap[b];
  630. return ret;
  631. }
  632. class CRowStreamMerger
  633. {
  634. const void **pending;
  635. size32_t *recsize;
  636. unsigned *mergeheap;
  637. unsigned activeInputs;
  638. count_t recno;
  639. ICompare *icmp;
  640. bool partdedup;
  641. IRowProvider &provider;
  642. MemoryAttr workingbuf;
  643. #ifdef _DEBUG
  644. bool *stopped;
  645. MemoryAttr ma;
  646. #endif
  647. inline int buffCompare(unsigned a, unsigned b)
  648. {
  649. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  650. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  651. }
  652. bool promote(unsigned p)
  653. {
  654. activeInputs--;
  655. if(activeInputs == p)
  656. return false;
  657. mergeheap[p] = mergeheap[activeInputs];
  658. return true;
  659. }
  660. inline bool siftDown(unsigned p)
  661. {
  662. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  663. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  664. bool nochange = true;
  665. while(1)
  666. {
  667. unsigned c = p*2 + 1;
  668. if(c >= activeInputs)
  669. return nochange;
  670. if(c+1 < activeInputs)
  671. {
  672. int childcmp = buffCompare(c+1, c);
  673. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  674. ++c;
  675. }
  676. int cmp = buffCompare(c, p);
  677. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  678. return nochange;
  679. nochange = false;
  680. unsigned r = mergeheap[c];
  681. mergeheap[c] = mergeheap[p];
  682. mergeheap[p] = r;
  683. p = c;
  684. }
  685. }
  686. void siftDownDedupTop()
  687. {
  688. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  689. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  690. if(activeInputs < 2)
  691. return;
  692. unsigned c = 1;
  693. int childcmp = 1;
  694. if(activeInputs >= 3)
  695. {
  696. childcmp = buffCompare(2, 1);
  697. if(childcmp < 0)
  698. c = 2;
  699. }
  700. int cmp = buffCompare(c, 0);
  701. if(cmp > 0)
  702. return;
  703. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  704. while(cmp <= 0)
  705. {
  706. if(cmp == 0)
  707. {
  708. if(mergeheap[c] < mergeheap[0])
  709. {
  710. unsigned r = mergeheap[c];
  711. mergeheap[c] = mergeheap[0];
  712. mergeheap[0] = r;
  713. }
  714. if(!pullInput(mergeheap[c]))
  715. if(!promote(c))
  716. break;
  717. siftDown(c);
  718. }
  719. else
  720. {
  721. unsigned r = mergeheap[c];
  722. mergeheap[c] = mergeheap[0];
  723. mergeheap[0] = r;
  724. if(siftDown(c))
  725. break;
  726. }
  727. cmp = buffCompare(c, 0);
  728. }
  729. // the following loop ensures the uniqueness property holds on the other branch too
  730. c = 3-c;
  731. if(activeInputs <= c)
  732. return;
  733. while(childcmp == 0)
  734. {
  735. if(mergeheap[c] < mergeheap[0])
  736. {
  737. unsigned r = mergeheap[c];
  738. mergeheap[c] = mergeheap[0];
  739. mergeheap[0] = r;
  740. }
  741. if(!pullInput(mergeheap[c]))
  742. if(!promote(c))
  743. break;
  744. siftDown(c);
  745. childcmp = buffCompare(c, 0);
  746. }
  747. }
  748. void init()
  749. {
  750. if (activeInputs>0) {
  751. // setup heap property
  752. if (activeInputs >= 2)
  753. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  754. siftDown(p);
  755. if (partdedup)
  756. siftDownDedupTop();
  757. else
  758. siftDown(0);
  759. }
  760. recno = 0;
  761. }
  762. inline count_t num() const { return recno; }
  763. inline bool _next()
  764. {
  765. if (!activeInputs)
  766. return false;
  767. if (recno) {
  768. if(!pullInput(mergeheap[0]))
  769. if(!promote(0))
  770. return false;
  771. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  772. if(partdedup)
  773. siftDownDedupTop();
  774. else
  775. siftDown(0);
  776. }
  777. recno++;
  778. return true;
  779. }
  780. inline bool eof() const { return activeInputs==0; }
  781. bool pullInput(unsigned i)
  782. {
  783. if (pending[i]) {
  784. assertex(partdedup);
  785. provider.releaseRow(pending[i]);
  786. }
  787. pending[i] = provider.nextRow(i);
  788. if (pending[i])
  789. return true;
  790. provider.stop(i);
  791. #ifdef _DEBUG
  792. assertex(!stopped[i]);
  793. stopped[i] = true;
  794. #endif
  795. return false;
  796. }
  797. public:
  798. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams,ICompare *_icmp,bool _partdedup=false)
  799. : provider(_provider)
  800. {
  801. partdedup = _partdedup;
  802. icmp = _icmp;
  803. recsize = NULL;
  804. activeInputs = 0;
  805. #ifdef _DEBUG
  806. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  807. memset(stopped,0,numstreams*sizeof(bool));
  808. #endif
  809. unsigned i;
  810. recsize = NULL;
  811. if (numstreams) {
  812. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  813. pending = (const void **)buf;
  814. mergeheap = (unsigned *)(pending+numstreams);
  815. for (i=0;i<numstreams;i++) {
  816. pending[i] = NULL;
  817. if (pullInput(i))
  818. mergeheap[activeInputs++] = i;
  819. }
  820. }
  821. else {
  822. pending = NULL;
  823. mergeheap = NULL;
  824. }
  825. init();
  826. }
  827. void stop()
  828. {
  829. while (activeInputs) {
  830. activeInputs--;
  831. if (pending[mergeheap[activeInputs]]) {
  832. provider.releaseRow(pending[mergeheap[activeInputs]]);
  833. #ifdef _DEBUG
  834. assertex(!stopped[mergeheap[activeInputs]]);
  835. stopped[mergeheap[activeInputs]] = true;
  836. #endif
  837. provider.stop(mergeheap[activeInputs]);
  838. }
  839. }
  840. pending = NULL;
  841. mergeheap = NULL;
  842. workingbuf.clear();
  843. }
  844. ~CRowStreamMerger()
  845. {
  846. stop();
  847. }
  848. inline const void * next()
  849. {
  850. if (!_next())
  851. return NULL;
  852. unsigned strm = mergeheap[0];
  853. const void *row = pending[strm];
  854. pending[strm] = NULL;
  855. return row;
  856. }
  857. };
  858. class CMergeRowStreams : public CInterface, implements IRowStream
  859. {
  860. protected:
  861. CRowStreamMerger *merger;
  862. bool eos;
  863. class cProvider: public CInterface, implements IRowProvider
  864. {
  865. IArrayOf<IRowStream> ostreams;
  866. IRowStream **streams;
  867. Linked<IRowLinkCounter> linkcounter;
  868. const void *nextRow(unsigned idx)
  869. {
  870. return streams[idx]->nextRow();
  871. };
  872. void linkRow(const void *row)
  873. {
  874. linkcounter->linkRow(row);
  875. }
  876. void releaseRow(const void *row)
  877. {
  878. linkcounter->releaseRow(row);
  879. }
  880. void stop(unsigned idx)
  881. {
  882. streams[idx]->stop();
  883. }
  884. public:
  885. IMPLEMENT_IINTERFACE;
  886. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  887. : linkcounter(_linkcounter)
  888. {
  889. ostreams.ensure(numstreams);
  890. unsigned n = 0;
  891. while (n<numstreams)
  892. ostreams.append(*LINK(_streams[n++]));
  893. streams = ostreams.getArray();
  894. }
  895. } *streamprovider;
  896. public:
  897. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  898. {
  899. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  900. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  901. eos = _numstreams==0;
  902. }
  903. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  904. {
  905. streamprovider = NULL;
  906. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  907. eos = _numstreams==0;
  908. }
  909. ~CMergeRowStreams()
  910. {
  911. delete merger;
  912. delete streamprovider;
  913. }
  914. IMPLEMENT_IINTERFACE;
  915. void stop()
  916. {
  917. if (!eos) {
  918. merger->stop();
  919. eos = true;
  920. }
  921. }
  922. const void *nextRow()
  923. {
  924. if (eos)
  925. return NULL;
  926. const void *r = merger->next();
  927. if (!r) {
  928. stop(); // think ok to stop early
  929. return NULL;
  930. }
  931. return r;
  932. }
  933. };
  934. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  935. {
  936. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  937. }
  938. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  939. {
  940. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  941. }