12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328 |
- /****************************************************************************
- *
- * MODULE: iostream
- *
- * COPYRIGHT (C) 2007 Laura Toma
- *
- *
- * Iostream is a library that implements streams, external memory
- * sorting on streams, and an external memory priority queue on
- * streams. These are the fundamental components used in external
- * memory algorithms.
- * Credits: The library was developed by Laura Toma. The kernel of
- * class STREAM is based on the similar class existent in the GPL TPIE
- * project developed at Duke University. The sorting and priority
- * queue have been developed by Laura Toma based on communications
- * with Rajiv Wickremesinghe. The library was developed as part of
- * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
- * Rajiv Wickremesinghe as part of the Terracost project.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details. *
- * **************************************************************************/
- #ifndef __EMBUFFER_H
- #define __EMBUFFER_H
- #include <stdio.h>
- #include <assert.h>
- #include <stdlib.h>
- #include <math.h>
- #include "ami_config.h" //for SAVE_MEMORY
- #include "ami_stream.h"
- #include "mm.h"
- #include "mm_utils.h"
- #include "pqheap.h"
- #define MY_LOG_DEBUG_ID(x) //inhibit debug printing
- //#define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
- /**********************************************************
- DEBUGGING FLAGS
- ***********************************************************/
- //setting this enables checking that the streams/arrays inserted in
- //buffers are sorted in increasing order
- //#define EMBUF_CHECK_INSERT
- //enable checking that stream name is the same as the one stored in
- //the buffer name[]
- //#define EMBUF_CHECK_NAME
- //enable printing names as they are checked
- //#define EMBUF_CHECK_NAME_PRINT
- //enable printing when streams in a buffer are shifted left to
- //check that names are shifted accordingly
- //#define EMBUF_DELETE_STREAM_PRINT
- //enable printing the name of the stream which is inserted in buff
- //#define EMBUF_PRINT_INSERT
- //enable printing the stream names/sizes in cleanup()
- //#define EMBUF_CLEANUP_PRINT
- //enable printing when get/put_stream is called (for each stream)
- //#define EMBUF_PRINT_GETPUT_STREAM
- //enable printing when get/put_streams is called
- //#define EMBUF_PRINT_GETPUT_STREAMS
- /***********************************************************/
/*****************************************************************/
/* merge_key: a key paired with the id of the stream it was read from.
   Used while merging streams (em_buffer::substream_merge) so that the
   merge heap stores only keys plus a small stream index instead of
   whole items.

   All comparison operators compare the keys only; the stream id does
   not take part in the ordering.
*/
template<class KEY>
class merge_key {
public:
  KEY k;
  unsigned int str_id; //id of the stream where the key comes from

public:
  merge_key(): str_id(0) {}
  merge_key(const KEY &x, unsigned int sid): k(x), str_id(sid) {}

  ~merge_key() {}

  //overwrite both the key and the stream id
  void set(const KEY &x, unsigned int sid) {
    k = x;
    str_id = sid;
  }

  KEY key() const { return k; }

  unsigned int stream_id() const { return str_id; }

  //priority used by the heap: the key itself
  KEY getPriority() const { return k; }

  friend std::ostream& operator<<(std::ostream& s, const merge_key<KEY> &x) {
    return s << "<str_id=" << x.str_id << "> " << x.k << " ";
  }

  //comparisons look at the key only
  friend bool operator < (const merge_key &x, const merge_key &y) {
    return (x.k < y.k);
  }
  friend bool operator <= (const merge_key &x, const merge_key &y) {
    return (x.k <= y.k);
  }
  friend bool operator > (const merge_key &x, const merge_key &y) {
    return (x.k > y.k);
  }
  friend bool operator >= (const merge_key &x, const merge_key &y) {
    return (x.k >= y.k);
  }
  friend bool operator != (const merge_key &x, const merge_key &y) {
    return (x.k != y.k);
  }
  friend bool operator == (const merge_key &x, const merge_key &y) {
    return (x.k == y.k);
  }

  //adding two merge keys has no meaning; this exists only so that code
  //requiring operator+ compiles (presumably a generic heap/utility
  //template -- TODO confirm). It must never be called.
  friend merge_key operator + (const merge_key &x, const merge_key &) {
    assert(0);
    return x;
  }
};
- /*****************************************************************
- *****************************************************************
- *****************************************************************
- external_memory buffer
-
- Each level-i buffer can store up to <arity>^i * <basesize> items,
- where tipically <arity> is \theta(m) and <basesize> is \theta(M);
- therefore log_m{n/m} buffers are needed to store N items, one
- buffer for each level 1..log_m{n/m}. All buffers must have same
- values or <arity> and <basesize>.
-
- Functionality:
-
- A level-i on-disk buffer stores <arity>^i * <basesize> items of
- data, organized in <arity> streams of <arity>^{i-1} items each;
- <basesize> is same for all buffers and equal to the size of the
- level 0 buffer (in memory buffer).
-
- Invariant: all the <arity> streams of a level-i buffer are in
- sorted order; in this way sorting the buffer is done by merging the
- <arity> streams in linear time.
-
- Items are inserted in level i-buffer only a whole stream at a time
- (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
- the buffer are full, the buffer is sorted and emptied into a stream
- of a level (i+1)-buffer.
-
- The <arity> streams of a buffer are allocated contigously from left
- to r ight. The unused streams are NULL; The buffer keeps the index of
- the last used(non-NULL) stream. When a buffer becomes full and is
- empty, all its buffers are set to NULL.
- *****************************************************************
- *****************************************************************
- ***************************************************************** */
- /* T is a type with priority of type K and method getPriority() */
- template<class T, class Key>
- class em_buffer {
- private:
- //number of streams in a buffer;
- unsigned int arity;
-
- //level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
- //has a slightly different behaviour so it is implemented as a
- //different class <im_buffer>)
- unsigned short level;
- //level-i buffer contains m streams of data, each of size
- //arity^{i-1}*basesize;
- AMI_STREAM<T>** data;
-
- //the buffers can be depleted to fill the internal pq;
- //keep an array which counts, for each stream, how many elements
- //have been deleted (implicitely from the beginning of stream)
- long* deleted;
- //nb of items in each substream; this can be found out by calling
- //stream_len() on the stream, but it is more costly esp in the case
- //when streams are on disk and must be moved in and out just to find
- //stream length; streamsize is set only at stream creation, and the
- //actual size must subtract the number of iteme deleted from the
- //bos
- unsigned long* streamsize;
-
- //index of the next available(empty) stream (out of the total m
- //streams in the buffer);
- unsigned int index;
-
- //nb of items in a stream of level_1 buffer
- unsigned long basesize;
- public:
- //create a level-i buffer of given basesize;
- em_buffer(const unsigned short i, const unsigned long bs,
- const unsigned int ar);
-
- //copy constructor;
- em_buffer(const em_buffer &buf);
- //free the stream array and the streams pointers
- ~em_buffer();
-
- //return the level of the buffer;
- unsigned short get_level() const { return level;}
- //return the ith stream (load stream in memory)
- AMI_STREAM<T>* get_stream(unsigned int i);
-
- //return a pointer to the streams of the buffer (loads streams in
- //memory)
- AMI_STREAM<T>** get_streams();
- //put the ith stream back to disk
- void put_stream(unsigned int i);
-
- //called in pair with get_streams to put all streams back to disk
- void put_streams();
- //return a pointer to the array of deletion count for each stream
- long* get_bos() const { return deleted;}
-
- //return the index of the last stream in buffer which contains data;
- unsigned int laststream() const { return index -1;}
- //return the index of the next available stream in the buffer
- unsigned int nextstream() const { return index;}
- //increment the index of the next available stream in the buffer
- void incr_nextstream() { ++index;}
- //return nb of (non-empty) streams in buffer
- unsigned int get_nbstreams() const { return index;}
-
- //return arity
- unsigned int get_arity() const { return arity;}
- //return total nb of deleted elements in all active streams of the buffer
- long total_deleted() const {
- long tot = 0;
- for (unsigned int i=0; i< index; i++) {
- tot += deleted[i];
- }
- return tot;
- }
-
- //mark as deleted one more element from i'th stream
- void incr_deleted(unsigned int i) {
- assert(i<index);
- deleted[i]++;
- }
- //return the nominal size of a stream (nb of items):
- //arity^{level-1}*basesize;
- unsigned long get_stream_maxlen() const {
- return (unsigned long)pow((double)arity,(double)level-1)*basesize;
- }
-
- //return the actual size of stream i; i must be the index of a valid
- //stream
- unsigned long get_stream_len(unsigned int i) {
- //assert(i>= 0 && i<index);
- return streamsize[i] - deleted[i];
- }
- //return the total current size of the buffer; account for the
- //deleted elements;
- unsigned long get_buf_len() {
- unsigned long tot = 0;
- for (unsigned int i=0; i< index; i++) {
- tot += get_stream_len(i);
- }
- return tot;
- }
-
- //return the total maximal capacity of the buffer
- unsigned long get_buf_maxlen() {
- return arity * get_stream_maxlen();
- }
-
- //return true if buffer is empty (all streams are empty)
- bool is_empty() {
- return ((nextstream() == 0) || (get_buf_len() == 0));
- }
-
- //return true if buffer is full(all streams are full)
- bool is_full() const {
- return (nextstream() == arity);
- }
-
- //reset
- void reset();
-
- //clean buffer: in case some streams have been emptied by deletion
- //delete them and shift streams left;
- void cleanup();
-
- //create and return a stream which contains all elements of all
- //streams of the buffer in sorted ascending order of their
- //keys(priorities);
- AMI_STREAM<T>* sort();
- // insert an array into the buffer; can only insert one
- // level-i-full-stream-len nb of items at a time; assume the length
- // of the array is precisely the streamlen of level-i buffer n =
- // (pow(arity,level-1)*basesize); assume array is sorted; return the
- // number of items actually inserted
- long insert(T* a, long n);
- // insert a stream into the buffer; assume the length of the stream
- // is precisely the streamlen of level-i buffer n =
- // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
- // is set to point to the argument stream; (in this way no stream
- // copying is done, just one pointer copy). The user should be aware
- // the the argument stream is 'lost' - that is a stream cannot be
- // inserted repeatedly into many buffers because this would lead to
- // several buffers pointing to the same stream.
- // stream is assumed sorted; bos = how many elements are deleted
- // from the beginning of stream;
-
- // return the number of items actually inserted
- long insert(AMI_STREAM<T>* str,
- long bos=0);
-
- //print range of elements in buffer
- void print_range();
-
- //print all elements in buffer
- void print();
-
- //prints the sizes of the streams in the buffer
- void print_stream_sizes();
-
- //print the elements in the buffer
- friend ostream& operator<<(ostream& s, em_buffer &b) {
- s << "BUFFER_" << b.level << ": ";
- if (b.index ==0) {
- s << "[]";
- }
- s << "\n";
- b.get_streams();
- for (unsigned int i=0; i < b.index; i++) {
- b.print_stream(s, i);
- }
- b.put_streams();
- return s;
- }
-
-
- private:
- // merge the input streams; there are <arity> streams in total;
- // write output in <outstream>; the input streams are assumed sorted
- // in increasing order of their keys;
- AMI_err substream_merge(AMI_STREAM<T>** instreams,
- unsigned int arity,
- AMI_STREAM<T> *outstream);
-
- //print to stream the elements in i'th stream
- void print_stream(ostream& s, unsigned int i);
- #ifdef SAVE_MEMORY
- //array of names of streams;
- char** name;
- //return the designated name for stream i
- char* get_stream_name(unsigned int i) const;
-
- //print all stream names in buffer
- void print_stream_names();
- //checks that name[i] is the same as stream name; stream i must be in
- //memory (by a previous get_stream call, for instance) in order to
- //find its length
- void check_name(unsigned int i);
- #endif
- };
- /************************************************************/
- //create a level-i buffer of given basesize;
- template <class T, class Key>
- em_buffer<T,Key>::em_buffer(const unsigned short i, const unsigned long bs,
- const unsigned int ar) :
- arity(ar), level(i), basesize(bs) {
- assert((level>=1) && (basesize >=0));
-
- char str[100];
- sprintf(str, "em_buffer: allocate %d AMI_STREAM*, total %ld\n",
- arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
- MEMORY_LOG(str);
- //allocate STREAM* array
- data = new AMI_STREAM<T>* [arity];
-
- //allocate deleted array
- sprintf(str, "em_buffer: allocate deleted array: %ld\n",
- (long)(arity*sizeof(long)));
- MEMORY_LOG(str);
- deleted = new long[arity];
-
- //allocate streamsize array
- sprintf(str, "em_buffer: allocate streamsize array: %ld\n",
- (long)(arity*sizeof(long)));
- MEMORY_LOG(str);
- streamsize = new unsigned long[arity];
-
- #ifdef SAVE_MEMORY
- //allocate name array
- sprintf(str, "em_buffer: allocate name array: %ld\n",
- (long)(arity*sizeof(char*)));
- MEMORY_LOG(str);
- name = new char* [arity];
- assert(name);
- #endif
- //assert data
- if ((!data) || (!deleted) || (!streamsize)) {
- cerr << "em_buffer: cannot allocate\n";
- exit(1);
- }
-
- //initialize the <arity> streams to NULL, deleted[], streamsize[]
- //and name[]
- for (unsigned int ui=0; ui< arity; ui++) {
- data[ui] = NULL;
- deleted[ui] = 0;
- streamsize[ui] = 0;
- #ifdef SAVE_MEMORY
- name[ui] = NULL;
- #endif
- }
- //set index
- index = 0;
- #ifdef SAVE_MEMORY
- //streams_in_memory = false;
- #endif
- }
- /************************************************************/
- //copy constructor;
- template<class T, class Key>
- em_buffer<T,Key>::em_buffer(const em_buffer &buf):
- level(buf.level), basesize(buf.basesize),
- index(buf.index), arity(buf.arity) {
- assert(0);//should not get called
- MEMORY_LOG("em_buffer: copy constr start\n");
- get_streams();
- for (unsigned int i=0; i< index; i++) {
- assert(data[i]);
- delete data[i]; //delete old stream if existing
- data[i] = NULL;
-
- //call copy constructor; i'm not sure that it actually duplicates
- //the stream and copies the data; should that in the BTE
- //sometimes..
- data[i] = new AMI_STREAM<T>(*buf.data[i]);
- deleted[i] = buf.deleted[i];
- streamsize[i] = buf.streamsize[i];
- #ifdef SAVE_MEMORY
- assert(name[i]);
- delete name[i];
- name[i] = NULL;
- name[i] = buf.name[i];
- #endif
- }
- put_streams();
- MEMORY_LOG("em_buffer: copy constr end\n");
- }
- /************************************************************/
- //free the stream array and the streams pointers
- template<class T, class Key>
- em_buffer<T,Key>::~em_buffer() {
- assert(data);
- //delete the m streams in the buffer
- get_streams();
- for (unsigned int i=0; i<index; i++) {
- assert(data[i]);
- #ifdef SAVE_MEMORY
- check_name(i);
- delete name[i];
- #endif
- delete data[i];
- data[i] = NULL;
- }
-
- delete [] data;
- delete [] deleted;
- delete [] streamsize;
- #ifdef SAVE_MEMORY
- delete [] name;
- #endif
- }
#ifdef SAVE_MEMORY
/************************************************************/
//debug check, active only when EMBUF_CHECK_NAME is defined: verify that
//the name recorded in name[i] equals the name reported by stream i.
//Stream i must currently be in memory (e.g. after get_stream(i)).
template<class T, class Key>
void em_buffer<T,Key>::check_name(unsigned int i) {
#ifdef EMBUF_CHECK_NAME
  assert(i < index);
  assert(data[i]);

  char* streamname;
  data[i]->name(&streamname); //name() allocates the string

#ifdef EMBUF_CHECK_NAME_PRINT
  cout << "::check_name: checking stream [" << level << "," << i << "] name:"
       << streamname << endl;
  cout.flush();
#endif

  const bool same = (strcmp(name[i], streamname) == 0);
  if (!same) {
    cerr << "name[" << i << "]=" << name[i]
         << ", streamname=" << streamname << endl;
  }
  assert(same);
  delete streamname;
#endif
}
#endif
- /************************************************************/
- //if SAVE_MEMORY flag is set, load the stream in memory; return the
- //ith stream
- template<class T, class Key>
- AMI_STREAM<T>* em_buffer<T,Key>::get_stream(unsigned int i) {
- assert(i>=0 && i < index);
-
- #ifdef SAVE_MEMORY
- MY_LOG_DEBUG_ID("em_buffer::get_stream");
- MY_LOG_DEBUG_ID(i);
-
- if (data[i] == NULL) {
- //stream is on disk, load it in memory
- assert(name[i]);
- MY_LOG_DEBUG_ID("load stream in memory");
- MY_LOG_DEBUG_ID(name[i]);
-
- #ifdef EMBUF_PRINT_GETPUT_STREAM
- cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
- cout.flush();
- #endif
-
- //assert that file exists
- FILE* fp;
- if ((fp = fopen(name[i],"rb")) == NULL) {
- cerr << "get_stream: checking that stream " << name[i] << "exists\n";
- perror(name[i]);
- assert(0);
- exit(1);
- }
- fclose(fp);
- //create an AMI_STREAM from file
- data[i] = new AMI_STREAM<T>(name[i]);
- assert(data[i]);
- } else {
- //if data[i] not NULL, stream must be already in memory
- MY_LOG_DEBUG_ID("stream not NULL");
- MY_LOG_DEBUG_ID(data[i]->sprint());
- }
- #endif
-
- //NOW STREAM IS IN MEMORY
- //some assertion checks
- assert(data[i]);
- assert(data[i]->stream_len() == streamsize[i]);
- #ifdef SAVE_MEMORY
- check_name(i);
- #endif
- return data[i];
- }
- /************************************************************/
- //if SAVE_MEMORY flag is set, put the i'th stream back on disk
- template<class T, class Key>
- void em_buffer<T,Key>::put_stream(unsigned int i) {
- assert(i>=0 && i < index);
- #ifdef SAVE_MEMORY
- MY_LOG_DEBUG_ID("em_buffer::put_stream");
- MY_LOG_DEBUG_ID(i);
-
- if (data[i] != NULL) {
- //stream is in memory, put it on disk
- MY_LOG_DEBUG_ID("stream put to disk");
- MY_LOG_DEBUG_ID(data[i]->sprint());
- check_name(i);
- #ifdef EMBUF_PRINT_GETPUT_STREAM
- cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
- cout.flush();
- #endif
-
- //make stream persistent and delete it
- data[i]->persist(PERSIST_PERSISTENT);
- delete data[i];
- data[i] = NULL;
- } else {
- //data[i] is NULL, so stream must be already put on disk
- MY_LOG_DEBUG_ID("stream is NULL");
- }
- #endif
-
- }
- /************************************************************/
- //return a pointer to the streams of the buffer
- template<class T, class Key>
- AMI_STREAM<T>** em_buffer<T,Key>::get_streams() {
- #ifdef SAVE_MEMORY
- MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
- #ifdef EMBUF_PRINT_GETPUT_STREAMS
- cout << "em_buffer::get_streams (buffer " << level <<")";
- cout << ": index = " << index << "(arity=" << arity << ")\n";
- cout.flush();
- #endif
- for (unsigned int i=0; i<index; i++) {
- get_stream(i);
- assert(data[i]);
- }
- #endif
- return data;
- }
- /************************************************************/
- //called in pair with load_streams to store streams on disk
- //and release the memory
- template<class T, class Key>
- void em_buffer<T,Key>::put_streams() {
- #ifdef SAVE_MEMORY
- MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
- #ifdef EMBUF_PRINT_GETPUT_STREAMS
- cout << "em_buffer::put_streams (buffer " << level <<")";
- cout << ": index = " << index << "(arity=" << arity << ")\n";
- cout.flush();
- #endif
- for (unsigned int i=0; i<index; i++) {
- put_stream(i);
- assert(data[i] == NULL);
- }
- #endif
- }
#ifdef SAVE_MEMORY
/************************************************************/
//return the file name recorded for the i'th stream; the buffer keeps
//ownership of the string (no copy is made)
template<class T, class Key>
char* em_buffer<T,Key>::get_stream_name(unsigned int i) const {
  assert(i < index);
  char* stream_name = name[i];
  assert(stream_name);
  return stream_name;
}
#endif
#ifdef SAVE_MEMORY
/************************************************************/
//print to cout the file name of every stream in use, one per line
template<class T, class Key>
void em_buffer<T,Key>::print_stream_names() {
  for (unsigned int s = 0; s < index; ++s) {
    assert(name[s]);
    cout << "stream " << s << ": " << name[s] << endl;
  }
  cout.flush();
}
#endif
- /************************************************************/
- //clean buffer in case some streams have been emptied by deletion
- template<class T, class Key>
- void em_buffer<T,Key>::cleanup() {
-
- MY_LOG_DEBUG_ID("em_buffer::cleanup()");
- #ifdef EMBUF_CLEANUP_PRINT
- #ifdef SAVE_MEMORY
- if (index>0) {
- cout << "before cleanup:\n";
- print_stream_names();
- print_stream_sizes();
- cout.flush();
- }
- #endif
- #endif
-
- //load all streams in memory
- get_streams();
- //count streams of size=0
- unsigned int i, empty=0;
- for (i=0; i<index; i++) {
-
- if (get_stream_len(i) == 0) {
- //printing..
- #ifdef EMBUF_DELETE_STREAM_PRINT
- cout<<"deleting stream [" << level << "," << i <<"]:" ;
- #ifdef SAVE_MEMORY
- cout << name[i];
- #endif
- cout << endl;
- cout.flush();
- #endif
-
- #ifdef SAVE_MEMORY
- //stream is empty ==> delete its name
- assert(name[i]);
- delete name[i];
- name[i] = NULL;
- #endif
- //stream is empty ==> reset data
- assert(data[i]);
- //data[i]->persist(PERSIST_DELETE); //this is done automatically..
- delete data[i];
- data[i] = NULL;
- deleted[i] = 0;
- streamsize[i] = 0;
- empty++;
- }
- }
- //streams are in memory; all streams which are NULL must have been
- //deleted
- //shift streams to the left in case holes were introduced
- unsigned int j=0;
- if (empty) {
- #ifdef EMBUF_DELETE_STREAM_PRINT
- cout << "em_buffer::cleanup: shifting streams\n"; cout.flush();
- #endif
- for (i=0; i<index; i++) {
- //if i'th stream is not empty, shift it left if necessary
- if (data[i]) {
- if (i!=j) {
- //set j'th stream to point to i'th stream
- //cout << j << " set to " << i << endl; cout.flush();
- data[j] = data[i];
- deleted[j] = deleted[i];
- streamsize[j] = streamsize[i];
- //set i'th stream to point to NULL
- data[i] = NULL;
- deleted[i] = 0;
- streamsize[i] = 0;
- #ifdef SAVE_MEMORY
- //fix the names
- /* already done assert(name[j]); */
- /* delete name[j]; */
- name[j] = name[i];
- name[i] = NULL;
- check_name(j);
- #endif
- } else {
- //cout << i << " left the same" << endl;
- }
- j++;
- } //if data[i] != NULL
- }//for i
- //set the index
- assert(index == j + empty);
- index = j;
-
- #ifdef EMBUF_DELETE_STREAM_PRINT
- cout << "em_buffer::cleanup: index set to " << index << endl;
- cout.flush();
- #endif
- } //if empty
- //put streams back to disk
- put_streams();
- #ifdef EMBUF_CLEANUP_PRINT
- #ifdef SAVE_MEMORY
- if (index >0) {
- cout << "after cleanup:\n";
- print_stream_names();
- print_stream_sizes();
- cout.flush();
- }
- #endif
- #endif
- }
- /************************************************************/
- //delete all streams
- template<class T, class Key>
- void em_buffer<T,Key>::reset() {
-
- get_streams();
-
- //make streams not-persistent and delete them
- for (unsigned int i=0; i<index; i++) {
- assert(data[i]);
- assert(streamsize[i] == data[i]->stream_len());
- #ifdef SAVE_MEMORY
- check_name(i);
- assert(name[i]);
- delete name[i];
- name[i] = NULL;
- #endif
-
- data[i]->persist(PERSIST_DELETE);
- delete data[i];
- data[i] = NULL;
- deleted[i] = 0;
- streamsize[i] = 0;
- }
-
- index = 0;
- }
- /************************************************************/
- //create and return a stream which contains all elements of
- //all streams of the buffer in sorted ascending order of
- //their keys (priorities);
- template<class T, class Key>
- AMI_STREAM<T>*
- em_buffer<T,Key>::sort() {
-
- //create stream
- MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
- AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>(); /* will be deleteed in insert() */
- assert(sorted_stream);
-
- //merge the streams into sorted stream
- AMI_err aerr;
- //Key dummykey;
- //must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
- // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
- // 0, dummykey);
- //could not use AMI_merge so i had to write my own..
- get_streams();
- aerr = substream_merge(data, arity, sorted_stream);
- assert(aerr == AMI_ERROR_NO_ERROR);
-
- put_streams();
-
- return sorted_stream;
- }
-
- /************************************************************/
- /* merge the input streams; there are <arity> streams in total; write
- output in <outstream>;
-
- the input streams are assumed sorted in increasing order of their
- keys;
-
- assumes the instreams are in memory (no need for get_streams()) */
- template<class T, class Key>
- AMI_err em_buffer<T,Key>::substream_merge(AMI_STREAM<T>** instreams,
- unsigned int arity,
- AMI_STREAM<T> *outstream) {
-
- unsigned int i, j;
-
- //some assertion checks
- assert(instreams);
- assert(outstream);
- for (i = 0; i < arity ; i++ ) {
- assert(instreams[i]);
- #ifdef SAVE_MEMORY
- check_name(i);
- #endif
- }
- std::vector<T*> in_objects(arity); //pointers to current leading elements of streams
- AMI_err ami_err;
-
-
- char str[200];
- sprintf(str, "em_buffer::substream_merge: allocate keys array, total %ldB\n",
- (long)((long)arity * sizeof(merge_key<Key>)));
- MEMORY_LOG(str);
-
- //keys array is initialized with smallest key from each stream (only
- //non-null keys must be included)
- merge_key<Key>* keys;
- //merge_key<Key>* keys = new (merge_key<Key>)[arity];
- typedef merge_key<Key> footype;
- keys = new footype[arity];
- assert(keys);
-
- //count number of non-empty streams
- j = 0;
- //rewind and read the first item from every stream initializing
- //in_objects and keys
- for (i = 0; i < arity ; i++ ) {
- assert(instreams[i]);
- //rewind stream
- if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
- return ami_err;
- }
- //read first item from stream
- if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
- AMI_ERROR_NO_ERROR) {
- if (ami_err == AMI_ERROR_END_OF_STREAM) {
- in_objects[i] = NULL;
- } else {
- return ami_err;
- }
- } else {
- //include this key in the array of keys
- Key k = in_objects[i]->getPriority();
- keys[j].set(k, i);
- j++;
- }
- }
- unsigned int NonEmptyRuns = j;
- //build heap from the array of keys
- pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
- //repeatedly extract_min from heap, write it to output stream and
- //insert next element from same stream
- merge_key<Key> minelt;
- //rewind output buffer
- ami_err = outstream->seek(0);
- assert(ami_err == AMI_ERROR_NO_ERROR);
- while (!mergeheap.empty()) {
- //find min key and id of the stream from whereit comes
- mergeheap.min(minelt);
- i = minelt.stream_id();
- //write min item to output stream
- if ((ami_err = outstream->write_item(*in_objects[i]))
- != AMI_ERROR_NO_ERROR) {
- return ami_err;
- }
- //read next item from same input stream
- if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
- != AMI_ERROR_NO_ERROR) {
- if (ami_err != AMI_ERROR_END_OF_STREAM) {
- return ami_err;
- }
- }
- //extract the min from the heap and insert next key from same stream
- if (ami_err == AMI_ERROR_END_OF_STREAM) {
- mergeheap.delete_min();
- } else {
- Key k = in_objects[i]->getPriority();
- merge_key<Key> nextit(k, i);
- mergeheap.delete_min_and_insert(nextit);
- }
- } //while
-
- //delete [] keys;
- //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
- //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
- //I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
- //AND EVERYTHING SCREWS UP..
-
- return AMI_ERROR_NO_ERROR;
- }
- /************************************************************/
- // insert an array into the buffer; assume array is sorted; return the
- // number of items actually inserted; if SAVE_MEMORY FLAG is on, put
- // stream on disk and release its memory
- template<class T, class Key>
- long em_buffer<T,Key>::insert(T* a, long n) {
- assert(a);
- if (is_full()) {
- cout << "em_buffer::insert: buffer full\n";
- return 0;
- }
-
- //can only insert one full stream at a time
- //relaxed..
- //assert(n == get_stream_maxlen());
-
- //create the stream
- MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
- AMI_STREAM<T>* str = new AMI_STREAM<T>();
- assert(str);
-
- //write the array to stream
- AMI_err ae;
- for (long i=0; i< n; i++) {
- ae = str->write_item(a[i]);
- assert(ae == AMI_ERROR_NO_ERROR);
- }
- assert(n == str->stream_len());
- //insert the stream in the buffer
- return insert(str);
- }
- /************************************************************/
- /* insert a stream into the buffer; the next free entry in the buffer
- is set to point to the stream; if SAVE_MEMORY flag is on, the
- stream is put to disk;
-
- the <nextstream> pointer of buffer is set to point to the argument
- stream; (in this way no stream copying is done, just one pointer
- copy). The user should be aware the the argument stream is 'lost' -
- that is a stream cannot be inserted repeatedly into many buffers
- because this would lead to several buffers pointing to the same
- stream.
-
- stream is assume stream is sorted; bos = how many elements must be
- skipped (were deleted) from the beginning fo stream;
-
- return the number of items actually inserted */
- template<class T, class Key>
- long em_buffer<T,Key>::insert(AMI_STREAM<T>* str, long bos) {
- assert(str);
-
- if (is_full()) {
- cout << "em_buffer::insert: buffer full\n";
- return 0;
- }
-
- //can only insert one level-i-full-stream at a time;
- //relaxed..can specify bos;
- //not only that, but the length of the stream can be smaller
- //than nominal length, because a stream is normally obtained by
- //merging streams which can be shorter;
- //assert(str->stream_len() == get_stream_len() - bos);
- #ifdef EMBUF_CHECK_INSERT
- //check that stream is sorted
- cout << "CHECK_INSERT: checking stream is sorted\n";
- AMI_err ae;
- str->seek(0);
- T *crt=NULL, *prev=NULL;
- while (str->read_item(&crt)) {
- assert(ae == AMI_ERROR_NO_ERROR);
- if (prev) assert(*prev <= *crt);
- }
- #endif
-
- //nextstream must be empty
- assert(str);
- assert(data[nextstream()] == NULL);
- assert(deleted[nextstream()] == 0);
- assert(streamsize[nextstream()] == 0);
- #ifdef SAVE_MEMORY
- assert(name[nextstream()] == NULL);
- #endif
- //set next entry i the buffer to point to this stream
- data[nextstream()] = str;
- deleted[nextstream()] = bos;
- streamsize[nextstream()] = str->stream_len();
- #ifdef SAVE_MEMORY
- //set next name entry in buffer to point to this stream's name
- char* s;
- str->name(&s); //name() allocates the string
- name[nextstream()] = s;
-
- //put stream on disk and release its memory
- str->persist(PERSIST_PERSISTENT);
- delete str; //stream should be persistent; just delete it
- data[nextstream()] = NULL;
- #ifdef EMBUF_PRINT_INSERT
- cout << "insert stream " << s << " at buf [" << level
- << "," << nextstream() << "]" << endl;
- #endif
- #endif
-
- //increment the index of next available stream in buffer
- incr_nextstream();
- #ifdef EMBUF_PRINT_INSERT
- print_stream_sizes();
- print_stream_names();
- #endif
-
- #ifdef SAVE_MEMORY
- MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
- MY_LOG_DEBUG_ID(name[nextstream()-1]);
- #endif
- //return nb of items inserted
- return get_stream_len(nextstream()-1);
- }
- /************************************************************/
- //print the elements of the i'th stream of the buffer to a stream;
- //assumes stream is in memory;
- template<class T, class Key>
- void em_buffer<T,Key>::print_stream(ostream& s, unsigned int i) {
-
- assert(data[i]);
- assert((i>=0) && (i<index));
-
- AMI_err ae;
- T* x;
- s << "STREAM " << i << ": [";
- ae = data[i]->seek(deleted[i]);
- assert(ae == AMI_ERROR_NO_ERROR);
-
- for (long j = 0; j < get_stream_len(i); j++) {
- ae = data[i]->read_item(&x);
- assert(ae == AMI_ERROR_NO_ERROR);
- s << *x << ",";
- }
- s << "]\n";
- }
- /************************************************************/
- //print elements range in buffer (read first and last element in each
- //substream and find global min and max)
- template<class T, class Key>
- void em_buffer<T,Key>::print_range() {
- T *min, *max;
- AMI_err ae;
-
- get_streams();
- for (unsigned int i=0; i< index; i++) {
- cout << "[";
- //read min element in substream i
- ae = data[i]->seek(deleted[i]);
- assert(ae == AMI_ERROR_NO_ERROR);
- ae = data[i]->read_item(&min);
- assert(ae == AMI_ERROR_NO_ERROR);
- cout << min->getPriority() << "..";
- //read max element in substream i
- ae = data[i]->seek(streamsize[i] - 1);
- assert(ae == AMI_ERROR_NO_ERROR);
- ae = data[i]->read_item(&max);
- assert(ae == AMI_ERROR_NO_ERROR);
- cout << max->getPriority()
- << " (sz=" << get_stream_len(i) << ")] ";
- }
- for (unsigned int i=index; i< arity; i++) {
- cout << "[] ";
- }
-
- put_streams();
- }
- /************************************************************/
- //print all elements in buffer
- template<class T, class Key>
- void em_buffer<T,Key>::print() {
- T *x;
- AMI_err ae;
-
- get_streams();
- for (unsigned int i=0; i<index; i++) {
- cout << " [";
- ae = data[i]->seek(deleted[i]);
- assert(ae == AMI_ERROR_NO_ERROR);
- for (unsigned long j=0; j<get_stream_len(i); j++) {
- ae = data[i]->read_item(&x);
- assert(ae == AMI_ERROR_NO_ERROR);
- cout << x->getPriority() << ",";
- }
- cout << "]" << endl;
- }
- for (unsigned int i=index; i< arity; i++) {
- cout << "[] ";
- }
- put_streams();
- }
- /************************************************************/
- //print the sizes of the substreams in the buffer
- template<class T, class Key>
- void em_buffer<T,Key>::print_stream_sizes() {
- cout << "(streams=" << index << ") sizes=[";
- for (unsigned int i=0; i< arity; i++) {
- cout << get_stream_len(i) << ",";
- }
- cout << "]" << endl;
- cout.flush();
- }
- #endif
|