empq.h 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef __EMPQ_H
  31. #define __EMPQ_H
  32. #include <stdio.h>
  33. #include <assert.h>
  34. #include "ami_config.h" //for SAVE_MEMORY
  35. #include "ami_stream.h"
  36. #include "mm.h"
  37. #include "mm_utils.h" //for MEMORY_LOG, getAvailableMemory
  38. #include "imbuffer.h"
  39. #include "embuffer.h"
  40. #include "pqheap.h"
  41. #include "minmaxheap.h"
  // Forward declaration: element type written by the merge routines
  // (declared in full elsewhere).
  template<typename T, typename Key>
  class ExtendedEltMergeType;

  // Shorthand for a stream of extended merge elements. NOTE: the expansion
  // names T and Key literally, so the macro can only be used where template
  // parameters with exactly those names are in scope.
  #define ExtendedMergeStream AMI_STREAM< ExtendedEltMergeType<T, Key> >
  44. /**********************************************************
  45. DEBUGGING FLAGS
  46. ***********************************************************/
  47. //enables printing messages when buffers are emptied
  48. //#define EMPQ_EMPTY_BUF_PRINT
  49. //enables printing when pq gets filled from buffers
  50. //#define EMPQ_PQ_FILL_PRINT
  51. //enables printing inserts
  52. //#define EMPQ_PRINT_INSERT
  53. //enables printing deletes
  54. //#define EMPQ_PRINT_EXTRACTALL
  55. //enables printing the empq on insert/extract_all_min
  56. //#define EMPQ_PRINT_EMPQ
  57. //enables printing the size of the EMPQ and nb of active streams
  58. //on fillpq() and on empty_buff_0
  59. //#define EMPQ_PRINT_SIZE
  60. //enable printing 'fill pq from B0' in extract_min()
  61. //#define EMPQ_PRINT_FILLPQ_FROM_BUFF0
  62. //enable expensive size asserts
  63. //#define EMPQ_ASSERT_EXPENSIVE
  64. /**********************************************************/
  65. /* external memory priority queue
  66. Functionality:
  67. Keep a pqueue PQ of size \theta(M) in memory. Keep a buffer B0 of
  68. size \theta(M) in memory. keep an array of external-memory
  69. buffers, one for each level 1..log_m{n/m} (where N is the maximum
  70. number of items in pqueue at any time).
  71. invariants:
  72. 1. PQ contains the smallest items in the structure.
  73. 2. each stream of any external memory buffers is sorted in
  74. increasing order.
  75. insert(x): if (x < maximum_item(PQ)), exchange x with
  76. maximum_item(PQ); if buffer B0 is full, empty it; insert x in B0.
  77. extract_min():
  78. analysis:
  79. 1. inserts: once the buffer B0 is empty, the next sizeof(B0)
  80. inserts are free; one insert can cause many I/Os if cascading
  81. emptying of external buffers Bi occurs. Emptying level-i buffer
  82. costs <arity>^i*sizeof(B0)/B I/Os and occurs every
  83. N/<arity>^i*sizeof(B0) inserts (or less, if deletes too). It can be
  84. proved that the amortized time of 1 insert is 1/B*maxnb_buffers.
  85. */
  86. /*
  87. T is assumed to be a class for which getPriority() and getValue()
  88. are implemented; for simplicity it is assumed that the comparison
  89. operators have been overloaded on T such that
  90. x < y <==> x.getPriority() < y.getPriority()
  91. */
  92. template<class T, class Key>
  93. class em_pqueue {
  94. private:
  95. //in memory priority queue
  96. MinMaxHeap<T> *pq;
  97. //pqueue size
  98. unsigned long pqsize;
  99. //in-memory buffer
  100. im_buffer<T> *buff_0;
  101. //in-memory buffer size
  102. unsigned long bufsize;
  103. //external memory buffers
  104. em_buffer<T,Key>** buff;
  105. /* number of external memory buffers statically allocated in the
  106. beginning; since the number of buffers needed is \log_m{n/m}, we
  107. cannot know it in advance; estimate it roughly and then reallocate
  108. it dynamically on request;
  109. TO DO: dynamic reallocation with a bigger nb of external buffer
  110. if structure becomes full */
  111. unsigned short max_nbuf;
  112. //index of next external buffer entry available for use (i.e. is NULL)
  113. unsigned short crt_buf;
  114. //external buffer arity
  115. unsigned int buf_arity;
  116. public:
  117. //create an em_pqueue of specified size
  118. em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf,
  119. unsigned int buf_ar);
  120. //create an em_pqueue capable to store <= N elements
  121. em_pqueue();
  122. em_pqueue(long N) { em_pqueue(); }; // N not used
  123. #ifdef SAVE_MEMORY
  124. // create an empq, initialize its pq with im and insert amis in
  125. // buff[0]; im should not be used/deleted after that outside empq
  126. em_pqueue(MinMaxHeap<T> *im, AMI_STREAM<T> *amis);
  127. #endif
  128. //copy constructor
  129. em_pqueue(const em_pqueue &ep);
  130. //clean up
  131. ~em_pqueue();
  132. //return the nb of elements in the structure
  133. unsigned long size();
  134. //return true if empty
  135. bool is_empty();
  136. //return true if full
  137. bool is_full() {
  138. cout << "em_pqueue::is_full(): sorry not implemented\n";
  139. exit(1);
  140. }
  141. //return the element with minimum priority in the structure
  142. bool min(T& elt);
  143. //delete the element with minimum priority in the structure;
  144. //return false if pq is empty
  145. bool extract_min(T& elt);
  146. //extract all elts with min key, add them and return their sum
  147. bool extract_all_min(T& elt);
  148. //insert an element; return false if insertion fails
  149. bool insert(const T& elt);
  150. //return maximum capacity of i-th external buffer
  151. long maxlen(unsigned short i);
  152. //return maximum capacity of em_pqueue
  153. long maxlen();
  154. // delete all the data in the pq; reset to empty but don't free memory
  155. void clear();
  156. //print structure
  157. void print_range();
  158. void print();
  159. //print the detailed size of empq (pq, buf_0, buff[i])
  160. void print_size();
  161. friend ostream& operator<<(ostream& s, const em_pqueue &empq) {
  162. s << "EM_PQ: pq size=" << empq.pqsize
  163. << ", buff_0 size=" << empq.bufsize
  164. << ", ext_bufs=" << empq.crt_buf
  165. << "(max " << empq.max_nbuf << ")\n";
  166. s << "IN_MEMORY PQ: \n" << *(empq.pq) << "\n";
  167. s << "IN_MEMORY BUFFER: \n" << *(empq.buff_0) << "\n";
  168. for (unsigned short i=0; i < empq.crt_buf; i++) {
  169. //s << "EM_BUFFER " << i << ":\n" ;
  170. s << *(empq.buff[i]);
  171. }
  172. return s;
  173. }
  174. protected:
  175. //return the nb of active streams in the buffer
  176. int active_streams() {
  177. int totstr = 0;
  178. for (unsigned short i = 0; i< crt_buf; i++) {
  179. totstr+= buff[i]->get_nbstreams();
  180. }
  181. return totstr;
  182. }
  183. //called when buff_0 is full to empty it on external level_1 buffer;
  184. //can produce cascading emptying
  185. bool empty_buff_0();
  186. //sort and empty buffer i into buffer (i+1) recursively;
  187. //called recursively by empty_buff_0() to empty subsequent buffers
  188. //i must be a valid (i<crt_buf) full buffer
  189. void empty_buff(unsigned short i);
  190. /* merge the first <K> elements of the streams of input buffer,
  191. starting at position <buf.deleted[i]> in each stream; there are
  192. <buf.arity> streams in total; write output in <outstream>; the
  193. items written in outstream are of type <merge_output_type> which
  194. extends T with the stream nb and buffer nb the item comes from;
  195. this information is needed later to distribute items back; do not
  196. delete the K merged elements from the input streams; <bufid> is the
  197. id of the buffer whose streams are being merged;
  198. the input streams are assumed sorted in increasing order of keys; */
  199. AMI_err merge_buffer(em_buffer<T,Key> *buf,
  200. ExtendedMergeStream *outstr, long K);
  201. /* merge the first <K> elements of the input streams; there are
  202. <arity> streams in total; write output in <outstream>;
  203. the input streams are assumed sorted in increasing order of their
  204. keys; */
  205. AMI_err merge_streams(ExtendedMergeStream** instr,
  206. unsigned short arity,
  207. ExtendedMergeStream *outstr, long K);
  208. //deletes one element from <buffer, stream>
  209. void delete_str_elt(unsigned short buf_id,
  210. unsigned int stream_id);
  211. /* copy the minstream in the internal pqueue while merging with
  212. buff_0; the smallest <pqsize> elements between minstream and
  213. buff_0 will be inserted in internal pqueue; also, the elements
  214. from minstram which are inserted in pqueue must be marked as
  215. deleted in the source streams; */
  216. void merge_bufs2pq(ExtendedMergeStream *minstream);
  217. //clean buffers in case some streams have been emptied
  218. void cleanup();
  219. //called when pq must be filled from external buffers
  220. bool fillpq();
  221. //print the nb of elements in each stream
  222. void print_stream_sizes();
  223. };
  224. #endif