|
@@ -10429,20 +10429,22 @@ void CHThorExternalActivity::stop()
|
|
|
|
|
|
//=====================================================================================================
|
|
|
|
|
|
-CHThorNewDiskReadBaseActivity::CHThorNewDiskReadBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+CHThorNewDiskReadBaseActivity::CHThorNewDiskReadBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), segHelper(_segHelper)
|
|
|
{
|
|
|
helper.setCallback(this);
|
|
|
expectedDiskMeta = helper.queryDiskRecordSize();
|
|
|
projectedDiskMeta = helper.queryProjectedDiskRecordSize();
|
|
|
- readerOptions.setown(createPTree());
|
|
|
+ formatOptions.setown(createPTree());
|
|
|
if (_node)
|
|
|
{
|
|
|
const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
|
|
|
if (recordTranslationModeHintText)
|
|
|
recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
|
|
|
}
|
|
|
- readerOptions->setPropInt("translationMode", (int)getLayoutTranslationMode());
|
|
|
+
|
|
|
+ PropertyTreeXmlWriter writer(formatOptions);
|
|
|
+ helper.getFormatOptions(writer);
|
|
|
}
|
|
|
|
|
|
CHThorNewDiskReadBaseActivity::~CHThorNewDiskReadBaseActivity()
|
|
@@ -10490,7 +10492,7 @@ void CHThorNewDiskReadBaseActivity::resolveFile()
|
|
|
{
|
|
|
//If in a child query, and the filenames haven't changed, the information about the resolved filenames will also not have changed
|
|
|
//MORE: Is this ever untrue?
|
|
|
- if (subfiles && !(helper.getFlags() & TDXvarfilename))
|
|
|
+ if (subfiles && !(helper.getFlags() & (TDXvarfilename|TDRdynformatoptions)))
|
|
|
return;
|
|
|
|
|
|
//Only clear these members if we are re-resolving the file - otherwise the previous entries are still valid
|
|
@@ -10499,6 +10501,16 @@ void CHThorNewDiskReadBaseActivity::resolveFile()
|
|
|
dfsParts.clear();
|
|
|
subfiles.kill();
|
|
|
|
|
|
+ Owned<IPropertyTree> curFormatOptions;
|
|
|
+ if (helper.getFlags() & TDRdynformatoptions)
|
|
|
+ {
|
|
|
+ curFormatOptions.setown(createPTreeFromIPT(formatOptions));
|
|
|
+ PropertyTreeXmlWriter writer(curFormatOptions);
|
|
|
+ helper.getFormatDynOptions(writer);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ curFormatOptions.set(formatOptions);
|
|
|
+
|
|
|
OwnedRoxieString fileName(helper.getFileName());
|
|
|
mangleHelperFileName(mangledHelperFileName, fileName, agent.queryWuid(), helper.getFlags());
|
|
|
if (helper.getFlags() & (TDXtemporary | TDXjobtemp))
|
|
@@ -10508,7 +10520,7 @@ void CHThorNewDiskReadBaseActivity::resolveFile()
|
|
|
tempFileName.set(agent.queryTemporaryFile(mangledFilename.str()));
|
|
|
logicalFileName = tempFileName.str();
|
|
|
gatherInfo(NULL);
|
|
|
- subfiles.append(*extractFileInformation(nullptr));
|
|
|
+ subfiles.append(*extractFileInformation(nullptr, curFormatOptions));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -10535,22 +10547,22 @@ void CHThorNewDiskReadBaseActivity::resolveFile()
|
|
|
for (; s<numsubs; s++)
|
|
|
{
|
|
|
IDistributedFile &subfile = super->querySubFile(s, true);
|
|
|
- subfiles.append(*extractFileInformation(&subfile));
|
|
|
+ subfiles.append(*extractFileInformation(&subfile, curFormatOptions));
|
|
|
}
|
|
|
assertex(fdesc);
|
|
|
superfile.set(fdesc->querySuperFileDescriptor());
|
|
|
}
|
|
|
else
|
|
|
- subfiles.append(*extractFileInformation(dFile));
|
|
|
+ subfiles.append(*extractFileInformation(dFile, curFormatOptions));
|
|
|
|
|
|
if((helper.getFlags() & (TDXtemporary | TDXjobtemp)) == 0)
|
|
|
agent.logFileAccess(dFile, "HThor", "READ");
|
|
|
}
|
|
|
else
|
|
|
- subfiles.append(*extractFileInformation(nullptr));
|
|
|
+ subfiles.append(*extractFileInformation(nullptr, curFormatOptions));
|
|
|
}
|
|
|
else
|
|
|
- subfiles.append(*extractFileInformation(nullptr));
|
|
|
+ subfiles.append(*extractFileInformation(nullptr, curFormatOptions));
|
|
|
|
|
|
if (!ldFile)
|
|
|
{
|
|
@@ -10586,19 +10598,46 @@ void CHThorNewDiskReadBaseActivity::gatherInfo(IFileDescriptor * fileDesc)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-CHThorNewDiskReadBaseActivity::InputFileInfo * CHThorNewDiskReadBaseActivity::extractFileInformation(IDistributedFile * distributedFile)
|
|
|
+static void queryInheritProp(IPropertyTree & target, const char * targetName, IPropertyTree & source, const char * sourceName)
|
|
|
+{
|
|
|
+ if (source.hasProp(sourceName) && !target.hasProp(targetName))
|
|
|
+ target.setProp(targetName, source.queryProp(sourceName));
|
|
|
+}
|
|
|
+
|
|
|
+static void queryInheritSeparatorProp(IPropertyTree & target, const char * targetName, IPropertyTree & source, const char * sourceName)
|
|
|
+{
|
|
|
+ //Legacy - commas are quoted if they occur in a separator list, so need to remove the leading backslashes
|
|
|
+ if (source.hasProp(sourceName) && !target.hasProp(targetName))
|
|
|
+ {
|
|
|
+ StringBuffer unquoted;
|
|
|
+ const char * text = source.queryProp(sourceName);
|
|
|
+ while (*text)
|
|
|
+ {
|
|
|
+ if ((text[0] == '\\') && (text[1] == ','))
|
|
|
+ text++;
|
|
|
+ unquoted.append(*text++);
|
|
|
+ }
|
|
|
+ target.setProp(targetName, unquoted);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+CHThorNewDiskReadBaseActivity::InputFileInfo * CHThorNewDiskReadBaseActivity::extractFileInformation(IDistributedFile * distributedFile, const IPropertyTree * curFormatOptions)
|
|
|
{
|
|
|
Owned<IPropertyTree> meta = createPTree();
|
|
|
unsigned actualCrc = helper.getDiskFormatCrc();
|
|
|
Linked<IOutputMetaData> actualDiskMeta = expectedDiskMeta;
|
|
|
+ Linked<const IPropertyTree> fileFormatOptions = curFormatOptions;
|
|
|
bool compressed = false;
|
|
|
bool blockcompressed = false;
|
|
|
+ const char * readFormat = helper.queryFormat();
|
|
|
|
|
|
if (distributedFile)
|
|
|
{
|
|
|
const char *kind = queryFileKind(distributedFile);
|
|
|
//Do not use the field translation if the file was originally csv/xml - unless explicitly set
|
|
|
- if (strisame(kind, "flat") || (RecordTranslationMode::AlwaysDisk == getLayoutTranslationMode()))
|
|
|
+ if ((strisame(kind, "flat") || (RecordTranslationMode::AlwaysDisk == getLayoutTranslationMode())) &&
|
|
|
+// (strisame(readFormat, "flat") || strisame(kind, readFormat)))
|
|
|
+ (strisame(readFormat, "flat"))) // Not sure about this - only allow fixed source format if reading as flat
|
|
|
{
|
|
|
//Yuk this will be horrible - it needs to cache it for each distributed file
|
|
|
//and also common them up if they are the same.
|
|
@@ -10626,6 +10665,18 @@ CHThorNewDiskReadBaseActivity::InputFileInfo * CHThorNewDiskReadBaseActivity::ex
|
|
|
blockcompressed = true;
|
|
|
compressed = true;
|
|
|
}
|
|
|
+
|
|
|
+ //MORE: There should probably be a generic way of storing and extracting format options for a file
|
|
|
+ IPropertyTree & options = distributedFile->queryAttributes();
|
|
|
+ Linked<IPropertyTree> tempOptions = createPTreeFromIPT(fileFormatOptions);
|
|
|
+ queryInheritProp(*tempOptions, "quote", options, "@csvQuote");
|
|
|
+ queryInheritSeparatorProp(*tempOptions, "separator", options, "@csvSeparate");
|
|
|
+ queryInheritProp(*tempOptions, "terminator", options, "@csvTerminate");
|
|
|
+ queryInheritProp(*tempOptions, "escape", options, "@csvEscape");
|
|
|
+ dbglogXML(fileFormatOptions);
|
|
|
+ dbglogXML(tempOptions);
|
|
|
+ if (!areMatchingPTrees(fileFormatOptions, tempOptions))
|
|
|
+ fileFormatOptions.setown(tempOptions.getClear());
|
|
|
}
|
|
|
|
|
|
meta->setPropBool("grouped", grouped);
|
|
@@ -10635,7 +10686,8 @@ CHThorNewDiskReadBaseActivity::InputFileInfo * CHThorNewDiskReadBaseActivity::ex
|
|
|
|
|
|
InputFileInfo & target = * new InputFileInfo;
|
|
|
target.file = distributedFile;
|
|
|
- target.meta.swap(meta);
|
|
|
+ target.formatOptions.swap(fileFormatOptions);
|
|
|
+ target.meta.setown(meta.getClear());
|
|
|
target.actualCrc = actualCrc;
|
|
|
target.actualMeta.swap(actualDiskMeta);
|
|
|
return ⌖
|
|
@@ -10759,10 +10811,7 @@ bool CHThorNewDiskReadBaseActivity::openNextPart(bool prevWasMissing)
|
|
|
void CHThorNewDiskReadBaseActivity::initStream(IDiskRowReader * reader, const char * filename)
|
|
|
{
|
|
|
activeReader = reader;
|
|
|
- if (useRawStream)
|
|
|
- rawRowStream = reader->queryRawRowStream();
|
|
|
- else
|
|
|
- roxieRowStream = reader->queryAllocatedRowStream(rowAllocator);
|
|
|
+ inputRowStream = reader->queryAllocatedRowStream(rowAllocator);
|
|
|
|
|
|
StringBuffer report("Reading file ");
|
|
|
report.append(filename);
|
|
@@ -10771,34 +10820,33 @@ void CHThorNewDiskReadBaseActivity::initStream(IDiskRowReader * reader, const ch
|
|
|
|
|
|
void CHThorNewDiskReadBaseActivity::setEmptyStream()
|
|
|
{
|
|
|
- if (useRawStream)
|
|
|
- rawRowStream = queryNullRawRowStream();
|
|
|
- else
|
|
|
- roxieRowStream = queryNullAllocatedRowStream();
|
|
|
+ inputRowStream = queryNullDiskRowStream();
|
|
|
finishedParts = true;
|
|
|
}
|
|
|
|
|
|
-IDiskRowReader * CHThorNewDiskReadBaseActivity::ensureRowReader(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, IPropertyTree * options)
|
|
|
+IDiskRowReader * CHThorNewDiskReadBaseActivity::ensureRowReader(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options)
|
|
|
{
|
|
|
+ Owned<IDiskReadMapping> mapping = createDiskReadMapping(getLayoutTranslationMode(), format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);
|
|
|
+
|
|
|
ForEachItemIn(i, readers)
|
|
|
{
|
|
|
IDiskRowReader & cur = readers.item(i);
|
|
|
- if (cur.matches(format, streamRemote, expectedCrc, expected, projectedCrc, projected, actualCrc, actual, options))
|
|
|
+ if (cur.matches(format, streamRemote, mapping))
|
|
|
return &cur;
|
|
|
}
|
|
|
- IDiskRowReader * reader = createDiskReader(format, streamRemote, expectedCrc, expected, projectedCrc, projected, actualCrc, actual, options);
|
|
|
+ IDiskRowReader * reader = createDiskReader(format, streamRemote, mapping);
|
|
|
readers.append(*reader);
|
|
|
return reader;
|
|
|
}
|
|
|
|
|
|
bool CHThorNewDiskReadBaseActivity::openFilePart(const char * filename)
|
|
|
{
|
|
|
- const char * format = "thor"; // more - should extract from the current file (could even mix flat and csv...)
|
|
|
+ const char * format = helper.queryFormat(); // more - should extract from the current file (could even mix flat and csv...)
|
|
|
InputFileInfo * fileInfo = &subfiles.item(0);
|
|
|
|
|
|
unsigned expectedCrc = helper.getDiskFormatCrc();
|
|
|
unsigned projectedCrc = helper.getProjectedFormatCrc();
|
|
|
- IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, expectedCrc, *expectedDiskMeta, readerOptions);
|
|
|
+ IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, expectedCrc, *expectedDiskMeta, fileInfo->formatOptions);
|
|
|
if (reader->setInputFile(filename, logicalFileName, 0, offsetOfPart, fileInfo->meta, fieldFilters))
|
|
|
{
|
|
|
initStream(reader, filename);
|
|
@@ -10809,6 +10857,7 @@ bool CHThorNewDiskReadBaseActivity::openFilePart(const char * filename)
|
|
|
|
|
|
bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * localFile, IDistributedFilePart * filePart, unsigned whichPart)
|
|
|
{
|
|
|
+ IDistributedFile * distributedFile = localFile->queryDistributedFile();
|
|
|
InputFileInfo * fileInfo = &subfiles.item(0);
|
|
|
if (superfile && filePart)
|
|
|
{
|
|
@@ -10817,7 +10866,7 @@ bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * local
|
|
|
if (superfile->mapSubPart(partNum, subfile, lnum))
|
|
|
{
|
|
|
fileInfo = &subfiles.item(subfile);
|
|
|
- IDistributedFile * distributedFile = fileInfo->file;
|
|
|
+ distributedFile = fileInfo->file;
|
|
|
logicalFileName = distributedFile->queryLogicalName();
|
|
|
}
|
|
|
}
|
|
@@ -10830,12 +10879,13 @@ bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * local
|
|
|
bool tryRemoteStream = actualDiskMeta->queryTypeInfo()->canInterpret() && actualDiskMeta->queryTypeInfo()->canSerialize() &&
|
|
|
projectedDiskMeta->queryTypeInfo()->canInterpret() && projectedDiskMeta->queryTypeInfo()->canSerialize();
|
|
|
|
|
|
+
|
|
|
/*
|
|
|
* If a file part can be accessed local, then read it locally
|
|
|
* If a file part supports a remote stream, then use that
|
|
|
* Otherwise failover to the legacy remote access.
|
|
|
*/
|
|
|
- const char * format = "thor"; // more - should extract from the current file (could even mix flat and csv...)
|
|
|
+ const char * format = helper.queryFormat(); // more - should extract from the current file (could even mix flat and csv...)
|
|
|
Owned<IException> saveOpenExc;
|
|
|
StringBuffer filename, filenamelist;
|
|
|
std::vector<unsigned> remoteCandidates;
|
|
@@ -10851,8 +10901,8 @@ bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * local
|
|
|
{
|
|
|
StringBuffer path;
|
|
|
rfn.getPath(path);
|
|
|
- IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, readerOptions);
|
|
|
- if (reader->setInputFile(path.str(), logicalFileName, filePart->getPartIndex(), offsetOfPart, fileInfo->meta, fieldFilters))
|
|
|
+ IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, fileInfo->formatOptions);
|
|
|
+ if (reader->setInputFile(path.str(), logicalFileName, whichPart, offsetOfPart, fileInfo->meta, fieldFilters))
|
|
|
{
|
|
|
initStream(reader, path.str());
|
|
|
return true;
|
|
@@ -10874,8 +10924,8 @@ bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * local
|
|
|
filenamelist.append('\n').append(filename);
|
|
|
try
|
|
|
{
|
|
|
- IDiskRowReader * reader = ensureRowReader(format, tryRemoteStream, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, readerOptions);
|
|
|
- if (reader->setInputFile(rfilename, logicalFileName, filePart->getPartIndex(), offsetOfPart, fileInfo->meta, fieldFilters))
|
|
|
+ IDiskRowReader * reader = ensureRowReader(format, tryRemoteStream, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, fileInfo->formatOptions);
|
|
|
+ if (reader->setInputFile(rfilename, logicalFileName, whichPart, offsetOfPart, fileInfo->meta, fieldFilters))
|
|
|
{
|
|
|
initStream(reader, filename);
|
|
|
return true;
|
|
@@ -10956,7 +11006,7 @@ void CHThorNewDiskReadBaseActivity::append(FFoption option, const IFieldFilter *
|
|
|
|
|
|
//=====================================================================================================
|
|
|
|
|
|
-CHThorNewDiskReadActivity::CHThorNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
+CHThorNewDiskReadActivity::CHThorNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
|
|
|
: CHThorNewDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
|
|
|
{
|
|
|
needTransform = false;
|
|
@@ -11011,13 +11061,13 @@ const void *CHThorNewDiskReadActivity::nextRow()
|
|
|
|
|
|
try
|
|
|
{
|
|
|
- if (rawRowStream)
|
|
|
+ if (useRawStream)
|
|
|
{
|
|
|
for (;;)
|
|
|
{
|
|
|
//Returns a row in the serialized form of the projected format
|
|
|
size32_t nextSize;
|
|
|
- const byte * next = (const byte *)rawRowStream->nextRow(nextSize);
|
|
|
+ const byte * next = (const byte *)inputRowStream->nextRow(nextSize);
|
|
|
if (!isSpecialRow(next))
|
|
|
{
|
|
|
size32_t thisSize = 0;
|
|
@@ -11070,7 +11120,7 @@ const void *CHThorNewDiskReadActivity::nextRow()
|
|
|
//whether there was a limit, a transform etc., but unlikely to save more than a couple of boolean tests.
|
|
|
for (;;)
|
|
|
{
|
|
|
- const byte * next = (const byte *)roxieRowStream->nextRow();
|
|
|
+ const byte * next = (const byte *)inputRowStream->nextRow();
|
|
|
if (!isSpecialRow(next))
|
|
|
{
|
|
|
if (unlikely((processed - initialProcessed) >= limit))
|