thorread.cpp 54 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "thorfile.hpp"
  15. #include "eclhelper.hpp"
  16. #include "eclrtl.hpp"
  17. #include "eclrtl_imp.hpp"
  18. #include "rtlfield.hpp"
  19. #include "rtlds_imp.hpp"
  20. #include "rtldynfield.hpp"
  21. #include "roxiemem.hpp"
  22. #include "rmtclient.hpp"
  23. #include "rmtfile.hpp"
  24. #include "thorread.hpp"
  25. #include "rtlcommon.hpp"
  26. #include "thorcommon.hpp"
  27. #include "csvsplitter.hpp"
  28. //---------------------------------------------------------------------------------------------------------------------
  29. /*
  30. * A class that implements IDiskReadMapping - which provides all the information representing a translation from actual->expected->projected.
  31. */
  32. //It might be sensible to have result structure which is (mode, expected, projected) shared by all actual->result mappings
  33. class DiskReadMapping : public CInterfaceOf<IDiskReadMapping>
  34. {
  35. public:
  36. DiskReadMapping(RecordTranslationMode _mode, const char * _format, unsigned _actualCrc, IOutputMetaData & _actual, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _output, const IPropertyTree * _options)
  37. : mode(_mode), format(_format), actualCrc(_actualCrc), actualMeta(&_actual), expectedCrc(_expectedCrc), expectedMeta(&_expected), projectedCrc(_projectedCrc), projectedMeta(&_output), options(_options)
  38. {}
  39. virtual const char * queryFormat() const override { return format; }
  40. virtual unsigned getActualCrc() const override { return actualCrc; }
  41. virtual unsigned getExpectedCrc() const override { return expectedCrc; }
  42. virtual unsigned getProjectedCrc() const override { return projectedCrc; }
  43. virtual IOutputMetaData * queryActualMeta() const override { return actualMeta; }
  44. virtual IOutputMetaData * queryExpectedMeta() const override{ return expectedMeta; }
  45. virtual IOutputMetaData * queryProjectedMeta() const override{ return projectedMeta; }
  46. virtual const IPropertyTree * queryOptions() const override { return options; }
  47. virtual RecordTranslationMode queryTranslationMode() const override { return mode; }
  48. virtual const IDynamicTransform * queryTranslator() const override
  49. {
  50. ensureTranslators();
  51. return translator;
  52. }
  53. virtual const IKeyTranslator *queryKeyedTranslator() const override
  54. {
  55. ensureTranslators();
  56. return keyedTranslator;
  57. }
  58. virtual bool matches(const IDiskReadMapping * other) const
  59. {
  60. return mode == other->queryTranslationMode() && streq(format, other->queryFormat()) &&
  61. ((actualCrc && actualCrc == other->getActualCrc()) || (actualMeta == other->queryActualMeta())) &&
  62. ((expectedCrc && expectedCrc == other->getExpectedCrc()) || (expectedMeta == other->queryExpectedMeta())) &&
  63. ((projectedCrc && projectedCrc == other->getProjectedCrc()) || (projectedMeta == other->queryProjectedMeta())) &&
  64. areMatchingPTrees(options, other->queryOptions());
  65. }
  66. virtual bool expectedMatchesProjected() const
  67. {
  68. return (expectedCrc && (expectedCrc == projectedCrc)) || (expectedMeta == projectedMeta);
  69. }
  70. protected:
  71. void ensureTranslators() const;
  72. protected:
  73. RecordTranslationMode mode;
  74. mutable std::atomic<bool> checkedTranslators = { false };
  75. StringAttr format;
  76. unsigned actualCrc;
  77. unsigned expectedCrc;
  78. unsigned projectedCrc;
  79. Linked<IOutputMetaData> actualMeta;
  80. Linked<IOutputMetaData> expectedMeta;
  81. Linked<IOutputMetaData> projectedMeta;
  82. Linked<const IPropertyTree> options;
  83. mutable Owned<const IDynamicTransform> translator;
  84. mutable Owned<const IKeyTranslator> keyedTranslator;
  85. mutable SpinLock translatorLock; // use a spin lock since almost certainly not going to contend
  86. };
  87. void DiskReadMapping::ensureTranslators() const
  88. {
  89. if (checkedTranslators.load())
  90. return;
  91. SpinBlock block(translatorLock);
  92. if (checkedTranslators.load())
  93. return;
  94. const char * filename = ""; // not known at this point
  95. IOutputMetaData * sourceMeta = expectedMeta;
  96. unsigned sourceCrc = expectedCrc;
  97. if (mode != RecordTranslationMode::AlwaysECL)
  98. {
  99. if (actualCrc && actualMeta)
  100. {
  101. sourceMeta = actualMeta;
  102. sourceCrc = actualCrc;
  103. }
  104. if (actualCrc && expectedCrc && (actualCrc != expectedCrc) && (RecordTranslationMode::None == mode))
  105. throwTranslationError(actualMeta->queryRecordAccessor(true), expectedMeta->queryRecordAccessor(true), filename);
  106. }
  107. //This has a very low possibility of Meta crcs accidentally matching, which could lead to a crashes on an untranslated files.
  108. const RtlRecord & projectedRecord = projectedMeta->queryRecordAccessor(true);
  109. const RtlRecord & sourceRecord = sourceMeta->queryRecordAccessor(true);
  110. if (strsame(format, "csv"))
  111. {
  112. type_vals format = options->hasProp("ascii") ? type_string : type_utf8;
  113. translator.setown(createRecordTranslatorViaCallback(projectedRecord, sourceRecord, format));
  114. }
  115. else if (strsame(format, "xml"))
  116. {
  117. translator.setown(createRecordTranslatorViaCallback(projectedRecord, sourceRecord, type_utf8));
  118. }
  119. else
  120. {
  121. if ((projectedMeta != sourceMeta) && (projectedCrc != sourceCrc))
  122. translator.setown(createRecordTranslator(projectedRecord, sourceRecord));
  123. }
  124. if (translator)
  125. {
  126. DBGLOG("Record layout translator created for %s", filename);
  127. translator->describe();
  128. if (!translator->canTranslate())
  129. throw MakeStringException(0, "Untranslatable record layout mismatch detected for file %s", filename);
  130. if (translator->needsTranslate())
  131. {
  132. if (sourceMeta != expectedMeta)
  133. {
  134. Owned<const IKeyTranslator> _keyedTranslator = createKeyTranslator(sourceMeta->queryRecordAccessor(true), expectedMeta->queryRecordAccessor(true));
  135. //MORE: What happens if the key filters cannot be translated?
  136. if (_keyedTranslator->needsTranslate())
  137. keyedTranslator.swap(_keyedTranslator);
  138. }
  139. }
  140. else
  141. translator.clear();
  142. }
  143. checkedTranslators = true;
  144. }
  145. THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mode, const char * format, unsigned actualCrc, IOutputMetaData & actual, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, const IPropertyTree * options)
  146. {
  147. assertex(expectedCrc);
  148. assertex(options);
  149. return new DiskReadMapping(mode, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);
  150. }
  151. THORHELPER_API IDiskReadMapping * createUnprojectedMapping(IDiskReadMapping * mapping)
  152. {
  153. return createDiskReadMapping(mapping->queryTranslationMode(), mapping->queryFormat(), mapping->getActualCrc(), *mapping->queryActualMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->queryOptions());
  154. }
  155. //---------------------------------------------------------------------------------------------------------------------
  156. constexpr size32_t defaultReadBufferSize = 0x10000;
  157. /*
  158. * The base class for reading rows from an external file. Each activity will have an instance of a disk reader for
  159. * each actual file format.
  160. */
  161. class DiskRowReader : public CInterfaceOf<IDiskRowStream>, implements IDiskRowReader, implements IThorDiskCallback
  162. {
  163. public:
  164. DiskRowReader(IDiskReadMapping * _mapping);
  165. IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IDiskRowStream>)
  166. virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override;
  167. virtual bool getCursor(MemoryBuffer & cursor) override;
  168. virtual void setCursor(MemoryBuffer & cursor) override;
  169. virtual void stop() override;
  170. virtual void clearInput() override;
  171. virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override;
  172. // IThorDiskCallback
  173. virtual offset_t getFilePosition(const void * row) override;
  174. virtual offset_t getLocalFilePosition(const void * row) override;
  175. virtual const char * queryLogicalFilename(const void * row) override;
  176. virtual const byte * lookupBlob(unsigned __int64 id) override { UNIMPLEMENTED; }
  177. protected:
  178. virtual offset_t getLocalOffset();
  179. protected:
  180. Owned<ISerialStream> inputStream;
  181. Owned<IFileIO> inputfileio;
  182. CThorContiguousRowBuffer inputBuffer; // more: move to derived classes.
  183. Owned<IEngineRowAllocator> outputAllocator;
  184. RtlDynamicRowBuilder allocatedBuilder;
  185. const IDynamicTransform * translator = nullptr;
  186. const IKeyTranslator * keyedTranslator = nullptr;
  187. Linked<IDiskReadMapping> mapping;
  188. IOutputMetaData * actualDiskMeta = nullptr;
  189. MemoryBuffer encryptionKey;
  190. size32_t readBufferSize = defaultReadBufferSize;
  191. bool grouped = false;
  192. bool stranded = false;
  193. bool compressed = false;
  194. bool blockcompressed = false;
  195. bool rowcompressed = false;
  196. //The following refer to the current input file:
  197. offset_t fileBaseOffset = 0;
  198. StringAttr logicalFilename;
  199. unsigned filePart = 0;
  200. };
  201. DiskRowReader::DiskRowReader(IDiskReadMapping * _mapping)
  202. : mapping(_mapping), actualDiskMeta(_mapping->queryActualMeta()), allocatedBuilder(nullptr)
  203. {
  204. //Options contain information that is the same for each file that is being read, and potentially expensive to reconfigure.
  205. translator = mapping->queryTranslator();
  206. keyedTranslator = mapping->queryKeyedTranslator();
  207. const IPropertyTree * options = mapping->queryOptions();
  208. if (options->hasProp("encryptionKey"))
  209. {
  210. encryptionKey.resetBuffer();
  211. options->getPropBin("encryptionKey", encryptionKey);
  212. }
  213. readBufferSize = options->getPropInt("readBufferSize", defaultReadBufferSize);
  214. }
  215. IDiskRowStream * DiskRowReader::queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
  216. {
  217. outputAllocator.set(_outputAllocator);
  218. allocatedBuilder.setAllocator(_outputAllocator);
  219. return this;
  220. }
  221. void DiskRowReader::clearInput()
  222. {
  223. inputBuffer.setStream(nullptr);
  224. inputStream.clear();
  225. }
  226. bool DiskRowReader::matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping)
  227. {
  228. if (!mapping->matches(otherMapping))
  229. return false;
  230. //MORE: Check translation mode
  231. //MORE: Is the previous check sufficient? If not, once getDaliLayoutInfo is cached the following line could be enabled.
  232. //if ((expectedDiskMeta != &_expected) || (projectedDiskMeta != &_projected) || (actualDiskMeta != &_actual))
  233. // return false;
  234. const IPropertyTree * options = otherMapping->queryOptions();
  235. if (options->hasProp("encryptionKey"))
  236. {
  237. MemoryBuffer tempEncryptionKey;
  238. options->getPropBin("encryptionKey", tempEncryptionKey);
  239. if (!encryptionKey.matches(tempEncryptionKey))
  240. return false;
  241. }
  242. if (readBufferSize != options->getPropInt("readBufferSize", defaultReadBufferSize))
  243. return false;
  244. return true;
  245. }
  246. bool DiskRowReader::getCursor(MemoryBuffer & cursor)
  247. {
  248. return true;
  249. }
  250. void DiskRowReader::setCursor(MemoryBuffer & cursor)
  251. {
  252. }
  253. void DiskRowReader::stop()
  254. {
  255. }
  256. // IThorDiskCallback
  257. offset_t DiskRowReader::getFilePosition(const void * row)
  258. {
  259. return getLocalOffset() + fileBaseOffset;
  260. }
  261. offset_t DiskRowReader::getLocalFilePosition(const void * row)
  262. {
  263. return makeLocalFposOffset(filePart, getLocalOffset());
  264. }
  265. const char * DiskRowReader::queryLogicalFilename(const void * row)
  266. {
  267. return logicalFilename;
  268. }
  269. offset_t DiskRowReader::getLocalOffset()
  270. {
  271. return inputBuffer.tell();
  272. }
  273. //---------------------------------------------------------------------------------------------------------------------
  274. /*
  275. * base class for reading a local file (or a remote file via the block based IFile interface)
  276. */
  277. class LocalDiskRowReader : public DiskRowReader
  278. {
  279. public:
  280. LocalDiskRowReader(IDiskReadMapping * _mapping);
  281. virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
  282. virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  283. virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  284. protected:
  285. virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter);
  286. virtual bool isBinary() const = 0;
  287. protected:
  288. IConstArrayOf<IFieldFilter> expectedFilter; // These refer to the expected layout
  289. MemoryBuffer tempOutputBuffer;
  290. MemoryBufferBuilder bufferBuilder;
  291. };
  292. LocalDiskRowReader::LocalDiskRowReader(IDiskReadMapping * _mapping)
  293. : DiskRowReader(_mapping), bufferBuilder(tempOutputBuffer, 0)
  294. {
  295. }
  296. bool LocalDiskRowReader::matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping)
  297. {
  298. if (streamRemote)
  299. return false;
  300. return DiskRowReader::matches(format, streamRemote, otherMapping);
  301. }
  302. bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter)
  303. {
  304. assertex(meta);
  305. grouped = meta->getPropBool("grouped");
  306. compressed = meta->getPropBool("compressed", false);
  307. blockcompressed = meta->getPropBool("blockCompressed", false);
  308. bool forceCompressed = meta->getPropBool("forceCompressed", false);
  309. logicalFilename.set(_logicalFilename);
  310. filePart = _partNumber;
  311. fileBaseOffset = _baseOffset;
  312. try
  313. {
  314. if (!inputFile->exists())
  315. return false;
  316. }
  317. catch (IException *e)
  318. {
  319. EXCLOG(e, "DiskReadStage::setInputFile()");
  320. e->Release();
  321. return false;
  322. }
  323. if (isBinary())
  324. {
  325. size32_t dfsRecordSize = meta->getPropInt("dfsRecordSize");
  326. size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
  327. if (dfsRecordSize)
  328. {
  329. if (fixedDiskRecordSize)
  330. {
  331. if (grouped)
  332. fixedDiskRecordSize++;
  333. if (!((dfsRecordSize == fixedDiskRecordSize) || (grouped && (dfsRecordSize+1 == fixedDiskRecordSize)))) //last for backwards compatibility, as hthor used to publish @recordSize not including the grouping byte
  334. throw MakeStringException(0, "Published record size %d for file %s does not match coded record size %d", dfsRecordSize, logicalFilename.str(), fixedDiskRecordSize);
  335. if (!compressed && forceCompressed && (fixedDiskRecordSize >= MIN_ROWCOMPRESS_RECSIZE))
  336. {
  337. StringBuffer msg;
  338. msg.append("Ignoring compression attribute on file ").append(logicalFilename.str()).append(", which is not published as compressed");
  339. WARNLOG("%s", msg.str());
  340. //MORE: No simple way to do this, unless we are passed an engine context:
  341. //agent.addWuException(msg.str(), WRN_MismatchCompressInfo, SeverityWarning, MSGAUD_user, "hthor");
  342. compressed = true;
  343. }
  344. }
  345. }
  346. else
  347. {
  348. if (!compressed && forceCompressed)
  349. {
  350. if ((fixedDiskRecordSize == 0) || (fixedDiskRecordSize + (grouped?1:0) >= MIN_ROWCOMPRESS_RECSIZE))
  351. compressed = true;
  352. }
  353. }
  354. }
  355. rowcompressed = false;
  356. if (compressed)
  357. {
  358. Owned<IExpander> eexp;
  359. if (encryptionKey.length()!=0)
  360. eexp.setown(createAESExpander256((size32_t)encryptionKey.length(),encryptionKey.bufferBase()));
  361. inputfileio.setown(createCompressedFileReader(inputFile,eexp));
  362. if(!inputfileio && !blockcompressed) //fall back to old decompression, unless dfs marked as new
  363. {
  364. inputfileio.setown(inputFile->open(IFOread));
  365. if(inputfileio)
  366. rowcompressed = true;
  367. }
  368. }
  369. else
  370. inputfileio.setown(inputFile->open(IFOread));
  371. if (!inputfileio)
  372. return false;
  373. unsigned __int64 filesize = inputfileio->size();
  374. //MORE: Allow a previously created input stream to be reused to avoid reallocating the buffer
  375. inputStream.setown(createFileSerialStream(inputfileio, 0, filesize, readBufferSize));
  376. expectedFilter.clear();
  377. ForEachItemIn(i, _expectedFilter)
  378. expectedFilter.append(OLINK(_expectedFilter.item(i)));
  379. return true;
  380. }
  381. bool LocalDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  382. {
  383. Owned<IFile> inputFile = createIFile(localFilename);
  384. return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
  385. }
  386. bool LocalDiskRowReader::setInputFile(const RemoteFilename & filename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  387. {
  388. Owned<IFile> inputFile = createIFile(filename);
  389. return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
  390. }
  391. //---------------------------------------------------------------------------------------------------------------------
  392. /*
  393. * base class for reading a binary local file
  394. */
  395. class BinaryDiskRowReader : public LocalDiskRowReader
  396. {
  397. public:
  398. BinaryDiskRowReader(IDiskReadMapping * _mapping);
  399. virtual const void *nextRow() override;
  400. virtual const void *nextRow(size32_t & resultSize) override;
  401. virtual const void * nextRow(MemoryBufferBuilder & builder) override;
  402. virtual bool getCursor(MemoryBuffer & cursor) override;
  403. virtual void setCursor(MemoryBuffer & cursor) override;
  404. virtual void stop() override;
  405. virtual void clearInput() override;
  406. virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
  407. protected:
  408. virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  409. virtual bool isBinary() const { return true; }
  410. inline bool fieldFilterMatch(const void * buffer)
  411. {
  412. if (actualFilter.numFilterFields())
  413. {
  414. unsigned numOffsets = actualRecord->getNumVarFields() + 1;
  415. size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
  416. RtlRow row(*actualRecord, nullptr, numOffsets, variableOffsets);
  417. row.setRow(buffer, 0); // Use lazy offset calculation
  418. return actualFilter.matches(row);
  419. }
  420. else
  421. return true;
  422. }
  423. size32_t getFixedDiskRecordSize();
  424. private:
  425. template <class PROCESS>
  426. inline const void * inlineNextRow(PROCESS processor) __attribute__((always_inline));
  427. protected:
  428. ISourceRowPrefetcher * actualRowPrefetcher = nullptr;
  429. const RtlRecord * actualRecord = nullptr;
  430. RowFilter actualFilter; // This refers to the actual disk layout
  431. bool eogPending = false;
  432. bool needToTranslate;
  433. };
  434. BinaryDiskRowReader::BinaryDiskRowReader(IDiskReadMapping * _mapping)
  435. : LocalDiskRowReader(_mapping)
  436. {
  437. actualRowPrefetcher = actualDiskMeta->createDiskPrefetcher();
  438. actualRecord = &actualDiskMeta->queryRecordAccessor(true);
  439. needToTranslate = (translator && translator->needsTranslate());
  440. }
  441. void BinaryDiskRowReader::clearInput()
  442. {
  443. LocalDiskRowReader::clearInput();
  444. eogPending = false;
  445. }
  446. bool BinaryDiskRowReader::matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping)
  447. {
  448. if (!strieq(format, "flat"))
  449. return false;
  450. return LocalDiskRowReader::matches(format, streamRemote, otherMapping);
  451. }
  452. bool BinaryDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  453. {
  454. if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter))
  455. return false;
  456. actualFilter.clear().appendFilters(expectedFilter);
  457. if (keyedTranslator)
  458. keyedTranslator->translate(actualFilter);
  459. unsigned __int64 filesize = inputfileio->size();
  460. if (!compressed && getFixedDiskRecordSize() && ((offset_t)-1 != filesize) && (filesize % getFixedDiskRecordSize()) != 0)
  461. {
  462. StringBuffer s;
  463. s.append("File ").append(inputFile->queryFilename()).append(" size is ").append(filesize).append(" which is not a multiple of ").append(getFixedDiskRecordSize());
  464. throw makeStringException(MSGAUD_user, 1, s.str());
  465. }
  466. inputBuffer.setStream(inputStream);
  467. eogPending = false;
  468. return true;
  469. }
  470. template <class PROCESS>
  471. const void *BinaryDiskRowReader::inlineNextRow(PROCESS processor)
  472. {
  473. for (;;)
  474. {
  475. //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
  476. //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
  477. //Multiple eogs should also be harmless if the engines switch to this representation.
  478. if (eogPending)
  479. {
  480. eogPending = false;
  481. return eogRow;
  482. }
  483. inputBuffer.finishedRow();
  484. if (inputBuffer.eos())
  485. return eofRow;
  486. //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
  487. //Is there a better way storing this (and combining it with the eog markers)?
  488. if (stranded)
  489. {
  490. bool eosPending;
  491. inputBuffer.read(eosPending);
  492. if (eosPending)
  493. return eosRow;
  494. //Call finishRow() so it is not included in the row pointer. This should be special cased in the base class
  495. inputBuffer.finishedRow();
  496. if (inputBuffer.eos())
  497. return eofRow;
  498. }
  499. actualRowPrefetcher->readAhead(inputBuffer);
  500. size32_t sizeRead = inputBuffer.queryRowSize();
  501. if (grouped)
  502. inputBuffer.read(eogPending);
  503. const byte * next = inputBuffer.queryRow();
  504. if (likely(fieldFilterMatch(next))) // NOTE - keyed fields are checked pre-translation
  505. return processor(sizeRead, next);
  506. }
  507. }
  508. //Implementation of IAllocRowStream, return a row allocated with roxiemem
  509. const void *BinaryDiskRowReader::nextRow()
  510. {
  511. return inlineNextRow(
  512. [this](size32_t sizeRead, const byte * next)
  513. {
  514. if (needToTranslate)
  515. {
  516. size32_t size = translator->translate(allocatedBuilder.ensureRow(), *this, next);
  517. return allocatedBuilder.finalizeRowClear(size);
  518. }
  519. else
  520. {
  521. size32_t allocatedSize;
  522. void * result = outputAllocator->createRow(sizeRead, allocatedSize);
  523. memcpy(result, next, sizeRead);
  524. return (const void *)outputAllocator->finalizeRow(sizeRead, result, allocatedSize);
  525. }
  526. }
  527. );
  528. }
  529. //Similar to above, except the code at the end will translate to a local buffer or return the pointer
  530. const void *BinaryDiskRowReader::nextRow(size32_t & resultSize)
  531. {
  532. return inlineNextRow(
  533. [this,&resultSize](size32_t sizeRead, const byte * next)
  534. {
  535. if (needToTranslate)
  536. {
  537. //MORE: optimize the case where fields are lost off the end, and not bother translating - but return the modified size.
  538. tempOutputBuffer.clear();
  539. resultSize = translator->translate(bufferBuilder, *this, next);
  540. const void * ret = bufferBuilder.getSelf();
  541. bufferBuilder.finishRow(resultSize);
  542. return ret;
  543. }
  544. else
  545. {
  546. resultSize = sizeRead;
  547. return (const void *)next;
  548. }
  549. }
  550. );
  551. }
  552. //return a row allocated within a MemoryBufferBuilder
  553. const void *BinaryDiskRowReader::nextRow(MemoryBufferBuilder & builder)
  554. {
  555. return inlineNextRow(
  556. [this,&builder](size32_t sizeRead, const byte * next)
  557. {
  558. //MORE: optimize the case where fields are lost off the end, and not bother translating - but return the modified size.
  559. if (needToTranslate)
  560. {
  561. size32_t resultSize = translator->translate(builder, *this, next);
  562. const void * ret = builder.getSelf();
  563. builder.finishRow(resultSize);
  564. return ret;
  565. }
  566. else
  567. {
  568. builder.appendBytes(sizeRead, next);
  569. return (const void *)(builder.getSelf() - sizeRead);
  570. }
  571. }
  572. );
  573. }
  574. //Common to IAllocRowStream and IRawRowStream
  575. bool BinaryDiskRowReader::getCursor(MemoryBuffer & cursor)
  576. {
  577. //Is the following needed?
  578. inputBuffer.finishedRow();
  579. cursor.append(inputBuffer.tell());
  580. cursor.append(eogPending);
  581. return true;
  582. }
  583. void BinaryDiskRowReader::setCursor(MemoryBuffer & cursor)
  584. {
  585. unsigned __int64 startPos;
  586. cursor.read(startPos);
  587. cursor.read(eogPending);
  588. if (inputBuffer.tell() != startPos)
  589. inputBuffer.reset(startPos);
  590. }
  591. void BinaryDiskRowReader::stop()
  592. {
  593. }
  594. // IDiskRowReader
  595. size32_t BinaryDiskRowReader::getFixedDiskRecordSize()
  596. {
  597. size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
  598. if (fixedDiskRecordSize && grouped)
  599. fixedDiskRecordSize += 1;
  600. return fixedDiskRecordSize;
  601. }
  602. //---------------------------------------------------------------------------------------------------------------------
  603. /*
  604. * base class for reading a non-binary local file that uses IDynamicFieldValueFetcher to extract the values from
  605. * the input data file.
  606. */
  607. class ExternalFormatDiskRowReader : public LocalDiskRowReader
  608. {
  609. public:
  610. ExternalFormatDiskRowReader(IDiskReadMapping * _mapping) : LocalDiskRowReader(_mapping)
  611. {
  612. projectedRecord = &mapping->queryProjectedMeta()->queryRecordAccessor(true);
  613. }
  614. virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter) override
  615. {
  616. if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, _expectedFilter))
  617. return false;
  618. projectedFilter.clear().appendFilters(_expectedFilter);
  619. //If the following is false then needs keyedTranslator code - but mapping from expected to PROJECTED
  620. assertex(mapping->expectedMatchesProjected() || projectedFilter.numFilterFields() == 0);
  621. //if (keyedTranslator)
  622. // keyedTranslator->translate(projectedFilter);
  623. return true;
  624. }
  625. //Common to IAllocRowStream and IRawRowStream
  626. virtual bool getCursor(MemoryBuffer & cursor) override
  627. {
  628. cursor.append(inputStream->tell());
  629. return true;
  630. }
  631. virtual void setCursor(MemoryBuffer & cursor) override
  632. {
  633. unsigned __int64 startPos;
  634. cursor.read(startPos);
  635. if (inputStream->tell() != startPos)
  636. inputStream->reset(startPos);
  637. }
  638. virtual offset_t getLocalOffset() override
  639. {
  640. return inputStream->tell();
  641. }
  642. protected:
  643. virtual bool isBinary() const { return false; }
  644. protected:
  645. Owned<const IDynamicFieldValueFetcher> fieldFetcher;
  646. RowFilter projectedFilter;
  647. const RtlRecord * projectedRecord = nullptr;
  648. };
  649. class CNullNestedRowIterator : public CSimpleInterfaceOf<IDynamicRowIterator>
  650. {
  651. public:
  652. virtual bool first() override { return false; }
  653. virtual bool next() override { return false; }
  654. virtual bool isValid() override { return false; }
  655. virtual IDynamicFieldValueFetcher &query() override
  656. {
  657. throwUnexpected();
  658. }
  659. };
  660. static CNullNestedRowIterator nullNestedRowIterator;
  661. //---------------------------------------------------------------------------------------------------------------------
  662. /*
  663. * class for reading a csv local file
  664. */
  665. class CsvDiskRowReader : public ExternalFormatDiskRowReader
  666. {
  667. private:
  668. class CFieldFetcher : public CSimpleInterfaceOf<IDynamicFieldValueFetcher>
  669. {
  670. CSVSplitter &csvSplitter;
  671. unsigned numInputFields;
  672. public:
  673. CFieldFetcher(CSVSplitter &_csvSplitter, unsigned _numInputFields) : csvSplitter(_csvSplitter), numInputFields(_numInputFields)
  674. {
  675. }
  676. virtual const byte *queryValue(unsigned fieldNum, size_t &sz) const override
  677. {
  678. dbgassertex(fieldNum < numInputFields);
  679. sz = csvSplitter.queryLengths()[fieldNum];
  680. return csvSplitter.queryData()[fieldNum];
  681. }
  682. virtual IDynamicRowIterator *getNestedIterator(unsigned fieldNum) const override
  683. {
  684. return LINK(&nullNestedRowIterator);
  685. }
  686. virtual size_t getSize(unsigned fieldNum) const override { throwUnexpected(); }
  687. virtual size32_t getRecordSize() const override { throwUnexpected(); }
  688. };
  689. public:
  690. CsvDiskRowReader(IDiskReadMapping * _mapping);
  691. virtual const void *nextRow() override;
  692. virtual const void *nextRow(size32_t & resultSize) override;
  693. virtual const void *nextRow(MemoryBufferBuilder & builder) override;
  694. virtual void stop() override;
  695. virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
  696. protected:
  697. virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  698. void processOption(CSVSplitter::MatchItem element, const IPropertyTree & config, const char * option, const char * dft, const char * dft2 = nullptr);
  699. inline bool fieldFilterMatchProjected(const void * buffer)
  700. {
  701. if (projectedFilter.numFilterFields())
  702. {
  703. unsigned numOffsets = projectedRecord->getNumVarFields() + 1;
  704. size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
  705. RtlRow row(*projectedRecord, nullptr, numOffsets, variableOffsets);
  706. row.setRow(buffer, 0); // Use lazy offset calculation
  707. return projectedFilter.matches(row);
  708. }
  709. else
  710. return true;
  711. }
  712. size32_t getFixedDiskRecordSize();
  713. protected:
  714. constexpr static unsigned defaultMaxCsvRowSizeMB = 10;
  715. StringBuffer csvQuote, csvSeparate, csvTerminate, csvEscape;
  716. unsigned __int64 headerLines = 0;
  717. unsigned __int64 maxRowSize = 0;
  718. bool preserveWhitespace = false;
  719. CSVSplitter csvSplitter;
  720. };
  721. CsvDiskRowReader::CsvDiskRowReader(IDiskReadMapping * _mapping)
  722. : ExternalFormatDiskRowReader(_mapping)
  723. {
  724. const IPropertyTree & config = *mapping->queryOptions();
  725. maxRowSize = config.getPropInt64("maxRowSize", defaultMaxCsvRowSizeMB) * 1024 * 1024;
  726. preserveWhitespace = config.getPropBool("preserveWhitespace", false);
  727. preserveWhitespace = config.getPropBool("notrim", preserveWhitespace);
  728. const RtlRecord * inputRecord = &mapping->queryActualMeta()->queryRecordAccessor(true);
  729. unsigned numInputFields = inputRecord->getNumFields();
  730. csvSplitter.init(numInputFields, maxRowSize, csvQuote, csvSeparate, csvTerminate, csvEscape, preserveWhitespace);
  731. //MORE: How about options from the file? - test writing with some options and then reading without specifying them
  732. processOption(CSVSplitter::QUOTE, config, "quote", "\"");
  733. processOption(CSVSplitter::SEPARATOR, config, "separator", ",");
  734. processOption(CSVSplitter::TERMINATOR, config, "terminator", "\n", "\r\n");
  735. if (config.getProp("escape", csvEscape))
  736. csvSplitter.addEscape(csvEscape);
  737. headerLines = config.getPropInt64("heading");
  738. fieldFetcher.setown(new CFieldFetcher(csvSplitter, numInputFields));
  739. }
  740. bool CsvDiskRowReader::matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping)
  741. {
  742. if (!strieq(format, "csv"))
  743. return false;
  744. return ExternalFormatDiskRowReader::matches(format, streamRemote, otherMapping);
  745. }
  746. void CsvDiskRowReader::processOption(CSVSplitter::MatchItem element, const IPropertyTree & config, const char * option, const char * dft, const char * dft2)
  747. {
  748. if (config.hasProp(option))
  749. {
  750. bool useAscii = mapping->queryOptions()->hasProp("ascii");
  751. Owned<IPropertyTreeIterator> iter = config.getElements(option);
  752. ForEach(*iter)
  753. {
  754. const char * value = iter->query().queryProp("");
  755. StringBuffer temp;
  756. if (value && useAscii)
  757. {
  758. char * ascii = rtlUtf8ToVStr(rtlUtf8Length(strlen(value), value), value);
  759. csvSplitter.addItem(element, ascii);
  760. free(ascii);
  761. }
  762. else
  763. csvSplitter.addItem(element, value);
  764. }
  765. }
  766. else
  767. {
  768. csvSplitter.addItem(element, dft);
  769. if (dft2)
  770. csvSplitter.addItem(element, dft2);
  771. }
  772. }
  773. bool CsvDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter)
  774. {
  775. if (!ExternalFormatDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, _expectedFilter))
  776. return false;
  777. //Skip any header lines..
  778. for (unsigned __int64 line = 0; line < headerLines; line++)
  779. {
  780. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  781. if (0 == lineLength)
  782. break;
  783. inputStream->skip(lineLength);
  784. }
  785. return true;
  786. }
  787. //Implementation of IAllocRowStream
  788. const void *CsvDiskRowReader::nextRow()
  789. {
  790. for (;;) //while (processed < chooseN)
  791. {
  792. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  793. if (!lineLength)
  794. break;
  795. size32_t resultSize = translator->translate(allocatedBuilder.ensureRow(), *this, *fieldFetcher);
  796. inputStream->skip(lineLength);
  797. roxiemem::OwnedConstRoxieRow result = allocatedBuilder.finalizeRowClear(resultSize);
  798. if (fieldFilterMatchProjected(result))
  799. return result.getClear();
  800. }
  801. return eofRow;
  802. }
  803. //Implementation of IRawRowStream
  804. const void *CsvDiskRowReader::nextRow(size32_t & resultSize)
  805. {
  806. for (;;)
  807. {
  808. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  809. if (!lineLength)
  810. break;
  811. resultSize = translator->translate(bufferBuilder, *this, *fieldFetcher);
  812. dbgassertex(resultSize);
  813. const void *ret = bufferBuilder.getSelf();
  814. if (fieldFilterMatchProjected(ret))
  815. {
  816. bufferBuilder.finishRow(resultSize);
  817. inputStream->skip(lineLength);
  818. return ret;
  819. }
  820. else
  821. bufferBuilder.removeBytes(resultSize);
  822. inputStream->skip(lineLength);
  823. }
  824. resultSize = 0;
  825. return nullptr;
  826. }
  827. const void * CsvDiskRowReader::nextRow(MemoryBufferBuilder & builder)
  828. {
  829. for (;;)
  830. {
  831. size32_t lineLength = csvSplitter.splitLine(inputStream, maxRowSize);
  832. if (!lineLength)
  833. break;
  834. size32_t resultSize = translator->translate(bufferBuilder, *this, *fieldFetcher);
  835. dbgassertex(resultSize);
  836. const void *ret = builder.getSelf();
  837. if (fieldFilterMatchProjected(ret))
  838. {
  839. builder.finishRow(resultSize);
  840. inputStream->skip(lineLength);
  841. return ret;
  842. }
  843. else
  844. builder.removeBytes(resultSize);
  845. inputStream->skip(lineLength);
  846. }
  847. return nullptr;
  848. }
  //Intentionally empty: this reader performs no work when the stream is stopped.
  849. void CsvDiskRowReader::stop()
  850. {
  851. }
  852. // IDiskRowReader
  853. size32_t CsvDiskRowReader::getFixedDiskRecordSize()
  854. {
  855. size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
  856. if (fixedDiskRecordSize && grouped)
  857. fixedDiskRecordSize += 1;
  858. return fixedDiskRecordSize;
  859. }
  860. //---------------------------------------------------------------------------------------------------------------------
  861. /*
  862. * This class is used to project the input rows - for the situations where the disk reader cannot perform
  863. * all the filtering and projection that is required.
  864. */
  865. class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implements IDiskRowReader
  866. {
  867. Linked<IDiskReadMapping> mapping;
  868. Linked<IDiskRowReader> inputReader;
  869. UnexpectedVirtualFieldCallback unexpectedCallback;
  870. Owned<const IDynamicTransform> translator;
  871. MemoryBuffer tempOutputBuffer;
  872. MemoryBufferBuilder bufferBuilder;
  873. RtlDynamicRowBuilder allocatedBuilder;
  874. Linked<IEngineRowAllocator> outputAllocator;
  875. IDiskRowStream * rawInputStream;
  876. public:
  877. CompoundProjectRowReader(IDiskRowReader * _input, IDiskReadMapping * _mapping)
  878. : inputReader(_input), mapping(_mapping), bufferBuilder(tempOutputBuffer, 0), allocatedBuilder(nullptr)
  879. {
  880. const RtlRecord &inRecord = mapping->queryExpectedMeta()->queryRecordAccessor(true);
  881. const RtlRecord &outRecord = mapping->queryProjectedMeta()->queryRecordAccessor(true);
  882. translator.setown(createRecordTranslator(outRecord, inRecord));
  883. }
  884. IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IDiskRowStream>)
  885. virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
  886. {
  887. allocatedBuilder.setAllocator(_outputAllocator);
  888. outputAllocator.set(_outputAllocator);
  889. return this;
  890. }
  891. virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
  892. {
  893. return false;
  894. }
  895. virtual void clearInput()
  896. {
  897. inputReader->clearInput();
  898. rawInputStream = nullptr;
  899. }
  900. virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  901. {
  902. if (inputReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, meta, expectedFilter))
  903. {
  904. rawInputStream = inputReader->queryAllocatedRowStream(nullptr);
  905. return true;
  906. }
  907. return false;
  908. }
  909. virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  910. {
  911. if (inputReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, meta, expectedFilter))
  912. {
  913. rawInputStream = inputReader->queryAllocatedRowStream(nullptr);
  914. return true;
  915. }
  916. return false;
  917. }
  918. //interface IRowReader
  919. virtual bool getCursor(MemoryBuffer & cursor) { return rawInputStream->getCursor(cursor); }
  920. virtual void setCursor(MemoryBuffer & cursor) { rawInputStream->setCursor(cursor); }
  921. virtual void stop() { rawInputStream->stop(); }
  922. virtual const void *nextRow(size32_t & resultSize) override
  923. {
  924. size32_t rawInputSize;
  925. const void * next = rawInputStream->nextRow(rawInputSize);
  926. if (isSpecialRow(next))
  927. return next;
  928. //MORE: optimize the case where fields are lost off the end, and not bother translating - but return the modified size.
  929. tempOutputBuffer.clear();
  930. resultSize = translator->translate(bufferBuilder, unexpectedCallback, (const byte *)next);
  931. const void * ret = bufferBuilder.getSelf();
  932. bufferBuilder.finishRow(resultSize);
  933. return ret;
  934. }
  935. virtual const void *nextRow() override
  936. {
  937. size32_t rawInputSize;
  938. const void * next = rawInputStream->nextRow(rawInputSize);
  939. if (isSpecialRow(next))
  940. return next;
  941. size32_t size = translator->translate(allocatedBuilder.ensureRow(), unexpectedCallback, (const byte *)next);
  942. return allocatedBuilder.finalizeRowClear(size);
  943. }
  944. virtual const void *nextRow(MemoryBufferBuilder & builder) override
  945. {
  946. size32_t rawInputSize;
  947. const void * next = rawInputStream->nextRow(rawInputSize);
  948. if (isSpecialRow(next))
  949. return next;
  950. //MORE: optimize the case where fields are lost off the end, and not bother translating - but return the modified size.
  951. size32_t resultSize = translator->translate(builder, unexpectedCallback, (const byte *)next);
  952. const void * ret = builder.getSelf();
  953. bufferBuilder.finishRow(resultSize);
  954. return ret;
  955. }
  956. };
  957. /*
  958. * This class is used for formats which may or may not be able to perform all the filtering and projection that an
  959. * input dataset requires. Depending on the filter it will add an extra layer of translation if required.
  960. */
  961. class AlternativeDiskRowReader : public CInterfaceOf<IDiskRowReader>
  962. {
  963. public:
  964. AlternativeDiskRowReader(IDiskRowReader * projectedReader, IDiskRowReader * expectedReader, IDiskReadMapping * mapping)
  965. {
  966. directReader.set(projectedReader);
  967. compoundReader.setown(new CompoundProjectRowReader(expectedReader, mapping));
  968. }
  969. virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
  970. {
  971. assertex(activeReader);
  972. return activeReader->queryAllocatedRowStream(_outputAllocator);
  973. }
  974. virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
  975. {
  976. return directReader->matches(_format, _streamRemote, _mapping);
  977. }
  978. //Specify where the raw binary input for a particular file is coming from, together with its actual format.
  979. //Does this make sense, or should it be passed a filename? an actual format?
  980. //Needs to specify a filename rather than a ISerialStream so that the interface is consistent for local and remote
  981. virtual void clearInput()
  982. {
  983. directReader->clearInput();
  984. compoundReader->clearInput();
  985. activeReader = nullptr;
  986. }
  987. virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  988. {
  989. bool useProjected = canFilterDirectly(expectedFilter);
  990. if (useProjected)
  991. activeReader = directReader;
  992. else
  993. activeReader = compoundReader;
  994. return activeReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, meta, expectedFilter);
  995. }
  996. virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  997. {
  998. bool useProjected = canFilterDirectly(expectedFilter);
  999. if (useProjected)
  1000. activeReader = directReader;
  1001. else
  1002. activeReader = compoundReader;
  1003. return activeReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, meta, expectedFilter);
  1004. }
  1005. protected:
  1006. bool canFilterDirectly(const FieldFilterArray & expectedFilter)
  1007. {
  1008. if (expectedFilter.ordinality() == 0)
  1009. return true;
  1010. //MORE: Check if all the fields being filtered are in the projected output
  1011. return false;
  1012. }
  1013. protected:
  1014. Owned<IDiskRowReader> directReader;
  1015. Owned<IDiskRowReader> compoundReader;
  1016. IDiskRowReader * activeReader = nullptr;
  1017. };
  1018. //---------------------------------------------------------------------------------------------------------------------
  1019. /*
  1020. * This class is used to read files that have been remotely filtered and projected by dafilesrv.
  1021. */
  1022. class RemoteDiskRowReader : public DiskRowReader
  1023. {
  1024. public:
  1025. RemoteDiskRowReader(const char * _format, IDiskReadMapping * _mapping);
  1026. virtual const void *nextRow() override;
  1027. virtual const void *nextRow(size32_t & resultSize) override;
  1028. virtual const void *nextRow(MemoryBufferBuilder & builder) override;
  1029. virtual bool getCursor(MemoryBuffer & cursor) override;
  1030. virtual void setCursor(MemoryBuffer & cursor) override;
  1031. virtual void stop() override;
  1032. virtual void clearInput() override;
  1033. virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping) override;
  1034. // IDiskRowReader
  1035. virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  1036. virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
  1037. private:
  1038. template <class PROCESS>
  1039. inline const void * inlineNextRow(PROCESS processor) __attribute__((always_inline));
  1040. protected:
  1041. ISourceRowPrefetcher * projectedRowPrefetcher = nullptr;
  1042. StringAttr format;
  1043. RecordTranslationMode translationMode;
  1044. bool eogPending = false;
  1045. };
  1046. RemoteDiskRowReader::RemoteDiskRowReader(const char * _format, IDiskReadMapping * _mapping)
  1047. : DiskRowReader(_mapping), format(_format)
  1048. {
  1049. translationMode = mapping->queryTranslationMode();
  1050. projectedRowPrefetcher = mapping->queryProjectedMeta()->createDiskPrefetcher();
  1051. }
  1052. void RemoteDiskRowReader::clearInput()
  1053. {
  1054. DiskRowReader::clearInput();
  1055. eogPending = false;
  1056. }
  1057. bool RemoteDiskRowReader::matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
  1058. {
  1059. if (!_streamRemote)
  1060. return false;
  1061. if (!strieq(format, _format))
  1062. return false;
  1063. return DiskRowReader::matches(_format, _streamRemote, _mapping);
  1064. }
  1065. bool RemoteDiskRowReader::setInputFile(const RemoteFilename & rfilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilters)
  1066. {
  1067. // NB: only binary handles can be remotely processed by dafilesrv at the moment
  1068. // Open a stream from remote file, having passed actual, expected, projected, and filters to it
  1069. SocketEndpoint ep(rfilename.queryEndpoint());
  1070. setDafsEndpointPort(ep);
  1071. StringBuffer localPath;
  1072. rfilename.getLocalPath(localPath);
  1073. RowFilter actualFilter;
  1074. actualFilter.appendFilters(expectedFilters);
  1075. if (keyedTranslator)
  1076. keyedTranslator->translate(actualFilter);
  1077. //MORE: This needs to be passed to this function - either in the meta or another parameter
  1078. unsigned __int64 remoteLimit = 0;
  1079. //MORE: Need to serialize the translation mode..
  1080. Owned<IRemoteFileIO> remoteFileIO = createRemoteFilteredFile(ep, localPath, actualDiskMeta, mapping->queryProjectedMeta(), actualFilter, compressed, grouped, remoteLimit);
  1081. if (remoteFileIO)
  1082. {
  1083. StringBuffer tmp;
  1084. remoteFileIO->addVirtualFieldMapping("logicalFilename", _logicalFilename);
  1085. remoteFileIO->addVirtualFieldMapping("baseFpos", tmp.clear().append(_baseOffset).str());
  1086. remoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(_partNumber).str());
  1087. try
  1088. {
  1089. remoteFileIO->ensureAvailable(); // force open now, because want to failover to other copies or legacy if fails
  1090. }
  1091. catch (IException *e)
  1092. {
  1093. #ifdef _DEBUG
  1094. EXCLOG(e, nullptr);
  1095. #endif
  1096. e->Release();
  1097. return false;
  1098. }
  1099. Owned<IFile> iFile = createIFile(rfilename);
  1100. // remote side does projection/translation/filtering
  1101. inputfileio.setown(remoteFileIO.getClear());
  1102. if (!inputfileio)
  1103. return false;
  1104. }
  1105. //MORE: Allow a previously created input stream to be reused to avoid reallocating the buffer
  1106. inputStream.setown(createFileSerialStream(inputfileio, 0, (offset_t)-1, readBufferSize));
  1107. inputBuffer.setStream(inputStream);
  1108. eogPending = false;
  1109. return true;
  1110. }
  //Reading from a local filename is not supported by the remote reader - calling this always throws.
  1111. bool RemoteDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
  1112. {
  1113. throwUnexpected();
  1114. }
  1115. template <class PROCESS>
  1116. const void *RemoteDiskRowReader::inlineNextRow(PROCESS processor)
  1117. {
  1118. for (;;)
  1119. {
  1120. //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
  1121. //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
  1122. //Multiple eogs should also be harmless if the engines switch to this representation.
  1123. if (eogPending)
  1124. {
  1125. eogPending = false;
  1126. return eogRow;
  1127. }
  1128. inputBuffer.finishedRow();
  1129. if (inputBuffer.eos())
  1130. return eofRow;
  1131. //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
  1132. //Is there a better way storing this (and combining it with the eog markers)?
  1133. if (stranded)
  1134. {
  1135. bool eosPending;
  1136. inputBuffer.read(eosPending);
  1137. if (eosPending)
  1138. return eosRow;
  1139. //Call finishRow() so it is not included in the row pointer. This should be special cased in the base class
  1140. inputBuffer.finishedRow();
  1141. if (inputBuffer.eos())
  1142. return eofRow;
  1143. }
  1144. projectedRowPrefetcher->readAhead(inputBuffer);
  1145. size32_t sizeRead = inputBuffer.queryRowSize();
  1146. if (grouped)
  1147. inputBuffer.read(eogPending);
  1148. const byte * next = inputBuffer.queryRow();
  1149. return processor(sizeRead, next);
  1150. }
  1151. }
  1152. //Implementation of IAllocRowStream
  1153. const void *RemoteDiskRowReader::nextRow()
  1154. {
  1155. return inlineNextRow(
  1156. [this](size32_t sizeRead, const byte * next)
  1157. {
  1158. size32_t allocatedSize;
  1159. void * result = outputAllocator->createRow(sizeRead, allocatedSize);
  1160. memcpy(result, next, sizeRead);
  1161. return outputAllocator->finalizeRow(sizeRead, result, allocatedSize);
  1162. }
  1163. );
  1164. }
  1165. //Similar to above, except the code at the end will translate to a local buffer or return the pointer
  1166. const void *RemoteDiskRowReader::nextRow(size32_t & resultSize)
  1167. {
  1168. return inlineNextRow(
  1169. [this,&resultSize](size32_t sizeRead, const byte * next)
  1170. {
  1171. resultSize = sizeRead;
  1172. return next;
  1173. }
  1174. );
  1175. }
  1176. //Experimental use of lambdas to common up a few function definitions.
  1177. const void *RemoteDiskRowReader::nextRow(MemoryBufferBuilder & builder)
  1178. {
  1179. return inlineNextRow(
  1180. [this,&builder](size32_t sizeRead, const byte * next)
  1181. {
  1182. builder.appendBytes(sizeRead, next);
  1183. return (const void *)(builder.getSelf() - sizeRead);
  1184. }
  1185. );
  1186. }
  1187. bool RemoteDiskRowReader::getCursor(MemoryBuffer & cursor)
  1188. {
  1189. throwUnexpected();
  1190. return false;
  1191. }
  //Cursors are not supported when streaming from a remote file - calling this always throws.
  1192. void RemoteDiskRowReader::setCursor(MemoryBuffer & cursor)
  1193. {
  1194. throwUnexpected();
  1195. }
  //Intentionally empty: this reader performs no work when the stream is stopped.
  1196. void RemoteDiskRowReader::stop()
  1197. {
  1198. }
  1199. ///---------------------------------------------------------------------------------------------------------------------
  1200. IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping)
  1201. {
  1202. if (strieq(format, "flat"))
  1203. return new BinaryDiskRowReader(_mapping);
  1204. if (strieq(format, "csv"))
  1205. return new CsvDiskRowReader(_mapping);
  1206. UNIMPLEMENTED;
  1207. }
  1208. //3 possible cases
  1209. // no filter
  1210. // filter can be performed on the projected output
  1211. // filter can only be performed on expected -> need to project to expected as a temporary row
  1212. IDiskRowReader * createLocalDiskReader(const char * format, IDiskReadMapping * mapping)
  1213. {
  1214. Owned<IDiskRowReader> directReader = doCreateLocalDiskReader(format, mapping);
  1215. if (mapping->expectedMatchesProjected() || strieq(format, "flat"))
  1216. return directReader.getClear();
  1217. Owned<IDiskReadMapping> expectedMapping = createUnprojectedMapping(mapping);
  1218. Owned<IDiskRowReader> expectedReader = doCreateLocalDiskReader(format, expectedMapping);
  1219. return new AlternativeDiskRowReader(directReader, expectedReader, mapping);
  1220. }
  //Create a reader for the given format and mapping that streams its rows from a remote file.
  //The caller receives a newly created object.
  1221. IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * _mapping)
  1222. {
  1223. return new RemoteDiskRowReader(format, _mapping);
  1224. }
  1225. IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * _mapping)
  1226. {
  1227. if (streamRemote)
  1228. return createRemoteDiskReader(format, _mapping);
  1229. else
  1230. return createLocalDiskReader(format, _mapping);
  1231. }
  1232. /*
  1233. Aims:
  1234. - Avoid creating multiple translators for mappings from one format to another - especially subfiles.
  1235. (Since cost of creating the mapping may be quite high.)
  1236. - Persist translators from query instance to query instance in roxie.
  1237. - Possibly share dynamic meta information between queries (e.g., same file used more than once).
  1238. (since cost and size of creating the information isn't trivial - and may have knock on effects to
  1239. allow more translators to be reused.)
  1240. - Share disk readers within an activity for all subfiles that have the same format
  1241. (Creating stream readers and other internal allocations can be relatively expensive).
  1242. - Reuse disk readers for calls to a child query. Similar reasons to sharing within an activity.
  1243. - It is assumed that projected is always a strict subset of expected
  1244. Complications
  1245. - IOutputMetaData cannot be shared between queries in roxie because the dll may be unloaded.
  1246. - Some filters cannot be converted from expected to actual.
  1247. csv - no filters can be converted
  1248. field mapping - if a field being filtered does not have a 1:1 mapping
  1249. This conflicts with wanting to reuse as much as possible from time to time - e.g. between subqueries
  1250. but the filter might be possible to apply to the projected if all fields are present.
  1251. Solutions:
  1252. - Add a flag to IOutputMetaData to indicate the field information is dynamic (and not dependent on a dll)
  1253. - If a filter cannot be translated, first project to expected, and then from expected to projected
  1254. - Allow query<X>RowStream to return different pointers after setInputFile() is called.
  1255. - setFilter() could be implemented as a separate call - it would avoid re-translating for subfiles, but may
  1256. be slightly tricky to track whether it has been called.
  1257. */