// jio.hpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef JIO_INCL
  14. #define JIO_INCL
  15. #include "jiface.hpp"
  16. #include "jarray.hpp"
  17. #include <stdio.h>
  18. typedef count_t findex_t; //row index in a file.
  19. #ifndef IRECORDSIZE_DEFINED // also in eclhelper.hpp
  20. #define IRECORDSIZE_DEFINED
  21. interface IRecordSize: public IInterface
  22. // used to determine record size from record contents
  23. {
  24. virtual size32_t getRecordSize(const void *rec) = 0;
  25. //passing NULL to getRecordSize returns size for fixed records and initial size for variable
  26. virtual size32_t getFixedSize() const = 0;
  27. // returns 0 for variable row size
  28. virtual size32_t getMinRecordSize() const = 0;
  29. // The minimum size that a variable (or fixed) size record can be.
  30. inline bool isFixedSize() const { return getFixedSize()!=0; }
  31. inline bool isVariableSize() const { return getFixedSize()==0; }
  32. };
  33. #endif
  34. interface IReadSeq : public IInterface
  35. {
  36. // fixed length record read interface
  37. virtual void reset() = 0;
  38. virtual bool get(void *dst) = 0;
  39. virtual unsigned getn(void *dst, unsigned numrecs) = 0;
  40. virtual size32_t getRecordSize() = 0;
  41. virtual void stop() = 0; // indicate finished reading
  42. };
  43. interface IWriteSeq : public IInterface
  44. {
  45. // fixed length record write interface
  46. virtual void flush() = 0;
  47. virtual void put(const void *dst) = 0;
  48. virtual void putn(const void *dst, unsigned numrecs) = 0;
  49. virtual size32_t getRecordSize() = 0;
  50. virtual offset_t getPosition() = 0;
  51. };
  52. interface ISimpleReadStream : public IInterface
  53. {
  54. virtual size32_t read(size32_t max_len, void * data) = 0;
  55. };
  56. interface IIOStream : public ISimpleReadStream
  57. {
  58. virtual void flush() = 0;
  59. virtual size32_t write(size32_t len, const void * data) = 0;
  60. };
  61. #ifdef __x86_64__
  62. extern jlib_decl void writeStringToStream(IIOStream &out, const char *s);
  63. extern jlib_decl void writeCharsNToStream(IIOStream &out, char c, unsigned cnt);
  64. extern jlib_decl void writeCharToStream(IIOStream &out, char c);
  65. #else
  66. inline void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); }
  67. inline void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); }
  68. inline void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
  69. #endif
  70. extern jlib_decl IIOStream *createBufferedIOStream(IIOStream *io, unsigned _bufsize=(unsigned)-1);
  71. interface IReceiver : public IInterface
  72. {
  73. virtual bool takeRecord(offset_t pos) = 0;
  74. };
  75. interface IRecordFetchChannel : public IInterface
  76. {
  77. virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver) = 0;
  78. virtual void flush() = 0;
  79. virtual void abort() = 0;
  80. virtual bool isAborted() = 0;
  81. virtual bool isImmediate() = 0;
  82. };
  83. interface IRecordFetcher : public IInterface
  84. {
  85. virtual IRecordFetchChannel *openChannel(bool immediate) = 0;
  86. };
  87. interface IWriteSeqAllocator : public IInterface
  88. {
  89. virtual IWriteSeq *next(size32_t &num) = 0;
  90. };
  91. interface IReadSeqAllocator : public IInterface
  92. {
  93. virtual IReadSeq *next() = 0;
  94. };
  95. extern jlib_decl IReadSeq *createReadSeq(int fh, offset_t _offset, size32_t size, size32_t _bufsize = (size32_t)-1, // bufsize in bytes
  96. unsigned maxrecs=(unsigned)-1, bool compress=false); // compression is *not* blocked and needs buffer size
  97. extern jlib_decl IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize = (size32_t)-1,bool compress=false); // compression is *not* blocked and needs buffer size
  98. extern jlib_decl IWriteSeq *createTeeWriteSeq(IWriteSeq *, IWriteSeq *);
  99. extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa);
  100. extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa);
  101. extern jlib_decl IRecordFetcher *createElevatorFetcher(int fh, size32_t recSize);
  102. extern jlib_decl IRecordSize *createFixedRecordSize(size32_t recsize);
  103. extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta);
  104. extern jlib_decl unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize);
  105. extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails
  106. extern jlib_decl offset_t checked_lseeki64(int handle, offset_t offset, int origin);
  107. extern jlib_decl size32_t checked_write(int handle, const void *buffer, size32_t count);
  108. extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len);
  109. extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos);
  110. interface IFileIO;
  111. interface IFileIOStream;
  112. #ifndef IROWSTREAM_DEFINED
  113. #define IROWSTREAM_DEFINED
  114. interface IRowStream : extends IInterface
  115. {
  116. virtual const void *nextRow()=0; // rows returned must be freed
  117. virtual void stop() = 0; // after stop called NULL is returned
  118. inline const void *ungroupedNextRow()
  119. {
  120. const void *ret = nextRow();
  121. if (!ret)
  122. ret = nextRow();
  123. return ret;
  124. }
  125. };
  126. #endif
  127. interface IRowWriter: extends IInterface
  128. {
  129. virtual void putRow(const void *row) = 0; // takes ownership of row
  130. virtual void flush() = 0;
  131. };
  132. interface IRowWriterEx : extends IRowWriter
  133. {
  134. public:
  135. virtual void noteStopped() = 0;
  136. };
  137. interface IRowLinkCounter: extends IInterface
  138. {
  139. virtual void linkRow(const void *row)=0;
  140. virtual void releaseRow(const void *row)=0;
  141. };
  142. interface IRowProvider: extends IRowLinkCounter
  143. {
  144. virtual const void *nextRow(unsigned idx)=0;
  145. virtual void stop(unsigned idx)=0;
  146. };
  147. extern jlib_decl IRowStream *createNullRowStream();
  148. extern jlib_decl unsigned copyRowStream(IRowStream *in, IRowWriter *out);
  149. extern jlib_decl unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out);
  150. extern jlib_decl unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out);
  151. extern jlib_decl IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped=false);// simple concat
  152. #endif