embuffer.h 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef __EMBUFFER_H
  31. #define __EMBUFFER_H
  32. #include <stdio.h>
  33. #include <assert.h>
  34. #include <stdlib.h>
  35. #include <math.h>
  36. #include "ami_config.h" //for SAVE_MEMORY
  37. #include "ami_stream.h"
  38. #include "mm.h"
  39. #include "mm_utils.h"
  40. #include "pqheap.h"
  41. #define MY_LOG_DEBUG_ID(x) //inhibit debug printing
  42. //#define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
  43. /**********************************************************
  44. DEBUGGING FLAGS
  45. ***********************************************************/
  46. //setting this enables checking that the streams/arrays inserted in
  47. //buffers are sorted in increasing order
  48. //#define EMBUF_CHECK_INSERT
  49. //enable checking that stream name is the same as the one stored in
  50. //the buffer name[]
  51. //#define EMBUF_CHECK_NAME
  52. //enable printing names as they are checked
  53. //#define EMBUF_CHECK_NAME_PRINT
  54. //enable printing when streams in a buffer are shifted left to
  55. //check that names are shifted accordingly
  56. //#define EMBUF_DELETE_STREAM_PRINT
  57. //enable printing the name of the stream which is inserted in buff
  58. //#define EMBUF_PRINT_INSERT
  59. //enable printing the stream names/sizes in cleanup()
  60. //#define EMBUF_CLEANUP_PRINT
  61. //enable printing when get/put_stream is called (for each stream)
  62. //#define EMBUF_PRINT_GETPUT_STREAM
  63. //enable printing when get/put_streams is called
  64. //#define EMBUF_PRINT_GETPUT_STREAMS
  65. /***********************************************************/
  66. /*****************************************************************/
  67. /* encapsulation of the key together with stream_id; used during
  68. stream merging to save space;
  69. */
  70. template<class KEY>
  71. class merge_key {
  72. public:
  73. KEY k;
  74. unsigned int str_id; //id of the stream where key comes from
  75. public:
  76. merge_key(): str_id(0) {}
  77. merge_key(const KEY &x, const unsigned int sid):
  78. k(x), str_id(sid) {}
  79. ~merge_key() {}
  80. void set(const KEY &x, const unsigned int sid) {
  81. k = x;
  82. str_id = sid;
  83. }
  84. KEY key() const {
  85. return k;
  86. }
  87. unsigned int stream_id() const {
  88. return str_id;
  89. }
  90. KEY getPriority() const {
  91. return k;
  92. }
  93. friend ostream& operator<<(ostream& s, const merge_key<KEY> &x) {
  94. return s << "<str_id=" << x.str_id << "> " << x.k << " ";
  95. }
  96. friend int operator < (const merge_key &x,
  97. const merge_key &y) {
  98. return (x.k < y.k);
  99. }
  100. friend int operator <= (const merge_key &x,
  101. const merge_key &y) {
  102. return (x.k <= y.k);
  103. }
  104. friend int operator > (const merge_key &x,
  105. const merge_key &y) {
  106. return (x.k > y.k);
  107. }
  108. friend int operator >= (const merge_key &x,
  109. const merge_key &y) {
  110. return (x.k >= y.k);
  111. }
  112. friend int operator != (const merge_key &x,
  113. const merge_key &y) {
  114. return (x.k != y.k);
  115. }
  116. friend int operator == (const merge_key &x,
  117. const merge_key &y) {
  118. return (x.k == y.k);
  119. }
  120. friend merge_key operator + (const merge_key &x,
  121. const merge_key &y) {
  122. assert(0);
  123. return x;
  124. // Key sum = x.k + y.k;
  125. // merge_key f(sum, x.str_id);
  126. // return f;
  127. }
  128. };
  129. /*****************************************************************
  130. *****************************************************************
  131. *****************************************************************
  132. external_memory buffer
  133. Each level-i buffer can store up to <arity>^i * <basesize> items,
where typically <arity> is \theta(m) and <basesize> is \theta(M);
  135. therefore log_m{n/m} buffers are needed to store N items, one
buffer for each level 1..log_m{n/m}. All buffers must have the same
values of <arity> and <basesize>.
  138. Functionality:
A level-i on-disk buffer stores <arity>^i * <basesize> items of
data, organized in <arity> streams of <arity>^{i-1} * <basesize> items each;
  141. <basesize> is same for all buffers and equal to the size of the
  142. level 0 buffer (in memory buffer).
  143. Invariant: all the <arity> streams of a level-i buffer are in
  144. sorted order; in this way sorting the buffer is done by merging the
  145. <arity> streams in linear time.
  146. Items are inserted in level i-buffer only a whole stream at a time
  147. (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
  148. the buffer are full, the buffer is sorted and emptied into a stream
  149. of a level (i+1)-buffer.
The <arity> streams of a buffer are allocated contiguously from left
to right. The unused streams are NULL; the buffer keeps the index of
the last used (non-NULL) stream. When a buffer becomes full and is
emptied, all its streams are set to NULL.
  154. *****************************************************************
  155. *****************************************************************
  156. ***************************************************************** */
  157. /* T is a type with priority of type K and method getPriority() */
  158. template<class T, class Key>
  159. class em_buffer {
  160. private:
  161. //number of streams in a buffer;
  162. unsigned int arity;
  163. //level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
  164. //has a slightly different behaviour so it is implemented as a
  165. //different class <im_buffer>)
  166. unsigned short level;
  167. //level-i buffer contains m streams of data, each of size
  168. //arity^{i-1}*basesize;
  169. AMI_STREAM<T>** data;
  170. //the buffers can be depleted to fill the internal pq;
  171. //keep an array which counts, for each stream, how many elements
  172. //have been deleted (implicitely from the begining of stream)
  173. long* deleted;
  174. //nb of items in each substream; this can be found out by calling
  175. //stream_len() on the stream, but it is more costly esp in the case
  176. //when streams are on disk and must be moved in and out just to find
  177. //stream length; streamsize is set only at stream creation, and the
  178. //actual size must substract the number of iteme deleted from the
  179. //bos
  180. unsigned long* streamsize;
  181. //index of the next available(empty) stream (out of the total m
  182. //streams in the buffer);
  183. unsigned int index;
  184. //nb of items in a stream of level_1 buffer
  185. unsigned long basesize;
  186. public:
  187. //create a level-i buffer of given basesize;
  188. em_buffer(const unsigned short i, const unsigned long bs,
  189. const unsigned int ar);
  190. //copy constructor;
  191. em_buffer(const em_buffer &buf);
  192. //free the stream array and the streams pointers
  193. ~em_buffer();
  194. //return the level of the buffer;
  195. unsigned short get_level() const { return level;}
  196. //return the ith stream (load stream in memory)
  197. AMI_STREAM<T>* get_stream(unsigned int i);
  198. //return a pointer to the streams of the buffer (loads streams in
  199. //memory)
  200. AMI_STREAM<T>** get_streams();
  201. //put the ith stream back to disk
  202. void put_stream(unsigned int i);
  203. //called in pair with get_streams to put all streams back to disk
  204. void put_streams();
  205. //return a pointer to the array of deletion count for each stream
  206. long* get_bos() const { return deleted;}
  207. //return the index of the last stream in buffer which contains data;
  208. unsigned int laststream() const { return index -1;}
  209. //return the index of the next available stream in the buffer
  210. unsigned int nextstream() const { return index;}
  211. //increment the index of the next available stream in the buffer
  212. void incr_nextstream() { ++index;}
  213. //return nb of (non-empty) streams in buffer
  214. unsigned int get_nbstreams() const { return index;}
  215. //return arity
  216. unsigned int get_arity() const { return arity;}
  217. //return total nb of deleted elements in all active streams of the buffer
  218. long total_deleted() const {
  219. long tot = 0;
  220. for (unsigned int i=0; i< index; i++) {
  221. tot += deleted[i];
  222. }
  223. return tot;
  224. }
  225. //mark as deleted one more element from i'th stream
  226. void incr_deleted(unsigned int i) {
  227. assert(i<index);
  228. deleted[i]++;
  229. }
  230. //return the nominal size of a stream (nb of items):
  231. //arity^{level-1}*basesize;
  232. unsigned long get_stream_maxlen() const {
  233. return (unsigned long)pow((double)arity,(double)level-1)*basesize;
  234. }
  235. //return the actual size of stream i; i must be the index of a valid
  236. //stream
  237. unsigned long get_stream_len(unsigned int i) {
  238. //assert(i>= 0 && i<index);
  239. return streamsize[i] - deleted[i];
  240. }
  241. //return the total current size of the buffer; account for the
  242. //deleted elements;
  243. unsigned long get_buf_len() {
  244. unsigned long tot = 0;
  245. for (unsigned int i=0; i< index; i++) {
  246. tot += get_stream_len(i);
  247. }
  248. return tot;
  249. }
  250. //return the total maximal capacity of the buffer
  251. unsigned long get_buf_maxlen() {
  252. return arity * get_stream_maxlen();
  253. }
  254. //return true if buffer is empty (all streams are empty)
  255. bool is_empty() {
  256. return ((nextstream() == 0) || (get_buf_len() == 0));
  257. }
  258. //return true if buffer is full(all streams are full)
  259. bool is_full() const {
  260. return (nextstream() == arity);
  261. }
  262. //reset
  263. void reset();
  264. //clean buffer: in case some streams have been emptied by deletion
  265. //delete them and shift streams left;
  266. void cleanup();
  267. //create and return a stream which contains all elements of all
  268. //streams of the buffer in sorted ascending order of their
  269. //keys(priorities);
  270. AMI_STREAM<T>* sort();
  271. // insert an array into the buffer; can only insert one
  272. // level-i-full-stream-len nb of items at a time; assume the length
  273. // of the array is precisely the streamlen of level-i buffer n =
  274. // (pow(arity,level-1)*basesize); assume array is sorted; return the
  275. // number of items actually inserted
  276. long insert(T* a, long n);
  277. // insert a stream into the buffer; assume the length of the stream
  278. // is precisely the streamlen of level-i buffer n =
  279. // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
  280. // is set to point to the argument stream; (in this way no stream
  281. // copying is done, just one pointer copy). The user should be aware
  282. // the the argument stream is 'lost' - that is a stream cannot be
  283. // inserted repeatedly into many buffers because this would lead to
  284. // several buffers pointing to the same stream.
  285. // stream is assumed sorted; bos = how many elements are deleted
  286. // from the begining of stream;
  287. // return the number of items actually inserted
  288. long insert(AMI_STREAM<T>* str,
  289. //long bos=0);
  290. long bos);
  291. //print range of elements in buffer
  292. void print_range();
  293. //print all elements in buffer
  294. void print();
  295. //prints the sizes of the streams in the buffer
  296. void print_stream_sizes();
  297. //print the elements in the buffer
  298. friend ostream& operator<<(ostream& s, em_buffer &b) {
  299. s << "BUFFER_" << b.level << ": ";
  300. if (b.index ==0) {
  301. s << "[]";
  302. }
  303. s << "\n";
  304. b.get_streams();
  305. for (unsigned int i=0; i < b.index; i++) {
  306. b.print_stream(s, i);
  307. }
  308. b.put_streams();
  309. return s;
  310. }
  311. private:
  312. // merge the input streams; there are <arity> streams in total;
  313. // write output in <outstream>; the input streams are assumed sorted
  314. // in increasing order of their keys;
  315. AMI_err substream_merge(AMI_STREAM<T>** instreams,
  316. unsigned int arity,
  317. AMI_STREAM<T> *outstream);
  318. //print to stream the elements in i'th stream
  319. void print_stream(ostream& s, unsigned int i);
  320. #ifdef SAVE_MEMORY
  321. //array of names of streams;
  322. char** name;
  323. //return the designated name for stream i
  324. char* get_stream_name(unsigned int i) const;
  325. //print all stream names in buffer
  326. void print_stream_names();
  327. //checks that name[i] is the same as stream name; stream i must be in
  328. //memory (by a previous get_stream call, for instance) in order to
  329. //find its length
  330. void check_name(unsigned int i);
  331. #endif
  332. };
  333. /************************************************************/
  334. //create a level-i buffer of given basesize;
  335. template <class T, class Key>
  336. em_buffer<T,Key>::em_buffer(const unsigned short i, const unsigned long bs,
  337. const unsigned int ar) :
  338. arity(ar), level(i), basesize(bs) {
  339. assert((level>=1) && (basesize >=0));
  340. char str[100];
  341. sprintf(str, "em_buffer: allocate %d AMI_STREAM*, total %ld\n",
  342. arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
  343. MEMORY_LOG(str);
  344. //allocate STREAM* array
  345. data = new AMI_STREAM<T>* [arity];
  346. //allocate deleted array
  347. sprintf(str, "em_buffer: allocate deleted array: %ld\n",
  348. (long)(arity*sizeof(long)));
  349. MEMORY_LOG(str);
  350. deleted = new long[arity];
  351. //allocate streamsize array
  352. sprintf(str, "em_buffer: allocate streamsize array: %ld\n",
  353. (long)(arity*sizeof(long)));
  354. MEMORY_LOG(str);
  355. streamsize = new unsigned long[arity];
  356. #ifdef SAVE_MEMORY
  357. //allocate name array
  358. sprintf(str, "em_buffer: allocate name array: %ld\n",
  359. (long)(arity*sizeof(char*)));
  360. MEMORY_LOG(str);
  361. name = new char* [arity];
  362. assert(name);
  363. #endif
  364. //assert data
  365. if ((!data) || (!deleted) || (!streamsize)) {
  366. cerr << "em_buffer: cannot allocate\n";
  367. exit(1);
  368. }
  369. //initialize the <arity> streams to NULL, deleted[], streamsize[]
  370. //and name[]
  371. for (unsigned int i=0; i< arity; i++) {
  372. data[i] = NULL;
  373. deleted[i] = 0;
  374. streamsize[i] = 0;
  375. #ifdef SAVE_MEMORY
  376. name[i] = NULL;
  377. #endif
  378. }
  379. //set index
  380. index = 0;
  381. #ifdef SAVE_MEMORY
  382. //streams_in_memory = false;
  383. #endif
  384. }
  385. /************************************************************/
  386. //copy constructor;
  387. template<class T, class Key>
  388. em_buffer<T,Key>::em_buffer(const em_buffer &buf):
  389. level(buf.level), basesize(buf.basesize),
  390. index(buf.index), arity(buf.arity) {
  391. assert(0);//should not get called
  392. MEMORY_LOG("em_buffer: copy constr start\n");
  393. get_streams();
  394. for (unsigned int i=0; i< index; i++) {
  395. assert(data[i]);
  396. delete data[i]; //delete old stream if existing
  397. data[i] = NULL;
  398. //call copy constructor; i'm not sure that it actually duplicates
  399. //the stream and copies the data; should that in the BTE
  400. //sometimes..
  401. data[i] = new AMI_STREAM<T>(*buf.data[i]);
  402. deleted[i] = buf.deleted[i];
  403. streamsize[i] = buf.streamsize[i];
  404. #ifdef SAVE_MEMORY
  405. assert(name[i]);
  406. delete name[i];
  407. name[i] = NULL;
  408. name[i] = buf.name[i];
  409. #endif
  410. }
  411. put_streams();
  412. MEMORY_LOG("em_buffer: copy constr end\n");
  413. }
  414. /************************************************************/
  415. //free the stream array and the streams pointers
  416. template<class T, class Key>
  417. em_buffer<T,Key>::~em_buffer() {
  418. assert(data);
  419. //delete the m streams in the buffer
  420. get_streams();
  421. for (unsigned int i=0; i<index; i++) {
  422. assert(data[i]);
  423. #ifdef SAVE_MEMORY
  424. check_name(i);
  425. delete name[i];
  426. #endif
  427. delete data[i];
  428. data[i] = NULL;
  429. }
  430. delete [] data;
  431. delete [] deleted;
  432. delete [] streamsize;
  433. #ifdef SAVE_MEMORY
  434. delete [] name;
  435. #endif
  436. }
  437. #ifdef SAVE_MEMORY
  438. /************************************************************/
  439. //checks that name[i] is the same as stream name; stream i must be in
  440. //memory (by a previous get_stream call, for instance) in order to
  441. //find its length
  442. template<class T, class Key>
  443. void em_buffer<T,Key>::check_name(unsigned int i) {
  444. #ifdef EMBUF_CHECK_NAME
  445. assert(i>=0 && i < index);
  446. assert(data[i]);
  447. char* fooname;
  448. data[i]->name(&fooname);//name() allocates the string
  449. #ifdef EMBUF_CHECK_NAME_PRINT
  450. cout << "::check_name: checking stream [" << level << "," << i << "] name:"
  451. << fooname << endl;
  452. cout.flush();
  453. #endif
  454. if (strcmp(name[i], fooname) != 0) {
  455. cerr << "name[" << i << "]=" << name[i]
  456. << ", streamname=" << fooname << endl;
  457. }
  458. assert(strcmp(fooname, name[i]) == 0);
  459. delete fooname;
  460. #endif
  461. }
  462. #endif
  463. /************************************************************/
  464. //if SAVE_MEMORY flag is set, load the stream in memory; return the
  465. //ith stream
  466. template<class T, class Key>
  467. AMI_STREAM<T>* em_buffer<T,Key>::get_stream(unsigned int i) {
  468. assert(i>=0 && i < index);
  469. #ifdef SAVE_MEMORY
  470. MY_LOG_DEBUG_ID("em_buffer::get_stream");
  471. MY_LOG_DEBUG_ID(i);
  472. if (data[i] == NULL) {
  473. //stream is on disk, load it in memory
  474. assert(name[i]);
  475. MY_LOG_DEBUG_ID("load stream in memory");
  476. MY_LOG_DEBUG_ID(name[i]);
  477. #ifdef EMBUF_PRINT_GETPUT_STREAM
  478. cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
  479. cout.flush();
  480. #endif
  481. //assert that file exists
  482. FILE* fp;
  483. if ((fp = fopen(name[i],"rb")) == NULL) {
  484. cerr << "get_stream: checking that stream " << name[i] << "exists\n";
  485. perror(name[i]);
  486. assert(0);
  487. exit(1);
  488. }
  489. fclose(fp);
  490. //create an AMI_STREAM from file
  491. data[i] = new AMI_STREAM<T>(name[i]);
  492. assert(data[i]);
  493. } else {
  494. //if data[i] not NULL, stream must be already in memory
  495. MY_LOG_DEBUG_ID("stream not NULL");
  496. MY_LOG_DEBUG_ID(data[i]->sprint());
  497. }
  498. #endif
  499. //NOW STREAM IS IN MEMORY
  500. //some assertion checks
  501. assert(data[i]);
  502. assert(data[i]->stream_len() == streamsize[i]);
  503. #ifdef SAVE_MEMORY
  504. check_name(i);
  505. #endif
  506. return data[i];
  507. }
  508. /************************************************************/
  509. //if SAVE_MEMORY flag is set, put the i'th stream back on disk
  510. template<class T, class Key>
  511. void em_buffer<T,Key>::put_stream(unsigned int i) {
  512. assert(i>=0 && i < index);
  513. #ifdef SAVE_MEMORY
  514. MY_LOG_DEBUG_ID("em_buffer::put_stream");
  515. MY_LOG_DEBUG_ID(i);
  516. if (data[i] != NULL) {
  517. //stream is in memory, put it on disk
  518. MY_LOG_DEBUG_ID("stream put to disk");
  519. MY_LOG_DEBUG_ID(data[i]->sprint());
  520. check_name(i);
  521. #ifdef EMBUF_PRINT_GETPUT_STREAM
  522. cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
  523. cout.flush();
  524. #endif
  525. //make stream persistent and delete it
  526. data[i]->persist(PERSIST_PERSISTENT);
  527. delete data[i];
  528. data[i] = NULL;
  529. } else {
  530. //data[i] is NULL, so stream must be already put on disk
  531. MY_LOG_DEBUG_ID("stream is NULL");
  532. }
  533. #endif
  534. }
  535. /************************************************************/
  536. //return a pointer to the streams of the buffer
  537. template<class T, class Key>
  538. AMI_STREAM<T>** em_buffer<T,Key>::get_streams() {
  539. #ifdef SAVE_MEMORY
  540. MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
  541. #ifdef EMBUF_PRINT_GETPUT_STREAMS
  542. cout << "em_buffer::get_streams (buffer " << level <<")";
  543. cout << ": index = " << index << "(arity=" << arity << ")\n";
  544. cout.flush();
  545. #endif
  546. for (unsigned int i=0; i<index; i++) {
  547. get_stream(i);
  548. assert(data[i]);
  549. }
  550. #endif
  551. return data;
  552. }
  553. /************************************************************/
  554. //called in pair with load_streams to store streams on disk
  555. //and release the memory
  556. template<class T, class Key>
  557. void em_buffer<T,Key>::put_streams() {
  558. #ifdef SAVE_MEMORY
  559. MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
  560. #ifdef EMBUF_PRINT_GETPUT_STREAMS
  561. cout << "em_buffer::put_streams (buffer " << level <<")";
  562. cout << ": index = " << index << "(arity=" << arity << ")\n";
  563. cout.flush();
  564. #endif
  565. for (unsigned int i=0; i<index; i++) {
  566. put_stream(i);
  567. assert(data[i] == NULL);
  568. }
  569. #endif
  570. }
  571. #ifdef SAVE_MEMORY
  572. /************************************************************/
  573. //return the name of the ith stream
  574. template<class T, class Key>
  575. char* em_buffer<T,Key>::get_stream_name(unsigned int i) const {
  576. assert(i>=0 && i<index);
  577. assert(name[i]);
  578. return name[i];
  579. }
  580. #endif
  581. #ifdef SAVE_MEMORY
  582. /************************************************************/
  583. template<class T, class Key>
  584. void em_buffer<T,Key>::print_stream_names() {
  585. unsigned int i;
  586. for (i=0; i<index; i++) {
  587. assert(name[i]);
  588. cout << "stream " << i << ": " << name[i] << endl;
  589. }
  590. cout.flush();
  591. }
  592. #endif
  593. /************************************************************/
  594. //clean buffer in case some streams have been emptied by deletion
  595. template<class T, class Key>
  596. void em_buffer<T,Key>::cleanup() {
  597. MY_LOG_DEBUG_ID("em_buffer::cleanup()");
  598. #ifdef EMBUF_CLEANUP_PRINT
  599. #ifdef SAVE_MEMORY
  600. if (index>0) {
  601. cout << "before cleanup:\n";
  602. print_stream_names();
  603. print_stream_sizes();
  604. cout.flush();
  605. }
  606. #endif
  607. #endif
  608. //load all streams in memory
  609. get_streams();
  610. //count streams of size=0
  611. unsigned int i, empty=0;
  612. for (i=0; i<index; i++) {
  613. if (get_stream_len(i) == 0) {
  614. //printing..
  615. #ifdef EMBUF_DELETE_STREAM_PRINT
  616. cout<<"deleting stream [" << level << "," << i <<"]:" ;
  617. #ifdef SAVE_MEMORY
  618. cout << name[i];
  619. #endif
  620. cout << endl;
  621. cout.flush();
  622. #endif
  623. #ifdef SAVE_MEMORY
  624. //stream is empty ==> delete its name
  625. assert(name[i]);
  626. delete name[i];
  627. name[i] = NULL;
  628. #endif
  629. //stream is empty ==> reset data
  630. assert(data[i]);
  631. //data[i]->persist(PERSIST_DELETE); //this is done automatically..
  632. delete data[i];
  633. data[i] = NULL;
  634. deleted[i] = 0;
  635. streamsize[i] = 0;
  636. empty++;
  637. }
  638. }
  639. //streams are in memory; all streams which are NULL must have been
  640. //deleted
  641. //shift streams to the left in case holes were introduced
  642. unsigned int j=0;
  643. if (empty) {
  644. #ifdef EMBUF_DELETE_STREAM_PRINT
  645. cout << "em_buffer::cleanup: shifting streams\n"; cout.flush();
  646. #endif
  647. for (i=0; i<index; i++) {
  648. //if i'th stream is not empty, shift it left if necessary
  649. if (data[i]) {
  650. if (i!=j) {
  651. //set j'th stream to point to i'th stream
  652. //cout << j << " set to " << i << endl; cout.flush();
  653. data[j] = data[i];
  654. deleted[j] = deleted[i];
  655. streamsize[j] = streamsize[i];
  656. //set i'th stream to point to NULL
  657. data[i] = NULL;
  658. deleted[i] = 0;
  659. streamsize[i] = 0;
  660. #ifdef SAVE_MEMORY
  661. //fix the names
  662. /* already done assert(name[j]); */
  663. /* delete name[j]; */
  664. name[j] = name[i];
  665. name[i] = NULL;
  666. check_name(j);
  667. #endif
  668. } else {
  669. //cout << i << " left the same" << endl;
  670. }
  671. j++;
  672. } //if data[i] != NULL
  673. }//for i
  674. //set the index
  675. assert(index == j + empty);
  676. index = j;
  677. #ifdef EMBUF_DELETE_STREAM_PRINT
  678. cout << "em_buffer::cleanup: index set to " << index << endl;
  679. cout.flush();
  680. #endif
  681. } //if empty
  682. //put streams back to disk
  683. put_streams();
  684. #ifdef EMBUF_CLEANUP_PRINT
  685. #ifdef SAVE_MEMORY
  686. if (index >0) {
  687. cout << "after cleanup:\n";
  688. print_stream_names();
  689. print_stream_sizes();
  690. cout.flush();
  691. }
  692. #endif
  693. #endif
  694. }
  695. /************************************************************/
  696. //delete all streams
  697. template<class T, class Key>
  698. void em_buffer<T,Key>::reset() {
  699. get_streams();
  700. //make streams not-persistent and delete them
  701. for (unsigned int i=0; i<index; i++) {
  702. assert(data[i]);
  703. assert(streamsize[i] == data[i]->stream_len());
  704. #ifdef SAVE_MEMORY
  705. check_name(i);
  706. assert(name[i]);
  707. delete name[i];
  708. name[i] = NULL;
  709. #endif
  710. data[i]->persist(PERSIST_DELETE);
  711. delete data[i];
  712. data[i] = NULL;
  713. deleted[i] = 0;
  714. streamsize[i] = 0;
  715. }
  716. index = 0;
  717. }
  718. /************************************************************/
  719. //create and return a stream which contains all elements of
  720. //all streams of the buffer in sorted ascending order of
  721. //their keys (priorities);
  722. template<class T, class Key>
  723. AMI_STREAM<T>*
  724. em_buffer<T,Key>::sort() {
  725. //create stream
  726. MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
  727. AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>(); /* will be deleteed in insert() */
  728. assert(sorted_stream);
  729. //merge the streams into sorted stream
  730. AMI_err aerr;
  731. //Key dummykey;
  732. //must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
  733. // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
  734. // 0, dummykey);
  735. //could not use AMI_merge so i had to write my own..
  736. get_streams();
  737. aerr = substream_merge(data, arity, sorted_stream);
  738. assert(aerr == AMI_ERROR_NO_ERROR);
  739. put_streams();
  740. return sorted_stream;
  741. }
  742. /************************************************************/
  743. /* merge the input streams; there are <arity> streams in total; write
  744. output in <outstream>;
  745. the input streams are assumed sorted in increasing order of their
  746. keys;
  747. assumes the instreams are in memory (no need for get_streams()) */
  748. template<class T, class Key>
  749. AMI_err em_buffer<T,Key>::substream_merge(AMI_STREAM<T>** instreams,
  750. unsigned int arity,
  751. AMI_STREAM<T> *outstream) {
  752. unsigned int i, j;
  753. //some assertion checks
  754. assert(instreams);
  755. assert(outstream);
  756. for (i = 0; i < arity ; i++ ) {
  757. assert(instreams[i]);
  758. #ifdef SAVE_MEMORY
  759. check_name(i);
  760. #endif
  761. }
  762. T* in_objects[arity]; //pointers to current leading elements of streams
  763. AMI_err ami_err;
  764. char str[200];
  765. sprintf(str, "em_buffer::substream_merge: allocate keys array, total %ldB\n",
  766. (long)((long)arity * sizeof(merge_key<Key>)));
  767. MEMORY_LOG(str);
  768. //keys array is initialized with smallest key from each stream (only
  769. //non-null keys must be included)
  770. merge_key<Key>* keys;
  771. //merge_key<Key>* keys = new (merge_key<Key>)[arity];
  772. typedef merge_key<Key> footype;
  773. keys = new footype[arity];
  774. assert(keys);
  775. //count number of non-empty streams
  776. j = 0;
  777. //rewind and read the first item from every stream initializing
  778. //in_objects and keys
  779. for (i = 0; i < arity ; i++ ) {
  780. assert(instreams[i]);
  781. //rewind stream
  782. if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
  783. return ami_err;
  784. }
  785. //read first item from stream
  786. if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
  787. AMI_ERROR_NO_ERROR) {
  788. if (ami_err == AMI_ERROR_END_OF_STREAM) {
  789. in_objects[i] = NULL;
  790. } else {
  791. return ami_err;
  792. }
  793. } else {
  794. //include this key in the array of keys
  795. Key k = in_objects[i]->getPriority();
  796. keys[j].set(k, i);
  797. j++;
  798. }
  799. }
  800. unsigned int NonEmptyRuns = j;
  801. //build heap from the array of keys
  802. pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
  803. //repeatedly extract_min from heap, write it to output stream and
  804. //insert next element from same stream
  805. merge_key<Key> minelt;
  806. //rewind output buffer
  807. ami_err = outstream->seek(0);
  808. assert(ami_err == AMI_ERROR_NO_ERROR);
  809. while (!mergeheap.empty()) {
  810. //find min key and id of the stream from whereit comes
  811. mergeheap.min(minelt);
  812. i = minelt.stream_id();
  813. //write min item to output stream
  814. if ((ami_err = outstream->write_item(*in_objects[i]))
  815. != AMI_ERROR_NO_ERROR) {
  816. return ami_err;
  817. }
  818. //read next item from same input stream
  819. if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
  820. != AMI_ERROR_NO_ERROR) {
  821. if (ami_err != AMI_ERROR_END_OF_STREAM) {
  822. return ami_err;
  823. }
  824. }
  825. //extract the min from the heap and insert next key from same stream
  826. if (ami_err == AMI_ERROR_END_OF_STREAM) {
  827. mergeheap.delete_min();
  828. } else {
  829. Key k = in_objects[i]->getPriority();
  830. merge_key<Key> nextit(k, i);
  831. mergeheap.delete_min_and_insert(nextit);
  832. }
  833. } //while
  834. //delete [] keys;
  835. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  836. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
  837. //I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
  838. //AND EVERYTHING SCREWS UP..
  839. return AMI_ERROR_NO_ERROR;
  840. }
  841. /************************************************************/
  842. // insert an array into the buffer; assume array is sorted; return the
  843. // number of items actually inserted; if SAVE_MEMORY FLAG is on, put
  844. // stream on disk and release its memory
  845. template<class T, class Key>
  846. long em_buffer<T,Key>::insert(T* a, long n) {
  847. assert(a);
  848. if (is_full()) {
  849. cout << "em_buffer::insert: buffer full\n";
  850. return 0;
  851. }
  852. //can only insert one full stream at a time
  853. //relaxed..
  854. //assert(n == get_stream_maxlen());
  855. //create the stream
  856. MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
  857. AMI_STREAM<T>* str = new AMI_STREAM<T>();
  858. assert(str);
  859. //write the array to stream
  860. AMI_err ae;
  861. for (long i=0; i< n; i++) {
  862. ae = str->write_item(a[i]);
  863. assert(ae == AMI_ERROR_NO_ERROR);
  864. }
  865. assert(n == str->stream_len());
  866. //insert the stream in the buffer
  867. return insert(str);
  868. }
  869. /************************************************************/
  870. /* insert a stream into the buffer; the next free entry in the buffer
  871. is set to point to the stream; if SAVE_MEMORY flag is on, the
  872. stream is put to disk;
  873. the <nextstream> pointer of buffer is set to point to the argument
  874. stream; (in this way no stream copying is done, just one pointer
  875. copy). The user should be aware the the argument stream is 'lost' -
  876. that is a stream cannot be inserted repeatedly into many buffers
  877. because this would lead to several buffers pointing to the same
  878. stream.
  879. stream is assume stream is sorted; bos = how many elements must be
  880. skipped (were deleted) from the begining fo stream;
  881. return the number of items actually inserted */
  882. template<class T, class Key>
  883. long em_buffer<T,Key>::insert(AMI_STREAM<T>* str, long bos=0) {
  884. assert(str);
  885. if (is_full()) {
  886. cout << "em_buffer::insert: buffer full\n";
  887. return 0;
  888. }
  889. //can only insert one level-i-full-stream at a time;
  890. //relaxed..can specify bos;
  891. //not only that, but the length of the stream can be smaller
  892. //than nominal length, because a stream is normally obtained by
  893. //merging streams which can be shorter;
  894. //assert(str->stream_len() == get_stream_len() - bos);
  895. #ifdef EMBUF_CHECK_INSERT
  896. //check that stream is sorted
  897. cout << "CHECK_INSERT: checking stream is sorted\n";
  898. AMI_err ae;
  899. str->seek(0);
  900. T *crt=NULL, *prev=NULL;
  901. while (str->read_item(&crt)) {
  902. assert(ae == AMI_ERROR_NO_ERROR);
  903. if (prev) assert(*prev <= *crt);
  904. }
  905. #endif
  906. //nextstream must be empty
  907. assert(str);
  908. assert(data[nextstream()] == NULL);
  909. assert(deleted[nextstream()] == 0);
  910. assert(streamsize[nextstream()] == 0);
  911. #ifdef SAVE_MEMORY
  912. assert(name[nextstream()] == NULL);
  913. #endif
  914. //set next entry i the buffer to point to this stream
  915. data[nextstream()] = str;
  916. deleted[nextstream()] = bos;
  917. streamsize[nextstream()] = str->stream_len();
  918. #ifdef SAVE_MEMORY
  919. //set next name entry in buffer to point to this stream's name
  920. char* s;
  921. str->name(&s); //name() allocates the string
  922. name[nextstream()] = s;
  923. //put stream on disk and release its memory
  924. str->persist(PERSIST_PERSISTENT);
  925. delete str; //stream should be persistent; just delete it
  926. data[nextstream()] = NULL;
  927. #ifdef EMBUF_PRINT_INSERT
  928. cout << "insert stream " << s << " at buf [" << level
  929. << "," << nextstream() << "]" << endl;
  930. #endif
  931. #endif
  932. //increment the index of next available stream in buffer
  933. incr_nextstream();
  934. #ifdef EMBUF_PRINT_INSERT
  935. print_stream_sizes();
  936. print_stream_names();
  937. #endif
  938. #ifdef SAVE_MEMORY
  939. MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
  940. MY_LOG_DEBUG_ID(name[nextstream()-1]);
  941. #endif
  942. //return nb of items inserted
  943. return get_stream_len(nextstream()-1);
  944. }
  945. /************************************************************/
  946. //print the elements of the i'th stream of the buffer to a stream;
  947. //assumes stream is in memory;
  948. template<class T, class Key>
  949. void em_buffer<T,Key>::print_stream(ostream& s, unsigned int i) {
  950. assert(data[i]);
  951. assert((i>=0) && (i<index));
  952. AMI_err ae;
  953. T* x;
  954. s << "STREAM " << i << ": [";
  955. ae = data[i]->seek(deleted[i]);
  956. assert(ae == AMI_ERROR_NO_ERROR);
  957. for (long j = 0; j < get_stream_len(i); j++) {
  958. ae = data[i]->read_item(&x);
  959. assert(ae == AMI_ERROR_NO_ERROR);
  960. s << *x << ",";
  961. }
  962. s << "]\n";
  963. }
  964. /************************************************************/
  965. //print elements range in buffer (read first and last element in each
  966. //substream and find global min and max)
  967. template<class T, class Key>
  968. void em_buffer<T,Key>::print_range() {
  969. T *min, *max;
  970. AMI_err ae;
  971. get_streams();
  972. for (unsigned int i=0; i< index; i++) {
  973. cout << "[";
  974. //read min element in substream i
  975. ae = data[i]->seek(deleted[i]);
  976. assert(ae == AMI_ERROR_NO_ERROR);
  977. ae = data[i]->read_item(&min);
  978. assert(ae == AMI_ERROR_NO_ERROR);
  979. cout << min->getPriority() << "..";
  980. //read max element in substream i
  981. ae = data[i]->seek(streamsize[i] - 1);
  982. assert(ae == AMI_ERROR_NO_ERROR);
  983. ae = data[i]->read_item(&max);
  984. assert(ae == AMI_ERROR_NO_ERROR);
  985. cout << max->getPriority()
  986. << " (sz=" << get_stream_len(i) << ")] ";
  987. }
  988. for (unsigned int i=index; i< arity; i++) {
  989. cout << "[] ";
  990. }
  991. put_streams();
  992. }
  993. /************************************************************/
  994. //print all elements in buffer
  995. template<class T, class Key>
  996. void em_buffer<T,Key>::print() {
  997. T *x;
  998. AMI_err ae;
  999. get_streams();
  1000. for (unsigned int i=0; i<index; i++) {
  1001. cout << " [";
  1002. ae = data[i]->seek(deleted[i]);
  1003. assert(ae == AMI_ERROR_NO_ERROR);
  1004. for (unsigned long j=0; j<get_stream_len(i); j++) {
  1005. ae = data[i]->read_item(&x);
  1006. assert(ae == AMI_ERROR_NO_ERROR);
  1007. cout << x->getPriority() << ",";
  1008. }
  1009. cout << "]" << endl;
  1010. }
  1011. for (unsigned int i=index; i< arity; i++) {
  1012. cout << "[] ";
  1013. }
  1014. put_streams();
  1015. }
  1016. /************************************************************/
  1017. //print the sizes of the substreams in the buffer
  1018. template<class T, class Key>
  1019. void em_buffer<T,Key>::print_stream_sizes() {
  1020. cout << "(streams=" << index << ") sizes=[";
  1021. for (unsigned int i=0; i< arity; i++) {
  1022. cout << get_stream_len(i) << ",";
  1023. }
  1024. cout << "]" << endl;
  1025. cout.flush();
  1026. }
  1027. #endif