|
@@ -34,6 +34,7 @@
|
|
|
#include "rtlcommon.hpp"
|
|
|
#include "thorcommon.hpp"
|
|
|
#include "csvsplitter.hpp"
|
|
|
+#include "thormeta.hpp"
|
|
|
|
|
|
//---------------------------------------------------------------------------------------------------------------------
|
|
|
|
|
@@ -44,8 +45,8 @@
|
|
|
class DiskReadMapping : public CInterfaceOf<IDiskReadMapping>
|
|
|
{
|
|
|
public:
|
|
|
- DiskReadMapping(RecordTranslationMode _mode, const char * _format, unsigned _actualCrc, IOutputMetaData & _actual, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _output, const IPropertyTree * _options)
|
|
|
- : mode(_mode), format(_format), actualCrc(_actualCrc), actualMeta(&_actual), expectedCrc(_expectedCrc), expectedMeta(&_expected), projectedCrc(_projectedCrc), projectedMeta(&_output), options(_options)
|
|
|
+ DiskReadMapping(RecordTranslationMode _mode, const char * _format, unsigned _actualCrc, IOutputMetaData & _actual, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _output, const IPropertyTree * _fileOptions)
|
|
|
+ : mode(_mode), format(_format), actualCrc(_actualCrc), actualMeta(&_actual), expectedCrc(_expectedCrc), expectedMeta(&_expected), projectedCrc(_projectedCrc), projectedMeta(&_output), fileOptions(_fileOptions)
|
|
|
{}
|
|
|
|
|
|
virtual const char * queryFormat() const override { return format; }
|
|
@@ -55,7 +56,7 @@ public:
|
|
|
virtual IOutputMetaData * queryActualMeta() const override { return actualMeta; }
|
|
|
virtual IOutputMetaData * queryExpectedMeta() const override{ return expectedMeta; }
|
|
|
virtual IOutputMetaData * queryProjectedMeta() const override{ return projectedMeta; }
|
|
|
- virtual const IPropertyTree * queryOptions() const override { return options; }
|
|
|
+ virtual const IPropertyTree * queryFileOptions() const override { return fileOptions; }
|
|
|
virtual RecordTranslationMode queryTranslationMode() const override { return mode; }
|
|
|
|
|
|
virtual const IDynamicTransform * queryTranslator() const override
|
|
@@ -71,11 +72,18 @@ public:
|
|
|
|
|
|
virtual bool matches(const IDiskReadMapping * other) const
|
|
|
{
|
|
|
- return mode == other->queryTranslationMode() && streq(format, other->queryFormat()) &&
|
|
|
- ((actualCrc && actualCrc == other->getActualCrc()) || (actualMeta == other->queryActualMeta())) &&
|
|
|
- ((expectedCrc && expectedCrc == other->getExpectedCrc()) || (expectedMeta == other->queryExpectedMeta())) &&
|
|
|
- ((projectedCrc && projectedCrc == other->getProjectedCrc()) || (projectedMeta == other->queryProjectedMeta())) &&
|
|
|
- areMatchingPTrees(options, other->queryOptions());
|
|
|
+ if ((mode != other->queryTranslationMode()) || !streq(format, other->queryFormat()))
|
|
|
+ return false;
|
|
|
+ //if crc is set, then a matching crc counts as a match, otherwise meta must be identical
|
|
|
+ if (((actualCrc && actualCrc == other->getActualCrc()) || (actualMeta == other->queryActualMeta())) &&
|
|
|
+ ((expectedCrc && expectedCrc == other->getExpectedCrc()) || (expectedMeta == other->queryExpectedMeta())) &&
|
|
|
+ ((projectedCrc && projectedCrc == other->getProjectedCrc()) || (projectedMeta == other->queryProjectedMeta())))
|
|
|
+ {
|
|
|
+ if (!areMatchingPTrees(fileOptions->queryPropTree("formatOptions"), other->queryFileOptions()->queryPropTree("formatOptions")))
|
|
|
+ return false;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
virtual bool expectedMatchesProjected() const
|
|
@@ -96,7 +104,7 @@ protected:
|
|
|
Linked<IOutputMetaData> actualMeta;
|
|
|
Linked<IOutputMetaData> expectedMeta;
|
|
|
Linked<IOutputMetaData> projectedMeta;
|
|
|
- Linked<const IPropertyTree> options;
|
|
|
+ Linked<const IPropertyTree> fileOptions;
|
|
|
mutable Owned<const IDynamicTransform> translator;
|
|
|
mutable Owned<const IKeyTranslator> keyedTranslator;
|
|
|
mutable SpinLock translatorLock; // use a spin lock since almost certainly not going to contend
|
|
@@ -135,7 +143,8 @@ void DiskReadMapping::ensureTranslators() const
|
|
|
const RtlRecord & sourceRecord = sourceMeta->queryRecordAccessor(true);
|
|
|
if (strsame(format, "csv"))
|
|
|
{
|
|
|
- type_vals format = options->hasProp("ascii") ? type_string : type_utf8;
|
|
|
+ const IPropertyTree * formatOptions = fileOptions->queryPropTree("formatOptions");
|
|
|
+ type_vals format = formatOptions->hasProp("ascii") ? type_string : type_utf8;
|
|
|
translator.setown(createRecordTranslatorViaCallback(projectedRecord, sourceRecord, format));
|
|
|
}
|
|
|
else if (strsame(format, "xml"))
|
|
@@ -145,7 +154,14 @@ void DiskReadMapping::ensureTranslators() const
|
|
|
else
|
|
|
{
|
|
|
if ((projectedMeta != sourceMeta) && (projectedCrc != sourceCrc))
|
|
|
- translator.setown(createRecordTranslator(projectedRecord, sourceRecord));
|
|
|
+ {
|
|
|
+ //Special case the situation where the output record matches the input record with some virtual fields
|
|
|
+ //appended. This allows alien datatypes or ifblocks in records to also have virtual file positions.
|
|
|
+ if (fileOptions->getPropBool("@cloneAppendVirtuals"))
|
|
|
+ translator.setown(createCloneVirtualRecordTranslator(projectedRecord, *sourceMeta));
|
|
|
+ else
|
|
|
+ translator.setown(createRecordTranslator(projectedRecord, sourceRecord));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (translator)
|
|
@@ -173,16 +189,16 @@ void DiskReadMapping::ensureTranslators() const
|
|
|
checkedTranslators = true;
|
|
|
}
|
|
|
|
|
|
-THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mode, const char * format, unsigned actualCrc, IOutputMetaData & actual, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, const IPropertyTree * options)
|
|
|
+THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mode, const char * format, unsigned actualCrc, IOutputMetaData & actual, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, const IPropertyTree * fileOptions)
|
|
|
{
|
|
|
assertex(expectedCrc);
|
|
|
- assertex(options);
|
|
|
- return new DiskReadMapping(mode, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);
|
|
|
+ assertex(fileOptions);
|
|
|
+ return new DiskReadMapping(mode, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, fileOptions);
|
|
|
}
|
|
|
|
|
|
THORHELPER_API IDiskReadMapping * createUnprojectedMapping(IDiskReadMapping * mapping)
|
|
|
{
|
|
|
- return createDiskReadMapping(mapping->queryTranslationMode(), mapping->queryFormat(), mapping->getActualCrc(), *mapping->queryActualMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->queryOptions());
|
|
|
+ return createDiskReadMapping(mapping->queryTranslationMode(), mapping->queryFormat(), mapping->getActualCrc(), *mapping->queryActualMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->getExpectedCrc(), *mapping->queryExpectedMeta(), mapping->queryFileOptions());
|
|
|
}
|
|
|
|
|
|
|
|
@@ -250,7 +266,7 @@ DiskRowReader::DiskRowReader(IDiskReadMapping * _mapping)
|
|
|
//Options contain information that is the same for each file that is being read, and potentially expensive to reconfigure.
|
|
|
translator = mapping->queryTranslator();
|
|
|
keyedTranslator = mapping->queryKeyedTranslator();
|
|
|
- const IPropertyTree * options = mapping->queryOptions();
|
|
|
+ const IPropertyTree * options = mapping->queryFileOptions();
|
|
|
if (options->hasProp("encryptionKey"))
|
|
|
{
|
|
|
encryptionKey.resetBuffer();
|
|
@@ -283,7 +299,7 @@ bool DiskRowReader::matches(const char * format, bool streamRemote, IDiskReadMap
|
|
|
//if ((expectedDiskMeta != &_expected) || (projectedDiskMeta != &_projected) || (actualDiskMeta != &_actual))
|
|
|
// return false;
|
|
|
|
|
|
- const IPropertyTree * options = otherMapping->queryOptions();
|
|
|
+ const IPropertyTree * options = otherMapping->queryFileOptions();
|
|
|
if (options->hasProp("encryptionKey"))
|
|
|
{
|
|
|
MemoryBuffer tempEncryptionKey;
|
|
@@ -343,11 +359,12 @@ public:
|
|
|
LocalDiskRowReader(IDiskReadMapping * _mapping);
|
|
|
|
|
|
virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
|
|
|
- virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
- virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override;
|
|
|
|
|
|
protected:
|
|
|
- virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter);
|
|
|
+ virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter);
|
|
|
virtual bool isBinary() const = 0;
|
|
|
|
|
|
protected:
|
|
@@ -370,13 +387,13 @@ bool LocalDiskRowReader::matches(const char * format, bool streamRemote, IDiskRe
|
|
|
}
|
|
|
|
|
|
|
|
|
-bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter)
|
|
|
+bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputMeta, const FieldFilterArray & _expectedFilter)
|
|
|
{
|
|
|
- assertex(meta);
|
|
|
- grouped = meta->getPropBool("grouped");
|
|
|
- compressed = meta->getPropBool("compressed", false);
|
|
|
- blockcompressed = meta->getPropBool("blockCompressed", false);
|
|
|
- bool forceCompressed = meta->getPropBool("forceCompressed", false);
|
|
|
+ assertex(inputMeta);
|
|
|
+ grouped = inputMeta->getPropBool("@grouped");
|
|
|
+ compressed = inputMeta->getPropBool("@compressed", false);
|
|
|
+ blockcompressed = inputMeta->getPropBool("@blockCompressed", false);
|
|
|
+ bool forceCompressed = inputMeta->getPropBool("@forceCompressed", false);
|
|
|
|
|
|
logicalFilename.set(_logicalFilename);
|
|
|
filePart = _partNumber;
|
|
@@ -396,7 +413,7 @@ bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFi
|
|
|
|
|
|
if (isBinary())
|
|
|
{
|
|
|
- size32_t dfsRecordSize = meta->getPropInt("dfsRecordSize");
|
|
|
+ size32_t dfsRecordSize = inputMeta->getPropInt("@recordSize");
|
|
|
size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
|
|
|
if (dfsRecordSize)
|
|
|
{
|
|
@@ -447,9 +464,15 @@ bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFi
|
|
|
if (!inputfileio)
|
|
|
return false;
|
|
|
|
|
|
- unsigned __int64 filesize = inputfileio->size();
|
|
|
+ if (length == unknownFileSize)
|
|
|
+ {
|
|
|
+ offset_t filesize = inputfileio->size();
|
|
|
+ assertex(startOffset <= filesize);
|
|
|
+ length = filesize - startOffset;
|
|
|
+ }
|
|
|
+
|
|
|
//MORE: Allow a previously created input stream to be reused to avoid reallocating the buffer
|
|
|
- inputStream.setown(createFileSerialStream(inputfileio, 0, filesize, readBufferSize));
|
|
|
+ inputStream.setown(createFileSerialStream(inputfileio, startOffset, length, readBufferSize));
|
|
|
|
|
|
expectedFilter.clear();
|
|
|
ForEachItemIn(i, _expectedFilter)
|
|
@@ -457,18 +480,32 @@ bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFi
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-bool LocalDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+bool LocalDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
|
|
|
{
|
|
|
Owned<IFile> inputFile = createIFile(localFilename);
|
|
|
- return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
|
|
|
+ return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, 0, unknownFileSize, inputOptions, expectedFilter);
|
|
|
}
|
|
|
|
|
|
-bool LocalDiskRowReader::setInputFile(const RemoteFilename & filename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+bool LocalDiskRowReader::setInputFile(const RemoteFilename & filename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
|
|
|
{
|
|
|
Owned<IFile> inputFile = createIFile(filename);
|
|
|
- return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
|
|
|
+ return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, 0, unknownFileSize, inputOptions, expectedFilter);
|
|
|
}
|
|
|
|
|
|
+bool LocalDiskRowReader::setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy)
|
|
|
+{
|
|
|
+ const char * logicalFilename = slice.queryLogicalFilename();
|
|
|
+ offset_t baseOffset = slice.queryOffsetOfPart();
|
|
|
+
|
|
|
+ StringBuffer url;
|
|
|
+ slice.getURL(url, copy);
|
|
|
+ Owned<IFile> inputFile = createIFile(url);
|
|
|
+
|
|
|
+ //MORE: These need to be passed on to the input reader
|
|
|
+ offset_t startOffset = slice.queryStartOffset();
|
|
|
+ offset_t length = slice.queryLength();
|
|
|
+ return setInputFile(inputFile, logicalFilename, slice.queryPartNumber(), baseOffset, startOffset, length, slice.queryFileMeta(), expectedFilter);
|
|
|
+}
|
|
|
|
|
|
|
|
|
//---------------------------------------------------------------------------------------------------------------------
|
|
@@ -492,7 +529,7 @@ public:
|
|
|
virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
|
|
|
|
|
|
protected:
|
|
|
- virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
virtual bool isBinary() const { return true; }
|
|
|
|
|
|
inline bool fieldFilterMatch(const void * buffer)
|
|
@@ -516,7 +553,7 @@ private:
|
|
|
inline const void * inlineNextRow(PROCESS processor) __attribute__((always_inline));
|
|
|
|
|
|
protected:
|
|
|
- ISourceRowPrefetcher * actualRowPrefetcher = nullptr;
|
|
|
+ Owned<ISourceRowPrefetcher> actualRowPrefetcher;
|
|
|
const RtlRecord * actualRecord = nullptr;
|
|
|
RowFilter actualFilter; // This refers to the actual disk layout
|
|
|
bool eogPending = false;
|
|
@@ -527,7 +564,7 @@ protected:
|
|
|
BinaryDiskRowReader::BinaryDiskRowReader(IDiskReadMapping * _mapping)
|
|
|
: LocalDiskRowReader(_mapping)
|
|
|
{
|
|
|
- actualRowPrefetcher = actualDiskMeta->createDiskPrefetcher();
|
|
|
+ actualRowPrefetcher.setown(actualDiskMeta->createDiskPrefetcher());
|
|
|
actualRecord = &actualDiskMeta->queryRecordAccessor(true);
|
|
|
needToTranslate = (translator && translator->needsTranslate());
|
|
|
}
|
|
@@ -546,9 +583,9 @@ bool BinaryDiskRowReader::matches(const char * format, bool streamRemote, IDiskR
|
|
|
return LocalDiskRowReader::matches(format, streamRemote, otherMapping);
|
|
|
}
|
|
|
|
|
|
-bool BinaryDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+bool BinaryDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
|
|
|
{
|
|
|
- if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter))
|
|
|
+ if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, startOffset, length, inputOptions, expectedFilter))
|
|
|
return false;
|
|
|
|
|
|
actualFilter.clear().appendFilters(expectedFilter);
|
|
@@ -737,9 +774,9 @@ public:
|
|
|
projectedRecord = &mapping->queryProjectedMeta()->queryRecordAccessor(true);
|
|
|
}
|
|
|
|
|
|
- virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter) override
|
|
|
+ virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & _expectedFilter) override
|
|
|
{
|
|
|
- if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, _expectedFilter))
|
|
|
+ if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, startOffset, length, inputOptions, _expectedFilter))
|
|
|
return false;
|
|
|
|
|
|
projectedFilter.clear().appendFilters(_expectedFilter);
|
|
@@ -836,9 +873,9 @@ public:
|
|
|
virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * otherMapping) override;
|
|
|
|
|
|
protected:
|
|
|
- virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
|
|
|
- void processOption(CSVSplitter::MatchItem element, const IPropertyTree & config, const char * option, const char * dft, const char * dft2 = nullptr);
|
|
|
+ void processOption(CSVSplitter::MatchItem element, const IPropertyTree & csvOptions, const char * option, const char * dft, const char * dft2 = nullptr);
|
|
|
|
|
|
inline bool fieldFilterMatchProjected(const void * buffer)
|
|
|
{
|
|
@@ -869,24 +906,25 @@ protected:
|
|
|
CsvDiskRowReader::CsvDiskRowReader(IDiskReadMapping * _mapping)
|
|
|
: ExternalFormatDiskRowReader(_mapping)
|
|
|
{
|
|
|
- const IPropertyTree & config = *mapping->queryOptions();
|
|
|
+ const IPropertyTree & fileOptions = *mapping->queryFileOptions();
|
|
|
+ const IPropertyTree & csvOptions = *fileOptions.queryPropTree("formatOptions");
|
|
|
|
|
|
- maxRowSize = config.getPropInt64("maxRowSize", defaultMaxCsvRowSizeMB) * 1024 * 1024;
|
|
|
- preserveWhitespace = config.getPropBool("preserveWhitespace", false);
|
|
|
- preserveWhitespace = config.getPropBool("notrim", preserveWhitespace);
|
|
|
+ maxRowSize = csvOptions.getPropInt64("maxRowSize", defaultMaxCsvRowSizeMB) * 1024 * 1024;
|
|
|
+ preserveWhitespace = csvOptions.getPropBool("preserveWhitespace", false);
|
|
|
+ preserveWhitespace = csvOptions.getPropBool("notrim", preserveWhitespace);
|
|
|
|
|
|
const RtlRecord * inputRecord = &mapping->queryActualMeta()->queryRecordAccessor(true);
|
|
|
unsigned numInputFields = inputRecord->getNumFields();
|
|
|
csvSplitter.init(numInputFields, maxRowSize, csvQuote, csvSeparate, csvTerminate, csvEscape, preserveWhitespace);
|
|
|
|
|
|
//MORE: How about options from the file? - test writing with some options and then reading without specifying them
|
|
|
- processOption(CSVSplitter::QUOTE, config, "quote", "\"");
|
|
|
- processOption(CSVSplitter::SEPARATOR, config, "separator", ",");
|
|
|
- processOption(CSVSplitter::TERMINATOR, config, "terminator", "\n", "\r\n");
|
|
|
- if (config.getProp("escape", csvEscape))
|
|
|
+ processOption(CSVSplitter::QUOTE, csvOptions, "quote", "\"");
|
|
|
+ processOption(CSVSplitter::SEPARATOR, csvOptions, "separator", ",");
|
|
|
+ processOption(CSVSplitter::TERMINATOR, csvOptions, "terminator", "\n", "\r\n");
|
|
|
+ if (csvOptions.getProp("escape", csvEscape))
|
|
|
csvSplitter.addEscape(csvEscape);
|
|
|
|
|
|
- headerLines = config.getPropInt64("heading");
|
|
|
+ headerLines = csvOptions.getPropInt64("heading");
|
|
|
fieldFetcher.setown(new CFieldFetcher(csvSplitter, numInputFields));
|
|
|
}
|
|
|
|
|
@@ -898,12 +936,12 @@ bool CsvDiskRowReader::matches(const char * format, bool streamRemote, IDiskRead
|
|
|
return ExternalFormatDiskRowReader::matches(format, streamRemote, otherMapping);
|
|
|
}
|
|
|
|
|
|
-void CsvDiskRowReader::processOption(CSVSplitter::MatchItem element, const IPropertyTree & config, const char * option, const char * dft, const char * dft2)
|
|
|
+void CsvDiskRowReader::processOption(CSVSplitter::MatchItem element, const IPropertyTree & csvOptions, const char * option, const char * dft, const char * dft2)
|
|
|
{
|
|
|
- if (config.hasProp(option))
|
|
|
+ if (csvOptions.hasProp(option))
|
|
|
{
|
|
|
- bool useAscii = mapping->queryOptions()->hasProp("ascii");
|
|
|
- Owned<IPropertyTreeIterator> iter = config.getElements(option);
|
|
|
+ bool useAscii = csvOptions.hasProp("ascii");
|
|
|
+ Owned<IPropertyTreeIterator> iter = csvOptions.getElements(option);
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
const char * value = iter->query().queryProp("");
|
|
@@ -926,9 +964,9 @@ void CsvDiskRowReader::processOption(CSVSplitter::MatchItem element, const IProp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-bool CsvDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter)
|
|
|
+bool CsvDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, offset_t startOffset, offset_t length, const IPropertyTree * inputOptions, const FieldFilterArray & _expectedFilter)
|
|
|
{
|
|
|
- if (!ExternalFormatDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, _expectedFilter))
|
|
|
+ if (!ExternalFormatDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, startOffset, length, inputOptions, _expectedFilter))
|
|
|
return false;
|
|
|
|
|
|
//Skip any header lines..
|
|
@@ -1048,7 +1086,7 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
|
|
|
MemoryBufferBuilder bufferBuilder;
|
|
|
RtlDynamicRowBuilder allocatedBuilder;
|
|
|
Linked<IEngineRowAllocator> outputAllocator;
|
|
|
- IDiskRowStream * rawInputStream;
|
|
|
+ IDiskRowStream * rawInputStream = nullptr;
|
|
|
public:
|
|
|
CompoundProjectRowReader(IDiskRowReader * _input, IDiskReadMapping * _mapping)
|
|
|
: inputReader(_input), mapping(_mapping), bufferBuilder(tempOutputBuffer, 0), allocatedBuilder(nullptr)
|
|
@@ -1059,27 +1097,27 @@ public:
|
|
|
}
|
|
|
IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IDiskRowStream>)
|
|
|
|
|
|
- virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
|
|
|
+ virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override
|
|
|
{
|
|
|
allocatedBuilder.setAllocator(_outputAllocator);
|
|
|
outputAllocator.set(_outputAllocator);
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
|
|
|
+ virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping) override
|
|
|
{
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- virtual void clearInput()
|
|
|
+ virtual void clearInput() override
|
|
|
{
|
|
|
inputReader->clearInput();
|
|
|
rawInputStream = nullptr;
|
|
|
}
|
|
|
|
|
|
- virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+ virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override
|
|
|
{
|
|
|
- if (inputReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, meta, expectedFilter))
|
|
|
+ if (inputReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, inputOptions, expectedFilter))
|
|
|
{
|
|
|
rawInputStream = inputReader->queryAllocatedRowStream(nullptr);
|
|
|
return true;
|
|
@@ -1087,9 +1125,19 @@ public:
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+ virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override
|
|
|
{
|
|
|
- if (inputReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, meta, expectedFilter))
|
|
|
+ if (inputReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, inputOptions, expectedFilter))
|
|
|
+ {
|
|
|
+ rawInputStream = inputReader->queryAllocatedRowStream(nullptr);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override
|
|
|
+ {
|
|
|
+ if (inputReader->setInputFile(slice, expectedFilter, copy))
|
|
|
{
|
|
|
rawInputStream = inputReader->queryAllocatedRowStream(nullptr);
|
|
|
return true;
|
|
@@ -1098,9 +1146,9 @@ public:
|
|
|
}
|
|
|
|
|
|
//interface IRowReader
|
|
|
- virtual bool getCursor(MemoryBuffer & cursor) { return rawInputStream->getCursor(cursor); }
|
|
|
- virtual void setCursor(MemoryBuffer & cursor) { rawInputStream->setCursor(cursor); }
|
|
|
- virtual void stop() { rawInputStream->stop(); }
|
|
|
+ virtual bool getCursor(MemoryBuffer & cursor) override { return rawInputStream->getCursor(cursor); }
|
|
|
+ virtual void setCursor(MemoryBuffer & cursor) override { rawInputStream->setCursor(cursor); }
|
|
|
+ virtual void stop() override { rawInputStream->stop(); }
|
|
|
|
|
|
virtual const void *nextRow(size32_t & resultSize) override
|
|
|
{
|
|
@@ -1158,13 +1206,13 @@ public:
|
|
|
compoundReader.setown(new CompoundProjectRowReader(expectedReader, mapping));
|
|
|
}
|
|
|
|
|
|
- virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
|
|
|
+ virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override
|
|
|
{
|
|
|
assertex(activeReader);
|
|
|
return activeReader->queryAllocatedRowStream(_outputAllocator);
|
|
|
}
|
|
|
|
|
|
- virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
|
|
|
+ virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping) override
|
|
|
{
|
|
|
return directReader->matches(_format, _streamRemote, _mapping);
|
|
|
}
|
|
@@ -1172,31 +1220,41 @@ public:
|
|
|
//Specify where the raw binary input for a particular file is coming from, together with its actual format.
|
|
|
//Does this make sense, or should it be passed a filename? an actual format?
|
|
|
//Needs to specify a filename rather than a ISerialStream so that the interface is consistent for local and remote
|
|
|
- virtual void clearInput()
|
|
|
+ virtual void clearInput() override
|
|
|
{
|
|
|
directReader->clearInput();
|
|
|
compoundReader->clearInput();
|
|
|
activeReader = nullptr;
|
|
|
}
|
|
|
|
|
|
- virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+ virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override
|
|
|
{
|
|
|
bool useProjected = canFilterDirectly(expectedFilter);
|
|
|
if (useProjected)
|
|
|
activeReader = directReader;
|
|
|
else
|
|
|
activeReader = compoundReader;
|
|
|
- return activeReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, meta, expectedFilter);
|
|
|
+ return activeReader->setInputFile(localFilename, logicalFilename, partNumber, baseOffset, inputOptions, expectedFilter);
|
|
|
}
|
|
|
|
|
|
- virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+ virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override
|
|
|
{
|
|
|
bool useProjected = canFilterDirectly(expectedFilter);
|
|
|
if (useProjected)
|
|
|
activeReader = directReader;
|
|
|
else
|
|
|
activeReader = compoundReader;
|
|
|
- return activeReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, meta, expectedFilter);
|
|
|
+ return activeReader->setInputFile(filename, logicalFilename, partNumber, baseOffset, inputOptions, expectedFilter);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override
|
|
|
+ {
|
|
|
+ bool useProjected = canFilterDirectly(expectedFilter);
|
|
|
+ if (useProjected)
|
|
|
+ activeReader = directReader;
|
|
|
+ else
|
|
|
+ activeReader = compoundReader;
|
|
|
+ return activeReader->setInputFile(slice, expectedFilter, copy);
|
|
|
}
|
|
|
|
|
|
protected:
|
|
@@ -1238,8 +1296,9 @@ public:
|
|
|
virtual bool matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping) override;
|
|
|
|
|
|
// IDiskRowReader
|
|
|
- virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
- virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) override;
|
|
|
+ virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override;
|
|
|
|
|
|
private:
|
|
|
template <class PROCESS>
|
|
@@ -1275,7 +1334,7 @@ bool RemoteDiskRowReader::matches(const char * _format, bool _streamRemote, IDis
|
|
|
return DiskRowReader::matches(_format, _streamRemote, _mapping);
|
|
|
}
|
|
|
|
|
|
-bool RemoteDiskRowReader::setInputFile(const RemoteFilename & rfilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilters)
|
|
|
+bool RemoteDiskRowReader::setInputFile(const RemoteFilename & rfilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilters)
|
|
|
{
|
|
|
// NB: only binary handles can be remotely processed by dafilesrv at the moment
|
|
|
|
|
@@ -1332,11 +1391,15 @@ bool RemoteDiskRowReader::setInputFile(const RemoteFilename & rfilename, const c
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-bool RemoteDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
|
|
|
+bool RemoteDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
|
|
|
{
|
|
|
throwUnexpected();
|
|
|
}
|
|
|
|
|
|
+bool RemoteDiskRowReader::setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy)
|
|
|
+{
|
|
|
+ UNIMPLEMENTED;
|
|
|
+}
|
|
|
|
|
|
template <class PROCESS>
|
|
|
const void *RemoteDiskRowReader::inlineNextRow(PROCESS processor)
|