Browse Source

Merge pull request #13306 from richardkchapman/index-stream

HPCC-23428 Avoid the need to backpatch indexes

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 years ago
parent
commit
34239b4417

+ 7 - 2
ecl/hthor/hthor.cpp

@@ -1129,7 +1129,12 @@ void CHThorIndexWriteActivity::execute()
         Owned<IPropertyTree> metadata;
         buildUserMetadata(metadata);
         buildLayoutMetadata(metadata);
-        unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
+        unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
+        if (metadata->getPropBool("_noSeek", false))
+            flags |= TRAILING_HEADER_ONLY;
+        if (metadata->getPropBool("_useTrailingHeader", true))
+            flags |= USE_TRAILING_HEADER;
+
         size32_t keyMaxSize = helper.queryDiskRecordSize()->getRecordSize(NULL);
         if (hasTrailingFileposition(helper.queryDiskRecordSize()->queryTypeInfo()))
             keyMaxSize -= sizeof(offset_t);
@@ -1324,7 +1329,7 @@ void CHThorIndexWriteActivity::buildUserMetadata(Owned<IPropertyTree> & metadata
     {
         StringBuffer name(nameLen, nameBuff);
         StringBuffer value(valueLen, valueBuff);
-        if(*nameBuff == '_' && strcmp(name, "_nodeSize") != 0)
+        if(*nameBuff == '_' && !checkReservedMetadataName(name))
         {
             OwnedRoxieString fname(helper.getFileName());
             throw MakeStringException(0, "Invalid name %s in user metadata for index %s (names beginning with underscore are reserved)", name.str(), fname.get());

+ 6 - 2
roxie/ccd/ccdserver.cpp

@@ -12240,7 +12240,7 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
         {
             StringBuffer name(nameLen, nameBuff);
             StringBuffer value(valueLen, valueBuff);
-            if(*nameBuff == '_' && strcmp(name, "_nodeSize") != 0)
+            if(*nameBuff == '_' && !checkReservedMetadataName(name))
             {
                 OwnedRoxieString fname(helper.getFileName());
                 throw MakeStringException(0, "Invalid name %s in user metadata for index %s (names beginning with underscore are reserved)", name.str(), fname.get());
@@ -12331,7 +12331,11 @@ public:
             Owned<IPropertyTree> metadata;
             buildUserMetadata(metadata);
             buildLayoutMetadata(metadata);
-            unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
+            unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
+            if (metadata->getPropBool("_noSeek", false))
+                flags |= TRAILING_HEADER_ONLY;
+            if (metadata->getPropBool("_useTrailingHeader", true))
+                flags |= USE_TRAILING_HEADER;
             Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0, &helper, true, false);
             class BcWrapper : implements IBlobCreator
             {

+ 33 - 22
system/jhtree/ctfile.cpp

@@ -106,13 +106,22 @@ extern bool isCompressedIndex(const char *filename)
         if (io->read(0, sizeof(hdr), &hdr) == sizeof(hdr))
         {
             SwapBigEndian(hdr);
-            if (size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.root && hdr.root % hdr.nodeSize == 0 && hdr.ktype & (HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY))
+            if (hdr.nodeSize && size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.ktype & (HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY))   // nodeSize comes straight from the file and is the divisor here, so reject zero first
             {
-                NodeHdr root;
-                if (io->read(hdr.root, sizeof(root), &root) == sizeof(root))
+                if (hdr.ktype & USE_TRAILING_HEADER)
                 {
-                    SwapBigEndian(root);
-                    return root.leftSib==0 && root.rightSib==0; 
+                    // The real header is the last node of the file - reload it from there
+                    if (io->read(size-hdr.nodeSize, sizeof(hdr), &hdr) != sizeof(hdr))
+                        return false;
+                    SwapBigEndian(hdr);
+                }
+                // Re-validate whichever header is now loaded (nodeSize again guards the modulo) before trusting hdr.root
+                if (hdr.nodeSize && size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.root && hdr.root % hdr.nodeSize == 0 && hdr.ktype & (HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY))
+                {
+                    NodeHdr root;
+                    if (io->read(hdr.root, sizeof(root), &root) == sizeof(root))
+                    {
+                        SwapBigEndian(root);
+                        return root.leftSib==0 && root.rightSib==0;   // the node at hdr.root must have no siblings
+                    }
                 }
             }
         }
@@ -134,9 +143,19 @@ extern jhtree_decl bool isIndexFile(IFile *file)
         if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
             return false;
         SwapBigEndian(hdr);
-        if (!hdr.root || !hdr.nodeSize || !hdr.root || size % hdr.nodeSize || hdr.root % hdr.nodeSize || hdr.root >= size)
-            return false;
-        return true;    // Reasonable heuristic...
+        if (hdr.nodeSize && size % hdr.nodeSize == 0 && hdr.phyrec == size-1)   // nodeSize must be non-zero before it is used as a divisor
+        {
+            if (hdr.ktype & USE_TRAILING_HEADER)
+            {
+                // The real header is the last node of the file - reload it from there
+                if (io->read(size-hdr.nodeSize, sizeof(hdr), &hdr) != sizeof(hdr))
+                    return false;
+                SwapBigEndian(hdr);
+            }
+            if (!hdr.root || !hdr.nodeSize || size % hdr.nodeSize || hdr.root % hdr.nodeSize || hdr.root >= size)
+                return false;
+            return true;    // Reasonable heuristic...
+        }
     }
     catch (IException *E)
     {
@@ -169,20 +188,6 @@ void CKeyHdr::load(KeyHdr &_hdr)
         throw MakeKeyException(KeyExcpt_IncompatVersion, "This build is compatible with key versions <= %u. Key is version %u", KEYBUILD_VERSION, (unsigned) hdr.version);
 }
 
-void CKeyHdr::write(IWriteSeq *out, CRC32 *crc)
-{
-    unsigned nodeSize = hdr.nodeSize;
-    assertex(out->getRecordSize()==nodeSize);
-    MemoryAttr ma;
-    byte *buf = (byte *) ma.allocate(nodeSize); 
-    memcpy(buf, &hdr, sizeof(hdr));
-    memset(buf+sizeof(hdr), 0xff, nodeSize-sizeof(hdr));
-    SwapBigEndian(*(KeyHdr*) buf);
-    out->put(buf);
-    if (crc)
-        crc->tally(nodeSize, buf);
-}
-
 void CKeyHdr::write(IFileIOStream *out, CRC32 *crc)
 {
     unsigned nodeSize = hdr.nodeSize;
@@ -837,6 +842,12 @@ extern jhtree_decl void validateKeyFile(const char *filename, offset_t nodePos)
         throw MakeStringException(4, "Invalid key %s: failed to read key header", filename);
     CKeyHdr keyHdr;
     keyHdr.load(hdr);
+    if (keyHdr.getKeyType() & USE_TRAILING_HEADER)
+    {
+        if (io->read(size - keyHdr.getNodeSize(), sizeof(hdr), &hdr) != sizeof(hdr))
+            throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", filename);
+        keyHdr.load(hdr);
+    }
 
     _WINREV(hdr.phyrec);
     _WINREV(hdr.root);

+ 14 - 9
system/jhtree/ctfile.hpp

@@ -27,14 +27,13 @@
 
 #define NODESIZE 8192
 
-
-#define HTREE_FPOS_OFFSET   0x01 // Obsolete, not supported
+#define TRAILING_HEADER_ONLY  0x01 // Leading header not updated - use trailing one
 #define HTREE_TOPLEVEL_KEY  0x02
 #define COL_PREFIX          0x04
-#define COL_SUFFIX          0x08 // Obsolete, not supported
+#define HTREE_QUICK_COMPRESSED 0x08 // See QUICK_COMPRESSED_KEY below
 #define HTREE_VARSIZE       0x10
 #define HTREE_FULLSORT_KEY  0x20
-#define INDAR_TRAILING_SEG  0x80 // Obsolete, not supported
+#define USE_TRAILING_HEADER  0x80 // Real index header node located at end of file
 #define HTREE_COMPRESSED_KEY 0x40
 #define HTREE_QUICK_COMPRESSED_KEY 0x48
 #define KEYBUILD_VERSION 1 // unsigned short. NB: This should upped if a change would make existing keys incompatible with current build.
@@ -124,7 +123,13 @@ struct jhtree_decl NodeHdr
 //#pragma pack(4)
 #pragma pack(pop)
 
-class jhtree_decl CKeyHdr : public CInterface
+class CWritableKeyNode : public CInterface   // Common base for the things the key builder's writeNode() accepts: the key header as well as index nodes
+{
+public:
+    virtual void write(IFileIOStream *, CRC32 *crc) = 0;   // Write this node to the stream; crc may be null (writeNode passes nullptr when it is not tracking CRCs)
+};
+
+class jhtree_decl CKeyHdr : public CWritableKeyNode
 {
 private:
     KeyHdr hdr;
@@ -132,8 +137,7 @@ public:
     CKeyHdr();
 
     void load(KeyHdr &_hdr);
-    void write(IWriteSeq *, CRC32 *crc = NULL);
-    void write(IFileIOStream *, CRC32 *crc = NULL);
+    virtual void write(IFileIOStream *, CRC32 *crc) override;
 
     unsigned int getMaxKeyLength();
     bool isVariable();
@@ -171,7 +175,7 @@ public:
 
 };
 
-class jhtree_decl CNodeBase : public CInterface
+class jhtree_decl CNodeBase : public CWritableKeyNode
 {
 protected:
     NodeHdr hdr;
@@ -183,6 +187,7 @@ protected:
     bool isVariable;
 
 public:
+    virtual void write(IFileIOStream *, CRC32 *crc) override { throwUnexpected(); }   // Not writable by default - subclasses that can be written must override
     inline offset_t getFpos() const { return fpos; }
     inline size32_t getKeyLen() const { return keyLen; }
     inline size32_t getNumKeys() const { return hdr.numKeys; }
@@ -332,7 +337,7 @@ public:
     CWriteNodeBase(offset_t fpos, CKeyHdr *keyHdr);
     ~CWriteNodeBase();
 
-    void write(IFileIOStream *, CRC32 *crc = NULL);
+    virtual void write(IFileIOStream *, CRC32 *crc) override;
     void setLeftSib(offset_t leftSib) { hdr.leftSib = leftSib; }
     void setRightSib(offset_t rightSib) { hdr.rightSib = rightSib; }
 };

+ 27 - 10
system/jhtree/jhtree.cpp

@@ -1016,6 +1016,11 @@ CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, b
     if (io->length() < sizeof(hdr))
         throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
     memcpy(&hdr, io->base(), sizeof(hdr));
+    if (hdr.ktype & USE_TRAILING_HEADER)
+    {
+        _WINREV(hdr.nodeSize);
+        // The trailing header is taken from the last nodeSize bytes of the mapping - make sure that range
+        // really lies inside the mapping (and can hold a header) before copying from it
+        if (hdr.nodeSize < sizeof(hdr) || io->length() < hdr.nodeSize)
+            throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", _name);
+        memcpy(&hdr, (io->base()+io->length()) - hdr.nodeSize, sizeof(hdr));
+    }
     init(hdr, isTLK, false);
 }
 
@@ -1042,6 +1047,12 @@ CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTL
     KeyHdr hdr;
     if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
         throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
+    if (hdr.ktype & USE_TRAILING_HEADER)
+    {
+        _WINREV(hdr.nodeSize);
+        if (io->read(io->size() - hdr.nodeSize, sizeof(hdr), &hdr) != sizeof(hdr))   // a short read is a failure too, exactly as for the leading header above
+            throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", _name);
+    }
     init(hdr, isTLK, allowPreload);
 }
 
@@ -3030,7 +3041,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
 
     void testStepping()
     {
-        buildTestKeys(false);
+        buildTestKeys(false, true, false);
         {
             // We are going to treat as a 7-byte field then a 3-byte field, and request the datasorted by the 3-byte...
             Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
@@ -3138,20 +3149,24 @@ class IKeyManagerTest : public CppUnit::TestFixture
         removeTestKeys();
     }
 
-    void buildTestKeys(bool variable)
+    void buildTestKeys(bool variable, bool useTrailingHeader, bool noSeek)   // Builds both test key files with the same format/header options
     {
-        buildTestKey("keyfile1.$$$", false, variable);
-        buildTestKey("keyfile2.$$$", true, variable);
+        buildTestKey("keyfile1.$$$", false, variable, useTrailingHeader, noSeek);
+        buildTestKey("keyfile2.$$$", true, variable, useTrailingHeader, noSeek);   // identical options, but built with skip=true
     }
 
-    void buildTestKey(const char *filename, bool skip, bool variable)
+    void buildTestKey(const char *filename, bool skip, bool variable, bool useTrailingHeader, bool noSeek)
     {
         OwnedIFile file = createIFile(filename);
         OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
         Owned<IFileIOStream> out = createIOStream(io);
         unsigned maxRecSize = variable ? 18 : 10;
         unsigned keyedSize = 10;
-        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0, nullptr, true, false);
+        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |
+                (variable ? HTREE_VARSIZE : 0) |
+                (useTrailingHeader ? USE_TRAILING_HEADER : 0) |
+                (noSeek ? TRAILING_HEADER_ONLY : 0),
+                maxRecSize, NODESIZE, keyedSize, 0, nullptr, true, false);
 
         char keybuf[18];
         memset(keybuf, '0', 18);
@@ -3219,7 +3234,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
         key->releaseBlobs();
     }
 protected:
-    void testKeys(bool variable)
+    void testKeys(bool variable, bool useTrailingHeader, bool noSeek)
     {
         const char *json = variable ?
                 "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
@@ -3239,7 +3254,7 @@ protected:
                 "}";
         Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, false);
         const RtlRecord &recInfo = meta->queryRecordAccessor(true);
-        buildTestKeys(variable);
+        buildTestKeys(variable, useTrailingHeader, noSeek);
         {
             Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
             Owned <IKeyManager> tlk1 = createLocalKeyManager(recInfo, index1, NULL, false, false);
@@ -3405,8 +3420,10 @@ protected:
     void testKeys()
     {
         ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
-        testKeys(false);
-        testKeys(true);
+        for (bool var : { false, true })   // run the suite for every combination of variable-width / trailing-header / no-seek
+            for (bool trail : { false, true })
+                for (bool noseek : { false, true })
+                    testKeys(var, trail, noseek);
     }
 };
 

+ 46 - 23
system/jhtree/keybuild.cpp

@@ -89,6 +89,7 @@ protected:
     unsigned __int64 sequence;
     CRC32StartHT crcStartPosTable;
     CRC32EndHT crcEndPosTable;
+    CRC32 headCRC;
     bool doCrc = false;
 
 public:
@@ -105,12 +106,14 @@ public:
         prevLeafNode = NULL;
 
         assertex(nodeSize >= CKeyHdr::getSize());
-        assertex(nodeSize <= 0xffff); // stored in a short in the header - we should fix that if/when we restructure header 
+        assertex(nodeSize <= 0xffff); // stored in a short in the header - we should fix that if/when we restructure header
+        if (flags & TRAILING_HEADER_ONLY)
+            flags |= USE_TRAILING_HEADER;
         KeyHdr *hdr = keyHdr->getHdrStruct();
         hdr->nodeSize = nodeSize;
         hdr->extsiz = 4096;
         hdr->length = keyValueSize; 
-        hdr->ktype = flags; 
+        hdr->ktype = flags;
         hdr->timeid = 0;
         hdr->clstyp = 1;  // IDX_CLOSE
         hdr->maxkbn = nodeSize-sizeof(NodeHdr);
@@ -131,11 +134,12 @@ public:
         hdr->blobHead = 0;
         hdr->metadataHead = 0;
 
-        keyHdr->write(out);  // Reserve space for the header - we'll seek back and write it properly later
+        keyHdr->write(out, &headCRC);  // Reserve space for the header - we may seek back and write it properly later
     }
 
     CKeyBuilderBase(CKeyHdr * chdr)
     {
+        sequence = 0;
         levels = 0;
         records = 0;
         prevLeafNode = NULL;
@@ -201,26 +205,34 @@ protected:
         if (out)
         {
             out->flush();
-            out->seek(0, IFSbegin);
-            keyHdr->write(out, crc);
+            if (keyHdr->getKeyType() & USE_TRAILING_HEADER)
+                writeNode(keyHdr, out->tell());  // write a copy at end too, for use on systems that can't seek
+            if (!(keyHdr->getKeyType() & TRAILING_HEADER_ONLY))
+            {
+                out->seek(0, IFSbegin);
+                keyHdr->write(out, crc);
+            }
+            else if (crc)
+            {
+                *crc = headCRC;
+            }
         }
     }
 
-    void writeNode(CWriteNodeBase *node)
+    void writeNode(CWritableKeyNode *node, offset_t _nodePos)
     {
         unsigned nodeSize = keyHdr->getNodeSize();
         if (doCrc)
         {
-            offset_t nodePos = node->getFpos();
-            CRC32HTE *rollingCrcEntry1 = crcEndPosTable.find(nodePos); // is start of this block end of another?
-            nodePos += nodeSize; // update to endpos
+            CRC32HTE *rollingCrcEntry1 = crcEndPosTable.find(_nodePos); // is start of this block end of another?
+            offset_t endPos = _nodePos+nodeSize;
             if (rollingCrcEntry1)
             {
                 crcEndPosTable.removeExact(rollingCrcEntry1); // end pos will change
                 node->write(out, &rollingCrcEntry1->crc);
                 rollingCrcEntry1->size += nodeSize;
 
-                CRC32HTE *rollingCrcEntry2 = crcStartPosTable.find(nodePos); // is end of this block, start of another?
+                CRC32HTE *rollingCrcEntry2 = crcStartPosTable.find(endPos); // is end of this block, start of another?
                 if (rollingCrcEntry2)
                 {
                     crcStartPosTable.removeExact(rollingCrcEntry2); // remove completely, will join to rollingCrcEntry1
@@ -236,12 +248,12 @@ protected:
                     delete rollingCrcEntry2;
                 }
                 else
-                    rollingCrcEntry1->endBlockPos = nodePos;
+                    rollingCrcEntry1->endBlockPos = endPos;
                 crcEndPosTable.replace(*rollingCrcEntry1);
             }
             else
             {
-                rollingCrcEntry1 = crcStartPosTable.find(nodePos); // is end of this node, start of another?
+                rollingCrcEntry1 = crcStartPosTable.find(endPos); // is end of this node, start of another?
                 if (rollingCrcEntry1)
                 {
                     crcStartPosTable.removeExact(rollingCrcEntry1); // start pos will change
@@ -253,7 +265,7 @@ protected:
                     crcMerger.addChildCRC(rollingCrcEntry1->size, rollingCrcEntry1->crc.get(), true);
 
                     rollingCrcEntry1->crc.reset(~crcMerger.get());
-                    rollingCrcEntry1->startBlockPos = node->getFpos();
+                    rollingCrcEntry1->startBlockPos = _nodePos;
                     rollingCrcEntry1->size += nodeSize;
                     crcStartPosTable.replace(*rollingCrcEntry1);
                 }
@@ -261,8 +273,8 @@ protected:
                 {
                     rollingCrcEntry1 = new CRC32HTE;
                     node->write(out, &rollingCrcEntry1->crc);
-                    rollingCrcEntry1->startBlockPos = node->getFpos();
-                    rollingCrcEntry1->endBlockPos = node->getFpos()+nodeSize;
+                    rollingCrcEntry1->startBlockPos = _nodePos;
+                    rollingCrcEntry1->endBlockPos = _nodePos+nodeSize;
                     rollingCrcEntry1->size = nodeSize;
                     crcStartPosTable.replace(*rollingCrcEntry1);
                     crcEndPosTable.replace(*rollingCrcEntry1);
@@ -270,7 +282,7 @@ protected:
             }
         }
         else
-            node->write(out);
+            node->write(out, nullptr);
     }
 
     void flushNode(CWriteNode *node, NodeInfoArray &nodeInfo)
@@ -287,7 +299,7 @@ protected:
             }
             else
                 nodeInfo.append(* new CNodeInfo(prevLeafNode->getFpos(), NULL, keyedSize, lastSequence));
-            writeNode(prevLeafNode);
+            writeNode(prevLeafNode, prevLeafNode->getFpos());
             prevLeafNode->Release();
             prevLeafNode = NULL;
         }
@@ -381,7 +393,7 @@ public:
         }
         if (NULL != activeBlobNode)
         {
-            writeNode(activeBlobNode);
+            writeNode(activeBlobNode, activeBlobNode->getFpos());
             activeBlobNode->Release();
         }
         flushNode(NULL, leafInfo);
@@ -482,7 +494,7 @@ public:
         {
             activeBlobNode->setLeftSib(prevBlobNode->getFpos());
             prevBlobNode->setRightSib(activeBlobNode->getFpos());
-            writeNode(prevBlobNode);
+            writeNode(prevBlobNode, prevBlobNode->getFpos());
             delete(prevBlobNode);
         }
     }
@@ -528,11 +540,11 @@ protected:
             {
                 node->setLeftSib(prevNode->getFpos());
                 prevNode->setRightSib(node->getFpos());
-                writeNode(prevNode);
+                writeNode(prevNode, prevNode->getFpos());
             }
             prevNode.setown(node.getClear());
         }
-        writeNode(prevNode);
+        writeNode(prevNode, prevNode->getFpos());
     }
 
     void writeBloomFilter(const BloomFilter &filter, __uint64 fields)
@@ -559,14 +571,14 @@ protected:
             {
                 node->setLeftSib(prevNode->getFpos());
                 prevNode->setRightSib(node->getFpos());
-                writeNode(prevNode);
+                writeNode(prevNode, prevNode->getFpos());
             }
             prevNode.setown(node.getClear());
             if (!size)
                 break;
             node.setown(new CBloomFilterWriteNode(nextPos, keyHdr));
         }
-        writeNode(prevNode);
+        writeNode(prevNode, prevNode->getFpos());
     }
 };
 
@@ -641,6 +653,17 @@ extern jhtree_decl IKeyDesprayer * createKeyDesprayer(IFile * in, IFileIOStream
 
     Owned<CKeyHdr> hdr = new CKeyHdr;
     hdr->load(*(KeyHdr *)buffer.get());
+    if (hdr->getKeyType() & USE_TRAILING_HEADER)
+    {
+        if (io->read(in->size() - hdr->getNodeSize(), sizeof(KeyHdr), (void *)buffer.get()) != sizeof(KeyHdr))
+            throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", in->queryFilename());
+        hdr->load(*(KeyHdr*)buffer.get());
+    }
     hdr->getHdrStruct()->nument = 0;
     return new CKeyDesprayer(hdr, out);
 }
+
+// Returns true when name is one of the underscore-prefixed metadata names that user metadata may supply
+// (the index-write activities read these back to choose the node size and header flags).
+extern jhtree_decl bool checkReservedMetadataName(const char *name)
+{
+    static const char * const permitted[] = { "_nodeSize", "_noSeek", "_useTrailingHeader" };
+    for (const char *candidate : permitted)
+    {
+        if (strsame(name, candidate))
+            return true;
+    }
+    return false;
+}

+ 1 - 0
system/jhtree/keybuild.hpp

@@ -111,5 +111,6 @@ interface IKeyDesprayer : public IInterface
 };
 
 extern jhtree_decl IKeyDesprayer * createKeyDesprayer(IFile * in, IFileIOStream * out);
+extern jhtree_decl bool checkReservedMetadataName(const char *name);
 
 #endif

+ 2 - 2
system/jhtree/keydiff.cpp

@@ -230,7 +230,7 @@ public:
             quickCompressed = false;
         else
             throw MakeStringException(0, "Index file %s did not have compression flags set, unsupported", filename);
-        unsigned optionalFlags = (HTREE_VARSIZE | HTREE_QUICK_COMPRESSED_KEY | HTREE_TOPLEVEL_KEY | HTREE_FULLSORT_KEY);
+        unsigned optionalFlags = (HTREE_VARSIZE | HTREE_QUICK_COMPRESSED_KEY | HTREE_TOPLEVEL_KEY | HTREE_FULLSORT_KEY | TRAILING_HEADER_ONLY | USE_TRAILING_HEADER);
         unsigned requiredFlags = COL_PREFIX;
 #ifdef _DEBUG
         if((flags & ~optionalFlags) != requiredFlags)
@@ -413,7 +413,7 @@ public:
         if(!keyFileIO)
             throw MakeStringException(0, "Could not write index file %s", filename);
         keyStream.setown(createIOStream(keyFileIO));
-        unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY;
+        unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY | USE_TRAILING_HEADER;
         if(variableWidth)
             flags |= HTREE_VARSIZE;
         if(quickCompressed)

+ 7 - 2
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -171,7 +171,12 @@ public:
             flags |= HTREE_TOPLEVEL_KEY;
         buildUserMetadata(metadata);                
         buildLayoutMetadata(metadata);
-        unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
+        // NOTE - if you add any more flags here, be sure to update checkReservedMetadataName
+        unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
+        if (metadata->getPropBool("_noSeek", false))
+            flags |= TRAILING_HEADER_ONLY;
+        if (metadata->getPropBool("_useTrailingHeader", true))
+            flags |= USE_TRAILING_HEADER;
         builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, helper, !isTlk, isTlk));
     }
     void buildUserMetadata(Owned<IPropertyTree> & metadata)
@@ -185,7 +190,7 @@ public:
         {
             StringBuffer name(nameLen, nameBuff);
             StringBuffer value(valueLen, valueBuff);
-            if(*nameBuff == '_' && strcmp(name, "_nodeSize") != 0)
+            if(*nameBuff == '_' && !checkReservedMetadataName(name))
                 throw MakeActivityException(this, 0, "Invalid name %s in user metadata for index %s (names beginning with underscore are reserved)", name.str(), logicalFilename.get());
             if(!validateXMLTag(name.str()))
                 throw MakeActivityException(this, 0, "Invalid name %s in user metadata for index %s (not legal XML element name)", name.str(), logicalFilename.get());

+ 7 - 1
tools/dumpkey/dumpkey.cpp

@@ -158,6 +158,12 @@ int main(int argc, const char **argv)
                 MemoryAttr block(sizeof(KeyHdr));
                 io->read(0, sizeof(KeyHdr), (void *)block.get());
                 header->load(*(KeyHdr*)block.get());
+                if (header->getKeyType() & USE_TRAILING_HEADER)
+                {
+                    if (io->read(in->size() - header->getNodeSize(), sizeof(KeyHdr), (void *)block.get()) != sizeof(KeyHdr))
+                        throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", keyName);
+                    header->load(*(KeyHdr*)block.get());
+                }
 
                 printf("Key '%s'\nkeySize=%d keyedSize = %d NumParts=%x, Top=%d\n", keyName, key_size, keyedSize, index->numParts(), index->isTopLevelKey());
                 printf("File size = %" I64F "d, nodes = %" I64F "d\n", in->size(), in->size() / nodeSize - 1);
@@ -221,7 +227,7 @@ int main(int argc, const char **argv)
                         E->Release();
                     }
                 }
-                if (!diskmeta && metadata->hasProp("_record_ECL"))
+                if (!diskmeta && metadata && metadata->hasProp("_record_ECL"))
                 {
                     MultiErrorReceiver errs;
                     Owned<IHqlExpression> expr = parseQuery(metadata->queryProp("_record_ECL"), &errs);

+ 10 - 0
tools/vkey/vkey.cpp

@@ -406,6 +406,16 @@ int main(int argc, const char *argv[])
                 else
                 {
                     SwapBigEndian(h);
+                    if (h.ktype & USE_TRAILING_HEADER)
+                    {
+                        printf("Reading trailing key header\n");
+                        lseek(f, -h.nodeSize, SEEK_END);
+                        if (_read(f, &h, sizeof(h)) != sizeof(h))
+                        {
+                            noteError(0, "Could not read trailing key header\n");
+                        }
+                        SwapBigEndian(h);
+                    }
                     if (nodeAddress)
                     {
                         checkNode(f, h, nodeAddress);