// ami_stream.h (17 KB)
  1. /****************************************************************************
  2. *
  3. * MODULE: iostream
  4. *
  5. * COPYRIGHT (C) 2007 Laura Toma
  6. *
  7. *
  8. * Iostream is a library that implements streams, external memory
  9. * sorting on streams, and an external memory priority queue on
  10. * streams. These are the fundamental components used in external
  11. * memory algorithms.
  12. * Credits: The library was developed by Laura Toma. The kernel of
  13. * class STREAM is based on the similar class existent in the GPL TPIE
  14. * project developed at Duke University. The sorting and priority
  15. * queue have been developed by Laura Toma based on communications
  16. * with Rajiv Wickremesinghe. The library was developed as part of
  17. * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
  18. * Rajiv Wickremesinghe as part of the Terracost project.
  19. *
  20. * This program is free software; you can redistribute it and/or modify
  21. * it under the terms of the GNU General Public License as published by
  22. * the Free Software Foundation; either version 2 of the License, or
  23. * (at your option) any later version.
  24. *
  25. * This program is distributed in the hope that it will be useful,
  26. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  27. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  28. * General Public License for more details. *
  29. * **************************************************************************/
  30. #ifndef _AMI_STREAM_H
  31. #define _AMI_STREAM_H
  32. #include <grass/config.h>
  33. #include <sys/types.h>
  34. #include <sys/stat.h>
  35. #include <stdio.h>
  36. #include <stdlib.h>
  37. #include <assert.h>
  38. #include <fcntl.h>
  39. #include <errno.h>
  40. #include <unistd.h>
  41. #include <cstring>
  42. #include <iostream>
  43. using namespace std;
  44. extern "C" {
  45. #include <grass/gis.h>
  46. }
  // Upper limit on simultaneously open streams.
  // NOTE(review): not referenced anywhere in this header; presumably
  // enforced by callers (e.g. the external sort) — confirm.
  #define MAX_STREAMS_OPEN 200
  #include "mm.h" // Get the memory manager.
  // Debug switches. Each expands to `if(0)`, so the statement that follows
  // is still compiled (and type-checked) but never executed. Change a switch
  // to `if(1)` to enable that diagnostic.
  #define DEBUG_DELETE if(0)
  #define DEBUG_STREAM_LEN if(0)
  #define DEBUG_ASSERT if(0)
  // The name of the environment variable which keeps the name of the
  // directory where streams are stored
  #define STREAM_TMPDIR "STREAM_DIR"
  // All streams will be named STREAM_*****
  #define BASE_NAME "STREAM"
  // Size in bytes (1<<18 = 256 KiB) of the stdio buffer that the
  // constructors install on every stream with setvbuf().
  #define STREAM_BUFFER_SIZE (1<<18)
  //
  // AMI error codes are returned using the AMI_err type.
  //
  enum AMI_err {
    AMI_ERROR_NO_ERROR = 0,              // success
    AMI_ERROR_IO_ERROR,                  // e.g. fread() failed without hitting EOF
    AMI_ERROR_END_OF_STREAM,             // end of file or of the substream range
    AMI_ERROR_OUT_OF_RANGE,              // e.g. bad range given to new_substream()
    AMI_ERROR_READ_ONLY,
    AMI_ERROR_OS_ERROR,
    AMI_ERROR_MM_ERROR,
    AMI_ERROR_OBJECT_INITIALIZATION,
    AMI_ERROR_PERMISSION_DENIED,
    AMI_ERROR_INSUFFICIENT_MAIN_MEMORY,
    AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS,
    AMI_ERROR_ENV_UNDEFINED,
    AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
  };
  // Human-readable messages for the error codes, defined in another
  // translation unit.
  // NOTE(review): presumably indexed by AMI_err value — confirm against the
  // definition before relying on the ordering.
  extern const char *ami_str_error[];
  //
  // AMI stream types passed to constructors
  //
  // NOTE: a stream opened by path name with AMI_READ_STREAM defaults to
  // PERSIST_PERSISTENT; every other mode defaults to PERSIST_DELETE.
  enum AMI_stream_type {
    AMI_READ_STREAM = 1, // Open existing stream for reading
    AMI_WRITE_STREAM, // Open for writing. Create if non-existent
    AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
    AMI_READ_WRITE_STREAM, // Open to read and write.
    AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
  };
  // What happens to the backing file when a (non-substream) stream is
  // destroyed. Substreams never remove the file themselves.
  enum persistence {
    // Delete the stream from the disk when it is destructed.
    PERSIST_DELETE = 0,
    // Do not delete the stream from the disk when it is destructed.
    PERSIST_PERSISTENT,
    // Delete each block of data from the disk as it is read.
    // NOTE(review): this header does not implement block-by-block deletion;
    // the destructor unlinks the file for any value other than
    // PERSIST_PERSISTENT, so this currently behaves like PERSIST_DELETE.
    PERSIST_READ_ONCE
  };
  /* an un-templated version makes for easier debugging */
  // Type-independent state shared by every AMI_STREAM<T> instantiation.
  // NOTE(review): there is no virtual destructor, so deleting an AMI_STREAM
  // through an UntypedStream* would skip ~AMI_STREAM (file never closed,
  // buffer never freed). Always delete through the derived type.
  class UntypedStream {
  protected:
    // stdio handle of the backing file.
    FILE * fp;
    int fildes; //descriptor of file
    // (the temporary-file constructors store the descriptor returned by
    // ami_single_temp_name(); opening by path name stores -1)
    AMI_stream_type access_mode;
    // Name of the backing file; a fixed-size BUFSIZ array.
    char path[BUFSIZ];
    // What the destructor does with the backing file.
    persistence per;
    //0 for streams, positive for substreams
    unsigned int substream_level;
    // If this stream is actually a substream, these will be set to
    // indicate the portion of the file that is part of this stream. If
    // the stream is the whole file, they will be set to -1. Both are in
    // T units. logical_eos is one past the last item of the substream.
    off_t logical_bos;
    off_t logical_eos;
    //stream buffer passed in the call to setvbuf when file is opened
    char* buf;
    // Set to 1 by the read functions when they hit the end of the stream;
    // reported by eof().
    int eof_reached;
  public:
    // Returns STREAM_BUFFER_SIZE, i.e. the size of the stdio buffer.
    static unsigned int get_block_length() {
      return STREAM_BUFFER_SIZE;
      //return getpagesize();
    };
  };
  // File-backed stream of items of type T. Items are read and written as
  // raw sizeof(T)-byte records through stdio, so T should be trivially
  // copyable.
  // NOTE(review): no copy constructor or copy assignment is declared, but the
  // destructor fcloses `fp` and frees `buf`. A copy would close and free them
  // twice, so never copy an AMI_STREAM; pass pointers instead.
  template<class T>
  class AMI_STREAM : public UntypedStream {
  protected:
    // Storage for the item most recently returned by read_item().
    T read_tmp; /* this is ugly... RW */
  public:
    // An AMI_stream with default name
    AMI_STREAM();
    // An AMI stream based on a specific path name.
    AMI_STREAM(const char *path_name, AMI_stream_type st = AMI_READ_WRITE_STREAM);
    // convenience function with split path_name
    //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
    // A pseudo-constructor for substreams (read-only view of an item range).
    AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
                          AMI_STREAM<T> **sub_stream);
    // Destructor
    ~AMI_STREAM(void);
    // Read and write elements.
    // read_item() returns a pointer to internal storage that the next call
    // overwrites.
    AMI_err read_item(T **elt);
    AMI_err write_item(const T &elt);
    AMI_err read_array(T *data, off_t len, off_t *lenp=NULL);
    AMI_err write_array(const T *data, off_t len);
    // Return the number of items in the stream.
    off_t stream_len(void);
    // Return the path name of this stream.
    // The char** overload returns a new[]-allocated copy owned by the caller.
    AMI_err name(char **stream_name);
    const char* name() const;
    // Move to a specific item in the stream.
    AMI_err seek(off_t offset);
    // Query memory usage
    static AMI_err main_memory_usage(size_t *usage,
                                     MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
    void persist(persistence p);
    char *sprint();
    // have we hit the end of the stream
    int eof();
  };
  /**********************************************************************/
  /**********************************************************************/
  /* creates a random file name, opens the file for reading and writing
  and returns a file descriptor */
  // The constructors pass UntypedStream::path (a BUFSIZ-byte array) as
  // tmp_path and later use it as the file name, so the generated name is
  // presumably written there — confirm against the definition.
  /* int ami_single_temp_name(char *base, char* tmp_path); */
  /* fix from Andy Danner */
  int ami_single_temp_name(const std::string& base, char* tmp_path);
  /**********************************************************************/
  /* given fd=file descriptor, associates with it a stream opened in
  access_mode and returns it */
  FILE* open_stream(int fd, AMI_stream_type st);
  /**********************************************************************/
  /* open the file whose name is pathname in access mode */
  FILE* open_stream(char* pathname, AMI_stream_type st);
  /********************************************************************/
  171. // An AMI stream with default name.
  172. template<class T>
  173. AMI_STREAM<T>::AMI_STREAM() {
  174. access_mode = AMI_READ_WRITE_STREAM;
  175. int fd = ami_single_temp_name(BASE_NAME, path);
  176. fildes = fd;
  177. fp = open_stream(fd, access_mode);
  178. /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
  179. buf = new char[STREAM_BUFFER_SIZE];
  180. if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
  181. cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
  182. << strerror(errno) << endl;
  183. exit(1);
  184. }
  185. // By default, all streams are deleted at destruction time.
  186. per = PERSIST_DELETE;
  187. // Not a substream.
  188. substream_level = 0;
  189. logical_bos = logical_eos = -1;
  190. // why is this here in the first place?? -RW
  191. seek(0);
  192. eof_reached = 0;
  193. // Register memory usage before returning.
  194. //size_t usage;
  195. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  196. //MM_manager.register_allocation(usage);
  197. }
  198. /**********************************************************************/
  199. // An AMI stream based on a specific path name.
  200. template<class T>
  201. AMI_STREAM<T>::AMI_STREAM(const char *path_name, AMI_stream_type st) {
  202. access_mode = st;
  203. if(path_name == NULL) {
  204. int fd = ami_single_temp_name(BASE_NAME, path);
  205. fildes = fd;
  206. fp = open_stream(fd, access_mode);
  207. } else {
  208. strcpy(path, path_name);
  209. fp = open_stream(path, st);
  210. fildes = -1;
  211. }
  212. /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
  213. buf = new char[STREAM_BUFFER_SIZE];
  214. if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
  215. cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
  216. << strerror(errno) << endl;
  217. exit(1);
  218. }
  219. eof_reached = 0;
  220. // By default, all streams are deleted at destruction time.
  221. if(st == AMI_READ_STREAM) {
  222. per = PERSIST_PERSISTENT;
  223. } else {
  224. per = PERSIST_DELETE;
  225. }
  226. // Not a substream.
  227. substream_level = 0;
  228. logical_bos = logical_eos = -1;
  229. seek(0);
  230. // Register memory usage before returning.
  231. //size_t usage;
  232. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  233. //MM_manager.register_allocation(usage);
  234. };
  235. /**********************************************************************/
  236. // A psuedo-constructor for substreams.
  237. template<class T>
  238. AMI_err AMI_STREAM<T>::new_substream(AMI_stream_type st,
  239. off_t sub_begin,
  240. off_t sub_end,
  241. AMI_STREAM<T> **sub_stream) {
  242. //assume this for now
  243. assert(st == AMI_READ_STREAM);
  244. #ifdef __MINGW32__
  245. /* MINGW32: reopen file here for stream_len() below */
  246. //reopen the file
  247. AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
  248. #endif
  249. //check range
  250. if (substream_level) {
  251. if( (sub_begin >= (logical_eos - logical_bos)) ||
  252. (sub_end >= (logical_eos - logical_bos)) ) {
  253. return AMI_ERROR_OUT_OF_RANGE;
  254. }
  255. } else {
  256. off_t len = stream_len();
  257. if (sub_begin > len || sub_end > len) {
  258. return AMI_ERROR_OUT_OF_RANGE;
  259. }
  260. }
  261. #ifndef __MINGW32__
  262. //reopen the file
  263. AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
  264. #endif
  265. // Set up the beginning and end positions.
  266. if (substream_level) {
  267. substr->logical_bos = logical_bos + sub_begin;
  268. substr->logical_eos = logical_bos + sub_end + 1;
  269. } else {
  270. substr->logical_bos = sub_begin;
  271. substr->logical_eos = sub_end + 1;
  272. }
  273. // Set the current position.
  274. substr->seek(0);
  275. substr->eof_reached = 0;
  276. //set substream level
  277. substr->substream_level = substream_level + 1;
  278. substr->per = per; //set persistence
  279. //*sub_stream = (AMI_base_stream < T > *)substr;
  280. *sub_stream = substr;
  281. return AMI_ERROR_NO_ERROR;
  282. };
  283. /**********************************************************************/
  284. // Return the number of items in the stream.
  285. template<class T>
  286. off_t AMI_STREAM<T>::stream_len(void) {
  287. fflush(fp);
  288. #ifdef __MINGW32__
  289. //stat() fails on MS Windows if the file is open, so try G_ftell() instead.
  290. //FIXME: not 64bit safe, but WinGrass isn't either right now.
  291. off_t posn_save, st_size;
  292. posn_save = G_ftell(fp);
  293. if(posn_save == -1) {
  294. perror("ERROR: AMI_STREAM::stream_len(): ftell(fp) failed ");
  295. perror(path);
  296. exit(1);
  297. }
  298. G_fseek(fp, 0, SEEK_END);
  299. st_size = G_ftell(fp);
  300. if(st_size == -1) {
  301. perror("ERROR: AMI_STREAM::stream_len(): ftell[SEEK_END] failed ");
  302. perror(path);
  303. exit(1);
  304. }
  305. G_fseek(fp, posn_save, SEEK_SET);
  306. //debug stream_len:
  307. DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%d\n",
  308. path, st_size, sizeof(T));
  309. return (st_size / sizeof(T));
  310. #else
  311. struct stat buf;
  312. if (stat(path, &buf) == -1) {
  313. perror("AMI_STREAM::stream_len(): fstat failed ");
  314. DEBUG_ASSERT assert(0);
  315. exit(1);
  316. }
  317. //debug stream_len:
  318. DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%lud\n",
  319. path, (long long int)buf.st_size, sizeof(T));
  320. return (buf.st_size / sizeof(T));
  321. #endif
  322. };
  323. /**********************************************************************/
  324. // Return the path name of this stream.
  325. template<class T>
  326. AMI_err AMI_STREAM<T>::name(char **stream_name) {
  327. *stream_name = new char [strlen(path) + 1];
  328. strcpy(*stream_name, path);
  329. return AMI_ERROR_NO_ERROR;
  330. };
  331. // Return the path name of this stream.
  332. template<class T>
  333. const char *
  334. AMI_STREAM<T>::name() const {
  335. return path;
  336. };
  337. /**********************************************************************/
  338. // Move to a specific offset within the (sub)stream.
  339. template<class T>
  340. AMI_err AMI_STREAM<T>::seek(off_t offset) {
  341. off_t seek_offset;
  342. if (substream_level) { //substream
  343. if (offset > (unsigned) (logical_eos - logical_bos)) {
  344. //offset out of range
  345. cerr << "ERROR: AMI_STREAM::seek bos=" << logical_bos << ", eos="
  346. << logical_eos << ", offset " << offset << " out of range.\n";
  347. DEBUG_ASSERT assert(0);
  348. exit(1);
  349. } else {
  350. //offset in range
  351. seek_offset = (logical_bos + offset) * sizeof(T);
  352. }
  353. } else {
  354. //not a substream
  355. seek_offset = offset * sizeof(T);
  356. }
  357. G_fseek(fp, seek_offset, SEEK_SET);
  358. return AMI_ERROR_NO_ERROR;
  359. }
  360. /**********************************************************************/
  361. // Query memory usage
  362. template<class T>
  363. AMI_err
  364. AMI_STREAM<T>::main_memory_usage(size_t *usage, MM_stream_usage usage_type) {
  365. switch (usage_type) {
  366. case MM_STREAM_USAGE_OVERHEAD:
  367. *usage = sizeof (AMI_STREAM<T>);
  368. break;
  369. case MM_STREAM_USAGE_BUFFER:
  370. // *usage = get_block_length();
  371. *usage = STREAM_BUFFER_SIZE*sizeof(char);
  372. break;
  373. case MM_STREAM_USAGE_CURRENT:
  374. case MM_STREAM_USAGE_MAXIMUM:
  375. // *usage = sizeof (*this) + get_block_length();
  376. *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
  377. break;
  378. }
  379. return AMI_ERROR_NO_ERROR;
  380. };
  381. /**********************************************************************/
  382. template<class T>
  383. AMI_STREAM<T>::~AMI_STREAM(void) {
  384. DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
  385. assert(fp);
  386. fclose(fp);
  387. delete buf;
  388. // Get rid of the file if not persistent and if not substream.
  389. if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
  390. if (unlink(path) == -1) {
  391. cerr << "ERROR: AMI_STREAM: failed to unlink " << path << endl;
  392. perror("cannot unlink: ");
  393. DEBUG_ASSERT assert(0);
  394. exit(1);
  395. }
  396. }
  397. // Register memory deallocation before returning.
  398. //size_t usage;
  399. //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
  400. //MM_manager.register_deallocation(usage);
  401. };
  402. /**********************************************************************/
  403. template<class T>
  404. AMI_err AMI_STREAM<T>::read_item(T **elt) {
  405. assert(fp);
  406. //if we go past substream range
  407. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  408. return AMI_ERROR_END_OF_STREAM;
  409. } else {
  410. if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
  411. if(feof(fp)) {
  412. eof_reached = 1;
  413. return AMI_ERROR_END_OF_STREAM;
  414. } else {
  415. cerr << "ERROR: file=" << path << ":";
  416. perror("cannot read!");
  417. return AMI_ERROR_IO_ERROR;
  418. }
  419. }
  420. *elt = &read_tmp;
  421. return AMI_ERROR_NO_ERROR;
  422. }
  423. };
  424. /**********************************************************************/
  425. template<class T>
  426. AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp) {
  427. size_t nobj;
  428. assert(fp);
  429. //if we go past substream range
  430. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  431. eof_reached = 1;
  432. return AMI_ERROR_END_OF_STREAM;
  433. } else {
  434. nobj = fread((void*)data, sizeof(T), len, fp);
  435. if (nobj < len) { /* some kind of error */
  436. if(feof(fp)) {
  437. if(lenp) *lenp = nobj;
  438. eof_reached = 1;
  439. return AMI_ERROR_END_OF_STREAM;
  440. } else {
  441. cerr << "ERROR: file=" << path << ":";
  442. perror("cannot read!");
  443. return AMI_ERROR_IO_ERROR;
  444. }
  445. }
  446. if(lenp) *lenp = nobj;
  447. return AMI_ERROR_NO_ERROR;
  448. }
  449. };
  450. /**********************************************************************/
  451. template<class T>
  452. AMI_err AMI_STREAM<T>::write_item(const T &elt) {
  453. assert(fp);
  454. //if we go past substream range
  455. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  456. return AMI_ERROR_END_OF_STREAM;
  457. } else {
  458. if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
  459. cerr << "ERROR: AMI_STREAM::write_item failed.\n";
  460. if (path && *path)
  461. perror(path);
  462. else
  463. perror("AMI_STREAM::write_item: ");
  464. DEBUG_ASSERT assert(0);
  465. exit(1);
  466. }
  467. return AMI_ERROR_NO_ERROR;
  468. }
  469. };
  470. /**********************************************************************/
  471. template<class T>
  472. AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
  473. size_t nobj;
  474. assert(fp);
  475. //if we go past substream range
  476. if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
  477. return AMI_ERROR_END_OF_STREAM;
  478. } else {
  479. nobj = fwrite(data, sizeof(T), len, fp);
  480. if (nobj < len) {
  481. cerr << "ERROR: AMI_STREAM::write_array failed.\n";
  482. if (path && *path)
  483. perror(path);
  484. else
  485. perror("AMI_STREAM::write_array: ");
  486. DEBUG_ASSERT assert(0);
  487. exit(1);
  488. }
  489. return AMI_ERROR_NO_ERROR;
  490. }
  491. };
  492. /**********************************************************************/
  493. template<class T>
  494. void AMI_STREAM<T>::persist(persistence p) {
  495. per = p;
  496. };
  497. /**********************************************************************/
  498. // sprint()
  499. // Return a string describing the stream
  500. //
  501. // This function gives easy access to the file name, length.
  502. // It is not reentrant, but this should not be too much of a problem
  503. // if you are careful.
  504. template<class T>
  505. char *AMI_STREAM<T>::sprint() {
  506. static char buf[BUFSIZ];
  507. sprintf(buf, "[AMI_STREAM %s %ld]", path, (long)stream_len());
  508. return buf;
  509. };
  510. /**********************************************************************/
  511. template<class T>
  512. int AMI_STREAM<T>::eof() {
  513. return eof_reached;
  514. };
  515. #endif // _AMI_STREAM_H