empq_impl.h 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef __EMPQ_IMPL_H
  31. #define __EMPQ_IMPL_H
  32. #include <ostream>
  33. #include <vector>
  34. #include "empq.h"
  35. #if(0)
  36. #include "option.H"
  37. #define MY_LOG_DEBUG_ID(x) \
  38. if(GETOPT("debug")) cerr << __FILE__ << ":" << __LINE__<< " " << x << endl;
  39. #endif
  40. #undef XXX
  41. #define XXX if(0)
  42. #define MY_LOG_DEBUG_ID(x)
  /*****************************************************************/
  /* Encapsulation of an element <key/prio, data> together with the id
     of the external buffer (<buf_id>) and of the stream inside that
     buffer (<str_id>) it was read from; used during stream merging to
     remember where each key comes from.

     Assumes class T implements: Key getPriority() const

     All comparison operators compare priorities ONLY:
       a < b  iff  a.x.getPriority() < b.x.getPriority()
     (likewise for <=, >, >=, ==, !=), so two entries with equal
     priority compare equal even if their data or origin differ.
  */
  template<class T,class Key>
  class ExtendedEltMergeType {
  private:
    T x;                   // the element itself
    unsigned short buf_id; // index of the external buffer it came from
    unsigned int str_id;   // index of the stream inside that buffer
  public:
    // ids start at 0 instead of holding indeterminate values
    ExtendedEltMergeType(): buf_id(0), str_id(0) {}
    // const reference: accepts const and temporary elements as well
    ExtendedEltMergeType(const T &e, unsigned short bid, unsigned int sid):
      x(e), buf_id(bid), str_id(sid) {}

    // overwrite element and origin in place
    void set (const T &e, unsigned short bid, unsigned int sid) {
      x = e;
      buf_id = bid;
      str_id = sid;
    }
    T elt() const {
      return x;
    }
    unsigned short buffer_id() const {
      return buf_id;
    }
    unsigned int stream_id() const {
      return str_id;
    }
    // priority of the wrapped element
    Key getPriority() const {
      return x.getPriority();
    }

    //print
    friend std::ostream& operator<<(std::ostream& s,
                                    const ExtendedEltMergeType<T,Key> &elt) {
      return s << "<buf_id=" << elt.buf_id
               << ",str_id=" << elt.str_id << "> "
               << elt.x << " ";
    }

    // comparisons on priority only; they yield bool (int converts
    // implicitly from bool, so existing callers keep working)
    friend bool operator < (const ExtendedEltMergeType<T,Key> &e1,
                            const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() < e2.getPriority());
    }
    friend bool operator <= (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() <= e2.getPriority());
    }
    friend bool operator > (const ExtendedEltMergeType<T,Key> &e1,
                            const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() > e2.getPriority());
    }
    friend bool operator >= (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() >= e2.getPriority());
    }
    friend bool operator != (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() != e2.getPriority());
    }
    friend bool operator == (const ExtendedEltMergeType<T,Key> &e1,
                             const ExtendedEltMergeType<T,Key> &e2) {
      return (e1.getPriority() == e2.getPriority());
    }
  };
  110. //************************************************************/
  111. //create an em_pqueue
  112. template<class T, class Key>
  113. em_pqueue<T,Key>::em_pqueue(long pq_sz, long buf_sz ,
  114. unsigned short nb_buf,
  115. unsigned int buf_ar):
  116. pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf),
  117. crt_buf(0), buf_arity(buf_ar) {
  118. //____________________________________________________________
  119. //ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
  120. AMI_err ae;
  121. size_t mm_avail = getAvailableMemory();
  122. printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
  123. mm_avail/(float)(1<<20));
  124. printf("EM_PQUEUE:available memory before allocation: %ldB\n",
  125. mm_avail);
  126. //____________________________________________________________
  127. //ALLOCATE STRUCTURE
  128. //some dummy checks..
  129. assert(pqsize > 0 && bufsize > 0);
  130. MEMORY_LOG("em_pqueue: allocating int pqueue\n");
  131. //initialize in memory pqueue
  132. pq = new MinMaxHeap<T>(pqsize);
  133. assert(pq);
  134. MEMORY_LOG("em_pqueue: allocating buff_0\n");
  135. //initialize in memory buffer
  136. buff_0 = new im_buffer<T>(bufsize);
  137. assert(buff_0);
  138. char str[200];
  139. sprintf(str, "em_pqueue: allocating array of %ld buff pointers\n",
  140. (long)max_nbuf);
  141. MEMORY_LOG(str);
  142. //allocate ext memory buffers array
  143. buff = new em_buffer<T,Key>* [max_nbuf];
  144. assert(buff);
  145. for (unsigned short i=0; i<max_nbuf; i++) {
  146. buff[i] = NULL;
  147. }
  148. //____________________________________________________________
  149. //some memory checks- make sure the empq fits in memory !!
  150. //estimate available memory after allocation
  151. mm_avail = getAvailableMemory();
  152. printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
  153. mm_avail/(float)(1<<20));
  154. //estimate AMI_STREAM memory usage
  155. size_t sz_stream;
  156. AMI_STREAM<T> dummy;
  157. if ((ae = dummy.main_memory_usage(&sz_stream,
  158. MM_STREAM_USAGE_MAXIMUM)) !=
  159. AMI_ERROR_NO_ERROR) {
  160. cout << "em_pqueue constructor: failing to get stream_usage\n";
  161. exit(1);
  162. }
  163. cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
  164. cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
  165. //estimate memory overhead
  166. long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
  167. max_nbuf * sizeof(em_buffer<T,Key>) +
  168. 2*sz_stream + max_nbuf*sz_stream;
  169. mm_overhead *= 8; //overestimate
  170. cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
  171. if (mm_overhead > mm_avail) {
  172. cout << "overhead bigger than available memory"
  173. << "increase -m and try again\n";
  174. exit(1);
  175. }
  176. mm_avail -= mm_overhead;
  177. //arity*sizeof(AMI_STREAM) < memory
  178. cout << "pqsize=" << pqsize
  179. << ", bufsize=" << bufsize
  180. << ", maximum allowed arity=" << mm_avail/sz_stream << endl;
  181. if (buf_arity * sz_stream > mm_avail) {
  182. cout << "sorry - empq excedes memory limits\n";
  183. cout << "try again decreasing arity or pqsize/bufsize\n";
  184. cout.flush();
  185. }
  186. }
  187. //************************************************************/
  188. //create an em_pqueue capable to store <= N elements
  189. template<class T, class Key>
  190. em_pqueue<T,Key>::em_pqueue() {
  191. MY_LOG_DEBUG_ID("em_pqueue constructor");
  192. /************************************************************/
  193. //available memory
  194. AMI_err ae;
  195. //available memory
  196. size_t mm_avail = getAvailableMemory();
  197. printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
  198. mm_avail/(float)(1<<20));
  199. cout.flush();
  200. //AMI_STREAM memory usage
  201. size_t sz_stream;
  202. AMI_STREAM<T> dummy;
  203. if ((ae = dummy.main_memory_usage(&sz_stream,
  204. MM_STREAM_USAGE_MAXIMUM)) !=
  205. AMI_ERROR_NO_ERROR) {
  206. cout << "em_pqueue constructor: failing to get main_memory_usage\n";
  207. exit(1);
  208. }
  209. cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
  210. cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
  211. cout.flush();
  212. //assume max_nbuf=2 suffices; check after arity is computed
  213. max_nbuf = 2;
  214. //account for temporary memory usage (set up a preliminary arity)
  215. buf_arity = mm_avail/(2 * sz_stream);
  216. long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
  217. max_nbuf * sizeof(em_buffer<T,Key>) +
  218. 2*sz_stream + max_nbuf*sz_stream;
  219. mm_overhead *= 8; //overestimate
  220. cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
  221. if (mm_overhead > mm_avail) {
  222. cout << "overhead bigger than available memory"
  223. << "increase -m and try again\n";
  224. exit(1);
  225. }
  226. mm_avail -= mm_overhead;
  227. #ifdef SAVE_MEMORY
  228. //assign M/2 to pq
  229. pqsize = mm_avail/(2*sizeof(T));
  230. //assign M/2 to buff_0
  231. bufsize = mm_avail/(2*sizeof(T));
  232. #else
  233. //assign M/4 to pq
  234. pqsize = mm_avail/(4*sizeof(T));
  235. //assign M/4 to buff_0
  236. bufsize = mm_avail/(4*sizeof(T));
  237. #endif
  238. cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
  239. cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
  240. cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
  241. //assign M/2 to AMI_STREAMS and compute arity
  242. /* arity is mainly constrained by the size of an AMI_STREAM; the
  243. rest of the memory must accommodate for arity * max_nbuf
  244. *sizeof(AMI_STREAM); there are some temporary stuff like arity *
  245. sizeof(long) (the deleted array), arity * sizeof(T) (the array of
  246. keys for merging) and so on, but the main factor is the
  247. AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
  248. allocates 2 logical blocks) */
  249. #ifdef SAVE_MEMORY
  250. buf_arity = mm_avail/(2 * sz_stream);
  251. #else
  252. buf_arity = mm_avail/(2 * max_nbuf * sz_stream);
  253. #endif
  254. //overestimate usage
  255. if (buf_arity > 3) {
  256. buf_arity -= 3;
  257. } else {
  258. buf_arity = 1;
  259. }
  260. cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
  261. crt_buf = 0;
  262. //initialize in memory pqueue
  263. MEMORY_LOG("em_pqueue: allocating int pqueue\n");
  264. pq = new MinMaxHeap<T>(pqsize);
  265. assert(pq);
  266. //initialize in memory buffer
  267. MEMORY_LOG("em_pqueue: allocating buff_0\n");
  268. buff_0 = new im_buffer<T>(bufsize);
  269. assert(buff_0);
  270. //allocate ext memory buffers array
  271. char str[200];
  272. sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
  273. (long)max_nbuf);
  274. MEMORY_LOG(str);
  275. //allocate ext memory buffers array
  276. buff = new em_buffer<T,Key>* [max_nbuf];
  277. assert(buff);
  278. for (unsigned short i=0; i<max_nbuf; i++) {
  279. buff[i] = NULL;
  280. }
  281. //max nb of items the structure can accommodate (constrained by max_nbuf)
  282. cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
  283. cout.flush();
  284. //check that structure can accommodate N elements
  285. // assert(N < buf_arity * (buf_arity + 1) * bufsize);
  286. //assert(N < maxlen());
  287. mm_avail = getAvailableMemory();
  288. printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
  289. mm_avail/(float)(1<<20));
  290. }
  291. #ifdef SAVE_MEMORY
  292. //************************************************************/
  293. // create an empq, initialize its pq with im and insert amis in
  294. // buff[0]; im should not be used/deleted after that outside empq;
  295. //
  296. // assumption: im was allocated such that maxsize = mm_avail/T;
  297. // when this constructor is called im is only half full, so we must
  298. // free half of its space and give to buff_0
  299. template<class T, class Key>
  300. em_pqueue<T,Key>::em_pqueue(MinMaxHeap<T> *im, AMI_STREAM<T> *amis) {
  301. AMI_err ae;
  302. int pqcapacity; /* amount of memory we can use for each of new
  303. minmaxheap, and em-buffer */
  304. unsigned int pqcurrentsize; /* number of elements currently in im */
  305. assert(im && amis);
  306. pqcapacity = im->get_maxsize()/2; // we think this memory is now available
  307. pqsize = pqcapacity + 1; //truncate errors
  308. pqcurrentsize = im->size();
  309. //assert( pqcurrentsize <= pqsize);
  310. if(!(pqcurrentsize <= pqsize)) {
  311. cout << "EMPQ: pq maxsize=" << pqsize <<", pq crtsize=" << pqcurrentsize
  312. << "\n";
  313. assert(0);
  314. exit(1);
  315. }
  316. LOG_avail_memo();
  317. /* at this point im is allocated all memory, but it is only at most
  318. half full; we need to relocate im to half space and to allocate
  319. buff_0 the other half; since we use new, there is no realloc, so
  320. we will copy to a file...*/
  321. {
  322. //copy im to a stream and free its memory
  323. T x;
  324. AMI_STREAM<T> tmpstr;
  325. for (unsigned int i=0; i<pqcurrentsize; i++) {
  326. im->extract_min(x);
  327. ae = tmpstr.write_item(x);
  328. assert(ae == AMI_ERROR_NO_ERROR);
  329. }
  330. delete im; im = NULL;
  331. LOG_avail_memo();
  332. //allocate pq and buff_0 half size
  333. bufsize = pqcapacity;
  334. cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
  335. << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n";
  336. cout.flush();
  337. buff_0 = new im_buffer<T>(bufsize);
  338. assert(buff_0);
  339. cout << "EM_PQUEUE: allocating pq size=" << pqsize
  340. << " total " << (float)pqcapacity*sizeof(T)/(1<<20) << "MB\n";
  341. cout.flush();
  342. pq = new MinMaxHeap<T>(pqsize);
  343. assert(pq);
  344. //fill pq from tmp stream
  345. ae = tmpstr.seek(0);
  346. assert(ae == AMI_ERROR_NO_ERROR);
  347. T *elt;
  348. for (unsigned int i=0; i<pqcurrentsize; i++) {
  349. ae = tmpstr.read_item(&elt);
  350. assert(ae == AMI_ERROR_NO_ERROR);
  351. pq->insert(*elt);
  352. }
  353. assert(pq->size() == pqcurrentsize);
  354. }
  355. //estimate buf_arity
  356. //AMI_STREAM memory usage
  357. size_t sz_stream;
  358. AMI_STREAM<T> dummy;
  359. if ((ae = dummy.main_memory_usage(&sz_stream,
  360. MM_STREAM_USAGE_MAXIMUM)) !=
  361. AMI_ERROR_NO_ERROR) {
  362. cout << "em_pqueue constructor: failing to get main_memory_usage\n";
  363. exit(1);
  364. }
  365. cout << "EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
  366. cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
  367. //assume max_nbuf=2 suffices; check after arity is computed
  368. max_nbuf = 2;
  369. buf_arity = pqcapacity * sizeof(T) / sz_stream;
  370. //should account for some overhead
  371. if (buf_arity == 0) {
  372. cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
  373. exit(1);
  374. }
  375. if (buf_arity > 3) {
  376. buf_arity -= 3;
  377. } else {
  378. buf_arity = 1;
  379. }
  380. //added on 05/16/2005 by Laura
  381. if (buf_arity > MAX_STREAMS_OPEN) {
  382. buf_arity = MAX_STREAMS_OPEN;
  383. }
  384. //allocate ext memory buffer array
  385. char str[200];
  386. sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
  387. (long)max_nbuf);
  388. MEMORY_LOG(str);
  389. buff = new em_buffer<T,Key>* [max_nbuf];
  390. assert(buff);
  391. for (unsigned short i=0; i<max_nbuf; i++) {
  392. buff[i] = NULL;
  393. }
  394. crt_buf = 0;
  395. cout << "EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
  396. cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
  397. cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
  398. cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
  399. cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
  400. cout.flush();
  401. //estimate available remaining memory
  402. size_t mm_avail = getAvailableMemory();
  403. printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
  404. mm_avail/(float)(1<<20));
  405. //last thing: insert the input stream in external buffers
  406. //allocate buffer if necessary
  407. //assert(crt_buf==0 && !buff[0]);// given
  408. if(amis->stream_len()) {
  409. //create buff[0] as a level1 buffer
  410. MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
  411. buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
  412. buff[0]->insert(amis);
  413. crt_buf = 1;
  414. }
  415. }
  416. #endif
  417. //************************************************************/
  418. //free space
  419. template<class T, class Key>
  420. em_pqueue<T,Key>::~em_pqueue() {
  421. //delete in memory pqueue
  422. if (pq) {
  423. delete pq; pq = NULL;
  424. }
  425. //delete in memory buffer
  426. if (buff_0) {
  427. delete buff_0; buff_0 = NULL;
  428. }
  429. //delete ext memory buffers
  430. for (unsigned short i=0; i< crt_buf; i++) {
  431. if (buff[i]) delete buff[i];
  432. }
  433. delete [] buff;
  434. }
  435. //************************************************************/
  436. //return maximum capacity of i-th external buffer
  437. template<class T, class Key>
  438. long em_pqueue<T,Key>::maxlen(unsigned short i) {
  439. if (i >= max_nbuf) {
  440. printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n",
  441. i, max_nbuf);
  442. return 0;
  443. }
  444. if (i < crt_buf) {
  445. return buff[i]->get_buf_maxlen();
  446. }
  447. //try allocating buffer
  448. em_buffer<T,Key> * tmp = new em_buffer<T,Key>(i+1, bufsize, buf_arity);
  449. if (!tmp) {
  450. cout << "em_pqueue::max_len: cannot allocate\n";
  451. return 0;
  452. }
  453. long len = tmp->get_buf_maxlen();
  454. delete tmp;
  455. return len;
  456. }
  457. //************************************************************/
  458. //return maximum capacity of em_pqueue
  459. template<class T, class Key>
  460. long em_pqueue<T,Key>::maxlen() {
  461. long len = 0;
  462. for (unsigned short i=0; i< max_nbuf; i++) {
  463. len += maxlen(i);
  464. }
  465. return len + buff_0->get_buf_maxlen();
  466. }
  467. //************************************************************/
  468. //return the total nb of elements in the structure
  469. template<class T, class Key>
  470. unsigned long em_pqueue<T,Key>::size() {
  471. //sum up the lengths(nb of elements) of the external buffers
  472. unsigned long elen = 0;
  473. for (unsigned short i=0; i < crt_buf; i++) {
  474. elen += buff[i]->get_buf_len();
  475. }
  476. return elen + pq->size() + buff_0->get_buf_len();
  477. }
  478. //************************************************************/
  479. //return true if empty
  480. template<class T, class Key>
  481. bool em_pqueue<T,Key>::is_empty() {
  482. //return (size() == 0);
  483. //more efficient?
  484. return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) &&
  485. (size() == 0));
  486. }
  487. //************************************************************/
  488. //called when pq must be filled from external buffers
  489. template<class T, class Key>
  490. bool em_pqueue<T,Key>::fillpq() {
  491. #ifndef NDEBUG
  492. {
  493. int k=0;
  494. for (unsigned short i=0; i<crt_buf; i++) {
  495. k |= buff[i]->get_buf_len();
  496. }
  497. if(!k) {
  498. cerr << "fillpq called with empty external buff!" << endl;
  499. }
  500. assert(k);
  501. }
  502. #endif
  503. #ifdef EMPQ_PQ_FILL_PRINT
  504. cout << "filling pq\n"; cout .flush();
  505. #endif
  506. XXX cerr << "filling pq" << endl;
  507. MY_LOG_DEBUG_ID("fillpq");
  508. AMI_err ae;
  509. {
  510. char str[200];
  511. sprintf(str, "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
  512. crt_buf);
  513. MEMORY_LOG(str);
  514. }
  515. //merge pqsize smallest elements from each buffer into a new stream
  516. ExtendedMergeStream** outstreams;
  517. outstreams = new ExtendedMergeStream* [crt_buf];
  518. /* gets stream of smallest pqsize elts from each level */
  519. for (unsigned short i=0; i< crt_buf; i++) {
  520. MY_LOG_DEBUG_ID(crt_buf);
  521. outstreams[i] = new ExtendedMergeStream();
  522. assert(buff[i]->get_buf_len());
  523. ae = merge_buffer(buff[i], outstreams[i], pqsize);
  524. assert(ae == AMI_ERROR_NO_ERROR);
  525. assert(outstreams[i]->stream_len());
  526. //print_stream(outstreams[i]); cout.flush();
  527. }
  528. /* merge above streams into pqsize elts in minstream */
  529. if (crt_buf == 1) {
  530. //just one level; make common case faster :)
  531. merge_bufs2pq(outstreams[0]);
  532. delete outstreams[0];
  533. delete [] outstreams;
  534. } else {
  535. //merge the outstreams to get the global mins and delete them afterwards
  536. ExtendedMergeStream *minstream = new ExtendedMergeStream();
  537. //cout << "merging streams\n";
  538. ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
  539. assert(ae == AMI_ERROR_NO_ERROR);
  540. for (int i=0; i< crt_buf; i++) {
  541. delete outstreams[i];
  542. }
  543. delete [] outstreams;
  544. //copy the minstream in the internal pqueue while merging with buff_0;
  545. //the smallest <pqsize> elements between minstream and buff_0 will be
  546. //inserted in internal pqueue;
  547. //also, the elements from minstram which are inserted in pqueue must be
  548. //marked as deleted in the source streams;
  549. merge_bufs2pq(minstream);
  550. delete minstream; minstream = NULL;
  551. //cout << "after merge_bufs2pq: \n" << *this << "\n";
  552. }
  553. XXX assert(pq->size());
  554. XXX cerr << "fillpq done" << endl;
  555. return true;
  556. }
  557. //************************************************************/
  558. //return the element with minimum priority in the structure
  559. template<class T, class Key>
  560. bool
  561. em_pqueue<T,Key>::min(T& elt){
  562. bool ok;
  563. MY_LOG_DEBUG_ID("em_pqueue::min");
  564. //try first the internal pqueue
  565. if (!pq->empty()) {
  566. ok = pq->min(elt);
  567. assert(ok);
  568. return ok;
  569. }
  570. MY_LOG_DEBUG_ID("extract_min: reset pq");
  571. pq->reset();
  572. //if external buffers are empty
  573. if (crt_buf == 0) {
  574. //if internal buffer is empty too, then nothing to extract
  575. if (buff_0->is_empty()) {
  576. //cerr << "min called on empty empq" << endl;
  577. return false;
  578. } else {
  579. #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
  580. cout << "filling pq from B0\n"; cout.flush();
  581. #endif
  582. //ext. buffs empty; fill int pqueue from buff_0
  583. long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
  584. buff_0->reset(pqsize, n);
  585. ok = pq->min(elt);
  586. assert(ok);
  587. return true;
  588. }
  589. } else {
  590. //external buffers are not empty;
  591. //called when pq must be filled from external buffers
  592. XXX print_size();
  593. fillpq();
  594. XXX cerr << "fillpq done; about to take min" << endl;
  595. ok = pq->min(elt);
  596. XXX cerr << "after taking min" << endl;
  597. assert(ok);
  598. return ok;
  599. } //end of else (if ext buffers are not empty)
  600. assert(0); //not reached
  601. }
  602. //************************************************************/
  603. template<class T,class Key>
  604. static void print_ExtendedMergeStream(ExtendedMergeStream &str) {
  605. ExtendedEltMergeType<T,Key> *x;
  606. str.seek(0);
  607. while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
  608. cout << *x << ", ";
  609. }
  610. cout << "\n";
  611. }
  612. //************************************************************/
  613. //delete the element with minimum priority in the structure;
  614. //return false if pq is empty
  615. template<class T, class Key>
  616. bool em_pqueue<T,Key>::extract_min(T& elt) {
  617. bool ok;
  618. MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
  619. //try first the internal pqueue
  620. if (!pq->empty()) {
  621. //cout << "extract from internal pq\n";
  622. MY_LOG_DEBUG_ID("extract from internal pq");
  623. ok = pq->extract_min(elt);
  624. assert(ok);
  625. return ok;
  626. }
  627. //if internal pqueue is empty, fill it up
  628. MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
  629. MY_LOG_DEBUG_ID("extract_min: reset pq");
  630. pq->reset();
  631. //if external buffers are empty
  632. if (crt_buf == 0) {
  633. //if internal buffer is empty too, then nothing to extract
  634. if (buff_0->is_empty()) {
  635. return false;
  636. } else {
  637. #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
  638. cout << "filling pq from B0\n"; cout.flush();
  639. #endif
  640. MY_LOG_DEBUG_ID("filling pq from buff_0");
  641. //ext. buffs empty; fill int pqueue from buff_0
  642. long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
  643. buff_0->reset(pqsize, n);
  644. ok = pq->extract_min(elt);
  645. assert(ok);
  646. return true;
  647. }
  648. } else {
  649. //external buffers are not empty;
  650. //called when pq must be filled from external buffers
  651. MY_LOG_DEBUG_ID("filling pq from buffers");
  652. #ifdef EMPQ_PRINT_SIZE
  653. long x = size(), y;
  654. y = x*sizeof(T) >> 20;
  655. cout << "pqsize:[" << active_streams() << " streams: ";
  656. print_stream_sizes();
  657. cout << " total " << x << "(" << y << "MB)]" << endl;
  658. cout.flush();
  659. #endif
  660. fillpq();
  661. MY_LOG_DEBUG_ID("pq filled");
  662. XXX cerr << "about to get the min" << endl;
  663. assert(pq);
  664. ok = pq->extract_min(elt);
  665. if (!ok) {
  666. cout << "failing assertion: pq->extract_min == true\n";
  667. this->print();
  668. assert(ok);
  669. }
  670. return ok;
  671. } //end of else (if ext buffers are not empty)
  672. assert(0); //not reached
  673. }
  674. //************************************************************/
  675. //extract all elts with min key, add them and return their sum
  676. //delete the element with minimum priority in the structure;
  677. //return false if pq is empty
  678. template<class T, class Key>
  679. bool em_pqueue<T,Key>::extract_all_min(T& elt) {
  680. //cout << "em_pqueue::extract_min_all(T): sorry not implemented\n";
  681. //exit(1);
  682. T next_elt;
  683. bool done = false;
  684. MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_ALL_MIN");
  685. //extract first elt
  686. if (!extract_min(elt)) {
  687. return false;
  688. } else {
  689. while (!done) {
  690. //peek at the next min elt to see if matches
  691. if ((!min(next_elt)) ||
  692. !(next_elt.getPriority() == elt.getPriority())) {
  693. done = true;
  694. } else {
  695. extract_min(next_elt);
  696. elt = elt + next_elt;
  697. MY_LOG_DEBUG_ID("EXTRACT_ALL_MIN: adding " );
  698. MY_LOG_DEBUG_ID(elt);
  699. }
  700. }
  701. }
  702. #ifdef EMPQ_PRINT_EXTRACTALL
  703. cout << "EXTRACTED: " << elt << endl; cout.flush();
  704. #endif
  705. #ifdef EMPQ_PRINT_EMPQ
  706. this->print();
  707. cout << endl;
  708. #endif
  709. return true;
  710. }
  711. //************************************************************/
  712. //copy the minstream in the internal pqueue while merging with buff_0;
  713. //the smallest <pqsize> elements between minstream and buff_0 will be
  714. //inserted in internal pqueue;
  715. //also, the elements from minstram which are inserted in pqueue must be
  716. //marked as deleted in the source streams;
  717. template<class T, class Key>
  718. void em_pqueue<T,Key>::merge_bufs2pq(ExtendedMergeStream *minstream) {
  719. //cout << "bufs2pq: \nminstream: "; print_stream(minstream);
  720. MY_LOG_DEBUG_ID("merge_bufs2pq: enter");
  721. AMI_err ae;
  722. //sort the internal buffer
  723. buff_0->sort();
  724. //cout << "bufs2pq: \nbuff0: " << *buff_0 << endl;
  725. ae = minstream->seek(0); //rewind minstream
  726. assert(ae == AMI_ERROR_NO_ERROR);
  727. bool strEmpty= false, bufEmpty=false;
  728. unsigned int bufPos = 0;
  729. ExtendedEltMergeType<T,Key> *strItem;
  730. T bufElt, strElt;
  731. ae = minstream->read_item(&strItem);
  732. if (ae == AMI_ERROR_END_OF_STREAM) {
  733. strEmpty = true;
  734. } else {
  735. assert(ae == AMI_ERROR_NO_ERROR);
  736. }
  737. if (bufPos < buff_0->get_buf_len()) {
  738. bufElt = buff_0->get_item(bufPos);
  739. } else {
  740. //cout << "buff0 empty\n";
  741. bufEmpty = true;
  742. }
  743. XXX cerr << "pqsize=" << pqsize << endl;
  744. XXX if(strEmpty) cerr << "stream is empty!!" << endl;
  745. for (unsigned int i=0; i< pqsize; i++) {
  746. if (!bufEmpty) {
  747. if ((!strEmpty) && (strElt = strItem->elt(),
  748. bufElt.getPriority() > strElt.getPriority())) {
  749. delete_str_elt(strItem->buffer_id(), strItem->stream_id());
  750. pq->insert(strElt);
  751. ae = minstream->read_item(&strItem);
  752. if (ae == AMI_ERROR_END_OF_STREAM) {
  753. strEmpty = true;
  754. } else {
  755. assert(ae == AMI_ERROR_NO_ERROR);
  756. }
  757. } else {
  758. bufPos++;
  759. pq->insert(bufElt);
  760. if (bufPos < buff_0->get_buf_len()) {
  761. bufElt = buff_0->get_item(bufPos);
  762. } else {
  763. bufEmpty = true;
  764. }
  765. }
  766. } else {
  767. if (!strEmpty) { //stream not empty
  768. strElt = strItem->elt();
  769. //cout << "i=" << i << "str & buff empty\n";
  770. delete_str_elt(strItem->buffer_id(), strItem->stream_id());
  771. pq->insert(strElt);
  772. //cout << "insert " << strElt << "\n";
  773. ae = minstream->read_item(&strItem);
  774. if (ae == AMI_ERROR_END_OF_STREAM) {
  775. strEmpty = true;
  776. } else {
  777. assert(ae == AMI_ERROR_NO_ERROR);
  778. }
  779. } else { //both empty: < pqsize items
  780. break;
  781. }
  782. }
  783. }
  784. //shift left buff_0 in case elements were deleted from the beginning
  785. buff_0->shift_left(bufPos);
  786. MY_LOG_DEBUG_ID("pq filled");
  787. #ifdef EMPQ_PQ_FILL_PRINT
  788. cout << "merge_bufs2pq: pq filled; now cleaning\n"; cout .flush();
  789. #endif
  790. //this->print();
  791. //clean buffers in case some streams have been emptied
  792. cleanup();
  793. MY_LOG_DEBUG_ID("merge_bufs2pq: done");
  794. }
  795. //************************************************************/
  796. //deletes one element from <buffer, stream>
  797. template<class T, class Key>
  798. void em_pqueue<T,Key>::delete_str_elt(unsigned short buf_id,
  799. unsigned int stream_id) {
  800. //check them
  801. assert(buf_id < crt_buf);
  802. assert(stream_id < buff[buf_id]->get_nbstreams());
  803. //update;
  804. buff[buf_id]->incr_deleted(stream_id);
  805. }
  806. //************************************************************/
  807. //clean buffers in case some streams have been emptied
  808. template<class T, class Key>
  809. void em_pqueue<T,Key>::cleanup() {
  810. MY_LOG_DEBUG_ID("em_pqueue::cleanup()");
  811. #ifdef EMPQ_PQ_FILL_PRINT
  812. cout << "em_pqueue: cleanup enter\n"; cout .flush();
  813. #endif
  814. //adjust buffers in case whole streams got deleted
  815. for (unsigned short i=0; i< crt_buf; i++) {
  816. //cout << "clean buffer " << i << ": "; cout.flush();
  817. buff[i]->cleanup();
  818. }
  819. if (crt_buf) {
  820. short i = crt_buf-1;
  821. while ((i>=0) && buff[i]->is_empty()) {
  822. crt_buf--;
  823. i--;
  824. }
  825. }
  826. #ifdef EMPQ_PQ_FILL_PRINT
  827. cout << "em_pqueue: cleanup done\n"; cout .flush();
  828. #endif
  829. //if a stream becomes too short move it on previous level
  830. //to be added..
  831. //cout <<"done cleaning up\n";
  832. }
  833. //************************************************************/
  834. //insert an element; return false if insertion fails
  835. template<class T, class Key>
  836. bool em_pqueue<T,Key>::insert(const T& x) {
  837. bool ok;
  838. #ifdef EMPQ_ASSERT_EXPENSIVE
  839. long init_size = size();
  840. #endif
  841. T val = x;
  842. MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
  843. //if structure is empty insert x in pq; not worth the trouble..
  844. if ((crt_buf == 0) && (buff_0->is_empty())) {
  845. if (!pq->full()) {
  846. MY_LOG_DEBUG_ID("insert in pq");
  847. pq->insert(x);
  848. return true;
  849. }
  850. }
  851. if (!pq->empty()) {
  852. T pqmax;
  853. ok = pq->max(pqmax);
  854. assert(ok);
  855. // cout << "insert " << x << " max: " << pqmax << "\n";
  856. if (x <= pqmax) {
  857. //if x is smaller then pq_max and pq not full, insert x in pq
  858. if (!pq->full()) {
  859. #ifdef EMPQ_ASSERT_EXPENSIVE
  860. assert(size() == init_size);
  861. #endif
  862. pq->insert(x);
  863. return true;
  864. } else {
  865. //if x is smaller then pq_max and pq full, exchange x with pq_max
  866. pq->extract_max(val);
  867. pq->insert(x);
  868. //cout << "max is: " << val << endl;
  869. }
  870. }
  871. }
  872. /* at this point, x >= pqmax.
  873. we need to insert val==x or val==old max.
  874. */
  875. //if buff_0 full, empty it
  876. #ifdef EMPQ_ASSERT_EXPENSIVE
  877. assert(size() == init_size);
  878. #endif
  879. if (buff_0->is_full()) {
  880. #ifdef EMPQ_PRINT_SIZE
  881. long x = size(), y;
  882. y = x*sizeof(T) >> 20;
  883. cout << "pqsize:[" << active_streams() << " streams: ";
  884. print_stream_sizes();
  885. cout << " total " << x << "(" << y << "MB)]" << endl;
  886. cout.flush();
  887. #endif
  888. empty_buff_0();
  889. }
  890. #ifdef EMPQ_ASSERT_EXPENSIVE
  891. assert(size() == init_size);
  892. #endif
  893. //insert x in buff_0
  894. assert(!buff_0->is_full());
  895. MY_LOG_DEBUG_ID("insert in buff_0");
  896. ok = buff_0->insert(val);
  897. assert(ok);
  898. #ifdef EMPQ_PRINT_INSERT
  899. cout << "INSERTED: " << x << endl; cout.flush();
  900. #endif
  901. #ifdef EMPQ_PRINT_EMPQ
  902. this->print();
  903. cout << endl;
  904. #endif
  905. MY_LOG_DEBUG_ID("EM_PQUEUE: INSERTED");
  906. return true;
  907. }
  908. //************************************************************/
  909. /* called when buff_0 is full to empty it on external level_1 buffer;
  910. can produce cascading emptying
  911. */
  912. template<class T, class Key> bool
  913. em_pqueue<T,Key>::empty_buff_0() {
  914. #ifdef EMPQ_ASSERT_EXPENSIVE
  915. long init_size = size();
  916. #endif
  917. #ifdef EMPQ_EMPTY_BUF_PRINT
  918. cout << "emptying buff_0\n"; cout.flush();
  919. print_size();
  920. #endif
  921. MY_LOG_DEBUG_ID("empty buff 0");
  922. assert(buff_0->is_full());
  923. //sort the buffer
  924. buff_0->sort();
  925. //cout << "sorted buff_0: \n" << *buff_0 << "\n";
  926. #ifdef EMPQ_ASSERT_EXPENSIVE
  927. assert(size() == init_size);
  928. #endif
  929. //allocate buffer if necessary
  930. if (!buff[0]) { // XXX should check crt_buf
  931. //create buff[0] as a level1 buffer
  932. MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
  933. buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
  934. }
  935. //check that buff_0 fills exactly a stream of buff[0]
  936. assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
  937. //save buff_0 to stream
  938. MY_LOG_DEBUG_ID("empty buff_0 to stream");
  939. AMI_STREAM<T>* buff_0_str = buff_0->save2str();
  940. assert(buff_0_str);
  941. //MY_LOG_DEBUG_ID("buff_0 emptied");
  942. //reset buff_0
  943. buff_0->reset();
  944. MY_LOG_DEBUG_ID("buf_0 now reset");
  945. #ifdef EMPQ_ASSERT_EXPENSIVE
  946. assert(size() + buff_0->maxlen() == init_size);
  947. #endif
  948. //copy data from buff_0 to buff[0]
  949. if (buff[0]->is_full()) {
  950. //if buff[0] full, empty it recursively
  951. empty_buff(0);
  952. }
  953. buff[0]->insert(buff_0_str);
  954. MY_LOG_DEBUG_ID("stream inserted in buff[0]");
  955. //update the crt_buf pointer if necessary
  956. if (crt_buf == 0) crt_buf = 1;
  957. #ifdef EMPQ_ASSERT_EXPENSIVE
  958. assert(size() == init_size);
  959. #endif
  960. return true;
  961. }
  962. //************************************************************/
  963. /* sort and empty buff[i] into buffer[i+1] recursively; called
  964. by empty_buff_0() to empty subsequent buffers; i must
  965. be a valid (i<crt_buf) full buffer;
  966. */
  967. template<class T, class Key>
  968. void
  969. em_pqueue<T,Key>::empty_buff(unsigned short i) {
  970. #ifdef EMPQ_ASSERT_EXPENSIVE
  971. long init_size = size();
  972. #endif
  973. #ifdef EMPQ_EMPTY_BUF_PRINT
  974. cout << "emptying buffer_" << i << "\n"; cout.flush();
  975. print_size();
  976. #endif
  977. MY_LOG_DEBUG_ID("empty buff ");
  978. MY_LOG_DEBUG_ID(i);
  979. //i must be a valid, full buffer
  980. assert(i<crt_buf);
  981. assert(buff[i]->is_full());
  982. //check there is space to empty to
  983. if (i == max_nbuf-1) {
  984. cerr << "empty_buff:: cannot empty further - structure is full..\n";
  985. print_size();
  986. cerr << "ext buff array should reallocate in a future version..\n";
  987. exit(1);
  988. }
  989. //create next buffer if necessary
  990. if (!buff[i+1]) {
  991. //create buff[i+1] as a level-(i+2) buffer
  992. char str[200];
  993. sprintf(str, "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
  994. MEMORY_LOG(str);
  995. buff[i+1] = new em_buffer<T,Key>(i+2, bufsize, buf_arity);
  996. }
  997. assert(buff[i+1]);
  998. //check that buff[i] fills exactly a stream of buff[i+1];
  999. //extraneous (its checked in insert)
  1000. //assert(buff[i]->len() == buff[i+1]->streamlen());
  1001. //sort the buffer into a new stream
  1002. MY_LOG_DEBUG_ID("sort buffer ");
  1003. AMI_STREAM<T>* sorted_buf = buff[i]->sort();
  1004. //assert(sorted_buf->stream_len() == buff[i]->len());
  1005. //this is just for debugging
  1006. if (sorted_buf->stream_len() != buff[i]->get_buf_len()) {
  1007. cout << "sorted_stream_len: " << sorted_buf->stream_len()
  1008. << " , bufflen: " << buff[i]->get_buf_len() << endl; cout.flush();
  1009. AMI_err ae;
  1010. ae = sorted_buf->seek(0);
  1011. assert(ae == AMI_ERROR_NO_ERROR);
  1012. T *x;
  1013. while (sorted_buf->read_item(&x) == AMI_ERROR_NO_ERROR) {
  1014. cout << *x << ", "; cout.flush();
  1015. }
  1016. cout << "\n";
  1017. #ifdef EMPQ_ASSERT_EXPENSIVE
  1018. assert(sorted_buf->stream_len() == buff[i]->len());
  1019. #endif
  1020. }
  1021. #ifdef EMPQ_ASSERT_EXPENSIVE
  1022. assert(size() == init_size);
  1023. #endif
  1024. //reset buff[i] (delete all its streams )
  1025. buff[i]->reset();
  1026. #ifdef EMPQ_ASSERT_EXPENSIVE
  1027. assert(size() == init_size - sorted_buf->stream_len());
  1028. #endif
  1029. //link sorted buff[i] as a substream into buff[i+1];
  1030. //sorted_buf is a new stream, so it starts out with 0 deleted elements;
  1031. //of ocurse, its length might be smaller than nominal;
  1032. if (buff[i+1]->is_full()) {
  1033. empty_buff(i+1);
  1034. }
  1035. buff[i+1]->insert(sorted_buf, 0);
  1036. //update the crt_buf pointer if necessary
  1037. if (crt_buf < i+2) crt_buf = i+2;
  1038. #ifdef EMPQ_ASSERT_EXPENSIVE
  1039. assert(size() == init_size);
  1040. #endif
  1041. }
  1042. //************************************************************/
  1043. /* merge the first <K> elements of the streams of input buffer,
  1044. starting at position <buf.deleted[i]> in each stream; there are
  1045. <buf.arity> streams in total; write output in <outstream>; the
  1046. items written in outstream are of type <merge_output_type> which
  1047. extends T with the stream nb and buffer nb the item comes from;
  1048. this information is needed later to distribute items back; do not
  1049. delete the K merged elements from the input streams; <bufid> is the
  1050. id of the buffer whose streams are being merged;
  1051. the input streams are assumed sorted in increasing order of keys;
  1052. */
  1053. template<class T, class Key>
  1054. AMI_err
  1055. em_pqueue<T,Key>::merge_buffer(em_buffer<T,Key> *buf,
  1056. ExtendedMergeStream *outstream, long K) {
  1057. long* bos = buf->get_bos();
  1058. /* buff[0] is a level-1 buffer and so on */
  1059. unsigned short bufid = buf->get_level() - 1;
  1060. /* Pointers to current leading elements of streams */
  1061. unsigned int arity = buf->get_nbstreams();
  1062. AMI_STREAM<T>** instreams = buf->get_streams();
  1063. std::vector<T*> in_objects(arity);
  1064. AMI_err ami_err;
  1065. unsigned int i, j;
  1066. MY_LOG_DEBUG_ID("merge_buffer ");
  1067. MY_LOG_DEBUG_ID(buf->get_level());
  1068. assert(outstream);
  1069. assert(instreams);
  1070. assert(buf->get_buf_len());
  1071. assert(K>0);
  1072. //array initialized with first key from each stream (only non-null keys
  1073. //must be included)
  1074. MEMORY_LOG("em_pqueue::merge_buffer: allocate keys array\n");
  1075. merge_key<Key>* keys = new merge_key<Key> [arity];
  1076. /* count number of non-empty runs */
  1077. j = 0;
  1078. /* rewind and read the first item from every stream */
  1079. for (i = 0; i < arity ; i++ ) {
  1080. assert(instreams[i]);
  1081. //rewind stream
  1082. if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
  1083. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1084. return ami_err;
  1085. }
  1086. /* read first item */
  1087. ami_err = instreams[i]->read_item(&(in_objects[i]));
  1088. switch(ami_err) {
  1089. case AMI_ERROR_END_OF_STREAM:
  1090. in_objects[i] = NULL;
  1091. break;
  1092. case AMI_ERROR_NO_ERROR:
  1093. //cout << "stream " << i << " read " << *in_objects[i] << "\n";
  1094. //cout.flush();
  1095. // include this key in the array of keys
  1096. keys[j] = merge_key<Key>(in_objects[i]->getPriority(), i);
  1097. // cout << "key " << j << "set to " << keys[j] << "\n";
  1098. j++;
  1099. break;
  1100. default:
  1101. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1102. return ami_err;
  1103. }
  1104. }
  1105. unsigned int NonEmptyRuns = j;
  1106. // cout << "nonempyruns = " << NonEmptyRuns << "\n";
  1107. //build heap from the array of keys
  1108. pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
  1109. //cout << "heap is : " << mergeheap << "\n";
  1110. //repeatedly extract_min from heap and insert next item from same stream
  1111. long extracted = 0;
  1112. //rewind output buffer
  1113. ami_err = outstream->seek(0);
  1114. assert(ami_err == AMI_ERROR_NO_ERROR);
  1115. ExtendedEltMergeType<T,Key> out;
  1116. while (!mergeheap.empty() && (extracted < K)) {
  1117. //find min key and id of stream it comes from
  1118. i = mergeheap.min().stream_id();
  1119. //write min item to output stream
  1120. out = ExtendedEltMergeType<T,Key>(*in_objects[i], bufid, i);
  1121. if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
  1122. cerr << "WARNING!!! EARLY EXIT!!!" << endl;
  1123. return ami_err;
  1124. }
  1125. //cout << "wrote " << out << "\n";
  1126. extracted++; //update nb of extracted elements
  1127. //read next item from same input stream
  1128. ami_err = instreams[i]->read_item(&(in_objects[i]));
  1129. switch(ami_err) {
  1130. case AMI_ERROR_END_OF_STREAM:
  1131. mergeheap.delete_min();
  1132. break;
  1133. case AMI_ERROR_NO_ERROR:
  1134. //extract the min from the heap and insert next key from the
  1135. //same stream
  1136. {
  1137. Key k = in_objects[i]->getPriority();
  1138. mergeheap.delete_min_and_insert(merge_key<Key>(k, i));
  1139. }
  1140. break;
  1141. default:
  1142. cerr << "WARNING!!! early breakout!!!" << endl;
  1143. return ami_err;
  1144. }
  1145. //cout << "PQ: " << mergeheap << "\n";
  1146. } //while
  1147. //delete [] keys;
  1148. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  1149. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
  1150. //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
  1151. //DESTRUCTOR, AND EVERYTHING SCREWS UP..
  1152. buf->put_streams();
  1153. MY_LOG_DEBUG_ID("merge_buffer: done");
  1154. //cout << "done merging buffer\n";
  1155. assert(extracted == outstream->stream_len());
  1156. assert(extracted); // something in, something out
  1157. return AMI_ERROR_NO_ERROR;
  1158. }
  1159. //************************************************************/
  1160. /* merge the first <K> elements of the input streams; there are <arity>
  1161. streams in total; write output in <outstream>;
  1162. the input streams are assumed sorted in increasing order of their
  1163. keys;
  1164. */
  1165. template<class T, class Key>
  1166. AMI_err
  1167. em_pqueue<T,Key>::merge_streams(ExtendedMergeStream** instreams,
  1168. unsigned short arity,
  1169. ExtendedMergeStream *outstream, long K) {
  1170. MY_LOG_DEBUG_ID("enter merge_streams");
  1171. assert(arity> 1);
  1172. //Pointers to current leading elements of streams
  1173. std::vector<ExtendedEltMergeType<T,Key> > in_objects(arity);
  1174. AMI_err ami_err;
  1175. //unsigned int i;
  1176. unsigned int nonEmptyRuns=0; //count number of non-empty runs
  1177. //array initialized with first element from each stream (only non-null keys
  1178. //must be included)
  1179. MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
  1180. merge_key<Key>* keys = new merge_key<Key> [arity];
  1181. assert(keys);
  1182. //rewind and read the first item from every stream
  1183. for (int i = 0; i < arity ; i++ ) {
  1184. //rewind stream
  1185. if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
  1186. return ami_err;
  1187. }
  1188. //read first item
  1189. ExtendedEltMergeType<T,Key> *objp;
  1190. ami_err = instreams[i]->read_item(&objp);
  1191. switch(ami_err) {
  1192. case AMI_ERROR_NO_ERROR:
  1193. in_objects[i] = *objp;
  1194. keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
  1195. nonEmptyRuns++;
  1196. break;
  1197. case AMI_ERROR_END_OF_STREAM:
  1198. break;
  1199. default:
  1200. return ami_err;
  1201. }
  1202. }
  1203. assert(nonEmptyRuns <= arity);
  1204. //build heap from the array of keys
  1205. pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns); /* takes ownership of keys */
  1206. //repeatedly extract_min from heap and insert next item from same stream
  1207. long extracted = 0;
  1208. //rewind output buffer
  1209. ami_err = outstream->seek(0);
  1210. assert(ami_err == AMI_ERROR_NO_ERROR);
  1211. while (!mergeheap.empty() && (extracted < K)) {
  1212. //find min key and id of stream it comes from
  1213. int id = mergeheap.min().stream_id();
  1214. //write min item to output stream
  1215. assert(id < nonEmptyRuns);
  1216. assert(id >= 0);
  1217. assert(mergeheap.size() == nonEmptyRuns);
  1218. ExtendedEltMergeType<T,Key> obj = in_objects[id];
  1219. if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
  1220. return ami_err;
  1221. }
  1222. //cout << "wrote " << *in_objects[i] << "\n";
  1223. //extract the min from the heap and insert next key from same stream
  1224. assert(id < nonEmptyRuns);
  1225. assert(id >= 0);
  1226. ExtendedEltMergeType<T,Key> *objp;
  1227. ami_err = instreams[id]->read_item(&objp);
  1228. switch(ami_err) {
  1229. case AMI_ERROR_NO_ERROR:
  1230. {
  1231. in_objects[id] = *objp;
  1232. merge_key<Key> tmp = merge_key<Key>(in_objects[id].getPriority(), id);
  1233. mergeheap.delete_min_and_insert(tmp);
  1234. }
  1235. extracted++; //update nb of extracted elements
  1236. break;
  1237. case AMI_ERROR_END_OF_STREAM:
  1238. mergeheap.delete_min();
  1239. break;
  1240. default:
  1241. return ami_err;
  1242. }
  1243. } //while
  1244. //delete [] keys;
  1245. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  1246. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
  1247. //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
  1248. //DESTRUCTOR, AND EVERYTHING SCREWS UP..
  1249. MY_LOG_DEBUG_ID("merge_streams: done");
  1250. return AMI_ERROR_NO_ERROR;
  1251. }
  1252. //************************************************************/
  1253. template<class T, class Key>
  1254. void
  1255. em_pqueue<T,Key>::clear() {
  1256. pq->clear();
  1257. buff_0->clear();
  1258. for(int i=0; i<crt_buf; i++) {
  1259. if(buff[i]) {
  1260. delete buff[i]; buff[i] = NULL;
  1261. }
  1262. }
  1263. crt_buf = 0;
  1264. }
  1265. //************************************************************/
  1266. template<class T, class Key>
  1267. void
  1268. em_pqueue<T,Key>::print_range() {
  1269. cout << "EM_PQ: [pq=" << pqsize
  1270. << ", b=" << bufsize
  1271. << ", bufs=" << max_nbuf
  1272. << ", ar=" << buf_arity << "]\n";
  1273. cout << "PQ: ";
  1274. //pq->print_range();
  1275. pq->print();
  1276. cout << endl;
  1277. cout << "B0: ";
  1278. // buff_0->print_range();
  1279. buff_0->print();
  1280. cout << "\n";
  1281. for (unsigned short i=0; i < crt_buf; i++) {
  1282. cout << "B" << i+1 << ": ";
  1283. buff[i]->print_range();
  1284. cout << endl;
  1285. }
  1286. cout.flush();
  1287. }
  1288. //************************************************************/
  1289. template<class T, class Key>
  1290. void
  1291. em_pqueue<T,Key>::print() {
  1292. cout << "EM_PQ: [pq=" << pqsize
  1293. << ", b=" << bufsize
  1294. << ", bufs=" << max_nbuf
  1295. << ", ar=" << buf_arity << "]\n";
  1296. cout << "PQ: ";
  1297. pq->print();
  1298. cout << endl;
  1299. cout << "B0: ";
  1300. buff_0->print();
  1301. cout << "\n";
  1302. for (unsigned short i=0; i < crt_buf; i++) {
  1303. cout << "B" << i+1 << ": " << endl;
  1304. buff[i]->print();
  1305. cout << endl;
  1306. }
  1307. cout.flush();
  1308. }
  1309. //************************************************************/
  1310. template<class T, class Key>
  1311. void em_pqueue<T,Key>::print_size() {
  1312. //sum up the lengths(nb of elements) of the external buffers
  1313. long elen = 0;
  1314. cout << "EMPQ: pq=" << pq->size() <<",B0=" << buff_0->get_buf_len() << endl;
  1315. cout.flush();
  1316. for (unsigned short i=0; i < crt_buf; i++) {
  1317. assert(buff[i]);
  1318. cout << "B_" << i+1 << ":"; cout.flush();
  1319. buff[i]->print_stream_sizes();
  1320. elen += buff[i]->get_buf_len();
  1321. //cout << endl; cout.flush();
  1322. }
  1323. cout << "total: " << elen + pq->size() + buff_0->get_buf_len() << endl << endl;
  1324. cout.flush();
  1325. }
  1326. /*****************************************************************/
  1327. template<class T,class Key>
  1328. void em_pqueue<T,Key>::print_stream_sizes() {
  1329. for (unsigned short i=0; i< crt_buf; i++) {
  1330. cout << "[";
  1331. buff[i]->print_stream_sizes();
  1332. cout << "]";
  1333. }
  1334. cout.flush();
  1335. }
  1336. #undef XXX
  1337. #endif