|
@@ -33,6 +33,7 @@
|
|
|
#include "eclrtl_imp.hpp"
|
|
|
#include "rtlread_imp.hpp"
|
|
|
#include "rtlcommon.hpp"
|
|
|
+#include "rtldynfield.hpp"
|
|
|
|
|
|
#include "jhtree.hpp"
|
|
|
#include "jlog.hpp"
|
|
@@ -157,11 +158,6 @@ public:
|
|
|
return ret.appendf("%p", this);
|
|
|
}
|
|
|
|
|
|
- IRecordLayoutTranslator::Mode getEnableFieldTranslation() const
|
|
|
- {
|
|
|
- return queryFactory.queryOptions().enableFieldTranslation;
|
|
|
- }
|
|
|
-
|
|
|
const char *queryQueryName() const
|
|
|
{
|
|
|
return queryFactory.queryQueryName();
|
|
@@ -852,16 +848,18 @@ protected:
|
|
|
CachedOutputMetaData diskSize;
|
|
|
Owned<IInMemoryIndexCursor> cursor;
|
|
|
Linked<IInMemoryIndexManager> manager;
|
|
|
+ Linked<ITranslatorSet> translators;
|
|
|
Owned<IInMemoryFileProcessor> processor;
|
|
|
- Owned<IFileIOArray> varFiles;
|
|
|
CriticalSection pcrit;
|
|
|
|
|
|
public:
|
|
|
CRoxieDiskReadBaseActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager,
|
|
|
+ IInMemoryIndexManager *_manager,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
|
|
|
: CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
|
|
|
manager(_manager),
|
|
|
+ translators(_translators),
|
|
|
parallelPartNo(_parallelPartNo),
|
|
|
numParallel(_numParallel),
|
|
|
forceUnkeyed(_forceUnkeyed)
|
|
@@ -869,15 +867,8 @@ public:
|
|
|
helper = (IHThorDiskReadBaseArg *) basehelper;
|
|
|
variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
|
|
|
isOpt = (helper->getFlags() & TDRoptional) != 0;
|
|
|
- Linked<IOutputMetaData> diskMeta(helper->queryDiskRecordSize()->querySerializedDiskMeta());
|
|
|
- if (diskMeta->isGrouped())
|
|
|
- {
|
|
|
- diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
|
|
|
- isGrouped = true;
|
|
|
- }
|
|
|
- else
|
|
|
- isGrouped = false;
|
|
|
- diskSize.set(diskMeta);
|
|
|
+ diskSize.set(helper->queryProjectedDiskRecordSize()->querySerializedDiskMeta());
|
|
|
+ isGrouped = diskSize.isGrouped();
|
|
|
processed = 0;
|
|
|
readPos = 0;
|
|
|
isKeyed = false;
|
|
@@ -888,7 +879,7 @@ public:
|
|
|
resentInfo.read(usedKey);
|
|
|
if (usedKey)
|
|
|
{
|
|
|
- cursor.setown(manager->createCursor(diskMeta->queryRecordAccessor(true)));
|
|
|
+ cursor.setown(manager->createCursor(diskSize.queryRecordAccessor(true)));
|
|
|
cursor->deserializeCursorPos(resentInfo);
|
|
|
isKeyed = true;
|
|
|
}
|
|
@@ -913,25 +904,10 @@ public:
|
|
|
|
|
|
virtual void setVariableFileInfo()
|
|
|
{
|
|
|
- const IPropertyTree *options = varFileInfo->queryProperties();
|
|
|
- if (options)
|
|
|
- {
|
|
|
- bool isFileGrouped = options->getPropBool("@grouped");
|
|
|
- if (isFileGrouped && !isGrouped)
|
|
|
- {
|
|
|
- // We are prepared to read contents of a grouped persist ungrouped... But not vice versa
|
|
|
- WARNLOG("Published group information for file %s does not match coded information - assuming grouped", queryDynamicFileName());
|
|
|
- Owned<IOutputMetaData> diskMeta(new CSuffixedOutputMeta(+1, LINK(diskSize.queryOriginal())));
|
|
|
- diskSize.set(diskMeta);
|
|
|
- isGrouped = true;
|
|
|
- }
|
|
|
- size32_t dfsSize = options->getPropInt("@recordSize");
|
|
|
- if (dfsSize && diskSize.isFixedSize() && dfsSize != diskSize.getFixedSize())
|
|
|
- throw MakeStringException(ROXIE_LAYOUT_MISMATCH, "Published record size %d for file %s (%s) does not match coded record size %d", dfsSize, queryDynamicFileName(), isGrouped ? "grouped" : "ungrouped", diskSize.getFixedSize());
|
|
|
- }
|
|
|
unsigned channel = packet->queryHeader().channel;
|
|
|
- varFiles.setown(varFileInfo->getIFileIOArray(isOpt, channel)); // MORE could combine
|
|
|
- manager.setown(varFileInfo->getIndexManager(isOpt, channel, varFiles, diskSize, false, 0));
|
|
|
+ unsigned formatCrc = basefactory->getFormatCrc(helper->getFormatCrc());
|
|
|
+ translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation())); // MORE - FormatCRC may be wrong here. Needs to be crc of projected not expected
|
|
|
+ manager.setown(varFileInfo->getIndexManager(isOpt, channel, nullptr, false, 0));
|
|
|
}
|
|
|
|
|
|
inline bool queryKeyed() const
|
|
@@ -1030,9 +1006,8 @@ public:
|
|
|
class CRoxieDiskBaseActivityFactory : public CSlaveActivityFactory
|
|
|
{
|
|
|
protected:
|
|
|
- Owned<IFileIOArray> fileArray;
|
|
|
+ Owned<ITranslatorSet> translators;
|
|
|
Owned<IInMemoryIndexManager> manager;
|
|
|
-
|
|
|
public:
|
|
|
CRoxieDiskBaseActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
|
: CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
|
|
@@ -1047,8 +1022,9 @@ public:
|
|
|
if (datafile)
|
|
|
{
|
|
|
unsigned channel = queryFactory.queryChannel();
|
|
|
- fileArray.setown(datafile->getIFileIOArray(isOpt, channel));
|
|
|
- manager.setown(datafile->getIndexManager(isOpt, channel, fileArray, helper->queryDiskRecordSize(), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false), _graphNode.getPropInt("att[@name=\"_preloadSize\"]/@value", 0)));
|
|
|
+ unsigned formatCrc = getFormatCrc(helper->getFormatCrc());
|
|
|
+ translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation()));
|
|
|
+ manager.setown(datafile->getIndexManager(isOpt, channel, translators->queryActualLayout(0), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false), _graphNode.getPropInt("att[@name=\"_preloadSize\"]/@value", 0)));
|
|
|
Owned<IPropertyTreeIterator> memKeyInfo = queryFactory.queryPackage().getInMemoryIndexInfo(_graphNode);
|
|
|
if (memKeyInfo)
|
|
|
{
|
|
@@ -1089,8 +1065,8 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CRoxieDiskReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorDiskReadArg *) basehelper;
|
|
@@ -1106,7 +1082,9 @@ public:
|
|
|
{
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(isKeyed ? createKeyedRecordProcessor(cursor, *this, resent) :
|
|
|
- createUnkeyedRecordProcessor(cursor, *this, diskSize.isVariableSize(), isGrouped, manager->createReader(readPos, parallelPartNo, numParallel)));
|
|
|
+ createUnkeyedRecordProcessor(cursor, *this, diskSize.isVariableSize(), isGrouped,
|
|
|
+ manager->createReader(readPos, parallelPartNo, numParallel, translators,
|
|
|
+ queryContext->queryCodeContext(), basefactory->queryId())));
|
|
|
}
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|
|
@@ -1134,8 +1112,8 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager, const IResolvedFile *_datafile, size32_t _maxRowSize)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true), datafile(_datafile), maxRowSize(_maxRowSize)
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators, const IResolvedFile *_datafile, size32_t _maxRowSize)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true), datafile(_datafile), maxRowSize(_maxRowSize)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorCsvReadArg *) basehelper;
|
|
@@ -1152,7 +1130,7 @@ public:
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(
|
|
|
createCsvRecordProcessor(*this,
|
|
|
- manager->createReader(readPos, parallelPartNo, numParallel),
|
|
|
+ manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId()),
|
|
|
packet->queryHeader().channel==1 && !resent,
|
|
|
varFileInfo ? varFileInfo.get() : datafile, maxRowSize));
|
|
|
}
|
|
@@ -1179,8 +1157,8 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true)
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorXmlReadArg *) basehelper;
|
|
@@ -1195,7 +1173,7 @@ public:
|
|
|
{
|
|
|
{
|
|
|
CriticalBlock p(pcrit);
|
|
|
- processor.setown(createXmlRecordProcessor(*this, manager->createReader(readPos, parallelPartNo, numParallel)));
|
|
|
+ processor.setown(createXmlRecordProcessor(*this, manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId())));
|
|
|
}
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|
|
@@ -1220,7 +1198,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager);
|
|
|
+ return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager, translators);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -1246,7 +1224,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, datafile, maxRowSize);
|
|
|
+ return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, translators, datafile, maxRowSize);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -1266,7 +1244,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager);
|
|
|
+ return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager, translators);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -1475,9 +1453,8 @@ class UnkeyedVariableRecordProcessor : public UnkeyedRecordProcessor
|
|
|
{
|
|
|
public:
|
|
|
UnkeyedVariableRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, bool _isGrouped, IDirectReader *_reader)
|
|
|
- : UnkeyedRecordProcessor(_cursor, _owner, _reader), isGrouped(_isGrouped), deserializeSource(_reader)
|
|
|
+ : UnkeyedRecordProcessor(_cursor, _owner, _reader), isGrouped(_isGrouped)
|
|
|
{
|
|
|
- prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
|
|
|
}
|
|
|
|
|
|
virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
|
|
@@ -1485,25 +1462,17 @@ public:
|
|
|
unsigned totalSizeSent = 0;
|
|
|
helper->setCallback(reader->queryThorDiskCallback());
|
|
|
unsigned lastGroupProcessed = processed;
|
|
|
- while (!aborted && !deserializeSource.eos())
|
|
|
+ while (!aborted && !reader->eos())
|
|
|
{
|
|
|
// This loop is the inner loop for memory diskreads - so keep it efficient!
|
|
|
- prefetcher->readAhead(deserializeSource);
|
|
|
- const byte *nextRec = deserializeSource.queryRow();
|
|
|
+ const byte *nextRec = reader->nextRow();
|
|
|
size32_t transformedSize;
|
|
|
if (cursor && cursor->isFiltered(nextRec))
|
|
|
transformedSize = 0;
|
|
|
else
|
|
|
transformedSize = owner.doTransform(output, nextRec);
|
|
|
- bool eog;
|
|
|
- if (isGrouped)
|
|
|
- {
|
|
|
- size32_t sizeRead = deserializeSource.queryRowSize();
|
|
|
- eog = nextRec[sizeRead-1];
|
|
|
- }
|
|
|
- else
|
|
|
- eog = false;
|
|
|
- deserializeSource.finishedRow();
|
|
|
+ bool eog = isGrouped && reader->eog();
|
|
|
+ reader->finishedRow();
|
|
|
if (transformedSize)
|
|
|
{
|
|
|
processed++;
|
|
@@ -1522,7 +1491,7 @@ public:
|
|
|
si.append(siLen);
|
|
|
si.append(processed);
|
|
|
si.append(false); // not using a key
|
|
|
- offset_t readPos = deserializeSource.tell();
|
|
|
+ offset_t readPos = reader->tell();
|
|
|
si.append(readPos);
|
|
|
siLen = si.length() - sizeof(siLen);
|
|
|
si.writeDirect(0, sizeof(siLen), &siLen);
|
|
@@ -1538,7 +1507,7 @@ public:
|
|
|
si.append(siLen);
|
|
|
si.append(processed);
|
|
|
si.append(false); // not using a key
|
|
|
- offset_t readPos = deserializeSource.tell();
|
|
|
+ offset_t readPos = reader->tell();
|
|
|
si.append(readPos);
|
|
|
siLen = si.length() - sizeof(siLen);
|
|
|
si.writeDirect(0, sizeof(siLen), &siLen);
|
|
@@ -1549,14 +1518,12 @@ public:
|
|
|
}
|
|
|
|
|
|
protected:
|
|
|
- CThorContiguousRowBuffer deserializeSource;
|
|
|
- Owned<ISourceRowPrefetcher> prefetcher;
|
|
|
bool isGrouped;
|
|
|
};
|
|
|
|
|
|
IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, bool isGrouped, IDirectReader *_reader)
|
|
|
{
|
|
|
- if (variableDisk || isGrouped)
|
|
|
+ if (variableDisk || isGrouped || _reader->isTranslating())
|
|
|
return new UnkeyedVariableRecordProcessor(cursor, owner, isGrouped, _reader);
|
|
|
else
|
|
|
return new UnkeyedRecordProcessor(cursor, owner, _reader);
|
|
@@ -1795,8 +1762,8 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CRoxieDiskNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorDiskNormalizeArg *) basehelper;
|
|
@@ -1813,7 +1780,7 @@ public:
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(isKeyed ?
|
|
|
createKeyedNormalizeRecordProcessor(cursor, *this, resent) :
|
|
|
- createUnkeyedNormalizeRecordProcessor(cursor, *this, manager->createReader(readPos, parallelPartNo, numParallel)));
|
|
|
+ createUnkeyedNormalizeRecordProcessor(cursor, *this, manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId())));
|
|
|
}
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|
|
@@ -1839,7 +1806,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager);
|
|
|
+ return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager, translators);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -2035,8 +2002,8 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CRoxieDiskCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorDiskCountArg *) basehelper;
|
|
@@ -2053,7 +2020,7 @@ public:
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(isKeyed ?
|
|
|
createKeyedCountRecordProcessor(cursor, *this) :
|
|
|
- createUnkeyedCountRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel)));
|
|
|
+ createUnkeyedCountRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId())));
|
|
|
}
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|
|
|
processor->doQuery(output, processed, (unsigned __int64) -1, stopAfter);
|
|
@@ -2070,7 +2037,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager);
|
|
|
+ return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager, translators);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -2302,8 +2269,9 @@ protected:
|
|
|
public:
|
|
|
CRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
IInMemoryIndexManager *_manager,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _parallelPartNo, _numParallel, _forceUnkeyed)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, _parallelPartNo, _numParallel, _forceUnkeyed)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorDiskAggregateArg *) basehelper;
|
|
@@ -2324,7 +2292,7 @@ public:
|
|
|
{
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(isKeyed ? createKeyedAggregateRecordProcessor(cursor, *this) :
|
|
|
- createUnkeyedAggregateRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel)));
|
|
|
+ createUnkeyedAggregateRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId())));
|
|
|
}
|
|
|
processor->doQuery(output, 0, 0, 0);
|
|
|
}
|
|
@@ -2434,7 +2402,7 @@ protected:
|
|
|
OwnedConstRoxieRow finalRow;
|
|
|
public:
|
|
|
CParallelRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager, unsigned _numParallel) :
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
|
|
|
CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel)
|
|
|
{
|
|
|
helper = (IHThorDiskAggregateArg *) basehelper;
|
|
@@ -2444,7 +2412,7 @@ public:
|
|
|
// MORE - avoiding serializing to dummy would be more efficient...
|
|
|
deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
}
|
|
|
- CRoxieDiskAggregateActivity *part0 = new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, numParallel, false);
|
|
|
+ CRoxieDiskAggregateActivity *part0 = new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
|
|
|
parts.append(*part0);
|
|
|
if (part0->queryKeyed())
|
|
|
{
|
|
@@ -2454,7 +2422,7 @@ public:
|
|
|
else
|
|
|
{
|
|
|
for (unsigned i = 1; i < numParallel; i++)
|
|
|
- parts.append(*new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, i, numParallel, true));
|
|
|
+ parts.append(*new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2560,9 +2528,9 @@ public:
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
if (parallelAggregate > 1)
|
|
|
- return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
|
|
|
+ return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
|
|
|
else
|
|
|
- return new CRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, 0, 1, false);
|
|
|
+ return new CRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -2779,8 +2747,9 @@ protected:
|
|
|
public:
|
|
|
CRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
IInMemoryIndexManager *_manager,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
unsigned partNo, unsigned numParts, bool _forceUnkeyed)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, partNo, numParts, _forceUnkeyed),
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, partNo, numParts, _forceUnkeyed),
|
|
|
helper((IHThorDiskGroupAggregateArg *) basehelper),
|
|
|
results(*helper, *helper)
|
|
|
{
|
|
@@ -2804,7 +2773,7 @@ public:
|
|
|
CriticalBlock p(pcrit);
|
|
|
processor.setown(isKeyed ?
|
|
|
createKeyedGroupAggregateRecordProcessor(cursor, results, *helper) :
|
|
|
- createUnkeyedGroupAggregateRecordProcessor(cursor, results, *helper, manager->createReader(readPos, parallelPartNo, numParallel),
|
|
|
+ createUnkeyedGroupAggregateRecordProcessor(cursor, results, *helper, manager->createReader(readPos, parallelPartNo, numParallel, translators, queryContext->queryCodeContext(), basefactory->queryId()),
|
|
|
queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
}
|
|
|
processor->doQuery(output, 0, 0, 0);
|
|
@@ -2824,7 +2793,7 @@ protected:
|
|
|
|
|
|
public:
|
|
|
CParallelRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
- IInMemoryIndexManager *_manager, unsigned _numParallel) :
|
|
|
+ IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
|
|
|
CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel),
|
|
|
helper((IHThorDiskGroupAggregateArg *) basehelper),
|
|
|
resultAggregator(*helper, *helper)
|
|
@@ -2836,7 +2805,7 @@ public:
|
|
|
// MORE - avoiding serializing to dummy would be more efficient...
|
|
|
deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
}
|
|
|
- CRoxieDiskGroupAggregateActivity *part0 = new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, numParallel, false);
|
|
|
+ CRoxieDiskGroupAggregateActivity *part0 = new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
|
|
|
parts.append(*part0);
|
|
|
if (part0->queryKeyed())
|
|
|
{
|
|
@@ -2846,7 +2815,7 @@ public:
|
|
|
else
|
|
|
{
|
|
|
for (unsigned i = 1; i < numParallel; i++)
|
|
|
- parts.append(*new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, i, numParallel, true));
|
|
|
+ parts.append(*new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2931,9 +2900,9 @@ public:
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
if (parallelAggregate > 1)
|
|
|
- return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
|
|
|
+ return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
|
|
|
else
|
|
|
- return new CRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, 0, 1, false);
|
|
|
+ return new CRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -4279,6 +4248,7 @@ ISlaveActivityFactory *createRoxieIndexGroupAggregateActivityFactory(IPropertyTr
|
|
|
class CRoxieFetchActivityFactory : public CSlaveActivityFactory
|
|
|
{
|
|
|
public:
|
|
|
+ Owned<ITranslatorSet> translators; // MORE - use them!
|
|
|
Owned<IFileIOArray> fileArray;
|
|
|
|
|
|
CRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
@@ -4292,7 +4262,11 @@ public:
|
|
|
OwnedRoxieString fname(helper->getFileName());
|
|
|
datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
|
|
|
if (datafile)
|
|
|
+ {
|
|
|
+ unsigned formatCrc = getFormatCrc(helper->getDiskFormatCrc());
|
|
|
+ translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation()));
|
|
|
fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -4302,11 +4276,6 @@ public:
|
|
|
{
|
|
|
return CSlaveActivityFactory::toString(s.append("FETCH "));
|
|
|
}
|
|
|
-
|
|
|
- inline IFileIO *getFilePart(unsigned partNo, offset_t &_base) const
|
|
|
- {
|
|
|
- return fileArray->getFilePart(partNo, _base);
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
class CRoxieFetchActivityBase : public CRoxieSlaveActivity
|
|
@@ -4316,18 +4285,22 @@ protected:
|
|
|
const CRoxieFetchActivityFactory *factory;
|
|
|
Owned<IFileIO> rawFile;
|
|
|
Owned<ISerialStream> rawStream;
|
|
|
- CThorStreamDeserializerSource deserializeSource;
|
|
|
offset_t base;
|
|
|
char *inputData;
|
|
|
char *inputLimit;
|
|
|
- Owned<IFileIOArray> varFiles;
|
|
|
+ Linked<ITranslatorSet> translators;
|
|
|
+ Linked<IFileIOArray> files;
|
|
|
+ const IDynamicTransform *translator = nullptr;
|
|
|
bool needsRHS;
|
|
|
|
|
|
virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData) = 0;
|
|
|
|
|
|
public:
|
|
|
- CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
|
|
|
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory)
|
|
|
+ CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
|
|
|
+ const CRoxieFetchActivityFactory *_aFactory,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
+ IFileIOArray *_files)
|
|
|
+ : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory), translators(_translators), files(_files)
|
|
|
{
|
|
|
helper = (IHThorFetchBaseArg *) basehelper;
|
|
|
base = 0;
|
|
@@ -4346,7 +4319,9 @@ public:
|
|
|
|
|
|
virtual void setVariableFileInfo()
|
|
|
{
|
|
|
- varFiles.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
|
|
|
+ unsigned formatCrc = basefactory->getFormatCrc(helper->getDiskFormatCrc());
|
|
|
+ translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation()));
|
|
|
+ files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
|
|
|
}
|
|
|
|
|
|
virtual IMessagePacker *process();
|
|
@@ -4418,31 +4393,55 @@ IMessagePacker *CRoxieFetchActivityBase::process()
|
|
|
class CRoxieFetchActivity : public CRoxieFetchActivityBase
|
|
|
{
|
|
|
Owned<IEngineRowAllocator> diskAllocator;
|
|
|
- Owned<IOutputRowDeserializer> rowDeserializer;
|
|
|
+ CThorContiguousRowBuffer prefetchSource;
|
|
|
+ Owned<ISourceRowPrefetcher> rowPrefetcher;
|
|
|
+
|
|
|
public:
|
|
|
- CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
|
|
|
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
|
|
|
+ CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
|
|
|
+ const CRoxieFetchActivityFactory *_aFactory,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
+ IFileIOArray *_files)
|
|
|
+ : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files)
|
|
|
{
|
|
|
- IOutputMetaData *diskMeta = helper->queryDiskRecordSize();
|
|
|
+ // If we ever supported superfiles this would need to move to setPartNo, and pass proper subfile idx in
|
|
|
+ rowPrefetcher.setown(translators->getPrefetcher(0, false, queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
+
|
|
|
+ IOutputMetaData *diskMeta = helper->queryProjectedDiskRecordSize();
|
|
|
diskAllocator.setown(getRowAllocator(diskMeta, basefactory->queryId()));
|
|
|
- rowDeserializer.setown(diskMeta->createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
}
|
|
|
|
|
|
virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
|
|
|
{
|
|
|
- RtlDynamicRowBuilder diskRowBuilder(diskAllocator);
|
|
|
- deserializeSource.reset(pos);
|
|
|
- unsigned sizeRead = rowDeserializer->deserialize(diskRowBuilder.ensureRow(), deserializeSource);
|
|
|
- OwnedConstRoxieRow rawBuffer = diskRowBuilder.finalizeRowClear(sizeRead);
|
|
|
- // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
|
|
|
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
|
|
|
- return h->transform(rowBuilder, rawBuffer, inputData, rawpos);
|
|
|
+ prefetchSource.reset(pos);
|
|
|
+ rowPrefetcher->readAhead(prefetchSource);
|
|
|
+ const byte *diskRow = prefetchSource.queryRow();
|
|
|
+ if (translator)
|
|
|
+ {
|
|
|
+ MemoryBuffer buf;
|
|
|
+ MemoryBufferBuilder aBuilder(buf, 0);
|
|
|
+ translator->translate(aBuilder, diskRow);
|
|
|
+ // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
|
|
|
+ IHThorFetchArg *h = (IHThorFetchArg *) helper;
|
|
|
+ return h->transform(rowBuilder, buf.toByteArray(), inputData, rawpos);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
|
|
|
+ IHThorFetchArg *h = (IHThorFetchArg *) helper;
|
|
|
+ return h->transform(rowBuilder, diskRow, inputData, rawpos);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void setPartNo(bool filechanged)
|
|
|
+ {
|
|
|
+ CRoxieFetchActivityBase::setPartNo(filechanged);
|
|
|
+ prefetchSource.setStream(rawStream);
|
|
|
}
|
|
|
};
|
|
|
|
|
|
IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieFetchActivity(logctx, packet, helperFactory, this);
|
|
|
+ return new CRoxieFetchActivity(logctx, packet, helperFactory, this, translators, fileArray);
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------------
|
|
@@ -4451,10 +4450,15 @@ class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
|
|
|
{
|
|
|
CSVSplitter csvSplitter;
|
|
|
size32_t maxRowSize;
|
|
|
+ CThorStreamDeserializerSource deserializeSource;
|
|
|
|
|
|
public:
|
|
|
- CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns, size32_t _maxRowSize)
|
|
|
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory), maxRowSize(_maxRowSize)
|
|
|
+ CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
|
|
|
+ const CRoxieFetchActivityFactory *_aFactory,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
+ IFileIOArray *_files,
|
|
|
+ unsigned _maxColumns, size32_t _maxRowSize)
|
|
|
+ : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files), maxRowSize(_maxRowSize)
|
|
|
{
|
|
|
const char * quotes = NULL;
|
|
|
const char * separators = NULL;
|
|
@@ -4477,6 +4481,8 @@ public:
|
|
|
IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
|
|
|
ICsvParameters *csvInfo = h->queryCsvParameters();
|
|
|
csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators, escapes);
|
|
|
+ if (translator)
|
|
|
+ UNIMPLEMENTED; // It's not obvious how we would support this.
|
|
|
}
|
|
|
|
|
|
virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
|
|
@@ -4499,6 +4505,12 @@ public:
|
|
|
}
|
|
|
return h->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), inputData, rawpos);
|
|
|
}
|
|
|
+
|
|
|
+ virtual void setPartNo(bool filechanged)
|
|
|
+ {
|
|
|
+ CRoxieFetchActivityBase::setPartNo(filechanged);
|
|
|
+ deserializeSource.setStream(rawStream);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSelect
|
|
@@ -4507,14 +4519,21 @@ class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSe
|
|
|
Owned<IColumnProvider> lastMatch;
|
|
|
Owned<IFileIOStream> rawStreamX;
|
|
|
unsigned streamBufferSize;
|
|
|
+ CThorStreamDeserializerSource deserializeSource;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CRoxieFetchActivityBase)
|
|
|
|
|
|
- CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _streamBufferSize)
|
|
|
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory),
|
|
|
+ CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
|
|
|
+ const CRoxieFetchActivityFactory *_aFactory,
|
|
|
+ ITranslatorSet *_translators,
|
|
|
+ IFileIOArray *_files,
|
|
|
+ unsigned _streamBufferSize)
|
|
|
+ : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files),
|
|
|
streamBufferSize(_streamBufferSize)
|
|
|
{
|
|
|
+ if (translator)
|
|
|
+ UNIMPLEMENTED;
|
|
|
}
|
|
|
|
|
|
virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
|
|
@@ -4523,8 +4542,8 @@ public:
|
|
|
try
|
|
|
{
|
|
|
while(!lastMatch)
|
|
|
- if(!parser->next())
|
|
|
- throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
|
|
|
+ if(!parser->next())
|
|
|
+ throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
|
|
|
IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
|
|
|
unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
|
|
|
lastMatch.clear();
|
|
@@ -4546,6 +4565,7 @@ public:
|
|
|
virtual void setPartNo(bool filechanged)
|
|
|
{
|
|
|
CRoxieFetchActivityBase::setPartNo(filechanged);
|
|
|
+ deserializeSource.setStream(rawStream);
|
|
|
rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
|
|
|
parser.setown((factory->getKind()==TAKjsonfetch) ? createJSONParse(*rawStreamX, "/", *this) : createXMLParse(*rawStreamX, "/", *this));
|
|
|
}
|
|
@@ -4554,10 +4574,10 @@ public:
|
|
|
|
|
|
void CRoxieFetchActivityBase::setPartNo(bool filechanged)
|
|
|
{
|
|
|
- rawFile.setown(variableFileName ? varFiles->getFilePart(lastPartNo.partNo, base) : factory->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
|
|
|
+ rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
|
|
|
+ translator = translators->queryTranslator(0); // MORE - superfiles
|
|
|
assertex(rawFile != NULL);
|
|
|
rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
|
|
|
- deserializeSource.setStream(rawStream);
|
|
|
}
|
|
|
|
|
|
class CRoxieCSVFetchActivityFactory : public CRoxieFetchActivityFactory
|
|
@@ -4581,7 +4601,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, maxColumns, maxRowSize);
|
|
|
+ return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, maxColumns, maxRowSize);
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -4595,7 +4615,7 @@ public:
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, 4096);
|
|
|
+ return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, 4096);
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -4962,7 +4982,8 @@ ISlaveActivityFactory *createRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_
|
|
|
class CRoxieKeyedJoinFetchActivityFactory : public CSlaveActivityFactory
|
|
|
{
|
|
|
public:
|
|
|
- Owned<IFileIOArray> fileArray;
|
|
|
+ Owned<ITranslatorSet> translators;
|
|
|
+ Owned<IFileIOArray> files;
|
|
|
|
|
|
CRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
|
: CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
|
|
@@ -4976,7 +4997,11 @@ public:
|
|
|
OwnedRoxieString fileName(helper->getFileName());
|
|
|
datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
|
|
|
if (datafile)
|
|
|
- fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
|
|
|
+ {
|
|
|
+ unsigned formatCrc = getFormatCrc(helper->getDiskFormatCrc());
|
|
|
+ translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation()));
|
|
|
+ files.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -4987,35 +5012,36 @@ public:
|
|
|
{
|
|
|
return CSlaveActivityFactory::toString(s.append("KEYEDJOIN FETCH "));
|
|
|
}
|
|
|
-
|
|
|
- IFileIO *getFilePart(unsigned partNo, offset_t &_base) const
|
|
|
- {
|
|
|
- return fileArray->getFilePart(partNo, _base);
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
|
|
|
{
|
|
|
IHThorKeyedJoinArg *helper;
|
|
|
Owned<IFileIO> rawFile;
|
|
|
- const CRoxieKeyedJoinFetchActivityFactory *factory;
|
|
|
offset_t base;
|
|
|
const char *inputLimit;
|
|
|
const char *inputData;
|
|
|
- Owned<IFileIOArray> varFiles;
|
|
|
+ Linked<ITranslatorSet> translators;
|
|
|
+ Linked<IFileIOArray> files;
|
|
|
Owned<ISerialStream> rawStream;
|
|
|
- CThorStreamDeserializerSource deserializeSource;
|
|
|
+ CThorContiguousRowBuffer prefetchSource;
|
|
|
+ Owned<ISourceRowPrefetcher> prefetcher;
|
|
|
+ const IDynamicTransform *translator = nullptr;
|
|
|
|
|
|
virtual void setPartNo(bool filechanged)
|
|
|
{
|
|
|
- rawFile.setown(variableFileName ? varFiles->getFilePart(lastPartNo.partNo, base) : factory->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
|
|
|
+ rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
|
|
|
+ translator = translators->queryTranslator(0); // MORE - superfiles
|
|
|
rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
|
|
|
- deserializeSource.setStream(rawStream);
|
|
|
+ prefetcher.setown(translators->getPrefetcher(0, false, queryContext->queryCodeContext(), basefactory->queryId()));
|
|
|
+ prefetchSource.setStream(rawStream);
|
|
|
}
|
|
|
|
|
|
public:
|
|
|
- CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory)
|
|
|
- : factory(_aFactory),
|
|
|
+ CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory,
|
|
|
+ IFileIOArray *_files, ITranslatorSet *_translators)
|
|
|
+ : files(_files),
|
|
|
+ translators(_translators),
|
|
|
CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory)
|
|
|
{
|
|
|
// MORE - no continuation row support?
|
|
@@ -5038,7 +5064,9 @@ public:
|
|
|
|
|
|
virtual void setVariableFileInfo()
|
|
|
{
|
|
|
- varFiles.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
|
|
|
+ unsigned formatCrc = basefactory->getFormatCrc(helper->getDiskFormatCrc());
|
|
|
+ translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation()));
|
|
|
+ files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
|
|
|
}
|
|
|
|
|
|
virtual IMessagePacker *process();
|
|
@@ -5058,9 +5086,6 @@ IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
|
|
|
unsigned skipped = 0;
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned totalSizeSent = 0;
|
|
|
- Owned<IOutputRowDeserializer> rowDeserializer = helper->queryDiskRecordSize()->createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId());
|
|
|
- Owned<IEngineRowAllocator> diskAllocator = getRowAllocator(helper->queryDiskRecordSize(), basefactory->queryId());
|
|
|
- RtlDynamicRowBuilder diskRowBuilder(diskAllocator);
|
|
|
|
|
|
CachedOutputMetaData joinFieldsMeta(helper->queryJoinFieldsRecordSize());
|
|
|
Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
|
|
@@ -5080,9 +5105,16 @@ IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
|
|
|
else
|
|
|
pos = rp-base;
|
|
|
|
|
|
- deserializeSource.reset(pos);
|
|
|
- unsigned sizeRead = rowDeserializer->deserialize(diskRowBuilder.ensureRow(), deserializeSource);
|
|
|
- OwnedConstRoxieRow rawBuffer = diskRowBuilder.finalizeRowClear(sizeRead);
|
|
|
+ prefetchSource.reset(pos);
|
|
|
+ prefetcher->readAhead(prefetchSource);
|
|
|
+ const byte *rawRHS = prefetchSource.queryRow();
|
|
|
+ MemoryBuffer buf;
|
|
|
+ if (translator)
|
|
|
+ {
|
|
|
+ MemoryBufferBuilder aBuilder(buf, 0);
|
|
|
+ translator->translate(aBuilder, rawRHS);
|
|
|
+ rawRHS = (const byte *) buf.toByteArray();
|
|
|
+ }
|
|
|
|
|
|
const KeyedJoinHeader *headerPtr = (KeyedJoinHeader *) inputData;
|
|
|
inputData = &headerPtr->rhsdata[0];
|
|
@@ -5091,9 +5123,9 @@ IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
|
|
|
memcpy(&inputSize, inputData, sizeof(inputSize));
|
|
|
inputData += sizeof(inputSize);
|
|
|
}
|
|
|
- if (helper->fetchMatch(inputData, rawBuffer))
|
|
|
+ if (helper->fetchMatch(inputData, rawRHS))
|
|
|
{
|
|
|
- unsigned thisSize = helper->extractJoinFields(jfRowBuilder, rawBuffer, (IBlobProvider*)NULL);
|
|
|
+ unsigned thisSize = helper->extractJoinFields(jfRowBuilder, rawRHS, (IBlobProvider*)NULL);
|
|
|
jfRowBuilder.writeToOutput(thisSize, headerPtr->fpos, headerPtr->thisGroup, headerPtr->partNo);
|
|
|
totalSizeSent += KEYEDJOIN_RECORD_SIZE(thisSize);
|
|
|
processed++;
|
|
@@ -5131,7 +5163,7 @@ IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
|
|
|
|
|
|
IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this);
|
|
|
+ return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this, files, translators);
|
|
|
}
|
|
|
|
|
|
ISlaveActivityFactory *createRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
@@ -5315,7 +5347,9 @@ public:
|
|
|
bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
|
|
|
datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
|
|
|
if (datafile)
|
|
|
+ {
|
|
|
fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|