|
@@ -1,6 +1,6 @@
|
|
|
/****************************************************************************
|
|
|
*
|
|
|
- * MODULE: r.terraflow
|
|
|
+ * MODULE: iostream
|
|
|
*
|
|
|
* COPYRIGHT (C) 2007 Laura Toma
|
|
|
*
|
|
@@ -31,14 +31,11 @@
|
|
|
#include <unistd.h>
|
|
|
|
|
|
#include <iostream>
|
|
|
-#include <cstring>
|
|
|
using namespace std;
|
|
|
|
|
|
-#include "mm.h" // Get the memory manager.
|
|
|
+#define MAX_STREAMS_OPEN 200
|
|
|
|
|
|
-#ifdef __MINGW32__
|
|
|
-#define getpagesize() (4096)
|
|
|
-#endif
|
|
|
+#include "mm.h" // Get the memory manager.
|
|
|
|
|
|
#define DEBUG_DELETE if(0)
|
|
|
|
|
@@ -49,7 +46,7 @@ using namespace std;
|
|
|
// All streams will be names STREAM_*****
|
|
|
#define BASE_NAME "STREAM"
|
|
|
|
|
|
-#define STREAM_BUFFER_SIZE (1<<15)
|
|
|
+#define STREAM_BUFFER_SIZE (1<<18)
|
|
|
|
|
|
|
|
|
//
|
|
@@ -71,6 +68,7 @@ enum AMI_err {
|
|
|
AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
|
|
|
};
|
|
|
|
|
|
+extern char *ami_str_error[];
|
|
|
|
|
|
//
|
|
|
// AMI stream types passed to constructors
|
|
@@ -79,7 +77,8 @@ enum AMI_stream_type {
|
|
|
AMI_READ_STREAM = 1, // Open existing stream for reading
|
|
|
AMI_WRITE_STREAM, // Open for writing. Create if non-existent
|
|
|
AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
|
|
|
- AMI_READ_WRITE_STREAM // Open to read and write.
|
|
|
+ AMI_READ_WRITE_STREAM, // Open to read and write.
|
|
|
+ AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
|
|
|
};
|
|
|
|
|
|
|
|
@@ -94,13 +93,11 @@ enum persistence {
|
|
|
PERSIST_READ_ONCE
|
|
|
};
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-template<class T>
|
|
|
-class AMI_STREAM {
|
|
|
-private:
|
|
|
+/* an un-templated version makes for easier debugging */
|
|
|
+class UntypedStream {
|
|
|
+protected:
|
|
|
FILE * fp;
|
|
|
- //int fd; //descriptor of file
|
|
|
+ int fildes; //descriptor of file
|
|
|
AMI_stream_type access_mode;
|
|
|
char path[BUFSIZ];
|
|
|
persistence per;
|
|
@@ -117,35 +114,45 @@ private:
|
|
|
|
|
|
//stream buffer passed in the call to setvbuf when file is opened
|
|
|
char* buf;
|
|
|
+ int eof_reached;
|
|
|
+
|
|
|
+ public:
|
|
|
+ static unsigned int get_block_length() {
|
|
|
+ return STREAM_BUFFER_SIZE;
|
|
|
+ //return getpagesize();
|
|
|
+ };
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+template<class T>
|
|
|
+class AMI_STREAM : public UntypedStream {
|
|
|
|
|
|
protected:
|
|
|
- unsigned int get_block_length();
|
|
|
-
|
|
|
-public:
|
|
|
- T read_tmp;
|
|
|
|
|
|
+ T read_tmp; /* this is ugly... RW */
|
|
|
+
|
|
|
+public:
|
|
|
// An AMI_stream with default name
|
|
|
AMI_STREAM();
|
|
|
|
|
|
// An AMI stream based on a specific path name.
|
|
|
- AMI_STREAM(const char *path_name,
|
|
|
- // AMI_stream_type st = AMI_READ_WRITE_STREAM);
|
|
|
- AMI_stream_type st);
|
|
|
+ AMI_STREAM(const char *path_name, AMI_stream_type st);
|
|
|
+
|
|
|
+ // convenience function with split path_name
|
|
|
+ //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
|
|
|
+
|
|
|
|
|
|
// A psuedo-constructor for substreams.
|
|
|
AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
|
|
|
AMI_STREAM<T> **sub_stream);
|
|
|
-
|
|
|
+
|
|
|
// Destructor
|
|
|
~AMI_STREAM(void);
|
|
|
|
|
|
// Read and write elements.
|
|
|
AMI_err read_item(T **elt);
|
|
|
-
|
|
|
AMI_err write_item(const T &elt);
|
|
|
-
|
|
|
- AMI_err read_array(T *data, off_t len);
|
|
|
-
|
|
|
+ AMI_err read_array(T *data, off_t len, off_t *lenp);
|
|
|
AMI_err write_array(const T *data, off_t len);
|
|
|
|
|
|
// Return the number of items in the stream.
|
|
@@ -153,32 +160,34 @@ public:
|
|
|
|
|
|
// Return the path name of this stream.
|
|
|
AMI_err name(char **stream_name);
|
|
|
+ const char* name() const;
|
|
|
|
|
|
// Move to a specific item in the stream.
|
|
|
AMI_err seek(off_t offset);
|
|
|
-
|
|
|
+
|
|
|
// Query memory usage
|
|
|
- AMI_err main_memory_usage(size_t *usage,
|
|
|
+ static AMI_err main_memory_usage(size_t *usage,
|
|
|
//MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
|
|
|
MM_stream_usage usage_type);
|
|
|
|
|
|
void persist(persistence p);
|
|
|
|
|
|
char *sprint();
|
|
|
+
|
|
|
+ // have we hit the end of the stream
|
|
|
+ int eof();
|
|
|
};
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
-template<class T>
|
|
|
-unsigned int AMI_STREAM<T>::get_block_length() {
|
|
|
- return getpagesize();
|
|
|
-}
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
/* creates a random file name, opens the file for reading and writing
|
|
|
and and returns a file descriptor */
|
|
|
-int ami_single_temp_name(const std::string& base, char* tmp_path);
|
|
|
+/* int ami_single_temp_name(char *base, char* tmp_path); */
|
|
|
+/* fix from Andy Danner */
|
|
|
+int ami_single_temp_name(const std::string& base, char* tmp_path);
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
@@ -201,8 +210,8 @@ AMI_STREAM<T>::AMI_STREAM() {
|
|
|
|
|
|
access_mode = AMI_READ_WRITE_STREAM;
|
|
|
int fd = ami_single_temp_name(BASE_NAME, path);
|
|
|
+ fildes = fd;
|
|
|
fp = open_stream(fd, access_mode);
|
|
|
-
|
|
|
|
|
|
/* a stream is by default buffered with a buffer of size BUFSIZ=1K */
|
|
|
buf = new char[STREAM_BUFFER_SIZE];
|
|
@@ -218,8 +227,11 @@ AMI_STREAM<T>::AMI_STREAM() {
|
|
|
substream_level = 0;
|
|
|
logical_bos = logical_eos = -1;
|
|
|
|
|
|
+ // why is this here in the first place?? -RW
|
|
|
seek(0);
|
|
|
|
|
|
+ eof_reached = 0;
|
|
|
+
|
|
|
// Register memory usage before returning.
|
|
|
//size_t usage;
|
|
|
//main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
|
|
@@ -233,11 +245,18 @@ AMI_STREAM<T>::AMI_STREAM() {
|
|
|
template<class T>
|
|
|
AMI_STREAM<T>::AMI_STREAM(const char *path_name,
|
|
|
AMI_stream_type st = AMI_READ_WRITE_STREAM) {
|
|
|
-
|
|
|
+
|
|
|
access_mode = st;
|
|
|
- strcpy(path, path_name);
|
|
|
- fp = open_stream(path, st);
|
|
|
-
|
|
|
+
|
|
|
+ if(path_name == NULL) {
|
|
|
+ int fd = ami_single_temp_name(BASE_NAME, path);
|
|
|
+ fildes = fd;
|
|
|
+ fp = open_stream(fd, access_mode);
|
|
|
+ } else {
|
|
|
+ strcpy(path, path_name);
|
|
|
+ fp = open_stream(path, st);
|
|
|
+ fildes = -1;
|
|
|
+ }
|
|
|
|
|
|
/* a stream is by default buffered with a buffer of size BUFSIZ=1K */
|
|
|
buf = new char[STREAM_BUFFER_SIZE];
|
|
@@ -246,9 +265,15 @@ AMI_STREAM<T>::AMI_STREAM(const char *path_name,
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
|
+ eof_reached = 0;
|
|
|
+
|
|
|
// By default, all streams are deleted at destruction time.
|
|
|
- per = PERSIST_DELETE;
|
|
|
-
|
|
|
+ if(st == AMI_READ_STREAM) {
|
|
|
+ per = PERSIST_PERSISTENT;
|
|
|
+ } else {
|
|
|
+ per = PERSIST_DELETE;
|
|
|
+ }
|
|
|
+
|
|
|
// Not a substream.
|
|
|
substream_level = 0;
|
|
|
logical_bos = logical_eos = -1;
|
|
@@ -303,12 +328,13 @@ AMI_err AMI_STREAM<T>::new_substream(AMI_stream_type st,
|
|
|
|
|
|
// Set the current position.
|
|
|
substr->seek(0);
|
|
|
-
|
|
|
+
|
|
|
+ substr->eof_reached = 0;
|
|
|
+
|
|
|
//set substream level
|
|
|
substr->substream_level = substream_level + 1;
|
|
|
|
|
|
- //set persistence
|
|
|
- substr->per = per;
|
|
|
+ substr->per = per; //set persistence
|
|
|
|
|
|
//*sub_stream = (AMI_base_stream < T > *)substr;
|
|
|
*sub_stream = substr;
|
|
@@ -327,6 +353,7 @@ off_t AMI_STREAM<T>::stream_len(void) {
|
|
|
struct stat buf;
|
|
|
if (stat(path, &buf) == -1) {
|
|
|
perror("AMI_STREAM::stream_len(): fstat failed ");
|
|
|
+ perror(path);
|
|
|
assert(0);
|
|
|
exit(1);
|
|
|
}
|
|
@@ -349,6 +376,13 @@ AMI_err AMI_STREAM<T>::name(char **stream_name) {
|
|
|
return AMI_ERROR_NO_ERROR;
|
|
|
};
|
|
|
|
|
|
+// Return the path name of this stream.
|
|
|
+template<class T>
|
|
|
+const char *
|
|
|
+AMI_STREAM<T>::name() const {
|
|
|
+ return path;
|
|
|
+};
|
|
|
+
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
@@ -358,8 +392,7 @@ AMI_err AMI_STREAM<T>::seek(off_t offset) {
|
|
|
|
|
|
off_t seek_offset;
|
|
|
|
|
|
- if (substream_level) {
|
|
|
- //substream
|
|
|
+ if (substream_level) { //substream
|
|
|
if (offset > (unsigned) (logical_eos - logical_bos)) {
|
|
|
//offset out of range
|
|
|
cerr << "AMI_STREAM::seek bos=" << logical_bos << ", eos=" << logical_eos
|
|
@@ -377,8 +410,8 @@ AMI_err AMI_STREAM<T>::seek(off_t offset) {
|
|
|
seek_offset = offset * sizeof(T);
|
|
|
}
|
|
|
|
|
|
- if (fseek(fp, seek_offset, SEEK_SET) == -1) {
|
|
|
- cerr << "AMI_STREAM::seek offset=" << seek_offset << "failed.\n";
|
|
|
+ if (fseeko(fp, seek_offset, SEEK_SET) == -1) {
|
|
|
+ cerr << "AMI_STREAM::seek offset=" << seek_offset << " failed.\n";
|
|
|
assert(0);
|
|
|
exit(1);
|
|
|
}
|
|
@@ -392,12 +425,13 @@ AMI_err AMI_STREAM<T>::seek(off_t offset) {
|
|
|
/**********************************************************************/
|
|
|
// Query memory usage
|
|
|
template<class T>
|
|
|
-AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
|
|
|
- MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
|
|
|
+AMI_err
|
|
|
+AMI_STREAM<T>::main_memory_usage(size_t *usage,
|
|
|
+ MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
|
|
|
|
|
|
switch (usage_type) {
|
|
|
case MM_STREAM_USAGE_OVERHEAD:
|
|
|
- *usage = sizeof (*this);
|
|
|
+ *usage = sizeof (AMI_STREAM<T>);
|
|
|
break;
|
|
|
case MM_STREAM_USAGE_BUFFER:
|
|
|
// *usage = get_block_length();
|
|
@@ -406,7 +440,7 @@ AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
|
|
|
case MM_STREAM_USAGE_CURRENT:
|
|
|
case MM_STREAM_USAGE_MAXIMUM:
|
|
|
// *usage = sizeof (*this) + get_block_length();
|
|
|
- *usage = sizeof (*this) + STREAM_BUFFER_SIZE*sizeof(char);
|
|
|
+ *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
|
|
|
break;
|
|
|
}
|
|
|
return AMI_ERROR_NO_ERROR;
|
|
@@ -417,15 +451,15 @@ AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
|
|
|
/**********************************************************************/
|
|
|
template<class T>
|
|
|
AMI_STREAM<T>::~AMI_STREAM(void) {
|
|
|
-
|
|
|
+
|
|
|
DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
|
|
|
- delete buf;
|
|
|
assert(fp);
|
|
|
fclose(fp);
|
|
|
+ delete buf;
|
|
|
|
|
|
// Get rid of the file if not persistent and if not substream.
|
|
|
if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
|
|
|
- if (remove(path) == -1) {
|
|
|
+ if (unlink(path) == -1) {
|
|
|
cerr << "AMI_STREAM: failed to unlink " << path << endl;
|
|
|
perror("cannot unlink ");
|
|
|
assert(0);
|
|
@@ -445,16 +479,21 @@ template<class T>
|
|
|
AMI_err AMI_STREAM<T>::read_item(T **elt) {
|
|
|
|
|
|
assert(fp);
|
|
|
+
|
|
|
//if we go past substream range
|
|
|
if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
|
|
|
return AMI_ERROR_END_OF_STREAM;
|
|
|
|
|
|
} else {
|
|
|
if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
|
|
|
- //cerr << "file=" << path << ":";
|
|
|
- //perror("cannot read!");
|
|
|
- //assume EOF --should fix this XXX
|
|
|
- return AMI_ERROR_END_OF_STREAM;
|
|
|
+ if(feof(fp)) {
|
|
|
+ eof_reached = 1;
|
|
|
+ return AMI_ERROR_END_OF_STREAM;
|
|
|
+ } else {
|
|
|
+ cerr << "file=" << path << ":";
|
|
|
+ perror("cannot read!");
|
|
|
+ return AMI_ERROR_IO_ERROR;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
*elt = &read_tmp;
|
|
@@ -467,21 +506,30 @@ AMI_err AMI_STREAM<T>::read_item(T **elt) {
|
|
|
|
|
|
/**********************************************************************/
|
|
|
template<class T>
|
|
|
-AMI_err AMI_STREAM<T>::read_array(T *data, off_t len) {
|
|
|
-
|
|
|
+AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp=NULL) {
|
|
|
+ size_t nobj;
|
|
|
assert(fp);
|
|
|
|
|
|
//if we go past substream range
|
|
|
if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
|
|
|
+ eof_reached = 1;
|
|
|
return AMI_ERROR_END_OF_STREAM;
|
|
|
|
|
|
} else {
|
|
|
- if (fread((void*)data, sizeof(T), len, fp) < len) {
|
|
|
- cerr << "file=" << path << ":";
|
|
|
- perror("cannot read!");
|
|
|
- //assume EOF --should fix this XXX
|
|
|
- return AMI_ERROR_END_OF_STREAM;
|
|
|
- }
|
|
|
+ nobj = fread((void*)data, sizeof(T), len, fp);
|
|
|
+
|
|
|
+ if (nobj < len) { /* some kind of error */
|
|
|
+ if(feof(fp)) {
|
|
|
+ if(lenp) *lenp = nobj;
|
|
|
+ eof_reached = 1;
|
|
|
+ return AMI_ERROR_END_OF_STREAM;
|
|
|
+ } else {
|
|
|
+ cerr << "file=" << path << ":";
|
|
|
+ perror("cannot read!");
|
|
|
+ return AMI_ERROR_IO_ERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(lenp) *lenp = nobj;
|
|
|
return AMI_ERROR_NO_ERROR;
|
|
|
}
|
|
|
};
|
|
@@ -501,9 +549,11 @@ AMI_err AMI_STREAM<T>::write_item(const T &elt) {
|
|
|
} else {
|
|
|
if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
|
|
|
cerr << "AMI_STREAM::write_item failed.\n";
|
|
|
+ perror(path);
|
|
|
assert(0);
|
|
|
exit(1);
|
|
|
}
|
|
|
+
|
|
|
return AMI_ERROR_NO_ERROR;
|
|
|
}
|
|
|
};
|
|
@@ -512,6 +562,7 @@ AMI_err AMI_STREAM<T>::write_item(const T &elt) {
|
|
|
/**********************************************************************/
|
|
|
template<class T>
|
|
|
AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
|
|
|
+ size_t nobj;
|
|
|
|
|
|
assert(fp);
|
|
|
//if we go past substream range
|
|
@@ -519,12 +570,13 @@ AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
|
|
|
return AMI_ERROR_END_OF_STREAM;
|
|
|
|
|
|
} else {
|
|
|
- if (fwrite(data, sizeof(T), len,fp) < len) {
|
|
|
+ nobj = fwrite(data, sizeof(T), len, fp);
|
|
|
+ if (nobj < len) {
|
|
|
cerr << "AMI_STREAM::write_item failed.\n";
|
|
|
assert(0);
|
|
|
exit(1);
|
|
|
}
|
|
|
- return AMI_ERROR_NO_ERROR;
|
|
|
+ return AMI_ERROR_NO_ERROR;
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -532,7 +584,7 @@ AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
|
|
|
/**********************************************************************/
|
|
|
template<class T>
|
|
|
void AMI_STREAM<T>::persist(persistence p) {
|
|
|
- per = p;
|
|
|
+ per = p;
|
|
|
};
|
|
|
|
|
|
|
|
@@ -551,4 +603,11 @@ char *AMI_STREAM<T>::sprint() {
|
|
|
return buf;
|
|
|
};
|
|
|
|
|
|
+/**********************************************************************/
|
|
|
+template<class T>
|
|
|
+int AMI_STREAM<T>::eof() {
|
|
|
+ return eof_reached;
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
#endif // _AMI_STREAM_H
|