jsort.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
// Optional debug switches (all disabled by default).
// TESTPARSORT forces the parallel sort path in sortParallel() and makes parqsortvec() verify
// its output. The remaining switches presumably enable extra checking/tracing elsewhere in
// this file - NOTE(review): confirm before relying on them.
#ifdef _DEBUG
// #define PARANOID
//#define TESTPARSORT
//#define MCMERGESTATS
#endif
//#define PARANOID_PARTITION
//#define TRACE_PARTITION
// Ranges of this many elements or fewer are sorted serially instead of being split further
// between threads by the parallel quicksorts below.
#define PARALLEL_GRANULARITY 1024
  33. static bool sortParallel(unsigned &numcpus)
  34. {
  35. unsigned numCPUs = getAffinityCpus();
  36. if ((numcpus==0)||(numcpus>numCPUs))
  37. numcpus = numCPUs;
  38. #ifdef TESTPARSORT
  39. numcpus = 2;
  40. return true; // to test
  41. #endif
  42. return (numcpus>1);
  43. }
// Binary search/insert helpers.
// The algorithm body lives in jsort.inc, which is textually included into each function below.
// Each variant specialises it through the COMPARE and INSERT macros, plus the optional switches
// NEVER_ADD / ALWAYS_ADD / SEEK_LAST_MATCH, and #undefs them again afterwards.
//
// binary_add: variant for an array of nmemb fixed-size records of `width` bytes each.
// COMPARE hands record addresses straight to compare(); INSERT copies `width` bytes with memmove.
// NOTE(review): the return value and how *ItemAdded is set are defined in jsort.inc - confirm there.
#define COMPARE(search,position) compare(search,position)
#define INSERT(position,search) memmove(position,search, width)
void * binary_add(const void *newitem, const void *base,
                  size32_t nmemb,
                  size32_t width,
                  int ( *compare)(const void *_e1, const void *_e2),
                  bool * ItemAdded)
{
#include "jsort.inc"
}
#undef COMPARE
#undef INSERT
//---------------------------------------------------------------------------
// binary_vec_find (sortCompareFunction overload): binary search of a sorted array of pointers
// base[0..nmemb) for newitem.
// Each slot holds a pointer, so COMPARE dereferences the slot before comparing, and `width`
// is defined as the size of one pointer for the duration of the included body.
// NEVER_ADD presumably makes jsort.inc search without inserting - NOTE(review): confirm in jsort.inc.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define NEVER_ADD
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        sortCompareFunction compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// binary_vec_find (ICompare overload): identical to the overload above, except that the
// ordering comes from ICompare::docompare.
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define NEVER_ADD
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        ICompare & compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// binary_vec_insert (sortCompareFunction overload): insert newitem into the sorted pointer
// array base[0..nmemb). INSERT stores the pointer itself into the chosen slot.
// ALWAYS_ADD presumably selects the always-insert path of jsort.inc.
// NOTE(review): the return value, where equal items are placed, and the requirement for spare
// capacity at base[nmemb] are all defined by jsort.inc/callers - confirm there.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert (ICompare overload): identical to the overload above, except that the
// ordering comes from ICompare::docompare.
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert_stable (sortCompareFunction overload): as binary_vec_insert, with
// SEEK_LAST_MATCH also defined.
// SEEK_LAST_MATCH presumably positions the new item after any existing equal items, so that
// repeated inserts keep arrival order - NOTE(review): confirm in jsort.inc.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert_stable (ICompare overload): identical to the overload above, except that
// the ordering comes from ICompare::docompare.
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
//=========================================================================
// optimized quicksort for array of pointers to fixed size objects
// The sort body itself lives in jsort2.inc and is specialised through these macros:
//   VECTOR  - type of the array being sorted (here void **, one pointer per element)
//   SWAP    - exchange two elements
//   CMP     - three-way comparison of the elements at two positions
//   MED3    - median-of-three pivot helper
//   RECURSE - recursive call back into the same specialisation
typedef void * ELEMENT;
typedef void ** _VECTOR; // bit messy but allow to be redefined later
#define VECTOR _VECTOR
static inline void swap(VECTOR a, VECTOR b) { ELEMENT t = *a; *a = *b; *b = t; }
#define SWAP swap
// This variant orders elements by memcmp over the first es bytes of the pointed-to objects.
#define CMP(a,b) memcmp(*(a),*(b),es)
#define MED3(a,b,c) med3a(a,b,c,es)
#define RECURSE(a,b) qsortvec(a, b, es)
// Returns whichever of a, b, c holds the median value according to CMP.
static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
// Sort the n pointers in a[] by the es-byte objects they point at.
void qsortvec(void **a, size32_t n, size32_t es)
#include "jsort2.inc"
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// qsortvec specialisation ordering the pointed-to objects with a plain comparison function.
// The body is again jsort2.inc; VECTOR and SWAP are still the void ** versions defined above.
#define CMP(a,b) (compare(*(a),*(b)))
#define MED3(a,b,c) med3c(a,b,c,compare)
#define RECURSE(a,b) qsortvec(a, b, compare)
// Returns whichever of a, b, c holds the median value according to compare().
static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
void qsortvec(void **a, size32_t n, sortCompareFunction compare)
#include "jsort2.inc"
#undef CMP
#undef MED3
#undef RECURSE
// qsortvec specialisation ordering the pointed-to objects with ICompare::docompare.
// NB: CMP, MED3 and RECURSE are intentionally NOT #undef'd here. DOPARTITION and cParQSort
// below reuse them, and they are only #undef'd after parqsortvec().
#define CMP(a,b) (compare.docompare(*(a),*(b)))
#define MED3(a,b,c) med3ic(a,b,c,compare)
#define RECURSE(a,b) qsortvec(a, b, compare)
// Returns whichever of a, b, c holds the median value according to compare.docompare().
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
void qsortvec(void **a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
  197. // Parallel version (only 2 threads currently)
  198. class cParQSortBase
  199. {
  200. struct sJobItem
  201. {
  202. unsigned start;
  203. unsigned num;
  204. };
  205. NonReentrantSpinLock joblock;
  206. QueueOf<sJobItem,false> jobq;
  207. Semaphore jobqsem;
  208. unsigned waiting;
  209. unsigned numsubthreads;
  210. bool done;
  211. class cThread: public Thread
  212. {
  213. cParQSortBase *parent;
  214. public:
  215. cThread(cParQSortBase *_parent)
  216. : Thread("cParQSort")
  217. {
  218. parent = _parent;
  219. }
  220. int run()
  221. {
  222. parent->run();
  223. return 0;
  224. }
  225. } **threads;
  226. bool waitForWork(unsigned &s,unsigned &n)
  227. {
  228. NonReentrantSpinBlock block(joblock);
  229. while (!done) {
  230. sJobItem *qi = jobq.dequeue();
  231. if (qi) {
  232. s = qi->start;
  233. n = qi->num;
  234. delete qi;
  235. return true;
  236. }
  237. if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
  238. done = true;
  239. jobqsem.signal(waiting);
  240. break;
  241. }
  242. waiting++;
  243. NonReentrantSpinUnblock unblock(joblock);
  244. jobqsem.wait();
  245. }
  246. s = 0; // remove uninitialised variable warnings
  247. n = 0;
  248. return false;
  249. }
  250. public:
  251. cParQSortBase(unsigned _numsubthreads)
  252. {
  253. numsubthreads = _numsubthreads;
  254. done = false;
  255. waiting = 0;
  256. threads = new cThread*[numsubthreads];
  257. for (unsigned i=0;i<numsubthreads;i++)
  258. threads[i] = new cThread(this);
  259. }
  260. ~cParQSortBase()
  261. {
  262. for (unsigned i=0;i<numsubthreads;i++)
  263. threads[i]->Release();
  264. delete [] threads;
  265. }
  266. void start()
  267. {
  268. for (unsigned i=0;i<numsubthreads;i++)
  269. threads[i]->start();
  270. }
  271. void subsort(unsigned s, unsigned n)
  272. {
  273. do {
  274. sJobItem *qi;
  275. while (n>PARALLEL_GRANULARITY) {
  276. unsigned r1;
  277. unsigned r2;
  278. partition(s, n, r1, r2);
  279. unsigned n2 = n+s-r2;
  280. if (r1==s) {
  281. n = n2;
  282. s = r2;
  283. }
  284. else {
  285. if (n2!=0) {
  286. qi = new sJobItem;
  287. qi->num = n2;
  288. qi->start = r2;
  289. NonReentrantSpinBlock block(joblock);
  290. jobq.enqueue(qi);
  291. if (waiting) {
  292. jobqsem.signal(waiting);
  293. waiting = 0;
  294. }
  295. }
  296. n = r1-s;
  297. }
  298. }
  299. serialsort(s,n);
  300. NonReentrantSpinBlock block(joblock);
  301. if (waiting==numsubthreads) { // well we are done so are rest
  302. done = true;
  303. jobqsem.signal(waiting);
  304. break;
  305. }
  306. }
  307. while(waitForWork(s,n));
  308. }
  309. void run()
  310. {
  311. unsigned s;
  312. unsigned n;
  313. if (waitForWork(s,n))
  314. subsort(s,n);
  315. }
  316. void join()
  317. {
  318. for (unsigned i=0;i<numsubthreads;i++)
  319. threads[i]->join();
  320. }
  321. virtual void serialsort(unsigned from, unsigned len)=0;
  322. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  323. };
// DOPARTITION: one three-way (Bentley-McIlroy style) partition step, expanded inline in the
// partition() overrides of cParQSort and cParQSortStable.
//
// Names it expects in scope:
//   array  - base of the element vector (type VECTOR)
//   s, n   - start offset and length of the range to partition (relative to array)
//   r1, r2 - unsigned outputs, also relative to array
//   macros - VECTOR, SWAP, CMP, MED3 and MIN
//
// Pivot choice: the median of three elements, or a median of three medians when n > 40. The
// chosen pivot is swapped to the front of the range.
//
// During the scan:
//   [a,pa)    - elements equal to the pivot
//   [pa,pb)   - elements less than the pivot
//   (pc,pd]   - elements greater than the pivot
//   (pd,a+n)  - elements equal to the pivot
// Afterwards the two equal runs are swapped into the middle.
//
// On exit:
//   [s, r1)   - elements comparing less than the pivot
//   [r1, r2)  - elements equal to it
//   [r2, s+n) - elements comparing greater
#define DOPARTITION \
    VECTOR a = array+s; \
    VECTOR pm = a + (n / 2); \
    VECTOR pl = a; \
    VECTOR pn = a + (n - 1) ; \
    if (n > 40) { \
        unsigned d = (n / 8); \
        pl = MED3(pl, pl + d, pl + 2 * d); \
        pm = MED3(pm - d, pm, pm + d); \
        pn = MED3(pn - 2 * d, pn - d, pn); \
    } \
    pm = MED3(pl, pm, pn); \
    SWAP(a, pm); \
    VECTOR pa = a + 1; \
    VECTOR pb = pa; \
    VECTOR pc = a + (n - 1); \
    VECTOR pd = pc; \
    int r; \
    for (;;) { \
        while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
            if (r == 0) { \
                SWAP(pa, pb); \
                pa++; \
            } \
            pb++; \
        } \
        while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
            if (r == 0) { \
                SWAP(pc, pd); \
                pd--; \
            } \
            pc--; \
        } \
        if (pb > pc) \
            break; \
        SWAP(pb, pc); \
        pb++; \
        pc--; \
    } \
    pn = a + n; \
    r = MIN(pa - a, pb - pa); \
    VECTOR v1 = a; \
    VECTOR v2 = pb-r; \
    while (r) { \
        SWAP(v1,v2); v1++; v2++; r--; \
    }; \
    r = MIN(pd - pc, pn - pd - 1); \
    v1 = pb; \
    v2 = pn-r; \
    while (r) { \
        SWAP(v1,v2); v1++; v2++; r--; \
    }; \
    r1 = (pb-pa)+s; \
    r2 = n-(pd-pc)+s;
  378. class cParQSort: public cParQSortBase
  379. {
  380. VECTOR array;
  381. const ICompare &compare;
  382. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  383. {
  384. DOPARTITION
  385. }
  386. void serialsort(unsigned from, unsigned len)
  387. {
  388. qsortvec(array+from,len,compare);
  389. }
  390. public:
  391. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  392. : cParQSortBase(_numsubthreads), compare(_compare)
  393. {
  394. array = _a;
  395. cParQSortBase::start();
  396. }
  397. };
  398. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  399. {
  400. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  401. qsortvec(a,n,compare);
  402. return;
  403. }
  404. cParQSort sorter(a,compare,numcpus-1);
  405. sorter.subsort(0,n);
  406. sorter.join();
  407. #ifdef TESTPARSORT
  408. for (unsigned i=1;i<n;i++)
  409. if (compare.docompare(a[i-1],a[i])>0)
  410. ERRLOG("parqsortvec failed %d",i);
  411. #endif
  412. }
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Stable sort support.
// Rather than sorting the row pointers directly, the quicksort below sorts an index of
// pointers *into* the row array (void ***). Equal rows can then be ordered by their address in
// the row array, i.e. by original position, which makes the result stable.
#undef VECTOR
#undef SWAP
typedef void *** _IVECTOR;
#define VECTOR _IVECTOR
// exchange two index entries
static inline void swapind(VECTOR a, VECTOR b) { void ** t = *a; *a = *b; *b = t; }
#define SWAP swapind
#define CMP(a,b) cmpicindstable(a,b,compare)
  424. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  425. {
  426. int ret = compare.docompare(**a,**b);
  427. if (ret==0)
  428. {
  429. if (*a>*b)
  430. ret = 1;
  431. else if (*a<*b)
  432. ret = -1;
  433. }
  434. return ret;
  435. }
#define MED3(a,b,c) med3ic(a,b,c,compare)
#define RECURSE(a,b) doqsortvecstable(a, b, compare)
// Returns whichever of the index entries a, b, c holds the median according to
// cmpicindstable (overloads the void ** med3ic above for the void *** index type).
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
// Quicksort (body in jsort2.inc) of n index entries; stable thanks to the address tie-break in CMP.
static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
  446. class cParQSortStable: public cParQSortBase
  447. {
  448. VECTOR array;
  449. const ICompare &compare;
  450. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  451. {
  452. DOPARTITION
  453. }
  454. void serialsort(unsigned from, unsigned len)
  455. {
  456. doqsortvecstable(array+from,len,compare);
  457. }
  458. public:
  459. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  460. : cParQSortBase(_numsubthreads),compare(_compare)
  461. {
  462. array = _a;
  463. cParQSortBase::start();
  464. }
  465. };
  466. #undef CMP
  467. #undef CMP1
  468. #undef MED3
  469. #undef RECURSE
  470. #undef VECTOR
  471. static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  472. {
  473. for(unsigned i=0; i<n; ++i)
  474. index[i] = rows+i;
  475. doqsortvecstable(index, n, compare);
  476. }
  477. void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
  478. {
  479. memcpy(temp, rows, n * sizeof(void*));
  480. qsortvecstable(temp, n, compare, (void * * *)rows);
  481. //I'm sure this violates the aliasing rules...
  482. void * * * rowsAsIndex = (void * * *)rows;
  483. for(unsigned i=0; i<n; ++i)
  484. rows[i] = *rowsAsIndex[i];
  485. }
  486. static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  487. {
  488. for(unsigned i=0; i<n; ++i)
  489. index[i] = rows+i;
  490. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  491. doqsortvecstable(index,n,compare);
  492. return;
  493. }
  494. cParQSortStable sorter(index,compare,numcpus-1);
  495. sorter.subsort(0,n);
  496. sorter.join();
  497. }
  498. void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
  499. {
  500. memcpy(temp, rows, n * sizeof(void*));
  501. parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);
  502. //I'm sure this violates the aliasing rules...
  503. void * * * rowsAsIndex = (void * * *)rows;
  504. for(size32_t i=0; i<n; ++i)
  505. rows[i] = *rowsAsIndex[i];
  506. }
  507. //=========================================================================
  508. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  509. {
  510. bool nochange = true;
  511. while(1)
  512. {
  513. unsigned c = p*2 + 1;
  514. if(c >= num)
  515. return nochange;
  516. if(c+1 < num)
  517. {
  518. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  519. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  520. ++c;
  521. }
  522. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  523. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  524. return nochange;
  525. nochange = false;
  526. unsigned r = heap[c];
  527. heap[c] = heap[p];
  528. heap[p] = r;
  529. p = c;
  530. }
  531. }
  532. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  533. {
  534. bool nochange = true;
  535. while(c > 0)
  536. {
  537. unsigned p = (unsigned)(c-1)/2;
  538. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  539. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  540. return nochange;
  541. nochange = false;
  542. unsigned r = heap[c];
  543. heap[c] = heap[p];
  544. heap[p] = r;
  545. c = p;
  546. }
  547. return nochange;
  548. }
  549. //=========================================================================
  550. #include <assert.h>
  551. #include <stdio.h>
  552. #include <stdlib.h>
  553. #include <fcntl.h>
  554. #include <string.h>
  555. #include <stddef.h>
  556. #include <time.h>
  557. #ifdef _WIN32
  558. #include <io.h>
  559. #include <sys\types.h>
  560. #include <sys\stat.h>
  561. #else
  562. #include <sys/types.h>
  563. #include <sys/stat.h>
  564. #endif
  565. #ifndef off_t
  566. #define off_t __int64
  567. #endif
// VECTOR was a macro for the sort specialisations above (and #undef'd after them);
// from here on it is a plain typedef for an array of row pointers.
typedef void ** VECTOR;
// NOTE(review): IMergeSorter, INSERTMAX and BUFFSIZE are presumably used further down this
// file - confirm their users before changing them.
interface IMergeSorter
{
public:
    virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
};
#define INSERTMAX 10000
#define BUFFSIZE 0x100000 // used for output buffer
  576. //==================================================================================================
  577. class CRowStreamMerger
  578. {
  579. const void **pending;
  580. size32_t *recsize;
  581. unsigned *mergeheap;
  582. unsigned activeInputs;
  583. count_t recno;
  584. const ICompare *icmp;
  585. bool partdedup;
  586. IRowProvider &provider;
  587. MemoryAttr workingbuf;
  588. #ifdef _DEBUG
  589. bool *stopped;
  590. MemoryAttr ma;
  591. #endif
  592. inline int buffCompare(unsigned a, unsigned b)
  593. {
  594. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  595. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  596. }
  597. bool promote(unsigned p)
  598. {
  599. activeInputs--;
  600. if(activeInputs == p)
  601. return false;
  602. mergeheap[p] = mergeheap[activeInputs];
  603. return true;
  604. }
  605. inline bool siftDown(unsigned p)
  606. {
  607. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  608. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  609. bool nochange = true;
  610. while(1)
  611. {
  612. unsigned c = p*2 + 1;
  613. if(c >= activeInputs)
  614. return nochange;
  615. if(c+1 < activeInputs)
  616. {
  617. int childcmp = buffCompare(c+1, c);
  618. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  619. ++c;
  620. }
  621. int cmp = buffCompare(c, p);
  622. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  623. return nochange;
  624. nochange = false;
  625. unsigned r = mergeheap[c];
  626. mergeheap[c] = mergeheap[p];
  627. mergeheap[p] = r;
  628. p = c;
  629. }
  630. }
  631. void siftDownDedupTop()
  632. {
  633. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  634. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  635. if(activeInputs < 2)
  636. return;
  637. unsigned c = 1;
  638. int childcmp = 1;
  639. if(activeInputs >= 3)
  640. {
  641. childcmp = buffCompare(2, 1);
  642. if(childcmp < 0)
  643. c = 2;
  644. }
  645. int cmp = buffCompare(c, 0);
  646. if(cmp > 0)
  647. return;
  648. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  649. while(cmp <= 0)
  650. {
  651. if(cmp == 0)
  652. {
  653. if(mergeheap[c] < mergeheap[0])
  654. {
  655. unsigned r = mergeheap[c];
  656. mergeheap[c] = mergeheap[0];
  657. mergeheap[0] = r;
  658. }
  659. if(!pullInput(mergeheap[c]))
  660. if(!promote(c))
  661. break;
  662. siftDown(c);
  663. }
  664. else
  665. {
  666. unsigned r = mergeheap[c];
  667. mergeheap[c] = mergeheap[0];
  668. mergeheap[0] = r;
  669. if(siftDown(c))
  670. break;
  671. }
  672. cmp = buffCompare(c, 0);
  673. }
  674. // the following loop ensures the uniqueness property holds on the other branch too
  675. c = 3-c;
  676. if(activeInputs <= c)
  677. return;
  678. while(childcmp == 0)
  679. {
  680. if(mergeheap[c] < mergeheap[0])
  681. {
  682. unsigned r = mergeheap[c];
  683. mergeheap[c] = mergeheap[0];
  684. mergeheap[0] = r;
  685. }
  686. if(!pullInput(mergeheap[c]))
  687. if(!promote(c))
  688. break;
  689. siftDown(c);
  690. childcmp = buffCompare(c, 0);
  691. }
  692. }
  693. void init()
  694. {
  695. if (activeInputs>0) {
  696. // setup heap property
  697. if (activeInputs >= 2)
  698. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  699. siftDown(p);
  700. if (partdedup)
  701. siftDownDedupTop();
  702. else
  703. siftDown(0);
  704. }
  705. recno = 0;
  706. }
  707. inline count_t num() const { return recno; }
  708. inline bool _next()
  709. {
  710. if (!activeInputs)
  711. return false;
  712. if (recno) {
  713. if(!pullInput(mergeheap[0]))
  714. if(!promote(0))
  715. return false;
  716. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  717. if(partdedup)
  718. siftDownDedupTop();
  719. else
  720. siftDown(0);
  721. }
  722. recno++;
  723. return true;
  724. }
  725. inline bool eof() const { return activeInputs==0; }
  726. bool pullInput(unsigned i)
  727. {
  728. if (pending[i]) {
  729. assertex(partdedup);
  730. provider.releaseRow(pending[i]);
  731. }
  732. pending[i] = provider.nextRow(i);
  733. if (pending[i])
  734. return true;
  735. provider.stop(i);
  736. #ifdef _DEBUG
  737. assertex(!stopped[i]);
  738. stopped[i] = true;
  739. #endif
  740. return false;
  741. }
  742. public:
  743. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
  744. : provider(_provider)
  745. {
  746. partdedup = _partdedup;
  747. icmp = _icmp;
  748. recsize = NULL;
  749. activeInputs = 0;
  750. #ifdef _DEBUG
  751. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  752. memset(stopped,0,numstreams*sizeof(bool));
  753. #endif
  754. unsigned i;
  755. recsize = NULL;
  756. if (numstreams) {
  757. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  758. pending = (const void **)buf;
  759. mergeheap = (unsigned *)(pending+numstreams);
  760. for (i=0;i<numstreams;i++) {
  761. pending[i] = NULL;
  762. if (pullInput(i))
  763. mergeheap[activeInputs++] = i;
  764. }
  765. }
  766. else {
  767. pending = NULL;
  768. mergeheap = NULL;
  769. }
  770. init();
  771. }
  772. void stop()
  773. {
  774. while (activeInputs) {
  775. activeInputs--;
  776. if (pending[mergeheap[activeInputs]]) {
  777. provider.releaseRow(pending[mergeheap[activeInputs]]);
  778. #ifdef _DEBUG
  779. assertex(!stopped[mergeheap[activeInputs]]);
  780. stopped[mergeheap[activeInputs]] = true;
  781. #endif
  782. provider.stop(mergeheap[activeInputs]);
  783. }
  784. }
  785. pending = NULL;
  786. mergeheap = NULL;
  787. workingbuf.clear();
  788. }
  789. ~CRowStreamMerger()
  790. {
  791. stop();
  792. }
  793. inline const void * next()
  794. {
  795. if (!_next())
  796. return NULL;
  797. unsigned strm = mergeheap[0];
  798. const void *row = pending[strm];
  799. pending[strm] = NULL;
  800. return row;
  801. }
  802. };
  803. class CMergeRowStreams : implements IRowStream, public CInterface
  804. {
  805. protected:
  806. CRowStreamMerger *merger;
  807. bool eos;
  808. class cProvider: implements IRowProvider, public CInterface
  809. {
  810. IArrayOf<IRowStream> ostreams;
  811. IRowStream **streams;
  812. Linked<IRowLinkCounter> linkcounter;
  813. const void *nextRow(unsigned idx)
  814. {
  815. return streams[idx]->nextRow();
  816. };
  817. void linkRow(const void *row)
  818. {
  819. linkcounter->linkRow(row);
  820. }
  821. void releaseRow(const void *row)
  822. {
  823. linkcounter->releaseRow(row);
  824. }
  825. void stop(unsigned idx)
  826. {
  827. streams[idx]->stop();
  828. }
  829. public:
  830. IMPLEMENT_IINTERFACE;
  831. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  832. : linkcounter(_linkcounter)
  833. {
  834. ostreams.ensure(numstreams);
  835. unsigned n = 0;
  836. while (n<numstreams)
  837. ostreams.append(*LINK(_streams[n++]));
  838. streams = ostreams.getArray();
  839. }
  840. } *streamprovider;
  841. public:
  842. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  843. {
  844. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  845. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  846. eos = _numstreams==0;
  847. }
  848. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  849. {
  850. streamprovider = NULL;
  851. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  852. eos = _numstreams==0;
  853. }
  854. ~CMergeRowStreams()
  855. {
  856. delete merger;
  857. delete streamprovider;
  858. }
  859. IMPLEMENT_IINTERFACE;
  860. void stop()
  861. {
  862. if (!eos) {
  863. merger->stop();
  864. eos = true;
  865. }
  866. }
  867. const void *nextRow()
  868. {
  869. if (eos)
  870. return NULL;
  871. const void *r = merger->next();
  872. if (!r) {
  873. stop(); // think ok to stop early
  874. return NULL;
  875. }
  876. return r;
  877. }
  878. };
  879. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  880. {
  881. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  882. }
  883. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  884. {
  885. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  886. }