/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ // Socket abstraction #ifndef __JSOCKIO_H__ #define __JSOCKIO_H__ #ifndef _VER_C5 #include #endif #include "jexpdef.hpp" #include "jexcept.hpp" #include "jthread.hpp" #define DEFAULT_LISTEN_QUEUE_SIZE 200 // maximum for windows 2000 server #define DEFAULT_LINGER_TIME 1000 // seconds #ifndef WAIT_FOREVER #define WAIT_FOREVER ((unsigned)-1) #endif enum JSOCKET_ERROR_CODES { JSOCKERR_ok = 0, JSOCKERR_not_opened = -1, // accept,name,peer_name,read,write JSOCKERR_bad_address = -2, // connect JSOCKERR_connection_failed = -3, // connect JSOCKERR_broken_pipe = -4, // read,write JSOCKERR_invalid_access_mode = -5, // accept JSOCKERR_timeout_expired = -6, // read JSOCKERR_port_in_use = -7, // create JSOCKERR_cancel_accept = -8, // accept JSOCKERR_connectionless_socket = -9, // accept, cancel_accept JSOCKERR_graceful_close = -10, // read,send JSOCKERR_handle_too_large = -11, // select, connect etc (linux only) JSOCKERR_bad_netaddr = -12, // get/set net address JSOCKERR_ipv6_not_implemented = -13 // various }; // Block operation flags #define BF_ASYNC_TRANSFER 0 // send_block sends immediately (default) #define BF_SYNC_TRANSFER_PULL 1 // send_block waits until receiver ready (i.e. receives first) #define BF_LZW_COMPRESS 2 // compress using LZW compression #define BF_REC_COMPRESS 4 // compress using record difference compression #define BF_RELIABLE_TRANSFER 8 // retries on socket failure #define BF_SYNC_TRANSFER_PUSH 16 // send_block pushes that has data (i.e. sends first) // shutdown options #define SHUTDOWN_READ 0 #define SHUTDOWN_WRITE 1 #define SHUTDOWN_READWRITE 2 // // Abstract socket interface // class jlib_decl IpAddress { unsigned netaddr[4]; public: IpAddress() { ipset(NULL); } IpAddress(const IpAddress& other) { ipset(other); } IpAddress(const char *text) { ipset(text); } bool ipset(const char *text); // sets to NULL if fails or text=NULL void ipset(const IpAddress& other) { memcpy(&netaddr,&other.netaddr,sizeof(netaddr)); } bool ipequals(const IpAddress & other) const; int ipcompare(const IpAddress & other) const; // depreciated unsigned iphash(unsigned prev=0) const; bool isNull() const; // is null bool isHost() const; // is primary host NIC ip bool isLoopBack() const; // is loopback (localhost: 127.0.0.1 or ::1) bool isLocal() const; // matches local interface bool isLinkLocal() const; bool isSiteLocal() const; // depreciated bool isIp4() const; StringBuffer &getIpText(StringBuffer & out) const; void ipserialize(MemoryBuffer & out) const; void ipdeserialize(MemoryBuffer & in); unsigned ipdistance(const IpAddress &ip,unsigned offset=0) const; // network order distance (offset: 0-3 word (leat sig.), 0=Ipv4) bool ipincrement(unsigned count,byte minoctet=0,byte maxoctet=255,unsigned short minipv6piece=0,unsigned maxipv6piece=0xffff); unsigned ipsetrange( const char *text); // e.g. 10.173.72.1-65 ('-' may be omitted) // returns number in range (use ipincrement to iterate through) size32_t getNetAddress(size32_t maxsz,void *dst) const; // for internal use - returns 0 if address doesn't fit void setNetAddress(size32_t sz,const void *src); // for internal use inline void operator = ( const IpAddress &other ) { ipset(other); } }; inline IpAddress &Array__Member2Param(IpAddress &src) { return src; } inline void Array__Assign(IpAddress & dest, IpAddress &src) { dest=src; } inline bool Array__Equal(IpAddress &m, IpAddress &p) { return m.ipequals(p); } inline void Array__Destroy(IpAddress &p) { } class jlib_decl IpAddressArray : public ArrayOf { public: StringBuffer &getText(StringBuffer &text); void fromText(const char *s,unsigned defport); }; extern jlib_decl IpAddress & queryHostIP(); extern jlib_decl IpAddress & queryLocalIP(); extern jlib_decl const char * GetCachedHostName(); inline StringBuffer & GetHostName(StringBuffer &str) { return str.append(GetCachedHostName()); } extern jlib_decl IpAddress &GetHostIp(IpAddress &ip); extern jlib_decl IpAddress &localHostToNIC(IpAddress &ip); class jlib_decl SocketEndpoint : extends IpAddress { public: SocketEndpoint() { set(NULL,0); }; SocketEndpoint(const char *name,unsigned short _port=0) { set(name,_port); }; SocketEndpoint(unsigned short _port) { setLocalHost(_port); }; SocketEndpoint(unsigned short _port, const IpAddress & _ip) { set(_port,_ip); }; SocketEndpoint(const SocketEndpoint &other) { set(other); } void deserialize(MemoryBuffer & in); void serialize(MemoryBuffer & out) const; bool set(const char *name,unsigned short _port=0); inline void set(const SocketEndpoint & value) { ipset(value); port = value.port; } inline void setLocalHost(unsigned short _port) { port = _port; GetHostIp(*this); } // NB *not* localhost(127.0.0.1) inline void set(unsigned short _port, const IpAddress & _ip) { ipset(_ip); port = _port; }; inline bool equals(const SocketEndpoint &ep) const { return ((port==ep.port)&&ipequals(ep)); } void getUrlStr(char * str, size32_t len) const; // in form ip4:port or [ip6]:port StringBuffer &getUrlStr(StringBuffer &str) const; // in form ip4:port or [ip6]:port inline void operator = ( const SocketEndpoint &other ) { ipset(other); port = other.port; } unsigned hash(unsigned prev) const; unsigned short port; }; inline SocketEndpoint &Array__Member2Param(SocketEndpoint &src) { return src; } inline void Array__Assign(SocketEndpoint & dest, SocketEndpoint &src) { dest=src; } inline bool Array__Equal(SocketEndpoint &m, SocketEndpoint &p) { return m.equals(p); } inline void Array__Destroy(SocketEndpoint &p) { } class jlib_decl SocketEndpointArray : public ArrayOf { public: StringBuffer &getText(StringBuffer &text); void fromText(const char *s,unsigned defport); }; interface ISocketEndpointHashTable: implements IInterface { virtual void add(const SocketEndpoint &ep, IInterface *i)=0; // takes ownership virtual void remove(const SocketEndpoint &ep)=0; // releases virtual IInterface *find(const SocketEndpoint &ep)=0; // does not link }; extern jlib_decl ISocketEndpointHashTable *createSocketEndpointHashTable(); class jlib_decl IpSubNet { unsigned net[4]; unsigned mask[4]; public: IpSubNet() {set(NULL,NULL); } IpSubNet(const char *_net,const char *_mask) { set(_net,_mask); } bool set(const char *_net,const char *_mask); // _net NULL means match everything // _mask NULL means match exact bool test(const IpAddress &ip); StringBuffer getNetText(StringBuffer &text); StringBuffer getMaskText(StringBuffer &text); bool isNull(); }; class jlib_decl ISocket : extends IInterface { public: // // Create client socket connected to a TCP server socket static ISocket* connect( const SocketEndpoint &ep ); // general connect static ISocket* connect_timeout( const SocketEndpoint &ep , unsigned timeout); // connect where should must take longer than timeout (in ms) to connect static ISocket* connect_wait( const SocketEndpoint &ep, unsigned timems); // connect where should try connecting for *at least* time specified // (e.g. if don't know that server listening yet) // if 0 specified for time then does single (blocking) connect try // Create client socket connected to a UDP server socket // static ISocket* udp_connect( unsigned short port, char const* host); static ISocket* udp_connect( const SocketEndpoint &ep); // // Create server TCP socket // static ISocket* create( unsigned short port, int listen_queue_size = DEFAULT_LISTEN_QUEUE_SIZE); // // Create server TCP socket listening a specific IP // static ISocket* create_ip( unsigned short port, const char *host, int listen_queue_size = DEFAULT_LISTEN_QUEUE_SIZE); // // Create server UDP socket // static ISocket* udp_create( unsigned short port); // Create client socket connected to a multicast server socket // static ISocket* multicast_connect( unsigned short port, const char *mcgroupip, unsigned ttl); static ISocket* multicast_connect( const SocketEndpoint &ep, unsigned ttl); // // Create server multicast socket // static ISocket* multicast_create( unsigned short port, const char *mcgroupip); static ISocket* multicast_create( unsigned short port, const IpAddress &mcgroupip); // // Creates an ISocket for an already created socket // static ISocket* attach(int s,bool tcpip=true); virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) = 0; virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) = 0; virtual void read(void* buf, size32_t size) = 0; virtual size32_t write(void const* buf, size32_t size) = 0; // returns amount written normally same as in size (see set_nonblock) virtual size32_t get_max_send_size() = 0; // // This method is called by server to accept client connection // virtual ISocket* accept(bool allowcancel=false) = 0; // not needed for UDP // // This method is called to check whether a socket has data ready // virtual int wait_read(unsigned timeout) = 0; // // This method is called to check whether a socket is ready to write (i.e. some free buffer space) // virtual int wait_write(unsigned timeout) = 0; // // can be used with write to allow it to return if it would block // be sure and restore to old state before calling other functions on this socket // virtual bool set_nonblock(bool on) = 0; // returns old state // enable 'nagling' - small packet coalescing (implies delayed transmission) // virtual bool set_nagle(bool on) = 0; // returns old state // set 'linger' time - time close will linger so that outstanding unsent data will be transmitted // virtual void set_linger(int lingersecs) = 0; // // Cancel accept operation and close socket // virtual void cancel_accept() = 0; // not needed for UDP // // Shutdown socket: prohibit write and/or read operations on socket // virtual void shutdown(unsigned mode=SHUTDOWN_READWRITE) = 0; // not needed for UDP // Get name of accepted socket and returns port virtual int name(char *name,size32_t namemax)=0; // Get peer name of socket and returns port - in UDP returns return addr virtual int peer_name(char *name,size32_t namemax)=0; // Get peer endpoint of socket - in UDP returns return addr virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep)=0; // Get peer ip of socket - in UDP returns return addr virtual IpAddress &getPeerAddress(IpAddress &addr)=0; // // Close socket // virtual bool connectionless()=0; // true if accept need not be called (i.e. UDP) virtual void set_return_addr(int port,const char *name) = 0; // used for UDP servers only // Block functions virtual void set_block_mode ( // must be called before block operations unsigned flags, // BF_* flags (must match receive_block) size32_t recsize=0, // record size (required for rec compression) unsigned timeoutms=0 // timeout in milisecs (0 for no timeout) )=0; virtual bool send_block( const void *blk, // data to send size32_t sz // size to send (0 for eof) )=0; virtual size32_t receive_block_size ()=0; // get size of next block (always must call receive_block after) virtual size32_t receive_block( void *blk, // receive pointer size32_t sz // max size to read (0 for sync eof) // if less than block size truncates block )=0; virtual void close() = 0; virtual unsigned OShandle() = 0; // for internal use virtual size32_t avail_read() = 0; // called after wait_read to see how much data available virtual size32_t write_multiple(unsigned num,void const**buf, size32_t *size) = 0; // same as write except writes multiple blocks virtual size32_t get_send_buffer_size() =0; // get OS send buffer virtual void set_send_buffer_size(size32_t sz) =0; // set OS send buffer size virtual bool join_multicast_group(SocketEndpoint &ep) = 0; // for udp multicast virtual bool leave_multicast_group(SocketEndpoint &ep) = 0; // for udp multicast virtual size32_t get_receive_buffer_size() =0; // get OS receive buffer virtual void set_receive_buffer_size(size32_t sz) =0; // set OS receive buffer size virtual void set_keep_alive(bool set)=0; // set option SO_KEEPALIVE virtual size32_t udp_write_to(SocketEndpoint &ep,void const* buf, size32_t size)=0; virtual bool check_connection() = 0; /* Exceptions raised: (when set_raise_exceptions(TRUE)) create sys:(socket, bind, listen) udp_create sys:(socket, bind, listen) accept JSOCKERR_not_opened, sys:(accept,setsockopt), JSOCKERR_invalid_access_mode, JSOCKERR_cancel_accept, JSOCKERR_connectionless_socket name JSOCKERR_not_opened, sys:(getsockname) peer_name JSOCKERR_not_opened, sys:(getpeername) cancel_accept {connect}, sys:(gethostname), JSOCKERR_connectionless_socket connect JSOCKERR_bad_address, JSOCKERR_connection_failed, sys:(socket, connect, setsockopt) udp_connect JSOCKERR_bad_address, sys:(socket, connect, setsockopt) read (timeout) JSOCKERR_not_opened, JSOCKERR_broken_pipe, JSOCKERR_timeout_expired ,sys:(select, read), JSOCKERR_graceful_close read (no timeout) JSOCKERR_not_opened, JSOCKERR_broken_pipe, sys:(read), JSOCKERR_graceful_close write JSOCKERR_not_opened, JSOCKERR_broken_pipe, sys:(write), JSOCKERR_graceful_close close sys:(write) shutdown sys:(shutdown),JSOCKERR_broken_pipe */ }; interface jlib_thrown_decl IJSOCK_Exception: extends IException { }; extern jlib_decl IJSOCK_Exception *IPv6NotImplementedException(const char *filename,unsigned lineno); #define IPV6_NOT_IMPLEMENTED() throw IPv6NotImplementedException(__FILE__, __LINE__) //--------------------------------------------------------------------------- // These classes are useful for compressing a list of ip:ports to pass around. class jlib_decl SocketListCreator { public: SocketListCreator(); void addSocket(const SocketEndpoint &ep); void addSocket(const char * ip, unsigned port); const char * getText(); void addSockets(SocketEndpointArray &array); protected: StringBuffer fullText; StringAttr lastIp; unsigned lastPort; }; class jlib_decl SocketListParser // This class depreciated - new code should use SocketEndpointArray::fromText and getText { public: SocketListParser(const char * text); void first(unsigned defport=0); bool get(StringAttr & ip, unsigned & port, unsigned index, unsigned defport=0); // alternative to iterating.. bool next(StringAttr & ip, unsigned & port); unsigned getSockets(SocketEndpointArray &array,unsigned defport=0); protected: StringAttr fullText; StringAttr lastIp; const char * cursor; unsigned lastPort; }; struct JSocketStatistics { unsigned connects; // successful unsigned connecttime; // all times in microsecs unsigned failedconnects; unsigned failedconnecttime; unsigned reads; unsigned readtime; __int64 readsize; // all sizes in bytes unsigned writes; unsigned writetime; __int64 writesize; unsigned activesockets; unsigned numblockrecvs; unsigned numblocksends; __int64 blockrecvsize; __int64 blocksendsize; unsigned blockrecvtime; // not including initial handshake unsigned blocksendtime; unsigned longestblocksend; unsigned longestblocksize; }; extern jlib_decl void getSocketStatistics(JSocketStatistics &stats); extern jlib_decl void resetSocketStatistics(); extern jlib_decl StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &buf); // Select Thread #define SELECTMODE_READ 1 #define SELECTMODE_WRITE 2 #define SELECTMODE_EXCEPT 4 interface ISocketSelectNotify: extends IInterface { virtual bool notifySelected(ISocket *sock,unsigned selected)=0; // return false to continue to next selected, true to re-select }; interface ISocketSelectHandler: extends IInterface { public: virtual void start()=0; virtual void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)=0; virtual void remove(ISocket *sock)=0; virtual void stop(bool wait)=0; }; extern jlib_decl ISocketSelectHandler *createSocketSelectHandler(const char *trc=NULL); class MemoryBuffer; // sends/receives length as well as contents. extern jlib_decl void readBuffer(ISocket * socket, MemoryBuffer & buffer); extern jlib_decl void readBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms); extern jlib_decl void writeBuffer(ISocket * socket, MemoryBuffer & buffer); // ditto but catches any exceptions extern jlib_decl bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer); extern jlib_decl bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms); extern jlib_decl bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer); // utility interface for simple conversations // conversation is always between two ends, // at any given time one end must be receiving and other sending (though these may swap during the conversation) interface IConversation: extends IInterface { virtual bool accept(unsigned timeoutms)=0; // one side accepts virtual bool connect(unsigned timeoutms)=0; // other side connects virtual bool send(MemoryBuffer &mb)=0; // 0 length buffer can be sent virtual bool recv(MemoryBuffer &mb, unsigned timeoutms)=0; // up to protocol to terminate conversation (e.g. by zero length buffer) virtual void cancel()=0; // cancels above methods (from separate thread) virtual unsigned short setRandomPort(unsigned short base, unsigned num)=0; // sets a random unique port for accept use }; extern jlib_decl IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *ep=NULL); // the end that listens may omit ep // this function does not connect so raises no socket exceptions // interface for reading from multiple sockets using the BF_SYNC_TRANSFER_PUSH protocol interface ISocketBufferReader: extends IInterface { public: virtual void init(unsigned num,ISocket **sockets,size32_t buffermax=(unsigned)-1)=0; virtual unsigned get(MemoryBuffer &mb)=0; virtual void done(bool wait)=0; }; extern jlib_decl ISocketBufferReader *createSocketBufferReader(const char *trc=NULL); extern jlib_decl void markNodeCentral(SocketEndpoint &ep); // random delay for linux interface ISocketConnectNotify { public: virtual void connected(unsigned idx,const SocketEndpoint &ep,ISocket *socket)=0; // must link socket if kept virtual void failed(unsigned idx,const SocketEndpoint &ep,int err)=0; }; extern jlib_decl void multiConnect(const SocketEndpointArray &eps,ISocketConnectNotify &inotify,unsigned timeout); extern jlib_decl void multiConnect(const SocketEndpointArray &eps,PointerIArrayOf &retsockets,unsigned timeout); interface ISocketConnectWait: extends IInterface { public: virtual ISocket *wait(unsigned waittimems)=0; // return NULL if time expired, throws exception if connect failed // releasing ISocketConnectWait cancels the connect iff wait has never returned socket }; extern jlib_decl ISocketConnectWait *nonBlockingConnect(SocketEndpoint &ep,unsigned connectimeoutms=0); // buffered socket interface IBufferedSocket : implements IInterface { virtual int readline(char* buf, int maxlen, IMultiException *me) = 0; virtual int read(char* buf, int maxlen) = 0; virtual int readline(char* buf, int maxlen, bool keepcrlf, IMultiException *me) = 0; virtual void setReadTimeout(unsigned int timeout) = 0; }; #define BSOCKET_READ_TIMEOUT 600 #define BSOCKET_CLIENT_READ_TIMEOUT 7200 extern jlib_decl IBufferedSocket* createBufferedSocket(ISocket* socket); #define MAX_NET_ADDRESS_SIZE (16) extern jlib_decl IpSubNet &queryPreferredSubnet(); // preferred subnet when resolving multiple NICs extern jlib_decl bool setPreferredSubnet(const char *ip,const char *mask); // also resets cached host IP extern jlib_decl StringBuffer lookupHostName(const IpAddress &ip,StringBuffer &ret); extern jlib_decl bool lookupInterfaceIp(IpAddress &ip,const char *ifname,bool test); // if test true returns true if ip matches interface // if test false returns first ip for interface #endif