Browse Source

Merge remote-tracking branch 'origin/candidate-4.2.2' into closedown-4.2.x

Conflicts:
	version.cmake

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
5aa4c4e8d3
1 changed files with 26 additions and 29 deletions
  1. 26 29
      dali/base/dasess.cpp

+ 26 - 29
dali/base/dasess.cpp

@@ -1228,7 +1228,7 @@ public:
     }
     ~CCovenSessionManager()
     {
-        stubs.kill();
+        stubTable.kill();
     }
 
 
@@ -1576,20 +1576,10 @@ protected:
             mb.append(abort);
             subs->notify(mb); 
         }
-
+        const void *queryFindParam() const { return &id; }
     };
 
-    CIArrayOf<CSessionSubscriptionStub> stubs; 
-
-    unsigned findsub(SubscriptionId id)
-    {
-        ForEachItemIn(i,stubs) {
-            CSessionSubscriptionStub &stub = stubs.item(i);
-            if (stub.getId()==id)
-                return i;
-        }
-        return NotFound;
-    }
+    OwningSimpleHashTableOf<CSessionSubscriptionStub, SubscriptionId> stubTable;
 
     void add(ISubscription *subs,SubscriptionId id)
     {
@@ -1597,8 +1587,9 @@ protected:
         {
             CHECKEDCRITICALBLOCK(sessmanagersect,60000);
             nstub = new CSessionSubscriptionStub(subs,id);
-            if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId)) {
-                stubs.append(*nstub);
+            if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId))
+            {
+                stubTable.replace(*nstub);
                 return;
             }
         }
@@ -1615,9 +1606,7 @@ protected:
     void remove(SubscriptionId id)
     {
         CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-        unsigned i=findsub(id);
-        if (i!=NotFound) 
-            stubs.remove(i);
+        stubTable.remove(&id);
     }
 
     void stopSession(SessionId id, bool abort)
@@ -1625,27 +1614,35 @@ protected:
         PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok");
         CHECKEDCRITICALBLOCK(sessmanagersect,60000);
         // do in multiple stages as may remove one or more sub sussions
-        loop {
+        loop
+        {
             CIArrayOf<CSessionSubscriptionStub> tonotify;
-            for (unsigned i=stubs.ordinality();i;) {
-                CSessionSubscriptionStub &stub = stubs.item(--i);
-                if (stub.getSessionId()==id) {
-                    stubs.remove(i, true);
-                    tonotify.append(stub);
-                }
+            SuperHashIteratorOf<CSessionSubscriptionStub> iter(stubTable);
+            ForEach(iter)
+            {
+                CSessionSubscriptionStub &stub = iter.query();
+                if (stub.getSessionId()==id)
+                    tonotify.append(*LINK(&stub));
             }
             if (tonotify.ordinality()==0)
                 break;
-            CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
-            ForEachItemIn(j,tonotify) {
+            ForEachItemIn(j,tonotify)
+            {
                 CSessionSubscriptionStub &stub = tonotify.item(j);
-                try { stub.notify(abort); }
+                stubTable.removeExact(&stub);
+            }
+            CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
+            ForEachItemIn(j2,tonotify)
+            {
+                CSessionSubscriptionStub &stub = tonotify.item(j2);
+                try { stub.notify(abort);}
                 catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
             }
             tonotify.kill(); // clear whilst sessmanagersect unblocked, as subs may query session manager.
         }
         const CSessionState *state = sessionstates.query(id);
-        if (state) {
+        if (state)
+        {
             const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
             if (pstate) 
                 processlookup.remove(&pstate->queryNode(),this);