Browse code

HPCC-19469 BLOOM attribute on filters

Add support for a BLOOM([bool,] fieldlist [, bloomoptions]) attribute on INDEX
and BUILDINDEX.

Also adds PARTITION(fieldlist) attribute on INDEX and BUILDINDEX, and fixes
some issues with DISTRIBUTED(expr) on an index.

Reworks the previous bloom support in HPCC-19431 to apply the filters earlier
- effectively applying them to the segmonitor list rather than on each lookup.
This addresses a few concerns with the performance of the prior version.

Supports multiple bloom filters on an index.

Generates a default bloom filter if none specified - means the code will get
much more testing!

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
d6138c31fe

+ 1 - 0
ecl/eclcc/eclcc.hpp

@@ -142,6 +142,7 @@ const char * const helpText[] = {
     "?!  -fsubgraphToRegenerate=n Regenerate the ECL for a particular subgraph",
     "?!  -ftimeParser            Add timings for parsing each ECL attribute",
     "?!  -ftimeTransforms        Add timings for internal transforms to the workunit",
+    "?!  -faddDefaultBloom       Generate Bloom filter for first field of index if none specified",
     "",
 };
 

+ 4 - 0
ecl/hql/hqlatoms.cpp

@@ -100,6 +100,7 @@ IAtom * bestAtom;
 IAtom * bitfieldOffsetAtom;
 IAtom * bitmapAtom;
 IAtom * blobAtom;
+IAtom * bloomAtom;
 IAtom * cAtom;
 IAtom * cardinalityAtom;
 IAtom * caseAtom;
@@ -340,6 +341,7 @@ IAtom * prefetchAtom;
 IAtom * preloadAtom;
 IAtom * priorityAtom;
 IAtom * privateAtom;
+IAtom * probabilityAtom;
 IAtom * projectedAtom;
 IAtom * _propAligned_Atom;
 IAtom * _propRecordCount_Atom;
@@ -559,6 +561,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
     MAKEATOM(bitfieldOffset);
     MAKEATOM(bitmap);
     MAKEATOM(blob);
+    MAKEATOM(bloom);
     MAKEATOM(c);
     MAKEATOM(cardinality);
     MAKEATOM(case);
@@ -800,6 +803,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
     MAKEATOM(preload);
     MAKEATOM(priority);
     MAKEATOM(private);
+    MAKEATOM(probability);
     MAKEATOM(projected);
     MAKESYSATOM(propAligned);
     MAKESYSATOM(propRecordCount);

+ 2 - 0
ecl/hql/hqlatoms.hpp

@@ -104,6 +104,7 @@ extern HQL_API IAtom * bestAtom;
 extern HQL_API IAtom * bitfieldOffsetAtom;
 extern HQL_API IAtom * bitmapAtom;
 extern HQL_API IAtom * blobAtom;
+extern HQL_API IAtom * bloomAtom;
 extern HQL_API IAtom * cAtom;
 extern HQL_API IAtom * caseAtom;
 extern HQL_API IAtom * cardinalityAtom;
@@ -345,6 +346,7 @@ extern HQL_API IAtom * prefetchAtom;
 extern HQL_API IAtom * preloadAtom;
 extern HQL_API IAtom * priorityAtom;
 extern HQL_API IAtom * privateAtom;
+extern HQL_API IAtom * probabilityAtom;
 extern HQL_API IAtom * projectedAtom;
 extern HQL_API IAtom * _propAligned_Atom;
 extern HQL_API IAtom * _propRecordCount_Atom;

+ 1 - 0
ecl/hql/hqlerrors.hpp

@@ -434,6 +434,7 @@
 #define ERR_PROBABILITY_RANGE       2400
 #define ERR_EMBEDPROJECT_INVALID    2401
 #define ERR_ASSOCIATED_SIDEEFFECT   2402
+#define ERR_INVALID_PROBABILITY     2403
 
 #define ERR_CPP_COMPILE_ERROR       2999
 

+ 6 - 0
ecl/hql/hqlgram.hpp

@@ -575,6 +575,11 @@ public:
 
     bool extractConstantString(StringBuffer & text, attribute & attr);
 
+    IHqlExpression *processPartitionBloomAttr(IHqlExpression *bloom, IHqlExpression *index, const attribute & errpos);
+    void setIndexScope(IHqlExpression *index);
+    void clearIndexScope();
+    void pushIndexScope();
+
     //Various grammar rule productions.
     void beginAlienType(const attribute & errpos);
     void beginDefineId(IIdAtom * name, ITypeInfo * type);
@@ -921,6 +926,7 @@ protected:
     bool inType;
     Owned<IHqlScope> modScope;
     OwnedHqlExpr dotScope;
+    OwnedHqlExpr indexScope;
     unsigned outerScopeAccessDepth;
     IHqlScope* containerScope;
     IHqlScope* globalScope;

+ 80 - 11
ecl/hql/hqlgram.y

@@ -138,6 +138,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   TOK_BITMAP
   BIG
   BLOB
+  BLOOM
   BNOT
   BUILD
   CARDINALITY
@@ -365,6 +366,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   PRELOAD
   PRIORITY
   PRIVATE
+  PROBABILITY
   PROCESS
   PROJECT
   PROXYADDRESS
@@ -2390,24 +2392,24 @@ actionStmt
                         {
                             $$.setExpr(createValue(no_update, makeVoidType(), $3.getExpr(), $5.getExpr(), $7.getExpr()), $1);
                         }
-    | BUILD '(' startTopFilter ',' ',' thorFilenameOrList optBuildFlags ')' endTopFilter
+    | BUILD '(' startTopFilter startDistributeAttrs ',' ',' thorFilenameOrList optBuildFlags ')' endTopFilter
                         {
-                            $$.setExpr(parser->processIndexBuild($1, $3, NULL, NULL, $6, $7), $1);
+                            $$.setExpr(parser->processIndexBuild($1, $3, NULL, NULL, $7, $8), $1);
                             parser->processUpdateAttr($$);
                         }
-    | BUILD '(' startTopFilter ',' recordDef ',' thorFilenameOrList optBuildFlags ')' endTopFilter
+    | BUILD '(' startTopFilter startDistributeAttrs ',' indexRecordDef ',' thorFilenameOrList optBuildFlags ')' endTopFilter endIndexScope
                         {
-                            $$.setExpr(parser->processIndexBuild($1, $3, &$5, NULL, $7, $8), $1);
+                            $$.setExpr(parser->processIndexBuild($1, $3, &$6, NULL, $8, $9), $1);
                             parser->processUpdateAttr($$);
                         }
-    | BUILD '(' startTopFilter ',' recordDef ',' nullRecordDef ',' thorFilenameOrList optBuildFlags ')' endTopFilter
+    | BUILD '(' startTopFilter startDistributeAttrs ',' indexRecordDef ',' nullRecordDef ',' thorFilenameOrList optBuildFlags ')' endTopFilter endIndexScope
                         {
-                            $$.setExpr(parser->processIndexBuild($1, $3, &$5, &$7, $9, $10), $1);
+                            $$.setExpr(parser->processIndexBuild($1, $3, &$6, &$8, $10, $11), $1);
                             parser->processUpdateAttr($$);
                         }
-    | BUILD '(' startTopFilter optBuildFlags ')' endTopFilter
+    | BUILD '(' startTopFilter startDistributeAttrs optBuildFlags ')' endTopFilter
                         {
-                            $$.setExpr(parser->createBuildIndexFromIndex($3, $4, $5), $1);
+                            $$.setExpr(parser->createBuildIndexFromIndex($3, $5, $6), $1);
                             parser->processUpdateAttr($$);
                         }
     | OUTPUT '(' startTopFilter ',' optRecordDef endTopFilter optOutputFlags ')'
@@ -2926,9 +2928,13 @@ assertFlags
                         }
     ;
 
+indexRecordDef
+    : recordDef         {   parser->setIndexScope($1.queryExpr()); $$.setExpr($1.getExpr()); }
+    ;
+   
 optBuildFlags
-    :                   { $$.setNullExpr(); $$.clearPosition(); }
-    | ',' buildFlags    { $$.setExpr($2.getExpr(), $1); }
+    :                     { $$.setNullExpr(); $$.clearPosition(); }
+    |  ','  buildFlags    { $$.setExpr($2.getExpr(), $1); }
     ;
 
 buildFlags
@@ -2996,6 +3002,8 @@ buildFlag
                             $$.setPosition($1);
                         }
     | expireAttr
+    | bloomAttr
+    | hashedIndexAttr
     | NOROOT            {
                             $$.setExpr(createComma(createAttribute(noRootAtom), createLocalAttribute()));
                             $$.setPosition($1);
@@ -3067,6 +3075,64 @@ localAttribute
                         }
     ;
 
+bloomAttr
+    : BLOOM '(' beginIndexList sortList optBloomFlags ')' endTopFilter
+                        {
+                            HqlExprArray sortItems;
+                            parser->endList(sortItems);
+                            IHqlExpression * bloomList = parser->processSortList($4, no_hash, NULL, sortItems, NULL, NULL);
+                            $$.setExpr(createExprAttribute(bloomAtom, bloomList, $5.getExpr()));
+                            $$.setPosition($1);
+                        }
+    ;
+    
+hashedIndexAttr
+    : PARTITION_ATTR '(' beginIndexList sortList ')' endTopFilter
+                        {
+                            HqlExprArray sortItems;
+                            parser->endList(sortItems);
+                            IHqlExpression * hashList = parser->processSortList($4, no_hash, NULL, sortItems, NULL, NULL);
+                            // MORE - we need to sort the hashList by fieldno, since we are only storing a bitmap...
+                            // Or give an error if they are specified out of order, I suppose.
+                            IHqlExpression *hashFunc = createValue(no_hash64, makeIntType(8, false), LINK(hashList), createAttribute(internalAtom));
+                            OwnedHqlExpr distr = createComma(createAttribute(noRootAtom), createExprAttribute(distributedAtom, hashFunc), createLocalAttribute());
+                            OwnedHqlExpr hash = createExprAttribute(hashAtom, hashList);
+                            $$.setExpr(createComma(distr.getClear(), hash.getClear())); 
+                            $$.setPosition($1);
+                        }
+    ;
+
+beginIndexList
+    : beginList         {
+                            parser->pushIndexScope();
+                            $$.inherit($1);
+                        }     
+    ;    
+
+endIndexScope
+    :                   {   parser->clearIndexScope(); }
+    ;
+    
+optBloomFlags
+    :                   { $$.setNullExpr(); }
+    | ',' bloomFlag optBloomFlags
+                        {
+                            $$.setExpr(createComma($2.getExpr(), $3.getExpr()));
+                            $$.setPosition($3);
+                        }
+    ;
+
+bloomFlag
+    : LIMIT '(' expression ')' { 
+                            parser->normalizeExpression($3, type_int, false);
+                            $$.setExpr(createExprAttribute(limitAtom, $3.getExpr()), $1);
+                        }
+    | PROBABILITY '(' expression ')' { 
+                            parser->normalizeExpression($3, type_real, false);
+                            $$.setExpr(createExprAttribute(probabilityAtom, $3.getExpr()), $1);
+                        }
+    ;
+
 optCommonAttrs
     :                   { $$.setNullExpr(); }
     | ',' commonAttribute optCommonAttrs
@@ -3267,7 +3333,8 @@ datasetFlag
 
 optIndexFlags
     :                   { $$.setNullExpr(); $$.clearPosition(); }
-    | ',' indexFlags    { $$.setExpr($2.getExpr()); $$.setPosition($1); }
+    | startDistributeAttrs ',' indexFlags    
+                        { $$.setExpr($3.getExpr()); $$.setPosition($2); }
     ;
 
 indexFlags
@@ -3339,6 +3406,8 @@ indexFlag
                             parser->normalizeExpression($3, type_numeric, false);
                             $$.setExpr(createExprAttribute(maxLengthAtom, $3.getExpr()), $1);
                         }
+    | bloomAttr
+    | hashedIndexAttr
     | commonAttribute
     ;
 

+ 177 - 8
ecl/hql/hqlgram2.cpp

@@ -1098,7 +1098,7 @@ IHqlExpression * HqlGram::processIndexBuild(const attribute &err, attribute & in
     OwnedHqlExpr dataset = indexAttr.getExpr();
     checkBuildIndexFilenameFlags(dataset, flagsAttr);
 
-    LinkedHqlExpr inputDataset = dataset;
+    LinkedHqlExpr projectedDataset = dataset;
     OwnedHqlExpr flags = flagsAttr.getExpr();
     if (recordAttr)
     {
@@ -1122,8 +1122,8 @@ IHqlExpression * HqlGram::processIndexBuild(const attribute &err, attribute & in
         bool hasFileposition = getBoolAttributeInList(flags, filepositionAtom, true);
         record.setown(checkBuildIndexRecord(record.getClear(), *recordAttr));
         record.setown(checkIndexRecord(record, *recordAttr, flags));
-        inputDataset.setown(createDatasetF(no_selectfields, LINK(dataset), LINK(record), NULL));
-        warnIfRecordPacked(inputDataset, *recordAttr);
+        projectedDataset.setown(createDatasetF(no_selectfields, LINK(dataset), LINK(record), NULL));
+        warnIfRecordPacked(projectedDataset, *recordAttr);
     }
     else
     {
@@ -1131,10 +1131,53 @@ IHqlExpression * HqlGram::processIndexBuild(const attribute &err, attribute & in
     }
 
     HqlExprArray args;
-    args.append(*LINK(inputDataset));
+    args.append(*LINK(projectedDataset));
     args.append(*filenameAttr.getExpr());
     if (flags)
-        flags->unwindList(args, no_comma);
+    {
+        HqlExprArray buildOptions;
+        flags->unwindList(buildOptions, no_comma);
+        OwnedHqlExpr distribution, partition;
+        ForEachItemIn(i, buildOptions)
+        {
+            IHqlExpression & cur = buildOptions.item(i);
+            IAtom * name = cur.queryName();
+            if (name == distributedAtom)
+            {
+                if (distribution)
+                    reportError(ERR_CANNOT_REDEFINE, err, "DISTRIBUTED attribute cannot be specified more than once");
+                if (cur.queryChild(0))
+                    distribution.setown(replaceSelector(&cur, queryActiveTableSelector(), projectedDataset));
+                else
+                    distribution.set(&cur);
+            }
+            else if (name == bloomAtom)
+            {
+                OwnedHqlExpr replaced = replaceSelector(&cur, queryActiveTableSelector(), projectedDataset);
+                OwnedHqlExpr bloom = processPartitionBloomAttr(replaced, projectedDataset, err);
+                ForEachItemIn(idx, args)
+                {
+                    IHqlExpression &arg = args.item(idx);
+                    if (arg.isAttribute() && arg.queryName()==bloomAtom && arg.queryChild(0) == bloom->queryChild(0))
+                        reportError(ERR_CANNOT_REDEFINE, err, "Duplicate BLOOM definition");
+                }
+                args.append(*bloom.getClear());
+            }
+            else if (name == hashAtom)
+            {
+                if (partition)
+                    reportError(ERR_CANNOT_REDEFINE, err, "PARTITION attribute cannot be specified more than once");
+                partition.setown(replaceSelector(&cur, queryActiveTableSelector(), projectedDataset));
+            }
+            else
+                args.append(OLINK(cur));
+        }
+
+        if (distribution)
+            args.append(*distribution.getClear());
+        if (partition)
+            args.append(*processPartitionBloomAttr(partition, projectedDataset, err));
+    }
     saveDiskAccessInformation(err, args);
     checkDistributer(flagsAttr.pos, args);
     return createValue(no_buildindex, makeVoidType(), args);
@@ -7000,7 +7043,7 @@ IHqlExpression * HqlGram::createBuildIndexFromIndex(attribute & indexAttr, attri
         transform.setown(createDefaultAssignTransform(record, sourceDataset->queryNormalizedSelector(), indexAttr));
 
     //need to tag record scope in this case so it generates no_activetable as top selector
-    OwnedHqlExpr distribution;
+    OwnedHqlExpr distribution, hash;
     if (!sourceDataset)
     {
         bool allConstant = true;
@@ -7042,7 +7085,40 @@ IHqlExpression * HqlGram::createBuildIndexFromIndex(attribute & indexAttr, attri
         IHqlExpression & cur = buildOptions.item(i);
         IAtom * name = cur.queryName();
         if (name == distributedAtom)
-            distribution.setown(&cur);
+        {
+            if (!index->hasAttribute(distributedAtom))
+            {
+                if (distribution)
+                    reportError(ERR_CANNOT_REDEFINE, errpos, "DISTRIBUTED attribute cannot be specified more than once");
+                if (cur.queryChild(0))
+                    distribution.setown(replaceSelector(&cur, index, select));
+                else
+                    distribution.set(&cur);
+            }
+        }
+        else if (name == bloomAtom)
+        {
+            if (!index->hasAttribute(bloomAtom))
+            {
+                OwnedHqlExpr bloom = processPartitionBloomAttr(&cur, index, errpos);
+                ForEachItemIn(idx, args)
+                {
+                    IHqlExpression &arg = args.item(idx);
+                    if (arg.isAttribute() && arg.queryName()==bloomAtom && arg.queryChild(0) == bloom->queryChild(0))
+                        reportError(ERR_CANNOT_REDEFINE, errpos, "Duplicate BLOOM definition");
+                }
+                args.append(*bloom.getClear());
+            }
+        }
+        else if (name == hashAtom)
+        {
+            if (!index->hasAttribute(hashAtom))
+            {
+                if (hash)
+                    reportError(ERR_CANNOT_REDEFINE, errpos, "PARTITION attribute cannot be specified more than once");
+                hash.set(&cur);
+            }
+        }
         else if (name == persistAtom)
             args.append(*createAttribute(persistAtom, LINK(index)));        // preserve so changes in representation don't affect crc.
         else
@@ -7056,10 +7132,32 @@ IHqlExpression * HqlGram::createBuildIndexFromIndex(attribute & indexAttr, attri
         if (cur->isAttribute())
         {
             IAtom * name = cur->queryName();
-            if ((name == sort_AllAtom) || (name == sort_KeyedAtom) || (name == fixedAtom) || (name == compressedAtom) || (name == dedupAtom))
+            if ((name == sort_AllAtom) || (name == sort_KeyedAtom) || (name == fixedAtom) || (name == compressedAtom) ||
+                (name == dedupAtom) || (name == probabilityAtom) || (name == limitAtom))
                 args.append(*LINK(cur));
+            else if (name == bloomAtom)
+            {
+                OwnedHqlExpr replaced = replaceSelector(cur, queryActiveTableSelector(), index);
+                OwnedHqlExpr bloom = processPartitionBloomAttr(replaced, index, errpos);
+                // Check for duplicates
+                ForEachItemIn(idx, args)
+                {
+                    IHqlExpression &arg = args.item(idx);
+                    if (arg.isAttribute() && arg.queryName()==bloomAtom && arg.queryChild(0) == bloom->queryChild(0))
+                        reportError(ERR_CANNOT_REDEFINE, errpos, "Duplicate BLOOM definition");
+                }
+                args.append(*bloom.getClear());
+            }
+            else if (name == hashAtom)
+            {
+                if (hash)
+                    reportError(ERR_CANNOT_REDEFINE, errpos, "PARTITION attribute cannot be specified more than once");
+                hash.setown(replaceSelector(cur, queryActiveTableSelector(), index));
+            }
             else if (name == distributedAtom)
             {
+                if (distribution)
+                    reportError(ERR_CANNOT_REDEFINE, errpos, "DISTRIBUTED attribute cannot be specified more than once");
                 args.append(*createAttribute(noRootAtom));
                 if (cur->queryChild(0))
                     distribution.setown(replaceSelector(cur, queryActiveTableSelector(), select));
@@ -7080,12 +7178,81 @@ IHqlExpression * HqlGram::createBuildIndexFromIndex(attribute & indexAttr, attri
         args.append(*LINK(fileposition));
     if (distribution)
         args.append(*distribution.getClear());
+    if (hash)
+        args.append(*processPartitionBloomAttr(hash, index, errpos));
 
     saveDiskAccessInformation(indexAttr, args);
     checkDistributer(flagsAttr.pos, args);
     return createValue(no_buildindex, makeVoidType(), args);
 }
 
+IHqlExpression *HqlGram::processPartitionBloomAttr(IHqlExpression *bloom, IHqlExpression *index, const attribute & errpos)
+{
+    __uint64 fields = 0;
+    IHqlExpression *list = bloom->queryChild(0);
+    assertex(list->getOperator()==no_sortlist);
+    unsigned numKeyed = 0;
+    if (index->hasAttribute(_payload_Atom))
+        numKeyed = numKeyedFields(index);  // NOTE - this may miss reporting an error in cases where there is a trailing filepos that someone tries to put into a bloom filter. I don't care.
+    unsigned lastFieldNum = 0;
+    OwnedHqlExpr enable;
+    const char *errName = (bloom->queryName()==bloomAtom) ? "BLOOM" : "PARTITION";
+    ForEachChild(idx, list)
+    {
+        IHqlExpression * cur = list->queryChild(idx);
+        if (cur->getOperator() != no_select || cur->queryChild(0) != index->queryNormalizedSelector())
+        {
+            if (!idx && cur->isBoolean())
+            {
+                enable.setown(createExprAttribute(activeAtom, LINK(cur)));
+                continue;
+            }
+            else
+            {
+                reportError(ERR_KEYEDINDEXINVALID, errpos, "%s parameter is not a field from the index", errName);
+                break;
+            }
+        }
+        unsigned fieldNum = getFieldNumber(cur->queryChild(0), cur);
+        if (numKeyed && fieldNum >= numKeyed)
+            reportError(ERR_KEYEDINDEXINVALID, errpos, "%s parameter must refer to keyed fields only", errName);
+        if (fieldNum < lastFieldNum)
+            reportError(ERR_KEYEDINDEXINVALID, errpos, "%s fields out of order", errName);
+        if (fields & (((__uint64) 1) << fieldNum))
+            reportError(ERR_KEYEDINDEXINVALID, errpos, "%s field repeated", errName);
+        lastFieldNum = fieldNum;
+        fields |= ((__uint64) 1) << fieldNum;
+    }
+    HqlExprArray args;
+    args.append(*createConstant(fields, makeIntType(8, false)));
+    if (bloom->queryChild(1))
+        bloom->queryChild(1)->unwindList(args, no_comma);
+    if (enable)
+        args.append(*enable.getClear());
+    IHqlExpression *ret = bloom->clone(args);
+    return ret;
+}
+
+void HqlGram::setIndexScope(IHqlExpression *index)
+{
+    assertex(!indexScope);
+    indexScope.set(index);  // NOTE - don't worry about nesting, as a BUILD inside a BUILD is highly unlikely
+}
+
+void HqlGram::clearIndexScope()
+{
+    indexScope.clear();
+}
+
+void HqlGram::pushIndexScope()
+{
+    if (indexScope)
+        pushTopScope(indexScope);
+    else
+        pushTopScope(queryTopScope());
+}
+
+
 bool HqlGram::doCheckValidFieldValue(const attribute &errpos, IHqlExpression *value, IHqlExpression * field)
 {
     if (value->queryTransformExtra())
@@ -10824,6 +10991,7 @@ static void getTokenText(StringBuffer & msg, int token)
     case BETWEEN: msg.append("BETWEEN"); break;
     case BIG: msg.append("BIG_ENDIAN"); break;
     case TOK_BITMAP: msg.append("BITMAP"); break;
+    case BLOOM: msg.append("BLOOM"); break;
     case BLOB: msg.append("BLOB"); break;
     case BNOT: msg.append("BNOT"); break;
     case BUILD: msg.append("BUILD"); break;
@@ -11056,6 +11224,7 @@ static void getTokenText(StringBuffer & msg, int token)
     case PRELOAD: msg.append("PRELOAD"); break;
     case PRIORITY: msg.append("PRIORITY"); break;
     case PRIVATE: msg.append("PRIVATE"); break;
+    case PROBABILITY: msg.append("PROBABILITY"); break;
     case PROCESS: msg.append("PROCESS"); break;
     case PROJECT: msg.append("PROJECT"); break;
     case PULL: msg.append("PULL"); break;

+ 2 - 0
ecl/hql/hqllex.l

@@ -655,6 +655,7 @@ BETWEEN             { RETURNSYM(BETWEEN); }
 BITMAP              { RETURNSYM(TOK_BITMAP); }
 BIG_ENDIAN          { RETURNSYM(BIG); }
 BLOB                { RETURNSYM(BLOB); }
+BLOOM               { RETURNSYM(BLOOM); }
 BNOT                { RETURNSYM(BNOT); }
 BUILD               { RETURNSYM(BUILD); }
 BUILDINDEX          { RETURNSYM(BUILD); }
@@ -873,6 +874,7 @@ PREFETCH            { RETURNSYM(PREFETCH); }
 PRELOAD             { RETURNSYM(PRELOAD); }
 PRIORITY            { RETURNSYM(PRIORITY); }
 PRIVATE             { RETURNSYM(PRIVATE); }
+PROBABILITY         { RETURNSYM(PROBABILITY); }
 PROCESS             { RETURNSYM(PROCESS); }
 PROJECT             { RETURNSYM(PROJECT); }
 PROXYADDRESS        { RETURNSYM(PROXYADDRESS); }

+ 2 - 0
ecl/hql/reservedwords.cpp

@@ -339,6 +339,7 @@ static const char * eclReserved12[] = {//Attributes
     "any",
     "best",
     "bitmap",
+    "bloom",
     "blob",
     "c++",
     "choosen:all",
@@ -380,6 +381,7 @@ static const char * eclReserved12[] = {//Attributes
     "out",
     "outer",
     "packed",
+    "probability",
     "pulled",
     "remote",
     "return",

+ 1 - 0
ecl/hqlcpp/hqlcpp.cpp

@@ -1810,6 +1810,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.useGlobalCompareClass,"useGlobalCompareClass", false),
         DebugOption(options.createValueSets,"createValueSets", false),
         DebugOption(options.implicitKeyedDiskFilter,"implicitKeyedDiskFilter", false),
+        DebugOption(options.addDefaultBloom,"addDefaultBloom", true),
     };
 
     //get options values from workunit

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -795,6 +795,7 @@ struct HqlCppOptions
     bool                useGlobalCompareClass;
     bool                createValueSets;
     bool                implicitKeyedDiskFilter;
+    bool                addDefaultBloom;
 };
 
 //Any information gathered while processing the query should be moved into here, rather than cluttering up the translator class

+ 55 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -10409,7 +10409,6 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutputIndex(BuildCtx & ctx, IH
     if (!hasTLK && !singlePart)           flags.append("|TIWlocal");
     if (expr->hasAttribute(expireAtom))   flags.append("|TIWexpires");
     if (expr->hasAttribute(maxLengthAtom))   flags.append("|TIWmaxlength");
-
     if (compressAttr)
     {
         if (compressAttr->hasAttribute(rowAtom))   flags.append("|TIWrowcompress");
@@ -10428,6 +10427,61 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutputIndex(BuildCtx & ctx, IH
     buildUpdateHelper(instance->createctx, *instance, dataset, updateAttr);
     buildClusterHelper(instance->startctx, expr);
 
+    unsigned blooms = 0;
+    StringBuffer bloomNames;
+    ForEachChild(idx, expr)
+    {
+        IHqlExpression *cur = expr->queryChild(idx);
+        if (cur->isAttribute() && (cur->queryName() == bloomAtom))
+        {
+            VStringBuffer bloom("bloom%d", ++blooms);
+            bloomNames.append(", &").append(bloom);
+            BuildCtx classctx(instance->startctx);
+            IHqlStmt * classStmt = beginNestedClass(classctx, bloom, "CBloomBuilderInfo");
+
+            IHqlExpression * bloomProbabilityAttr = cur->queryAttribute(probabilityAtom);
+            if (bloomProbabilityAttr)
+            {
+                MemberFunction func(*this, classctx, "virtual double getBloomProbability() const override");
+                buildReturn(func.ctx, bloomProbabilityAttr->queryChild(0), doubleType);
+            }
+            IHqlExpression * bloomLimitAttr = cur->queryAttribute(limitAtom);
+            if (bloomLimitAttr)
+            {
+                MemberFunction func(*this, classctx, "virtual unsigned getBloomLimit() const override");
+                buildReturn(func.ctx, bloomLimitAttr->queryChild(0), unsignedType);
+            }
+            IHqlExpression * bloomEnabledAttr = cur->queryAttribute(activeAtom);
+            if (bloomEnabledAttr)
+            {
+                MemberFunction func(*this, classctx, "virtual bool getBloomEnabled() const override");
+                buildReturn(func.ctx, bloomEnabledAttr->queryChild(0));
+            }
+            MemberFunction func(*this, classctx, "virtual __uint64 getBloomFields() const override");
+            buildReturn(func.ctx, cur->queryChild(0));
+
+            endNestedClass(classStmt);
+        }
+    }
+    if (!blooms && options.addDefaultBloom)
+    {
+        bloomNames.append(", &bloomDefault");
+        BuildCtx classctx(instance->startctx);
+        IHqlStmt * classStmt = beginNestedClass(classctx, "bloomDefault", "CBloomBuilderInfo");
+        classctx.addQuoted("virtual __uint64 getBloomFields() const override { return 1; }");
+        endNestedClass(classStmt);
+        blooms++;
+    }
+    instance->classctx.addQuoted(s.clear().appendf("const IBloomBuilderInfo * const bloomInfo [%d] = {", blooms+1).append(bloomNames+1).append(", nullptr };"));
+    instance->classctx.addQuoted(s.clear().append("virtual const IBloomBuilderInfo * const *queryBloomInfo() const override { return bloomInfo; }"));
+
+    IHqlExpression * partitionAttr = expr->queryAttribute(hashAtom);
+    if (partitionAttr)
+    {
+        MemberFunction func(*this, instance->classctx, "virtual __uint64 getPartitionFieldMask() const override");
+        buildReturn(func.ctx, partitionAttr->queryChild(0));
+    }
+
     // virtual unsigned getKeyedSize()
     HqlExprArray fields;
     unwindChildren(fields, record);

+ 31 - 0
ecl/hqlcpp/hqlttcpp.cpp

@@ -13956,6 +13956,10 @@ public:
         }
         switch (expr->getOperator())
         {
+        case no_attr_expr:
+            if (expr->queryName()==bloomAtom)
+                checkBloom(expr);
+            break;
         case no_join:
             checkJoin(expr);
             break;
@@ -13964,6 +13968,7 @@ public:
     }
 
 protected:
+    void checkBloom(IHqlExpression * expr);
     void checkJoin(IHqlExpression * expr);
     void reportError(int errNo, const char * format, ...) __attribute__((format(printf, 3, 4)));
 
@@ -13972,6 +13977,32 @@ protected:
     HqlExprCopyArray locations;
 };
 
+void SemanticErrorChecker::checkBloom(IHqlExpression * bloom)
+{
+    // Possible semantic errors caught here:
+    // 1. Args to active, limit, or probability sub-attrs not constant
+    // 2. Arg to probability out of range
+    ForEachChildFrom(i, bloom, 1)
+    {
+        IHqlExpression * cur = bloom->queryChild(i);
+        const IAtom *name = cur->queryName();
+        if (name==activeAtom)
+            name = bloomAtom;
+        else if (name==probabilityAtom)
+        {
+            IValue *cval = cur->queryChild(0)->queryValue();
+            if (cval)
+            {
+                double rval = cval->getRealValue();
+                if (rval <= 0.01 || rval > 0.3)
+                    reportError(ERR_INVALID_PROBABILITY,"Probability argument is not in permitted range 0.01 to 0.3");
+            }
+        }
+        if (!cur->isConstant())
+            reportError(ERR_KEYEDINDEXINVALID, "Parameter to %s must be constant", str(name));
+    }
+}
+
 void SemanticErrorChecker::checkJoin(IHqlExpression * join)
 {
     IHqlExpression * group = join->queryAttribute(groupAtom);

+ 1 - 1
ecl/hthor/hthor.cpp

@@ -1107,7 +1107,7 @@ void CHThorIndexWriteActivity::execute()
         if (hasTrailingFileposition(helper.queryDiskRecordSize()->queryTypeInfo()))
             keyMaxSize -= sizeof(offset_t);
 
-        Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, keyMaxSize, nodeSize, helper.getKeyedSize(), 0, helper.getBloomKeyLength(), true);
+        Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, keyMaxSize, nodeSize, helper.getKeyedSize(), 0, &helper, true, false);
         class BcWrapper : implements IBlobCreator
         {
             IKeyBuilder *builder;

+ 2 - 0
ecl/hthor/hthorkey.cpp

@@ -411,6 +411,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
     {
         if(localSortKey)
         {
+            // MORE - partition support goes here
             Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
             verifyIndex(tlk);
             for(unsigned idx = 0; idx < num; ++idx)
@@ -605,6 +606,7 @@ bool CHThorIndexReadActivityBase::nextMultiPart()
     {
         if (localSortKey)
         {
+            // MORE - partition key support should go here?
             if (nextPartNumber<(df->numParts()-1))
                 return setCurrentPart(nextPartNumber++);
         }

+ 47 - 0
ecl/regress/bloom1.ecl

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing code generation of the various forms of PARTITION/BLOOM
+
+rec := RECORD
+   string1 a;
+   string3 b;
+   string1 c;
+   unsigned d;
+END;
+
+boolean useBloom := true : stored('useBloom');
+
+d := dataset('fred', rec, FLAT);
+
+buildindex(d, {a, b}, { c },'ix1', overwrite, PARTITION(a,b), BLOOM(b));
+buildindex(d, {a, b => c },'ix2', overwrite, PARTITION(a,b), BLOOM(b));
+buildindex(d, ,'ix3', overwrite, PARTITION(a,b), BLOOM(b));
+
+i1 := index(d, {a,b,c,d}, 'index1');
+i2 := index(d, {a,b,c,d}, 'index2', PARTITION(a,b), BLOOM(c));
+
+buildindex(i1, overwrite, PARTITION(c), BLOOM(a, b));  // NOTE - should use values from BUILD
+buildindex(i2, overwrite, PARTITION(c), BLOOM(a, b));  // NOTE - should use values from INDEX
+
+// Various forms of BLOOM
+
+buildindex(i1, overwrite, BLOOM(a), BLOOM(b), BLOOM(a,b));
+buildindex(i1, overwrite, BLOOM(false, a), BLOOM(true, b));
+buildindex(i1, overwrite, BLOOM(useBloom, a));
+buildindex(i1, overwrite, BLOOM(a, LIMIT(10), PROBABILITY(0.1)), BLOOM(b, LIMIT(12), PROBABILITY(0.2)));
+

+ 36 - 0
ecl/regress/bloom2_err.ecl

@@ -0,0 +1,36 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing code generation of the various forms of PARTITION/BLOOM
+
+rec := RECORD
+   string1 a;
+   string3 b;
+   string1 c;
+   unsigned d;
+END;
+
+d := dataset('fred', rec, FLAT);
+
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(a),bloom(a));  // Must be unique
+
+i1 := index(d, {a,b,c,d}, 'index1', BLOOM(a),bloom(a));
+i2 := index(d, {a,b,c,d}, 'index2');
+i3 := index(d, {a,b,c,d}, 'index3', BLOOM(b));
+buildindex(i1, overwrite); // Should report the index duplicated bloom issue - shame it's not sooner...
+buildindex(i2, overwrite, BLOOM(a),bloom(a));
+buildindex(i3, overwrite, BLOOM(a),bloom(a));  // Won't report an error, because the index definition overrides (not ideal)

+ 32 - 0
ecl/regress/bloom3_err.ecl

@@ -0,0 +1,32 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing code generation of the various forms of PARTITION/BLOOM
+
+rec := RECORD
+   string1 a;
+   string3 b;
+   string1 c;
+   unsigned d;
+END;
+
+boolean enabled := true : stored('enabled');
+
+d := dataset('fred', rec, FLAT);
+
+buildindex(d, {a, b => c },'ix2', overwrite, bloom(a, probability(53)));  // Out of range
+buildindex(d, {a, b => c },'ix2', overwrite, bloom(enabled, a));  // Must be constant

+ 33 - 0
ecl/regress/bloom_err.ecl

@@ -0,0 +1,33 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing code generation of the various forms of PARTITION/BLOOM
+
+rec := RECORD
+   string1 a;
+   string3 b;
+   string1 c;
+   unsigned d;
+END;
+
+d := dataset('fred', rec, FLAT);
+
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(c));  // Must be keyed
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(b,a));  // Must be in order
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(a),bloom(a));  // Must be unique
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(a,a));  // Must not repeat fields
+buildindex(d, {a, b => c },'ix2', overwrite, BLOOM(a,PROBABILITY(10)));  // Probability out of range

+ 3 - 4
roxie/ccd/ccdserver.cpp

@@ -12179,7 +12179,7 @@ public:
             buildUserMetadata(metadata);
             buildLayoutMetadata(metadata);
             unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
-            Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0, helper.getBloomKeyLength(), true);
+            Owned<IKeyBuilder> builder = createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper.getKeyedSize(), 0, &helper, true, false);
             class BcWrapper : implements IBlobCreator
             {
                 IKeyBuilder *builder;
@@ -22875,9 +22875,8 @@ public:
                                         }
                                         else
                                         {
-                                            // MORE - we could check whether there are any matching parts if we wanted.
-                                            // If people are in the habit of sending null values that would be worthwhile
-                                            remote.getMem(0, fileNo, 0);
+                                            unsigned slavePart = tlk->getPartition();  // Returns 0 if no partition info, or filter cannot be partitioned
+                                            remote.getMem(slavePart, fileNo, 0);
                                         }
                                     }
                                     else

+ 17 - 4
rtl/eclrtl/eclhelper_base.cpp

@@ -40,12 +40,25 @@ bool CThorIndexWriteArg::getIndexLayout(size32_t & _retLen, void * & _retData) {
 bool CThorIndexWriteArg::getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) { return false; }
 unsigned CThorIndexWriteArg::getWidth() { return 0; }
 ICompare * CThorIndexWriteArg::queryCompare() { return NULL; }
+const IBloomBuilderInfo * const *CThorIndexWriteArg::queryBloomInfo() const { return nullptr; }
+__uint64 CThorIndexWriteArg::getPartitionFieldMask() const { return 0; }
 
-unsigned CThorIndexWriteArg::getBloomKeyLength()
+//CBloomBuilderInfo
+__uint64 CBloomBuilderInfo::getBloomFields() const
 {
-    // Default to building bloom on the first field...
-    IOutputMetaData *layout = queryDiskRecordSize();
-    return layout->queryRecordAccessor(true).getFixedOffset(1);
+    return 0;
+}
+unsigned CBloomBuilderInfo::getBloomLimit() const
+{
+    return 1000000;
+}
+double CBloomBuilderInfo::getBloomProbability() const
+{
+    return 0.01;
+}
+bool CBloomBuilderInfo::getBloomEnabled() const
+{
+    return true;
 }
 
 //CThorFirstNArg

+ 1 - 1
rtl/eclrtl/eclhelper_dyn.cpp

@@ -225,10 +225,10 @@ public:
             deserializeSet(*filterSet, inrec.getMinRecordSize(), fieldType, filter);
             while (filters.length()<=fieldNum)
             {
-                filters.append(nullptr);
                 unsigned dummyOffset = inrec.getFixedOffset(filters.length());
                 filterOffsets.append(dummyOffset);
                 filterSizes.append(inrec.getFixedOffset(filters.length()+1) - dummyOffset);
+                filters.append(nullptr);
             }
             IStringSet *prev = filters.item(fieldNum);
             if (prev)

+ 19 - 0
rtl/eclrtl/rtlkey.cpp

@@ -76,6 +76,11 @@ public:
         return hash;
     }
 
+    virtual bool getBloomHash(hash64_t &hash) const override
+    {
+        return false;
+    }
+
     virtual bool setOffset(unsigned _offset) override
     {
         offset = _offset;
@@ -404,6 +409,13 @@ public:
 
     virtual bool isSigned() const override { return false; }
     virtual bool isLittleEndian() const override { return false; }
+    virtual bool getBloomHash(hash64_t &hash) const override
+    {
+        if (!val)
+            return false;
+        hash = rtlHash64Data(size, val, hash);
+        return true;
+    }
 
     virtual KeySegmentMonitorSerializeType serializeType() const override { return KSMST_SINGLEKEYSEGMENTMONITOR; }
 };
@@ -533,6 +545,13 @@ public:
         return hash;
     }
 
+    virtual bool getBloomHash(hash64_t &hash) const override
+    {
+        // MORE - I don't know what the correct answer is, but returning false is safest!
+        // Perhaps it should return the hash of base/overridden as appropriate?
+        return false;
+    }
+
     virtual bool matchesBuffer(const void *keyval) const override
     {
         if (overridden)

+ 1 - 0
rtl/eclrtl/rtlkey.hpp

@@ -110,6 +110,7 @@ public:
     virtual KeySegmentMonitorSerializeType serializeType() const = 0;
     virtual IKeySegmentMonitor *clone() const = 0;
     virtual unsigned numFieldsRequired() const = 0;
+    virtual bool getBloomHash(hash64_t &hash) const = 0;
 
     virtual bool setOffset(unsigned _offset) = 0;  // Used by old record layout translator - to be removed at some point
 };

+ 6 - 1
rtl/eclrtl/rtlrecord.cpp

@@ -557,10 +557,15 @@ bool RtlRecord::excluded(const RtlFieldInfo *field, const byte *row, byte *condi
 
 size_t RtlRecord::getFixedOffset(unsigned field) const
 {
-    assert(whichVariableOffset[field]==0);
+    assert(isFixedOffset(field));
     return fixedOffsets[field];
 }
 
+bool RtlRecord::isFixedOffset(unsigned field) const
+{
+    return (whichVariableOffset[field]==0);
+}
+
 size32_t RtlRecord::getRecordSize(const void *_row) const
 {
     if (numIfBlocks)

+ 1 - 0
rtl/eclrtl/rtlrecord.hpp

@@ -211,6 +211,7 @@ public:
     }
 
     size_t getFixedOffset(unsigned field) const;
+    bool isFixedOffset(unsigned field) const;
     size_t getRecordSize(size_t * variableOffsets) const
     {
         return getOffset(variableOffsets, numFields);

+ 10 - 1
rtl/include/eclhelper.hpp

@@ -1140,6 +1140,14 @@ enum
     TTFfiltered          = 0x0004,
 };
 
+struct IBloomBuilderInfo
+{
+    virtual bool getBloomEnabled() const = 0;
+    virtual __uint64 getBloomFields() const = 0;
+    virtual unsigned getBloomLimit() const = 0;
+    virtual double getBloomProbability() const = 0;
+};
+
 struct IHThorIndexWriteArg : public IHThorArg
 {
     virtual const char * getFileName() = 0;
@@ -1159,7 +1167,8 @@ struct IHThorIndexWriteArg : public IHThorArg
     virtual unsigned getWidth() = 0;                // only guaranteed present if TIWhaswidth defined
     virtual ICompare * queryCompare() = 0;          // only guaranteed present if TIWhaswidth defined
     virtual unsigned getMaxKeySize() = 0;
-    virtual unsigned getBloomKeyLength() = 0;
+    virtual const IBloomBuilderInfo * const *queryBloomInfo() const = 0;
+    virtual __uint64 getPartitionFieldMask() const = 0;
 };
 
 struct IHThorFirstNArg : public IHThorArg

+ 10 - 3
rtl/include/eclhelper_base.hpp

@@ -20,8 +20,6 @@
 
 // Only the base classes for Activity helpers - CHThorArg and derived classes - should be included in this file.
 
-//Don't #include any files here - because sometimes included inside a namespace, and that generates confusion.
-
 /*
 This file contains base class definitions for the different helper classes.  Any common methods are implemented here.
 
@@ -64,7 +62,16 @@ public:
     virtual bool getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) override;
     virtual unsigned getWidth() override;
     virtual ICompare * queryCompare() override;
-    virtual unsigned getBloomKeyLength() override;
+    virtual const IBloomBuilderInfo * const *queryBloomInfo() const override;
+    virtual __uint64 getPartitionFieldMask() const override;
+};
+
+class ECLRTL_API CBloomBuilderInfo : public IBloomBuilderInfo
+{
+    virtual bool getBloomEnabled() const override;
+    virtual __uint64 getBloomFields() const override;
+    virtual unsigned getBloomLimit() const override;
+    virtual double getBloomProbability() const override;
 };
 
 class ECLRTL_API CThorFirstNArg : public CThorArgOf<IHThorFirstNArg>

+ 347 - 11
system/jhtree/bloom.cpp

@@ -17,8 +17,11 @@
 
 #include "platform.h"
 #include "jlib.hpp"
+#include "jset.hpp"
 #include "bloom.hpp"
 #include "math.h"
+#include "eclhelper.hpp"
+#include "rtlrecord.hpp"
 
 BloomFilter::BloomFilter(unsigned _cardinality, double _probability)
 {
@@ -75,14 +78,148 @@ bool BloomFilter::test(hash64_t hash) const
     return true;
 }
 
-BloomBuilder::BloomBuilder(unsigned _maxHashes) : maxHashes(_maxHashes)
+IndexBloomFilter::IndexBloomFilter(unsigned _numHashes, unsigned _tableSize, byte *_table, __uint64 _fields)
+: BloomFilter(_numHashes, _tableSize, _table), fields(_fields)
+{}
+
+int IndexBloomFilter::compare(CInterface *const *_a, CInterface *const *_b)
+{
+    const IndexBloomFilter *a = static_cast<IndexBloomFilter *>(*_a);
+    const IndexBloomFilter *b = static_cast<IndexBloomFilter *>(*_b);
+    return a->fields - b->fields;
+}
+
+bool IndexBloomFilter::reject(const SegMonitorList &segs) const
+{
+    hash64_t hashval = HASH64_INIT;
+    return getBloomHash(fields, segs, hashval) && !test(hashval);
+}
+
+extern bool getBloomHash(__int64 fields, const SegMonitorList &segs, hash64_t &hashval)
+{
+    while (fields)
+    {
+        unsigned f = ffsll(fields)-1;    // extract lowest 1 bit
+        fields &= ~ (((__uint64) 1)<<f); // and clear it
+        IKeySegmentMonitor *seg = segs.item(f);
+        if (seg)
+        {
+            assertex(seg->getFieldIdx() == f);
+            if (!seg->getBloomHash(hashval))
+                return false;
+        }
+    }
+    return true;
+}
+
+class RowHasher : public CInterfaceOf<IRowHasher>
+{
+public:
+    RowHasher(const RtlRecord &_recInfo, __uint64 _fields);
+    virtual hash64_t hash(const byte *row) const override;
+    virtual bool isExact(const SegMonitorList &segs) const override;
+    virtual __uint64 queryFields() const override { return fields; }
+private:
+    const RtlRecord &recInfo;
+    const __uint64 fields;
+};
+
+class SimpleRowHasher : public RowHasher
+{
+public:
+    SimpleRowHasher(const RtlRecord &_recInfo, __uint64 _fields, unsigned _offset, unsigned _length);
+    virtual hash64_t hash(const byte *row) const override;
+private:
+    const unsigned offset;
+    const unsigned length;
+};
+
+RowHasher::RowHasher(const RtlRecord &_recInfo, __uint64 _fields) : recInfo(_recInfo), fields(_fields)
+{
+}
+
+hash64_t RowHasher::hash(const byte *row) const
+{
+    auto lfields = fields;
+    hash64_t hashval = HASH64_INIT;
+    // Assumes fixed size fields for now. Could probably optimize a bit
+    while (lfields)
+    {
+        unsigned f = ffsll(lfields)-1;    // extract lowest 1 bit
+        lfields &= ~ (((__uint64) 1)<<f); // and clear it
+        hashval = rtlHash64Data(recInfo.queryType(f)->getMinSize(), row + recInfo.getFixedOffset(f), hashval);
+    }
+    return hashval;
+}
+
+bool RowHasher::isExact(const SegMonitorList &segs) const
+{
+    auto lfields = fields;
+    // This will need reworking if/when non-fixed-size fields are supported (should actually become easier)
+    while (lfields)
+    {
+        unsigned f = ffsll(lfields)-1;    // extract lowest 1 bit
+        lfields &= ~ (((__uint64) 1)<<f); // and clear it
+        if (!segs.isExact(recInfo.queryType(f)->getMinSize(), recInfo.getFixedOffset(f)))
+            return false;
+    }
+    return true;
+}
+
+SimpleRowHasher::SimpleRowHasher(const RtlRecord &_recInfo, __uint64 _fields, unsigned _offset, unsigned _length)
+: RowHasher(_recInfo, _fields), offset(_offset), length(_length)
+{
+}
+
+hash64_t SimpleRowHasher::hash(const byte *row) const
+{
+    return rtlHash64Data(length, row + offset, HASH64_INIT);
+}
+
+// For cases where we know data is sorted
+
+class jhtree_decl SortedBloomBuilder : public CInterfaceOf<IBloomBuilder>
+{
+public:
+    SortedBloomBuilder(const IBloomBuilderInfo &_helper);
+    SortedBloomBuilder(unsigned _maxHashes, double _probability);
+    virtual const BloomFilter * build() const override;
+    virtual bool add(hash64_t val) override;
+    virtual unsigned queryCount() const override;
+    virtual bool valid() const override;
+
+protected:
+    ArrayOf<hash64_t> hashes;
+    const unsigned maxHashes;
+    hash64_t lastHash = 0;
+    const double probability = 0.0;
+    bool isValid = true;
+};
+
+SortedBloomBuilder::SortedBloomBuilder(const IBloomBuilderInfo &helper)
+: maxHashes(helper.getBloomLimit()),
+  probability(helper.getBloomProbability())
+{
+    if (maxHashes==0 || !helper.getBloomEnabled())
+        isValid = false;
+}
+
+SortedBloomBuilder::SortedBloomBuilder(unsigned _maxHashes, double _probability)
+: maxHashes(_maxHashes),
+  probability(_probability)
+{
+    if (maxHashes==0)
+        isValid = false;
+}
+
+unsigned SortedBloomBuilder::queryCount() const
 {
-    isValid = true;
+    return hashes.length();
 }
 
-bool BloomBuilder::add(hash64_t val)
+bool SortedBloomBuilder::add(hash64_t hash)
 {
-    if (isValid)
+    if (isValid && (hash != lastHash || !hashes.length()))
     {
         if (hashes.length()==maxHashes)
         {
@@ -90,17 +227,18 @@ bool BloomBuilder::add(hash64_t val)
             hashes.kill();
         }
         else
-            hashes.append(val);
+            hashes.append(hash);
+        lastHash = hash;
     }
     return isValid;
 }
 
-bool BloomBuilder::valid() const
+bool SortedBloomBuilder::valid() const
 {
     return isValid && hashes.length();
 }
 
-const BloomFilter * BloomBuilder::build(double probability) const
+const BloomFilter * SortedBloomBuilder::build() const
 {
     if (!valid())
         return nullptr;
@@ -112,22 +250,191 @@ const BloomFilter * BloomBuilder::build(double probability) const
     return b;
 }
 
+// For cases where we do not know data is sorted - use a hash table to store the hashes
+
+class jhtree_decl UnsortedBloomBuilder : public CInterfaceOf<IBloomBuilder>
+{
+public:
+    UnsortedBloomBuilder(const IBloomBuilderInfo &_helper);
+    UnsortedBloomBuilder(unsigned _maxHashes, double _probability);
+    ~UnsortedBloomBuilder();
+    virtual const BloomFilter * build() const override;
+    virtual bool add(hash64_t val) override;
+    virtual unsigned queryCount() const override;
+    virtual bool valid() const override;
+
+protected:
+    hash64_t *hashes = nullptr;
+    const unsigned maxHashes;
+    const unsigned tableSize;
+    unsigned tableCount = 0;
+    const double probability = 0.0;
+};
+
+
+UnsortedBloomBuilder::UnsortedBloomBuilder(const IBloomBuilderInfo &helper)
+: maxHashes(helper.getBloomLimit()),
+  probability(helper.getBloomProbability()),
+  tableSize(((helper.getBloomLimit()*4)/3)+1)
+{
+    if (tableSize && helper.getBloomEnabled())
+    {
+        hashes = (hash64_t *) calloc(sizeof(hash64_t), tableSize);
+    }
+
+}
+
+UnsortedBloomBuilder::UnsortedBloomBuilder(unsigned _maxHashes, double _probability)
+: maxHashes(_maxHashes),
+  probability(_probability),
+  tableSize(((_maxHashes*4)/3)+1)
+{
+    if (tableSize)
+        hashes = (hash64_t *) calloc(sizeof(hash64_t), tableSize);
+}
+
+UnsortedBloomBuilder::~UnsortedBloomBuilder()
+{
+    free(hashes);
+}
+
+unsigned UnsortedBloomBuilder::queryCount() const
+{
+    return tableCount;
+}
+
+bool UnsortedBloomBuilder::add(hash64_t hash)
+{
+    if (hashes)
+    {
+        if (!hash)
+        {
+            // Something that genuinely hashes to zero cannot be handled - so just mark the builder as invalid
+            free(hashes);
+            hashes = nullptr;
+            tableCount = 0;
+        }
+        else
+        {
+            unsigned pos = hash % tableSize;
+            for (;;)
+            {
+                hash64_t val = hashes[pos];
+                if (!val)
+                    break;
+                if (val== hash)
+                    return true;
+                pos++;
+                if (pos == tableSize)
+                    pos = 0;
+            }
+            if (tableCount==maxHashes)
+            {
+                free(hashes);
+                hashes = nullptr;
+                tableCount = 0;
+            }
+            else
+            {
+                hashes[pos] = hash;
+                tableCount++;
+            }
+        }
+    }
+    return hashes != nullptr;
+}
+
+bool UnsortedBloomBuilder::valid() const
+{
+    return tableCount != 0;
+}
+
+const BloomFilter * UnsortedBloomBuilder::build() const
+{
+    if (!valid())
+        return nullptr;
+    BloomFilter *b = new BloomFilter(tableCount, probability);
+    for (unsigned idx = 0; idx < tableSize; idx++)
+    {
+        hash64_t val = hashes[idx];
+        if (val)
+            b->add(val);
+    }
+    return b;
+}
+
+extern jhtree_decl IBloomBuilder *createBloomBuilder(const IBloomBuilderInfo &helper)
+{
+    __uint64 fields = helper.getBloomFields();
+    if (!(fields & (fields+1)))   // only true if all the ones are at the lsb end...
+        return new SortedBloomBuilder(helper);
+    else
+        return new UnsortedBloomBuilder(helper);
+}
+
+extern jhtree_decl IRowHasher *createRowHasher(const RtlRecord &recInfo, __uint64 fields)
+{
+    if (!(fields & (fields-1)))  // Only one bit set
+    {
+        unsigned field = ffsll(fields)-1;
+        if (recInfo.isFixedOffset(field) && recInfo.queryType(field)->isFixedSize())
+           return new SimpleRowHasher(recInfo, fields, recInfo.getFixedOffset(field), recInfo.queryType(field)->getMinSize()); // Specialize to speed up most common case
+    }
+    else if (!(fields & (fields+1)))   // only true if all the ones are at the lsb end...
+    {
+        unsigned lastField = ffsll(fields+1)-2;
+        if (recInfo.isFixedOffset(lastField) && recInfo.queryType(lastField)->isFixedSize())
+           return new SimpleRowHasher(recInfo, fields, 0, recInfo.queryType(lastField)->getMinSize()); // Specialize to speed up another common case - fixed-size block at start
+    }
+    return new RowHasher(recInfo, fields);
+}
+
+
 #ifdef _USE_CPPUNIT
 #include "unittests.hpp"
 
 class BloomTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(BloomTest);
-      CPPUNIT_TEST(testBloom);
+    CPPUNIT_TEST(testSortedBloom);
+    CPPUNIT_TEST(testUnsortedBloom);
+    CPPUNIT_TEST(testFailedSortedBloomBuilder);
+    CPPUNIT_TEST(testFailedUnsortedBloomBuilder);
     CPPUNIT_TEST_SUITE_END();
 
     const unsigned count = 1000000;
-    void testBloom()
+    void testSortedBloom()
+    {
+        SortedBloomBuilder b(count, 0.01);
+        for (unsigned val = 0; val < count; val++)
+        {
+            b.add(rtlHash64Data(sizeof(val), &val, HASH64_INIT));
+            b.add(rtlHash64Data(sizeof(val), &val, HASH64_INIT));
+        }
+        Owned<const BloomFilter> f = b.build();
+        unsigned falsePositives = 0;
+        unsigned falseNegatives = 0;
+        unsigned start = usTick();
+        for (unsigned val = 0; val < count; val++)
+        {
+            if (!f->test(rtlHash64Data(sizeof(val), &val, HASH64_INIT)))
+                falseNegatives++;
+            if (f->test(rtlHash64Data(sizeof(val), &val, HASH64_INIT+1)))
+                falsePositives++;
+        }
+        unsigned end = usTick();
+        ASSERT(falseNegatives==0);
+        DBGLOG("Bloom filter (%d, %d) gave %d false positives (%.02f %%) in %d uSec", f->queryNumHashes(), f->queryTableSize(), falsePositives, (falsePositives * 100.0)/count, end-start);
+    }
+
+    void testUnsortedBloom()
     {
-        BloomBuilder b;
+        UnsortedBloomBuilder b(count, 0.01);
         for (unsigned val = 0; val < count; val++)
             b.add(rtlHash64Data(sizeof(val), &val, HASH64_INIT));
-        Owned<const BloomFilter> f = b.build(0.01);
+        for (unsigned val = 0; val < count; val++)
+            b.add(rtlHash64Data(sizeof(val), &val, HASH64_INIT));
+        Owned<const BloomFilter> f = b.build();
         unsigned falsePositives = 0;
         unsigned falseNegatives = 0;
         unsigned start = usTick();
@@ -143,6 +450,35 @@ class BloomTest : public CppUnit::TestFixture
         DBGLOG("Bloom filter (%d, %d) gave %d false positives (%.02f %%) in %d uSec", f->queryNumHashes(), f->queryTableSize(), falsePositives, (falsePositives * 100.0)/count, end-start);
     }
 
+    void testFailedSortedBloomBuilder()
+    {
+        SortedBloomBuilder b1(0, 0.01);
+        ASSERT(!b1.valid())
+        ASSERT(!b1.add(0))
+        SortedBloomBuilder b2(1, 0.01);
+        ASSERT(b2.add(1))
+        ASSERT(!b2.add(2))
+        SortedBloomBuilder b3(10, 0.01);
+        ASSERT(b3.add(1))
+        ASSERT(b3.add(0))  // ok to add 0 to sorted bloom tables
+        ASSERT(b3.add(2))
+    }
+
+    void testFailedUnsortedBloomBuilder()
+    {
+        UnsortedBloomBuilder b1(0, 0.01);
+        ASSERT(!b1.valid())
+        ASSERT(!b1.add(0))
+        UnsortedBloomBuilder b2(1, 0.01);
+        ASSERT(b2.add(1))
+        ASSERT(!b2.add(2))
+        UnsortedBloomBuilder b3(10, 0.01);
+        ASSERT(b3.add(1))
+        ASSERT(!b3.add(0))   // Not ok to add hash value 0 to unsorted
+        ASSERT(!b3.add(2))
+    }
+
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( BloomTest );

+ 80 - 10
system/jhtree/bloom.hpp

@@ -19,6 +19,7 @@
 #define _BLOOM_INCL
 
 #include "jhtree.hpp"
+#include "eclhelper.hpp"
 
 /**
  *   A BloomFilter object is used to create or test a Bloom filter - this can be used to quickly determine whether a value has been added to the filter,
@@ -99,17 +100,86 @@ protected:
     byte *table;
 };
 
-class jhtree_decl BloomBuilder
+class jhtree_decl IndexBloomFilter : public BloomFilter
 {
 public:
-    BloomBuilder(unsigned _maxHashes = 1000000);
-    const BloomFilter * build(double probability=0.1) const;
-    bool add(hash64_t hash);
-    inline bool add(size32_t len, const void *val) { return add(rtlHash64Data(len, val, HASH64_INIT)); }
-    bool valid() const;
-protected:
-    ArrayOf<hash64_t> hashes;
-    const unsigned maxHashes;
-    bool isValid;
+    /*
+     * Create a bloom filter with field information.
+     *
+     * @param numHashes  Number of hashes to use for each lookup.
+     * @param tableSize  Size (in bytes) of the table
+     * @param table      Bloom table. Note that the BloomFilter object will take ownership of this memory, so it must be allocated on the heap.
+     * @param fields     Bitmap storing the field indices
+     */
+    IndexBloomFilter(unsigned numHashes, unsigned tableSize, byte *table, __uint64 fields);
+    inline __int64 queryFields() const { return fields; }
+    bool reject(const SegMonitorList &segs) const;
+    static int compare(CInterface *const *a, CInterface *const *b);
+private:
+    const __uint64 fields;
+};
+
+/**
+ *   An IBloomBuilder object is used to store and dedup a set of hash values, then build an optimally-sized bloom table from them
+ */
+
+interface IBloomBuilder : public IInterface
+{
+    /*
+     * Add a hash value to the builder
+     *
+     * @return       True if the value was successfully added
+     */
+    virtual bool add(hash64_t hash) = 0;
+    /*
+     * Check whether the builder is still usable
+     *
+     * @return       True if at least one hash value has been added and the hash limit has not been exceeded
+     */
+    virtual bool valid() const = 0;
+    /*
+     * Build a bloom filter from the hash values added so far
+     *
+     * @return       A newly-created filter, or nullptr if the builder is not valid
+     */
+    virtual const BloomFilter * build() const = 0;
+    /*
+     * Number of unique hashes added
+     *
+     * @return       Count
+     */
+    virtual unsigned queryCount() const = 0;
+};
+
+/**
+ * Create a BloomBuilder object from (compiler-generated) information
+ */
+
+extern jhtree_decl IBloomBuilder *createBloomBuilder(const IBloomBuilderInfo &_helper);
+
+interface IRowHasher : public IInterface
+{
+    virtual hash64_t hash(const byte *row) const = 0;
+    virtual bool isExact(const SegMonitorList &segs) const = 0;
+    virtual __uint64 queryFields() const = 0;
 };
+
+/**
+ * Create a RowHasher object from (compiler-generated) information
+ * @param recInfo  Record metadata information - needs to have a lifetime longer than the created hasher object
+ * @param fields   Bitmap containing field numbers
+ * return          New row hasher object
+ */
+extern jhtree_decl IRowHasher * createRowHasher(const RtlRecord &recInfo, __uint64 _fields);
+
+/**
+ * Retrieve bloom/partition hash corresponding to a supplied filter condition
+ * @param fields   Bitmap containing field numbers
+ * @param segs     Filter to be checked
+ * @param hashval  Initial hash value, updated to reflect the supplied fields
+ * return          true if the filter is suitable for bloom filtering/partitioning via returned hash value
+ */
+extern jhtree_decl bool getBloomHash(__int64 fields, const SegMonitorList &segs, hash64_t &hashval);
+
+
 #endif

+ 55 - 9
system/jhtree/ctfile.cpp

@@ -81,9 +81,7 @@ inline void SwapBigEndian(KeyHdr &hdr)
     _WINREV(hdr.blobHead);
     _WINREV(hdr.metadataHead);
     _WINREV(hdr.bloomHead);
-    _WINREV(hdr.bloomTableSize);
-    _WINREV(hdr.bloomKeyLength);
-    _WINREV(hdr.bloomTableHashes);
+    _WINREV(hdr.partitionFieldMask);
 }
 
 inline void SwapBigEndian(NodeHdr &hdr)
@@ -456,12 +454,40 @@ CBloomFilterWriteNode::CBloomFilterWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) :
 size32_t CBloomFilterWriteNode::set(const byte * &data, size32_t &size)
 {
     assertex(fpos);
-    unsigned short written = ((size > (maxBytes-sizeof(unsigned short))) ? (maxBytes-sizeof(unsigned short)) : size);
+    unsigned short written;
+    _WINCPYREV2(&written, keyPtr);
+
+    unsigned short writtenThisTime = ((size > (maxBytes-written-sizeof(unsigned short))) ? (maxBytes-written-sizeof(unsigned short)) : size);
+    memcpy(keyPtr+sizeof(unsigned short)+written, data, writtenThisTime);
+    data += writtenThisTime;
+    size -= writtenThisTime;
+    written += writtenThisTime;
+    _WINCPYREV2(keyPtr, &written);
+    return writtenThisTime;
+}
+
+void CBloomFilterWriteNode::put4(unsigned val)
+{
+    assert(sizeof(val)==4);
+    unsigned short written;
+    _WINCPYREV2(&written, keyPtr);
+
+    assertex(written + sizeof(val) + sizeof(unsigned short) <= maxBytes);
+    _WINCPYREV4(keyPtr+sizeof(unsigned short)+written, &val);
+    written += sizeof(val);
+    _WINCPYREV2(keyPtr, &written);
+}
+
+void CBloomFilterWriteNode::put8(__int64 val)
+{
+    assert(sizeof(val)==8);
+    unsigned short written;
+    _WINCPYREV2(&written, keyPtr);
+
+    assertex(written + sizeof(val) + sizeof(unsigned short) <= maxBytes);
+    _WINCPYREV8(keyPtr+sizeof(unsigned short)+written, &val);
+    written += sizeof(val);
     _WINCPYREV2(keyPtr, &written);
-    memcpy(keyPtr+sizeof(unsigned short), data, written);
-    data += written;
-    size -= written;
-    return written;
 }
 
 //=========================================================================================================
@@ -1030,7 +1056,27 @@ void CJHTreeMetadataNode::get(StringBuffer & out)
 
 void CJHTreeBloomTableNode::get(MemoryBuffer & out)
 {
-    out.append(expandedSize, keyBuf);
+    out.append(expandedSize-read, keyBuf + read);
+}
+
+__int64 CJHTreeBloomTableNode::get8()
+{
+    __int64 ret = 0;
+    assert(sizeof(ret)==8);
+    assertex(expandedSize >= read + sizeof(ret));
+    _WINCPYREV8(&ret, keyBuf + read);
+    read += sizeof(ret);
+    return ret;
+}
+
+unsigned CJHTreeBloomTableNode::get4()
+{
+    unsigned ret = 0;
+    assert(sizeof(ret)==4);
+    assertex(expandedSize >= read + sizeof(ret));
+    _WINCPYREV4(&ret, keyBuf + read);
+    read += sizeof(ret);
+    return ret;
 }
 
 

+ 22 - 3
system/jhtree/ctfile.hpp

@@ -97,9 +97,7 @@ struct __declspec(novtable) jhtree_decl KeyHdr
     __int64 blobHead; /* fpos of first blob node f0x */
     __int64 metadataHead; /* fpos of first metadata node f8x */
     __int64 bloomHead; /* fpos of bloom table data, if present 100x */
-    uint32_t bloomTableSize;  /* Size in bytes of bloom table 108x */
-    unsigned short bloomKeyLength; /* Length of bloom keyed fields 11cx */
-    unsigned short bloomTableHashes; /* Number of hashes in bloom table 11ex */
+    __uint64 partitionFieldMask; /* Bitmap indicating partition keyed fields */
 };
 
 //#pragma pack(1)
@@ -155,6 +153,21 @@ public:
     inline static size32_t getSize() { return sizeof(KeyHdr); }
     inline unsigned getNodeSize() { return hdr.nodeSize; }
     inline bool hasSpecialFileposition() const { return true; }
+    __uint64 getPartitionFieldMask()
+    {
+        if (hdr.partitionFieldMask == (__uint64) -1)
+            return 0;
+        else
+            return hdr.partitionFieldMask;
+    }
+    unsigned numPartitions()
+    {
+        if (hdr.ktype & HTREE_TOPLEVEL_KEY)
+            return (unsigned) hdr.nument-1;
+        else
+            return 0;
+    }
+
 };
 
 class jhtree_decl CNodeBase : public CInterface
@@ -271,6 +284,10 @@ public:
     virtual int compareValueAt(const char *src, unsigned int index) const {throwUnexpected();}
     virtual void dump() {throwUnexpected();}
     void get(MemoryBuffer & out);
+    __int64 get8();
+    unsigned get4();
+private:
+    unsigned read = 0;
 };
 
 class jhtree_decl CNodeHeader : public CNodeBase
@@ -341,6 +358,8 @@ class jhtree_decl CBloomFilterWriteNode : public CWriteNodeBase
 public:
     CBloomFilterWriteNode(offset_t _fpos, CKeyHdr *keyHdr);
     size32_t set(const byte * &data, size32_t &size);
+    void put4(unsigned val);
+    void put8(__int64 val);
 };
 
 enum KeyExceptionCodes

+ 95 - 65
system/jhtree/jhtree.cpp

@@ -116,17 +116,21 @@ size32_t SegMonitorList::getSize() const
         return 0;
 }
 
-bool SegMonitorList::isExact(unsigned len) const
+bool SegMonitorList::isExact(unsigned len, unsigned start) const
 {
     ForEachItemIn(idx, segMonitors)
     {
         IKeySegmentMonitor &seg = segMonitors.item(idx);
-        if (seg.getOffset() >= len)
+        unsigned o = seg.getOffset();
+        unsigned s = seg.getSize();
+        if (o+s <= start)
+            continue;
+        if (o >= start+len)
             return true;
         if (!seg.isWellKeyed())
             return false;
     }
-    return false;
+    return true;
 }
 
 void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname)
@@ -327,10 +331,11 @@ protected:
     IContextLogger *ctx;
     SegMonitorList segs;
     IKeyCursor *keyCursor;
+    __uint64 partitionFieldMask = 0;
+    unsigned indexParts = 0;
     char *keyBuffer;
     unsigned keySize;       // size of key record including payload
     unsigned keyedSize;     // size of non-payload part of key
-    unsigned bloomLength = 0;   // size that can be prechecked using bloom filter
     unsigned numsegs;
     bool matched = false;
     bool eof = false;
@@ -540,6 +545,9 @@ public:
             assertex(_key->numParts()==1);
             IKeyIndex *ki = _key->queryPart(0);
             keyCursor = ki->getCursor(ctx);
+            partitionFieldMask = ki->getPartitionFieldMask();
+            indexParts = ki->numPartitions();
+
             keyName.set(ki->queryFileName());
             if (!keyBuffer)
             {
@@ -554,17 +562,26 @@ public:
                     throw e;
                 }
                 keyBuffer = (char *) malloc(keySize);
-                bloomLength = ki->getBloomKeyLength();
             }
             else
             {
                 assertex(keyedSize==ki->keyedSize());
                 assertex(keySize==ki->keySize());
-                assertex(bloomLength==ki->getBloomKeyLength());
             }
         }
     }
 
+    virtual unsigned getPartition() override
+    {
+        if (partitionFieldMask)
+        {
+            hash64_t hash = HASH64_INIT;
+            if (getBloomHash(partitionFieldMask, segs, hash))
+                return (hash % indexParts) + 1;
+        }
+        return 0;
+    }
+
     virtual void setChooseNLimit(unsigned __int64 _rowLimit) override
     {
         // TODO ?
@@ -583,9 +600,12 @@ public:
             if (!crappyHack)
             {
                 matched = false;
-                eof = false;
-                setLow(0);
-                keyCursor->reset();
+                eof = keyCursor->bloomFilterReject(segs);
+                if (!eof)
+                {
+                    setLow(0);
+                    keyCursor->reset();
+                }
             }
         }
     }
@@ -654,7 +674,6 @@ public:
         unsigned lscans = 0;
         while (!eof)
         {
-            bool canMatch = true;
             if (matched)
             {
                 if (!keyCursor->next(keyBuffer))
@@ -663,33 +682,21 @@ public:
             }
             else
             {
-                if (exact && bloomLength && segs.isExact(bloomLength))
-                {
-                    hash64_t hash = rtlHash64Data(bloomLength, keyBuffer, HASH64_INIT);
-                    if (!keyCursor->checkBloomFilter(hash))
-                        canMatch = false;
-                }
-                if (canMatch)
-                {
-                    if (!keyCursor->gtEqual(keyBuffer, keyBuffer, true))
-                        eof = true;
-                    lseeks++;
-                }
+                if (!keyCursor->gtEqual(keyBuffer, keyBuffer, true))
+                    eof = true;
+                lseeks++;
             }
             if (!eof)
             {
                 unsigned i = 0;
-                if (canMatch)
+                matched = true;
+                if (segs.segMonitors.length())
                 {
-                    matched = true;
-                    if (segs.segMonitors.length())
+                    for (; i <= lastSeg; i++)
                     {
-                        for (; i <= lastSeg; i++)
-                        {
-                            matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
-                            if (!matched)
-                                break;
-                        }
+                        matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
+                        if (!matched)
+                            break;
                     }
                 }
                 if (matched)
@@ -1341,7 +1348,7 @@ void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
         }
     }
     rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
-    loadBloomFilter();
+    loadBloomFilters();
 }
 
 CKeyIndex::~CKeyIndex()
@@ -1467,6 +1474,16 @@ bool CKeyIndex::isFullySorted()
     return (keyHdr->getKeyType() & HTREE_FULLSORT_KEY) != 0;
 }
 
+__uint64 CKeyIndex::getPartitionFieldMask()
+{
+    return keyHdr->getPartitionFieldMask();
+}
+unsigned CKeyIndex::numPartitions()
+{
+    return keyHdr->numPartitions();
+}
+
+
 IKeyCursor *CKeyIndex::getCursor(IContextLogger *ctx)
 {
     return new CKeyCursor(*this, ctx);      // MORE - pool them?
@@ -1573,34 +1590,48 @@ offset_t CKeyIndex::queryMetadataHead()
     return ret;
 }
 
-void CKeyIndex::loadBloomFilter()
+void CKeyIndex::loadBloomFilters()
 {
     offset_t bloomAddr = keyHdr->getHdrStruct()->bloomHead;
     if (!bloomAddr || bloomAddr == static_cast<offset_t>(-1))
         return; // indexes created before introduction of bloomfilter would have FFFF... in this space
-    uint32_t bloomTableSize = keyHdr->getHdrStruct()->bloomTableSize;
-    unsigned short bloomTableHashes = keyHdr->getHdrStruct()->bloomTableHashes;
-    MemoryBuffer bloomTable;
-    bloomTable.ensureCapacity(bloomTableSize);
+
     while (bloomAddr)
     {
         Owned<CJHTreeNode> node = loadNode(bloomAddr);
         assertex(node->isBloom());
-        static_cast<CJHTreeBloomTableNode *>(node.get())->get(bloomTable);
-        bloomAddr = node->getRightSib();
+        CJHTreeBloomTableNode &bloomNode = *static_cast<CJHTreeBloomTableNode *>(node.get());
+        bloomAddr = bloomNode.get8();
+        unsigned numHashes = bloomNode.get4();
+        __uint64 fields =  bloomNode.get8();
+        unsigned bloomTableSize = bloomNode.get4();
+        MemoryBuffer bloomTable;
+        bloomTable.ensureCapacity(bloomTableSize);
+        for (;;)
+        {
+            static_cast<CJHTreeBloomTableNode *>(node.get())->get(bloomTable);
+            offset_t next = node->getRightSib();
+            if (!next)
+                break;
+            node.setown(loadNode(next));
+            assertex(node->isBloom());
+        }
+        assertex(bloomTable.length()==bloomTableSize);
+        //DBGLOG("Creating bloomfilter(%d, %d) for fields %" I64F "x",numHashes, bloomTableSize, fields);
+        bloomFilters.append(*new IndexBloomFilter(numHashes, bloomTableSize, (byte *) bloomTable.detach(), fields));
     }
-    assertex(bloomTable.length()==bloomTableSize);
-    bloomFilter.setown(new BloomFilter(bloomTableHashes, bloomTableSize, (byte *) bloomTable.detach()));
+    bloomFilters.sort(IndexBloomFilter::compare);
 }
 
-const BloomFilter * CKeyIndex::queryBloomFilter()
+bool CKeyIndex::bloomFilterReject(const SegMonitorList &segs) const
 {
-    return bloomFilter;
-}
-
-unsigned CKeyIndex::getBloomKeyLength()
-{
-    return keyHdr->getHdrStruct()->bloomKeyLength;
+    ForEachItemIn(idx, bloomFilters)
+    {
+        IndexBloomFilter &filter = bloomFilters.item(idx);
+        if (filter.reject(segs))
+            return true;
+    }
+    return false;
 }
 
 IPropertyTree * CKeyIndex::getMetadata()
@@ -1635,8 +1666,6 @@ CKeyCursor::CKeyCursor(CKeyIndex &_key, IContextLogger *_ctx)
     : key(_key), ctx(_ctx)
 {
     key.Link();
-    bloomFilter = key.queryBloomFilter();
-    bloomLength = key.getBloomKeyLength();
     nodeKey = 0;
 }
 
@@ -1651,16 +1680,6 @@ void CKeyCursor::reset()
     node.clear();
 }
 
-bool CKeyCursor::checkBloomFilter(hash64_t hash)
-{
-    return bloomFilter != nullptr && bloomFilter->test(hash);
-}
-
-unsigned CKeyCursor::getBloomKeyLength()
-{
-    return bloomLength;
-}
-
 CJHTreeNode *CKeyCursor::locateFirstNode()
 {
     CJHTreeNode * n = 0;
@@ -1975,6 +1994,12 @@ void CKeyCursor::releaseBlobs()
     activeBlobs.kill();
 }
 
+bool CKeyCursor::bloomFilterReject(const SegMonitorList &segs) const
+{
+    return key.bloomFilterReject(segs);
+}
+
+
 class CLazyKeyIndex : implements IKeyIndex, public CInterface
 {
     StringAttr keyfile;
@@ -2020,8 +2045,10 @@ public:
     virtual size32_t keySize() { return checkOpen().keySize(); }
     virtual size32_t keyedSize() { return checkOpen().keyedSize(); }
     virtual bool hasPayload() { return checkOpen().hasPayload(); }
-    virtual bool isTopLevelKey() { return checkOpen().isTopLevelKey(); }
-    virtual bool isFullySorted() { return checkOpen().isFullySorted(); }
+    virtual bool isTopLevelKey() override { return checkOpen().isTopLevelKey(); }
+    virtual bool isFullySorted() override { return checkOpen().isFullySorted(); }
+    virtual __uint64 getPartitionFieldMask() { return checkOpen().getPartitionFieldMask(); }
+    virtual unsigned numPartitions() { return checkOpen().numPartitions(); }
     virtual unsigned getFlags() { return checkOpen().getFlags(); }
     virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw) { checkOpen().dumpNode(out, pos, count, isRaw); }
     virtual unsigned numParts() { return 1; }
@@ -2034,8 +2061,6 @@ public:
     virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
-    virtual const BloomFilter * queryBloomFilter() { return checkOpen().queryBloomFilter(); }
-    virtual unsigned getBloomKeyLength() { return checkOpen().getBloomKeyLength(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
     virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
     virtual bool hasSpecialFileposition() const { return realKey ? realKey->hasSpecialFileposition() : false; }
@@ -2458,6 +2483,11 @@ public:
         sortFromSeg = 0;
     }
 
+    virtual unsigned getPartition() override
+    {
+        return 0;   // If all keys share partition info (is that required?) then we can do better
+    }
+
     virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
     {
         // Rather like a lookup, except that no records below the value indicated by seek* should be returned.
@@ -3094,7 +3124,7 @@ class IKeyManagerTest : public CppUnit::TestFixture
         Owned<IFileIOStream> out = createIOStream(io);
         unsigned maxRecSize = variable ? 18 : 10;
         unsigned keyedSize = 10;
-        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0, keyedSize, true);
+        Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |  (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0, nullptr, true, false);
 
         char keybuf[18];
         memset(keybuf, '0', 18);

+ 6 - 5
system/jhtree/jhtree.hpp

@@ -30,6 +30,7 @@
 #include "errorlist.h"
 
 class BloomFilter;
+class SegMonitorList;
 
 interface jhtree_decl IDelayedFile : public IInterface
 {
@@ -52,8 +53,7 @@ interface jhtree_decl IKeyCursor : public IInterface
     virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize) = 0;
     virtual void releaseBlobs() = 0;
     virtual void reset() = 0;
-    virtual bool checkBloomFilter(hash64_t hash) = 0;
-    virtual unsigned getBloomKeyLength() = 0;
+    virtual bool bloomFilterReject(const SegMonitorList &segs) const = 0;  // returns true if record cannot possibly match
 };
 
 interface IKeyIndex;
@@ -71,6 +71,8 @@ interface jhtree_decl IKeyIndex : public IKeyIndexBase
     virtual size32_t keySize() = 0;
     virtual bool isFullySorted() = 0;
     virtual bool isTopLevelKey() = 0;
+    virtual __uint64 getPartitionFieldMask() = 0;
+    virtual unsigned numPartitions() = 0;
     virtual unsigned getFlags() = 0;
     virtual void dumpNode(FILE *out, offset_t pos, unsigned rowCount, bool isRaw) = 0;
     virtual unsigned queryScans() = 0;
@@ -83,8 +85,6 @@ interface jhtree_decl IKeyIndex : public IKeyIndexBase
     virtual offset_t queryLatestGetNodeOffset() const = 0;
     virtual offset_t queryMetadataHead() = 0;
     virtual IPropertyTree * getMetadata() = 0;
-    virtual const BloomFilter * queryBloomFilter() = 0;
-    virtual unsigned getBloomKeyLength() = 0;
 
     virtual unsigned getNodeSize() = 0;
     virtual const IFileIO *queryFileIO() const = 0;
@@ -189,7 +189,7 @@ public:
     unsigned lastFullSeg() const;
     bool matched(void *keyBuffer, unsigned &lastMatch) const;
     size32_t getSize() const;
-    bool isExact(unsigned bytes) const;  // Are first N bytes an exact match ?
+    bool isExact(unsigned length, unsigned start) const;  // Are corresponding bytes an exact match ?
     void checkSize(size32_t keyedSize, char const * keyname);
     void recalculateCache();
     void finish(size32_t keyedSize);
@@ -236,6 +236,7 @@ interface IKeyManager : public IInterface, extends IIndexReadContext
     virtual void finishSegmentMonitors() = 0;
 
     virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) = 0;
+    virtual unsigned getPartition() = 0;  // Use PARTITION() to retrieve partno, if possible, or zero to mean read all
 };
 
 inline offset_t extractFpos(IKeyManager * manager)

+ 11 - 10
system/jhtree/jhtree.ipp

@@ -24,6 +24,7 @@
 #include "ctfile.hpp"
 
 #include "jhtree.hpp"
+#include "bloom.hpp"
 
 typedef OwningStringHTMapping<IKeyIndex> CKeyIndexMapping;
 typedef OwningStringSuperHashTableOf<CKeyIndexMapping> CKeyIndexTable;
@@ -77,7 +78,7 @@ protected:
     StringAttr name;
     CriticalSection blobCacheCrit;
     Owned<CJHTreeBlobNode> cachedBlobNode;
-    Owned<BloomFilter> bloomFilter;
+    CIArrayOf<IndexBloomFilter> bloomFilters;
     offset_t cachedBlobNodePos;
 
     CKeyHdr *keyHdr;
@@ -96,7 +97,7 @@ protected:
     ~CKeyIndex();
     void init(KeyHdr &hdr, bool isTLK, bool allowPreload);
     void cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK);
-    void loadBloomFilter();
+    void loadBloomFilters();
     
 public:
     IMPLEMENT_IINTERFACE;
@@ -108,8 +109,10 @@ public:
     virtual size32_t keySize();
     virtual bool hasPayload();
     virtual size32_t keyedSize();
-    virtual bool isTopLevelKey();
-    virtual bool isFullySorted();
+    virtual bool isTopLevelKey() override;
+    virtual bool isFullySorted() override;
+    virtual __uint64 getPartitionFieldMask() override;
+    virtual unsigned numPartitions() override;
     virtual unsigned getFlags() { return (unsigned char)keyHdr->getKeyType(); };
 
     virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw);
@@ -124,8 +127,9 @@ public:
     virtual offset_t queryLatestGetNodeOffset() const { return latestGetNodeOffset; }
     virtual offset_t queryMetadataHead();
     virtual IPropertyTree * getMetadata();
-    virtual const BloomFilter * queryBloomFilter();
-    virtual unsigned getBloomKeyLength();
+
+    bool bloomFilterReject(const SegMonitorList &segs) const;
+
     virtual unsigned getNodeSize() { return keyHdr->getNodeSize(); }
     virtual bool hasSpecialFileposition() const;
  
@@ -166,8 +170,6 @@ class jhtree_decl CKeyCursor : public IKeyCursor, public CInterface
 private:
     IContextLogger *ctx;
     CKeyIndex &key;
-    const BloomFilter *bloomFilter = nullptr;
-    unsigned bloomLength = 0;
     Owned<CJHTreeNode> node;
     unsigned int nodeKey;
     ConstPointerArray activeBlobs;
@@ -194,8 +196,7 @@ public:
     virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize);
     virtual void releaseBlobs();
     virtual void reset();
-    virtual bool checkBloomFilter(hash64_t hash);
-    virtual unsigned getBloomKeyLength();
+    virtual bool bloomFilterReject(const SegMonitorList &segs) const override;  // returns true if record cannot possibly match
 };
 
 

+ 55 - 32
system/jhtree/keybuild.cpp

@@ -16,6 +16,7 @@
 ############################################################################## */
 
 #include "keybuild.hpp"
+#include "eclhelper.hpp"
 #include "bloom.hpp"
 #include "jmisc.hpp"
 
@@ -337,27 +338,39 @@ private:
     CWriteNode *activeNode;
     CBlobWriteNode *activeBlobNode;
     unsigned __int64 duplicateCount;
-    unsigned bloomKeyLength = 0;
-    BloomBuilder bloomBuilder;
-    byte *lastBloomKeyData = nullptr;
+    __uint64 partitionFieldMask = 0;
+    IArrayOf<IBloomBuilder> bloomBuilders;
+    IArrayOf<IRowHasher> rowHashers;
     bool enforceOrder = true;
+    bool isTLK = false;
 
 public:
     IMPLEMENT_IINTERFACE;
 
-    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence, unsigned _bloomKeyLength, bool _enforceOrder)
-        : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence), bloomKeyLength(_bloomKeyLength), enforceOrder(_enforceOrder)
+    CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence,  IHThorIndexWriteArg *_helper, bool _enforceOrder, bool _isTLK)
+        : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence),
+          enforceOrder(_enforceOrder),
+          isTLK(_isTLK)
     {
         doCrc = true;
         activeNode = NULL;
         activeBlobNode = NULL;
         duplicateCount = 0;
-        if (bloomKeyLength)
-            lastBloomKeyData = (byte *) calloc(bloomKeyLength, 1);
-    }
-    ~CKeyBuilder()
-    {
-        free(lastBloomKeyData);
+        if (_helper)
+        {
+            partitionFieldMask = _helper->getPartitionFieldMask();
+            auto bloomInfo =_helper->queryBloomInfo();
+            if (bloomInfo)
+            {
+                const RtlRecord &recinfo = _helper->queryDiskRecordSize()->queryRecordAccessor(true);
+                while (*bloomInfo)
+                {
+                    bloomBuilders.append(*createBloomBuilder(*bloomInfo[0]));
+                    rowHashers.append(*createRowHasher(recinfo, bloomInfo[0]->getBloomFields()));
+                    bloomInfo++;
+                }
+            }
+        }
     }
 public:
     void finish(IPropertyTree * metadata, unsigned * fileCrc)
@@ -381,11 +394,16 @@ public:
             toXML(metadata, metaXML);
             writeMetadata(metaXML.str(), metaXML.length());
         }
-        if (bloomBuilder.valid())
+        ForEachItemIn(idx, bloomBuilders)
         {
-            Owned<const BloomFilter> filter = bloomBuilder.build();
-            writeBloomFilter(*filter, bloomKeyLength);
+            IBloomBuilder &bloomBuilder = bloomBuilders.item(idx);
+            if (bloomBuilder.valid())
+            {
+                Owned<const BloomFilter> filter = bloomBuilder.build();
+                writeBloomFilter(*filter, rowHashers.item(idx).queryFields());
+            }
         }
+        keyHdr->getHdrStruct()->partitionFieldMask = partitionFieldMask;
         CRC32 headerCrc;
         writeFileHeader(false, &headerCrc);
 
@@ -414,12 +432,10 @@ public:
     void processKeyData(const char *keyData, offset_t pos, size32_t recsize)
     {
         records++;
-        bool firstRow = false;
         if (NULL == activeNode)
         {
             activeNode = new CWriteNode(nextPos, keyHdr, true);
             nextPos += keyHdr->getNodeSize();
-            firstRow = true;
         }
         else if (enforceOrder) // NB: order is indeterminate when build a TLK for a LOCAL index. duplicateCount is not calculated in this case.
         {
@@ -429,15 +445,17 @@ public:
             if (cmp==0)
                 ++duplicateCount;
         }
-        if (bloomKeyLength)
+        if (!isTLK)
         {
-            int cmp = memcmp(keyData, lastBloomKeyData, bloomKeyLength);
-            if (firstRow || cmp)
+            ForEachItemInRev(idx, bloomBuilders)
             {
-                memcpy(lastBloomKeyData, keyData, bloomKeyLength);
-                hash64_t hash = rtlHash64Data(bloomKeyLength, keyData, HASH64_INIT);
-                if (!bloomBuilder.add(hash))
-                    bloomKeyLength = 0;
+                IBloomBuilder &bloomBuilder = bloomBuilders.item(idx);
+                IRowHasher &hasher = rowHashers.item(idx);
+                if (!bloomBuilder.add(hasher.hash((const byte *) keyData)))
+                {
+                    bloomBuilders.remove(idx);
+                    rowHashers.remove(idx);
+                }
             }
         }
         if (!activeNode->add(pos, keyData, recsize, sequence))
@@ -518,21 +536,23 @@ protected:
         writeNode(prevNode);
     }
 
-    void writeBloomFilter(const BloomFilter &filter, unsigned bloomKeyLength)
+    void writeBloomFilter(const BloomFilter &filter, __uint64 fields)
     {
-        assertex(keyHdr->getHdrStruct()->bloomHead == 0);
         size32_t size = filter.queryTableSize();
         if (!size)
             return;
+        auto prevBloom = keyHdr->getHdrStruct()->bloomHead;
         keyHdr->getHdrStruct()->bloomHead = nextPos;
-        keyHdr->getHdrStruct()->bloomKeyLength = bloomKeyLength;
-        keyHdr->getHdrStruct()->bloomTableSize = size;
-        keyHdr->getHdrStruct()->bloomTableHashes = filter.queryNumHashes();
-        const byte *data = filter.queryTable();
         Owned<CBloomFilterWriteNode> prevNode;
+        Owned<CBloomFilterWriteNode> node(new CBloomFilterWriteNode(nextPos, keyHdr));
+        // Table info is serialized into first page. Note that we assume that it fits (would need to have a crazy-small page size for that to not be true)
+        node->put8(prevBloom);
+        node->put4(filter.queryNumHashes());
+        node->put8(fields);
+        node->put4(size);
+        const byte *data = filter.queryTable();
         while (size)
         {
-            Owned<CBloomFilterWriteNode> node(new CBloomFilterWriteNode(nextPos, keyHdr));
             nextPos += keyHdr->getNodeSize();
             size32_t written = node->set(data, size);
             assertex(written);
@@ -543,14 +563,17 @@ protected:
                 writeNode(prevNode);
             }
             prevNode.setown(node.getClear());
+            if (!size)
+                break;
+            node.setown(new CBloomFilterWriteNode(nextPos, keyHdr));
         }
         writeNode(prevNode);
     }
 };
 
-extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, unsigned bloomKeySize, bool enforceOrder)
+extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, IHThorIndexWriteArg *helper, bool enforceOrder, bool isTLK)
 {
-    return new CKeyBuilder(_out, flags, rawSize, nodeSize, keyFieldSize, startSequence, bloomKeySize, enforceOrder);
+    return new CKeyBuilder(_out, flags, rawSize, nodeSize, keyFieldSize, startSequence, helper, enforceOrder, isTLK);
 }
 
 

+ 2 - 1
system/jhtree/keybuild.hpp

@@ -19,6 +19,7 @@
 #define KEYBUILD_HPP
 
 #include "ctfile.hpp"
+#include "eclhelper.hpp"
 
 class CNodeInfo : implements serializable, public CInterface
 {
@@ -101,7 +102,7 @@ interface IKeyBuilder : public IInterface
     virtual unsigned __int64 getDuplicateCount() = 0;
 };
 
-extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, unsigned bloomKeyLength, bool enforceOrder);
+extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, IHThorIndexWriteArg *helper, bool enforceOrder, bool isTLK);
 
 interface IKeyDesprayer : public IInterface
 {

+ 1 - 1
system/jhtree/keydiff.cpp

@@ -417,7 +417,7 @@ public:
             flags |= HTREE_VARSIZE;
         if(quickCompressed)
             flags |= HTREE_QUICK_COMPRESSED_KEY;
-        keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0, keyedsize, false)); // MORE - support for sequence other than 0...
+        keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0, nullptr, false, false)); // MORE - support for sequence other than 0...
     }
 
     ~CKeyWriter()

+ 13 - 0
system/jlib/jset.hpp

@@ -126,6 +126,19 @@ inline unsigned getMostSignificantBit(unsigned value)
 #endif
 }
 
+#if defined (_WIN32)
+// These are standard in glibc
+inline int ffsll(__uint64 i)
+{
+    return i ? countTrailingUnsetBits(i) + 1 : 0;
+}
+
+inline int ffs(unsigned i)
+{
+    return i ? countTrailingUnsetBits(i) + 1 : 0;
+}
+#endif
+
 interface jlib_decl IBitSet : public IInterface 
 {
     virtual void set(unsigned n,bool val=true)      = 0;

+ 1 - 1
testing/regress/ecl/setup/setup.ecl

@@ -123,7 +123,7 @@ IF (createMultiPart,
         buildindex(LocalFiles.DG_NormalIndexFileEvens,overwrite,NOROOT,SET('_nodeSize', 512)),
         buildindex(LocalFiles.DG_TransIndexFile,overwrite,NOROOT),
         buildindex(LocalFiles.DG_TransIndexFileEvens,overwrite,NOROOT),
-        buildindex(LocalFiles.DG_NormalVarIndex, overwrite,NOROOT);
+        buildindex(LocalFiles.DG_NormalVarIndex, overwrite,NOROOT,PARTITION(DG_firstname));
         buildindex(LocalFiles.DG_TransVarIndex, overwrite,NOROOT);
    )
 );

+ 4 - 0
thorlcr/activities/indexwrite/thindexwrite.cpp

@@ -259,7 +259,11 @@ public:
             }
             props.setPropInt("@formatCrc", helper->getFormatCrc());
             if (isLocal)
+            {
                 props.setPropBool("@local", true);
+                if (helper->getPartitionFieldMask())
+                    props.setPropInt64("@partitionFieldMask", helper->getPartitionFieldMask());
+            }
             container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
             if (!dlfn.isExternal())
                 queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc);

+ 3 - 1
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -174,10 +174,12 @@ public:
             flags |= HTREE_FULLSORT_KEY;
         if (isVariable)
             flags |= HTREE_VARSIZE;
+        if (isTlk)
+            flags |= HTREE_TOPLEVEL_KEY;
         buildUserMetadata(metadata);                
         buildLayoutMetadata(metadata);
         unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
-        builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, isTlk ? 0 : helper->getBloomKeyLength(), !isTlk));
+        builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, helper, !isTlk, isTlk));
     }