empq_impl.h 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef __EMPQ_IMPL_H
  31. #define __EMPQ_IMPL_H
  32. #include <ostream>
  33. #include <vector>
  34. using namespace std;
  35. #include "empq.h"
  36. #if(0)
  37. #include "option.H"
  38. #define MY_LOG_DEBUG_ID(x) \
  39. if(GETOPT("debug")) cerr << __FILE__ << ":" << __LINE__<< " " << x << endl;
  40. #endif
  41. #undef XXX
  42. #define XXX if(0)
  43. #define MY_LOG_DEBUG_ID(x)
  /*****************************************************************/
  /* Encapsulation of an element <key/prio, data> together with the id of
     the buffer (buf_id) and of the stream inside that buffer (str_id) it
     was read from; used during stream merging to remember where each key
     comes from.

     Requirements on T: a const member function  Key getPriority() const
     (it is called on the const member x below).

     All comparison operators compare priorities only, e.g.
     a < b  iff  a.getPriority() < b.getPriority().

     NOTE: instances are read back from ExtendedMergeStream streams via
     read_item(), presumably as raw records, so the data member order and
     types (x, buf_id, str_id) are deliberately left unchanged.
  */
  template<class T,class Key>
  class ExtendedEltMergeType {
  private:
    T x;                    // the wrapped element
    unsigned short buf_id;  // index of the external buffer it came from
    unsigned int str_id;    // index of the stream inside that buffer
  public:
    // Value-initialize all members so that a default-constructed object
    // has well-defined ids instead of indeterminate ones.
    ExtendedEltMergeType(): x(), buf_id(0), str_id(0) {}

    // e is taken by const reference so const elements and temporaries can
    // be wrapped as well; callers passing non-const lvalues are unaffected.
    ExtendedEltMergeType(const T &e, unsigned short bid, unsigned int sid):
      x(e), buf_id(bid), str_id(sid) {}

    // Overwrite element and origin in place.
    void set(const T &e, unsigned short bid, unsigned int sid) {
      x = e;
      buf_id = bid;
      str_id = sid;
    }

    // The wrapped element (by value).
    T elt() const {
      return x;
    }
    unsigned short buffer_id() const {
      return buf_id;
    }
    unsigned int stream_id() const {
      return str_id;
    }
    // Priority of the wrapped element; all comparisons below use it.
    Key getPriority() const {
      return x.getPriority();
    }

    //print as "<buf_id=B,str_id=S> elt "
    friend std::ostream& operator<<(std::ostream& s,
                                    const ExtendedEltMergeType<T,Key> &elt) {
      return s << "<buf_id=" << elt.buf_id
               << ",str_id=" << elt.str_id << "> "
               << elt.x << " ";
    }

    // Comparisons return bool (they used to return int); existing callers
    // that store the result in an integer keep working via promotion.
    friend bool operator < (const ExtendedEltMergeType<T,Key> &e1,
                            const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() < e2.getPriority());
    }
    friend bool operator <= (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() <= e2.getPriority());
    }
    friend bool operator > (const ExtendedEltMergeType<T,Key> &e1,
                            const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() > e2.getPriority());
    }
    friend bool operator >= (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() >= e2.getPriority());
    }
    friend bool operator != (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() != e2.getPriority());
    }
    friend bool operator == (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() == e2.getPriority());
    }
  };
  111. //************************************************************/
  112. //create an em_pqueue
  113. template<class T, class Key>
  114. em_pqueue<T,Key>::em_pqueue(long pq_sz, long buf_sz ,
  115. unsigned short nb_buf,
  116. unsigned int buf_ar):
  117. pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf),
  118. crt_buf(0), buf_arity(buf_ar) {
  119. //____________________________________________________________
  120. //ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
  121. AMI_err ae;
  122. size_t mm_avail = getAvailableMemory();
  123. printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
  124. mm_avail/(float)(1<<20));
  125. printf("EM_PQUEUE:available memory before allocation: %ldB\n",
  126. mm_avail);
  127. //____________________________________________________________
  128. //ALLOCATE STRUCTURE
  129. //some dummy checks..
  130. assert(pqsize > 0 && bufsize > 0);
  131. MEMORY_LOG("em_pqueue: allocating int pqueue\n");
  132. //initialize in memory pqueue
  133. pq = new MinMaxHeap<T>(pqsize);
  134. assert(pq);
  135. MEMORY_LOG("em_pqueue: allocating buff_0\n");
  136. //initialize in memory buffer
  137. buff_0 = new im_buffer<T>(bufsize);
  138. assert(buff_0);
  139. char str[200];
  140. sprintf(str, "em_pqueue: allocating array of %ld buff pointers\n",
  141. (long)max_nbuf);
  142. MEMORY_LOG(str);
  143. //allocate ext memory buffers array
  144. buff = new em_buffer<T,Key>* [max_nbuf];
  145. assert(buff);
  146. for (unsigned short i=0; i<max_nbuf; i++) {
  147. buff[i] = NULL;
  148. }
  149. //____________________________________________________________
  150. //some memory checks- make sure the empq fits in memory !!
  151. //estimate available memory after allocation
  152. mm_avail = getAvailableMemory();
  153. printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
  154. mm_avail/(float)(1<<20));
  155. //estimate AMI_STREAM memory usage
  156. size_t sz_stream;
  157. AMI_STREAM<T> dummy;
  158. if ((ae = dummy.main_memory_usage(&sz_stream,
  159. MM_STREAM_USAGE_MAXIMUM)) !=
  160. AMI_ERROR_NO_ERROR) {
  161. cout << "em_pqueue constructor: failing to get stream_usage\n";
  162. exit(1);
  163. }
  164. cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
  165. cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
  166. //estimate memory overhead
  167. long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
  168. max_nbuf * sizeof(em_buffer<T,Key>) +
  169. 2*sz_stream + max_nbuf*sz_stream;
  170. mm_overhead *= 8; //overestimate
  171. cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
  172. if (mm_overhead > mm_avail) {
  173. cout << "overhead bigger than available memory"
  174. << "increase -m and try again\n";
  175. exit(1);
  176. }
  177. mm_avail -= mm_overhead;
  178. //arity*sizeof(AMI_STREAM) < memory
  179. cout << "pqsize=" << pqsize
  180. << ", bufsize=" << bufsize
  181. << ", maximum allowed arity=" << mm_avail/sz_stream << endl;
  182. if (buf_arity * sz_stream > mm_avail) {
  183. cout << "sorry - empq excedes memory limits\n";
  184. cout << "try again decreasing arity or pqsize/bufsize\n";
  185. cout.flush();
  186. }
  187. }
  188. //************************************************************/
  189. //create an em_pqueue capable to store <= N elements
  190. template<class T, class Key>
  191. em_pqueue<T,Key>::em_pqueue() {
  192. MY_LOG_DEBUG_ID("em_pqueue constructor");
  193. /************************************************************/
  194. //available memory
  195. AMI_err ae;
  196. //available memory
  197. size_t mm_avail = getAvailableMemory();
  198. printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
  199. mm_avail/(float)(1<<20));
  200. cout.flush();
  201. //AMI_STREAM memory usage
  202. size_t sz_stream;
  203. AMI_STREAM<T> dummy;
  204. if ((ae = dummy.main_memory_usage(&sz_stream,
  205. MM_STREAM_USAGE_MAXIMUM)) !=
  206. AMI_ERROR_NO_ERROR) {
  207. cout << "em_pqueue constructor: failing to get main_memory_usage\n";
  208. exit(1);
  209. }
  210. cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
  211. cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
  212. cout.flush();
  213. //assume max_nbuf=2 suffices; check after arity is computed
  214. max_nbuf = 2;
  215. //account for temporary memory usage (set up a preliminary arity)
  216. buf_arity = mm_avail/(2 * sz_stream);
  217. long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
  218. max_nbuf * sizeof(em_buffer<T,Key>) +
  219. 2*sz_stream + max_nbuf*sz_stream;
  220. mm_overhead *= 8; //overestimate
  221. cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
  222. if (mm_overhead > mm_avail) {
  223. cout << "overhead bigger than available memory"
  224. << "increase -m and try again\n";
  225. exit(1);
  226. }
  227. mm_avail -= mm_overhead;
  228. #ifdef SAVE_MEMORY
  229. //assign M/2 to pq
  230. pqsize = mm_avail/(2*sizeof(T));
  231. //assign M/2 to buff_0
  232. bufsize = mm_avail/(2*sizeof(T));
  233. #else
  234. //assign M/4 to pq
  235. pqsize = mm_avail/(4*sizeof(T));
  236. //assign M/4 to buff_0
  237. bufsize = mm_avail/(4*sizeof(T));
  238. #endif
  239. cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
  240. cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
  241. cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
  242. //assign M/2 to AMI_STREAMS and compute arity
  243. /* arity is mainly constrained by the size of an AMI_STREAM; the
  244. rest of the memory must accomodate for arity * max_nbuf
  245. *sizeof(AMI_STREAM); there are some temporary stuff like arity *
  246. sizeof(long) (the deleted array), arity * sizeof(T) (the array of
  247. keys for merging) and so on, but the main factor is the
  248. AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
  249. allocates 2 logical blocks) */
  250. #ifdef SAVE_MEMORY
  251. buf_arity = mm_avail/(2 * sz_stream);
  252. #else
  253. buf_arity = mm_avail/(2 * max_nbuf * sz_stream);
  254. #endif
  255. //overestimate usage
  256. if (buf_arity > 3) {
  257. buf_arity -= 3;
  258. } else {
  259. buf_arity = 1;
  260. }
  261. cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
  262. crt_buf = 0;
  263. //initialize in memory pqueue
  264. MEMORY_LOG("em_pqueue: allocating int pqueue\n");
  265. pq = new MinMaxHeap<T>(pqsize);
  266. assert(pq);
  267. //initialize in memory buffer
  268. MEMORY_LOG("em_pqueue: allocating buff_0\n");
  269. buff_0 = new im_buffer<T>(bufsize);
  270. assert(buff_0);
  271. //allocate ext memory buffers array
  272. char str[200];
  273. sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
  274. (long)max_nbuf);
  275. MEMORY_LOG(str);
  276. //allocate ext memory buffers array
  277. buff = new em_buffer<T,Key>* [max_nbuf];
  278. assert(buff);
  279. for (unsigned short i=0; i<max_nbuf; i++) {
  280. buff[i] = NULL;
  281. }
  282. //max nb of items the structure can accomodate (constrained by max_nbuf)
  283. cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
  284. cout.flush();
  285. //check that structure can accomodate N elements
  286. // assert(N < buf_arity * (buf_arity + 1) * bufsize);
  287. //assert(N < maxlen());
  288. mm_avail = getAvailableMemory();
  289. printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
  290. mm_avail/(float)(1<<20));
  291. }
  #ifdef SAVE_MEMORY
  //************************************************************/
  // Build an empq from an existing in-memory heap <im> and a stream <amis>.
  //
  // im is assumed to have been allocated with the whole memory budget
  // (maxsize = mm_avail/sizeof(T)) but to be at most half full. Its
  // contents are spilled to a temporary stream, im is deleted (callers
  // must not use or delete it afterwards), and two half-size structures
  // are allocated in its place: a new heap pq and the buffer buff_0.
  // buf_arity is then derived from the AMI_STREAM memory usage, and if
  // amis is non-empty it is inserted into a newly created level-1 external
  // buffer buff[0]. Whether em_buffer::insert() takes ownership of amis is
  // not visible here. Exits the program on unrecoverable setup errors.
  template<class T, class Key>
  em_pqueue<T,Key>::em_pqueue(MinMaxHeap<T> *im, AMI_STREAM<T> *amis) {
    AMI_err err;
    int halfCapacity;      // elements available to each of pq and buff_0
    unsigned int nInHeap;  // elements currently stored in im
    assert(im && amis);

    halfCapacity = im->get_maxsize()/2;  // this memory is now considered free
    pqsize = halfCapacity + 1;           // +1 absorbs the halving truncation
    nInHeap = im->size();
    if (nInHeap > pqsize) {
      cout << "EMPQ: pq maxsize=" << pqsize <<", pq crtsize=" << nInHeap
           << "\n";
      assert(0);
      exit(1);
    }
    LOG_avail_memo();

    // There is no realloc with new, so move im's elements through a file:
    // spill them, free im, allocate pq and buff_0 at half size, reload pq.
    {
      T item;
      AMI_STREAM<T> spill;
      for (unsigned int k = 0; k < nInHeap; k++) {
        im->extract_min(item);
        err = spill.write_item(item);
        assert(err == AMI_ERROR_NO_ERROR);
      }
      delete im;
      im = NULL;
      LOG_avail_memo();

      bufsize = halfCapacity;
      cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
           << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n";
      cout.flush();
      buff_0 = new im_buffer<T>(bufsize);
      assert(buff_0);

      cout << "EM_PQUEUE: allocating pq size=" << pqsize
           << " total " << (float)halfCapacity*sizeof(T)/(1<<20) << "MB\n";
      cout.flush();
      pq = new MinMaxHeap<T>(pqsize);
      assert(pq);

      err = spill.seek(0);
      assert(err == AMI_ERROR_NO_ERROR);
      T *next;
      for (unsigned int k = 0; k < nInHeap; k++) {
        err = spill.read_item(&next);
        assert(err == AMI_ERROR_NO_ERROR);
        pq->insert(*next);
      }
      assert(pq->size() == nInHeap);
    }

    // Arity: how many AMI_STREAMs fit in the memory of one half.
    size_t streamBytes;
    AMI_STREAM<T> probe;
    err = probe.main_memory_usage(&streamBytes, MM_STREAM_USAGE_MAXIMUM);
    if (err != AMI_ERROR_NO_ERROR) {
      cout << "em_pqueue constructor: failing to get main_memory_usage\n";
      exit(1);
    }
    cout << "EM_PQUEUE: AMI_stream memory usage: " << streamBytes << endl;
    cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
    //assume max_nbuf=2 suffices; check after arity is computed
    max_nbuf = 2;
    buf_arity = halfCapacity * sizeof(T) / streamBytes;
    if (buf_arity == 0) {
      cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
      exit(1);
    }
    // leave some slack for overhead
    buf_arity = (buf_arity > 3) ? buf_arity - 3 : 1;
    // never exceed the number of streams that may be open at once
    // (cap added on 05/16/2005 by Laura)
    if (buf_arity > MAX_STREAMS_OPEN) {
      buf_arity = MAX_STREAMS_OPEN;
    }

    // External buffer slots, all empty for now.
    char msg[200];
    sprintf(msg,"em_pqueue: allocating array of %ld buff pointers\n",
            (long)max_nbuf);
    MEMORY_LOG(msg);
    buff = new em_buffer<T,Key>* [max_nbuf];
    assert(buff);
    for (unsigned short k = 0; k < max_nbuf; k++) {
      buff[k] = NULL;
    }
    crt_buf = 0;

    cout << "EM_PQUEUE: new pqsize set to " << halfCapacity << endl;
    cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
    cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
    cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
    cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
    cout.flush();

    size_t mm_avail = getAvailableMemory();
    printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
           mm_avail/(float)(1<<20));

    // Finally hand the input stream to a fresh level-1 external buffer.
    if (amis->stream_len()) {
      MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
      buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
      buff[0]->insert(amis);
      crt_buf = 1;
    }
  }
  #endif
  418. //************************************************************/
  419. //free space
  420. template<class T, class Key>
  421. em_pqueue<T,Key>::~em_pqueue() {
  422. //delete in memory pqueue
  423. if (pq) {
  424. delete pq; pq = NULL;
  425. }
  426. //delete in memory buffer
  427. if (buff_0) {
  428. delete buff_0; buff_0 = NULL;
  429. }
  430. //delete ext memory buffers
  431. for (unsigned short i=0; i< crt_buf; i++) {
  432. if (buff[i]) delete buff[i];
  433. }
  434. delete [] buff;
  435. }
  436. //************************************************************/
  437. //return maximum capacity of i-th external buffer
  438. template<class T, class Key>
  439. long em_pqueue<T,Key>::maxlen(unsigned short i) {
  440. if (i >= max_nbuf) {
  441. printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n",
  442. i, max_nbuf);
  443. return 0;
  444. }
  445. if (i < crt_buf) {
  446. return buff[i]->get_buf_maxlen();
  447. }
  448. //try allocating buffer
  449. em_buffer<T,Key> * tmp = new em_buffer<T,Key>(i+1, bufsize, buf_arity);
  450. if (!tmp) {
  451. cout << "em_pqueue::max_len: cannot allocate\n";
  452. return 0;
  453. }
  454. long len = tmp->get_buf_maxlen();
  455. delete tmp;
  456. return len;
  457. }
  458. //************************************************************/
  459. //return maximum capacity of em_pqueue
  460. template<class T, class Key>
  461. long em_pqueue<T,Key>::maxlen() {
  462. long len = 0;
  463. for (unsigned short i=0; i< max_nbuf; i++) {
  464. len += maxlen(i);
  465. }
  466. return len + buff_0->get_buf_maxlen();
  467. }
  468. //************************************************************/
  469. //return the total nb of elements in the structure
  470. template<class T, class Key>
  471. unsigned long em_pqueue<T,Key>::size() {
  472. //sum up the lenghts(nb of elements) of the external buffers
  473. unsigned long elen = 0;
  474. for (unsigned short i=0; i < crt_buf; i++) {
  475. elen += buff[i]->get_buf_len();
  476. }
  477. return elen + pq->size() + buff_0->get_buf_len();
  478. }
  479. //************************************************************/
  480. //return true if empty
  481. template<class T, class Key>
  482. bool em_pqueue<T,Key>::is_empty() {
  483. //return (size() == 0);
  484. //more efficient?
  485. return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) &&
  486. (size() == 0));
  487. }
  488. //************************************************************/
  489. //called when pq must be filled from external buffers
  490. template<class T, class Key>
  491. bool em_pqueue<T,Key>::fillpq() {
  492. #ifndef NDEBUG
  493. {
  494. int k=0;
  495. for (unsigned short i=0; i<crt_buf; i++) {
  496. k |= buff[i]->get_buf_len();
  497. }
  498. if(!k) {
  499. cerr << "fillpq called with empty external buff!" << endl;
  500. }
  501. assert(k);
  502. }
  503. #endif
  504. #ifdef EMPQ_PQ_FILL_PRINT
  505. cout << "filling pq\n"; cout .flush();
  506. #endif
  507. XXX cerr << "filling pq" << endl;
  508. MY_LOG_DEBUG_ID("fillpq");
  509. AMI_err ae;
  510. {
  511. char str[200];
  512. sprintf(str, "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
  513. crt_buf);
  514. MEMORY_LOG(str);
  515. }
  516. //merge pqsize smallest elements from each buffer into a new stream
  517. ExtendedMergeStream** outstreams;
  518. outstreams = new ExtendedMergeStream* [crt_buf];
  519. /* gets stream of smallest pqsize elts from each level */
  520. for (unsigned short i=0; i< crt_buf; i++) {
  521. MY_LOG_DEBUG_ID(crt_buf);
  522. outstreams[i] = new ExtendedMergeStream();
  523. assert(buff[i]->get_buf_len());
  524. ae = merge_buffer(buff[i], outstreams[i], pqsize);
  525. assert(ae == AMI_ERROR_NO_ERROR);
  526. assert(outstreams[i]->stream_len());
  527. //print_stream(outstreams[i]); cout.flush();
  528. }
  529. /* merge above streams into pqsize elts in minstream */
  530. if (crt_buf == 1) {
  531. //just one level; make common case faster :)
  532. merge_bufs2pq(outstreams[0]);
  533. delete outstreams[0];
  534. delete [] outstreams;
  535. } else {
  536. //merge the outstreams to get the global mins and delete them afterwards
  537. ExtendedMergeStream *minstream = new ExtendedMergeStream();
  538. //cout << "merging streams\n";
  539. ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
  540. assert(ae == AMI_ERROR_NO_ERROR);
  541. for (int i=0; i< crt_buf; i++) {
  542. delete outstreams[i];
  543. }
  544. delete [] outstreams;
  545. //copy the minstream in the internal pqueue while merging with buff_0;
  546. //the smallest <pqsize> elements between minstream and buff_0 will be
  547. //inserted in internal pqueue;
  548. //also, the elements from minstram which are inserted in pqueue must be
  549. //marked as deleted in the source streams;
  550. merge_bufs2pq(minstream);
  551. delete minstream; minstream = NULL;
  552. //cout << "after merge_bufs2pq: \n" << *this << "\n";
  553. }
  554. XXX assert(pq->size());
  555. XXX cerr << "fillpq done" << endl;
  556. return true;
  557. }
  558. //************************************************************/
  559. //return the element with minimum priority in the structure
  560. template<class T, class Key>
  561. bool
  562. em_pqueue<T,Key>::min(T& elt){
  563. bool ok;
  564. MY_LOG_DEBUG_ID("em_pqueue::min");
  565. //try first the internal pqueue
  566. if (!pq->empty()) {
  567. ok = pq->min(elt);
  568. assert(ok);
  569. return ok;
  570. }
  571. MY_LOG_DEBUG_ID("extract_min: reset pq");
  572. pq->reset();
  573. //if external buffers are empty
  574. if (crt_buf == 0) {
  575. //if internal buffer is empty too, then nothing to extract
  576. if (buff_0->is_empty()) {
  577. //cerr << "min called on empty empq" << endl;
  578. return false;
  579. } else {
  580. #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
  581. cout << "filling pq from B0\n"; cout.flush();
  582. #endif
  583. //ext. buffs empty; fill int pqueue from buff_0
  584. long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
  585. buff_0->reset(pqsize, n);
  586. ok = pq->min(elt);
  587. assert(ok);
  588. return true;
  589. }
  590. } else {
  591. //external buffers are not empty;
  592. //called when pq must be filled from external buffers
  593. XXX print_size();
  594. fillpq();
  595. XXX cerr << "fillpq done; about to take min" << endl;
  596. ok = pq->min(elt);
  597. XXX cerr << "after taking min" << endl;
  598. assert(ok);
  599. return ok;
  600. } //end of else (if ext buffers are not empty)
  601. assert(0); //not reached
  602. }
  603. //************************************************************/
  604. template<class T,class Key>
  605. static void print_ExtendedMergeStream(ExtendedMergeStream &str) {
  606. ExtendedEltMergeType<T,Key> *x;
  607. str.seek(0);
  608. while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
  609. cout << *x << ", ";
  610. }
  611. cout << "\n";
  612. }
  613. //************************************************************/
  614. //delete the element with minimum priority in the structure;
  615. //return false if pq is empty
  616. template<class T, class Key>
  617. bool em_pqueue<T,Key>::extract_min(T& elt) {
  618. bool ok;
  619. MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
  620. //try first the internal pqueue
  621. if (!pq->empty()) {
  622. //cout << "extract from internal pq\n";
  623. MY_LOG_DEBUG_ID("extract from internal pq");
  624. ok = pq->extract_min(elt);
  625. assert(ok);
  626. return ok;
  627. }
  628. //if internal pqueue is empty, fill it up
  629. MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
  630. MY_LOG_DEBUG_ID("extract_min: reset pq");
  631. pq->reset();
  632. //if external buffers are empty
  633. if (crt_buf == 0) {
  634. //if internal buffer is empty too, then nothing to extract
  635. if (buff_0->is_empty()) {
  636. return false;
  637. } else {
  638. #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
  639. cout << "filling pq from B0\n"; cout.flush();
  640. #endif
  641. MY_LOG_DEBUG_ID("filling pq from buff_0");
  642. //ext. buffs empty; fill int pqueue from buff_0
  643. long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
  644. buff_0->reset(pqsize, n);
  645. ok = pq->extract_min(elt);
  646. assert(ok);
  647. return true;
  648. }
  649. } else {
  650. //external buffers are not empty;
  651. //called when pq must be filled from external buffers
  652. MY_LOG_DEBUG_ID("filling pq from buffers");
  653. #ifdef EMPQ_PRINT_SIZE
  654. long x = size(), y;
  655. y = x*sizeof(T) >> 20;
  656. cout << "pqsize:[" << active_streams() << " streams: ";
  657. print_stream_sizes();
  658. cout << " total " << x << "(" << y << "MB)]" << endl;
  659. cout.flush();
  660. #endif
  661. fillpq();
  662. MY_LOG_DEBUG_ID("pq filled");
  663. XXX cerr << "about to get the min" << endl;
  664. assert(pq);
  665. ok = pq->extract_min(elt);
  666. if (!ok) {
  667. cout << "failing assertion: pq->extract_min == true\n";
  668. this->print();
  669. assert(ok);
  670. }
  671. return ok;
  672. } //end of else (if ext buffers are not empty)
  673. assert(0); //not reached
  674. }
  675. //************************************************************/
  676. //extract all elts with min key, add them and return their sum
  677. //delete the element with minimum priority in the structure;
  678. //return false if pq is empty
  679. template<class T, class Key>
  680. bool em_pqueue<T,Key>::extract_all_min(T& elt) {
  681. //cout << "em_pqueue::extract_min_all(T): sorry not implemented\n";
  682. //exit(1);
  683. T next_elt;
  684. bool done = false;
  685. MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_ALL_MIN");
  686. //extract first elt
  687. if (!extract_min(elt)) {
  688. return false;
  689. } else {
  690. while (!done) {
  691. //peek at the next min elt to see if matches
  692. if ((!min(next_elt)) ||
  693. !(next_elt.getPriority() == elt.getPriority())) {
  694. done = true;
  695. } else {
  696. extract_min(next_elt);
  697. elt = elt + next_elt;
  698. MY_LOG_DEBUG_ID("EXTRACT_ALL_MIN: adding " );
  699. MY_LOG_DEBUG_ID(elt);
  700. }
  701. }
  702. }
  703. #ifdef EMPQ_PRINT_EXTRACTALL
  704. cout << "EXTRACTED: " << elt << endl; cout.flush();
  705. #endif
  706. #ifdef EMPQ_PRINT_EMPQ
  707. this->print();
  708. cout << endl;
  709. #endif
  710. return true;
  711. }
  712. //************************************************************/
  713. //copy the minstream in the internal pqueue while merging with buff_0;
  714. //the smallest <pqsize> elements between minstream and buff_0 will be
  715. //inserted in internal pqueue;
  716. //also, the elements from minstram which are inserted in pqueue must be
  717. //marked as deleted in the source streams;
  718. template<class T, class Key>
  719. void em_pqueue<T,Key>::merge_bufs2pq(ExtendedMergeStream *minstream) {
  720. //cout << "bufs2pq: \nminstream: "; print_stream(minstream);
  721. MY_LOG_DEBUG_ID("merge_bufs2pq: enter");
  722. AMI_err ae;
  723. //sort the internal buffer
  724. buff_0->sort();
  725. //cout << "bufs2pq: \nbuff0: " << *buff_0 << endl;
  726. ae = minstream->seek(0); //rewind minstream
  727. assert(ae == AMI_ERROR_NO_ERROR);
  728. bool strEmpty= false, bufEmpty=false;
  729. unsigned int bufPos = 0;
  730. ExtendedEltMergeType<T,Key> *strItem;
  731. T bufElt, strElt;
  732. ae = minstream->read_item(&strItem);
  733. if (ae == AMI_ERROR_END_OF_STREAM) {
  734. strEmpty = true;
  735. } else {
  736. assert(ae == AMI_ERROR_NO_ERROR);
  737. }
  738. if (bufPos < buff_0->get_buf_len()) {
  739. bufElt = buff_0->get_item(bufPos);
  740. } else {
  741. //cout << "buff0 empty\n";
  742. bufEmpty = true;
  743. }
  744. XXX cerr << "pqsize=" << pqsize << endl;
  745. XXX if(strEmpty) cerr << "stream is empty!!" << endl;
  746. for (unsigned int i=0; i< pqsize; i++) {
  747. if (!bufEmpty) {
  748. if ((!strEmpty) && (strElt = strItem->elt(),
  749. bufElt.getPriority() > strElt.getPriority())) {
  750. delete_str_elt(strItem->buffer_id(), strItem->stream_id());
  751. pq->insert(strElt);
  752. ae = minstream->read_item(&strItem);
  753. if (ae == AMI_ERROR_END_OF_STREAM) {
  754. strEmpty = true;
  755. } else {
  756. assert(ae == AMI_ERROR_NO_ERROR);
  757. }
  758. } else {
  759. bufPos++;
  760. pq->insert(bufElt);
  761. if (bufPos < buff_0->get_buf_len()) {
  762. bufElt = buff_0->get_item(bufPos);
  763. } else {
  764. bufEmpty = true;
  765. }
  766. }
  767. } else {
  768. if (!strEmpty) { //stream not empty
  769. strElt = strItem->elt();
  770. //cout << "i=" << i << "str & buff empty\n";
  771. delete_str_elt(strItem->buffer_id(), strItem->stream_id());
  772. pq->insert(strElt);
  773. //cout << "insert " << strElt << "\n";
  774. ae = minstream->read_item(&strItem);
  775. if (ae == AMI_ERROR_END_OF_STREAM) {
  776. strEmpty = true;
  777. } else {
  778. assert(ae == AMI_ERROR_NO_ERROR);
  779. }
  780. } else { //both empty: < pqsize items
  781. break;
  782. }
  783. }
  784. }
  785. //shift left buff_0 in case elements were deleted from the beginning
  786. buff_0->shift_left(bufPos);
  787. MY_LOG_DEBUG_ID("pq filled");
  788. #ifdef EMPQ_PQ_FILL_PRINT
  789. cout << "merge_bufs2pq: pq filled; now cleaning\n"; cout .flush();
  790. #endif
  791. //this->print();
  792. //clean buffers in case some streams have been emptied
  793. cleanup();
  794. MY_LOG_DEBUG_ID("merge_bufs2pq: done");
  795. }
  796. //************************************************************/
  797. //deletes one element from <buffer, stream>
  798. template<class T, class Key>
  799. void em_pqueue<T,Key>::delete_str_elt(unsigned short buf_id,
  800. unsigned int stream_id) {
  801. //check them
  802. assert(buf_id < crt_buf);
  803. assert(stream_id < buff[buf_id]->get_nbstreams());
  804. //update;
  805. buff[buf_id]->incr_deleted(stream_id);
  806. }
  807. //************************************************************/
  808. //clean buffers in case some streams have been emptied
  809. template<class T, class Key>
  810. void em_pqueue<T,Key>::cleanup() {
  811. MY_LOG_DEBUG_ID("em_pqueue::cleanup()");
  812. #ifdef EMPQ_PQ_FILL_PRINT
  813. cout << "em_pqueue: cleanup enter\n"; cout .flush();
  814. #endif
  815. //adjust buffers in case whole streams got deleted
  816. for (unsigned short i=0; i< crt_buf; i++) {
  817. //cout << "clean buffer " << i << ": "; cout.flush();
  818. buff[i]->cleanup();
  819. }
  820. if (crt_buf) {
  821. short i = crt_buf-1;
  822. while ((i>=0) && buff[i]->is_empty()) {
  823. crt_buf--;
  824. i--;
  825. }
  826. }
  827. #ifdef EMPQ_PQ_FILL_PRINT
  828. cout << "em_pqueue: cleanup done\n"; cout .flush();
  829. #endif
  830. //if a stream becomes too short move it on previous level
  831. //to be added..
  832. //cout <<"done cleaning up\n";
  833. }
  834. //************************************************************/
  835. //insert an element; return false if insertion fails
  836. template<class T, class Key>
  837. bool em_pqueue<T,Key>::insert(const T& x) {
  838. bool ok;
  839. #ifdef EMPQ_ASSERT_EXPENSIVE
  840. long init_size = size();
  841. #endif
  842. T val = x;
  843. MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
  844. //if structure is empty insert x in pq; not worth the trouble..
  845. if ((crt_buf == 0) && (buff_0->is_empty())) {
  846. if (!pq->full()) {
  847. MY_LOG_DEBUG_ID("insert in pq");
  848. pq->insert(x);
  849. return true;
  850. }
  851. }
  852. if (!pq->empty()) {
  853. T pqmax;
  854. bool ok;
  855. ok = pq->max(pqmax);
  856. assert(ok);
  857. // cout << "insert " << x << " max: " << pqmax << "\n";
  858. if (x <= pqmax) {
  859. //if x is smaller then pq_max and pq not full, insert x in pq
  860. if (!pq->full()) {
  861. #ifdef EMPQ_ASSERT_EXPENSIVE
  862. assert(size() == init_size);
  863. #endif
  864. pq->insert(x);
  865. return true;
  866. } else {
  867. //if x is smaller then pq_max and pq full, exchange x with pq_max
  868. pq->extract_max(val);
  869. pq->insert(x);
  870. //cout << "max is: " << val << endl;
  871. }
  872. }
  873. }
  874. /* at this point, x >= pqmax.
  875. we need to insert val==x or val==old max.
  876. */
  877. //if buff_0 full, empty it
  878. #ifdef EMPQ_ASSERT_EXPENSIVE
  879. assert(size() == init_size);
  880. #endif
  881. if (buff_0->is_full()) {
  882. #ifdef EMPQ_PRINT_SIZE
  883. long x = size(), y;
  884. y = x*sizeof(T) >> 20;
  885. cout << "pqsize:[" << active_streams() << " streams: ";
  886. print_stream_sizes();
  887. cout << " total " << x << "(" << y << "MB)]" << endl;
  888. cout.flush();
  889. #endif
  890. empty_buff_0();
  891. }
  892. #ifdef EMPQ_ASSERT_EXPENSIVE
  893. assert(size() == init_size);
  894. #endif
  895. //insert x in buff_0
  896. assert(!buff_0->is_full());
  897. MY_LOG_DEBUG_ID("insert in buff_0");
  898. ok = buff_0->insert(val);
  899. assert(ok);
  900. #ifdef EMPQ_PRINT_INSERT
  901. cout << "INSERTED: " << x << endl; cout.flush();
  902. #endif
  903. #ifdef EMPQ_PRINT_EMPQ
  904. this->print();
  905. cout << endl;
  906. #endif
  907. MY_LOG_DEBUG_ID("EM_PQUEUE: INSERTED");
  908. return true;
  909. }
  910. //************************************************************/
  911. /* called when buff_0 is full to empty it on external level_1 buffer;
  912. can produce cascading emptying
  913. */
  914. template<class T, class Key> bool
  915. em_pqueue<T,Key>::empty_buff_0() {
  916. #ifdef EMPQ_ASSERT_EXPENSIVE
  917. long init_size = size();
  918. #endif
  919. #ifdef EMPQ_EMPTY_BUF_PRINT
  920. cout << "emptying buff_0\n"; cout.flush();
  921. print_size();
  922. #endif
  923. MY_LOG_DEBUG_ID("empty buff 0");
  924. assert(buff_0->is_full());
  925. //sort the buffer
  926. buff_0->sort();
  927. //cout << "sorted buff_0: \n" << *buff_0 << "\n";
  928. #ifdef EMPQ_ASSERT_EXPENSIVE
  929. assert(size() == init_size);
  930. #endif
  931. //allocate buffer if necessary
  932. if (!buff[0]) { // XXX should check crt_buf
  933. //create buff[0] as a level1 buffer
  934. MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
  935. buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
  936. }
  937. //check that buff_0 fills exactly a stream of buff[0]
  938. assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
  939. //save buff_0 to stream
  940. MY_LOG_DEBUG_ID("empty buff_0 to stream");
  941. AMI_STREAM<T>* buff_0_str = buff_0->save2str();
  942. assert(buff_0_str);
  943. //MY_LOG_DEBUG_ID("buff_0 emptied");
  944. //reset buff_0
  945. buff_0->reset();
  946. MY_LOG_DEBUG_ID("buf_0 now reset");
  947. #ifdef EMPQ_ASSERT_EXPENSIVE
  948. assert(size() + buff_0->maxlen() == init_size);
  949. #endif
  950. //copy data from buff_0 to buff[0]
  951. if (buff[0]->is_full()) {
  952. //if buff[0] full, empty it recursively
  953. empty_buff(0);
  954. }
  955. buff[0]->insert(buff_0_str);
  956. MY_LOG_DEBUG_ID("stream inserted in buff[0]");
  957. //update the crt_buf pointer if necessary
  958. if (crt_buf == 0) crt_buf = 1;
  959. #ifdef EMPQ_ASSERT_EXPENSIVE
  960. assert(size() == init_size);
  961. #endif
  962. return true;
  963. }
  964. //************************************************************/
  965. /* sort and empty buff[i] into buffer[i+1] recursively; called
  966. by empty_buff_0() to empty subsequent buffers; i must
  967. be a valid (i<crt_buf) full buffer;
  968. */
  969. template<class T, class Key>
  970. void
  971. em_pqueue<T,Key>::empty_buff(unsigned short i) {
  972. #ifdef EMPQ_ASSERT_EXPENSIVE
  973. long init_size = size();
  974. #endif
  975. #ifdef EMPQ_EMPTY_BUF_PRINT
  976. cout << "emptying buffer_" << i << "\n"; cout.flush();
  977. print_size();
  978. #endif
  979. MY_LOG_DEBUG_ID("empty buff ");
  980. MY_LOG_DEBUG_ID(i);
  981. //i must be a valid, full buffer
  982. assert(i<crt_buf);
  983. assert(buff[i]->is_full());
  984. //check there is space to empty to
  985. if (i == max_nbuf-1) {
  986. cerr << "empty_buff:: cannot empty further - structure is full..\n";
  987. print_size();
  988. cerr << "ext buff array should reallocate in a future version..\n";
  989. exit(1);
  990. }
  991. //create next buffer if necessary
  992. if (!buff[i+1]) {
  993. //create buff[i+1] as a level-(i+2) buffer
  994. char str[200];
  995. sprintf(str, "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
  996. MEMORY_LOG(str);
  997. buff[i+1] = new em_buffer<T,Key>(i+2, bufsize, buf_arity);
  998. }
  999. assert(buff[i+1]);
  1000. //check that buff[i] fills exactly a stream of buff[i+1];
  1001. //extraneous (its checked in insert)
  1002. //assert(buff[i]->len() == buff[i+1]->streamlen());
  1003. //sort the buffer into a new stream
  1004. MY_LOG_DEBUG_ID("sort buffer ");
  1005. AMI_STREAM<T>* sorted_buf = buff[i]->sort();
  1006. //assert(sorted_buf->stream_len() == buff[i]->len());
  1007. //this is just for debugging
  1008. if (sorted_buf->stream_len() != buff[i]->get_buf_len()) {
  1009. cout << "sorted_stream_len: " << sorted_buf->stream_len()
  1010. << " , bufflen: " << buff[i]->get_buf_len() << endl; cout.flush();
  1011. AMI_err ae;
  1012. ae = sorted_buf->seek(0);
  1013. assert(ae == AMI_ERROR_NO_ERROR);
  1014. T *x;
  1015. while (sorted_buf->read_item(&x) == AMI_ERROR_NO_ERROR) {
  1016. cout << *x << ", "; cout.flush();
  1017. }
  1018. cout << "\n";
  1019. #ifdef EMPQ_ASSERT_EXPENSIVE
  1020. assert(sorted_buf->stream_len() == buff[i]->len());
  1021. #endif
  1022. }
  1023. #ifdef EMPQ_ASSERT_EXPENSIVE
  1024. assert(size() == init_size);
  1025. #endif
  1026. //reset buff[i] (delete all its streams )
  1027. buff[i]->reset();
  1028. #ifdef EMPQ_ASSERT_EXPENSIVE
  1029. assert(size() == init_size - sorted_buf->stream_len());
  1030. #endif
  1031. //link sorted buff[i] as a substream into buff[i+1];
  1032. //sorted_buf is a new stream, so it starts out with 0 deleted elements;
  1033. //of ocurse, its length might be smaller than nominal;
  1034. if (buff[i+1]->is_full()) {
  1035. empty_buff(i+1);
  1036. }
  1037. buff[i+1]->insert(sorted_buf, 0);
  1038. //update the crt_buf pointer if necessary
  1039. if (crt_buf < i+2) crt_buf = i+2;
  1040. #ifdef EMPQ_ASSERT_EXPENSIVE
  1041. assert(size() == init_size);
  1042. #endif
  1043. }
  1044. //************************************************************/
  1045. /* merge the first <K> elements of the streams of input buffer,
  1046. starting at position <buf.deleted[i]> in each stream; there are
  1047. <buf.arity> streams in total; write output in <outstream>; the
  1048. items written in outstream are of type <merge_output_type> which
  1049. extends T with the stream nb and buffer nb the item comes from;
  1050. this information is needed later to distribute items back; do not
  1051. delete the K merged elements from the input streams; <bufid> is the
  1052. id of the buffer whose streams are being merged;
  1053. the input streams are assumed sorted in increasing order of keys;
  1054. */
  1055. template<class T, class Key>
  1056. AMI_err
  1057. em_pqueue<T,Key>::merge_buffer(em_buffer<T,Key> *buf,
  1058. ExtendedMergeStream *outstream, long K) {
  1059. long* bos = buf->get_bos();
  1060. /* buff[0] is a level-1 buffer and so on */
  1061. unsigned short bufid = buf->get_level() - 1;
  1062. /* Pointers to current leading elements of streams */
  1063. unsigned int arity = buf->get_nbstreams();
  1064. AMI_STREAM<T>** instreams = buf->get_streams();
  1065. std::vector<T*> in_objects(arity);
  1066. AMI_err ami_err;
  1067. unsigned int i, j;
  1068. MY_LOG_DEBUG_ID("merge_buffer ");
  1069. MY_LOG_DEBUG_ID(buf->get_level());
  1070. assert(outstream);
  1071. assert(instreams);
  1072. assert(buf->get_buf_len());
  1073. assert(K>0);
  1074. //array initialized with first key from each stream (only non-null keys
  1075. //must be included)
  1076. MEMORY_LOG("em_pqueue::merge_buffer: allocate keys array\n");
  1077. merge_key<Key>* keys = new merge_key<Key> [arity];
  1078. /* count number of non-empty runs */
  1079. j = 0;
  1080. /* rewind and read the first item from every stream */
  1081. for (i = 0; i < arity ; i++ ) {
  1082. assert(instreams[i]);
  1083. //rewind stream
  1084. if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
  1085. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1086. return ami_err;
  1087. }
  1088. /* read first item */
  1089. ami_err = instreams[i]->read_item(&(in_objects[i]));
  1090. switch(ami_err) {
  1091. case AMI_ERROR_END_OF_STREAM:
  1092. in_objects[i] = NULL;
  1093. break;
  1094. case AMI_ERROR_NO_ERROR:
  1095. //cout << "stream " << i << " read " << *in_objects[i] << "\n";
  1096. //cout.flush();
  1097. // include this key in the array of keys
  1098. keys[j] = merge_key<Key>(in_objects[i]->getPriority(), i);
  1099. // cout << "key " << j << "set to " << keys[j] << "\n";
  1100. j++;
  1101. break;
  1102. default:
  1103. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1104. return ami_err;
  1105. }
  1106. }
  1107. unsigned int NonEmptyRuns = j;
  1108. // cout << "nonempyruns = " << NonEmptyRuns << "\n";
  1109. //build heap from the array of keys
  1110. pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
  1111. //cout << "heap is : " << mergeheap << "\n";
  1112. //repeatedly extract_min from heap and insert next item from same stream
  1113. long extracted = 0;
  1114. //rewind output buffer
  1115. ami_err = outstream->seek(0);
  1116. assert(ami_err == AMI_ERROR_NO_ERROR);
  1117. ExtendedEltMergeType<T,Key> out;
  1118. while (!mergeheap.empty() && (extracted < K)) {
  1119. //find min key and id of stream it comes from
  1120. i = mergeheap.min().stream_id();
  1121. //write min item to output stream
  1122. out = ExtendedEltMergeType<T,Key>(*in_objects[i], bufid, i);
  1123. if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
  1124. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1125. return ami_err;
  1126. }
  1127. //cout << "wrote " << out << "\n";
  1128. extracted++; //update nb of extracted elements
  1129. //read next item from same input stream
  1130. ami_err = instreams[i]->read_item(&(in_objects[i]));
  1131. switch(ami_err) {
  1132. case AMI_ERROR_END_OF_STREAM:
  1133. mergeheap.delete_min();
  1134. break;
  1135. case AMI_ERROR_NO_ERROR:
  1136. //extract the min from the heap and insert next key from the
  1137. //same stream
  1138. {
  1139. Key k = in_objects[i]->getPriority();
  1140. mergeheap.delete_min_and_insert(merge_key<Key>(k, i));
  1141. }
  1142. break;
  1143. default:
  1144. cerr << "WARNING!!! early breakout!!!" << endl;
  1145. return ami_err;
  1146. }
  1147. //cout << "PQ: " << mergeheap << "\n";
  1148. } //while
  1149. //delete [] keys;
  1150. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  1151. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
  1152. //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
  1153. //DESTRUCTOR, AND EVERYTHING SCREWS UP..
  1154. buf->put_streams();
  1155. MY_LOG_DEBUG_ID("merge_buffer: done");
  1156. //cout << "done merging buffer\n";
  1157. assert(extracted == outstream->stream_len());
  1158. assert(extracted); // something in, something out
  1159. return AMI_ERROR_NO_ERROR;
  1160. }
  1161. //************************************************************/
  1162. /* merge the first <K> elements of the input streams; there are <arity>
  1163. streams in total; write output in <outstream>;
  1164. the input streams are assumed sorted in increasing order of their
  1165. keys;
  1166. */
  1167. template<class T, class Key>
  1168. AMI_err
  1169. em_pqueue<T,Key>::merge_streams(ExtendedMergeStream** instreams,
  1170. unsigned short arity,
  1171. ExtendedMergeStream *outstream, long K) {
  1172. MY_LOG_DEBUG_ID("enter merge_streams");
  1173. assert(arity> 1);
  1174. //Pointers to current leading elements of streams
  1175. std::vector<ExtendedEltMergeType<T,Key> > in_objects(arity);
  1176. AMI_err ami_err;
  1177. //unsigned int i;
  1178. unsigned int nonEmptyRuns=0; //count number of non-empty runs
  1179. //array initialized with first element from each stream (only non-null keys
  1180. //must be included)
  1181. MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
  1182. merge_key<Key>* keys = new merge_key<Key> [arity];
  1183. assert(keys);
  1184. //rewind and read the first item from every stream
  1185. for (int i = 0; i < arity ; i++ ) {
  1186. //rewind stream
  1187. if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
  1188. return ami_err;
  1189. }
  1190. //read first item
  1191. ExtendedEltMergeType<T,Key> *objp;
  1192. ami_err = instreams[i]->read_item(&objp);
  1193. switch(ami_err) {
  1194. case AMI_ERROR_NO_ERROR:
  1195. in_objects[i] = *objp;
  1196. keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
  1197. nonEmptyRuns++;
  1198. break;
  1199. case AMI_ERROR_END_OF_STREAM:
  1200. break;
  1201. default:
  1202. return ami_err;
  1203. }
  1204. }
  1205. assert(nonEmptyRuns <= arity);
  1206. //build heap from the array of keys
  1207. pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns); /* takes ownership of keys */
  1208. //repeatedly extract_min from heap and insert next item from same stream
  1209. long extracted = 0;
  1210. //rewind output buffer
  1211. ami_err = outstream->seek(0);
  1212. assert(ami_err == AMI_ERROR_NO_ERROR);
  1213. while (!mergeheap.empty() && (extracted < K)) {
  1214. //find min key and id of stream it comes from
  1215. int id = mergeheap.min().stream_id();
  1216. //write min item to output stream
  1217. assert(id < nonEmptyRuns);
  1218. assert(id >= 0);
  1219. assert(mergeheap.size() == nonEmptyRuns);
  1220. ExtendedEltMergeType<T,Key> obj = in_objects[id];
  1221. if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
  1222. return ami_err;
  1223. }
  1224. //cout << "wrote " << *in_objects[i] << "\n";
  1225. //extract the min from the heap and insert next key from same stream
  1226. assert(id < nonEmptyRuns);
  1227. assert(id >= 0);
  1228. ExtendedEltMergeType<T,Key> *objp;
  1229. ami_err = instreams[id]->read_item(&objp);
  1230. switch(ami_err) {
  1231. case AMI_ERROR_NO_ERROR:
  1232. {
  1233. in_objects[id] = *objp;
  1234. merge_key<Key> tmp = merge_key<Key>(in_objects[id].getPriority(), id);
  1235. mergeheap.delete_min_and_insert(tmp);
  1236. }
  1237. extracted++; //update nb of extracted elements
  1238. break;
  1239. case AMI_ERROR_END_OF_STREAM:
  1240. mergeheap.delete_min();
  1241. break;
  1242. default:
  1243. return ami_err;
  1244. }
  1245. } //while
  1246. //delete [] keys;
  1247. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  1248. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
  1249. //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
  1250. //DESTRUCTOR, AND EVERYTHING SCREWS UP..
  1251. MY_LOG_DEBUG_ID("merge_streams: done");
  1252. return AMI_ERROR_NO_ERROR;
  1253. }
  1254. //************************************************************/
  1255. template<class T, class Key>
  1256. void
  1257. em_pqueue<T,Key>::clear() {
  1258. pq->clear();
  1259. buff_0->clear();
  1260. for(int i=0; i<crt_buf; i++) {
  1261. if(buff[i]) {
  1262. delete buff[i]; buff[i] = NULL;
  1263. }
  1264. }
  1265. crt_buf = 0;
  1266. }
  1267. //************************************************************/
  1268. template<class T, class Key>
  1269. void
  1270. em_pqueue<T,Key>::print_range() {
  1271. cout << "EM_PQ: [pq=" << pqsize
  1272. << ", b=" << bufsize
  1273. << ", bufs=" << max_nbuf
  1274. << ", ar=" << buf_arity << "]\n";
  1275. cout << "PQ: ";
  1276. //pq->print_range();
  1277. pq->print();
  1278. cout << endl;
  1279. cout << "B0: ";
  1280. // buff_0->print_range();
  1281. buff_0->print();
  1282. cout << "\n";
  1283. for (unsigned short i=0; i < crt_buf; i++) {
  1284. cout << "B" << i+1 << ": ";
  1285. buff[i]->print_range();
  1286. cout << endl;
  1287. }
  1288. cout.flush();
  1289. }
  1290. //************************************************************/
  1291. template<class T, class Key>
  1292. void
  1293. em_pqueue<T,Key>::print() {
  1294. cout << "EM_PQ: [pq=" << pqsize
  1295. << ", b=" << bufsize
  1296. << ", bufs=" << max_nbuf
  1297. << ", ar=" << buf_arity << "]\n";
  1298. cout << "PQ: ";
  1299. pq->print();
  1300. cout << endl;
  1301. cout << "B0: ";
  1302. buff_0->print();
  1303. cout << "\n";
  1304. for (unsigned short i=0; i < crt_buf; i++) {
  1305. cout << "B" << i+1 << ": " << endl;
  1306. buff[i]->print();
  1307. cout << endl;
  1308. }
  1309. cout.flush();
  1310. }
  1311. //************************************************************/
  1312. template<class T, class Key>
  1313. void em_pqueue<T,Key>::print_size() {
  1314. //sum up the lenghts(nb of elements) of the external buffers
  1315. long elen = 0;
  1316. cout << "EMPQ: pq=" << pq->size() <<",B0=" << buff_0->get_buf_len() << endl;
  1317. cout.flush();
  1318. for (unsigned short i=0; i < crt_buf; i++) {
  1319. assert(buff[i]);
  1320. cout << "B_" << i+1 << ":"; cout.flush();
  1321. buff[i]->print_stream_sizes();
  1322. elen += buff[i]->get_buf_len();
  1323. //cout << endl; cout.flush();
  1324. }
  1325. cout << "total: " << elen + pq->size() + buff_0->get_buf_len() << endl << endl;
  1326. cout.flush();
  1327. }
  1328. /*****************************************************************/
  1329. template<class T,class Key>
  1330. void em_pqueue<T,Key>::print_stream_sizes() {
  1331. for (unsigned short i=0; i< crt_buf; i++) {
  1332. cout << "[";
  1333. buff[i]->print_stream_sizes();
  1334. cout << "]";
  1335. }
  1336. cout.flush();
  1337. }
  1338. #undef XXX
  1339. #endif