Browse Source

HPCC-11025 - Fix various MP close socket issues

Fix:
1) Spot non-MP clients connecting to the MP port early and ignore.
2) IpAddress::setNetAddress - set 'null' ip if null IPv4 provided.
3) Ignore InterCommunicator close sockets as exceptions unless
specifically listening to socket closed.
4) Ingore Communicator close socket exceptions unless part
internal to group communicator is based on.
5) Ensure close socket exception on Thor watchdog recycles Thor.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 11 years ago
parent
commit
16ffbf5f79
4 changed files with 87 additions and 18 deletions
  1. 1 1
      system/jlib/jsocket.cpp
  2. 1 0
      system/mp/mpbase.hpp
  3. 71 16
      system/mp/mpcomm.cpp
  4. 14 1
      thorlcr/master/mawatchdog.cpp

+ 1 - 1
system/jlib/jsocket.cpp

@@ -3074,8 +3074,8 @@ void IpAddress::setNetAddress(size32_t sz,const void *src)
     if (sz==sizeof(unsigned)) { // IPv4
         netaddr[0] = 0;
         netaddr[1] = 0;
-        netaddr[2]=0xffff0000;
         netaddr[3] = *(const unsigned *)src;
+        netaddr[2] = netaddr[3] ? 0xffff0000 : 0; // leave as null if Ipv4 address is null
     }
     else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6
         memcpy(&netaddr,src,sz);

+ 1 - 0
system/mp/mpbase.hpp

@@ -144,6 +144,7 @@ extern mp_decl void initMyNode(unsigned short port);
 
 interface IMP_Exception: extends IException
 {
+    virtual const SocketEndpoint &queryEndpoint() const = 0;
 };
 
 enum MessagePassingError

+ 71 - 16
system/mp/mpcomm.cpp

@@ -195,10 +195,6 @@ public:
     {
     }
 
-    CMPException(MessagePassingError err) : error(err) 
-    {
-    }
-
     StringBuffer &  errorMessage(StringBuffer &str) const
     { 
         StringBuffer tmp;
@@ -216,6 +212,7 @@ public:
     { 
         return MSGAUD_user; 
     }
+    virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
 private:
     MessagePassingError error;
     SocketEndpoint endpoint;
@@ -1668,8 +1665,21 @@ int CMPConnectThread::run()
                 SocketEndpoint hostep;
                 SocketEndpointV4 id[2];
                 sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT); 
+                if (rd != sizeof(SocketEndpointV4[2]))
+                {
+                    FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid number of connection bytes serialized");
+                    sock->close();
+                    continue;
+                }
                 id[0].get(remoteep);
                 id[1].get(hostep);
+                if (remoteep.isNull() || hostep.isNull())
+                {
+                    // JCSMORE, I think remoteep really must/should match a IP of this local host
+                    FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid remote and/or host ep serialized");
+                    sock->close();
+                    continue;
+                }
 #ifdef _FULLTRACE       
                 StringBuffer tmp1;
                 remoteep.getUrlStr(tmp1);
@@ -2191,12 +2201,34 @@ public:
         if (sender)
             *sender = NULL;
         CTimeMon tm(timeout);
-        if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm)) {
-            if (sender) 
-                *sender = createINode(mbuf.getSender());
-            return true;
+        loop
+        {
+            try
+            {
+                if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm))
+                {
+                    if (sender)
+                        *sender = createINode(mbuf.getSender());
+                    return true;
+                }
+                return false;
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed != e->errorCode())
+                    throw;
+                const SocketEndpoint &ep = e->queryEndpoint();
+                if (src)
+                {
+                    if (ep == src->endpoint())
+                        throw;
+                }
+                StringBuffer epStr;
+                ep.getUrlStr(epStr);
+                FLLOG(MCoperatorWarning, unknownJob, "CInterCommunicator: ignoring closed endpoint: %s", epStr.str());
+                e->Release();
+            }
         }
-        return false;
     }
 
 
@@ -2408,14 +2440,37 @@ public:
         else
             srcep = &queryEndpoint(srcrank);
         CTimeMon tm(timeout);
-        if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm)) {
-            if (sender)
-                *sender = group->rank(mbuf.getSender());
-            return true;
+        loop
+        {
+            try
+            {
+                if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
+                {
+                    if (sender)
+                        *sender = group->rank(mbuf.getSender());
+                    return true;
+                }
+                if (sender)
+                    *sender = RANK_NULL;
+                return false;
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed == e->errorCode())
+                {
+                    const SocketEndpoint &ep = e->queryEndpoint();
+                    if (RANK_NULL == group->rank(ep))
+                    {
+                        StringBuffer epStr;
+                        ep.getUrlStr(epStr);
+                        FLLOG(MCoperatorWarning, unknownJob, "CCommunicator: ignoring closed endpoint from outside the communicator group: %s", epStr.str());
+                        e->Release();
+                        continue; // recv again
+                    }
+                }
+                throw;
+            }
         }
-        if (sender)
-            *sender = RANK_NULL;
-        return false;
     }
     
     void flush(mptag_t tag)

+ 14 - 1
thorlcr/master/mawatchdog.cpp

@@ -196,7 +196,20 @@ void CMasterWatchdogBase::main()
         {
             HeartBeatPacketHeader hb;
             MemoryBuffer progressData;
-            unsigned sz = readPacket(hb, progressData);
+            unsigned sz;
+            try
+            {
+                sz = readPacket(hb, progressData);
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed != e->errorCode())
+                    throw;
+                const SocketEndpoint &ep = e->queryEndpoint();
+                StringBuffer epStr;
+                ep.getUrlStr(epStr);
+                abortThor(MakeThorOperatorException(TE_AbortException, "Watchdog has lost connectivity with Thor slave: %s (Process terminated or node down?)", epStr.str()));
+            }
             if (stopped)
                 break;
             else if (sz)