瀏覽代碼

HPCC-19565 Remote disk read support in Thor

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 年之前
父節點
當前提交
45c41b701d

+ 7 - 16
common/remote/sockfile.cpp

@@ -66,7 +66,6 @@
 
 static const unsigned __int64 defaultFileStreamChooseN = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
 static const unsigned __int64 defaultFileStreamSkipN = 0;
-static const unsigned __int64 defaultFileStreamRowLimit = (unsigned __int64) -1;
 static const unsigned defaultDaFSNumRecs = 100;
 enum OutputFormat:byte { outFmt_Binary, outFmt_Xml, outFmt_Json };
 
@@ -1790,7 +1789,7 @@ public:
     IMPLEMENT_IINTERFACE;
     // Really a stream, but life (maybe) easier elsewhere if looks like a file
     // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
-    CRemoteFilteredFileIO(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped)
+    CRemoteFilteredFileIO(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseNLimit)
     : CRemoteBase(ep, filename)
     {
         // NB: input_grouped == output_grouped for now, but may want output to be ungrouped
@@ -1802,6 +1801,8 @@ public:
             " \"compressed\" : \"%s\",\n"
             " \"input_grouped\" : \"%s\",\n"
             " \"output_grouped\" : \"%s\"", filename, boolToStr(compressed), boolToStr(grouped), boolToStr(grouped));
+        if (chooseNLimit)
+            request.appendf(",\n \"choosen\" : \"%" I64F "u\"", chooseNLimit);
         if (fieldFilters.numFilterFields())
         {
             request.append(",\n \"keyfilter\" : [\n  ");
@@ -1952,7 +1953,7 @@ class CRemoteFilteredRowStream : public CRemoteFilteredFileIO, implements IRowSt
 {
 public:
     CRemoteFilteredRowStream(const RtlRecord &_recInfo, SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped)
-    : CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped), recInfo(_recInfo)
+    : CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, 0), recInfo(_recInfo)
     {}
     virtual const byte *queryNextRow()  // NOTE - rows returned must NOT be freed
     {
@@ -1974,11 +1975,11 @@ protected:
     const RtlRecord &recInfo;
 };
 
-extern IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped)
+extern IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseNLimit)
 {
     try
     {
-        return new CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped);
+        return new CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, chooseNLimit);
     }
     catch (IException *e)
     {
@@ -3804,7 +3805,6 @@ class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>, impl
     Linked<IOutputMetaData> outMeta;
     Owned<const IDynamicTransform> translator;
     unsigned __int64 chooseN = 0;
-    unsigned __int64 limit = 0;
     unsigned __int64 processed = 0;
     unsigned __int64 startPos = 0;
     bool compressed = false;
@@ -3862,7 +3862,6 @@ class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>, impl
         prefetcher.setown(helper->queryDiskRecordSize()->createDiskPrefetcher());
 
         chooseN = helper->getChooseNLimit();
-        limit = helper->getRowLimit();
         opened = true;
     }
     void close()
@@ -3933,13 +3932,6 @@ public:
 
                 if (rowSz)
                 {
-                    if (processed >= limit)
-                    {
-                        helper->onLimitExceeded();
-                        eogPending = false;
-                        someInGroup = false;
-                        return nullptr;
-                    }
                     processed++;
                     eogPending = eog;
                     someInGroup = true;
@@ -4017,7 +4009,6 @@ IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
     const char *fileName = actNode.queryProp("fileName");
     unsigned __int64 chooseN = actNode.getPropInt64("choosen", defaultFileStreamChooseN);
     unsigned __int64 skipN = actNode.getPropInt64("skipN", defaultFileStreamSkipN);
-    unsigned __int64 rowLimit = actNode.getPropInt64("rowLimit", defaultFileStreamRowLimit);
     bool compressed = actNode.getPropBool("compressed");
 
     bool inputGrouped = actNode.getPropBool("input_grouped", false);
@@ -4028,7 +4019,7 @@ IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
     Owned<IOutputMetaData> outMeta = getTypeInfoOutputMetaData(actNode, "output", outputGrouped);
     if (!outMeta)
         outMeta.set(inMeta);
-    Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, LINK(inMeta), LINK(outMeta), LINK(outMeta), chooseN, skipN, rowLimit);
+    Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, LINK(inMeta), LINK(outMeta), LINK(outMeta), chooseN, skipN, (unsigned __int64)-1);
     Owned<CRemoteDiskReadActivity> ret = new CRemoteDiskReadActivity(*helper, compressed, inputGrouped, outputGrouped);
     Owned<IPropertyTreeIterator> filterIter = actNode.getElements("keyfilter");
     ForEach(*filterIter)

+ 1 - 1
common/remote/sockfile.hpp

@@ -80,7 +80,7 @@ extern REMOTE_API ISocket *connectDafs(SocketEndpoint &ep, unsigned timeoutms);
 extern REMOTE_API ISocket *checkSocketSecure(ISocket *socket);
 class IOutputMetaData;
 class RowFilter;
-extern REMOTE_API IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped);
+extern REMOTE_API IFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseNLimit);
 
 
 // client only

+ 17 - 5
common/thorhelper/thorcommon.cpp

@@ -1054,13 +1054,12 @@ protected:
                 return nullptr;
             currentRowOffset = prefetchBuffer.tell();
             prefetcher->readAhead(prefetchBuffer);
-            const byte * row = prefetchBuffer.queryRow();
-            bool matched = fieldFilterMatch(row);
+            bool matched = fieldFilterMatch(prefetchBuffer.queryRow());
             checkEog();
             if (matched) // NB: prefetchDone() call must be paired with a row returned from prefetchRow()
             {
                 hadMatchInGroup = true;
-                return row;
+                return prefetchBuffer.queryRow(); // NB: buffer ptr could have changed due to reading eog byte
             }
             else
                 prefetchBuffer.finishedRow();
@@ -1342,6 +1341,15 @@ public:
 unsigned CRowStreamReader::rdnum;
 #endif
 
+IExtRowStream *createRowStreamEx(IFileIO *fileIO, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, ITranslator *translatorContainer, IVirtualFieldCallback * fieldCallback)
+{
+    EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(rwFlags);
+    if (maxrows == (unsigned __int64)-1)
+        return new CRowStreamReader(fileIO, NULL, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
+    else
+        return new CLimitedRowStreamReader(fileIO, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
+}
+
 bool UseMemoryMappedRead = false;
 
 IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp, ITranslator *translatorContainer, IVirtualFieldCallback * fieldCallback)
@@ -2058,8 +2066,12 @@ static bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<con
                 {
                     throw MakeStringException(0, "Translatable record layout mismatch detected for file %s, but translation disabled", tracing);
                 }
-                if (keyedTranslator)
-                    keyedTranslator->setown(createKeyTranslator(sourceFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true)));
+                if (keyedTranslator && (sourceFormat != expectedFormat))
+                {
+                    Owned<const IKeyTranslator> _keyedTranslator = createKeyTranslator(sourceFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true));
+                    if (_keyedTranslator->needsTranslate())
+                        keyedTranslator->swap(_keyedTranslator);
+                }
             }
             else
                 translator.clear();

+ 1 - 0
common/thorhelper/thorcommon.hpp

@@ -195,6 +195,7 @@ interface ITranslator : extends IInterface
     virtual const IKeyTranslator *queryKeyedTranslator() const = 0;
 };
 interface IExpander;
+extern THORHELPER_API IExtRowStream *createRowStreamEx(IFileIO *fileIO, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback=nullptr);
 extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback=nullptr);
 extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback = nullptr);
 interface ICompressor;

+ 6 - 1
ecl/hthor/hthor.cpp

@@ -8281,7 +8281,8 @@ bool CHThorDiskReadBaseActivity::openNext()
                         setDafsEndpointPort(ep);
                         StringBuffer path;
                         rfilename.getLocalPath(path);
-                        inputfileio.setown(createRemoteFilteredFile(ep, path, actualDiskMeta, projectedDiskMeta, actualFilter, compressed, grouped));
+
+                        inputfileio.setown(createRemoteFilteredFile(ep, path, actualDiskMeta, projectedDiskMeta, actualFilter, compressed, grouped, remoteLimit));
                         if (inputfileio)
                         {
                             actualDiskMeta.set(projectedDiskMeta);
@@ -8539,6 +8540,8 @@ void CHThorDiskReadActivity::ready()
     if (helper.getFlags() & TDRlimitskips)
         limit = (unsigned __int64) -1;
     stopAfter = helper.getChooseNLimit();
+    if (!helper.transformMayFilter())
+        remoteLimit = stopAfter;
 }
 
 
@@ -8849,6 +8852,8 @@ void CHThorDiskCountActivity::ready()
     PARENT::ready(); 
     finished = false;
     stopAfter = helper.getChooseNLimit();
+    if (!helper.hasFilter())
+        remoteLimit = stopAfter;
 }
 
 void CHThorDiskCountActivity::gatherInfo(IFileDescriptor * fd)

+ 3 - 5
ecl/hthor/hthor.ipp

@@ -2254,6 +2254,8 @@ protected:
     bool grouped;
     enum ReadType:byte { rt_unknown, rt_binary, rt_csv, rt_xml } readType = rt_unknown;
 
+    unsigned __int64 stopAfter = 0;
+    unsigned __int64 remoteLimit = 0;
     unsigned __int64 localOffset;
     unsigned __int64 offsetOfPart;
     StringBuffer mangledHelperFileName;
@@ -2265,6 +2267,7 @@ protected:
     IPointerArrayOf<IOutputMetaData> actualLayouts;  // Do we need to keep more than one?
     IConstArrayOf<IFieldFilter> fieldFilters;  // These refer to the expected layout
     RowFilter actualFilter;               // This refers to the actual disk layout
+
     void close();
     virtual void open();
     void resolve();
@@ -2358,7 +2361,6 @@ protected:
     unsigned __int64 lastGroupProcessed;
     RtlDynamicRowBuilder outBuilder;
     unsigned __int64 limit;
-    unsigned __int64 stopAfter;
 
 public:
     CHThorDiskReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &_arg, ThorActivityKind _kind);
@@ -2396,7 +2398,6 @@ protected:
     size32_t            maxDiskSize;
     CSVSplitter         csvSplitter;    
     unsigned __int64 limit;
-    unsigned __int64 stopAfter;
     size32_t maxRowSize;
 };
 
@@ -2436,7 +2437,6 @@ protected:
     Owned<IXMLParse> xmlParser;
     Owned<IColumnProvider> lastMatch;
     unsigned __int64 limit;
-    unsigned __int64 stopAfter;
 };
 
 
@@ -2447,7 +2447,6 @@ protected:
     IHThorDiskNormalizeArg &helper;
     RtlDynamicRowBuilder outBuilder;
     unsigned __int64 limit;
-    unsigned __int64 stopAfter;
     size32_t lastSizeRead;
     bool expanding;
 
@@ -2494,7 +2493,6 @@ class CHThorDiskCountActivity : public CHThorBinaryDiskReadBase
 protected:
     IHThorDiskCountArg &helper;
     bool finished;
-    unsigned __int64 stopAfter;
 
     virtual void gatherInfo(IFileDescriptor * fileDesc);
 

+ 1 - 2
rtl/eclrtl/eclhelper_dyn.cpp

@@ -382,8 +382,7 @@ extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
     const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
     unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
     unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
-    unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
-    Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), LINK(out), LINK(out), chooseN, skipN, rowLimit);
+    Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), LINK(out), LINK(out), chooseN, skipN, (unsigned __int64) -1);
     Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
     ForEach(*filters)
         ret->addFilter(filters->query().queryProp("@value"));

+ 93 - 31
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -25,10 +25,14 @@
 #include "jtime.hpp"
 
 #include "dafdesc.hpp"
+#include "rtlcommon.hpp"
 #include "rtlkey.hpp"
 #include "rtlrecord.hpp"
 #include "eclhelper.hpp" // tmp for IHThor..Arg interfaces.
 
+#include "rmtfile.hpp"
+#include "sockfile.hpp"
+
 #include "thormisc.hpp"
 #include "thmfilemanager.hpp"
 #include "thorport.hpp"
@@ -56,6 +60,9 @@ protected:
     unsigned numSegFieldsUsed = 0;
     bool needTransform = false;
     rowcount_t totalProgress = 0;
+    rowcount_t stopAfter = 0;
+    rowcount_t remoteLimit = 0;
+    rowcount_t limit = 0;
 
     // return a ITranslator based on published format in part and expected/format
     ITranslator *getTranslators(IPartDescriptor &partDesc)
@@ -239,8 +246,7 @@ void CDiskRecordPartHandler::getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescri
 
 void CDiskRecordPartHandler::open()
 {
-    CDiskPartHandlerBase::open();
-
+    // free last part and note progress
     Owned<IExtRowStream> partStream;
     {
         CriticalBlock block(statsCs);
@@ -250,46 +256,96 @@ void CDiskRecordPartHandler::open()
         activity.totalProgress += partStream->queryProgress();
     partStream.clear();
 
-    unsigned rwFlags = DEFAULT_RWFLAGS;
+    unsigned rwFlags = 0;
     if (checkFileCrc) // NB: if compressed, this will be turned off by base class
         rwFlags |= rw_crc;
     if (activity.grouped)
         rwFlags |= rw_grouped;
 
+    IOutputMetaData *projectedFormat = activity.helper->queryProjectedDiskRecordSize();
+    IOutputMetaData *expectedFormat = activity.helper->queryDiskRecordSize();
     Owned<ITranslator> translator = activity.getTranslators(*partDesc);
-    if (compressed)
+    IOutputMetaData *actualFormat = translator ? &translator->queryActualFormat() : expectedFormat;
+    bool canSerializeTypeInfo = actualFormat->queryTypeInfo()->canSerialize() && projectedFormat->queryTypeInfo()->canSerialize();
+    if (canSerializeTypeInfo)
     {
-        rwFlags |= rw_compress;
-        partStream.setown(createRowStream(iFile, activity.queryProjectedDiskRowInterfaces(), rwFlags, activity.eexp, translator, this));
-        if (!partStream.get())
+        for (unsigned copy=0; copy<partDesc->numCopies(); copy++)
         {
-            if (!blockCompressed)
-                throw MakeStringException(-1,"Unsupported compressed file format: %s", filename.get());
-            else
-                throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
+            RemoteFilename rfn;
+            partDesc->getFilename(copy, rfn);
+
+            StringBuffer path;
+            if (!rfn.isLocal() || testForceRemote(rfn.getLocalPath(path)))
+            {
+                // Open a stream from remote file, having passed actual, expected, projected, and filters to it
+                SocketEndpoint ep(rfn.queryEndpoint());
+                setDafsEndpointPort(ep);
+
+                RowFilter actualFilter;
+                if (activity.fieldFilters.ordinality())
+                {
+                    if (actualFormat != expectedFormat && translator->queryKeyedTranslator())
+                        translator->queryKeyedTranslator()->translate(actualFilter, activity.fieldFilters);
+                    else
+                        actualFilter.appendFilters(activity.fieldFilters);
+                }
+                Owned<IFileIO> iFileIO = createRemoteFilteredFile(ep, path, actualFormat, projectedFormat, actualFilter, compressed, activity.grouped, activity.remoteLimit);
+                if (iFileIO)
+                {
+                    rfn.getPath(path);
+                    filename.set(path);
+                    checkFileCrc = false;
+
+                    // JCSMORE - needTransform - see CDiskPartHandler::nextRow(), may need/want to differentiate if only transform is only remote
+
+                    partStream.setown(createRowStreamEx(iFileIO, activity.queryProjectedDiskRowInterfaces(), 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, nullptr, this));
+                    ActPrintLog(&activity, "%s[part=%d]: reading remote dafilesrv file '%s' (logical file = %s)", kindStr, which, path.str(), activity.logicalFilename.get());
+                    break;
+                }
+            }
         }
     }
-    else
-        partStream.setown(createRowStream(iFile, activity.queryProjectedDiskRowInterfaces(), rwFlags, nullptr, translator, this));
 
     if (!partStream)
-        throw MakeActivityException(&activity, 0, "Failed to open file '%s'", filename.get());
-    ActPrintLog(&activity, "%s[part=%d]: %s (%s)", kindStr, which, activity.isFixedDiskWidth ? "fixed" : "variable", filename.get());
-    if (activity.isFixedDiskWidth) 
     {
-        if (!compressed || blockCompressed)
+        CDiskPartHandlerBase::open(); // NB: base opens an IFile
+
+        rwFlags |= DEFAULT_RWFLAGS;
+
+        if (compressed)
         {
-            unsigned fixedSize = activity.diskRowMinSz;
-            if (partDesc->queryProperties().hasProp("@size"))
+            rwFlags |= rw_compress;
+            partStream.setown(createRowStream(iFile, activity.queryProjectedDiskRowInterfaces(), rwFlags, activity.eexp, translator, this));
+            if (!partStream.get())
             {
-                offset_t lsize = partDesc->queryProperties().getPropInt64("@size");
-                if (0 != lsize % fixedSize)
-                    throw MakeActivityException(&activity, TE_BadFileLength, "Fixed length file %s [DFS size=%" I64F "d] is not a multiple of fixed record size : %d", filename.get(), lsize, fixedSize);
+                if (!blockCompressed)
+                    throw MakeStringException(-1,"Unsupported compressed file format: %s", filename.get());
+                else
+                    throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
             }
         }
-    }
+        else
+            partStream.setown(createRowStream(iFile, activity.queryProjectedDiskRowInterfaces(), rwFlags, nullptr, translator, this));
 
-    partStream->setFilters(activity.fieldFilters);
+        if (!partStream)
+            throw MakeActivityException(&activity, 0, "Failed to open file '%s'", filename.get());
+        ActPrintLog(&activity, "%s[part=%d]: %s (%s)", kindStr, which, activity.isFixedDiskWidth ? "fixed" : "variable", filename.get());
+        if (activity.isFixedDiskWidth)
+        {
+            if (!compressed || blockCompressed)
+            {
+                unsigned fixedSize = activity.diskRowMinSz;
+                if (partDesc->queryProperties().hasProp("@size"))
+                {
+                    offset_t lsize = partDesc->queryProperties().getPropInt64("@size");
+                    if (0 != lsize % fixedSize)
+                        throw MakeActivityException(&activity, TE_BadFileLength, "Fixed length file %s [DFS size=%" I64F "d] is not a multiple of fixed record size : %d", filename.get(), lsize, fixedSize);
+                }
+            }
+        }
+
+        partStream->setFilters(activity.fieldFilters);
+    }
 
     {
         CriticalBlock block(statsCs);
@@ -436,8 +492,6 @@ public:
 
 public:
     bool unsorted = false, countSent = false;
-    rowcount_t limit = 0;
-    rowcount_t stopAfter = 0;
     IRowStream *out = nullptr;
 
     IHThorDiskReadArg *helper;
@@ -516,6 +570,12 @@ public:
         else
             limit = (rowcount_t)helper->getRowLimit();
         stopAfter = (rowcount_t)helper->getChooseNLimit();
+        if (!helper->transformMayFilter())
+        {
+            remoteLimit = stopAfter;
+            if (limit && (limit < remoteLimit))
+                remoteLimit = limit+1; // 1 more to ensure triggered when received back. // JCSMORE remote side could handle skip too..
+        }
         out = createSequentialPartHandler(partHandler, partDescs, grouped); // **
     }
     virtual bool isGrouped() const override { return grouped; }
@@ -542,7 +602,8 @@ public:
         rowcount_t c = getDataLinkCount();
         if (stopAfter && (c >= stopAfter))  // NB: only slave limiter, global performed in chained choosen activity
             return NULL;
-        if (c >= limit) { // NB: only slave limiter, global performed in chained limit activity
+        if (c >= limit) // NB: only slave limiter, global performed in chained limit activity
+        {
             helper->onLimitExceeded();
             return NULL;
         }
@@ -627,8 +688,6 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord
     };
 
     IHThorDiskNormalizeArg *helper;
-    rowcount_t limit = 0;
-    rowcount_t stopAfter = 0;
     IRowStream *out = nullptr;
 
 public:
@@ -698,7 +757,8 @@ public:
         rowcount_t c = getDataLinkCount();
         if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity
             return NULL;
-        if (c >= limit) { // NB: only slave limiter, global performed in chained limit activity
+        if (c >= limit) // NB: only slave limiter, global performed in chained limit activity
+        {
             helper->onLimitExceeded();
             return NULL;
         }
@@ -872,7 +932,7 @@ class CDiskCountSlave : public CDiskReadSlaveActivityRecord
     typedef CDiskReadSlaveActivityRecord PARENT;
 
     IHThorDiskCountArg *helper;
-    rowcount_t stopAfter = 0, preknownTotalCount = 0;
+    rowcount_t preknownTotalCount = 0;
     bool eoi = false, totalCountKnown = false;
 
 public:
@@ -914,6 +974,8 @@ public:
         ActivityTimer s(totalCycles, timeActivities);
         CDiskReadSlaveActivityRecord::start();
         stopAfter = (rowcount_t)helper->getChooseNLimit();
+        if (!helper->hasFilter())
+            remoteLimit = stopAfter;
         eoi = false;
         if (!helper->canMatchAny())
         {