|
@@ -486,6 +486,46 @@ private:
|
|
|
};
|
|
|
|
|
|
//////////////
|
|
|
+
|
|
|
+class CSubscriberContainerBase : public CInterfaceOf<IInterface>
|
|
|
+{
|
|
|
+ DECL_NAMEDCOUNT;
|
|
|
+protected:
|
|
|
+ bool unsubscribed;
|
|
|
+ Owned<ISubscription> subscriber;
|
|
|
+ SubscriptionId id;
|
|
|
+public:
|
|
|
+ CSubscriberContainerBase(ISubscription *_subscriber, SubscriptionId _id) :
|
|
|
+ subscriber(_subscriber), id(_id)
|
|
|
+ {
|
|
|
+ INIT_NAMEDCOUNT;
|
|
|
+ unsubscribed = false;
|
|
|
+ }
|
|
|
+ bool notify(MemoryBuffer &mb) const
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ subscriber->notify(mb);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ LOG(MCuserWarning, e, "SDS: Error notifying subscriber");
|
|
|
+ e->Release();
|
|
|
+ }
|
|
|
+ return false; // unsubscribe
|
|
|
+ }
|
|
|
+ const SubscriptionId &queryId() const { return id; }
|
|
|
+ const void *queryFindParam() const
|
|
|
+ {
|
|
|
+ return (const void *) &id;
|
|
|
+ }
|
|
|
+ bool isUnsubscribed() { return unsubscribed || subscriber->aborted(); }
|
|
|
+ void setUnsubscribed() { unsubscribed = true; }
|
|
|
+};
|
|
|
+
|
|
|
+/////////////////
|
|
|
+
|
|
|
class CConnectionSubscriberContainer : public CSubscriberContainerBase
|
|
|
{
|
|
|
public:
|
|
@@ -651,7 +691,7 @@ public:
|
|
|
MemoryBuffer mb(ma.length(), ma.get());
|
|
|
mb.read(xpath);
|
|
|
mb.read(sub);
|
|
|
- if (mb.length()-mb.getPos()) // remaining
|
|
|
+ if (mb.remaining()) // remaining
|
|
|
mb.read(sendValue);
|
|
|
else
|
|
|
sendValue = false;
|
|
@@ -1686,37 +1726,49 @@ SDSNotifyFlags translatePDState(PDState state)
|
|
|
return (SDSNotifyFlags) state; // mirrored for now.
|
|
|
}
|
|
|
|
|
|
-void buildNotifyData(CPTStack &stack, MemoryBuffer &mb)
|
|
|
+void buildNotifyData(MemoryBuffer ¬ifyData, PDState state, CPTStack *stack, MemoryBuffer *data)
|
|
|
{
|
|
|
- mb.append('/');
|
|
|
- PTree *parent = &stack.item(0); // root
|
|
|
-
|
|
|
- unsigned n = stack.ordinality();
|
|
|
- if (n>1)
|
|
|
+ if (stack)
|
|
|
{
|
|
|
- unsigned s = 1;
|
|
|
- loop
|
|
|
+ notifyData.append('/');
|
|
|
+ PTree *parent = &stack->item(0); // root
|
|
|
+
|
|
|
+ unsigned n = stack->ordinality();
|
|
|
+ if (n>1)
|
|
|
{
|
|
|
- PTree &child = stack.item(s);
|
|
|
- const char *str = child.queryName();
|
|
|
- mb.append(strlen(str), str);
|
|
|
- if (child.queryParent())
|
|
|
+ unsigned s = 1;
|
|
|
+ loop
|
|
|
{
|
|
|
- char temp[12];
|
|
|
- unsigned written = numtostr(temp, parent->findChild(&child)+1);
|
|
|
- mb.append('[').append(written, temp).append(']');
|
|
|
+ PTree &child = stack->item(s);
|
|
|
+ const char *str = child.queryName();
|
|
|
+ notifyData.append(strlen(str), str);
|
|
|
+ if (child.queryParent())
|
|
|
+ {
|
|
|
+ char temp[12];
|
|
|
+ unsigned written = numtostr(temp, parent->findChild(&child)+1);
|
|
|
+ notifyData.append('[').append(written, temp).append(']');
|
|
|
+ }
|
|
|
+ else
|
|
|
+ notifyData.append(3, "[1]");
|
|
|
+ parent = &child;
|
|
|
+ s++;
|
|
|
+ if (s<n)
|
|
|
+ notifyData.append('/');
|
|
|
+ else
|
|
|
+ break;
|
|
|
}
|
|
|
- else
|
|
|
- mb.append(3, "[1]");
|
|
|
- parent = &child;
|
|
|
- s++;
|
|
|
- if (s<n)
|
|
|
- mb.append('/');
|
|
|
- else
|
|
|
- break;
|
|
|
}
|
|
|
+ notifyData.append('\0');
|
|
|
+ }
|
|
|
+ notifyData.append((int)translatePDState(state));
|
|
|
+ if (data)
|
|
|
+ {
|
|
|
+ notifyData.append(true);
|
|
|
+ notifyData.append(data->length());
|
|
|
+ notifyData.append(*data);
|
|
|
}
|
|
|
- mb.append('\0');
|
|
|
+ else
|
|
|
+ notifyData.append(false);
|
|
|
}
|
|
|
|
|
|
class CSubscriberNotifier;
|
|
@@ -1732,7 +1784,7 @@ class CSubscriberNotifier : public CInterface
|
|
|
MemoryBuffer notifyData;
|
|
|
};
|
|
|
public:
|
|
|
- CSubscriberNotifier(CSubscriberNotifierTable &_table, CSubscriberContainer &_subscriber, MemoryBuffer ¬ifyData)
|
|
|
+ CSubscriberNotifier(CSubscriberNotifierTable &_table, CSubscriberContainerBase &_subscriber, MemoryBuffer ¬ifyData)
|
|
|
: table(_table), subscriber(_subscriber)
|
|
|
{
|
|
|
INIT_NAMEDCOUNT;
|
|
@@ -1787,7 +1839,7 @@ public:
|
|
|
private:
|
|
|
Linked<CChange> change;
|
|
|
CIArrayOf<CChange> changeQueue;
|
|
|
- CSubscriberContainer &subscriber;
|
|
|
+ CSubscriberContainerBase &subscriber;
|
|
|
MemoryAttr notifyData;
|
|
|
CSubscriberNotifierTable &table;
|
|
|
};
|
|
@@ -1803,8 +1855,6 @@ public:
|
|
|
CConnectionSubscriptionManager()
|
|
|
{
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
unsigned querySubscribers()
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
|
|
@@ -1855,6 +1905,18 @@ interface ICoalesce : extends IInterface
|
|
|
|
|
|
//////////////////////
|
|
|
|
|
|
+class CNodeSubscriberContainer;
|
|
|
+interface INodeSubscriptionManager : extends ISubscriptionManager
|
|
|
+{
|
|
|
+ virtual void associateSubscriber(CNodeSubscriberContainer &subscriber) = 0;
|
|
|
+ virtual void removeSubscriberAssociation(SubscriptionId id) = 0;
|
|
|
+ virtual void notifyDelete(CServerRemoteTree &node) = 0;
|
|
|
+ virtual void notify(CServerRemoteTree &node, PDState state) = 0;
|
|
|
+ virtual MemoryBuffer &collectSubscribers(MemoryBuffer &out) const = 0;
|
|
|
+};
|
|
|
+
|
|
|
+//////////////////////
|
|
|
+
|
|
|
enum LockStatus { LockFailed, LockHeld, LockTimedOut, LockSucceeded };
|
|
|
|
|
|
class CCovenSDSManager : public CSDSManagerBase, implements ISDSManagerServer, implements ISubscriptionManager, implements IExceptionHandler
|
|
@@ -1879,7 +1941,7 @@ public:
|
|
|
CLockInfo *queryLockInfo(__int64 id) { return lockTable.find(&id); }
|
|
|
CSubscriberTable &querySubscriberTable() { return subscribers; }
|
|
|
IExternalHandler *queryExternalHandler(const char *handler) { if (!handler) return NULL; CExternalHandlerMapping *mapping = externalHandlers.find(handler); return mapping ? &mapping->query() : NULL; }
|
|
|
- void handleNotify(CSubscriberContainer &subscriber, PDState state, CPTStack &stack, MemoryBuffer *data=NULL);
|
|
|
+ void handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer ¬ifyData);
|
|
|
void startNotification(IPropertyTree &changeTree, CPTStack &stack, CBranchChange &changes); // subscription notification
|
|
|
MemoryBuffer &collectUsageStats(MemoryBuffer &out);
|
|
|
MemoryBuffer &collectConnections(MemoryBuffer &out);
|
|
@@ -1909,6 +1971,10 @@ public:
|
|
|
CSubscriberContainerList *getSubscribers(const char *xpath, CPTStack &stack);
|
|
|
void getExternalValue(__int64 index, MemoryBuffer &mb);
|
|
|
IPropertyTree *getXPathsSortLimitMatchTree(const char *baseXPath, const char *matchXPath, const char *sortby, bool caseinsensitive, bool ascending, unsigned from, unsigned limit);
|
|
|
+ void addNodeSubscriber(ISubscription *sub, SubscriptionId id);
|
|
|
+ void removeNodeSubscriber(SubscriptionId id);
|
|
|
+ void notifyNodeDelete(CServerRemoteTree &node);
|
|
|
+ void notifyNode(CServerRemoteTree &node, PDState state);
|
|
|
|
|
|
// ISDSConnectionManager
|
|
|
virtual CRemoteTreeBase *get(CRemoteConnection &connection, __int64 serverId);
|
|
@@ -1927,7 +1993,9 @@ public:
|
|
|
virtual IRemoteConnections *connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout);
|
|
|
virtual IRemoteConnection *connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout);
|
|
|
virtual SubscriptionId subscribe(const char *xpath, ISDSSubscription ¬ify, bool sub=true, bool sendValue=false);
|
|
|
+ virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription ¬ify, bool sendValue=false);
|
|
|
virtual void unsubscribe(SubscriptionId id);
|
|
|
+ virtual void unsubscribeExact(SubscriptionId id);
|
|
|
virtual StringBuffer &getLocks(StringBuffer &out);
|
|
|
virtual StringBuffer &getUsageStats(StringBuffer &out);
|
|
|
virtual StringBuffer &getConnections(StringBuffer &out);
|
|
@@ -1940,7 +2008,6 @@ public:
|
|
|
virtual unsigned countConnections();
|
|
|
virtual bool setSDSDebug(StringArray ¶ms, StringBuffer &reply);
|
|
|
virtual unsigned countActiveLocks();
|
|
|
- virtual unsigned countSubscribers() const;
|
|
|
virtual unsigned queryExternalSizeThreshold() const { return externalSizeThreshold; }
|
|
|
virtual void setExternalSizeThreshold(unsigned _size) { externalSizeThreshold = _size; }
|
|
|
virtual bool queryRestartOnError() const { return restartOnError; }
|
|
@@ -2004,6 +2071,7 @@ private:
|
|
|
CExternalHandlerTable externalHandlers;
|
|
|
CSubscriberNotifierTable subscriberNotificationTable;
|
|
|
Owned<CConnectionSubscriptionManager> connectionSubscriptionManager;
|
|
|
+ Owned<INodeSubscriptionManager> nodeSubscriptionManager;
|
|
|
bool restartOnError, externalEnvironment;
|
|
|
IStoreHelper *iStoreHelper;
|
|
|
bool doTimeComparison;
|
|
@@ -2398,6 +2466,8 @@ public:
|
|
|
}
|
|
|
}
|
|
|
if (SDSManager->queryStopped()) return; // don't bother building up free list that will never be used hence (could get v. big/slow)
|
|
|
+ if (isSubscribed())
|
|
|
+ SDSManager->notifyNodeDelete(*this);
|
|
|
CHECKEDCRITICALBLOCK(SDSManager->treeRegCrit, fakeCritTimeout);
|
|
|
|
|
|
SDSManager->queryAllNodes().freeElem(serverId);
|
|
@@ -2551,13 +2621,198 @@ public:
|
|
|
ret.setBuffer(len, mem, len-1);
|
|
|
return true;
|
|
|
}
|
|
|
-
|
|
|
+ void setSubscribed(bool tf)
|
|
|
+ {
|
|
|
+ if (tf)
|
|
|
+ IptFlagSet(flags, ipt_ext4);
|
|
|
+ else
|
|
|
+ IptFlagClr(flags, ipt_ext4);
|
|
|
+ }
|
|
|
+ inline bool isSubscribed() const
|
|
|
+ {
|
|
|
+ return IptFlagTst(flags, ipt_ext4);
|
|
|
+ }
|
|
|
private:
|
|
|
PDState processData(IPropertyTree &changeTree, Owned<CBranchChange> &parentBranchChange, MemoryBuffer &newIds);
|
|
|
PDState checkChange(IPropertyTree &tree, CBranchChange *parentBranchChange=NULL);
|
|
|
friend class COrphanHandler;
|
|
|
};
|
|
|
|
|
|
+class CNodeSubscriberContainer : public CSubscriberContainerBase
|
|
|
+{
|
|
|
+ StringAttr xpath;
|
|
|
+ bool sendValue;
|
|
|
+ ICopyArrayOf<CServerRemoteTree> nodes; // never linked, node must signal the unsubscription and removal of subscriber and these references
|
|
|
+public:
|
|
|
+ CNodeSubscriberContainer(ISubscription *subscriber, SubscriptionId id, bool _sendValue, const char *_xpath)
|
|
|
+ : CSubscriberContainerBase(subscriber, id), sendValue(_sendValue), xpath(_xpath)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ bool querySendValue() const { return sendValue; }
|
|
|
+ void add(CServerRemoteTree &node) { nodes.append(node); }
|
|
|
+ ICopyArrayOf<CServerRemoteTree> &queryNodes() { return nodes; }
|
|
|
+ MemoryBuffer &getInfo(MemoryBuffer &out) const
|
|
|
+ {
|
|
|
+ out.append(id).append(xpath).append(nodes.ordinality());
|
|
|
+ return out;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscriptionManager
|
|
|
+{
|
|
|
+public:
|
|
|
+ class CNodeSubscriberContainerList : public CSimpleInterfaceOf<IInterface>
|
|
|
+ {
|
|
|
+ CServerRemoteTree *node;
|
|
|
+ ICopyArrayOf<CNodeSubscriberContainer> subscribers;
|
|
|
+ public:
|
|
|
+ CNodeSubscriberContainerList(CServerRemoteTree *_node) : node(_node)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ const void *queryFindParam() const { return &node; }
|
|
|
+ ICopyArrayOf<CNodeSubscriberContainer> &querySubscribers() { return subscribers; }
|
|
|
+ void add(CNodeSubscriberContainer &subscriber) { subscribers.append(subscriber); }
|
|
|
+ };
|
|
|
+
|
|
|
+ CCovenSDSManager &owner;
|
|
|
+ OwningSimpleHashTableOf<CNodeSubscriberContainer, SubscriptionId> subscribersById;
|
|
|
+ OwningSimpleHashTableOf<CNodeSubscriberContainerList, CServerRemoteTree *> subscriberListByNode;
|
|
|
+ CriticalSection lock;
|
|
|
+
|
|
|
+ void _notify(CServerRemoteTree *node, PDState state)
|
|
|
+ {
|
|
|
+ MemoryBuffer sendValueNotifyData;
|
|
|
+ CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
+ assertex(subscriberList);
|
|
|
+ ICopyArrayOf<CNodeSubscriberContainer> &subscribers = subscriberList->querySubscribers();
|
|
|
+ int lastSendValue = -1;
|
|
|
+ ForEachItemIn(s, subscribers)
|
|
|
+ {
|
|
|
+ CNodeSubscriberContainer &subscriber = subscribers.item(s);
|
|
|
+ if (subscriber.querySendValue())
|
|
|
+ {
|
|
|
+ if (1 != lastSendValue) // overkill unless many subscribers to same node
|
|
|
+ {
|
|
|
+ MemoryBuffer mb;
|
|
|
+ node->getPropBin(NULL, mb);
|
|
|
+ buildNotifyData(sendValueNotifyData.clear(), state, NULL, &mb);
|
|
|
+ lastSendValue = 1;
|
|
|
+ }
|
|
|
+ SDSManager->handleNotify(subscriber, sendValueNotifyData);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (0 != lastSendValue) // overkill unless many subscribers to same node
|
|
|
+ {
|
|
|
+ buildNotifyData(sendValueNotifyData.clear(), state, NULL, NULL);
|
|
|
+ lastSendValue = 0;
|
|
|
+ }
|
|
|
+ SDSManager->handleNotify(subscriber, sendValueNotifyData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ void _removeNode(CServerRemoteTree *node, SubscriptionId id)
|
|
|
+ {
|
|
|
+ CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
+ assertex(subscriberList);
|
|
|
+ ICopyArrayOf<CNodeSubscriberContainer> &subscribers = subscriberList->querySubscribers();
|
|
|
+ ForEachItemInRev(s, subscribers)
|
|
|
+ {
|
|
|
+ CNodeSubscriberContainer &subscriber = subscribers.item(s);
|
|
|
+ if (0 == id) // remove all associated subscribers (node being deleted)
|
|
|
+ {
|
|
|
+ ICopyArrayOf<CServerRemoteTree> &nodes = subscriber.queryNodes();
|
|
|
+ verifyex(nodes.zap(*node));
|
|
|
+ SubscriptionId sid = subscriber.queryId();
|
|
|
+ subscribers.remove(s);
|
|
|
+ if (0 == nodes.ordinality()) // IOW this was the last node this subscriber was associated with
|
|
|
+ subscribersById.remove(&sid);
|
|
|
+ }
|
|
|
+ else if (subscriber.queryId() == id)
|
|
|
+ subscribers.remove(s);
|
|
|
+ }
|
|
|
+ if (0 == subscribers.ordinality())
|
|
|
+ {
|
|
|
+ node->setSubscribed(false);
|
|
|
+ subscriberListByNode.removeExact(subscriberList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+public:
|
|
|
+ IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
+
|
|
|
+ CNodeSubscriptionManager(CCovenSDSManager &_owner) : owner(_owner) { }
|
|
|
+ void notify(CServerRemoteTree &node, PDState state)
|
|
|
+ {
|
|
|
+ // shouldn't be here, unless node is in subscribers table
|
|
|
+ CriticalBlock b(lock);
|
|
|
+ _notify(&node, state);
|
|
|
+ }
|
|
|
+ void notifyDelete(CServerRemoteTree &node)
|
|
|
+ {
|
|
|
+ // shouldn't be here, unless node is in subscribers table
|
|
|
+ CriticalBlock b(lock);
|
|
|
+ _notify(&node, PDS_Deleted);
|
|
|
+ _removeNode(&node, 0);
|
|
|
+ }
|
|
|
+ // ISubscriptionManager impl.
|
|
|
+ virtual void add(ISubscription *sub, SubscriptionId id)
|
|
|
+ {
|
|
|
+ CriticalBlock b(lock);
|
|
|
+ /* calls back out to owner to scan for match, so that SDSManager can protect root/treereg.
|
|
|
+ * It calls back (associateSubscriber) in this class to add subscribers based on matches.
|
|
|
+ */
|
|
|
+ owner.addNodeSubscriber(sub, id);
|
|
|
+ }
|
|
|
+ virtual void remove(SubscriptionId id)
|
|
|
+ {
|
|
|
+ CriticalBlock b(lock);
|
|
|
+ /* calls back out to owner to protect root/treereg.
|
|
|
+ * It calls back into removeSubscriberAssociation.
|
|
|
+ */
|
|
|
+ owner.removeNodeSubscriber(id);
|
|
|
+ }
|
|
|
+ virtual void removeSubscriberAssociation(SubscriptionId id) // Always called back from within remove() above.
|
|
|
+ {
|
|
|
+ CNodeSubscriberContainer *subscriber = subscribersById.find(id);
|
|
|
+ if (!subscriber)
|
|
|
+ return; // may not exist if removed already
|
|
|
+ ICopyArrayOf<CServerRemoteTree> &nodes = subscriber->queryNodes();
|
|
|
+ ForEachItemIn(n, nodes)
|
|
|
+ {
|
|
|
+ CServerRemoteTree &node = nodes.item(n);
|
|
|
+ _removeNode(&node, id);
|
|
|
+ }
|
|
|
+ verifyex(subscribersById.removeExact(subscriber));
|
|
|
+ }
|
|
|
+ void associateSubscriber(CNodeSubscriberContainer &subscriber) // Always called back from within add() above.
|
|
|
+ {
|
|
|
+ /* caller has established there are matches and added them to 'subscriber'
|
|
|
+ * add to HT's
|
|
|
+ */
|
|
|
+ verifyex(subscribersById.add(*LINK(&subscriber)));
|
|
|
+ ICopyArrayOf<CServerRemoteTree> &nodes = subscriber.queryNodes();
|
|
|
+ ForEachItemIn(n, nodes)
|
|
|
+ {
|
|
|
+ CServerRemoteTree *node = &nodes.item(n);
|
|
|
+ CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
+ if (!subscriberList)
|
|
|
+ {
|
|
|
+ subscriberList = new CNodeSubscriberContainerList(node);
|
|
|
+ verifyex(subscriberListByNode.add(* subscriberList));
|
|
|
+ }
|
|
|
+ subscriberList->add(subscriber);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ MemoryBuffer &collectSubscribers(MemoryBuffer &out) const
|
|
|
+ {
|
|
|
+ out.append(subscribersById.count());
|
|
|
+ SuperHashIteratorOf<CNodeSubscriberContainer> sdsNodeIter(subscribersById);
|
|
|
+ ForEach(sdsNodeIter)
|
|
|
+ sdsNodeIter.query().getInfo(out);
|
|
|
+ return out;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
#if defined(_WIN32) && defined(__old_new)
|
|
|
#define new __old_new
|
|
|
#endif
|
|
@@ -2845,8 +3100,9 @@ PDState CServerRemoteTree::processData(IPropertyTree &changeTree, Owned<CBranchC
|
|
|
if (childChanges.getPropBool("@new"))
|
|
|
{
|
|
|
child = (CServerRemoteTree *)createChild(childChanges.getPropInt("@pos", -1), childChanges.queryProp("@name"));
|
|
|
- mergePDState(res, PDS_Structure);
|
|
|
newIds.append(child->queryServerId());
|
|
|
+ mergePDState(localChange, PDS_Added);
|
|
|
+ mergePDState(res, PDS_Added);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -2876,6 +3132,8 @@ PDState CServerRemoteTree::processData(IPropertyTree &changeTree, Owned<CBranchC
|
|
|
else
|
|
|
branchChange->noteChange(localChange, res);
|
|
|
|
|
|
+ if ((localChange != PDS_None) && isSubscribed())
|
|
|
+ SDSManager->notifyNode(*this, localChange);
|
|
|
if (!parentBranchChange.get())
|
|
|
parentBranchChange.setown(branchChange.getClear());
|
|
|
else
|
|
@@ -5511,7 +5769,9 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const
|
|
|
}
|
|
|
registerSubscriptionManager(SDS_PUBLISHER, this);
|
|
|
connectionSubscriptionManager.setown(new CConnectionSubscriptionManager());
|
|
|
+ nodeSubscriptionManager.setown(new CNodeSubscriptionManager(*this));
|
|
|
registerSubscriptionManager(SDSCONN_PUBLISHER, connectionSubscriptionManager.get());
|
|
|
+ registerSubscriptionManager(SDSNODE_PUBLISHER, nodeSubscriptionManager);
|
|
|
|
|
|
// add external handlers
|
|
|
Owned<CXMLFileExternal> xmlExternalHandler = new CXMLFileExternal(dataPath, backupHandler);
|
|
@@ -6407,11 +6667,30 @@ SubscriptionId CCovenSDSManager::subscribe(const char *xpath, ISDSSubscription &
|
|
|
return subscriber->getId();
|
|
|
}
|
|
|
|
|
|
+SubscriptionId CCovenSDSManager::subscribeExact(const char *xpath, ISDSNodeSubscription ¬ify, bool sendValue)
|
|
|
+{
|
|
|
+ assertex(xpath);
|
|
|
+ StringBuffer s;
|
|
|
+ if ('/' != *xpath)
|
|
|
+ {
|
|
|
+ s.append('/').append(xpath);
|
|
|
+ xpath = s.str();
|
|
|
+ }
|
|
|
+ CSDSNodeSubscriberProxy *subscriber = new CSDSNodeSubscriberProxy(xpath, sendValue, notify);
|
|
|
+ querySubscriptionManager(SDSNODE_PUBLISHER)->add(subscriber, subscriber->getId());
|
|
|
+ return subscriber->getId();
|
|
|
+}
|
|
|
+
|
|
|
void CCovenSDSManager::unsubscribe(SubscriptionId id)
|
|
|
{
|
|
|
querySubscriptionManager(SDS_PUBLISHER)->remove(id);
|
|
|
}
|
|
|
|
|
|
+void CCovenSDSManager::unsubscribeExact(SubscriptionId id)
|
|
|
+{
|
|
|
+ querySubscriptionManager(SDSNODE_PUBLISHER)->remove(id);
|
|
|
+}
|
|
|
+
|
|
|
bool CCovenSDSManager::removeNotifyHandler(const char *handlerKey)
|
|
|
{
|
|
|
return nodeNotifyHandlers.remove(handlerKey);
|
|
@@ -7575,6 +7854,16 @@ StringBuffer &formatSubscriberInfo(MemoryBuffer &src, StringBuffer &out)
|
|
|
return out;
|
|
|
}
|
|
|
|
|
|
+StringBuffer &formatNodeSubscriberInfo(MemoryBuffer &src, StringBuffer &out)
|
|
|
+{
|
|
|
+ SubscriptionId subscriptionId;
|
|
|
+ StringAttr xpath;
|
|
|
+ unsigned nodeCount;
|
|
|
+ src.read(subscriptionId).read(xpath).read(nodeCount);
|
|
|
+ out.append("SubscriptionId=").appendf("%"I64F"x", subscriptionId).append(", xpath=").append(xpath).append(", nodes=").append(nodeCount);
|
|
|
+ return out;
|
|
|
+}
|
|
|
+
|
|
|
StringBuffer &formatConnections(MemoryBuffer &src, StringBuffer &out)
|
|
|
{
|
|
|
unsigned count;
|
|
@@ -7594,18 +7883,25 @@ StringBuffer &formatConnections(MemoryBuffer &src, StringBuffer &out)
|
|
|
|
|
|
StringBuffer &formatSubscribers(MemoryBuffer &src, StringBuffer &out)
|
|
|
{
|
|
|
- unsigned count;
|
|
|
- src.read(count);
|
|
|
- if (count)
|
|
|
+ unsigned sdsSubscribers, sdsNodeSubscribers=0;
|
|
|
+ src.read(sdsSubscribers);
|
|
|
+ unsigned s = sdsSubscribers;
|
|
|
+ while (s--)
|
|
|
{
|
|
|
- while (count--)
|
|
|
+ formatSubscriberInfo(src, out);
|
|
|
+ if (s) out.newline();
|
|
|
+ }
|
|
|
+ if (src.remaining())
|
|
|
+ {
|
|
|
+ src.read(sdsNodeSubscribers);
|
|
|
+ s = sdsNodeSubscribers;
|
|
|
+ while (s--)
|
|
|
{
|
|
|
- formatSubscriberInfo(src, out);
|
|
|
- if (count) out.newline();
|
|
|
+ formatNodeSubscriberInfo(src, out);
|
|
|
+ if (s) out.newline();
|
|
|
}
|
|
|
}
|
|
|
- else
|
|
|
- out.append("No current subscriptions");
|
|
|
+ out.newline().appendf("%d xpath subscribers, %d node subscribers\n", sdsSubscribers, sdsNodeSubscribers);
|
|
|
return out;
|
|
|
}
|
|
|
|
|
@@ -7627,11 +7923,6 @@ unsigned CCovenSDSManager::countActiveLocks()
|
|
|
return activeLocks;
|
|
|
}
|
|
|
|
|
|
-unsigned CCovenSDSManager::countSubscribers() const
|
|
|
-{
|
|
|
- return subscribers.count();
|
|
|
-}
|
|
|
-
|
|
|
MemoryBuffer &CCovenSDSManager::collectUsageStats(MemoryBuffer &out)
|
|
|
{
|
|
|
{ CHECKEDCRITICALBLOCK(cTableCrit, fakeCritTimeout);
|
|
@@ -7666,9 +7957,10 @@ MemoryBuffer &CCovenSDSManager::collectSubscribers(MemoryBuffer &out)
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(sTableCrit, fakeCritTimeout);
|
|
|
out.append(subscribers.count());
|
|
|
- SuperHashIteratorOf<CSubscriberContainer> iter(subscribers.queryBaseTable());
|
|
|
- ForEach(iter)
|
|
|
- iter.query().getInfo(out);
|
|
|
+ SuperHashIteratorOf<CSubscriberContainer> sdsIter(subscribers.queryBaseTable());
|
|
|
+ ForEach(sdsIter)
|
|
|
+ sdsIter.query().getInfo(out);
|
|
|
+ nodeSubscriptionManager->collectSubscribers(out);
|
|
|
return out;
|
|
|
}
|
|
|
|
|
@@ -7765,7 +8057,7 @@ void CCovenSDSManager::handleNodeNotify(notifications n, CServerRemoteTree &tree
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CCovenSDSManager::handleNotify(CSubscriberContainer &subscriber, PDState state, CPTStack &stack, MemoryBuffer *data)
|
|
|
+void CCovenSDSManager::handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer ¬ifyData)
|
|
|
{
|
|
|
class CNotifyPoolFactory : public CInterface, public IThreadFactory
|
|
|
{
|
|
@@ -7809,18 +8101,6 @@ void CCovenSDSManager::handleNotify(CSubscriberContainer &subscriber, PDState st
|
|
|
factory->Release();
|
|
|
}
|
|
|
|
|
|
- MemoryBuffer notifyData;
|
|
|
- buildNotifyData(stack, notifyData);
|
|
|
- notifyData.append(translatePDState(state));
|
|
|
- if (data)
|
|
|
- {
|
|
|
- notifyData.append(true);
|
|
|
- notifyData.append(data->length());
|
|
|
- notifyData.append(*data);
|
|
|
- }
|
|
|
- else
|
|
|
- notifyData.append(false);
|
|
|
-
|
|
|
Owned<CSubscriberNotifier> _notifier;
|
|
|
{ CHECKEDCRITICALBLOCK(nfyTableCrit, fakeCritTimeout);
|
|
|
SubscriptionId id = subscriber.queryId();
|
|
@@ -7936,6 +8216,7 @@ public:
|
|
|
bool sub;
|
|
|
if (prune(xpath.str(), sub, pruned))
|
|
|
{
|
|
|
+ MemoryBuffer notifyData;
|
|
|
if (sub)
|
|
|
{
|
|
|
ForEachItemInRev(s, subs)
|
|
@@ -7944,7 +8225,11 @@ public:
|
|
|
if (!subscriber.isUnsubscribed())
|
|
|
{
|
|
|
if (subscriber.qualify(stack))
|
|
|
- SDSManager->handleNotify(subscriber, state, stack);
|
|
|
+ {
|
|
|
+ if (0 == notifyData.length())
|
|
|
+ buildNotifyData(notifyData, state, &stack, NULL);
|
|
|
+ SDSManager->handleNotify(subscriber, notifyData);
|
|
|
+ }
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
|
}
|
|
@@ -7961,7 +8246,11 @@ public:
|
|
|
if (!subscriber.isUnsubscribed())
|
|
|
{
|
|
|
if (subscriber.qualify(stack))
|
|
|
- SDSManager->handleNotify(subscriber, state, stack);
|
|
|
+ {
|
|
|
+ if (0 == notifyData.length())
|
|
|
+ buildNotifyData(notifyData, state, &stack, NULL);
|
|
|
+ SDSManager->handleNotify(subscriber, notifyData);
|
|
|
+ }
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
|
}
|
|
@@ -8005,8 +8294,10 @@ public:
|
|
|
else if (sub) // xpath matched some subscribers, and/or below some, need to check for sub subscribers
|
|
|
{
|
|
|
bool ret = false;
|
|
|
+ MemoryBuffer notifyData;
|
|
|
if (changes.state && changes.local)
|
|
|
{
|
|
|
+ int lastSendValue = -1;
|
|
|
ForEachItemInRev(s, subs)
|
|
|
{
|
|
|
CSubscriberContainer &subscriber = subs.item(s);
|
|
@@ -8016,12 +8307,23 @@ public:
|
|
|
{
|
|
|
if (subscriber.querySendValue())
|
|
|
{
|
|
|
- MemoryBuffer mb;
|
|
|
- changes.tree->getPropBin(NULL, mb);
|
|
|
- SDSManager->handleNotify(subscriber, changes.state, stack, &mb);
|
|
|
+ if (1 != lastSendValue)
|
|
|
+ {
|
|
|
+ MemoryBuffer mb;
|
|
|
+ changes.tree->getPropBin(NULL, mb);
|
|
|
+ buildNotifyData(notifyData.clear(), changes.state, &stack, &mb);
|
|
|
+ lastSendValue = 1;
|
|
|
+ }
|
|
|
}
|
|
|
else
|
|
|
- SDSManager->handleNotify(subscriber, changes.state, stack);
|
|
|
+ {
|
|
|
+ if (0 != lastSendValue)
|
|
|
+ {
|
|
|
+ buildNotifyData(notifyData.clear(), changes.state, &stack, NULL);
|
|
|
+ lastSendValue = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ SDSManager->handleNotify(subscriber, notifyData);
|
|
|
}
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
@@ -8241,6 +8543,51 @@ bool CCovenSDSManager::fireException(IException *e)
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+void CCovenSDSManager::addNodeSubscriber(ISubscription *sub, SubscriptionId id)
|
|
|
+{
|
|
|
+ MemoryBuffer mb;
|
|
|
+ mb.setBuffer(sub->queryData().length(), (void *)sub->queryData().get());
|
|
|
+ StringAttr xpath;
|
|
|
+ bool sendValue;
|
|
|
+ mb.read(xpath);
|
|
|
+ mb.read(sendValue);
|
|
|
+
|
|
|
+ CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
|
|
|
+ CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
|
|
|
+ Owned<IPropertyTreeIterator> iter = root->getElements(xpath+1);
|
|
|
+ if (!iter->first())
|
|
|
+ throw MakeSDSException(SDSExcpt_SubscriptionNoMatch, "Failed to match any nodes: %s", xpath.get());
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Owned<CNodeSubscriberContainer> subscriber = new CNodeSubscriberContainer(sub, id, sendValue, xpath);
|
|
|
+ do
|
|
|
+ {
|
|
|
+ CServerRemoteTree &node = (CServerRemoteTree &)iter->query();
|
|
|
+ node.setSubscribed(true);
|
|
|
+ subscriber->add(node);
|
|
|
+ }
|
|
|
+ while (iter->next());
|
|
|
+ nodeSubscriptionManager->associateSubscriber(*subscriber);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CCovenSDSManager::removeNodeSubscriber(SubscriptionId id)
|
|
|
+{
|
|
|
+ CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
|
|
|
+ CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
|
|
|
+ nodeSubscriptionManager->removeSubscriberAssociation(id);
|
|
|
+}
|
|
|
+
|
|
|
+void CCovenSDSManager::notifyNodeDelete(CServerRemoteTree &node)
|
|
|
+{
|
|
|
+ nodeSubscriptionManager->notifyDelete(node);
|
|
|
+}
|
|
|
+
|
|
|
+void CCovenSDSManager::notifyNode(CServerRemoteTree &node, PDState state)
|
|
|
+{
|
|
|
+ nodeSubscriptionManager->notify(node, state);
|
|
|
+}
|
|
|
+
|
|
|
///////////////////////
|
|
|
|
|
|
class CDaliSDSServer: public CInterface, public IDaliServer
|