jsort.cpp 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
  // Optional diagnostic switches; every one of them is commented out here. Of these, only
  // TESTPARSORT is referenced by the code visible in this part of the file (sortParallel and
  // parqsortvec).
  25. #ifdef _DEBUG
  26. // #define PARANOID
  27. //#define TESTPARSORT
  28. //#define MCMERGESTATS
  29. #endif
  30. //#define PARANOID_PARTITION
  31. //#define TRACE_PARTITION
  // Element-count threshold for the parallel sorts: parqsortvec / parqsortvecstable sort serially
  // when n <= this value, and cParQSortBase::subsort only keeps partitioning a range while it is
  // larger than this value.
  32. #define PARALLEL_GRANULARITY 1024
  33. static bool sortParallel(unsigned &numcpus)
  34. {
  35. static unsigned numCPUs = 0;
  36. if (numCPUs==0) {
  37. numCPUs = getAffinityCpus();
  38. }
  39. if ((numcpus==0)||(numcpus>numCPUs))
  40. numcpus = numCPUs;
  41. #ifdef TESTPARSORT
  42. numcpus = 2;
  43. return true; // to test
  44. #endif
  45. return (numcpus>1);
  46. }
  // binary_add: the function body is not written here - it is the text of jsort.inc (not visible
  // in this file), expanded with the COMPARE and INSERT macros defined just above it.
  // For this variant COMPARE passes both arguments straight to the compare callback, and INSERT
  // copies `width` bytes with memmove.
  // NOTE(review): presumably this searches the sorted array `base` (nmemb elements of `width`
  // bytes) for `newitem`, inserts it when absent and reports that through *ItemAdded - confirm
  // against jsort.inc.
  47. //define two variants of the same insertion function.
  48. #define COMPARE(search,position) compare(search,position)
  49. #define INSERT(position,search) memmove(position,search, width)
  50. void * binary_add(const void *newitem, const void *base,
  51. size32_t nmemb,
  52. size32_t width,
  53. int ( *compare)(const void *_e1, const void *_e2),
  54. bool * ItemAdded)
  55. {
  56. #include "jsort.inc"
  57. }
  58. #undef COMPARE
  59. #undef INSERT
  60. //---------------------------------------------------------------------------
  // binary_vec_find (function-pointer comparator): body comes from jsort.inc, which is not
  // visible here. In this instantiation the array holds pointers: COMPARE dereferences the slot
  // before calling compare(), INSERT stores the pointer into the slot, and `width` is defined as
  // sizeof(void*) for the duration of the include.
  // NOTE(review): NEVER_ADD is defined around the include; presumably it makes jsort.inc
  // search without inserting - confirm against jsort.inc.
  61. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  62. #define INSERT(position,search) *(const void * *)(position) = search
  63. #define NEVER_ADD
  64. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  65. size32_t nmemb,
  66. sortCompareFunction compare,
  67. bool * ItemAdded)
  68. {
  69. #define width sizeof(void*)
  70. #include "jsort.inc"
  71. #undef width
  72. }
  73. #undef NEVER_ADD
  74. #undef INSERT
  75. #undef COMPARE
  76. //---------------------------------------------------------------------------
  // binary_vec_find (ICompare comparator): same shape as the function-pointer overload, except
  // that COMPARE calls compare.docompare(). The body is the text of jsort.inc (not visible here),
  // expanded with `width` defined as sizeof(void*).
  // NOTE(review): NEVER_ADD presumably selects a find-only variant inside jsort.inc - confirm.
  77. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  78. #define INSERT(position,search) *(const void * *)(position) = search
  79. #define NEVER_ADD
  80. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  81. size32_t nmemb,
  82. ICompare & compare,
  83. bool * ItemAdded)
  84. {
  85. #define width sizeof(void*)
  86. #include "jsort.inc"
  87. #undef width
  88. }
  89. #undef NEVER_ADD
  90. #undef INSERT
  91. #undef COMPARE
  92. //---------------------------------------------------------------------------
  // binary_vec_insert (function-pointer comparator): body is the text of jsort.inc (not visible
  // here). COMPARE dereferences the pointer slot before calling compare(); INSERT stores the
  // pointer into the slot; `width` is sizeof(void*). Unlike binary_vec_find there is no
  // ItemAdded parameter.
  // NOTE(review): ALWAYS_ADD presumably makes jsort.inc insert even when an equal element is
  // already present - confirm against jsort.inc.
  93. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  94. #define INSERT(position,search) *(const void * *)(position) = search
  95. #define ALWAYS_ADD
  96. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  97. size32_t nmemb,
  98. sortCompareFunction compare)
  99. {
  100. #define width sizeof(void*)
  101. #include "jsort.inc"
  102. #undef width
  103. }
  104. #undef ALWAYS_ADD
  105. #undef INSERT
  106. #undef COMPARE
  // binary_vec_insert (ICompare comparator): identical in shape to the function-pointer
  // overload, with COMPARE routed through compare.docompare(). Body is the text of jsort.inc
  // (not visible here), expanded with ALWAYS_ADD defined and `width` set to sizeof(void*).
  // NOTE(review): the meaning of ALWAYS_ADD is decided inside jsort.inc - confirm there.
  107. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  108. #define INSERT(position,search) *(const void * *)(position) = search
  109. #define ALWAYS_ADD
  110. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  111. size32_t nmemb,
  112. ICompare const & compare)
  113. {
  114. #define width sizeof(void*)
  115. #include "jsort.inc"
  116. #undef width
  117. }
  118. #undef ALWAYS_ADD
  119. #undef INSERT
  120. #undef COMPARE
  // binary_vec_insert_stable (function-pointer comparator): body is the text of jsort.inc (not
  // visible here), expanded with both ALWAYS_ADD and SEEK_LAST_MATCH defined and `width` set to
  // sizeof(void*). COMPARE dereferences the pointer slot; INSERT stores the pointer.
  // NOTE(review): SEEK_LAST_MATCH presumably places the new item after any existing equal
  // items, which is what would make the insert stable - confirm against jsort.inc.
  121. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  122. #define INSERT(position,search) *(const void * *)(position) = search
  123. #define ALWAYS_ADD
  124. #define SEEK_LAST_MATCH
  125. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  126. size32_t nmemb,
  127. sortCompareFunction compare)
  128. {
  129. #define width sizeof(void*)
  130. #include "jsort.inc"
  131. #undef width
  132. }
  133. #undef SEEK_LAST_MATCH
  134. #undef ALWAYS_ADD
  135. #undef INSERT
  136. #undef COMPARE
  // binary_vec_insert_stable (ICompare comparator): same as the function-pointer overload but
  // COMPARE calls compare.docompare(). Body is the text of jsort.inc (not visible here),
  // expanded with ALWAYS_ADD and SEEK_LAST_MATCH defined and `width` set to sizeof(void*).
  // NOTE(review): the effect of SEEK_LAST_MATCH is decided inside jsort.inc - confirm there.
  137. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  138. #define INSERT(position,search) *(const void * *)(position) = search
  139. #define ALWAYS_ADD
  140. #define SEEK_LAST_MATCH
  141. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  142. size32_t nmemb,
  143. ICompare const & compare)
  144. {
  145. #define width sizeof(void*)
  146. #include "jsort.inc"
  147. #undef width
  148. }
  149. #undef SEEK_LAST_MATCH
  150. #undef ALWAYS_ADD
  151. #undef INSERT
  152. #undef COMPARE
  153. //=========================================================================
  // Optimised quicksort for an array of pointers to fixed-size objects.
  typedef void * ELEMENT;
  typedef void ** _VECTOR; // slightly messy, but lets VECTOR be redefined further down
  #define VECTOR _VECTOR
  // Exchange the two pointers stored in the slots a and b.
  static inline void swap(VECTOR a, VECTOR b)
  {
      ELEMENT held = *a;
      *a = *b;
      *b = held;
  }
  #define SWAP swap
  160. #define CMP(a,b) memcmp(*(a),*(b),es)
  161. #define MED3(a,b,c) med3a(a,b,c,es)
  162. #define RECURSE(a,b) qsortvec(a, b, es)
  163. static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
  164. {
  165. return CMP(a, b) < 0 ?
  166. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  167. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  168. }
  169. void qsortvec(void **a, size32_t n, size32_t es)
  170. #include "jsort2.inc"
  171. #undef CMP
  172. #undef MED3
  173. #undef RECURSE
  174. //---------------------------------------------------------------------------
  175. #define CMP(a,b) (compare(*(a),*(b)))
  176. #define MED3(a,b,c) med3c(a,b,c,compare)
  177. #define RECURSE(a,b) qsortvec(a, b, compare)
  178. static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
  179. {
  180. return CMP(a, b) < 0 ?
  181. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  182. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  183. }
  184. void qsortvec(void **a, size32_t n, sortCompareFunction compare)
  185. #include "jsort2.inc"
  186. #undef CMP
  187. #undef MED3
  188. #undef RECURSE
  189. #define CMP(a,b) (compare.docompare(*(a),*(b)))
  190. #define MED3(a,b,c) med3ic(a,b,c,compare)
  191. #define RECURSE(a,b) qsortvec(a, b, compare)
  192. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  193. {
  194. return CMP(a, b) < 0 ?
  195. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  196. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  197. }
  198. void qsortvec(void **a, size32_t n, const ICompare & compare)
  199. #include "jsort2.inc"
  // NOTE(review): the "only 2 threads" remark on the next line looks out of date - the
  // constructor below creates one cThread per requested sub-thread.
  200. // Parallel version (only 2 threads currently)
  // Shared machinery for the parallel quicksorts. Work is a queue of (start, num) ranges; the
  // calling thread and the worker threads all execute subsort(), which partitions a range, queues
  // one side for another thread and carries on with the other. Derived classes supply
  // partition() and serialsort() for their element type.
  201. class cParQSortBase
  202. {
  // One queued unit of work: num elements beginning at offset start of the array being sorted.
  203. struct sJobItem
  204. {
  205. unsigned start;
  206. unsigned num;
  207. };
  // joblock guards jobq, waiting and done.
  208. NonReentrantSpinLock joblock;
  209. QueueOf<sJobItem,false> jobq;
  // Threads that found the queue empty block on this semaphore.
  210. Semaphore jobqsem;
  // Number of threads that have found the queue empty since the last enqueue (reset to 0
  // whenever a job is queued).
  211. unsigned waiting;
  212. unsigned numsubthreads;
  // Set once a thread observes that every other thread is waiting and the queue is empty.
  213. bool done;
  // Worker thread: simply runs the parent's run().
  214. class cThread: public Thread
  215. {
  216. cParQSortBase *parent;
  217. public:
  218. cThread(cParQSortBase *_parent)
  219. : Thread("cParQSort")
  220. {
  221. parent = _parent;
  222. }
  223. int run()
  224. {
  225. parent->run();
  226. return 0;
  227. }
  228. } **threads;
  // Fetch the next queued range into s/n. Returns true with a job, or false (s = n = 0) once the
  // sort is finished. If the queue is empty and `waiting` already equals numsubthreads, this
  // thread is the last one still running, so it sets done and signals the semaphore `waiting`
  // times to wake the others. Otherwise it counts itself as waiting and blocks on the semaphore.
  229. bool waitForWork(unsigned &s,unsigned &n)
  230. {
  231. NonReentrantSpinBlock block(joblock);
  232. while (!done) {
  233. sJobItem *qi = jobq.dequeue();
  234. if (qi) {
  235. s = qi->start;
  236. n = qi->num;
  237. delete qi;
  238. return true;
  239. }
  240. if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
  241. done = true;
  242. jobqsem.signal(waiting);
  243. break;
  244. }
  245. waiting++;
  // presumably releases joblock for the rest of this loop iteration so the wait does not
  // hold the lock - confirm against NonReentrantSpinUnblock
  246. NonReentrantSpinUnblock unblock(joblock);
  247. jobqsem.wait();
  248. }
  249. s = 0; // remove uninitialised variable warnings
  250. n = 0;
  251. return false;
  252. }
  253. public:
  // Creates (but does not start) _numsubthreads worker threads.
  254. cParQSortBase(unsigned _numsubthreads)
  255. {
  256. numsubthreads = _numsubthreads;
  257. done = false;
  258. waiting = 0;
  259. threads = new cThread*[numsubthreads];
  260. for (unsigned i=0;i<numsubthreads;i++)
  261. threads[i] = new cThread(this);
  262. }
  // Releases the worker thread objects; it does not join them (see join()).
  263. ~cParQSortBase()
  264. {
  265. for (unsigned i=0;i<numsubthreads;i++)
  266. threads[i]->Release();
  267. delete [] threads;
  268. }
  // Starts every worker thread. The derived-class constructors in this file call this as their
  // last step.
  269. void start()
  270. {
  271. for (unsigned i=0;i<numsubthreads;i++)
  272. threads[i]->start();
  273. }
  // Sort the n elements starting at offset s, then keep taking queued ranges until the whole
  // sort is done. While the current range is larger than PARALLEL_GRANULARITY it is partitioned:
  // the part from r2 upwards (n2 elements) is queued for another thread and this thread continues
  // with the part below r1. When r1==s (nothing below) it continues with the upper part instead
  // of queuing it. Ranges at or below the threshold are handed to serialsort().
  274. void subsort(unsigned s, unsigned n)
  275. {
  276. do {
  277. sJobItem *qi;
  278. while (n>PARALLEL_GRANULARITY) {
  279. unsigned r1;
  280. unsigned r2;
  281. partition(s, n, r1, r2);
  // number of elements from r2 to the end of the current range
  282. unsigned n2 = n+s-r2;
  283. if (r1==s) {
  284. n = n2;
  285. s = r2;
  286. }
  287. else {
  288. if (n2!=0) {
  289. qi = new sJobItem;
  290. qi->num = n2;
  291. qi->start = r2;
  292. NonReentrantSpinBlock block(joblock);
  293. jobq.enqueue(qi);
  // wake every thread currently counted as waiting; they re-check the queue themselves
  294. if (waiting) {
  295. jobqsem.signal(waiting);
  296. waiting = 0;
  297. }
  298. }
  299. n = r1-s;
  300. }
  301. }
  302. serialsort(s,n);
  303. NonReentrantSpinBlock block(joblock);
  304. if (waiting==numsubthreads) { // well we are done so are rest
  305. done = true;
  306. jobqsem.signal(waiting);
  307. break;
  308. }
  309. }
  310. while(waitForWork(s,n));
  311. }
  // Worker-thread entry point: wait for a first job, then behave like subsort().
  312. void run()
  313. {
  314. unsigned s;
  315. unsigned n;
  316. if (waitForWork(s,n))
  317. subsort(s,n);
  318. }
  // Waits for every worker thread to finish.
  319. void join()
  320. {
  321. for (unsigned i=0;i<numsubthreads;i++)
  322. threads[i]->join();
  323. }
  // Sort len elements starting at offset from on the calling thread.
  324. virtual void serialsort(unsigned from, unsigned len)=0;
  // Partition n elements starting at offset s. subsort() treats [s, r1) and [r2, s+n) as the two
  // ranges that still need sorting.
  325. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  326. };
  // DOPARTITION: shared body for the partition() overrides of the parallel sorters.
  // Expects `array`, `s`, `n`, `r1` and `r2` to be in scope, together with the VECTOR, CMP, MED3,
  // SWAP and MIN macros that are current at the point of expansion.
  // Steps: choose a pivot with MED3 (a median of three medians when n > 40) and swap it to a[0];
  // scan inwards from both ends, parking elements equal to the pivot at the two ends; then swap
  // those equal runs back next to the middle.
  // Result: r1 = s + (number of elements that compared below the pivot) and
  // r2 = s + n - (number that compared above it), so [s, r1) and [r2, s+n) remain to be sorted.
  // NOTE(review): `r` is an int but also receives MIN() of pointer differences; confirm this
  // cannot truncate for the largest arrays callers pass in.
  // Comments are kept outside the macro on purpose: a // comment on a continued line would
  // swallow the following lines.
  327. #define DOPARTITION \
  328. VECTOR a = array+s; \
  329. VECTOR pm = a + (n / 2); \
  330. VECTOR pl = a; \
  331. VECTOR pn = a + (n - 1) ; \
  332. if (n > 40) { \
  333. unsigned d = (n / 8); \
  334. pl = MED3(pl, pl + d, pl + 2 * d); \
  335. pm = MED3(pm - d, pm, pm + d); \
  336. pn = MED3(pn - 2 * d, pn - d, pn); \
  337. } \
  338. pm = MED3(pl, pm, pn); \
  339. SWAP(a, pm); \
  340. VECTOR pa = a + 1; \
  341. VECTOR pb = pa; \
  342. VECTOR pc = a + (n - 1); \
  343. VECTOR pd = pc; \
  344. int r; \
  345. for (;;) { \
  346. while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
  347. if (r == 0) { \
  348. SWAP(pa, pb); \
  349. pa++; \
  350. } \
  351. pb++; \
  352. } \
  353. while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
  354. if (r == 0) { \
  355. SWAP(pc, pd); \
  356. pd--; \
  357. } \
  358. pc--; \
  359. } \
  360. if (pb > pc) \
  361. break; \
  362. SWAP(pb, pc); \
  363. pb++; \
  364. pc--; \
  365. } \
  366. pn = a + n; \
  367. r = MIN(pa - a, pb - pa); \
  368. VECTOR v1 = a; \
  369. VECTOR v2 = pb-r; \
  370. while (r) { \
  371. SWAP(v1,v2); v1++; v2++; r--; \
  372. }; \
  373. r = MIN(pd - pc, pn - pd - 1); \
  374. v1 = pb; \
  375. v2 = pn-r; \
  376. while (r) { \
  377. SWAP(v1,v2); v1++; v2++; r--; \
  378. }; \
  379. r1 = (pb-pa)+s; \
  380. r2 = n-(pd-pc)+s;
  381. class cParQSort: public cParQSortBase
  382. {
  383. VECTOR array;
  384. const ICompare &compare;
  385. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  386. {
  387. DOPARTITION
  388. }
  389. void serialsort(unsigned from, unsigned len)
  390. {
  391. qsortvec(array+from,len,compare);
  392. }
  393. public:
  394. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  395. : cParQSortBase(_numsubthreads), compare(_compare)
  396. {
  397. array = _a;
  398. cParQSortBase::start();
  399. }
  400. };
  401. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  402. {
  403. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  404. qsortvec(a,n,compare);
  405. return;
  406. }
  407. cParQSort sorter(a,compare,numcpus-1);
  408. sorter.subsort(0,n);
  409. sorter.join();
  410. #ifdef TESTPARSORT
  411. for (unsigned i=1;i<n;i++)
  412. if (compare.docompare(a[i-1],a[i])>0)
  413. ERRLOG("parqsortvec failed %d",i);
  414. #endif
  415. }
  #undef CMP
  #undef MED3
  #undef RECURSE
  //---------------------------------------------------------------------------
  // Stable variant: what gets sorted is an index, i.e. a vector of pointers to row slots.
  #undef VECTOR
  #undef SWAP
  typedef void *** _IVECTOR;
  #define VECTOR _IVECTOR
  // Exchange the two index entries stored at a and b.
  static inline void swapind(VECTOR a, VECTOR b)
  {
      void ** held = *a;
      *a = *b;
      *b = held;
  }
  #define SWAP swapind
  426. #define CMP(a,b) cmpicindstable(a,b,compare)
  427. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  428. {
  429. int ret = compare.docompare(**a,**b);
  430. if (ret==0)
  431. {
  432. if (*a>*b)
  433. ret = 1;
  434. else if (*a<*b)
  435. ret = -1;
  436. }
  437. return ret;
  438. }
  439. #define MED3(a,b,c) med3ic(a,b,c,compare)
  440. #define RECURSE(a,b) doqsortvecstable(a, b, compare)
  441. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  442. {
  443. return CMP(a, b) < 0 ?
  444. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  445. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  446. }
  447. static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
  448. #include "jsort2.inc"
  449. class cParQSortStable: public cParQSortBase
  450. {
  451. VECTOR array;
  452. const ICompare &compare;
  453. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  454. {
  455. DOPARTITION
  456. }
  457. void serialsort(unsigned from, unsigned len)
  458. {
  459. doqsortvecstable(array+from,len,compare);
  460. }
  461. public:
  462. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  463. : cParQSortBase(_numsubthreads),compare(_compare)
  464. {
  465. array = _a;
  466. cParQSortBase::start();
  467. }
  468. };
  469. #undef CMP
  470. #undef CMP1
  471. #undef MED3
  472. #undef RECURSE
  473. #undef VECTOR
  474. static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  475. {
  476. for(unsigned i=0; i<n; ++i)
  477. index[i] = rows+i;
  478. doqsortvecstable(index, n, compare);
  479. }
  480. void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
  481. {
  482. memcpy(temp, rows, n * sizeof(void*));
  483. qsortvecstable(temp, n, compare, (void * * *)rows);
  484. //I'm sure this violates the aliasing rules...
  485. void * * * rowsAsIndex = (void * * *)rows;
  486. for(unsigned i=0; i<n; ++i)
  487. rows[i] = *rowsAsIndex[i];
  488. }
  489. static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  490. {
  491. for(unsigned i=0; i<n; ++i)
  492. index[i] = rows+i;
  493. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  494. doqsortvecstable(index,n,compare);
  495. return;
  496. }
  497. cParQSortStable sorter(index,compare,numcpus-1);
  498. sorter.subsort(0,n);
  499. sorter.join();
  500. }
  501. void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
  502. {
  503. memcpy(temp, rows, n * sizeof(void*));
  504. parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);
  505. //I'm sure this violates the aliasing rules...
  506. void * * * rowsAsIndex = (void * * *)rows;
  507. for(size32_t i=0; i<n; ++i)
  508. rows[i] = *rowsAsIndex[i];
  509. }
  510. //=========================================================================
  511. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  512. {
  513. bool nochange = true;
  514. while(1)
  515. {
  516. unsigned c = p*2 + 1;
  517. if(c >= num)
  518. return nochange;
  519. if(c+1 < num)
  520. {
  521. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  522. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  523. ++c;
  524. }
  525. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  526. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  527. return nochange;
  528. nochange = false;
  529. unsigned r = heap[c];
  530. heap[c] = heap[p];
  531. heap[p] = r;
  532. p = c;
  533. }
  534. }
  535. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  536. {
  537. bool nochange = true;
  538. while(c > 0)
  539. {
  540. unsigned p = (unsigned)(c-1)/2;
  541. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  542. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  543. return nochange;
  544. nochange = false;
  545. unsigned r = heap[c];
  546. heap[c] = heap[p];
  547. heap[p] = r;
  548. c = p;
  549. }
  550. return nochange;
  551. }
  552. //=========================================================================
  553. #include <assert.h>
  554. #include <stdio.h>
  555. #include <stdlib.h>
  556. #include <fcntl.h>
  557. #include <string.h>
  558. #include <stddef.h>
  559. #include <time.h>
  560. #ifdef _WIN32
  561. #include <io.h>
  562. #include <sys\types.h>
  563. #include <sys\stat.h>
  564. #else
  565. #include <sys/types.h>
  566. #include <sys/stat.h>
  567. #endif
  // NOTE(review): #ifndef only detects macros. Where off_t is provided as a typedef this test
  // still succeeds and off_t becomes a macro for __int64 from here on - confirm that is intended.
  568. #ifndef off_t
  569. #define off_t __int64
  570. #endif
  // VECTOR was a macro earlier in the file (and #undef'd); from here on it is a plain typedef.
  571. typedef void ** VECTOR;
  // Interface with a single method returning an output IWriteSeq. Nothing in the visible part
  // of this file implements or calls it.
  572. interface IMergeSorter
  573. {
  574. public:
  575. virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
  576. };
  // Neither constant is referenced in the visible part of this file.
  577. #define INSERTMAX 10000
  578. #define BUFFSIZE 0x100000 // used for output buffer
  579. //==================================================================================================
  // Merges several sorted inputs, obtained through an IRowProvider, into one ordered sequence.
  // Each input has at most one buffered row (pending[i]); mergeheap is a binary heap of input
  // numbers ordered by icmp on those buffered rows, with the lower input number first when rows
  // compare equal. When partdedup is set the heap is maintained with siftDownDedupTop(), which
  // additionally keeps the top from comparing equal to its children.
  580. class CRowStreamMerger
  581. {
  // pending[i]: row currently buffered for input i, or NULL.
  582. const void **pending;
  // recsize is only ever set to NULL in this class.
  583. size32_t *recsize;
  // mergeheap[0..activeInputs): heap of input numbers.
  584. unsigned *mergeheap;
  585. unsigned activeInputs;
  // Number of rows handed out so far; 0 means the heap top has not been consumed yet.
  586. count_t recno;
  587. const ICompare *icmp;
  588. bool partdedup;
  589. IRowProvider &provider;
  // Single allocation that backs both pending and mergeheap.
  590. MemoryAttr workingbuf;
  591. #ifdef _DEBUG
  // Debug-only record of which inputs have had provider.stop() called, used to assert that no
  // input is stopped twice.
  592. bool *stopped;
  593. MemoryAttr ma;
  594. #endif
  // Compare the buffered rows of the inputs at heap positions a and b.
  595. inline int buffCompare(unsigned a, unsigned b)
  596. {
  597. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  598. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  599. }
  // Drop heap position p by shrinking the heap and moving the last entry into p.
  // Returns false when p was itself the last entry, i.e. nothing was moved.
  600. bool promote(unsigned p)
  601. {
  602. activeInputs--;
  603. if(activeInputs == p)
  604. return false;
  605. mergeheap[p] = mergeheap[activeInputs];
  606. return true;
  607. }
  // Returns true when no swap was needed, false when the entry moved.
  608. inline bool siftDown(unsigned p)
  609. {
  610. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  611. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  612. bool nochange = true;
  613. while(1)
  614. {
  615. unsigned c = p*2 + 1;
  616. if(c >= activeInputs)
  617. return nochange;
  618. if(c+1 < activeInputs)
  619. {
  620. int childcmp = buffCompare(c+1, c);
  621. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  622. ++c;
  623. }
  624. int cmp = buffCompare(c, p);
  625. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  626. return nochange;
  627. nochange = false;
  628. unsigned r = mergeheap[c];
  629. mergeheap[c] = mergeheap[p];
  630. mergeheap[p] = r;
  631. p = c;
  632. }
  633. }
  // Dedup variant of siftDown(0). When a child's row compares equal to the top, the entry with
  // the lower input number is kept at the top and the other input is advanced with pullInput()
  // (which releases its buffered row); an exhausted input is removed with promote().
  634. void siftDownDedupTop()
  635. {
  636. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  637. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  638. if(activeInputs < 2)
  639. return;
  // c is the smaller of the top's children; childcmp stays 1 when there is only one child
  640. unsigned c = 1;
  641. int childcmp = 1;
  642. if(activeInputs >= 3)
  643. {
  644. childcmp = buffCompare(2, 1);
  645. if(childcmp < 0)
  646. c = 2;
  647. }
  648. int cmp = buffCompare(c, 0);
  649. if(cmp > 0)
  650. return;
  651. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  652. while(cmp <= 0)
  653. {
  654. if(cmp == 0)
  655. {
  656. if(mergeheap[c] < mergeheap[0])
  657. {
  658. unsigned r = mergeheap[c];
  659. mergeheap[c] = mergeheap[0];
  660. mergeheap[0] = r;
  661. }
  662. if(!pullInput(mergeheap[c]))
  663. if(!promote(c))
  664. break;
  665. siftDown(c);
  666. }
  667. else
  668. {
  669. unsigned r = mergeheap[c];
  670. mergeheap[c] = mergeheap[0];
  671. mergeheap[0] = r;
  672. if(siftDown(c))
  673. break;
  674. }
  675. cmp = buffCompare(c, 0);
  676. }
  677. // the following loop ensures the uniqueness property holds on the other branch too
  // 3-c switches between child positions 1 and 2
  678. c = 3-c;
  679. if(activeInputs <= c)
  680. return;
  681. while(childcmp == 0)
  682. {
  683. if(mergeheap[c] < mergeheap[0])
  684. {
  685. unsigned r = mergeheap[c];
  686. mergeheap[c] = mergeheap[0];
  687. mergeheap[0] = r;
  688. }
  689. if(!pullInput(mergeheap[c]))
  690. if(!promote(c))
  691. break;
  692. siftDown(c);
  693. childcmp = buffCompare(c, 0);
  694. }
  695. }
  // Establish the heap over the inputs that produced a first row, and reset the row counter.
  696. void init()
  697. {
  698. if (activeInputs>0) {
  699. // setup heap property
  // sift every internal node except the root, from the last parent upwards; the root is
  // handled just below
  700. if (activeInputs >= 2)
  701. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  702. siftDown(p);
  703. if (partdedup)
  704. siftDownDedupTop();
  705. else
  706. siftDown(0);
  707. }
  708. recno = 0;
  709. }
  710. inline count_t num() const { return recno; }
  // Advance so that mergeheap[0] identifies the input holding the next row to return.
  // On the first call the heap built by init() is used as is; on later calls the input at the
  // top is refilled (or removed when exhausted) and the heap repaired.
  // Returns false when there are no active inputs left.
  711. inline bool _next()
  712. {
  713. if (!activeInputs)
  714. return false;
  715. if (recno) {
  716. if(!pullInput(mergeheap[0]))
  717. if(!promote(0))
  718. return false;
  719. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  720. if(partdedup)
  721. siftDownDedupTop();
  722. else
  723. siftDown(0);
  724. }
  725. recno++;
  726. return true;
  727. }
  728. inline bool eof() const { return activeInputs==0; }
  // Fetch the next row of input i into pending[i]. A row still buffered there is released first;
  // the assert documents that this is only expected in partdedup mode. When the input has no
  // more rows, provider.stop(i) is called and false is returned.
  729. bool pullInput(unsigned i)
  730. {
  731. if (pending[i]) {
  732. assertex(partdedup);
  733. provider.releaseRow(pending[i]);
  734. }
  735. pending[i] = provider.nextRow(i);
  736. if (pending[i])
  737. return true;
  738. provider.stop(i);
  739. #ifdef _DEBUG
  740. assertex(!stopped[i]);
  741. stopped[i] = true;
  742. #endif
  743. return false;
  744. }
  745. public:
  // Allocates the working buffers, pulls the first row of each of the numstreams inputs (inputs
  // that are empty from the start are stopped by pullInput and never enter the heap), then
  // builds the heap via init().
  746. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
  747. : provider(_provider)
  748. {
  749. partdedup = _partdedup;
  750. icmp = _icmp;
  751. recsize = NULL;
  752. activeInputs = 0;
  753. #ifdef _DEBUG
  754. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  755. memset(stopped,0,numstreams*sizeof(bool));
  756. #endif
  757. unsigned i;
  758. recsize = NULL;
  759. if (numstreams) {
  // one allocation: numstreams row pointers followed by numstreams heap entries
  760. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  761. pending = (const void **)buf;
  762. mergeheap = (unsigned *)(pending+numstreams);
  763. for (i=0;i<numstreams;i++) {
  764. pending[i] = NULL;
  765. if (pullInput(i))
  766. mergeheap[activeInputs++] = i;
  767. }
  768. }
  769. else {
  770. pending = NULL;
  771. mergeheap = NULL;
  772. }
  773. init();
  774. }
  // Release the buffered rows of all remaining inputs and free the working buffer. Safe to call
  // more than once: afterwards activeInputs is 0, so a second call only clears the pointers.
  // NOTE(review): provider.stop() is only called for inputs that still have a buffered row. The
  // input whose row was most recently handed out by next() has pending == NULL and is therefore
  // not stopped here - confirm that this is intended.
  775. void stop()
  776. {
  777. while (activeInputs) {
  778. activeInputs--;
  779. if (pending[mergeheap[activeInputs]]) {
  780. provider.releaseRow(pending[mergeheap[activeInputs]]);
  781. #ifdef _DEBUG
  782. assertex(!stopped[mergeheap[activeInputs]]);
  783. stopped[mergeheap[activeInputs]] = true;
  784. #endif
  785. provider.stop(mergeheap[activeInputs]);
  786. }
  787. }
  788. pending = NULL;
  789. mergeheap = NULL;
  790. workingbuf.clear();
  791. }
  792. ~CRowStreamMerger()
  793. {
  794. stop();
  795. }
  // Return the next row in merged order, or NULL when all inputs are exhausted. The returned
  // row's pending slot is cleared, so this class will not release that row later.
  796. inline const void * next()
  797. {
  798. if (!_next())
  799. return NULL;
  800. unsigned strm = mergeheap[0];
  801. const void *row = pending[strm];
  802. pending[strm] = NULL;
  803. return row;
  804. }
  805. };
  806. class CMergeRowStreams : public CInterface, implements IRowStream
  807. {
  808. protected:
  809. CRowStreamMerger *merger;
  810. bool eos;
  811. class cProvider: public CInterface, implements IRowProvider
  812. {
  813. IArrayOf<IRowStream> ostreams;
  814. IRowStream **streams;
  815. Linked<IRowLinkCounter> linkcounter;
  816. const void *nextRow(unsigned idx)
  817. {
  818. return streams[idx]->nextRow();
  819. };
  820. void linkRow(const void *row)
  821. {
  822. linkcounter->linkRow(row);
  823. }
  824. void releaseRow(const void *row)
  825. {
  826. linkcounter->releaseRow(row);
  827. }
  828. void stop(unsigned idx)
  829. {
  830. streams[idx]->stop();
  831. }
  832. public:
  833. IMPLEMENT_IINTERFACE;
  834. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  835. : linkcounter(_linkcounter)
  836. {
  837. ostreams.ensure(numstreams);
  838. unsigned n = 0;
  839. while (n<numstreams)
  840. ostreams.append(*LINK(_streams[n++]));
  841. streams = ostreams.getArray();
  842. }
  843. } *streamprovider;
  844. public:
  845. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  846. {
  847. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  848. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  849. eos = _numstreams==0;
  850. }
  851. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  852. {
  853. streamprovider = NULL;
  854. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  855. eos = _numstreams==0;
  856. }
  857. ~CMergeRowStreams()
  858. {
  859. delete merger;
  860. delete streamprovider;
  861. }
  862. IMPLEMENT_IINTERFACE;
  863. void stop()
  864. {
  865. if (!eos) {
  866. merger->stop();
  867. eos = true;
  868. }
  869. }
  870. const void *nextRow()
  871. {
  872. if (eos)
  873. return NULL;
  874. const void *r = merger->next();
  875. if (!r) {
  876. stop(); // think ok to stop early
  877. return NULL;
  878. }
  879. return r;
  880. }
  881. };
  882. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  883. {
  884. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  885. }
  886. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  887. {
  888. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  889. }