/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see www.gnu.org/licenses/ . ############################################################################## */ /* Overview: socket support for jlib. This portion holds the SocketException error type with its message table, the LogErr trace helper, address conversion helpers (setSockAddr, setSockAddrAny, getSockAddrEndpoint), Windows shims for inet_pton, inet_ntop and inet_aton, and the CSocket implementation of ISocket (open, accept, the connect variants, select based wait_read and wait_write, timed reads, writes and block sends). The globals IP4only and IP6preferred steer the address family chosen by setSockAddr, setSockAddrAny and socket creation. NOTE(review): this copy appears to have lost its line breaks and all text between angle brackets (include targets, template arguments such as the one after Owned, and code fragments from name through cancel_accept, in refused_sleep and connect_timeout, in readtms and in the write routines), so it cannot compile as shown; restore from version control before changing any code. */ // New IPv6 Version - IN PROGRESS /* TBD IPv6 connect multicast look at loopback */ #include "platform.h" #ifdef _VER_C5 #include #else #include "platform.h" #include #endif #include #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #include #include #include #include #else #include #include #include #include #include #include #include #include #endif #include #include "jmutex.hpp" #include "jsocket.hpp" #include "jexcept.hpp" #include "jio.hpp" #include "jmisc.hpp" #include "jthread.hpp" #include "jqueue.tpp" #include "jtime.hpp" #include "jprop.hpp" #include "jregexp.hpp" #include "jdebug.hpp" #include "build-config.h" // various options #define CONNECT_TIMEOUT_REFUSED_WAIT 1000 // maximum to sleep on connect_timeout #define TRACE_SLOW_BLOCK_TRANSFER #define DEFAULT_CONNECT_TIME (100*1000) // for connect_wait #ifndef _WIN32 #define BLOCK_POLLED_SINGLE_CONNECTS // NB this is much slower in windows #define CENTRAL_NODE_RANDOM_DELAY #else #define USERECVSEM // to singlethread BF_SYNC_TRANSFER_PUSH #endif #ifdef _DEBUG //#define SOCKTRACE #endif #ifdef _TESTING #define _TRACE #endif /* Error helpers: THROWJSOCKEXCEPTION throws a new SocketException from CSocket members (under _TRACE the message adds tracename, file and line); THROWJSOCKEXCEPTION2 is the variant that does not use tracename. LOGERR and LOGERR2 forward to LogErr in _TRACE builds and expand to nothing otherwise. */ #ifdef _TRACE #define THROWJSOCKEXCEPTION(exc) \ { StringBuffer msg; \ msg.appendf("Target: 
%s, Raised in: %s, line %d",tracename ,__FILE__, __LINE__); \ IJSOCK_Exception *e = new SocketException(exc,msg.str());\ throw e; } #define THROWJSOCKEXCEPTION2(exc) \ { StringBuffer msg; \ msg.appendf("Raised in: %s, line %d",__FILE__, __LINE__); \ IJSOCK_Exception *e = new SocketException(exc,msg.str());\ throw e; } #define LOGERR(err,ref,info) LogErr(err,ref,info,__LINE__,NULL) #define LOGERR2(err,ref,info) LogErr(err,ref,info,__LINE__,tracename) #else #define THROWJSOCKEXCEPTION(exc) \ { IJSOCK_Exception *e = new SocketException(exc);\ throw e; } #define THROWJSOCKEXCEPTION2(exc) THROWJSOCKEXCEPTION(exc) #define LOGERR(err,ref,info) #define LOGERR2(err,ref,info) #endif JSocketStatistics STATS; static bool IP4only=false; // slighly faster if we know no IPv6 static bool IP6preferred=false; // e.g. for DNS and socket create IpSubNet PreferredSubnet(NULL,NULL); // set this if you prefer a particular subnet for debugging etc // e.g. PreferredSubnet("192.168.16.0", "255.255.255.0") #define IPV6_SERIALIZE_PREFIX (0x00ff00ff) inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename) { if (err) PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno, (info&&*info)?" ":"",(info&&*info)?info:"",err, (tracename&&*tracename)?" 
: ":"",(tracename&&*tracename)?tracename:""); } class jlib_thrown_decl SocketException: public CInterface, public IJSOCK_Exception { public: IMPLEMENT_IINTERFACE; SocketException(int code,const char *_msg=NULL) : errcode(code) { if (_msg) msg = strdup(_msg); else msg = NULL; }; ~SocketException() { free(msg); } int errorCode() const { return errcode; } static StringBuffer & geterrormessage(int err,StringBuffer &str) { switch (err) { case JSOCKERR_ok: return str.append("ok"); case JSOCKERR_not_opened: return str.append("socket not opened"); case JSOCKERR_bad_address: return str.append("bad address"); case JSOCKERR_connection_failed: return str.append("connection failed"); case JSOCKERR_broken_pipe: return str.append("connection is broken"); case JSOCKERR_graceful_close: return str.append("connection closed other end"); case JSOCKERR_invalid_access_mode: return str.append("invalid access mode"); case JSOCKERR_timeout_expired: return str.append("timeout expired"); case JSOCKERR_port_in_use: return str.append("port in use"); case JSOCKERR_cancel_accept: return str.append("cancel accept"); case JSOCKERR_connectionless_socket: return str.append("connectionless socket"); case JSOCKERR_handle_too_large: return str.append("handle too large"); case JSOCKERR_bad_netaddr: return str.append("bad net addr"); case JSOCKERR_ipv6_not_implemented: return str.append("IPv6 not implemented"); // OS errors #ifdef _WIN32 case WSAEINTR: return str.append("WSAEINTR(10004) - Interrupted system call."); case WSAEBADF: return str.append("WSAEBADF(10009) - Bad file number."); case WSAEACCES: return str.append("WSAEACCES(10013) - Permission denied."); case WSAEFAULT: return str.append("WSAEFAULT(10014) - Bad address."); case WSAEINVAL: return str.append("WSAEINVAL(10022) - Invalid argument."); case WSAEMFILE: return str.append("WSAEMFILE(10024) - Too many open files."); case WSAEWOULDBLOCK: return str.append("WSAEWOULDBLOCK(10035) - Operation would block."); case WSAEINPROGRESS: return 
str.append("WSAEINPROGRESS(10036) - Operation now in progress."); case WSAEALREADY: return str.append("WSAEALREADY(10037) - Operation already in progress."); case WSAENOTSOCK: return str.append("WSAENOTSOCK(10038) - Socket operation on nonsocket."); case WSAEDESTADDRREQ: return str.append("WSAEDESTADDRREQ(10039) - Destination address required."); case WSAEMSGSIZE: return str.append("WSAEMSGSIZE(10040) - Message too long."); case WSAEPROTOTYPE: return str.append("WSAEPROTOTYPE(10041) - Protocol wrong type for socket."); case WSAENOPROTOOPT: return str.append("WSAENOPROTOOPT(10042) - Protocol not available."); case WSAEPROTONOSUPPORT: return str.append("WSAEPROTONOSUPPORT(10043) - Protocol not supported."); case WSAESOCKTNOSUPPORT: return str.append("WSAESOCKTNOSUPPORT(10044) - Socket type not supported."); case WSAEOPNOTSUPP: return str.append("WSAEOPNOTSUPP(10045) - Operation not supported on socket."); case WSAEPFNOSUPPORT: return str.append("WSAEPFNOSUPPORT(10046) - Protocol family not supported."); case WSAEAFNOSUPPORT: return str.append("WSAEAFNOSUPPORT(10047) - Address family not supported by protocol family."); case WSAEADDRINUSE: return str.append("WSAEADDRINUSE(10048) - Address already in use."); case WSAEADDRNOTAVAIL: return str.append("WSAEADDRNOTAVAIL(10049) - Cannot assign requested address."); case WSAENETDOWN: return str.append("WSAENETDOWN(10050) - Network is down."); case WSAENETUNREACH: return str.append("WSAENETUNREACH(10051) - Network is unreachable."); case WSAENETRESET: return str.append("WSAENETRESET(10052) - Network dropped connection on reset."); case WSAECONNABORTED: return str.append("WSAECONNABORTED(10053) - Software caused connection abort."); case WSAECONNRESET: return str.append("WSAECONNRESET(10054) - Connection reset by peer."); case WSAENOBUFS: return str.append("WSAENOBUFS(10055) - No buffer space available."); case WSAEISCONN: return str.append("WSAEISCONN(10056) - Socket is already connected."); case WSAENOTCONN: return 
str.append("WSAENOTCONN(10057) - Socket is not connected."); case WSAESHUTDOWN: return str.append("WSAESHUTDOWN(10058) - Cannot send after socket shutdown."); case WSAETOOMANYREFS: return str.append("WSAETOOMANYREFS(10059) - Too many references: cannot splice."); case WSAETIMEDOUT: return str.append("WSAETIMEDOUT(10060) - Connection timed out."); case WSAECONNREFUSED: return str.append("WSAECONNREFUSED(10061) - Connection refused."); case WSAELOOP: return str.append("WSAELOOP(10062) - Too many levels of symbolic links."); case WSAENAMETOOLONG: return str.append("WSAENAMETOOLONG(10063) - File name too long."); case WSAEHOSTDOWN: return str.append("WSAEHOSTDOWN(10064) - Host is down."); case WSAEHOSTUNREACH: return str.append("WSAEHOSTUNREACH(10065) - No route to host."); case WSASYSNOTREADY: return str.append("WSASYSNOTREADY(10091) - The network subsystem is unusable."); case WSAVERNOTSUPPORTED: return str.append("WSAVERNOTSUPPORTED(10092) - The Windows Sockets DLL cannot support this application."); case WSANOTINITIALISED: return str.append("WSANOTINITIALISED(10093) - Winsock not initialized."); case WSAEDISCON: return str.append("WSAEDISCON(10101) - Disconnect."); case WSAHOST_NOT_FOUND: return str.append("WSAHOST_NOT_FOUND(11001) - Host not found."); case WSATRY_AGAIN: return str.append("WSATRY_AGAIN(11002) - Nonauthoritative host not found."); case WSANO_RECOVERY: return str.append("WSANO_RECOVERY(11003) - Nonrecoverable error."); case WSANO_DATA: return str.append("WSANO_DATA(11004) - Valid name, no data record of requested type."); #else case ENOTSOCK: return str.append("ENOTSOCK - Socket operation on non-socket "); case EDESTADDRREQ: return str.append("EDESTADDRREQ - Destination address required "); case EMSGSIZE: return str.append("EMSGSIZE - Message too long "); case EPROTOTYPE: return str.append("EPROTOTYPE - Protocol wrong type for socket "); case ENOPROTOOPT: return str.append("ENOPROTOOPT - Protocol not available "); case EPROTONOSUPPORT: return 
str.append("EPROTONOSUPPORT - Protocol not supported "); case ESOCKTNOSUPPORT: return str.append("ESOCKTNOSUPPORT - Socket type not supported "); case EOPNOTSUPP: return str.append("EOPNOTSUPP - Operation not supported on socket "); case EPFNOSUPPORT: return str.append("EPFNOSUPPORT - Protocol family not supported "); case EAFNOSUPPORT: return str.append("EAFNOSUPPORT - Address family not supported by protocol family "); case EADDRINUSE: return str.append("EADDRINUSE - Address already in use "); case EADDRNOTAVAIL: return str.append("EADDRNOTAVAIL - Can't assign requested address "); case ENETDOWN: return str.append("ENETDOWN - Network is down "); case ENETUNREACH: return str.append("ENETUNREACH - Network is unreachable "); case ENETRESET: return str.append("ENETRESET - Network dropped connection because of reset "); case ECONNABORTED: return str.append("ECONNABORTED - Software caused connection abort "); case ECONNRESET: return str.append("ECONNRESET - Connection reset by peer "); case ENOBUFS: return str.append("ENOBUFS - No buffer space available "); case EISCONN: return str.append("EISCONN - Socket is already connected "); case ENOTCONN: return str.append("ENOTCONN - Socket is not connected "); case ESHUTDOWN: return str.append("ESHUTDOWN - Can't send after socket shutdown "); case ETOOMANYREFS: return str.append("ETOOMANYREFS - Too many references: can't splice "); case ETIMEDOUT: return str.append("ETIMEDOUT - Connection timed out "); case ECONNREFUSED: return str.append("ECONNREFUSED - Connection refused "); case EHOSTDOWN: return str.append("EHOSTDOWN - Host is down "); case EHOSTUNREACH: return str.append("EHOSTUNREACH - No route to host "); /* NOTE(review): the EWOULDBLOCK text below says operation already in progress, but EWOULDBLOCK means the call would block; already in progress is EALREADY. */ case EWOULDBLOCK: return str.append("EWOULDBLOCK - operation already in progress"); case EINPROGRESS: return str.append("EINPROGRESS - operation now in progress "); #endif } /* Codes not listed above fall back to the OS error text from MakeOsException. */ IException *ose = MakeOsException(err); ose->errorMessage(str); ose->Release(); return str; } /* IException text: the message for errcode, followed by a newline and the construction message when one was supplied. */ StringBuffer & errorMessage(StringBuffer &str) const 
{ if (msg) return geterrormessage(errcode,str).append('\n').append(msg); return geterrormessage(errcode,str); } /* Port in use errors are addressed to the operator audience; every other code to the user. */ MessageAudience errorAudience() const { switch (errcode) { case JSOCKERR_port_in_use: return MSGAUD_operator; } return MSGAUD_user; } private: int errcode; char *msg; }; /* Returns (does not throw) a newly allocated SocketException with code JSOCKERR_ipv6_not_implemented and a filename(lineno) message. */ IJSOCK_Exception *IPv6NotImplementedException(const char *filename,unsigned lineno) { StringBuffer msg; msg.appendf("%s(%d)",filename,lineno); return new SocketException(JSOCKERR_ipv6_not_implemented,msg.str()); } /* IPv4 multicast membership request (group, then interface, using the ip_mreq field names) that CSocket::open passes to IP_ADD_MEMBERSHIP; the interface is always INADDR_ANY. NOTE(review): both fields are in_addr, so only IPv4 groups can be joined. */ struct MCASTREQ { struct in_addr imr_multiaddr; /* multicast group to join */ struct in_addr imr_interface; /* interface to join on */ MCASTREQ(const char *mcip) { imr_multiaddr.s_addr = inet_addr(mcip); imr_interface.s_addr = htonl(INADDR_ANY); } }; #ifdef __APPLE__ #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0x4000 #endif #endif /* Per platform definitions: socket handle type, fd_set type and helpers, send flags and the bad socket test. The final branch replaces fd_set with a larger xfd_set (XFD_SETSIZE 8192) and, on Linux, defines CHECKSOCKRANGE to throw JSOCKERR_handle_too_large for handles that do not fit; the Windows and BSD branches define CHECKSOCKRANGE as empty. */ #if defined( _WIN32) #define T_SOCKET SOCKET #define T_FD_SET fd_set #define XFD_SETSIZE FD_SETSIZE #define ETIMEDOUT WSAETIMEDOUT #define ECONNREFUSED WSAECONNREFUSED #define XFD_ZERO(s) FD_ZERO(s) #define SEND_FLAGS 0 #define BADSOCKERR(err) ((err==WSAEBADF)||(err==WSAENOTSOCK)) #define CHECKSOCKRANGE(s) #elif defined(__FreeBSD__) || defined(__APPLE__) #define XFD_SETSIZE FD_SETSIZE #define T_FD_SET fd_set #define XFD_ZERO(s) FD_ZERO(s) #define T_SOCKET int #define SEND_FLAGS (MSG_NOSIGNAL) #define BADSOCKERR(err) ((err==EBADF)||(err==ENOTSOCK)) #define CHECKSOCKRANGE(s) #else #define XFD_SETSIZE 8192 struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our own // linux 64 bit #ifdef __linux__ #ifdef __x86_64__ #undef __FDMASK #define __FDMASK(d) (1UL << ((d) % __NFDBITS)) #undef __FDELT #define __FDELT(d) ((d) / __NFDBITS) #undef __FD_SET #define __FD_SET(d, s) (__FDS_BITS (s)[__FDELT(d)] |= __FDMASK(d)) #undef __FD_ISSET #define __FD_ISSET(d, s) ((__FDS_BITS (s)[__FDELT(d)] & __FDMASK(d)) != 0) #endif #define CHECKSOCKRANGE(s) { if (s>=XFD_SETSIZE) THROWJSOCKEXCEPTION2(JSOCKERR_handle_too_large); } #endif // end 64 bit #define 
T_FD_SET xfd_set #define XFD_ZERO(s) memset(s,0,sizeof(xfd_set)) #define T_SOCKET int #define SEND_FLAGS (MSG_NOSIGNAL) #define BADSOCKERR(err) ((err==EBADF)||(err==ENOTSOCK)) #endif #ifdef CENTRAL_NODE_RANDOM_DELAY static SocketEndpointArray CentralNodeArray; #endif /* Role of a CSocket: TCP listener or stream, UDP, or multicast, each with a server form; connectionless() treats every mode except the two TCP ones as datagram. */ enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast}; /* ISocket implementation wrapping one OS handle (sock). A connect in progress is in state ss_pre_open, which becomes ss_open once connected or bound; the static crit guards the static connectingcount used by connect_wait. */ class CSocket: public CInterface, public ISocket { public: IMPLEMENT_IINTERFACE; static CriticalSection crit; protected: friend class CSocketConnectWait; enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state; T_SOCKET sock; char* hostname; // host address unsigned short hostport; // host port SOCKETMODE sockmode; IpAddress targetip; SocketEndpoint returnep; // set by set_return_addr MCASTREQ * mcastreq; size32_t nextblocksize; unsigned blockflags; unsigned blocktimeoutms; bool owned; enum {accept_not_cancelled, accept_cancel_pending, accept_cancelled} accept_cancel_state; bool in_accept; bool nonblocking; bool nagling; static unsigned connectingcount; #ifdef USERECVSEM static Semaphore receiveblocksem; bool receiveblocksemowned; // owned by this socket #endif #ifdef _TRACE char * tracename; #endif /* NOTE(review): tracename is declared only under _TRACE, yet CSocket::open formats it unconditionally in its port in use message. */ public: void open(int listen_queue_size,bool reuseports=false); bool connect_timeout( unsigned timeout, bool noexception); void connect_wait( unsigned timems); void udpconnect(); void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs); void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs); void read(void* buf, size32_t size); size32_t write(void const* buf, size32_t size); size32_t write_multiple(unsigned num,void const**buf, size32_t *size); size32_t udp_write_to(SocketEndpoint &ep,void const* buf, size32_t size); void close(); void errclose(); bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); } void shutdown(unsigned mode); ISocket* accept(bool allowcancel); int wait_read(unsigned 
timeout); int wait_write(unsigned timeout); int name(char *name,size32_t namemax); int peer_name(char *name,size32_t namemax); SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep); IpAddress & getPeerAddress(IpAddress &addr); void set_return_addr(int port,const char *name); // sets returnep void cancel_accept(); size32_t get_max_send_size(); bool set_nonblock(bool on=true); bool set_nagle(bool on); void set_linger(int lingersecs); void set_keep_alive(bool set); virtual void set_inherit(bool inherit=false); virtual bool check_connection(); // Block functions void set_block_mode(unsigned flags,size32_t recsize=0,unsigned timeoutms=0); bool send_block(const void *blk,size32_t sz); size32_t receive_block_size(); size32_t receive_block(void *blk,size32_t sz); size32_t get_send_buffer_size(); void set_send_buffer_size(size32_t sz); bool join_multicast_group(SocketEndpoint &ep); // for udp multicast bool leave_multicast_group(SocketEndpoint &ep); // for udp multicast size32_t get_receive_buffer_size(); void set_receive_buffer_size(size32_t sz); size32_t avail_read(); int pre_connect(bool block); int post_connect(); CSocket(const SocketEndpoint &_ep,SOCKETMODE smode,const char *name); CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned); virtual ~CSocket(); unsigned OShandle() { return (unsigned)sock; } /* closesock: if the handle is valid, marks it INVALID_SOCKET, decrements STATS.activesockets and closes it (closesocket on Windows, close elsewhere), returning that result; returns 0 when already closed. */ private: int closesock() { if (sock!=INVALID_SOCKET) { T_SOCKET s = sock; sock = INVALID_SOCKET; STATS.activesockets--; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: Closing socket %x %d (%x)", s, s, this); #endif #ifdef _WIN32 return ::closesocket(s); #else return ::close(s); #endif } else return 0; } }; CriticalSection CSocket::crit; unsigned CSocket::connectingcount=0; #ifdef USERECVSEM Semaphore CSocket::receiveblocksem(2); #endif /* Windows only: the static ws32_lib instance starts Winsock (version 2.2, falling back to 1.1, with a message box on failure) when constructed and calls WSACleanup when destroyed; the shims below call init() directly, and it does nothing once it has succeeded. */ #ifdef _WIN32 class win_socket_library { static bool initdone; // to prevent dependency problems very early on (e.g. 
jlog) public: win_socket_library() { init(); } bool init() { if (initdone) return true; WSADATA wsa; if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) { if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) { MessageBox(NULL,"Failed to initialize windows sockets","JLib Socket Error",MB_OK); return false; } } initdone = true; return true; } ~win_socket_library() { WSACleanup(); } }; bool win_socket_library::initdone = false; static win_socket_library ws32_lib; #define ERRNO() WSAGetLastError() #define EADDRINUSE WSAEADDRINUSE #define EINTRCALL WSAEINTR #define ECONNRESET WSAECONNRESET #define ECONNABORTED WSAECONNABORTED #define ENOTCONN WSAENOTCONN #define EWOULDBLOCK WSAEWOULDBLOCK #define EINPROGRESS WSAEINPROGRESS #define ENOTSOCK WSAENOTSOCK /* Local sockaddr_in6 layout and the J_SOCKADDR union, which can hold either an IPv4 or an IPv6 socket address. */ struct j_sockaddr_in6 { short sin6_family; /* AF_INET6 */ u_short sin6_port; /* Transport level port number */ u_long sin6_flowinfo; /* IPv6 flow information */ struct in_addr6 sin6_addr; /* IPv6 address */ u_long sin6_scope_id; /* set of interfaces for a scope */ }; typedef union { struct sockaddr sa; struct j_sockaddr_in6 sin6; struct sockaddr_in sin; } J_SOCKADDR; #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR)) /* inet_pton replacement built on WSAStringToAddress: returns 1 and copies the binary address into dst on success, 0 when src does not parse (errno set from WSAGetLastError), and -1 with errno EAFNOSUPPORT (or 52 where that is undefined) for a family other than AF_INET or AF_INET6. */ static int _inet_pton(int af, const char* src, void* dst) { DEFINE_SOCKADDR(u); int address_length; switch (af) { case AF_INET: u.sin.sin_family = AF_INET; address_length = sizeof (u.sin); break; case AF_INET6: u.sin6.sin6_family = AF_INET6; address_length = sizeof (u.sin6); break; default: #ifdef EAFNOSUPPORT errno = EAFNOSUPPORT; #else errno = 52; #endif return -1; } ws32_lib.init(); int ret = WSAStringToAddress ((LPTSTR) src, af, NULL, &u.sa, &address_length); if (ret == 0) { switch (af) { case AF_INET: memcpy (dst, &u.sin.sin_addr, sizeof (struct in_addr)); break; case AF_INET6: memcpy (dst, &u.sin6.sin6_addr, sizeof (u.sin6.sin6_addr)); break; } return 1; } errno = WSAGetLastError(); // PROGLOG("errno = %d",errno); return 0; } /* inet_ntop replacement built on WSAAddressToString: returns dst on success and NULL for an unsupported family or a failed conversion (errno is set from WSAGetLastError only in the latter case). */ static const char * _inet_ntop (int af, const void *src, char *dst, 
socklen_t cnt) { /* struct sockaddr can't accommodate struct sockaddr_in6. */ DEFINE_SOCKADDR(u); DWORD dstlen = cnt; size_t srcsize; memset(&u,0,sizeof(u)); switch (af) { case AF_INET: u.sin.sin_family = AF_INET; u.sin.sin_addr = *(struct in_addr *) src; srcsize = sizeof (u.sin); break; case AF_INET6: u.sin6.sin6_family = AF_INET6; memcpy(&u.sin6.sin6_addr,src,sizeof(in_addr6)); srcsize = sizeof (u.sin6); break; default: return NULL; } ws32_lib.init(); if (WSAAddressToString (&u.sa, srcsize, NULL, dst, &dstlen) != 0) { errno = WSAGetLastError(); return NULL; } return (const char *) dst; } /* Windows stand-in for inet_aton. NOTE(review): POSIX inet_aton returns nonzero on success and 0 on failure, but this returns 1 when inet_addr gives the all ones error value and 0 otherwise, so the result is inverted; the all ones address 255.255.255.255 is also indistinguishable from an error. */ int inet_aton (const char *name, struct in_addr *addr) { addr->s_addr = inet_addr (name); return (addr->s_addr == (u_long)-1)?1:0; // 255.255.255.255 has had it here } #else #define _inet_ntop inet_ntop #define _inet_pton inet_pton #define in_addr6 in6_addr typedef union { struct sockaddr sa; struct sockaddr_in6 sin6; struct sockaddr_in sin; } J_SOCKADDR; #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR)) #define EINTRCALL EINTR #define ERRNO() (errno) #ifndef INADDR_NONE #define INADDR_NONE (-1) #endif #endif #ifndef INET6_ADDRSTRLEN #define INET6_ADDRSTRLEN 65 #endif /* Fills u from ip and port (port in network order) and returns the length of the address written. Unless IP6preferred is set, an ip that yields a 4 byte address becomes AF_INET; otherwise AF_INET6 is used, after IPV6_NOT_IMPLEMENTED is invoked when IP4only is set. */ inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port) { if (!IP6preferred) { if (ip.getNetAddress(sizeof(in_addr),&u.sin.sin_addr)==sizeof(in_addr)) { u.sin.sin_family = AF_INET; u.sin.sin_port = htons(port); return sizeof(u.sin); } } if (IP4only) IPV6_NOT_IMPLEMENTED(); ip.getNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr); u.sin6.sin6_family = AF_INET6; u.sin6.sin6_port = htons(port); return sizeof(u.sin6); } /* Fills u with the wildcard address for port (in6addr_any when IP6preferred, otherwise INADDR_ANY) and returns the address length. */ inline socklen_t setSockAddrAny(J_SOCKADDR &u, unsigned short port) { if (IP6preferred) { #ifdef _WIN32 IN6ADDR_SETANY((PSOCKADDR_IN6)&u.sin6.sin6_addr); #else memcpy(&u.sin6.sin6_addr,&in6addr_any,sizeof(in_addr6)); #endif u.sin6.sin6_family= AF_INET6; u.sin6.sin6_port = htons(port); return sizeof(u.sin6); } u.sin.sin_addr.s_addr = 
htonl(INADDR_ANY); u.sin.sin_family= AF_INET; u.sin.sin_port = htons(port); return sizeof(u.sin); } /* Inverse of setSockAddr: ul selects IPv4 (when it equals sizeof u.sin) or IPv6, and ep receives the address and the host order port. NOTE(review): htons is used where ntohs is meant; both perform the same swap, but ntohs states the intent. */ inline void getSockAddrEndpoint(const J_SOCKADDR &u, socklen_t ul, SocketEndpoint &ep) { if (ul==sizeof(u.sin)) { ep.setNetAddress(sizeof(in_addr),&u.sin.sin_addr); ep.port = htons(u.sin.sin_port); } else { ep.setNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr); ep.port = htons(u.sin6.sin6_port); } } /* might need fcntl(F_SETFL), or ioctl(FIONBIO) */ /* Posix.1g says fcntl */ /* set_nonblock: switches the handle to non-blocking (on) or blocking mode and returns the previous mode; if the change fails the mode is left as it was and the current mode is returned. */ #if defined(O_NONBLOCK) bool CSocket::set_nonblock(bool on) { int flags = fcntl(sock, F_GETFL, 0); if (flags == -1) return nonblocking; if (on) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (fcntl(sock, F_SETFL, flags)==0) { bool wasNonBlocking = nonblocking; nonblocking = on; return wasNonBlocking; } return nonblocking; } #else bool CSocket::set_nonblock(bool on) { #ifdef _WIN32 u_long yes = on?1:0; if (ioctlsocket(sock, FIONBIO, &yes)==0) { #else int yes = on?1:0; if (ioctl(sock, FIONBIO, &yes)==0) { #endif bool wasNonBlocking = nonblocking; nonblocking = on; return wasNonBlocking; } return nonblocking; } #endif /* Turns Nagle on or off (TCP_NODELAY is set to the inverse of on) and returns the previous setting. NOTE(review): if setsockopt fails, nagling becomes the opposite of on instead of being restored to the previous value held in ret. */ bool CSocket::set_nagle(bool on) { bool ret = nagling; nagling = on; int enabled = !on; if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled)) != 0) { nagling = !on; } return ret; } /* Non-Windows only: sets FD_CLOEXEC (close on exec) unless inherit is true, in which case it is cleared; a no-op on Windows. */ void CSocket::set_inherit(bool inherit) { #ifndef _WIN32 long flag = fcntl(sock, F_GETFD); if(inherit) flag &= ~FD_CLOEXEC; else flag |= FD_CLOEXEC; fcntl(sock, F_SETFD, flag); #endif } /* Returns the byte count reported by FIONREAD; on error logs it and returns 0. */ size32_t CSocket::avail_read() { #ifdef _WIN32 u_long avail; if (ioctlsocket(sock, FIONREAD, &avail)==0) #else int avail; if (ioctl(sock, FIONREAD, &avail)==0) #endif return (size32_t)avail; int err = ERRNO(); LOGERR2(err,1,"avail_read"); return 0; } /* Creates a TCP socket for targetip:hostport (resolving hostname into targetip on first use) and starts connect, non-blocking unless block is true. Returns 0 or the connect error; EINPROGRESS, EWOULDBLOCK, ETIMEDOUT and ECONNREFUSED are returned without logging for the caller to handle. Throws if the socket cannot be created. NOTE(review): the third socket() argument is a protocol number, so passing PF_INET6 there looks wrong; 0 or IPPROTO_TCP is the usual value. */ int CSocket::pre_connect (bool block) { assertex(hostname); DEFINE_SOCKADDR(u); if (targetip.isNull()) { set_return_addr(hostport,hostname); targetip.ipset(returnep); } socklen_t ul = setSockAddr(u,targetip,hostport); sock = ::socket(u.sa.sa_family, 
SOCK_STREAM, targetip.isIp4()?0:PF_INET6); owned = true; state = ss_pre_open; // will be set to open by post_connect if (sock == INVALID_SOCKET) { int err = ERRNO(); THROWJSOCKEXCEPTION(err); } STATS.activesockets++; int err = 0; set_nonblock(!block); int rc = ::connect(sock, &u.sa, ul); if (rc==SOCKET_ERROR) { err = ERRNO(); if ((err != EINPROGRESS)&&(err != EWOULDBLOCK)&&(err != ETIMEDOUT)&&(err!=ECONNREFUSED)) // handled by caller LOGERR2(err,1,"pre_connect"); } #ifdef SOCKTRACE PROGLOG("SOCKTRACE: pre-connected socket%s %x %d (%x) err=%d", block?"(block)":"", sock, sock, (int)this, err); #endif return err; } /* Finishes a connect begun by pre_connect: restores blocking mode and reads SO_ERROR; on success disables Nagle and marks the socket ss_open. Returns 0 or the error code; ETIMEDOUT and ECONNREFUSED are not logged here because the caller handles them. */ int CSocket::post_connect () { set_nonblock(false); int err = 0; socklen_t errlen = sizeof(err); int rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error if ((rc!=0)&&!err) err = ERRNO(); // some implementations of getsockopt duff if (err==0) { nagling = true; set_nagle(false); state = ss_open; } else if ((err!=ETIMEDOUT)&&(err!=ECONNREFUSED)) // handled by caller LOGERR2(err,1,"post_connect"); return err; } /* Creates the OS socket and readies it: an sm_udp socket with hostport 0 is left unbound; otherwise SO_REUSEADDR is set (always on non-Windows), the socket is bound to hostname:hostport or the wildcard address, listens in TCP modes, and joins mcastreq when set. EADDRINUSE from bind or listen closes the handle and throws JSOCKERR_port_in_use; other failures close it and throw the OS error. NOTE(review): the port in use path sprintfs tracename, which may be NULL, into a fixed 1024 byte buffer with no length limit; the IPv6 socket() call passes PF_INET6 as the protocol argument. */ void CSocket::open(int listen_queue_size,bool reuseports) { if (IP6preferred) sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6); else sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0); if (sock == INVALID_SOCKET) { THROWJSOCKEXCEPTION(ERRNO()); } STATS.activesockets++; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: opened socket %x %d (%x)", sock,sock,this); #endif if ((hostport==0)&&(sockmode==sm_udp)) { state = ss_open; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: opened socket return udp"); #endif set_inherit(false); return; } #ifndef _WIN32 reuseports = true; // for some reason linux requires reuse ports #endif if (reuseports) { int on = 1; setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); } DEFINE_SOCKADDR(u); socklen_t ul; if (hostname) { if (targetip.isNull()) { set_return_addr(hostport,hostname); targetip.ipset(returnep); } ul = setSockAddr(u,targetip,hostport); } 
else ul = setSockAddrAny(u,hostport); int saverr; if (::bind(sock, &u.sa, ul) != 0) { saverr = ERRNO(); if (saverr==EADDRINUSE) { // don't log as error (some usages probe ports) ErrPortInUse: closesock(); char msg[1024]; sprintf(msg,"Target: %s, port = %d, Raised in: %s, line %d",tracename,(int)hostport,__FILE__, __LINE__); IJSOCK_Exception *e = new SocketException(JSOCKERR_port_in_use,msg); throw e; } else { closesock(); THROWJSOCKEXCEPTION(saverr); } } if (!connectionless()) { if (::listen(sock, listen_queue_size) != 0) { saverr = ERRNO(); if (saverr==EADDRINUSE) goto ErrPortInUse; closesock(); THROWJSOCKEXCEPTION(saverr); } } if (mcastreq) { if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)mcastreq, sizeof(*mcastreq))!=0) { saverr = ERRNO(); closesock(); THROWJSOCKEXCEPTION(saverr); } } state = ss_open; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: opened socket return"); #endif set_inherit(false); } /* Accepts one connection on a listening TCP socket and returns it as a new non-inheritable CSocket, retrying when accept is interrupted (EINTRCALL). If the accept is cancelled (accept_cancel_pending, or the listening handle has been closed) it returns NULL when allowcancel is true and otherwise throws JSOCKERR_cancel_accept; any other error is thrown. NOTE(review): a connection that arrives after cancellation is released with ::close, which is not the Winsock close call that closesock uses on Windows. */ ISocket* CSocket::accept(bool allowcancel) { if ((accept_cancel_state!=accept_not_cancelled) && allowcancel) { accept_cancel_state=accept_cancelled; return NULL; } if (state != ss_open) { ERRLOG("invalid accept, state = %d",(int)state); THROWJSOCKEXCEPTION(JSOCKERR_not_opened); } if (connectionless()) { THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket); } T_SOCKET newsock; loop { in_accept = true; newsock = (sock!=INVALID_SOCKET)?::accept(sock, NULL, NULL):INVALID_SOCKET; in_accept = false; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: accept created socket %x %d (%x)", newsock,newsock,this); #endif if (newsock!=INVALID_SOCKET) { if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) { ::close(newsock); newsock=INVALID_SOCKET; } else { accept_cancel_state = accept_not_cancelled; break; } } int saverr; saverr = ERRNO(); if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) { accept_cancel_state = accept_cancelled; if (allowcancel) return NULL; THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept); } if (saverr != EINTRCALL) { accept_cancel_state 
= accept_not_cancelled; THROWJSOCKEXCEPTION(saverr); } } if (state != ss_open) { accept_cancel_state = accept_cancelled; if (allowcancel) return NULL; THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept); } CSocket *ret = new CSocket(newsock,sm_tcp,true); ret->set_inherit(false); return ret; } /* Sets SO_LINGER: a lingertime of zero or more enables lingering for that many seconds and a negative value disables it; failure is only logged as a warning. */ void CSocket::set_linger(int lingertime) { struct linger l; l.l_onoff = (lingertime>=0)?1:0; l.l_linger = (lingertime>=0)?lingertime:0; if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l)) != 0) { WARNLOG("Linger not set"); } } /* Sets or clears SO_KEEPALIVE; failure is only logged as a warning. */ void CSocket::set_keep_alive(bool set) { int on=set?1:0; if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&on, sizeof(on)) != 0) { WARNLOG("KeepAlive not set"); } } /* Writes the local address text of an open socket into retname. NOTE(review): retname[0] is also written unconditionally, so the NULL retname that the first statement tries to allow is dereferenced. The rest of this function and the code up to the fragment that appears to be the end of cancel_accept are missing from this copy. */ int CSocket::name(char *retname,size32_t namemax) { if (!retname) namemax = 0; if (namemax) retname[0] = 0; retname[0] = 0; if (state != ss_open) { THROWJSOCKEXCEPTION(JSOCKERR_not_opened); } DEFINE_SOCKADDR(u); socklen_t ul = sizeof(u); if (::getsockname(sock,&u.sa, &ul)<0) { THROWJSOCKEXCEPTION(ERRNO()); } SocketEndpoint ep; getSockAddrEndpoint(u,ul,ep); StringBuffer s; ep.getIpText(s); if (namemax>=1) { if (namemax-11) { if (namemax-1 sock = new CSocket(ep,sm_tcp,NULL); try { sock->connect_timeout(100,true); } catch (IJSOCK_Exception *e) { EXCLOG(e,"CSocket::cancel_eccept"); e->Release(); } } // ================================================================================ // connect versions /* Connects to ep through connect_wait with the DEFAULT_CONNECT_TIME limit. */ ISocket* ISocket::connect( const SocketEndpoint &ep ) { // general connect return ISocket::connect_wait(ep,DEFAULT_CONNECT_TIME); } /* Presumably sleeps for a growing back-off delay after a refused connect while time remains on tm; most of its body and the start of CSocket::connect_timeout are missing from this copy. */ inline void refused_sleep(CTimeMon &tm, unsigned &refuseddelay) { unsigned remaining; if (!tm.timedout(&remaining)) { if (refuseddelay0) { // select succeeded - return error from socket (0 if connected) socklen_t errlen = sizeof(err); rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error if ((rc!=0)&&!err) err = ERRNO(); // some implementations of getsockopt duff if (err) // probably ECONNREFUSED but treat all errors same 
refused_sleep(tm,refuseddelay); } else if (rc<0) { err = ERRNO(); LOGERR2(err,2,"::select"); } } if (err==0) { err = post_connect(); if (err==0) { STATS.connects++; STATS.connecttime+=usTick()-startt; #ifdef _TRACE char peer[256]; peer[0] = 'C'; peer[1] = '!'; strcpy(peer+2,hostname?hostname:"(NULL)"); free(tracename); tracename = strdup(peer); #endif return true; } } errclose(); } #ifdef SOCKTRACE PROGLOG("connect_timeout: failed %d",err); #endif STATS.failedconnects++; STATS.failedconnecttime+=usTick()-startt; if (!noexception) THROWJSOCKEXCEPTION(JSOCKERR_connection_failed); return false; } /* Factory: rejects a null address or port 0 with JSOCKERR_bad_address, then connects a new TCP CSocket to ep within timeout (throwing on failure) and returns it, released from the local owner. */ ISocket* ISocket::connect_timeout(const SocketEndpoint &ep,unsigned timeout) { if (ep.isNull()||(ep.port==0)) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); Owned sock = new CSocket(ep,sm_tcp,NULL); sock->connect_timeout(timeout,false); return sock.getClear(); } #define POLLTIME 50 /* Connects to targetip:hostport, retrying until timems (milliseconds, tracked by CTimeMon) has elapsed and then throwing JSOCKERR_connection_failed. Each attempt uses pre_connect: it blocks on the final pass or when more than 4 connects are in flight (connectingcount, guarded by crit), and otherwise the pending connect is polled with select. Failed attempts back off through refused_sleep. With CENTRAL_NODE_RANDOM_DELAY, a target listed in CentralNodeArray first sleeps a random time under 1000 ms. NOTE(review): connectingcount is not decremented if pre_connect throws; FD_SET runs before CHECKSOCKRANGE checks the handle; timeoutms is raised to 10000 whenever fewer than 10000 ms remain, which can overrun timems; the _TRACE strcpy of hostname into peer has no length check. */ void CSocket::connect_wait(unsigned timems) { // simple connect with timeout (no fancy stuff!) unsigned startt = usTick(); CTimeMon tm(timems); bool exit = false; int err; unsigned refuseddelay = 1; while (!exit) { #ifdef CENTRAL_NODE_RANDOM_DELAY ForEachItemIn(cn,CentralNodeArray) { SocketEndpoint &ep=CentralNodeArray.item(cn); if (ep.ipequals(targetip)) { unsigned sleeptime = getRandom() % 1000; StringBuffer s; ep.getIpText(s); PrintLog("Connection to central node %s - sleeping %d milliseconds", s.str(), sleeptime); Sleep(sleeptime); break; } } #endif unsigned remaining; exit = tm.timedout(&remaining); bool blockselect = exit; // if last time round block { CriticalBlock block(crit); if (++connectingcount>4) blockselect = true; } err = pre_connect(blockselect); if (blockselect) { if (err&&!exit) refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same } else { unsigned timeoutms = (exit||(remaining<10000))?10000:remaining; unsigned polltime = 1; while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) { T_FD_SET fds; struct timeval tv; XFD_ZERO(&fds); 
FD_SET((unsigned)sock, &fds); T_FD_SET except; XFD_ZERO(&except); FD_SET((unsigned)sock, &except); #ifdef BLOCK_POLLED_SINGLE_CONNECTS tv.tv_sec = timeoutms / 1000; tv.tv_usec = (timeoutms % 1000)*1000; #else tv.tv_sec = 0; tv.tv_usec = 0; #endif CHECKSOCKRANGE(sock); int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv ); if (rc>0) { // select succeeded - return error from socket (0 if connected) socklen_t errlen = sizeof(err); rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error if ((rc!=0)&&!err) err = ERRNO(); // some implementations of getsockopt duff if (err) refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same break; } if (rc<0) { err = ERRNO(); LOGERR2(err,2,"::select"); break; } if (!timeoutms) { #ifdef SOCKTRACE PROGLOG("connecttimeout: timed out"); #endif err = -1; break; } #ifdef BLOCK_POLLED_SINGLE_CONNECTS break; #else if (timeoutmsPOLLTIME/2) polltime = POLLTIME; else polltime *= 2; #endif } } { CriticalBlock block(crit); --connectingcount; } if (err==0) { err = post_connect(); if (err==0) { STATS.connects++; STATS.connecttime+=usTick()-startt; #ifdef _TRACE char peer[256]; peer[0] = 'C'; peer[1] = '!'; strcpy(peer+2,hostname?hostname:"(NULL)"); free(tracename); tracename = strdup(peer); #endif return; } } errclose(); } #ifdef SOCKTRACE PROGLOG("connect_wait: failed %d",err); #endif STATS.failedconnects++; STATS.failedconnecttime+=usTick()-startt; THROWJSOCKEXCEPTION(JSOCKERR_connection_failed); } /* Factory: rejects a null address or port 0 with JSOCKERR_bad_address, then connects a new TCP CSocket with CSocket::connect_wait(timems) and returns it, released from the local owner. */ ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems) { if (ep.isNull()||(ep.port==0)) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); Owned sock = new CSocket(ep,sm_tcp,NULL); sock->connect_wait(timems); return sock.getClear(); } /* Creates a datagram socket and connects it to targetip:hostport (resolving hostname on first use); on success the socket is ss_open, and connect failure closes it and throws JSOCKERR_connection_failed. NOTE(review): STATS.activesockets is incremented before the INVALID_SOCKET check, so a failed socket() call leaves the count too high; the third socket() argument PF_INET6 looks like a family passed where a protocol is expected; the _TRACE strcpy of hostname into peer has no length check. */ void CSocket::udpconnect() { DEFINE_SOCKADDR(u); if (targetip.isNull()) { set_return_addr(hostport,hostname); targetip.ipset(returnep); } socklen_t ul = setSockAddr(u,targetip,hostport); sock = ::socket(u.sa.sa_family, SOCK_DGRAM, 
targetip.isIp4()?0:PF_INET6); #ifdef SOCKTRACE PROGLOG("SOCKTRACE: udp connected socket %x %d (%x)", sock, sock, this); #endif STATS.activesockets++; if (sock == INVALID_SOCKET) { THROWJSOCKEXCEPTION(ERRNO()); } int res = ::connect(sock, &u.sa, ul); if (res != 0) { // works for UDP closesock(); THROWJSOCKEXCEPTION(JSOCKERR_connection_failed); } nagling = false; // means nothing for UDP state = ss_open; #ifdef _TRACE char peer[256]; peer[0] = 'C'; peer[1] = '!'; strcpy(peer+2,hostname?hostname:"(NULL)"); free(tracename); tracename = strdup(peer); #endif } /* Waits up to timeout milliseconds (WAIT_FOREVER waits indefinitely) for the socket to become readable and returns the select result: positive when readable, 0 on timeout, SOCKET_ERROR after logging a failure. Interrupted calls (EINTRCALL) are retried with the full timeout. Returns 0 at once if the socket is closed. NOTE(review): CHECKSOCKRANGE runs after FD_SET has already used sock as a bit index. */ int CSocket::wait_read(unsigned timeout) { int ret = 0; while (sock!=INVALID_SOCKET) { T_FD_SET fds; XFD_ZERO(&fds); FD_SET((unsigned)sock, &fds); CHECKSOCKRANGE(sock); if (timeout==WAIT_FOREVER) { ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL ); } else { struct timeval tv; tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout % 1000)*1000; ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv ); } if (ret==SOCKET_ERROR) { int err = ERRNO(); if (err!=EINTRCALL) { // else retry (should adjust time but for our usage don't think it matters that much) LOGERR2(err,1,"wait_read"); break; } } else break; } return ret; } /* Same contract as wait_read, but waits for the socket to become writable; the same CHECKSOCKRANGE ordering issue applies. */ int CSocket::wait_write(unsigned timeout) { int ret = 0; while (sock!=INVALID_SOCKET) { T_FD_SET fds; XFD_ZERO(&fds); FD_SET((unsigned)sock, &fds); CHECKSOCKRANGE(sock); if (timeout==WAIT_FOREVER) { ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL ); } else { struct timeval tv; tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout % 1000)*1000; ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv ); } if (ret==SOCKET_ERROR) { int err = ERRNO(); if (err!=EINTRCALL) { // else retry (should adjust time but for our usage don't think it matters that much) LOGERR2(err,1,"wait_write"); break; } } else break; } return ret; } /* Timed read into buf, presumably of min_size to max_size bytes (most of the body is missing from this copy). WAIT_FOREVER delegates to read(); otherwise the visible part throws JSOCKERR_not_opened unless the socket is open, the OS error if wait_read fails, and JSOCKERR_timeout_expired if nothing becomes readable within the remaining time. */ void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutms) { if (timeoutms == WAIT_FOREVER) { read(buf,min_size, 
max_size, size_read,WAIT_FOREVER); return; } unsigned startt=usTick(); size_read = 0; if (state != ss_open) { THROWJSOCKEXCEPTION(JSOCKERR_not_opened); } unsigned start; unsigned timeleft; start = msTick(); timeleft = timeoutms; do { int rc = wait_read(timeleft); if (rc < 0) { THROWJSOCKEXCEPTION(ERRNO()); } if (rc == 0) { THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired); } unsigned elapsed = (msTick()-start); if (elapsed1024) { /* NOTE(review): the text before this point (the rest of readtms and the start of the write routine below) appears to be missing. Copt enables TCP_CORK for its lifetime and, when the socket is not nagling, clears TCP_NODELAY while corked and sets it again afterwards; but its constructor never copies _sock into sock, so every setsockopt call uses an indeterminate handle. */ class Copt { T_SOCKET sock; bool nagling; public: Copt(T_SOCKET _sock,bool _nagling) { nagling = _nagling; int enabled = 1; int disabled = 0; if (!nagling) setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&disabled, sizeof(disabled)); setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&enabled, sizeof(enabled)); } ~Copt() { int enabled = 1; int disabled = 0; setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&disabled, sizeof(disabled)); if (!nagling) setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled)); } } copt(sock,nagling); for (i=0;is) cpy = s; memcpy(outbuf+os,b,cpy); os += cpy; /* NOTE(review): =- assigns the negated count to left; -= was almost certainly intended. */ left =- cpy; s -= cpy; b += cpy; if (left==0) { write(outbuf,os); break; } else if (os==outbufsize) { write(outbuf,os); os = 0; } } } #endif #endif STATS.writes++; STATS.writesize += res; STATS.writetime+=usTick()-startt; return res; } /* Sends one block of sz bytes. With BF_SYNC_TRANSFER_PULL it first reads a bool from the peer and returns false if that flag is true. A NULL blk or zero sz instead writes a zero size, waits for a one bool reply (logging any exception other than broken pipe or graceful close) and returns false. The rest of this function lies beyond this view. */ bool CSocket::send_block(const void *blk,size32_t sz) { unsigned startt=usTick(); #ifdef TRACE_SLOW_BLOCK_TRANSFER unsigned startt2; unsigned startt3; #endif if (blockflags&BF_SYNC_TRANSFER_PULL) { size32_t rd; bool eof = true; readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms); if (eof) return false; #ifdef TRACE_SLOW_BLOCK_TRANSFER startt2=usTick(); #endif } if (!blk||!sz) { sz = 0; write(&sz,sizeof(sz)); try { bool reply; size32_t rd; readtms(&reply,sizeof(reply),sizeof(reply),rd,blocktimeoutms); } catch (IJSOCK_Exception *e) { if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) EXCLOG(e,"CSocket::send_block"); e->Release(); } return false; } size32_t rsz=sz; _WINREV(rsz); 
write(&rsz,sizeof(rsz)); if (blockflags&BF_SYNC_TRANSFER_PUSH) { #ifdef TRACE_SLOW_BLOCK_TRANSFER startt2=usTick(); #endif size32_t rd; bool eof = true; readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms); if (eof) return false; #ifdef TRACE_SLOW_BLOCK_TRANSFER startt3=usTick(); #endif } write(blk,sz); if (blockflags&BF_RELIABLE_TRANSFER) { bool isok=false; size32_t rd; readtms(&isok,sizeof(isok),sizeof(isok),rd,blocktimeoutms); if (!isok) return false; } unsigned nowt = usTick(); unsigned elapsed = nowt-startt; STATS.blocksendtime+=elapsed; STATS.numblocksends++; STATS.blocksendsize+=sz; if (elapsed>STATS.longestblocksend) { STATS.longestblocksend = elapsed; STATS.longestblocksize = sz; } #ifdef TRACE_SLOW_BLOCK_TRANSFER static unsigned lastreporttime=0; static unsigned lastexceeded=0; if (elapsed>1000000*60) { // over 1min unsigned t = msTick(); if (1) { //((t-lastreporttime>1000*60) || // only report once per min // (elapsed>lastexceeded*2)) { lastexceeded = elapsed; lastreporttime = t; WARNLOG("send_block took %ds to %s (%d,%d,%d)",elapsed/1000000,tracename,startt2-startt,startt3-startt2,nowt-startt3); } } #endif return true; } #ifdef USERECVSEM class CSemProtect { Semaphore *sem; bool *owned; public: CSemProtect() { clear(); } ~CSemProtect() { if (sem&&*owned) { *owned = false; sem->signal(); } } void set(Semaphore *_sem,bool *_owned) { sem = _sem; owned = _owned; } bool wait(Semaphore *_sem,bool *_owned,unsigned timeout) { if (!*_owned&&!_sem->wait(timeout)) return false; *_owned = true; set(_sem,_owned); return true; } void clear() { sem = NULL; owned = NULL; } }; #endif size32_t CSocket::receive_block_size() { // assumed always paired with receive_block if (nextblocksize) { if (blockflags&BF_SYNC_TRANSFER_PULL) { bool eof=false; write(&eof,sizeof(eof)); } size32_t rd; readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms); _WINREV(nextblocksize); if (nextblocksize==0) { // confirm eof try { bool confirm=true; 
write(&confirm,sizeof(confirm)); } catch (IJSOCK_Exception *e) { if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) EXCLOG(e,"receive_block_size"); e->Release(); } } else if (blockflags&BF_SYNC_TRANSFER_PUSH) { // leaves receiveblocksem clear #ifdef USERECVSEM CSemProtect semprot; // this will catch exception in write while (!semprot.wait(&receiveblocksem,&receiveblocksemowned,60*1000*5)) WARNLOG("Receive block stalled"); #endif bool eof=false; write(&eof,sizeof(eof)); #ifdef USERECVSEM semprot.clear(); #endif } } return nextblocksize; } size32_t CSocket::receive_block(void *blk,size32_t maxsize) { #ifdef USERECVSEM CSemProtect semprot; // this will catch exceptions #endif size32_t sz = nextblocksize; if (sz) { if (sz==UINT_MAX) { // need to get size if (!blk||!maxsize) { if (blockflags&BF_SYNC_TRANSFER_PUSH) { // ignore block size size32_t rd; readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms); } if (blockflags&(BF_SYNC_TRANSFER_PULL|BF_SYNC_TRANSFER_PUSH)) { // signal eof bool eof=true; write(&eof,sizeof(eof)); nextblocksize = 0; return 0; } } sz = receive_block_size(); if (!sz) return 0; } unsigned startt=usTick(); // include sem block but not initial handshake #ifdef USERECVSEM if (blockflags&BF_SYNC_TRANSFER_PUSH) // read_block_size sets semaphore semprot.set(&receiveblocksem,&receiveblocksemowned); // this will reset semaphore on exit #endif nextblocksize = UINT_MAX; size32_t rd; if (sz<=maxsize) { readtms(blk,sz,sz,rd,blocktimeoutms); } else { // truncate readtms(blk,maxsize,maxsize,rd,blocktimeoutms); sz -= maxsize; void *tmp=malloc(sz); readtms(tmp,sz,sz,rd,blocktimeoutms); free(tmp); sz = maxsize; } if (blockflags&BF_RELIABLE_TRANSFER) { bool isok=true; write(&isok,sizeof(isok)); } unsigned elapsed = usTick()-startt; STATS.blockrecvtime+=elapsed; STATS.numblockrecvs++; STATS.blockrecvsize+=sz; } return sz; } void CSocket::set_block_mode(unsigned flags, size32_t recsize, unsigned 
_timeoutms) { blockflags = flags; nextblocksize = UINT_MAX; blocktimeoutms = _timeoutms?_timeoutms:WAIT_FOREVER; } /* CSocket::shutdown: if the socket is open, move it to ss_shutdown and shut it down in the given direction. Throws on failure, mapping ENOTCONN to JSOCKERR_broken_pipe. NOTE(review): text between '<' and '>' appears to be missing in places in this copy of the file, e.g. Owned template arguments. */ void CSocket::shutdown(unsigned mode) { if (state == ss_open) { state = ss_shutdown; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%x)", mode, sock, sock, this); #endif int rc = ::shutdown(sock, mode); if (rc != 0) { int err=ERRNO(); if (err==ENOTCONN) { LOGERR2(err,9,"shutdown"); err = JSOCKERR_broken_pipe; } THROWJSOCKEXCEPTION(err); } } } /* CSocket::errclose: close for error paths. Releases receiveblocksem if owned (USERECVSEM builds), drops any multicast membership and closes the handle. The closesock() result is ignored, so nothing is thrown here. */ void CSocket::errclose() { #ifdef USERECVSEM if (receiveblocksemowned) { receiveblocksemowned = false; receiveblocksem.signal(); } #endif if (state != ss_close) { state = ss_close; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: errclose socket %x %d (%x)", sock, sock, this); #endif if (mcastreq) setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq)); closesock(); } } /* CSocket::close: same steps as errclose, but throws the errno if closesock() reports a failure. */ void CSocket::close() { #ifdef USERECVSEM if (receiveblocksemowned) { receiveblocksemowned = false; receiveblocksem.signal(); } #endif if (state != ss_close) { #ifdef SOCKTRACE PROGLOG("SOCKTRACE: close socket %x %d (%x)", sock, sock, this); #endif state = ss_close; if (mcastreq) setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq)); if (closesock() != 0) { THROWJSOCKEXCEPTION(ERRNO()); } } } /* CSocket::get_max_send_size: SO_MAX_MSG_SIZE on Windows; elsewhere SO_SNDBUF is used as an approximation. maxsend starts at 0 and the getsockopt result is not checked. */ size32_t CSocket::get_max_send_size() { size32_t maxsend=0; socklen_t size = sizeof(maxsend); #if _WIN32 getsockopt(sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char *) &maxsend, &size); #else getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size); // not the same but closest I can find #endif return maxsend; } /* CSocket::get_send_buffer_size: current SO_SNDBUF value (getsockopt result unchecked). */ size32_t CSocket::get_send_buffer_size() { size32_t maxsend=0; socklen_t size = sizeof(maxsend); getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size); return maxsend; } /* CSocket::set_send_buffer_size: request an SO_SNDBUF size. A failure is only logged (LOGERR2 is empty unless _TRACE). NOTE(review): under CHECKBUFSIZE the getsockopt call passes sizeof(v) where a socklen_t pointer is expected. */ void CSocket::set_send_buffer_size(size32_t maxsend) { if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&maxsend, sizeof(maxsend))!=0) { LOGERR2(ERRNO(),1,"setsockopt(SO_SNDBUF)"); } #ifdef CHECKBUFSIZE 
size32_t v; if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&v, sizeof(v))!=0) { LOGERR2(ERRNO(),1,"getsockopt(SO_SNDBUF)"); } if (v!=maxsend) WARNLOG("set_send_buffer_size requested %d, got %d",maxsend,v); #endif } /* CSocket::get_receive_buffer_size: current SO_RCVBUF value (getsockopt result unchecked). */ size32_t CSocket::get_receive_buffer_size() { size32_t max=0; socklen_t size = sizeof(max); getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *) &max, &size); return max; } /* CSocket::set_receive_buffer_size: request an SO_RCVBUF size; a failure is only logged. NOTE(review): same CHECKBUFSIZE getsockopt argument problem as set_send_buffer_size. */ void CSocket::set_receive_buffer_size(size32_t max) { if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&max, sizeof(max))!=0) { LOGERR2(ERRNO(),1,"setsockopt(SO_RCVBUF)"); } #ifdef CHECKBUFSIZE size32_t v; if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&v, sizeof(v))!=0) { LOGERR2(ERRNO(),1,"getsockopt(SO_RCVBUF)"); } /* NOTE(review): text appears to be missing from here up to "sock = new CSocket(ep,sm_tcp_server,NULL)". The end of this function and the start of the next factory are lost in this copy. */ if (v sock = new CSocket(ep,sm_tcp_server,NULL); sock->open(listen_queue_size); return sock.getClear(); } /* ISocket::create_ip: create a TCP server socket on host:p with the given listen queue size. Throws JSOCKERR_bad_address for port 0. */ ISocket* ISocket::create_ip(unsigned short p,const char *host,int listen_queue_size) { if (p==0) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); SocketEndpoint ep(host,p); Owned sock = new CSocket(ep,sm_tcp_server,host); sock->open(listen_queue_size); return sock.getClear(); } /* ISocket::udp_create: open a UDP socket in sm_udp_server mode on port p, or in sm_udp mode when p is 0. */ ISocket* ISocket::udp_create(unsigned short p) { SocketEndpoint ep; ep.port=p; Owned sock = new CSocket(ep,(p==0)?sm_udp:sm_udp_server,NULL); sock->open(0); return sock.getClear(); } /* ISocket::multicast_create: open a multicast server socket for group mcip on port p. Throws JSOCKERR_bad_address for port 0. */ ISocket* ISocket::multicast_create(unsigned short p, const char *mcip) { if (p==0) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); SocketEndpoint ep(mcip,p); Owned sock = new CSocket(ep,sm_multicast_server,mcip); sock->open(0,true); return sock.getClear(); } /* ISocket::multicast_create: as above, with the group given as an IpAddress. */ ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip) { if (p==0) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); SocketEndpoint ep(p, ip); StringBuffer tmp; Owned sock = new CSocket(ep,sm_multicast_server,ip.getIpText(tmp).str()); sock->open(0,true); return sock.getClear(); } /* ISocket::udp_connect: create a UDP socket connected to name:p. Throws JSOCKERR_bad_address for an empty name or port 0. */ ISocket* ISocket::udp_connect(unsigned short p, char const* name) { if (!name||!*name||(p==0)) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); SocketEndpoint ep(name, p); 
Owned sock = new CSocket(ep,sm_udp,name); sock->udpconnect(); return sock.getClear(); } /* ISocket::udp_connect: create a UDP socket connected to ep (no address validation here). */ ISocket* ISocket::udp_connect(const SocketEndpoint &ep) { Owned sock = new CSocket(ep,sm_udp,NULL); sock->udpconnect(); return sock.getClear(); } /* ISocket::multicast_connect: create a multicast sender for mcip:p with the given TTL. Throws JSOCKERR_bad_address for port 0. */ ISocket* ISocket::multicast_connect(unsigned short p, char const* mcip, unsigned _ttl) { if (p==0) THROWJSOCKEXCEPTION2(JSOCKERR_bad_address); SocketEndpoint ep(mcip,p); return multicast_connect(ep, _ttl); } /* ISocket::multicast_connect: connect a multicast socket to ep and set IP_MULTICAST_TTL. The TTL is narrowed to u_char and the setsockopt result is not checked. */ ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl) { Owned sock = new CSocket(ep,sm_multicast,NULL); sock->udpconnect(); u_char ttl = _ttl; setsockopt(sock->OShandle(), IPPROTO_IP, IP_MULTICAST_TTL, (char *) &ttl, sizeof(ttl)); return sock.getClear(); } /* ISocket::attach: wrap an existing OS socket handle as a TCP (tcpip true) or UDP CSocket. */ ISocket* ISocket::attach(int s, bool tcpip) { CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false); return sock; } /* isInterfaceIp: true if ip matches the IPv4 address of a local interface listed by SIOCGIFCONF (at most 1024 bytes of ifreq entries are fetched). ifname appears to filter interfaces by name; that code is partly missing in this copy ("iifr_name"). Always false on Windows. NOTE(review): the function returns without closing fd when the ioctl fails. */ bool isInterfaceIp(const IpAddress &ip, const char *ifname) { #ifdef _WIN32 return false; #else int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD if (fd<0) return false; MemoryAttr ma; char *buf = (char *)ma.allocate(1024); struct ifconf ifc; ifc.ifc_len = 1024; ifc.ifc_buf = buf; if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces return false; struct ifreq *ifr = ifc.ifc_req; unsigned n = ifc.ifc_len/sizeof(struct ifreq); bool match = false; for(unsigned i=0; iifr_name,ifname)) continue; IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr))); if (ip.ipequals(iptest)) { match = true; break; } } close(fd); return match; #endif } /* getInterfaceIp: set ip to the IPv4 address of a local interface (SIOCGIFCONF), making two passes over the list.
 - Each pass accepts only addresses whose isLoopBack() equals useLoopback; presumably the first pass is non-loopback and the second loopback. The useLoopback declaration is missing in this copy.
 - Within a pass, an address in PreferredSubnet replaces one outside it.
Returns true if an address was found; always false on Windows. NOTE(review): returns without closing fd when the ioctl fails. */ bool getInterfaceIp(IpAddress &ip,const char *ifname) { #ifdef _WIN32 return false; #else ip.ipset(NULL); int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD if (fd<0) return false; MemoryAttr ma; char *buf = (char *)ma.allocate(1024); struct ifconf ifc; ifc.ifc_len = 1024; ifc.ifc_buf = buf; if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces return false; struct ifreq *ifr = ifc.ifc_req; unsigned n = ifc.ifc_len/sizeof(struct ifreq); for (int loopback = 0; loopback
<= 1; loopback++) { for (int i=0; iifr_name,ifname)) continue; IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr))); if (iptest.isLoopBack() == useLoopback) { if (ip.isNull()) ip.ipset(iptest); else if (!PreferredSubnet.isNull()&&!PreferredSubnet.test(ip)&&PreferredSubnet.test(iptest)) ip.ipset(iptest); } } if (!ip.isNull()) break; } close(fd); return !ip.isNull(); #endif } /* Cached host name and addresses. GetCachedHostName, queryLocalIP and queryHostIP access them under hostnamesect. */ static StringAttr cachehostname; static IpAddress cachehostip; static IpAddress localhostip; static CriticalSection hostnamesect; static StringBuffer EnvConfPath; /* GetCachedHostName: this host's name, computed once. Outside Windows, if CONFIG_DIR/environment.conf sets "interface" and that interface has an address, the address text is used as the name and cached as the host IP. Otherwise gethostname() is used, falling back to "localhost". */ const char * GetCachedHostName() { CriticalBlock c(hostnamesect); if (!cachehostname.get()) { #ifndef _WIN32 StringBuffer ifs; IpAddress ip; if (EnvConfPath.length() == 0) EnvConfPath.append(CONFIG_DIR).append(PATHSEPSTR).append("environment.conf"); Owned conf = createProperties(EnvConfPath.str(), true); if (conf->getProp("interface", ifs) && ifs.length()) { if (getInterfaceIp(ip, ifs.str())) { StringBuffer ips; ip.getIpText(ips); if (ips.length()) { cachehostname.set(ips.str()); cachehostip.ipset(ip); return cachehostname.get(); } } } #endif char temp[1024]; if (gethostname(temp, sizeof(temp))==0) cachehostname.set(temp); else cachehostname.set("localhost"); // assume no NIC card } return cachehostname.get(); } /* queryLocalIP: the loopback address (::1 when IP6preferred, else 127.0.0.1), set on first use. */ IpAddress & queryLocalIP() { CriticalBlock c(hostnamesect); if (localhostip.isNull()) if (IP6preferred) localhostip.ipset("::1"); //IPv6 else localhostip.ipset("127.0.0.1"); //IPv4 return localhostip; } /* queryHostIP: cached address of this host, resolved from GetCachedHostName(). If resolution fails, falls back to queryLocalIP() and prints a warning with printf. */ IpAddress & queryHostIP() { CriticalBlock c(hostnamesect); if (cachehostip.isNull()) { if (!cachehostip.ipset(GetCachedHostName())) { cachehostip.ipset(queryLocalIP()); printf("hostname %s not resolved, using localhost\n",GetCachedHostName()); // don't use jlog in case recursive } } return cachehostip; } /* GetHostIp: copy queryHostIP() into ip and return ip. */ IpAddress &GetHostIp(IpAddress &ip) { ip.ipset(queryHostIP()); return ip; } /* localHostToNIC: replace a loopback ip with this host's address; other addresses are left unchanged. */ IpAddress &localHostToNIC(IpAddress &ip) { if (ip.isLoopBack()) GetHostIp(ip); return ip; } // IpAddress inline bool 
isIp4(const unsigned *netaddr) { if (IP4only) return true; if (netaddr[2]==0xffff0000) return (netaddr[1]==0)&&(netaddr[0]==0); if (netaddr[2]==0) if ((netaddr[3]==0)&&(netaddr[0]==0)&&(netaddr[1]==0)) return true; // null address // maybe should get loopback here return false; } bool IpAddress::isIp4() const { return ::isIp4(netaddr); } bool IpAddress::isNull() const { return (netaddr[3]==0)&&(IP4only||((netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0))); } bool IpAddress::isLoopBack() const { if (::isIp4(netaddr)&&((netaddr[3] & 0x000000ff)==0x000007f)) return true; return (netaddr[3]==0)&&(netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==1); } bool IpAddress::isLocal() const { if (isLoopBack() || isHost()) return true; IpAddress ip(*this); return isInterfaceIp(ip, NULL); } bool IpAddress::isLinkLocal() const { return (netaddr[0]&&0x3ff==0x3fa)&&(netaddr[2]==0); } bool IpAddress::isSiteLocal() const // depreciated { if (::isIp4(netaddr)) { switch (netaddr[3]&0xff) { case 10: return true; case 192: return ((netaddr[3]&0xff00)==0xc000); case 172: return ((netaddr[3]&0x0f00)==0x0100); } return false; } return (netaddr[0]&&0x3ff==0x3fb)&&(netaddr[2]==0); } bool IpAddress::ipequals(const IpAddress & other) const { // reverse compare for speed return (other.netaddr[3]==netaddr[3])&&(IP4only||((other.netaddr[2]==netaddr[2])&&(other.netaddr[1]==netaddr[1])&&(other.netaddr[0]==netaddr[0]))); } int IpAddress::ipcompare(const IpAddress & other) const { return memcmp(&netaddr, &other.netaddr, sizeof(netaddr)); } unsigned IpAddress::iphash(unsigned prev) const { return hashc((const byte *)&netaddr,sizeof(netaddr),prev); } bool IpAddress::isHost() const { return ipequals(queryHostIP()); } static bool decodeNumericIP(const char *text,unsigned *netaddr) { if (!text) return false; bool isv6 = strchr(text,':')!=NULL; StringBuffer tmp; if ((*text=='[')&&!IP4only) { text++; size32_t l = strlen(text); if ((l<=2)||(text[l-1]!=']')) return false; text = tmp.append(l-2,text); } if 
(!isv6&&isdigit(text[0])) { if (_inet_pton(AF_INET, text, &netaddr[3])>0) { netaddr[2] = netaddr[3]?0xffff0000:0; // check for NULL netaddr[1] = 0; netaddr[0] = 0; // special handling for loopback? return true; } } else if (isv6&&!IP4only) { int ret = _inet_pton(AF_INET6, text, netaddr); if (ret>=0) return (ret>0); int err = ERRNO(); StringBuffer tmp("_inet_pton: "); tmp.append(text); LOGERR(err,1,tmp.str()); } return false; } static bool lookupHostAddress(const char *name,unsigned *netaddr) { // if IP4only or using MS V6 can only resolve IPv4 using static bool recursioncheck = false; // needed to stop error message recursing unsigned retry=10; #if defined(__linux__) || defined(getaddrinfo) if (IP4only) { #else { #endif CriticalBlock c(hostnamesect); hostent * entry = gethostbyname(name); while (entry==NULL) { if (retry--==0) { if (!recursioncheck) { recursioncheck = true; LogErr(h_errno,1,"gethostbyname failed",__LINE__,name); recursioncheck = false; } return false; } { CriticalUnblock ub(hostnamesect); Sleep((10-retry)*100); } entry = gethostbyname(name); } if (entry->h_addr_list[0]) { unsigned ptr = 0; if (!PreferredSubnet.isNull()) { loop { ptr++; if (entry->h_addr_list[ptr]==NULL) { ptr = 0; break; } IpAddress ip; ip.setNetAddress(sizeof(unsigned),entry->h_addr_list[ptr]); if (PreferredSubnet.test(ip)) break; } } memcpy(&netaddr[3], entry->h_addr_list[ptr], sizeof(netaddr[3])); netaddr[2] = 0xffff0000; netaddr[1] = 0; netaddr[0] = 0; return true; } return false; } #if defined(__linux__) || defined(getaddrinfo) struct addrinfo hints; memset(&hints,0,sizeof(hints)); struct addrinfo *addrInfo = NULL; loop { memset(&hints,0,sizeof(hints)); int ret = getaddrinfo(name, NULL , &hints, &addrInfo); if (!ret) break; if (retry--==0) { if (!recursioncheck) { recursioncheck = true; LogErr(ret,1,"getaddrinfo failed",__LINE__,name); #ifdef _DEBUG PrintStackReport(); #endif recursioncheck = false; } return false; } Sleep((10-retry)*100); } struct addrinfo *best = NULL; bool 
snm = !PreferredSubnet.isNull(); loop { struct addrinfo *ai; for (ai = addrInfo; ai; ai = ai->ai_next) { // printf("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s\n",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL"); switch (ai->ai_family) { case AF_INET: { if (snm) { IpAddress ip; ip.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr)); if (!PreferredSubnet.test(ip)) continue; } if ((best==NULL)||((best->ai_family==AF_INET6)&&!IP6preferred)) best = ai; break; } case AF_INET6: { if (snm) { IpAddress ip; ip.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr)); if (!PreferredSubnet.test(ip)) continue; } if ((best==NULL)||((best->ai_family==AF_INET)&&IP6preferred)) best = ai; break; } } } if (best||!snm) break; snm = false; } if (best) { if (best->ai_family==AF_INET6) memcpy(netaddr,&(((sockaddr_in6 *)best->ai_addr)->sin6_addr),sizeof(netaddr)); else { memcpy(netaddr+3,&(((sockaddr_in *)best->ai_addr)->sin_addr),sizeof(netaddr[3])); netaddr[2] = 0xffff0000; netaddr[1] = 0; netaddr[0] = 0; } } freeaddrinfo(addrInfo); return best!=NULL; #endif return false; } bool IpAddress::ipset(const char *text) { if (text&&*text) { if ((text[0]=='.')&&(text[1]==0)) { ipset(queryHostIP()); return true; } if (decodeNumericIP(text,netaddr)) return true; const char *s; for (s=text;*s;s++) if (!isdigit(*s)&&(*s!=':')&&(*s!='.')) break; if (!*s) return ipset(NULL); if (lookupHostAddress(text,netaddr)) return true; } memset(&netaddr,0,sizeof(netaddr)); return false; } inline char * addbyte(char *s,byte b) { if (b>=100) { *(s++) = b/100+'0'; b %= 100; *(s++) = b/10+'0'; b %= 10; } else if (b>=10) { *(s++) = b/10+'0'; b %= 10; } *(s++) = b+'0'; return s; } StringBuffer & IpAddress::getIpText(StringBuffer & out) const { if (::isIp4(netaddr)) { const byte *ip = (const byte *)&netaddr[3]; char ips[16]; char *s = ips; for (unsigned i=0;i<4;i++) { if (i) *(s++) = 
'.'; s = addbyte(s,ip[i]); } return out.append(s-ips,ips); } char tmp[INET6_ADDRSTRLEN]; const char *res = _inet_ntop(AF_INET6, &netaddr, tmp, sizeof(tmp)); if (!res) throw MakeOsException(errno); return out.append(res); } void IpAddress::ipserialize(MemoryBuffer & out) const { if (((netaddr[2]==0xffff0000)||(netaddr[2]==0))&&(netaddr[1]==0)&&(netaddr[0]==0)) { if (netaddr[3]==IPV6_SERIALIZE_PREFIX) throw MakeStringException(-1,"Invalid network address"); // hack prevention out.append(sizeof(netaddr[3]), &netaddr[3]); } else { unsigned pfx = IPV6_SERIALIZE_PREFIX; out.append(sizeof(pfx),&pfx).append(sizeof(netaddr),&netaddr); } } void IpAddress::ipdeserialize(MemoryBuffer & in) { unsigned pfx; in.read(sizeof(pfx),&pfx); if (pfx!=IPV6_SERIALIZE_PREFIX) { netaddr[0] = 0; netaddr[1] = 0; netaddr[2] = (pfx>0x1000000)?0xffff0000:0; // catch null and loopback netaddr[3] = pfx; } else in.read(sizeof(netaddr),&netaddr); } unsigned IpAddress::ipdistance(const IpAddress &other,unsigned offset) const { if (offset>3) offset = 3; int i1; _cpyrev4(&i1,&netaddr[3-offset]); int i2; _cpyrev4(&i2,&other.netaddr[3-offset]); i1-=i2; if (i1>0) return i1; return -i1; } bool IpAddress::ipincrement(unsigned count,byte minoctet,byte maxoctet,unsigned short minipv6piece,unsigned maxipv6piece) { unsigned base; if (::isIp4(netaddr)) { base = maxoctet-minoctet+1; if (!base||(base>256)) return false; byte * ips = (byte *)&netaddr[3]; byte * ip = ips+4; while (count) { if (ip==ips) return false; // overflow ip--; unsigned v = (count+((*ip>minoctet)?(*ip-minoctet):0)); *ip = minoctet + v%base; count = v/base; } } else { base = maxipv6piece-minipv6piece+1; if (!base||(base>0x10000)) return false; unsigned short * ps = (unsigned short *)&netaddr; unsigned short * p = ps+8; while (count) { if (p==ps) return false; // overflow (actually near impossible!) 
p--; unsigned v = (count+((*p>minipv6piece)?(*p-minipv6piece):0)); *p = minipv6piece + v%base; count = v/base; } } return true; } unsigned IpAddress::ipsetrange( const char *text) // e.g. 10.173.72.1-65 ('-' may be omitted) { unsigned e=0; unsigned f=0; const char *r = strchr(text,'-'); bool ok; if (r) { e = atoi(r+1); StringBuffer tmp(r-text,text); ok = ipset(tmp.str()); if (!::isIp4(netaddr)) IPV6_NOT_IMPLEMENTED(); // TBD IPv6 if (ok) { while ((r!=text)&&(*(r-1)!='.')) r--; f = (r!=text)?atoi(r):0; } } else ok = ipset(text); if ((f>e)||!ok) return 0; return e-f+1; } size32_t IpAddress::getNetAddress(size32_t maxsz,void *dst) const { if (maxsz==sizeof(unsigned)) { if (::isIp4(netaddr)) { *(unsigned *)dst = netaddr[3]; return maxsz; } } else if (!IP4only&&(maxsz==sizeof(netaddr))) { memcpy(dst,&netaddr,maxsz); return maxsz; } return 0; } void IpAddress::setNetAddress(size32_t sz,const void *src) { if (sz==sizeof(unsigned)) { // IPv4 netaddr[0] = 0; netaddr[1] = 0; netaddr[2]=0xffff0000; netaddr[3] = *(const unsigned *)src; } else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6 memcpy(&netaddr,src,sz); if ((netaddr[2]==0)&&(netaddr[3]>0x1000000)&&(netaddr[0]==0)&&(netaddr[1]==0)) netaddr[2]=0xffff0000; // use this form only } else memset(&netaddr,0,sizeof(netaddr)); } void SocketEndpoint::deserialize(MemoryBuffer & in) { ipdeserialize(in); in.read(port); } void SocketEndpoint::serialize(MemoryBuffer & out) const { ipserialize(out); out.append(port); } bool SocketEndpoint::set(const char *name,unsigned short _port) { if (name) { if (*name=='[') { const char *s = name+1; const char *t = strchr(s,']'); if (t) { StringBuffer tmp(t-s,s); if (t[1]==':') _port = atoi(t+2); return set(tmp.str(),_port); } } const char * colon = strchr(name, ':'); if (colon) { if (!IP4only&&strchr(colon+1, ':')) colon = NULL; // hello its IpV6 } else colon = strchr(name, '|'); // strange hole convention char ips[260]; if (colon) { size32_t l = colon-name; if (l>=sizeof(ips)) l = 
sizeof(ips)-1; memcpy(ips,name,l); ips[l] = 0; name = ips; _port = atoi(colon+1); } if (ipset(name)) { port = _port; return true; } } ipset(NULL); port = 0; return false; } void SocketEndpoint::getUrlStr(char * str, size32_t len) const { if (len==0) return; StringBuffer _str; getUrlStr(_str); size32_t l = _str.length()+1; if (l>len) { l = len-1; str[l] = 0; } memcpy(str,_str.toCharArray(),l); } StringBuffer &SocketEndpoint::getUrlStr(StringBuffer &str) const { getIpText(str); if (port) str.append(':').append((unsigned)port); // TBD IPv6 put [] on return str; } unsigned SocketEndpoint::hash(unsigned prev) const { return hashc((const byte *)&port,sizeof(port),iphash(prev)); } //--------------------------------------------------------------------------- SocketListCreator::SocketListCreator() { lastPort = 0; } void SocketListCreator::addSocket(const SocketEndpoint &ep) { StringBuffer ipstr; ep.getIpText(ipstr); addSocket(ipstr.str(), ep.port); } void SocketListCreator::addSocket(const char * ip, unsigned port) { if (fullText.length()) fullText.append("|"); const char * prev = lastIp; const char * startCopy = ip; if (prev) { if (strcmp(ip, prev) == 0) { fullText.append("="); startCopy = NULL; } else { const char * cur = ip; loop { char n = *cur; if (!n) break; if (n != *prev) break; cur++; prev++; if (n == '.') startCopy = cur; } if (startCopy != ip) fullText.append("*"); } } fullText.append(startCopy); if (lastPort != port) fullText.append(":").append(port); lastIp.set(ip); lastPort = port; } const char * SocketListCreator::getText() { return fullText.str(); } void SocketListCreator::addSockets(SocketEndpointArray &array) { ForEachItemIn(i,array) { SocketEndpoint &sockep=array.item(i); StringBuffer ipstr; sockep.getIpText(ipstr); addSocket(ipstr.str(),sockep.port); } } //--------------------------------------------------------------------------- SocketListParser::SocketListParser(const char * text) { fullText.set(text); cursor = NULL; lastPort = 0; } void 
SocketListParser::first(unsigned port) { cursor = fullText; lastIp.set(NULL); lastPort = port; } bool SocketListParser::get(StringAttr & ip, unsigned & port, unsigned index, unsigned defport) { first(defport); do { if (!next(ip, port)) return false; } while (index--); return true; } bool SocketListParser::next(StringAttr & ip, unsigned & port) { // IPV6TBD StringBuffer ipText; if (*cursor == 0) return false; if (*cursor == '=') { ipText.append(lastIp); cursor++; } else if (*cursor == '*') { cursor++; //count the number of dots in the tail const char * cur = cursor; unsigned count = 0; loop { char c = *cur++; switch (c) { case 0: case '|': case ',': case ':': goto done; case '.': ++count; break; } } done: //copy up to the appropriate dot from the previous ip. const unsigned dotCount = 3; //more what about 6 digit ip's cur = lastIp; loop { char c = *cur++; switch (c) { case 0: case '|': case ',': case ':': assertex(!"Should not get here!"); goto done2; case '.': ipText.append(c); if (++count == dotCount) goto done2; break; default: ipText.append(c); break; } } done2:; } bool inPort = false; port = lastPort; loop { char c = *cursor++; switch (c) { case 0: cursor--; goto doneCopy; case '|': case ',': goto doneCopy; case ':': port = atoi(cursor); inPort = true; break;; default: if (!inPort) ipText.append(c); break; } } doneCopy: lastIp.set(ipText.str()); ip.set(lastIp); lastPort = port; return true; } unsigned SocketListParser::getSockets(SocketEndpointArray &array,unsigned defport) { first(defport); StringAttr ip; unsigned port; while (next(ip,port)) { SocketEndpoint ep(ip,port); array.append(ep); } return array.ordinality(); } void getSocketStatistics(JSocketStatistics &stats) { // should put in simple lock memcpy(&stats,&STATS,sizeof(stats)); } void resetSocketStatistics() { unsigned activesockets=STATS.activesockets; memset(&STATS,0,sizeof(STATS)); STATS.activesockets = activesockets; } static StringBuffer &appendtime(StringBuffer &s,unsigned us) { // attemp to get 
into more sensible units if (us>10000000) return s.append(us/1000000).append('s'); if (us>10000) return s.append(us/1000).append("ms"); return s.append(us).append("us"); } StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &str) { str.append("connects=").append(stats.connects).append('\n'); appendtime(str.append("connecttime="),stats.connecttime).append('\n'); str.append("failedconnects=").append(stats.failedconnects).append('\n'); appendtime(str.append("failedconnecttime="),stats.failedconnecttime).append('\n'); str.append("reads=").append(stats.reads).append('\n'); appendtime(str.append("readtime="),stats.readtime).append('\n'); str.append("readsize=").append(stats.readsize).append(" bytes\n"); str.append("writes=").append(stats.writes).append('\n'); appendtime(str.append("writetime="),stats.writetime).append('\n'); str.append("writesize=").append(stats.writesize).append(" bytes").append('\n'); str.append("activesockets=").append(stats.activesockets).append('\n'); str.append("numblockrecvs=").append(stats.numblockrecvs).append('\n'); str.append("numblocksends=").append(stats.numblocksends).append('\n'); str.append("blockrecvsize=").append(stats.blockrecvsize).append('\n'); str.append("blocksendsize=").append(stats.blocksendsize).append('\n'); str.append("blockrecvtime=").append(stats.blockrecvtime).append('\n'); str.append("blocksendtime=").append(stats.blocksendtime).append('\n'); str.append("longestblocksend=").append(stats.longestblocksend).append('\n'); str.append("longestblocksize=").append(stats.longestblocksize); return str; } // =============================================================================== // select thread for handling multiple selects struct SelectItem { ISocket *sock; T_SOCKET handle; ISocketSelectNotify *nfy; byte mode; bool del; }; inline SelectItem &Array__Member2Param(SelectItem &src) { return src; } inline void Array__Assign(SelectItem & dest, SelectItem &src) { dest=src; } inline bool 
Array__Equal(SelectItem &m, SelectItem &p) { return m.sock==p.sock; } /* SelectItem array helpers: items compare equal when they refer to the same ISocket; destroying an item does nothing. */ inline void Array__Destroy(SelectItem &p) { } /* NOTE(review): the ArrayOf template arguments appear to be missing in this copy of the file; text between '<' and '>' seems to have been lost throughout. */ class SelectItemArray : public ArrayOf { }; #define SELECT_TIMEOUT_SECS 1 // but it does (TBD) #ifdef _WIN32 // fd_set utility functions inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src) { unsigned i = src.fd_count; dst.fd_count = i; while (i--) dst.fd_array[i] = src.fd_array[i]; // possibly better as memcpy return &dst; } /* findfds (Windows): look up handle h in fd_set s. NOTE(review): the body is garbled in this copy ("for(i=0;i=XFD_SETSIZE)"), so the remaining fd_set helpers cannot be reviewed here. */ inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c) { unsigned n = s.fd_count; unsigned i; for(i=0;i=XFD_SETSIZE) return false; return FD_ISSET(h,&s); // does not remove entry or set termination flag when done } #endif /* CSocketSelectThread: thread holding a set of SelectItem registrations (items, guarded by sect). Changes are made under sect and followed by triggerselect(), which wakes the select via a pipe or a dummy socket. Only the start of the class is visible in this section. */ class CSocketSelectThread: public Thread { bool terminating; CriticalSection sect; Semaphore ticksem; atomic_t tickwait; SelectItemArray items; unsigned offset; bool selectvarschange; unsigned waitingchange; Semaphore waitingchangesem; int validateselecterror; unsigned validateerrcount; const char *selecttrace; unsigned basesize; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER T_SOCKET dummysock[2]; #else T_SOCKET dummysock; #endif bool dummysockopen; /* opendummy: create the wake-up trigger if not already open. With _USE_PIPE_FOR_SELECT_TRIGGER it is a pipe whose ends are made non-blocking and close-on-exec; a pipe failure is logged and the trigger left closed. Otherwise it is an unconnected stream socket. NOTE(review): in the IP6preferred case PF_INET6 is passed as the protocol argument of ::socket; 0 is the usual value. */ void opendummy() { CriticalBlock block(sect); if (!dummysockopen) { #ifdef _USE_PIPE_FOR_SELECT_TRIGGER if(pipe(dummysock)) { WARNLOG("CSocketSelectThread: create pipe failed %d",ERRNO()); return; } for (unsigned i=0;i<2;i++) { int flags = fcntl(dummysock[i], F_GETFL, 0); if (flags!=-1) { flags |= O_NONBLOCK; fcntl(dummysock[i], F_SETFL, flags); } flags = fcntl(dummysock[i], F_GETFD, 0); if (flags!=-1) { flags |= FD_CLOEXEC; fcntl(dummysock[i], F_SETFD, flags); } } CHECKSOCKRANGE(dummysock[0]); #else if (IP6preferred) dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6); else dummysock = ::socket(AF_INET, SOCK_STREAM, 0); CHECKSOCKRANGE(dummysock); #endif dummysockopen = true; } } /* closedummy: close both pipe ends, or the dummy socket, if open. */ void closedummy() { CriticalBlock block(sect); if (dummysockopen) { #ifdef _USE_PIPE_FOR_SELECT_TRIGGER #ifdef SOCKTRACE PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%x)", dummysock[0], 
dummysock[0], dummysock[1], dummysock[1], this); #endif ::close(dummysock[0]); ::close(dummysock[1]); #else #ifdef _WIN32 ::closesocket(dummysock); #else ::close(dummysock); #endif #endif dummysockopen = false; } } /* triggerselect: signal ticksem if tickwait is non-zero, then wake the select by writing one byte to the pipe, or by closing the dummy socket in the non-pipe build. NOTE(review): the write() result is ignored. */ void triggerselect() { if (atomic_read(&tickwait)) ticksem.signal(); #ifdef _USE_PIPE_FOR_SELECT_TRIGGER CriticalBlock block(sect); char c = 0; write(dummysock[1], &c, 1); #else closedummy(); #endif } /* resettrigger: drain pending wake-up bytes from the non-blocking pipe; no-op without _USE_PIPE_FOR_SELECT_TRIGGER. */ void resettrigger() { #ifdef _USE_PIPE_FOR_SELECT_TRIGGER CriticalBlock block(sect); char c; while((::read(dummysock[0], &c, sizeof(c))) == sizeof(c)); #endif } #ifdef _WIN32 #define HASHTABSIZE 256 #define HASHNULL (HASHTABSIZE-1) #define HASHTABMASK (HASHTABSIZE-1) byte hashtab[HASHTABSIZE]; #define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles void inithash() { memset(&hashtab,HASHNULL,sizeof(hashtab)); assertex(FD_SETSIZE<255); } void reinithash() { // done this way because index of items changes and hash table not that big inithash(); /* NOTE(review): text is missing from here ("items.ordinality()Release()"). The rest of reinithash, the destructor, and parts of updateItems and add cannot be reviewed in this copy. */ assertex(items.ordinality()Release(); si.nfy->Release(); } catch (IException *e) { EXCLOG(e,"~CSocketSelectThread"); e->Release(); } } } Owned termexcept; void updateItems() { // must be in CriticalBlock block(sect); unsigned n = items.ordinality(); bool hashupdateneeded = (n!=basesize); // additions all come at end for (unsigned i=0;iRelease(); try { #ifdef SOCKTRACE PROGLOG("CSocketSelectThread::updateItems release %d",si.handle); #endif si.sock->Release(); } catch (IException *e) { EXCLOG(e,"CSocketSelectThread::updateItems"); e->Release(); } n--; if (i=XFD_SETSIZE-1) // leave 1 spare return false; SelectItem sn; sn.nfy = LINK(nfy); sn.sock = LINK(sock); sn.mode = (byte)mode; sn.handle = (T_SOCKET)sock->OShandle(); CHECKSOCKRANGE(sn.handle); sn.del = false; items.append(sn); selectvarschange = true; triggerselect(); return true; } bool remove(ISocket *sock) { if (terminating) return false; CriticalBlock block(sect); if (sock==NULL) { // wait until no changes outstanding while 
(selectvarschange) { waitingchange++; CriticalUnblock unblock(sect); waitingchangesem.wait(); } return true; } ForEachItemIn(i,items) { SelectItem &si = items.item(i); if (!si.del&&(si.sock==sock)) { si.del = true; selectvarschange = true; triggerselect(); return true; } } return false; } void stop(bool wait) { terminating = true; triggerselect(); if (wait) join(); } bool sockOk(T_SOCKET sock) { PROGLOG("CSocketSelectThread: sockOk testing %d",sock); int err = 0; int t=0; socklen_t tl = sizeof(t); if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) { StringBuffer sockstr; const char *tracename = sockstr.append((unsigned)sock).str(); LOGERR2(ERRNO(),1,"CSocketSelectThread select handle"); return false; } T_FD_SET fds; struct timeval tv; XFD_ZERO(&fds); FD_SET((unsigned)sock, &fds); //FD_SET((unsigned)sock, &except); tv.tv_sec = 0; tv.tv_usec = 0; CHECKSOCKRANGE(sock); int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv ); if (rc<0) { StringBuffer sockstr; const char *tracename = sockstr.append((unsigned)sock).str(); LOGERR2(ERRNO(),2,"CSocketSelectThread select handle"); return false; } else if (rc>0) PROGLOG("CSocketSelectThread: select handle %d selected(2) %d",sock,rc); XFD_ZERO(&fds); FD_SET((unsigned)sock, &fds); tv.tv_sec = 0; tv.tv_usec = 0; rc = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv ); if (rc<0) { StringBuffer sockstr; const char *tracename = sockstr.append((unsigned)sock).str(); LOGERR2(ERRNO(),3,"CSocketSelectThread select handle"); return false; } else if (rc>0) PROGLOG("CSocketSelectThread: select handle %d selected(2) %d",sock,rc); return true; } bool checkSocks() { bool ret = false; ForEachItemIn(i,items) { SelectItem &si = items.item(i); if (si.del) ret = true; // maybe that bad one else if (!sockOk(si.handle)) { si.del = true; ret = true; } } return ret; } void updateSelectVars(T_FD_SET &rdfds,T_FD_SET &wrfds,T_FD_SET &exfds,bool &isrd,bool &iswr,bool &isex,unsigned &ni,T_SOCKET &max_sockid) { CriticalBlock 
block(sect); selectvarschange = false; if (waitingchange) { waitingchangesem.signal(waitingchange); waitingchange = 0; } if (validateselecterror) { // something went wrong so check sockets validateerrcount++; if (!checkSocks()) { // bad socket not found PROGLOG("CSocketSelectThread::updateSelectVars cannot find socket error"); if (validateerrcount>10) throw MakeStringException(-1,"CSocketSelectThread:Socket select error %d",validateselecterror); } } else validateerrcount = 0; updateItems(); XFD_ZERO( &rdfds ); XFD_ZERO( &wrfds ); XFD_ZERO( &exfds ); isrd=false; iswr=false; isex=false; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER max_sockid=dummysockopen?dummysock[0]:0; #else opendummy(); max_sockid=dummysockopen?dummysock:0; #endif ni = items.ordinality(); #ifdef _WIN32 if (offset>=ni) #endif offset = 0; unsigned j=offset; ForEachItemIn(i,items) { SelectItem &si = items.item(j); j++; if (j==ni) j = 0; if (si.mode & SELECTMODE_READ) { FD_SET( si.handle, &rdfds ); isrd = true; } if (si.mode & SELECTMODE_WRITE) { FD_SET( si.handle, &wrfds ); iswr = true; } if (si.mode & SELECTMODE_EXCEPT) { FD_SET( si.handle, &exfds ); isex = true; } max_sockid=std::max(si.handle, max_sockid); } if (dummysockopen) { #ifdef _USE_PIPE_FOR_SELECT_TRIGGER FD_SET( dummysock[0], &rdfds ); isrd = true; #else FD_SET( dummysock, &exfds ); isex = true; #endif } validateselecterror = 0; max_sockid++; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: selecting on %d sockets",ni); #endif } int run() { try { T_FD_SET rdfds; T_FD_SET wrfds; T_FD_SET exfds; timeval selecttimeout; bool isrd; bool iswr; bool isex; T_SOCKET maxsockid; unsigned ni; selectvarschange = true; unsigned numto = 0; unsigned lastnumto = 0; unsigned totnum = 0; unsigned total = 0; while (!terminating) { selecttimeout.tv_sec = SELECT_TIMEOUT_SECS; // linux modifies so initialize inside loop selecttimeout.tv_usec = 0; if (selectvarschange) { updateSelectVars(rdfds,wrfds,exfds,isrd,iswr,isex,ni,maxsockid); } if (ni==0) { validateerrcount = 0; 
atomic_inc(&tickwait); if(!selectvarschange&&!terminating) ticksem.wait(SELECT_TIMEOUT_SECS*1000); atomic_dec(&tickwait); continue; } T_FD_SET rs; T_FD_SET ws; T_FD_SET es; T_FD_SET *rsp = isrd?cpyfds(rs,rdfds):NULL; T_FD_SET *wsp = iswr?cpyfds(ws,wrfds):NULL; T_FD_SET *esp = isex?cpyfds(es,exfds):NULL; int n = ::select(maxsockid,(fd_set *)rsp,(fd_set *)wsp,(fd_set *)esp,&selecttimeout); // first parameter needed for posix if (terminating) break; if (n < 0) { CriticalBlock block(sect); int err = ERRNO(); if (err != EINTRCALL) { if (dummysockopen) { LOGERR(err,12,"CSocketSelectThread select error"); // should cache error ? validateselecterror = err; #ifndef _USE_PIPE_FOR_SELECT_TRIGGER closedummy(); // just in case was culprit #endif } selectvarschange = true; continue; } n = 0; } else if (n>0) { validateerrcount = 0; numto = 0; lastnumto = 0; total += n; totnum++; SelectItemArray tonotify; { CriticalBlock block(sect); #ifdef _WIN32 if (isrd) processfds(rs,SELECTMODE_READ,tonotify); if (iswr) processfds(ws,SELECTMODE_WRITE,tonotify); if (isex) processfds(es,SELECTMODE_EXCEPT,tonotify); #else unsigned i; SelectItem *si = items.getArray(offset); SelectItem *sie = items.getArray(ni-1)+1; bool r = isrd; bool w = iswr; bool e = isex; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER if (r&&dummysockopen&&findfds(rs,dummysock[0],r)) { resettrigger(); --n; } #endif for (i=0;(n>0)&&(ihandle,r)) { if (!si->del) { tonotify.append(*si); tonotify.item(tonotify.length()-1).mode = SELECTMODE_READ; } --n; } if (w&&findfds(ws,si->handle,w)) { if (!si->del) { tonotify.append(*si); tonotify.item(tonotify.length()-1).mode = SELECTMODE_WRITE; } --n; } if (e&&findfds(es,si->handle,e)) { if (!si->del) { tonotify.append(*si); tonotify.item(tonotify.length()-1).mode = SELECTMODE_EXCEPT; } --n; } si++; if (si==sie) si = items.getArray(); } #endif } ForEachItemIn(j,tonotify) { SelectItem &si = tonotify.item(j); try { si.nfy->notifySelected(si.sock,si.mode); // ignore return } catch (IException *e) { // 
should be acted upon by notifySelected EXCLOG(e,"CSocketSelectThread notifySelected"); throw ; } } } else { validateerrcount = 0; if ((++numto>=lastnumto*2)) { lastnumto = numto; if (selecttrace&&(numto>4)) PROGLOG("%s: Select Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0); } /* if (numto&&(numto%100)) { CriticalBlock block(sect); if (!selectvarschange) selectvarschange = checkSocks(); } */ } if (++offset>=ni) offset = 0; } } catch (IException *e) { EXCLOG(e,"CSocketSelectThread"); termexcept.setown(e); } CriticalBlock block(sect); try { updateItems(); } catch (IException *e) { EXCLOG(e,"CSocketSelectThread(2)"); if (!termexcept) termexcept.setown(e); else e->Release(); } return 0; } }; class CSocketSelectHandler: public CInterface, implements ISocketSelectHandler { CIArrayOf threads; CriticalSection sect; bool started; StringAttr selecttrace; public: IMPLEMENT_IINTERFACE; CSocketSelectHandler(const char *trc) : selecttrace(trc) { started = false; } void start() { CriticalBlock block(sect); if (!started) { started = true; ForEachItemIn(i,threads) { threads.item(i).start(); } } } void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy) { CriticalBlock block(sect); loop { bool added=false; ForEachItemIn(i,threads) { if (added) threads.item(i).remove(sock); else added = threads.item(i).add(sock,mode,nfy); } if (added) return; CSocketSelectThread *thread = new CSocketSelectThread(selecttrace); threads.append(*thread); if (started) thread->start(); } } void remove(ISocket *sock) { CriticalBlock block(sect); ForEachItemIn(i,threads) { if (threads.item(i).remove(sock)&&sock) break; } } void stop(bool wait) { IException *e=NULL; CriticalBlock block(sect); unsigned i = 0; while (iread(&len, sizeof(len)); _WINREV4(len); if (len) { void * target = buffer.reserve(len); socket->read(target, len); } } void readBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms) { size32_t len; size32_t sizeGot; 
socket->readtms(&len, sizeof(len), sizeof(len), sizeGot, timeoutms); _WINREV4(len); if (len) { void * target = buffer.reserve(len); socket->readtms(target, len, len, sizeGot, timeoutms); } } void writeBuffer(ISocket * socket, MemoryBuffer & buffer) { unsigned len = buffer.length(); _WINREV4(len); socket->write(&len, sizeof(len)); if (len) socket->write(buffer.toByteArray(), buffer.length()); } bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer) { try { readBuffer(socket, buffer); return true; } catch (IException * e) { switch (e->errorCode()) { case JSOCKERR_graceful_close: break; default: EXCLOG(e,"catchReadBuffer"); break; } e->Release(); } return false; } bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms) { try { readBuffer(socket, buffer, timeoutms); return true; } catch (IException * e) { switch (e->errorCode()) { case JSOCKERR_graceful_close: break; default: EXCLOG(e,"catchReadBuffer"); break; } e->Release(); } return false; } bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer) { try { writeBuffer(socket, buffer); return true; } catch (IException * e) { EXCLOG(e,"catchWriteBuffer"); e->Release(); } return false; } // utility interface for simple conversations // conversation is always between two ends, // at any given time one end must be receiving and other sending (though these may swap during the conversation) class CSingletonSocketConnection: public CInterface, implements IConversation { Owned sock; Owned listensock; enum { Snone, Saccept, Sconnect, Srecv, Ssend, Scancelled } state; bool accepting; bool cancelling; SocketEndpoint ep; CriticalSection crit; public: IMPLEMENT_IINTERFACE; CSingletonSocketConnection(SocketEndpoint &_ep) { ep = _ep; state = Snone; cancelling = false; } ~CSingletonSocketConnection() { try { if (sock) sock->close(); } catch (IException *e) { if (e->errorCode()!=JSOCKERR_graceful_close) EXCLOG(e,"CSingletonSocketConnection close"); e->Release(); } } bool connect(unsigned 
timeoutms) { CriticalBlock block(crit); if (cancelling) state = Scancelled; if (state==Scancelled) return false; assertex(!sock); ISocket *newsock=NULL; state = Sconnect; unsigned start; if (timeoutms!=(unsigned)INFINITE) start = msTick(); while (state==Sconnect) { try { CriticalUnblock unblock(crit); newsock = ISocket::connect_wait(ep,1000*60*4); break; } catch (IException * e) { if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) { e->Release(); if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) { state = Snone; return false; } } else { state = Scancelled; EXCLOG(e,"CSingletonSocketConnection::connect"); e->Release(); return false; } } } if (state!=Sconnect) { ::Release(newsock); newsock = NULL; } if (!newsock) { state = Scancelled; return false; } sock.setown(newsock); return true; } bool send(MemoryBuffer &mb) { CriticalBlock block(crit); if (cancelling) state = Scancelled; if (state==Scancelled) return false; assertex(sock); state = Srecv; try { CriticalUnblock unblock(crit); writeBuffer(sock,mb); } catch (IException * e) { state = Scancelled; EXCLOG(e,"CSingletonSocketConnection::send"); e->Release(); return false; } state = Snone; return true; } unsigned short setRandomPort(unsigned short base, unsigned num) { loop { try { ep.port = base+(unsigned short)(getRandom()%num); listensock.setown(ISocket::create(ep.port)); return ep.port; } catch (IException *e) { if (e->errorCode()!=JSOCKERR_port_in_use) { state = Scancelled; EXCLOG(e,"CSingletonSocketConnection::setRandomPort"); e->Release(); break; } e->Release(); } } return 0; } bool accept(unsigned timeoutms) { CriticalBlock block(crit); if (cancelling) state = Scancelled; if (state==Scancelled) return false; if (!sock) { ISocket *newsock=NULL; state = Saccept; loop { try { { CriticalUnblock unblock(crit); if (!listensock) listensock.setown(ISocket::create(ep.port)); if 
((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) { state = Snone; return false; } } if (cancelling) state = Scancelled; if (state==Scancelled) return false; { CriticalUnblock unblock(crit); newsock=listensock->accept(true); break; } } catch (IException *e) { if (e->errorCode()==JSOCKERR_graceful_close) PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying..."); else { state = Scancelled; EXCLOG(e,"CSingletonSocketConnection::accept"); e->Release(); break; } e->Release(); } } if (state!=Saccept) { ::Release(newsock); newsock = NULL; } if (!newsock) { state = Scancelled; return false; } sock.setown(newsock); } return true; } bool recv(MemoryBuffer &mb, unsigned timeoutms) { CriticalBlock block(crit); if (cancelling) state = Scancelled; if (state==Scancelled) return false; assertex(sock); state = Srecv; try { CriticalUnblock unblock(crit); readBuffer(sock,mb,timeoutms); } catch (IException *e) { if (e->errorCode()==JSOCKERR_timeout_expired) state = Snone; else { state = Scancelled; if (e->errorCode()!=JSOCKERR_graceful_close) EXCLOG(e,"CSingletonSocketConnection::recv"); } e->Release(); return false; } state = Snone; return true; } virtual void cancel() { CriticalBlock block(crit); while (state!=Scancelled) { cancelling = true; try { switch (state) { case Saccept: { if (listensock) listensock->cancel_accept(); } break; case Sconnect: // wait for timeout break; case Srecv: { if (sock) sock->close(); } break; case Ssend: // wait for finished break; default: state = Scancelled; break; } } catch (IException *e) { EXCLOG(e,"CSingletonSocketConnection::cancel"); e->Release(); } { CriticalUnblock unblock(crit); Sleep(1000); } } } }; IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *_ep) { SocketEndpoint ep; if (_ep) ep = *_ep; if (port) ep.port = port; return new CSingletonSocketConnection(ep); } // interface for reading from multiple sockets using the BF_SYNC_TRANSFER_PUSH protocol class 
CSocketBufferReader: public CInterface, implements ISocketBufferReader { class SocketElem: public CInterface, implements ISocketSelectNotify { CSocketBufferReader *parent; unsigned num; // top bit used for ready MemoryAttr blk; CriticalSection sect; Linked sock; bool active; bool pending; public: IMPLEMENT_IINTERFACE; void init(CSocketBufferReader *_parent,ISocket *_sock,unsigned _n) { parent = _parent; num = _n; sock.set(_sock); active = true; pending = false; } virtual bool notifySelected(ISocket *socket,unsigned selected) { assertex(sock==socket); { CriticalBlock block(sect); if (pending) { active = false; parent->remove(sock); return false; } pending = true; unsigned t1=usTick(); size32_t sz = sock->receive_block_size(); unsigned t2=usTick(); if (sz) sock->receive_block(blk.allocate(sz),sz); else parent->remove(sock); unsigned t3=usTick(); if (t3-t1>60*1000000) PROGLOG("CSocketBufferReader(%d): slow receive_block (%d,%d) sz=%d",num,t2-t1,t3-t2,sz); } parent->enqueue(this); // nb outside sect critical block return false; // always return false } unsigned get(MemoryBuffer &mb) { CriticalBlock block(sect); assertex(pending); size32_t sz = blk.length(); if (sz) mb.setBuffer(sz,blk.detach(),true); pending = false; if (!active) { active = true; parent->add(*this); } return num; } size32_t size() { return blk.length(); } ISocket *getSocket() { return sock; } } *elems; SimpleInterThreadQueueOf readyq; Owned selecthandler; size32_t buffersize; size32_t buffermax; unsigned bufferwaiting; CriticalSection buffersect; Semaphore buffersem; bool isdone; public: IMPLEMENT_IINTERFACE; CSocketBufferReader(const char *trc) { selecthandler.setown(createSocketSelectHandler(trc)); elems = NULL; } ~CSocketBufferReader() { delete [] elems; } virtual void init(unsigned num,ISocket **sockets,size32_t _buffermax) { elems = new SocketElem[num]; for (unsigned i=0;istart(); } virtual unsigned get(MemoryBuffer &mb) { SocketElem &e = *readyq.dequeue(); CriticalBlock block(buffersect); 
assertex(buffersize>=e.size()); buffersize-=e.size(); if (bufferwaiting) { buffersem.signal(bufferwaiting); bufferwaiting = 0; } return e.get(mb); } virtual void done(bool wait) { buffersem.signal(0x10000); isdone = true; selecthandler->stop(wait); if (wait) { delete [] elems; elems = NULL; } } void enqueue(SocketElem *elem) { if (elem) { CriticalBlock block(buffersect); size32_t sz = elem->size(); while ((buffersize>0)&&(sz>0)&&(buffersize+sz>buffermax)) { if (isdone) return; bufferwaiting++; CriticalUnblock unblock(buffersect); buffersem.wait(); } buffersize += sz; } readyq.enqueue(elem); } void remove(ISocket *sock) { selecthandler->remove(sock); } void add(SocketElem &elem) { selecthandler->add(elem.getSocket(),SELECTMODE_READ,&elem); } }; ISocketBufferReader *createSocketBufferReader(const char *trc) { return new CSocketBufferReader(trc); } extern jlib_decl void markNodeCentral(SocketEndpoint &ep) { #ifdef CENTRAL_NODE_RANDOM_DELAY CriticalBlock block(CSocket::crit); CentralNodeArray.append(ep); #endif } static CSocket *prepareSocket(unsigned idx,const SocketEndpoint &ep, ISocketConnectNotify &inotify) { Owned sock = new CSocket(ep,sm_tcp,NULL); int err = sock->pre_connect(false); if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) return sock.getClear(); if (err==0) { int err = sock->post_connect(); if (err==0) inotify.connected(idx,ep,sock); else { sock->errclose(); inotify.failed(idx,ep,err); } } else inotify.failed(idx,ep,err); return NULL; } void multiConnect(const SocketEndpointArray &eps,ISocketConnectNotify &inotify,unsigned timeout) { class SocketElem: public CInterface, implements ISocketSelectNotify { CriticalSection *sect; ISocketSelectHandler *handler; unsigned *remaining; Semaphore *notifysem; ISocketConnectNotify *inotify; public: Owned sock; SocketEndpoint ep; unsigned idx; IMPLEMENT_IINTERFACE; void init(CSocket *_sock,unsigned _idx,SocketEndpoint &_ep,CriticalSection *_sect,ISocketSelectHandler *_handler,ISocketConnectNotify *_inotify, unsigned 
*_remaining, Semaphore *_notifysem) { ep = _ep; idx = _idx; inotify = _inotify; sock.setown(_sock), sect = _sect; handler = _handler; remaining = _remaining; notifysem = _notifysem; } virtual bool notifySelected(ISocket *socket,unsigned selected) { CriticalBlock block(*sect); handler->remove(socket); int err = sock->post_connect(); CSocket *newsock = NULL; { CriticalUnblock unblock(*sect); // up to caller to cope with multithread if (err==0) inotify->connected(idx,ep,sock); else if ((err==ETIMEDOUT)||(err==ECONNREFUSED)) { // don't give up so easily (maybe listener not yet started (i.e. racing)) newsock = prepareSocket(idx,ep,*inotify); Sleep(100); // not very nice but without this would just loop } else inotify->failed(idx,ep,err); } if (newsock) { sock.setown(newsock); handler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,this); } else { sock.clear(); (*remaining)--; notifysem->signal(); } return false; } } *elems; unsigned n = eps.ordinality(); unsigned remaining = n; if (!n) return; elems = new SocketElem[n]; unsigned i; CriticalSection sect; Semaphore notifysem; Owned selecthandler = createSocketSelectHandler( #ifdef _DEBUG "multiConnect" #else NULL #endif ); StringBuffer name; for (i=0;iadd(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,&elems[i]); } else remaining--; } if (remaining) { unsigned lastremaining=remaining; selecthandler->start(); loop { bool to=!notifysem.wait(timeout); { CriticalBlock block(sect); if (remaining==0) break; if (to&&(remaining==lastremaining)) break; // nothing happened recently lastremaining = remaining; } } selecthandler->stop(true); } selecthandler.clear(); if (remaining) { for (unsigned j=0;j &retsockets,unsigned timeout) { unsigned n = eps.ordinality(); if (n==0) return; if (n==1) { // no need for multi ISocket *sock = NULL; try { sock = ISocket::connect_timeout(eps.item(0),timeout); } catch (IException *e) { // ignore error just append NULL sock = NULL; e->Release(); } retsockets.append(sock); return; } while 
(retsockets.ordinality() &retsockets; public: cNotify(PointerIArrayOf &_retsockets,CriticalSection &_sect) : retsockets(_retsockets),sect(_sect) { } void connected(unsigned idx,const SocketEndpoint &ep,ISocket *sock) { CriticalBlock block(sect); assertex(idxLink(); retsockets.replace(sock,idx); } void failed(unsigned idx,const SocketEndpoint &ep,int err) { StringBuffer s; PROGLOG("multiConnect failed to %s with %d",ep.getUrlStr(s).str(),err); } } notify(retsockets,sect); multiConnect(eps,notify,timeout); } inline void flushText(StringBuffer &text,unsigned short port,unsigned &rep,unsigned &range) { if (rep) { text.append('*').append(rep+1); rep = 0; } else if (range) { text.append('-').append(range); range = 0; } if (port) text.append(':').append(port); } StringBuffer &SocketEndpointArray::getText(StringBuffer &text) { unsigned count = ordinality(); if (!count) return text; if (count==1) return item(0).getUrlStr(text); byte lastip[4]; const SocketEndpoint &first = item(0); bool lastis4 = first.getNetAddress(sizeof(lastip),&lastip)==sizeof(lastip); unsigned short lastport = first.port; first.getIpText(text); unsigned rep=0; unsigned range=0; for (unsigned i=1;ij) text.append('.'); text.append((int)ip[k]); } } memcpy(&lastip,&ip,sizeof(lastip)); lastis4 = is4; lastport = ep.port; } flushText(text,lastport,rep,range); return text; } inline const char *getnum(const char *s,unsigned &n) { n = 0; while (isdigit(*s)) { n = n*10+(*s-'0'); s++; } return s; } inline bool appendv4range(SocketEndpointArray *array,char *str,SocketEndpoint &ep, unsigned defport) { char *s = str; unsigned dc = 0; unsigned port = defport; unsigned rng = 0; unsigned rep = 1; bool notip = false; while (*s) { if (*s=='.') { dc++; s++; } else if (*s==':') { *s = 0; s = (char *)getnum(s+1,port); } else if (*s=='-') { *s = 0; s = (char *)getnum(s+1,rng); } else if (*s=='*') { *s = 0; s = (char *)getnum(s+1,rep); } else { if (!isdigit(*s)) notip = true; s++; } } ep.port = port; if (*str) { if 
(!notip&&((dc<3)&&((dc!=1)||(strlen(str)!=1)))) { if (!ep.isIp4()) { return false; } StringBuffer tmp; ep.getIpText(tmp); size32_t l = tmp.length(); dc++; loop { if (tmp.length()==0) return false; if (tmp.charAt(tmp.length()-1)=='.') if (--dc==0) break; tmp.setLength(tmp.length()-1); } tmp.append(str); if (rng) { tmp.appendf("-%d",rng); rep = ep.ipsetrange(tmp.str()); } else ep.ipset(tmp.str()); } else if (rng) { // not nice as have to add back range (must be better way - maybe ipincrementto) TBD StringBuffer tmp; tmp.appendf("%s-%d",str,rng); rep = ep.ipsetrange(tmp.str()); } else if (*str) ep.ipset(str); if (ep.isNull()) ep.port = 0; for (unsigned i=0;iappend(ep); if (rng) ep.ipincrement(1); } } else {// just a port change if (ep.isNull()) // avoid null values with ports ep.port = 0; array->append(ep); } return true; } void SocketEndpointArray::fromText(const char *text,unsigned defport) { // this is quite complicated with (mixed) IPv4 and IPv6 // only support 'full' IPv6 and no ranges char *str = strdup(text); char *s = str; SocketEndpoint ep; bool eol = false; loop { while (isspace(*s)||(*s==',')) s++; if (!*s) break; char *e=s; if (*e=='[') { // we have a IPv6 while (*e&&(*e!=']')) e++; while ((*e!=',')&&!isspace(*e)) { if (!*s) { eol = true; break; } e++; } *e = 0; ep.set(s,defport); if (ep.isNull()) { // Error TBD } append(ep); } else { bool hascolon = false; bool isv6 = false; do { if (*e==':') { if (hascolon) isv6 = true; else hascolon = true; } e++; if (!*e) { eol = true; break; } } while (!isspace(*e)&&(*e!=',')); *e = 0; if (isv6) { ep.set(s,defport); if (ep.isNull()) { // Error TBD } append(ep); } else { if (!appendv4range(this,s,ep,defport)) { // Error TBD } } } if (eol) break; s = e+1; } free(str); } bool IpSubNet::set(const char *_net,const char *_mask) { if (!_net||!decodeNumericIP(_net,net)) { // _net NULL means match everything memset(net,0,sizeof(net)); memset(mask,0,sizeof(mask)); return (_net==NULL); } if (!_mask||!decodeNumericIP(_mask,mask)) 
{ // _mask NULL means match exact memset(mask,0xff,sizeof(mask)); return (_mask==NULL); } if (isIp4(net)!=isIp4(mask)) return false; for (unsigned j=0;j<4;j++) if (net[j]&~mask[j]) return false; return true; } bool IpSubNet::test(const IpAddress &ip) { unsigned i; if (ip.getNetAddress(sizeof(i),&i)==sizeof(i)) { if (!isIp4(net)) return false; return (i&mask[3])==(net[3]&mask[3]); } unsigned na[4]; if (ip.getNetAddress(sizeof(na),&na)==sizeof(na)) { for (unsigned j=0;j<4;j++) if ((na[j]&mask[j])!=(net[j]&mask[j])) return false; return true; } return false; } StringBuffer IpSubNet::getNetText(StringBuffer &text) { char tmp[INET6_ADDRSTRLEN]; return text.append(_inet_ntop(isIp4(net)?AF_INET:AF_INET6, &net, tmp, sizeof(tmp))); } StringBuffer IpSubNet::getMaskText(StringBuffer &text) { char tmp[INET6_ADDRSTRLEN]; return text.append(_inet_ntop(isIp4(net)?AF_INET:AF_INET6, &mask, tmp, sizeof(tmp))); // isIp4(net) is correct here } bool IpSubNet::isNull() { for (unsigned i=0;i<4;i++) if (net[i]||mask[i]) return false; return true; } IpSubNet &queryPreferredSubnet() { return PreferredSubnet; } bool setPreferredSubnet(const char *ip,const char *mask) { // also resets cached host IP if (PreferredSubnet.set(ip,mask)) { if (!cachehostip.isNull()) { cachehostip.ipset(NULL); queryHostIP(); } return true; } else return false; } StringBuffer lookupHostName(const IpAddress &ip,StringBuffer &ret) { // not a common routine (no Jlib function!) 
only support IPv4 initially unsigned ipa; if (ip.getNetAddress(sizeof(ipa),&ipa)==sizeof(ipa)) { struct hostent *phostent = gethostbyaddr( (char *) &ipa, sizeof(ipa), PF_INET); if (phostent) ret.append(phostent->h_name); else ip.getIpText(ret); } else ip.getIpText(ret); return ret; } struct SocketEndpointHTElem { IInterface *ii; SocketEndpoint ep; SocketEndpointHTElem(const SocketEndpoint _ep,IInterface *_ii) { ep.set(_ep); ii = _ii; } ~SocketEndpointHTElem() { ::Release(ii); } }; class jlib_decl CSocketEndpointHashTable : public SuperHashTableOf, implements ISocketEndpointHashTable { virtual void onAdd(void *) {} virtual void onRemove(void *e) { delete (SocketEndpointHTElem *)e; } unsigned getHashFromElement(const void *e) const { return ((const SocketEndpointHTElem *)e)->ep.hash(0); } unsigned getHashFromFindParam(const void *fp) const { return ((const SocketEndpoint *)fp)->hash(0); } const void * getFindParam(const void *p) const { return &((const SocketEndpointHTElem *)p)->ep; } bool matchesFindParam(const void * et, const void *fp, unsigned) const { return ((const SocketEndpointHTElem *)et)->ep.equals(*(SocketEndpoint *)fp); } IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(SocketEndpointHTElem,SocketEndpoint); public: IMPLEMENT_IINTERFACE; CSocketEndpointHashTable() {} ~CSocketEndpointHashTable() { kill(); } void add(const SocketEndpoint &ep, IInterface *i) { SocketEndpointHTElem *e = SuperHashTableOf::find(&ep); if (e) { ::Release(e->ii); e->ii = i; } else { e = new SocketEndpointHTElem(ep,i); SuperHashTableOf::add(*e); } } void remove(const SocketEndpoint &ep) { SuperHashTableOf::remove(&ep); } IInterface *find(const SocketEndpoint &ep) { SocketEndpointHTElem *e = SuperHashTableOf::find(&ep); if (e) return e->ii; return NULL; } }; ISocketEndpointHashTable *createSocketEndpointHashTable() { CSocketEndpointHashTable *ht = new CSocketEndpointHashTable; return ht; } class CSocketConnectWait: public CInterface, implements ISocketConnectWait { Owned sock; bool done; CTimeMon 
connecttm; unsigned startt; bool oneshot; bool isopen; int initerr; void successfulConnect() { STATS.connects++; STATS.connecttime+=usTick()-startt; #ifdef _TRACE char peer[256]; peer[0] = 'C'; peer[1] = '!'; strcpy(peer+2,sock->hostname?sock->hostname:"(NULL)"); free(sock->tracename); sock->tracename = strdup(peer); #endif } void failedConnect() { STATS.failedconnects++; STATS.failedconnecttime+=usTick()-startt; const char* tracename = sock->tracename; THROWJSOCKEXCEPTION(JSOCKERR_connection_failed); } public: IMPLEMENT_IINTERFACE; CSocketConnectWait(SocketEndpoint &ep,unsigned connecttimeoutms) : connecttm(connecttimeoutms) { oneshot = (connecttimeoutms==0); // i.e. as long as one connect takes done = false; startt = usTick(); sock.setown(new CSocket(ep,sm_tcp,NULL)); isopen = true; initerr = sock->pre_connect(false); } ISocket *wait(unsigned timems) { // this is a bit spagetti due to dual timeouts etc CTimeMon waittm(timems); unsigned refuseddelay = 1; bool waittimedout = false; bool connectimedout = false; do { bool connectdone = false; unsigned remaining; connectimedout = connecttm.timedout(&remaining); unsigned waitremaining; waittimedout = waittm.timedout(&waitremaining); if (oneshot||(waitremainingpre_connect(false); initerr = 0; if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) err = 0; // continue else { if (err==0) connectdone = true; // done immediately else if(!oneshot) // probably ECONNREFUSED but treat all errors same refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound } } if (!connectdone&&(err==0)) { SOCKET s = sock->sock; T_FD_SET fds; struct timeval tv; XFD_ZERO(&fds); FD_SET((unsigned)s, &fds); T_FD_SET except; XFD_ZERO(&except); FD_SET((unsigned)s, &except); tv.tv_sec = remaining / 1000; tv.tv_usec = (remaining % 1000)*1000; CHECKSOCKRANGE(s); int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv ); if (rc==0) break; // timeout done = true; err = 0; if (rc>0) { // select 
succeeded - return error from socket (0 if connected) socklen_t errlen = sizeof(err); rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error if ((rc!=0)&&!err) err = ERRNO(); // some implementations of getsockopt duff if (err&&!oneshot) // probably ECONNREFUSED but treat all errors same refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound } else { // select failed err = ERRNO(); LOGERR(err,2,"CSocketConnectWait ::select"); } } if (err==0) { err = sock->post_connect(); if (err==0) { successfulConnect(); return sock.getClear(); } } sock->errclose(); isopen = false; } while (!waittimedout&&!oneshot); if (connectimedout) { STATS.failedconnects++; STATS.failedconnecttime+=usTick()-startt; const char* tracename = sock->tracename; THROWJSOCKEXCEPTION(JSOCKERR_connection_failed); } return NULL; } }; ISocketConnectWait *nonBlockingConnect(SocketEndpoint &ep,unsigned connecttimeoutms) { return new CSocketConnectWait(ep,connecttimeoutms); }