Browse source code

Merge pull request #13781 from richardkchapman/indexblobs

HPCC-23428 Avoid the need to backpatch indexes

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 years ago
parent
commit
4396f37a15

+ 5 - 1
ecl/hthor/hthor.cpp

@@ -1132,7 +1132,7 @@ void CHThorIndexWriteActivity::execute()
             io.setown(file->open(IFOcreate));
         }
         incomplete = true;
-        Owned<IFileIOStream> out = createIOStream(io);
+        bool needsSeek = true;
         bool isVariable = helper.queryDiskRecordSize()->isVariableSize();
         unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY;
         if (helper.getFlags() & TIWrowcompress)
@@ -1146,7 +1146,10 @@ void CHThorIndexWriteActivity::execute()
         buildLayoutMetadata(metadata);
         unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
         if (metadata->getPropBool("_noSeek", defaultNoSeek))
+        {
             flags |= TRAILING_HEADER_ONLY;
+            needsSeek = false;
+        }
         if (metadata->getPropBool("_useTrailingHeader", true))
             flags |= USE_TRAILING_HEADER;
 
@@ -1154,6 +1157,7 @@ void CHThorIndexWriteActivity::execute()
         if (hasTrailingFileposition(helper.queryDiskRecordSize()->queryTypeInfo()))
             keyMaxSize -= sizeof(offset_t);
 
+        Owned<IFileIOStream> out = createIOStream(io, needsSeek);
         Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, keyMaxSize, nodeSize, helper.getKeyedSize(), 0, &helper, true, false);
         class BcWrapper : implements IBlobCreator
         {

+ 5 - 1
roxie/ccd/ccdserver.cpp

@@ -12348,7 +12348,7 @@ public:
             if(!io)
                 throw MakeStringException(errno, "Failed to create file %s for writing", filename.str());
 
-            Owned<IFileIOStream> out = createIOStream(io);
+            bool needsSeek = true;
             unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY;
             if (helper.getFlags() & TIWrowcompress)
                 flags |= HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY;
@@ -12361,9 +12361,13 @@ public:
             buildLayoutMetadata(metadata);
             unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
             if (metadata->getPropBool("_noSeek", ctx->queryOptions().noSeekBuildIndex))
+            {
                 flags |= TRAILING_HEADER_ONLY;
+                needsSeek = false;
+            }
             if (metadata->getPropBool("_useTrailingHeader", true))
                 flags |= USE_TRAILING_HEADER;
+            Owned<IFileIOStream> out = createIOStream(io, needsSeek);
             Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0, &helper, true, false);
             class BcWrapper : implements IBlobCreator
             {

+ 3 - 19
system/jhtree/ctfile.cpp

@@ -271,8 +271,7 @@ void CWriteNodeBase::write(IFileIOStream *out, CRC32 *crc)
         lzwcomp.close();
     assertex(hdr.keyBytes<=maxBytes);
     writeHdr();
-    assertex(fpos);
-    out->seek(fpos, IFSbegin);
+    out->seek(getFpos(), IFSbegin);
     out->write(keyHdr->getNodeSize(), nodeBuf);
     if (crc)
         crc->tally(keyHdr->getNodeSize(), nodeBuf);
@@ -306,7 +305,7 @@ bool CWriteNode::add(offset_t pos, const void *indata, size32_t insize, unsigned
         keyPtr += sizeof(rsequence);
         hdr.keyBytes += sizeof(rsequence);
     }
-    if (isLeaf() && keyType & HTREE_COMPRESSED_KEY)
+    if (isLeaf() && (keyType & HTREE_COMPRESSED_KEY))
     {
         if (0 == hdr.numKeys)
             lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, isVariable, (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY);
@@ -412,8 +411,7 @@ unsigned __int64 CBlobWriteNode::makeBlobId(offset_t nodepos, unsigned offset)
 
 unsigned __int64 CBlobWriteNode::add(const char * &data, size32_t &size)
 {
-    assertex(fpos);
-    unsigned __int64 ret = makeBlobId(fpos, lzwcomp.getCurrentOffset());
+    unsigned __int64 ret = makeBlobId(getFpos(), lzwcomp.getCurrentOffset());
     unsigned written = lzwcomp.writeBlob(data, size);
     if (written)
     {
@@ -434,7 +432,6 @@ CMetadataWriteNode::CMetadataWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) : CWrit
 
 size32_t CMetadataWriteNode::set(const char * &data, size32_t &size)
 {
-    assertex(fpos);
     unsigned short written = ((size > (maxBytes-sizeof(unsigned short))) ? (maxBytes-sizeof(unsigned short)) : size);
     _WINCPYREV2(keyPtr, &written);
     memcpy(keyPtr+sizeof(unsigned short), data, written);
@@ -452,7 +449,6 @@ CBloomFilterWriteNode::CBloomFilterWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) :
 
 size32_t CBloomFilterWriteNode::set(const byte * &data, size32_t &size)
 {
-    assertex(fpos);
     unsigned short written;
     _WINCPYREV2(&written, keyPtr);
 
@@ -491,18 +487,6 @@ void CBloomFilterWriteNode::put8(__int64 val)
 
 //=========================================================================================================
 
-CNodeHeader::CNodeHeader() 
-{
-}
-
-void CNodeHeader::load(NodeHdr &_hdr)
-{
-    memcpy(&hdr, &_hdr, sizeof(hdr));
-    SwapBigEndian(hdr);
-}
-
-//=========================================================================================================
-
 CJHTreeNode::CJHTreeNode()
 {
     keyBuf = NULL;

+ 4 - 13
system/jhtree/ctfile.hpp

@@ -185,13 +185,15 @@ protected:
     byte keyType;
     size32_t keyLen;
     size32_t keyCompareLen;
-    offset_t fpos;
     CKeyHdr *keyHdr;
     bool isVariable;
 
+private:
+    offset_t fpos;
+
 public:
     virtual void write(IFileIOStream *, CRC32 *crc) { throwUnexpected(); }
-    inline offset_t getFpos() const { return fpos; }
+    inline offset_t getFpos() const { assertex(fpos); return fpos; }
     inline size32_t getKeyLen() const { return keyLen; }
     inline size32_t getNumKeys() const { return hdr.numKeys; }
     inline bool isBlob() const { return hdr.leafFlag == 2; }
@@ -316,17 +318,6 @@ private:
     unsigned read = 0;
 };
 
-class jhtree_decl CNodeHeader : public CNodeBase
-{
-public:
-    CNodeHeader();
-
-    void load(NodeHdr &hdr);
-
-    inline offset_t getRightSib() const { return hdr.rightSib; }
-    inline offset_t getLeftSib() const { return hdr.leftSib; }
-};
-
 class jhtree_decl CWriteNodeBase : public CNodeBase
 {
 protected:

+ 1 - 1
system/jhtree/jhtree.cpp

@@ -3161,7 +3161,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
     {
         OwnedIFile file = createIFile(filename);
         OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
-        Owned<IFileIOStream> out = createIOStream(io);
+        Owned<IFileIOStream> out = createIOStream(io, !noSeek);
         unsigned maxRecSize = variable ? 18 : 10;
         unsigned keyedSize = 10;
         Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |

+ 70 - 116
system/jhtree/keybuild.cpp

@@ -74,7 +74,7 @@ public:
     virtual bool matchesFindParam(const void *et, const void *fp, unsigned) const { return *(offset_t *)((const CRC32HTE *)et)->queryEndParam() == *(offset_t *)fp; }
 };
 
-class CKeyBuilderBase : public CInterface
+class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
 {
 protected:
     unsigned keyValueSize;
@@ -92,8 +92,22 @@ protected:
     CRC32 headCRC;
     bool doCrc = false;
 
+private:
+    unsigned __int64 duplicateCount;
+    __uint64 partitionFieldMask = 0;
+    CWriteNode *activeNode = nullptr;
+    CBlobWriteNode *activeBlobNode = nullptr;
+    CIArrayOf<CBlobWriteNode> pendingBlobs;
+    IArrayOf<IBloomBuilder> bloomBuilders;
+    IArrayOf<IRowHasher> rowHashers;
+    bool enforceOrder = true;
+    bool isTLK = false;
+
 public:
-    CKeyBuilderBase(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned _keyedSize, unsigned __int64 _startSequence) : out(_out)
+    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned _keyedSize, unsigned __int64 _startSequence,  IHThorIndexWriteArg *_helper, bool _enforceOrder, bool _isTLK)
+        : out(_out),
+          enforceOrder(_enforceOrder),
+          isTLK(_isTLK)
     {
         sequence = _startSequence;
         keyHdr.setown(new CKeyHdr());
@@ -135,24 +149,27 @@ public:
         hdr->metadataHead = 0;
 
         keyHdr->write(out, &headCRC);  // Reserve space for the header - we may seek back and write it properly later
-    }
 
-    CKeyBuilderBase(CKeyHdr * chdr)
-    {
-        sequence = 0;
-        levels = 0;
-        records = 0;
-        prevLeafNode = NULL;
-
-        keyHdr.set(chdr);
-        KeyHdr *hdr = keyHdr->getHdrStruct();
-        records = hdr->nument;
-        nextPos = hdr->nodeSize; // leaving room for header
-        keyValueSize = keyHdr->getMaxKeyLength();
-        keyedSize = keyHdr->getNodeKeyLength();
+        doCrc = true;
+        duplicateCount = 0;
+        if (_helper)
+        {
+            partitionFieldMask = _helper->getPartitionFieldMask();
+            auto bloomInfo =_helper->queryBloomInfo();
+            if (bloomInfo)
+            {
+                const RtlRecord &recinfo = _helper->queryDiskRecordSize()->queryRecordAccessor(true);
+                while (*bloomInfo)
+                {
+                    bloomBuilders.append(*createBloomBuilder(*bloomInfo[0]));
+                    rowHashers.append(*createRowHasher(recinfo, bloomInfo[0]->getBloomFields()));
+                    bloomInfo++;
+                }
+            }
+        }
     }
 
-    ~CKeyBuilderBase()
+    ~CKeyBuilder()
     {
         for (;;)
         {
@@ -291,6 +308,22 @@ protected:
     void flushNode(CWriteNode *node, NodeInfoArray &nodeInfo)
     {   
         // Messy code, but I don't have the energy to recode right now.
+        if (keyHdr->getKeyType() & TRAILING_HEADER_ONLY)
+        {
+            if (activeBlobNode)
+            {
+                pendingBlobs.append(*activeBlobNode);
+                activeBlobNode = nullptr;
+            }
+            while (pendingBlobs)
+            {
+                CBlobWriteNode &pending = pendingBlobs.item(0);
+                if (!prevLeafNode || pending.getFpos() > prevLeafNode->getFpos())
+                    break;
+                writeNode(&pending, pending.getFpos());
+                pendingBlobs.remove(0);
+            }
+        }
         if (prevLeafNode != NULL)
         {
             unsigned __int64 lastSequence = prevLeafNode->getLastSequence();
@@ -344,62 +377,31 @@ protected:
             hdr->hdrseq = levels;
         }
     }
-};
 
-class CKeyBuilder : public CKeyBuilderBase, implements IKeyBuilder
-{
-private:
-    CWriteNode *activeNode;
-    CBlobWriteNode *activeBlobNode;
-    unsigned __int64 duplicateCount;
-    __uint64 partitionFieldMask = 0;
-    IArrayOf<IBloomBuilder> bloomBuilders;
-    IArrayOf<IRowHasher> rowHashers;
-    bool enforceOrder = true;
-    bool isTLK = false;
-
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence,  IHThorIndexWriteArg *_helper, bool _enforceOrder, bool _isTLK)
-        : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence),
-          enforceOrder(_enforceOrder),
-          isTLK(_isTLK)
-    {
-        doCrc = true;
-        activeNode = NULL;
-        activeBlobNode = NULL;
-        duplicateCount = 0;
-        if (_helper)
-        {
-            partitionFieldMask = _helper->getPartitionFieldMask();
-            auto bloomInfo =_helper->queryBloomInfo();
-            if (bloomInfo)
-            {
-                const RtlRecord &recinfo = _helper->queryDiskRecordSize()->queryRecordAccessor(true);
-                while (*bloomInfo)
-                {
-                    bloomBuilders.append(*createBloomBuilder(*bloomInfo[0]));
-                    rowHashers.append(*createRowHasher(recinfo, bloomInfo[0]->getBloomFields()));
-                    bloomInfo++;
-                }
-            }
-        }
-    }
-public:
     void finish(IPropertyTree * metadata, unsigned * fileCrc)
     {
         if (NULL != activeNode)
         {
             flushNode(activeNode, leafInfo);
             activeNode->Release();
+            activeNode = nullptr;
         }
-        if (NULL != activeBlobNode)
+        if (activeBlobNode && !(keyHdr->getKeyType() & TRAILING_HEADER_ONLY))
         {
             writeNode(activeBlobNode, activeBlobNode->getFpos());
             activeBlobNode->Release();
+            activeBlobNode = nullptr;
         }
         flushNode(NULL, leafInfo);
+        if (keyHdr->getKeyType() & TRAILING_HEADER_ONLY)
+        {
+            ForEachItemIn(idx, pendingBlobs)
+            {
+                CBlobWriteNode &pending = pendingBlobs.item(idx);
+                writeNode(&pending, pending.getFpos());
+            }
+            pendingBlobs.kill();
+        }
         buildTree(leafInfo);
         if(metadata)
         {
@@ -453,7 +455,7 @@ public:
         }
         else if (enforceOrder) // NB: order is indeterminate when build a TLK for a LOCAL index. duplicateCount is not calculated in this case.
         {
-            int cmp = memcmp(keyData,activeNode->getLastKeyValue(),keyedSize);
+            int cmp = memcmp(keyData, activeNode->getLastKeyValue(), keyedSize);
             if (cmp<0)
                 throw MakeStringException(JHTREE_KEY_NOT_SORTED, "Unable to build index - dataset not sorted in key order");
             if (cmp==0)
@@ -497,8 +499,13 @@ public:
         {
             activeBlobNode->setLeftSib(prevBlobNode->getFpos());
             prevBlobNode->setRightSib(activeBlobNode->getFpos());
-            writeNode(prevBlobNode, prevBlobNode->getFpos());
-            delete(prevBlobNode);
+            if (keyHdr->getKeyType() & TRAILING_HEADER_ONLY)
+                pendingBlobs.append(*prevBlobNode);
+            else
+            {
+                writeNode(prevBlobNode, prevBlobNode->getFpos());
+                delete(prevBlobNode);
+            }
         }
     }
 
@@ -613,59 +620,6 @@ int compareParts(CInterface * const * _left, CInterface * const * _right)
     return (int)(left->part - right->part);
 }
 
-class CKeyDesprayer : public CKeyBuilderBase, public IKeyDesprayer
-{
-public:
-    CKeyDesprayer(CKeyHdr * _hdr, IFileIOStream * _out) : CKeyBuilderBase(_hdr)
-    {
-        out.set(_out);
-        nextPos = out->tell();
-    }
-    IMPLEMENT_IINTERFACE
-
-    virtual void addPart(unsigned idx, offset_t numRecords, NodeInfoArray & nodes)
-    {
-        records += numRecords;
-        parts.append(* new PartNodeInfo(idx, nodes));
-    }
-
-    virtual void finish()
-    {
-        levels = 1; // already processed one level of index....
-        parts.sort(compareParts);
-        ForEachItemIn(idx, parts)
-        {
-            NodeInfoArray & nodes = parts.item(idx).nodes;
-            ForEachItemIn(idx2, nodes)
-                leafInfo.append(OLINK(nodes.item(idx2)));
-        }
-        buildTree(leafInfo);
-        writeFileHeader(true, NULL);
-    }
-
-protected:
-    CIArrayOf<PartNodeInfo> parts;
-};
-
-
-extern jhtree_decl IKeyDesprayer * createKeyDesprayer(IFile * in, IFileIOStream * out)
-{
-    Owned<IFileIO> io = in->open(IFOread);
-    MemoryAttr buffer(sizeof(KeyHdr));
-    io->read(0, sizeof(KeyHdr), (void *)buffer.get());
-
-    Owned<CKeyHdr> hdr = new CKeyHdr;
-    hdr->load(*(KeyHdr *)buffer.get());
-    if (hdr->getKeyType() & USE_TRAILING_HEADER)
-    {
-        if (io->read(in->size() - hdr->getNodeSize(), sizeof(KeyHdr), (void *)buffer.get()) != sizeof(KeyHdr))
-            throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", in->queryFilename());
-        hdr->load(*(KeyHdr*)buffer.get());
-    }
-    hdr->getHdrStruct()->nument = 0;
-    return new CKeyDesprayer(hdr, out);
-}
-
 extern jhtree_decl bool checkReservedMetadataName(const char *name)
 {
     return strsame(name, "_nodeSize") || strsame(name, "_noSeek") || strsame(name, "_useTrailingHeader");

+ 14 - 9
system/jhtree/keydiff.cpp

@@ -298,6 +298,7 @@ public:
     size32_t queryKeyedSize() const { return keyedsize; }
     size32_t queryRowSize() const { return rowsize; }
     unsigned queryCRC() { return crc.get(); }
+    unsigned getFlags() { return keyIndex->getFlags(); }
     unsigned queryCount() const { return count; }
     bool isVariableWidth() const { return variableWidth; }
     bool isQuickCompressed() const { return quickCompressed; }
@@ -363,6 +364,7 @@ public:
         header->setPropInt("@keyedSize",reader.queryKeyedSize());
         header->setPropBool("@variableWidth",isvar);
         header->setPropBool("@quickCompressed",reader.isQuickCompressed());
+        header->setPropBool("@noSeek",(reader.getFlags() & TRAILING_HEADER_ONLY) != 0);
 #if 0
         PROGLOG("rowSize = %d",rowsize);
         PROGLOG("keyedSize = %d",reader.queryKeyedSize());
@@ -401,7 +403,7 @@ public:
     {
     }
         
-    void init (char const * filename, bool overwrite, size32_t _keyedsize, size32_t _rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize) 
+    void init (char const * filename, bool overwrite, size32_t _keyedsize, size32_t _rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize, bool noSeek)
     {
         keyedsize = _keyedsize;
         rowsize = _rowsize;
@@ -412,11 +414,13 @@ public:
         keyFileIO.setown(keyFile->openShared(IFOcreate, IFSHfull)); // not sure if needs shared here
         if(!keyFileIO)
             throw MakeStringException(0, "Could not write index file %s", filename);
-        keyStream.setown(createIOStream(keyFileIO));
+        keyStream.setown(createIOStream(keyFileIO, !noSeek));
         unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY | USE_TRAILING_HEADER;
-        if(variableWidth)
+        if (noSeek)
+            flags |= TRAILING_HEADER_ONLY;
+        if (variableWidth)
             flags |= HTREE_VARSIZE;
-        if(quickCompressed)
+        if (quickCompressed)
             flags |= HTREE_QUICK_COMPRESSED_KEY;
         keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0, nullptr, false, false)); // MORE - support for sequence other than 0...
     }
@@ -495,7 +499,7 @@ public:
     CKeyFileWriter(const char *filename, IPropertyTree *_header, bool overwrite, unsigned nodeSize)
         : header(createPTreeFromIPT(_header))
     {
-        writer.init(filename,overwrite,header->getPropInt("@keyedSize"), header->getPropInt("@rowSize"), header->getPropBool("@variableWidth"), header->getPropBool("@quickCompressed"), header->getPropInt("@nodeSize", NODESIZE));
+        writer.init(filename,overwrite,header->getPropInt("@keyedSize"), header->getPropInt("@rowSize"), header->getPropBool("@variableWidth"), header->getPropBool("@quickCompressed"), header->getPropInt("@nodeSize", NODESIZE), header->getPropBool("@noSeek"));
         size32_t rowsize = header->getPropInt("@rowSize");
         bool isvar = header->getPropBool("@variableWidth");
         buffer.init(rowsize,isvar);
@@ -1210,11 +1214,11 @@ public:
     {
     }
 
-    void open(char const * tlkName, bool overwrite, unsigned keyedsize, unsigned rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize)
+    void open(char const * tlkName, bool overwrite, unsigned keyedsize, unsigned rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize, bool noSeek)
     {
         filename.set(tlkName);
         writer.setown(new CKeyWriter());
-        writer->init(tlkName, overwrite, keyedsize, rowsize, variableWidth, quickCompressed, nodeSize);
+        writer->init(tlkName, overwrite, keyedsize, rowsize, variableWidth, quickCompressed, nodeSize, noSeek);
     }
 
     virtual int run()
@@ -1416,10 +1420,11 @@ private:
         if(progressCallback)
             oldInput->setProgressCallback(progressCallback.getLink(), progressFrequency);
         rowsize = oldInput->queryRowSize();
+        bool noSeek = (oldInput->getFlags() & TRAILING_HEADER_ONLY) != 0;
         newOutput.setown(new CKeyWriter());
-        newOutput->init(newIndex, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize());
+        newOutput->init(newIndex, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize(), noSeek);
         if(tlkGen)
-            tlkGen->open(newTLK, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize());
+            tlkGen->open(newTLK, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize(), noSeek);
         newcurr.init(rowsize, oldInput->isVariableWidth());
         newprev.init(rowsize, oldInput->isVariableWidth());
         oldcurr.init(rowsize, oldInput->isVariableWidth());

+ 7 - 5
system/jlib/jfile.cpp

@@ -2449,10 +2449,11 @@ IFileAsyncResult *CFileAsyncIO::writeAsync(offset_t pos, size32_t len, const voi
 
 //---------------------------------------------------------------------------
 
-CFileIOStream::CFileIOStream(IFileIO * _io)
+CFileIOStream::CFileIOStream(IFileIO * _io, bool _allowSeek)
 {
     io.set(_io);
     curOffset = 0;
+    allowSeek = _allowSeek;
 }
 
 
@@ -2470,6 +2471,7 @@ size32_t CFileIOStream::read(size32_t len, void * data)
 
 void CFileIOStream::seek(offset_t pos, IFSmode origin)
 {
+    auto oldOffset = curOffset;
     switch (origin)
     {
     case IFScurrent:
@@ -2482,6 +2484,8 @@ void CFileIOStream::seek(offset_t pos, IFSmode origin)
         curOffset = pos;
         break;
     }
+    if (!allowSeek && oldOffset != curOffset)
+        throw makeStringException(0, "Seek on non-seekable CFileIOStream");
 }
 
 offset_t CFileIOStream::size()
@@ -4130,13 +4134,11 @@ IFile * createIFile(const char * filename)
 }
 
 
-IFileIOStream * createIOStream(IFileIO * file)
+IFileIOStream * createIOStream(IFileIO * file, bool allowSeek)
 {
-    return new CFileIOStream(file);
+    return new CFileIOStream(file, allowSeek);
 }
 
-
-
 IFileIO * createIORange(IFileIO * io, offset_t header, offset_t length)
 {
     return new CFileRangeIO(io, header, length);

+ 1 - 1
system/jlib/jfile.hpp

@@ -267,7 +267,7 @@ extern jlib_decl IDirectoryIterator * createDirectoryIterator(const char * path
 extern jlib_decl IDirectoryIterator * createNullDirectoryIterator();
 extern jlib_decl IFileIO * createIORange(IFileIO * file, offset_t header, offset_t length);     // restricts input/output to a section of a file.
 
-extern jlib_decl IFileIOStream * createIOStream(IFileIO * file);        // links argument
+extern jlib_decl IFileIOStream * createIOStream(IFileIO * file, bool allowSeek=true);        // links argument
 extern jlib_decl IFileIOStream * createBufferedIOStream(IFileIO * file, unsigned bufsize=(unsigned)-1);// links argument
 extern jlib_decl IFileIOStream * createBufferedAsyncIOStream(IFileAsyncIO * file, unsigned bufsize=(unsigned)-1);// links argument
 

+ 2 - 1
system/jlib/jfile.ipp

@@ -183,7 +183,7 @@ protected: friend class CFileAsyncResult;
 class CFileIOStream : implements IFileIOStream, public CInterface
 {
 public:
-    CFileIOStream(IFileIO * _io);
+    CFileIOStream(IFileIO * _io, bool _allowSeek=true);
     IMPLEMENT_IINTERFACE
 
     virtual void flush();
@@ -196,6 +196,7 @@ public:
 protected:
     Linked<IFileIO>     io;
     offset_t            curOffset;
+    bool allowSeek = true;
 };
 
 

+ 6 - 2
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -157,9 +157,8 @@ public:
         StringBuffer partFname;
         getPartFilename(partDesc, 0, partFname);
         bool compress=false;
-        OwnedIFileIO iFileIO = createMultipleWrite(this, partDesc, 0, TW_RenameToPrimary, compress, NULL, this, &abortSoon);
-        Owned<IFileIOStream> out = createBufferedIOStream(iFileIO);
         ActPrintLog("INDEXWRITE: created fixed output stream %s", partFname.str());
+        bool needsSeek = true;
         unsigned flags = COL_PREFIX;
         if (TIWrowcompress & helper->getFlags())
             flags |= HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY;
@@ -176,9 +175,14 @@ public:
         // NOTE - if you add any more flags here, be sure to update checkReservedMetadataName
         unsigned nodeSize = metadata->getPropInt("_nodeSize", NODESIZE);
         if (metadata->getPropBool("_noSeek", defaultNoSeek))
+        {
             flags |= TRAILING_HEADER_ONLY;
+            needsSeek = false;
+        }
         if (metadata->getPropBool("_useTrailingHeader", true))
             flags |= USE_TRAILING_HEADER;
+        OwnedIFileIO iFileIO = createMultipleWrite(this, partDesc, 0, TW_RenameToPrimary, compress, NULL, this, &abortSoon);
+        Owned<IFileIOStream> out = createBufferedIOStream(iFileIO, needsSeek);
         builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, helper, !isTlk, isTlk));
     }
     void buildUserMetadata(Owned<IPropertyTree> & metadata)