replacementHeap.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef REPLACEMENT_QUEUE_H
  31. #define REPLACEMENT_QUEUE_H
  32. #include <assert.h>
  33. #include "queue.h"
  34. #define RHEAP_DEBUG if(0)
  35. /*****************************************************************/
  36. /* encapsulation of the element and the run it comes from;
  37. */
  38. template<class T>
  39. class HeapElement {
  40. public:
  41. T value;
  42. AMI_STREAM<T> *run;
  43. HeapElement(): run(NULL) {};
  44. friend ostream& operator << (ostream& s, const HeapElement &p) {
  45. return s << "[" << p.value << "]";
  46. };
  47. };
  48. /*****************************************************************/
  49. /*
  50. This is a heap of HeapElements, i.e. elements which come from streams;
  51. when an element is consumed, the heap knows how to replace it with the
  52. next element from the same stream.
  53. Compare is a class that has a member function called "compare" which
  54. is used to compare two elements of type T
  55. */
  56. template<class T,class Compare>
  57. class ReplacementHeap {
  58. private:
  59. HeapElement<T>* mergeHeap; //the heap;
  60. size_t arity; //max size
  61. size_t size; //represents actual size, i.e. the nb of (non-empty)
  62. //runs; they are stored contigously in the first
  63. //<size> positions of mergeHeap; once a run becomes
  64. //empty, it is deleted and size is decremented. size
  65. //stores the next position where a HeapElement can be
  66. //added.
  67. protected:
  68. void heapify(size_t i);
  69. void buildheap();
  70. /* for each run in the heap, read an element from the run into the heap */
  71. void init();
  72. /* add a run; make sure total nb of runs does not exceed heap arity */
  73. void addRun(AMI_STREAM<T> *run);
  74. /* delete the i-th run (and the element); that is, swap the ith run
  75. with the last one, and decrement size; just like in a heap, but
  76. no heapify. Note: this function messes up the heap order. If the
  77. user wants to maintain heap property should call heapify
  78. specifically.
  79. */
  80. void deleteRun(size_t i);
  81. public:
  82. //allocate array mergeHeap and the runs in runList
  83. ReplacementHeap<T,Compare>(size_t arity, queue<char*>* runList);
  84. //delete array mergeHeap
  85. ~ReplacementHeap<T,Compare>();
  86. //is heap empty?
  87. int empty() const {
  88. return (size == 0);
  89. }
  90. //delete mergeHeap[0].value, replace it with the next element from
  91. //the same stream, and re-heapify
  92. T extract_min();
  93. ostream & print(ostream& s) const {
  94. char* runname;
  95. off_t runlen;
  96. s << "Replacementheap " << this << ": " << size << " runs";
  97. for(size_t i=0; i<size; i++) {
  98. s << endl << " <- i=" << i<< ": " << mergeHeap[i].run;
  99. assert(mergeHeap[i].run);
  100. mergeHeap[i].run->name(&runname);
  101. runlen = mergeHeap[i].run->stream_len();
  102. s << ", " << runname << ", len=" << runlen;
  103. delete runname; //this should be safe
  104. }
  105. s << endl;
  106. return s;
  107. }
  108. };
  109. /*****************************************************************/
  110. template<class T,class Compare>
  111. ReplacementHeap<T,Compare>::ReplacementHeap(size_t g_arity,
  112. queue<char*>* runList) {
  113. char* name=NULL;
  114. assert(runList && g_arity > 0);
  115. RHEAP_DEBUG cerr << "ReplacementHeap arity=" << g_arity << "\n";
  116. arity = g_arity;
  117. size = 0; //no run yet
  118. mergeHeap = new HeapElement<T>[arity];
  119. for (unsigned int i=0; i< arity; i++) {
  120. //pop a stream from the list and add it to heap
  121. runList->dequeue(&name);
  122. AMI_STREAM<T>* str = new AMI_STREAM<T>(name);
  123. assert(str);
  124. delete name; //str makes its own copy
  125. addRun(str);
  126. }
  127. init();
  128. }
  129. /*****************************************************************/
  130. template<class T,class Compare>
  131. ReplacementHeap<T,Compare>::~ReplacementHeap<T,Compare>() {
  132. if (!empty()) {
  133. cerr << "warning: ~ReplacementHeap: heap not empty!\n";
  134. }
  135. //delete the runs first
  136. for(size_t i=0; i<size; i++) {
  137. if (mergeHeap[i].run)
  138. delete mergeHeap[i].run;
  139. }
  140. delete [] mergeHeap;
  141. }
  142. /*****************************************************************/
  143. /* add a run; make sure total nb of runs does not exceed heap arity
  144. */
  145. template<class T,class Compare>
  146. void
  147. ReplacementHeap<T,Compare>::addRun(AMI_STREAM<T> *r) {
  148. assert(r);
  149. if(size == arity) {
  150. cerr << "ReplacementHeap::addRun size =" << size << ",arity=" << arity
  151. << " full, cannot add another run.\n";
  152. assert(0);
  153. exit(1);
  154. }
  155. assert(size < arity);
  156. mergeHeap[size].run = r;
  157. size++;
  158. RHEAP_DEBUG
  159. {char* strname;
  160. r->name(&strname);
  161. cerr << "ReplacementHeap::addRun added run " << strname
  162. << " (rheap size=" << size << ")" << endl;
  163. delete strname;
  164. cerr.flush();
  165. }
  166. }
  167. /*****************************************************************/
  168. /* delete the i-th run (and the value); that is, swap ith element with
  169. the last one, and decrement size; just like in a heap, but no
  170. heapify. Note: this function messes up the heap order. If the user
  171. wants to maintain heap property should call heapify specifically.
  172. */
  173. template<class T,class Compare>
  174. void
  175. ReplacementHeap<T,Compare>::deleteRun(size_t i) {
  176. assert(i >= 0 && i < size && mergeHeap[i].run);
  177. RHEAP_DEBUG
  178. {
  179. cerr << "ReplacementHeap::deleteRun deleting run " << i << ", "
  180. << mergeHeap[i].run << endl;
  181. print(cerr);
  182. }
  183. //delete it
  184. delete mergeHeap[i].run;
  185. //and replace it with
  186. if (size > 1) {
  187. mergeHeap[i].value = mergeHeap[size-1].value;
  188. mergeHeap[i].run = mergeHeap[size-1].run;
  189. }
  190. size--;
  191. }
  192. /*****************************************************************/
  193. /* for each run in the heap, read an element from the run into the
  194. heap; if ith run is empty, delete it
  195. */
  196. template<class T,class Compare>
  197. void
  198. ReplacementHeap<T,Compare>::init() {
  199. AMI_err err;
  200. T* elt;
  201. size_t i;
  202. RHEAP_DEBUG cerr << "ReplacementHeap::init " ;
  203. i=0;
  204. while (i<size) {
  205. assert(mergeHeap[i].run);
  206. // Rewind run i
  207. err = mergeHeap[i].run->seek(0);
  208. if (err != AMI_ERROR_NO_ERROR) {
  209. cerr << "ReplacementHeap::Init(): cannot seek run " << i << "\n";
  210. assert(0);
  211. exit(1);
  212. }
  213. //read first item from run i
  214. err = mergeHeap[i].run->read_item(&elt);
  215. if (err != AMI_ERROR_NO_ERROR) {
  216. if (err == AMI_ERROR_END_OF_STREAM) {
  217. deleteRun(i);
  218. //need to iterate one more time with same i;
  219. } else {
  220. cerr << "ReplacementHeap::Init(): cannot read run " << i << "\n";
  221. assert(0);
  222. exit(1);
  223. }
  224. } else {
  225. //copy.... can this be avoided? xxx
  226. mergeHeap[i].value = *elt;
  227. i++;
  228. }
  229. }
  230. buildheap();
  231. }
  // Index helpers for navigating the binary heap stored in mergeHeap.
  /* The heap layout is slightly modified for simplicity. Slot 0 is a lone
     root whose only proper child is slot 1. From slot 1 on, the usual rule
     "children of k are 2k and 2k+1" applies:
             0
             |
             1
            / \
           2   3
          / \ / \
         4  5 6  7
     (Applied to slot 0, the formulas yield 0 and 1 as its children.)
  */
  // Left child of a slot: 2*index.
  static inline size_t rheap_lchild(size_t index) {
    return index << 1;
  }
  // Right child of a slot: 2*index + 1.
  static inline size_t rheap_rchild(size_t index) {
    return rheap_lchild(index) + 1;
  }
  // Parent of a slot: index/2 (slot 0 is its own parent).
  static inline size_t rheap_parent(size_t index) {
    return index / 2;
  }
  // these functions are duplicated in pqheap.H...XXX
  254. /*****************************************************************/
  255. template<class T,class Compare>
  256. void
  257. ReplacementHeap<T,Compare>::heapify(size_t i) {
  258. size_t min_index = i;
  259. size_t lc = rheap_lchild(i);
  260. size_t rc = rheap_rchild(i);
  261. Compare cmpobj;
  262. assert(i >= 0 && i < size);
  263. if ((lc < size) &&
  264. (cmpobj.compare(mergeHeap[lc].value, mergeHeap[min_index].value) == -1)) {
  265. min_index = lc;
  266. }
  267. if ((rc < size) &&
  268. (cmpobj.compare(mergeHeap[rc].value, mergeHeap[min_index].value) == -1)) {
  269. min_index = rc;
  270. }
  271. if (min_index != i) {
  272. HeapElement<T> tmp = mergeHeap[min_index];
  273. mergeHeap[min_index] = mergeHeap[i];
  274. mergeHeap[i] = tmp;
  275. heapify(min_index);
  276. }
  277. return;
  278. }
  279. /*****************************************************************/
  280. template<class T,class Compare>
  281. void
  282. ReplacementHeap<T,Compare>::buildheap() {
  283. if (size > 1) {
  284. for (int i = rheap_parent(size-1); i>=0; i--) {
  285. heapify(i);
  286. }
  287. }
  288. RHEAP_DEBUG cerr << "Buildheap done\n";
  289. return;
  290. }
  291. /*****************************************************************/
  292. template<class T,class Compare>
  293. T
  294. ReplacementHeap<T,Compare>::extract_min() {
  295. T *elt, min;
  296. AMI_err err;
  297. assert(!empty()); //user's job to check first if it's empty
  298. min = mergeHeap[0].value;
  299. //read a new element from the same run
  300. assert(mergeHeap[0].run);
  301. err = mergeHeap[0].run->read_item(&elt);
  302. if (err != AMI_ERROR_NO_ERROR) {
  303. //if run is empty, delete it
  304. if (err == AMI_ERROR_END_OF_STREAM) {
  305. RHEAP_DEBUG cerr << "rheap extract_min: run " << mergeHeap[0].run
  306. << " empty. deleting\n ";
  307. deleteRun(0);
  308. } else {
  309. cerr << "ReplacementHeap::extract_min: cannot read\n";
  310. assert(0);
  311. exit(1);
  312. }
  313. } else {
  314. //copy...can this be avoided?
  315. mergeHeap[0].value = *elt;
  316. }
  317. //restore heap
  318. if (size > 0) heapify(0);
  319. return min;
  320. }
  321. #endif