embuffer.h 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef __EMBUFFER_H
  31. #define __EMBUFFER_H
  32. #include <stdio.h>
  33. #include <assert.h>
  34. #include <stdlib.h>
  35. #include <math.h>
  36. #include "ami_config.h" //for SAVE_MEMORY
  37. #include "ami_stream.h"
  38. #include "mm.h"
  39. #include "mm_utils.h"
  40. #include "pqheap.h"
  41. #define MY_LOG_DEBUG_ID(x) //inhibit debug printing
  42. //#define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
  43. /**********************************************************
  44. DEBUGGING FLAGS
  45. ***********************************************************/
  46. //setting this enables checking that the streams/arrays inserted in
  47. //buffers are sorted in increasing order
  48. //#define EMBUF_CHECK_INSERT
  49. //enable checking that stream name is the same as the one stored in
  50. //the buffer name[]
  51. //#define EMBUF_CHECK_NAME
  52. //enable printing names as they are checked
  53. //#define EMBUF_CHECK_NAME_PRINT
  54. //enable printing when streams in a buffer are shifted left to
  55. //check that names are shifted accordingly
  56. //#define EMBUF_DELETE_STREAM_PRINT
  57. //enable printing the name of the stream which is inserted in buff
  58. //#define EMBUF_PRINT_INSERT
  59. //enable printing the stream names/sizes in cleanup()
  60. //#define EMBUF_CLEANUP_PRINT
  61. //enable printing when get/put_stream is called (for each stream)
  62. //#define EMBUF_PRINT_GETPUT_STREAM
  63. //enable printing when get/put_streams is called
  64. //#define EMBUF_PRINT_GETPUT_STREAMS
  65. /***********************************************************/
  /*****************************************************************/
  /* merge_key<KEY>: a key bundled with the id of the stream it was
     read from. Used while merging streams so that only the key (not
     the whole item) is kept around, which saves space.

     Every comparison operator compares the keys only; the stream id
     takes no part in ordering or equality.
  */
  template<class KEY>
  class merge_key {
  public:
    KEY k;               //the key (priority)
    unsigned int str_id; //id of the stream where key comes from

  public:
    merge_key(): str_id(0) {}
    merge_key(const KEY &key_value, const unsigned int stream):
      k(key_value), str_id(stream) {}
    ~merge_key() {}

    //overwrite both the key and the stream id
    void set(const KEY &key_value, const unsigned int stream) {
      k = key_value;
      str_id = stream;
    }

    //accessors
    KEY key() const { return k; }
    unsigned int stream_id() const { return str_id; }
    KEY getPriority() const { return k; }

    friend ostream& operator<<(ostream& out, const merge_key<KEY> &mk) {
      return out << "<str_id=" << mk.str_id << "> " << mk.k << " ";
    }

    //ordering/equality: keys only
    friend int operator < (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k < rhs.k;
    }
    friend int operator <= (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k <= rhs.k;
    }
    friend int operator > (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k > rhs.k;
    }
    friend int operator >= (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k >= rhs.k;
    }
    friend int operator != (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k != rhs.k;
    }
    friend int operator == (const merge_key &lhs, const merge_key &rhs) {
      return lhs.k == rhs.k;
    }

    //adding two merge keys has no meaning; must never be called
    friend merge_key operator + (const merge_key &lhs, const merge_key &rhs) {
      assert(0);
      return lhs;
    }
  };
  /*****************************************************************
  *****************************************************************
  *****************************************************************
  external_memory buffer
  Each level-i buffer can store up to <arity>^i * <basesize> items,
  where typically <arity> is \theta(m) and <basesize> is \theta(M);
  therefore log_m{n/m} buffers are needed to store N items, one
  buffer for each level 1..log_m{n/m}. All buffers must have the same
  values of <arity> and <basesize>.
  Functionality:
  A level-i on-disk buffer stores <arity>^i * <basesize> items of
  data, organized in <arity> streams of <arity>^{i-1} * <basesize>
  items each; <basesize> is the same for all buffers and equal to the
  size of the level-0 buffer (the in-memory buffer).
  Invariant: all the <arity> streams of a level-i buffer are in
  sorted order; in this way sorting the buffer is done by merging the
  <arity> streams in linear time.
  Items are inserted in a level-i buffer only a whole stream at a time
  (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
  the buffer are full, the buffer is sorted and emptied into a stream
  of the level-(i+1) buffer.
  The <arity> streams of a buffer are allocated contiguously from left
  to right. The unused streams are NULL (with SAVE_MEMORY, a used
  stream is also NULL while it is stored on disk); the buffer keeps the
  index of the next available stream, i.e. one past the last used one.
  When a full buffer is emptied, all its streams are set to NULL.
  *****************************************************************
  *****************************************************************
  ***************************************************************** */
  /* T is a type with priority of type Key and method getPriority() */
  158. template<class T, class Key>
  159. class em_buffer {
  160. private:
  161. //number of streams in a buffer;
  162. unsigned int arity;
  163. //level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
  164. //has a slightly different behaviour so it is implemented as a
  165. //different class <im_buffer>)
  166. unsigned short level;
  167. //level-i buffer contains m streams of data, each of size
  168. //arity^{i-1}*basesize;
  169. AMI_STREAM<T>** data;
  170. //the buffers can be depleted to fill the internal pq;
  171. //keep an array which counts, for each stream, how many elements
  172. //have been deleted (implicitely from the beginning of stream)
  173. long* deleted;
  174. //nb of items in each substream; this can be found out by calling
  175. //stream_len() on the stream, but it is more costly esp in the case
  176. //when streams are on disk and must be moved in and out just to find
  177. //stream length; streamsize is set only at stream creation, and the
  178. //actual size must subtract the number of iteme deleted from the
  179. //bos
  180. unsigned long* streamsize;
  181. //index of the next available(empty) stream (out of the total m
  182. //streams in the buffer);
  183. unsigned int index;
  184. //nb of items in a stream of level_1 buffer
  185. unsigned long basesize;
  186. public:
  187. //create a level-i buffer of given basesize;
  188. em_buffer(const unsigned short i, const unsigned long bs,
  189. const unsigned int ar);
  190. //copy constructor;
  191. em_buffer(const em_buffer &buf);
  192. //free the stream array and the streams pointers
  193. ~em_buffer();
  194. //return the level of the buffer;
  195. unsigned short get_level() const { return level;}
  196. //return the ith stream (load stream in memory)
  197. AMI_STREAM<T>* get_stream(unsigned int i);
  198. //return a pointer to the streams of the buffer (loads streams in
  199. //memory)
  200. AMI_STREAM<T>** get_streams();
  201. //put the ith stream back to disk
  202. void put_stream(unsigned int i);
  203. //called in pair with get_streams to put all streams back to disk
  204. void put_streams();
  205. //return a pointer to the array of deletion count for each stream
  206. long* get_bos() const { return deleted;}
  207. //return the index of the last stream in buffer which contains data;
  208. unsigned int laststream() const { return index -1;}
  209. //return the index of the next available stream in the buffer
  210. unsigned int nextstream() const { return index;}
  211. //increment the index of the next available stream in the buffer
  212. void incr_nextstream() { ++index;}
  213. //return nb of (non-empty) streams in buffer
  214. unsigned int get_nbstreams() const { return index;}
  215. //return arity
  216. unsigned int get_arity() const { return arity;}
  217. //return total nb of deleted elements in all active streams of the buffer
  218. long total_deleted() const {
  219. long tot = 0;
  220. for (unsigned int i=0; i< index; i++) {
  221. tot += deleted[i];
  222. }
  223. return tot;
  224. }
  225. //mark as deleted one more element from i'th stream
  226. void incr_deleted(unsigned int i) {
  227. assert(i<index);
  228. deleted[i]++;
  229. }
  230. //return the nominal size of a stream (nb of items):
  231. //arity^{level-1}*basesize;
  232. unsigned long get_stream_maxlen() const {
  233. return (unsigned long)pow((double)arity,(double)level-1)*basesize;
  234. }
  235. //return the actual size of stream i; i must be the index of a valid
  236. //stream
  237. unsigned long get_stream_len(unsigned int i) {
  238. //assert(i>= 0 && i<index);
  239. return streamsize[i] - deleted[i];
  240. }
  241. //return the total current size of the buffer; account for the
  242. //deleted elements;
  243. unsigned long get_buf_len() {
  244. unsigned long tot = 0;
  245. for (unsigned int i=0; i< index; i++) {
  246. tot += get_stream_len(i);
  247. }
  248. return tot;
  249. }
  250. //return the total maximal capacity of the buffer
  251. unsigned long get_buf_maxlen() {
  252. return arity * get_stream_maxlen();
  253. }
  254. //return true if buffer is empty (all streams are empty)
  255. bool is_empty() {
  256. return ((nextstream() == 0) || (get_buf_len() == 0));
  257. }
  258. //return true if buffer is full(all streams are full)
  259. bool is_full() const {
  260. return (nextstream() == arity);
  261. }
  262. //reset
  263. void reset();
  264. //clean buffer: in case some streams have been emptied by deletion
  265. //delete them and shift streams left;
  266. void cleanup();
  267. //create and return a stream which contains all elements of all
  268. //streams of the buffer in sorted ascending order of their
  269. //keys(priorities);
  270. AMI_STREAM<T>* sort();
  271. // insert an array into the buffer; can only insert one
  272. // level-i-full-stream-len nb of items at a time; assume the length
  273. // of the array is precisely the streamlen of level-i buffer n =
  274. // (pow(arity,level-1)*basesize); assume array is sorted; return the
  275. // number of items actually inserted
  276. long insert(T* a, long n);
  277. // insert a stream into the buffer; assume the length of the stream
  278. // is precisely the streamlen of level-i buffer n =
  279. // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
  280. // is set to point to the argument stream; (in this way no stream
  281. // copying is done, just one pointer copy). The user should be aware
  282. // the the argument stream is 'lost' - that is a stream cannot be
  283. // inserted repeatedly into many buffers because this would lead to
  284. // several buffers pointing to the same stream.
  285. // stream is assumed sorted; bos = how many elements are deleted
  286. // from the beginning of stream;
  287. // return the number of items actually inserted
  288. long insert(AMI_STREAM<T>* str,
  289. long bos=0);
  290. //print range of elements in buffer
  291. void print_range();
  292. //print all elements in buffer
  293. void print();
  294. //prints the sizes of the streams in the buffer
  295. void print_stream_sizes();
  296. //print the elements in the buffer
  297. friend ostream& operator<<(ostream& s, em_buffer &b) {
  298. s << "BUFFER_" << b.level << ": ";
  299. if (b.index ==0) {
  300. s << "[]";
  301. }
  302. s << "\n";
  303. b.get_streams();
  304. for (unsigned int i=0; i < b.index; i++) {
  305. b.print_stream(s, i);
  306. }
  307. b.put_streams();
  308. return s;
  309. }
  310. private:
  311. // merge the input streams; there are <arity> streams in total;
  312. // write output in <outstream>; the input streams are assumed sorted
  313. // in increasing order of their keys;
  314. AMI_err substream_merge(AMI_STREAM<T>** instreams,
  315. unsigned int arity,
  316. AMI_STREAM<T> *outstream);
  317. //print to stream the elements in i'th stream
  318. void print_stream(ostream& s, unsigned int i);
  319. #ifdef SAVE_MEMORY
  320. //array of names of streams;
  321. char** name;
  322. //return the designated name for stream i
  323. char* get_stream_name(unsigned int i) const;
  324. //print all stream names in buffer
  325. void print_stream_names();
  326. //checks that name[i] is the same as stream name; stream i must be in
  327. //memory (by a previous get_stream call, for instance) in order to
  328. //find its length
  329. void check_name(unsigned int i);
  330. #endif
  331. };
  332. /************************************************************/
  333. //create a level-i buffer of given basesize;
  334. template <class T, class Key>
  335. em_buffer<T,Key>::em_buffer(const unsigned short i, const unsigned long bs,
  336. const unsigned int ar) :
  337. arity(ar), level(i), basesize(bs) {
  338. assert((level>=1) && (basesize >=0));
  339. char str[100];
  340. sprintf(str, "em_buffer: allocate %d AMI_STREAM*, total %ld\n",
  341. arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
  342. MEMORY_LOG(str);
  343. //allocate STREAM* array
  344. data = new AMI_STREAM<T>* [arity];
  345. //allocate deleted array
  346. sprintf(str, "em_buffer: allocate deleted array: %ld\n",
  347. (long)(arity*sizeof(long)));
  348. MEMORY_LOG(str);
  349. deleted = new long[arity];
  350. //allocate streamsize array
  351. sprintf(str, "em_buffer: allocate streamsize array: %ld\n",
  352. (long)(arity*sizeof(long)));
  353. MEMORY_LOG(str);
  354. streamsize = new unsigned long[arity];
  355. #ifdef SAVE_MEMORY
  356. //allocate name array
  357. sprintf(str, "em_buffer: allocate name array: %ld\n",
  358. (long)(arity*sizeof(char*)));
  359. MEMORY_LOG(str);
  360. name = new char* [arity];
  361. assert(name);
  362. #endif
  363. //assert data
  364. if ((!data) || (!deleted) || (!streamsize)) {
  365. cerr << "em_buffer: cannot allocate\n";
  366. exit(1);
  367. }
  368. //initialize the <arity> streams to NULL, deleted[], streamsize[]
  369. //and name[]
  370. for (unsigned int ui=0; ui< arity; ui++) {
  371. data[ui] = NULL;
  372. deleted[ui] = 0;
  373. streamsize[ui] = 0;
  374. #ifdef SAVE_MEMORY
  375. name[ui] = NULL;
  376. #endif
  377. }
  378. //set index
  379. index = 0;
  380. #ifdef SAVE_MEMORY
  381. //streams_in_memory = false;
  382. #endif
  383. }
  384. /************************************************************/
  385. //copy constructor;
  386. template<class T, class Key>
  387. em_buffer<T,Key>::em_buffer(const em_buffer &buf):
  388. level(buf.level), basesize(buf.basesize),
  389. index(buf.index), arity(buf.arity) {
  390. assert(0);//should not get called
  391. MEMORY_LOG("em_buffer: copy constr start\n");
  392. get_streams();
  393. for (unsigned int i=0; i< index; i++) {
  394. assert(data[i]);
  395. delete data[i]; //delete old stream if existing
  396. data[i] = NULL;
  397. //call copy constructor; i'm not sure that it actually duplicates
  398. //the stream and copies the data; should that in the BTE
  399. //sometimes..
  400. data[i] = new AMI_STREAM<T>(*buf.data[i]);
  401. deleted[i] = buf.deleted[i];
  402. streamsize[i] = buf.streamsize[i];
  403. #ifdef SAVE_MEMORY
  404. assert(name[i]);
  405. delete name[i];
  406. name[i] = NULL;
  407. name[i] = buf.name[i];
  408. #endif
  409. }
  410. put_streams();
  411. MEMORY_LOG("em_buffer: copy constr end\n");
  412. }
  413. /************************************************************/
  414. //free the stream array and the streams pointers
  415. template<class T, class Key>
  416. em_buffer<T,Key>::~em_buffer() {
  417. assert(data);
  418. //delete the m streams in the buffer
  419. get_streams();
  420. for (unsigned int i=0; i<index; i++) {
  421. assert(data[i]);
  422. #ifdef SAVE_MEMORY
  423. check_name(i);
  424. delete name[i];
  425. #endif
  426. delete data[i];
  427. data[i] = NULL;
  428. }
  429. delete [] data;
  430. delete [] deleted;
  431. delete [] streamsize;
  432. #ifdef SAVE_MEMORY
  433. delete [] name;
  434. #endif
  435. }
  #ifdef SAVE_MEMORY
  /************************************************************/
  //debug check, active only when EMBUF_CHECK_NAME is defined: verify
  //that name[i] equals the name reported by stream i itself. Stream i
  //must currently be in memory (e.g. after a get_stream(i) call).
  template<class T, class Key>
  void em_buffer<T,Key>::check_name(unsigned int i) {
  #ifdef EMBUF_CHECK_NAME
    assert(i < index);
    AMI_STREAM<T>* str = data[i];
    assert(str);
    char* actual;
    str->name(&actual);//name() allocates the string
  #ifdef EMBUF_CHECK_NAME_PRINT
    cout << "::check_name: checking stream [" << level << "," << i << "] name:"
         << actual << endl;
    cout.flush();
  #endif
    const int cmp = strcmp(name[i], actual);
    if (cmp != 0) {
      cerr << "name[" << i << "]=" << name[i]
           << ", streamname=" << actual << endl;
    }
    assert(cmp == 0);
    delete actual;
  #endif
  }
  #endif
  462. /************************************************************/
  463. //if SAVE_MEMORY flag is set, load the stream in memory; return the
  464. //ith stream
  465. template<class T, class Key>
  466. AMI_STREAM<T>* em_buffer<T,Key>::get_stream(unsigned int i) {
  467. assert(i>=0 && i < index);
  468. #ifdef SAVE_MEMORY
  469. MY_LOG_DEBUG_ID("em_buffer::get_stream");
  470. MY_LOG_DEBUG_ID(i);
  471. if (data[i] == NULL) {
  472. //stream is on disk, load it in memory
  473. assert(name[i]);
  474. MY_LOG_DEBUG_ID("load stream in memory");
  475. MY_LOG_DEBUG_ID(name[i]);
  476. #ifdef EMBUF_PRINT_GETPUT_STREAM
  477. cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
  478. cout.flush();
  479. #endif
  480. //assert that file exists
  481. FILE* fp;
  482. if ((fp = fopen(name[i],"rb")) == NULL) {
  483. cerr << "get_stream: checking that stream " << name[i] << "exists\n";
  484. perror(name[i]);
  485. assert(0);
  486. exit(1);
  487. }
  488. fclose(fp);
  489. //create an AMI_STREAM from file
  490. data[i] = new AMI_STREAM<T>(name[i]);
  491. assert(data[i]);
  492. } else {
  493. //if data[i] not NULL, stream must be already in memory
  494. MY_LOG_DEBUG_ID("stream not NULL");
  495. MY_LOG_DEBUG_ID(data[i]->sprint());
  496. }
  497. #endif
  498. //NOW STREAM IS IN MEMORY
  499. //some assertion checks
  500. assert(data[i]);
  501. assert(data[i]->stream_len() == streamsize[i]);
  502. #ifdef SAVE_MEMORY
  503. check_name(i);
  504. #endif
  505. return data[i];
  506. }
  507. /************************************************************/
  508. //if SAVE_MEMORY flag is set, put the i'th stream back on disk
  509. template<class T, class Key>
  510. void em_buffer<T,Key>::put_stream(unsigned int i) {
  511. assert(i>=0 && i < index);
  512. #ifdef SAVE_MEMORY
  513. MY_LOG_DEBUG_ID("em_buffer::put_stream");
  514. MY_LOG_DEBUG_ID(i);
  515. if (data[i] != NULL) {
  516. //stream is in memory, put it on disk
  517. MY_LOG_DEBUG_ID("stream put to disk");
  518. MY_LOG_DEBUG_ID(data[i]->sprint());
  519. check_name(i);
  520. #ifdef EMBUF_PRINT_GETPUT_STREAM
  521. cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
  522. cout.flush();
  523. #endif
  524. //make stream persistent and delete it
  525. data[i]->persist(PERSIST_PERSISTENT);
  526. delete data[i];
  527. data[i] = NULL;
  528. } else {
  529. //data[i] is NULL, so stream must be already put on disk
  530. MY_LOG_DEBUG_ID("stream is NULL");
  531. }
  532. #endif
  533. }
  534. /************************************************************/
  535. //return a pointer to the streams of the buffer
  536. template<class T, class Key>
  537. AMI_STREAM<T>** em_buffer<T,Key>::get_streams() {
  538. #ifdef SAVE_MEMORY
  539. MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
  540. #ifdef EMBUF_PRINT_GETPUT_STREAMS
  541. cout << "em_buffer::get_streams (buffer " << level <<")";
  542. cout << ": index = " << index << "(arity=" << arity << ")\n";
  543. cout.flush();
  544. #endif
  545. for (unsigned int i=0; i<index; i++) {
  546. get_stream(i);
  547. assert(data[i]);
  548. }
  549. #endif
  550. return data;
  551. }
  552. /************************************************************/
  553. //called in pair with load_streams to store streams on disk
  554. //and release the memory
  555. template<class T, class Key>
  556. void em_buffer<T,Key>::put_streams() {
  557. #ifdef SAVE_MEMORY
  558. MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
  559. #ifdef EMBUF_PRINT_GETPUT_STREAMS
  560. cout << "em_buffer::put_streams (buffer " << level <<")";
  561. cout << ": index = " << index << "(arity=" << arity << ")\n";
  562. cout.flush();
  563. #endif
  564. for (unsigned int i=0; i<index; i++) {
  565. put_stream(i);
  566. assert(data[i] == NULL);
  567. }
  568. #endif
  569. }
  #ifdef SAVE_MEMORY
  /************************************************************/
  //return the name recorded for the i'th stream (i < index); the string
  //stays owned by the buffer
  template<class T, class Key>
  char* em_buffer<T,Key>::get_stream_name(unsigned int i) const {
    assert(i < index);
    char* const stream_name = name[i];
    assert(stream_name);
    return stream_name;
  }
  #endif
  #ifdef SAVE_MEMORY
  /************************************************************/
  //print to cout one line "stream <i>: <name>" per stream in the buffer
  template<class T, class Key>
  void em_buffer<T,Key>::print_stream_names() {
    for (unsigned int s = 0; s < index; ++s) {
      assert(name[s]);
      cout << "stream " << s << ": " << name[s] << endl;
    }
    cout.flush();
  }
  #endif
  592. /************************************************************/
  593. //clean buffer in case some streams have been emptied by deletion
  594. template<class T, class Key>
  595. void em_buffer<T,Key>::cleanup() {
  596. MY_LOG_DEBUG_ID("em_buffer::cleanup()");
  597. #ifdef EMBUF_CLEANUP_PRINT
  598. #ifdef SAVE_MEMORY
  599. if (index>0) {
  600. cout << "before cleanup:\n";
  601. print_stream_names();
  602. print_stream_sizes();
  603. cout.flush();
  604. }
  605. #endif
  606. #endif
  607. //load all streams in memory
  608. get_streams();
  609. //count streams of size=0
  610. unsigned int i, empty=0;
  611. for (i=0; i<index; i++) {
  612. if (get_stream_len(i) == 0) {
  613. //printing..
  614. #ifdef EMBUF_DELETE_STREAM_PRINT
  615. cout<<"deleting stream [" << level << "," << i <<"]:" ;
  616. #ifdef SAVE_MEMORY
  617. cout << name[i];
  618. #endif
  619. cout << endl;
  620. cout.flush();
  621. #endif
  622. #ifdef SAVE_MEMORY
  623. //stream is empty ==> delete its name
  624. assert(name[i]);
  625. delete name[i];
  626. name[i] = NULL;
  627. #endif
  628. //stream is empty ==> reset data
  629. assert(data[i]);
  630. //data[i]->persist(PERSIST_DELETE); //this is done automatically..
  631. delete data[i];
  632. data[i] = NULL;
  633. deleted[i] = 0;
  634. streamsize[i] = 0;
  635. empty++;
  636. }
  637. }
  638. //streams are in memory; all streams which are NULL must have been
  639. //deleted
  640. //shift streams to the left in case holes were introduced
  641. unsigned int j=0;
  642. if (empty) {
  643. #ifdef EMBUF_DELETE_STREAM_PRINT
  644. cout << "em_buffer::cleanup: shifting streams\n"; cout.flush();
  645. #endif
  646. for (i=0; i<index; i++) {
  647. //if i'th stream is not empty, shift it left if necessary
  648. if (data[i]) {
  649. if (i!=j) {
  650. //set j'th stream to point to i'th stream
  651. //cout << j << " set to " << i << endl; cout.flush();
  652. data[j] = data[i];
  653. deleted[j] = deleted[i];
  654. streamsize[j] = streamsize[i];
  655. //set i'th stream to point to NULL
  656. data[i] = NULL;
  657. deleted[i] = 0;
  658. streamsize[i] = 0;
  659. #ifdef SAVE_MEMORY
  660. //fix the names
  661. /* already done assert(name[j]); */
  662. /* delete name[j]; */
  663. name[j] = name[i];
  664. name[i] = NULL;
  665. check_name(j);
  666. #endif
  667. } else {
  668. //cout << i << " left the same" << endl;
  669. }
  670. j++;
  671. } //if data[i] != NULL
  672. }//for i
  673. //set the index
  674. assert(index == j + empty);
  675. index = j;
  676. #ifdef EMBUF_DELETE_STREAM_PRINT
  677. cout << "em_buffer::cleanup: index set to " << index << endl;
  678. cout.flush();
  679. #endif
  680. } //if empty
  681. //put streams back to disk
  682. put_streams();
  683. #ifdef EMBUF_CLEANUP_PRINT
  684. #ifdef SAVE_MEMORY
  685. if (index >0) {
  686. cout << "after cleanup:\n";
  687. print_stream_names();
  688. print_stream_sizes();
  689. cout.flush();
  690. }
  691. #endif
  692. #endif
  693. }
  694. /************************************************************/
  695. //delete all streams
  696. template<class T, class Key>
  697. void em_buffer<T,Key>::reset() {
  698. get_streams();
  699. //make streams not-persistent and delete them
  700. for (unsigned int i=0; i<index; i++) {
  701. assert(data[i]);
  702. assert(streamsize[i] == data[i]->stream_len());
  703. #ifdef SAVE_MEMORY
  704. check_name(i);
  705. assert(name[i]);
  706. delete name[i];
  707. name[i] = NULL;
  708. #endif
  709. data[i]->persist(PERSIST_DELETE);
  710. delete data[i];
  711. data[i] = NULL;
  712. deleted[i] = 0;
  713. streamsize[i] = 0;
  714. }
  715. index = 0;
  716. }
  717. /************************************************************/
  718. //create and return a stream which contains all elements of
  719. //all streams of the buffer in sorted ascending order of
  720. //their keys (priorities);
  721. template<class T, class Key>
  722. AMI_STREAM<T>*
  723. em_buffer<T,Key>::sort() {
  724. //create stream
  725. MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
  726. AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>(); /* will be deleteed in insert() */
  727. assert(sorted_stream);
  728. //merge the streams into sorted stream
  729. AMI_err aerr;
  730. //Key dummykey;
  731. //must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
  732. // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
  733. // 0, dummykey);
  734. //could not use AMI_merge so i had to write my own..
  735. get_streams();
  736. aerr = substream_merge(data, arity, sorted_stream);
  737. assert(aerr == AMI_ERROR_NO_ERROR);
  738. put_streams();
  739. return sorted_stream;
  740. }
  741. /************************************************************/
  742. /* merge the input streams; there are <arity> streams in total; write
  743. output in <outstream>;
  744. the input streams are assumed sorted in increasing order of their
  745. keys;
  746. assumes the instreams are in memory (no need for get_streams()) */
  747. template<class T, class Key>
  748. AMI_err em_buffer<T,Key>::substream_merge(AMI_STREAM<T>** instreams,
  749. unsigned int arity,
  750. AMI_STREAM<T> *outstream) {
  751. unsigned int i, j;
  752. //some assertion checks
  753. assert(instreams);
  754. assert(outstream);
  755. for (i = 0; i < arity ; i++ ) {
  756. assert(instreams[i]);
  757. #ifdef SAVE_MEMORY
  758. check_name(i);
  759. #endif
  760. }
  761. std::vector<T*> in_objects(arity); //pointers to current leading elements of streams
  762. AMI_err ami_err;
  763. char str[200];
  764. sprintf(str, "em_buffer::substream_merge: allocate keys array, total %ldB\n",
  765. (long)((long)arity * sizeof(merge_key<Key>)));
  766. MEMORY_LOG(str);
  767. //keys array is initialized with smallest key from each stream (only
  768. //non-null keys must be included)
  769. merge_key<Key>* keys;
  770. //merge_key<Key>* keys = new (merge_key<Key>)[arity];
  771. typedef merge_key<Key> footype;
  772. keys = new footype[arity];
  773. assert(keys);
  774. //count number of non-empty streams
  775. j = 0;
  776. //rewind and read the first item from every stream initializing
  777. //in_objects and keys
  778. for (i = 0; i < arity ; i++ ) {
  779. assert(instreams[i]);
  780. //rewind stream
  781. if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
  782. return ami_err;
  783. }
  784. //read first item from stream
  785. if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
  786. AMI_ERROR_NO_ERROR) {
  787. if (ami_err == AMI_ERROR_END_OF_STREAM) {
  788. in_objects[i] = NULL;
  789. } else {
  790. return ami_err;
  791. }
  792. } else {
  793. //include this key in the array of keys
  794. Key k = in_objects[i]->getPriority();
  795. keys[j].set(k, i);
  796. j++;
  797. }
  798. }
  799. unsigned int NonEmptyRuns = j;
  800. //build heap from the array of keys
  801. pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
  802. //repeatedly extract_min from heap, write it to output stream and
  803. //insert next element from same stream
  804. merge_key<Key> minelt;
  805. //rewind output buffer
  806. ami_err = outstream->seek(0);
  807. assert(ami_err == AMI_ERROR_NO_ERROR);
  808. while (!mergeheap.empty()) {
  809. //find min key and id of the stream from whereit comes
  810. mergeheap.min(minelt);
  811. i = minelt.stream_id();
  812. //write min item to output stream
  813. if ((ami_err = outstream->write_item(*in_objects[i]))
  814. != AMI_ERROR_NO_ERROR) {
  815. return ami_err;
  816. }
  817. //read next item from same input stream
  818. if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
  819. != AMI_ERROR_NO_ERROR) {
  820. if (ami_err != AMI_ERROR_END_OF_STREAM) {
  821. return ami_err;
  822. }
  823. }
  824. //extract the min from the heap and insert next key from same stream
  825. if (ami_err == AMI_ERROR_END_OF_STREAM) {
  826. mergeheap.delete_min();
  827. } else {
  828. Key k = in_objects[i]->getPriority();
  829. merge_key<Key> nextit(k, i);
  830. mergeheap.delete_min_and_insert(nextit);
  831. }
  832. } //while
  833. //delete [] keys;
  834. //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
  835. //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
  836. //I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
  837. //AND EVERYTHING SCREWS UP..
  838. return AMI_ERROR_NO_ERROR;
  839. }
  840. /************************************************************/
  841. // insert an array into the buffer; assume array is sorted; return the
  842. // number of items actually inserted; if SAVE_MEMORY FLAG is on, put
  843. // stream on disk and release its memory
  844. template<class T, class Key>
  845. long em_buffer<T,Key>::insert(T* a, long n) {
  846. assert(a);
  847. if (is_full()) {
  848. cout << "em_buffer::insert: buffer full\n";
  849. return 0;
  850. }
  851. //can only insert one full stream at a time
  852. //relaxed..
  853. //assert(n == get_stream_maxlen());
  854. //create the stream
  855. MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
  856. AMI_STREAM<T>* str = new AMI_STREAM<T>();
  857. assert(str);
  858. //write the array to stream
  859. AMI_err ae;
  860. for (long i=0; i< n; i++) {
  861. ae = str->write_item(a[i]);
  862. assert(ae == AMI_ERROR_NO_ERROR);
  863. }
  864. assert(n == str->stream_len());
  865. //insert the stream in the buffer
  866. return insert(str);
  867. }
  868. /************************************************************/
  869. /* insert a stream into the buffer; the next free entry in the buffer
  870. is set to point to the stream; if SAVE_MEMORY flag is on, the
  871. stream is put to disk;
  872. the <nextstream> pointer of buffer is set to point to the argument
  873. stream; (in this way no stream copying is done, just one pointer
  874. copy). The user should be aware the the argument stream is 'lost' -
  875. that is a stream cannot be inserted repeatedly into many buffers
  876. because this would lead to several buffers pointing to the same
  877. stream.
  878. stream is assume stream is sorted; bos = how many elements must be
  879. skipped (were deleted) from the beginning fo stream;
  880. return the number of items actually inserted */
  881. template<class T, class Key>
  882. long em_buffer<T,Key>::insert(AMI_STREAM<T>* str, long bos) {
  883. assert(str);
  884. if (is_full()) {
  885. cout << "em_buffer::insert: buffer full\n";
  886. return 0;
  887. }
  888. //can only insert one level-i-full-stream at a time;
  889. //relaxed..can specify bos;
  890. //not only that, but the length of the stream can be smaller
  891. //than nominal length, because a stream is normally obtained by
  892. //merging streams which can be shorter;
  893. //assert(str->stream_len() == get_stream_len() - bos);
  894. #ifdef EMBUF_CHECK_INSERT
  895. //check that stream is sorted
  896. cout << "CHECK_INSERT: checking stream is sorted\n";
  897. AMI_err ae;
  898. str->seek(0);
  899. T *crt=NULL, *prev=NULL;
  900. while (str->read_item(&crt)) {
  901. assert(ae == AMI_ERROR_NO_ERROR);
  902. if (prev) assert(*prev <= *crt);
  903. }
  904. #endif
  905. //nextstream must be empty
  906. assert(str);
  907. assert(data[nextstream()] == NULL);
  908. assert(deleted[nextstream()] == 0);
  909. assert(streamsize[nextstream()] == 0);
  910. #ifdef SAVE_MEMORY
  911. assert(name[nextstream()] == NULL);
  912. #endif
  913. //set next entry i the buffer to point to this stream
  914. data[nextstream()] = str;
  915. deleted[nextstream()] = bos;
  916. streamsize[nextstream()] = str->stream_len();
  917. #ifdef SAVE_MEMORY
  918. //set next name entry in buffer to point to this stream's name
  919. char* s;
  920. str->name(&s); //name() allocates the string
  921. name[nextstream()] = s;
  922. //put stream on disk and release its memory
  923. str->persist(PERSIST_PERSISTENT);
  924. delete str; //stream should be persistent; just delete it
  925. data[nextstream()] = NULL;
  926. #ifdef EMBUF_PRINT_INSERT
  927. cout << "insert stream " << s << " at buf [" << level
  928. << "," << nextstream() << "]" << endl;
  929. #endif
  930. #endif
  931. //increment the index of next available stream in buffer
  932. incr_nextstream();
  933. #ifdef EMBUF_PRINT_INSERT
  934. print_stream_sizes();
  935. print_stream_names();
  936. #endif
  937. #ifdef SAVE_MEMORY
  938. MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
  939. MY_LOG_DEBUG_ID(name[nextstream()-1]);
  940. #endif
  941. //return nb of items inserted
  942. return get_stream_len(nextstream()-1);
  943. }
  944. /************************************************************/
  945. //print the elements of the i'th stream of the buffer to a stream;
  946. //assumes stream is in memory;
  947. template<class T, class Key>
  948. void em_buffer<T,Key>::print_stream(ostream& s, unsigned int i) {
  949. assert(data[i]);
  950. assert((i>=0) && (i<index));
  951. AMI_err ae;
  952. T* x;
  953. s << "STREAM " << i << ": [";
  954. ae = data[i]->seek(deleted[i]);
  955. assert(ae == AMI_ERROR_NO_ERROR);
  956. for (long j = 0; j < get_stream_len(i); j++) {
  957. ae = data[i]->read_item(&x);
  958. assert(ae == AMI_ERROR_NO_ERROR);
  959. s << *x << ",";
  960. }
  961. s << "]\n";
  962. }
  963. /************************************************************/
  964. //print elements range in buffer (read first and last element in each
  965. //substream and find global min and max)
  966. template<class T, class Key>
  967. void em_buffer<T,Key>::print_range() {
  968. T *min, *max;
  969. AMI_err ae;
  970. get_streams();
  971. for (unsigned int i=0; i< index; i++) {
  972. cout << "[";
  973. //read min element in substream i
  974. ae = data[i]->seek(deleted[i]);
  975. assert(ae == AMI_ERROR_NO_ERROR);
  976. ae = data[i]->read_item(&min);
  977. assert(ae == AMI_ERROR_NO_ERROR);
  978. cout << min->getPriority() << "..";
  979. //read max element in substream i
  980. ae = data[i]->seek(streamsize[i] - 1);
  981. assert(ae == AMI_ERROR_NO_ERROR);
  982. ae = data[i]->read_item(&max);
  983. assert(ae == AMI_ERROR_NO_ERROR);
  984. cout << max->getPriority()
  985. << " (sz=" << get_stream_len(i) << ")] ";
  986. }
  987. for (unsigned int i=index; i< arity; i++) {
  988. cout << "[] ";
  989. }
  990. put_streams();
  991. }
  992. /************************************************************/
  993. //print all elements in buffer
  994. template<class T, class Key>
  995. void em_buffer<T,Key>::print() {
  996. T *x;
  997. AMI_err ae;
  998. get_streams();
  999. for (unsigned int i=0; i<index; i++) {
  1000. cout << " [";
  1001. ae = data[i]->seek(deleted[i]);
  1002. assert(ae == AMI_ERROR_NO_ERROR);
  1003. for (unsigned long j=0; j<get_stream_len(i); j++) {
  1004. ae = data[i]->read_item(&x);
  1005. assert(ae == AMI_ERROR_NO_ERROR);
  1006. cout << x->getPriority() << ",";
  1007. }
  1008. cout << "]" << endl;
  1009. }
  1010. for (unsigned int i=index; i< arity; i++) {
  1011. cout << "[] ";
  1012. }
  1013. put_streams();
  1014. }
  1015. /************************************************************/
  1016. //print the sizes of the substreams in the buffer
  1017. template<class T, class Key>
  1018. void em_buffer<T,Key>::print_stream_sizes() {
  1019. cout << "(streams=" << index << ") sizes=[";
  1020. for (unsigned int i=0; i< arity; i++) {
  1021. cout << get_stream_len(i) << ",";
  1022. }
  1023. cout << "]" << endl;
  1024. cout.flush();
  1025. }
  1026. #endif