فهرست منبع

HPCC-24383 PTree auto qualifier maps

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 سال پیش
والد
کامیت
d9e4fc5fe4
6فایلهای تغییر یافته به همراه939 افزوده شده و 84 حذف شده
  1. 17 11
      common/workunit/workunit.cpp
  2. 5 13
      dali/base/dasds.cpp
  3. 582 52
      system/jlib/jptree.cpp
  4. 3 0
      system/jlib/jptree.hpp
  5. 33 8
      system/jlib/jptree.ipp
  6. 299 0
      testing/unittests/dalitests.cpp

+ 17 - 11
common/workunit/workunit.cpp

@@ -9192,21 +9192,27 @@ void CLocalWorkUnit::releaseFile(const char *fileName)
     IPropertyTree *files = p->queryPropTree("Files");
     if (!files) return;
     Owned<IPropertyTreeIterator> fiter = files->getElements(path.str());
-    ForEach (*fiter)
+    if (fiter->first())
     {
-        IPropertyTree *file = &fiter->query();
-        unsigned usageCount = file->getPropInt("@usageCount");
-        if (usageCount > 1)
-            file->setPropInt("@usageCount", usageCount-1);
-        else
+        while (true)
         {
-            StringAttr name(file->queryProp("@name"));
-            files->removeTree(file);
-            if (!name.isEmpty()&&(1 == usageCount))
+            IPropertyTree *file = &fiter->query();
+            unsigned usageCount = file->getPropInt("@usageCount");
+            bool more = fiter->next();
+            if (usageCount > 1)
+                file->setPropInt("@usageCount", usageCount-1);
+            else
             {
-                if (queryDistributedFileDirectory().removeEntry(fileName, queryUserDescriptor()))
-                    LOG(MCdebugProgress, unknownJob, "Removed (released) file %s from DFS", name.get());
+                StringAttr name(file->queryProp("@name"));
+                files->removeTree(file);
+                if (!name.isEmpty()&&(1 == usageCount))
+                {
+                    if (queryDistributedFileDirectory().removeEntry(fileName, queryUserDescriptor()))
+                        LOG(MCdebugProgress, unknownJob, "Removed (released) file %s from DFS", name.get());
+                }
             }
+            if (!more)
+                break;
         }
     }
 }

+ 5 - 13
dali/base/dasds.cpp

@@ -798,12 +798,7 @@ public:
                     if (numeric)
                     {
                         unsigned qnum = atoi(qualifier);
-                        if (!item.queryParent())
-                        {
-                            if (qnum != 1)
-                                return false;
-                        }
-                        else if (((PTree *)item.queryParent())->findChild(&item) != qnum-1)
+                        if (qnum != (item.querySiblingPosition()+1))
                             return false;
                     }
                     else if (!item.checkPattern(qualifier))
@@ -1757,14 +1752,11 @@ void buildNotifyData(MemoryBuffer &notifyData, PDState state, CPTStack *stack, M
                 PTree &child = stack->item(s);
                 const char *str = child.queryName();
                 notifyData.append(strlen(str), str);
-                if (child.queryParent())
-                {
-                    char temp[12];
-                    unsigned written = numtostr(temp, parent->findChild(&child)+1);
-                    notifyData.append('[').append(written, temp).append(']');
-                }
-                else
+                unsigned pos = child.querySiblingPosition();
+                if (0 == pos) // common case
                     notifyData.append(3, "[1]");
+                else
+                    notifyData.append('[').append(pos+1).append(']');
                 parent = &child;
                 s++;
                 if (s<n)

+ 582 - 52
system/jlib/jptree.cpp

@@ -15,6 +15,10 @@
     limitations under the License.
 ############################################################################## */
 
+#include <unordered_map>
+#include <unordered_set>
+#include <string>
+
 #include "platform.h"
 #include "jarray.hpp"
 #include "jdebug.hpp"
@@ -51,10 +55,12 @@
 #define PTREE_COMPRESS_BOTHER_PECENTAGE (80) // i.e. if it doesn't compress to <80 % of original size don't bother
 
 
-class NullPTreeIterator : implements IPropertyTreeIterator, public CInterface
+class NullPTreeIterator final : implements IPropertyTreeIterator
 {
 public:
-    IMPLEMENT_IINTERFACE;
+    virtual ~NullPTreeIterator() {}
+    virtual void Link() const override {}
+    virtual bool Release() const override { return true; }
 // IPropertyTreeIterator
     virtual bool first() override { return false; }
     virtual bool next() override { return false; }
@@ -62,7 +68,7 @@ public:
     virtual IPropertyTree & query() override { throwUnexpected(); }
 } *nullPTreeIterator;
 
-IPropertyTreeIterator *createNullPTreeIterator() { return LINK(nullPTreeIterator); } // initialize in init mod below.
+IPropertyTreeIterator *createNullPTreeIterator() { return LINK(nullPTreeIterator); } // initialized in init mod below.
 
 
 //===================================================================
@@ -182,7 +188,7 @@ MODULE_INIT(INIT_PRIORITY_JPTREE)
 
 MODULE_EXIT()
 {
-    nullPTreeIterator->Release();
+    delete nullPTreeIterator;
     delete attrHT;
     keyTable->Release();
     keyTableNC->Release();
@@ -385,21 +391,27 @@ inline const char * readIndex(const char *xpath, StringAttr &index)
 
 
 
-inline static void readWildIdIndex(const char *&xpath, bool &wild)
+inline static void readWildIdIndex(const char *&xpath, bool &wild, bool &numeric)
 {
     const char *_xpath = xpath;
     readWildId(xpath, wild);
     if ('[' == *xpath) // check for local index not iterative qualifier.
     {
         const char *end = xpath+1;
-        if (isdigit(*end)) {
+        if (isdigit(*end))
+        {
             StringAttr index;
             end = readIndex(end, index);
             if (']' != *end)
                 throw MakeXPathException(_xpath, PTreeExcpt_XPath_ParseError, xpath-_xpath, "Qualifier brace unclosed");
             xpath = end+1;
+            numeric = true;
         }
+        else
+            numeric = false;
     }
+    else
+        numeric = false;
 }
 
 inline static unsigned getTailIdLength(const char *xxpath, unsigned xxpathlength)
@@ -531,6 +543,375 @@ const char *queryHead(const char *xpath, StringBuffer &head)
 
 ///////////////////
 
+static constexpr unsigned defaultSiblingMapThreshold = 100;
+static unsigned siblingMapThreshold = (unsigned)-1; // off until configuration default it on.
+
+void setPTreeMappingThreshold(unsigned threshold)
+{
+    /*
+    * NB: setPTreeMappingThreshold() will automatically be called via loadConfiguration
+    * Redefining this limit, will not effect existing maps, and should generally only be called once during startup.
+    */
+    if (0 == threshold)
+        threshold = (unsigned)-1;
+    siblingMapThreshold = threshold;
+}
+
+class CValueMap : public std::unordered_multimap<std::string, const IPropertyTree *>
+{
+public:
+    CValueMap(const char *_lhs, IPTArrayValue &array)
+    {
+        IPropertyTree **elements = array.getRawArray();
+        IPropertyTree **last = elements+array.elements();
+        dbgassertex(elements != last);
+        while (true)
+        {
+            const char *v = (*elements)->queryProp(_lhs);
+            if (v)
+                emplace(std::make_pair(std::string(v), *elements));
+            elements++;
+            if (last == elements)
+                break;
+        }
+    }
+    std::pair<CValueMap::iterator, CValueMap::iterator> find(const char *rhs)
+    {
+        return equal_range(std::string(rhs));
+    }
+    void insertEntry(const char *v, const IPropertyTree *tree)
+    {
+        emplace(std::make_pair(std::string(v), tree));
+    }
+    bool removeEntry(const char *v, const IPropertyTree *tree)
+    {
+        auto range = equal_range(std::string(v));
+        if (range.first == range.second)
+            return false;
+
+        auto it = range.first;
+        while (true)
+        {
+            if (it->second == tree)
+            {
+                it = erase(it);
+                return true;
+            }
+            ++it;
+            if (it == range.second)
+                break;
+        }
+        throwUnexpected();
+    }
+    void replaceEntry(const char *oldV, const char *newV, const IPropertyTree *tree)
+    {
+        verifyex(removeEntry(oldV, tree));
+        if (newV)
+            insertEntry(newV, tree);
+    }
+};
+
+class CQualifierMap
+{
+    std::unordered_map<std::string, CValueMap *> attrValueMaps;
+    CriticalSection crit;
+
+public:
+    CQualifierMap()
+    {
+    }
+    ~CQualifierMap()
+    {
+        for (auto &e: attrValueMaps)
+            delete e.second;
+    }
+    CValueMap *addMapping(const char *lhs, IPTArrayValue &array)
+    {
+        CValueMap *valueMap = new CValueMap(lhs, array);
+        attrValueMaps.emplace(std::make_pair(std::string(lhs), valueMap));
+        return valueMap;
+    }
+    CValueMap *addMappingIfNew(const char *lhs, IPTArrayValue &array)
+    {
+        CriticalBlock b(crit);
+        auto it = attrValueMaps.find(lhs);
+        if (it == attrValueMaps.end())
+            return addMapping(lhs, array);
+        else
+            return it->second;
+    }
+    void addMatchingValues(const IPropertyTree *tree)
+    {
+        for (auto &e: attrValueMaps)
+        {
+            const char *v = tree->queryProp(e.first.c_str());
+            if (v)
+                e.second->insertEntry(v, tree);
+        }
+    }
+    void removeMatchingValues(const IPropertyTree *tree)
+    {
+        for (auto &e: attrValueMaps)
+        {
+            const char *lhsp = e.first.c_str();
+            const char *oldV = tree->queryProp(lhsp);
+            if (oldV)
+                verifyex(e.second->removeEntry(oldV, tree));
+        }
+    }
+    void replaceMatchingValues(const IPropertyTree *oldTree, const IPropertyTree *newTree)
+    {
+        for (auto &e: attrValueMaps)
+        {
+            const char *lhsp = e.first.c_str();
+            const char *oldV = oldTree->queryProp(lhsp);
+            if (oldV)
+            {
+                verifyex(e.second->removeEntry(oldV, oldTree));
+                const char *newV = newTree->queryProp(lhsp);
+                if (newV)
+                    e.second->insertEntry(newV, newTree);
+            }
+        }
+    }
+    CValueMap *find(const char *lhs)
+    {
+        auto it = attrValueMaps.find(lhs);
+        if (it == attrValueMaps.end())
+            return nullptr;
+        return it->second;
+    }
+    void removeEntryIfMapped(const char *lhs, const char *v, const IPropertyTree *tree)
+    {
+        auto it = attrValueMaps.find(lhs);
+        if (it != attrValueMaps.end())
+            it->second->removeEntry(v, tree);
+    }
+    void insertEntryIfMapped(const char *lhs, const char *v, const IPropertyTree *tree)
+    {
+        auto it = attrValueMaps.find(lhs);
+        if (it != attrValueMaps.end())
+            it->second->insertEntry(v, tree);
+    }
+    void replaceEntryIfMapped(const char *lhs, const char *oldv, const char *newv, const IPropertyTree *tree)
+    {
+        auto it = attrValueMaps.find(lhs);
+        if (it != attrValueMaps.end())
+            it->second->replaceEntry(oldv, newv, tree);
+    }
+};
+
+// parse qualifier, returns true if simple equality expression found
+static bool parseEqualityQualifier(const char *&xxpath, unsigned &lhsLen, const char *&rhsBegin, unsigned &rhsLen)
+{
+    const char *xpath = xxpath;
+    while (*xpath == ' ' || *xpath == '\t') xpath++;
+    if ('@' != *xpath) // only attributes supported
+        return false;
+    const char *start = xpath;
+    char quote = 0;
+    const char *lhsEnd, *quoteBegin, *quoteEnd, *rhsEnd;
+    lhsEnd = quoteBegin = quoteEnd = rhsBegin = rhsEnd = NULL;
+    bool equalSignFound = false;
+    for (;;)
+    {
+        switch (*xpath)
+        {
+        case '"':
+        case '\'':
+            if (quote)
+            {
+                if (*xpath == quote)
+                {
+                    quote = 0;
+                    quoteEnd = xpath;
+                }
+            }
+            else
+            {
+                if (quoteBegin)
+                    throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Quoted left hand side already seen");
+                quote = *xpath;
+                quoteBegin = xpath+1;
+            }
+            break;
+        case '[':
+            if (!quote)
+                throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Unclosed qualifier detected");
+            break;
+        case ']':
+            if (!quote)
+            {
+                if (!lhsEnd)
+                    lhsEnd = xpath;
+                rhsEnd = xpath;
+            }
+            break;
+        case ' ':
+        case '\t':
+            if (!lhsEnd)
+                lhsEnd = xpath;
+            break;
+        case '!':
+        case '>':
+        case '<':
+        case '~':
+        case '/':
+            if (!quote)
+                return false;
+            break;
+        case '=':
+            if (!quote)
+            {
+                if (equalSignFound)
+                    throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Unexpected expression operator xpath");
+                equalSignFound = true;
+                if (!lhsEnd)
+                    lhsEnd = xpath;
+            }
+            break;
+        case '?':
+        case '*':
+            return false;
+        case '\0':
+            rhsEnd = xpath;
+            break;
+        }
+        if (rhsEnd)
+            break;
+        xpath++;
+        if (!rhsBegin && equalSignFound && !isspace(*xpath))
+            rhsBegin = xpath;
+    }
+    if (quote)
+        throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Parse error, unclosed quoted content");
+    if (!equalSignFound)
+        return false;
+
+    lhsLen = lhsEnd-start;
+    if (quoteBegin && !quoteEnd)
+        throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Parse error, RHS missing closing quote");
+    if (rhsBegin && !rhsEnd)
+        throw MakeXPathException(start, PTreeExcpt_XPath_ParseError, xpath-start, "Parse error, RHS missing closing quote");
+    if (!quoteBegin && rhsEnd) // only if numeric
+        return false;
+    else // quoted
+    {
+        rhsBegin = quoteBegin;
+        rhsLen = quoteEnd - rhsBegin;
+    }
+    if (rhsEnd && *xpath == ']')
+        xpath++;
+    xxpath = xpath;
+    return true;
+}
+
+class CMapQualifierIterator : public CInterfaceOf<IPropertyTreeIterator>
+{
+    CValueMap::iterator startRange, endRange;
+    CValueMap::iterator currentIter;
+public:
+    CMapQualifierIterator(CQualifierMap &_map, CValueMap::iterator _startRange, CValueMap::iterator _endRange)
+        : startRange(_startRange), endRange(_endRange)
+    {        
+    }
+
+// IPropertyTreeIterator
+    virtual bool first() override
+    {
+        currentIter = startRange;
+        return currentIter != endRange;
+    }
+    virtual bool next() override
+    {
+        currentIter++;
+        return currentIter != endRange;
+    }
+    virtual bool isValid() override
+    {
+        return currentIter != endRange;
+    }
+    virtual IPropertyTree & query() override { return const_cast<IPropertyTree &>(*currentIter->second); }
+};
+
+IPropertyTreeIterator *checkMapIterator(const char *&xxpath, IPropertyTree &child)
+{
+    /*
+    * NB: IPT's are not thread safe. It is up to the caller to ensure multiple writers do not contend.
+    * ( Dali for example ensures writer threads are exclusive )
+    * 
+    * That means multiple reader threads could be here concurrently.
+    * >1 could be constructing the qualifier map for the 1st time.
+    * For new attr updates (where map already exists), it will block on map::crit,
+    * so that there is at most 1 thread updating the map. The underlying unordered_multiset
+    * is thread safe if 1 writer, and multiple readers.
+    * 
+    * On initial map creation, allow concurrency, but only 1 will succeed to swap in the new active map.
+    * That could mean a new attr/prop. mapping is lost, until next used.
+    * NB: once the map is live, updates are write ops. and so, just as with the IPT 
+    * itself, it is expected that something will keep it thread safe (as Dali does)
+    * 
+    */
+
+    // NB: only support simple @<attrname>=<value> qualifiers
+
+    if (((unsigned)-1) == siblingMapThreshold) // disabled
+        return nullptr;
+
+    PTree &_child = (PTree &)child;
+    if (child.isCaseInsensitive()) // NB: could support but not worth it.
+        return nullptr;
+
+    IPTArrayValue *value = _child.queryValue();
+    if (!value)
+        return nullptr;
+    CQualifierMap *map = value->queryMap();
+    if (!map)
+    {
+        if (!value->isArray() || (value->elements() < siblingMapThreshold))
+            return nullptr;
+    }
+
+    unsigned lhsLen, rhsLen;
+    const char *rhsStart;
+    const char *xpath = xxpath;
+    if (!parseEqualityQualifier(xpath, lhsLen, rhsStart, rhsLen))
+        return nullptr;
+    MAKE_LSTRING(lhs, xxpath, lhsLen);
+    MAKE_LSTRING(rhs, rhsStart, rhsLen);
+
+    // NB: there can be a race here where >1 reader is constructing new map
+    CValueMap *valueMap = nullptr;
+    if (map)
+        valueMap = map->addMappingIfNew(lhs, *value);
+    else
+    {
+        OwnedPtr<CQualifierMap> newMap = new CQualifierMap();
+        valueMap = newMap->addMapping(lhs, *value);
+
+        /*
+        * NB: it's possible another read thread got here 1st, and swapped in a map.
+        * setMap returns the existing map, and the code below checks to see if it already
+        * handles the 'lhs' we're adding, if it doesn't it re-adds the qualifier mappings.
+        */
+        map = value->setMap(newMap);
+        if (!map) // successfully swapped newMap in.
+            map = newMap.getClear(); // NB: setMap owns
+        else // another thread has swapped in a map whilst I was creating new one
+            valueMap = map->addMappingIfNew(lhs, *value);
+    }
+
+    xxpath = xpath; // update parsed position
+
+    auto range = valueMap->find(rhs);
+    if (range.first != range.second)
+        return new CMapQualifierIterator(*map, range.first, range.second);
+    else
+        return LINK(nullPTreeIterator);
+}
+
+///////////////////
+
 class SeriesPTIterator : implements IPropertyTreeIterator, public CInterface
 {
 public:
@@ -766,6 +1147,80 @@ size32_t CPTValue::queryValueSize() const
 
 ///////////////////
 
+
+CPTArray::~CPTArray()
+{
+    if (map.load())
+        delete map.load();
+}
+
+CQualifierMap *CPTArray::setMap(CQualifierMap *_map)
+{
+    CQualifierMap *expected = nullptr;
+    if (map.compare_exchange_strong(expected, _map))
+        return nullptr;
+    else
+        return expected;
+}
+
+void CPTArray::addElement(IPropertyTree *tree)
+{
+    append(*tree);
+    CQualifierMap *map = queryMap();
+    if (map)
+    {
+        if (tree->getAttributeCount())
+            map->addMatchingValues(tree);
+    }
+}
+
+void CPTArray::setElement(unsigned idx, IPropertyTree *tree)
+{
+    CQualifierMap *map = queryMap();
+    if (map)
+    {
+        // remove any mappings for existing element.
+        if (isItem(idx))
+        {
+            IPropertyTree *existing = &((IPropertyTree &)item(idx));
+            map->replaceMatchingValues(existing, tree);
+        }
+        else
+            map->addMatchingValues(tree);
+    }
+    add(*tree, idx);
+}
+
+void CPTArray::removeElement(unsigned idx)
+{
+    CQualifierMap *map = queryMap();
+    if (map)
+    {
+        IPropertyTree *existing = &((IPropertyTree &)item(idx));
+        map->removeMatchingValues(existing);
+    }
+    remove(idx);
+}
+
+unsigned CPTArray::find(const IPropertyTree *search) const
+{
+    IInterface **start = getArray();
+    IInterface **last = start + ordinality();
+    IInterface **members = start;
+    while (true)
+    {
+        if (*members == search)
+            return members-start;
+        members++;
+        if (members == last)
+            break;
+    }
+    return NotFound;
+}
+
+//////////////////
+
+
 PTree::PTree(byte _flags, IPTArrayValue *_value, ChildMap *_children)
 {
     flags = _flags;
@@ -792,22 +1247,12 @@ aindex_t PTree::findChild(IPropertyTree *child, bool remove)
 {
     if (value && value->isArray())
     {
-        unsigned i;
-        for (i=0; i<value->elements(); i++)
-        {
-            IPropertyTree *_child = value->queryElement(i);
-            if (_child == child)
-            {
-                if (remove)
-                {
-                    assertex(value);
-                    value->removeElement(i);
-                }
-                return i;
-            }
-        }
+        unsigned pos = value->find(child);
+        if (remove && NotFound != pos)
+            value->removeElement(pos);
+        return pos;
     }
-    else if (children)
+    else if (checkChildren())
     {
         IPropertyTree *_child = children->query(child->queryName());
         if (_child == child)
@@ -1580,7 +2025,7 @@ IPropertyTree *PTree::setPropTree(const char *xpath, IPropertyTree *val)
 bool PTree::isArray(const char *xpath) const
 {
     if (!xpath || !*xpath) //item in an array child of parent? I don't think callers ever access array container directly
-        return (parent && parent->isArray(queryName()));
+        return arrayOwner && arrayOwner->isArray();
     else if (isAttribute(xpath))
         return false;
     else
@@ -1618,13 +2063,13 @@ void PTree::addPTreeArrayItem(IPropertyTree *existing, const char *xpath, PTree
 {
     IPropertyTree *iptval = static_cast<IPropertyTree *>(val);
     PTree *tree = nullptr;
-    val->setParent(this);
     if (existing)
     {
         dbgassertex(QUERYINTERFACE(existing, PTree));
         tree = static_cast<PTree *>(existing);
         if (tree->value && tree->value->isArray())
         {
+            val->setOwner(tree->value);
             if ((aindex_t) -1 == pos)
                 tree->value->addElement(iptval);
             else
@@ -1635,6 +2080,7 @@ void PTree::addPTreeArrayItem(IPropertyTree *existing, const char *xpath, PTree
 
     IPTArrayValue *array = new CPTArray();
     IPropertyTree *container = create(xpath, array);
+    val->setOwner(array);
     if (existing)
     {
         array->addElement(LINK(existing));
@@ -1643,7 +2089,7 @@ void PTree::addPTreeArrayItem(IPropertyTree *existing, const char *xpath, PTree
             array->addElement(iptval);
         else
             array->setElement(0, iptval);
-        tree->setParent(this);
+        tree->setOwner(array);
         children->replace(xpath, container);
     }
     else
@@ -1747,31 +2193,30 @@ bool PTree::removeTree(IPropertyTree *child)
 {
     if (child == this)
         throw MakeIPTException(-1, "Cannot remove self");
-    if (children)
+
+    if (checkChildren())
     {
-        Owned<IPropertyTreeIterator> iter = children->getIterator(false);
-        if (iter->first())
+        IPropertyTree *_child = children->query(child->queryName());
+        if (_child)
         {
-            do
+            if (child == _child)
+                return children->removeExact(child);
+            else
             {
-                PTree *element = (PTree *) &iter->query();
-                if (element == child)
-                    return children->removeExact(element);
-
-                if (element->value && element->value->isArray())
+                IPTArrayValue *value = ((PTree *)_child)->queryValue();
+                if (value && value->isArray())
                 {
-                    Linked<PTree> tmp = (PTree*) child;
-                    aindex_t i = element->findChild(child, true);
-                    if (NotFound != i)
+                    unsigned pos = value->find(child);
+                    if (NotFound != pos)
                     {
-                        removingElement(child, i);
-                        if (0 == element->value->elements())
-                            children->removeExact(element);
+                        removingElement(child, pos);
+                        value->removeElement(pos);
+                        if (0 == value->elements())
+                            children->removeExact(_child);
                         return true;
                     }
                 }
             }
-            while (iter->next());
         }
     }
     return false;
@@ -2162,10 +2607,6 @@ restart:
                         }
                         else
                         {
-                            if (wild)
-                                iter.setown(new PTIdMatchIterator(this, id, isnocase(), flags & iptiter_sort));
-                            else
-                                iter.setown(child->getElements(NULL));
                             const char *start = xxpath-1;
                             for (;;)
                             {
@@ -2195,8 +2636,17 @@ restart:
                                     ++xxpath;
                                     if (isdigit(*xxpath))
                                     {
-                                        StringAttr qualifier(start, (xxpath-1)-start);
-                                        Owned<PTStackIterator> siter = new PTStackIterator(iter.getClear(), qualifier.get());
+                                        const char *lhsStart = start+1;
+                                        Owned<IPropertyTreeIterator> siter = checkMapIterator(lhsStart, *child);
+                                        if (!siter)
+                                        {
+                                            if (wild)
+                                                iter.setown(new PTIdMatchIterator(this, id, isnocase(), flags & iptiter_sort));
+                                            else
+                                                iter.setown(child->getElements(NULL));
+                                            StringAttr qualifier(start, (xxpath-1)-start);
+                                            siter.setown(new PTStackIterator(iter.getClear(), qualifier.get()));
+                                        }
                                         StringAttr index;
                                         xxpath = readIndex(xxpath, index);
                                         unsigned i = atoi(index.get());
@@ -2207,6 +2657,21 @@ restart:
                                 }
                                 else
                                 {
+                                    if (!wild)
+                                    {
+                                        const char *lhsStart = start+1;
+                                        Owned<IPropertyTreeIterator> mapIter = checkMapIterator(lhsStart, *child);
+                                        if (mapIter)
+                                        {
+                                            xxpath = lhsStart;
+                                            iter.swap(mapIter);
+                                            break;
+                                        }
+                                    }
+                                    if (wild)
+                                        iter.setown(new PTIdMatchIterator(this, id, isnocase(), flags & iptiter_sort));
+                                    else
+                                        iter.setown(child->getElements(NULL));
                                     StringAttr qualifier(start, xxpath-start);
                                     iter.setown(new PTStackIterator(iter.getClear(), qualifier.get()));
                                     break;
@@ -2642,7 +3107,7 @@ void PTree::addLocal(size32_t l, const void *data, bool _binary, int pos)
     if (!l) return; // right thing to do on addProp("x", NULL) ?
     IPTArrayValue *newValue = new CPTValue(l, data, _binary);
     Owned<IPropertyTree> tree = create(queryName(), newValue);
-    PTree *_tree = QUERYINTERFACE(tree.get(), PTree); assertex(_tree); _tree->setParent(this);
+    PTree *_tree = QUERYINTERFACE(tree.get(), PTree); assertex(_tree);
 
     if (_binary)
         IptFlagSet(_tree->flags, ipt_binary);
@@ -2670,6 +3135,7 @@ void PTree::addLocal(size32_t l, const void *data, bool _binary, int pos)
         array->addElement(element1);
         value = array;
     }
+    _tree->setOwner(array);
     tree->Link();
     if (-1 == pos)
         array->addElement(tree);
@@ -3092,6 +3558,12 @@ bool LocalPTree::removeAttribute(const char *key)
     AttrValue *del = findAttribute(key);
     if (!del)
         return false;
+    if (arrayOwner)
+    {
+        CQualifierMap *map = arrayOwner->queryMap();
+        if (map)
+            map->removeEntryIfMapped(key, del->value.get(), this);
+    }
     numAttrs--;
     unsigned pos = del-attrs;
     del->key.destroy();
@@ -3123,6 +3595,18 @@ void LocalPTree::setAttribute(const char *key, const char *val)
         if (!v->key.set(key))
             v->key.setPtr(isnocase() ? AttrStr::createNC(key) : AttrStr::create(key));
     }
+    if (arrayOwner)
+    {
+        CQualifierMap *map = arrayOwner->queryMap();
+        if (map)
+        {
+            if (goer)
+                map->replaceEntryIfMapped(key, v->value.get(), val, this);
+            else
+                map->insertEntryIfMapped(key, val, this);
+        }
+    }
+
     if (!v->value.set(val))
         v->value.setPtr(AttrStr::create(val));
     if (goer)
@@ -3294,6 +3778,13 @@ void CAtomPTree::setAttribute(const char *key, const char *val)
     {
         if (streq(v->value.get(), val))
             return;
+        if (arrayOwner)
+        {
+            CQualifierMap *map = arrayOwner->queryMap();
+            if (map)
+                map->replaceEntryIfMapped(key, v->value.get(), val, this);
+        }
+
         AttrStr * goer = v->value.getPtr();
         if (!v->value.set(val))
         {
@@ -3317,6 +3808,12 @@ void CAtomPTree::setAttribute(const char *key, const char *val)
             memcpy(newattrs, attrs, numAttrs*sizeof(AttrValue));
             freeAttrArray(attrs, numAttrs);
         }
+        if (arrayOwner)
+        {
+            CQualifierMap *map = arrayOwner->queryMap();
+            if (map)
+                map->insertEntryIfMapped(key, val, this);
+        }
         v = &newattrs[numAttrs];
         if (!v->key.set(key))
             v->key.setPtr(attrHT->addkey(key, isnocase()));
@@ -3333,6 +3830,12 @@ bool CAtomPTree::removeAttribute(const char *key)
     if (!del)
         return false;
     numAttrs--;
+    if (arrayOwner)
+    {
+        CQualifierMap *map = arrayOwner->queryMap();
+        if (map)
+            map->removeEntryIfMapped(key, del->value.get(), this);
+    }
     CriticalBlock block(hashcrit);
     if (del->key.isPtr())
         attrHT->removekey(del->key.getPtr(), isnocase());
@@ -3649,14 +4152,41 @@ bool PTStackIterator::next()
                     if (iter->isValid()) 
                         pushToStack(iter, xxpath);
 
-                    bool wild;
+                    bool wild, numeric;
                     const char *start = xxpath;
-                    readWildIdIndex(xxpath, wild);
+                    readWildIdIndex(xxpath, wild, numeric);
                     size32_t s = xxpath-start;
+
                     if (s)
                     {
+                         // NB: actually an id not qualifier, just sharing var.
                         qualifierText.clear().append(s, start);
-                        setIterator(element->getElements(qualifierText));
+
+                        bool mapped = false;
+                        if (!wild && !numeric)
+                        {
+                            ChildMap *children = ((PTree *)element)->checkChildren();
+                            if (children)
+                            {
+                                IPropertyTree *child = children->query(qualifierText);
+                                if (child)
+                                {
+                                    if ('[' == *xxpath)
+                                    {
+                                        const char *newXXPath = xxpath+1;
+                                        Owned<IPropertyTreeIterator> mapIter = checkMapIterator(newXXPath, *child);
+                                        if (mapIter)
+                                        {
+                                            setIterator(mapIter.getClear());
+                                            mapped = true;
+                                            xxpath = newXXPath;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        if (!mapped)
+                            setIterator(element->getElements(qualifierText));
                     }
                     else // must be qualifier.
                     {
@@ -8077,8 +8607,8 @@ jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, con
         holdLoop();
 #endif
 
-    if (!globalConfiguration)
-        globalConfiguration.setown(createPTree("global"));
+    unsigned ptreeMappingThreshold = globalConfiguration->getPropInt("@ptreeMappingThreshold", defaultSiblingMapThreshold);
+    setPTreeMappingThreshold(ptreeMappingThreshold);
 
     componentConfiguration.set(config);
     return config.getClear();

+ 3 - 0
system/jlib/jptree.hpp

@@ -358,4 +358,7 @@ jlib_decl void saveYAML(IIOStream &stream, const IPropertyTree *tree, unsigned i
 jlib_decl void printYAML(const IPropertyTree *tree, unsigned indent = 0, unsigned flags=YAML_HideRootArrayObject);
 jlib_decl void dbglogYAML(const IPropertyTree *tree, unsigned indent = 0, unsigned flags=YAML_HideRootArrayObject);
 
+// Defines the threshold where attribute value maps are created for sibling ptrees for fast lookups
+jlib_decl void setPTreeMappingThreshold(unsigned threshold);
+
 #endif

+ 33 - 8
system/jlib/jptree.ipp

@@ -138,11 +138,14 @@ jlib_decl const char *splitXPathUQ(const char *xpath, StringBuffer &path);
 jlib_decl const char *queryHead(const char *xpath, StringBuffer &head);
 jlib_decl const char *queryNextUnquoted(const char *str, char c);
 
+class CQualifierMap;
 interface IPTArrayValue
 {
     virtual ~IPTArrayValue() { }
 
     virtual bool isArray() const = 0;
+    virtual CQualifierMap *queryMap() = 0;
+    virtual CQualifierMap *setMap(CQualifierMap *_map) = 0;
     virtual bool isCompressed() const = 0;
     virtual const void *queryValue() const = 0;
     virtual MemoryBuffer &getValue(MemoryBuffer &tgt, bool binary) const = 0;
@@ -155,6 +158,8 @@ interface IPTArrayValue
     virtual unsigned elements() const = 0;
     virtual const void *queryValueRaw() const = 0;
     virtual size32_t queryValueRawSize() const = 0;
+    virtual unsigned find(const IPropertyTree *search) const = 0;
+    virtual IPropertyTree **getRawArray() const = 0;
 
     virtual void serialize(MemoryBuffer &tgt) = 0;
     virtual void deserialize(MemoryBuffer &src) = 0;
@@ -162,21 +167,29 @@ interface IPTArrayValue
 
 class CPTArray : implements IPTArrayValue, private IArray
 {
+    std::atomic<CQualifierMap *> map{nullptr};
 public:
+    ~CPTArray();
     virtual bool isArray() const override { return true; }
+    virtual CQualifierMap *queryMap() override { return map.load(); }
+    virtual CQualifierMap *setMap(CQualifierMap *_map) override;
     virtual bool isCompressed() const override { return false; }
     virtual const void *queryValue() const override { UNIMPLEMENTED; }
     virtual MemoryBuffer &getValue(MemoryBuffer &tgt, bool binary) const override { UNIMPLEMENTED; }
     virtual StringBuffer &getValue(StringBuffer &tgt, bool binary) const override { UNIMPLEMENTED; }
     virtual size32_t queryValueSize() const override { UNIMPLEMENTED; }
     virtual IPropertyTree *queryElement(unsigned idx) const override { return (idx<ordinality()) ? &((IPropertyTree &)item(idx)) : NULL; }
-    virtual void addElement(IPropertyTree *tree) override { append(*tree); }
-    virtual void setElement(unsigned idx, IPropertyTree *tree) override { add(*tree, idx); }
-    virtual void removeElement(unsigned idx) override { remove(idx); }
+    virtual void addElement(IPropertyTree *tree) override;
+    virtual void setElement(unsigned idx, IPropertyTree *tree) override;
+    virtual void removeElement(unsigned idx) override;
     virtual unsigned elements() const override { return ordinality(); }
     virtual const void *queryValueRaw() const override { UNIMPLEMENTED; return NULL; }
     virtual size32_t queryValueRawSize() const override { UNIMPLEMENTED; return 0; }
-
+    virtual unsigned find(const IPropertyTree *search) const override;
+    virtual IPropertyTree **getRawArray() const override
+    {
+        return (IPropertyTree **)getArray();
+    }
 // serializable
     virtual void serialize(MemoryBuffer &tgt) override { UNIMPLEMENTED; }
     virtual void deserialize(MemoryBuffer &src) override { UNIMPLEMENTED; }
@@ -193,6 +206,8 @@ public:
     CPTValue(size32_t size, const void *data, bool binary=false, bool raw=false, bool compressed=false);
 
     virtual bool isArray() const override { return false; }
+    virtual CQualifierMap *queryMap() override { return nullptr; }
+    virtual CQualifierMap *setMap(CQualifierMap *_map) { UNIMPLEMENTED; }
     virtual bool isCompressed() const override { return compressed; }
     virtual const void *queryValue() const override;
     virtual MemoryBuffer &getValue(MemoryBuffer &tgt, bool binary) const override;
@@ -205,6 +220,8 @@ public:
     virtual unsigned elements() const override {  UNIMPLEMENTED; return (unsigned)-1; }
     virtual const void *queryValueRaw() const override { return get(); }
     virtual size32_t queryValueRawSize() const override { return (size32_t)length(); }
+    virtual unsigned find(const IPropertyTree *search) const override { throwUnexpected(); }
+    virtual IPropertyTree **getRawArray() const override { throwUnexpected(); }
 
 // serializable
     virtual void serialize(MemoryBuffer &tgt) override;
@@ -565,6 +582,7 @@ friend class SingleIdIterator;
 friend class PTLocalIteratorBase;
 friend class PTIdMatchIterator;
 friend class ChildMap;
+friend class PTStackIterator;
 
 public:
     PTree(byte _flags=ipt_none, IPTArrayValue *_value=nullptr, ChildMap *_children=nullptr);
@@ -572,7 +590,13 @@ public:
     virtual void beforeDispose() override { }
 
     virtual unsigned queryHash() const = 0;
-    IPropertyTree *queryParent() { return parent; }
+    IPTArrayValue *queryOwner() { return arrayOwner; }
+    unsigned querySiblingPosition()
+    {
+        if (!arrayOwner)
+            return 0;
+        return arrayOwner->find(this);
+    }
     IPropertyTree *queryChild(unsigned index);
     ChildMap *queryChildren() { return children; }
     aindex_t findChild(IPropertyTree *child, bool remove=false);
@@ -584,17 +608,18 @@ public:
     void serializeAttributes(MemoryBuffer &tgt);
     IPropertyTree *clone(IPropertyTree &srcTree, bool self=false, bool sub=true);
     void clone(IPropertyTree &srcTree, IPropertyTree &dstTree, bool sub=true);
-    inline void setParent(IPropertyTree *_parent) { parent = _parent; }
+    inline void setOwner(IPTArrayValue *_arrayOwner) { arrayOwner = _arrayOwner; }
     IPropertyTree *queryCreateBranch(IPropertyTree *branch, const char *prop, bool *existing=NULL);
     IPropertyTree *splitBranchProp(const char *xpath, const char *&_prop, bool error=false);
     IPTArrayValue *queryValue() { return value; }
+    CQualifierMap *queryMap() { return value ? value->queryMap() : nullptr; }
     IPTArrayValue *detachValue() { IPTArrayValue *v = value; value = NULL; return v; }
     void setValue(IPTArrayValue *_value, bool binary) { if (value) delete value; value = _value; if (binary) IptFlagSet(flags, ipt_binary); }
     bool checkPattern(const char *&xxpath) const;
     IPropertyTree *detach()
     {
         IPropertyTree *tree = create(queryName(), value, children, true);
-        PTree *_tree = QUERYINTERFACE(tree, PTree); assertex(_tree); _tree->setParent(this);
+        PTree *_tree = QUERYINTERFACE(tree, PTree); assertex(_tree); _tree->setOwner(nullptr);
 
         std::swap(numAttrs, _tree->numAttrs);
         std::swap(attrs, _tree->attrs);
@@ -694,7 +719,7 @@ protected: // data
 
     unsigned short numAttrs = 0;
     byte flags;           // set by constructor
-    IPropertyTree *parent = nullptr; // ! currently only used if tree embedded into array, used to locate position.
+    IPTArrayValue *arrayOwner = nullptr; // ! currently only used if tree embedded into array, used to locate position.
     ChildMap *children;   // set by constructor
     IPTArrayValue *value; // set by constructor
     AttrValue *attrs = nullptr;

+ 299 - 0
testing/unittests/dalitests.cpp

@@ -34,6 +34,7 @@
 
 #include <vector>
 #include <future>
+#include <math.h>
 
 #include "unittests.hpp"
 
@@ -806,6 +807,9 @@ class CDaliSDSStressTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testSDSSubs2);
         CPPUNIT_TEST(testSDSNodeSubs);
         CPPUNIT_TEST(testEphemeralLocks);
+        CPPUNIT_TEST(testSiblingPerfLocal);
+        CPPUNIT_TEST(testSiblingPerfDali);
+        CPPUNIT_TEST(testSiblingPerfContention);
     CPPUNIT_TEST_SUITE_END();
 
     const IContextLogger &logctx;
@@ -1458,6 +1462,301 @@ public:
         for (auto &f: results)
             f.get();
     }
+    void createLevel(IPropertyTree *parent, unsigned nodeSiblings, unsigned leafSiblings, unsigned attributes, unsigned depth, unsigned level)
+    {
+        StringBuffer aname;
+        StringBuffer avalue;
+        if (2 == level) // 1st level down
+        {
+            printf(".");
+            fflush(stdout);
+        }
+        unsigned levelSiblings = depth==level ? leafSiblings : nodeSiblings;
+        for (unsigned s=0; s<levelSiblings; s++)
+        {
+            IPropertyTree *child = parent->addPropTree("Child");
+            if (level<depth)
+                createLevel(child, nodeSiblings, leafSiblings, attributes, depth, level+1);
+            for (unsigned a=0; a<attributes; a++)
+            {
+                avalue.clear().appendf("%u_%u", level, s+1);
+                child->setProp(aname.clear().appendf("@aname%u", a+1), avalue.str());
+            }
+        }
+        if (1 == level) // back at top
+            printf("\n");
+    }
+    static StringBuffer &constructXPath(StringBuffer &xpath, unsigned depth, unsigned nodeSibling, unsigned leafSibling, unsigned attr, unsigned level, const char *extra=nullptr)
+    {
+        while (true)
+        {
+            unsigned sibling = depth == level ? leafSibling : nodeSibling;
+            xpath.appendf("Child[@aname%u=\"%u_%u", attr, level, sibling);
+            if (extra && (depth == level))
+                xpath.append(extra);
+            xpath.append("\"]");
+            if (level == depth)
+                break;
+            xpath.append('/');
+            level++;
+        }
+        return xpath;
+    }
+
+    void createSiblings(IPropertyTree *root, unsigned depth, unsigned attributes, unsigned nodeSiblings, unsigned leafSiblings)
+    {
+        unsigned nodes = 0;
+        for (unsigned d=1; d<=depth; d++)
+        {
+            if (d==depth)
+                nodes += pow(nodeSiblings, d-1) * leafSiblings;
+            else
+                nodes += pow(nodeSiblings, d);
+        }
+        printf("Creating %u nodes\n", nodes);
+
+        CCycleTimer timer;
+        createLevel(root, nodeSiblings, leafSiblings, attributes, depth, 1);
+        printf("%6u ms : create time\n", timer.elapsedMs());
+    }
+
+    void testSiblingPerf(std::function<IPropertyTree *(StringBuffer &, unsigned, unsigned, unsigned, unsigned, const char *)> searchFunc, unsigned depth, unsigned attributes, unsigned nodeSiblings, unsigned leafSiblings, unsigned secondaryTests)
+    {
+        try
+        {
+            StringBuffer xpath;
+            StringBuffer v;
+            xpath.ensureCapacity(1024);
+            v.ensureCapacity(1024);
+
+            auto firstSearchFunc = [&](unsigned attr)
+            {
+                CCycleTimer timer;
+                Owned<IPropertyTree> search = searchFunc(xpath.clear(), attr, depth, nodeSiblings, leafSiblings, nullptr);
+                assertex(search);
+                printf("%6u ms : 1st search (xpath=%s) time\n", timer.elapsedMs(), xpath.str());
+                xpath.clear().appendf("@aname%u", attributes);
+                v.clear().appendf("%u_%u", depth, leafSiblings);
+                assertex(streq(v, search->queryProp(xpath)));
+            };
+
+            auto secondarySearchFunc = [&](unsigned attr, const char *extra=nullptr, unsigned siblingOffset=0)->unsigned
+            {
+                unsigned leafSibling = siblingOffset+leafSiblings-1;
+                unsigned foundCount = 0;
+                CCycleTimer timer;
+                for (unsigned t=0; t<secondaryTests; t++)
+                {
+                    Owned<IPropertyTree> search = searchFunc(xpath.clear(), attr, depth, nodeSiblings, leafSibling, extra);
+                    if (search)
+                        foundCount++;
+                    leafSibling--;
+                    if (siblingOffset == leafSibling)
+                        leafSibling = siblingOffset+leafSiblings;
+                }
+                VStringBuffer msg("%6u ms : Next ", timer.elapsedMs());
+                msg.append(secondaryTests).append(" searches for aname").append(attr);
+                if (extra)
+                    msg.append(" [extra=\"").append(extra).append("\"]");
+                msg.append(" time");
+                printf("%s\n", msg.str());
+                return foundCount;
+            };
+
+            firstSearchFunc(1); // first attribute
+
+            verifyex(secondaryTests == secondarySearchFunc(1)); // first attribute
+
+            firstSearchFunc(attributes); // 1st attribute
+            verifyex(secondaryTests == secondarySearchFunc(attributes)); // last attribute
+            verifyex(secondaryTests == secondarySearchFunc(1)); // first attribute
+
+            Owned<IPropertyTree> parent = searchFunc(xpath.clear(), 1, depth-1, nodeSiblings, nodeSiblings, nullptr);
+            verifyex(parent);
+            CCycleTimer timer;
+            unsigned max = leafSiblings;
+            if (max > 1000)
+                max = 1000;
+            unsigned removedEntries = 0, newEntries = 0, changedEntries = 0;
+            unsigned step = leafSiblings / max;
+            unsigned s = 1;
+            unsigned which = 0;
+            while (true)
+            {
+                constructXPath(xpath.clear(), depth, nodeSiblings, s, 1, depth);
+                IPropertyTree *search = parent->queryPropTree(xpath);
+                assertex(search);
+                switch (which)
+                {
+                    case 0:
+                    {
+                        verifyex(parent->removeTree(search));
+                        removedEntries++;
+                        break;
+                    }
+                    case 1:
+                    {
+                        IPropertyTree *newChild = parent->addPropTree("Child");
+                        newChild->setProp("@aname1", "NEW");
+                        newEntries++;
+                        break;
+                    }
+                    case 2:
+                    {
+                        search->setProp("@aname1", v.clear().appendf("%u_%u - CHANGED", depth, s));
+                        changedEntries++;
+                        break;
+                    }
+                }
+                s += step;
+                if (s >= leafSiblings)
+                    break;
+                ++which;
+                if (which>2)
+                    which = 0;
+            }
+            printf("%6u ms : Modify, delete and create elements time\n", timer.elapsedMs());
+
+            parent.clear(); // if Dali test, then this will commit the above changes
+            parent.setown(searchFunc(xpath.clear(), 1, depth-1, nodeSiblings, nodeSiblings, nullptr));
+
+            timer.reset();
+            Owned<IPropertyTreeIterator> iter = parent->getElements(xpath.clear().append("Child[@aname1=\"NEW\"]"));
+            unsigned count = 0;
+            ForEach (*iter)
+                ++count;
+            assertex(count == newEntries);
+            printf("%6u ms : Scan of new entries time\n", timer.elapsedMs());
+
+            s = 1;
+            which = 0;
+            timer.reset();
+            while (true)
+            {
+                if (which==2)
+                {
+                    Owned<IPropertyTree> search = searchFunc(xpath.clear(), 1, depth, nodeSiblings, s, " - CHANGED");
+                    assertex(search);
+                }
+                s += step;
+                if (s >= leafSiblings)
+                    break;
+                which++;
+                if (which>2)
+                    which = 0;
+            }
+            printf("%6u ms : scans for %u changed entries time\n", timer.elapsedMs(), changedEntries);
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e, nullptr);
+            e->Release();
+        }
+    }
+    void testSiblingPerfLocal()
+    {
+        unsigned depth = 3;
+        unsigned attributes = 3;
+        unsigned nodeSiblings = 10;
+        unsigned leafSiblings = 100000;
+        unsigned secondaryTests = 1000;
+        unsigned mappingThreshold = 10;
+
+        printf("Performing testSiblingPerfLocal\n\n");
+
+        Owned<IPropertyTree> root = createPTree();
+        auto searchFunc = [&root](StringBuffer &xpath, unsigned attr, unsigned depth, unsigned nodeSiblings, unsigned leafSibling, const char *extra=nullptr)->IPropertyTree *
+        {
+            constructXPath(xpath.clear(), depth, nodeSiblings, leafSibling, attr, 1, extra);
+            return root->getPropTree(xpath.str());
+        };
+
+        createSiblings(root, depth, attributes, nodeSiblings, leafSiblings);
+
+        printf("cloning tree for 2nd test\n");
+        Owned<IPropertyTree> copyRoot = createPTreeFromIPT(root);
+
+        setPTreeMappingThreshold(0); // disable
+        printf("Performing tests with mapping disabled\n");
+        testSiblingPerf(searchFunc, depth, attributes, nodeSiblings, leafSiblings, secondaryTests);
+
+        root.setown(copyRoot.getClear());
+
+        setPTreeMappingThreshold(mappingThreshold);
+        printf("Performing tests with mapping enabled (mappingThreshold=%u)\n", mappingThreshold);
+        testSiblingPerf(searchFunc, depth, attributes, nodeSiblings, leafSiblings, secondaryTests);
+    }
+
+    void testSiblingPerfDali()
+    {
+        unsigned depth = 3;
+        unsigned attributes = 3;
+        unsigned nodeSiblings = 2;
+        unsigned leafSiblings = 100000;
+        unsigned secondaryTests = 100;
+        unsigned mappingThreshold = 10;
+
+        printf("Performing testSiblingPerfDali\n\n");
+
+        setPTreeMappingThreshold(mappingThreshold);
+
+        Owned<IRemoteConnection> conn = querySDS().connect("/testmaps", myProcessSession(), RTM_CREATE, 10000);
+        createSiblings(conn->queryRoot(), depth, attributes, nodeSiblings, leafSiblings);
+        conn.clear(); // commit
+
+        auto searchFunc = [](StringBuffer &xpath, unsigned attr, unsigned depth, unsigned nodeSiblings, unsigned leafSibling, const char *extra=nullptr)->IPropertyTree *
+        {
+            constructXPath(xpath.clear().append("/testmaps/"), depth, nodeSiblings, leafSibling, attr, 1, extra);
+            Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, 10000);
+            assertex(conn);
+            return conn->getRoot();
+        };
+
+        printf("Performing tests with both client and server side mapping enabled (mappingThreshold=%u)\n", mappingThreshold);
+        testSiblingPerf(searchFunc, depth, attributes, nodeSiblings, leafSiblings, secondaryTests);
+    }
+
+    void testSiblingPerfContention()
+    {
+        unsigned depth = 1;
+        unsigned attributes = 10;
+        unsigned nodeSiblings = 1;
+        unsigned leafSiblings = 1000;
+        unsigned mappingThreshold = 10;
+        unsigned threads = 20;
+
+        printf("Performing testSiblingPerfContention\n\n");
+
+        Owned<IPropertyTree> root = createPTree();
+        createSiblings(root, depth, attributes, nodeSiblings, leafSiblings);
+
+        setPTreeMappingThreshold(mappingThreshold);
+
+        std::vector<std::future<void>> results;
+
+        auto searchFunc = [&root](unsigned a)
+        {
+            /*
+             * NB: initially the 1st lookups are likely to clash and only 1 will create the initial map.
+             * i.e. the other threads will not use the map.
+             */
+            StringBuffer xpath;
+            for (unsigned s=1; s<=1000; s++)
+            {
+                constructXPath(xpath.clear(), 1, 1, s, a, 1, nullptr);
+                IPropertyTree *search = root->queryPropTree(xpath);
+                assertex(search);
+            }
+        };
+
+        for (unsigned t=0; t<threads; t++)
+        {
+            unsigned a = (t % (threads/2))+1;
+            results.push_back(std::async(std::launch::async, searchFunc, a));
+        }
+        for (auto &f: results)
+            f.get();
+    }
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( CDaliSDSStressTests );