|
@@ -1911,7 +1911,7 @@ interface INodeSubscriptionManager : extends ISubscriptionManager
|
|
|
{
|
|
|
virtual void associateSubscriber(CNodeSubscriberContainer &subscriber) = 0;
|
|
|
virtual void removeSubscriberAssociation(SubscriptionId id) = 0;
|
|
|
- virtual void notifyDelete(CServerRemoteTree &node) = 0;
|
|
|
+ virtual void notifyDelete(CServerRemoteTree *node) = 0;
|
|
|
virtual void notify(CServerRemoteTree &node, PDState state) = 0;
|
|
|
virtual MemoryBuffer &collectSubscribers(MemoryBuffer &out) const = 0;
|
|
|
};
|
|
@@ -1942,7 +1942,7 @@ public:
|
|
|
CLockInfo *queryLockInfo(__int64 id) { return lockTable.find(&id); }
|
|
|
CSubscriberTable &querySubscriberTable() { return subscribers; }
|
|
|
IExternalHandler *queryExternalHandler(const char *handler) { if (!handler) return NULL; CExternalHandlerMapping *mapping = externalHandlers.find(handler); return mapping ? &mapping->query() : NULL; }
|
|
|
- void handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer ¬ifyData);
|
|
|
+ void handleNotify(CSubscriberContainerBase *subscriber, MemoryBuffer ¬ifyData);
|
|
|
void startNotification(IPropertyTree &changeTree, CPTStack &stack, CBranchChange &changes); // subscription notification
|
|
|
MemoryBuffer &collectUsageStats(MemoryBuffer &out);
|
|
|
MemoryBuffer &collectConnections(MemoryBuffer &out);
|
|
@@ -2702,6 +2702,11 @@ public:
|
|
|
}
|
|
|
const void *queryFindParam() const { return &node; }
|
|
|
ICopyArrayOf<CNodeSubscriberContainer> &querySubscribers() { return subscribers; }
|
|
|
+ void getSubscribers(IArrayOf<CNodeSubscriberContainer> &linkedSubscribers)
|
|
|
+ {
|
|
|
+ ForEachItemIn(s, subscribers)
|
|
|
+ linkedSubscribers.append(*LINK(&subscribers.item(s)));
|
|
|
+ }
|
|
|
void add(CNodeSubscriberContainer &subscriber) { subscribers.append(subscriber); }
|
|
|
};
|
|
|
|
|
@@ -2710,17 +2715,14 @@ public:
|
|
|
OwningSimpleHashTableOf<CNodeSubscriberContainerList, CServerRemoteTree *> subscriberListByNode;
|
|
|
CriticalSection lock;
|
|
|
|
|
|
- void _notify(CServerRemoteTree *node, PDState state)
|
|
|
+ void _notify(CServerRemoteTree *node, PDState state, IArrayOf<CNodeSubscriberContainer> &subscribers)
|
|
|
{
|
|
|
MemoryBuffer sendValueNotifyData;
|
|
|
- CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
- assertex(subscriberList);
|
|
|
- ICopyArrayOf<CNodeSubscriberContainer> &subscribers = subscriberList->querySubscribers();
|
|
|
int lastSendValue = -1;
|
|
|
- ForEachItemIn(s, subscribers)
|
|
|
+ while (subscribers.ordinality())
|
|
|
{
|
|
|
- CNodeSubscriberContainer &subscriber = subscribers.item(s);
|
|
|
- if (subscriber.querySendValue())
|
|
|
+ Owned<CNodeSubscriberContainer> subscriber = &subscribers.popGet();
|
|
|
+ if (subscriber->querySendValue())
|
|
|
{
|
|
|
if (1 != lastSendValue) // overkill unless many subscribers to same node
|
|
|
{
|
|
@@ -2729,7 +2731,7 @@ public:
|
|
|
buildNotifyData(sendValueNotifyData.clear(), state, NULL, &mb);
|
|
|
lastSendValue = 1;
|
|
|
}
|
|
|
- SDSManager->handleNotify(subscriber, sendValueNotifyData);
|
|
|
+ SDSManager->handleNotify(subscriber.getClear(), sendValueNotifyData);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -2738,10 +2740,18 @@ public:
|
|
|
buildNotifyData(sendValueNotifyData.clear(), state, NULL, NULL);
|
|
|
lastSendValue = 0;
|
|
|
}
|
|
|
- SDSManager->handleNotify(subscriber, sendValueNotifyData);
|
|
|
+ SDSManager->handleNotify(subscriber.getClear(), sendValueNotifyData);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ void _notify(CServerRemoteTree *node, PDState state)
|
|
|
+ {
|
|
|
+ CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
+ assertex(subscriberList);
|
|
|
+ IArrayOf<CNodeSubscriberContainer> subscribers;
|
|
|
+ subscriberList->getSubscribers(subscribers);
|
|
|
+ _notify(node, state, subscribers);
|
|
|
+ }
|
|
|
void _removeNode(CServerRemoteTree *node, SubscriptionId id)
|
|
|
{
|
|
|
CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
@@ -2754,10 +2764,9 @@ public:
|
|
|
{
|
|
|
ICopyArrayOf<CServerRemoteTree> &nodes = subscriber.queryNodes();
|
|
|
verifyex(nodes.zap(*node));
|
|
|
- SubscriptionId sid = subscriber.queryId();
|
|
|
subscribers.remove(s);
|
|
|
if (0 == nodes.ordinality()) // IOW this was the last node this subscriber was associated with
|
|
|
- subscribersById.remove(&sid);
|
|
|
+ subscribersById.removeExact(&subscriber);
|
|
|
}
|
|
|
else if (subscriber.queryId() == id)
|
|
|
subscribers.remove(s);
|
|
@@ -2778,12 +2787,24 @@ public:
|
|
|
CriticalBlock b(lock);
|
|
|
_notify(&node, state);
|
|
|
}
|
|
|
- void notifyDelete(CServerRemoteTree &node)
|
|
|
+ void notifyDelete(CServerRemoteTree *node)
|
|
|
{
|
|
|
// shouldn't be here, unless node is in subscribers table
|
|
|
CriticalBlock b(lock);
|
|
|
- _notify(&node, PDS_Deleted);
|
|
|
- _removeNode(&node, 0);
|
|
|
+ /* Need to be careful not to release subscribers here (on this thread)
|
|
|
+ * 1) gather subscribers(linked)
|
|
|
+ * 2) remove nodes and lists, so no longer in use by SDS
|
|
|
+ * 3) Hand ownership over to notification mechanism
|
|
|
+ *
|
|
|
+ * Subscribers will be released when notification is done with them.
|
|
|
+ */
|
|
|
+ CNodeSubscriberContainerList *subscriberList = subscriberListByNode.find(node);
|
|
|
+ assertex(subscriberList);
|
|
|
+ IArrayOf<CNodeSubscriberContainer> linkedSubscribers;
|
|
|
+ subscriberList->getSubscribers(linkedSubscribers);
|
|
|
+ _removeNode(node, 0);
|
|
|
+ // NB: Notification will take ownership of subscribers being notified.
|
|
|
+ _notify(node, PDS_Deleted, linkedSubscribers);
|
|
|
}
|
|
|
// ISubscriptionManager impl.
|
|
|
virtual void add(ISubscription *sub, SubscriptionId id)
|
|
@@ -8116,8 +8137,9 @@ void CCovenSDSManager::handleNodeNotify(notifications n, CServerRemoteTree &tree
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CCovenSDSManager::handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer ¬ifyData)
|
|
|
+void CCovenSDSManager::handleNotify(CSubscriberContainerBase *_subscriber, MemoryBuffer ¬ifyData)
|
|
|
{
|
|
|
+ Owned<CSubscriberContainerBase> subscriber = _subscriber;
|
|
|
class CNotifyPoolFactory : public CInterface, public IThreadFactory
|
|
|
{
|
|
|
class CNotifyHandler : public CInterface, implements IPooledThread
|
|
@@ -8162,7 +8184,7 @@ void CCovenSDSManager::handleNotify(CSubscriberContainerBase &subscriber, Memory
|
|
|
|
|
|
Owned<CSubscriberNotifier> _notifier;
|
|
|
{ CHECKEDCRITICALBLOCK(nfyTableCrit, fakeCritTimeout);
|
|
|
- SubscriptionId id = subscriber.queryId();
|
|
|
+ SubscriptionId id = subscriber->queryId();
|
|
|
CSubscriberNotifier *notifier = subscriberNotificationTable.find(id);
|
|
|
if (notifier)
|
|
|
{
|
|
@@ -8171,7 +8193,7 @@ void CCovenSDSManager::handleNotify(CSubscriberContainerBase &subscriber, Memory
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _notifier.setown(new CSubscriberNotifier(subscriberNotificationTable, subscriber, notifyData));
|
|
|
+ _notifier.setown(new CSubscriberNotifier(subscriberNotificationTable, *subscriber, notifyData));
|
|
|
subscriberNotificationTable.replace(*_notifier);
|
|
|
}
|
|
|
}
|
|
@@ -8312,7 +8334,7 @@ public:
|
|
|
{
|
|
|
if (0 == notifyData.length())
|
|
|
buildNotifyData(notifyData, state, &stack, NULL);
|
|
|
- SDSManager->handleNotify(subscriber, notifyData);
|
|
|
+ SDSManager->handleNotify(LINK(&subscriber), notifyData);
|
|
|
}
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
@@ -8333,7 +8355,7 @@ public:
|
|
|
{
|
|
|
if (0 == notifyData.length())
|
|
|
buildNotifyData(notifyData, state, &stack, NULL);
|
|
|
- SDSManager->handleNotify(subscriber, notifyData);
|
|
|
+ SDSManager->handleNotify(LINK(&subscriber), notifyData);
|
|
|
}
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
@@ -8406,7 +8428,7 @@ public:
|
|
|
lastSendValue = 0;
|
|
|
}
|
|
|
}
|
|
|
- SDSManager->handleNotify(subscriber, notifyData);
|
|
|
+ SDSManager->handleNotify(LINK(&subscriber), notifyData);
|
|
|
}
|
|
|
else
|
|
|
pruned.append(*LINK(&subscriber));
|
|
@@ -8640,7 +8662,7 @@ void CCovenSDSManager::removeNodeSubscriber(SubscriptionId id)
|
|
|
|
|
|
void CCovenSDSManager::notifyNodeDelete(CServerRemoteTree &node)
|
|
|
{
|
|
|
- nodeSubscriptionManager->notifyDelete(node);
|
|
|
+ nodeSubscriptionManager->notifyDelete(&node);
|
|
|
}
|
|
|
|
|
|
void CCovenSDSManager::notifyNode(CServerRemoteTree &node, PDState state)
|