|
@@ -22,10 +22,9 @@
|
|
|
#include "jqueue.tpp"
|
|
|
#include "dasess.hpp"
|
|
|
#include "thorxmlwrite.hpp"
|
|
|
-#include "eclhelper_dyn.hpp"
|
|
|
#include "thorstep.ipp"
|
|
|
#include "roxiedebug.hpp"
|
|
|
-#include "hqlexpr.hpp"
|
|
|
+#include "thorcommon.hpp"
|
|
|
#include "rtldynfield.hpp"
|
|
|
|
|
|
#define MAX_FETCH_LOOKAHEAD 1000
|
|
@@ -83,14 +82,6 @@ void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-bool rltEnabled(IConstWorkUnit const * wu)
|
|
|
-{
|
|
|
- if(wu->hasDebugValue("layoutTranslationEnabled"))
|
|
|
- return wu->getDebugValueBool("layoutTranslationEnabled", false);
|
|
|
- else
|
|
|
- return wu->getDebugValueBool("hthorLayoutTranslationEnabled", false);
|
|
|
-}
|
|
|
-
|
|
|
static void setProgress(IPropertyTree &node, const char *name, const char *value)
|
|
|
{
|
|
|
StringBuffer attr("@");
|
|
@@ -684,7 +675,7 @@ const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDist
|
|
|
if(agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
|
|
|
return NULL;
|
|
|
|
|
|
- if(!rltEnabled(agent.queryWorkUnit()))
|
|
|
+ if(agent.rltEnabled() == RecordTranslationMode::None)
|
|
|
{
|
|
|
verifyFormatCrc(helper.getFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, true);
|
|
|
return NULL;
|
|
@@ -693,32 +684,17 @@ const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDist
|
|
|
if(verifyFormatCrc(helper.getFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, false))
|
|
|
return NULL;
|
|
|
|
|
|
- Owned<IOutputMetaData> actualFormat;
|
|
|
IPropertyTree &props = f->queryAttributes();
|
|
|
- bool isGrouped = props.getPropBool("@grouped", false);
|
|
|
- if (props.hasProp("_rtlType"))
|
|
|
- {
|
|
|
- MemoryBuffer layoutBin;
|
|
|
- props.getPropBin("_rtlType", layoutBin);
|
|
|
- actualFormat.setown(createTypeInfoOutputMetaData(layoutBin, isGrouped, nullptr));
|
|
|
- }
|
|
|
- else if (props.hasProp("ECL"))
|
|
|
- {
|
|
|
- StringBuffer layoutECL;
|
|
|
- props.getProp("ECL", layoutECL);
|
|
|
- MultiErrorReceiver errs;
|
|
|
- Owned<IHqlExpression> expr = parseQuery(layoutECL.str(), &errs);
|
|
|
- if (errs.errCount() == 0)
|
|
|
- {
|
|
|
- MemoryBuffer layoutBin;
|
|
|
- if (exportBinaryType(layoutBin, expr))
|
|
|
- actualFormat.setown(createTypeInfoOutputMetaData(layoutBin, isGrouped, nullptr));
|
|
|
- }
|
|
|
- }
|
|
|
+ Owned<IOutputMetaData> actualFormat = getDaliLayoutInfo(props);
|
|
|
if (actualFormat)
|
|
|
{
|
|
|
actualLayouts.append(actualFormat.getLink()); // ensure adequate lifespan
|
|
|
- return createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
|
|
|
+ Owned<const IDynamicTransform> payloadTranslator = createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
|
|
|
+ if (!payloadTranslator->canTranslate())
|
|
|
+ throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
|
|
|
+ if (payloadTranslator->keyedTranslated())
|
|
|
+ throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
|
|
|
+ return payloadTranslator.getClear();
|
|
|
}
|
|
|
throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - key layout information not found", f->queryLogicalName());
|
|
|
}
|
|
@@ -1778,14 +1754,14 @@ protected:
|
|
|
unsigned activityId;
|
|
|
CachedOutputMetaData const & outputMeta;
|
|
|
IEngineRowAllocator * rowAllocator;
|
|
|
- IOutputRowDeserializer * rowDeserializer;
|
|
|
+ ISourceRowPrefetcher * prefetcher;
|
|
|
public:
|
|
|
- FetchPartHandlerBase(offset_t _base, offset_t _size, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
|
|
|
+ FetchPartHandlerBase(offset_t _base, offset_t _size, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
|
|
|
: blockcompressed(_blockcompressed),
|
|
|
encryptionkey(_encryptionkey),
|
|
|
activityId(_activityId),
|
|
|
outputMeta(_outputMeta),
|
|
|
- rowDeserializer(_rowDeserializer),
|
|
|
+ prefetcher(_prefetcher),
|
|
|
rowAllocator(_rowAllocator)
|
|
|
{
|
|
|
base = _base;
|
|
@@ -1869,8 +1845,8 @@ public:
|
|
|
class SimpleFetchPartHandlerBase : public FetchPartHandlerBase, public ThreadedPartHandler<FetchRequest>
|
|
|
{
|
|
|
public:
|
|
|
- SimpleFetchPartHandlerBase(IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
|
|
|
- : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
|
|
|
+ SimpleFetchPartHandlerBase(IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
|
|
|
+ : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
|
|
|
ThreadedPartHandler<FetchRequest>(_part, _handler, _threadPool)
|
|
|
{
|
|
|
}
|
|
@@ -1893,8 +1869,8 @@ private:
|
|
|
class FlatFetchPartHandler : public SimpleFetchPartHandlerBase
|
|
|
{
|
|
|
public:
|
|
|
- FlatFetchPartHandler(IFlatFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
|
|
|
- : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
|
|
|
+ FlatFetchPartHandler(IFlatFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
|
|
|
+ : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
|
|
|
owner(_owner)
|
|
|
{
|
|
|
}
|
|
@@ -1973,7 +1949,7 @@ template <class PARTHANDLER>
|
|
|
class IFetchHandlerFactory
|
|
|
{
|
|
|
public:
|
|
|
- virtual PARTHANDLER * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator) = 0;
|
|
|
+ virtual PARTHANDLER * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator) = 0;
|
|
|
};
|
|
|
|
|
|
template <class PARTHANDLER, class LEFTPTR, class REQUEST>
|
|
@@ -1982,7 +1958,7 @@ class DistributedFileFetchHandler : public DistributedFileFetchHandlerBase
|
|
|
public:
|
|
|
typedef DistributedFileFetchHandler<PARTHANDLER, LEFTPTR, REQUEST> SELF;
|
|
|
|
|
|
- DistributedFileFetchHandler(IDistributedFile * f, IFetchHandlerFactory<PARTHANDLER> & factory, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator) : file(f)
|
|
|
+ DistributedFileFetchHandler(IDistributedFile * f, IFetchHandlerFactory<PARTHANDLER> & factory, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator) : file(f)
|
|
|
{
|
|
|
numParts = f->numParts();
|
|
|
parts = new PARTHANDLER *[numParts];
|
|
@@ -1994,7 +1970,7 @@ public:
|
|
|
{
|
|
|
IDistributedFilePart *part = f->getPart(idx);
|
|
|
offset_t size = getPartSize(part);
|
|
|
- parts[idx] = factory.createFetchPartHandler(part, base, size, this, blockcompressed, encryptionkey, rowDeserializer, rowAllocator);
|
|
|
+ parts[idx] = factory.createFetchPartHandler(part, base, size, this, blockcompressed, encryptionkey, prefetcher, rowAllocator);
|
|
|
base += size;
|
|
|
}
|
|
|
exception = NULL;
|
|
@@ -2257,7 +2233,7 @@ public:
|
|
|
fetch.getFileEncryptKey(kl,k);
|
|
|
MemoryAttr encryptionkey;
|
|
|
encryptionkey.setOwn(kl,k);
|
|
|
- parts.setown(new DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest>(f, *this, encryptionkey, rowDeserializer, rowAllocator));
|
|
|
+ parts.setown(new DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest>(f, *this, encryptionkey, prefetcher, rowAllocator));
|
|
|
}
|
|
|
|
|
|
virtual void stopParts()
|
|
@@ -2357,7 +2333,9 @@ public:
|
|
|
dequeuedSeq = 0;
|
|
|
}
|
|
|
protected:
|
|
|
- Owned<IOutputRowDeserializer> rowDeserializer;
|
|
|
+ Owned<ISourceRowPrefetcher> prefetcher;
|
|
|
+ Owned<IOutputMetaData> actualDiskMeta;
|
|
|
+ Owned<const IDynamicTransform> translator;
|
|
|
private:
|
|
|
PartHandlerThreadFactory<FetchRequest> threadFactory;
|
|
|
Owned<DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest> > parts;
|
|
@@ -2381,20 +2359,31 @@ public:
|
|
|
{
|
|
|
CHThorFetchActivityBase::ready();
|
|
|
rowLimit = helper.getRowLimit();
|
|
|
- rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
|
|
|
- diskAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void initParts(IDistributedFile * f) override
|
|
|
+ {
|
|
|
+ CHThorFetchActivityBase::initParts(f);
|
|
|
+ prefetcher.setown(actualDiskMeta->createDiskPrefetcher());
|
|
|
}
|
|
|
|
|
|
virtual bool needsAllocator() const { return true; }
|
|
|
|
|
|
virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
|
|
|
{
|
|
|
- CThorStreamDeserializerSource deserializeSource;
|
|
|
- deserializeSource.setStream(rawStream);
|
|
|
- deserializeSource.reset(pos);
|
|
|
- RtlDynamicRowBuilder rowBuilder(diskAllocator);
|
|
|
- unsigned sizeRead = rowDeserializer->deserialize(rowBuilder.ensureRow(), deserializeSource);
|
|
|
- OwnedConstRoxieRow rawBuffer(rowBuilder.finalizeRowClear(sizeRead));
|
|
|
+ CThorContiguousRowBuffer prefetchSource;
|
|
|
+ prefetchSource.setStream(rawStream);
|
|
|
+ prefetchSource.reset(pos);
|
|
|
+ prefetcher->readAhead(prefetchSource);
|
|
|
+ const byte *rawBuffer = prefetchSource.queryRow();
|
|
|
+
|
|
|
+ MemoryBuffer buf;
|
|
|
+ if (translator)
|
|
|
+ {
|
|
|
+ MemoryBufferBuilder aBuilder(buf, 0);
|
|
|
+ translator->translate(aBuilder, rawBuffer);
|
|
|
+ rawBuffer = reinterpret_cast<const byte *>(buf.toByteArray());
|
|
|
+ }
|
|
|
|
|
|
CriticalBlock procedure(transformCrit);
|
|
|
size32_t thisSize;
|
|
@@ -2422,22 +2411,51 @@ public:
|
|
|
helper.onLimitExceeded();
|
|
|
}
|
|
|
|
|
|
- virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
|
|
|
+ virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
|
|
|
{
|
|
|
- return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
|
|
|
+ return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
|
|
|
}
|
|
|
|
|
|
protected:
|
|
|
virtual void verifyFetchFormatCrc(IDistributedFile * f)
|
|
|
{
|
|
|
- if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
|
|
|
- ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
+ actualDiskMeta.set(helper.queryDiskRecordSize());
|
|
|
+ translator.clear();
|
|
|
+ if (agent.rltEnabled()==RecordTranslationMode::None)
|
|
|
+ {
|
|
|
+ if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
|
|
|
+ ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ bool crcMatched = ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, false); // MORE - fetch requires all to match.
|
|
|
+ if (!crcMatched)
|
|
|
+ {
|
|
|
+ IPropertyTree &props = f->queryAttributes();
|
|
|
+ actualDiskMeta.setown(getDaliLayoutInfo(props));
|
|
|
+ if (actualDiskMeta)
|
|
|
+ {
|
|
|
+ translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
|
|
|
+ if (translator->canTranslate())
|
|
|
+ {
|
|
|
+ if (agent.rltEnabled()==RecordTranslationMode::None)
|
|
|
+ throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
|
|
|
+#ifdef _DEBUG
|
|
|
+ translator->describe();
|
|
|
+#endif
|
|
|
+ }
|
|
|
+ else
|
|
|
+ throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());
|
|
|
+ }
|
|
|
+ else
|
|
|
+ throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s - key layout information not found", f->queryLogicalName());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected:
|
|
|
CriticalSection transformCrit;
|
|
|
IHThorFetchArg & helper;
|
|
|
- Owned<IEngineRowAllocator> diskAllocator;
|
|
|
};
|
|
|
|
|
|
extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind)
|
|
@@ -2527,7 +2545,6 @@ public:
|
|
|
{
|
|
|
CHThorFetchActivityBase::ready();
|
|
|
rowLimit = helper.getRowLimit();
|
|
|
- rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
|
|
|
}
|
|
|
|
|
|
virtual void onLimitExceeded()
|
|
@@ -2535,9 +2552,9 @@ public:
|
|
|
helper.onLimitExceeded();
|
|
|
}
|
|
|
|
|
|
- virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
|
|
|
+ virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
|
|
|
{
|
|
|
- return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
|
|
|
+ return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
|
|
|
}
|
|
|
|
|
|
protected:
|
|
@@ -2675,7 +2692,7 @@ public:
|
|
|
helper.onLimitExceeded();
|
|
|
}
|
|
|
|
|
|
- virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
|
|
|
+ virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
|
|
|
{
|
|
|
return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, kind==TAKjsonfetch); //MORE: need to put correct stream buffer size here, when Gavin provides it
|
|
|
}
|
|
@@ -3285,8 +3302,8 @@ public:
|
|
|
class KeyedJoinFetchPartHandler : public FetchPartHandlerBase, public ThreadedPartHandler<KeyedJoinFetchRequest>
|
|
|
{
|
|
|
public:
|
|
|
- KeyedJoinFetchPartHandler(IKeyedJoinFetchHandlerCallback & _owner, IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
|
|
|
- : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
|
|
|
+ KeyedJoinFetchPartHandler(IKeyedJoinFetchHandlerCallback & _owner, IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, ISourceRowPrefetcher * _prefetcher, IEngineRowAllocator *_rowAllocator)
|
|
|
+ : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _prefetcher, _rowAllocator),
|
|
|
ThreadedPartHandler<KeyedJoinFetchRequest>(_part, _handler, _threadPool),
|
|
|
owner(_owner)
|
|
|
{
|
|
@@ -3353,10 +3370,10 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
|
|
|
IDistributedFile * dFile;
|
|
|
IDistributedSuperFile * super;
|
|
|
CachedOutputMetaData eclKeySize;
|
|
|
- Owned<IOutputRowDeserializer> rowDeserializer;
|
|
|
- Owned<IEngineRowAllocator> diskAllocator;
|
|
|
- IPointerArrayOf<IOutputMetaData> actualLayouts;
|
|
|
-
|
|
|
+ Owned<ISourceRowPrefetcher> prefetcher;
|
|
|
+ IPointerArrayOf<IOutputMetaData> actualLayouts; // all the index layouts are saved in here to ensure their lifetime is adequate
|
|
|
+ Owned<IOutputMetaData> actualDiskMeta; // only one disk layout is permitted
|
|
|
+ Owned<const IDynamicTransform> translator;
|
|
|
public:
|
|
|
CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind)
|
|
|
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _arg.queryDiskRecordSize()), helper(_arg)
|
|
@@ -3406,11 +3423,6 @@ public:
|
|
|
defaultRight.setown(rowBuilder.finalizeRowClear(thisSize));
|
|
|
}
|
|
|
}
|
|
|
- if (needsDiskRead)
|
|
|
- {
|
|
|
- rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
|
|
|
- diskAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
virtual void stop()
|
|
@@ -3435,7 +3447,8 @@ public:
|
|
|
if (needsDiskRead)
|
|
|
{
|
|
|
inputRowAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
|
|
|
- parts.setown(new DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest>(f, *this, encryptionkey, rowDeserializer, inputRowAllocator));
|
|
|
+ parts.setown(new DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest>(f, *this, encryptionkey, prefetcher, inputRowAllocator));
|
|
|
+ prefetcher.setown(actualDiskMeta->createDiskPrefetcher());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3527,20 +3540,26 @@ public:
|
|
|
stopThread();
|
|
|
}
|
|
|
|
|
|
- virtual KeyedJoinFetchPartHandler * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
|
|
|
+ virtual KeyedJoinFetchPartHandler * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, ISourceRowPrefetcher * prefetcher, IEngineRowAllocator *rowAllocator)
|
|
|
{
|
|
|
- return new KeyedJoinFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
|
|
|
+ return new KeyedJoinFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
|
|
|
}
|
|
|
|
|
|
virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
|
|
|
{
|
|
|
- CThorStreamDeserializerSource deserializeSource;
|
|
|
- deserializeSource.setStream(rawStream);
|
|
|
- deserializeSource.reset(pos);
|
|
|
- RtlDynamicRowBuilder rowBuilder(diskAllocator);
|
|
|
- unsigned sizeRead = rowDeserializer->deserialize(rowBuilder.ensureRow(), deserializeSource);
|
|
|
- OwnedConstRoxieRow row = rowBuilder.finalizeRowClear(sizeRead);
|
|
|
+ CThorContiguousRowBuffer prefetchSource;
|
|
|
+ prefetchSource.setStream(rawStream);
|
|
|
+ prefetchSource.reset(pos);
|
|
|
+ prefetcher->readAhead(prefetchSource);
|
|
|
+ const byte *row = prefetchSource.queryRow();
|
|
|
|
|
|
+ MemoryBuffer buf;
|
|
|
+ if (translator)
|
|
|
+ {
|
|
|
+ MemoryBufferBuilder aBuilder(buf, 0);
|
|
|
+ translator->translate(aBuilder, row);
|
|
|
+ row = reinterpret_cast<const byte *>(buf.toByteArray());
|
|
|
+ }
|
|
|
if(match(fetch->ms, row))
|
|
|
{
|
|
|
if(exclude)
|
|
@@ -4019,7 +4038,7 @@ protected:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- if(!rltEnabled(agent.queryWorkUnit()))
|
|
|
+ if(agent.rltEnabled() == RecordTranslationMode::None)
|
|
|
{
|
|
|
verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, true);
|
|
|
return NULL;
|
|
@@ -4030,32 +4049,17 @@ protected:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- Owned<IOutputMetaData> actualFormat;
|
|
|
IPropertyTree &props = f->queryAttributes();
|
|
|
- bool isGrouped = props.getPropBool("@grouped", false);
|
|
|
- if (props.hasProp("_rtlType"))
|
|
|
- {
|
|
|
- MemoryBuffer layoutBin;
|
|
|
- props.getPropBin("_rtlType", layoutBin);
|
|
|
- actualFormat.setown(createTypeInfoOutputMetaData(layoutBin, isGrouped, nullptr));
|
|
|
- }
|
|
|
- else if (props.hasProp("ECL"))
|
|
|
- {
|
|
|
- StringBuffer layoutECL;
|
|
|
- props.getProp("ECL", layoutECL);
|
|
|
- MultiErrorReceiver errs;
|
|
|
- Owned<IHqlExpression> expr = parseQuery(layoutECL.str(), &errs);
|
|
|
- if (errs.errCount() == 0)
|
|
|
- {
|
|
|
- MemoryBuffer layoutBin;
|
|
|
- if (exportBinaryType(layoutBin, expr))
|
|
|
- actualFormat.setown(createTypeInfoOutputMetaData(layoutBin, isGrouped, nullptr));
|
|
|
- }
|
|
|
- }
|
|
|
+ Owned<IOutputMetaData> actualFormat = getDaliLayoutInfo(props);
|
|
|
if (actualFormat)
|
|
|
{
|
|
|
actualLayouts.append(actualFormat.getLink()); // ensure adequate lifespan
|
|
|
- return createRecordTranslator(helper.queryProjectedIndexRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
|
|
|
+ Owned<const IDynamicTransform> payloadTranslator = createRecordTranslator(helper.queryProjectedIndexRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
|
|
|
+ if (!payloadTranslator->canTranslate())
|
|
|
+ throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
|
|
|
+ if (payloadTranslator->keyedTranslated())
|
|
|
+ throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
|
|
|
+ return payloadTranslator.getClear();
|
|
|
}
|
|
|
throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - key layout information not found", f->queryLogicalName());
|
|
|
}
|
|
@@ -4080,8 +4084,35 @@ protected:
|
|
|
|
|
|
virtual void verifyFetchFormatCrc(IDistributedFile * f)
|
|
|
{
|
|
|
- if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
|
|
|
- ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
+ actualDiskMeta.set(helper.queryDiskRecordSize());
|
|
|
+ translator.clear();
|
|
|
+ if (agent.rltEnabled()==RecordTranslationMode::None)
|
|
|
+ {
|
|
|
+ if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
|
|
|
+ ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ bool crcMatched = ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, false); // MORE - fetch requires all to match.
|
|
|
+ if (!crcMatched)
|
|
|
+ {
|
|
|
+ IPropertyTree &props = f->queryAttributes();
|
|
|
+ actualDiskMeta.setown(getDaliLayoutInfo(props));
|
|
|
+ if (actualDiskMeta)
|
|
|
+ {
|
|
|
+ translator.setown(createRecordTranslator(helper.queryProjectedDiskRecordSize()->queryRecordAccessor(true), actualDiskMeta->queryRecordAccessor(true)));
|
|
|
+ if (translator->canTranslate())
|
|
|
+ {
|
|
|
+ if (agent.rltEnabled()==RecordTranslationMode::None)
|
|
|
+ throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
|
|
|
+ }
|
|
|
+ else
|
|
|
+ throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());
|
|
|
+ }
|
|
|
+ else
|
|
|
+ throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s - key layout information not found", f->queryLogicalName());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
virtual const RtlRecord &queryIndexRecord()
|