Bläddra i källkod

HPCC-13968 Protect MP channel use from async expiry/deletion.

This change links the channel object wherever it is used, so that it remains
valid even after it has been removed from the channel table.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 år sedan
förälder
incheckning
48d4ce9a56
1 ändrade filer med 30 tillägg och 29 borttagningar
  1. 30 29
      system/mp/mpcomm.cpp

+ 30 - 29
system/mp/mpcomm.cpp

@@ -457,7 +457,7 @@ public:
     void stop();
     unsigned short getPort() { return port; }
     void setPort(unsigned short _port) { port = _port; }
-    CMPChannel &lookup(const SocketEndpoint &remoteep);
+    CMPChannel *lookup(const SocketEndpoint &remoteep);
     ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
     CBufferQueue &getReceiveQ() { return receiveq; }
     bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
@@ -1930,7 +1930,8 @@ int CMPConnectThread::run()
                 PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
 #endif
                 checkSelfDestruct(&id[0],sizeof(id));
-                if (!parent->lookup(_remoteep).attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
+                Linked<CMPChannel> channel = parent->lookup(_remoteep);
+                if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
 #ifdef _FULLTRACE       
                     PROGLOG("MP Connect Thread: lookup failed");
 #endif
@@ -2012,7 +2013,7 @@ public:
 
 //-----------------------------------------------------------------------------------
 
-CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
+CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
 {
     // there is an assumption here that no removes will be done within this loop
     CriticalBlock block(serversect);
@@ -2040,7 +2041,7 @@ CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
         e = new CMPChannel(this,ep);
         add(*e);
     }
-    return *e;
+    return LINK(e);
 }
 
 
@@ -2336,11 +2337,11 @@ public:
         }
 
         CTimeMon tm(timeout);
-        CMPChannel &channel = parent->lookup(dst->endpoint());
+        Linked<CMPChannel> channel = parent->lookup(dst->endpoint());
         unsigned remaining;
         if (tm.timedout(&remaining))
             return false;
-        if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
+        if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
             return false;
         mbuf.clear(); // for consistent semantics
         return true;
@@ -2350,11 +2351,11 @@ public:
     {
         CriticalBlock block(verifysect);
         CTimeMon tm(timeout);
-        CMPChannel &channel = parent->lookup(node->endpoint());
+        Linked<CMPChannel> channel = parent->lookup(node->endpoint());
         unsigned remaining;
         if (tm.timedout(&remaining))
             return false;
-        return channel.verifyConnection(tm,true);
+        return channel->verifyConnection(tm,true);
     }
 
     void verifyAll(StringBuffer &log)
@@ -2393,12 +2394,12 @@ public:
                 else
                     doverify = (myrank<rank);
                 if (doverify) {
-                    CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
+                    Linked<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
                     unsigned remaining;
                     if (tm.timedout(&remaining)) {
                         return false;
                     }
-                    if (!channel.verifyConnection(tm,true)) {
+                    if (!channel->verifyConnection(tm,true)) {
                         return false;
                     }
                 }
@@ -2408,8 +2409,8 @@ public:
             ForEachNodeInGroup(rank,*group) {
                 bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
                 if (doverify) {
-                    CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
-                    while (!channel.verifyConnection(tm,false)) {
+                    Linked<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
+                    while (!channel->verifyConnection(tm,false)) {
                         unsigned remaining;
                         if (tm.timedout(&remaining))
                             return false;
@@ -2506,9 +2507,9 @@ public:
     void disconnect(INode *node)
     {
         CriticalBlock block(verifysect);
-        CMPChannel &channel = parent->lookup(node->endpoint());
-        channel.closeSocket();
-        parent->removeChannel(&channel);
+        Linked<CMPChannel> channel = parent->lookup(node->endpoint());
+        channel->closeSocket();
+        parent->removeChannel(channel);
     }
 
     CInterCommunicator(CMPServer *_parent)
@@ -2535,7 +2536,7 @@ class CCommunicator: public CInterface, public ICommunicator
         return group->queryNode(rank).endpoint();
     }
 
-    CMPChannel &queryChannel(rank_t rank)
+    CMPChannel *getChannel(rank_t rank)
     {
         return parent->lookup(queryEndpoint(rank));
     }
@@ -2583,11 +2584,11 @@ public:
                 endrank = dstrank;
             for (;dstrank<=endrank;dstrank++) {
                 if (dstrank!=myrank) {
-                    CMPChannel &channel = queryChannel(dstrank);
+                    Linked<CMPChannel> channel = getChannel(dstrank);
                     unsigned remaining;
                     if (tm.timedout(&remaining))
                         return false;
-                    if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
+                    if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
                         return false;
                 }
             }
@@ -2652,11 +2653,11 @@ public:
         assertex(rank!=RANK_RANDOM);
         assertex(rank!=RANK_ALL);
         CTimeMon tm(timeout);
-        CMPChannel &channel = queryChannel(rank);
+        Linked<CMPChannel> channel = getChannel(rank);
         unsigned remaining;
         if (tm.timedout(&remaining))
             return false;
-        return channel.verifyConnection(tm,true);
+        return channel->verifyConnection(tm,true);
     }
 
     bool verifyAll(bool duplex, unsigned timeout)
@@ -2674,12 +2675,12 @@ public:
                 else
                     doverify = (myrank<rank);
                 if (doverify) {
-                    CMPChannel &channel = queryChannel(rank);
+                    Linked<CMPChannel> channel = getChannel(rank);
                     unsigned remaining;
                     if (tm.timedout(&remaining)) {
                         return false;
                     }
-                    if (!channel.verifyConnection(tm,true)) 
+                    if (!channel->verifyConnection(tm,true))
                         return false;
                 }
             }
@@ -2688,8 +2689,8 @@ public:
             ForEachNodeInGroup(rank,*group) {
                 bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
                 if (doverify) {
-                    CMPChannel &channel = queryChannel(rank);
-                    while (!channel.verifyConnection(tm,false)) {
+                    Linked<CMPChannel> channel = getChannel(rank);
+                    while (!channel->verifyConnection(tm,false)) {
                         unsigned remaining;
                         if (tm.timedout(&remaining))
                             return false;
@@ -2804,12 +2805,12 @@ public:
         }
             
         CTimeMon tm(timeout);
-        CMPChannel &channel = parent->lookup(mbuf.getSender());
+        Linked<CMPChannel> channel = parent->lookup(mbuf.getSender());
         unsigned remaining;
         if (tm.timedout(&remaining)) {
             return false;
         }
-        if (channel.send(mbuf,replytag,TAG_NULL,tm, true)) {
+        if (channel->send(mbuf,replytag,TAG_NULL,tm, true)) {
             mbuf.setReplyTag(TAG_NULL);
             return true;
         }
@@ -2825,9 +2826,9 @@ public:
     void disconnect(INode *node)
     {
         CriticalBlock block(verifysect);
-        CMPChannel &channel = parent->lookup(node->endpoint());
-        channel.closeSocket();
-        parent->removeChannel(&channel);
+        Linked<CMPChannel> channel = parent->lookup(node->endpoint());
+        channel->closeSocket();
+        parent->removeChannel(channel);
     }
 
     CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)