replacementHeapBlock.h 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef REPLACEMENT_HEAPBLOCK_H
  31. #define REPLACEMENT_HEAPBLOCK_H
  32. #include <assert.h>
  33. #include "mem_stream.h"
  34. #include "replacementHeap.h"
  35. #define RBHEAP_DEBUG if(0)
  36. /*****************************************************************/
  37. /* encapsulation of the element and the run it comes from;
  38. */
  39. template<class T>
  40. class BlockHeapElement {
  41. public:
  42. T value;
  43. MEM_STREAM<T> *run;
  44. BlockHeapElement(): run(NULL) {};
  45. friend ostream& operator << (ostream& s, const BlockHeapElement &p) {
  46. return s << "[" << p.value << "]";
  47. };
  48. };
  49. /*****************************************************************/
  /*
  This is a heap of BlockHeapElements, i.e. elements which come from
  streams; when an element is consumed, the heap knows how to replace it
  with the next element from the same stream.
  Compare is a class that has a member function called "compare", which
  is used to compare two elements of type T.
  */
  57. template<class T,class Compare>
  58. class ReplacementHeapBlock {
  59. private:
  60. BlockHeapElement<T>* mergeHeap; //the heap;
  61. size_t arity; //max size
  62. size_t size; //represents actual size, i.e. the nb of (non-empty)
  63. //runs; they are stored contigously in the first
  64. //<size> positions of mergeHeap; once a run becomes
  65. //empty, it is deleted and size is decremented. size
  66. //stores the next position where a HeapElement can be
  67. //added.
  68. protected:
  69. void heapify(size_t i);
  70. void buildheap();
  71. /* for each run in the heap, read an element from the run into the heap */
  72. void init();
  73. /* add a run; make sure total nb of runs does not exceed heap arity */
  74. void addRun(MEM_STREAM<T> *run);
  75. /* delete the i-th run (and the element); that is, swap the ith run
  76. with the last one, and decrement size; just like in a heap, but
  77. no heapify. Note: this function messes up the heap order. If the
  78. user wants to maintain heap property should call heapify
  79. specifically.
  80. */
  81. void deleteRun(size_t i);
  82. public:
  83. //allocate array mergeHeap, where the streams are stored in runList
  84. ReplacementHeapBlock<T,Compare>(queue <MEM_STREAM<T>*> *runList);
  85. //delete array mergeHeap
  86. ~ReplacementHeapBlock<T,Compare>();
  87. //is heap empty?
  88. int empty() const {
  89. return (size == 0);
  90. }
  91. //delete mergeHeap[0].value, replace it with the next element from
  92. //the same stream, and re-heapify
  93. T extract_min();
  94. ostream & print(ostream& s) const {
  95. s << "ReplacementheapBlock " << this << ": " << size << " runs";
  96. #if(0)
  97. char* runname;
  98. off_t runlen;
  99. for(size_t i=0; i<size; i++) {
  100. s << endl << " <- i=" << i<< ": " << mergeHeap[i].run;
  101. assert(mergeHeap[i].run);
  102. mergeHeap[i].run->name(&runname);
  103. runlen = mergeHeap[i].run->stream_len();
  104. s << ", " << runname << ", len=" << runlen;
  105. delete runname; //this should be safe
  106. }
  107. #endif
  108. s << endl;
  109. return s;
  110. }
  111. };
  112. /*****************************************************************/
  113. //allocate array mergeHeap, where the streams are stored in runList
  114. template<class T,class Compare>
  115. ReplacementHeapBlock<T,Compare>
  116. ::ReplacementHeapBlock(queue <MEM_STREAM<T>*> *runList) {
  117. RBHEAP_DEBUG cerr << "ReplacementHeapBlock " << endl;
  118. arity = runList->length();
  119. size = 0; //no run yet
  120. MEM_STREAM<T>* str;
  121. mergeHeap = new BlockHeapElement<T>[arity];
  122. for (unsigned int i=0; i< arity; i++) {
  123. //pop a stream from the list and add it to heap
  124. str = NULL;
  125. runList->dequeue(&str);
  126. assert(str);
  127. addRun(str);
  128. }
  129. init();
  130. }
  131. /*****************************************************************/
  132. template<class T,class Compare>
  133. ReplacementHeapBlock<T,Compare>::~ReplacementHeapBlock<T,Compare>() {
  134. if (!empty()) {
  135. cerr << "warning: ~ReplacementHeapBlock: heap not empty!\n";
  136. }
  137. //delete the runs first
  138. for(size_t i=0; i<size; i++) {
  139. if (mergeHeap[i].run)
  140. delete mergeHeap[i].run;
  141. }
  142. delete [] mergeHeap;
  143. }
  144. /*****************************************************************/
  145. /* add a run; make sure total nb of runs does not exceed heap arity
  146. */
  147. template<class T,class Compare>
  148. void
  149. ReplacementHeapBlock<T,Compare>::addRun(MEM_STREAM<T> *r) {
  150. assert(r);
  151. if(size == arity) {
  152. cerr << "ReplacementHeapBlockBlock::addRun size =" << size << ",arity=" << arity
  153. << " full, cannot add another run.\n";
  154. assert(0);
  155. exit(1);
  156. }
  157. assert(size < arity);
  158. mergeHeap[size].run = r;
  159. size++;
  160. RBHEAP_DEBUG
  161. {char* strname;
  162. r->name(&strname);
  163. cerr << "ReplacementHeapBlock::addRun added run " << strname
  164. << " (rheap size=" << size << ")" << endl;
  165. delete strname;
  166. cerr.flush();
  167. }
  168. }
  169. /*****************************************************************/
  /* delete the i-th run (and the value); that is, overwrite the i-th
  element with the last one, and decrement size; just like in a heap, but
  with no heapify. Note: this function breaks the heap order. A caller
  that wants to maintain the heap property must call heapify explicitly.
  */
  175. template<class T,class Compare>
  176. void
  177. ReplacementHeapBlock<T,Compare>::deleteRun(size_t i) {
  178. assert(i >= 0 && i < size && mergeHeap[i].run);
  179. RBHEAP_DEBUG
  180. {
  181. cerr << "ReplacementHeapBlock::deleteRun deleting run " << i << ", "
  182. << mergeHeap[i].run << endl;
  183. print(cerr);
  184. }
  185. //delete it
  186. delete mergeHeap[i].run;
  187. //and replace it with
  188. if (size > 1) {
  189. mergeHeap[i].value = mergeHeap[size-1].value;
  190. mergeHeap[i].run = mergeHeap[size-1].run;
  191. }
  192. size--;
  193. }
  194. /*****************************************************************/
  195. /* for each run in the heap, read an element from the run into the
  196. heap; if ith run is empty, delete it
  197. */
  198. template<class T,class Compare>
  199. void
  200. ReplacementHeapBlock<T,Compare>::init() {
  201. AMI_err err;
  202. T* elt;
  203. size_t i;
  204. RBHEAP_DEBUG cerr << "ReplacementHeapBlock::init " ;
  205. i=0;
  206. while (i<size) {
  207. assert(mergeHeap[i].run);
  208. // Rewind run i
  209. err = mergeHeap[i].run->seek(0);
  210. if (err != AMI_ERROR_NO_ERROR) {
  211. cerr << "ReplacementHeapBlock::Init(): cannot seek run " << i << "\n";
  212. assert(0);
  213. exit(1);
  214. }
  215. //read first item from run i
  216. err = mergeHeap[i].run->read_item(&elt);
  217. if (err != AMI_ERROR_NO_ERROR) {
  218. if (err == AMI_ERROR_END_OF_STREAM) {
  219. deleteRun(i);
  220. //need to iterate one more time with same i;
  221. } else {
  222. cerr << "ReplacementHeapBlock::Init(): cannot read run " << i << "\n";
  223. assert(0);
  224. exit(1);
  225. }
  226. } else {
  227. //copy.... can this be avoided? xxx
  228. mergeHeap[i].value = *elt;
  229. i++;
  230. }
  231. }
  232. buildheap();
  233. }
  234. /*****************************************************************/
  235. template<class T,class Compare>
  236. void
  237. ReplacementHeapBlock<T,Compare>::heapify(size_t i) {
  238. size_t min_index = i;
  239. size_t lc = rheap_lchild(i);
  240. size_t rc = rheap_rchild(i);
  241. Compare cmpobj;
  242. assert(i >= 0 && i < size);
  243. if ((lc < size) &&
  244. (cmpobj.compare(mergeHeap[lc].value, mergeHeap[min_index].value) == -1)) {
  245. min_index = lc;
  246. }
  247. if ((rc < size) &&
  248. (cmpobj.compare(mergeHeap[rc].value, mergeHeap[min_index].value) == -1)) {
  249. min_index = rc;
  250. }
  251. if (min_index != i) {
  252. BlockHeapElement<T> tmp = mergeHeap[min_index];
  253. mergeHeap[min_index] = mergeHeap[i];
  254. mergeHeap[i] = tmp;
  255. heapify(min_index);
  256. }
  257. return;
  258. }
  259. /*****************************************************************/
  260. template<class T,class Compare>
  261. void
  262. ReplacementHeapBlock<T,Compare>::buildheap() {
  263. if (size > 1) {
  264. for (int i = rheap_parent(size-1); i>=0; i--) {
  265. heapify(i);
  266. }
  267. }
  268. RBHEAP_DEBUG cerr << "Buildheap done\n";
  269. return;
  270. }
  271. /*****************************************************************/
  272. template<class T,class Compare>
  273. T
  274. ReplacementHeapBlock<T,Compare>::extract_min() {
  275. T *elt, min;
  276. AMI_err err;
  277. assert(!empty()); //user's job to check first if it's empty
  278. min = mergeHeap[0].value;
  279. //read a new element from the same run
  280. assert(mergeHeap[0].run);
  281. err = mergeHeap[0].run->read_item(&elt);
  282. if (err != AMI_ERROR_NO_ERROR) {
  283. //if run is empty, delete it
  284. if (err == AMI_ERROR_END_OF_STREAM) {
  285. RBHEAP_DEBUG cerr << "rheap extract_min: run " << mergeHeap[0].run
  286. << " empty. deleting\n ";
  287. deleteRun(0);
  288. } else {
  289. cerr << "ReplacementHeapBlock::extract_min: cannot read\n";
  290. assert(0);
  291. exit(1);
  292. }
  293. } else {
  294. //copy...can this be avoided?
  295. mergeHeap[0].value = *elt;
  296. }
  297. //restore heap
  298. if (size > 0) heapify(0);
  299. return min;
  300. }
  301. #endif