瀏覽代碼

HPCC-17644 Support for remote key filtering + counting/limits.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 年之前
父節點
當前提交
e865819759

+ 4 - 1
common/remote/CMakeLists.txt

@@ -55,6 +55,8 @@ include_directories (
          ./../../system/mp 
          ./../../system/include 
          ./../../system/jlib 
+         ./../../system/jhtree
+         ./../../rtl/eclrtl
          ./../../system/security/securesocket
          ./../../testing/unittests
     )
@@ -65,7 +67,8 @@ HPCC_ADD_LIBRARY( remote SHARED ${SRCS}  )
 install ( TARGETS remote RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
 
 target_link_libraries ( remote 
-    jlib 
+    jlib
+    jhtree 
     mp
     ${URIPARSER_LIBRARIES}
     ${CPPUNIT_LIBRARIES}

+ 1 - 0
common/remote/remoteerr.hpp

@@ -66,6 +66,7 @@
 #define RFSERR_VersionMismatch                  8047
 #define RFSERR_SetThrottleFailed                8048
 #define RFSERR_MaxQueueRequests                 8049
+#define RFSERR_KeyIndexFailed                   8050
 
 //---- Text for all errors (make it easy to internationalise) ---------------------------
 

+ 417 - 37
common/remote/sockfile.cpp

@@ -36,6 +36,7 @@
 #include "jsocket.hpp"
 #include "jencrypt.hpp"
 #include "jset.hpp"
+#include "jhtree.hpp"
 
 #include "remoteerr.hpp"
 #include <atomic>
@@ -159,7 +160,7 @@ struct dummyReadWrite
 // backward compatible modes
 typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;
 
-static const char *VERSTRING= "DS V1.9"       // dont forget FILESRV_VERSION in header
+static const char *VERSTRING= "DS V2.0"       // dont forget FILESRV_VERSION in header
 #ifdef _WIN32
 "Windows ";
 #else
@@ -313,6 +314,10 @@ enum {
 // 1.9
     RFCsetthrottle2,
     RFCsetfileperms,
+// 2.0
+    RFCreadfilteredindex,
+    RFCreadfilteredindexcount,
+    RFCreadfilteredindexblob,
     RFCmax,
     RFCunknown = 255 // 0 would have been more sensible, but can't break backward compatibility
 };
@@ -361,6 +366,9 @@ const char *RFCStrings[] =
     RFCText(RFCsetthrottle), // legacy version
     RFCText(RFCsetthrottle2),
     RFCText(RFCsetfileperms),
+    // 2.0 - text entries for the filtered-index commands; spelled exactly as the RFC* enumerators
+    RFCText(RFCreadfilteredindex),
+    RFCText(RFCreadfilteredindexcount),
+    RFCText(RFCreadfilteredindexblob),
     RFCText(RFCunknown),
 };
 static const char *getRFCText(RemoteFileCommandType cmd)
@@ -448,6 +456,8 @@ static const char *getRFSERRText(unsigned err)
             return "RFSERR_SetThrottleFailed";
         case RFSERR_MaxQueueRequests:
             return "RFSERR_MaxQueueRequests";
+        case RFSERR_KeyIndexFailed:
+            return "RFSERR_MaxQueueRequests";
     }
     return "RFSERR_Unknown";
 }
@@ -1609,7 +1619,6 @@ void CEndpointCS::beforeDispose()
     table.removeExact(this);
 }
 
-
 class CRemoteFile : public CRemoteBase, implements IFile
 {
     StringAttr remotefilename;
@@ -2221,7 +2230,7 @@ public:
             handle = 0;
         }
     }
-
+    // Read-only accessor for the handle value currently held by this object.
+    RemoteFileIOHandle getHandle() const
+    {
+        return handle;
+    }
     bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
     {
         MemoryBuffer sendBuffer;
@@ -2472,6 +2481,11 @@ public:
     }
 
     void setDisconnectOnExit(bool set) { disconnectonexit = set; }
+
+    // Relays an already-prepared request to the parent's sendRemoteCommand,
+    // passing the buffers and the retry/lengthy options through unchanged.
+    void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false)
+    {
+        parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy);
+    }
 };
 
 void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set)
@@ -2582,6 +2596,206 @@ void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *pr
     doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
 }
 
+/////////////////////////
+
+/*
+ * IKeyManager implementation whose filtered reads, counts and blob fetches are carried out by
+ * the remote file server (RFCreadfilteredindex, RFCreadfilteredindexcount and
+ * RFCreadfilteredindexblob commands) instead of being evaluated against a locally opened index.
+ *
+ * lookup() fetches rows in batches of up to maxRecsPerRequest and hands them out one at a time;
+ * the cursor returned with each batch is sent back with the next request to continue the scan.
+ * Operations this implementation does not support are marked UNIMPLEMENTED.
+ */
+class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
+{
+    static const size32_t defaultMaxRecsPerRequest = 100; // arbitrary # recs per request, perhaps should be based on recsize
+
+    StringAttr filename;
+    Linked<IDelayedFile> delayedFile;
+    SegMonitorList segs;
+    size32_t rowDataRemaining = 0;   // bytes of buffered row data not yet handed out by lookup()
+    MemoryBuffer rowDataBuffer;      // most recent reply: row data followed by the continuation cursor
+    size32_t keyCursorSz = 0;        // used for continuation
+    const void *keyCursor = nullptr; // used for continuation (points into rowDataBuffer)
+    unsigned __int64 totalGot = 0;   // rows received from the server since the last reset()
+    size32_t maxRecsPerRequest = defaultMaxRecsPerRequest; // set to 0 once the server returns no rows
+    size32_t keySize = 0;
+    size32_t currentSize = 0;
+    offset_t currentFpos = 0;
+    const byte *currentRow = nullptr;
+    bool first = true;               // true until the first batch request has been sent
+    unsigned __int64 chooseNLimit = 0; // 0 means no limit on the number of rows requested
+    ConstPointerArray activeBlobs;   // buffers detached by loadBlob(), freed by releaseBlobs()
+
+    /*
+     * Starts a request in sendBuffer: command, remote file handle, filename and key size,
+     * optionally followed by the serialized segment monitors.
+     * Returns the (linked) CRemoteFileIO the request should be sent through.
+     * Throws if the delayed file cannot supply a file IO object.
+     */
+    CRemoteFileIO *prepKeySend(MemoryBuffer &sendBuffer, RemoteFileCommandType cmd, bool segmentMonitors)
+    {
+        Owned<IFileIO> iFileIO = delayedFile->getFileIO();
+        if (!iFileIO)
+            throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
+        Linked<CRemoteFileIO> remoteIO = QUERYINTERFACE(iFileIO.get(), CRemoteFileIO);
+        assertex(remoteIO);
+        initSendBuffer(sendBuffer);
+        sendBuffer.append(cmd).append(remoteIO->getHandle()).append(filename).append(keySize);
+        if (segmentMonitors)
+            segs.serialize(sendBuffer);
+        return remoteIO.getClear();
+    }
+    // Sends a count request with the given limit and returns the count read from the reply.
+    unsigned __int64 _checkCount(unsigned __int64 limit)
+    {
+        MemoryBuffer sendBuffer;
+        Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexcount, true);
+        sendBuffer.append(limit);
+        MemoryBuffer replyBuffer;
+        remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
+        unsigned __int64 count;
+        replyBuffer.read(count);
+        return count;
+    }
+public:
+    // NB: initializers are listed in member declaration order, which is the order they run in
+    CRemoteKeyManager(const char *_filename, unsigned _keySize, IDelayedFile *_delayedFile)
+        : filename(_filename), delayedFile(_delayedFile), keySize(_keySize)
+    {
+    }
+    ~CRemoteKeyManager()
+    {
+        releaseBlobs();
+    }
+    // Discards buffered rows and continuation state so the next lookup() starts a fresh scan.
+    // The chooseN limit and the segment monitors are left untouched.
+    virtual void reset(bool crappyHack = false) override
+    {
+        rowDataBuffer.clear();
+        rowDataRemaining = 0;
+        currentSize = 0;
+        currentFpos = 0;
+        currentRow = nullptr;
+        first = true;
+        maxRecsPerRequest = defaultMaxRecsPerRequest;
+        totalGot = 0;
+        keyCursorSz = 0;
+        keyCursor = nullptr;
+    }
+    virtual void releaseSegmentMonitors() override { segs.reset(); }
+    // Returns the row made current by the last successful lookup() and its file position.
+    virtual const byte *queryKeyBuffer(offset_t & fpos) override
+    {
+        fpos = currentFpos;
+        return currentRow;
+    }
+    virtual offset_t queryFpos() override
+    {
+        return currentFpos;
+    }
+    virtual unsigned queryRecordSize() override
+    {
+        return currentSize;
+    }
+    /*
+     * Advances to the next matching row, requesting another batch from the server when the
+     * locally buffered rows have been used up. The 'exact' argument is not used here.
+     * Returns true when a row is current, false when no further rows will be returned
+     * (server returned none, no continuation cursor, or the chooseN limit was reached).
+     */
+    virtual bool lookup(bool exact) override
+    {
+        while (true)
+        {
+            if (rowDataRemaining)
+            {
+                // each buffered row is: fpos, size, then 'size' bytes of row data
+                rowDataBuffer.read(currentFpos);
+                rowDataBuffer.read(currentSize);
+                currentRow = rowDataBuffer.readDirect(currentSize);
+                rowDataRemaining -= sizeof(currentFpos) + sizeof(currentSize) + currentSize;
+                return true;
+            }
+            else
+            {
+                if (!first && (nullptr == keyCursor)) // No keyCursor implies there is nothing more to fetch
+                    return false;
+                unsigned maxRecs = maxRecsPerRequest;
+                if (maxRecs && chooseNLimit)
+                {
+                    // never ask for more rows than the chooseN limit still allows
+                    if (totalGot + maxRecs > chooseNLimit)
+                        maxRecs = (unsigned)(chooseNLimit - totalGot);
+                }
+                if (0 == maxRecs)
+                    break;
+                MemoryBuffer sendBuffer;
+                Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindex, true);
+                sendBuffer.append(first).append(maxRecs);
+                if (first)
+                    first = false;
+                else
+                {
+                    dbgassertex(keyCursor);
+                    // must be copied out before rowDataBuffer (which the cursor points into) is cleared
+                    sendBuffer.append(keyCursorSz, keyCursor);
+                }
+                rowDataBuffer.clear();
+                remoteIO->sendRemoteCommand(sendBuffer, rowDataBuffer);
+                unsigned recsGot;
+                rowDataBuffer.read(recsGot);
+                if (0 == recsGot)
+                {
+                    maxRecsPerRequest = 0;
+                    // the buffer the old cursor pointed into has been overwritten; don't keep a stale pointer
+                    keyCursorSz = 0;
+                    keyCursor = nullptr;
+                    break; // end
+                }
+                totalGot += recsGot;
+                rowDataBuffer.read(rowDataRemaining);
+                unsigned pos = rowDataBuffer.getPos(); // start of row data
+                rowDataBuffer.readDirect(rowDataRemaining); // step over the row data to reach the cursor
+                rowDataBuffer.read(keyCursorSz);
+                if (keyCursorSz)
+                    keyCursor = rowDataBuffer.readDirect(keyCursorSz);
+                else
+                    keyCursor = nullptr;
+                rowDataBuffer.reset(pos); // reposition to start of row data
+            }
+        }
+        return false;
+    }
+    // Unlimited count: the all-ones limit value is what the server treats as "no limit".
+    virtual unsigned __int64 getCount() override
+    {
+        return _checkCount((unsigned __int64)-1);
+    }
+    virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) override { UNIMPLEMENTED; }
+    virtual bool nextRange(unsigned groupSegCount) override { UNIMPLEMENTED; }
+    virtual void setKey(IKeyIndexBase * _key) override { UNIMPLEMENTED; }
+    virtual void setChooseNLimit(unsigned __int64 _chooseNLimit) override
+    {
+        chooseNLimit = _chooseNLimit;
+    }
+    virtual unsigned __int64 checkCount(unsigned __int64 limit) override
+    {
+        return _checkCount(limit);
+    }
+    virtual void serializeCursorPos(MemoryBuffer &mb) override { UNIMPLEMENTED; }
+    virtual void deserializeCursorPos(MemoryBuffer &mb) override { UNIMPLEMENTED; }
+    virtual unsigned querySeeks() const override { return 0; }
+    virtual unsigned queryScans() const override { return 0; }
+    virtual unsigned querySkips() const override { return 0; }
+    virtual unsigned queryNullSkips() const override { return 0; }
+    /*
+     * Fetches the blob with the given id from the server.
+     * Sets blobSize and returns a pointer to the blob data; the memory stays valid until
+     * releaseBlobs() is called (or this object is destroyed).
+     */
+    virtual const byte *loadBlob(unsigned __int64 blobId, size32_t &blobSize) override
+    {
+        MemoryBuffer sendBuffer;
+        Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexblob, false);
+        sendBuffer.append(blobId);
+        MemoryBuffer replyBuffer;
+        remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
+        replyBuffer.read(blobSize);
+        const byte *blobData = replyBuffer.readDirect(blobSize);
+        activeBlobs.append(replyBuffer.detach()); // NB: don't need to retain size, but keep sz+data to avoid copy
+        return blobData;
+    }
+    // Frees every buffer retained by loadBlob().
+    virtual void releaseBlobs() override
+    {
+        ForEachItemIn(idx, activeBlobs)
+        {
+            free((void *) activeBlobs.item(idx));
+        }
+        activeBlobs.kill();
+    }
+    virtual void resetCounts() override { UNIMPLEMENTED; }
+
+    virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) override { UNIMPLEMENTED; }
+    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override { segs.swapWith(segmentMonitors); }
+    virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override { segs.deserialize(mb); }
+    virtual void finishSegmentMonitors() override { }
+    virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) override { UNIMPLEMENTED; }
+    virtual void append(IKeySegmentMonitor *segment) override
+    {
+        segs.append(segment);
+    }
+    virtual unsigned ordinality() const override { return segs.ordinality(); }
+    virtual IKeySegmentMonitor *item(unsigned idx) const override { return segs.item(idx); }
+    virtual void setMergeBarrier(unsigned offset) override { UNIMPLEMENTED; }
+};
+
+// Factory for the remote key manager: constructs a CRemoteKeyManager from the given
+// filename, key size and delayed file, and returns it through the IKeyManager interface.
+IKeyManager * createRemoteKeyManager(const char *filename, unsigned keySize, IDelayedFile *delayedFile)
+{
+    IKeyManager *remoteKeyManager = new CRemoteKeyManager(filename, keySize, delayedFile);
+    return remoteKeyManager;
+}
+
+//////////////
 
 unsigned getRemoteVersion(ISocket * socket, StringBuffer &ver)
 {
@@ -3059,6 +3273,7 @@ public:
     std::atomic<unsigned __int64> bRead;
     std::atomic<unsigned __int64> bWritten;
 };
+
 class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
 {
     typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
@@ -3160,6 +3375,17 @@ public:
     }
 };
 
+// Flags that can be recorded against an open file entry (see OpenFileInfo::flags).
+enum OpenFileFlag { of_null=0x0, of_key=0x01 };
+// Bookkeeping for one open file: its handle, the linked file IO object, the name it was
+// opened under and any OpenFileFlag bits set on it.
+struct OpenFileInfo
+{
+    OpenFileInfo() { }
+    // NB: initializers are listed in member declaration order, which is the order they run in
+    OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : fileIO(_fileIO), filename(_filename), handle(_handle) { }
+    Linked<IFileIO> fileIO;
+    Linked<StringAttrItem> filename; // for debug
+    int handle = 0;
+    unsigned flags = 0; // combination of OpenFileFlag values
+};
+
 class CRemoteFileServer : implements IRemoteFileServer, public CInterface
 {
     class CThrottler;
@@ -3173,10 +3399,8 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         MemoryBuffer msg;
         bool selecthandled;
         size32_t left;
-        IArrayOf<IFileIO>   openfiles;      // kept in sync with handles
+        StructArrayOf<OpenFileInfo> openFiles;
         Owned<IDirectoryIterator> opendir;
-        StringAttrArray     opennames;      // for debug
-        IntArray            handles;
         unsigned            lasttick, lastInactiveTick;
         atomic_t            &globallasttick;
         unsigned            previdx;        // for debug
@@ -3333,8 +3557,11 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
 
         void logPrevHandle()
         {
-            if (previdx<opennames.ordinality())
-                PROGLOG("Previous handle(%d): %s",handles.item(previdx),opennames.item(previdx).text.get());
+            if (previdx<openFiles.ordinality()) // only log when previdx indexes an entry that is still in openFiles
+            {
+                const OpenFileInfo &fileInfo = openFiles.item(previdx);
+                PROGLOG("Previous handle(%d): %s", fileInfo.handle, fileInfo.filename->text.get()); // handle and name of that entry
+            }
         }
 
         bool throttleCommand(MemoryBuffer &msg)
@@ -3481,9 +3708,11 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
                 ok = false;
             unsigned ms = msTick();
             str.appendf("): last touch %d ms ago (%d, %d)",ms-lasttick,lasttick,ms);
-            ForEachItemIn(i,handles) {
-                str.appendf("\n  %d: ",handles.item(i));
-                str.append(opennames.item(i).text);
+            ForEachItemIn(i, openFiles)
+            {
+                const OpenFileInfo &fileInfo = openFiles.item(i);
+                str.appendf("\n  %d: ", fileInfo.handle);
+                str.append(fileInfo.filename->text.get());
             }
             return ok;
         }
@@ -3819,8 +4048,10 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         handleidx = (unsigned)-1;
         ForEachItemIn(i,clients) {
             CRemoteClientHandler &client = clients.item(i);
-            ForEachItemIn(j,client.handles) {
-                if (client.handles.item(j)==handle) {
+            ForEachItemIn(j, client.openFiles)
+            {
+                if (client.openFiles.item(j).handle==handle)
+                {
                     handleidx = j;
                     clientidx = i;
                     return true;
@@ -3830,6 +4061,64 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         return false;
     }
 
+    /*
+     * Appends up to maxRecs rows from keyManager to reply, each as: fpos, size, row bytes.
+     * The rows are preceded by a size field that is filled in once all rows are written.
+     * Returns the number of rows appended.
+     */
+    unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply)
+    {
+        DelayedSizeMarker rowBytesMarker(reply);
+        unsigned rowsWritten = 0;
+        for (; rowsWritten < maxRecs; ++rowsWritten)
+        {
+            if (!keyManager->lookup(true))
+                break; // no further matching rows
+            unsigned rowSize = keyManager->queryRecordSize();
+            offset_t rowFpos;
+            const byte *row = keyManager->queryKeyBuffer(rowFpos);
+            reply.append(rowFpos);
+            reply.append(rowSize);
+            reply.append(rowSize, row);
+        }
+        rowBytesMarker.write();
+        return rowsWritten;
+    }
+
+    /*
+     * Builds a key manager over the already-open file identified by handle.
+     * The handle is flagged of_key as part of the lookup. If segs is supplied, its monitors
+     * are handed to the new key manager. The manager is reset before being returned.
+     * Throws a dafs exception if the handle is not known or the key index cannot be created.
+     */
+    IKeyManager *prepKey(int handle, const char *keyname, unsigned keySize, SegMonitorList *segs)
+    {
+        OpenFileInfo openInfo;
+        const bool handleKnown = lookupFileIOHandle(handle, openInfo, of_key);
+        if (!handleKnown)
+        {
+            VStringBuffer msg("Error opening key file : %s", keyname);
+            throw createDafsException(RFSERR_InvalidFileIOHandle, msg.str());
+        }
+        Owned<IKeyIndex> keyIndex = createKeyIndex(keyname, 0, *openInfo.fileIO, false, false);
+        if (!keyIndex)
+        {
+            VStringBuffer msg("Error opening key file : %s", keyname);
+            throw createDafsException(RFSERR_KeyIndexFailed, msg.str());
+        }
+        Owned<IKeyManager> manager = createKeyManager(keyIndex, keySize, nullptr);
+        if (nullptr != segs)
+        {
+            manager->setSegmentMonitors(*segs);
+            manager->finishSegmentMonitors();
+        }
+        manager->reset();
+        return manager.getLink();
+    }
+
+    /*
+     * Reads the key details (handle, key name, key size) from the request buffer and, when
+     * segmentMonitors is set, the serialized segment monitors that follow them, then
+     * delegates to the handle-based prepKey overload.
+     */
+    IKeyManager *prepKey(MemoryBuffer &mb, bool segmentMonitors)
+    {
+        int handle;
+        StringBuffer keyName;
+        size32_t keySize;
+        mb.read(handle).read(keyName).read(keySize);
+        if (!segmentMonitors)
+            return prepKey(handle, keyName, keySize, nullptr);
+
+        SegMonitorList requestSegs;
+        requestSegs.deserialize(mb);
+        return prepKey(handle, keyName, keySize, &requestSegs);
+    }
+
     class cCommandProcessor: public CInterface, implements IPooledThread
     {
         Owned<CRemoteClientHandler> client;
@@ -3970,37 +4259,64 @@ public:
         PROGLOG("Exited CRemoteFileServer");
 #endif
     }
+    /*
+     * Finds the open-file entry for handle, ORs newFlags into the stored entry and copies
+     * the entry into fileInfo. Also records the entry's index as the client's previdx.
+     * Returns false (leaving fileInfo untouched) for a handle <= 0 or one that is not found.
+     */
+    bool lookupFileIOHandle(int handle, OpenFileInfo &fileInfo, unsigned newFlags=0)
+    {
+        if (handle<=0)
+            return false;
+        CriticalBlock block(sect);
+        unsigned ownerIdx;
+        unsigned slotIdx;
+        const bool known = findHandle(handle, ownerIdx, slotIdx);
+        if (!known)
+            return false;
+        CRemoteClientHandler &owner = clients.item(ownerIdx);
+        OpenFileInfo &stored = owner.openFiles.element(slotIdx);
+        stored.flags |= newFlags;
+        fileInfo = stored; // NB: links members
+        owner.previdx = slotIdx;
+        return true;
+    }
 
     //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
     // should throw a different exception
-    bool checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
+    bool checkFileIOHandle(int handle, IFileIO *&fileio, bool del=false) // looks up handle; with del the entry is removed and fileio stays NULL, otherwise fileio is set to the entry's file IO
     {
-        CriticalBlock block(sect);
         fileio = NULL;
-        if (handle<=0) {
-            appendErr(reply, RFSERR_NullFileIOHandle);
+        if (handle<=0) // null handle: fail without taking the lock
             return false;
-        }
+        CriticalBlock block(sect);
         unsigned clientidx;
         unsigned handleidx;
-        if (findHandle(handle,clientidx,handleidx)) {
+        if (findHandle(handle,clientidx,handleidx))
+        {
             CRemoteClientHandler &client = clients.item(clientidx);
-            if (del) {
-                client.handles.remove(handleidx);
-                client.openfiles.remove(handleidx);
-                client.opennames.remove(handleidx);
+            const OpenFileInfo &fileInfo = client.openFiles.item(handleidx);
+            if (del)
+            {
+                if (fileInfo.flags & of_key) // entry was flagged as a key: clear its key store cache entry before removing it
+                    clearKeyStoreCacheEntry(fileInfo.fileIO);
+                client.openFiles.remove(handleidx);
                 client.previdx = (unsigned)-1;
             }
-            else {
-               fileio = &client.openfiles.item(handleidx);
+            else
+            {
+               fileio = client.openFiles.item(handleidx).fileIO;
                client.previdx = handleidx;
             }
             return true;
         }
-        appendErr(reply, RFSERR_InvalidFileIOHandle);
         return false;
     }
 
+    /*
+     * Reply-writing variant of checkFileIOHandle: performs the same lookup (and optional
+     * removal) and, on failure, appends an error to reply.
+     * A handle <= 0 is reported as RFSERR_NullFileIOHandle and an unknown handle as
+     * RFSERR_InvalidFileIOHandle, matching the errors reported before the lookup was split out.
+     * Returns true if the handle was found.
+     */
+    bool checkFileIOHandle(MemoryBuffer &reply, int handle, IFileIO *&fileio, bool del=false)
+    {
+        if (checkFileIOHandle(handle, fileio, del))
+            return true;
+        if (handle<=0)
+            appendErr(reply, RFSERR_NullFileIOHandle);
+        else
+            appendErr(reply, RFSERR_InvalidFileIOHandle);
+        return false;
+    }
+
     void onCloseSocket(CRemoteClientHandler *client, int which) 
     {
         if (!client)
@@ -4086,15 +4402,13 @@ public:
         }
         if (TF_TRACE_PRE_IO)
             PROGLOG("before open file '%s',  (%d,%d,%d,%d,0%o)",name->text.get(),(int)mode,(int)share,extraFlags,sMode,cFlags);
-        IFileIO *fileio = file->open((IFOmode)mode,extraFlags);
+        Owned<IFileIO> fileio = file->open((IFOmode)mode,extraFlags);
         int handle;
         if (fileio) {
             CriticalBlock block(sect);
             handle = getNextHandle();
-            client.previdx = client.opennames.ordinality();
-            client.handles.append(handle);
-            client.openfiles.append(*fileio);
-            client.opennames.append(*name.getLink());
+            client.previdx = client.openFiles.ordinality();
+            client.openFiles.append(OpenFileInfo(handle, fileio, name));
         }
         else
             handle = 0;
@@ -4162,6 +4476,60 @@ public:
         return true;
     }
 
+    /*
+     * Handles RFCreadfilteredindex: reads the key details and segment monitors, the 'first'
+     * flag and the maximum row count from msg (plus a cursor position when not the first
+     * request), then writes to reply: RFEnoerror, the number of rows, the row data and a
+     * size-prefixed cursor. The cursor is only serialized when the full quota of rows was read.
+     */
+    bool cmdReadFilteredIndex(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
+    {
+        Owned<IKeyManager> manager = prepKey(msg, true);
+        bool isFirstRequest;
+        size32_t rowLimit;
+        msg.read(isFirstRequest).read(rowLimit);
+        if (!isFirstRequest)
+            manager->deserializeCursorPos(msg);
+
+        reply.append((unsigned)RFEnoerror);
+        DelayedMarker<unsigned> rowCountMarker(reply); // filled in once the rows have been written
+        const unsigned rowsRead = readKeyData(manager, rowLimit, reply);
+        rowCountMarker.write(rowsRead);
+
+        DelayedSizeMarker cursorSizeMarker(reply);
+        if (rowsRead >= rowLimit) // no point in cursor if no more recs to return
+            manager->serializeCursorPos(reply);
+        cursorSizeMarker.write();
+        return true;
+    }
+
+    /*
+     * Handles RFCreadfilteredindexcount: reads the key details, segment monitors and a limit
+     * from msg, and replies with RFEnoerror followed by the count. A limit of all-ones
+     * selects getCount(); any other value is passed to checkCount().
+     */
+    bool cmdReadFilteredIndexCount(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
+    {
+        Owned<IKeyManager> manager = prepKey(msg, true);
+
+        unsigned __int64 limit;
+        msg.read(limit);
+        const unsigned __int64 noLimit = (unsigned __int64)-1;
+        // count is computed before anything is written to reply
+        unsigned __int64 count = (noLimit == limit) ? manager->getCount() : manager->checkCount(limit);
+        reply.append((unsigned)RFEnoerror);
+        reply.append(count);
+        return true;
+    }
+
+    /*
+     * Handles RFCreadfilteredindexblob: reads the key details (no segment monitors) and a
+     * blob id from msg, loads the blob and replies with RFEnoerror, the blob size and the
+     * blob bytes. The blob is released once it has been copied into reply.
+     */
+    bool cmdReadFilteredIndexBlob(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
+    {
+        Owned<IKeyManager> manager = prepKey(msg, false);
+        unsigned __int64 requestedBlob;
+        msg.read(requestedBlob);
+
+        size32_t blobLen;
+        const byte *blobBytes = manager->loadBlob(requestedBlob, blobLen);
+
+        reply.append((unsigned)RFEnoerror);
+        reply.append(blobLen);
+        reply.append(blobLen, blobBytes);
+
+        manager->releaseBlobs();
+        return true;
+    }
+
     bool cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
     {
         int handle;
@@ -4899,6 +5267,9 @@ public:
             case RFCisreadonly:
             case RFCsetreadonly:
             case RFCsetfileperms:
+            case RFCreadfilteredindex:
+            case RFCreadfilteredindexcount:
+            case RFCreadfilteredindexblob:
             case RFCgettime:
             case RFCsettime:
             case RFCcreatedir:
@@ -4937,6 +5308,9 @@ public:
             {
                 MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
                 MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
+                MAPCOMMANDSTATS(RFCreadfilteredindex, cmdReadFilteredIndex, *stats);
+                MAPCOMMANDSTATS(RFCreadfilteredindexcount, cmdReadFilteredIndexCount, *stats);
+                MAPCOMMANDSTATS(RFCreadfilteredindexblob, cmdReadFilteredIndexBlob, *stats);
                 MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
                 MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
                 MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);
@@ -5295,29 +5669,35 @@ public:
 
     void checkTimeout()
     {
-        if (msTick()-clientcounttick>1000*60*60) {
+        if (msTick()-clientcounttick>1000*60*60)
+        {
             CriticalBlock block(ClientCountSect);
             if (TF_TRACE_CLIENT_STATS && (ClientCount || MaxClientCount))
                 PROGLOG("Client count = %d, max = %d", ClientCount, MaxClientCount);
             clientcounttick = msTick();
             MaxClientCount = ClientCount;
-            if (closedclients) {
+            if (closedclients)
+            {
                 if (TF_TRACE_CLIENT_STATS)
                     PROGLOG("Closed client count = %d",closedclients);
                 closedclients = 0;
             }
         }
         CriticalBlock block(sect);
-        ForEachItemInRev(i,clients) {
+        ForEachItemInRev(i,clients)
+        {
             CRemoteClientHandler &client = clients.item(i);
-            if (client.timedOut()) {
+            if (client.timedOut())
+            {
                 StringBuffer s;
                 bool ok = client.getInfo(s);    // will spot duff sockets
-                if (ok&&(client.handles.ordinality()!=0))  {
+                if (ok&&(client.openFiles.ordinality()!=0))
+                {
                     if (TF_TRACE_CLIENT_CONN && client.inactiveTimedOut())
                         WARNLOG("Inactive %s",s.str());
                 }
-                else {
+                else
+                {
 #ifndef _DEBUG
                     if (TF_TRACE_CLIENT_CONN)
 #endif

+ 5 - 2
common/remote/sockfile.hpp

@@ -60,9 +60,12 @@ interface IRemoteFileServer : extends IInterface
     virtual StringBuffer &getStats(StringBuffer &stats, bool reset) = 0;
 };
 
-#define FILESRV_VERSION 19 // don't forget VERSTRING in sockfile.cpp
+#define FILESRV_VERSION 20 // don't forget VERSTRING in sockfile.cpp
 
-extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename); // takes ownershop of socket
+interface IKeyManager;
+interface IDelayedFile;
+extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename);
+extern REMOTE_API IKeyManager * createRemoteKeyManager(const char *filename, unsigned keySize, IDelayedFile *delayedFile);
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();

+ 2 - 0
dali/dafilesrv/dafilesrv.cmake

@@ -31,7 +31,9 @@ include_directories (
          ./../../system/hrpc 
          ./../../common/remote 
          ./../../system/include 
+         ./../../system/jhtree
          ./../../system/jlib 
+         ./../../rtl/eclrtl
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
          ./../../system/security/shared

+ 1 - 1
rtl/eclrtl/rtlkey.cpp

@@ -213,9 +213,9 @@ public:
     {
         lastCompareResult = false;
         hasCompareResult = false;
-        lastCompareValue = NULL;
         set.setown(deserializeStringSet(mb));
         mb.read(optional);
+        lastCompareValue = new char[size];
     }
     ~CSetKeySegmentMonitor();
 

+ 25 - 2
system/jhtree/jhtree.cpp

@@ -260,6 +260,18 @@ void SegMonitorList::recalculateCache()
     cachedLRS = _lastRealSeg();
 }
 
+// Returns the list to its freshly-constructed state: no monitors, flagged as modified,
+// and no merge barrier.
+void SegMonitorList::reset()
+{
+    segMonitors.kill();
+    modified = true;
+    mergeBarrier = 0;
+}
+
+// Takes over the monitors held by 'other': this list is reset first, then the two monitor
+// arrays are exchanged, so 'other' ends up holding the emptied array.
+// NOTE(review): only segMonitors is exchanged; 'other' is not marked modified and
+// mergeBarrier is not transferred - confirm that callers do not rely on either.
+void SegMonitorList::swapWith(SegMonitorList &other)
+{
+    this->reset();
+    other.segMonitors.swapWith(this->segMonitors);
+}
+
 void SegMonitorList::deserialize(MemoryBuffer &mb)
 {
     unsigned num;
@@ -635,6 +647,11 @@ public:
         }
     }
 
+    // Required by IKeyManager; this implementation currently ignores the limit.
+    virtual void setChooseNLimit(unsigned __int64 _rowLimit) override
+    {
+        // TODO ? (intentionally a no-op for now)
+    }
+
     virtual void reset(bool crappyHack)
     {
         if (keyCursor)
@@ -642,8 +659,9 @@ public:
             if (!started)
             {
                 started = true;
-                segs.checkSize(keyedSize, keyName.get());
-                numsegs = segs.segMonitors.length();
+                numsegs = segs.ordinality();
+                if (numsegs)
+                    segs.checkSize(keyedSize, keyName.get());
             }
             if (!crappyHack)
             {
@@ -950,6 +968,11 @@ public:
         activitySegs->setMergeBarrier(offset); 
     }
 
+    // Adopts the caller's segment monitors by swapping them into this manager's own list;
+    // the caller's list is handed back whatever swapWith leaves in it.
+    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
+    {
+        this->segs.swapWith(segmentMonitors);
+    }
+
     virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
     {
         segs.deserialize(mb);

+ 5 - 1
system/jhtree/jhtree.hpp

@@ -162,9 +162,11 @@ class jhtree_decl SegMonitorList : implements IInterface, implements IIndexReadC
     bool modified;
 public:
     IMPLEMENT_IINTERFACE;
-    inline SegMonitorList() { modified = true; mergeBarrier = 0; }
+    inline SegMonitorList() { reset(); }
     IArrayOf<IKeySegmentMonitor> segMonitors;
 
+    void reset();
+    void swapWith(SegMonitorList &other);
     void setLow(unsigned segno, void *keyBuffer) const;
     unsigned setLowAfter(size32_t offset, void *keyBuffer) const;
     bool incrementKey(unsigned segno, void *keyBuffer) const;
@@ -203,6 +205,7 @@ interface IKeyManager : public IInterface, extends IIndexReadContext
     virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) = 0;
     virtual bool nextRange(unsigned groupSegCount) = 0;
     virtual void setKey(IKeyIndexBase * _key) = 0;
+    virtual void setChooseNLimit(unsigned __int64 _rowLimit) = 0; // for choosen type functionality
     virtual unsigned __int64 checkCount(unsigned __int64 limit) = 0;
     virtual void serializeCursorPos(MemoryBuffer &mb) = 0;
     virtual void deserializeCursorPos(MemoryBuffer &mb) = 0;
@@ -215,6 +218,7 @@ interface IKeyManager : public IInterface, extends IIndexReadContext
     virtual void resetCounts() = 0;
 
     virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) = 0;
+    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) = 0;
     virtual void deserializeSegmentMonitors(MemoryBuffer &mb) = 0;
     virtual void finishSegmentMonitors() = 0;
 

+ 39 - 19
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -19,6 +19,7 @@
 #include "jfile.hpp"
 #include "jtime.hpp"
 #include "jsort.hpp"
+#include "sockfile.hpp"
 
 #include "rtlkey.hpp"
 #include "jhtree.hpp"
@@ -48,6 +49,7 @@ protected:
     Owned<IOutputRowDeserializer> deserializer;
     Owned<IOutputRowSerializer> serializer;
     bool localKey = false;
+    size32_t seekGEOffset = 0;
     __int64 lastSeeks = 0, lastScans = 0;
     UInt64Array _statsArr;
     SpinLock statLock;  // MORE: Can this be avoided by passing in the delta?
@@ -99,7 +101,7 @@ protected:
     } callback;
 
     virtual bool keyed() { return false; }
-    void setManager(IKeyManager *manager)
+    virtual void setManager(IKeyManager *manager)
     {
         clearManager();
         currentManager = manager;
@@ -130,17 +132,6 @@ protected:
             return row.finalizeRowClear(sz);
         return NULL;
     }
-    IKeyIndex *openKeyPart(IPartDescriptor &partDesc)
-    {
-        RemoteFilename rfn;
-        partDesc.getFilename(0, rfn);
-        StringBuffer filePath;
-        rfn.getPath(filePath);
-        unsigned crc=0;
-        partDesc.getCrc(crc);
-        Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, logicalFilename, partDesc);
-        return createKeyIndex(filePath.str(), crc, *lfile, false, false);
-    }
     const void *nextKey()
     {
         if (eoi)
@@ -241,10 +232,27 @@ public:
         keyIndexSet.setown(createKeyIndexSet());
         ForEachItemIn(p, partDescs)
         {
-            Owned<IKeyIndex> keyIndex = openKeyPart(partDescs.item(p));
-            Owned<IKeyManager> klManager = createKeyManager(keyIndex, fixedDiskRecordSize, NULL);
+            IPartDescriptor &part = partDescs.item(p);
+            RemoteFilename rfn;
+            part.getFilename(0, rfn);
+            StringBuffer filePath;
+            rfn.getPath(filePath);
+
+            Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, logicalFilename, part);
+            Owned<IKeyManager> klManager;
+
+            bool remoteKey = !localKey && !seekGEOffset && (!rfn.isLocal() || getOptBool("forceDafilesrv"));
+            if (remoteKey && getOptBool("remoteKeyFilteringEnabled"))
+                klManager.setown(createRemoteKeyManager(filePath.str(), fixedDiskRecordSize, lfile));
+            else
+            {
+                unsigned crc=0;
+                part.getCrc(crc);
+                Owned<IKeyIndex> keyIndex = createKeyIndex(filePath.str(), crc, *lfile, false, false);
+                klManager.setown(createKeyManager(keyIndex, fixedDiskRecordSize, NULL));
+                keyIndexSet->addIndex(keyIndex.getClear());
+            }
             keyManagers.append(*klManager.getClear());
-            keyIndexSet->addIndex(keyIndex.getClear());
         }
     }
     // IThorDataLink
@@ -296,7 +304,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
     IHThorSteppedSourceExtra *steppedExtra;
     CSteppingMeta steppingMeta;
     UnsignedArray seekSizes;
-    size32_t seekGEOffset = 0;
 
     const void *getNextRow()
     {
@@ -324,6 +331,12 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
         }
         return nullptr;
     }
+    virtual void setManager(IKeyManager *manager) override // Installs 'manager' through PARENT::setManager, then conditionally passes stopAfter on to it.
+    {
+        PARENT::setManager(manager); // base handling runs first, before any limit is applied
+        if (stopAfter && !helper->transformMayFilter()) // requires a non-zero stopAfter and helper->transformMayFilter() returning false
+            manager->setChooseNLimit(stopAfter); // NOTE(review): presumably lets the key manager stop after stopAfter rows; depends on stopAfter being assigned before this is called - confirm against start()
+    }
     const void *nextKeyGE(const void *seek, unsigned numFields)
     {
         assertex(keyMergerManager.get());
@@ -493,9 +506,10 @@ public:
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
+        stopAfter = (rowcount_t)helper->getChooseNLimit();
+
         PARENT::start();
 
-        stopAfter = (rowcount_t)helper->getChooseNLimit();
         needTransform = helper->needTransform();
         helperKeyedLimit = (rowcount_t)helper->getKeyedLimit();
         rowLimit = (rowcount_t)helper->getRowLimit(); // MORE - if no filtering going on could keyspan to get count
@@ -762,6 +776,12 @@ public:
         helper = static_cast <IHThorIndexCountArg *> (container.queryHelper());
         appendOutputLinked(this);
     }
+    virtual void setManager(IKeyManager *manager) override // Installs 'manager' through PARENT::setManager, then conditionally passes choosenLimit on to it.
+    {
+        PARENT::setManager(manager); // base handling runs first, before any limit is applied
+        if (choosenLimit && !helper->hasFilter()) // requires a non-zero choosenLimit and helper->hasFilter() returning false
+            manager->setChooseNLimit(choosenLimit); // NOTE(review): presumably caps the count at the key manager; depends on choosenLimit being assigned before this is called - confirm against start()
+    }
 
 // IThorDataLink
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
@@ -774,8 +794,8 @@ public:
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
-        PARENT::start();
         choosenLimit = (rowcount_t)helper->getChooseNLimit();
+        PARENT::start();
         if (!helper->canMatchAny())
         {
             totalCountKnown = true;
@@ -924,12 +944,12 @@ public:
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
+        stopAfter = (rowcount_t)helper->getChooseNLimit();
         PARENT::start();
         keyedLimit = (rowcount_t)helper->getKeyedLimit();
         rowLimit = (rowcount_t)helper->getRowLimit();
         if (helper->getFlags() & TIRlimitskips)
             rowLimit = RCMAX;
-        stopAfter = (rowcount_t)helper->getChooseNLimit();
         expanding = false;
         keyedProcessed = 0;
         keyedLimitCount = RCMAX;

+ 10 - 1
thorlcr/thorutil/thormisc.cpp

@@ -47,6 +47,7 @@
 #include "rtlread_imp.hpp"
 #include "rtlfield_imp.hpp"
 #include "rtlds_imp.hpp"
+#include "rmtfile.hpp"
 
 namespace thormisc {  // Make sure we can't clash with generated versions or version check mechanism fails.
  #include "eclhelper_base.hpp" 
@@ -980,7 +981,15 @@ bool getBestFilePart(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIF
         {
             rfn.getPath(locationName.clear());
             assertex(locationName.length());
-            Owned<IFile> file = createIFile(locationName.str());
+
+            Owned<IFile> file;
+            if (activity->getOptBool("forceDafilesrv"))
+            {
+                PROGLOG("Using dafilesrv for: %s", locationName.str());
+                file.setown(createDaliServixFile(rfn));
+            }
+            else
+                file.setown(createIFile(locationName.str()));
             try
             {
                 if (file->exists())