|
@@ -227,7 +227,7 @@ public:
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- const CSessionState *query(SessionId id)
|
|
|
+ CSessionState *query(SessionId id)
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(sessstatesect,60000);
|
|
|
return SuperHashTableOf<CSessionState,SessionId>::find(&id);
|
|
@@ -245,6 +245,7 @@ class CProcessSessionState: public CSessionState
|
|
|
{
|
|
|
INode *node;
|
|
|
DaliClientRole role;
|
|
|
+ UInt64Array previousSessionIds;
|
|
|
public:
|
|
|
CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role)
|
|
|
: CSessionState(id)
|
|
@@ -261,17 +262,42 @@ public:
|
|
|
{
|
|
|
return *node;
|
|
|
}
|
|
|
-
|
|
|
DaliClientRole queryRole() const
|
|
|
{
|
|
|
return role;
|
|
|
}
|
|
|
-
|
|
|
StringBuffer &getDetails(StringBuffer &buf)
|
|
|
{
|
|
|
StringBuffer ep;
|
|
|
return buf.appendf("%16"I64F"X: %s, role=%s",CSessionState::id,node->endpoint().getUrlStr(ep).str(),queryRoleName(role));
|
|
|
}
|
|
|
+ void addSessionIds(CProcessSessionState &other, bool prevOnly)
|
|
|
+ {
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ SessionId id = other.dequeuePreviousSessionId();
|
|
|
+ if (!id)
|
|
|
+ break;
|
|
|
+ previousSessionIds.append(id);
|
|
|
+ }
|
|
|
+ if (!prevOnly)
|
|
|
+ previousSessionIds.append(other.getId());
|
|
|
+ }
|
|
|
+ SessionId dequeuePreviousSessionId()
|
|
|
+ {
|
|
|
+ if (!previousSessionIds.ordinality())
|
|
|
+ return 0;
|
|
|
+ return previousSessionIds.pop();
|
|
|
+ }
|
|
|
+ unsigned previousSessionIdCount() const
|
|
|
+ {
|
|
|
+ return previousSessionIds.ordinality();
|
|
|
+ }
|
|
|
+ void removeOldSessionId(SessionId id)
|
|
|
+ {
|
|
|
+ if (previousSessionIds.zap(id))
|
|
|
+ PROGLOG("Removed old sessionId (%"I64F"x) from current process state", id);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CMapProcessToSession: private SuperHashTableOf<CProcessSessionState,INode>
|
|
@@ -326,21 +352,28 @@ public:
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ void replace(CProcessSessionState *e)
|
|
|
+ {
|
|
|
+ CHECKEDCRITICALBLOCK(mapprocesssect,60000);
|
|
|
+ SuperHashTableOf<CProcessSessionState,INode>::replace(*e);
|
|
|
+ }
|
|
|
+
|
|
|
CProcessSessionState *query(INode *n)
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
|
|
|
return SuperHashTableOf<CProcessSessionState,INode>::find(n);
|
|
|
}
|
|
|
|
|
|
- void remove(INode *n,ISessionManagerServer *manager)
|
|
|
+ bool remove(const CProcessSessionState *state, ISessionManagerServer *manager)
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
|
|
|
- CProcessSessionState *sstate = SuperHashTableOf<CProcessSessionState,INode>::find(n);
|
|
|
- if (sstate) {
|
|
|
+ if (SuperHashTableOf<CProcessSessionState,INode>::removeExact((CProcessSessionState *)state))
|
|
|
+ {
|
|
|
if (manager)
|
|
|
- manager->authorizeConnection(sstate->queryRole(),true);
|
|
|
- SuperHashTableOf<CProcessSessionState,INode>::removeExact(sstate);
|
|
|
+ manager->authorizeConnection(state->queryRole(), true);
|
|
|
+ return true;
|
|
|
}
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
unsigned count()
|
|
@@ -1211,7 +1244,6 @@ class CCovenSessionManager: public CSessionManagerBase, implements ISessionManag
|
|
|
// no fail currently
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
@@ -1279,13 +1311,22 @@ public:
|
|
|
PROGLOG("Session starting %"I64F"x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role));
|
|
|
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
|
|
|
CProcessSessionState *s = new CProcessSessionState(id,client,role);
|
|
|
- while (!sessionstates.add(s)) { // takes ownership
|
|
|
+ while (!sessionstates.add(s)) // takes ownership
|
|
|
+ {
|
|
|
WARNLOG("Dali session manager: session already registered");
|
|
|
sessionstates.remove(id);
|
|
|
}
|
|
|
- while (!processlookup.add(s)) {
|
|
|
- ERRLOG("Dali session manager: registerClient process session already registered");
|
|
|
- processlookup.remove(client,this);
|
|
|
+ while (!processlookup.add(s))
|
|
|
+ {
|
|
|
+ /* There's existing ip:port match (client) in process table..
|
|
|
+ * Old may be in process of closing or about to, but new has beaten the onClose() to it..
|
|
|
+ * Track old sessions in new CProcessSessionState instance, so that upcoming onClose() can find them
|
|
|
+ */
|
|
|
+ CProcessSessionState *previousState = processlookup.query(client);
|
|
|
+ dbgassertex(previousState); // Must be there, it's reason add() failed
|
|
|
+ s->addSessionIds(*previousState, false); // merges sessions from previous process state into new one that replaces it
|
|
|
+ WARNLOG("Dali session manager: registerClient process session already registered, old replaced");
|
|
|
+ processlookup.remove(previousState, this);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1613,7 +1654,7 @@ protected:
|
|
|
{
|
|
|
PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok");
|
|
|
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
|
|
|
- // do in multiple stages as may remove one or more sub sussions
|
|
|
+ // do in multiple stages as may remove one or more sub sessions
|
|
|
loop
|
|
|
{
|
|
|
CIArrayOf<CSessionSubscriptionStub> tonotify;
|
|
@@ -1635,7 +1676,7 @@ protected:
|
|
|
ForEachItemIn(j2,tonotify)
|
|
|
{
|
|
|
CSessionSubscriptionStub &stub = tonotify.item(j2);
|
|
|
- try { stub.notify(abort);}
|
|
|
+ try { stub.notify(abort); }
|
|
|
catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
|
|
|
}
|
|
|
tonotify.kill(); // clear whilst sessmanagersect unblocked, as subs may query session manager.
|
|
@@ -1643,31 +1684,76 @@ protected:
|
|
|
const CSessionState *state = sessionstates.query(id);
|
|
|
if (state)
|
|
|
{
|
|
|
- const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
|
|
|
- if (pstate)
|
|
|
- processlookup.remove(&pstate->queryNode(),this);
|
|
|
+ const CProcessSessionState *pState = QUERYINTERFACE(state, const CProcessSessionState);
|
|
|
+ if (pState)
|
|
|
+ {
|
|
|
+ CProcessSessionState *cState = processlookup.query(&pState->queryNode()); // get current
|
|
|
+ if (pState == cState) // so is current one.
|
|
|
+ {
|
|
|
+ /* This is reinstating a previous CProcessSessionState for this node (if there is one),
|
|
|
+ * that has not yet stopped, and adding any other pending process states to the CProcessSessionState
|
|
|
+ * being reinstated.
|
|
|
+ */
|
|
|
+ SessionId prevId = cState->dequeuePreviousSessionId();
|
|
|
+ if (prevId)
|
|
|
+ {
|
|
|
+ CSessionState *prevSessionState = sessionstates.query(prevId);
|
|
|
+ dbgassertex(prevSessionState); // must be there
|
|
|
+ CProcessSessionState *prevProcessState = QUERYINTERFACE(prevSessionState, CProcessSessionState);
|
|
|
+ dbgassertex(prevSessionState);
|
|
|
+ /* NB: prevProcessState's have 0 entries in their previousSessionIds, since they were merged at replacement time
|
|
|
+ * in addProcessSession()
|
|
|
+ */
|
|
|
+ prevProcessState->addSessionIds(*cState, true); // add in any remaining
|
|
|
+ processlookup.replace(prevProcessState);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ processlookup.remove(pState, this);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (processlookup.remove(pState, this)) // old may have been removed when replaced
|
|
|
+ {
|
|
|
+ if (cState)
|
|
|
+ {
|
|
|
+ PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
|
|
|
+ cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
sessionstates.remove(id);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void onClose(SocketEndpoint &ep)
|
|
|
{
|
|
|
- StringBuffer str;
|
|
|
- PROGLOG("Client closed (%s)",ep.getUrlStr(str).str());
|
|
|
+ StringBuffer clientStr;
|
|
|
+ PROGLOG("Client closed (%s)", ep.getUrlStr(clientStr).str());
|
|
|
+
|
|
|
SessionId idtostop;
|
|
|
{
|
|
|
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
|
|
|
Owned<INode> node = createINode(ep);
|
|
|
- if (queryCoven().inCoven(node)) {
|
|
|
- StringBuffer str;
|
|
|
- PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str());
|
|
|
+ if (queryCoven().inCoven(node))
|
|
|
+ {
|
|
|
+ PROGLOG("Coven Session Stopping (%s)", clientStr.str());
|
|
|
// more TBD here
|
|
|
return;
|
|
|
}
|
|
|
- CProcessSessionState *s= processlookup.query(node);
|
|
|
+ CProcessSessionState *s = processlookup.query(node);
|
|
|
if (!s)
|
|
|
return;
|
|
|
- idtostop = s->getId();
|
|
|
+ idtostop = s->dequeuePreviousSessionId();
|
|
|
+ if (idtostop)
|
|
|
+ {
|
|
|
+ PROGLOG("Previous sessionId (%"I64F"x) for %s was replaced by (%"I64F"x), stopping old session now", idtostop, clientStr.str(), s->getId());
|
|
|
+ unsigned c = s->previousSessionIdCount();
|
|
|
+ if (c) // very unlikely, but could be >1, trace for info.
|
|
|
+ PROGLOG("%d old sessions pending closure", c);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ idtostop = s->getId();
|
|
|
}
|
|
|
stopSession(idtostop,true);
|
|
|
}
|