ami_sort_impl.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef AMI_SORT_IMPL_H
  31. #define AMI_SORT_IMPL_H
  32. #include "ami_stream.h"
  33. #include "mem_stream.h"
  34. #include "mm.h"
  35. #include "quicksort.h"
  36. #include "queue.h"
  37. #include "replacementHeap.h"
  38. #include "replacementHeapBlock.h"
  39. #define SDEBUG if(0)
  40. /* if this flag is defined, a run will be split into blocks, each
  41. block sorted and then all blocks merged */
  42. #define BLOCKED_RUN
  43. /* ---------------------------------------------------------------------- */
  44. //set run_size, last_run_size and nb_runs depending on how much memory
  45. //is available
  46. template<class T>
  47. static void
  48. initializeRunFormation(AMI_STREAM<T> *instream,
  49. size_t &run_size, size_t &last_run_size,
  50. unsigned int &nb_runs) {
  51. size_t mm_avail = MM_manager.memory_available();
  52. off_t strlen;
  53. #ifdef BLOCKED_RUN
  54. // not in place, can only use half memory
  55. mm_avail = mm_avail/2;
  56. #endif
  57. run_size = mm_avail/sizeof(T);
  58. strlen = instream->stream_len();
  59. if (strlen == 0) {
  60. nb_runs = last_run_size = 0;
  61. } else {
  62. if (strlen % run_size == 0) {
  63. nb_runs = strlen/run_size;
  64. last_run_size = run_size;
  65. } else {
  66. nb_runs = strlen/run_size + 1;
  67. last_run_size = strlen % run_size;
  68. }
  69. }
  70. SDEBUG cout << "nb_runs=" << nb_runs
  71. << ", run_size=" << run_size
  72. << ", last_run_size=" << last_run_size
  73. << "\n";
  74. }
  75. /* ---------------------------------------------------------------------- */
  76. /* data is allocated; read run_size elements from stream into data and
  77. sort them using quicksort */
  78. template<class T, class Compare>
  79. size_t makeRun_Block(AMI_STREAM<T> *instream, T* data,
  80. unsigned int run_size, Compare *cmp) {
  81. AMI_err err;
  82. off_t new_run_size = 0;
  83. //read next run from input stream
  84. err = instream->read_array(data, run_size, &new_run_size);
  85. assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
  86. //sort it in memory in place
  87. quicksort(data, new_run_size, *cmp);
  88. return new_run_size;
  89. }
  90. /* ---------------------------------------------------------------------- */
  91. /* data is allocated; read run_size elements from stream into data and
  92. sort them using quicksort; instead of reading the whole chunk at
  93. once, it reads it in blocks, sorts each block and then merges the
  94. blocks together. Note: it is not in place! it allocates another
  95. array of same size as data, writes the sorted run into it and
  96. deteles data, and replaces data with outdata */
  97. template<class T, class Compare>
  98. void makeRun(AMI_STREAM<T> *instream, T* &data,
  99. int run_size, Compare *cmp) {
  100. unsigned int nblocks, last_block_size, crt_block_size, block_size;
  101. block_size = STREAM_BUFFER_SIZE;
  102. if (run_size % block_size == 0) {
  103. nblocks = run_size / block_size;
  104. last_block_size = block_size;
  105. } else {
  106. nblocks = run_size / block_size + 1;
  107. last_block_size = run_size % block_size;
  108. }
  109. //create queue of blocks waiting to be merged
  110. queue<MEM_STREAM<T> *> *blockList;
  111. MEM_STREAM<T>* str;
  112. blockList = new queue<MEM_STREAM<T> *>(nblocks);
  113. for (unsigned int i=0; i < nblocks; i++) {
  114. crt_block_size = (i == nblocks-1) ? last_block_size: block_size;
  115. makeRun_Block(instream, &(data[i*block_size]), crt_block_size, cmp);
  116. str = new MEM_STREAM<T>( &(data[i*block_size]), crt_block_size);
  117. blockList->enqueue(str);
  118. }
  119. assert(blockList->length() == nblocks);
  120. //now data consists of sorted blocks: merge them
  121. ReplacementHeapBlock<T,Compare> rheap(blockList);
  122. SDEBUG rheap.print(cerr);
  123. int i = 0;
  124. T elt;
  125. T* outdata = new T [run_size];
  126. while (!rheap.empty()) {
  127. elt = rheap.extract_min();
  128. outdata[i] = elt;
  129. //SDEBUG cerr << "makeRun: written " << elt << endl;
  130. i++;
  131. }
  132. assert( i == run_size && blockList->length() == 0);
  133. delete blockList;
  134. T* tmp = data;
  135. delete [] tmp;
  136. data = outdata;
  137. }
  138. /* ---------------------------------------------------------------------- */
  139. //partition instream in streams that fit in main memory, sort each
  140. //stream, remember its name, make it persistent and store it on
  141. //disk. if entire stream fits in memory, sort it and store it and
  142. //return it.
  143. //assume instream is allocated prior to the call.
  144. // set nb_runs and allocate runNames.
  145. //The comparison object "cmp", of (user-defined) class represented by
  146. //Compare, must have a member function called "compare" which is used
  147. //for sorting the input stream.
  148. template<class T, class Compare>
  149. queue<char*>*
  150. runFormation(AMI_STREAM<T> *instream, Compare *cmp) {
  151. size_t run_size,last_run_size, crt_run_size;
  152. unsigned int nb_runs;
  153. queue<char*>* runList;
  154. T* data;
  155. AMI_STREAM<T>* str;
  156. char* strname;
  157. assert(instream && cmp);
  158. SDEBUG cout << "runFormation: ";
  159. SDEBUG MM_manager.print();
  160. /* leave this in for now, in case some file-based implementations do
  161. anything funny... -RW */
  162. //rewind file
  163. instream->seek(0); //should check error xxx
  164. //estimate run_size, last_run_size and nb_runs
  165. initializeRunFormation(instream, run_size, last_run_size, nb_runs);
  166. //create runList (if 0 size, queue uses default)
  167. runList = new queue<char*>(nb_runs);
  168. /* allocate space for a run */
  169. if (nb_runs <= 1) {
  170. //don't waste space if input stream is smaller than run_size
  171. data = new T[last_run_size];
  172. } else {
  173. data = new T[run_size];
  174. }
  175. SDEBUG MM_manager.print();
  176. for (size_t i=0; i< nb_runs; i++) {
  177. //while(!instream->eof()) {
  178. crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
  179. SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
  180. //crt_run_size = makeRun_Block(instream, data, run_size, cmp);
  181. #ifdef BLOCKED_RUN
  182. makeRun(instream, data, crt_run_size, cmp);
  183. #else
  184. makeRun_Block(instream, data, crt_run_size, cmp);
  185. #endif
  186. SDEBUG MM_manager.print();
  187. //read next run from input stream
  188. //err = instream->read_array(data, crt_run_size);
  189. //assert(err == AMI_ERROR_NO_ERROR);
  190. //sort it in memory in place
  191. //quicksort(data, crt_run_size, *cmp);
  192. if(crt_run_size > 0) {
  193. //create a new stream to hold this run
  194. str = new AMI_STREAM<T>();
  195. str->write_array(data, crt_run_size);
  196. assert(str->stream_len() == crt_run_size);
  197. //remember this run's name
  198. str->name(&strname); /* deleted after we dequeue */
  199. runList->enqueue(strname);
  200. //delete the stream -- should not keep too many streams open
  201. str->persist(PERSIST_PERSISTENT);
  202. delete str;
  203. }
  204. }
  205. SDEBUG MM_manager.print();
  206. //release the run memory!
  207. delete [] data;
  208. SDEBUG cout << "runFormation: done.\n";
  209. SDEBUG MM_manager.print();
  210. return runList;
  211. }
  212. /* ---------------------------------------------------------------------- */
  213. //this is one pass of merge; estimate max possible merge arity <ar>
  214. //and merge the first <ar> streams from runList ; create and return
  215. //the resulting stream (does not add it to the queue -- the calling
  216. //function will do that)
  217. //input streams are assumed to be sorted, and are not necessarily of
  218. //the same length.
  219. //streamList does not contains streams, but names of streams, which
  220. //must be opened in order to be merged
  221. //The comparison object "cmp", of (user-defined) class represented by
  222. //Compare, must have a member function called "compare" which is used
  223. //for sorting the input stream.
  224. template<class T, class Compare>
  225. AMI_STREAM<T>*
  226. singleMerge(queue<char*>* streamList, Compare *cmp)
  227. {
  228. AMI_STREAM<T>* mergedStr;
  229. size_t mm_avail, blocksize;
  230. unsigned int arity, max_arity;
  231. T elt;
  232. assert(streamList && cmp);
  233. SDEBUG cout << "singleMerge: ";
  234. //estimate max possible merge arity with available memory (approx M/B)
  235. mm_avail = MM_manager.memory_available();
  236. //blocksize = getpagesize();
  237. //should use AMI function, but there's no stream at this point
  238. //now use static mtd -RW 5/05
  239. AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_MAXIMUM);
  240. max_arity = mm_avail / blocksize;
  241. if(max_arity < 2) {
  242. cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
  243. max_arity = 2;
  244. } else if(max_arity > MAX_STREAMS_OPEN) {
  245. max_arity = MAX_STREAMS_OPEN;
  246. }
  247. arity = (streamList->length() < max_arity) ?
  248. streamList->length() : max_arity;
  249. SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n";
  250. /* create the output stream. if this is a complete merge, use finalpath */
  251. //create output stream
  252. mergedStr = new AMI_STREAM<T>();
  253. ReplacementHeap<T,Compare> rheap(arity, streamList);
  254. SDEBUG rheap.print(cerr);
  255. int i = 0;
  256. while (!rheap.empty()) {
  257. //mergedStr->write_item( rheap.extract_min() );
  258. //xxx should check error here
  259. elt = rheap.extract_min();
  260. mergedStr->write_item(elt);
  261. //SDEBUG cerr << "smerge: written " << elt << endl;
  262. i++;
  263. }
  264. SDEBUG cout << "..done\n";
  265. return mergedStr;
  266. }
  267. /* ---------------------------------------------------------------------- */
  268. //merge runs whose names are given by runList; this may entail
  269. //multiple passes of singleMerge();
  270. //return the resulting output stream
  271. //input streams are assumed to be sorted, and are not necessarily of
  272. //the same length.
  273. //The comparison object "cmp", of (user-defined) class represented by
  274. //Compare, must have a member function called "compare" which is used
  275. //for sorting the input stream.
  276. template<class T, class Compare>
  277. AMI_STREAM<T>*
  278. multiMerge(queue<char*>* runList, Compare *cmp)
  279. {
  280. AMI_STREAM<T> * mergedStr= NULL;
  281. char* path;
  282. assert(runList && runList->length() > 1 && cmp);
  283. SDEBUG cout << "multiMerge: " << runList->length() << " runs" << endl;
  284. while (runList->length() > 1) {
  285. //merge streams from streamlist into mergedStr
  286. mergedStr = singleMerge<T,Compare>(runList, cmp);
  287. //i thought the templates are not needed in the call, but seems to
  288. //help the compiler..laura
  289. assert(mergedStr);
  290. //if more runs exist, delete this stream and add it to list
  291. if (runList->length() > 0) {
  292. mergedStr->name(&path);
  293. runList->enqueue(path);
  294. mergedStr->persist(PERSIST_PERSISTENT);
  295. delete mergedStr;
  296. }
  297. }
  298. assert(runList->length() == 0);
  299. assert(mergedStr);
  300. return mergedStr;
  301. }
  302. #endif