jsort.cpp 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <string.h>
  15. #include <limits.h>
  16. #include "jsort.hpp"
  17. #include "jio.hpp"
  18. #include "jmisc.hpp"
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jthread.hpp"
  22. #include "jqueue.tpp"
  23. #include "jset.hpp"
  24. #include "jutil.hpp"
  25. #ifdef _USE_TBB
  26. #include "tbb/task.h"
  27. #include "tbb/task_scheduler_init.h"
  28. #include "tbb/parallel_sort.h"
  29. #endif
  30. #ifdef _DEBUG
  31. // #define PARANOID
  32. //#define TESTPARSORT
  33. //#define MCMERGESTATS
  34. #endif
  35. //#define PARANOID_PARTITION
  36. //#define TRACE_PARTITION
  37. #define PARALLEL_GRANULARITY 1024
  38. static bool sortParallel(unsigned &numcpus)
  39. {
  40. static unsigned numCPUs = 0;
  41. if (numCPUs==0) {
  42. numCPUs = getAffinityCpus();
  43. }
  44. if ((numcpus==0)||(numcpus>numCPUs))
  45. numcpus = numCPUs;
  46. #ifdef TESTPARSORT
  47. numcpus = 2;
  48. return true; // to test
  49. #endif
  50. return (numcpus>1);
  51. }
//define two variants of the same insertion function.
// COMPARE/INSERT parameterise the shared binary-search body in jsort.inc.
// Here the array holds fixed-size records, so INSERT moves 'width' bytes.
#define COMPARE(search,position)    compare(search,position)
#define INSERT(position,search)     memmove(position,search, width)

// Binary search 'base' (nmemb records of 'width' bytes each) for 'newitem'
// using 'compare'; *ItemAdded reports whether the item was added (the shared
// body in jsort.inc performs the search/insertion - presumably inserting
// when no match exists; confirm against jsort.inc).
void * binary_add(const void *newitem, const void *base,
                  size32_t nmemb,
                  size32_t width,
                  int ( *compare)(const void *_e1, const void *_e2),
                  bool * ItemAdded)
{
#include "jsort.inc"
}
#undef COMPARE
#undef INSERT
//---------------------------------------------------------------------------
// Pointer-vector variant: COMPARE dereferences the slot, INSERT stores the
// pointer itself.  NEVER_ADD configures jsort.inc as lookup-only.
#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define NEVER_ADD

// Binary search a vector of pointers using a plain compare function.
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        sortCompareFunction compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// As above, but comparing through an ICompare instance.  NEVER_ADD: find only.
#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define NEVER_ADD

// Binary search a vector of pointers using an ICompare.
extern jlib_decl void * binary_vec_find(const void *newitem, const void * * base,
                                        size32_t nmemb,
                                        ICompare & compare,
                                        bool * ItemAdded)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef NEVER_ADD
#undef INSERT
#undef COMPARE
//---------------------------------------------------------------------------
// ALWAYS_ADD configures jsort.inc to insert unconditionally (duplicates kept).
#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD

// Insert 'newitem' into the sorted vector of pointers, using a plain compare
// function to locate the insertion point.
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// As above, but comparing through an ICompare instance.
#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD

// Insert 'newitem' into the sorted vector of pointers, using an ICompare.
extern jlib_decl void * binary_vec_insert(const void *newitem, const void * * base,
                                          size32_t nmemb,
                                          ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// SEEK_LAST_MATCH configures jsort.inc to insert after the last equal match,
// so repeated insertions of equal items preserve arrival order (stable).
#define COMPARE(search,position)    compare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH

// Stable insert of 'newitem' into the sorted pointer vector (plain compare).
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 sortCompareFunction compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
// As above, but comparing through an ICompare instance (stable insert).
#define COMPARE(search,position)    compare.docompare(search,*(const void * *)(position))
#define INSERT(position,search)     *(const void * *)(position) = search
#define ALWAYS_ADD
#define SEEK_LAST_MATCH

// Stable insert of 'newitem' into the sorted pointer vector (ICompare).
extern jlib_decl void * binary_vec_insert_stable(const void *newitem, const void * * base,
                                                 size32_t nmemb,
                                                 ICompare const & compare)
{
#define width sizeof(void*)
#include "jsort.inc"
#undef width
}
#undef SEEK_LAST_MATCH
#undef ALWAYS_ADD
#undef INSERT
#undef COMPARE
//=========================================================================
// optimized quicksort for array of pointers to fixed size objects
typedef void * ELEMENT;
typedef void ** _VECTOR; // bit messy but allow to be redefined later
#define VECTOR _VECTOR
static inline void swap(VECTOR a, VECTOR b) { ELEMENT t = *a; *a = *b; *b = t; }
#define SWAP swap
// Elements are compared byte-wise over 'es' bytes of the pointed-to objects.
#define CMP(a,b)        memcmp(*(a),*(b),es)
#define MED3(a,b,c)     med3a(a,b,c,es)
#define RECURSE(a,b)    qsortvec(a, b, es)

// Median-of-three pivot selection for the memcmp variant.
static inline VECTOR med3a(VECTOR a, VECTOR b, VECTOR c, size32_t es)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

// Quicksort n pointers in 'a', ordering by memcmp over 'es' bytes.
// The body is shared via jsort2.inc, parameterised by CMP/MED3/RECURSE.
void qsortvec(void **a, size32_t n, size32_t es)
#include "jsort2.inc"

#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Same quicksort, ordering via a caller-supplied compare function.
#define CMP(a,b)         (compare(*(a),*(b)))
#define MED3(a,b,c)      med3c(a,b,c,compare)
#define RECURSE(a,b)     qsortvec(a, b, compare)

// Median-of-three pivot selection for the compare-function variant.
static inline VECTOR med3c(VECTOR a, VECTOR b, VECTOR c, sortCompareFunction compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

// Quicksort n pointers in 'a' using 'compare'; body shared via jsort2.inc.
void qsortvec(void **a, size32_t n, sortCompareFunction compare)
#include "jsort2.inc"

#undef CMP
#undef MED3
#undef RECURSE
  194. //---------------------------------------------------------------------------
  195. // tbb versions of the quick sort to provide a useful base comparison
  196. class TbbCompareWrapper
  197. {
  198. public:
  199. TbbCompareWrapper(const ICompare & _compare) : compare(_compare) {}
  200. bool operator()(void * const & l, void * const & r) const { return compare.docompare(l, r) < 0; }
  201. const ICompare & compare;
  202. };
  203. class TbbCompareIndirectWrapper
  204. {
  205. public:
  206. TbbCompareIndirectWrapper(const ICompare & _compare) : compare(_compare) {}
  207. bool operator()(void * * const & l, void * * const & r) const
  208. {
  209. int ret = compare.docompare(*l,*r);
  210. if (ret==0)
  211. {
  212. if (l < r)
  213. return true;
  214. else
  215. return false;
  216. }
  217. return (ret < 0);
  218. }
  219. const ICompare & compare;
  220. };
// Unstable parallel sort of n row pointers via tbb::parallel_sort.
// Throws if jlib was built without TBB support (_USE_TBB undefined).
void tbbqsortvec(void **a, size_t n, const ICompare & compare)
{
#ifdef _USE_TBB
    TbbCompareWrapper tbbcompare(compare);
    tbb::parallel_sort(a, a+n, tbbcompare);
#else
    throwUnexpectedX("TBB quicksort not available");
#endif
}
  230. void tbbqsortstable(void ** rows, size_t n, const ICompare & compare, void ** temp)
  231. {
  232. #ifdef _USE_TBB
  233. void * * * rowsAsIndex = (void * * *)rows;
  234. memcpy(temp, rows, n * sizeof(void*));
  235. for(unsigned i=0; i<n; ++i)
  236. rowsAsIndex[i] = temp+i;
  237. TbbCompareIndirectWrapper tbbcompare(compare);
  238. tbb::parallel_sort(rowsAsIndex, rowsAsIndex+n, tbbcompare);
  239. //I'm sure this violates the aliasing rules...
  240. for(unsigned i=0; i<n; ++i)
  241. rows[i] = *rowsAsIndex[i];
  242. #else
  243. throwUnexpectedX("TBB quicksort not available");
  244. #endif
  245. }
//---------------------------------------------------------------------------
// Quicksort ordering through an ICompare instance.
#define CMP(a,b)         (compare.docompare(*(a),*(b)))
#define MED3(a,b,c)      med3ic(a,b,c,compare)
#define RECURSE(a,b)     qsortvec(a, b, compare)

// Median-of-three pivot selection for the ICompare variant.
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

// Quicksort of a pointer vector using an ICompare; body via jsort2.inc.
void qsortvec(void **a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
// Parallel version (only 2 threads currently)
// Base class for the parallel quicksorts: owns a pool of worker threads and
// a queue of outstanding [start,start+num) sub-ranges still to be processed.
// Subclasses supply the partition step and the serial fallback sort.
class cParQSortBase
{
    struct sJobItem
    {
        unsigned start;     // offset of the sub-range within the array
        unsigned num;       // number of elements in the sub-range
    };

    NonReentrantSpinLock joblock;   // guards jobq, waiting and done
    QueueOf<sJobItem,false> jobq;   // sub-ranges awaiting a worker
    Semaphore jobqsem;              // signalled when work is queued (or all done)
    unsigned waiting;               // number of threads blocked in waitForWork
    unsigned numsubthreads;
    bool done;

    class cThread: public Thread
    {
        cParQSortBase *parent;
    public:
        cThread(cParQSortBase *_parent)
            : Thread("cParQSort")
        {
            parent = _parent;
        }
        int run()
        {
            parent->run();
            return 0;
        }
    } **threads;

    // Block until a queued sub-range is available; returns false once the
    // whole sort has completed (queue empty and every thread idle).
    bool waitForWork(unsigned &s,unsigned &n)
    {
        NonReentrantSpinBlock block(joblock);
        while (!done) {
            sJobItem *qi = jobq.dequeue();
            if (qi) {
                s = qi->start;
                n = qi->num;
                delete qi;
                return true;
            }
            if (waiting==numsubthreads) { // well we know we are done and so are rest so exit
                done = true;
                jobqsem.signal(waiting);    // wake the others so they exit too
                break;
            }
            waiting++;
            NonReentrantSpinUnblock unblock(joblock);   // drop the lock while blocked
            jobqsem.wait();
        }
        s = 0; // remove uninitialised variable warnings
        n = 0;
        return false;
    }
public:
    cParQSortBase(unsigned _numsubthreads)
    {
        numsubthreads = _numsubthreads;
        done = false;
        waiting = 0;
        threads = new cThread*[numsubthreads];
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i] = new cThread(this);
    }
    ~cParQSortBase()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->Release();
        delete [] threads;
    }
    void start()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->start();
    }
    // Sort sub-range [s,s+n): repeatedly partition, queueing the right-hand
    // part for other threads while iterating on the left, until the range is
    // small enough to sort serially; then loop picking up more queued work.
    void subsort(unsigned s, unsigned n)
    {
        do {
            sJobItem *qi;
            while (n>PARALLEL_GRANULARITY) {
                unsigned r1;
                unsigned r2;
                partition(s, n, r1, r2);
                unsigned n2 = n+s-r2;       // size of the right-hand part
                if (r1==s) {                // left part empty - continue with the right
                    n = n2;
                    s = r2;
                }
                else {
                    if (n2!=0) {            // queue the right part for an idle thread
                        qi = new sJobItem;
                        qi->num = n2;
                        qi->start = r2;
                        NonReentrantSpinBlock block(joblock);
                        jobq.enqueue(qi);
                        if (waiting) {
                            jobqsem.signal(waiting);
                            waiting = 0;
                        }
                    }
                    n = r1-s;               // continue with the left part
                }
            }
            serialsort(s,n);
            NonReentrantSpinBlock block(joblock);
            if (waiting==numsubthreads) { // well we are done so are rest
                done = true;
                jobqsem.signal(waiting);
                break;
            }
        }
        while(waitForWork(s,n));
    }
    void run()
    {
        unsigned s;
        unsigned n;
        if (waitForWork(s,n))
            subsort(s,n);
    }
    void join()
    {
        for (unsigned i=0;i<numsubthreads;i++)
            threads[i]->join();
    }
    virtual void serialsort(unsigned from, unsigned len)=0;
    virtual void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) = 0; // NB s, r1 and r2 are relative to array
};
// Bentley/McIlroy split-end three-way partition of array[s..s+n), written as
// a macro so it can be instantiated for both the plain (void**) and the
// stable/indexed (void***) VECTOR types with their rebound CMP/MED3/SWAP.
// Outputs (relative to array): [s,r1) holds elements below the pivot and
// [r2,s+n) elements above it - both still to be sorted; the elements in
// between equal the pivot and are in final position.
#define DOPARTITION \
VECTOR a = array+s; \
VECTOR pm = a + (n / 2); \
VECTOR pl = a; \
VECTOR pn = a + (n - 1) ; \
if (n > 40) { /* large range: pseudo-median of nine */ \
unsigned d = (n / 8); \
pl = MED3(pl, pl + d, pl + 2 * d); \
pm = MED3(pm - d, pm, pm + d); \
pn = MED3(pn - 2 * d, pn - d, pn); \
} \
pm = MED3(pl, pm, pn); \
SWAP(a, pm); /* pivot parked in slot 0 */ \
VECTOR pa = a + 1; \
VECTOR pb = pa; \
VECTOR pc = a + (n - 1); \
VECTOR pd = pc; \
int r; \
for (;;) { \
while (pb <= pc && (r = CMP(pb, a)) <= 0) { \
if (r == 0) { /* equal to pivot - park at the left end */ \
SWAP(pa, pb); \
pa++; \
} \
pb++; \
} \
while (pb <= pc && (r = CMP(pc, a)) >= 0) { \
if (r == 0) { /* equal to pivot - park at the right end */ \
SWAP(pc, pd); \
pd--; \
} \
pc--; \
} \
if (pb > pc) \
break; \
SWAP(pb, pc); \
pb++; \
pc--; \
} \
pn = a + n; \
r = MIN(pa - a, pb - pa); /* swap the parked equal runs into the middle */ \
VECTOR v1 = a; \
VECTOR v2 = pb-r; \
while (r) { \
SWAP(v1,v2); v1++; v2++; r--; \
}; \
r = MIN(pd - pc, pn - pd - 1); \
v1 = pb; \
v2 = pn-r; \
while (r) { \
SWAP(v1,v2); v1++; v2++; r--; \
}; \
r1 = (pb-pa)+s; \
r2 = n-(pd-pc)+s;
// Parallel (unstable) quicksort of a pointer vector ordered by an ICompare.
class cParQSort: public cParQSortBase
{
    VECTOR array;
    const ICompare &compare;
    // Three-way partition of array[s..s+n) - see DOPARTITION.
    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
    {
        DOPARTITION
    }
    // Serial fallback used once a sub-range is small enough.
    void serialsort(unsigned from, unsigned len)
    {
        qsortvec(array+from,len,compare);
    }
public:
    cParQSort(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
        : cParQSortBase(_numsubthreads), compare(_compare)
    {
        array = _a;
        cParQSortBase::start();     // workers start immediately
    }
};
// Parallel (unstable) quicksort of n row pointers.  Small inputs, or a
// single-CPU environment, fall back to the serial qsortvec.  numcpus==0
// means use all available CPUs; the calling thread participates via
// subsort(), so only numcpus-1 extra worker threads are created.
void parqsortvec(void **a, size32_t n, const ICompare & compare, unsigned numcpus)
{
    if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
        qsortvec(a,n,compare);
        return;
    }
    cParQSort sorter(a,compare,numcpus-1);
    sorter.subsort(0,n);
    sorter.join();
#ifdef TESTPARSORT
    // verify ordering in test builds
    for (unsigned i=1;i<n;i++)
        if (compare.docompare(a[i-1],a[i])>0)
            ERRLOG("parqsortvec failed %d",i);
#endif
}
#undef CMP
#undef MED3
#undef RECURSE
//---------------------------------------------------------------------------
// Stable variant: rather than sorting the rows directly, sort an index
// (a vector of pointers into the row array) so ties can be broken on the
// original position of each row.
#undef VECTOR
#undef SWAP
typedef void *** _IVECTOR;
#define VECTOR _IVECTOR
static inline void swapind(VECTOR a, VECTOR b) { void ** t = *a; *a = *b; *b = t; }
#define SWAP swapind
#define CMP(a,b)         cmpicindstable(a,b,compare)

// Compare the rows the index entries point at; equal rows compare by the
// address of their slot in the original array, which makes the sort stable.
static inline int cmpicindstable(VECTOR a, VECTOR b, const ICompare & compare)
{
    int ret = compare.docompare(**a,**b);
    if (ret==0)
    {
        if (*a>*b)
            ret = 1;
        else if (*a<*b)
            ret = -1;
    }
    return ret;
}
#define MED3(a,b,c)      med3ic(a,b,c,compare)
#define RECURSE(a,b)     doqsortvecstable(a, b, compare)

// Median-of-three pivot selection for the stable (indexed) variant.
static inline VECTOR med3ic(VECTOR a, VECTOR b, VECTOR c, const ICompare & compare)
{
    return CMP(a, b) < 0 ?
        (CMP(b, c) < 0 ? b : (CMP(a, c) < 0 ? c : a ))
        : (CMP(b, c) > 0 ? b : (CMP(a, c) < 0 ? a : c ));
}

// Serial stable quicksort of the index; body shared via jsort2.inc.
static void doqsortvecstable(VECTOR a, size32_t n, const ICompare & compare)
#include "jsort2.inc"
// Parallel stable quicksort over the pointer index (void*** VECTOR).
class cParQSortStable: public cParQSortBase
{
    VECTOR array;
    const ICompare &compare;
    // Three-way partition of array[s..s+n) using the stable CMP/SWAP.
    void partition(unsigned s, unsigned n, unsigned &r1, unsigned &r2) // NB s, r1 and r2 are relative to array
    {
        DOPARTITION
    }
    // Serial fallback used once a sub-range is small enough.
    void serialsort(unsigned from, unsigned len)
    {
        doqsortvecstable(array+from,len,compare);
    }
public:
    cParQSortStable(VECTOR _a,const ICompare &_compare, unsigned _numsubthreads)
        : cParQSortBase(_numsubthreads),compare(_compare)
    {
        array = _a;
        cParQSortBase::start();     // workers start immediately
    }
};
#undef CMP
#undef CMP1
#undef MED3
#undef RECURSE
#undef VECTOR

// Build the index (one pointer per row) then stable-sort it serially.
static void qsortvecstable(void ** const rows, size32_t n, const ICompare & compare, void *** index)
{
    for(unsigned i=0; i<n; ++i)
        index[i] = rows+i;
    doqsortvecstable(index, n, compare);
}
// Stable in-place sort of n rows: copies the rows to 'temp', builds and
// sorts the index inside 'rows' itself, then gathers the rows back.
// temp must hold at least n row pointers.
void qsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp)
{
    memcpy(temp, rows, n * sizeof(void*));
    qsortvecstable(temp, n, compare, (void * * *)rows);
    //I'm sure this violates the aliasing rules...
    void * * * rowsAsIndex = (void * * *)rows;
    for(unsigned i=0; i<n; ++i)
        rows[i] = *rowsAsIndex[i];
}
// Parallel stable sort: builds the index in 'index', then sorts it in
// parallel (falling back to the serial sort for small inputs / one CPU).
static void parqsortvecstable(void ** rows, size32_t n, const ICompare & compare, void *** index, unsigned numcpus)
{
    for(unsigned i=0; i<n; ++i)
        index[i] = rows+i;
    if ((n<=PARALLEL_GRANULARITY)||!sortParallel(numcpus)) {
        doqsortvecstable(index,n,compare);
        return;
    }
    cParQSortStable sorter(index,compare,numcpus-1);
    sorter.subsort(0,n);
    sorter.join();
}
// Stable parallel in-place sort; same copy/index/gather scheme as
// qsortvecstableinplace but sorting the index in parallel.
void parqsortvecstableinplace(void ** rows, size32_t n, const ICompare & compare, void ** temp, unsigned numcpus)
{
    memcpy(temp, rows, n * sizeof(void*));
    parqsortvecstable(temp, n, compare, (void * * *)rows, numcpus);
    //I'm sure this violates the aliasing rules...
    void * * * rowsAsIndex = (void * * *)rows;
    for(size32_t i=0; i<n; ++i)
        rows[i] = *rowsAsIndex[i];
}
  568. //-----------------------------------------------------------------------------------------------------------------------------
  569. inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2)
  570. {
  571. void * * tgt = result;
  572. loop
  573. {
  574. if (compare.docompare(*ret1, *ret2) <= 0)
  575. {
  576. *tgt++ = *ret1++;
  577. if (--n1 == 0)
  578. {
  579. //There must be at least one row in the right partition - copy any that remain
  580. do
  581. {
  582. *tgt++ = *ret2++;
  583. } while (--n2);
  584. return result;
  585. }
  586. }
  587. else
  588. {
  589. *tgt++ = *ret2++;
  590. if (--n2 == 0)
  591. {
  592. //There must be at least one row in the left partition - copy any that remain
  593. do
  594. {
  595. *tgt++ = *ret1++;
  596. } while (--n1);
  597. return result;
  598. }
  599. }
  600. }
  601. }
// Merge at most n rows from two sorted runs into 'result'.  Used by the
// parallel merge tasks, each of which fills only its own slice of the output,
// so the runs may hold more rows than are consumed here.  Ties take the
// left-hand row first.  NB: 'n' is shared by the outer and inner loops, so
// the remaining quota carries over when one run is exhausted.
inline void * * mergePartitions(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result;
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) <= 0)
        {
            *tgt++ = *ret1++;
            if (--n1 == 0)
            {
                // Left run exhausted - copy the remaining quota from the right.
                while (n--)
                {
                    *tgt++ = *ret2++;
                }
                return result;
            }
        }
        else
        {
            *tgt++ = *ret2++;
            if (--n2 == 0)
            {
                // Right run exhausted - copy the remaining quota from the left.
                while (n--)
                {
                    *tgt++ = *ret1++;
                }
                return result;
            }
        }
    }
    return result;
}
  634. inline void clonePartition(void * * result, size_t n, void * * src)
  635. {
  636. void * * tgt = result;
  637. while (n--)
  638. *tgt++ = *src++;
  639. }
// As the bounded mergePartitions, but merges from the high ends of the runs
// downwards, filling the LAST n slots of the combined output - this lets a
// forward task and a reverse task fill the two halves of one output range
// concurrently, meeting in the middle.
// NOTE(review): on a tie this takes from the left-hand run (>=), which seems
// to be the mirror-opposite of the forward merge's left-first rule - verify
// that tie handling between the two directions is consistent for equal keys.
inline void * * mergePartitionsRev(const ICompare & compare, void * * result, size_t n1, void * * ret1, size_t n2, void * * ret2, size_t n)
{
    void * * tgt = result+n1+n2-1;  // last slot of the combined output
    ret1 += (n1-1);                 // last element of each run
    ret2 += (n2-1);
    while (n--)
    {
        if (compare.docompare(*ret1, *ret2) >= 0)
        {
            *tgt-- = *ret1--;
            if (--n1 == 0)
            {
                // Left run exhausted - copy the remaining quota from the right.
                while (n--)
                {
                    *tgt-- = *ret2--;
                }
                return result;
            }
        }
        else
        {
            *tgt-- = *ret2--;
            if (--n2 == 0)
            {
                //There must be at least one row in the left partition - copy any that remain
                while (n--)
                {
                    *tgt-- = *ret1--;
                }
                return result;
            }
        }
    }
    return result;
}
// Recursive stable merge sort of n row pointers (n >= 1 assumed - callers
// guard n <= 1).  The output buffer alternates with recursion depth (even
// depth -> rows, odd -> tmp) so each merge reads one buffer and writes the
// other; returns the buffer that holds the sorted run.
static void * * mergeSort(void ** rows, size_t n, const ICompare & compare, void ** tmp, unsigned depth)
{
    void * * result = (depth & 1) ? tmp : rows;
    //This could be coded to perform an "optimal" 3 element compare, but the following code is much simpler,
    //and in performance testing it executed marginally more quickly
    if (n <= 2)
    {
        //Check for n == 1, but compare against 2 to avoid another comparison
        if (n < 2)
        {
            if (result != rows)
                result[0] = rows[0];
        }
        else
        {
            // Two elements: emit in order; <= keeps equal rows stable.
            void * left = rows[0];
            void * right = rows[1];
            if (compare.docompare(left, right) <= 0)
            {
                result[0] = left;
                result[1] = right;
            }
            else
            {
                result[0] = right;
                result[1] = left;
            }
        }
        return result;
    }

    size_t n1 = (n+1)/2;
    size_t n2 = n - n1;
    // Both halves sort at depth+1, so they land in the buffer opposite to
    // 'result', adjacent to each other - then merge back into 'result'.
    void * * ret1 = mergeSort(rows, n1, compare, tmp, depth+1);
    void * * ret2 = mergeSort(rows+n1, n2, compare, tmp + n1, depth+1);
    dbgassertex(ret2 == ret1 + n1);
    dbgassertex(ret2 != result);
    return mergePartitions(compare, result, n1, ret1, n2, ret2);
}
  713. void msortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp)
  714. {
  715. if (n <= 1)
  716. return;
  717. mergeSort(rows, n, compare, temp, 0);
  718. }
  719. //=========================================================================
  720. #ifdef _USE_TBB
  721. static const unsigned numPartitionSamples = 3;
  722. //These constants are probably architecture and number of core dependent
  723. static const size_t singleThreadedMSortThreshold = 2000;
  724. static const size_t multiThreadedBlockThreshold = 64; // must be at least 2!
  725. using tbb::task;
  726. class TbbParallelMergeSorter
  727. {
    // Fans completion out to two successor tasks: decrements each successor's
    // ref count and releases whichever become ready (spawning the first,
    // returning the second so the scheduler runs it directly).
    class SplitTask : public tbb::task
    {
    public:
        SplitTask(task * _next1, task * _next2) : next1(_next1), next2(_next2)
        {
        }
        virtual task * execute()
        {
            if (next1->decrement_ref_count() == 0)
                spawn(*next1);
            if (next2->decrement_ref_count() == 0)
                return next2;   // bypass the scheduler for the second successor
            return NULL;
        }
    protected:
        task * next1;
        task * next2;
    };
    // Recursively bisects [rows,rows+n): spawns a BisectTask for the right
    // half and iterates on the left half, until a slice is small enough (or
    // the recursion deep enough) to hand to a serial SubSortTask.  Each split
    // also wires up the merge task(s) that will combine the sorted halves;
    // 'next' is the successor whose ref count gates the parent merge.
    class BisectTask : public tbb::task
    {
    public:
        BisectTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth, task * _next)
        : sorter(_sorter), rows(_rows), temp(_temp), next(_next), n(_n), depth(_depth)
        {
        }
        virtual task * execute()
        {
            loop
            {
                //On entry next is assumed to be used once by this function
                if ((n <= multiThreadedBlockThreshold) || (depth >= sorter.singleThreadDepth))
                {
                    //Create a new task rather than calling sort directly, so that the successor is set up correctly
                    //It would be possible to sort then if (next->decrement_ref_count()) return next; instead
                    task * sort = new (next->allocate_child()) SubSortTask(sorter, rows, n, temp, depth);
                    return sort;
                }

                // Buffers alternate by depth, mirroring mergeSort().
                void * * result = (depth & 1) ? temp : rows;
                void * * src = (depth & 1) ? rows : temp;
                size_t n1 = (n+1)/2;
                size_t n2 = n-n1;
                task * mergeTask;
                if (depth < sorter.parallelMergeDepth)
                {
                    unsigned partitions = sorter.numPartitionCores() >> depth;
                    if (partitions > 1)
                    {
                        // Split the merge itself across 'partitions' cores: a
                        // forward and a reverse merge task per partition,
                        // ranges assigned later by the PartitionSplitTask.
                        PartitionSplitTask * splitTask = new (allocate_root()) PartitionSplitTask(n1, src, n2, src+n1, partitions, sorter.compare);
                        for (unsigned i=0; i < partitions; i++)
                        {
                            MergeTask * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, 0);
                            mergeFwdTask->set_ref_count(1);
                            MergeTask * mergeRevTask = new (allocate_additional_child_of(*next)) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, 0);
                            mergeRevTask->set_ref_count(1);
                            splitTask->setTasks(i, mergeFwdTask, mergeRevTask);
                        }
                        next->decrement_ref_count();
                        mergeTask = splitTask;
                    }
                    else
                    {
                        // Two-way parallel merge: forward task fills the first
                        // n1 output slots, reverse task fills the last n2.
                        task * mergeFwdTask = new (allocate_additional_child_of(*next)) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n1);
                        mergeFwdTask->set_ref_count(1);
                        task * mergeRevTask = new (next->allocate_child()) MergeRevTask(sorter.compare, result, n1, src, n2, src+n1, n2);
                        mergeRevTask->set_ref_count(1);
                        mergeTask = new (allocate_root()) SplitTask(mergeFwdTask, mergeRevTask);
                    }
                }
                else
                {
                    // Deep enough: a single serial merge of the two halves.
                    mergeTask = new (next->allocate_child()) MergeTask(sorter.compare, result, n1, src, n2, src+n1, n);
                }

                mergeTask->set_ref_count(2);    // waits for both halves
                task * bisectRightTask = new (allocate_root()) BisectTask(sorter, rows+n1, n2, temp+n1, depth+1, mergeTask);
                spawn(*bisectRightTask);
                //recurse directly on the left side rather than creating a new task
                n = n1;
                depth = depth+1;
                next = mergeTask;
            }
        }
    protected:
        TbbParallelMergeSorter & sorter;
        void ** rows;
        void ** temp;
        task * next;
        size_t n;
        unsigned depth;
    };
//Leaf task of the parallel sort: once BisectTask decides a range is small enough for one
//thread, this performs a serial mergeSort of n rows (temp is same-sized scratch space).
class SubSortTask : public tbb::task
{
public:
    SubSortTask(TbbParallelMergeSorter & _sorter, void ** _rows, size_t _n, void ** _temp, unsigned _depth)
    : sorter(_sorter), rows(_rows), temp(_temp), n(_n), depth(_depth)
    {
    }
    virtual task * execute()
    {
        //depth's parity determines which of rows/temp holds the data at this level (see BisectTask)
        mergeSort(rows, n, sorter.compare, temp, depth);
        return NULL;    // no continuation task; completion is signalled through the TBB ref counts
    }
protected:
    TbbParallelMergeSorter & sorter;
    void ** rows;       // rows to sort
    void ** temp;       // scratch buffer, same size as rows
    size_t n;           // number of rows
    unsigned depth;     // recursion depth within the bisection tree
};
//Merges two adjacent sorted runs src1[0..n1) and src2[0..n2) into result, producing only
//the FIRST n rows of the merged output.  A paired MergeRevTask produces the remaining rows
//from the end, so the two can run concurrently on the same inputs.
class MergeTask : public tbb::task
{
public:
    MergeTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
    : compare(_compare),result(_result), src1(_src1), src2(_src2), n1(_n1), n2(_n2), n(_n)
    {
    }
    virtual task * execute()
    {
        //After the ranges are adjusted it is possible for one input to shrink to zero size (e.g., if input is sorted)
        if (n1 == 0)
        {
            assertex(n <= n2);
            //Only one input remains - a straight copy of its first n rows is the merge result
            clonePartition(result, n, src2);
        }
        else if (n2 == 0)
        {
            assertex(n <= n1);
            clonePartition(result, n, src1);
        }
        else
            mergePartitions(compare, result, n1, src1, n2, src2, n);
        return NULL;
    }
    //Called by PartitionSplitTask (before this task is spawned) to narrow the merge to a
    //single partition of the output.
    void adjustRange(size_t deltaLeft, size_t numLeft, size_t deltaRight, size_t numRight, size_t num)
    {
        src1 += deltaLeft;
        n1 = numLeft;
        src2 += deltaRight;
        n2 = numRight;
        result += (deltaLeft + deltaRight);     // output offset = total rows preceding this partition
        n = num;
    }
protected:
    const ICompare & compare;
    void * * result;    // destination for merged rows
    void * * src1;      // left sorted input
    void * * src2;      // right sorted input
    size_t n1;          // rows available in src1
    size_t n2;          // rows available in src2
    size_t n;           // number of output rows this task must produce
};
  878. class MergeRevTask : public MergeTask
  879. {
  880. public:
  881. MergeRevTask(const ICompare & _compare, void * * _result, size_t _n1, void * * _src1, size_t _n2, void * * _src2, size_t _n)
  882. : MergeTask(_compare, _result, _n1, _src1, _n2, _src2, _n)
  883. {
  884. }
  885. virtual task * execute()
  886. {
  887. if (n1 == 0)
  888. {
  889. assertex(n <= n2);
  890. //This is a reverse merge, so copy n from the end of the input
  891. unsigned delta = n2 - n;
  892. clonePartition(result + delta, n, src2 + delta);
  893. }
  894. else if (n2 == 0)
  895. {
  896. assertex(n <= n1);
  897. unsigned delta = n1 - n;
  898. clonePartition(result + delta, n, src1 + delta);
  899. }
  900. else
  901. mergePartitionsRev(compare, result, n2, src2, n1, src1, n);
  902. return NULL;
  903. }
  904. };
//Splits one large merge into numPartitions independent merges that can proceed in parallel.
//Both sorted inputs are sampled to find split points giving roughly equal output ranges;
//the pre-created fwd/rev MergeTask pairs are then narrowed to one partition each and spawned.
class PartitionSplitTask : public tbb::task
{
public:
    PartitionSplitTask(size_t _n1, void * * _src1, size_t _n2, void * * _src2, unsigned _numPartitions, const ICompare & _compare)
    : compare(_compare), numPartitions(_numPartitions), n1(_n1), n2(_n2), src1(_src1), src2(_src2)
    {
        //These could be local variables in calculatePartitions(), but placed here to simplify cleanup. (Should consider using alloca)
        posLeft = new size_t[numPartitions+1];
        posRight = new size_t[numPartitions+1];
        tasks = new MergeTask *[numPartitions*2];
        for (unsigned i=0; i < numPartitions*2; i++)
            tasks[i] = NULL;
    }
    ~PartitionSplitTask()
    {
        delete [] posLeft;
        delete [] posRight;
        delete [] tasks;
    }
    //Compute posLeft[i]/posRight[i] - where partition i starts within each input - then narrow
    //each fwd/rev merge pair to its partition.
    void calculatePartitions()
    {
#ifdef PARANOID_PARTITION
        //Sanity check: verify both inputs really are sorted before partitioning
        {
            for (unsigned ix=1; ix<n1; ix++)
                if (compare.docompare(src1[ix-1], src1[ix]) > 0)
                    DBGLOG("Failure left@%u", ix);
        }
        {
            for (unsigned ix=1; ix<n2; ix++)
                if (compare.docompare(src2[ix-1], src2[ix]) > 0)
                    DBGLOG("Failure right@%u", ix);
        }
#endif
        //If dividing into P parts, select S*P-1 even points from each side.
        unsigned numSamples = numPartitionSamples*numPartitions-1;
        QuantilePositionIterator iterLeft(n1, numSamples+1, false);
        QuantilePositionIterator iterRight(n2, numSamples+1, false);
        iterLeft.first();
        iterRight.first();
        size_t prevLeft = 0;
        size_t prevRight =0;
        posLeft[0] = 0;
        posRight[0] = 0;
        //From the merged list, for sample i [zero based], we can guarantee that there are at least (i+1)*(n1+n2)/numSamples*2
        //rows before sample i, and at most (i+2)*(n1+n2)/numSamples*2 samples after it.
        //=> pick samples [0, 2*numSamples, 4*numSamples ...]
        //NOTE: Include elements at position 0 to ensure sorted inputs are partitioned evenly
        for (unsigned part = 1; part < numPartitions; part++)
        {
            unsigned numToSkip = numPartitionSamples*2;
            if (part == 1)
                numToSkip++;
            //Merge-walk the two sample sequences; the sample reached when skip==0 is the split point
            for (unsigned skip=numToSkip; skip-- != 0; )
            {
                size_t leftPos = iterLeft.get();
                size_t rightPos = iterRight.get();
                int c;
                if (leftPos == n1)
                    c = +1;
                else if (rightPos == n2)
                    c = -1;
                else
                    c = compare.docompare(src1[leftPos], src2[rightPos]);
                if (skip == 0)
                {
                    if (c <= 0)
                    {
                        //value in left is smallest. Find the position of the value <= the left value
                        posLeft[part] = leftPos;
                        size_t matchRight = findFirstGE(src1[leftPos], prevRight, rightPos, src2);
                        posRight[part] = matchRight;
                        prevRight = matchRight; // potentially reduce the search range next time
                    }
                    else
                    {
                        size_t matchLeft = findFirstGT(src2[rightPos], prevLeft, leftPos, src1);
                        posLeft[part] = matchLeft;
                        posRight[part] = rightPos;
                        prevLeft = matchLeft; // potentially reduce the search range next time
                    }
                }
                if (c <= 0)
                {
                    iterLeft.next();
                    prevLeft = leftPos;
                }
                else
                {
                    iterRight.next();
                    prevRight = rightPos;
                }
            }
        }
        posLeft[numPartitions] = n1;
        posRight[numPartitions] = n2;
#ifdef TRACE_PARTITION
        DBGLOG("%d,%d -> {", (unsigned)n1, (unsigned)n2);
#endif
        //Narrow each task pair: fwd merges the first half of its partition from the front,
        //rev merges the second half from the back.
        for (unsigned i= 0; i < numPartitions; i++)
        {
            size_t start = posLeft[i] + posRight[i];
            size_t end = posLeft[i+1] + posRight[i+1];
            size_t num = end - start;
            size_t numFwd = num/2;
#ifdef TRACE_PARTITION
            DBGLOG(" ([%d..%d],[%d..%d] %d,%d = %d)",
                   (unsigned)posLeft[i], (unsigned)posLeft[i+1], (unsigned)posRight[i], (unsigned)posRight[i+1],
                   (unsigned)start, (unsigned)end, (unsigned)num);
#endif
            MergeTask & mergeFwdTask = *tasks[i*2];
            MergeTask & mergeRevTask = *tasks[i*2+1];
            mergeFwdTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
                                     posRight[i], posRight[i+1]-posRight[i],
                                     numFwd);
            mergeRevTask.adjustRange(posLeft[i], posLeft[i+1]-posLeft[i],
                                     posRight[i], posRight[i+1]-posRight[i],
                                     num-numFwd);
        }
    }
    virtual task * execute()
    {
        calculatePartitions();
        //Each merge task was created holding an extra reference (set_ref_count(1) by the creator);
        //release it now the ranges are final, and spawn the task once no references remain.
        for (unsigned i=0; i < numPartitions*2; i++)
        {
            if (tasks[i]->decrement_ref_count() == 0)
                spawn(*tasks[i]);
        }
        return NULL;
    }
    //Record the fwd/rev merge task pair for partition i (called before this task executes)
    void setTasks(unsigned i, MergeTask * fwd, MergeTask * rev)
    {
        tasks[i*2] = fwd;
        tasks[i*2+1] = rev;
    }
protected:
    //Binary search: first index in rows[low..high) whose value is >= seek (lower bound)
    size_t findFirstGE(void * seek, size_t low, size_t high, void * * rows)
    {
        if (low == high)
            return low;
        while (high - low > 1)
        {
            size_t mid = low + (high - low) / 2;
            if (compare.docompare(rows[mid], seek) < 0)
                low = mid;
            else
                high = mid;
        }
        if (compare.docompare(rows[low], seek) < 0)
            return low+1;
        return low;
    }
    //Binary search: first index in rows[low..high) whose value is > seek (upper bound)
    size_t findFirstGT(void * seek, size_t low, size_t high, void * * rows)
    {
        if (low == high)
            return low;
        while (high - low > 1)
        {
            size_t mid = low + (high - low) / 2;
            if (compare.docompare(rows[mid], seek) <= 0)
                low = mid;
            else
                high = mid;
        }
        if (compare.docompare(rows[low], seek) <= 0)
            return low+1;
        return low;
    }
protected:
    const ICompare & compare;
    unsigned numPartitions;
    size_t n1;              // number of rows in the left input
    size_t n2;              // number of rows in the right input
    void * * src1;          // left sorted input
    void * * src2;          // right sorted input
    size_t * posLeft;       // partition start positions within src1 (numPartitions+1 entries)
    size_t * posRight;      // partition start positions within src2 (numPartitions+1 entries)
    MergeTask * * tasks;    // fwd/rev merge task pairs, two entries per partition
};
  1083. public:
//Derive the depth thresholds that control how far the sort bisects/merges in parallel.
TbbParallelMergeSorter(void * * _rows, const ICompare & _compare) : compare(_compare), baseRows(_rows)
{
    //The following constants control the number of iterations to be performed in parallel.
    //The sort is split into more parts than there are cpus so that the effect of delays from one task tend to be evened out.
    //The following constants should possibly be tuned on each platform. The following gave a good balance on a 2x8way xeon
    const unsigned extraBisectDepth = 3;
    const unsigned extraParallelMergeDepth = 3;
    unsigned numCpus = tbb::task_scheduler_init::default_num_threads();
    //ln2NumCpus = ceil(log2(numCpus)) - 0 for a single cpu
    unsigned ln2NumCpus = (numCpus <= 1) ? 0 : getMostSignificantBit(numCpus-1);
    assertex(numCpus <= (1U << ln2NumCpus));
    //Merge in parallel once it is likely to be beneficial
    parallelMergeDepth = ln2NumCpus+ extraParallelMergeDepth;
    //Aim to execute in parallel until the width is 8*the maximum number of parallel task
    singleThreadDepth = ln2NumCpus + extraBisectDepth;
    partitionCores = numCpus / 2;
}
//Number of cores used for the partitioned parallel merges (half the cpus - see constructor)
unsigned numPartitionCores() const { return partitionCores; }
//Start the parallel sort of rows[0..n) (temp is same-sized scratch) and block until complete.
void sortRoot(void ** rows, size_t n, void ** temp)
{
    //empty_task is purely a synchronisation point: one reference for the child BisectTask
    //plus one consumed by wait_for_all() below.
    task * end = new (task::allocate_root()) tbb::empty_task();
    end->set_ref_count(1+1);
    task * task = new (task::allocate_root()) BisectTask(*this, rows, n, temp, 0, end);
    end->spawn(*task);
    end->wait_for_all();
    end->destroy(*end);     // the empty_task never executes, so it must be destroyed explicitly
}
  1110. public:
  1111. const ICompare & compare;
  1112. unsigned singleThreadDepth;
  1113. unsigned parallelMergeDepth;
  1114. unsigned partitionCores;
  1115. void * * baseRows;
  1116. };
  1117. //-------------------------------------------------------------------------------------------------------------------
  1118. void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
  1119. {
  1120. if ((n <= singleThreadedMSortThreshold) || ncpus == 1)
  1121. {
  1122. msortvecstableinplace(rows, n, compare, temp);
  1123. return;
  1124. }
  1125. TbbParallelMergeSorter sorter(rows, compare);
  1126. sorter.sortRoot(rows, n, temp);
  1127. }
  1128. #else
//Fallback when the TBB implementation is not compiled in: delegate to the parallel quick sort.
void parmsortvecstableinplace(void ** rows, size_t n, const ICompare & compare, void ** temp, unsigned ncpus)
{
    //NOTE(review): n is narrowed to size32_t on this path - confirm callers never pass >4G rows here
    parqsortvecstableinplace(rows, (size32_t)n, compare, temp, ncpus);
}
  1133. #endif
  1134. //=========================================================================
  1135. bool heap_push_down(unsigned p, unsigned num, unsigned * heap, const void ** rows, ICompare * compare)
  1136. {
  1137. bool nochange = true;
  1138. while(1)
  1139. {
  1140. unsigned c = p*2 + 1;
  1141. if(c >= num)
  1142. return nochange;
  1143. if(c+1 < num)
  1144. {
  1145. int childcmp = compare->docompare(rows[heap[c+1]], rows[heap[c]]);
  1146. if((childcmp < 0) || ((childcmp == 0) && (heap[c+1] < heap[c])))
  1147. ++c;
  1148. }
  1149. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  1150. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  1151. return nochange;
  1152. nochange = false;
  1153. unsigned r = heap[c];
  1154. heap[c] = heap[p];
  1155. heap[p] = r;
  1156. p = c;
  1157. }
  1158. }
  1159. bool heap_push_up(unsigned c, unsigned * heap, const void ** rows, ICompare * compare)
  1160. {
  1161. bool nochange = true;
  1162. while(c > 0)
  1163. {
  1164. unsigned p = (unsigned)(c-1)/2;
  1165. int cmp = compare->docompare(rows[heap[c]], rows[heap[p]]);
  1166. if((cmp > 0) || ((cmp == 0) && (heap[c] > heap[p])))
  1167. return nochange;
  1168. nochange = false;
  1169. unsigned r = heap[c];
  1170. heap[c] = heap[p];
  1171. heap[p] = r;
  1172. c = p;
  1173. }
  1174. return nochange;
  1175. }
  1176. //=========================================================================
  1177. #include <assert.h>
  1178. #include <stdio.h>
  1179. #include <stdlib.h>
  1180. #include <fcntl.h>
  1181. #include <string.h>
  1182. #include <stddef.h>
  1183. #include <time.h>
  1184. #ifdef _WIN32
  1185. #include <io.h>
  1186. #include <sys\types.h>
  1187. #include <sys\stat.h>
  1188. #else
  1189. #include <sys/types.h>
  1190. #include <sys/stat.h>
  1191. #endif
  1192. #ifndef off_t
  1193. #define off_t __int64
  1194. #endif
  1195. typedef void ** VECTOR;
//Minimal merge-sorter interface: implementations expose the merged output as a write sequence.
interface IMergeSorter
{
public:
    //NOTE(review): semantics of isEOF not shown here - presumably flags the final output; confirm with implementations
    virtual IWriteSeq *getOutputStream(bool isEOF) = 0;
};
  1201. #define INSERTMAX 10000
  1202. #define BUFFSIZE 0x100000 // used for output buffer
  1203. //==================================================================================================
//Merges rows from multiple sorted inputs (supplied via IRowProvider) using a binary heap of
//input indices.  Comparison ties are broken on the input index, so the merge is stable with
//respect to input ordering.  With partdedup set, duplicate rows across inputs are released
//as they are discovered rather than being returned.
class CRowStreamMerger
{
    const void **pending;       // head row of each input; NULL after the row is handed out by next()
    size32_t *recsize;          // NOTE(review): only ever set to NULL here - appears vestigial
    unsigned *mergeheap;        // heap of input indices; mergeheap[0] is the input with the smallest head row
    unsigned activeInputs;      // number of inputs still producing rows (live prefix of mergeheap)
    count_t recno;              // count of rows returned so far
    const ICompare *icmp;
    bool partdedup;             // if set, dedup rows at the top of the heap as the merge proceeds
    IRowProvider &provider;
    MemoryAttr workingbuf;      // single allocation backing both pending[] and mergeheap[]
#ifdef _DEBUG
    bool *stopped;              // per-input flags to assert each input is stopped exactly once
    MemoryAttr ma;
#endif
    //Compare the head rows of the inputs at heap positions a and b
    inline int buffCompare(unsigned a, unsigned b)
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::buffCompare");
        return icmp->docompare(pending[mergeheap[a]], pending[mergeheap[b]]);
    }
    //Remove heap entry p by moving the last active entry into its slot.
    //Returns false if p was itself the last entry (nothing left to re-sift).
    bool promote(unsigned p)
    {
        activeInputs--;
        if(activeInputs == p)
            return false;
        mergeheap[p] = mergeheap[activeInputs];
        return true;
    }
    inline bool siftDown(unsigned p)
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDown");
        // assuming that all descendants of p form a heap, sift p down to its correct position, and so include it in the heap
        bool nochange = true;
        while(1)
        {
            unsigned c = p*2 + 1;
            if(c >= activeInputs)
                return nochange;
            if(c+1 < activeInputs)
            {
                int childcmp = buffCompare(c+1, c);
                if((childcmp < 0) || ((childcmp == 0) && (mergeheap[c+1] < mergeheap[c])))
                    ++c;
            }
            int cmp = buffCompare(c, p);
            if((cmp > 0) || ((cmp == 0) && (mergeheap[c] > mergeheap[p])))
                return nochange;
            nochange = false;
            unsigned r = mergeheap[c];
            mergeheap[c] = mergeheap[p];
            mergeheap[p] = r;
            p = c;
        }
    }
    void siftDownDedupTop()
    {
        //MTIME_SECTION(defaultTimer, "CJStreamMergerBase::siftDownDedupTop");
        // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
        if(activeInputs < 2)
            return;
        unsigned c = 1;
        int childcmp = 1;
        if(activeInputs >= 3)
        {
            childcmp = buffCompare(2, 1);
            if(childcmp < 0)
                c = 2;
        }
        int cmp = buffCompare(c, 0);
        if(cmp > 0)
            return;
        // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
        while(cmp <= 0)
        {
            if(cmp == 0)
            {
                //Duplicate: keep the entry from the lower-numbered input at the top (stability),
                //then replace (and thereby release) the duplicate's row
                if(mergeheap[c] < mergeheap[0])
                {
                    unsigned r = mergeheap[c];
                    mergeheap[c] = mergeheap[0];
                    mergeheap[0] = r;
                }
                if(!pullInput(mergeheap[c]))
                    if(!promote(c))
                        break;
                siftDown(c);
            }
            else
            {
                unsigned r = mergeheap[c];
                mergeheap[c] = mergeheap[0];
                mergeheap[0] = r;
                if(siftDown(c))
                    break;
            }
            cmp = buffCompare(c, 0);
        }
        // the following loop ensures the uniqueness property holds on the other branch too
        c = 3-c;
        if(activeInputs <= c)
            return;
        while(childcmp == 0)
        {
            if(mergeheap[c] < mergeheap[0])
            {
                unsigned r = mergeheap[c];
                mergeheap[c] = mergeheap[0];
                mergeheap[0] = r;
            }
            if(!pullInput(mergeheap[c]))
                if(!promote(c))
                    break;
            siftDown(c);
            childcmp = buffCompare(c, 0);
        }
    }
    //Establish the heap property over all active inputs (classic heapify) and reset the count
    void init()
    {
        if (activeInputs>0) {
            // setup heap property
            if (activeInputs >= 2)
                for(unsigned p = (activeInputs-2)/2; p > 0; --p)
                    siftDown(p);
            if (partdedup)
                siftDownDedupTop();
            else
                siftDown(0);
        }
        recno = 0;
    }
    inline count_t num() const { return recno; }
    //Advance to the next output row: refill (or remove) the previous top input and restore
    //the heap.  Returns false once every input is exhausted.
    inline bool _next()
    {
        if (!activeInputs)
            return false;
        if (recno) {
            if(!pullInput(mergeheap[0]))
                if(!promote(0))
                    return false;
            // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
            if(partdedup)
                siftDownDedupTop();
            else
                siftDown(0);
        }
        recno++;
        return true;
    }
    inline bool eof() const { return activeInputs==0; }
    //Fetch the next row for input i; returns false (after calling provider.stop) when exhausted
    bool pullInput(unsigned i)
    {
        if (pending[i]) {
            //Only the dedup path deliberately discards a row that is still pending
            assertex(partdedup);
            provider.releaseRow(pending[i]);
        }
        pending[i] = provider.nextRow(i);
        if (pending[i])
            return true;
        provider.stop(i);
#ifdef _DEBUG
        assertex(!stopped[i]);
        stopped[i] = true;
#endif
        return false;
    }
public:
    CRowStreamMerger(IRowProvider &_provider,unsigned numstreams, const ICompare *_icmp,bool _partdedup=false)
        : provider(_provider)
    {
        partdedup = _partdedup;
        icmp = _icmp;
        recsize = NULL;
        activeInputs = 0;
#ifdef _DEBUG
        stopped = (bool *)ma.allocate(numstreams*sizeof(bool));
        memset(stopped,0,numstreams*sizeof(bool));
#endif
        unsigned i;
        recsize = NULL;
        if (numstreams) {
            //Single allocation holds pending[] immediately followed by mergeheap[]
            byte *buf = (byte *)workingbuf.allocate(numstreams*(sizeof(void *)+sizeof(unsigned)));
            pending = (const void **)buf;
            mergeheap = (unsigned *)(pending+numstreams);
            //Only inputs that supply at least one row take part in the merge
            for (i=0;i<numstreams;i++) {
                pending[i] = NULL;
                if (pullInput(i))
                    mergeheap[activeInputs++] = i;
            }
        }
        else {
            pending = NULL;
            mergeheap = NULL;
        }
        init();
    }
    //Release any unconsumed rows and stop the remaining live inputs; safe to call repeatedly
    //(activeInputs reaches zero, and the second call's loop body never runs).
    void stop()
    {
        while (activeInputs) {
            activeInputs--;
            if (pending[mergeheap[activeInputs]]) {
                provider.releaseRow(pending[mergeheap[activeInputs]]);
#ifdef _DEBUG
                assertex(!stopped[mergeheap[activeInputs]]);
                stopped[mergeheap[activeInputs]] = true;
#endif
                provider.stop(mergeheap[activeInputs]);
            }
        }
        pending = NULL;
        mergeheap = NULL;
        workingbuf.clear();
    }
    ~CRowStreamMerger()
    {
        stop();
    }
    //Return the next merged row (caller takes ownership), or NULL at end of stream
    inline const void * next()
    {
        if (!_next())
            return NULL;
        unsigned strm = mergeheap[0];
        const void *row = pending[strm];
        pending[strm] = NULL;
        return row;
    }
};
//IRowStream implementation that merges a set of sorted inputs via CRowStreamMerger.
//Inputs may be supplied either as IRowStream pointers (wrapped in an internal cProvider)
//or as an externally owned IRowProvider.
class CMergeRowStreams : public CInterface, implements IRowStream
{
protected:
    CRowStreamMerger *merger;
    bool eos;                   // set once the merge is exhausted or stop() has been called
    //Adapts an array of IRowStream inputs to the IRowProvider interface the merger requires
    class cProvider: public CInterface, implements IRowProvider
    {
        IArrayOf<IRowStream> ostreams;  // keeps a link on each input for the provider's lifetime
        IRowStream **streams;           // raw view of ostreams for fast indexed access
        Linked<IRowLinkCounter> linkcounter;
        const void *nextRow(unsigned idx)
        {
            return streams[idx]->nextRow();
        };
        void linkRow(const void *row)
        {
            linkcounter->linkRow(row);
        }
        void releaseRow(const void *row)
        {
            linkcounter->releaseRow(row);
        }
        void stop(unsigned idx)
        {
            streams[idx]->stop();
        }
    public:
        IMPLEMENT_IINTERFACE;
        cProvider(IRowStream **_streams, unsigned numstreams, IRowLinkCounter *_linkcounter)
        : linkcounter(_linkcounter)
        {
            ostreams.ensure(numstreams);
            unsigned n = 0;
            while (n<numstreams)
                ostreams.append(*LINK(_streams[n++]));
            streams = ostreams.getArray();
        }
    } *streamprovider;
public:
    CMergeRowStreams(unsigned _numstreams,IRowStream **_instreams,ICompare *_icmp, bool partdedup, IRowLinkCounter *_linkcounter)
    {
        streamprovider = new cProvider(_instreams, _numstreams, _linkcounter);
        merger = new CRowStreamMerger(*streamprovider,_numstreams,_icmp,partdedup);
        eos = _numstreams==0;   // no inputs means immediate end of stream
    }
    CMergeRowStreams(unsigned _numstreams,IRowProvider &_provider,ICompare *_icmp, bool partdedup)
    {
        streamprovider = NULL;  // provider is owned by the caller in this variant
        merger = new CRowStreamMerger(_provider,_numstreams,_icmp,partdedup);
        eos = _numstreams==0;
    }
    ~CMergeRowStreams()
    {
        delete merger;
        delete streamprovider;
    }
    IMPLEMENT_IINTERFACE;
    void stop()
    {
        if (!eos) {
            merger->stop();
            eos = true;
        }
    }
    const void *nextRow()
    {
        if (eos)
            return NULL;
        const void *r = merger->next();
        if (!r) {
            stop(); // think ok to stop early
            return NULL;
        }
        return r;
    }
};
//Factory: merge numstreams sorted IRowStream inputs; row lifetimes managed via linkcounter
IRowStream *createRowStreamMerger(unsigned numstreams,IRowStream **instreams,ICompare *icmp,bool partdedup,IRowLinkCounter *linkcounter)
{
    return new CMergeRowStreams(numstreams,instreams,icmp,partdedup,linkcounter);
}
//Factory: merge numstreams sorted inputs supplied through a caller-owned IRowProvider
IRowStream *createRowStreamMerger(unsigned numstreams,IRowProvider &provider,ICompare *icmp,bool partdedup)
{
    return new CMergeRowStreams(numstreams,provider,icmp,partdedup);
}