|
@@ -81,6 +81,7 @@ protected:
|
|
Owned<IRowStream> keyOutStream;
|
|
Owned<IRowStream> keyOutStream;
|
|
CActivityBase &owner;
|
|
CActivityBase &owner;
|
|
Linked<IThorRowInterfaces> keyRowIf, fetchRowIf;
|
|
Linked<IThorRowInterfaces> keyRowIf, fetchRowIf;
|
|
|
|
+ StringAttr logicalFilename;
|
|
|
|
|
|
class CFPosHandler : implements IHash, public CSimpleInterface
|
|
class CFPosHandler : implements IHash, public CSimpleInterface
|
|
{
|
|
{
|
|
@@ -122,8 +123,9 @@ protected:
|
|
public:
|
|
public:
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CFetchStream(CActivityBase &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp)
|
|
|
|
- : owner(_owner), keyRowIf(_keyRowIf), fetchRowIf(_fetchRowIf), abortSoon(_abortSoon), iFetchHandler(_iFetchHandler), offsetCount(_offsetCount), tag(_tag), eexp(_eexp)
|
|
|
|
|
|
+ CFetchStream(CActivityBase &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp)
|
|
|
|
+ : owner(_owner), keyRowIf(_keyRowIf), fetchRowIf(_fetchRowIf), abortSoon(_abortSoon), logicalFilename(_logicalFilename),
|
|
|
|
+ iFetchHandler(_iFetchHandler), offsetCount(_offsetCount), tag(_tag), eexp(_eexp)
|
|
{
|
|
{
|
|
distributor = NULL;
|
|
distributor = NULL;
|
|
fposHash = NULL;
|
|
fposHash = NULL;
|
|
@@ -155,7 +157,7 @@ public:
|
|
e->top = e->base + part.queryProperties().getPropInt64("@size");
|
|
e->top = e->base + part.queryProperties().getPropInt64("@size");
|
|
e->index = f;
|
|
e->index = f;
|
|
|
|
|
|
- Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(owner, part);
|
|
|
|
|
|
+ Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(owner, logicalFilename, part);
|
|
e->file = lfile.getClear();
|
|
e->file = lfile.getClear();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -262,9 +264,9 @@ public:
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
-IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp)
|
|
|
|
|
|
+IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp)
|
|
{
|
|
{
|
|
- return new CFetchStream(owner, keyRowIf, fetchRowIf, abortSoon, parts, offsetCount, offsetMapSz, offsetMap, iFetchHandler, tag, eexp);
|
|
|
|
|
|
+ return new CFetchStream(owner, keyRowIf, fetchRowIf, abortSoon, logicalFilename, parts, offsetCount, offsetMapSz, offsetMap, iFetchHandler, tag, eexp);
|
|
}
|
|
}
|
|
|
|
|
|
class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
|
|
class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
|
|
@@ -456,7 +458,7 @@ public:
|
|
}
|
|
}
|
|
|
|
|
|
Owned<IThorRowInterfaces> rowIf = createRowInterfaces(queryRowMetaData());
|
|
Owned<IThorRowInterfaces> rowIf = createRowInterfaces(queryRowMetaData());
|
|
- fetchStream = createFetchStream(*this, keyInIf, rowIf, abortSoon, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp);
|
|
|
|
|
|
+ fetchStream = createFetchStream(*this, keyInIf, rowIf, abortSoon, fetchContext->getFileName(), parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp);
|
|
fetchStreamOut = fetchStream->queryOutput();
|
|
fetchStreamOut = fetchStream->queryOutput();
|
|
fetchStream->start(keyIn);
|
|
fetchStream->start(keyIn);
|
|
initializeFileParts();
|
|
initializeFileParts();
|