jsort.cpp 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
  25. #ifdef _DEBUG
  26. // #define PARANOID
  27. //#define TESTPARSORT
  28. //#define MCMERGESTATS
  29. #endif
  30. //#define PARANOID_PARTITION
  31. //#define TRACE_PARTITION
  32. #define PARALLEL_GRANULARITY 1024
  33. static bool sortParallel(unsigned &numcpus)
  34. {
  35. unsigned numCPUs = getAffinityCpus();
  36. if ((numcpus==0)||(numcpus>numCPUs))
  37. numcpus = numCPUs;
  38. #ifdef TESTPARSORT
  39. numcpus = 2;
  40. return true; // to test
  41. #endif
  42. return (numcpus>1);
  43. }
  44. //define two variants of the same insertion function.
  // binary_add works on a contiguous array of `nmemb` items of `width` bytes
  // each, starting at `base`.  Its body is textually supplied by jsort.inc
  // (not visible here) and specialised through the two macros below:
  //   COMPARE - calls the caller's comparison function on the item addresses
  //   INSERT  - copies `width` bytes into the chosen position with memmove
  // NOTE(review): the return value and the meaning of *ItemAdded are defined
  // inside jsort.inc - confirm there before relying on them.
  45. #define COMPARE(search,position) compare(search,position)
  46. #define INSERT(position,search) memmove(position,search, width)
  47. NO_SANITIZE_FUNCTION
  48. void * binary_add(const void *newitem, const void *base,
  49. size32_t nmemb,
  50. size32_t width,
  51. int ( *compare)(const void *_e1, const void *_e2),
  52. bool * ItemAdded)
  53. {
  54. #include "jsort.inc"
  55. }
  // The macros are removed again so the next variant can redefine them.
  56. #undef COMPARE
  57. #undef INSERT
  58. //---------------------------------------------------------------------------
  // binary_vec_find (function-pointer comparator): variant for an array of
  // pointers.  COMPARE dereferences the array slot and hands the stored pointer
  // to `compare`; INSERT stores a pointer into a slot; `width` is fixed to
  // sizeof(void*) for the duration of the included body.
  // NEVER_ADD is defined while jsort.inc is included - presumably it makes the
  // shared body search without inserting; confirm in jsort.inc.
  59. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  60. #define INSERT(position,search) *(const void * *)(position) = search
  61. #define NEVER_ADD
  62. NO_SANITIZE_FUNCTION
  63. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  64. size32_t nmemb,
  65. sortCompareFunction compare,
  66. bool * ItemAdded)
  67. {
  68. #define width sizeof(void*)
  69. #include "jsort.inc"
  70. #undef width
  71. }
  72. #undef NEVER_ADD
  73. #undef INSERT
  74. #undef COMPARE
  75. //---------------------------------------------------------------------------
  // binary_vec_find (ICompare overload): same shape as the function-pointer
  // variant, but COMPARE goes through compare.docompare().  The body again
  // comes from jsort.inc with NEVER_ADD defined and `width` set to
  // sizeof(void*).
  // NOTE(review): this overload takes a non-const ICompare&, unlike the insert
  // overloads further down which take `ICompare const &`.
  76. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  77. #define INSERT(position,search) *(const void * *)(position) = search
  78. #define NEVER_ADD
  79. NO_SANITIZE_FUNCTION
  80. extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
  81. size32_t nmemb,
  82. ICompare & compare,
  83. bool * ItemAdded)
  84. {
  85. #define width sizeof(void*)
  86. #include "jsort.inc"
  87. #undef width
  88. }
  89. #undef NEVER_ADD
  90. #undef INSERT
  91. #undef COMPARE
  92. //---------------------------------------------------------------------------
  // binary_vec_insert (function-pointer comparator): pointer-array variant of
  // the shared jsort.inc body with ALWAYS_ADD defined.  There is no ItemAdded
  // out-parameter in this signature.
  // NOTE(review): ALWAYS_ADD presumably makes the body insert even when an
  // equal item exists - confirm in jsort.inc.
  93. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  94. #define INSERT(position,search) *(const void * *)(position) = search
  95. #define ALWAYS_ADD
  96. NO_SANITIZE_FUNCTION
  97. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  98. size32_t nmemb,
  99. sortCompareFunction compare)
  100. {
  101. #define width sizeof(void*)
  102. #include "jsort.inc"
  103. #undef width
  104. }
  105. #undef ALWAYS_ADD
  106. #undef INSERT
  107. #undef COMPARE
  // binary_vec_insert (ICompare overload): identical set-up to the
  // function-pointer variant except that COMPARE calls compare.docompare().
  // The body comes from jsort.inc with ALWAYS_ADD defined and `width` set to
  // sizeof(void*).
  108. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  109. #define INSERT(position,search) *(const void * *)(position) = search
  110. #define ALWAYS_ADD
  111. NO_SANITIZE_FUNCTION
  112. extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
  113. size32_t nmemb,
  114. ICompare const & compare)
  115. {
  116. #define width sizeof(void*)
  117. #include "jsort.inc"
  118. #undef width
  119. }
  120. #undef ALWAYS_ADD
  121. #undef INSERT
  122. #undef COMPARE
  // binary_vec_insert_stable (function-pointer comparator): as
  // binary_vec_insert, but SEEK_LAST_MATCH is defined in addition to
  // ALWAYS_ADD while jsort.inc is included.
  // NOTE(review): judging by the macro and function names this places the new
  // item after existing equal items - confirm in jsort.inc.
  123. #define COMPARE(search,position) compare(search,*(const void * *)(position))
  124. #define INSERT(position,search) *(const void * *)(position) = search
  125. #define ALWAYS_ADD
  126. #define SEEK_LAST_MATCH
  127. NO_SANITIZE_FUNCTION
  128. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  129. size32_t nmemb,
  130. sortCompareFunction compare)
  131. {
  132. #define width sizeof(void*)
  133. #include "jsort.inc"
  134. #undef width
  135. }
  136. #undef SEEK_LAST_MATCH
  137. #undef ALWAYS_ADD
  138. #undef INSERT
  139. #undef COMPARE
  // binary_vec_insert_stable (ICompare overload): COMPARE goes through
  // compare.docompare(); ALWAYS_ADD and SEEK_LAST_MATCH are both defined while
  // the shared body from jsort.inc is included, with `width` set to
  // sizeof(void*).
  140. #define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
  141. #define INSERT(position,search) *(const void * *)(position) = search
  142. #define ALWAYS_ADD
  143. #define SEEK_LAST_MATCH
  144. NO_SANITIZE_FUNCTION
  145. extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
  146. size32_t nmemb,
  147. ICompare const & compare)
  148. {
  149. #define width sizeof(void*)
  150. #include "jsort.inc"
  151. #undef width
  152. }
  153. #undef SEEK_LAST_MATCH
  154. #undef ALWAYS_ADD
  155. #undef INSERT
  156. #undef COMPARE
  157. //=========================================================================
  158. // optimized quicksort for array of pointers to fixed size objects
  typedef void * ELEMENT;
  typedef void ** _VECTOR; // bit messy but allow to be redefined later
  #define VECTOR _VECTOR

  // Exchange the two pointers stored in the slots a and b.
  static inline void swap(VECTOR a, VECTOR b)
  {
      ELEMENT saved = *b;
      *b = *a;
      *a = saved;
  }
  #define SWAP swap
  164. #define CMP(a,b) memcmp(*(a),*(b),es)
  165. #define MED3(a,b,c) med3a(a,b,c,es)
  166. #define RECURSE(a,b) qsortvec(a, b, es)
  167. static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
  168. {
  169. return CMP(a, b) < 0 ?
  170. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  171. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  172. }
  173. void qsortvec(void **a, size32_t n, size32_t es)
  174. #include "jsort2.inc"
  175. #undef CMP
  176. #undef MED3
  177. #undef RECURSE
  178. //---------------------------------------------------------------------------
  179. #define CMP(a,b) (compare(*(a),*(b)))
  180. #define MED3(a,b,c) med3c(a,b,c,compare)
  181. #define RECURSE(a,b) qsortvec(a, b, compare)
  182. static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
  183. {
  184. return CMP(a, b) < 0 ?
  185. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  186. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  187. }
  188. void qsortvec(void **a, size32_t n, sortCompareFunction compare)
  189. #include "jsort2.inc"
  190. #undef CMP
  191. #undef MED3
  192. #undef RECURSE
  193. #define CMP(a,b) (compare.docompare(*(a),*(b)))
  194. #define MED3(a,b,c) med3ic(a,b,c,compare)
  195. #define RECURSE(a,b) qsortvec(a, b, compare)
  196. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  197. {
  198. return CMP(a, b) < 0 ?
  199. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  200. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  201. }
  202. void qsortvec(void **a, size32_t n, const ICompare & compare)
  203. #include "jsort2.inc"
  204. // Parallel version (only 2 threads currently)
  205. class cParQSortBase
  206. {
  207. struct sJobItem
  208. {
  209. unsigned start;
  210. unsigned num;
  211. };
  212. NonReentrantSpinLock joblock;
  213. QueueOf<sJobItem,false> jobq;
  214. Semaphore jobqsem;
  215. unsigned waiting;
  216. unsigned numsubthreads;
  217. bool done;
  218. class cThread: public Thread
  219. {
  220. cParQSortBase *parent;
  221. public:
  222. cThread(cParQSortBase *_parent)
  223. : Thread("cParQSort")
  224. {
  225. parent = _parent;
  226. }
  227. int run()
  228. {
  229. parent->run();
  230. return 0;
  231. }
  232. } **threads;
  233. bool waitForWork(unsigned &s,unsigned &n)
  234. {
  235. NonReentrantSpinBlock block(joblock);
  236. while (!done) {
  237. sJobItem *qi = jobq.dequeue();
  238. if (qi) {
  239. s = qi->start;
  240. n = qi->num;
  241. delete qi;
  242. return true;
  243. }
  244. if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
  245. done = true;
  246. jobqsem.signal(waiting);
  247. break;
  248. }
  249. waiting++;
  250. NonReentrantSpinUnblock unblock(joblock);
  251. jobqsem.wait();
  252. }
  253. s = 0; // remove uninitialised variable warnings
  254. n = 0;
  255. return false;
  256. }
  257. public:
  258. cParQSortBase(unsigned _numsubthreads)
  259. {
  260. numsubthreads = _numsubthreads;
  261. done = false;
  262. waiting = 0;
  263. threads = new cThread*[numsubthreads];
  264. for (unsigned i=0;i<numsubthreads;i++)
  265. threads[i] = new cThread(this);
  266. }
  267. ~cParQSortBase()
  268. {
  269. for (unsigned i=0;i<numsubthreads;i++)
  270. threads[i]->Release();
  271. delete [] threads;
  272. }
  273. void start()
  274. {
  275. for (unsigned i=0;i<numsubthreads;i++)
  276. threads[i]->start();
  277. }
  278. void subsort(unsigned s, unsigned n)
  279. {
  280. do {
  281. sJobItem *qi;
  282. while (n>PARALLEL_GRANULARITY) {
  283. unsigned r1;
  284. unsigned r2;
  285. partition(s, n, r1, r2);
  286. unsigned n2 = n+s-r2;
  287. if (r1==s) {
  288. n = n2;
  289. s = r2;
  290. }
  291. else {
  292. if (n2!=0) {
  293. qi = new sJobItem;
  294. qi->num = n2;
  295. qi->start = r2;
  296. NonReentrantSpinBlock block(joblock);
  297. jobq.enqueue(qi);
  298. if (waiting) {
  299. jobqsem.signal(waiting);
  300. waiting = 0;
  301. }
  302. }
  303. n = r1-s;
  304. }
  305. }
  306. serialsort(s,n);
  307. NonReentrantSpinBlock block(joblock);
  308. if (waiting==numsubthreads) { // well we are done so are rest
  309. done = true;
  310. jobqsem.signal(waiting);
  311. break;
  312. }
  313. }
  314. while(waitForWork(s,n));
  315. }
  316. void run()
  317. {
  318. unsigned s;
  319. unsigned n;
  320. if (waitForWork(s,n))
  321. subsort(s,n);
  322. }
  323. void join()
  324. {
  325. for (unsigned i=0;i<numsubthreads;i++)
  326. threads[i]->join();
  327. }
  328. virtual void serialsort(unsigned from, unsigned len)=0;
  329. virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
  330. };
  // DOPARTITION: body of partition(s, n, r1, r2) shared by the parallel sorters.
  // It expects `array`, `s`, `n`, `r1` and `r2` to be in scope and uses whichever
  // VECTOR / CMP / MED3 / SWAP / MIN definitions are active where it is expanded.
  //
  // Steps, as written below:
  //  1. Pivot choice: the median of the first, middle and last slot; when
  //     n > 40 each of those three candidates is first replaced by the median
  //     of three slots spaced n/8 apart.  The pivot is swapped into a[0].
  //  2. Scan from both ends.  Slots comparing equal to the pivot are parked at
  //     the left edge (below pa) or the right edge (above pd); out-of-place
  //     pairs are exchanged until pb and pc cross.
  //  3. The parked equal runs are swapped back into the middle.
  //
  // Results (indexes relative to `array`):
  //   [s, r1)    compare less than the pivot
  //   [r1, r2)   compare equal to the pivot
  //   [r2, s+n)  compare greater than the pivot
  // NOTE(review): `r` is an int that receives pointer differences, so the
  // element count is assumed to fit in int.
  331. #define DOPARTITION \
  332. VECTOR a = array+s; \
  333. VECTOR pm = a + (n / 2); \
  334. VECTOR pl = a; \
  335. VECTOR pn = a + (n - 1) ; \
  336. if (n > 40) { \
  337. unsigned d = (n / 8); \
  338. pl = MED3(pl, pl + d, pl + 2 * d); \
  339. pm = MED3(pm - d, pm, pm + d); \
  340. pn = MED3(pn - 2 * d, pn - d, pn); \
  341. } \
  342. pm = MED3(pl, pm, pn); \
  343. SWAP(a, pm); \
  344. VECTOR pa = a + 1; \
  345. VECTOR pb = pa; \
  346. VECTOR pc = a + (n - 1); \
  347. VECTOR pd = pc; \
  348. int r; \
  349. for (;;) { \
  350. while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
  351. if (r == 0) { \
  352. SWAP(pa, pb); \
  353. pa++; \
  354. } \
  355. pb++; \
  356. } \
  357. while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
  358. if (r == 0) { \
  359. SWAP(pc, pd); \
  360. pd--; \
  361. } \
  362. pc--; \
  363. } \
  364. if (pb > pc) \
  365. break; \
  366. SWAP(pb, pc); \
  367. pb++; \
  368. pc--; \
  369. } \
  370. pn = a + n; \
  371. r = MIN(pa - a, pb - pa); \
  372. VECTOR v1 = a; \
  373. VECTOR v2 = pb-r; \
  374. while (r) { \
  375. SWAP(v1,v2); v1++; v2++; r--; \
  376. }; \
  377. r = MIN(pd - pc, pn - pd - 1); \
  378. v1 = pb; \
  379. v2 = pn-r; \
  380. while (r) { \
  381. SWAP(v1,v2); v1++; v2++; r--; \
  382. }; \
  383. r1 = (pb-pa)+s; \
  384. r2 = n-(pd-pc)+s;
  385. class cParQSort: public cParQSortBase
  386. {
  387. VECTOR array;
  388. const ICompare &compare;
  389. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  390. {
  391. DOPARTITION
  392. }
  393. void serialsort(unsigned from, unsigned len)
  394. {
  395. qsortvec(array+from,len,compare);
  396. }
  397. public:
  398. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  399. : cParQSortBase(_numsubthreads), compare(_compare)
  400. {
  401. array = _a;
  402. cParQSortBase::start();
  403. }
  404. };
  405. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  406. {
  407. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  408. qsortvec(a,n,compare);
  409. return;
  410. }
  411. cParQSort sorter(a,compare,numcpus-1);
  412. sorter.subsort(0,n);
  413. sorter.join();
  414. #ifdef TESTPARSORT
  415. for (unsigned i=1;i<n;i++)
  416. if (compare.docompare(a[i-1],a[i])>0)
  417. IERRLOG("parqsortvec failed %d",i);
  418. #endif
  419. }
  #undef CMP
  #undef MED3
  #undef RECURSE
  //---------------------------------------------------------------------------
  // From here on VECTOR is an array of pointers to row slots (one extra level
  // of indirection), as used by the stable sorts.
  #undef VECTOR
  #undef SWAP
  typedef void *** _IVECTOR;
  #define VECTOR _IVECTOR

  // Exchange the two slot pointers stored at *a and *b.
  static inline void swapind(VECTOR a, VECTOR b)
  {
      void ** saved = *b;
      *b = *a;
      *a = saved;
  }
  #define SWAP swapind
  430. #define CMP(a,b) cmpicindstable(a,b,compare)
  431. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  432. {
  433. int ret = compare.docompare(**a,**b);
  434. if (ret==0)
  435. {
  436. if (*a>*b)
  437. ret = 1;
  438. else if (*a<*b)
  439. ret = -1;
  440. }
  441. return ret;
  442. }
  443. #define MED3(a,b,c) med3ic(a,b,c,compare)
  444. #define RECURSE(a,b) doqsortvecstable(a, b, compare)
  445. static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
  446. {
  447. return CMP(a, b) < 0 ?
  448. (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
  449. : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
  450. }
  451. static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
  452. #include "jsort2.inc"
  453. class cParQSortStable: public cParQSortBase
  454. {
  455. VECTOR array;
  456. const ICompare &compare;
  457. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  458. {
  459. DOPARTITION
  460. }
  461. void serialsort(unsigned from, unsigned len)
  462. {
  463. doqsortvecstable(array+from,len,compare);
  464. }
  465. public:
  466. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  467. : cParQSortBase(_numsubthreads),compare(_compare)
  468. {
  469. array = _a;
  470. cParQSortBase::start();
  471. }
  472. };
  473. #undef CMP
  474. #undef CMP1
  475. #undef MED3
  476. #undef RECURSE
  477. #undef VECTOR
  478. static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  479. {
  480. for(unsigned i=0; i<n; ++i)
  481. index[i] = rows+i;
  482. doqsortvecstable(index, n, compare);
  483. }
  484. void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
  485. {
  486. memcpy(temp, rows, n * sizeof(void*));
  487. qsortvecstable(temp, n, compare, (void * * *)rows);
  488. //I'm sure this violates the aliasing rules...
  489. void * * * rowsAsIndex = (void * * *)rows;
  490. for(unsigned i=0; i<n; ++i)
  491. rows[i] = *rowsAsIndex[i];
  492. }
  493. static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  494. {
  495. for(unsigned i=0; i<n; ++i)
  496. index[i] = rows+i;
  497. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  498. doqsortvecstable(index,n,compare);
  499. return;
  500. }
  501. cParQSortStable sorter(index,compare,numcpus-1);
  502. sorter.subsort(0,n);
  503. sorter.join();
  504. }
  505. void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
  506. {
  507. memcpy(temp, rows, n * sizeof(void*));
  508. parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);
  509. //I'm sure this violates the aliasing rules...
  510. void * * * rowsAsIndex = (void * * *)rows;
  511. for(size32_t i=0; i<n; ++i)
  512. rows[i] = *rowsAsIndex[i];
  513. }
  514. //=========================================================================
  515. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  516. {
  517. bool nochange = true;
  518. while(1)
  519. {
  520. unsigned c = p*2 + 1;
  521. if(c >= num)
  522. return nochange;
  523. if(c+1 < num)
  524. {
  525. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  526. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  527. ++c;
  528. }
  529. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  530. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  531. return nochange;
  532. nochange = false;
  533. unsigned r = heap[c];
  534. heap[c] = heap[p];
  535. heap[p] = r;
  536. p = c;
  537. }
  538. }
  539. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  540. {
  541. bool nochange = true;
  542. while(c > 0)
  543. {
  544. unsigned p = (unsigned)(c-1)/2;
  545. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  546. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  547. return nochange;
  548. nochange = false;
  549. unsigned r = heap[c];
  550. heap[c] = heap[p];
  551. heap[p] = r;
  552. c = p;
  553. }
  554. return nochange;
  555. }
  556. //=========================================================================
  557. #include <assert.h>
  558. #include <stdio.h>
  559. #include <stdlib.h>
  560. #include <fcntl.h>
  561. #include <string.h>
  562. #include <stddef.h>
  563. #include <time.h>
  564. #ifdef _WIN32
  565. #include <io.h>
  566. #include <sys\types.h>
  567. #include <sys\stat.h>
  568. #else
  569. #include <sys/types.h>
  570. #include <sys/stat.h>
  571. #endif
  572. #ifndef off_t
  573. #define off_t __int64
  574. #endif
  // VECTOR is re-declared here as a plain typedef (the earlier macro of the
  // same name has been #undef'd by this point).
  575. typedef void ** VECTOR;
  // IMergeSorter: callback interface with a single operation that returns an
  // IWriteSeq output stream.
  // NOTE(review): the meaning of `isEOF` and the ownership of the returned
  // stream are not visible in this file - confirm against the implementers.
  576. interface IMergeSorter
  577. {
  578. public:
  579. virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
  580. };
  // NOTE(review): INSERTMAX and BUFFSIZE are not referenced in the visible
  // part of this file.
  581. #define INSERTMAX 10000
  582. #define BUFFSIZE 0x100000 // used for output buffer
  583. //==================================================================================================
  584. class CRowStreamMerger
  585. {
  586. const void **pending;
  587. size32_t *recsize;
  588. unsigned *mergeheap;
  589. unsigned activeInputs;
  590. count_t recno;
  591. const ICompare *icmp;
  592. bool partdedup;
  593. IRowProvider &provider;
  594. MemoryAttr workingbuf;
  595. #ifdef _DEBUG
  596. bool *stopped;
  597. MemoryAttr ma;
  598. #endif
  599. inline int buffCompare(unsigned a, unsigned b)
  600. {
  601. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  602. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  603. }
  604. bool promote(unsigned p)
  605. {
  606. activeInputs--;
  607. if(activeInputs == p)
  608. return false;
  609. mergeheap[p] = mergeheap[activeInputs];
  610. return true;
  611. }
  612. inline bool siftDown(unsigned p)
  613. {
  614. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  615. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  616. bool nochange = true;
  617. while(1)
  618. {
  619. unsigned c = p*2 + 1;
  620. if(c >= activeInputs)
  621. return nochange;
  622. if(c+1 < activeInputs)
  623. {
  624. int childcmp = buffCompare(c+1, c);
  625. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  626. ++c;
  627. }
  628. int cmp = buffCompare(c, p);
  629. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  630. return nochange;
  631. nochange = false;
  632. unsigned r = mergeheap[c];
  633. mergeheap[c] = mergeheap[p];
  634. mergeheap[p] = r;
  635. p = c;
  636. }
  637. }
  638. void siftDownDedupTop()
  639. {
  640. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  641. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  642. if(activeInputs < 2)
  643. return;
  644. unsigned c = 1;
  645. int childcmp = 1;
  646. if(activeInputs >= 3)
  647. {
  648. childcmp = buffCompare(2, 1);
  649. if(childcmp < 0)
  650. c = 2;
  651. }
  652. int cmp = buffCompare(c, 0);
  653. if(cmp > 0)
  654. return;
  655. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  656. while(cmp <= 0)
  657. {
  658. if(cmp == 0)
  659. {
  660. if(mergeheap[c] < mergeheap[0])
  661. {
  662. unsigned r = mergeheap[c];
  663. mergeheap[c] = mergeheap[0];
  664. mergeheap[0] = r;
  665. }
  666. if(!pullInput(mergeheap[c]))
  667. if(!promote(c))
  668. break;
  669. siftDown(c);
  670. }
  671. else
  672. {
  673. unsigned r = mergeheap[c];
  674. mergeheap[c] = mergeheap[0];
  675. mergeheap[0] = r;
  676. if(siftDown(c))
  677. break;
  678. }
  679. cmp = buffCompare(c, 0);
  680. }
  681. // the following loop ensures the uniqueness property holds on the other branch too
  682. c = 3-c;
  683. if(activeInputs <= c)
  684. return;
  685. while(childcmp == 0)
  686. {
  687. if(mergeheap[c] < mergeheap[0])
  688. {
  689. unsigned r = mergeheap[c];
  690. mergeheap[c] = mergeheap[0];
  691. mergeheap[0] = r;
  692. }
  693. if(!pullInput(mergeheap[c]))
  694. if(!promote(c))
  695. break;
  696. siftDown(c);
  697. childcmp = buffCompare(c, 0);
  698. }
  699. }
  700. void init()
  701. {
  702. if (activeInputs>0) {
  703. // setup heap property
  704. if (activeInputs >= 2)
  705. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  706. siftDown(p);
  707. if (partdedup)
  708. siftDownDedupTop();
  709. else
  710. siftDown(0);
  711. }
  712. recno = 0;
  713. }
  714. inline count_t num() const { return recno; }
  715. inline bool _next()
  716. {
  717. if (!activeInputs)
  718. return false;
  719. if (recno) {
  720. if(!pullInput(mergeheap[0]))
  721. if(!promote(0))
  722. return false;
  723. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  724. if(partdedup)
  725. siftDownDedupTop();
  726. else
  727. siftDown(0);
  728. }
  729. recno++;
  730. return true;
  731. }
  732. inline bool eof() const { return activeInputs==0; }
  733. bool pullInput(unsigned i)
  734. {
  735. if (pending[i]) {
  736. assertex(partdedup);
  737. provider.releaseRow(pending[i]);
  738. }
  739. pending[i] = provider.nextRow(i);
  740. if (pending[i])
  741. return true;
  742. provider.stop(i);
  743. #ifdef _DEBUG
  744. assertex(!stopped[i]);
  745. stopped[i] = true;
  746. #endif
  747. return false;
  748. }
  749. public:
  750. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
  751. : provider(_provider)
  752. {
  753. partdedup = _partdedup;
  754. icmp = _icmp;
  755. recsize = NULL;
  756. activeInputs = 0;
  757. #ifdef _DEBUG
  758. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  759. memset(stopped,0,numstreams*sizeof(bool));
  760. #endif
  761. unsigned i;
  762. recsize = NULL;
  763. if (numstreams) {
  764. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  765. pending = (const void **)buf;
  766. mergeheap = (unsigned *)(pending+numstreams);
  767. for (i=0;i<numstreams;i++) {
  768. pending[i] = NULL;
  769. if (pullInput(i))
  770. mergeheap[activeInputs++] = i;
  771. }
  772. }
  773. else {
  774. pending = NULL;
  775. mergeheap = NULL;
  776. }
  777. init();
  778. }
  779. void stop()
  780. {
  781. while (activeInputs) {
  782. activeInputs--;
  783. if (pending[mergeheap[activeInputs]]) {
  784. provider.releaseRow(pending[mergeheap[activeInputs]]);
  785. #ifdef _DEBUG
  786. assertex(!stopped[mergeheap[activeInputs]]);
  787. stopped[mergeheap[activeInputs]] = true;
  788. #endif
  789. provider.stop(mergeheap[activeInputs]);
  790. }
  791. }
  792. pending = NULL;
  793. mergeheap = NULL;
  794. workingbuf.clear();
  795. }
  796. ~CRowStreamMerger()
  797. {
  798. stop();
  799. }
  800. inline const void * next()
  801. {
  802. if (!_next())
  803. return NULL;
  804. unsigned strm = mergeheap[0];
  805. const void *row = pending[strm];
  806. pending[strm] = NULL;
  807. return row;
  808. }
  809. };
  810. class CMergeRowStreams : implements IRowStream, public CInterface
  811. {
  812. protected:
  813. CRowStreamMerger *merger;
  814. bool eos;
  815. class cProvider: implements IRowProvider, public CInterface
  816. {
  817. IArrayOf<IRowStream> ostreams;
  818. IRowStream **streams;
  819. Linked<IRowLinkCounter> linkcounter;
  820. const void *nextRow(unsigned idx)
  821. {
  822. return streams[idx]->nextRow();
  823. };
  824. void linkRow(const void *row)
  825. {
  826. linkcounter->linkRow(row);
  827. }
  828. void releaseRow(const void *row)
  829. {
  830. linkcounter->releaseRow(row);
  831. }
  832. void stop(unsigned idx)
  833. {
  834. streams[idx]->stop();
  835. }
  836. public:
  837. IMPLEMENT_IINTERFACE;
  838. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  839. : linkcounter(_linkcounter)
  840. {
  841. ostreams.ensureCapacity(numstreams);
  842. unsigned n = 0;
  843. while (n<numstreams)
  844. ostreams.append(*LINK(_streams[n++]));
  845. streams = ostreams.getArray();
  846. }
  847. } *streamprovider;
  848. public:
  849. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  850. {
  851. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  852. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  853. eos = _numstreams==0;
  854. }
  855. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  856. {
  857. streamprovider = NULL;
  858. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  859. eos = _numstreams==0;
  860. }
  861. ~CMergeRowStreams()
  862. {
  863. delete merger;
  864. delete streamprovider;
  865. }
  866. IMPLEMENT_IINTERFACE;
  867. void stop()
  868. {
  869. if (!eos) {
  870. merger->stop();
  871. eos = true;
  872. }
  873. }
  874. const void *nextRow()
  875. {
  876. if (eos)
  877. return NULL;
  878. const void *r = merger->next();
  879. if (!r) {
  880. stop(); // think ok to stop early
  881. return NULL;
  882. }
  883. return r;
  884. }
  885. };
  886. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  887. {
  888. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  889. }
  890. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  891. {
  892. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  893. }