Procházet zdrojové kódy

HPCC-22278 Introduce a new hthor disk read activity

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday před 6 roky
rodič
revize
1b07322b8c

+ 3 - 0
common/thorhelper/CMakeLists.txt

@@ -33,6 +33,7 @@ set (    SRCS
          thorfile.cpp 
          thorparse.cpp 
          thorpipe.cpp 
+         thorread.cpp
          thorrparse.cpp 
          thorsoapcall.cpp 
          thorstats.cpp
@@ -54,6 +55,7 @@ set (    SRCS
          thorfile.hpp 
          thorparse.hpp 
          thorpipe.hpp 
+         thorread.hpp
          thorrparse.hpp 
          thorsoapcall.hpp 
          thorstats.hpp
@@ -96,6 +98,7 @@ include_directories (
          ./../../testing/unittests
          ./../../system/tbb_sm/tbb/include
          ./../../system/security/shared
+         ${HPCC_SOURCE_DIR}/fs/dafsclient
     )
 
 ADD_DEFINITIONS( -DTHORHELPER_EXPORTS -D_USRDLL )

+ 1 - 0
common/thorhelper/commonext.cpp

@@ -211,6 +211,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKspillread] = "spillread" ;
     kindArray[TAKspillwrite] = "spillwrite" ;
     kindArray[TAKnwaydistribute] = "nwaydistribute";
+    kindArray[TAKnewdiskread] = "newdiskread";
 
 //Non standard
     kindArray[TAKsubgraph] = "subgraph";

+ 2 - 0
common/thorhelper/thorcommon.cpp

@@ -819,6 +819,7 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKspillread:              return "Spill Read";
     case TAKspillwrite:             return "Spill Write";
     case TAKnwaydistribute:         return "Nway Distribute";
+    case TAKnewdiskread:            return "Disk Read";
     }
     throwUnexpected();
 }
@@ -2097,6 +2098,7 @@ static bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<con
                 if (keyedTranslator && (sourceFormat != expectedFormat))
                 {
                     Owned<const IKeyTranslator> _keyedTranslator = createKeyTranslator(sourceFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true));
+                    //MORE: What happens if the key filters cannot be translated?
                     if (_keyedTranslator->needsTranslate())
                         keyedTranslator->swap(_keyedTranslator);
                 }

+ 805 - 0
common/thorhelper/thorread.cpp

@@ -0,0 +1,805 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "jliball.hpp"
+
+#include "thorfile.hpp"
+
+#include "eclhelper.hpp"
+#include "eclrtl.hpp"
+#include "eclrtl_imp.hpp"
+#include "rtlfield.hpp"
+#include "rtlds_imp.hpp"
+#include "rtldynfield.hpp"
+
+#include "rmtclient.hpp"
+#include "rmtfile.hpp"
+
+#include "thorread.hpp"
+#include "rtlcommon.hpp"
+#include "thorcommon.hpp"
+
+//---------------------------------------------------------------------------------------------------------------------
+
+constexpr size32_t defaultReadBufferSize = 0x10000;
+
+class DiskRowReader : extends CInterfaceOf<IAllocRowStream>, implements IRawRowStream, implements IDiskRowReader, implements IThorDiskCallback
+{
+public:
+    DiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+    IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IAllocRowStream>)
+
+    virtual IRawRowStream * queryRawRowStream() override;
+    virtual IAllocRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override;
+
+    virtual bool getCursor(MemoryBuffer & cursor) override;
+    virtual void setCursor(MemoryBuffer & cursor) override;
+    virtual void stop() override;
+
+    virtual void clearInput() override;
+    virtual bool matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options) override;
+
+// IThorDiskCallback
+    virtual offset_t getFilePosition(const void * row) override;
+    virtual offset_t getLocalFilePosition(const void * row) override;
+    virtual const char * queryLogicalFilename(const void * row) override;
+    virtual const byte * lookupBlob(unsigned __int64 id) override { UNIMPLEMENTED; }
+
+
+protected:
+    offset_t getLocalOffset();
+
+protected:
+    Owned<ISerialStream> input;
+    Owned<IFileIO> inputfileio;
+    CThorContiguousRowBuffer inputBuffer;
+    Owned<IEngineRowAllocator> outputAllocator;
+    Owned<const IDynamicTransform> translator;
+    Owned<const IKeyTranslator> keyedTranslator;
+    Linked<IOutputMetaData> expectedDiskMeta = nullptr;
+    Linked<IOutputMetaData> projectedDiskMeta = nullptr;
+    Linked<IOutputMetaData> actualDiskMeta = nullptr;
+    unsigned expectedCrc = 0;
+    unsigned projectedCrc = 0;
+    unsigned actualCrc = 0;
+    MemoryBuffer encryptionKey;
+    size32_t readBufferSize = defaultReadBufferSize;
+    bool grouped = false;
+    bool stranded = false;
+    bool compressed = false;
+    bool blockcompressed = false;
+    bool rowcompressed = false;
+
+//The following refer to the current input file:
+    offset_t fileBaseOffset = 0;
+    StringAttr logicalFilename;
+    unsigned filePart = 0;
+};
+
+
+DiskRowReader::DiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+: expectedCrc(_expectedCrc), expectedDiskMeta(&_expected), projectedCrc(_projectedCrc), projectedDiskMeta(&_projected), actualCrc(_actualCrc), actualDiskMeta(&_actual)
+{
+    assertex(options);
+
+    //Not sure this should really be being passed in here...
+    RecordTranslationMode translationMode = (RecordTranslationMode)options->getPropInt("translationMode", (int)RecordTranslationMode::All);
+    //MORE: HPCC-22287 This is too late to be able to do something different if the keyed filters cannot be translated.
+    getTranslators(translator, keyedTranslator, "BinaryDiskRowReader", expectedCrc, expectedDiskMeta, actualCrc, actualDiskMeta, projectedCrc, projectedDiskMeta, translationMode);
+
+    //Options contain information that is the same for each file that is being read, and potentially expensive to reconfigure.
+    if (options->hasProp("encryptionKey"))
+    {
+        encryptionKey.resetBuffer();
+        options->getPropBin("encryptionKey", encryptionKey);
+    }
+    readBufferSize = options->getPropInt("readBufferSize", defaultReadBufferSize);
+}
+
+IRawRowStream * DiskRowReader::queryRawRowStream()
+{
+    return this;
+}
+
+IAllocRowStream * DiskRowReader::queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator)
+{
+    outputAllocator.set(_outputAllocator);
+    return this;
+}
+
+void DiskRowReader::clearInput()
+{
+    inputBuffer.setStream(nullptr);
+    input.clear();
+}
+
+bool DiskRowReader::matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if ((expectedCrc != _expectedCrc) || (projectedCrc != _projectedCrc) || (actualCrc != _actualCrc))
+        return false;
+
+    //MORE: Check translation mode
+
+    //MORE: Is the previous check sufficient?  If not, once getDaliLayoutInfo is cached the following line could be enabled.
+    //if ((expectedDiskMeta != &_expected) || (projectedDiskMeta != &_projected) || (actualDiskMeta != &_actual))
+    //    return false;
+
+    if (options->hasProp("encryptionKey"))
+    {
+        MemoryBuffer tempEncryptionKey;
+        options->getPropBin("encryptionKey", tempEncryptionKey);
+        if (!encryptionKey.matches(tempEncryptionKey))
+            return false;
+    }
+    if (readBufferSize != options->getPropInt("readBufferSize", defaultReadBufferSize))
+        return false;
+    return true;
+}
+
+
+bool DiskRowReader::getCursor(MemoryBuffer & cursor)
+{
+    return true;
+}
+
+void DiskRowReader::setCursor(MemoryBuffer & cursor)
+{
+}
+
+void DiskRowReader::stop()
+{
+}
+
+
+// IThorDiskCallback
+offset_t DiskRowReader::getFilePosition(const void * row)
+{
+    return getLocalOffset() + fileBaseOffset;
+}
+
+offset_t DiskRowReader::getLocalFilePosition(const void * row)
+{
+    return makeLocalFposOffset(filePart, getLocalOffset());
+}
+
+const char * DiskRowReader::queryLogicalFilename(const void * row)
+{
+    return logicalFilename;
+}
+
+offset_t DiskRowReader::getLocalOffset()
+{
+    return inputBuffer.tell();
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class LocalDiskRowReader : public DiskRowReader
+{
+public:
+    LocalDiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+
+    virtual bool matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options) override;
+    virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
+    virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
+
+protected:
+    virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter);
+
+protected:
+    IConstArrayOf<IFieldFilter> expectedFilter;  // These refer to the expected layout
+};
+
+
+LocalDiskRowReader::LocalDiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+: DiskRowReader(_expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options)
+{
+}
+
+bool LocalDiskRowReader::matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if (streamRemote)
+        return false;
+    return DiskRowReader::matches(format, streamRemote, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+}
+
+
+bool LocalDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & _expectedFilter)
+{
+    assertex(meta);
+    grouped = meta->getPropBool("grouped");
+    compressed = meta->getPropBool("compressed", false);
+    blockcompressed = meta->getPropBool("blockCompressed", false);
+    bool forceCompressed = meta->getPropBool("forceCompressed", false);
+
+    logicalFilename.set(_logicalFilename);
+    filePart = _partNumber;
+    fileBaseOffset = _baseOffset;
+
+    try
+    {
+        if (!inputFile->exists())
+            return false;
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e, "DiskReadStage::setInputFile()");
+        e->Release();
+        return false;
+    }
+
+    size32_t dfsRecordSize = meta->getPropInt("dfsRecordSize");
+    size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
+    if (dfsRecordSize)
+    {
+        if (fixedDiskRecordSize)
+        {
+            if (grouped)
+                fixedDiskRecordSize++;
+            if (!((dfsRecordSize == fixedDiskRecordSize) || (grouped && (dfsRecordSize+1 == fixedDiskRecordSize)))) //last for backwards compatibility, as hthor used to publish @recordSize not including the grouping byte
+                throw MakeStringException(0, "Published record size %d for file %s does not match coded record size %d", dfsRecordSize, logicalFilename.str(), fixedDiskRecordSize);
+
+            if (!compressed && forceCompressed && (fixedDiskRecordSize >= MIN_ROWCOMPRESS_RECSIZE))
+            {
+                StringBuffer msg;
+                msg.append("Ignoring compression attribute on file ").append(logicalFilename.str()).append(", which is not published as compressed");
+                WARNLOG("%s", msg.str());
+                //MORE: No simple way to do this, unless we are passed an engine context:
+                //agent.addWuException(msg.str(), WRN_MismatchCompressInfo, SeverityWarning, "hthor");
+                compressed = true;
+            }
+        }
+    }
+    else
+    {
+        if (!compressed && forceCompressed)
+        {
+            if ((fixedDiskRecordSize == 0) || (fixedDiskRecordSize + (grouped?1:0) >= MIN_ROWCOMPRESS_RECSIZE))
+                compressed = true;
+        }
+    }
+
+    rowcompressed = false;
+    if (compressed)
+    {
+        Owned<IExpander> eexp;
+        if (encryptionKey.length()!=0)
+            eexp.setown(createAESExpander256((size32_t)encryptionKey.length(),encryptionKey.bufferBase()));
+        inputfileio.setown(createCompressedFileReader(inputFile,eexp));
+        if(!inputfileio && !blockcompressed) //fall back to old decompression, unless dfs marked as new
+        {
+            inputfileio.setown(inputFile->open(IFOread));
+            if(inputfileio)
+                rowcompressed = true;
+        }
+    }
+    else
+        inputfileio.setown(inputFile->open(IFOread));
+    if (!inputfileio)
+        return false;
+
+    expectedFilter.clear();
+    ForEachItemIn(i, _expectedFilter)
+        expectedFilter.append(OLINK(_expectedFilter.item(i)));
+    return true;
+}
+
+bool LocalDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
+{
+    Owned<IFile> inputFile = createIFile(localFilename);
+    return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
+}
+
+bool LocalDiskRowReader::setInputFile(const RemoteFilename & filename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
+{
+    Owned<IFile> inputFile = createIFile(filename);
+    return setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter);
+}
+
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class BinaryDiskRowReader : public LocalDiskRowReader
+{
+public:
+    BinaryDiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+
+    virtual const void *nextRow() override;
+    virtual const void *nextRow(size32_t & resultSize) override;
+    virtual bool getCursor(MemoryBuffer & cursor) override;
+    virtual void setCursor(MemoryBuffer & cursor) override;
+    virtual void stop() override;
+
+    virtual void clearInput() override;
+    virtual bool matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options) override;
+
+protected:
+    virtual bool setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
+
+    inline bool segMonitorsMatch(const void * buffer)
+    {
+        if (actualFilter.numFilterFields())
+        {
+            const RtlRecord &actual = actualDiskMeta->queryRecordAccessor(true);
+            unsigned numOffsets = actual.getNumVarFields() + 1;
+            size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
+            RtlRow row(actual, nullptr, numOffsets, variableOffsets);
+            row.setRow(buffer, 0);  // Use lazy offset calculation
+            return actualFilter.matches(row);
+        }
+        else
+            return true;
+    }
+
+    size32_t getFixedDiskRecordSize();
+
+protected:
+    MemoryBuffer tempOutputBuffer;
+    MemoryBufferBuilder bufferBuilder;
+    ISourceRowPrefetcher * actualRowPrefetcher = nullptr;
+    RowFilter actualFilter;               // This refers to the actual disk layout
+    bool eogPending = false;
+    bool needToTranslate;
+};
+
+
+BinaryDiskRowReader::BinaryDiskRowReader(unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+: LocalDiskRowReader(_expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options), bufferBuilder(tempOutputBuffer, 0)
+{
+    actualRowPrefetcher = actualDiskMeta->createDiskPrefetcher();
+    needToTranslate = (translator && translator->needsTranslate());
+}
+
+
+void BinaryDiskRowReader::clearInput()
+{
+    DiskRowReader::clearInput();
+    eogPending = false;
+}
+
+bool BinaryDiskRowReader::matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if (!strieq(format, "thor") && !strieq(format, "flat"))
+        return false;
+    return LocalDiskRowReader::matches(format, streamRemote, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+}
+
+bool BinaryDiskRowReader::setInputFile(IFile * inputFile, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
+{
+    if (!LocalDiskRowReader::setInputFile(inputFile, _logicalFilename, _partNumber, _baseOffset, meta, expectedFilter))
+        return false;
+
+    actualFilter.clear();
+    actualFilter.appendFilters(expectedFilter);
+    if (keyedTranslator)
+        keyedTranslator->translate(actualFilter);
+
+    unsigned __int64 filesize = inputfileio->size();
+    if (!compressed && getFixedDiskRecordSize() && ((offset_t)-1 != filesize) && (filesize % getFixedDiskRecordSize()) != 0)
+    {
+        StringBuffer s;
+        s.append("File ").append(inputFile->queryFilename()).append(" size is ").append(filesize).append(" which is not a multiple of ").append(getFixedDiskRecordSize());
+        throw makeStringException(MSGAUD_user, 1, s.str());
+    }
+
+    //MORE: Allow a previously created input stream to be reused to avoid reallocating the buffer
+    input.setown(createFileSerialStream(inputfileio, 0, filesize, readBufferSize));
+
+    inputBuffer.setStream(input);
+    eogPending = false;
+    return true;
+}
+
+//Implementation of IAllocRowStream
+const void *BinaryDiskRowReader::nextRow()
+{
+    for (;;)
+    {
+        //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
+        //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
+        //Multiple eogs should also be harmless if the engines switch to this representation.
+        if (eogPending)
+        {
+            eogPending = false;
+            return eogRow;
+        }
+
+        inputBuffer.finishedRow();
+        if (inputBuffer.eos())
+            return eofRow;
+
+        //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
+        //Is there a better way storing this (and combining it with the eog markers)?
+        if (stranded)
+        {
+            bool eosPending;
+            inputBuffer.read(eosPending);
+            if (eosPending)
+                return eosRow;
+
+            //Call finishRow() so it is not included in the row pointer.  This should be special cased in the base class
+            inputBuffer.finishedRow();
+            if (inputBuffer.eos())
+                return eofRow;
+        }
+
+        actualRowPrefetcher->readAhead(inputBuffer);
+        size32_t sizeRead = inputBuffer.queryRowSize();
+        if (grouped)
+            inputBuffer.read(eogPending);
+        const byte * next = inputBuffer.queryRow();
+
+        if (likely(segMonitorsMatch(next))) // NOTE - keyed fields are checked pre-translation
+        {
+            if (needToTranslate)
+            {
+                RtlDynamicRowBuilder builder(outputAllocator);  // MORE: Make this into a member to reduce overhead
+                size32_t size = translator->translate(builder, *this, next);
+                return builder.finalizeRowClear(size);
+            }
+            else
+            {
+                size32_t allocatedSize;
+                void * result = outputAllocator->createRow(sizeRead, allocatedSize);
+                memcpy(result, next, sizeRead);
+                return outputAllocator->finalizeRow(sizeRead, result, allocatedSize);
+            }
+        }
+    }
+}
+
+
+//Implementation of IRawRowStream
+const void *BinaryDiskRowReader::nextRow(size32_t & resultSize)
+{
+    //Similar to above, except the code at the end will translate to a local buffer or return the pointer
+    for (;;)
+    {
+        //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
+        //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
+        //Multiple eogs should also be harmless if the engines switch to this representation.
+        if (eogPending)
+        {
+            eogPending = false;
+            return eogRow;
+        }
+
+        inputBuffer.finishedRow();
+        if (inputBuffer.eos())
+            return eofRow;
+
+        //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
+        //Is there a better way storing this (and combining it with the eog markers)?
+        if (stranded)
+        {
+            bool eosPending;
+            inputBuffer.read(eosPending);
+            if (eosPending)
+                return eosRow;
+
+            //Call finishRow() so it is not included in the row pointer.  This should be special cased in the base class
+            inputBuffer.finishedRow();
+            if (inputBuffer.eos())
+                return eofRow;
+        }
+
+        actualRowPrefetcher->readAhead(inputBuffer);
+        size32_t sizeRead = inputBuffer.queryRowSize();
+        if (grouped)
+            inputBuffer.read(eogPending);
+        const byte * next = inputBuffer.queryRow();
+
+        if (likely(segMonitorsMatch(next))) // NOTE - keyed fields are checked pre-translation
+        {
+            if (needToTranslate)
+            {
+                //MORE: optimize the case where fields are lost off the end, and not bother translating - but return the modified size.
+                tempOutputBuffer.clear();
+                resultSize = translator->translate(bufferBuilder, *this, next);
+                const void * ret = bufferBuilder.getSelf();
+                bufferBuilder.finishRow(resultSize);
+                return ret;
+            }
+            else
+            {
+                resultSize = sizeRead;
+                return next;
+            }
+        }
+    }
+}
+
+
+//Common to IAllocRowStream and IRawRowStream
+bool BinaryDiskRowReader::getCursor(MemoryBuffer & cursor)
+{
+    //Is the following needed?
+    inputBuffer.finishedRow();
+    //save position, eog if grouped, and anything else that is required.
+    return false;
+}
+
+void BinaryDiskRowReader::setCursor(MemoryBuffer & cursor)
+{
+}
+
+void BinaryDiskRowReader::stop()
+{
+}
+
+
+// IDiskRowReader
+
+size32_t BinaryDiskRowReader::getFixedDiskRecordSize()
+{
+    size32_t fixedDiskRecordSize = actualDiskMeta->getFixedSize();
+    if (fixedDiskRecordSize && grouped)
+        fixedDiskRecordSize += 1;
+    return fixedDiskRecordSize;
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+
+class RemoteDiskRowReader : public LocalDiskRowReader
+{
+public:
+    RemoteDiskRowReader(const char * _format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+
+    virtual const void *nextRow() override;
+    virtual const void *nextRow(size32_t & resultSize) override;
+    virtual bool getCursor(MemoryBuffer & cursor) override;
+    virtual void setCursor(MemoryBuffer & cursor) override;
+    virtual void stop() override;
+
+    virtual void clearInput() override;
+    virtual bool matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options) override;
+
+// IDiskRowReader
+    virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
+    virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) override;
+
+protected:
+    ISourceRowPrefetcher * projectedRowPrefetcher = nullptr;
+    StringAttr format;
+    RecordTranslationMode translationMode;
+    bool eogPending = false;
+};
+
+
+RemoteDiskRowReader::RemoteDiskRowReader(const char * _format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+: LocalDiskRowReader(_expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options), format(_format)
+{
+    translationMode = (RecordTranslationMode)options->getPropInt("translationMode", (int)RecordTranslationMode::All);
+    projectedRowPrefetcher = projectedDiskMeta->createDiskPrefetcher();
+}
+
+void RemoteDiskRowReader::clearInput()
+{
+    DiskRowReader::clearInput();
+    eogPending = false;
+}
+
+bool RemoteDiskRowReader::matches(const char * _format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if (!streamRemote)
+        return false;
+    if (!strieq(format, _format))
+        return false;
+    return LocalDiskRowReader::matches(format, streamRemote, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+}
+
+bool RemoteDiskRowReader::setInputFile(const RemoteFilename & rfilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilters)
+{
+    // NB: only binary handles can be remotely processed by dafilesrv at the moment
+
+    // Open a stream from remote file, having passed actual, expected, projected, and filters to it
+    SocketEndpoint ep(rfilename.queryEndpoint());
+    setDafsEndpointPort(ep);
+
+    StringBuffer localPath;
+    rfilename.getLocalPath(localPath);
+
+    RowFilter actualFilter;
+    actualFilter.appendFilters(expectedFilters);
+
+    if (keyedTranslator)
+        keyedTranslator->translate(actualFilter);
+
+    //MORE: This needs to be passed to this function - either in the meta or another parameter
+    unsigned __int64 remoteLimit = 0;
+    //MORE: Need to serialize the translation mode..
+    Owned<IRemoteFileIO> remoteFileIO = createRemoteFilteredFile(ep, localPath, actualDiskMeta, projectedDiskMeta, actualFilter, compressed, grouped, remoteLimit);
+    if (remoteFileIO)
+    {
+        StringBuffer tmp;
+        remoteFileIO->addVirtualFieldMapping("logicalFilename", _logicalFilename);
+        remoteFileIO->addVirtualFieldMapping("baseFpos", tmp.clear().append(_baseOffset).str());
+        remoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(_partNumber).str());
+
+        try
+        {
+            remoteFileIO->ensureAvailable(); // force open now, because want to failover to other copies or legacy if fails
+        }
+        catch (IException *e)
+        {
+    #ifdef _DEBUG
+            EXCLOG(e, nullptr);
+    #endif
+            e->Release();
+            return false;
+        }
+
+        Owned<IFile> iFile = createIFile(rfilename);
+
+        // remote side does projection/translation/filtering
+        inputfileio.setown(remoteFileIO.getClear());
+        if (!inputfileio)
+            return false;
+    }
+
+    //MORE: Allow a previously created input stream to be reused to avoid reallocating the buffer
+    input.setown(createFileSerialStream(inputfileio, 0, (offset_t)-1, readBufferSize));
+
+    inputBuffer.setStream(input);
+    eogPending = false;
+    return true;
+}
+
+bool RemoteDiskRowReader::setInputFile(const char * localFilename, const char * _logicalFilename, unsigned _partNumber, offset_t _baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter)
+{
+    throwUnexpected();
+}
+
+
+
+//Implementation of IAllocRowStream
+const void *RemoteDiskRowReader::nextRow()
+{
+    for (;;)
+    {
+        //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
+        //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
+        //Multiple eogs should also be harmless if the engines switch to this representation.
+        if (eogPending)
+        {
+            eogPending = false;
+            return eogRow;
+        }
+
+        inputBuffer.finishedRow();
+        if (inputBuffer.eos())
+            return eofRow;
+
+        //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
+        //Is there a better way storing this (and combining it with the eog markers)?
+        if (stranded)
+        {
+            bool eosPending;
+            inputBuffer.read(eosPending);
+            if (eosPending)
+                return eosRow;
+
+            //Call finishRow() so it is not included in the row pointer.  This should be special cased in the base class
+            inputBuffer.finishedRow();
+            if (inputBuffer.eos())
+                return eofRow;
+        }
+
+        projectedRowPrefetcher->readAhead(inputBuffer);
+        size32_t sizeRead = inputBuffer.queryRowSize();
+        if (grouped)
+            inputBuffer.read(eogPending);
+        const byte * next = inputBuffer.queryRow();
+
+        size32_t allocatedSize;
+        void * result = outputAllocator->createRow(sizeRead, allocatedSize);
+        memcpy(result, next, sizeRead);
+        return outputAllocator->finalizeRow(sizeRead, result, allocatedSize);
+    }
+}
+
+
+//Implementation of IRawRowStream
+const void *RemoteDiskRowReader::nextRow(size32_t & resultSize)
+{
+    //Similar to above, except the code at the end will translate to a local buffer or return the pointer
+    for (;;)
+    {
+        //This may return multiple eog in a row with no intervening records - e.g. if all stripped by keyed filter.
+        //It is up to the caller to filter duplicates (to avoid the overhead of multiple pieces of code checking)
+        //Multiple eogs should also be harmless if the engines switch to this representation.
+        if (eogPending)
+        {
+            eogPending = false;
+            return eogRow;
+        }
+
+        inputBuffer.finishedRow();
+        if (inputBuffer.eos())
+            return eofRow;
+
+        //Currently each row in a stranded file contains a flag to indicate if the next is an end of strand.
+        //Is there a better way storing this (and combining it with the eog markers)?
+        if (stranded)
+        {
+            bool eosPending;
+            inputBuffer.read(eosPending);
+            if (eosPending)
+                return eosRow;
+
+            //Call finishRow() so it is not included in the row pointer.  This should be special cased in the base class
+            inputBuffer.finishedRow();
+            if (inputBuffer.eos())
+                return eofRow;
+        }
+
+        projectedRowPrefetcher->readAhead(inputBuffer);
+        size32_t sizeRead = inputBuffer.queryRowSize();
+        if (grouped)
+            inputBuffer.read(eogPending);
+        const byte * next = inputBuffer.queryRow();
+        resultSize = sizeRead;
+        return next;
+    }
+}
+
+
+//Common to IAllocRowStream and IRawRowStream
+bool RemoteDiskRowReader::getCursor(MemoryBuffer & cursor)
+{
+    //Is the following needed?
+    inputBuffer.finishedRow();
+    //save position, eog if grouped, and anything else that is required.
+    return false;
+}
+
+void RemoteDiskRowReader::setCursor(MemoryBuffer & cursor)
+{
+}
+
+void RemoteDiskRowReader::stop()
+{
+}
+
+
+///---------------------------------------------------------------------------------------------------------------------
+
+
+IDiskRowReader * createLocalDiskReader(const char * format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if (strieq(format, "thor") || strieq(format, "flat"))
+        return new BinaryDiskRowReader(_expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+
+    UNIMPLEMENTED;
+}
+
+
+IDiskRowReader * createRemoteDiskReader(const char * format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    return new RemoteDiskRowReader(format, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+}
+
+IDiskRowReader * createDiskReader(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options)
+{
+    if (streamRemote)
+        return createRemoteDiskReader(format, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+    else
+        return createLocalDiskReader(format, _expectedCrc, _expected, _projectedCrc, _projected, _actualCrc, _actual, options);
+}

+ 68 - 0
common/thorhelper/thorread.hpp

@@ -0,0 +1,68 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef __THORREAD_HPP_
+#define __THORREAD_HPP_
+
+#ifdef THORHELPER_EXPORTS
+ #define THORHELPER_API DECL_EXPORT
+#else
+ #define THORHELPER_API DECL_IMPORT
+#endif
+
+#include "jrowstream.hpp"
+#include "rtlkey.hpp"
+
+typedef IConstArrayOf<IFieldFilter> FieldFilterArray;
+interface IRowReader : extends IInterface
+{
+public:
+    virtual IRawRowStream * queryRawRowStream() = 0;
+    virtual IAllocRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0;
+};
+
+interface ITranslator;
+interface IDiskRowReader : extends IRowReader
+{
+public:
+    virtual bool matches(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options) = 0;
+
+    //Specify where the raw binary input for a particular file is coming from, together with its actual format.
+    //Does this make sense, or should it be passed a filename?  an actual format?
+    //Needs to specify a filename rather than a ISerialStream so that the interface is consistent for local and remote
+    virtual void clearInput() = 0;
+    virtual bool setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) = 0;
+    virtual bool setInputFile(const RemoteFilename & filename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * meta, const FieldFilterArray & expectedFilter) = 0;
+};
+
+//MORE: These functions have too many parameters - should probably move them into something like the following?:
+class TranslateOptions
+{
+    IOutputMetaData * expected;
+    IOutputMetaData * projected;
+    IOutputMetaData * actual;
+    unsigned expectedCrc;
+    unsigned projectedCrc;
+    unsigned actualCrc;
+};
+
+//Create a row reader for a thor binary file.  The expected, projected, actual and options never change.  The file providing the data can change.
+extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, unsigned _expectedCrc, IOutputMetaData & _expected, unsigned _projectedCrc, IOutputMetaData & _projected, unsigned _actualCrc, IOutputMetaData & _actual, const IPropertyTree * options);
+
+#endif

+ 1 - 0
ecl/eclagent/agentctx.hpp

@@ -120,6 +120,7 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual const char *queryWuid() = 0;
 
     virtual void updateWULogfile() = 0;
+    virtual bool forceNewDiskReadActivity() const = 0;
 };
 
 #endif // AGENTCTX_HPP_INCL

+ 1 - 0
ecl/eclagent/eclagent.cpp

@@ -519,6 +519,7 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
 
     userid.set(wuRead->queryUser());
     useProductionLibraries = wuRead->getDebugValueBool("useProductionLibraries", false);
+    useNewDiskReadActivity = wuRead->getDebugValueBool("useNewDiskReadActivity", agentTopology->getPropBool("@useNewDiskReadActivity", true));
     clusterNames.append(wuRead->queryClusterName());
     clusterWidth = -1;
     abortmonitor = new cAbortMonitor(*this);

+ 7 - 0
ecl/eclagent/eclagent.ipp

@@ -238,6 +238,11 @@ public:
         return ctx->getLayoutTranslationMode();
     }
 
+    virtual bool forceNewDiskReadActivity() const
+    {
+        return ctx->forceNewDiskReadActivity();
+    }
+
 protected:
     IAgentContext * ctx;
 };
@@ -354,6 +359,7 @@ private:
     bool isRemoteWorkunit;
     bool resolveFilesLocally;
     bool writeResultsToStdout;
+    bool useNewDiskReadActivity;
     Owned<IUserDescriptor> standAloneUDesc;
     outputFmts outputFmt;
     unsigned __int64 stopAfter;
@@ -411,6 +417,7 @@ private:
     virtual IDebuggableContext *queryDebugContext() const { return debugContext; };
 
     EclGraph * loadGraph(const char * graphName, IConstWorkUnit * wu, ILoadedDllEntry * dll, bool isLibrary);
+    virtual bool forceNewDiskReadActivity() const { return useNewDiskReadActivity; }
 
     class cAbortMonitor: public Thread, implements IExceptionHandler
     {

+ 11 - 0
ecl/eclagent/eclgraph.cpp

@@ -234,6 +234,8 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKdiskread:
     case TAKspillread:
         return createDiskReadActivity(agent, activityId, subgraphId, (IHThorDiskReadArg &)arg, kind, node);
+    case TAKnewdiskread:
+        return createNewDiskReadActivity(agent, activityId, subgraphId, (IHThorNewDiskReadArg &)arg, kind, node);
     case TAKdisknormalize:
         return createDiskNormalizeActivity(agent, activityId, subgraphId, (IHThorDiskNormalizeArg &)arg, kind, node);
     case TAKdiskaggregate:
@@ -480,6 +482,15 @@ void EclGraphElement::createActivity(IAgentContext & agent, EclSubGraph * owner)
                 }
             }
             arg.setown(createHelper(agent, owner));
+            //Use the new disk read activity unless disabled or the transform uses a virtual field
+            if (((kind == TAKdiskread) || (kind == TAKspillread)) && agent.forceNewDiskReadActivity())
+            {
+                unsigned flags = ((IHThorNewDiskReadArg &)*arg).getFlags();
+                //New activity doesn't currently support virtual callbacks from the transform.  We may want
+                //to implement a new variant to support it without imposing the overhead on the general cases.
+                if ((flags & TDRtransformvirtual) == 0)
+                    kind = TAKnewdiskread;
+            }
             activity.setown(::createActivity(agent, id, subgraph->id, resultsGraph ? resultsGraph->id : 0, kind, isLocal, isGrouped, *arg, node, this));
 
             ForEachItemIn(i2, branches)

+ 26 - 17
ecl/hqlcpp/hqlsource.cpp

@@ -661,7 +661,6 @@ public:
         monitorsForGrouping = false;
         useImplementationClass = false;
         isUnfilteredCount = false;
-        isVirtualLogicalFilenameUsed = false;
         requiresOrderedMerge = false;
         rootSelfRow = NULL;
         activityKind = TAKnone;
@@ -797,7 +796,9 @@ public:
     bool            generateUnfilteredTransform;
     bool            useImplementationClass;
     bool            isUnfilteredCount;
-    bool            isVirtualLogicalFilenameUsed;
+    bool            isVirtualLogicalFilenameUsed = false;
+    bool            transformUsesVirtualLogicalFilename = false;
+    bool            transformUsesVirtualFilePosition = false;
     bool            requiresOrderedMerge;
     bool            newInputMapping;
     bool            extractCanMatch = false;
@@ -806,24 +807,25 @@ protected:
 };
 
 
-struct HQLCPP_API HqlFilePositionDefinedValue : public HqlSimpleDefinedValue
+struct HQLCPP_API MonitoredDefinedValue : public HqlSimpleDefinedValue
 {
 public:
-    HqlFilePositionDefinedValue(SourceBuilder & _builder, IHqlExpression * _original, IHqlExpression * _expr) 
-    : HqlSimpleDefinedValue(_original, _expr), builder(_builder)
+    MonitoredDefinedValue(bool & _usedFlag, IHqlExpression * _original, IHqlExpression * _expr)
+    : HqlSimpleDefinedValue(_original, _expr), usedFlag(_usedFlag)
     { }
 
-    virtual IHqlExpression * queryExpr() const              
-    { 
-        builder.isVirtualLogicalFilenameUsed = true;
+    virtual IHqlExpression * queryExpr() const
+    {
+        usedFlag = true;
         return HqlSimpleDefinedValue::queryExpr();
     }
 
 public:
-    SourceBuilder & builder;
+    bool & usedFlag;
 };
 
 
+
 bool SourceBuilder::isSourceInvariant(IHqlExpression * dataset, IHqlExpression * expr)
 {
     if (containsAssertKeyed(expr))
@@ -1108,19 +1110,19 @@ void SourceBuilder::associateFilePositions(BuildCtx & ctx, const char * provider
     if (fpos)
     {
         Owned<IHqlExpression> fposExpr = createFileposCall(translator, getFilePositionId, provider, rowname);
-        ctx.associateExpr(fpos, fposExpr);
+        ctx.associateOwn(*new MonitoredDefinedValue(transformUsesVirtualFilePosition, fpos, fposExpr));
     }
 
     if (lfpos)
     {
         Owned<IHqlExpression> fposExpr = createFileposCall(translator, getLocalFilePositionId, provider, rowname);
-        ctx.associateExpr(lfpos, fposExpr);
+        ctx.associateOwn(*new MonitoredDefinedValue(transformUsesVirtualFilePosition, lfpos, fposExpr));
     }
 
     if (logicalFilenameMarker)
     {
         Owned<IHqlExpression> nameExpr = createFileposCall(translator, queryLogicalFilenameId, provider, rowname);
-        ctx.associateOwn(*new HqlFilePositionDefinedValue(*this, logicalFilenameMarker, nameExpr));
+        ctx.associateOwn(*new MonitoredDefinedValue(transformUsesVirtualLogicalFilename, logicalFilenameMarker, nameExpr));
     }
 }
 
@@ -1135,14 +1137,17 @@ void SourceBuilder::rebindFilepositons(BuildCtx & ctx, IHqlExpression * dataset,
     {
         OwnedHqlExpr selector = createSelector(side, dataset, selSeq);
         OwnedHqlExpr selectorFpos = getFilepos(selector, isLocal);
-        ctx.associateExpr(selectorFpos, match->queryExpr());
+        ctx.associateOwn(*new MonitoredDefinedValue(transformUsesVirtualFilePosition, selectorFpos, match->queryExpr()));
     }
 }
 
 
 void SourceBuilder::rebindFilepositons(BuildCtx & ctx, IHqlExpression * dataset, node_operator side, IHqlExpression * selSeq)
 {
-    bool savedIsVirtualLogicalFilenameUsed = isVirtualLogicalFilenameUsed;  // don't allow the rebinding to set the flag.
+    // don't allow the rebinding to modify these flags.
+    bool savedVirtualLogicalFilenameUsed = transformUsesVirtualLogicalFilename;
+    bool savedVirtualFilePositionUsed = transformUsesVirtualFilePosition;
+
     rebindFilepositons(ctx, dataset, side, selSeq, true);
     rebindFilepositons(ctx, dataset, side, selSeq, false);
     OwnedHqlExpr searchLogicalFilename = getFileLogicalName(dataset);
@@ -1151,9 +1156,10 @@ void SourceBuilder::rebindFilepositons(BuildCtx & ctx, IHqlExpression * dataset,
     {
         OwnedHqlExpr selector = createSelector(side, dataset, selSeq);
         OwnedHqlExpr selectorLogicalFilename = getFileLogicalName(dataset);
-        ctx.associateOwn(*new HqlFilePositionDefinedValue(*this, selectorLogicalFilename, match->queryExpr()));
+        ctx.associateOwn(*new MonitoredDefinedValue(transformUsesVirtualLogicalFilename, selectorLogicalFilename, match->queryExpr()));
     }
-    isVirtualLogicalFilenameUsed = savedIsVirtualLogicalFilenameUsed;
+    transformUsesVirtualLogicalFilename = savedVirtualLogicalFilenameUsed;
+    transformUsesVirtualFilePosition = savedVirtualFilePositionUsed;
 }
 
 
@@ -2953,7 +2959,10 @@ void DiskReadBuilderBase::buildFlagsMember(IHqlExpression * expr)
     if (!nameExpr->isConstant()) flags.append("|TDXvarfilename");
     if (translator.hasDynamicFilename(tableExpr)) flags.append("|TDXdynamicfilename");
     if (isUnfilteredCount) flags.append("|TDRunfilteredcount");
-    if (isVirtualLogicalFilenameUsed) flags.append("|TDRfilenamecallback");
+    if (isVirtualLogicalFilenameUsed || transformUsesVirtualLogicalFilename)
+        flags.append("|TDRfilenamecallback");
+    if (transformUsesVirtualFilePosition || transformUsesVirtualLogicalFilename)
+        flags.append("|TDRtransformvirtual");
     if (requiresOrderedMerge) flags.append("|TDRorderedmerge");
 
     if (flags.length())

+ 0 - 2
ecl/hqlcpp/hqlsource.ipp

@@ -36,8 +36,6 @@ public:
     void gatherVirtualFields(IHqlExpression * record, bool ignoreVirtuals, bool ensureSerialized);
     bool hasVirtuals()      { return virtuals.ordinality() != 0; }
     bool hasVirtualsOrDeserialize() { return requiresDeserialize || virtuals.ordinality() != 0; }
-    bool needFilePosition() { return virtuals.ordinality() > 0; }
-    bool needFilePosition(bool local);
 
 public:
     HqlExprArray    physicalFields;

+ 671 - 0
ecl/hthor/hthor.cpp

@@ -53,6 +53,7 @@
 #include "ftbase.ipp"
 #include "rtldynfield.hpp"
 #include "rtlnewkey.hpp"
+#include "thorread.hpp"
 
 #define EMPTY_LOOP_LIMIT 1000
 
@@ -10422,6 +10423,674 @@ void CHThorExternalActivity::stop()
 
 //=====================================================================================================
 
+CHThorNewDiskReadBaseActivity::CHThorNewDiskReadBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind, IPropertyTree *_node)
+: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), segHelper(_segHelper)
+{
+    helper.setCallback(this);
+    expectedDiskMeta = helper.queryDiskRecordSize();
+    projectedDiskMeta = helper.queryProjectedDiskRecordSize();
+    readerOptions.setown(createPTree());
+    if (_node)
+    {
+        const char *recordTranslationModeHintText = _node->queryProp("hint[@name='layoutTranslation']/@value");
+        if (recordTranslationModeHintText)
+            recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
+    }
+    readerOptions->setPropInt("translationMode", (int)getLayoutTranslationMode());
+}
+
+CHThorNewDiskReadBaseActivity::~CHThorNewDiskReadBaseActivity()
+{
+    close();
+}
+
+void CHThorNewDiskReadBaseActivity::ready()
+{
+    CHThorActivityBase::ready();
+
+    opened = false;
+    offsetOfPart = 0;
+    partNum = (unsigned)-1;
+
+    resolveFile();
+
+    fieldFilters.kill();
+    segHelper.createSegmentMonitors(this);
+}
+
+void CHThorNewDiskReadBaseActivity::stop()
+{
+    close();
+    CHThorActivityBase::stop();
+}
+
+unsigned __int64 CHThorNewDiskReadBaseActivity::getFilePosition(const void * row)
+{
+    //Ideally these functions would not need to be implemented - they should always be implemented by the translation layer
+    throwUnexpected();
+}
+
+unsigned __int64 CHThorNewDiskReadBaseActivity::getLocalFilePosition(const void * row)
+{
+    throwUnexpected();
+}
+
+const char * CHThorNewDiskReadBaseActivity::queryLogicalFilename(const void * row)
+{
+    throwUnexpected();
+}
+
+void CHThorNewDiskReadBaseActivity::resolveFile()
+{
+    //If in a child query, and the filenames haven't changed, the information about the resolved filenames will also not have changed
+    //MORE: Is this ever untrue?
+    if (subfiles && !(helper.getFlags() & TDXvarfilename))
+        return;
+
+    //Only clear these members if we are re-resolving the file - otherwise the previous entries are still valid
+    ldFile.clear();
+    tempFileName.clear();
+    dfsParts.clear();
+    subfiles.kill();
+
+    OwnedRoxieString fileName(helper.getFileName());
+    mangleHelperFileName(mangledHelperFileName, fileName, agent.queryWuid(), helper.getFlags());
+    if (helper.getFlags() & (TDXtemporary | TDXjobtemp))
+    {
+        StringBuffer mangledFilename;
+        mangleLocalTempFilename(mangledFilename, mangledHelperFileName.str());
+        tempFileName.set(agent.queryTemporaryFile(mangledFilename.str()));
+        logicalFileName = tempFileName.str();
+        gatherInfo(NULL);
+        subfiles.append(*extractFileInformation(nullptr));
+    }
+    else
+    {
+        ldFile.setown(resolveLFNFlat(agent, mangledHelperFileName.str(), "Read", 0 != (helper.getFlags() & TDRoptional)));
+        if ( mangledHelperFileName.charAt(0) == '~')
+            logicalFileName = mangledHelperFileName.str()+1;
+        else
+            logicalFileName = mangledHelperFileName.str();
+
+        if (ldFile)
+        {
+            Owned<IFileDescriptor> fdesc;
+            fdesc.setown(ldFile->getFileDescriptor());
+            gatherInfo(fdesc);
+            IDistributedFile *dFile = ldFile->queryDistributedFile();
+            if (dFile)  //only makes sense for distributed (non local) files
+            {
+                dfsParts.setown(dFile->getIterator());
+                IDistributedSuperFile *super = dFile->querySuperFile();
+                if (super)
+                {
+                    unsigned numsubs = super->numSubFiles(true);
+                    unsigned s=0;
+                    for (; s<numsubs; s++)
+                    {
+                        IDistributedFile &subfile = super->querySubFile(s, true);
+                        subfiles.append(*extractFileInformation(&subfile));
+                    }
+                    assertex(fdesc);
+                    superfile.set(fdesc->querySuperFileDescriptor());
+                }
+                else
+                    subfiles.append(*extractFileInformation(dFile));
+
+                if((helper.getFlags() & (TDXtemporary | TDXjobtemp)) == 0)
+                    agent.logFileAccess(dFile, "HThor", "READ");
+            }
+            else
+                subfiles.append(*extractFileInformation(nullptr));
+        }
+        else
+            subfiles.append(*extractFileInformation(nullptr));
+
+        if (!ldFile)
+        {
+            StringBuffer buff;
+            buff.appendf("Input file '%s' was missing but declared optional", mangledHelperFileName.str());
+            WARNLOG("%s", buff.str());
+            agent.addWuException(buff.str(), WRN_SkipMissingOptFile, SeverityInformation, "hthor");
+        }
+    }
+
+    assertex(subfiles.ordinality() != 0);
+}
+
+void CHThorNewDiskReadBaseActivity::gatherInfo(IFileDescriptor * fileDesc)
+{
+    if (fileDesc)
+    {
+        if (!agent.queryResolveFilesLocally())
+        {
+            grouped = fileDesc->isGrouped();
+            if (grouped != ((helper.getFlags() & TDXgrouped) != 0))
+            {
+                StringBuffer msg;
+                msg.append("DFS and code generated group info. differs: DFS(").append(grouped ? "grouped" : "ungrouped").append("), CodeGen(").append(grouped ? "ungrouped" : "grouped").append("), using DFS info");
+                WARNLOG("%s", msg.str());
+                agent.addWuException(msg.str(), WRN_MismatchGroupInfo, SeverityError, "hthor");
+            }
+        }
+        else
+            grouped = ((helper.getFlags() & TDXgrouped) != 0);
+    }
+    else
+    {
+        grouped = ((helper.getFlags() & TDXgrouped) != 0);
+    }
+}
+
+CHThorNewDiskReadBaseActivity::InputFileInfo * CHThorNewDiskReadBaseActivity::extractFileInformation(IDistributedFile * distributedFile)
+{
+    Owned<IPropertyTree> meta = createPTree();
+    unsigned actualCrc = helper.getDiskFormatCrc();
+    Linked<IOutputMetaData> actualDiskMeta = expectedDiskMeta;
+    bool compressed = false;
+    bool blockcompressed = false;
+
+    if (distributedFile)
+    {
+        const char *kind = queryFileKind(distributedFile);
+        //Do not use the field translation if the file was originally csv/xml - unless explicitly set
+        if (strisame(kind, "flat") || (RecordTranslationMode::AlwaysDisk == getLayoutTranslationMode()))
+        {
+            //Yuk this will be horrible - it needs to cache it for each distributed file
+            //and also common them up if they are the same.
+            IPropertyTree &props = distributedFile->queryAttributes();
+            Owned<IOutputMetaData> publishedMeta = getDaliLayoutInfo(props);
+            if (publishedMeta)
+            {
+                actualDiskMeta.setown(publishedMeta.getClear());
+                actualCrc = props.getPropInt("@formatCrc");
+            }
+
+            size32_t dfsSize = props.getPropInt("@recordSize");
+            if (dfsSize != 0)
+                meta->setPropInt("dfsRecordSize", dfsSize);
+        }
+        compressed = distributedFile->isCompressed(&blockcompressed); //try new decompression, fall back to old unless marked as block
+
+        //Check for encryption key
+        void *k;
+        size32_t kl;
+        helper.getEncryptKey(kl,k);
+        if (kl)
+        {
+            meta->setPropBin("encryptionKey", kl, k);
+            blockcompressed = true;
+            compressed = true;
+        }
+    }
+
+    meta->setPropBool("grouped", grouped);
+    meta->setPropBool("compressed", compressed);
+    meta->setPropBool("blockCompressed", blockcompressed);
+    meta->setPropBool("forceCompressed", (helper.getFlags() & TDXcompress) != 0);
+
+    InputFileInfo & target = * new InputFileInfo;
+    target.file = distributedFile;
+    target.meta.swap(meta);
+    target.actualCrc = actualCrc;
+    target.actualMeta.swap(actualDiskMeta);
+    return &target;
+}
+
+
+void CHThorNewDiskReadBaseActivity::close()
+{
+    closepart();
+    if(ldFile)
+    {
+        IDistributedFile * dFile = ldFile->queryDistributedFile();
+        if(dFile)
+            dFile->setAccessed();
+    }
+}
+
+void CHThorNewDiskReadBaseActivity::closepart()
+{
+    if (activeReader)
+    {
+        activeReader->clearInput();
+        activeReader = nullptr;
+    }
+    logicalFileName = "";
+}
+
+static void saveOrRelease(Owned<IException> & target, IException * e)
+{
+    if (target.get())
+        ::Release(e);
+    else
+        target.setown(e);
+}
+
+static void getFilename(RemoteFilename & rfilename, IDistributedFilePart * curPart, ILocalOrDistributedFile * localFile, unsigned partNum, unsigned copy)
+{
+    if (curPart)
+        curPart->getFilename(rfilename,copy);
+    else
+        localFile->getPartFilename(rfilename,partNum,copy);
+}
+
+bool CHThorNewDiskReadBaseActivity::openFirstPart()
+{
+    partNum = 0;
+    if (dfsParts)       // more should really be fileDesc or something
+    {
+        if (dfsParts->first())
+        {
+            openFilePart(ldFile, &dfsParts->query(), 0);
+            return true;
+        }
+    }
+    else if (ldFile)
+    {
+        if (ldFile->numParts() != 0)
+        {
+            openFilePart(ldFile, nullptr, 0);
+            return true;
+        }
+    }
+    else if (!tempFileName.isEmpty())
+    {
+        openFilePart(tempFileName);
+        return true;
+    }
+    setEmptyStream();
+    return false;
+}
+
+bool CHThorNewDiskReadBaseActivity::openNextPart()
+{
+    if (finishedParts)
+        return false;
+
+    offset_t sizeFilePart = 0;
+    if (dfsParts)
+        sizeFilePart = dfsParts->query().getFileSize(true, false);
+    else if (ldFile)
+        sizeFilePart = ldFile->getPartFileSize(partNum);
+
+    offsetOfPart += sizeFilePart;
+    closepart();
+
+    partNum++;
+    if (dfsParts)
+    {
+        if (dfsParts->next())
+        {
+            openFilePart(ldFile, &dfsParts->query(), partNum);
+            return true;
+        }
+    }
+    else if (ldFile)
+    {
+        if (partNum < ldFile->numParts())
+        {
+            openFilePart(ldFile, nullptr, partNum);
+            return true;
+        }
+    }
+    setEmptyStream();
+    return false;
+}
+
+bool CHThorNewDiskReadBaseActivity::initStream(IDiskRowReader * reader, const char * filename)
+{
+    activeReader = reader;
+    if (useRawStream)
+        rawRowStream = reader->queryRawRowStream();
+    else
+        roxieRowStream = reader->queryAllocatedRowStream(rowAllocator);
+
+    StringBuffer report("Reading file ");
+    report.append(filename);
+    agent.reportProgress(report.str());
+
+    return true;
+}
+
+void CHThorNewDiskReadBaseActivity::setEmptyStream()
+{
+    if (useRawStream)
+        rawRowStream = queryNullRawRowStream();
+    else
+        roxieRowStream = queryNullAllocatedRowStream();
+    finishedParts = true;
+}
+
+IDiskRowReader * CHThorNewDiskReadBaseActivity::ensureRowReader(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, IPropertyTree * options)
+{
+    ForEachItemIn(i, readers)
+    {
+        IDiskRowReader & cur = readers.item(i);
+        if (cur.matches(format, streamRemote, expectedCrc, expected, projectedCrc, projected, actualCrc, actual, options))
+            return &cur;
+    }
+    IDiskRowReader * reader = createDiskReader(format, streamRemote, expectedCrc, expected, projectedCrc, projected, actualCrc, actual, options);
+    readers.append(*reader);
+    return reader;
+}
+
+bool CHThorNewDiskReadBaseActivity::openFilePart(const char * filename)
+{
+    const char * format = "thor";   // more - should extract from the current file (could even mix flat and csv...)
+    InputFileInfo * fileInfo = &subfiles.item(0);
+
+    unsigned expectedCrc = helper.getDiskFormatCrc();
+    unsigned projectedCrc = helper.getProjectedFormatCrc();
+    IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, expectedCrc, *expectedDiskMeta, readerOptions);
+    if (reader->setInputFile(filename, logicalFileName, 0, offsetOfPart, fileInfo->meta, fieldFilters))
+        return initStream(reader, filename);
+    return false;
+}
+
+bool CHThorNewDiskReadBaseActivity::openFilePart(ILocalOrDistributedFile * localFile, IDistributedFilePart * filePart, unsigned whichPart)
+{
+    InputFileInfo * fileInfo = &subfiles.item(0);
+    if (superfile && filePart)
+    {
+        unsigned subfile;
+        unsigned lnum;
+        if (superfile->mapSubPart(partNum, subfile, lnum))
+        {
+            fileInfo = &subfiles.item(subfile);
+            IDistributedFile * distributedFile = fileInfo->file;
+            logicalFileName = distributedFile->queryLogicalName();
+        }
+    }
+
+    unsigned expectedCrc = helper.getDiskFormatCrc();
+    unsigned projectedCrc = helper.getProjectedFormatCrc();
+    unsigned actualCrc = fileInfo->actualCrc;
+    IOutputMetaData * actualDiskMeta = fileInfo->actualMeta;
+
+    bool tryRemoteStream = actualDiskMeta->queryTypeInfo()->canInterpret() && actualDiskMeta->queryTypeInfo()->canSerialize() &&
+                           projectedDiskMeta->queryTypeInfo()->canInterpret() && projectedDiskMeta->queryTypeInfo()->canSerialize();
+
+    /*
+     * If a file part can be accessed local, then read it locally
+     * If a file part supports a remote stream, then use that
+     * Otherwise failover to the legacy remote access.
+     */
+    const char * format = "thor";   // more - should extract from the current file (could even mix flat and csv...)
+    Owned<IException> saveOpenExc;
+    StringBuffer filename, filenamelist;
+    std::vector<unsigned> remoteCandidates;
+
+    // scan for local part 1st
+    //MORE: Order of copies should be optimized at this point....
+    unsigned numCopies = filePart?filePart->numCopies():ldFile->numPartCopies(partNum);
+    for (unsigned copy=0; copy<numCopies; copy++)
+    {
+        RemoteFilename rfn;
+        getFilename(rfn, filePart, localFile, partNum, copy);
+        if (!isRemoteReadCandidate(agent, rfn))
+        {
+            StringBuffer path;
+            rfn.getPath(path);
+            IDiskRowReader * reader = ensureRowReader(format, false, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, readerOptions);
+            if (reader->setInputFile(path.str(), logicalFileName, filePart->getPartIndex(), offsetOfPart, fileInfo->meta, fieldFilters))
+                return initStream(reader, path.str());
+        }
+        else
+            remoteCandidates.push_back(copy);
+    }
+
+    //First try remote streaming, and if that does not succeed, fall back to remote reading.
+    bool allowFallbackToNonStreaming = false;
+    for (;;)
+    {
+        for (unsigned copy: remoteCandidates)
+        {
+            RemoteFilename rfilename;
+            getFilename(rfilename, filePart, localFile, partNum, copy);
+            rfilename.getPath(filename.clear());
+            filenamelist.append('\n').append(filename);
+            try
+            {
+                IDiskRowReader * reader = ensureRowReader(format, tryRemoteStream, expectedCrc, *expectedDiskMeta, projectedCrc, *projectedDiskMeta, actualCrc, *actualDiskMeta, readerOptions);
+                if (reader->setInputFile(rfilename, logicalFileName, filePart->getPartIndex(), offsetOfPart, fileInfo->meta, fieldFilters))
+                    return initStream(reader, filename);
+            }
+            catch (IException *E)
+            {
+                saveOrRelease(saveOpenExc, E);
+            }
+        }
+
+        if (!tryRemoteStream || !allowFallbackToNonStreaming)
+            break;
+        tryRemoteStream = false;
+    }
+
+    if (!(helper.getFlags() & TDRoptional))
+    {
+        StringBuffer s;
+        if (filenamelist)
+        {
+            if (saveOpenExc.get())
+            {
+                if (strstr(mangledHelperFileName.str(),"::>")!=NULL) // if a 'special' filename just use saved exception
+                    saveOpenExc->errorMessage(s);
+                else
+                {
+                    s.append("Could not open logical file ").append(mangledHelperFileName.str()).append(" in any of these locations:").append(filenamelist).append(" (");
+                    saveOpenExc->errorMessage(s).append(")");
+                }
+            }
+            else
+                s.append("Could not open logical file ").append(mangledHelperFileName.str()).append(" in any of these locations:").append(filenamelist).append(" (").append((unsigned)GetLastError()).append(")");
+        }
+        else
+            s.append("Could not open local physical file ").append(filename).append(" (").append((unsigned)GetLastError()).append(")");
+        agent.fail(1, s.str());
+    }
+
+    return false;
+}
+
+
+
+bool CHThorNewDiskReadBaseActivity::openNext()
+{
+    return openNextPart();
+}
+
+void CHThorNewDiskReadBaseActivity::open()
+{
+    assertex(!opened);
+    opened = true;
+    if (!segHelper.canMatchAny())
+    {
+        setEmptyStream();
+    }
+    else
+    {
+        if (!openFirstPart())
+            setEmptyStream();
+    }
+}
+
+void CHThorNewDiskReadBaseActivity::verifyRecordFormatCrc()
+{
+    //MORE: Need to configure based on csv/xml
+    ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), ldFile?ldFile->queryDistributedFile():NULL, false, true);
+}
+
+void CHThorNewDiskReadBaseActivity::append(FFoption option, const IFieldFilter * filter)
+{
+    if (filter->isWild())
+        filter->Release();
+    else
+        fieldFilters.append(*filter);
+}
+
+//=====================================================================================================
+
+CHThorNewDiskReadActivity::CHThorNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &_arg, ThorActivityKind _kind, IPropertyTree *_node)
+: CHThorNewDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _node), helper(_arg), outBuilder(NULL)
+{
+    needTransform = false;
+    lastGroupProcessed = 0;
+    hasMatchFilter = helper.hasMatchFilter();
+    useRawStream = hasMatchFilter || helper.needTransform();
+}
+
+void CHThorNewDiskReadActivity::ready()
+{
+    PARENT::ready();
+    outBuilder.setAllocator(rowAllocator);
+    lastGroupProcessed = processed;
+    needTransform = helper.needTransform() || fieldFilters.length();
+    limit = helper.getRowLimit();
+    if (helper.getFlags() & TDRlimitskips)
+        limit = (unsigned __int64) -1;
+    stopAfter = helper.getChooseNLimit();
+    assertex(stopAfter != 0);
+    if (!helper.transformMayFilter() && !helper.hasMatchFilter())
+        remoteLimit = stopAfter;
+    finishedParts = false;
+}
+
+
+void CHThorNewDiskReadActivity::stop()
+{
+    outBuilder.clear();
+    PARENT::stop();
+}
+
+
+void CHThorNewDiskReadActivity::onLimitExceeded()
+{
+    if ( agent.queryCodeContext()->queryDebugContext())
+        agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
+    helper.onLimitExceeded();
+}
+
+const void *CHThorNewDiskReadActivity::nextRow()
+{
+    //Avoid this check on each row- e.g., initialising streams with a null stream, which returns eof, and falls through to eof processing
+    if (!opened) open();
+
+    // Only check once per row returned.  Potentially means that heavily filtered datasets may wait a long time to check for abort
+    queryUpdateProgress();
+
+    //Avoid this test...  Combine the limit checking with choosen, and have choosen/limit triggering set the
+    //stream to a special no more rows stream so that subsequent calls do not read records.
+    if ((processed - initialProcessed) >= stopAfter)
+        return nullptr;
+
+    try
+    {
+        if (rawRowStream)
+        {
+            for (;;)
+            {
+                //Returns a row in the serialized form of the projected format
+                size32_t nextSize;
+                const byte * next = (const byte *)rawRowStream->nextRow(nextSize);
+                if (!isSpecialRow(next))
+                {
+                    size32_t thisSize = 0;
+                    if (likely(!hasMatchFilter || helper.canMatch(next)))
+                    {
+                        size32_t thisSize = helper.transform(outBuilder.ensureRow(), next);
+                        if (thisSize != 0)
+                        {
+                            if (unlikely((processed - initialProcessed) >= limit))
+                            {
+                                outBuilder.clear();
+                                onLimitExceeded();
+                                return nullptr;
+                            }
+                            processed++;
+                            return outBuilder.finalizeRowClear(thisSize);
+                        }
+                    }
+                }
+                else
+                {
+                    switch (getSpecialRowType(next))
+                    {
+                    case SpecialRow::eof:
+                        if (!openNext())
+                            return next; // i.e. eof
+                        //rawStream will have changed, but it cannot change into a rowStream
+                        break;
+                    case SpecialRow::eos:
+                        return next;
+                    case SpecialRow::eog:
+                        if (processed != lastGroupProcessed)
+                        {
+                            lastGroupProcessed = processed;
+                            //MORE: Change to return next - i.e. an eog marker
+                            return nullptr;
+                        }
+                        break;
+                    default:
+                        throwUnexpected();
+                    }
+                }
+            }
+        }
+        else
+        {
+            //This branch avoids a memcpy from actual to projected followed by a deserialize - since it can map directly
+            //May be more efficient to use this branch if serialized==deserialized and there is a filter, but no transform.
+            //It would be possibel to have two (or more) different implementations, which were created based on
+            //whether there was a limit, a transform etc., but unlikely to save more than a couple of boolean tests.
+            for (;;)
+            {
+                const byte * next = (const byte *)roxieRowStream->nextRow();
+                if (!isSpecialRow(next))
+                {
+                    if (unlikely((processed - initialProcessed) >= limit))
+                    {
+                        ReleaseRoxieRow(next);
+                        onLimitExceeded();
+                        return nullptr;
+                    }
+                    processed++;
+                    return next;
+                }
+                else
+                {
+                    switch (getSpecialRowType(next))
+                    {
+                    case SpecialRow::eof:
+                        if (!openNext())
+                            return next;
+                        //rowStream will have changed
+                        break;
+                    case SpecialRow::eos:
+                        return next;
+                    case SpecialRow::eog:
+                        if (processed != lastGroupProcessed)
+                        {
+                            lastGroupProcessed = processed;
+                            return nullptr;
+                        }
+                        break;
+                    default:
+                        throwUnexpected();
+                    }
+                }
+            }
+        }
+    }
+    catch(IException * e)
+    {
+        throw makeWrappedException(e);
+    }
+    return NULL;
+}
+
+//=====================================================================================================
+
 MAKEFACTORY(DiskWrite);
 MAKEFACTORY(Iterate);
 MAKEFACTORY(Filter);
@@ -10528,6 +11197,8 @@ MAKEFACTORY_EXTRA(DiskGroupAggregate, IPropertyTree *)
 MAKEFACTORY_EXTRA(CsvRead, IPropertyTree *)
 MAKEFACTORY_EXTRA(XmlRead, IPropertyTree *)
 
+MAKEFACTORY_EXTRA(NewDiskRead, IPropertyTree *)
+
 MAKEFACTORY_EXTRA(LocalResultRead, __int64)
 MAKEFACTORY_EXTRA(LocalResultWrite, __int64)
 MAKEFACTORY_EXTRA(DictionaryResultWrite, __int64)

+ 2 - 0
ecl/hthor/hthor.hpp

@@ -172,6 +172,8 @@ extern HTHOR_API IHThorActivity *createDiskAggregateActivity(IAgentContext &_age
 extern HTHOR_API IHThorActivity *createDiskCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskCountArg &arg, ThorActivityKind kind, IPropertyTree *node);
 extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskGroupAggregateArg &arg, ThorActivityKind kind, IPropertyTree *node);
 
+extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, IPropertyTree *node);
+
 extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, IPropertyTree *_node);
 extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, IPropertyTree *_node);
 extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind kind, IPropertyTree *_node);

+ 126 - 0
ecl/hthor/hthor.ipp

@@ -43,6 +43,7 @@
 #include "rtlrecord.hpp"
 #include "roxiemem.hpp"
 #include "roxierowbuff.hpp"
+#include "thorread.hpp"
 
 roxiemem::IRowManager * queryRowManager();
 using roxiemem::OwnedConstRoxieRow;
@@ -2921,6 +2922,131 @@ public:
 };
 
 
+class CHThorNewDiskReadBaseActivity : public CHThorActivityBase, implements IThorDiskCallback, implements IIndexReadContext
+{
+protected:
+    struct InputFileInfo : public CInterface
+    {
+        IDistributedFile * file;
+        Owned<IOutputMetaData> actualMeta;
+        Owned<IPropertyTree> meta;
+        unsigned actualCrc;
+    };
+
+    IHThorDiskReadBaseArg &helper;
+    IHThorCompoundBaseArg & segHelper;
+    IDiskRowReader * activeReader = nullptr;
+    IArrayOf<IDiskRowReader> readers;
+    IRawRowStream * rawRowStream = nullptr;
+    IAllocRowStream * roxieRowStream = nullptr;
+    StringBuffer mangledHelperFileName;
+    StringAttr tempFileName;
+    const char * logicalFileName = "";
+    CIArrayOf<InputFileInfo> subfiles;
+    Owned<ISuperFileDescriptor> superfile;
+    Owned<IDistributedFilePartIterator> dfsParts;
+    Owned<ILocalOrDistributedFile> ldFile;
+    IOutputMetaData *expectedDiskMeta = nullptr;
+    IOutputMetaData *projectedDiskMeta = nullptr;
+    IConstArrayOf<IFieldFilter> fieldFilters;  // These refer to the expected layout
+    Owned<IPropertyTree> readerOptions;
+    unsigned partNum = 0;
+    RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
+    bool useRawStream = false; // Constant for the lifetime of the activity
+    bool grouped = false;
+    bool opened = false;
+    bool finishedParts = false;
+    unsigned __int64 stopAfter = 0;
+    unsigned __int64 offsetOfPart = 0;
+
+    void close();
+    void resolveFile();
+    virtual void verifyRecordFormatCrc();
+    virtual void gatherInfo(IFileDescriptor * fileDesc);
+    StringBuffer &translateLFNtoLocal(const char *filename, StringBuffer &localName);
+
+    inline void queryUpdateProgress()
+    {
+        agent.reportProgress(NULL);
+    }
+
+    RecordTranslationMode getLayoutTranslationMode()
+    {
+        if (recordTranslationModeHint != RecordTranslationMode::Unspecified)
+            return recordTranslationModeHint;
+        return agent.getLayoutTranslationMode();
+    }
+
+public:
+    CHThorNewDiskReadBaseActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind, IPropertyTree *node);
+    ~CHThorNewDiskReadBaseActivity();
+    IMPLEMENT_IINTERFACE
+
+    virtual void ready();
+    virtual void stop();
+
+    IHThorInput *queryOutput(unsigned index)                { return this; }
+
+//interface IHThorInput
+    virtual bool isGrouped()                                { return grouped; }
+    virtual IOutputMetaData * queryOutputMeta() const               { return outputMeta; }
+
+//interface IFilePositionProvider
+    virtual unsigned __int64 getFilePosition(const void * row);
+    virtual unsigned __int64 getLocalFilePosition(const void * row);
+    virtual const char * queryLogicalFilename(const void * row);
+    virtual const byte * lookupBlob(unsigned __int64 id) { UNIMPLEMENTED; }
+
+//interface IIndexReadContext
+    virtual void append(IKeySegmentMonitor *segment) override { throwUnexpected(); }
+    virtual void append(FFoption option, const IFieldFilter * filter) override;
+
+protected:
+    bool openFirstPart();
+    bool initStream(IDiskRowReader * reader, const char * filename);
+    InputFileInfo * extractFileInformation(IDistributedFile * fileDesc);
+    bool openFilePart(const char * filename);
+    bool openFilePart(ILocalOrDistributedFile * localFile, IDistributedFilePart * filePart, unsigned whichPart);
+    void setEmptyStream();
+
+    virtual void open();
+    virtual bool openNext();
+    virtual void closepart();
+
+    bool openNextPart();
+    IDiskRowReader * ensureRowReader(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, IPropertyTree * options);
+};
+
+
+class CHThorNewDiskReadActivity : public CHThorNewDiskReadBaseActivity
+{
+    typedef CHThorNewDiskReadBaseActivity PARENT;
+protected:
+    IHThorDiskReadArg &helper;
+    bool needTransform;
+    bool hasMatchFilter;
+    unsigned __int64 lastGroupProcessed;
+    RtlDynamicRowBuilder outBuilder;
+    unsigned __int64 limit;
+    unsigned __int64 remoteLimit = 0;
+
+public:
+    CHThorNewDiskReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadArg &_arg, ThorActivityKind _kind, IPropertyTree *node);
+
+    virtual void ready();
+    virtual void stop();
+    virtual bool needsAllocator() const { return true; }
+
+    //interface IHThorInput
+    virtual const void *nextRow();
+
+protected:
+    void onLimitExceeded();
+};
+
+
+
+
 #define MAKEFACTORY(NAME) \
 extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind) \
 {   return new CHThor ## NAME ##Activity(_agent, _activityId, _subgraphId, arg, kind); }

+ 8 - 11
rtl/eclrtl/rtlcommon.cpp

@@ -53,18 +53,19 @@ CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : CConti
     readOffset = 0;
 }
 
-void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
+
+size32_t CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
 {
     ensureAccessible(readOffset + len);
     memcpy(ptr, cur+readOffset, len);
     readOffset += len;
+    return len;
 }
 
 
 size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
 {
-    doRead(len, ptr);
-    return len;
+    return doRead(len, ptr);
 }
 
 size32_t CThorContiguousRowBuffer::readSize()
@@ -77,8 +78,7 @@ size32_t CThorContiguousRowBuffer::readSize()
 size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
 {
     size32_t size = sizePackedInt();
-    doRead(size, ptr);
-    return size;
+    return doRead(size, ptr);
 }
 
 size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
@@ -88,24 +88,21 @@ size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offse
 
     size32_t size = sizeUtf8(len);
     byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
+    return doRead(size, self+offset);
 }
 
 size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
 {
     size32_t size = sizeVStr();
     byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
+    return doRead(size, self+offset);
 }
 
 size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
 {
     size32_t size = sizeVUni();
     byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
+    return doRead(size, self+offset);
 }
 
 

+ 7 - 3
rtl/eclrtl/rtlcommon.hpp

@@ -7,8 +7,9 @@
 #include "eclrtl.hpp"
 #include "eclhelper.hpp"
 
-//The CContiguousRowBuffer is a buffer used for reading ahead into a file, and unsuring there is a contiguous
+//The CContiguousRowBuffer is a buffer used for reading ahead into a file, and ensuring there is a contiguous
 //block of data available to the reader.  Fixed size files could use this directly.
+//NOTE: This class does not allocate a buffer - it uses the buffer in the input stream.
 class ECLRTL_API CContiguousRowBuffer
 {
 public:
@@ -114,6 +115,11 @@ public:
         readOffset = 0;
     }
 
+    inline void read(byte & next) { doRead(1, &next); }
+    inline void read(bool & next) { doRead(1, &next); }
+
+    size32_t doRead(size32_t len, void * ptr);
+
 protected:
     size32_t sizePackedInt();
     size32_t sizeUtf8(size32_t len);
@@ -123,8 +129,6 @@ protected:
 
 private:
 
-    void doRead(size32_t len, void * ptr);
-
     inline void ensureAccessible(size32_t required)
     {
         if (required > maxAvailable())

+ 9 - 1
rtl/eclrtl/rtldynfield.hpp

@@ -103,7 +103,15 @@ interface IRtlFieldTypeDeserializer : public IInterface
 
 };
 
-enum class RecordTranslationMode:byte { None = 0, All = 1, Payload = 2, AlwaysDisk = 3, AlwaysECL = 4, Unspecified = 5 };  // AlwaysDisk and AlwaysECL are for testing purposes only
+enum class RecordTranslationMode : byte
+{
+    None = 0,       // Never translate - throw an error if the ecl does not match published
+    All = 1,        // Translate all fields.
+    Payload = 2,    // Translate all fields in datasets, and only payload fields in indexes
+    AlwaysDisk = 3, // Always translate - even if wouldn't normally (e.g. csv/xml source read as binary), or crcs happen to match
+    AlwaysECL = 4,  // Ignore the published format - can make sense to force no translation e.g. when field names have changed
+    Unspecified = 5
+};  // AlwaysDisk and AlwaysECL are for testing purposes only
 
 extern ECLRTL_API RecordTranslationMode getTranslationMode(const char *modeStr);
 extern ECLRTL_API const char *getTranslationModeText(RecordTranslationMode val);

+ 1 - 1
rtl/eclrtl/rtlnewkey.cpp

@@ -2192,7 +2192,7 @@ bool RowFilter::matches(const RtlRow & row) const
     return true;
 }
 
-void RowFilter::appendFilters(IConstArrayOf<IFieldFilter> & _filters)
+void RowFilter::appendFilters(const IConstArrayOf<IFieldFilter> & _filters)
 {
     ForEachItemIn(i, _filters)
     {

+ 1 - 1
rtl/eclrtl/rtlnewkey.hpp

@@ -73,7 +73,7 @@ public:
     void recalcFieldsRequired();
     void remove(unsigned idx);
     void clear();
-    void appendFilters(IConstArrayOf<IFieldFilter> &_filters);
+    void appendFilters(const IConstArrayOf<IFieldFilter> &_filters);
 protected:
     IConstArrayOf<IFieldFilter> filters;
     unsigned numFieldsRequired = 0;

+ 6 - 0
rtl/include/eclhelper.hpp

@@ -1055,6 +1055,7 @@ enum ThorActivityKind
     TAKspillread,
     TAKspillwrite,
     TAKnwaydistribute,
+    TAKnewdiskread,  // This activity will eventually have a refactored interface, currently a placeholder
 
     TAKlast
 };
@@ -1122,6 +1123,7 @@ enum
     TDRkeyedlimitcreates= 0x00400000,
     TDRunfilteredcount  = 0x00800000,       // count/aggregegate doesn't have an additional filter
     TDRfilenamecallback = 0x01000000,
+    TDRtransformvirtual = 0x02000000,       // transform uses a virtual field.
 
 //disk write flags
     TDWextend           = 0x0100,
@@ -2556,6 +2558,10 @@ struct IHThorXmlReadArg: public IHThorDiskReadBaseArg
     virtual void onLimitExceeded() = 0;
 };
 
+struct IHThorNewDiskReadArg: public IHThorDiskReadArg
+{
+};
+
 typedef unsigned thor_loop_counter_t;
 struct IHThorLoopArg : public IHThorArg
 {

+ 2 - 0
system/jlib/CMakeLists.txt

@@ -79,6 +79,7 @@ set (    SRCS
          jptree.cpp 
          jqueue.cpp
          jregexp.cpp 
+         jrowstream.cpp
          jsem.cpp 
          jset.cpp 
          jsmartsock.cpp 
@@ -148,6 +149,7 @@ set (    INCLUDES
         jqueue.tpp
         jregexp.hpp
         jrespool.tpp
+        jrowstream.hpp
         jscm.hpp
         jsem.hpp
         jset.hpp

+ 7 - 5
system/jlib/jbuff.cpp

@@ -458,6 +458,13 @@ void MemoryBuffer::swapWith(MemoryBuffer & other)
     SWAP(swapEndian, other.swapEndian, bool);
 }
 
+bool MemoryBuffer::matches(const MemoryBuffer & other) const
+{
+    if (curLen != other.curLen)
+        return false;
+    return (memcmp(buffer, other.buffer, curLen) == 0);
+}
+
 //-----------------------------------------------------------------------
 
 
@@ -468,11 +475,6 @@ MemoryBuffer::MemoryBuffer(size_t initial)
     _realloc((size32_t)initial);
 }
 
-MemoryBuffer::MemoryBuffer(MemoryBuffer & value __attribute__((unused)))
-{
-    assertex(!"This should never be used");
-}
-
 MemoryBuffer::MemoryBuffer(size_t len, const void * newBuffer)
 {
     init();

+ 5 - 8
system/jlib/jbuff.hpp

@@ -126,6 +126,7 @@ public:
     inline MemoryBuffer()  { init(); }
     MemoryBuffer(size_t initial);
     MemoryBuffer(size_t len, const void * buffer);
+    MemoryBuffer(MemoryBuffer & value) = delete;
     inline ~MemoryBuffer() { kill(); }
     
     MemoryBuffer &  rewrite(size32_t pos = 0);
@@ -150,8 +151,7 @@ public:
     MemoryBuffer &  appendSwap(size32_t len, const void * value);
     MemoryBuffer &  appendPacked(unsigned __int64 value); // compatible with any unsigned size
     inline MemoryBuffer &  appendMemSize(memsize_t  value) { __int64 val=(__int64)value; append(val); return *this; }
-    
-    
+
     MemoryBuffer &  reset(size32_t pos = 0);
     MemoryBuffer &  read(char & value);
     MemoryBuffer &  read(unsigned char & value);
@@ -187,6 +187,7 @@ public:
     int             setEndian(int endian);          // pass __[BIG|LITTLE]_ENDIAN
     bool            setSwapEndian(bool swap);
     void            swapWith(MemoryBuffer & other);
+    bool            matches(const MemoryBuffer & other) const;
 
     inline size32_t capacity() { return (maxLen - curLen); }
     void *          ensureCapacity (unsigned max);
@@ -211,9 +212,6 @@ public:
     inline const byte * bytes() const { return curLen ? (const byte *)buffer : nullptr; }
 
 
-protected:
-    size32_t  readPos;
-    bool    swapEndian;
 private:
     MemoryBuffer &  read(unsigned long & value);    // unimplemented
     MemoryBuffer &  read(long & value);             // unimplemented
@@ -233,10 +231,9 @@ private:
     mutable char *  buffer;
     size32_t  curLen;
     size32_t  maxLen;
+    size32_t  readPos;
+    bool    swapEndian;
     bool    ownBuffer;
-    
-    MemoryBuffer(MemoryBuffer & value);
-    
 };
 
 // Utility class, to back patch a scalar into current position

+ 68 - 0
system/jlib/jrowstream.cpp

@@ -0,0 +1,68 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+#include "platform.h"
+
+#include "jrowstream.hpp"
+
+class NullRawRowStream : public CInterfaceOf<IRawRowStream>
+{
+    virtual bool getCursor(MemoryBuffer & cursor)
+    {
+        return true;
+    }
+    virtual void setCursor(MemoryBuffer & cursor)
+    {
+    }
+    virtual void stop()
+    {
+    }
+    virtual const void *nextRow(size32_t & size)
+    {
+        size = 0;
+        return eofRow;
+    }
+};
+static NullRawRowStream nullRawStream;
+
+IRawRowStream * queryNullRawRowStream()
+{
+    return &nullRawStream;
+}
+
+IRawRowStream * createNullRawRowStream()
+{
+    return new NullRawRowStream;
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class NullAllocRowStream : public CInterfaceOf<IAllocRowStream>
+{
+    virtual const void *nextRow()
+    {
+        return eofRow;
+    }
+};
+static NullAllocRowStream nullAllocStream;
+
+IAllocRowStream * queryNullAllocatedRowStream()
+{
+    return &nullAllocStream;
+}

+ 89 - 0
system/jlib/jrowstream.hpp

@@ -0,0 +1,89 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef JROWSTREAM_INCL
+#define JROWSTREAM_INCL
+
+#include "jiface.hpp"
+
+//The following values are used as values for special rows which are returned in the row stream
+enum class SpecialRow : memsize_t
+{
+    eof = 0,    // end of file
+    eog = 1,    // end of group
+    eos = 2,    // end of stream
+    //more: We could have eogs == eog|eos, but simpler for the callers if they come through separately.
+    max
+};
+constexpr static const byte * eofRow = (const byte * )(memsize_t)SpecialRow::eof;
+constexpr static byte const * eogRow = (const byte * )(memsize_t)SpecialRow::eog;
+constexpr static const byte * eosRow = (const byte * )(memsize_t)SpecialRow::eos;
+
+inline bool isSpecialRow(const void * row) { return unlikely((memsize_t)row < (memsize_t)SpecialRow::max); }
+inline bool isEndOfFile(const void * row) { return unlikely((memsize_t)row == (memsize_t)SpecialRow::eof); }       // checking row against null is also valid
+inline bool isEndOfGroup(const void * row) { return unlikely((memsize_t)row == (memsize_t)SpecialRow::eog); }
+inline bool isEndOfStream(const void * row) { return unlikely((memsize_t)row == (memsize_t)SpecialRow::eos); }
+inline SpecialRow getSpecialRowType(const void * row) { return (SpecialRow)(memsize_t)row; }
+
+
+//Base interface for reading a stream of rows
+class MemoryBuffer;
+interface IRowStreamBase : extends IInterface
+{
+    virtual bool getCursor(MemoryBuffer & cursor) = 0;
+    virtual void setCursor(MemoryBuffer & cursor) = 0;
+    virtual void stop() = 0;                              // after stop called NULL is returned
+};
+
+//An interface for reading rows from which are not cloned
+interface IRawRowStream : extends IRowStreamBase
+{
+    virtual const void *nextRow(size32_t & size)=0;       // rows returned are only valid until next call.  Size is the number of bytes in the row.
+
+    inline const void *ungroupedNextRow(size32_t & size)  // size will not include the size of the eog
+    {
+        for (;;)
+        {
+            const void *ret = nextRow(size);
+            if (likely(!isEndOfGroup(ret)))
+                return ret;
+        }
+    }
+};
+
+//An interface for reading rows which have been allocated
+interface IAllocRowStream : extends IInterface
+{
+    virtual const void *nextRow()=0;                      // rows returned must be freed
+
+    inline const void *ungroupedNextRow()
+    {
+        for (;;)
+        {
+            const void *ret = nextRow();
+            if (likely(!isEndOfGroup(ret)))
+                return ret;
+        }
+    }
+};
+
+
+extern jlib_decl IRawRowStream * queryNullRawRowStream();
+extern jlib_decl IAllocRowStream * queryNullAllocatedRowStream();
+extern jlib_decl IRawRowStream * createNullRawRowStream();
+
+#endif

+ 3 - 0
testing/regress/ecl/translatedisk.ecl

@@ -17,15 +17,18 @@
 
 //version multiPart=false
 //version multiPart=true
+//version multiPart=true,optRemoteRead=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, false);
+optRemoteRead := #IFDEFINED(root.optRemoteRead, true);
 
 //--- end of version configuration ---
 
 #onwarning(2036, ignore);
 #onwarning(4522, ignore);
 #option ('layoutTranslation', true);
+#option('forceRemoteRead', optRemoteRead);
 import $.Setup;
 
 boolean useLocal := false;