浏览代码

Merge pull request #15600 from ghalliday/issue26819b

HPCC-26819 Use shutdown to abort reading from flow thread

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父节点
当前提交
1b3e7ca708

+ 7 - 3
roxie/udplib/udpsha.hpp

@@ -298,8 +298,9 @@ private:
     virtual bool set_nonblock(bool on) override { UNIMPLEMENTED; }
     virtual bool set_nagle(bool on) override { UNIMPLEMENTED; }
     virtual void set_linger(int lingersecs) override { UNIMPLEMENTED; }
-    virtual void  cancel_accept() override { UNIMPLEMENTED; }
-    virtual void  shutdown(unsigned mode=SHUTDOWN_READWRITE) override { UNIMPLEMENTED; }
+    virtual void cancel_accept() override { UNIMPLEMENTED; }
+    virtual void shutdown(unsigned mode) override { UNIMPLEMENTED; }
+    virtual void shutdownNoThrow(unsigned mode) override { UNIMPLEMENTED; }
     virtual int name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
     virtual int peer_name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
     virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep) override { UNIMPLEMENTED; }
@@ -388,7 +389,8 @@ public:
                          unsigned timeout) override;
     virtual int wait_read(unsigned timeout) override;
     virtual void close() override {}
-
+    virtual void  shutdown(unsigned mode) override { }
+    virtual void  shutdownNoThrow(unsigned mode) override{ }
 };
 
 class CSimulatedQueueWriteSocket : public CSocketSimulator
@@ -405,6 +407,8 @@ public:
 
 class CSimulatedUdpSocket : public CSocketSimulator
 {
+    virtual void  shutdown(unsigned mode) override { realSocket->shutdown(mode); }
+    virtual void  shutdownNoThrow(unsigned mode) override{ realSocket->shutdownNoThrow(mode); }
 protected:
     Owned<ISocket> realSocket;
 };

+ 2 - 2
roxie/udplib/udptrs.cpp

@@ -792,8 +792,8 @@ class CSendManager : implements ISendManager, public CInterface
         ~send_receive_flow() 
         {
             running = false;
-            if (flow_socket) 
-                flow_socket->close();
+            if (flow_socket)
+                flow_socket->shutdownNoThrow();
             join();
         }
         

+ 12 - 0
system/jlib/jsocket.cpp

@@ -420,6 +420,7 @@ public:
     void        errclose();
     bool        connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); }
     void        shutdown(unsigned mode=SHUTDOWN_READWRITE);
+    void        shutdownNoThrow(unsigned mode);
 
     ISocket*    accept(bool allowcancel, SocketEndpoint *peerEp=nullptr);
     int         wait_read(unsigned timeout);
@@ -2521,6 +2522,17 @@ void CSocket::shutdown(unsigned mode)
     }
 }
 
+void CSocket::shutdownNoThrow(unsigned mode)
+{
+    if (state == ss_open) {
+        state = ss_shutdown;
+#ifdef SOCKTRACE
+        PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%p)", mode, sock, sock, this);
+#endif
+        ::shutdown(sock, mode);
+    }
+}
+
 void CSocket::errclose()
 {
 #ifdef USERECVSEM

+ 3 - 0
system/jlib/jsocket.hpp

@@ -340,6 +340,9 @@ public:
     //
     virtual void  shutdown(unsigned mode=SHUTDOWN_READWRITE) = 0; // not needed for UDP
 
+    // Same as shutdown, but never throws an exception (to call from closedown destructors)
+    virtual void  shutdownNoThrow(unsigned mode=SHUTDOWN_READWRITE) = 0; // not needed for UDP
+
     // Get local name of accepted (or connected) socket and returns port
     virtual int name(char *name,size32_t namemax)=0;
 

+ 6 - 0
system/security/securesocket/securesocket.cpp

@@ -247,6 +247,12 @@ public:
         m_socket->shutdown(mode);
     }
 
+    // Same as shutdown, but never throws an exception (to call from closedown destructors)
+    virtual void  shutdownNoThrow(unsigned mode)
+    {
+        m_socket->shutdownNoThrow(mode);
+    }
+
     // Get local name of accepted (or connected) socket and returns port
     virtual int name(char *name,size32_t namemax)
     {