Forráskód Böngészése

HPCC-19070 Add support to base classes for substring matches on fields.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 7 éve
szülő
commit
eb5dc33848

+ 1 - 75
rtl/eclrtl/rtldynfield.cpp

@@ -1165,7 +1165,7 @@ private:
                         break;
                     }
                     case match_typecast:
-                        offset = translateScalar(builder, offset, field, type, sourceType, source);
+                        offset = translateScalar(builder, offset, field, *type, *sourceType, source);
                         break;
                     case match_link:
                     {
@@ -1295,80 +1295,6 @@ private:
         }
     } *matchInfo;
 
-    static size32_t translateScalar(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, const RtlTypeInfo *destType, const RtlTypeInfo *sourceType, const byte *source)
-    {
-        // This code COULD move into rtlfield.cpp?
-        switch(destType->getType())
-        {
-        case type_boolean:
-        case type_int:
-        case type_swapint:
-        case type_packedint:
-        case type_filepos:
-            offset = destType->buildInt(builder, offset, field, sourceType->getInt(source));
-            break;
-        case type_real:
-            offset = destType->buildReal(builder, offset, field, sourceType->getReal(source));
-            break;
-        case type_decimal:  // Go via string - not common enough to special-case
-        case type_data:
-        case type_string:
-        case type_varstring:
-        case type_qstring:
-        {
-            size32_t size;
-            rtlDataAttr text;
-            sourceType->getString(size, text.refstr(), source);
-            offset = destType->buildString(builder, offset, field, size, text.getstr());
-            break;
-        }
-        case type_unicode:
-        case type_varunicode:
-        case type_utf8:
-        {
-            size32_t utf8chars;
-            rtlDataAttr utf8Text;
-            sourceType->getUtf8(utf8chars, utf8Text.refstr(), source);
-            offset = destType->buildUtf8(builder, offset, field, utf8chars, utf8Text.getstr());
-            break;
-        }
-        case type_set:
-        {
-            bool isAll = *(bool *) source;
-            source+= sizeof(bool);
-            byte *dest = builder.ensureCapacity(offset+sizeof(bool)+sizeof(size32_t), field->name)+offset;
-            *(size32_t *) (dest + sizeof(bool)) = 0; // Patch later when size known
-            offset += sizeof(bool) + sizeof(size32_t);
-            if (isAll)
-            {
-                *(bool*) dest = true;
-            }
-            else
-            {
-                *(bool*) dest = false;
-                size32_t sizeOffset = offset - sizeof(size32_t);  // Where we need to patch
-                size32_t childSize = *(size32_t *)source;
-                source += sizeof(size32_t);
-                const byte *initialSource = source;
-                size32_t initialOffset = offset;
-                const RtlTypeInfo *destChildType = destType->queryChildType();
-                const RtlTypeInfo *sourceChildType = sourceType->queryChildType();
-                while ((size_t)(source - initialSource) < childSize)
-                {
-                    offset = translateScalar(builder, offset, field, destChildType, sourceChildType, source);
-                    source += sourceChildType->size(source, nullptr); // MORE - shame to repeat a calculation that the translate above almost certainly just did
-                }
-                dest = builder.getSelf() + sizeOffset;  // Note - man have been moved by reallocs since last calculated
-                *(size32_t *)dest = offset - initialOffset;
-            }
-            break;
-        }
-        default:
-            throwUnexpected();
-        }
-        return offset;
-    }
-
     size32_t estimateNewSize(const RtlRow &sourceRow) const
     {
         //DBGLOG("Source record size is %d", (int) sourceRow.getRecordSize());

+ 93 - 0
rtl/eclrtl/rtlfield.cpp

@@ -4213,6 +4213,99 @@ unsigned ECLRTL_API countFields(const RtlFieldInfo * const * fields)
     return cnt;
 }
 
+//-------------------------------------------------------------------------------------------------------------------
+
+static const unsigned sameTypeMask = RFTMkind|RFTMebcdic;
+size32_t translateScalar(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, const RtlTypeInfo &destType, const RtlTypeInfo &sourceType, const byte *source)
+{
+    switch(destType.getType())
+    {
+    case type_boolean:
+    case type_int:
+    case type_swapint:
+    case type_packedint:
+    case type_filepos:
+        offset = destType.buildInt(builder, offset, field, sourceType.getInt(source));
+        break;
+    case type_real:
+        offset = destType.buildReal(builder, offset, field, sourceType.getReal(source));
+        break;
+    case type_data:
+    case type_string:
+        //Special case if source and destination types are identical to avoid cloning strings
+        if ((destType.fieldType & sameTypeMask) == (sourceType.fieldType & sameTypeMask))
+        {
+            size32_t length = sourceType.length;
+            if (!sourceType.isFixedSize())
+            {
+                length = rtlReadSize32t(source);
+                source += sizeof(size32_t);
+            }
+            offset = destType.buildString(builder, offset, field, length, (const char *)source);
+            break;
+        }
+        //fallthrough
+    case type_decimal:  // Go via string - not common enough to special-case
+    case type_varstring:
+    case type_qstring:
+    {
+        size32_t size;
+        rtlDataAttr text;
+        sourceType.getString(size, text.refstr(), source);
+        offset = destType.buildString(builder, offset, field, size, text.getstr());
+        break;
+    }
+    case type_utf8:
+        //MORE: Could special case casting from utf8 to utf8 similar to strings above
+    case type_unicode:
+    case type_varunicode:
+    {
+        size32_t utf8chars;
+        rtlDataAttr utf8Text;
+        sourceType.getUtf8(utf8chars, utf8Text.refstr(), source);
+        offset = destType.buildUtf8(builder, offset, field, utf8chars, utf8Text.getstr());
+        break;
+    }
+    case type_set:
+    {
+        bool isAll = *(bool *) source;
+        source+= sizeof(bool);
+        byte *dest = builder.ensureCapacity(offset+sizeof(bool)+sizeof(size32_t), field->name)+offset;
+        *(size32_t *) (dest + sizeof(bool)) = 0; // Patch later when size known
+        offset += sizeof(bool) + sizeof(size32_t);
+        if (isAll)
+        {
+            *(bool*) dest = true;
+        }
+        else
+        {
+            *(bool*) dest = false;
+            size32_t sizeOffset = offset - sizeof(size32_t);  // Where we need to patch
+            size32_t childSize = *(size32_t *)source;
+            source += sizeof(size32_t);
+            const byte *initialSource = source;
+            size32_t initialOffset = offset;
+            const RtlTypeInfo *destChildType = destType.queryChildType();
+            const RtlTypeInfo *sourceChildType = sourceType.queryChildType();
+            while ((size_t)(source - initialSource) < childSize)
+            {
+                offset = translateScalar(builder, offset, field, *destChildType, *sourceChildType, source);
+                source += sourceChildType->size(source, nullptr); // MORE - shame to repeat a calculation that the translate above almost certainly just did
+            }
+            dest = builder.getSelf() + sizeOffset;  // Note - man have been moved by reallocs since last calculated
+            *(size32_t *)dest = offset - initialOffset;
+        }
+        break;
+    }
+    default:
+        throwUnexpected();
+    }
+    return offset;
+}
+
+//-------------------------------------------------------------------------------------------------------------------
+
+
 /* 
 
 Stack:

+ 2 - 0
rtl/eclrtl/rtlfield.hpp

@@ -738,4 +738,6 @@ extern int ECLRTL_API compareFields(const RtlFieldInfo * const * cur, const byte
 extern unsigned ECLRTL_API hashFields(const RtlFieldInfo * const * cur, const byte *self, unsigned inhash, bool excludePayload = false);
 extern bool ECLRTL_API hasTrailingFileposition(const RtlFieldInfo * const * fields);
 extern bool ECLRTL_API hasTrailingFileposition(const RtlTypeInfo * type);
+extern size32_t translateScalar(ARowBuilder &builder, size32_t offset, const RtlFieldInfo *field, const RtlTypeInfo &destType, const RtlTypeInfo &sourceType, const byte *source);
+
 #endif

+ 1 - 0
rtl/eclrtl/rtlkey.hpp

@@ -216,6 +216,7 @@ interface IValueSet : public IInterface
     virtual StringBuffer & serialize(StringBuffer & out) const = 0;
     virtual MemoryBuffer & serialize(MemoryBuffer & out) const = 0;
     virtual bool equals(const IValueSet & _other) const = 0;
+    virtual IValueSet * cast(const RtlTypeInfo & newType) const = 0;
 
 //following are primarily for use from the code generator
     virtual IValueTransition * createRawTransition(TransitionMask mask, const void * value) const = 0;

+ 379 - 17
rtl/eclrtl/rtlnewkey.cpp

@@ -23,6 +23,8 @@
 #include "rtlrecord.hpp"
 #include "rtlkey.hpp"
 #include "rtlnewkey.hpp"
+#include "rtlfield.hpp"
+#include "rtldynfield.hpp"
 
 /*
  * Read a single quoted string from in until the terminating quote is found
@@ -144,6 +146,13 @@ void deserializeSet(IValueSet & set, const char * filter)
     deserializeSet(creator, filter);
 }
 
+
+void getStretchedValue(MemoryBuffer & target, const RtlTypeInfo & newType, const RtlTypeInfo & oldType, const byte * value)
+{
+    MemoryBufferBuilder builder(target, 0);
+    translateScalar(builder, 0, nullptr, newType, oldType, value);
+}
+
 //---------------------------------------------------------------------------------------------------------------------
 
 /*
@@ -344,6 +353,30 @@ public:
         return modifyTransition(type, newMask);
     }
 
+    ValueTransition * cast(const RtlTypeInfo & newType, const RtlTypeInfo & oldType)
+    {
+        if (!value)
+            return LINK(this);
+
+        MemoryBuffer resized;
+        getStretchedValue(resized, newType, oldType, value);
+
+        MemoryBuffer recast;
+        getStretchedValue(recast, oldType, newType, resized.bytes());
+
+        TransitionMask newMask = mask;
+        if (oldType.compare(value, recast.bytes()) != 0)
+        {
+            // x >= 'ab' becomes x > 'a' => clear the equal item
+            // x < 'ab' becomes x <= 'a' => set the equal item
+            if (newMask & CMPlt)
+                newMask &= ~CMPeq;
+            else if (newMask & CMPgt)
+                newMask |= CMPeq;
+        }
+        return new ValueTransition(newMask, newType, resized.toByteArray());
+    }
+
     bool isLowerBound() const { return (mask & CMPgt) != 0; }
     bool isUpperBound() const { return (mask & CMPlt) != 0; }
     bool isInclusiveBound() const { return (mask & CMPeq) != 0; }
@@ -806,6 +839,19 @@ public:
         return true;
     }
 
+    virtual IValueSet * cast(const RtlTypeInfo & newType) const
+    {
+        Owned<IValueSet> newSet = new ValueSet(newType);
+        unsigned max = transitions.ordinality();
+        for (unsigned i=0; i < max; i += 2)
+        {
+            Owned<IValueTransition> newLower = transitions.item(i).cast(newType, type);
+            Owned<IValueTransition> newUpper = transitions.item(i+1).cast(newType, type);
+            newSet->addRange(newLower, newUpper);
+        }
+        return newSet.getClear();
+    }
+
 // Methods for using a value set
     virtual bool isWild() const override;
     virtual unsigned numRanges() const override;
@@ -1111,6 +1157,7 @@ class SetFieldFilter : public FieldFilter
 {
 public:
     SetFieldFilter(unsigned _field, IValueSet * _values) : FieldFilter(_field, _values->queryType()), values(_values) {}
+    SetFieldFilter(unsigned _field, const RtlTypeInfo & fieldType, IValueSet * _values) : FieldFilter(_field, fieldType), values(_values) {}
 
 //Simple row matching
     virtual bool matches(const RtlRow & row) const override;
@@ -1208,6 +1255,205 @@ IFieldFilter * createFieldFilter(unsigned fieldId, const RtlTypeInfo & type, con
 //---------------------------------------------------------------------------------------------------------------------
 
 /*
+ * A link counted version of an RtlTypeInfo
+ */
+class SharedRtlTypeInfo : public CInterface
+{
+public:
+    SharedRtlTypeInfo(const RtlTypeInfo * _type) : type(_type) {}
+    ~SharedRtlTypeInfo() { type->doDelete(); }
+
+    const RtlTypeInfo * const type;
+};
+
+/*
+ * A field filter that can match a set of values.
+ */
+class SubStringFieldFilter : public SetFieldFilter
+{
+public:
+    SubStringFieldFilter(unsigned _field, const RtlTypeInfo & _fieldType, SharedRtlTypeInfo * _subType, IValueSet * _values)
+    : SetFieldFilter(_field, _fieldType, _values), subType(_subType)
+    {
+        subLength = subType->type->length;
+    }
+
+    virtual StringBuffer & serialize(StringBuffer & out) const override;
+    virtual MemoryBuffer & serialize(MemoryBuffer & out) const override;
+protected:
+    Linked<SharedRtlTypeInfo> subType;
+    size32_t subLength;
+};
+
+StringBuffer & SubStringFieldFilter::serialize(StringBuffer & out) const
+{
+    out.append(':').append(subLength).append("=");
+    return values->serialize(out);
+}
+
+MemoryBuffer & SubStringFieldFilter::serialize(MemoryBuffer & out) const
+{
+    out.append(':').append(subLength);
+    return values->serialize(out);
+}
+
+
+class FixedSubStringFieldFilter : public SubStringFieldFilter
+{
+public:
+    FixedSubStringFieldFilter(unsigned _field, const RtlTypeInfo & _fieldType, SharedRtlTypeInfo * _subType, IValueSet * _values)
+    : SubStringFieldFilter(_field, _fieldType, _subType, _values)
+    {
+    }
+
+    virtual IFieldFilter *remap(unsigned newField) const override { return new SubStringFieldFilter(newField, type, subType, values); }
+};
+
+
+
+class VariableSubStringFieldFilter : public SubStringFieldFilter
+{
+public:
+    VariableSubStringFieldFilter(unsigned _field, const RtlTypeInfo & _fieldType, SharedRtlTypeInfo * _subType, IValueSet * _values)
+    : SubStringFieldFilter(_field, _fieldType, _subType, _values),
+      maxTempLength(subLength * 4) //Maximum expansion from length to size is 4 bytes for utf8
+    {
+    }
+
+//Simple row matching
+    virtual bool matches(const RtlRow & row) const override;
+
+//More complex index matching
+    virtual int compareLowest(const RtlRow & left, unsigned range) const override;
+    virtual int compareHighest(const RtlRow & left, unsigned range) const override;
+    virtual int findForwardMatchRange(const RtlRow & row, unsigned & matchRange) const override;
+
+    virtual IFieldFilter *remap(unsigned newField) const override { return new VariableSubStringFieldFilter(newField, type, subType, values); }
+
+protected:
+    const unsigned maxTempLength;
+};
+
+
+bool VariableSubStringFieldFilter::matches(const RtlRow & row) const
+{
+    const byte * ptr = row.queryField(field);
+    size32_t len = *reinterpret_cast<const size32_t *>(ptr);
+    if (likely(len >= subLength))
+        return values->matches(ptr + sizeof(size32_t));
+
+    //Clone and expand the string to the expected length
+    byte * temp = (byte *)alloca(maxTempLength);
+    RtlStaticRowBuilder builder(temp, maxTempLength);
+    translateScalar(builder, 0, nullptr, *subType->type, type, ptr);
+    return values->matches(temp);
+}
+
+int VariableSubStringFieldFilter::compareLowest(const RtlRow & left, unsigned range) const
+{
+    const byte * ptr = left.queryField(field);
+    size32_t len = *reinterpret_cast<const size32_t *>(ptr);
+    if (likely(len >= subLength))
+        return values->compareLowest(ptr + sizeof(size32_t), range);
+
+    //Clone and expand the string to the expected length
+    byte * temp = (byte *)alloca(maxTempLength);
+    RtlStaticRowBuilder builder(temp, maxTempLength);
+    translateScalar(builder, 0, nullptr, *subType->type, type, ptr);
+    return values->compareLowest(temp, range);
+}
+
+int VariableSubStringFieldFilter::compareHighest(const RtlRow & left, unsigned range) const
+{
+    const byte * ptr = left.queryField(field);
+    size32_t len = *reinterpret_cast<const size32_t *>(ptr);
+    if (likely(len >= subLength))
+        return values->compareHighest(ptr + sizeof(size32_t), range);
+
+    //Clone and expand the string to the expected length
+    byte * temp = (byte *)alloca(maxTempLength);
+    RtlStaticRowBuilder builder(temp, maxTempLength);
+    translateScalar(builder, 0, nullptr, *subType->type, type, ptr);
+    return values->compareHighest(temp, range);
+}
+
+int VariableSubStringFieldFilter::findForwardMatchRange(const RtlRow & row, unsigned & matchRange) const
+{
+    const byte * ptr = row.queryField(field);
+    size32_t len = *reinterpret_cast<const size32_t *>(ptr);
+    if (likely(len >= subLength))
+        return values->findForwardMatchRange(ptr + sizeof(size32_t), matchRange);
+
+    //Clone and expand the string to the expected length
+    byte * temp = (byte *)alloca(maxTempLength);
+    RtlStaticRowBuilder builder(temp, maxTempLength);
+    translateScalar(builder, 0, nullptr, *subType->type, type, ptr);
+    return values->findForwardMatchRange(temp, matchRange);
+}
+
+
+IFieldFilter * createSubStringFieldFilter(unsigned fieldId, size32_t subLength, IValueSet * values)
+{
+    const RtlTypeInfo & type = values->queryType();
+    if (subLength == 0)
+    {
+        //Does the set include a blank value?
+        MemoryBuffer blank;
+        MemoryBufferBuilder builder(blank, 0);
+        type.buildString(builder, 0, nullptr, 0, "");
+
+        bool valuesMatchBlank = values->matches(blank.bytes());
+        if (valuesMatchBlank)
+            return createWildFieldFilter(fieldId, type);
+        else
+            return createEmptyFieldFilter(fieldId, type);
+    }
+
+    //Check for substring that doesn't truncate the field.
+    if (type.isFixedSize())
+    {
+        size32_t fieldLength = type.length;
+        if (subLength >= fieldLength)
+            return new SetFieldFilter(fieldId, values);
+    }
+
+    switch (type.getType())
+    {
+    case type_string:
+    case type_qstring:
+    case type_unicode:
+    case type_utf8:
+    case type_data:
+        break;
+    default:
+        throw MakeStringException(2, "Invalid type for substring filter");
+    }
+
+    //Created a truncated type
+    FieldTypeInfoStruct info;
+    info.fieldType = (type.fieldType & ~RFTMunknownsize);
+    info.length = subLength;
+    Owned<SharedRtlTypeInfo> subType(new SharedRtlTypeInfo(info.createRtlTypeInfo(nullptr)));
+
+    //Create a new set of values truncated to the appropriate length.
+    Owned<IValueSet> newValues = values->cast(*subType->type);
+    if (type.isFixedSize())
+    {
+        //The standard compare will only look at the first subLength characters.
+        return new FixedSubStringFieldFilter(fieldId, type, subType, newValues);
+    }
+
+    //Check that the temporary buffer that *might* be required is a sensible size for storing on the stack
+    if (subLength > 256)
+        throw MakeStringException(3, "Substring [1..%u] range is too large for a variable size field", subLength);
+
+    //Use a class which will expand the string to the sub length if it is shorter
+    return new VariableSubStringFieldFilter(fieldId, type, subType, newValues);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+/*
  * A field filter that can match any value.
  */
 class WildFieldFilter : public FieldFilter
@@ -1295,6 +1541,29 @@ IFieldFilter * deserializeFieldFilter(unsigned fieldId, const RtlTypeInfo & type
             deserializeSet(*values, src+1);
             return createFieldFilter(fieldId, values);
         }
+    case ':':
+        {
+            char * end;
+            size32_t subLength = strtoul(src+1, &end, 10);
+            if (*end != '=')
+                UNIMPLEMENTED_X("Expected =");
+
+            //Created a truncated type
+            FieldTypeInfoStruct info;
+            info.fieldType = (type.fieldType & ~RFTMunknownsize);
+            info.length = subLength;
+            Owned<SharedRtlTypeInfo> subType(new SharedRtlTypeInfo(info.createRtlTypeInfo(nullptr)));
+
+            //The serialized set is already truncated to the appropriate length.
+            Owned<IValueSet> values = createValueSet(*subType->type);
+            deserializeSet(*values, end+1);
+            if (type.isFixedSize())
+            {
+                return new FixedSubStringFieldFilter(fieldId, type, subType, values);
+            }
+
+            return new VariableSubStringFieldFilter(fieldId, type, subType, values);
+        }
     }
 
     UNIMPLEMENTED_X("Unknown Field Filter");
@@ -1313,9 +1582,36 @@ IFieldFilter * deserializeFieldFilter(unsigned fieldId, const RtlTypeInfo & type
             Owned<IValueSet> values = createValueSet(type, in);
             return createFieldFilter(fieldId, values);
         }
+    case ':':
+        {
+            size32_t subLength;
+            in.read(subLength);
+
+            //Created a truncated type
+            FieldTypeInfoStruct info;
+            info.fieldType = (type.fieldType & ~RFTMunknownsize);
+            info.length = subLength;
+            Owned<SharedRtlTypeInfo> subType(new SharedRtlTypeInfo(info.createRtlTypeInfo(nullptr)));
+
+            //The serialized set is already truncated to the appropriate length.
+            Owned<IValueSet> values = createValueSet(*subType->type, in);
+            if (type.isFixedSize())
+            {
+                return new FixedSubStringFieldFilter(fieldId, type, subType, values);
+            }
+
+            return new VariableSubStringFieldFilter(fieldId, type, subType, values);
+        }
     }
 
-    UNIMPLEMENTED_X("Unknown Field Filter");
+    throwUnexpectedX("Unknown Field Filter");
+}
+
+IFieldFilter * deserializeFieldFilter(const RtlRecord & searchRecord, MemoryBuffer & in)
+{
+    unsigned fieldNum;
+    in.readPacked(fieldNum);
+    return deserializeFieldFilter(fieldNum, *searchRecord.queryType(fieldNum), in);
 }
 
 //---------------------------------------------------------------------------------------------------------------------
@@ -2420,6 +2716,7 @@ protected:
     const RtlIntTypeInfo int4 = RtlIntTypeInfo(type_int, 4);
     const RtlStringTypeInfo str1 = RtlStringTypeInfo(type_string, 1);
     const RtlStringTypeInfo str2 = RtlStringTypeInfo(type_string, 2);
+    const RtlStringTypeInfo str4 = RtlStringTypeInfo(type_string, 4);
     const RtlStringTypeInfo strx = RtlStringTypeInfo(type_string|RFTMunknownsize, 0);
     const RtlFieldInfo id = RtlFieldInfo("id", nullptr, &int2);
     const RtlFieldInfo extra = RtlFieldInfo("extra", nullptr, &strx);
@@ -2429,6 +2726,31 @@ protected:
     const RtlRecordTypeInfo recordType = RtlRecordTypeInfo(type_record, 4, fields);
     const RtlRecord record = RtlRecord(recordType, true);
 
+    void verifyFilter(const RtlRecord & searchRecord, IFieldFilter * filter)
+    {
+        StringBuffer str1, str2;
+        filter->serialize(str1);
+        Owned<IFieldFilter> clone1 = deserializeFieldFilter(filter->queryFieldIndex(), filter->queryType(), str1);
+        clone1->serialize(str2);
+
+        MemoryBuffer mem1, mem2;
+        //Should this be part of the serialize?  There are issues with serializing conditions for ifblocks if it is.
+        mem1.appendPacked(filter->queryFieldIndex());
+        filter->serialize(mem1);
+        Owned<IFieldFilter> clone2 = deserializeFieldFilter(searchRecord, mem1);
+        mem2.appendPacked(filter->queryFieldIndex());
+        clone2->serialize(mem2);
+
+        if (!streq(str1, str2))
+            CPPUNIT_ASSERT_EQUAL(str1.str(), str2.str());
+
+        if ((mem1.length() != mem2.length()) || memcmp(mem1.bytes(), mem2.bytes(), mem1.length()) != 0)
+        {
+            printf("Filter %s failed\n", str1.str());
+            CPPUNIT_ASSERT_MESSAGE("Binary filter deserialize failed", false);
+        }
+    }
+
     void processFilter(RowFilter & cursor, const char * originalFilter, const RtlRecord & searchRecord)
     {
         const char * filter = originalFilter;
@@ -2449,13 +2771,23 @@ protected:
 
             const char * equal = strchr(next, '=');
             assertex(equal);
-            StringBuffer fieldName(equal-next, next);
+            const char * colon = strchr(next, ':');
+            if (colon && colon > equal)
+                colon = nullptr;
+            StringBuffer fieldName(colon ? colon - next : equal-next, next);
             unsigned fieldNum = searchRecord.getFieldNum(fieldName);
             assertex(fieldNum != (unsigned) -1);
             const RtlTypeInfo *fieldType = searchRecord.queryType(fieldNum);
             Owned<IValueSet> set = createValueSet(*fieldType);
             deserializeSet(*set, equal+1);
-            cursor.addFilter(*new SetFieldFilter(fieldNum, set));
+
+            IFieldFilter * filter;
+            if (colon)
+                filter = createSubStringFieldFilter(fieldNum, atoi(colon+1), set);
+            else
+                filter = new SetFieldFilter(fieldNum, set);
+            cursor.addFilter(*filter);
+            verifyFilter(searchRecord, filter);
         }
     }
 
@@ -2492,6 +2824,11 @@ protected:
         testFilter("extra=['MAR','MAS']", { true, false, false, true, true, false });
         testFilter("extra=('MAR','MAS')", { true, false, false, false, false, false });
         testFilter("id=(,257]", { true, true, true, true, false, false });
+        testFilter("extra:2=['MA']", { true, false, false, true, true, true });
+        testFilter("extra:2=['  ']", { false, true, false, false, false, false });
+        testFilter("extra:0=['XX']", { false, false, false, false, false, false });
+        testFilter("extra:0=['']", { true, true, true, true, true, true });
+        testFilter("name:1=['A']", { false, true, false, true, false, false });
     }
 
 
@@ -2615,6 +2952,13 @@ protected:
     const RtlRecordTypeInfo testRecordType = RtlRecordTypeInfo(type_record, 6, testFields);
     const RtlRecord testRecord = RtlRecord(testRecordType, true);
 
+    const RtlFieldInfo f1s = RtlFieldInfo("f1", nullptr, &strx);
+    const RtlFieldInfo f2s = RtlFieldInfo("f2", nullptr, &str4);
+    const RtlFieldInfo f3s = RtlFieldInfo("f3", nullptr, &str2);
+    const RtlFieldInfo * const testFieldsB[4] = { &f1s, &f2s, &f3s, nullptr };
+    const RtlRecordTypeInfo testRecordTypeB = RtlRecordTypeInfo(type_record, 10, testFieldsB);
+    const RtlRecord testRecordB = RtlRecord(testRecordTypeB, true);
+
     void timeKeyedScan(const PointerArray & rows, const RtlRecord & searchRecord, const char * filterText)
     {
         RowFilter filter;
@@ -2700,26 +3044,44 @@ protected:
     }
 
 
-    void testKeyed2()
+    void testKeyed2(const RtlRecord & record, bool testSubString)
     {
         PointerArray rows;
-        generateOrderedRows(rows, testRecord);
-
-        testKeyed(rows, testRecord, "");
-        testKeyed(rows, testRecord, "f1=[5]");
-        testKeyed(rows, testRecord, "f1=[0]");
-        testKeyed(rows, testRecord, "f2=[1]");
-        testKeyed(rows, testRecord, "f3=[1]");
-        testKeyed(rows, testRecord, "f3=[4]");
-        testKeyed(rows, testRecord, "f3=[1,3]");
-        testKeyed(rows, testRecord, "f3=[1],[2],[3]");
-        testKeyed(rows, testRecord, "f1=[21];f2=[20];f3=[4]");
-        testKeyed(rows, testRecord, "f1=[7];f3=[5]");
-        testKeyed(rows, testRecord, "f1=[7,];f3=[,5]");
+        generateOrderedRows(rows, record);
+
+        testKeyed(rows, record, "");
+        testKeyed(rows, record, "f1=[5]");
+        testKeyed(rows, record, "f1=[0]");
+        testKeyed(rows, record, "f2=[1]");
+        testKeyed(rows, record, "f3=[1]");
+        testKeyed(rows, record, "f3=[4]");
+        testKeyed(rows, record, "f3=[1,3]");
+        testKeyed(rows, record, "f3=[1],[2],[3]");
+        testKeyed(rows, record, "f1=[21];f2=[20];f3=[4]");
+        testKeyed(rows, record, "f1=[7];f3=[5]");
+        testKeyed(rows, record, "f1=[7,];f3=[,5]");
+
+        if (testSubString)
+        {
+            testKeyed(rows, record, "f1:1=['1']");
+            testKeyed(rows, record, "f1:2=['10']");
+            testKeyed(rows, record, "f1:30=['5']");
+            testKeyed(rows, record, "f3:1=['1']");
+            testKeyed(rows, record, "f3:2=['10']");
+            testKeyed(rows, record, "f3:2=['123']");
+            testKeyed(rows, record, "f3:2=['123','126']");
+        }
 
         ForEachItemIn(i, rows)
             delete [] (byte *)rows.item(i);
     }
+
+    void testKeyed2()
+    {
+        testKeyed2(testRecord, false);
+        testKeyed2(testRecordB, true);
+    }
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(ValueSetTest);

+ 2 - 1
system/jlib/jbuff.hpp

@@ -167,7 +167,6 @@ public:
     inline bool     needSwapEndian() { return swapEndian; }
     int             setEndian(int endian);          // pass __[BIG|LITTLE]_ENDIAN
     bool            setSwapEndian(bool swap);
-    inline const char * toByteArray() const { return curLen ? buffer : NULL; }
     void            swapWith(MemoryBuffer & other);
 
     inline size32_t capacity() { return (maxLen - curLen); }
@@ -189,6 +188,8 @@ public:
     inline void     Release() const                         { delete this; }    // for consistency even though not link counted
 
     inline void *   bufferBase() const { return buffer; }
+    inline const char * toByteArray() const { return curLen ? buffer : nullptr; }
+    inline const byte * bytes() const { return curLen ? (const byte *)buffer : nullptr; }
 
 
 protected: