jsort.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include <string.h>
  16. #include <limits.h>
  17. #include "jsort.hpp"
  18. #include "jio.hpp"
  19. #include "jmisc.hpp"
  20. #include "jexcept.hpp"
  21. #include "jfile.hpp"
  22. #include "jthread.hpp"
  23. #include "jqueue.tpp"
  24. #ifdef _DEBUG
  25. // #define PARANOID
  26. // #define DOUBLE_COMPARE
  27. //#define TESTPARSORT
  28. //#define MCMERGESTATS
  29. #endif
  30. #define PARALLEL_GRANULARITY 1024
  31. static bool sortParallel(unsigned &numcpus)
  32. {
  33. static unsigned numCPUs = 0;
  34. if (numCPUs==0) {
  35. numCPUs = getAffinityCpus();
  36. }
  37. if ((numcpus==0)||(numcpus>numCPUs))
  38. numcpus = numCPUs;
  39. #ifdef TESTPARSORT
  40. numcpus = 2;
  41. return true; // to test
  42. #endif
  43. return (numcpus>1);
  44. }
// binary_add() and the binary_vec_* functions below all share one
// implementation in jsort.inc.  Each variant configures it by defining
// COMPARE(search,position) and INSERT(position,search) (and, for some variants,
// NEVER_ADD / ALWAYS_ADD / SEEK_LAST_MATCH and `width`) before including it.
//
// binary_add: works on an array of nmemb elements of `width` bytes each,
// starting at base, presumably kept sorted by `compare`.  COMPARE is applied
// directly to the element storage, and INSERT copies `width` bytes of newitem
// into a slot with memmove.
// NOTE(review): the return value and the meaning of *ItemAdded are defined in
// jsort.inc, which is not visible here; confirm there.
#define COMPARE(search,position) compare(search,position)
#define INSERT(position,search) memmove(position,search, width)
void * binary_add(const void *newitem, const void *base,
    size32_t nmemb,
    size32_t width,
    int ( *compare)(const void *_e1, const void *_e2),
    bool * ItemAdded)
{
#include "jsort.inc"
}
#undef COMPARE
#undef INSERT
//---------------------------------------------------------------------------
// binary_vec_find (sortCompareFunction overload).
// base is a vector of nmemb element pointers, presumably kept sorted by
// `compare`.  Because each slot holds a pointer, COMPARE dereferences the slot
// to reach the element, and INSERT stores a pointer.
// NEVER_ADD is defined for this variant; judging by its name, it makes
// jsort.inc search without inserting.  NOTE(review): confirm the return value
// and *ItemAdded contract in jsort.inc.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define NEVER_ADD
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
    size32_t nmemb,
    sortCompareFunction compare,
    bool * ItemAdded)
{
    // jsort.inc is written in terms of an element size `width`; each slot of
    // this vector is a single pointer.
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// binary_vec_find (ICompare overload): the same configuration as the
// overload above, except that ordering comes from compare.docompare().
// Note that this overload takes a non-const ICompare reference.
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define NEVER_ADD
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
    size32_t nmemb,
    ICompare & compare,
    bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// binary_vec_insert (sortCompareFunction overload).
// Works on a vector of nmemb element pointers, presumably kept sorted by
// `compare`.  COMPARE dereferences each slot, and INSERT stores newitem's
// pointer into a slot.  ALWAYS_ADD is defined; judging by its name, jsort.inc
// then inserts newitem even when an equal element already exists.
// NOTE(review): confirm the return value and the growth of `base` (the caller
// presumably provides room for nmemb+1 slots) in jsort.inc.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
    size32_t nmemb,
    sortCompareFunction compare)
{
    // Each slot of the vector is one pointer wide.
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert (ICompare overload): identical, but ordered by
// compare.docompare().
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
    size32_t nmemb,
    ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert_stable (sortCompareFunction overload).
// Same configuration as binary_vec_insert, plus SEEK_LAST_MATCH.  Judging by
// its name, the insertion point is placed after the last element equal to
// newitem, so equal elements keep their insertion order.
// NOTE(review): confirm in jsort.inc.
#define COMPARE(search,position) compare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
    size32_t nmemb,
    sortCompareFunction compare)
{
    // Each slot of the vector is one pointer wide.
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// binary_vec_insert_stable (ICompare overload): identical, but ordered by
// compare.docompare().
#define COMPARE(search,position) compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search) *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
    size32_t nmemb,
    ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
//=========================================================================
// optimized quicksort for array of pointers to fixed size objects
//
// The quicksort body lives in jsort2.inc, which is included as the function
// body of each qsortvec overload below.  Before each inclusion, the macros
// jsort2.inc is written against are defined:
//   CMP(a,b)     three-way compare of the elements referenced by slots a and b
//   MED3(a,b,c)  whichever of the three slots holds the median element
//   RECURSE(a,b) recursive sort of the sub-range starting at slot a with b elements
//   SWAP(a,b)    exchange two slots (only the pointers move, never the data)
typedef void * ELEMENT;
typedef void ** _VECTOR; // bit messy but allow to be redefined later
#define VECTOR _VECTOR
static inline void swap(VECTOR a, VECTOR b) { ELEMENT t = *a; *a = *b; *b = t; }
#define SWAP swap
// Variant 1: elements are blocks of `es` bytes, ordered bytewise by memcmp.
#define CMP(a,b) memcmp(*(a),*(b),es)
#define MED3(a,b,c) med3a(a,b,c,es)
#define RECURSE(a,b) qsortvec(a, b, es)
// Median of three slots under CMP; used for pivot selection.
static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
void qsortvec(void **a, size32_t n, size32_t es)
#include "jsort2.inc"
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Variant 2: ordering given by a plain comparison function.
#define CMP(a,b) (compare(*(a),*(b)))
#define MED3(a,b,c) med3c(a,b,c,compare)
#define RECURSE(a,b) qsortvec(a, b, compare)
// Median of three slots under CMP; used for pivot selection.
static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
void qsortvec(void **a, size32_t n, sortCompareFunction compare)
#include "jsort2.inc"
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Variant 3: ordering given by an ICompare.
// NB: CMP, MED3 and RECURSE are deliberately left defined after this
// overload.  The parallel sorter that follows (DOPARTITION inside cParQSort)
// expands them, and they are only #undef'd after the single-compare
// parqsortvec.
#define CMP(a,b) (compare.docompare(*(a),*(b)))
#define MED3(a,b,c) med3ic(a,b,c,compare)
#define RECURSE(a,b) qsortvec(a, b, compare)
// Median of three slots under CMP; used for pivot selection.
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
void qsortvec(void **a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
// Parallel quicksort driver shared by cParQSort, cParQSort2 and
// cParQSortStable.  It owns numsubthreads helper threads (the parallel entry
// points pass numcpus-1), and the thread that calls subsort() also does work.
// Ranges larger than PARALLEL_GRANULARITY are split by the derived class's
// partition().  One resulting sub-range is pushed onto a shared job queue for
// any idle thread, and the current thread carries on with the other.  Ranges at
// or below the granularity are handed to the derived class's serialsort().
class cParQSortBase
{
    // A pending sub-range of the array: [start, start+num).
    struct sJobItem
    {
        unsigned start;
        unsigned num;
    };
    NonReentrantSpinLock joblock;   // every access to jobq, waiting and done below is made while holding this
    QueueOf<sJobItem,false> jobq;   // sub-ranges waiting to be sorted
    Semaphore jobqsem;              // idle threads block here until work is queued or sorting finishes
    unsigned waiting;               // number of threads currently blocked on jobqsem
    unsigned numsubthreads;         // number of helper threads in `threads`
    bool done;                      // set once no work remains; makes waitForWork() return false
    // Helper thread: just runs cParQSortBase::run() on its parent.
    class cThread: public Thread
    {
        cParQSortBase *parent;
    public:
        cThread(cParQSortBase *_parent)
            : Thread("cParQSort")
        {
            parent = _parent;
        }
        int run()
        {
            parent->run();
            return 0;
        }
    } **threads;
    // Take the next queued range into s/n.  Blocks while the queue is empty and
    // some other thread is still busy (and so may queue more work).
    // Returns false once sorting is finished.
    bool waitForWork(unsigned &s,unsigned &n)
    {
        NonReentrantSpinBlock block(joblock);
        while (!done) {
            sJobItem *qi = jobq.dequeue();
            if (qi) {
                s = qi->start;
                n = qi->num;
                delete qi;
                return true;
            }
            if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
                // The queue is empty and every other thread is already idle,
                // so no more work can appear: finish and wake all sleepers.
                done = true;
                jobqsem.signal(waiting);
                break;
            }
            waiting++;
            // Presumably releases joblock for the lifetime of `unblock`, so other
            // threads can queue work while this one sleeps.  The lock is taken
            // again when `unblock` goes out of scope at the end of the iteration.
            NonReentrantSpinUnblock unblock(joblock);
            jobqsem.wait();
        }
        return false;
    }
public:
    // Creates (but does not start) the helper threads.
    cParQSortBase(unsigned _numsubthreads)
    {
        numsubthreads = _numsubthreads;
        done = false;
        waiting = 0;
        threads = new cThread*[numsubthreads];
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i] = new cThread(this);
    }
    ~cParQSortBase()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->Release();
        delete [] threads;
    }
    // Start all helper threads; each immediately waits for queued work.
    void start()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->start();
    }
    // Sort [s, s+n), then keep taking queued ranges until all work is done.
    void subsort(unsigned s, unsigned n)
    {
        do {
            sJobItem *qi;
            while (n>PARALLEL_GRANULARITY) {
                unsigned r1;
                unsigned r2;
                // partition() leaves two sub-ranges still to sort:
                // [s, r1) and [r2, s+n).
                partition(s, n, r1, r2);
                unsigned n2 = n+s-r2;   // size of the upper sub-range
                if (r1==s) {
                    // lower sub-range is empty: continue with the upper one
                    n = n2;
                    s = r2;
                }
                else {
                    if (n2!=0) {
                        // queue the upper sub-range for another thread
                        qi = new sJobItem;
                        qi->num = n2;
                        qi->start = r2;
                        NonReentrantSpinBlock block(joblock);
                        jobq.enqueue(qi);
                        if (waiting) {
                            // wake every idle thread; they are no longer counted as waiting
                            jobqsem.signal(waiting);
                            waiting = 0;
                        }
                    }
                    n = r1-s;           // continue with the lower sub-range
                }
            }
            serialsort(s,n);
            NonReentrantSpinBlock block(joblock);
            if (waiting==numsubthreads) { // well we are done so are rest
                done = true;
                jobqsem.signal(waiting);
                break;
            }
            // `block` is destroyed at the end of this loop body, before the
            // while-condition is evaluated, so waitForWork() can take joblock.
        }
        while(waitForWork(s,n));
    }
    // Helper thread body: wait for the first queued range, then sort.
    void run()
    {
        unsigned s;
        unsigned n;
        if (waitForWork(s,n))
            subsort(s,n);
    }
    // Wait for all helper threads to finish.
    void join()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->join();
    }
    // Sort a range serially (used once it is no larger than PARALLEL_GRANULARITY).
    virtual void serialsort(unsigned from, unsigned len)=0;
    // Partition [s, s+n) so that only [s, r1) and [r2, s+n) remain to be sorted.
    virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
};
// DOPARTITION: shared body of the partition() overrides of the parallel
// sorters.  It expects `array`, `s`, `n`, `r1` and `r2` to be in scope, along
// with the VECTOR / SWAP / CMP / MED3 macros for the element type and MIN.
// It performs a three-way partition of array[s .. s+n) in the style of
// Bentley & McIlroy's qsort:
//   - the pivot is the median of three (for n > 40, the median of three
//     medians) and is swapped into the first slot;
//   - while scanning, keys equal to the pivot are parked at both ends
//     (pa/pd mark the parked runs);
//   - the parked runs are then swapped into the middle.
// On exit, [s, r1) holds elements comparing below the pivot and [r2, s+n)
// holds elements comparing above it.  Everything in between compares equal
// to the pivot and needs no further sorting.
#define DOPARTITION \
    VECTOR a = array+s; \
    VECTOR pm = a + (n / 2); \
    VECTOR pl = a; \
    VECTOR pn = a + (n - 1) ; \
    if (n > 40) { \
        unsigned d = (n / 8); \
        pl = MED3(pl, pl + d, pl + 2 * d); \
        pm = MED3(pm - d, pm, pm + d); \
        pn = MED3(pn - 2 * d, pn - d, pn); \
    } \
    pm = MED3(pl, pm, pn); \
    SWAP(a, pm); \
    VECTOR pa = a + 1; \
    VECTOR pb = pa; \
    VECTOR pc = a + (n - 1); \
    VECTOR pd = pc; \
    int r; \
    for (;;) { \
        while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
            if (r == 0) { \
                SWAP(pa, pb); \
                pa++; \
            } \
            pb++; \
        } \
        while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
            if (r == 0) { \
                SWAP(pc, pd); \
                pd--; \
            } \
            pc--; \
        } \
        if (pb > pc) \
            break; \
        SWAP(pb, pc); \
        pb++; \
        pc--; \
    } \
    pn = a + n; \
    r = MIN(pa - a, pb - pa); \
    VECTOR v1 = a; \
    VECTOR v2 = pb-r; \
    while (r) { \
        SWAP(v1,v2); v1++; v2++; r--; \
    }; \
    r = MIN(pd - pc, pn - pd - 1); \
    v1 = pb; \
    v2 = pn-r; \
    while (r) { \
        SWAP(v1,v2); v1++; v2++; r--; \
    }; \
    r1 = (pb-pa)+s; \
    r2 = n-(pd-pc)+s;
  378. class cParQSort: public cParQSortBase
  379. {
  380. VECTOR array;
  381. const ICompare &compare;
  382. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  383. {
  384. DOPARTITION
  385. }
  386. void serialsort(unsigned from, unsigned len)
  387. {
  388. qsortvec(array+from,len,compare);
  389. }
  390. public:
  391. cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  392. : cParQSortBase(_numsubthreads), compare(_compare)
  393. {
  394. array = _a;
  395. cParQSortBase::start();
  396. }
  397. };
  398. void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
  399. {
  400. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  401. qsortvec(a,n,compare);
  402. return;
  403. }
  404. cParQSort sorter(a,compare,numcpus-1);
  405. sorter.subsort(0,n);
  406. sorter.join();
  407. #ifdef TESTPARSORT
  408. for (unsigned i=1;i<n;i++)
  409. if (compare.docompare(a[i-1],a[i])>0)
  410. ERRLOG("parqsortvec failed %d",i);
  411. #endif
  412. }
// End of the single-ICompare macros (kept alive above for cParQSort).
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Two-key ordering: by compare1, then by compare2 where compare1 reports equal.
#define CMP(a,b) cmpic2(a,b,compare1,compare2)
#define MED3(a,b,c) med3ic2(a,b,c,compare1,compare2)
#define RECURSE(a,b) qsortvec(a, b, compare1, compare2)
// Lexicographic three-way compare: compare2 is only consulted to break ties.
static inline int cmpic2(VECTOR a, VECTOR b, const ICompare & compare1, const ICompare & compare2)
{
    int ret = compare1.docompare(*(a),*(b));
    if (ret==0)
        ret = compare2.docompare(*(a),*(b));
    return ret;
}
// Median of three slots under the two-key ordering; used for pivot selection.
static inline VECTOR med3ic2(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare1, const ICompare & compare2)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
// Serial quicksort ordered by (compare1, compare2); the body is jsort2.inc.
// CMP/MED3/RECURSE stay defined so that cParQSort2 (DOPARTITION) can use them.
void qsortvec(void **a, size32_t n, const ICompare & compare1, const ICompare & compare2)
#include "jsort2.inc"
  435. class cParQSort2: public cParQSortBase
  436. {
  437. VECTOR array;
  438. const ICompare &compare1;
  439. const ICompare &compare2;
  440. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  441. {
  442. DOPARTITION
  443. }
  444. void serialsort(unsigned from, unsigned len)
  445. {
  446. qsortvec(array+from,len,compare1,compare2);
  447. }
  448. public:
  449. cParQSort2(VECTOR _a,const ICompare &_compare1,const ICompare &_compare2, unsigned _numsubthreads)
  450. : cParQSortBase(_numsubthreads),compare1(_compare1), compare2(_compare2)
  451. {
  452. array = _a;
  453. cParQSortBase::start();
  454. }
  455. };
  456. void parqsortvec(void **a, size32_t n, const ICompare & compare1, const ICompare & compare2,unsigned numcpus)
  457. {
  458. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  459. qsortvec(a,n,compare1,compare2);
  460. return;
  461. }
  462. cParQSort2 sorter(a,compare1,compare2,numcpus-1);
  463. sorter.subsort(0,n);
  464. sorter.join();
  465. #ifdef TESTPARSORT
  466. for (unsigned i=1;i<n;i++) {
  467. int cmp = compare1.docompare(a[i-1],a[i]);
  468. if (cmp>0)
  469. ERRLOG("parqsortvec(2,1) failed %d",i);
  470. else if ((cmp==0)&&(compare2.docompare(a[i-1],a[i])>0))
  471. ERRLOG("parqsortvec(2,2) failed %d",i);
  472. }
  473. #endif
  474. }
// End of the two-key macros (kept alive above for cParQSort2).
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Indirect stable sort.  The vector being sorted holds pointers *into* the
// caller's row array (void ***), so the rows themselves are never moved;
// SWAP exchanges those index entries.
#undef VECTOR
#undef SWAP
typedef void *** _IVECTOR;
#define VECTOR _IVECTOR
static inline void swapind(VECTOR a, VECTOR b) { void ** t = *a; *a = *b; *b = t; }
#define SWAP swapind
// Rows that compare equal are ordered by the index entries themselves (their
// position in the row array), which is what makes the sort stable.
#define CMP(a,b) cmpicindstable(a,b,compare)
  486. static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
  487. {
  488. int ret = compare.docompare(**a,**b);
  489. if (ret==0)
  490. if (*a>*b)
  491. ret = 1;
  492. else if (*a<*b)
  493. ret = -1;
  494. return ret;
  495. }
#define MED3(a,b,c) med3ic(a,b,c,compare)
#define RECURSE(a,b) doqsortvecstable(a, b, compare)
// Median of three index slots under the stable ordering (CMP); used for pivot
// selection.  This overloads the pointer-vector med3ic defined earlier.
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}
// Serial quicksort of an index vector; the body is jsort2.inc.  Because CMP
// breaks ties by row position, the resulting order is stable.
static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
  506. class cParQSortStable: public cParQSortBase
  507. {
  508. VECTOR array;
  509. const ICompare &compare;
  510. void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
  511. {
  512. DOPARTITION
  513. }
  514. void serialsort(unsigned from, unsigned len)
  515. {
  516. doqsortvecstable(array+from,len,compare);
  517. }
  518. public:
  519. cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
  520. : cParQSortBase(_numsubthreads),compare(_compare)
  521. {
  522. array = _a;
  523. cParQSortBase::start();
  524. }
  525. };
  526. #undef CMP
  527. #undef CMP1
  528. #undef MED3
  529. #undef RECURSE
  530. #undef VECTOR
  531. void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
  532. {
  533. for(unsigned i=0; i<n; ++i)
  534. index[i] = rows+i;
  535. doqsortvecstable(index, n, compare);
  536. }
  537. void parqsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
  538. {
  539. for(unsigned i=0; i<n; ++i)
  540. index[i] = rows+i;
  541. if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
  542. doqsortvecstable(index,n,compare);
  543. return;
  544. }
  545. cParQSortStable sorter(index,compare,numcpus-1);
  546. sorter.subsort(0,n);
  547. sorter.join();
  548. }
  549. //=========================================================================
  550. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  551. {
  552. bool nochange = true;
  553. while(1)
  554. {
  555. unsigned c = p*2 + 1;
  556. if(c >= num)
  557. return nochange;
  558. if(c+1 < num)
  559. {
  560. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  561. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  562. ++c;
  563. }
  564. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  565. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  566. return nochange;
  567. nochange = false;
  568. unsigned r = heap[c];
  569. heap[c] = heap[p];
  570. heap[p] = r;
  571. p = c;
  572. }
  573. }
  574. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  575. {
  576. bool nochange = true;
  577. while(c > 0)
  578. {
  579. unsigned p = (unsigned)(c-1)/2;
  580. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  581. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  582. return nochange;
  583. nochange = false;
  584. unsigned r = heap[c];
  585. heap[c] = heap[p];
  586. heap[p] = r;
  587. c = p;
  588. }
  589. return nochange;
  590. }
  591. //=========================================================================
  592. #include <assert.h>
  593. #include <stdio.h>
  594. #include <stdlib.h>
  595. #include <fcntl.h>
  596. #include <string.h>
  597. #include <stddef.h>
  598. #include <time.h>
  599. #ifdef _WIN32
  600. #include <io.h>
  601. #include <sys\types.h>
  602. #include <sys\stat.h>
  603. #else
  604. #include <sys/types.h>
  605. #include <sys/stat.h>
  606. #endif
// NOTE(review): off_t is normally a typedef rather than a macro, so this
// #ifndef is likely always true and redefines off_t as __int64 for the rest of
// this file.  Confirm that this is intended.
#ifndef off_t
#define off_t __int64
#endif
// From here on VECTOR is a plain typedef for a vector of row pointers.
typedef void ** VECTOR;
// NOTE(review): not implemented or used in the code visible here; the meaning
// of isEOF is not documented in this file.
interface IMergeSorter
{
public:
    virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
};
// NOTE(review): INSERTMAX is not referenced in the visible part of this file.
#define INSERTMAX 10000
#define BUFFSIZE 0x100000 // used for output buffer
  618. //==================================================================================================
  619. #ifdef DOUBLE_COMPARE
  620. #define BuffCompare(a, b) ((MSbuffcomp(a,b,inbuffers,mergeheap,icmp)+MSbuffcomp(a,b,inbuffers,mergeheap,icmp))/2)
  621. #else
  622. #define BuffCompare(a, b) MSbuffcomp(a,b,inbuffers,mergeheap,icmp)
  623. #endif
  624. static inline int MSbuffcomp(unsigned a,unsigned b, void **inbuffers, unsigned *mergeheap, ICompare *icmp)
  625. {
  626. int ret = icmp->docompare(inbuffers[mergeheap[a]], inbuffers[mergeheap[b]]);
  627. if (ret==0)
  628. ret = (int)mergeheap[a]-mergeheap[b];
  629. return ret;
  630. }
  631. class CRowStreamMerger
  632. {
  633. const void **pending;
  634. size32_t *recsize;
  635. unsigned *mergeheap;
  636. unsigned activeInputs;
  637. count_t recno;
  638. ICompare *icmp;
  639. bool partdedup;
  640. IRowProvider &provider;
  641. MemoryAttr workingbuf;
  642. #ifdef _DEBUG
  643. bool *stopped;
  644. MemoryAttr ma;
  645. #endif
  646. inline int buffCompare(unsigned a, unsigned b)
  647. {
  648. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
  649. return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
  650. }
  651. bool promote(unsigned p)
  652. {
  653. activeInputs--;
  654. if(activeInputs == p)
  655. return false;
  656. mergeheap[p] = mergeheap[activeInputs];
  657. return true;
  658. }
  659. inline bool siftDown(unsigned p)
  660. {
  661. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
  662. // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
  663. bool nochange = true;
  664. while(1)
  665. {
  666. unsigned c = p*2 + 1;
  667. if(c >= activeInputs)
  668. return nochange;
  669. if(c+1 < activeInputs)
  670. {
  671. int childcmp = buffCompare(c+1, c);
  672. if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
  673. ++c;
  674. }
  675. int cmp = buffCompare(c, p);
  676. if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
  677. return nochange;
  678. nochange = false;
  679. unsigned r = mergeheap[c];
  680. mergeheap[c] = mergeheap[p];
  681. mergeheap[p] = r;
  682. p = c;
  683. }
  684. }
  685. void siftDownDedupTop()
  686. {
  687. //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
  688. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  689. if(activeInputs < 2)
  690. return;
  691. unsigned c = 1;
  692. int childcmp = 1;
  693. if(activeInputs >= 3)
  694. {
  695. childcmp = buffCompare(2, 1);
  696. if(childcmp < 0)
  697. c = 2;
  698. }
  699. int cmp = buffCompare(c, 0);
  700. if(cmp > 0)
  701. return;
  702. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  703. while(cmp <= 0)
  704. {
  705. if(cmp == 0)
  706. {
  707. if(mergeheap[c] < mergeheap[0])
  708. {
  709. unsigned r = mergeheap[c];
  710. mergeheap[c] = mergeheap[0];
  711. mergeheap[0] = r;
  712. }
  713. if(!pullInput(mergeheap[c]))
  714. if(!promote(c))
  715. break;
  716. siftDown(c);
  717. }
  718. else
  719. {
  720. unsigned r = mergeheap[c];
  721. mergeheap[c] = mergeheap[0];
  722. mergeheap[0] = r;
  723. if(siftDown(c))
  724. break;
  725. }
  726. cmp = buffCompare(c, 0);
  727. }
  728. // the following loop ensures the uniqueness property holds on the other branch too
  729. c = 3-c;
  730. if(activeInputs <= c)
  731. return;
  732. while(childcmp == 0)
  733. {
  734. if(mergeheap[c] < mergeheap[0])
  735. {
  736. unsigned r = mergeheap[c];
  737. mergeheap[c] = mergeheap[0];
  738. mergeheap[0] = r;
  739. }
  740. if(!pullInput(mergeheap[c]))
  741. if(!promote(c))
  742. break;
  743. siftDown(c);
  744. childcmp = buffCompare(c, 0);
  745. }
  746. }
  747. void init()
  748. {
  749. if (activeInputs>0) {
  750. // setup heap property
  751. if (activeInputs >= 2)
  752. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  753. siftDown(p);
  754. if (partdedup)
  755. siftDownDedupTop();
  756. else
  757. siftDown(0);
  758. }
  759. recno = 0;
  760. }
  761. inline count_t num() const { return recno; }
  762. inline bool _next()
  763. {
  764. if (!activeInputs)
  765. return false;
  766. if (recno) {
  767. if(!pullInput(mergeheap[0]))
  768. if(!promote(0))
  769. return false;
  770. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  771. if(partdedup)
  772. siftDownDedupTop();
  773. else
  774. siftDown(0);
  775. }
  776. recno++;
  777. return true;
  778. }
  779. inline bool eof() const { return activeInputs==0; }
  780. bool pullInput(unsigned i)
  781. {
  782. if (pending[i]) {
  783. assertex(partdedup);
  784. provider.releaseRow(pending[i]);
  785. }
  786. pending[i] = provider.nextRow(i);
  787. if (pending[i])
  788. return true;
  789. provider.stop(i);
  790. #ifdef _DEBUG
  791. assertex(!stopped[i]);
  792. stopped[i] = true;
  793. #endif
  794. return false;
  795. }
  796. public:
  797. CRowStreamMerger(IRowProvider &_provider,unsigned numstreams,ICompare *_icmp,bool _partdedup=false)
  798. : provider(_provider)
  799. {
  800. partdedup = _partdedup;
  801. icmp = _icmp;
  802. recsize = NULL;
  803. activeInputs = 0;
  804. #ifdef _DEBUG
  805. stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
  806. memset(stopped,0,numstreams*sizeof(bool));
  807. #endif
  808. unsigned i;
  809. recsize = NULL;
  810. if (numstreams) {
  811. byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
  812. pending = (const void **)buf;
  813. mergeheap = (unsigned *)(pending+numstreams);
  814. for (i=0;i<numstreams;i++) {
  815. pending[i] = NULL;
  816. if (pullInput(i))
  817. mergeheap[activeInputs++] = i;
  818. }
  819. }
  820. else {
  821. pending = NULL;
  822. mergeheap = NULL;
  823. }
  824. init();
  825. }
  826. void stop()
  827. {
  828. while (activeInputs) {
  829. activeInputs--;
  830. if (pending[mergeheap[activeInputs]]) {
  831. provider.releaseRow(pending[mergeheap[activeInputs]]);
  832. #ifdef _DEBUG
  833. assertex(!stopped[mergeheap[activeInputs]]);
  834. stopped[mergeheap[activeInputs]] = true;
  835. #endif
  836. provider.stop(mergeheap[activeInputs]);
  837. }
  838. }
  839. pending = NULL;
  840. mergeheap = NULL;
  841. workingbuf.clear();
  842. }
  843. ~CRowStreamMerger()
  844. {
  845. stop();
  846. }
  847. inline const void * next()
  848. {
  849. if (!_next())
  850. return NULL;
  851. unsigned strm = mergeheap[0];
  852. const void *row = pending[strm];
  853. pending[strm] = NULL;
  854. return row;
  855. }
  856. };
  857. class CMergeRowStreams : public CInterface, implements IRowStream
  858. {
  859. protected:
  860. CRowStreamMerger *merger;
  861. bool eos;
  862. class cProvider: public CInterface, implements IRowProvider
  863. {
  864. IArrayOf<IRowStream> ostreams;
  865. IRowStream **streams;
  866. Linked<IRowLinkCounter> linkcounter;
  867. const void *nextRow(unsigned idx)
  868. {
  869. return streams[idx]->nextRow();
  870. };
  871. void linkRow(const void *row)
  872. {
  873. linkcounter->linkRow(row);
  874. }
  875. void releaseRow(const void *row)
  876. {
  877. linkcounter->releaseRow(row);
  878. }
  879. void stop(unsigned idx)
  880. {
  881. streams[idx]->stop();
  882. }
  883. public:
  884. IMPLEMENT_IINTERFACE;
  885. cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
  886. : linkcounter(_linkcounter)
  887. {
  888. ostreams.ensure(numstreams);
  889. unsigned n = 0;
  890. while (n<numstreams)
  891. ostreams.append(*LINK(_streams[n++]));
  892. streams = ostreams.getArray();
  893. }
  894. } *streamprovider;
  895. public:
  896. CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
  897. {
  898. streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
  899. merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
  900. eos = _numstreams==0;
  901. }
  902. CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
  903. {
  904. streamprovider = NULL;
  905. merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
  906. eos = _numstreams==0;
  907. }
  908. ~CMergeRowStreams()
  909. {
  910. delete merger;
  911. delete streamprovider;
  912. }
  913. IMPLEMENT_IINTERFACE;
  914. void stop()
  915. {
  916. if (!eos) {
  917. merger->stop();
  918. eos = true;
  919. }
  920. }
  921. const void *nextRow()
  922. {
  923. if (eos)
  924. return NULL;
  925. const void *r = merger->next();
  926. if (!r) {
  927. stop(); // think ok to stop early
  928. return NULL;
  929. }
  930. return r;
  931. }
  932. };
  933. IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
  934. {
  935. return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
  936. }
  937. IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
  938. {
  939. return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
  940. }