ami_stream.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef _AMI_STREAM_H
  31. #define _AMI_STREAM_H
#include <grass/config.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <string>
using std::cout;
using std::cerr;
using std::endl;
using std::ostream;
using std::ofstream;
using std::istream;
extern "C" {
#include <grass/gis.h>
}
  52. #define MAX_STREAMS_OPEN 200
  53. #include "mm.h" // Get the memory manager.
  54. #define DEBUG_DELETE if(0)
  55. #define DEBUG_STREAM_LEN if(0)
  56. #define DEBUG_ASSERT if(0)
  57. // The name of the environment variable which keeps the name of the
  58. // directory where streams are stored
  59. #define STREAM_TMPDIR "STREAM_DIR"
  60. // All streams will be names STREAM_*****
  61. #define BASE_NAME "STREAM"
  62. #define STREAM_BUFFER_SIZE (1<<18)
  63. //
  64. // AMI error codes are returned using the AMI_err type.
  65. //
  66. enum AMI_err {
  67. AMI_ERROR_NO_ERROR = 0,
  68. AMI_ERROR_IO_ERROR,
  69. AMI_ERROR_END_OF_STREAM,
  70. AMI_ERROR_OUT_OF_RANGE,
  71. AMI_ERROR_READ_ONLY,
  72. AMI_ERROR_OS_ERROR,
  73. AMI_ERROR_MM_ERROR,
  74. AMI_ERROR_OBJECT_INITIALIZATION,
  75. AMI_ERROR_PERMISSION_DENIED,
  76. AMI_ERROR_INSUFFICIENT_MAIN_MEMORY,
  77. AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS,
  78. AMI_ERROR_ENV_UNDEFINED,
  79. AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
  80. };
  81. extern const char *ami_str_error[];
  82. //
  83. // AMI stream types passed to constructors
  84. //
  85. enum AMI_stream_type {
  86. AMI_READ_STREAM = 1, // Open existing stream for reading
  87. AMI_WRITE_STREAM, // Open for writing. Create if non-existent
  88. AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
  89. AMI_READ_WRITE_STREAM, // Open to read and write.
  90. AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
  91. };
  92. enum persistence {
  93. // Delete the stream from the disk when it is destructed.
  94. PERSIST_DELETE = 0,
  95. // Do not delete the stream from the disk when it is destructed.
  96. PERSIST_PERSISTENT,
  97. // Delete each block of data from the disk as it is read.
  98. PERSIST_READ_ONCE
  99. };
  100. /* an un-templated version makes for easier debugging */
  101. class UntypedStream {
  102. protected:
  103. FILE * fp;
  104. int fildes; //descriptor of file
  105. AMI_stream_type access_mode;
  106. char path[BUFSIZ];
  107. persistence per;
  108. //0 for streams, positive for substreams
  109. unsigned int substream_level;
  110. // If this stream is actually a substream, these will be set to
  111. // indicate the portion of the file that is part of this stream. If
  112. // the stream is the whole file, they will be set to -1. Both are in
  113. // T units.
  114. off_t logical_bos;
  115. off_t logical_eos;
  116. //stream buffer passed in the call to setvbuf when file is opened
  117. char* buf;
  118. int eof_reached;
  119. public:
  120. static unsigned int get_block_length() {
  121. return STREAM_BUFFER_SIZE;
  122. //return getpagesize();
  123. };
  124. };
  125. template<class T>
  126. class AMI_STREAM : public UntypedStream {
  127. protected:
  128. T read_tmp; /* this is ugly... RW */
  129. public:
  130. // An AMI_stream with default name
  131. AMI_STREAM();
  132. // An AMI stream based on a specific path name.
  133. AMI_STREAM(const char *path_name, AMI_stream_type st = AMI_READ_WRITE_STREAM);
  134. // convenience function with split path_name
  135. //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
  136. // A psuedo-constructor for substreams.
  137. AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
  138. AMI_STREAM<T> **sub_stream);
  139. // Destructor
  140. ~AMI_STREAM(void);
  141. // Read and write elements.
  142. AMI_err read_item(T **elt);
  143. AMI_err write_item(const T &elt);
  144. AMI_err read_array(T *data, off_t len, off_t *lenp=NULL);
  145. AMI_err write_array(const T *data, off_t len);
  146. // Return the number of items in the stream.
  147. off_t stream_len(void);
  148. // Return the path name of this stream.
  149. AMI_err name(char **stream_name);
  150. const char* name() const;
  151. // Move to a specific item in the stream.
  152. AMI_err seek(off_t offset);
  153. // Query memory usage
  154. static AMI_err main_memory_usage(size_t *usage,
  155. MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
  156. void persist(persistence p);
  157. char *sprint();
  158. // have we hit the end of the stream
  159. int eof();
  160. };
  161. /**********************************************************************/
  162. /**********************************************************************/
  163. /* creates a random file name, opens the file for reading and writing
  164. and and returns a file descriptor */
  165. /* int ami_single_temp_name(char *base, char* tmp_path); */
  166. /* fix from Andy Danner */
  167. int ami_single_temp_name(const std::string& base, char* tmp_path);
  168. /**********************************************************************/
  169. /* given fd=fide descriptor, associates with it a stream aopened in
  170. access_mode and returns it */
  171. FILE* open_stream(int fd, AMI_stream_type st);
  172. /**********************************************************************/
  173. /* open the file whose name is pathname in access mode */
  174. FILE* open_stream(char* pathname, AMI_stream_type st);
  175. /********************************************************************/
  176. // An AMI stream with default name.
  177. template<class T>
  178. AMI_STREAM<T>::AMI_STREAM() {
  179. access_mode = AMI_READ_WRITE_STREAM;
  180. int fd = ami_single_temp_name(BASE_NAME, path);
  181. fildes = fd;
  182. fp = open_stream(fd, access_mode);
  183. /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
  184. buf = new char[STREAM_BUFFER_SIZE];
  185. if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
  186. cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
  187. << strerror(errno) << endl;
  188. exit(1);
  189. }
  190. // By default, all streams are deleted at destruction time.
  191. per = PERSIST_DELETE;
  192. // Not a substream.
  193. substream_level = 0;
  194. logical_bos = logical_eos = -1;
  195. // why is this here in the first place?? -RW
  196. seek(0);
  197. eof_reached = 0;
  198. // Register memory usage before returning.
  199. //size_t usage;
  200. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  201. //MM_manager.register_allocation(usage);
  202. }
  203. /**********************************************************************/
  204. // An AMI stream based on a specific path name.
  205. template<class T>
  206. AMI_STREAM<T>::AMI_STREAM(const char *path_name, AMI_stream_type st) {
  207. access_mode = st;
  208. if(path_name == NULL) {
  209. int fd = ami_single_temp_name(BASE_NAME, path);
  210. fildes = fd;
  211. fp = open_stream(fd, access_mode);
  212. } else {
  213. strcpy(path, path_name);
  214. fp = open_stream(path, st);
  215. fildes = -1;
  216. }
  217. /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
  218. buf = new char[STREAM_BUFFER_SIZE];
  219. if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
  220. cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
  221. << strerror(errno) << endl;
  222. exit(1);
  223. }
  224. eof_reached = 0;
  225. // By default, all streams are deleted at destruction time.
  226. if(st == AMI_READ_STREAM) {
  227. per = PERSIST_PERSISTENT;
  228. } else {
  229. per = PERSIST_DELETE;
  230. }
  231. // Not a substream.
  232. substream_level = 0;
  233. logical_bos = logical_eos = -1;
  234. seek(0);
  235. // Register memory usage before returning.
  236. //size_t usage;
  237. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  238. //MM_manager.register_allocation(usage);
  239. }
  240. /**********************************************************************/
  241. // A psuedo-constructor for substreams.
  242. template<class T>
  243. AMI_err AMI_STREAM<T>::new_substream(AMI_stream_type st,
  244. off_t sub_begin,
  245. off_t sub_end,
  246. AMI_STREAM<T> **sub_stream) {
  247. //assume this for now
  248. assert(st == AMI_READ_STREAM);
  249. #ifdef __MINGW32__
  250. /* MINGW32: reopen file here for stream_len() below */
  251. //reopen the file
  252. AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
  253. #endif
  254. //check range
  255. if (substream_level) {
  256. if( (sub_begin >= (logical_eos - logical_bos)) ||
  257. (sub_end >= (logical_eos - logical_bos)) ) {
  258. return AMI_ERROR_OUT_OF_RANGE;
  259. }
  260. } else {
  261. off_t len = stream_len();
  262. if (sub_begin > len || sub_end > len) {
  263. return AMI_ERROR_OUT_OF_RANGE;
  264. }
  265. }
  266. #ifndef __MINGW32__
  267. //reopen the file
  268. AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
  269. #endif
  270. // Set up the beginning and end positions.
  271. if (substream_level) {
  272. substr->logical_bos = logical_bos + sub_begin;
  273. substr->logical_eos = logical_bos + sub_end + 1;
  274. } else {
  275. substr->logical_bos = sub_begin;
  276. substr->logical_eos = sub_end + 1;
  277. }
  278. // Set the current position.
  279. substr->seek(0);
  280. substr->eof_reached = 0;
  281. //set substream level
  282. substr->substream_level = substream_level + 1;
  283. substr->per = per; //set persistence
  284. //*sub_stream = (AMI_base_stream < T > *)substr;
  285. *sub_stream = substr;
  286. return AMI_ERROR_NO_ERROR;
  287. }
  288. /**********************************************************************/
  289. // Return the number of items in the stream.
  290. template<class T>
  291. off_t AMI_STREAM<T>::stream_len(void) {
  292. fflush(fp);
  293. #ifdef __MINGW32__
  294. //stat() fails on MS Windows if the file is open, so try G_ftell() instead.
  295. //FIXME: not 64bit safe, but WinGrass isn't either right now.
  296. off_t posn_save, st_size;
  297. posn_save = G_ftell(fp);
  298. if(posn_save == -1) {
  299. perror("ERROR: AMI_STREAM::stream_len(): ftell(fp) failed ");
  300. perror(path);
  301. exit(1);
  302. }
  303. G_fseek(fp, 0, SEEK_END);
  304. st_size = G_ftell(fp);
  305. if(st_size == -1) {
  306. perror("ERROR: AMI_STREAM::stream_len(): ftell[SEEK_END] failed ");
  307. perror(path);
  308. exit(1);
  309. }
  310. G_fseek(fp, posn_save, SEEK_SET);
  311. //debug stream_len:
  312. DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%d\n",
  313. path, st_size, sizeof(T));
  314. return (st_size / sizeof(T));
  315. #else
  316. struct stat statbuf;
  317. if (stat(path, &statbuf) == -1) {
  318. perror("AMI_STREAM::stream_len(): fstat failed ");
  319. DEBUG_ASSERT assert(0);
  320. exit(1);
  321. }
  322. //debug stream_len:
  323. DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%lud\n",
  324. path, (long long int)statbuf.st_size, sizeof(T));
  325. return (statbuf.st_size / sizeof(T));
  326. #endif
  327. }
  328. /**********************************************************************/
  329. // Return the path name of this stream.
  330. template<class T>
  331. AMI_err AMI_STREAM<T>::name(char **stream_name) {
  332. *stream_name = new char [strlen(path) + 1];
  333. strcpy(*stream_name, path);
  334. return AMI_ERROR_NO_ERROR;
  335. }
  336. // Return the path name of this stream.
  337. template<class T>
  338. const char *
  339. AMI_STREAM<T>::name() const {
  340. return path;
  341. }
  342. /**********************************************************************/
  343. // Move to a specific offset within the (sub)stream.
  344. template<class T>
  345. AMI_err AMI_STREAM<T>::seek(off_t offset) {
  346. off_t seek_offset;
  347. if (substream_level) { //substream
  348. if (offset > (unsigned) (logical_eos - logical_bos)) {
  349. //offset out of range
  350. cerr << "ERROR: AMI_STREAM::seek bos=" << logical_bos << ", eos="
  351. << logical_eos << ", offset " << offset << " out of range.\n";
  352. DEBUG_ASSERT assert(0);
  353. exit(1);
  354. } else {
  355. //offset in range
  356. seek_offset = (logical_bos + offset) * sizeof(T);
  357. }
  358. } else {
  359. //not a substream
  360. seek_offset = offset * sizeof(T);
  361. }
  362. G_fseek(fp, seek_offset, SEEK_SET);
  363. return AMI_ERROR_NO_ERROR;
  364. }
  365. /**********************************************************************/
  366. // Query memory usage
  367. template<class T>
  368. AMI_err
  369. AMI_STREAM<T>::main_memory_usage(size_t *usage, MM_stream_usage usage_type) {
  370. switch (usage_type) {
  371. case MM_STREAM_USAGE_OVERHEAD:
  372. *usage = sizeof (AMI_STREAM<T>);
  373. break;
  374. case MM_STREAM_USAGE_BUFFER:
  375. // *usage = get_block_length();
  376. *usage = STREAM_BUFFER_SIZE*sizeof(char);
  377. break;
  378. case MM_STREAM_USAGE_CURRENT:
  379. case MM_STREAM_USAGE_MAXIMUM:
  380. // *usage = sizeof (*this) + get_block_length();
  381. *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
  382. break;
  383. }
  384. return AMI_ERROR_NO_ERROR;
  385. }
  386. /**********************************************************************/
  387. template<class T>
  388. AMI_STREAM<T>::~AMI_STREAM(void) {
  389. DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
  390. assert(fp);
  391. fclose(fp);
  392. delete buf;
  393. // Get rid of the file if not persistent and if not substream.
  394. if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
  395. if (unlink(path) == -1) {
  396. cerr << "ERROR: AMI_STREAM: failed to unlink " << path << endl;
  397. perror("cannot unlink: ");
  398. DEBUG_ASSERT assert(0);
  399. exit(1);
  400. }
  401. }
  402. // Register memory deallocation before returning.
  403. //size_t usage;
  404. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  405. //MM_manager.register_deallocation(usage);
  406. }
  407. /**********************************************************************/
  408. template<class T>
  409. AMI_err AMI_STREAM<T>::read_item(T **elt) {
  410. assert(fp);
  411. //if we go past substream range
  412. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  413. return AMI_ERROR_END_OF_STREAM;
  414. } else {
  415. if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
  416. if(feof(fp)) {
  417. eof_reached = 1;
  418. return AMI_ERROR_END_OF_STREAM;
  419. } else {
  420. cerr << "ERROR: file=" << path << ":";
  421. perror("cannot read!");
  422. return AMI_ERROR_IO_ERROR;
  423. }
  424. }
  425. *elt = &read_tmp;
  426. return AMI_ERROR_NO_ERROR;
  427. }
  428. }
  429. /**********************************************************************/
  430. template<class T>
  431. AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp) {
  432. size_t nobj;
  433. assert(fp);
  434. //if we go past substream range
  435. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  436. eof_reached = 1;
  437. return AMI_ERROR_END_OF_STREAM;
  438. } else {
  439. nobj = fread((void*)data, sizeof(T), len, fp);
  440. if (nobj < len) { /* some kind of error */
  441. if(feof(fp)) {
  442. if(lenp) *lenp = nobj;
  443. eof_reached = 1;
  444. return AMI_ERROR_END_OF_STREAM;
  445. } else {
  446. cerr << "ERROR: file=" << path << ":";
  447. perror("cannot read!");
  448. return AMI_ERROR_IO_ERROR;
  449. }
  450. }
  451. if(lenp) *lenp = nobj;
  452. return AMI_ERROR_NO_ERROR;
  453. }
  454. }
  455. /**********************************************************************/
  456. template<class T>
  457. AMI_err AMI_STREAM<T>::write_item(const T &elt) {
  458. assert(fp);
  459. //if we go past substream range
  460. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  461. return AMI_ERROR_END_OF_STREAM;
  462. } else {
  463. if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
  464. cerr << "ERROR: AMI_STREAM::write_item failed.\n";
  465. if (*path)
  466. perror(path);
  467. else
  468. perror("AMI_STREAM::write_item: ");
  469. DEBUG_ASSERT assert(0);
  470. exit(1);
  471. }
  472. return AMI_ERROR_NO_ERROR;
  473. }
  474. }
  475. /**********************************************************************/
  476. template<class T>
  477. AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
  478. size_t nobj;
  479. assert(fp);
  480. //if we go past substream range
  481. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  482. return AMI_ERROR_END_OF_STREAM;
  483. } else {
  484. nobj = fwrite(data, sizeof(T), len, fp);
  485. if (nobj < len) {
  486. cerr << "ERROR: AMI_STREAM::write_array failed.\n";
  487. if (*path)
  488. perror(path);
  489. else
  490. perror("AMI_STREAM::write_array: ");
  491. DEBUG_ASSERT assert(0);
  492. exit(1);
  493. }
  494. return AMI_ERROR_NO_ERROR;
  495. }
  496. }
  497. /**********************************************************************/
  498. template<class T>
  499. void AMI_STREAM<T>::persist(persistence p) {
  500. per = p;
  501. }
  502. /**********************************************************************/
  503. // sprint()
  504. // Return a string describing the stream
  505. //
  506. // This function gives easy access to the file name, length.
  507. // It is not reentrant, but this should not be too much of a problem
  508. // if you are careful.
  509. template<class T>
  510. char *AMI_STREAM<T>::sprint() {
  511. static char desc[BUFSIZ + 256];
  512. sprintf(desc, "[AMI_STREAM %s %ld]", path, (long)stream_len());
  513. return desc;
  514. }
  515. /**********************************************************************/
  516. template<class T>
  517. int AMI_STREAM<T>::eof() {
  518. return eof_reached;
  519. }
  520. #endif // _AMI_STREAM_H