瀏覽代碼

HPCC-18777 Pass record information to segment monitors

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父節點
當前提交
891e404175

+ 7 - 2
common/fileview2/fvidxsource.cpp

@@ -24,6 +24,8 @@
 #include "fvidxsource.ipp"
 #include "fverror.hpp"
 #include "dasess.hpp"
+#include "rtlrecord.hpp"
+#include "rtldynfield.hpp"
 
 //cloned from hthor - a candidate for commoning up.
 static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
@@ -122,7 +124,8 @@ IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _di
 {
     logicalName.set(_logicalName);
     diskRecord.set(_diskRecord);
-
+    deserializer.setown(createRtlFieldTypeDeserializer(nullptr));
+    diskRecordMeta.setown(new CDynamicOutputMetaData(* static_cast<const RtlRecordTypeInfo *>(queryRtlType(*deserializer.get(), diskRecord))));
     Owned<IUserDescriptor> udesc;
     if(_username != NULL && *_username != '\0')
     {
@@ -139,6 +142,8 @@ IndexDataSource::IndexDataSource(IndexDataSource * _other)
 {
     logicalName.set(_other->logicalName);
     diskRecord.set(_other->diskRecord);
+    deserializer.set(_other->deserializer);
+    diskRecordMeta.set(_other->diskRecordMeta);
     df.set(_other->df);
     original.set(_other);       // stop any work units etc. being unloaded.
     diskMeta.set(_other->diskMeta);     // optimization - would be handled by init anyway
@@ -427,7 +432,7 @@ bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned siz
 
 void IndexDataSource::applyFilter()
 {
-    manager.setown(createLocalKeyManager(tlk, NULL));
+    manager.setown(createLocalKeyManager(diskRecordMeta->queryRecordAccessor(true), tlk, NULL));
     ForEachItemIn(i, values)
     {
         IStringSet & cur = values.item(i);

+ 2 - 0
common/fileview2/fvidxsource.ipp

@@ -78,6 +78,8 @@ protected:
     IArrayOf<IStringSet> values;
     Owned<DataSourceMetaData> diskMeta;
     HqlExprAttr diskRecord;
+    Owned<IRtlFieldTypeDeserializer> deserializer;
+    Owned<IOutputMetaData> diskRecordMeta;
     Owned<IDistributedFile> df;
     Linked<FVDataSource> original;
     unsigned __int64 totalRows;

+ 1 - 499
common/remote/sockfile.cpp

@@ -340,7 +340,7 @@ enum {
     RFCsetthrottle2,
     RFCsetfileperms,
 // 2.0
-    RFCreadfilteredindex,
+    RFCreadfilteredindex,    // No longer used
     RFCreadfilteredindexcount,
     RFCreadfilteredindexblob,
     RFCmaxnormal,
@@ -2921,406 +2921,6 @@ unsigned getRemoteVersion(ISocket *origSock, StringBuffer &ver)
 
 /////////////////////////
 
-class CRemoteKeyManager : public CSimpleInterfaceOf<IKeyManager>
-{
-    StringAttr filename;
-    Linked<IDelayedFile> delayedFile;
-    SegMonitorList segs;
-    size32_t rowDataRemaining = 0;
-    MemoryBuffer rowDataBuffer;
-    MemoryBuffer keyCursorMb;        // used for continuation
-    unsigned __int64 totalGot = 0;
-    size32_t currentSize = 0;
-    const byte *currentRow = nullptr;
-    bool first = true;
-    unsigned __int64 chooseNLimit = 0;
-    ConstPointerArray activeBlobs;
-    unsigned crc = 0;
-    mutable bool hasRemoteSupport = false; // must check 1st
-    mutable Owned<IKeyManager> directKM; // failover manager if remote key support is unavailable
-
-    CRemoteFileIO *prepKeySend(MemoryBuffer &sendBuffer, RemoteFileCommandType cmd, bool segmentMonitors)
-    {
-        Owned<IFileIO> iFileIO = delayedFile->getFileIO();
-        if (!iFileIO)
-            throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
-        Linked<CRemoteFileIO> remoteIO = QUERYINTERFACE(iFileIO.get(), CRemoteFileIO);
-        assertex(remoteIO);
-        initSendBuffer(sendBuffer);
-        size32_t keySize = 0; // backward compatibility - now ignored
-        sendBuffer.append(cmd).append(remoteIO->getHandle()).append(filename).append(keySize);
-        if (segmentMonitors)
-            segs.serialize(sendBuffer);
-        return remoteIO.getClear();
-    }
-    bool remoteSupport() const
-    {
-        if (hasRemoteSupport)
-            return true;
-        else if (directKM)
-            return false;
-        Owned<IFileIO> iFileIO = delayedFile->getFileIO();
-        if (!iFileIO)
-            throw MakeStringException(0, "CRemoteKeyManager: Failed to open key file: %s", filename.get());
-        Linked<CRemoteFileIO> remoteIO = QUERYINTERFACE(iFileIO.get(), CRemoteFileIO);
-        bool useRemote = nullptr != remoteIO.get();
-        if (useRemote)
-        {
-            StringBuffer verString;
-            unsigned ver = getRemoteVersion(*remoteIO, verString);
-            if (ver < MIN_KEYFILTSUPPORT_VERSION)
-                useRemote = false;
-        }
-        if (useRemote)
-        {
-            PROGLOG("Using remote key manager for file: %s", filename.get());
-            hasRemoteSupport = true;
-        }
-        else
-        {
-            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
-            directKM.setown(createLocalKeyManager(keyIndex, nullptr));
-            return false;
-        }
-        return true;
-    }
-    unsigned __int64 _checkCount(unsigned __int64 limit)
-    {
-        MemoryBuffer sendBuffer;
-        Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexcount, true);
-        sendBuffer.append(limit);
-        MemoryBuffer replyBuffer;
-        remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
-        unsigned __int64 count;
-        replyBuffer.read(count);
-        return count;
-    }
-public:
-    CRemoteKeyManager(const char *_filename, unsigned _crc, IDelayedFile *_delayedFile) : segs(true), filename(_filename), crc(_crc), delayedFile(_delayedFile)
-    {
-    }
-    ~CRemoteKeyManager()
-    {
-        releaseBlobs();
-    }
-// IKeyManager impl.
-    virtual void reset(bool crappyHack = false) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->reset(crappyHack);
-            return;
-        }
-        rowDataBuffer.clear();
-        rowDataRemaining = 0;
-        keyCursorMb.clear();
-        currentSize = 0;
-        currentRow = nullptr;
-        first = true;
-        totalGot = 0;
-    }
-    virtual void releaseSegmentMonitors() override
-    {
-        if (!remoteSupport())
-        {
-            directKM->releaseSegmentMonitors();
-            return;
-        }
-        segs.reset();
-    }
-    virtual const byte *queryKeyBuffer() override
-    {
-        if (!remoteSupport())
-            return directKM->queryKeyBuffer();
-        return currentRow;
-    }
-    virtual unsigned queryRecordSize() override
-    {
-        if (!remoteSupport())
-            return directKM->queryRecordSize();
-        return currentSize; // this is wrong I think
-    }
-    virtual size32_t queryRowSize() override
-    {
-        if (!remoteSupport())
-            return directKM->queryRowSize();
-        return currentSize;
-    }
-    virtual unsigned __int64 querySequence() override
-    {
-        if (!remoteSupport())
-            return directKM->querySequence();
-        UNIMPLEMENTED;
-    }
-    virtual bool lookup(bool exact) override
-    {
-        if (!remoteSupport())
-            return directKM->lookup(exact);
-        while (true)
-        {
-            if (rowDataRemaining)
-            {
-                rowDataBuffer.read(currentSize);
-                currentRow = rowDataBuffer.readDirect(currentSize);
-                rowDataRemaining -= sizeof(currentSize) + currentSize;
-                return true;
-            }
-            else
-            {
-                if (!first && (0 == keyCursorMb.length())) // No keyCursor implies there is nothing more to fetch
-                    return false;
-                unsigned maxRecs = 0;
-                if (chooseNLimit)
-                {
-                    if (totalGot == chooseNLimit)
-                        break;
-                    unsigned __int64 max = chooseNLimit-totalGot;
-                    if (max > UINT_MAX)
-                        maxRecs = UINT_MAX;
-                    else
-                        maxRecs = (unsigned)max;
-                }
-                MemoryBuffer sendBuffer;
-                Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindex, true);
-                sendBuffer.append(first).append(maxRecs);
-                if (first)
-                    first = false;
-                else
-                {
-                    dbgassertex(keyCursorMb.length());
-                    sendBuffer.append(keyCursorMb);
-                }
-                rowDataBuffer.clear();
-                remoteIO->sendRemoteCommand(sendBuffer, rowDataBuffer);
-                unsigned recsGot;
-                rowDataBuffer.read(recsGot);
-                if (0 == recsGot)
-                {
-                    keyCursorMb.clear(); // signals no more data if called again.
-                    break; // end
-                }
-                totalGot += recsGot;
-                rowDataBuffer.read(rowDataRemaining);
-                unsigned pos = rowDataBuffer.getPos(); // start of row data
-                const void *rowData = rowDataBuffer.readDirect(rowDataRemaining);
-                size32_t keyCursorSz;
-                rowDataBuffer.read(keyCursorSz);
-                keyCursorMb.clear();
-                if (keyCursorSz)
-                    keyCursorMb.append(keyCursorSz, rowDataBuffer.readDirect(keyCursorSz));
-                rowDataBuffer.reset(pos); // reposition to start of row data
-            }
-        }
-        return false;
-    }
-    virtual unsigned __int64 getCount() override
-    {
-        if (!remoteSupport())
-            return directKM->getCount();
-        return _checkCount((unsigned __int64)-1);
-    }
-    virtual unsigned __int64 getCurrentRangeCount(unsigned groupSegCount) override
-    {
-        if (!remoteSupport())
-            return directKM->getCurrentRangeCount(groupSegCount);
-        UNIMPLEMENTED;
-    }
-    virtual bool nextRange(unsigned groupSegCount) override
-    {
-        if (!remoteSupport())
-            return directKM->nextRange(groupSegCount);
-        UNIMPLEMENTED;
-    }
-    virtual void setKey(IKeyIndexBase * _key) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->setKey(_key);
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-    virtual void setChooseNLimit(unsigned __int64 _chooseNLimit) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->setChooseNLimit(_chooseNLimit);
-            return;
-        }
-        chooseNLimit = _chooseNLimit;
-    }
-    virtual unsigned __int64 checkCount(unsigned __int64 limit) override
-    {
-        if (!remoteSupport())
-            directKM->checkCount(limit);
-        return _checkCount(limit);
-    }
-    virtual void serializeCursorPos(MemoryBuffer &mb) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->serializeCursorPos(mb);
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-    virtual void deserializeCursorPos(MemoryBuffer &mb) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->deserializeCursorPos(mb);
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-    virtual unsigned querySeeks() const override
-    {
-        if (!remoteSupport())
-            return directKM->querySeeks();
-        return 0;
-    }
-    virtual unsigned queryScans() const override
-    {
-        if (!remoteSupport())
-            return directKM->queryScans();
-        return 0;
-    }
-    virtual unsigned querySkips() const override
-    {
-        if (!remoteSupport())
-            return directKM->querySkips();
-        return 0;
-    }
-    virtual unsigned queryNullSkips() const override
-    {
-        if (!remoteSupport())
-            return directKM->queryNullSkips();
-        return 0;
-    }
-    virtual const byte *loadBlob(unsigned __int64 blobId, size32_t &blobSize) override
-    {
-        if (!remoteSupport())
-            return directKM->loadBlob(blobId, blobSize);
-        MemoryBuffer sendBuffer;
-        Owned<CRemoteFileIO> remoteIO = prepKeySend(sendBuffer, RFCreadfilteredindexblob, false);
-        sendBuffer.append(blobId);
-        MemoryBuffer replyBuffer;
-        remoteIO->sendRemoteCommand(sendBuffer, replyBuffer);
-        replyBuffer.read(blobSize);
-        const byte *blobData = replyBuffer.readDirect(blobSize);
-        activeBlobs.append(replyBuffer.detach()); // NB: don't need to retain size, but keep sz+data to avoid copy
-        return blobData;
-    }
-    virtual void releaseBlobs() override
-    {
-        if (!remoteSupport())
-            return directKM->releaseBlobs();
-        ForEachItemIn(idx, activeBlobs)
-        {
-            free((void *) activeBlobs.item(idx));
-        }
-        activeBlobs.kill();
-    }
-    virtual void resetCounts() override
-    {
-        if (!remoteSupport())
-        {
-            directKM->resetCounts();
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-
-    virtual void setLayoutTranslator(IRecordLayoutTranslator * trans) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->setLayoutTranslator(trans);
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-    virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->setSegmentMonitors(segmentMonitors);
-            return;
-        }
-        segs.swapWith(segmentMonitors);
-    }
-    virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->deserializeSegmentMonitors(mb);
-            return;
-        }
-        segs.deserialize(mb);
-    }
-    virtual void finishSegmentMonitors() override
-    {
-        if (!remoteSupport())
-        {
-            directKM->finishSegmentMonitors();
-            return;
-        }
-    }
-    virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) override
-    {
-        if (!remoteSupport())
-            return directKM->lookupSkip(seek, seekGEOffset, seeklen);
-        UNIMPLEMENTED;
-    }
-    virtual void append(IKeySegmentMonitor *segment) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->append(segment);
-            return;
-        }
-        segs.append(segment);
-    }
-    virtual unsigned ordinality() const override
-    {
-        if (!remoteSupport())
-            return directKM->ordinality();
-        return segs.ordinality();
-    }
-    virtual IKeySegmentMonitor *item(unsigned idx) const override
-    {
-        if (!remoteSupport())
-            return directKM->item(idx);
-        return segs.item(idx);
-    }
-    virtual void setMergeBarrier(unsigned offset) override
-    {
-        if (!remoteSupport())
-        {
-            directKM->setMergeBarrier(offset);
-            return;
-        }
-        UNIMPLEMENTED;
-    }
-};
-
-IKeyManager *createRemoteKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile)
-{
-    return new CRemoteKeyManager(filename, crc, delayedFile);
-}
-
-IKeyManager *createKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile, bool allowRemote, bool forceRemote)
-{
-    RemoteFilename rfn;
-    rfn.setRemotePath(filename);
-    if (forceRemote || (allowRemote && !rfn.isLocal()))
-        return createRemoteKeyManager(filename, crc, delayedFile);
-    else
-    {
-        Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
-        return createLocalKeyManager(keyIndex, nullptr);
-    }
-}
 
 //////////////
 
@@ -4794,46 +4394,6 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         return numRecs;
     }
 
-    IKeyManager *prepKey(int handle, const char *keyname, SegMonitorList *segs)
-    {
-        OpenFileInfo fileInfo;
-        if (!lookupFileIOHandle(handle, fileInfo, of_key))
-        {
-            VStringBuffer errStr("Error opening key file : %s", keyname);
-            throw createDafsException(RFSERR_InvalidFileIOHandle, errStr.str());
-        }
-        Owned<IKeyIndex> index = createKeyIndex(keyname, 0, *fileInfo.fileIO, false, false);
-        if (!index)
-        {
-            VStringBuffer errStr("Error opening key file : %s", keyname);
-            throw createDafsException(RFSERR_KeyIndexFailed, errStr.str());
-        }
-        Owned<IKeyManager> keyManager = createLocalKeyManager(index, nullptr);
-        if (segs)
-        {
-            keyManager->setSegmentMonitors(*segs);
-            keyManager->finishSegmentMonitors();
-        }
-        keyManager->reset();
-        return keyManager.getLink();
-    }
-
-    IKeyManager *prepKey(MemoryBuffer &mb, bool segmentMonitors)
-    {
-        int handle;
-        StringBuffer keyName;
-        size32_t keySize; // backward comp
-        mb.read(handle).read(keyName).read(keySize);
-        if (segmentMonitors)
-        {
-            SegMonitorList segs(true);
-            segs.deserialize(mb);
-            return prepKey(handle, keyName, &segs);
-        }
-        else
-            return prepKey(handle, keyName, nullptr);
-    }
-
     class cCommandProcessor: public CInterface, implements IPooledThread
     {
         Owned<CRemoteClientHandler> client;
@@ -5191,61 +4751,6 @@ public:
         return true;
     }
 
-    bool cmdReadFilteredIndex(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
-    {
-        Owned<IKeyManager> keyManager = prepKey(msg, true);
-        bool first;
-        unsigned maxRecs;
-        msg.read(first).read(maxRecs);
-        if (!first)
-            keyManager->deserializeCursorPos(msg);
-
-        reply.append((unsigned)RFEnoerror);
-        DelayedMarker<unsigned> numReturned(reply);
-        bool maxHit;
-        unsigned numRecs = readKeyData(keyManager, maxRecs, reply, maxHit);
-        numReturned.write(numRecs);
-
-        DelayedSizeMarker keyCursorSzMarker(reply);
-        if (maxHit) // if maximum hit, either supplied maxRecs limit, or buffer limit, return cursor
-            keyManager->serializeCursorPos(reply);
-        keyCursorSzMarker.write();
-        return true;
-    }
-
-    bool cmdReadFilteredIndexCount(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
-    {
-        Owned<IKeyManager> keyManager = prepKey(msg, true);
-
-        unsigned __int64 limit;
-        msg.read(limit);
-        unsigned __int64 count;
-        if (((unsigned __int64)-1) != limit)
-            count = keyManager->checkCount(limit);
-        else
-            count = keyManager->getCount();
-        reply.append((unsigned)RFEnoerror);
-        reply.append(count);
-        return true;
-    }
-
-    bool cmdReadFilteredIndexBlob(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
-    {
-        Owned<IKeyManager> keyManager = prepKey(msg, false);
-        unsigned __int64 blobId;
-        msg.read(blobId);
-
-        size32_t blobSize;
-        const byte *blobData = keyManager->loadBlob(blobId, blobSize);
-
-        reply.append((unsigned)RFEnoerror);
-        reply.append(blobSize);
-        reply.append(blobSize, blobData);
-
-        keyManager->releaseBlobs();
-        return true;
-    }
-
     bool cmdSize(MemoryBuffer & msg, MemoryBuffer & reply)
     {
         int handle;
@@ -6235,9 +5740,6 @@ public:
             {
                 MAPCOMMANDSTATS(RFCread, cmdRead, *stats);
                 MAPCOMMANDSTATS(RFCwrite, cmdWrite, *stats);
-                MAPCOMMANDSTATS(RFCreadfilteredindex, cmdReadFilteredIndex, *stats);
-                MAPCOMMANDSTATS(RFCreadfilteredindexcount, cmdReadFilteredIndexCount, *stats);
-                MAPCOMMANDSTATS(RFCreadfilteredindexblob, cmdReadFilteredIndexBlob, *stats);
                 MAPCOMMANDCLIENTSTATS(RFCappend, cmdAppend, *client, *stats);
                 MAPCOMMAND(RFCcloseIO, cmdCloseFileIO);
                 MAPCOMMANDCLIENT(RFCopenIO, cmdOpenFileIO, *client);

+ 1 - 2
common/remote/sockfile.hpp

@@ -64,9 +64,8 @@ interface IRemoteFileServer : extends IInterface
 
 interface IKeyManager;
 interface IDelayedFile;
+
 extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename);
-extern REMOTE_API IKeyManager *createKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile, bool allowRemote, bool forceRemote);
-extern REMOTE_API IKeyManager * createRemoteKeyManager(const char *filename, unsigned crc, IDelayedFile *delayedFile);
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();

+ 4 - 0
common/thorhelper/thorcommon.ipp

@@ -99,6 +99,10 @@ public:
     {
         return meta->queryChildMeta(i);
     }
+    inline const RtlRecord &queryRecordAccessor(bool expand) const
+    {
+        return meta->queryRecordAccessor(expand);
+    }
 
 //cast operators.
     inline IOutputMetaData * queryOriginal() const          { return meta; }

+ 5 - 0
ecl/hql/hqlexpr.cpp

@@ -14074,6 +14074,11 @@ void exportBinaryType(MemoryBuffer &ret, IHqlExpression *table)
     dumpTypeInfo(ret, typeInfo, false);
 }
 
+const RtlTypeInfo *queryRtlType(IRtlFieldTypeDeserializer &deserializer, IHqlExpression *table)
+{
+    return buildRtlType(deserializer, table->queryType());
+}
+
 void exportData(IPropertyTree *data, IHqlExpression *table, bool flatten)
 {
     IPropertyTree *tt = NULL;

+ 4 - 0
ecl/hql/hqlexpr.hpp

@@ -1829,9 +1829,13 @@ extern HQL_API StringBuffer& getFriendlyTypeStr(ITypeInfo* type, StringBuffer& s
 #define ForEachChildFrom(idx, expr, first)  unsigned numOfChildren##idx = (expr)->numChildren(); \
         for (unsigned idx = first; idx < numOfChildren##idx; idx++) 
 
+struct RtlTypeInfo;
+interface IRtlFieldTypeDeserializer;
+
 extern HQL_API void exportData(IPropertyTree *data, IHqlExpression *table, bool flatten=false);
 extern HQL_API void exportJsonType(StringBuffer &ret, IHqlExpression *table);
 extern HQL_API void exportBinaryType(MemoryBuffer &ret, IHqlExpression *table);
+extern HQL_API const RtlTypeInfo *queryRtlType(IRtlFieldTypeDeserializer &deserializer, IHqlExpression *table);
 
 extern HQL_API void clearCacheCounts();
 extern HQL_API void displayHqlCacheStats();

+ 4 - 1
ecl/hthor/hthor.cpp

@@ -8364,7 +8364,10 @@ void CHThorDiskReadBaseActivity::open()
 //=====================================================================================================
 
 CHThorBinaryDiskReadBase::CHThorBinaryDiskReadBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind)
-: CHThorDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind), segHelper(_segHelper), prefetchBuffer(NULL), recInfo(outputMeta.queryOriginal()->queryRecordAccessor(true)),rowInfo(recInfo)
+: CHThorDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind),
+  segHelper(_segHelper), prefetchBuffer(NULL),
+  recInfo(outputMeta.queryRecordAccessor(true)),  // MORE - is this right - should it be diskMeta->queryRecordAccessor() ?
+  rowInfo(recInfo)
 {
 }
 

+ 15 - 9
ecl/hthor/hthorkey.cpp

@@ -469,7 +469,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
         {
             Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
             verifyIndex(tlk);
-            Owned<IKeyManager> tlman = createLocalKeyManager(tlk, NULL);
+            Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL);
             initManager(tlman, true);
             while(tlman->lookup(false) && (count<=limit))
             {
@@ -505,7 +505,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r
         verifyIndex(kidx);
     if (limit != (unsigned) -1)
     {
-        Owned<IKeyManager> kman = createLocalKeyManager(kidx, NULL);
+        Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, NULL);
         initManager(kman, false);
         result += kman->checkCount(limit-result);
     }
@@ -607,7 +607,7 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk)
 void CHThorIndexReadActivityBase::initPart()                                    
 { 
     assertex(!keyIndex->isTopLevelKey());
-    klManager.setown(createLocalKeyManager(keyIndex, NULL));
+    klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, NULL));
     initManager(klManager, false);
     callback.setManager(klManager);
 }
@@ -637,7 +637,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart()
     if(!tlk)
         openTlk();
     verifyIndex(tlk);
-    tlManager.setown(createLocalKeyManager(tlk, NULL));
+    tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL));
     initManager(tlManager, true);
     nextPartNumber = 0;
     return nextMultiPart();
@@ -868,7 +868,7 @@ bool CHThorIndexReadActivity::nextPart()
 {
     if(keyIndexCache && (seekGEOffset || localSortKey))
     {
-        klManager.setown(createKeyMerger(keyIndexCache, seekGEOffset, NULL));
+        klManager.setown(createKeyMerger(eclKeySize.queryRecordAccessor(true), keyIndexCache, seekGEOffset, NULL));
         keyIndexCache.clear();
         initManager(klManager, false);
         callback.setManager(klManager);
@@ -2729,6 +2729,7 @@ interface IJoinProcessor
     virtual void onComplete(CJoinGroup * jg) = 0;
     virtual bool leftCanMatch(const void *_left) = 0;
     virtual IRecordLayoutTranslator * getLayoutTranslator(IDistributedFile * f) = 0;
+    virtual const RtlRecord &queryIndexRecord() = 0;
     virtual void verifyIndex(IDistributedFile * f, IKeyIndex * idx, IRecordLayoutTranslator * trans) = 0;
 };
 
@@ -3116,7 +3117,7 @@ public:
             //Owned<IRecordLayoutTranslator> 
             trans.setown(owner.getLayoutTranslator(&f));
             owner.verifyIndex(&f, index, trans);
-            Owned<IKeyManager> manager = createLocalKeyManager(index, NULL);
+            Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, NULL);
             managers.append(*manager.getLink());
         }
         opened = true;
@@ -3155,7 +3156,7 @@ void KeyedLookupPartHandler::openPart()
     if(manager)
         return;
     Owned<IKeyIndex> index = openKeyFile(*part);
-    manager.setown(createLocalKeyManager(index, NULL));
+    manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL));
     IRecordLayoutTranslator * trans = tlk->queryRecordLayoutTranslator();
     if(trans && !index->isTopLevelKey())
         manager->setLayoutTranslator(trans);
@@ -3235,7 +3236,7 @@ public:
             {
                 Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
                 owner.verifyIndex(&f, index, trans);
-                manager.setown(createLocalKeyManager(index, NULL));
+                manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL));
             }
             else
             {
@@ -3248,7 +3249,7 @@ public:
                     parts->addIndex(index.getLink());
                 }
                 owner.verifyIndex(&f, index, trans);
-                manager.setown(createKeyMerger(parts, 0, nullptr));
+                manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, nullptr));
             }
             if(trans)
                 manager->setLayoutTranslator(trans);
@@ -4063,6 +4064,11 @@ protected:
             ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
     }
 
+    virtual const RtlRecord &queryIndexRecord()
+    {
+        return eclKeySize.queryRecordAccessor(true);
+    }
+
     virtual void fail(char const * msg)
     {
         throw MakeStringExceptionDirect(0, msg);

+ 8 - 5
roxie/ccd/ccdactivities.cpp

@@ -957,7 +957,7 @@ public:
         if (!segment->isWild())
         {
             if (!cursor)
-                cursor.setown(manager->createCursor(diskSize.queryOriginal()->queryRecordAccessor(true)));
+                cursor.setown(manager->createCursor(diskSize.queryRecordAccessor(true)));
             cursor->append(segment);
         }
     }
@@ -3135,6 +3135,7 @@ protected:
     Owned<IKeyManager> tlk;
     Linked<TranslatorArray> layoutTranslators;
     Linked<IKeyArray> keyArray;
+    const RtlRecord *keyRecInfo = nullptr;
     IDefRecordMeta *activityMeta;
     bool createSegmentMonitorsPending;
 
@@ -3160,7 +3161,7 @@ protected:
             }
             if (allKeys->numParts())
             {
-                tlk.setown(createKeyMerger(allKeys, 0, &logctx));
+                tlk.setown(createKeyMerger(*keyRecInfo, allKeys, 0, &logctx));
                 createSegmentMonitorsPending = true;
             }
             else
@@ -3173,7 +3174,7 @@ protected:
             IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
             if (filechanged)
             {
-                tlk.setown(createLocalKeyManager(k, &logctx));
+                tlk.setown(createLocalKeyManager(*keyRecInfo, k, &logctx));
                 createSegmentMonitorsPending = true;
             }
             else
@@ -3264,12 +3265,13 @@ protected:
 
 public:
     CRoxieIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
-        : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory), 
+        : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory),
         factory(_aFactory),
         steppingOffset(_steppingOffset),
         stepExtra(SSEFreadAhead, NULL)
     {
         indexHelper = (IHThorIndexReadBaseArg *) basehelper;
+        keyRecInfo = &indexHelper->queryDiskRecordSize()->queryRecordAccessor(true);
         variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
         isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
         inputData = NULL;
@@ -3422,7 +3424,7 @@ public:
                 i++;
             }
             if (allKeys->numParts())
-                tlk.setown(::createKeyMerger(allKeys, steppingOffset, &logctx));
+                tlk.setown(::createKeyMerger(*keyRecInfo, allKeys, steppingOffset, &logctx));
             else
                 tlk.clear();
             createSegmentMonitorsPending = true;
@@ -4682,6 +4684,7 @@ public:
         : factory(_aFactory), CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory)
     {
         helper = (IHThorKeyedJoinArg *) basehelper;
+        keyRecInfo = &helper->queryIndexRecordSize()->queryRecordAccessor(true);
         variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename|JFindexfromactivity)) != 0);
         inputDone = 0;
         processed = 0;

+ 11 - 11
roxie/ccd/ccdserver.cpp

@@ -21563,7 +21563,7 @@ public:
         if (!segment->isWild())
         {
             if (!cursor)
-                cursor.setown(manager->createCursor(diskSize.queryOriginal()->queryRecordAccessor(true)));
+                cursor.setown(manager->createCursor(diskSize.queryRecordAccessor(true)));
             cursor->append(segment);
         }
     }
@@ -22791,7 +22791,7 @@ public:
                                     if ((indexHelper.getFlags() & TIRcountkeyedlimit) != 0)
                                     {
                                         Owned<IKeyManager> countKey;
-                                        countKey.setown(createLocalKeyManager(thisKey, this));
+                                        countKey.setown(createLocalKeyManager(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), thisKey, this));
                                         countKey->setLayoutTranslator(translators->item(fileNo));
                                         createSegmentMonitors(countKey);
                                         unsigned __int64 count = countKey->checkCount(keyedLimit);
@@ -22806,11 +22806,11 @@ public:
                             }
                             if (seekGEOffset && !thisKey->isTopLevelKey())
                             {
-                                tlk.setown(createSingleKeyMerger(thisKey, seekGEOffset, this));
+                                tlk.setown(createSingleKeyMerger(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), thisKey, seekGEOffset, this));
                             }
                             else
                             {
-                                tlk.setown(createLocalKeyManager(thisKey, this));
+                                tlk.setown(createLocalKeyManager(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), thisKey, this));
                                 if (!thisKey->isTopLevelKey())
                                     tlk->setLayoutTranslator(translators->item(fileNo));
                                 else
@@ -23076,9 +23076,9 @@ public:
             keySet.setown(createKeyIndexSet());
             keySet->addIndex(LINK(key));
             if (owner.seekGEOffset)
-                tlk.setown(createKeyMerger(keySet, owner.seekGEOffset, &owner));
+                tlk.setown(createKeyMerger(owner.indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), keySet, owner.seekGEOffset, &owner));
             else
-                tlk.setown(createLocalKeyManager(keySet->queryPart(0), &owner));
+                tlk.setown(createLocalKeyManager(owner.indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), keySet->queryPart(0), &owner));
             if (!key->isTopLevelKey())
                 tlk->setLayoutTranslator(trans);
             owner.indexHelper.createSegmentMonitors(tlk);
@@ -23489,7 +23489,7 @@ public:
         unsigned __int64 result = 0;
         for (unsigned i = 0; i < numParts; i++)
         {
-            Owned<IKeyManager> countTlk = createLocalKeyManager(keyIndexSet->queryPart(i), this);
+            Owned<IKeyManager> countTlk = createLocalKeyManager(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet->queryPart(i), this);
             countTlk->setLayoutTranslator(translators->item(i));
             indexHelper.createSegmentMonitors(countTlk);
             countTlk->finishSegmentMonitors();
@@ -23521,12 +23521,12 @@ public:
             }
             if (numParts > 1 || seekGEOffset)
             {
-                tlk.setown(createKeyMerger(keyIndexSet, seekGEOffset, this));
+                tlk.setown(createKeyMerger(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, this));
                 // note that we don't set up translator because we don't support it. If that ever changes...
             }
             else
             {
-                tlk.setown(createLocalKeyManager(keyIndexSet->queryPart(0), this));
+                tlk.setown(createLocalKeyManager(indexHelper.queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet->queryPart(0), this));
                 tlk->setLayoutTranslator(translators->item(0));
             }
             indexHelper.createSegmentMonitors(tlk);
@@ -25291,7 +25291,7 @@ public:
     CRoxieServerFullKeyedJoinHead(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, IKeyArray * _keySet, TranslatorArray *_translators, IOutputMetaData *_indexReadMeta, IJoinProcessor *_joinHandler, bool _isLocal)
         : CRoxieServerActivity(_ctx, _factory, _probeManager),
           helper((IHThorKeyedJoinArg &)basehelper), 
-          tlk(createLocalKeyManager(NULL, this)),
+          tlk(createLocalKeyManager(helper.queryIndexRecordSize()->queryRecordAccessor(true), NULL, this)),
           translators(_translators),
           keySet(_keySet),
           remote(_ctx, this, _remoteId, 0, helper, *this, true, true),
@@ -26193,7 +26193,7 @@ public:
         IOutputMetaData *_indexReadMeta, unsigned _joinFlags, bool _isSimple, bool _isLocal)
         : CRoxieServerKeyedJoinBase(_ctx, _factory, _probeManager, _remoteId, _joinFlags, false, _isSimple, _isLocal),
           indexReadMeta(_indexReadMeta),
-          tlk(createLocalKeyManager(NULL, this)),
+          tlk(createLocalKeyManager(helper.queryIndexRecordSize()->queryRecordAccessor(true), NULL, this)),
           keySet(_keySet),
           translators(_translators)
     {

+ 1 - 0
rtl/eclrtl/rtldynfield.cpp

@@ -44,6 +44,7 @@ const RtlTypeInfo *FieldTypeInfoStruct::createRtlTypeInfo(IThorIndexCallback *_c
     case type_keyedint:
         ret = new RtlKeyedIntTypeInfo(fieldType, length, childType);
         break;
+    case type_blob:  // MORE - will need its own type!
     case type_int:
         ret = new RtlIntTypeInfo(fieldType, length);
         break;

+ 65 - 55
system/jhtree/jhtree.cpp

@@ -57,6 +57,7 @@
 
 #include "jhtree.ipp"
 #include "keybuild.hpp"
+#include "eclhelper_dyn.hpp"
 #include "layouttrans.hpp"
 
 static std::atomic<CKeyStore *> keyStore(nullptr);
@@ -484,7 +485,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    CKeyLevelManager(IKeyIndex * _key, IContextLogger *_ctx) : segs(true)
+    CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx) : segs(_recInfo, true)
     {
         ctx = _ctx;
         numsegs = 0;
@@ -2380,14 +2381,14 @@ class CKeyMerger : public CKeyLevelManager
     }
 
 public:
-    CKeyMerger(IKeyIndexSet *_keyset, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(NULL, _ctx), sortFieldOffset(_sortFieldOffset)
+    CKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet *_keyset, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(_recInfo, NULL, _ctx), sortFieldOffset(_sortFieldOffset)
     {
         segs.setMergeBarrier(sortFieldOffset);
         init();
         setKey(_keyset);
     }
 
-    CKeyMerger(IKeyIndex *_onekey, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(NULL, _ctx), sortFieldOffset(_sortFieldOffset)
+    CKeyMerger(const RtlRecord &_recInfo, IKeyIndex *_onekey, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(_recInfo, NULL, _ctx), sortFieldOffset(_sortFieldOffset)
     {
         segs.setMergeBarrier(sortFieldOffset);
         init();
@@ -2871,14 +2872,14 @@ public:
     }
 };
 
-extern jhtree_decl IKeyManager *createKeyMerger(IKeyIndexSet * _keys, unsigned _sortFieldOffset, IContextLogger *_ctx)
+extern jhtree_decl IKeyManager *createKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet * _keys, unsigned _sortFieldOffset, IContextLogger *_ctx)
 {
-    return new CKeyMerger(_keys, _sortFieldOffset, _ctx);
+    return new CKeyMerger(_recInfo, _keys, _sortFieldOffset, _ctx);
 }
 
-extern jhtree_decl IKeyManager *createSingleKeyMerger(IKeyIndex * _onekey, unsigned _sortFieldOffset, IContextLogger *_ctx)
+extern jhtree_decl IKeyManager *createSingleKeyMerger(const RtlRecord &_recInfo, IKeyIndex * _onekey, unsigned _sortFieldOffset, IContextLogger *_ctx)
 {
-    return new CKeyMerger(_onekey, _sortFieldOffset, _ctx);
+    return new CKeyMerger(_recInfo, _onekey, _sortFieldOffset, _ctx);
 }
 
 class CKeyIndexSet : implements IKeyIndexSet, public CInterface
@@ -2906,9 +2907,9 @@ extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
     return new CKeyIndexSet;
 }
 
-extern jhtree_decl IKeyManager *createLocalKeyManager(IKeyIndex *key, IContextLogger *_ctx)
+extern jhtree_decl IKeyManager *createLocalKeyManager(const RtlRecord &_recInfo, IKeyIndex *_key, IContextLogger *_ctx)
 {
-    return new CKeyLevelManager(key, _ctx);
+    return new CKeyLevelManager(_recInfo, _key, _ctx);
 }
 
 class CKeyArray : implements IKeyArray, public CInterface
@@ -2950,16 +2951,23 @@ class IKeyManagerTest : public CppUnit::TestFixture
 
     void testStepping()
     {
-        buildTestKeys(false, false, false, false);
+        buildTestKeys(false);
         {
             // We are going to treat as a 7-byte field then a 3-byte field, and request the datasorted by the 3-byte...
-            unsigned maxSize = 10; // (variable && blobby) ? 18 : 10;
             Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
             Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
             Owned<IKeyIndexSet> keyset = createKeyIndexSet();
             keyset->addIndex(index1.getClear());
             keyset->addIndex(index2.getClear());
-            Owned <IKeyManager> tlk1 = createKeyMerger(keyset, 7, NULL);
+            const char *json = "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
+                               "  \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
+                               " \"fieldType\": 13, \"length\": 10, "
+                               " \"fields\": [ "
+                               " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
+                               " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 } ] "
+                               "}";
+            Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
+            Owned <IKeyManager> tlk1 = createKeyMerger(meta->queryRecordAccessor(true), keyset, 7, NULL);
             Owned<IStringSet> sset1 = createStringSet(7);
             sset1->addRange("0000003", "0000003");
             sset1->addRange("0000005", "0000006");
@@ -2996,7 +3004,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
             ASSERT(!tlk1->lookup(true)); 
             ASSERT(!tlk1->lookup(true)); 
 
-            Owned <IKeyManager> tlk2 = createKeyMerger(NULL, 7, NULL);
+            Owned <IKeyManager> tlk2 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
             tlk2->setKey(keyset);
             tlk2->deserializeCursorPos(mb);
             tlk2->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
@@ -3012,7 +3020,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
             ASSERT(!tlk2->lookup(true)); 
             ASSERT(!tlk2->lookup(true)); 
 
-            Owned <IKeyManager> tlk3 = createKeyMerger(NULL, 7, NULL);
+            Owned <IKeyManager> tlk3 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
             tlk3->setKey(keyset);
             tlk3->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
             tlk3->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
@@ -3025,7 +3033,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
             ASSERT(!tlk3->lookupSkip("081", 7, 3)); 
             ASSERT(!tlk3->lookup(true)); 
 
-            Owned <IKeyManager> tlk4 = createKeyMerger(NULL, 7, NULL);
+            Owned <IKeyManager> tlk4 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
             tlk4->setKey(keyset);
             tlk4->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
             tlk4->append(createKeySegmentMonitor(false, sset3.getLink(), 1, 7, 3));
@@ -3051,19 +3059,19 @@ class IKeyManagerTest : public CppUnit::TestFixture
         removeTestKeys();
     }
 
-    void buildTestKeys(bool shortForm, bool indar, bool variable, bool blobby)
+    void buildTestKeys(bool variable)
     {
-        buildTestKey("keyfile1.$$$", false, shortForm, indar, variable, blobby);
-        buildTestKey("keyfile2.$$$", true, shortForm, indar, variable, blobby);
+        buildTestKey("keyfile1.$$$", false, variable);
+        buildTestKey("keyfile2.$$$", true, variable);
     }
 
-    void buildTestKey(const char *filename, bool skip, bool shortForm, bool indar, bool variable, bool blobby)
+    void buildTestKey(const char *filename, bool skip, bool variable)
     {
         OwnedIFile file = createIFile(filename);
         OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
         Owned<IFileIOStream> out = createIOStream(io);
-        unsigned maxRecSize = (variable && blobby) ? 18 : 10;
-        unsigned keyedSize = (shortForm || (variable && blobby)) ? 10 : (unsigned) -1;
+        unsigned maxRecSize = variable ? 18 : 10;
+        unsigned keyedSize = 10;
         Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0);
 
         char keybuf[18];
@@ -3071,7 +3079,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
         for (unsigned count = 0; count < 10000; count++)
         {
             unsigned datasize = 10;
-            if (blobby && variable && (count % 10)==0)
+            if (variable && (count % 10)==0)
             {
                 char *blob = new char[count+100000];
                 byte seed = count;
@@ -3132,31 +3140,41 @@ class IKeyManagerTest : public CppUnit::TestFixture
         key->releaseBlobs();
     }
 protected:
-    void testKeys(bool shortForm, bool indar, bool variable, bool blobby)
-    {
-        // If it is not a supported combination, just return
-        if (indar)
-        {
-            if (shortForm || variable || blobby)
-                return;
-        }
-        if (blobby)
+    void testKeys(bool variable)
+    {
+        const char *json = variable ?
+                "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
+                "  \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
+                "  \"ty3\": { \"fieldType\": 15, \"length\": 8 }, "
+                " \"fieldType\": 13, \"length\": 10, "
+                " \"fields\": [ "
+                " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
+                " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 }, "
+                " { \"name\": \"f3\", \"type\": \"ty3\", \"flags\": 65551 } "  // 0x01000f i.e. payload and blob
+                " ]"
+                "}"
+                :
+                "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
+                "  \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
+                " \"fieldType\": 13, \"length\": 10, "
+                " \"fields\": [ "
+                " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
+                " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 } "
+                " ] "
+                "}";
+        Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
+        const RtlRecord &recInfo = meta->queryRecordAccessor(true);
+        buildTestKeys(variable);
         {
-            if (!shortForm || !variable)
-                return;
-        }
-        buildTestKeys(shortForm, indar, variable, blobby);
-        {
-            unsigned maxSize = (variable && blobby) ? 18 : 10;
             Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
-            Owned <IKeyManager> tlk1 = createLocalKeyManager(index1, NULL);
+            Owned <IKeyManager> tlk1 = createLocalKeyManager(recInfo, index1, NULL);
             Owned<IStringSet> sset1 = createStringSet(10);
             sset1->addRange("0000000001", "0000000100");
             tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 0, 10));
             tlk1->finishSegmentMonitors();
             tlk1->reset();
 
-            Owned <IKeyManager> tlk1a = createLocalKeyManager(index1, NULL);
+            Owned <IKeyManager> tlk1a = createLocalKeyManager(recInfo, index1, NULL);
             Owned<IStringSet> sset1a = createStringSet(8);
             sset1a->addRange("00000000", "00000001");
             tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 0, 8));
@@ -3167,14 +3185,6 @@ protected:
             tlk1a->finishSegmentMonitors();
             tlk1a->reset();
 
-/*          for (;;)
-            {
-                if (!tlk1a->lookup(true))
-                    break;
-                DBGLOG("%.10s", tlk1a->queryKeyBuffer());
-            }
-            tlk1a->reset();
-*/
 
             Owned<IStringSet> ssetx = createStringSet(10);
             ssetx->addRange("0000000001", "0000000002");
@@ -3188,7 +3198,7 @@ protected:
 
 
             Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
-            Owned <IKeyManager> tlk2 = createLocalKeyManager(index2, NULL);
+            Owned <IKeyManager> tlk2 = createLocalKeyManager(recInfo, index2, NULL);
             Owned<IStringSet> sset2 = createStringSet(10);
             sset2->addRange("0000000001", "0000000100");
             ASSERT(sset2->numValues() == 65536);
@@ -3203,7 +3213,7 @@ protected:
                 both->addIndex(index1.getLink());
                 both->addIndex(index2.getLink());
                 Owned<IStringSet> sset3 = createStringSet(10);
-                tlk3.setown(createKeyMerger(NULL, 0, NULL));
+                tlk3.setown(createKeyMerger(recInfo, NULL, 0, NULL));
                 tlk3->setKey(both);
                 sset3->addRange("0000000001", "0000000100");
                 tlk3->append(createKeySegmentMonitor(false, sset3.getClear(), 0, 0, 10));
@@ -3211,7 +3221,7 @@ protected:
                 tlk3->reset();
             }
 
-            Owned <IKeyManager> tlk2a = createLocalKeyManager(index2, NULL);
+            Owned <IKeyManager> tlk2a = createLocalKeyManager(recInfo, index2, NULL);
             Owned<IStringSet> sset2a = createStringSet(10);
             sset2a->addRange("0000000048", "0000000048");
             ASSERT(sset2a->numValues() == 1);
@@ -3219,7 +3229,7 @@ protected:
             tlk2a->finishSegmentMonitors();
             tlk2a->reset();
 
-            Owned <IKeyManager> tlk2b = createLocalKeyManager(index2, NULL);
+            Owned <IKeyManager> tlk2b = createLocalKeyManager(recInfo, index2, NULL);
             Owned<IStringSet> sset2b = createStringSet(10);
             sset2b->addRange("0000000047", "0000000049");
             ASSERT(sset2b->numValues() == 3);
@@ -3227,7 +3237,7 @@ protected:
             tlk2b->finishSegmentMonitors();
             tlk2b->reset();
 
-            Owned <IKeyManager> tlk2c = createLocalKeyManager(index2, NULL);
+            Owned <IKeyManager> tlk2c = createLocalKeyManager(recInfo, index2, NULL);
             Owned<IStringSet> sset2c = createStringSet(10);
             sset2c->addRange("0000000047", "0000000047");
             tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 0, 10));
@@ -3261,7 +3271,7 @@ protected:
                 ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000007", 10)==0);
                 ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000009", 10)==0);
                 ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000010", 10)==0);
-                if (blobby)
+                if (variable)
                     checkBlob(tlk1, 10+100000);
 
                 tlk1a->reset();
@@ -3320,8 +3330,8 @@ protected:
     void testKeys()
     {
         ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
-        for (unsigned i = 0; i < 16; i++)
-            testKeys((i & 0x8)!=0,(i & 0x4)!=0,(i & 0x2)!=0,(i & 0x1)!=0);
+        testKeys(false);
+        testKeys(true);
     }
 };
 

+ 8 - 4
system/jhtree/jhtree.hpp

@@ -154,6 +154,7 @@ typedef unsigned short UChar;
 #include "rtlkey.hpp"
 #include "jmisc.hpp"
 
+class RtlRecord;
 class jhtree_decl SegMonitorList : implements IInterface, implements IIndexReadContext, public CInterface
 {
     unsigned _lastRealSeg() const;
@@ -161,9 +162,10 @@ class jhtree_decl SegMonitorList : implements IInterface, implements IIndexReadC
     unsigned mergeBarrier;
     bool modified;
     bool needWild;
+    const RtlRecord &recInfo;
 public:
     IMPLEMENT_IINTERFACE;
-    inline SegMonitorList(bool _needWild) : needWild(_needWild) { reset(); }
+    inline SegMonitorList(const RtlRecord &_recInfo, bool _needWild) : recInfo(_recInfo), needWild(_needWild) { reset(); }
     IArrayOf<IKeySegmentMonitor> segMonitors;
 
     void reset();
@@ -235,9 +237,11 @@ inline offset_t extractFpos(IKeyManager * manager)
     return rtlReadBigUInt8(keyRow + offset);
 }
 
-extern jhtree_decl IKeyManager *createLocalKeyManager(IKeyIndex * _key, IContextLogger *ctx);
-extern jhtree_decl IKeyManager *createKeyMerger(IKeyIndexSet * _key, unsigned sortFieldOffset, IContextLogger *ctx);
-extern jhtree_decl IKeyManager *createSingleKeyMerger(IKeyIndex * _onekey, unsigned sortFieldOffset, IContextLogger *ctx);
+class RtlRecord;
+
+extern jhtree_decl IKeyManager *createLocalKeyManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *ctx);
+extern jhtree_decl IKeyManager *createKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet * _key, unsigned sortFieldOffset, IContextLogger *ctx);
+extern jhtree_decl IKeyManager *createSingleKeyMerger(const RtlRecord &_recInfo, IKeyIndex * _onekey, unsigned sortFieldOffset, IContextLogger *ctx);
 
 class KLBlobProviderAdapter : implements IBlobProvider
 {

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2379,7 +2379,7 @@ class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
         CKeyLookup(IndexDistributeSlaveActivity &_owner, IHThorKeyedDistributeArg *_helper, IKeyIndex *_tlk)
             : owner(_owner), helper(_helper), tlk(_tlk)
         {
-            tlkManager.setown(createLocalKeyManager(tlk, nullptr));
+            tlkManager.setown(createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), tlk, nullptr));
             numslaves = owner.queryContainer().queryJob().querySlaves();
         }
         unsigned hash(const void *data)

+ 1 - 2
thorlcr/activities/indexread/thindexread.cpp

@@ -160,8 +160,7 @@ protected:
                 if (!keyIndex)
                     throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", index->queryLogicalName());
 
-                unsigned fixedSize = indexBaseHelper->queryDiskRecordSize()->querySerializedDiskMeta()->getFixedSize(); // used only if fixed
-                Owned <IKeyManager> tlk = createLocalKeyManager(keyIndex, nullptr);
+                Owned <IKeyManager> tlk = createLocalKeyManager(indexBaseHelper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, nullptr);
                 indexBaseHelper->createSegmentMonitors(tlk);
                 tlk->finishSegmentMonitors();
                 tlk->reset();

+ 4 - 10
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -241,20 +241,14 @@ public:
             unsigned crc=0;
             part.getCrc(crc);
 
-            if ((localKey && partDescs.ordinality()>1) || seekGEOffset) // for now at least, no remote key support if stepping or merging
+            Owned<IKeyIndex> keyIndex = createKeyIndex(filePath, crc, *lfile, false, false);
+            klManager.setown(createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, nullptr));
+            if ((localKey && partDescs.ordinality()>1) || seekGEOffset)
             {
-                Owned<IKeyIndex> keyIndex = createKeyIndex(filePath, crc, *lfile, false, false);
-                klManager.setown(createLocalKeyManager(keyIndex, nullptr));
                 if (!keyIndexSet)
                     keyIndexSet.setown(createKeyIndexSet());
                 keyIndexSet->addIndex(keyIndex.getClear());
             }
-            else
-            {
-                bool allowRemote = getOptBool("remoteKeyFilteringEnabled");
-                bool forceRemote = allowRemote ? getOptBool("forceDafilesrv") : false; // can only force remote, if forceDafilesrv and remoteKeyFilteringEnabled are enabled.
-                klManager.setown(createKeyManager(filePath, crc, lfile, allowRemote, forceRemote));
-            }
             keyManagers.append(*klManager.getClear());
         }
     }
@@ -503,7 +497,7 @@ public:
                 steppingMeta.init(rawMeta, hasPostFilter);
         }
         if (keyIndexSet)
-            keyMergerManager.setown(createKeyMerger(keyIndexSet, seekGEOffset, nullptr));
+            keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr));
     }
 
 // IThorDataLink

+ 8 - 8
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1156,6 +1156,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
         Linked<IRowStream> in;
         unsigned nextTlk;
         Owned<IKeyManager> tlkManager;
+        const RtlRecord &keyRecInfo;
         bool eos, eog;
         IKeyIndex *currentTlk;
         CJoinGroup *currentJG;
@@ -1196,9 +1197,9 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CKeyLocalLookup(CKeyedJoinSlave &_owner) : owner(_owner), indexReadFieldsRow(_owner.indexInputAllocator)
+        CKeyLocalLookup(CKeyedJoinSlave &_owner, const RtlRecord &_keyRecInfo) : owner(_owner), keyRecInfo(_keyRecInfo), indexReadFieldsRow(_owner.indexInputAllocator)
         {
-            tlkManager.setown(owner.keyHasTlk ? createLocalKeyManager(nullptr, nullptr) : nullptr);
+            tlkManager.setown(owner.keyHasTlk ? createLocalKeyManager(keyRecInfo, nullptr, nullptr) : nullptr);
 
             if (owner.getKeyManagers(partKeyManagers)) // true signifies that dealing with a local mergable set of index parts
                 currentPartKeyManager = &partKeyManagers.item(0);
@@ -1493,7 +1494,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
             CKeyLookupPoolMember(CPRowStream &_owner) : owner(_owner)
             {
-                lookupStream.setown(new CKeyLocalLookup(owner.owner));
+                lookupStream.setown(new CKeyLocalLookup(owner.owner, owner.owner.helper->queryIndexRecordSize()->queryRecordAccessor(true)));
             }
             virtual void init(void *param) override
             {
@@ -1588,16 +1589,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             }
             else
             {
-                bool allowRemote = getOptBool("remoteKeyFilteringEnabled");
-                bool forceRemote = allowRemote ? getOptBool("forceDafilesrv") : false; // can only force remote, if forceDafilesrv and remoteKeyFilteringEnabled are enabled.
-                klManager.setown(createKeyManager(filename, crc, lfile, allowRemote, forceRemote));
+                Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *lfile, false, false);
+                klManager.setown(createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, nullptr));
                 keyManagers.append(*klManager.getClear());
             }
         }
         if (localMergedKey)
         {
             dbgassertex(0 == keyManagers.ordinality());
-            keyManagers.append(*createKeyMerger(partKeySet, 0, nullptr));
+            keyManagers.append(*createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr));
             return true;
         }
         else
@@ -2032,7 +2032,7 @@ public:
         else
         {
             parallelLookups = 0;
-            resultDistStream = new CKeyLocalLookup(*this);
+            resultDistStream = new CKeyLocalLookup(*this, helper->queryIndexRecordSize()->queryRecordAccessor(true));
         }
     }
     virtual void abort()

+ 13 - 4
tools/dumpkey/dumpkey.cpp

@@ -164,7 +164,7 @@ int main(int argc, const char **argv)
             }
             else
             {
-                Owned<IKeyManager> manager = createLocalKeyManager(index, NULL);
+                Owned<IKeyManager> manager;
                 Owned<IPropertyTree> metadata = index->getMetadata();
                 Owned<IOutputMetaData> diskmeta;
                 Owned<IOutputMetaData> translatedmeta;
@@ -176,7 +176,7 @@ int main(int argc, const char **argv)
                 class MyIndexCallback : public CInterfaceOf<IThorIndexCallback>
                 {
                 public:
-                    MyIndexCallback(IKeyManager *_manager) : manager(_manager) {}
+                    MyIndexCallback()  {}
                     virtual unsigned __int64 getFilePosition(const void * row)
                     {
                         return 0;
@@ -185,8 +185,7 @@ int main(int argc, const char **argv)
                     {
                         UNIMPLEMENTED;
                     }
-                    Linked<IKeyManager> manager;
-                } callback(manager);
+                } callback;
                 unsigned __int64 count = globals->getPropInt("recs", 1);
                 const RtlRecordTypeInfo *outRecType = nullptr;
                 if (metadata && metadata->hasProp("_rtlType"))
@@ -194,8 +193,12 @@ int main(int argc, const char **argv)
                     MemoryBuffer layoutBin;
                     metadata->getPropBin("_rtlType", layoutBin);
                     diskmeta.setown(createTypeInfoOutputMetaData(layoutBin, &callback));
+                }
+                if (diskmeta)
+                {
                     writer.setown(new SimpleOutputWriter);
                     const RtlRecord &inrec = diskmeta->queryRecordAccessor(true);
+                    manager.setown(createLocalKeyManager(inrec, index, nullptr));
                     size32_t minRecSize = 0;
                     if (globals->hasProp("fields"))
                     {
@@ -253,6 +256,12 @@ int main(int argc, const char **argv)
                     helper->createSegmentMonitors(manager);
                     count = helper->getChooseNLimit(); // Just because this is testing out the createIndexReadArg functionality
                 }
+                else
+                {
+                    // We don't have record info - fake it? We could pretend it's a single field...
+                    UNIMPLEMENTED;
+                    // manager.setown(createLocalKeyManager(fake, index, nullptr));
+                }
                 manager->finishSegmentMonitors();
                 manager->reset();
                 while (manager->lookup(true) && count--)