jio.hpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifndef JIO_INCL
  15. #define JIO_INCL
  16. #include "jiface.hpp"
  17. #include "jarray.hpp"
  18. #include <stdio.h>
  19. typedef count_t findex_t; //row index in a file.
  20. #ifndef IRECORDSIZE_DEFINED // also in eclhelper.hpp
  21. #define IRECORDSIZE_DEFINED
  22. interface IRecordSize: public IInterface
  23. // used to determine record size from record contents
  24. {
  25. virtual size32_t getRecordSize(const void *rec) = 0;
  26. //passing NULL to getRecordSize returns size for fixed records and initial size for variable
  27. virtual size32_t getFixedSize() const = 0;
  28. // returns 0 for variable row size
  29. inline bool isFixedSize() const { return getFixedSize()!=0; }
  30. inline bool isVariableSize() const { return getFixedSize()==0; }
  31. };
  32. #endif
  33. interface IReadSeq : public IInterface
  34. {
  35. // fixed length record read interface
  36. virtual void reset() = 0;
  37. virtual bool get(void *dst) = 0;
  38. virtual unsigned getn(void *dst, unsigned numrecs) = 0;
  39. virtual size32_t getRecordSize() = 0;
  40. virtual void stop() = 0; // indicate finished reading
  41. };
  42. interface IWriteSeq : public IInterface
  43. {
  44. // fixed length record write interface
  45. virtual void flush() = 0;
  46. virtual void put(const void *dst) = 0;
  47. virtual void putn(const void *dst, unsigned numrecs) = 0;
  48. virtual size32_t getRecordSize() = 0;
  49. virtual offset_t getPosition() = 0;
  50. };
  51. interface ISimpleReadStream : public IInterface
  52. {
  53. virtual size32_t read(size32_t max_len, void * data) = 0;
  54. };
  55. interface IIOStream : public ISimpleReadStream
  56. {
  57. virtual void flush() = 0;
  58. virtual size32_t write(size32_t len, const void * data) = 0;
  59. };
  60. #ifdef __x86_64__
  61. extern jlib_decl void writeStringToStream(IIOStream &out, const char *s);
  62. extern jlib_decl void writeCharsNToStream(IIOStream &out, char c, unsigned cnt);
  63. extern jlib_decl void writeCharToStream(IIOStream &out, char c);
  64. #else
  65. inline void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); }
  66. inline void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); }
  67. inline void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
  68. #endif
  69. extern jlib_decl IIOStream *createBufferedIOStream(IIOStream *io, unsigned _bufsize=(unsigned)-1);
  70. interface IReceiver : public IInterface
  71. {
  72. virtual bool takeRecord(offset_t pos) = 0;
  73. };
  74. interface IRecordFetchChannel : public IInterface
  75. {
  76. virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver) = 0;
  77. virtual void flush() = 0;
  78. virtual void abort() = 0;
  79. virtual bool isAborted() = 0;
  80. virtual bool isImmediate() = 0;
  81. };
  82. interface IRecordFetcher : public IInterface
  83. {
  84. virtual IRecordFetchChannel *openChannel(bool immediate) = 0;
  85. };
  86. interface IWriteSeqAllocator : public IInterface
  87. {
  88. virtual IWriteSeq *next(size32_t &num) = 0;
  89. };
  90. interface IReadSeqAllocator : public IInterface
  91. {
  92. virtual IReadSeq *next() = 0;
  93. };
  94. extern jlib_decl IReadSeq *createReadSeq(int fh, offset_t _offset, size32_t size, size32_t _bufsize = (size32_t)-1, // bufsize in bytes
  95. unsigned maxrecs=(unsigned)-1, bool compress=false); // compression is *not* blocked and needs buffer size
  96. extern jlib_decl IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize = (size32_t)-1,bool compress=false); // compression is *not* blocked and needs buffer size
  97. extern jlib_decl IWriteSeq *createTeeWriteSeq(IWriteSeq *, IWriteSeq *);
  98. extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa);
  99. extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa);
  100. extern jlib_decl IRecordFetcher *createElevatorFetcher(int fh, size32_t recSize);
  101. extern jlib_decl IRecordSize *createFixedRecordSize(size32_t recsize);
  102. extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta);
  103. extern jlib_decl unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize);
  104. extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails
  105. extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len);
  106. extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos);
  107. class CachedRecordSize
  108. {
  109. public:
  110. inline CachedRecordSize(IRecordSize * _rs = NULL) { set(_rs); }
  111. inline void set(IRecordSize * _rs)
  112. {
  113. rs.set(_rs);
  114. if (_rs)
  115. {
  116. initialSize = _rs->getRecordSize(NULL);
  117. fixedSize = _rs->getFixedSize();
  118. }
  119. }
  120. inline size32_t getInitialSize() const { return initialSize; }
  121. inline size32_t getFixedSize() const { return fixedSize; }
  122. inline size32_t getRecordSize(const void *rec) const { return fixedSize ? fixedSize : rs->getRecordSize(rec); }
  123. inline bool isFixedSize() const { return (fixedSize != 0); }
  124. inline operator IRecordSize * () const { return rs; }
  125. private:
  126. Owned<IRecordSize> rs;
  127. size32_t fixedSize;
  128. size32_t initialSize;
  129. };
  130. interface IFileIO;
  131. interface IFileIOStream;
  132. #ifndef IROWSTREAM_DEFINED
  133. #define IROWSTREAM_DEFINED
  134. interface IRowStream : extends IInterface
  135. {
  136. virtual const void *nextRow()=0; // rows returned must be freed
  137. virtual void stop() = 0; // after stop called NULL is returned
  138. inline const void *ungroupedNextRow()
  139. {
  140. const void *ret = nextRow();
  141. if (!ret)
  142. ret = nextRow();
  143. return ret;
  144. }
  145. };
  146. #endif
  147. interface IRowWriter: extends IInterface
  148. {
  149. virtual void putRow(const void *row) = 0; // takes ownership of row
  150. virtual void flush() = 0;
  151. };
  152. interface IRowLinkCounter: extends IInterface
  153. {
  154. virtual void linkRow(const void *row)=0;
  155. virtual void releaseRow(const void *row)=0;
  156. };
  157. interface IRowProvider: extends IRowLinkCounter
  158. {
  159. virtual const void *nextRow(unsigned idx)=0;
  160. virtual void stop(unsigned idx)=0;
  161. };
  162. extern jlib_decl IRowStream *createNullRowStream();
  163. extern jlib_decl unsigned copyRowStream(IRowStream *in, IRowWriter *out);
  164. extern jlib_decl unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out);
  165. extern jlib_decl unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out);
  166. extern jlib_decl IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped=false);// simple concat
  167. #endif