瀏覽代碼

Merge pull request #5836 from jakesmith/hpcc-11256

HPCC-11256 - Introduce new SDS node subscription mechanism

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父節點
當前提交
2032af6a13
共有 9 個文件被更改,包括 775 次插入170 次删除
  1. 2 2
      dali/base/dacoven.cpp
  2. 25 0
      dali/base/dacsds.cpp
  3. 90 94
      dali/base/dacsds.ipp
  4. 415 69
      dali/base/dasds.cpp
  5. 9 3
      dali/base/dasds.hpp
  6. 13 2
      dali/base/dasubs.cpp
  7. 2 0
      dali/base/dasubs.ipp
  8. 1 0
      system/jlib/jptree.hpp
  9. 218 0
      testing/unittests/dalitests.cpp

+ 2 - 2
dali/base/dacoven.cpp

@@ -37,12 +37,12 @@ extern void closedownDFS();
 // base is saved in store whenever block exhausted, so replacement coven servers can restart 
 
 // server side versioning.
-#define ServerVersion    "3.11"
+#define ServerVersion    "3.12"
 #define MinClientVersion "1.5"
 
 
 // client side default versioning.
-static StringAttr ClientVersion("3.5");
+static StringAttr ClientVersion("3.6");
 static StringAttr MinServerVersion("3.1");      // when this upped check initClientProcess instances
 static CDaliVersion _ServerVersion;
 

+ 25 - 0
dali/base/dacsds.cpp

@@ -41,6 +41,7 @@ static unsigned clientThrottleDelay;
 #define MIN_GETXPATHS_CONNECT_SVER "3.2"
 #define MIN_APPEND_OPT_SVER "3.3"
 #define MIN_GETIDS_SVER "3.5"
+#define MIN_NODESUBSCRIBE_SVER "3.12"
 
 
 static ISDSManager *SDSManager=NULL;
@@ -1652,11 +1653,35 @@ SubscriptionId CClientSDSManager::subscribe(const char *xpath, ISDSSubscription
     return subscriber->getId();
 }
 
+
+SubscriptionId CClientSDSManager::subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue)
+{
+    if (queryDaliServerVersion().compare(MIN_NODESUBSCRIBE_SVER) < 0)
+        throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " MIN_NODESUBSCRIBE_SVER " for subscribeExact");
+    assertex(xpath);
+    StringBuffer s;
+    if ('/' != *xpath)
+    {
+        s.append('/').append(xpath);
+        xpath = s.str();
+    }
+    CSDSNodeSubscriberProxy *subscriber = new CSDSNodeSubscriberProxy(xpath, sendValue, notify);
+    querySubscriptionManager(SDSNODE_PUBLISHER)->add(subscriber, subscriber->getId());
+    return subscriber->getId();
+}
+
 void CClientSDSManager::unsubscribe(SubscriptionId id)
 {
     querySubscriptionManager(SDS_PUBLISHER)->remove(id);
 }
 
+void CClientSDSManager::unsubscribeExact(SubscriptionId id)
+{
+    if (queryDaliServerVersion().compare(MIN_NODESUBSCRIBE_SVER) < 0)
+        throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " MIN_NODESUBSCRIBE_SVER " for unsubscribeExact");
+    querySubscriptionManager(SDSNODE_PUBLISHER)->remove(id);
+}
+
 StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
 {
     CMessageBuffer mb;

+ 90 - 94
dali/base/dacsds.ipp

@@ -28,52 +28,6 @@
 
 #include "dasds.ipp"
   
-
-class CSubscriberContainerBase : public CInterface, implements IInterface
-{
-    DECL_NAMEDCOUNT;
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CSubscriberContainerBase(ISubscription *_subscriber, SubscriptionId _id) : 
-      subscriber(_subscriber), id(_id)
-    {
-        INIT_NAMEDCOUNT;
-        unsubscribed = false;
-    }
-
-    bool notify(MemoryBuffer &mb)
-    {
-        try { 
-            subscriber->notify(mb); 
-            return true;
-        }
-        catch (IException *e)
-        {
-            LOG(MCuserWarning, e, "SDS: Error notifying subscriber");
-            e->Release();
-            
-        }
-        return false; // unsubscribe 
-    }
-
-    const SubscriptionId &queryId() const { return id; }
-    const void *queryFindParam() const
-    {
-        return (const void *) &id;
-    }
-
-    bool isUnsubscribed() { return unsubscribed || subscriber->aborted(); }
-    void setUnsubscribed() { unsubscribed = true; }
-
-protected:
-    Owned<ISubscription> subscriber;
-    SubscriptionId id;
-    bool unsubscribed;
-};
-
-/////////////////
-
 class CClientSDSManager;
 class CClientRemoteTree;
 class CRemoteConnection : public CConnectionBase, public CTrackChanges, implements IRemoteConnection
@@ -158,12 +112,10 @@ public:
     ~CConnectionLock() { conn.lockCrit.leave(); }
 };
 //////////////////
-class CSDSConnectionSubscriberProxy : public CInterface, implements ISubscription
+class CSDSConnectionSubscriberProxy : public CInterfaceOf<ISubscription>
 {
     DECL_NAMEDCOUNT;
 public:
-    IMPLEMENT_IINTERFACE;
-
     CSDSConnectionSubscriberProxy(ISDSConnectionSubscription &_sdsNotify, ConnectionId connId) : sdsNotify(&_sdsNotify)
     {
         INIT_NAMEDCOUNT;
@@ -199,40 +151,46 @@ private:
     Linked<ISDSConnectionSubscription> sdsNotify;
 };
 
+static void checkValidSubscriptionPath(const char *xpath)
+{
+    bool quote=false, sep=false;
+    const char *_xpath = xpath;
+    loop
+    {
+        char next = *_xpath;
+        if ('\0' == next)
+            break;
+        if ('\"' == next)
+        {
+            sep = false;
+            quote = !quote;
+        }
+        else if ('/' == next && !quote)
+        {
+            if (sep)
+                throw MakeStringException(0, "UNSUPPORTED: '//' syntax unsupported in subscriber xpath (path=\"%s\")", xpath); // JCSMORE - TBD?
+            sep = true;
+        }
+        else
+            sep = false;
+        ++_xpath;
+    }
+}
+
 //////////////////
-class CSDSSubscriberProxy : public CInterface, implements ISubscription
+class CSDSSubscriberProxyBase : public CInterfaceOf<ISubscription>
 {
     DECL_NAMEDCOUNT;
+protected:
+    SubscriptionId id;
+    StringAttr xpath;
+    MemoryAttr data;
 public:
-    IMPLEMENT_IINTERFACE;
-
-    CSDSSubscriberProxy(const char *xpath, bool sub, bool sendValue, ISDSSubscription &_sdsNotify) : sdsNotify(&_sdsNotify)
+    CSDSSubscriberProxyBase(const char *_xpath, bool sendValue)
     {
         INIT_NAMEDCOUNT;
-        bool quote=false, sep=false;
-        const char *_xpath = xpath;
-        const char *end = _xpath+strlen(_xpath);
-        while (_xpath != end)
-        {
-            if ('\"' == *_xpath)
-            {
-                sep = false;
-                if (quote) quote = false;
-                else quote = true;
-            }
-            else if ('/' == *_xpath && !quote)
-            {
-                if (sep)
-                    throw MakeStringException(0, "UNSUPPORTED: '//' syntax unsupported in subscriber xpath (path=\"%s\")", xpath); // JCSMORE - TBD?
-                sep = true;
-            }
-            else
-                sep = false;
-            ++_xpath;
-        }
-        MemoryBuffer _data;
-        _data.append(xpath).append(sub).append(sendValue);
-        data.set(_data.length(), _data.toByteArray());
+        checkValidSubscriptionPath(_xpath);
+        xpath.set(_xpath);
         id = queryCoven().getUniqueId();
     }
     SubscriptionId getId() const { return id; }
@@ -242,12 +200,34 @@ public:
     {
         return data;
     }
+    virtual void abort() // called when server closes
+    { 
+        // JCS TBD?
+    }
+    virtual bool aborted() // called when server closes
+    { 
+        return false;
+    }
+};
+
+class CSDSSubscriberProxy : public CSDSSubscriberProxyBase
+{
+    Linked<ISDSSubscription> sdsNotify;
+public:
+    CSDSSubscriberProxy(const char *_xpath, bool sub, bool sendValue, ISDSSubscription &_sdsNotify)
+        : CSDSSubscriberProxyBase(_xpath, sendValue), sdsNotify(&_sdsNotify)
+    {
+        MemoryBuffer _data;
+        _data.append(xpath).append(sub).append(sendValue);
+        data.set(_data.length(), _data.toByteArray());
+    }
+// ISubscription impl.
     virtual void notify(MemoryBuffer &returnData)
     {
         StringAttr xpath;
-        SDSNotifyFlags flags;
+        int flags;
         returnData.read(xpath);
-        returnData.read((int &) flags);
+        returnData.read(flags);
         bool valueData;
         if (returnData.length()-returnData.getPos()) // remaining
         {
@@ -256,29 +236,43 @@ public:
             {
                 unsigned l;
                 returnData.read(l);
-                sdsNotify->notify(id, xpath, flags, l, returnData.readDirect(l));
+                sdsNotify->notify(id, xpath, (SDSNotifyFlags)flags, l, returnData.readDirect(l));
             }
             else
-                sdsNotify->notify(id, xpath, flags);
+                sdsNotify->notify(id, xpath, (SDSNotifyFlags)flags);
         }
         else
-            sdsNotify->notify(id, xpath, flags);
+            sdsNotify->notify(id, xpath, (SDSNotifyFlags)flags);
     }
+};
 
-    virtual void abort() // called when server closes
-    { 
-        // JCS TBD?
+class CSDSNodeSubscriberProxy : public CSDSSubscriberProxyBase
+{
+    Linked<ISDSNodeSubscription> sdsNotify;
+public:
+    CSDSNodeSubscriberProxy(const char *_xpath, bool sendValue, ISDSNodeSubscription &_sdsNotify)
+        : CSDSSubscriberProxyBase(_xpath, sendValue) , sdsNotify(&_sdsNotify)
+    {
+        MemoryBuffer _data;
+        _data.append(xpath).append(sendValue);
+        data.set(_data.length(), _data.toByteArray());
     }
-
-    virtual bool aborted() // called when server closes
-    { 
-        return false;
+// ISubscription impl.
+    virtual void notify(MemoryBuffer &returnData)
+    {
+        int flags;
+        returnData.read(flags);
+        unsigned valueLen = 0;
+        const void *valueData = NULL;
+        bool isValueData;
+        returnData.read(isValueData);
+        if (isValueData)
+        {
+            returnData.read(valueLen);
+            valueData = returnData.readDirect(valueLen);
+        }
+        sdsNotify->notify(id, (SDSNotifyFlags)flags, valueLen, valueData);
     }
-
-private:
-    SubscriptionId id;
-    MemoryAttr data, valueData;
-    Linked<ISDSSubscription> sdsNotify;
 };
 
 ////////////////////
@@ -391,7 +385,9 @@ public:
     virtual IRemoteConnections *connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout);
     virtual IRemoteConnection *connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout);
     virtual SubscriptionId subscribe(const char *xpath, ISDSSubscription &notify, bool sub=true, bool sendValue=false);
+    virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
+    virtual void unsubscribeExact(SubscriptionId id);
     virtual StringBuffer &getLocks(StringBuffer &out);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);

+ 415 - 69
dali/base/dasds.cpp

@@ -486,6 +486,46 @@ private:
 };
 
 //////////////
+
+class CSubscriberContainerBase : public CInterfaceOf<IInterface>
+{
+    DECL_NAMEDCOUNT;
+protected:
+    bool unsubscribed;
+    Owned<ISubscription> subscriber;
+    SubscriptionId id;
+public:
+    CSubscriberContainerBase(ISubscription *_subscriber, SubscriptionId _id) :
+      subscriber(_subscriber), id(_id)
+    {
+        INIT_NAMEDCOUNT;
+        unsubscribed = false;
+    }
+    bool notify(MemoryBuffer &mb) const
+    {
+        try
+        {
+            subscriber->notify(mb);
+            return true;
+        }
+        catch (IException *e)
+        {
+            LOG(MCuserWarning, e, "SDS: Error notifying subscriber");
+            e->Release();
+        }
+        return false; // unsubscribe
+    }
+    const SubscriptionId &queryId() const { return id; }
+    const void *queryFindParam() const
+    {
+        return (const void *) &id;
+    }
+    bool isUnsubscribed() { return unsubscribed || subscriber->aborted(); }
+    void setUnsubscribed() { unsubscribed = true; }
+};
+
+/////////////////
+
 class CConnectionSubscriberContainer : public CSubscriberContainerBase
 {
 public:
@@ -651,7 +691,7 @@ public:
         MemoryBuffer mb(ma.length(), ma.get());
         mb.read(xpath);
         mb.read(sub);
-        if (mb.length()-mb.getPos()) // remaining
+        if (mb.remaining()) // remaining
             mb.read(sendValue);
         else
             sendValue = false;
@@ -1686,37 +1726,49 @@ SDSNotifyFlags translatePDState(PDState state)
     return (SDSNotifyFlags) state; // mirrored for now.
 }
 
-void buildNotifyData(CPTStack &stack, MemoryBuffer &mb)
+void buildNotifyData(MemoryBuffer &notifyData, PDState state, CPTStack *stack, MemoryBuffer *data)
 {
-    mb.append('/');
-    PTree *parent = &stack.item(0); // root
-
-    unsigned n = stack.ordinality();
-    if (n>1)
+    if (stack)
     {
-        unsigned s = 1;
-        loop
+        notifyData.append('/');
+        PTree *parent = &stack->item(0); // root
+
+        unsigned n = stack->ordinality();
+        if (n>1)
         {
-            PTree &child = stack.item(s);
-            const char *str = child.queryName();
-            mb.append(strlen(str), str);
-            if (child.queryParent())
+            unsigned s = 1;
+            loop
             {
-                char temp[12];
-                unsigned written = numtostr(temp, parent->findChild(&child)+1);
-                mb.append('[').append(written, temp).append(']');
+                PTree &child = stack->item(s);
+                const char *str = child.queryName();
+                notifyData.append(strlen(str), str);
+                if (child.queryParent())
+                {
+                    char temp[12];
+                    unsigned written = numtostr(temp, parent->findChild(&child)+1);
+                    notifyData.append('[').append(written, temp).append(']');
+                }
+                else
+                    notifyData.append(3, "[1]");
+                parent = &child;
+                s++;
+                if (s<n)
+                    notifyData.append('/');
+                else
+                    break;
             }
-            else
-                mb.append(3, "[1]");
-            parent = &child;
-            s++;
-            if (s<n)
-                mb.append('/');
-            else
-                break;
         }
+        notifyData.append('\0');
+    }
+    notifyData.append((int)translatePDState(state));
+    if (data)
+    {
+        notifyData.append(true);
+        notifyData.append(data->length());
+        notifyData.append(*data);
     }
-    mb.append('\0');
+    else
+        notifyData.append(false);
 }
 
 class CSubscriberNotifier;
@@ -1732,7 +1784,7 @@ class CSubscriberNotifier : public CInterface
         MemoryBuffer notifyData;
     };
 public:
-    CSubscriberNotifier(CSubscriberNotifierTable &_table, CSubscriberContainer &_subscriber, MemoryBuffer &notifyData)
+    CSubscriberNotifier(CSubscriberNotifierTable &_table, CSubscriberContainerBase &_subscriber, MemoryBuffer &notifyData)
         : table(_table), subscriber(_subscriber)
     {
         INIT_NAMEDCOUNT;
@@ -1787,7 +1839,7 @@ public:
 private:
     Linked<CChange> change;
     CIArrayOf<CChange> changeQueue;
-    CSubscriberContainer &subscriber;
+    CSubscriberContainerBase &subscriber;
     MemoryAttr notifyData;
     CSubscriberNotifierTable &table;
 };
@@ -1803,8 +1855,6 @@ public:
     CConnectionSubscriptionManager()
     {
     }
-
-
     unsigned querySubscribers()
     {
         CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
@@ -1855,6 +1905,18 @@ interface ICoalesce : extends IInterface
 
 //////////////////////
 
+class CNodeSubscriberContainer;
+interface INodeSubscriptionManager : extends ISubscriptionManager
+{
+    virtual void associateSubscriber(CNodeSubscriberContainer &subscriber) = 0;
+    virtual void removeSubscriberAssociation(SubscriptionId id) = 0;
+    virtual void notifyDelete(CServerRemoteTree &node) = 0;
+    virtual void notify(CServerRemoteTree &node, PDState state) = 0;
+    virtual MemoryBuffer &collectSubscribers(MemoryBuffer &out) const = 0;
+};
+
+//////////////////////
+
 enum LockStatus { LockFailed, LockHeld, LockTimedOut, LockSucceeded };
 
 class CCovenSDSManager : public CSDSManagerBase, implements ISDSManagerServer, implements ISubscriptionManager, implements IExceptionHandler
@@ -1879,7 +1941,7 @@ public:
     CLockInfo *queryLockInfo(__int64 id) { return lockTable.find(&id); }
     CSubscriberTable &querySubscriberTable() { return subscribers; }
     IExternalHandler *queryExternalHandler(const char *handler) { if (!handler) return NULL; CExternalHandlerMapping *mapping = externalHandlers.find(handler); return mapping ? &mapping->query() : NULL; }
-    void handleNotify(CSubscriberContainer &subscriber, PDState state, CPTStack &stack, MemoryBuffer *data=NULL);
+    void handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer &notifyData);
     void startNotification(IPropertyTree &changeTree, CPTStack &stack, CBranchChange &changes); // subscription notification
     MemoryBuffer &collectUsageStats(MemoryBuffer &out);
     MemoryBuffer &collectConnections(MemoryBuffer &out);
@@ -1909,6 +1971,10 @@ public:
     CSubscriberContainerList *getSubscribers(const char *xpath, CPTStack &stack);
     void getExternalValue(__int64 index, MemoryBuffer &mb);
     IPropertyTree *getXPathsSortLimitMatchTree(const char *baseXPath, const char *matchXPath, const char *sortby, bool caseinsensitive, bool ascending, unsigned from, unsigned limit);
+    void addNodeSubscriber(ISubscription *sub, SubscriptionId id);
+    void removeNodeSubscriber(SubscriptionId id);
+    void notifyNodeDelete(CServerRemoteTree &node);
+    void notifyNode(CServerRemoteTree &node, PDState state);
 
 // ISDSConnectionManager
     virtual CRemoteTreeBase *get(CRemoteConnection &connection, __int64 serverId);
@@ -1927,7 +1993,9 @@ public:
     virtual IRemoteConnections *connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout);
     virtual IRemoteConnection *connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout);
     virtual SubscriptionId subscribe(const char *xpath, ISDSSubscription &notify, bool sub=true, bool sendValue=false);
+    virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
+    virtual void unsubscribeExact(SubscriptionId id);
     virtual StringBuffer &getLocks(StringBuffer &out);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);
@@ -1940,7 +2008,6 @@ public:
     virtual unsigned countConnections();
     virtual bool setSDSDebug(StringArray &params, StringBuffer &reply);
     virtual unsigned countActiveLocks();
-    virtual unsigned countSubscribers() const;
     virtual unsigned queryExternalSizeThreshold() const { return externalSizeThreshold; }
     virtual void setExternalSizeThreshold(unsigned _size) { externalSizeThreshold = _size; }
     virtual bool queryRestartOnError() const { return restartOnError; }
@@ -2004,6 +2071,7 @@ private:
     CExternalHandlerTable externalHandlers;
     CSubscriberNotifierTable subscriberNotificationTable;
     Owned<CConnectionSubscriptionManager> connectionSubscriptionManager;
+    Owned<INodeSubscriptionManager> nodeSubscriptionManager;
     bool restartOnError, externalEnvironment;
     IStoreHelper *iStoreHelper;
     bool doTimeComparison;
@@ -2574,6 +2642,8 @@ public:
             }
         }
         if (SDSManager->queryStopped()) return; // don't bother building up free list that will never be used hence (could get v. big/slow)
+        if (isSubscribed())
+            SDSManager->notifyNodeDelete(*this);
         CHECKEDCRITICALBLOCK(SDSManager->treeRegCrit, fakeCritTimeout);
 
         SDSManager->queryAllNodes().freeElem(serverId);
@@ -2727,13 +2797,198 @@ public:
         ret.setBuffer(len, mem, len-1);
         return true;
     }
-
+    void setSubscribed(bool tf)
+    {
+        if (tf)
+            IptFlagSet(flags, ipt_ext4);
+        else
+            IptFlagClr(flags, ipt_ext4);
+    }
+    inline bool isSubscribed() const
+    {
+        return IptFlagTst(flags, ipt_ext4);
+    }
 private:
     PDState processData(IPropertyTree &changeTree, Owned<CBranchChange> &parentBranchChange, MemoryBuffer &newIds);
     PDState checkChange(IPropertyTree &tree, CBranchChange *parentBranchChange=NULL);
 friend class COrphanHandler;
 };
 
+class CNodeSubscriberContainer : public CSubscriberContainerBase
+{
+    StringAttr xpath;
+    bool sendValue;
+    ICopyArrayOf<CServerRemoteTree> nodes; // never linked, node must signal the unsubscription and removal of subscriber and these references
+public:
+    CNodeSubscriberContainer(ISubscription *subscriber, SubscriptionId id, bool _sendValue, const char *_xpath)
+        : CSubscriberContainerBase(subscriber, id), sendValue(_sendValue), xpath(_xpath)
+    {
+    }
+    bool querySendValue() const { return sendValue; }
+    void add(CServerRemoteTree &node) { nodes.append(node); }
+    ICopyArrayOf<CServerRemoteTree> &queryNodes() { return nodes; }
+    MemoryBuffer &getInfo(MemoryBuffer &out) const
+    {
+        out.append(id).append(xpath).append(nodes.ordinality());
+        return out;
+    }
+};
+
+class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscriptionManager
+{
+public:
+    class CNodeSubscriberContainerList : public CSimpleInterfaceOf<IInterface>
+    {
+        CServerRemoteTree *node;
+        ICopyArrayOf<CNodeSubscriberContainer> subscribers;
+    public:
+        CNodeSubscriberContainerList(CServerRemoteTree *_node) : node(_node)
+        {
+        }
+        const void *queryFindParam() const { return &node; }
+        ICopyArrayOf<CNodeSubscriberContainer> &querySubscribers() { return subscribers; }
+        void add(CNodeSubscriberContainer &subscriber) { subscribers.append(subscriber); }
+    };
+
+    CCovenSDSManager &owner;
+    OwningSimpleHashTableOf<CNodeSubscriberContainer, SubscriptionId> subscribersById;
+    OwningSimpleHashTableOf<CNodeSubscriberContainerList, CServerRemoteTree *> subscriberListByNode;
+    CriticalSection lock;
+
+    void _notify(CServerRemoteTree *node, PDState state)
+    {
+        MemoryBuffer sendValueNotifyData;
+        CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
+        assertex(subscriberList);
+        ICopyArrayOf<CNodeSubscriberContainer> &subscribers = subscriberList->querySubscribers();
+        int lastSendValue = -1;
+        ForEachItemIn(s, subscribers)
+        {
+            CNodeSubscriberContainer &subscriber = subscribers.item(s);
+            if (subscriber.querySendValue())
+            {
+                if (1 != lastSendValue) // overkill unless many subscribers to same node
+                {
+                    MemoryBuffer mb;
+                    node->getPropBin(NULL, mb);
+                    buildNotifyData(sendValueNotifyData.clear(), state, NULL, &mb);
+                    lastSendValue = 1;
+                }
+                SDSManager->handleNotify(subscriber, sendValueNotifyData);
+            }
+            else
+            {
+                if (0 != lastSendValue) // overkill unless many subscribers to same node
+                {
+                    buildNotifyData(sendValueNotifyData.clear(), state, NULL, NULL);
+                    lastSendValue = 0;
+                }
+                SDSManager->handleNotify(subscriber, sendValueNotifyData);
+            }
+        }
+    }
+    void _removeNode(CServerRemoteTree *node, SubscriptionId id)
+    {
+        CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
+        assertex(subscriberList);
+        ICopyArrayOf<CNodeSubscriberContainer> &subscribers = subscriberList->querySubscribers();
+        ForEachItemInRev(s, subscribers)
+        {
+            CNodeSubscriberContainer &subscriber = subscribers.item(s);
+            if (0 == id) // remove all associated subscribers (node being deleted)
+            {
+                ICopyArrayOf<CServerRemoteTree> &nodes = subscriber.queryNodes();
+                verifyex(nodes.zap(*node));
+                SubscriptionId sid = subscriber.queryId();
+                subscribers.remove(s);
+                if (0 == nodes.ordinality()) // IOW this was the last node this subscriber was associated with
+                    subscribersById.remove(&sid);
+            }
+            else if (subscriber.queryId() == id)
+                subscribers.remove(s);
+        }
+        if (0 == subscribers.ordinality())
+        {
+            node->setSubscribed(false);
+            subscriberListByNode.removeExact(subscriberList);
+        }
+    }
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+    CNodeSubscriptionManager(CCovenSDSManager &_owner) : owner(_owner) { }
+    void notify(CServerRemoteTree &node, PDState state)
+    {
+        // shouldn't be here, unless node is in subscribers table
+        CriticalBlock b(lock);
+        _notify(&node, state);
+    }
+    void notifyDelete(CServerRemoteTree &node)
+    {
+        // shouldn't be here, unless node is in subscribers table
+        CriticalBlock b(lock);
+        _notify(&node, PDS_Deleted);
+        _removeNode(&node, 0);
+    }
+    // ISubscriptionManager impl.
+    virtual void add(ISubscription *sub, SubscriptionId id)
+    {
+        CriticalBlock b(lock);
+        /* calls back out to owner to scan for match, so that SDSManager can protect root/treereg.
+         * It calls back (associateSubscriber) in this class to add subscribers based on matches.
+         */
+        owner.addNodeSubscriber(sub, id);
+    }
+    virtual void remove(SubscriptionId id)
+    {
+        CriticalBlock b(lock);
+        /* calls back out to owner to protect root/treereg.
+         * It calls back into removeSubscriberAssociation.
+         */
+        owner.removeNodeSubscriber(id);
+    }
+    virtual void removeSubscriberAssociation(SubscriptionId id) // Always called back from within remove() above.
+    {
+        CNodeSubscriberContainer *subscriber = subscribersById.find(id);
+        if (!subscriber)
+            return; // may not exist if removed already
+        ICopyArrayOf<CServerRemoteTree> &nodes = subscriber->queryNodes();
+        ForEachItemIn(n, nodes)
+        {
+            CServerRemoteTree &node = nodes.item(n);
+            _removeNode(&node, id);
+        }
+        verifyex(subscribersById.removeExact(subscriber));
+    }
+    void associateSubscriber(CNodeSubscriberContainer &subscriber) // Always called back from within add() above.
+    {
+        /* caller has established there are matches and added them to 'subscriber'
+         * add to HT's
+         */
+        verifyex(subscribersById.add(*LINK(&subscriber)));
+        ICopyArrayOf<CServerRemoteTree> &nodes = subscriber.queryNodes();
+        ForEachItemIn(n, nodes)
+        {
+            CServerRemoteTree *node = &nodes.item(n);
+            CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
+            if (!subscriberList)
+            {
+                subscriberList = new CNodeSubscriberContainerList(node);
+                verifyex(subscriberListByNode.add(* subscriberList));
+            }
+            subscriberList->add(subscriber);
+        }
+    }
+    MemoryBuffer &collectSubscribers(MemoryBuffer &out) const
+    {
+        out.append(subscribersById.count());
+        SuperHashIteratorOf<CNodeSubscriberContainer> sdsNodeIter(subscribersById);
+        ForEach(sdsNodeIter)
+            sdsNodeIter.query().getInfo(out);
+        return out;
+    }
+};
+
 #if defined(_WIN32) && defined(__old_new)
 #define new __old_new
 #endif
@@ -3021,9 +3276,9 @@ PDState CServerRemoteTree::processData(IPropertyTree &changeTree, Owned<CBranchC
             if (childChanges.getPropBool("@new"))
             {
                 child = (CServerRemoteTree *)createChild(childChanges.getPropInt("@pos", -1), childChanges.queryProp("@name"));
-                mergePDState(res, PDS_Structure);
                 newIds.append(child->queryServerId());
-                mergePDState(state, PDS_Added);
+                mergePDState(localChange, PDS_Added);
+                mergePDState(res, PDS_Added);
             }
             else
             {
@@ -3053,6 +3308,8 @@ PDState CServerRemoteTree::processData(IPropertyTree &changeTree, Owned<CBranchC
     else
         branchChange->noteChange(localChange, res);
 
+    if ((localChange != PDS_None) && isSubscribed())
+        SDSManager->notifyNode(*this, localChange);
     if (!parentBranchChange.get())
         parentBranchChange.setown(branchChange.getClear());
     else
@@ -5686,7 +5943,9 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const
     }
     registerSubscriptionManager(SDS_PUBLISHER, this);
     connectionSubscriptionManager.setown(new CConnectionSubscriptionManager());
+    nodeSubscriptionManager.setown(new CNodeSubscriptionManager(*this));
     registerSubscriptionManager(SDSCONN_PUBLISHER, connectionSubscriptionManager.get());
+    registerSubscriptionManager(SDSNODE_PUBLISHER, nodeSubscriptionManager);
 
     // add external handlers
     Owned<CXMLFileExternal> xmlExternalHandler = new CXMLFileExternal(dataPath, backupHandler);
@@ -6582,11 +6841,30 @@ SubscriptionId CCovenSDSManager::subscribe(const char *xpath, ISDSSubscription &
     return subscriber->getId();
 }
 
+SubscriptionId CCovenSDSManager::subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue)
+{
+    assertex(xpath);
+    StringBuffer s;
+    if ('/' != *xpath)
+    {
+        s.append('/').append(xpath);
+        xpath = s.str();
+    }
+    CSDSNodeSubscriberProxy *subscriber = new CSDSNodeSubscriberProxy(xpath, sendValue, notify);
+    querySubscriptionManager(SDSNODE_PUBLISHER)->add(subscriber, subscriber->getId());
+    return subscriber->getId();
+}
+
 void CCovenSDSManager::unsubscribe(SubscriptionId id)
 {
     querySubscriptionManager(SDS_PUBLISHER)->remove(id);
 }
 
+void CCovenSDSManager::unsubscribeExact(SubscriptionId id)
+{
+    querySubscriptionManager(SDSNODE_PUBLISHER)->remove(id);
+}
+
 bool CCovenSDSManager::removeNotifyHandler(const char *handlerKey)
 {
     return nodeNotifyHandlers.remove(handlerKey);
@@ -7750,6 +8028,16 @@ StringBuffer &formatSubscriberInfo(MemoryBuffer &src, StringBuffer &out)
     return out;
 }
 
+StringBuffer &formatNodeSubscriberInfo(MemoryBuffer &src, StringBuffer &out)
+{
+    SubscriptionId subscriptionId;
+    StringAttr xpath;
+    unsigned nodeCount;
+    src.read(subscriptionId).read(xpath).read(nodeCount);
+    out.append("SubscriptionId=").appendf("%"I64F"x", subscriptionId).append(", xpath=").append(xpath).append(", nodes=").append(nodeCount);
+    return out;
+}
+
 StringBuffer &formatConnections(MemoryBuffer &src, StringBuffer &out)
 {
     unsigned count;
@@ -7769,18 +8057,25 @@ StringBuffer &formatConnections(MemoryBuffer &src, StringBuffer &out)
 
 StringBuffer &formatSubscribers(MemoryBuffer &src, StringBuffer &out)
 {
-    unsigned count;
-    src.read(count);
-    if (count)
+    unsigned sdsSubscribers, sdsNodeSubscribers=0;
+    src.read(sdsSubscribers);
+    unsigned s = sdsSubscribers;
+    while (s--)
     {
-        while (count--)
+        formatSubscriberInfo(src, out);
+        if (s) out.newline();
+    }
+    if (src.remaining())
+    {
+        src.read(sdsNodeSubscribers);
+        s = sdsNodeSubscribers;
+        while (s--)
         {
-            formatSubscriberInfo(src, out);
-            if (count) out.newline();
+            formatNodeSubscriberInfo(src, out);
+            if (s) out.newline();
         }
     }
-    else
-        out.append("No current subscriptions");
+    out.newline().appendf("%d xpath subscribers, %d node subscribers\n", sdsSubscribers, sdsNodeSubscribers);
     return out;
 }
 
@@ -7802,11 +8097,6 @@ unsigned CCovenSDSManager::countActiveLocks()
     return activeLocks;
 }
 
-unsigned CCovenSDSManager::countSubscribers() const
-{
-    return subscribers.count();
-}
-
 MemoryBuffer &CCovenSDSManager::collectUsageStats(MemoryBuffer &out)
 {
     { CHECKEDCRITICALBLOCK(cTableCrit, fakeCritTimeout);
@@ -7841,9 +8131,10 @@ MemoryBuffer &CCovenSDSManager::collectSubscribers(MemoryBuffer &out)
 {
     CHECKEDCRITICALBLOCK(sTableCrit, fakeCritTimeout);
     out.append(subscribers.count());
-    SuperHashIteratorOf<CSubscriberContainer> iter(subscribers.queryBaseTable());
-    ForEach(iter)
-        iter.query().getInfo(out);
+    SuperHashIteratorOf<CSubscriberContainer> sdsIter(subscribers.queryBaseTable());
+    ForEach(sdsIter)
+        sdsIter.query().getInfo(out);
+    nodeSubscriptionManager->collectSubscribers(out);
     return out;
 }
 
@@ -7940,7 +8231,7 @@ void CCovenSDSManager::handleNodeNotify(notifications n, CServerRemoteTree &tree
     }
 }
 
-void CCovenSDSManager::handleNotify(CSubscriberContainer &subscriber, PDState state, CPTStack &stack, MemoryBuffer *data)
+void CCovenSDSManager::handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer &notifyData)
 {
     class CNotifyPoolFactory : public CInterface, public IThreadFactory
     {
@@ -7984,18 +8275,6 @@ void CCovenSDSManager::handleNotify(CSubscriberContainer &subscriber, PDState st
         factory->Release();
     }
 
-    MemoryBuffer notifyData;
-    buildNotifyData(stack, notifyData);
-    notifyData.append(translatePDState(state));
-    if (data)
-    {
-        notifyData.append(true);
-        notifyData.append(data->length());
-        notifyData.append(*data);
-    }
-    else
-        notifyData.append(false);
-
     Owned<CSubscriberNotifier> _notifier;
     { CHECKEDCRITICALBLOCK(nfyTableCrit, fakeCritTimeout);
         SubscriptionId id = subscriber.queryId();
@@ -8111,6 +8390,7 @@ public:
         bool sub;
         if (prune(xpath.str(), sub, pruned))
         {
+            MemoryBuffer notifyData;
             if (sub)
             {
                 ForEachItemInRev(s, subs)
@@ -8119,7 +8399,11 @@ public:
                     if (!subscriber.isUnsubscribed())
                     {
                         if (subscriber.qualify(stack))
-                            SDSManager->handleNotify(subscriber, state, stack);
+                        {
+                            if (0 == notifyData.length())
+                                buildNotifyData(notifyData, state, &stack, NULL);
+                            SDSManager->handleNotify(subscriber, notifyData);
+                        }
                         else
                             pruned.append(*LINK(&subscriber));
                     }
@@ -8136,7 +8420,11 @@ public:
                         if (!subscriber.isUnsubscribed())
                         {
                             if (subscriber.qualify(stack))
-                                SDSManager->handleNotify(subscriber, state, stack);
+                            {
+                                if (0 == notifyData.length())
+                                    buildNotifyData(notifyData, state, &stack, NULL);
+                                SDSManager->handleNotify(subscriber, notifyData);
+                            }
                             else
                                 pruned.append(*LINK(&subscriber));
                         }
@@ -8180,8 +8468,10 @@ public:
         else if (sub) // xpath matched some subscribers, and/or below some, need to check for sub subscribers
         {
             bool ret = false;
+            MemoryBuffer notifyData;
             if (changes.state && changes.local)
             {
+                int lastSendValue = -1;
                 ForEachItemInRev(s, subs)
                 {
                     CSubscriberContainer &subscriber = subs.item(s);
@@ -8191,12 +8481,23 @@ public:
                         {
                             if (subscriber.querySendValue())
                             {
-                                MemoryBuffer mb;
-                                changes.tree->getPropBin(NULL, mb);
-                                SDSManager->handleNotify(subscriber, changes.state, stack, &mb);
+                                if (1 != lastSendValue)
+                                {
+                                    MemoryBuffer mb;
+                                    changes.tree->getPropBin(NULL, mb);
+                                    buildNotifyData(notifyData.clear(), changes.state, &stack, &mb);
+                                    lastSendValue = 1;
+                                }
                             }
                             else
-                                SDSManager->handleNotify(subscriber, changes.state, stack);
+                            {
+                                if (0 != lastSendValue)
+                                {
+                                    buildNotifyData(notifyData.clear(), changes.state, &stack, NULL);
+                                    lastSendValue = 0;
+                                }
+                            }
+                            SDSManager->handleNotify(subscriber, notifyData);
                         }
                         else
                             pruned.append(*LINK(&subscriber));
@@ -8416,6 +8717,51 @@ bool CCovenSDSManager::fireException(IException *e)
     return true;
 }
 
+void CCovenSDSManager::addNodeSubscriber(ISubscription *sub, SubscriptionId id)
+{
+    MemoryBuffer mb;
+    mb.setBuffer(sub->queryData().length(), (void *)sub->queryData().get());
+    StringAttr xpath;
+    bool sendValue;
+    mb.read(xpath);
+    mb.read(sendValue);
+
+    CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
+    CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
+    Owned<IPropertyTreeIterator> iter = root->getElements(xpath+1);
+    if (!iter->first())
+        throw MakeSDSException(SDSExcpt_SubscriptionNoMatch, "Failed to match any nodes: %s", xpath.get());
+    else
+    {
+        Owned<CNodeSubscriberContainer> subscriber = new CNodeSubscriberContainer(sub, id, sendValue, xpath);
+        do
+        {
+            CServerRemoteTree &node = (CServerRemoteTree &)iter->query();
+            node.setSubscribed(true);
+            subscriber->add(node);
+        }
+        while (iter->next());
+        nodeSubscriptionManager->associateSubscriber(*subscriber);
+    }
+}
+
+void CCovenSDSManager::removeNodeSubscriber(SubscriptionId id)
+{
+    CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
+    CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
+    nodeSubscriptionManager->removeSubscriberAssociation(id);
+}
+
+void CCovenSDSManager::notifyNodeDelete(CServerRemoteTree &node)
+{
+    nodeSubscriptionManager->notifyDelete(node);
+}
+
+void CCovenSDSManager::notifyNode(CServerRemoteTree &node, PDState state)
+{
+    nodeSubscriptionManager->notify(node, state);
+}
+
 ///////////////////////
 
 class CDaliSDSServer: public CInterface, public IDaliServer

+ 9 - 3
dali/base/dasds.hpp

@@ -40,12 +40,16 @@
 #define RTM_MODE(X, M) ((X & M) == M)
 
 enum SDSNotifyFlags { SDSNotify_None=0x00, SDSNotify_Data=0x01, SDSNotify_Structure=0x02, SDSNotify_Added=(SDSNotify_Structure+0x04), SDSNotify_Deleted=(SDSNotify_Structure+0x08), SDSNotify_Renamed=(SDSNotify_Structure+0x10) };
-typedef __int64 SDSNotificationFlags_t;
 interface ISDSSubscription : extends IInterface
 {
     virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL) = 0;
 };
 
+interface ISDSNodeSubscription : extends IInterface
+{
+    virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL) = 0;
+};
+
 interface ISDSConnectionSubscription : extends IInterface
 {
     virtual void notify() = 0;
@@ -93,7 +97,9 @@ interface ISDSManager
     virtual IRemoteConnection *connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout) = 0;
     virtual IRemoteConnections *connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout) = 0; // timeout applies to each connection
     virtual SubscriptionId subscribe(const char *xpath, ISDSSubscription &notify, bool sub=true, bool sendValue=false) = 0;
+    virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false) = 0;
     virtual void unsubscribe(SubscriptionId id) = 0;
+    virtual void unsubscribeExact(SubscriptionId id) = 0;
     virtual StringBuffer &getLocks(StringBuffer &out) = 0;
     virtual StringBuffer &getUsageStats(StringBuffer &out) = 0;
     virtual StringBuffer &getConnections(StringBuffer &out) = 0;
@@ -143,7 +149,6 @@ interface ISDSManagerServer : extends ISDSManager
     virtual bool setSDSDebug(StringArray &params, StringBuffer &reply)=0;
     virtual unsigned countConnections() = 0;
     virtual unsigned countActiveLocks() = 0;
-    virtual unsigned countSubscribers() const = 0;
     virtual unsigned queryExternalSizeThreshold() const = 0;
     virtual void setExternalSizeThreshold(unsigned _size) = 0;
     virtual bool queryRestartOnError() const = 0;
@@ -196,7 +201,8 @@ enum SDSExceptionCodes
     SDSExcpt_ClientCacheDirty,
     SDSExcpt_InvalidSessionId,
     SDSExcpt_LockHeld,
-    SDSExcpt_SubscriptionParseError
+    SDSExcpt_SubscriptionParseError,
+    SDSExcpt_SubscriptionNoMatch
 };
 
 interface ISDSException : extends IException { };

+ 13 - 2
dali/base/dasubs.cpp

@@ -466,13 +466,24 @@ public:
         size32_t dlen = data.length();
         mb.append(dlen);
         mb.append(dlen,data.get());
-        try {
+        try
+        {
             queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SUBSCRIPTION_REQUEST);
             if (mb.length())
                 throw deserializeException(mb);
         }
-        catch (IException *e) {
+        catch (IException *e)
+        {
             PrintExceptionLog(e,"Dali CDaliSubscriptionManagerStub::add");
+            {
+                CriticalBlock block(subscriptionsect);
+                unsigned idx = ids.find(id);
+                if (NotFound != idx)
+                {
+                    ids.remove(idx);
+                    subscriptions.remove(idx);
+                }
+            }
             throw;
         }
     }

+ 2 - 0
dali/base/dasubs.ipp

@@ -55,6 +55,8 @@ extern da_decl void closeSubscriptionManager();
 #define SDS_PUBLISHER       2
 #define NQS_PUBLISHER       3
 #define SDSCONN_PUBLISHER   4
+#define SDSEXACT_PUBLISHER  5
+#define SDSNODE_PUBLISHER   6
 
 
 #if 0

+ 1 - 0
system/jlib/jptree.hpp

@@ -168,6 +168,7 @@ interface IPTreeNodeCreator : extends IInterface
     virtual IPropertyTree *create(const char *tag) = 0;
 };
 
+// NB ipt_ext4 - used by SDS
 // NB ipt_ext5 - used by SDS
 enum ipt_flags { ipt_none=0x00, ipt_caseInsensitive=0x01, ipt_binary=0x02, ipt_ordered=0x04, ipt_ext1=0x08, ipt_ext2=16, ipt_ext3=32, ipt_ext4=64, ipt_ext5=128 };
 jlib_decl IPTreeMaker *createPTreeMaker(byte flags=ipt_none, IPropertyTree *root=NULL, IPTreeNodeCreator *nodeCreator=NULL);

+ 218 - 0
testing/unittests/dalitests.cpp

@@ -468,6 +468,7 @@ class DaliTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testDFSAddFailReAdd);
         CPPUNIT_TEST(testDFSRetrySuperLock);
         CPPUNIT_TEST(testDFSHammer);
+        CPPUNIT_TEST(testSDSNodeSubs);
     CPPUNIT_TEST_SUITE_END();
 
 #ifndef COMPAT
@@ -1099,6 +1100,223 @@ public:
         logctx.CTXLOG("%d subscription notifications, check sum = %"I64F"d",subchangenum,subchangetotal);
     }
 
+    class CNodeSubCommitThread : public CInterface, implements IThreaded
+    {
+        StringAttr xpath;
+        bool finalDelete;
+        CThreaded threaded;
+    public:
+        IMPLEMENT_IINTERFACE;
+
+        CNodeSubCommitThread(const char *_xpath, bool _finalDelete) : threaded("CNodeSubCommitThread"), xpath(_xpath), finalDelete(_finalDelete)
+        {
+        }
+        virtual void main()
+        {
+            unsigned mode = RTM_LOCK_WRITE;
+            if (finalDelete)
+                mode |= RTM_DELETE_ON_DISCONNECT;
+            Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), mode, 1000000);
+            assertex(conn);
+            for (unsigned i=0; i<5; i++)
+            {
+                VStringBuffer val("newval%d", i+1);
+                conn->queryRoot()->setProp(NULL, val.str());
+                conn->commit();
+            }
+            conn->queryRoot()->setProp("subnode", "newval");
+            conn->commit();
+            conn.clear(); // if finalDelete=true, deletes subscribed node in process, should get notificaiton
+
+        }
+        void start()
+        {
+            threaded.init(this);
+        }
+        void join()
+        {
+            threaded.join();
+        }
+    };
+
+
+    class CResults
+    {
+        StringArray results;
+        CRC32 crc;
+    public:
+        void add(const char *out)
+        {
+            PROGLOG("%s", out);
+            results.append(out);
+        }
+        unsigned getCRC()
+        {
+            results.sort();
+            ForEachItemIn(r, results)
+            {
+                const char *result = results.item(r);
+                crc.tally(strlen(result), result);
+            }
+            PROGLOG("CRC = %x", crc.get());
+            results.kill();
+            return crc.get();
+        }
+    };
+
+    class CSubscriber : CSimpleInterface, implements ISDSNodeSubscription
+    {
+        StringAttr path;
+        CResults &results;
+        unsigned notifications, expectedNotifications;
+        Semaphore joinSem;
+    public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+        CSubscriber(const char *_path, CResults &_results, unsigned _expectedNotifications)
+            : path(_path), results(_results), expectedNotifications(_expectedNotifications)
+        {
+            notifications = 0;
+            if (0 == expectedNotifications)
+                joinSem.signal();
+        }
+        virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+        {
+            StringAttr value;
+            if (valueLen)
+                value.set((const char *)valueData, valueLen);
+            VStringBuffer res("Subscriber(%s): flags=%d, value=%s", path.get(), flags, 0==valueLen ? "(none)" : value.get());
+            results.add(res);
+            ++notifications;
+            if (notifications == expectedNotifications)
+                joinSem.signal();
+        }
+        void join()
+        {
+            if (joinSem.wait(5000))
+            {
+                MilliSleep(100); // wait a bit, see if get more than expected
+                if (notifications == expectedNotifications)
+                {
+                    VStringBuffer out("Subscriber(%s): %d notifications received", path.get(), notifications);
+                    results.add(out);
+                    return;
+                }
+            }
+            VStringBuffer out("Expected %d notifications, received %d", expectedNotifications, notifications);
+            results.add(out);
+        }
+    };
+
+    void sdsNodeCommit(const char *test, unsigned from, unsigned to, bool finalDelete)
+    {
+        CIArrayOf<CNodeSubCommitThread> commitThreads;
+        for (unsigned i=from; i<=to; i++)
+        {
+            VStringBuffer path("/DAREGRESS/NodeSubTest/node%d", i);
+            CNodeSubCommitThread *commitThread = new CNodeSubCommitThread(path, finalDelete);
+            commitThreads.append(* commitThread);
+        }
+        ForEachItemIn(t, commitThreads)
+            commitThreads.item(t).start();
+        ForEachItemIn(t2, commitThreads)
+            commitThreads.item(t2).join();
+    }
+
+    void testSDSNodeSubs()
+    {
+        // setup
+        Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS/NodeSubTest", myProcessSession(), RTM_CREATE, 1000000);
+        IPropertyTree *root = conn->queryRoot();
+        unsigned i, ai;
+        for (i=0; i<10; i++)
+        {
+            VStringBuffer name("node%d", i+1);
+            IPropertyTree *sub = root->setPropTree(name, createPTree());
+            for (ai=0; ai<2; ai++)
+            {
+                VStringBuffer name("@attr%d", i+1);
+                VStringBuffer val("val%d", i+1);
+                sub->setProp(name, val);
+            }
+        }
+        conn.clear();
+
+        CResults results;
+
+        {
+            const char *testPath = "/DAREGRESS/NodeSubTest/doesnotexist";
+            Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 0);
+            try
+            {
+                querySDS().subscribeExact(testPath, *subscriber, true);
+                throwUnexpected();
+            }
+            catch(IException *e)
+            {
+                if (SDSExcpt_SubscriptionNoMatch != e->errorCode())
+                    throw;
+                results.add("Correctly failed to add subscriber to non-existent node.");
+            }
+            subscriber.clear();
+        }
+
+        {
+            const char *testPath = "/DAREGRESS/NodeSubTest/node1";
+            Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 2*5+1+1);
+            SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
+
+            sdsNodeCommit(testPath, 1, 1, false);
+            sdsNodeCommit(testPath, 1, 1, true); // will delete 'node1'
+
+            subscriber->join();
+            querySDS().unsubscribeExact(id); // will actually be a NOP, as will be already unsubscribed when 'node1' deleted.
+        }
+
+        {
+            const char *testPath = "/DAREGRESS/NodeSubTest/node*";
+            Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 9*6);
+            SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
+
+            sdsNodeCommit(testPath, 2, 10, false);
+
+            subscriber->join();
+            querySDS().unsubscribeExact(id);
+        }
+
+        {
+            UInt64Array subscriberIds;
+            IArrayOf<CSubscriber> subscribers;
+            for (i=2; i<=10; i++) // NB: from 2, as 'node1' deleted in previous tests
+            {
+                for (ai=0; ai<2; ai++)
+                {
+                    VStringBuffer path("/DAREGRESS/NodeSubTest/node%d[@attr%d=\"val%d\"]", i, i, i);
+                    Owned<CSubscriber> subscriber = new CSubscriber(path, results, 11);
+                    SubscriptionId id = querySDS().subscribeExact(path, *subscriber, 0==ai);
+                    subscribers.append(* subscriber.getClear());
+                    subscriberIds.append(id);
+                }
+            }
+            const char *testPath = "/DAREGRESS/NodeSubTest/node*";
+            Owned<CSubscriber> subscriber = new CSubscriber(testPath, results, 9*5+9*(5+1));
+            SubscriptionId id = querySDS().subscribeExact(testPath, *subscriber, false);
+
+            sdsNodeCommit(testPath, 2, 10, false);
+            sdsNodeCommit(testPath, 2, 10, true);
+
+            subscriber->join();
+            querySDS().unsubscribeExact(id);
+            ForEachItemIn(s, subscriberIds)
+            {
+                subscribers.item(s).join();
+                querySDS().unsubscribeExact(subscriberIds.item(s));
+            }
+        }
+
+        ASSERT(0xa68e2324 == results.getCRC() && "SDS Node notifcation differences");
+    }
+
     /*
      * This test is silly and can take a very long time on clusters with
      * a large file-system. But keeping it here for further reference.