fttransform.ipp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef FTTRANSFORM_IPP
  14. #define FTTRANSFORM_IPP
  15. #include "jptree.hpp"
  16. #include "filecopy.hpp"
  17. #include "fttransform.hpp"
  18. #include "ftbase.ipp"
  19. #include "daft.hpp"
  20. #include "daftformat.hpp"
  21. //---------------------------------------------------------------------------
  22. class CTransformerBase : implements ITransformer, public CInterface
  23. {
  24. public:
  25. CTransformerBase();
  26. IMPLEMENT_IINTERFACE
  27. virtual void beginTransform(IFileIOStream * out);
  28. virtual void endTransform(IFileIOStream * out);
  29. virtual bool getInputCRC(crc32_t & value) { return false; }
  30. virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey) = 0;
  31. virtual void setInputCRC(crc32_t value);
  32. virtual stat_type getStatistic(StatisticKind kind) = 0;
  33. protected:
  34. bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length);
  35. protected:
  36. IFileAttr inputFile;
  37. offset_t startOffset;
  38. offset_t maxOffset;
  39. };
  40. class CTransformer : public CTransformerBase
  41. {
  42. public:
  43. CTransformer(size32_t _bufferSize);
  44. ~CTransformer();
  45. virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey);
  46. virtual size32_t getBlock(IFileIOStream * out);
  47. virtual offset_t tell();
  48. virtual stat_type getStatistic(StatisticKind kind) override { return input->getStatistic(kind); }
  49. protected:
  50. size32_t read(size32_t maxLength, void * buffer);
  51. virtual size32_t getN(byte * buffer, size32_t maxLength) = 0;
  52. protected:
  53. IFileIOAttr input;
  54. offset_t cursor;
  55. size32_t bufferSize;
  56. byte * buffer;
  57. };
  58. //----------------------------------------------------------------------------
  59. //Copying fixed->fixed, don't really care about the record size when copying.
  60. class CNullTransformer : public CTransformer
  61. {
  62. public:
  63. CNullTransformer(size32_t buffersize);
  64. virtual size32_t getN(byte * buffer, size32_t maxLength);
  65. virtual bool getInputCRC(crc32_t & value) { value = inputCRC; return true; }
  66. virtual void setInputCRC(crc32_t value);
  67. protected:
  68. bool doInputCRC;
  69. crc32_t inputCRC;
  70. };
  71. //----------------------------------------------------------------------------
  72. //Copying fixed->fixed, don't really care about the record size when copying.
  73. class CFixedToVarTransformer : public CTransformer
  74. {
  75. public:
  76. CFixedToVarTransformer(size32_t _recordSize,size32_t buffersize, bool _bigendian);
  77. virtual size32_t getN(byte * buffer, size32_t maxLength);
  78. virtual offset_t tell();
  79. protected:
  80. typedef unsigned varLenType;
  81. enum { minBlockSize = 32768 };
  82. protected:
  83. size32_t recordSize;
  84. bool bigendian;
  85. };
  86. //----------------------------------------------------------------------------
  87. //Copying fixed->fixed, don't really care about the record size when copying.
  88. class CVarToFixedTransformer : public CTransformer
  89. {
  90. public:
  91. CVarToFixedTransformer(size32_t _recordSize,size32_t buffersize,bool _bigendian);
  92. ~CVarToFixedTransformer();
  93. virtual size32_t getN(byte * buffer, size32_t maxLength);
  94. virtual offset_t tell();
  95. protected:
  96. typedef unsigned varLenType;
  97. enum { minBlockSize = 32768 };
  98. protected:
  99. size32_t recordSize;
  100. size32_t savedSize;
  101. byte * savedBuffer;
  102. bool bigendian;
  103. };
  104. //----------------------------------------------------------------------------
  105. class CBlockToVarTransformer : public CTransformer
  106. {
  107. public:
  108. CBlockToVarTransformer(bool _bigendian);
  109. virtual size32_t getN(byte * buffer, size32_t maxLength);
  110. virtual offset_t tell();
  111. protected:
  112. typedef unsigned blockLenType;
  113. protected:
  114. blockLenType nextBlockSize;
  115. bool bigendian;
  116. };
  117. //----------------------------------------------------------------------------
  118. class CVarToBlockTransformer : public CTransformer
  119. {
  120. public:
  121. CVarToBlockTransformer(bool _bigendian);
  122. ~CVarToBlockTransformer();
  123. virtual size32_t getN(byte * buffer, size32_t maxLength);
  124. virtual offset_t tell();
  125. protected:
  126. typedef unsigned blockLenType;
  127. typedef unsigned varLenType;
  128. protected:
  129. size32_t savedSize;
  130. byte * savedBuffer;
  131. bool bigendian;
  132. };
  133. //----------------------------------------------------------------------------
  134. class CGeneralTransformer : public CTransformerBase
  135. {
  136. public:
  137. CGeneralTransformer(const FileFormat & srcFormat, const FileFormat & tgtFormat);
  138. virtual void beginTransform(IFileIOStream * out);
  139. virtual void endTransform(IFileIOStream * out);
  140. virtual size32_t getBlock(IFileIOStream * out);
  141. virtual bool getInputCRC(crc32_t & value);
  142. virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey);
  143. virtual void setInputCRC(crc32_t value);
  144. virtual offset_t tell();
  145. virtual stat_type getStatistic(StatisticKind kind) override { UNIMPLEMENTED; }
  146. protected:
  147. Owned<IFormatProcessor> processor;
  148. Owned<IOutputProcessor> target;
  149. TransformCursor cursor;
  150. };
  151. //----------------------------------------------------------------------------
  152. class DALIFT_API TransferServer
  153. {
  154. public:
  155. TransferServer(ISocket * _masterSocket);
  156. void deserializeAction(MemoryBuffer & msg, unsigned action);
  157. bool pull();
  158. bool push();
  159. protected:
  160. void appendTransformed(unsigned whichChunk, ITransformer * input);
  161. unsigned queryLastOutput(unsigned outputIndex);
  162. void sendProgress(OutputProgress & curProgress);
  163. void transferChunk(unsigned chunkIndex);
  164. void wrapOutInCRC(unsigned startCRC);
  165. protected:
  166. PartitionPointArray partition;
  167. OutputProgressArray progress;
  168. FileFormat srcFormat;
  169. FileFormat tgtFormat;
  170. ISocket * masterSocket;
  171. Linked<IFileIOStream> out;
  172. Linked<CrcIOStream> crcOut;
  173. unsigned lastTick;
  174. unsigned updateFrequency;
  175. offset_t totalLengthRead;
  176. offset_t totalLengthToRead;
  177. bool calcInputCRC;
  178. bool calcOutputCRC;
  179. bool copySourceTimeStamp;
  180. bool mirror;
  181. bool isSafeMode;
  182. unsigned throttleNicSpeed;
  183. unsigned numParallelSlaves;
  184. bool compressedInput;
  185. bool compressOutput;
  186. bool copyCompressed;
  187. size32_t transferBufferSize;
  188. StringAttr encryptKey;
  189. StringAttr decryptKey;
  190. int fileUmask;
  191. };
  192. ITransformer * createTransformer(IFile * input, offset_t startOffset, offset_t length, const FileFormat & srcFormat, const FileFormat & tgtFormat);
  193. #endif