Browse Source

Merge pull request #9281 from jakesmith/hpcc-16541

HPCC-16541 jsocket / MP link closed tracing

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 years ago
parent
commit
ce277defdc

+ 16 - 8
system/jlib/jsocket.cpp

@@ -140,14 +140,6 @@ static atomic_t pre_conn_unreach_cnt = ATOMIC_INIT(0);    // global count of pre
 
 #define IPV6_SERIALIZE_PREFIX (0x00ff00ff)
 
-inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
-{
-    if (err) 
-        PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
-           (info&&*info)?" ":"",(info&&*info)?info:"",err,
-           (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
-}
-    
 
 class jlib_thrown_decl SocketException: public CInterface, public IJSOCK_Exception
 {
@@ -673,6 +665,22 @@ typedef union {
 #define INET6_ADDRSTRLEN 65
 #endif
 
+inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename) // Logs a nonzero socket error; info and tracename are optional (null or empty is omitted from the message)
+{
+    if (err) // err == 0 produces no output at all
+    {
+        PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno, // message shape: jsocket(ref,lineno)[ info] err = N[ : tracename]
+           (info&&*info)?" ":"",(info&&*info)?info:"",err,
+           (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
+        if ((JSE_NOTCONN == err) || (JSE_CONNRESET == err) || (JSE_CONNABORTED == err)) // these three codes additionally trigger a stack report
+        {
+            PROGLOG("Socket not connected, stack:");
+            PrintStackReport(); // presumably records the call stack of the failing socket operation -- confirm against its definition
+        }
+    }
+}
+
+
 
 inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
 {

+ 46 - 10
system/mp/mpcomm.cpp

@@ -48,6 +48,11 @@
 
 //#define _TRACE
 //#define _FULLTRACE
+
+#if 1 // currently unconditional; the commented-out alternative is: #ifdef _FULLTRACE
+#define _TRACELINKCLOSED // guards the "closed on entry" / "closed on notify" trace + PrintStackReport blocks
+#endif
+#define _TRACEMPSERVERNOTIFYCLOSED // guards the trace + PrintStackReport in CMPServer::notifyClosed
 #define _TRACEORPHANS
 
 
@@ -468,6 +473,7 @@ protected:
     unsigned short              port;
 public:
     bool checkclosed;
+    bool tryReopenChannel = false;
 
 // packet handlers
     PingPacketHandler           *pingpackethandler;         // TAG_SYS_PING
@@ -541,6 +547,22 @@ public:
     {
         return myNode;
     }
+    virtual void setOpt(MPServerOpts opt, const char *value) // Applies one server option; value carries the option's setting as text
+    {
+        switch (opt)
+        {
+            case mpsopt_channelreopen:
+            {
+                bool tf = (nullptr != value) ? strToBool(value) : false; // a null value is treated as false
+                PROGLOG("Setting ChannelReopen = %s", tf ? "true" : "false");
+                tryReopenChannel = tf; // appears to be the flag checkReconnect() tests via parent->tryReopenChannel
+                break;
+            }
+            default:
+                // any other option (including mpsopt_null) is silently ignored
+                break;
+        }
+    }
 };
 
 //===========================================================================
@@ -711,6 +733,17 @@ protected: friend class CMPPacketReader;
 #endif
 
 
+    bool checkReconnect(CTimeMon &tm) // Attempts to re-open the channel; returns true only when reopen is enabled and connect(tm) succeeds
+    {
+        if (!parent->tryReopenChannel) // reopen disabled: callers then throw MPERR_link_closed
+            return false;
+        ::Release(channelsock); // drop the current socket reference before reconnecting
+        channelsock = nullptr;
+        if (connect(tm)) // NOTE(review): connect() states it must be called from connectsect -- confirm every caller of checkReconnect holds it
+            return true;
+        WARNLOG("Failed to reconnect");
+        return false;
+    }
     bool connect(CTimeMon &tm)
     {
         // must be called from connectsect
@@ -972,12 +1005,12 @@ public:
         {
             CriticalBlock block(connectsect);
             if (closed) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
                 LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
                 PrintStackReport();
 #endif
-                IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
-                throw e;
+                if (!checkReconnect(tm))
+                    throw new CMPException(MPERR_link_closed,remoteep);
             }
             if (!channelsock) {
                 if (!connect(tm)) {
@@ -1120,8 +1153,10 @@ public:
                 try {
                     s->shutdown();
                 }
-                catch (IException *) { 
+                catch (IException *e) {
                     socketfailed = true; // ignore if the socket has been closed
+                    WARNLOG("closeSocket() : Ignoring shutdown error");
+                    e->Release();
                 }
             }
             parent->querySelectHandler().remove(s);
@@ -1696,12 +1731,12 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
     size32_t msgsize = mb.length();
     PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
     if (closed||(reply&&!isConnected())) {  // flag error if has been disconnected
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
         PrintStackReport();
 #endif
-        IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
-        throw e;
+        if (!checkReconnect(tm))
+            throw new CMPException(MPERR_link_closed,remoteep);
     }
 
     bool ismulti = (msgsize>MAXDATAPERPACKET);
@@ -2229,7 +2264,7 @@ bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag
         return true;
     }
     if (nfy.aborted) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
         PrintStackReport();
 #endif
@@ -2315,7 +2350,7 @@ unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,Soc
         return nfy.cancel?0:nfy.count;
     }
     if (nfy.aborted) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
         PrintStackReport();
 #endif
@@ -2400,9 +2435,10 @@ bool CMPServer::nextChannel(CMPChannel *&cur)
 
 void CMPServer::notifyClosed(SocketEndpoint &ep)
 {
-#ifdef _TRACE
+#ifdef _TRACEMPSERVERNOTIFYCLOSED
     StringBuffer url;
     LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
+    PrintStackReport();
 #endif
     notifyclosedthread->notify(ep);
 }

+ 2 - 0
system/mp/mpcomm.hpp

@@ -91,12 +91,14 @@ extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false)
 extern mp_decl IInterCommunicator &queryWorldCommunicator();
 extern mp_decl bool hasMPServerStarted();
 
+enum MPServerOpts { mpsopt_null, mpsopt_channelreopen }; // option identifiers accepted by IMPServer::setOpt
 interface IMPServer : extends IInterface
 {
     virtual mptag_t createReplyTag() = 0;
     virtual ICommunicator *createCommunicator(IGroup *group, bool outer=false) = 0;
     virtual void stop() = 0;
     virtual INode *queryMyNode() = 0;
+    virtual void setOpt(MPServerOpts opt, const char *value) = 0; // set a server option; value is passed as text (e.g. "true")
 };
 
 extern mp_decl void startMPServer(unsigned port,bool paused=false);

+ 3 - 0
thorlcr/master/thmastermain.cpp

@@ -588,6 +588,9 @@ int main( int argc, char *argv[]  )
             }
         }
 
+        if (globals->getPropBool("@MPChannelReconnect"))
+            getMPServer()->setOpt(mpsopt_channelreopen, "true");
+
         setPasswordsFromSDS();
 
         if (globals->getPropBool("@enableSysLog",true))

+ 5 - 1
thorlcr/slave/slavmain.cpp

@@ -164,10 +164,14 @@ public:
         channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
         unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", 200);
         mpServers.append(* getMPServer());
+        bool reconnect = globals->getPropBool("@MPChannelReconnect");
         for (unsigned sc=1; sc<channelsPerSlave; sc++)
         {
             unsigned port = getMachinePortBase() + (sc * localThorPortInc);
-            mpServers.append(*startNewMPServer(port));
+            IMPServer *mpServer = startNewMPServer(port);
+            if (reconnect)
+                mpServer->setOpt(mpsopt_channelreopen, "true");
+            mpServers.append(*mpServer);
         }
     }
     ~CJobListener()

+ 3 - 0
thorlcr/slave/thslavemain.cpp

@@ -353,6 +353,9 @@ int main( int argc, char *argv[]  )
         setSlaveAffinity(globals->getPropInt("@SLAVEPROCESSNUM"));
 
         startMPServer(getFixedPort(TPORT_mp));
+
+        if (globals->getPropBool("@MPChannelReconnect"))
+            getMPServer()->setOpt(mpsopt_channelreopen, "true");
 #ifdef USE_MP_LOG
         startLogMsgParentReceiver();
         LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp));