|
@@ -1119,10 +1119,21 @@ protected:
|
|
|
bool eofSeen = false;
|
|
|
const RtlRecord *record = nullptr;
|
|
|
RowFilter filters;
|
|
|
+ RtlDynRow *filterRow = nullptr;
|
|
|
// virtual field values
|
|
|
StringAttr logicalFilename;
|
|
|
unsigned numInputFields = 0;
|
|
|
|
|
|
+ inline bool fieldFilterMatch(const void * buffer)
|
|
|
+ {
|
|
|
+ if (filterRow)
|
|
|
+ {
|
|
|
+ filterRow->setRow(buffer, filters.getNumFieldsRequired());
|
|
|
+ return filters.matches(*filterRow);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ return true;
|
|
|
+ }
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(PARENT);
|
|
|
|
|
@@ -1133,11 +1144,23 @@ public:
|
|
|
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteDiskBaseActivity: fileName missing");
|
|
|
logicalFilename.set(config.queryProp("virtualFields/logicalFilename"));
|
|
|
}
|
|
|
+ ~CRemoteDiskBaseActivity()
|
|
|
+ {
|
|
|
+ delete filterRow;
|
|
|
+ }
|
|
|
void setupInputMeta(const IPropertyTree &config, IOutputMetaData *_inMeta)
|
|
|
{
|
|
|
inMeta.setown(_inMeta);
|
|
|
record = &inMeta->queryRecordAccessor(true);
|
|
|
numInputFields = record->getNumFields();
|
|
|
+
|
|
|
+ if (config.hasProp("keyFilter"))
|
|
|
+ {
|
|
|
+ filterRow = new RtlDynRow(*record);
|
|
|
+ Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
|
|
|
+ ForEach(*filterIter)
|
|
|
+ filters.addFilter(*record, filterIter->query().queryProp(nullptr));
|
|
|
+ }
|
|
|
}
|
|
|
// IRemoteReadActivity impl.
|
|
|
virtual unsigned __int64 queryProcessed() const override
|
|
@@ -1199,7 +1222,6 @@ protected:
|
|
|
unsigned __int64 startPos = 0;
|
|
|
bool compressed = false;
|
|
|
bool cursorDirty = false;
|
|
|
- RtlDynRow *filterRow = nullptr;
|
|
|
// virtual field values
|
|
|
unsigned partNum = 0;
|
|
|
offset_t baseFpos = 0;
|
|
@@ -1259,16 +1281,6 @@ protected:
|
|
|
opened = false;
|
|
|
eofSeen = true;
|
|
|
}
|
|
|
- inline bool fieldFilterMatch(const void * buffer)
|
|
|
- {
|
|
|
- if (filterRow)
|
|
|
- {
|
|
|
- filterRow->setRow(buffer, 0);
|
|
|
- return filters.matches(*filterRow);
|
|
|
- }
|
|
|
- else
|
|
|
- return true;
|
|
|
- }
|
|
|
public:
|
|
|
CRemoteStreamReadBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
|
|
|
{
|
|
@@ -1278,21 +1290,6 @@ public:
|
|
|
partNum = config.getPropInt("virtualFields/partNum");
|
|
|
baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
|
|
|
}
|
|
|
- ~CRemoteStreamReadBaseActivity()
|
|
|
- {
|
|
|
- delete filterRow;
|
|
|
- }
|
|
|
- void setupInputMeta(const IPropertyTree &config, IOutputMetaData *inMeta)
|
|
|
- {
|
|
|
- PARENT::setupInputMeta(config, inMeta);
|
|
|
- if (config.hasProp("keyFilter"))
|
|
|
- {
|
|
|
- filterRow = new RtlDynRow(*record);
|
|
|
- Owned<IPropertyTreeIterator> filterIter = config.getElements("keyFilter");
|
|
|
- ForEach(*filterIter)
|
|
|
- filters.addFilter(*record, filterIter->query().queryProp(nullptr));
|
|
|
- }
|
|
|
- }
|
|
|
// IVirtualFieldCallback impl.
|
|
|
virtual unsigned __int64 getFilePosition(const void * row) override
|
|
|
{
|
|
@@ -2057,20 +2054,23 @@ public:
|
|
|
while (keyManager->lookup(true))
|
|
|
{
|
|
|
const byte *keyRow = keyManager->queryKeyBuffer();
|
|
|
- if (translator)
|
|
|
- retSz = translator->translate(outBuilder, *this, keyRow);
|
|
|
- else
|
|
|
+ if (fieldFilterMatch(keyRow))
|
|
|
{
|
|
|
- retSz = keyManager->queryRowSize();
|
|
|
- outBuilder.ensureCapacity(retSz, nullptr);
|
|
|
- memcpy(outBuilder.getSelf(), keyRow, retSz);
|
|
|
- }
|
|
|
- dbgassertex(retSz);
|
|
|
+ if (translator)
|
|
|
+ retSz = translator->translate(outBuilder, *this, keyRow);
|
|
|
+ else
|
|
|
+ {
|
|
|
+ retSz = keyManager->queryRowSize();
|
|
|
+ outBuilder.ensureCapacity(retSz, nullptr);
|
|
|
+ memcpy(outBuilder.getSelf(), keyRow, retSz);
|
|
|
+ }
|
|
|
+ dbgassertex(retSz);
|
|
|
|
|
|
- const void *ret = outBuilder.getSelf();
|
|
|
- outBuilder.finishRow(retSz);
|
|
|
- ++processed;
|
|
|
- return ret;
|
|
|
+ const void *ret = outBuilder.getSelf();
|
|
|
+ outBuilder.finishRow(retSz);
|
|
|
+ ++processed;
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
}
|
|
|
retSz = 0;
|
|
|
}
|