소스 검색

HPCC-19431 Use Bloom filter to avoid btree lookups

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 년 전
부모
커밋
eee1078e8d

+ 1 - 1
ecl/hthor/hthor.cpp

@@ -1106,7 +1106,7 @@ void CHThorIndexWriteActivity::execute()
         if (hasTrailingFileposition(helper.queryDiskRecordSize()->queryTypeInfo()))
             keyMaxSize -= sizeof(offset_t);
 
-        Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, keyMaxSize, nodeSize, helper.getKeyedSize(), 0);
+        Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, keyMaxSize, nodeSize, helper.getKeyedSize(), 0, helper.getBloomKeyLength(), true);
         class BcWrapper : implements IBlobCreator
         {
             IKeyBuilder *builder;

+ 1 - 1
roxie/ccd/ccdserver.cpp

@@ -12179,7 +12179,7 @@ public:
             buildUserMetadata(metadata);
             buildLayoutMetadata(metadata);
             unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
-            Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0);
+            Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0, helper.getBloomKeyLength(), true);
             class BcWrapper : implements IBlobCreator
             {
                 IKeyBuilder *builder;

+ 7 - 0
rtl/eclrtl/eclhelper_base.cpp

@@ -41,6 +41,13 @@ bool CThorIndexWriteArg::getIndexMeta(size32_t & lenName, char * & name, size32_
 unsigned CThorIndexWriteArg::getWidth() { return 0; }
 ICompare * CThorIndexWriteArg::queryCompare() { return NULL; }
 
+unsigned CThorIndexWriteArg::getBloomKeyLength()
+{
+    // Default to building bloom on the first field...
+    IOutputMetaData *layout = queryDiskRecordSize();
+    return layout->queryRecordAccessor(true).getFixedOffset(1);
+}
+
 //CThorFirstNArg
 
 __int64 CThorFirstNArg::numToSkip() { return 0; }

+ 5 - 6
rtl/eclrtl/eclhelper_dyn.cpp

@@ -136,10 +136,8 @@ public:
         if (fieldNum == (unsigned) -1)
             throw MakeStringException(0, "Invalid filter string: field '%s' not recognized", fieldName.str());
         unsigned numOffsets = inrec.getNumVarFields() + 1;
-        size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
-        RtlRow offsetCalculator(inrec, nullptr, numOffsets, variableOffsets);
-        unsigned fieldOffset = offsetCalculator.getOffset(fieldNum);
-        unsigned fieldSize = offsetCalculator.getSize(fieldNum);
+        unsigned fieldOffset = inrec.getFixedOffset(fieldNum);
+        unsigned fieldSize = inrec.getFixedOffset(fieldNum+1) - fieldOffset;
         const RtlTypeInfo *fieldType = inrec.queryType(fieldNum);
         filter = epos+1;
         if (*filter=='~')
@@ -155,8 +153,9 @@ public:
             while (filters.length()<=fieldNum)
             {
                 filters.append(nullptr);
-                filterOffsets.append(offsetCalculator.getOffset(filters.length()));
-                filterSizes.append(offsetCalculator.getSize(filters.length()));
+                unsigned dummyOffset = inrec.getFixedOffset(filters.length());
+                filterOffsets.append(dummyOffset);
+                filterSizes.append(inrec.getFixedOffset(filters.length()+1) - dummyOffset);
             }
             IStringSet *prev = filters.item(fieldNum);
             if (prev)

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -1159,6 +1159,7 @@ struct IHThorIndexWriteArg : public IHThorArg
     virtual unsigned getWidth() = 0;                // only guaranteed present if TIWhaswidth defined
     virtual ICompare * queryCompare() = 0;          // only guaranteed present if TIWhaswidth defined
     virtual unsigned getMaxKeySize() = 0;
+    virtual unsigned getBloomKeyLength() = 0;
 };
 
 struct IHThorFirstNArg : public IHThorArg

+ 1 - 0
rtl/include/eclhelper_base.hpp

@@ -64,6 +64,7 @@ public:
     virtual bool getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) override;
     virtual unsigned getWidth() override;
     virtual ICompare * queryCompare() override;
+    virtual unsigned getBloomKeyLength() override;
 };
 
 class ECLRTL_API CThorFirstNArg : public CThorArgOf<IHThorFirstNArg>

+ 2 - 0
system/jhtree/CMakeLists.txt

@@ -30,6 +30,7 @@ set (    SRCS
          ctfile.cpp 
          jhtree.cpp 
          jhutil.cpp 
+         bloom.cpp
          keybuild.cpp 
          keydiff.cpp 
          sourcedoc.xml
@@ -40,6 +41,7 @@ set (    INCLUDES
          hlzw.h
          jhtree.hpp
          jhutil.hpp
+         bloom.hpp
          keybuild.hpp
          keydiff.hpp
     )

+ 151 - 0
system/jhtree/bloom.cpp

@@ -0,0 +1,151 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "jlib.hpp"
+#include "bloom.hpp"
+#include "math.h"
+
+BloomFilter::BloomFilter(unsigned _cardinality, double _probability)
+{
+    unsigned cardinality = _cardinality ? _cardinality : 1;
+    double probability = _probability >= 0.3 ? 0.3 : (_probability < 0.01 ? 0.01 : _probability);
+    numBits = rtlRoundUp(-(cardinality*log(probability))/pow(log(2),2));
+    unsigned tableSize = (numBits + 7) / 8;
+    numBits = tableSize * 8;
+    numHashes = round((numBits * log(2))/cardinality);
+    table = (byte *) calloc(tableSize, 1);
+}
+
+BloomFilter::BloomFilter(unsigned _numHashes, unsigned _tableSize, byte *_table)
+{
+    numBits = _tableSize * 8;
+    numHashes = _numHashes;
+    table = _table;  // Note - takes ownership
+}
+
+BloomFilter::~BloomFilter()
+{
+    free(table);
+}
+
+void BloomFilter::add(hash64_t hash)
+{
+    uint32_t hash1 = hash >> 32;
+    uint32_t hash2 = hash & 0xffffffff;
+    for (unsigned i=0; i < numHashes; i++)
+    {
+        // Kirsch and Mitzenmacher technique (Harvard U)
+        uint64_t bit = (hash1 + (i * hash2)) % numBits;
+        uint64_t slot = bit / 8;
+        unsigned shift = bit % 8;
+        unsigned mask = 1 << shift;
+        table[slot] |= mask;
+    }
+}
+
+bool BloomFilter::test(hash64_t hash) const
+{
+    uint32_t hash1 = hash >> 32;
+    uint32_t hash2 = hash & 0xffffffff;
+    for (unsigned i=0; i < numHashes; i++)
+    {
+        // Kirsch and Mitzenmacher technique (Harvard U)
+        uint64_t bit = (hash1 + (i * hash2)) % numBits;
+        uint64_t slot = bit / 8;
+        unsigned shift = bit % 8;
+        unsigned mask = 1 << shift;
+        if (!(table[slot] & mask))
+            return false;
+      }
+    return true;
+}
+
+BloomBuilder::BloomBuilder(unsigned _maxHashes) : maxHashes(_maxHashes)
+{
+    isValid = true;
+}
+
+bool BloomBuilder::add(hash64_t val)
+{
+    if (isValid)
+    {
+        if (hashes.length()==maxHashes)
+        {
+            isValid = false;
+            hashes.kill();
+        }
+        else
+            hashes.append(val);
+    }
+    return isValid;
+}
+
+bool BloomBuilder::valid() const
+{
+    return isValid && hashes.length();
+}
+
+const BloomFilter * BloomBuilder::build(double probability) const
+{
+    if (!valid())
+        return nullptr;
+    BloomFilter *b = new BloomFilter(hashes.length(), probability);
+    ForEachItemIn(idx, hashes)
+    {
+        b->add(hashes.item(idx));
+    }
+    return b;
+}
+
+#ifdef _USE_CPPUNIT
+#include "unittests.hpp"
+
+class BloomTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE(BloomTest);
+      CPPUNIT_TEST(testBloom);
+    CPPUNIT_TEST_SUITE_END();
+
+    const unsigned count = 1000000;
+    void testBloom()
+    {
+        BloomBuilder b;
+        for (unsigned val = 0; val < count; val++)
+            b.add(rtlHash64Data(sizeof(val), &val, HASH64_INIT));
+        Owned<const BloomFilter> f = b.build(0.01);
+        unsigned falsePositives = 0;
+        unsigned falseNegatives = 0;
+        unsigned start = usTick();
+        for (unsigned val = 0; val < count; val++)
+        {
+            if (!f->test(rtlHash64Data(sizeof(val), &val, HASH64_INIT)))
+                falseNegatives++;
+            if (f->test(rtlHash64Data(sizeof(val), &val, HASH64_INIT+1)))
+                falsePositives++;
+        }
+        unsigned end = usTick();
+        ASSERT(falseNegatives==0);
+        DBGLOG("Bloom filter (%d, %d) gave %d false positives (%.02f %%) in %d uSec", f->queryNumHashes(), f->queryTableSize(), falsePositives, (falsePositives * 100.0)/count, end-start);
+    }
+
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( BloomTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( BloomTest, "BloomTest" );
+
+#endif

+ 115 - 0
system/jhtree/bloom.hpp

@@ -0,0 +1,115 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef _BLOOM_INCL
+#define _BLOOM_INCL
+
+#include "jhtree.hpp"
+
+/**
+ *   A BloomFilter object is used to create or test a Bloom filter - this can be used to quickly determine whether a value has been added to the filter,
+ *   giving some false positives but no false negatives.
+ */
+
+class jhtree_decl BloomFilter : public CInterface
+{
+public:
+    /*
+     * Create an empty bloom filter
+     *
+     * @param cardinality Expected number of values to be added. This will be used to determine the appropriate size and hash count
+     * @param probability Desired probability of false positives. This will be used to determine the appropriate size and hash count
+     */
+    BloomFilter(unsigned cardinality, double probability=0.1);
+    /*
+     * Create a bloom filter from a previously-generated table. Parameters must batch those used when building the table.
+     *
+     * @param numHashes  Number of hashes to use for each lookup.
+     * @param tableSize  Size (in bytes) of the table
+     * @param table      Bloom table. Note that the BloomFilter object will take ownership of this memory, so it must be allocated on the heap.
+     */
+    BloomFilter(unsigned numHashes, unsigned tableSize, byte *table);
+    /*
+     * BloomFilter destructor
+     */
+    virtual ~BloomFilter();
+    /*
+     * Add a value to the filter
+     *
+     * @param hash   The hash of the value to be added
+     */
+    void add(hash64_t hash);
+    /*
+     * Test if a value has been added to the filter (with some potential for false-positives)
+     *
+     * @param hash   The hash of the value to be tested.
+     * @return       False if the value is definitely not present, otherwise true.
+     */
+    bool test(hash64_t hash) const;
+    /*
+     * Add a value to the filter, by key
+     *
+     * @param len   The length of the key
+     * @param val   The key data
+     */
+    inline void add(size32_t len, const void *val) { add(rtlHash64Data(len, val, HASH64_INIT)); }
+    /*
+     * Test if a value has been added to the filter (with some potential for false-positives), by key
+     *
+     * @param len   The length of the key
+     * @param val   The key data
+     * @return       False if the value is definitely not present, otherwise true.
+     */
+    inline bool test(size32_t len, const void *val) { return test(rtlHash64Data(len, val, HASH64_INIT)); }
+    /*
+     * Retrieve bloom table size
+     *
+     * @return       Size, in bytes.
+     */
+    inline unsigned queryTableSize() const { return numBits / 8; }
+    /*
+     * Retrieve bloom table hash count
+     *
+     * @return       Hash count.
+     */
+    inline unsigned queryNumHashes() const { return numHashes; }
+    /*
+     * Retrieve bloom table data
+     *
+     * @return       Table data.
+     */
+    inline const byte *queryTable() const { return table; }
+protected:
+    unsigned numBits;
+    unsigned numHashes;
+    byte *table;
+};
+
+class jhtree_decl BloomBuilder
+{
+public:
+    BloomBuilder(unsigned _maxHashes = 1000000);
+    const BloomFilter * build(double probability=0.1) const;
+    bool add(hash64_t hash);
+    inline bool add(size32_t len, const void *val) { return add(rtlHash64Data(len, val, HASH64_INIT)); }
+    bool valid() const;
+protected:
+    ArrayOf<hash64_t> hashes;
+    const unsigned maxHashes;
+    bool isValid;
+};
+#endif

+ 29 - 1
system/jhtree/ctfile.cpp

@@ -80,6 +80,10 @@ inline void SwapBigEndian(KeyHdr &hdr)
     _WINREV(hdr.version);
     _WINREV(hdr.blobHead);
     _WINREV(hdr.metadataHead);
+    _WINREV(hdr.bloomHead);
+    _WINREV(hdr.bloomTableSize);
+    _WINREV(hdr.bloomKeyLength);
+    _WINREV(hdr.bloomTableHashes);
 }
 
 inline void SwapBigEndian(NodeHdr &hdr)
@@ -444,6 +448,24 @@ size32_t CMetadataWriteNode::set(const char * &data, size32_t &size)
 
 //=========================================================================================================
 
+CBloomFilterWriteNode::CBloomFilterWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) : CWriteNodeBase(_fpos, _keyHdr)
+{
+    hdr.leafFlag = 4;
+}
+
+size32_t CBloomFilterWriteNode::set(const byte * &data, size32_t &size)
+{
+    assertex(fpos);
+    unsigned short written = ((size > (maxBytes-sizeof(unsigned short))) ? (maxBytes-sizeof(unsigned short)) : size);
+    _WINCPYREV2(keyPtr, &written);
+    memcpy(keyPtr+sizeof(unsigned short), data, written);
+    data += written;
+    size -= written;
+    return written;
+}
+
+//=========================================================================================================
+
 CNodeHeader::CNodeHeader() 
 {
 }
@@ -556,7 +578,7 @@ void CJHTreeNode::unpack(const void *node, bool needCopy)
         keys += sizeof(unsigned __int64);
         _WINREV(firstSequence);
     }
-    if(isMetadata())
+    if(isMetadata() || isBloom())
     {
         unsigned short len = *reinterpret_cast<unsigned short *>(keys);
         _WINREV(len);
@@ -1006,6 +1028,12 @@ void CJHTreeMetadataNode::get(StringBuffer & out)
     out.append(expandedSize, keyBuf);
 }
 
+void CJHTreeBloomTableNode::get(MemoryBuffer & out)
+{
+    out.append(expandedSize, keyBuf);
+}
+
+
 class DECL_EXCEPTION CKeyException : implements IKeyException, public CInterface
 {
     int errCode;

+ 23 - 0
system/jhtree/ctfile.hpp

@@ -96,6 +96,10 @@ struct __declspec(novtable) jhtree_decl KeyHdr
     short unused[2]; /* unused ecx */
     __int64 blobHead; /* fpos of first blob node f0x */
     __int64 metadataHead; /* fpos of first metadata node f8x */
+    __int64 bloomHead; /* fpos of bloom table data, if present 100x */
+    uint32_t bloomTableSize;  /* Size in bytes of bloom table 108x */
+    unsigned short bloomKeyLength; /* Length of bloom keyed fields 11cx */
+    unsigned short bloomTableHashes; /* Number of hashes in bloom table 11ex */
 };
 
 //#pragma pack(1)
@@ -170,6 +174,7 @@ public:
     inline size32_t getNumKeys() const { return hdr.numKeys; }
     inline bool isBlob() const { return hdr.leafFlag == 2; }
     inline bool isMetadata() const { return hdr.leafFlag == 3; }
+    inline bool isBloom() const { return hdr.leafFlag == 4; }
     inline bool isLeaf() const { return hdr.leafFlag != 0; }
 
 public:
@@ -257,6 +262,17 @@ public:
     void get(StringBuffer & out);
 };
 
+class CJHTreeBloomTableNode : public CJHTreeNode
+{
+public:
+    virtual bool getValueAt(unsigned int num, char *key) const {throwUnexpected();}
+    virtual offset_t getFPosAt(unsigned int num) const {throwUnexpected();}
+    virtual size32_t getSizeAt(unsigned int num) const {throwUnexpected();}
+    virtual int compareValueAt(const char *src, unsigned int index) const {throwUnexpected();}
+    virtual void dump() {throwUnexpected();}
+    void get(MemoryBuffer & out);
+};
+
 class jhtree_decl CNodeHeader : public CNodeBase
 {
 public:
@@ -320,6 +336,13 @@ public:
     size32_t set(const char * &data, size32_t &size);
 };
 
+class jhtree_decl CBloomFilterWriteNode : public CWriteNodeBase
+{
+public:
+    CBloomFilterWriteNode(offset_t _fpos, CKeyHdr *keyHdr);
+    size32_t set(const byte * &data, size32_t &size);
+};
+
 enum KeyExceptionCodes
 {
     KeyExcpt_IncompatVersion = 1,

+ 91 - 13
system/jhtree/jhtree.cpp

@@ -57,6 +57,7 @@
 
 #include "jhtree.ipp"
 #include "keybuild.hpp"
+#include "bloom.hpp"
 #include "eclhelper_dyn.hpp"
 #include "rtlrecord.hpp"
 #include "rtldynfield.hpp"
@@ -115,6 +116,19 @@ size32_t SegMonitorList::getSize() const
         return 0;
 }
 
+bool SegMonitorList::isExact(unsigned len) const
+{
+    ForEachItemIn(idx, segMonitors)
+    {
+        IKeySegmentMonitor &seg = segMonitors.item(idx);
+        if (seg.getOffset() >= len)
+            return true;
+        if (!seg.isWellKeyed())
+            return false;
+    }
+    return false;
+}
+
 void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname)
 {
     size32_t segSize = getSize();
@@ -316,6 +330,7 @@ protected:
     char *keyBuffer;
     unsigned keySize;       // size of key record including payload
     unsigned keyedSize;     // size of non-payload part of key
+    unsigned bloomLength = 0;   // size that can be prechecked using bloom filter
     unsigned numsegs;
     bool matched = false;
     bool eof = false;
@@ -539,12 +554,13 @@ public:
                     throw e;
                 }
                 keyBuffer = (char *) malloc(keySize);
-
+                bloomLength = ki->getBloomKeyLength();
             }
             else
             {
                 assertex(keyedSize==ki->keyedSize());
                 assertex(keySize==ki->keySize());
+                assertex(bloomLength==ki->getBloomKeyLength());
             }
         }
     }
@@ -638,28 +654,42 @@ public:
         unsigned lscans = 0;
         while (!eof)
         {
-            bool ok;
+            bool canMatch = true;
             if (matched)
             {
-                ok = keyCursor->next(keyBuffer);
+                if (!keyCursor->next(keyBuffer))
+                    eof = true;
                 lscans++;
             }
             else
             {
-                ok = keyCursor->gtEqual(keyBuffer, keyBuffer, true);
-                lseeks++;
+                if (exact && bloomLength && segs.isExact(bloomLength))
+                {
+                    hash64_t hash = rtlHash64Data(bloomLength, keyBuffer, HASH64_INIT);
+                    if (!keyCursor->checkBloomFilter(hash))
+                        canMatch = false;
+                }
+                if (canMatch)
+                {
+                    if (!keyCursor->gtEqual(keyBuffer, keyBuffer, true))
+                        eof = true;
+                    lseeks++;
+                }
             }
-            if (ok)
+            if (!eof)
             {
                 unsigned i = 0;
-                matched = true;
-                if (segs.segMonitors.length())
+                if (canMatch)
                 {
-                    for (; i <= lastSeg; i++)
+                    matched = true;
+                    if (segs.segMonitors.length())
                     {
-                        matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
-                        if (!matched)
-                            break;
+                        for (; i <= lastSeg; i++)
+                        {
+                            matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
+                            if (!matched)
+                                break;
+                        }
                     }
                 }
                 if (matched)
@@ -1311,6 +1341,7 @@ void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
         }
     }
     rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
+    loadBloomFilter();
 }
 
 CKeyIndex::~CKeyIndex()
@@ -1400,6 +1431,9 @@ CJHTreeNode *CKeyIndex::loadNode(char *nodeData, offset_t pos, bool needsCopy)
         case 3:
             ret.setown(new CJHTreeMetadataNode());
             break;
+        case 4:
+            ret.setown(new CJHTreeBloomTableNode());
+            break;
         default:
             throwUnexpected();
         }
@@ -1539,6 +1573,36 @@ offset_t CKeyIndex::queryMetadataHead()
     return ret;
 }
 
+void CKeyIndex::loadBloomFilter()
+{
+    offset_t bloomAddr = keyHdr->getHdrStruct()->bloomHead;
+    if (!bloomAddr || bloomAddr == static_cast<offset_t>(-1))
+        return; // indexes created before introduction of bloomfilter would have FFFF... in this space
+    uint32_t bloomTableSize = keyHdr->getHdrStruct()->bloomTableSize;
+    unsigned short bloomTableHashes = keyHdr->getHdrStruct()->bloomTableHashes;
+    MemoryBuffer bloomTable;
+    bloomTable.ensureCapacity(bloomTableSize);
+    while (bloomAddr)
+    {
+        Owned<CJHTreeNode> node = loadNode(bloomAddr);
+        assertex(node->isBloom());
+        static_cast<CJHTreeBloomTableNode *>(node.get())->get(bloomTable);
+        bloomAddr = node->getRightSib();
+    }
+    assertex(bloomTable.length()==bloomTableSize);
+    bloomFilter.setown(new BloomFilter(bloomTableHashes, bloomTableSize, (byte *) bloomTable.detach()));
+}
+
+const BloomFilter * CKeyIndex::queryBloomFilter()
+{
+    return bloomFilter;
+}
+
+unsigned CKeyIndex::getBloomKeyLength()
+{
+    return keyHdr->getHdrStruct()->bloomKeyLength;
+}
+
 IPropertyTree * CKeyIndex::getMetadata()
 {
     offset_t nodepos = queryMetadataHead();
@@ -1571,6 +1635,8 @@ CKeyCursor::CKeyCursor(CKeyIndex &_key, IContextLogger *_ctx)
     : key(_key), ctx(_ctx)
 {
     key.Link();
+    bloomFilter = key.queryBloomFilter();
+    bloomLength = key.getBloomKeyLength();
     nodeKey = 0;
 }
 
@@ -1585,6 +1651,16 @@ void CKeyCursor::reset()
     node.clear();
 }
 
+bool CKeyCursor::checkBloomFilter(hash64_t hash)
+{
+    return bloomFilter != nullptr && bloomFilter->test(hash);
+}
+
+unsigned CKeyCursor::getBloomKeyLength()
+{
+    return bloomLength;
+}
+
 CJHTreeNode *CKeyCursor::locateFirstNode()
 {
     CJHTreeNode * n = 0;
@@ -1958,6 +2034,8 @@ public:
     virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
+    virtual const BloomFilter * queryBloomFilter() { return checkOpen().queryBloomFilter(); }
+    virtual unsigned getBloomKeyLength() { return checkOpen().getBloomKeyLength(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
     virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
     virtual bool hasSpecialFileposition() const { return realKey ? realKey->hasSpecialFileposition() : false; }
@@ -3016,7 +3094,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
         Owned<IFileIOStream> out = createIOStream(io);
         unsigned maxRecSize = variable ? 18 : 10;
         unsigned keyedSize = 10;
-        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0);
+        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0, keyedSize, true);
 
         char keybuf[18];
         memset(keybuf, '0', 18);

+ 8 - 1
system/jhtree/jhtree.hpp

@@ -29,6 +29,8 @@
 #include "jlog.hpp"
 #include "errorlist.h"
 
+class BloomFilter;
+
 interface jhtree_decl IDelayedFile : public IInterface
 {
     virtual IMemoryMappedFile *getMappedFile() = 0;
@@ -50,6 +52,8 @@ interface jhtree_decl IKeyCursor : public IInterface
     virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize) = 0;
     virtual void releaseBlobs() = 0;
     virtual void reset() = 0;
+    virtual bool checkBloomFilter(hash64_t hash) = 0;
+    virtual unsigned getBloomKeyLength() = 0;
 };
 
 interface IKeyIndex;
@@ -79,6 +83,9 @@ interface jhtree_decl IKeyIndex : public IKeyIndexBase
     virtual offset_t queryLatestGetNodeOffset() const = 0;
     virtual offset_t queryMetadataHead() = 0;
     virtual IPropertyTree * getMetadata() = 0;
+    virtual const BloomFilter * queryBloomFilter() = 0;
+    virtual unsigned getBloomKeyLength() = 0;
+
     virtual unsigned getNodeSize() = 0;
     virtual const IFileIO *queryFileIO() const = 0;
     virtual bool hasSpecialFileposition() const = 0;
@@ -182,7 +189,7 @@ public:
     unsigned lastFullSeg() const;
     bool matched(void *keyBuffer, unsigned &lastMatch) const;
     size32_t getSize() const;
-
+    bool isExact(unsigned bytes) const;  // Are first N bytes an exact match ?
     void checkSize(size32_t keyedSize, char const * keyname);
     void recalculateCache();
     void finish(size32_t keyedSize);

+ 8 - 0
system/jhtree/jhtree.ipp

@@ -77,6 +77,7 @@ protected:
     StringAttr name;
     CriticalSection blobCacheCrit;
     Owned<CJHTreeBlobNode> cachedBlobNode;
+    Owned<BloomFilter> bloomFilter;
     offset_t cachedBlobNodePos;
 
     CKeyHdr *keyHdr;
@@ -95,6 +96,7 @@ protected:
     ~CKeyIndex();
     void init(KeyHdr &hdr, bool isTLK, bool allowPreload);
     void cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK);
+    void loadBloomFilter();
     
 public:
     IMPLEMENT_IINTERFACE;
@@ -122,6 +124,8 @@ public:
     virtual offset_t queryLatestGetNodeOffset() const { return latestGetNodeOffset; }
     virtual offset_t queryMetadataHead();
     virtual IPropertyTree * getMetadata();
+    virtual const BloomFilter * queryBloomFilter();
+    virtual unsigned getBloomKeyLength();
     virtual unsigned getNodeSize() { return keyHdr->getNodeSize(); }
     virtual bool hasSpecialFileposition() const;
  
@@ -162,6 +166,8 @@ class jhtree_decl CKeyCursor : public IKeyCursor, public CInterface
 private:
     IContextLogger *ctx;
     CKeyIndex &key;
+    const BloomFilter *bloomFilter = nullptr;
+    unsigned bloomLength = 0;
     Owned<CJHTreeNode> node;
     unsigned int nodeKey;
     ConstPointerArray activeBlobs;
@@ -188,6 +194,8 @@ public:
     virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize);
     virtual void releaseBlobs();
     virtual void reset();
+    virtual bool checkBloomFilter(hash64_t hash);
+    virtual unsigned getBloomKeyLength();
 };
 
 

+ 60 - 6
system/jhtree/keybuild.cpp

@@ -16,6 +16,7 @@
 ############################################################################## */
 
 #include "keybuild.hpp"
+#include "bloom.hpp"
 #include "jmisc.hpp"
 
 struct CRC32HTE
@@ -336,20 +337,28 @@ private:
     CWriteNode *activeNode;
     CBlobWriteNode *activeBlobNode;
     unsigned __int64 duplicateCount;
+    unsigned bloomKeyLength = 0;
+    BloomBuilder bloomBuilder;
+    byte *lastBloomKeyData = nullptr;
     bool enforceOrder = true;
 
 public:
     IMPLEMENT_IINTERFACE;
 
-    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence, bool _enforceOrder)
-        : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence), enforceOrder(_enforceOrder)
+    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence, unsigned _bloomKeyLength, bool _enforceOrder)
+        : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence), bloomKeyLength(_bloomKeyLength), enforceOrder(_enforceOrder)
     {
         doCrc = true;
         activeNode = NULL;
         activeBlobNode = NULL;
         duplicateCount = 0;
+        if (bloomKeyLength)
+            lastBloomKeyData = (byte *) calloc(bloomKeyLength, 1);
+    }
+    ~CKeyBuilder()
+    {
+        free(lastBloomKeyData);
     }
-
 public:
     void finish(IPropertyTree * metadata, unsigned * fileCrc)
     {
@@ -372,6 +381,11 @@ public:
             toXML(metadata, metaXML);
             writeMetadata(metaXML.str(), metaXML.length());
         }
+        if (bloomBuilder.valid())
+        {
+            Owned<const BloomFilter> filter = bloomBuilder.build();
+            writeBloomFilter(*filter, bloomKeyLength);
+        }
         CRC32 headerCrc;
         writeFileHeader(false, &headerCrc);
 
@@ -413,6 +427,17 @@ public:
             if (cmp==0)
                 ++duplicateCount;
         }
+        if (bloomKeyLength)
+        {
+            int cmp = memcmp(keyData, lastBloomKeyData, bloomKeyLength);
+            if (cmp)
+            {
+                memcpy(lastBloomKeyData, keyData, bloomKeyLength);
+                hash64_t hash = rtlHash64Data(bloomKeyLength, keyData, HASH64_INIT);
+                if (!bloomBuilder.add(hash))
+                    bloomKeyLength = 0;
+            }
+        }
         if (!activeNode->add(pos, keyData, recsize, sequence))
         {
             assertex(NULL != activeNode->getLastKeyValue()); // empty and doesn't fit!
@@ -486,15 +511,44 @@ protected:
                 prevNode->setRightSib(node->getFpos());
                 writeNode(prevNode);
             }
-            prevNode.setown(node.getLink());
+            prevNode.setown(node.getClear());
+        }
+        writeNode(prevNode);
+    }
+
+    void writeBloomFilter(const BloomFilter &filter, unsigned bloomKeyLength)
+    {
+        assertex(keyHdr->getHdrStruct()->bloomHead == 0);
+        size32_t size = filter.queryTableSize();
+        if (!size)
+            return;
+        keyHdr->getHdrStruct()->bloomHead = nextPos;
+        keyHdr->getHdrStruct()->bloomKeyLength = bloomKeyLength;
+        keyHdr->getHdrStruct()->bloomTableSize = size;
+        keyHdr->getHdrStruct()->bloomTableHashes = filter.queryNumHashes();
+        const byte *data = filter.queryTable();
+        Owned<CBloomFilterWriteNode> prevNode;
+        while (size)
+        {
+            Owned<CBloomFilterWriteNode> node(new CBloomFilterWriteNode(nextPos, keyHdr));
+            nextPos += keyHdr->getNodeSize();
+            size32_t written = node->set(data, size);
+            assertex(written);
+            if(prevNode)
+            {
+                node->setLeftSib(prevNode->getFpos());
+                prevNode->setRightSib(node->getFpos());
+                writeNode(prevNode);
+            }
+            prevNode.setown(node.getClear());
         }
         writeNode(prevNode);
     }
 };
 
-extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, bool enforceOrder)
+extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, unsigned bloomKeySize, bool enforceOrder)
 {
-    return new CKeyBuilder(_out, flags, rawSize, nodeSize, keyFieldSize, startSequence, enforceOrder);
+    return new CKeyBuilder(_out, flags, rawSize, nodeSize, keyFieldSize, startSequence, bloomKeySize, enforceOrder);
 }
 
 

+ 1 - 1
system/jhtree/keybuild.hpp

@@ -101,7 +101,7 @@ interface IKeyBuilder : public IInterface
     virtual unsigned __int64 getDuplicateCount() = 0;
 };
 
-extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, bool enforceOrder=false);
+extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, unsigned bloomKeyLength, bool enforceOrder);
 
 interface IKeyDesprayer : public IInterface
 {

+ 1 - 1
system/jhtree/keydiff.cpp

@@ -417,7 +417,7 @@ public:
             flags |= HTREE_VARSIZE;
         if(quickCompressed)
             flags |= HTREE_QUICK_COMPRESSED_KEY;
-        keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0)); // MORE - support for sequence other than 0...
+        keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0, keyedsize, false)); // MORE - support for sequence other than 0...
     }
 
     ~CKeyWriter()

+ 1 - 1
system/jlib/jarray.hpp

@@ -27,7 +27,7 @@
 
 typedef size32_t aindex_t;
 
-#define NotFound   (aindex_t)-1
+const aindex_t NotFound = (aindex_t)-1;
 
 /************************************************************************
  *                            Copy Lists                                *

+ 1 - 1
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -177,7 +177,7 @@ public:
         buildUserMetadata(metadata);                
         buildLayoutMetadata(metadata);
         unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
-        builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, !isTlk));
+        builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, isTlk ? 0 : helper->getBloomKeyLength(), !isTlk));
     }