Переглянути джерело

HPCC-17690 Fallback to local key manager

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 роки тому
батько
коміт
15afff5ac7

+ 1 - 1
common/fileview2/fvidxsource.cpp

@@ -446,7 +446,7 @@ bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned siz
 
 void IndexDataSource::applyFilter()
 {
-    manager.setown(createKeyManager(tlk, tlk->keySize(), NULL));
+    manager.setown(createLocalKeyManager(tlk, tlk->keySize(), NULL));
     ForEachItemIn(i, values)
     {
         IStringSet & cur = values.item(i);

+ 363 - 103
common/remote/sockfile.cpp

@@ -43,6 +43,8 @@
 
 #define SOCKET_CACHE_MAX 500
 
+#define MIN_KEYFILTSUPPORT_VERSION 20
+
 #ifdef _DEBUG
 //#define SIMULATE_PACKETLOSS 1
 #endif
@@ -1072,7 +1074,7 @@ protected: friend class CRemoteFileIO;
     SocketEndpoint      ep;
 
 
-    void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false)
+    void sendRemoteCommand(MemoryBuffer & src, MemoryBuffer & reply, bool retry=true, bool lengthy=false, bool handleErrCode=true)
     {
         CriticalBlock block(crit);  // serialize commands on same file
         SocketEndpoint tep(ep);
@@ -1154,6 +1156,8 @@ protected: friend class CRemoteFileIO;
             }
         }
 
+        if (!handleErrCode)
+            return;
         unsigned errCode;
         reply.read(errCode);
         if (errCode) {
@@ -2482,9 +2486,9 @@ public:
 
     void setDisconnectOnExit(bool set) { disconnectonexit = set; }
 
-    void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false)
+    void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false, bool handleErrCode=true)
     {
-        parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy);
+        parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy, handleErrCode);
     }
 };
 
@@ -2598,6 +2602,100 @@ void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *pr
 
 /////////////////////////
 
+// Asks the server behind an existing remote file connection for its version
+// by sending an RFCgetver command.
+//
+// Returns a numeric version and appends the version string to 'ver':
+//   - 10 (and "DS V1.0") if the server rejects the command as invalid,
+//   - 11 if the reply's error code is 0,
+//   - errCode-0x10000 if the reply's error code is >= 0x10000,
+//   - 0 if the error code is any other value, or if an exception is raised
+//     while sending the command or decoding the reply's error code (the
+//     exception is logged and released).
+unsigned getRemoteVersion(CRemoteFileIO &remoteFileIO, StringBuffer &ver)
+{
+    unsigned ret;
+    MemoryBuffer sendBuffer;
+    initSendBuffer(sendBuffer);
+    sendBuffer.append((RemoteFileCommandType)RFCgetver);
+    sendBuffer.append((unsigned)RFCgetver);
+    MemoryBuffer replyBuffer;
+    try
+    {
+        // handleErrCode=false: the error code is left in the reply, because for
+        // RFCgetver it carries the version rather than (only) an error.
+        remoteFileIO.sendRemoteCommand(sendBuffer, replyBuffer, true, false, false);
+
+        // Decode inside the try block, matching the ISocket overload, so that a
+        // failure while reading the reply is reported as "unknown version" (0).
+        unsigned errCode;
+        replyBuffer.read(errCode);
+        if (errCode==RFSERR_InvalidCommand)
+        {
+            ver.append("DS V1.0");
+            return 10;
+        }
+        else if (errCode==0)
+            ret = 11;
+        else if (errCode<0x10000)
+            return 0;
+        else
+            ret = errCode-0x10000;
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e);
+        ::Release(e);
+        return 0;
+    }
+
+    StringAttr vers;
+    replyBuffer.read(vers);
+    ver.append(vers);
+    return ret;
+}
+
+unsigned getRemoteVersion(ISocket * socket, StringBuffer &ver)
+{
+    // used to have a global critical section here
+    if (!socket)
+        return 0;
+
+    Owned<ISecureSocket> ssock;
+
+    if (securitySettings.useSSL && !socket->isSecure())
+    {
+#ifdef _USE_OPENSSL
+        ssock.setown(createSecureSocket(LINK(socket), ClientSocket));
+        int status = ssock->secure_connect();
+        if (status < 0)
+            throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection");
+        socket = ssock;
+#else
+        throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
+#endif
+    }
+
+    unsigned ret;
+    MemoryBuffer sendbuf;
+    initSendBuffer(sendbuf);
+    sendbuf.append((RemoteFileCommandType)RFCgetver);
+    sendbuf.append((unsigned)RFCgetver);
+    MemoryBuffer reply;
+    try {
+        sendBuffer(socket, sendbuf);
+        receiveBuffer(socket, reply, 1 ,4096);
+        unsigned errCode;
+        reply.read(errCode);
+        if (errCode==RFSERR_InvalidCommand) {
+            ver.append("DS V1.0");
+            return 10;
+        }
+        else if (errCode==0)
+            ret = 11;
+        else if (errCode<0x10000)
+            return 0;
+        else
+            ret = errCode-0x10000;
+    }
+    catch (IException *e) {
+        EXCLOG(e);
+        ::Release(e);
+        return 0;
+    }
+    StringAttr vers;
+    reply.read(vers);
+    ver.append(vers);
+    return ret;
+}
+
+/////////////////////////
+
 class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
 {
     StringAttr filename;
@@ -2605,10 +2703,8 @@ class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
     SegMonitorList segs;
     size32_t rowDataRemaining = 0;
     MemoryBuffer rowDataBuffer;
-    size32_t keyCursorSz = 0;        // used for continuation
-    const void *keyCursor = nullptr; // used for continuation
+    MemoryBuffer keyCursorMb;        // used for continuation
     unsigned __int64 totalGot = 0;
-    size32_t maxRecsPerRequest = 100; // arbritary # recs per request, perhaps should be based on recsize
     size32_t keySize = 0;
     size32_t currentSize = 0;
     offset_t currentFpos = 0;
@@ -2616,6 +2712,9 @@ class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
     bool first = true;
     unsigned __int64 chooseNLimit = 0;
     ConstPointerArray activeBlobs;
+    unsigned crc = 0;
+    mutable bool hasRemoteSupport = false; // must check 1st
+    mutable Owned<IKeyManager> directKM; // failover manager if remote key support is unavailable
 
     CRemoteFileIO *prepKeySend(MemoryBuffer &sendBuffer, RemoteFileCommandType cmd, bool segmentMonitors)
     {
@@ -2630,6 +2729,37 @@ class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
             segs.serialize(sendBuffer);
         return remoteIO.getClear();
     }
+    // Decides (once) whether requests are served remotely or by a local
+    // fallback key manager. The outcome is remembered in hasRemoteSupport /
+    // directKM, so later calls return immediately.
+    // Returns true when the remote path is to be used, false when directKM
+    // has been set up and must be used instead.
+    // Throws if the key file cannot be opened.
+    bool remoteSupport() const
+    {
+        // Already decided on an earlier call?
+        if (hasRemoteSupport)
+            return true;
+        if (directKM)
+            return false;
+
+        Owned<IFileIO> fileIO = delayedFile->getFileIO();
+        if (!fileIO)
+            throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
+
+        // Remote handling needs a CRemoteFileIO and a server whose version is
+        // at least MIN_KEYFILTSUPPORT_VERSION.
+        Linked<CRemoteFileIO> remoteFileIO = QUERYINTERFACE(fileIO.get(), CRemoteFileIO);
+        if (nullptr != remoteFileIO.get())
+        {
+            StringBuffer versionText;
+            unsigned remoteVersion = getRemoteVersion(*remoteFileIO, versionText);
+            if (remoteVersion >= MIN_KEYFILTSUPPORT_VERSION)
+            {
+                PROGLOG("Using remote key manager for file: %s", filename.get());
+                hasRemoteSupport = true;
+                return true;
+            }
+        }
+
+        // Otherwise fall back to a local key manager over the same file.
+        Owned<IKeyIndex> localIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
+        directKM.setown(createLocalKeyManager(localIndex, keySize, nullptr));
+        return false;
+    }
     unsigned __int64 _checkCount(unsigned __int64 limit)
     {
         MemoryBuffer sendBuffer;
@@ -2642,42 +2772,62 @@ class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
         return count;
     }
 public:
-    CRemoteKeyManager(const char *_filename, unsigned _keySize, IDelayedFile *_delayedFile) : filename(_filename), keySize(_keySize), delayedFile(_delayedFile)
+    CRemoteKeyManager(const char *_filename, unsigned _keySize, unsigned _crc, IDelayedFile *_delayedFile) : filename(_filename), keySize(_keySize), crc(_crc), delayedFile(_delayedFile)
     {
     }
     ~CRemoteKeyManager()
     {
         releaseBlobs();
     }
+// IKeyManager impl.
     virtual void reset(bool crappyHack = false) override
     {
+        if (!remoteSupport())
+        {
+            directKM->reset(crappyHack);
+            return;
+        }
         rowDataBuffer.clear();
         rowDataRemaining = 0;
+        keyCursorMb.clear();
         currentSize = 0;
         currentFpos = 0;
         currentRow = nullptr;
         first = true;
-        maxRecsPerRequest = 100;
         totalGot = 0;
-        keyCursorSz = 0;
-        keyCursor = nullptr;
     }
-    virtual void releaseSegmentMonitors() override { segs.reset(); }
+    virtual void releaseSegmentMonitors() override
+    {
+        if (!remoteSupport())
+        {
+            directKM->releaseSegmentMonitors();
+            return;
+        }
+        segs.reset();
+    }
     // Returns the current row and stores its file position in 'fpos'.
     // Delegates to the local fallback manager when remote support is unavailable.
     virtual const byte *queryKeyBuffer(offset_t & fpos) override
     {
+        if (!remoteSupport())
+            return directKM->queryKeyBuffer(fpos);
         fpos = currentFpos;
         return currentRow;
     }
     virtual offset_t queryFpos() override
     {
+        if (!remoteSupport())
+            return directKM->queryFpos();
         return currentFpos;
     }
     virtual unsigned queryRecordSize() override
     {
+        if (!remoteSupport())
+            return directKM->queryRecordSize();
         return currentSize;
     }
     virtual bool lookup(bool exact) override
     {
+        if (!remoteSupport())
+            return directKM->lookup(exact);
         while (true)
         {
             if (rowDataRemaining)
@@ -2690,16 +2840,19 @@ public:
             }
             else
             {
-                if (!first && (nullptr == keyCursor)) // No keyCursor implies there is nothing more to fetch
+                if (!first && (0 == keyCursorMb.length())) // No keyCursor implies there is nothing more to fetch
                     return false;
-                unsigned maxRecs = maxRecsPerRequest;
-                if (maxRecs && chooseNLimit)
+                unsigned maxRecs = 0;
+                if (chooseNLimit)
                 {
-                    if (totalGot + maxRecs > chooseNLimit)
-                        maxRecs = (unsigned)(chooseNLimit - totalGot);
+                    if (totalGot == chooseNLimit)
+                        break;
+                    unsigned __int64 max = chooseNLimit-totalGot;
+                    if (max > UINT_MAX)
+                        maxRecs = UINT_MAX;
+                    else
+                        maxRecs = (unsigned)max;
                 }
-                if (0 == maxRecs)
-                    break;
                 MemoryBuffer sendBuffer;
                 Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindex, true);
                 sendBuffer.append(first).append(maxRecs);
@@ -2707,8 +2860,8 @@ public:
                     first = false;
                 else
                 {
-                    dbgassertex(keyCursor);
-                    sendBuffer.append(keyCursorSz, keyCursor);
+                    dbgassertex(keyCursorMb.length());
+                    sendBuffer.append(keyCursorMb);
                 }
                 rowDataBuffer.clear();
                 remoteIO->sendRemoteCommand(sendBuffer, rowDataBuffer);
@@ -2716,18 +2869,18 @@ public:
                 rowDataBuffer.read(recsGot);
                 if (0 == recsGot)
                 {
-                    maxRecsPerRequest = 0;
+                    keyCursorMb.clear(); // signals no more data if called again.
                     break; // end
                 }
                 totalGot += recsGot;
                 rowDataBuffer.read(rowDataRemaining);
                 unsigned pos = rowDataBuffer.getPos(); // start of row data
                 const void *rowData = rowDataBuffer.readDirect(rowDataRemaining);
+                size32_t keyCursorSz;
                 rowDataBuffer.read(keyCursorSz);
+                keyCursorMb.clear();
                 if (keyCursorSz)
-                    keyCursor = rowDataBuffer.readDirect(keyCursorSz);
-                else
-                    keyCursor = nullptr;
+                    keyCursorMb.append(keyCursorSz, rowDataBuffer.readDirect(keyCursorSz));
                 rowDataBuffer.reset(pos); // reposition to start of row data
             }
         }
@@ -2735,27 +2888,92 @@ public:
     }
     virtual unsigned __int64 getCount() override
     {
+        if (!remoteSupport())
+            return directKM->getCount();
         return _checkCount((unsigned __int64)-1);
     }
-    virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) override { UNIMPLEMENTED; }
-    virtual bool nextRange(unsigned groupSegCount) override { UNIMPLEMENTED; }
-    virtual void setKey(IKeyIndexBase * _key) override { UNIMPLEMENTED; }
+    virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) override
+    {
+        if (!remoteSupport())
+            return directKM->getCurrentRangeCount(groupSegCount);
+        UNIMPLEMENTED;
+    }
+    virtual bool nextRange(unsigned groupSegCount) override
+    {
+        if (!remoteSupport())
+            return directKM->nextRange(groupSegCount);
+        UNIMPLEMENTED;
+    }
+    virtual void setKey(IKeyIndexBase * _key) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->setKey(_key);
+            return;
+        }
+        UNIMPLEMENTED;
+    }
     virtual void setChooseNLimit(unsigned __int64 _chooseNLimit) override
     {
+        if (!remoteSupport())
+        {
+            directKM->setChooseNLimit(_chooseNLimit);
+            return;
+        }
         chooseNLimit = _chooseNLimit;
     }
     // Returns the count for the current filter, bounded by 'limit', taken from
     // the local fallback manager or from the remote request as appropriate.
     virtual unsigned __int64 checkCount(unsigned __int64 limit) override
     {
+        // The fallback result must be returned; without the return, execution
+        // would fall through and issue the remote request regardless.
+        if (!remoteSupport())
+            return directKM->checkCount(limit);
         return _checkCount(limit);
     }
-    virtual void serializeCursorPos(MemoryBuffer &mb) override { UNIMPLEMENTED; }
-    virtual void deserializeCursorPos(MemoryBuffer &mb) override { UNIMPLEMENTED; }
-    virtual unsigned querySeeks() const override { return 0; }
-    virtual unsigned queryScans() const override { return 0; }
-    virtual unsigned querySkips() const override { return 0; }
-    virtual unsigned queryNullSkips() const override { return 0; }
+    virtual void serializeCursorPos(MemoryBuffer &mb) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->serializeCursorPos(mb);
+            return;
+        }
+        UNIMPLEMENTED;
+    }
+    virtual void deserializeCursorPos(MemoryBuffer &mb) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->deserializeCursorPos(mb);
+            return;
+        }
+        UNIMPLEMENTED;
+    }
+    virtual unsigned querySeeks() const override
+    {
+        if (!remoteSupport())
+            return directKM->querySeeks();
+        return 0;
+    }
+    virtual unsigned queryScans() const override
+    {
+        if (!remoteSupport())
+            return directKM->queryScans();
+        return 0;
+    }
+    virtual unsigned querySkips() const override
+    {
+        if (!remoteSupport())
+            return directKM->querySkips();
+        return 0;
+    }
+    virtual unsigned queryNullSkips() const override
+    {
+        if (!remoteSupport())
+            return directKM->queryNullSkips();
+        return 0;
+    }
     virtual const byte *loadBlob(unsigned __int64 blobId, size32_t &blobSize) override
     {
+        if (!remoteSupport())
+            return directKM->loadBlob(blobId, blobSize);
         MemoryBuffer sendBuffer;
         Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexblob, false);
         sendBuffer.append(blobId);
@@ -2768,89 +2986,116 @@ public:
     }
     // Releases all blobs handed out so far.
     virtual void releaseBlobs() override
     {
+        // This is called from the destructor, so do not go through
+        // remoteSupport() here: it may open the file, query the remote
+        // version, or throw. If the local fallback manager exists, delegate
+        // to it; otherwise free the blobs tracked in activeBlobs.
+        if (directKM)
+        {
+            directKM->releaseBlobs();
+            return;
+        }
         ForEachItemIn(idx, activeBlobs)
         {
             free((void *) activeBlobs.item(idx));
         }
         activeBlobs.kill();
     }
-    virtual void resetCounts() override { UNIMPLEMENTED; }
+    virtual void resetCounts() override
+    {
+        if (!remoteSupport())
+        {
+            directKM->resetCounts();
+            return;
+        }
+        UNIMPLEMENTED;
+    }
 
-    virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) override { UNIMPLEMENTED; }
-    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override { segs.swapWith(segmentMonitors); }
-    virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override { segs.deserialize(mb); }
-    virtual void finishSegmentMonitors() override { }
-    virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) override { UNIMPLEMENTED; }
+    virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->setLayoutTranslator(trans);
+            return;
+        }
+        UNIMPLEMENTED;
+    }
+    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->setSegmentMonitors(segmentMonitors);
+            return;
+        }
+        segs.swapWith(segmentMonitors);
+    }
+    virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->deserializeSegmentMonitors(mb);
+            return;
+        }
+        segs.deserialize(mb);
+    }
+    virtual void finishSegmentMonitors() override
+    {
+        if (!remoteSupport())
+        {
+            directKM->finishSegmentMonitors();
+            return;
+        }
+    }
+    virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) override
+    {
+        if (!remoteSupport())
+            return directKM->lookupSkip(seek, seekGEOffset, seeklen);
+        UNIMPLEMENTED;
+    }
     virtual void append(IKeySegmentMonitor *segment) override
     {
+        if (!remoteSupport())
+        {
+            directKM->append(segment);
+            return;
+        }
         segs.append(segment);
     }
-    virtual unsigned ordinality() const override { return segs.ordinality(); }
-    virtual IKeySegmentMonitor *item(unsigned idx) const override { return segs.item(idx); }
-    virtual void setMergeBarrier(unsigned offset) override { UNIMPLEMENTED; }
+    virtual unsigned ordinality() const override
+    {
+        if (!remoteSupport())
+            return directKM->ordinality();
+        return segs.ordinality();
+    }
+    virtual IKeySegmentMonitor *item(unsigned idx) const override
+    {
+        if (!remoteSupport())
+            return directKM->item(idx);
+        return segs.item(idx);
+    }
+    virtual void setMergeBarrier(unsigned offset) override
+    {
+        if (!remoteSupport())
+        {
+            directKM->setMergeBarrier(offset);
+            return;
+        }
+        UNIMPLEMENTED;
+    }
 };
 
-IKeyManager * createRemoteKeyManager(const char *filename, unsigned keySize, IDelayedFile *delayedFile)
+IKeyManager *createRemoteKeyManager(const char *filename, unsigned keySize, unsigned crc, IDelayedFile *delayedFile)
 {
-    return new CRemoteKeyManager(filename, keySize, delayedFile);
+    return new CRemoteKeyManager(filename, keySize, crc, delayedFile);
 }
 
-//////////////
-
-unsigned getRemoteVersion(ISocket * socket, StringBuffer &ver)
+IKeyManager *createKeyManager(const char *filename, unsigned keySize, unsigned crc, IDelayedFile *delayedFile, bool allowRemote, bool forceRemote)
 {
-    // used to have a global critical section here
-    if (!socket)
-        return 0;
-
-    Owned<ISecureSocket> ssock;
-
-    if (securitySettings.useSSL && !socket->isSecure())
+    RemoteFilename rfn;
+    rfn.setRemotePath(filename);
+    if (forceRemote || (allowRemote && !rfn.isLocal()))
+        return createRemoteKeyManager(filename, keySize, crc, delayedFile);
+    else
     {
-#ifdef _USE_OPENSSL
-        ssock.setown(createSecureSocket(LINK(socket), ClientSocket));
-        int status = ssock->secure_connect();
-        if (status < 0)
-            throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection");
-        socket = ssock;
-#else
-        throw createDafsException(DAFSERR_connection_failed,"Failure to establish secure connection: OpenSSL disabled in build");
-#endif
+        Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
+        return createLocalKeyManager(keyIndex, keySize, nullptr);
     }
-
-    unsigned ret;
-    MemoryBuffer sendbuf;
-    initSendBuffer(sendbuf);
-    sendbuf.append((RemoteFileCommandType)RFCgetver);
-    sendbuf.append((unsigned)RFCgetver);
-    MemoryBuffer reply;
-    try {
-        sendBuffer(socket, sendbuf);
-        receiveBuffer(socket, reply, 1 ,4096);
-        unsigned errCode;
-        reply.read(errCode);
-        if (errCode==RFSERR_InvalidCommand) {
-            ver.append("DS V1.0");
-            return 10;
-        }
-        else if (errCode==0)
-            ret = 11;
-        else if (errCode<0x10000)
-            return 0;
-        else
-            ret = errCode-0x10000;
-    }
-    catch (IException *e) {
-        EXCLOG(e);
-        ::Release(e);
-        return 0;
-    }
-    StringAttr vers;
-    reply.read(vers);
-    ver.append(vers);
-    return ret;
 }
 
+//////////////
 
 extern unsigned stopRemoteServer(ISocket * socket)
 {
@@ -3386,6 +3631,8 @@ struct OpenFileInfo
     unsigned flags = 0;
 };
 
+#define MAX_KEYDATA_SZ 0x10000
+
 class CRemoteFileServer : implements IRemoteFileServer, public CInterface
 {
     class CThrottler;
@@ -4061,11 +4308,13 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         return false;
     }
 
-    unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply)
+    unsigned readKeyData(IKeyManager *keyManager, unsigned maxRecs, MemoryBuffer &reply, bool &maxHit)
     {
         DelayedSizeMarker keyDataSzReturned(reply);
         unsigned numRecs = 0;
-        while (maxRecs-- && keyManager->lookup(true))
+        maxHit = false;
+        unsigned pos = reply.length();
+        while (keyManager->lookup(true))
         {
             unsigned size = keyManager->queryRecordSize();
             offset_t fpos;
@@ -4074,6 +4323,16 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
             reply.append(size);
             reply.append(size, result);
             ++numRecs;
+            if (maxRecs && (0 == --maxRecs))
+            {
+                maxHit = true;
+                break;
+            }
+            if (reply.length()-pos >= MAX_KEYDATA_SZ)
+            {
+                maxHit = true;
+                break;
+            }
         }
         keyDataSzReturned.write();
         return numRecs;
@@ -4093,7 +4352,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
             VStringBuffer errStr("Error opening key file : %s", keyname);
             throw createDafsException(RFSERR_KeyIndexFailed, errStr.str());
         }
-        Owned<IKeyManager> keyManager = createKeyManager(index, keySize, nullptr);
+        Owned<IKeyManager> keyManager = createLocalKeyManager(index, keySize, nullptr);
         if (segs)
         {
             keyManager->setSegmentMonitors(*segs);
@@ -4480,20 +4739,21 @@ public:
     {
         Owned<IKeyManager> keyManager = prepKey(msg, true);
         bool first;
-        size32_t maxRecs;
+        unsigned maxRecs;
         msg.read(first).read(maxRecs);
         if (!first)
             keyManager->deserializeCursorPos(msg);
 
         reply.append((unsigned)RFEnoerror);
         DelayedMarker<unsigned> numReturned(reply);
-        unsigned numRecs = readKeyData(keyManager, maxRecs, reply);
+        bool maxHit;
+        unsigned numRecs = readKeyData(keyManager, maxRecs, reply, maxHit);
         numReturned.write(numRecs);
 
-        DelayedSizeMarker keyCursorSz(reply);
-        if (numRecs >= maxRecs) // no point in cursor if no more recs to return
+        DelayedSizeMarker keyCursorSzMarker(reply);
+        if (maxHit) // if maximum hit, either supplied maxRecs limit, or buffer limit, return cursor
             keyManager->serializeCursorPos(reply);
-        keyCursorSz.write();
+        keyCursorSzMarker.write();
         return true;
     }
 

+ 2 - 1
common/remote/sockfile.hpp

@@ -65,7 +65,8 @@ interface IRemoteFileServer : extends IInterface
 interface IKeyManager;
 interface IDelayedFile;
 extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename);
-extern REMOTE_API IKeyManager * createRemoteKeyManager(const char *filename, unsigned keySize, IDelayedFile *delayedFile);
+extern REMOTE_API IKeyManager *createKeyManager(const char *filename, unsigned keySize, unsigned crc, IDelayedFile *delayedFile, bool allowRemote, bool forceRemote);
+extern REMOTE_API IKeyManager * createRemoteKeyManager(const char *filename, unsigned keySize, unsigned crc, IDelayedFile *delayedFile);
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();

+ 7 - 7
ecl/hthor/hthorkey.cpp

@@ -475,7 +475,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
         {
             Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
             verifyIndex(tlk);
-            Owned<IKeyManager> tlman = createKeyManager(tlk, keySize, NULL);
+            Owned<IKeyManager> tlman = createLocalKeyManager(tlk, keySize, NULL);
             initManager(tlman);
             while(tlman->lookup(false) && (count<=limit))
             {
@@ -511,7 +511,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r
         verifyIndex(kidx);
     if (limit != (unsigned) -1)
     {
-        Owned<IKeyManager> kman = createKeyManager(kidx, keySize, NULL);
+        Owned<IKeyManager> kman = createLocalKeyManager(kidx, keySize, NULL);
         initManager(kman);
         result += kman->checkCount(limit-result);
     }
@@ -612,7 +612,7 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager)
 
 void CHThorIndexReadActivityBase::initPart()                                    
 { 
-    klManager.setown(createKeyManager(keyIndex, keySize, NULL));
+    klManager.setown(createLocalKeyManager(keyIndex, keySize, NULL));
     initManager(klManager);     
     callback.setManager(klManager);
 }
@@ -642,7 +642,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart()
     if(!tlk)
         openTlk();
     verifyIndex(tlk);
-    tlManager.setown(createKeyManager(tlk, keySize, NULL));
+    tlManager.setown(createLocalKeyManager(tlk, keySize, NULL));
     initManager(tlManager);
     nextPartNumber = 0;
     return nextMultiPart();
@@ -3136,7 +3136,7 @@ public:
             //Owned<IRecordLayoutTranslator> 
             trans.setown(owner.getLayoutTranslator(&f));
             owner.verifyIndex(&f, index, trans);
-            Owned<IKeyManager> manager = createKeyManager(index, index->keySize(), NULL);
+            Owned<IKeyManager> manager = createLocalKeyManager(index, index->keySize(), NULL);
             if(trans)
                 manager->setLayoutTranslator(trans);
             managers.append(*manager.getLink());
@@ -3177,7 +3177,7 @@ void KeyedLookupPartHandler::openPart()
     if(manager)
         return;
     Owned<IKeyIndex> index = openKeyFile(*part);
-    manager.setown(createKeyManager(index, index->keySize(), NULL));
+    manager.setown(createLocalKeyManager(index, index->keySize(), NULL));
     IRecordLayoutTranslator * trans = tlk->queryRecordLayoutTranslator();
     if(trans)
         manager->setLayoutTranslator(trans);
@@ -3257,7 +3257,7 @@ public:
             {
                 Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
                 owner.verifyIndex(&f, index, trans);
-                manager.setown(createKeyManager(index, index->keySize(), NULL));
+                manager.setown(createLocalKeyManager(index, index->keySize(), NULL));
             }
             else
             {

+ 1 - 1
roxie/ccd/ccdactivities.cpp

@@ -3186,7 +3186,7 @@ protected:
             IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
             if (filechanged)
             {
-                tlk.setown(createKeyManager(k, 0, &logctx));
+                tlk.setown(createLocalKeyManager(k, 0, &logctx));
                 createSegmentMonitorsPending = true;
             }
             else

+ 7 - 7
roxie/ccd/ccdserver.cpp

@@ -22684,7 +22684,7 @@ public:
                                     if ((indexHelper.getFlags() & TIRcountkeyedlimit) != 0)
                                     {
                                         Owned<IKeyManager> countKey;
-                                        countKey.setown(createKeyManager(thisKey, 0, this));
+                                        countKey.setown(createLocalKeyManager(thisKey, 0, this));
                                         countKey->setLayoutTranslator(translators->item(fileNo));
                                         createSegmentMonitors(countKey);
                                         unsigned __int64 count = countKey->checkCount(keyedLimit);
@@ -22703,7 +22703,7 @@ public:
                             }
                             else
                             {
-                                tlk.setown(createKeyManager(thisKey, 0, this));
+                                tlk.setown(createLocalKeyManager(thisKey, 0, this));
                                 tlk->setLayoutTranslator(translators->item(fileNo));
                             }
                             createSegmentMonitors(tlk);
@@ -22957,7 +22957,7 @@ public:
             if (owner.seekGEOffset)
                 tlk.setown(createKeyMerger(keySet, 0, owner.seekGEOffset, &owner));
             else
-                tlk.setown(createKeyManager(keySet->queryPart(0), 0, &owner));
+                tlk.setown(createLocalKeyManager(keySet->queryPart(0), 0, &owner));
             tlk->setLayoutTranslator(trans);
             owner.indexHelper.createSegmentMonitors(tlk);
             tlk->finishSegmentMonitors();
@@ -23371,7 +23371,7 @@ public:
         unsigned __int64 result = 0;
         for (unsigned i = 0; i < numParts; i++)
         {
-            Owned<IKeyManager> countTlk = createKeyManager(keyIndexSet->queryPart(i), 0, this);
+            Owned<IKeyManager> countTlk = createLocalKeyManager(keyIndexSet->queryPart(i), 0, this);
             countTlk->setLayoutTranslator(translators->item(i));
             indexHelper.createSegmentMonitors(countTlk);
             countTlk->finishSegmentMonitors();
@@ -23408,7 +23408,7 @@ public:
             }
             else
             {
-                tlk.setown(createKeyManager(keyIndexSet->queryPart(0), 0, this));
+                tlk.setown(createLocalKeyManager(keyIndexSet->queryPart(0), 0, this));
                 tlk->setLayoutTranslator(translators->item(0));
             }
             indexHelper.createSegmentMonitors(tlk);
@@ -25171,7 +25171,7 @@ public:
     CRoxieServerFullKeyedJoinHead(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, IKeyArray * _keySet, TranslatorArray *_translators, IOutputMetaData *_indexReadMeta, IJoinProcessor *_joinHandler, bool _isLocal)
         : CRoxieServerActivity(_ctx, _factory, _probeManager),
           helper((IHThorKeyedJoinArg &)basehelper), 
-          tlk(createKeyManager(NULL, 0, this)),
+          tlk(createLocalKeyManager(NULL, 0, this)),
           translators(_translators),
           keySet(_keySet),
           remote(_ctx, this, _remoteId, 0, helper, *this, true, true),
@@ -26046,7 +26046,7 @@ public:
         IOutputMetaData *_indexReadMeta, unsigned _joinFlags, bool _isSimple, bool _isLocal)
         : CRoxieServerKeyedJoinBase(_ctx, _factory, _probeManager, _remoteId, _joinFlags, false, _isSimple, _isLocal),
           indexReadMeta(_indexReadMeta),
-          tlk(createKeyManager(NULL, 0, this)),
+          tlk(createLocalKeyManager(NULL, 0, this)),
           keySet(_keySet),
           translators(_translators)
     {

+ 7 - 7
system/jhtree/jhtree.cpp

@@ -2980,7 +2980,7 @@ extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
     return new CKeyIndexSet;
 }
 
-extern jhtree_decl IKeyManager *createKeyManager(IKeyIndex *key, unsigned _rawSize, IContextLogger *_ctx)
+extern jhtree_decl IKeyManager *createLocalKeyManager(IKeyIndex *key, unsigned _rawSize, IContextLogger *_ctx)
 {
     return new CKeyLevelManager(key, _rawSize, _ctx);
 }
@@ -3224,14 +3224,14 @@ protected:
         {
             unsigned maxSize = (variable && blobby) ? 18 : 10;
             Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
-            Owned <IKeyManager> tlk1 = createKeyManager(index1, maxSize, NULL);
+            Owned <IKeyManager> tlk1 = createLocalKeyManager(index1, maxSize, NULL);
             Owned<IStringSet> sset1 = createStringSet(10);
             sset1->addRange("0000000001", "0000000100");
             tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 10));
             tlk1->finishSegmentMonitors();
             tlk1->reset();
 
-            Owned <IKeyManager> tlk1a = createKeyManager(index1, maxSize, NULL);
+            Owned <IKeyManager> tlk1a = createLocalKeyManager(index1, maxSize, NULL);
             Owned<IStringSet> sset1a = createStringSet(8);
             sset1a->addRange("00000000", "00000001");
             tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 8));
@@ -3264,7 +3264,7 @@ protected:
 
 
             Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
-            Owned <IKeyManager> tlk2 = createKeyManager(index2, maxSize, NULL);
+            Owned <IKeyManager> tlk2 = createLocalKeyManager(index2, maxSize, NULL);
             Owned<IStringSet> sset2 = createStringSet(10);
             sset2->addRange("0000000001", "0000000100");
             ASSERT(sset2->numValues() == 65536);
@@ -3287,7 +3287,7 @@ protected:
                 tlk3->reset();
             }
 
-            Owned <IKeyManager> tlk2a = createKeyManager(index2, maxSize, NULL);
+            Owned <IKeyManager> tlk2a = createLocalKeyManager(index2, maxSize, NULL);
             Owned<IStringSet> sset2a = createStringSet(10);
             sset2a->addRange("0000000048", "0000000048");
             ASSERT(sset2a->numValues() == 1);
@@ -3295,7 +3295,7 @@ protected:
             tlk2a->finishSegmentMonitors();
             tlk2a->reset();
 
-            Owned <IKeyManager> tlk2b = createKeyManager(index2, maxSize, NULL);
+            Owned <IKeyManager> tlk2b = createLocalKeyManager(index2, maxSize, NULL);
             Owned<IStringSet> sset2b = createStringSet(10);
             sset2b->addRange("0000000047", "0000000049");
             ASSERT(sset2b->numValues() == 3);
@@ -3303,7 +3303,7 @@ protected:
             tlk2b->finishSegmentMonitors();
             tlk2b->reset();
 
-            Owned <IKeyManager> tlk2c = createKeyManager(index2, maxSize, NULL);
+            Owned <IKeyManager> tlk2c = createLocalKeyManager(index2, maxSize, NULL);
             Owned<IStringSet> sset2c = createStringSet(10);
             sset2c->addRange("0000000047", "0000000047");
             tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 10));

+ 1 - 1
system/jhtree/jhtree.hpp

@@ -225,7 +225,7 @@ interface IKeyManager : public IInterface, extends IIndexReadContext
     virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) = 0;
 };
 
-extern jhtree_decl IKeyManager *createKeyManager(IKeyIndex * _key, unsigned rawSize, IContextLogger *ctx);
+extern jhtree_decl IKeyManager *createLocalKeyManager(IKeyIndex * _key, unsigned rawSize, IContextLogger *ctx);
 extern jhtree_decl IKeyManager *createKeyMerger(IKeyIndexSet * _key, unsigned rawSize, unsigned sortFieldOffset, IContextLogger *ctx);
 extern jhtree_decl IKeyManager *createSingleKeyMerger(IKeyIndex * _onekey, unsigned rawSize, unsigned sortFieldOffset, IContextLogger *ctx);
 

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2352,7 +2352,7 @@ class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
         CKeyLookup(IndexDistributeSlaveActivity &_owner, IHThorKeyedDistributeArg *_helper, IKeyIndex *_tlk)
             : owner(_owner), helper(_helper), tlk(_tlk)
         {
-            tlkManager.setown(createKeyManager(tlk, tlk->keySize(), NULL));
+            tlkManager.setown(createLocalKeyManager(tlk, tlk->keySize(), NULL));
             numslaves = owner.queryContainer().queryJob().querySlaves();
         }
         unsigned hash(const void *data)

+ 1 - 1
thorlcr/activities/indexread/thindexread.cpp

@@ -161,7 +161,7 @@ protected:
                     throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", index->queryLogicalName());
 
                 unsigned fixedSize = indexBaseHelper->queryDiskRecordSize()->querySerializedDiskMeta()->getFixedSize(); // used only if fixed
-                Owned <IKeyManager> tlk = createKeyManager(keyIndex, fixedSize, NULL);
+                Owned <IKeyManager> tlk = createLocalKeyManager(keyIndex, fixedSize, NULL);
                 indexBaseHelper->createSegmentMonitors(tlk);
                 tlk->finishSegmentMonitors();
                 tlk->reset();

+ 15 - 10
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -229,7 +229,6 @@ public:
         _statsArr.append(0);
         statsArr = _statsArr.getArray();
         lastSeeks = lastScans = 0;
-        keyIndexSet.setown(createKeyIndexSet());
         ForEachItemIn(p, partDescs)
         {
             IPartDescriptor &part = partDescs.item(p);
@@ -241,17 +240,23 @@ public:
             Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, logicalFilename, part);
             Owned<IKeyManager> klManager;
 
-            bool remoteKey = !localKey && !seekGEOffset && (!rfn.isLocal() || getOptBool("forceDafilesrv"));
-            if (remoteKey && getOptBool("remoteKeyFilteringEnabled"))
-                klManager.setown(createRemoteKeyManager(filePath.str(), fixedDiskRecordSize, lfile));
-            else
+            unsigned crc=0;
+            part.getCrc(crc);
+
+            if ((localKey && partDescs.ordinality()>1) || seekGEOffset) // for now at least, no remote key support if stepping or merging
             {
-                unsigned crc=0;
-                part.getCrc(crc);
-                Owned<IKeyIndex> keyIndex = createKeyIndex(filePath.str(), crc, *lfile, false, false);
-                klManager.setown(createKeyManager(keyIndex, fixedDiskRecordSize, NULL));
+                Owned<IKeyIndex> keyIndex = createKeyIndex(filePath, crc, *lfile, false, false);
+                klManager.setown(createLocalKeyManager(keyIndex, fixedDiskRecordSize, nullptr));
+                if (!keyIndexSet)
+                    keyIndexSet.setown(createKeyIndexSet());
                 keyIndexSet->addIndex(keyIndex.getClear());
             }
+            else
+            {
+                bool allowRemote = getOptBool("remoteKeyFilteringEnabled");
+                bool forceRemote = allowRemote ? getOptBool("forceDafilesrv") : false; // can only force remote, if forceDafilesrv and remoteKeyFilteringEnabled are enabled.
+                klManager.setown(createKeyManager(filePath, fixedDiskRecordSize, crc, lfile, allowRemote, forceRemote));
+            }
             keyManagers.append(*klManager.getClear());
         }
     }
@@ -498,7 +503,7 @@ public:
             else
                 steppingMeta.init(rawMeta, hasPostFilter);
         }
-        if ((seekGEOffset || localKey))
+        if (keyIndexSet)
             keyMergerManager.setown(createKeyMerger(keyIndexSet, fixedDiskRecordSize, seekGEOffset, NULL));
     }
 

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1196,11 +1196,11 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
         CKeyLocalLookup(CKeyedJoinSlave &_owner) : owner(_owner), indexReadFieldsRow(_owner.indexInputAllocator)
         {
-            tlkManager = owner.keyHasTlk ? createKeyManager(NULL, owner.fixedRecordSize, NULL) : NULL;
+            tlkManager = owner.keyHasTlk ? createLocalKeyManager(NULL, owner.fixedRecordSize, NULL) : NULL;
             if (owner.localKey && owner.partKeySet->numParts() > 1)
                 partManager = createKeyMerger(owner.partKeySet, owner.fixedRecordSize, 0, NULL);
             else
-                partManager = createKeyManager(NULL, owner.fixedRecordSize, NULL);
+                partManager = createLocalKeyManager(NULL, owner.fixedRecordSize, NULL);
             reset();
         }
         ~CKeyLocalLookup()