udpsha.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef updsha_include
  14. #define updsha_include
  15. #include "jmutex.hpp"
  16. #include "roxiemem.hpp"
  17. #include "jcrc.hpp"
  18. #include <limits>
  19. #include <queue>
  20. #include <map>
  21. typedef unsigned sequence_t;
  22. #define SEQF
  23. extern roxiemem::IDataBufferManager *bufferManager;
  24. typedef bool (*PKT_CMP_FUN) (const void *pkData, const void *key);
  25. // Flag bits in pktSeq field
  26. #define UDP_PACKET_COMPLETE 0x80000000 // Packet completes a single agent request
  27. #define UDP_PACKET_RESENT 0x40000000 // Packet is a repeat of one that the server may have missed
  28. #define UDP_PACKET_SEQUENCE_MASK 0x3fffffff
  29. struct UdpPacketHeader
  30. {
  31. unsigned short length; // total length of packet including the header, data, and meta
  32. unsigned short metalength; // length of metadata (comes after header and data)
  33. ServerIdentifier node; // Node this message came from
  34. unsigned msgSeq; // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
  35. unsigned pktSeq; // sequence number of this packet within the message (top bit signifies final packet)
  36. sequence_t sendSeq; // sequence number of this packet among all those send from this node to this target
  37. // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
  38. ruid_t ruid; // The uid allocated by the server to this agent transaction
  39. unsigned msgId; // sub-id allocated by the server to this request within the transaction
  40. };
  // Number of sequence numbers a PacketTracker can represent at once. Power of two recommended.
  constexpr unsigned TRACKER_BITS = 1024;
  // Number of 64-bit words needed to hold TRACKER_BITS bits (rounded up).
  constexpr unsigned TRACKER_DWORDS = (TRACKER_BITS + 63) / 64;
  43. // Some more things we can consider:
  44. // 1. sendSeq gives us some insight into lost packets that might help us get the inflight calculation right (if it is still needed)
  45. // 2. If we can definitively declare that a packet is lost, we can fail that messageCollator earlier (and thus get the resend going earlier)
  46. // 3. Worth seeing why resend doesn't use same collator. We could skip sending (though would still need to calculate) the bit we already had...
  47. class PacketTracker
  48. {
  49. // This uses a circular buffer indexed by seq to store information about which packets we have seen
  50. private:
  51. sequence_t base = 0; // Sequence number of first packet represented in the array
  52. sequence_t hwm = (sequence_t) -1; // Sequence number of highest sequence number ever seen
  53. unsigned __int64 seen[TRACKER_DWORDS] = {0}; // bitmask representing whether we have seen (base+n)
  54. void dump() const;
  55. public:
  56. // Note that we have seen this packet, and return indicating whether we had already seen it
  57. bool noteSeen(UdpPacketHeader &hdr);
  58. const PacketTracker copy() const;
  59. inline sequence_t lastSeen() const { return hwm; }
  60. bool hasSeen(sequence_t seq) const;
  61. bool canRecord(sequence_t seq) const;
  62. bool hasGaps() const;
  63. };
  64. using roxiemem::DataBuffer;
  65. // queue_t is used to hold a fifo queue of DataBuffer elements to be sent or collated.
  66. // Originally implemented as a circular buffer, but we don't want to block adding even if full (we do however want to avoid requesting more if full)
  67. // so now reimplemented as a single-linked list. There is a field in the DataBuffers that can be used for chaining them together that is used
  68. // in a few other places, e.g. collator
  69. class queue_t
  70. {
  71. DataBuffer *head = nullptr; // We add at tail and remove from head
  72. DataBuffer *tail = nullptr;
  73. unsigned count = 0;
  74. unsigned limit = 0;
  75. CriticalSection c_region;
  76. InterruptableSemaphore data_avail;
  77. Semaphore free_sl; // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
  78. unsigned signal_free_sl = 0; // Number of people waiting in free_sl. Only updated within critical section
  79. public:
  80. void interrupt();
  81. void pushOwn(DataBuffer *buffer);
  82. DataBuffer *pop(bool block);
  83. bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
  84. unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
  85. unsigned available(); // non-blocking
  86. int free_slots(); // block if no free slots
  87. void set_queue_size(unsigned limit); //must be called immediately after constructor if default constructor is used
  88. queue_t(unsigned int queue_size);
  89. queue_t() {};
  90. ~queue_t();
  91. inline int capacity() const { return limit; }
  92. };
  93. template < class _et >
  94. class simple_queue
  95. {
  96. _et *elements;
  97. unsigned int element_count;
  98. int first;
  99. int last;
  100. int active_buffers;
  101. CriticalSection c_region;
  102. Semaphore data_avail;
  103. Semaphore free_space;
  104. public:
  105. void push(const _et &element)
  106. {
  107. free_space.wait();
  108. c_region.enter();
  109. int next = (last + 1) % element_count;
  110. elements[last] = element;
  111. last = next;
  112. active_buffers++;
  113. c_region.leave();
  114. data_avail.signal();
  115. }
  116. bool push(const _et &element,long timeout)
  117. {
  118. if (free_space.wait(timeout) ) {
  119. c_region.enter();
  120. int next = (last + 1) % element_count;
  121. elements[last] = element;
  122. last = next;
  123. active_buffers++;
  124. c_region.leave();
  125. data_avail.signal();
  126. return true;
  127. }
  128. return false;
  129. }
  130. void pop (_et &element)
  131. {
  132. data_avail.wait();
  133. c_region.enter();
  134. element = elements[first];
  135. first = (first + 1) % element_count;
  136. active_buffers--;
  137. c_region.leave();
  138. free_space.signal();
  139. }
  140. unsigned in_queue() {
  141. c_region.enter();
  142. unsigned res = active_buffers;
  143. c_region.leave();
  144. return res;
  145. }
  146. bool empty()
  147. {
  148. c_region.enter();
  149. bool res = (active_buffers == 0);
  150. c_region.leave();
  151. return res;
  152. }
  153. simple_queue(unsigned int queue_size)
  154. {
  155. element_count = queue_size;
  156. elements = new _et[element_count];
  157. free_space.signal(element_count);
  158. active_buffers = 0;
  159. first = 0;
  160. last = 0;
  161. }
  162. ~simple_queue() {
  163. delete [] elements;
  164. }
  165. };
  // Defined on every platform except Windows.
  // NOTE(review): presumably tells the toolchain/other headers that #pragma pack(push/pop) is in use - confirm.
  #if !defined(_WIN32)
  #define HANDLE_PRAGMA_PACK_PUSH_POP
  #endif
  // Namespace-like holder for the flow-control command codes and their printable names.
  class flowType
  {
  public:
      // Command codes, stored as unsigned short. max_flow_cmd is a count/sentinel, not a real command.
      enum flowCmd : unsigned short
      {
          ok_to_send,
          request_received,
          request_to_send,
          send_completed,
          request_to_send_more,
          max_flow_cmd
      };

      // Returns the enumerator's name as a string literal.
      // Any value that is not a real command (including max_flow_cmd) asserts and yields "??".
      static const char *name(flowCmd m)
      {
          // Indexed by enumerator value, so the order must match flowCmd.
          static const char *const names[] =
          {
              "ok_to_send",
              "request_received",
              "request_to_send",
              "send_completed",
              "request_to_send_more"
          };
          static_assert(sizeof(names) / sizeof(names[0]) == max_flow_cmd, "names must cover every flowCmd");
          if (m < max_flow_cmd)
              return names[m];
          assert(false);
          return "??";
      }
  };
  // Namespace-like holder for the command codes carried in a sniff_msg.
  class sniffType
  {
  public:
      // Stored as unsigned short: busy == 0, idle == 1.
      enum sniffCmd : unsigned short
      {
          busy,
          idle
      };
  };
  191. #pragma pack(push,1)
  192. struct UdpPermitToSendMsg
  193. {
  194. flowType::flowCmd cmd;
  195. unsigned short max_data;
  196. sequence_t flowSeq;
  197. ServerIdentifier destNode;
  198. PacketTracker seen;
  199. };
  200. struct UdpRequestToSendMsg
  201. {
  202. flowType::flowCmd cmd;
  203. unsigned short packets;
  204. sequence_t sendSeq;
  205. sequence_t flowSeq;
  206. ServerIdentifier sourceNode;
  207. };
  208. struct sniff_msg
  209. {
  210. sniffType::sniffCmd cmd;
  211. ServerIdentifier nodeIp;
  212. };
  213. #pragma pack(pop)
  // Socket buffer size helpers; implemented elsewhere.
  // NOTE(review): the names suggest the "check_" pair only validates a requested size while the
  // "check_set_" pair also applies it - confirm against the definitions.
  int check_max_socket_read_buffer(int size);
  int check_max_socket_write_buffer(int size);
  int check_set_max_socket_read_buffer(int size);
  int check_set_max_socket_write_buffer(int size);

  // Trace category bits
  #define TRACE_RETRY_DATA 0x08
  #define TRACE_MSGPACK    0x10
  220. inline bool checkTraceLevel(unsigned category, unsigned level)
  221. {
  222. return (udpTraceLevel >= level);
  223. }
  224. #define SOCKET_SIMULATION
  225. #ifdef SOCKET_SIMULATION
  226. #ifdef _DEBUG
  227. #define TEST_DROPPED_PACKETS
  228. #endif
  229. #ifdef TEST_DROPPED_PACKETS
  230. extern UDPLIB_API bool udpDropDataPackets;
  231. extern UDPLIB_API unsigned udpDropFlowPackets[flowType::max_flow_cmd];
  232. extern unsigned flowPacketsSent[flowType::max_flow_cmd];
  233. #endif
  234. extern UDPLIB_API bool isUdpTestMode;
  235. extern UDPLIB_API bool udpTestUseUdpSockets;
  236. class CSocketSimulator : public CInterfaceOf<ISocket>
  237. {
  238. private:
  239. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  240. unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
  241. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  242. unsigned timeout) override { UNIMPLEMENTED; }
  243. virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; }
  244. virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
  245. virtual size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
  246. virtual size32_t get_max_send_size() override { UNIMPLEMENTED; }
  247. virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) override { UNIMPLEMENTED; }
  248. virtual int logPollError(unsigned revents, const char *rwstr) override { UNIMPLEMENTED; }
  249. virtual int wait_read(unsigned timeout) override { UNIMPLEMENTED; }
  250. virtual int wait_write(unsigned timeout) override { UNIMPLEMENTED; }
  251. virtual bool set_nonblock(bool on) override { UNIMPLEMENTED; }
  252. virtual bool set_nagle(bool on) override { UNIMPLEMENTED; }
  253. virtual void set_linger(int lingersecs) override { UNIMPLEMENTED; }
  254. virtual void cancel_accept() override { UNIMPLEMENTED; }
  255. virtual void shutdown(unsigned mode) override { UNIMPLEMENTED; }
  256. virtual void shutdownNoThrow(unsigned mode) override { UNIMPLEMENTED; }
  257. virtual int name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  258. virtual int peer_name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  259. virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  260. virtual IpAddress &getPeerAddress(IpAddress &addr) override { UNIMPLEMENTED; }
  261. virtual SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override { UNIMPLEMENTED; }
  262. virtual bool connectionless() override { UNIMPLEMENTED; }
  263. virtual void set_return_addr(int port,const char *name) override { UNIMPLEMENTED; }
  264. virtual void set_block_mode ( // must be called before block operations
  265. unsigned flags, // BF_* flags (must match receive_block)
  266. size32_t recsize=0, // record size (required for rec compression)
  267. unsigned timeoutms=0 // timeout in milisecs (0 for no timeout)
  268. ) override { UNIMPLEMENTED; }
  269. virtual bool send_block(
  270. const void *blk, // data to send
  271. size32_t sz // size to send (0 for eof)
  272. ) override { UNIMPLEMENTED; }
  273. virtual size32_t receive_block_size () override { UNIMPLEMENTED; }
  274. virtual size32_t receive_block(
  275. void *blk, // receive pointer
  276. size32_t sz // max size to read (0 for sync eof)
  277. // if less than block size truncates block
  278. ) override { UNIMPLEMENTED; }
  279. virtual void close() override { UNIMPLEMENTED; }
  280. virtual unsigned OShandle() const override { UNIMPLEMENTED; }
  281. virtual size32_t avail_read() override { UNIMPLEMENTED; }
  282. virtual size32_t write_multiple(unsigned num,void const**buf, size32_t *size) override { UNIMPLEMENTED; }
  283. virtual size32_t get_send_buffer_size() override { UNIMPLEMENTED; }
  284. virtual void set_send_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  285. virtual bool join_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  286. virtual bool leave_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  287. virtual void set_ttl(unsigned _ttl) override { UNIMPLEMENTED; }
  288. virtual size32_t get_receive_buffer_size() override { UNIMPLEMENTED; }
  289. virtual void set_receive_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  290. virtual void set_keep_alive(bool set) override { UNIMPLEMENTED; }
  291. virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size) override { UNIMPLEMENTED; }
  292. virtual bool check_connection() override { UNIMPLEMENTED; }
  293. virtual bool isSecure() const override { UNIMPLEMENTED; }
  294. virtual bool isValid() const override { UNIMPLEMENTED; }
  295. };
  296. class CSimulatedQueueReadSocket : public CSocketSimulator
  297. {
  298. friend class CSimulatedQueueWriteSocket;
  299. CSimulatedQueueReadSocket(const SocketEndpoint &_me);
  300. ~CSimulatedQueueReadSocket();
  301. std::queue<unsigned> packetSizes;
  302. std::queue<const void *> packets;
  303. unsigned max = 131072;
  304. unsigned used = 0;
  305. SocketEndpoint me;
  306. CriticalSection crit;
  307. Semaphore avail;
  308. void writeSimulatedPacket(void const* buf, size32_t size);
  309. static std::map<SocketEndpoint, CSimulatedQueueReadSocket *> allReaders;
  310. static CriticalSection allReadersCrit;
  311. public:
  312. static CSimulatedQueueReadSocket* udp_create(const SocketEndpoint &_me);
  313. static CSimulatedQueueReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
  314. virtual size32_t get_receive_buffer_size() override { return max; }
  315. virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
  316. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  317. unsigned timeoutsecs = WAIT_FOREVER) override;
  318. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  319. unsigned timeout) override;
  320. virtual int wait_read(unsigned timeout) override;
  321. virtual void close() override {}
  322. virtual void shutdown(unsigned mode) override { }
  323. virtual void shutdownNoThrow(unsigned mode) override{ }
  324. };
  325. class CSimulatedQueueWriteSocket : public CSocketSimulator
  326. {
  327. CSimulatedQueueWriteSocket( const SocketEndpoint &ep) : destEp(ep) {}
  328. const SocketEndpoint destEp;
  329. public:
  330. static CSimulatedQueueWriteSocket* udp_connect( const SocketEndpoint &ep);
  331. virtual size32_t write(void const* buf, size32_t size) override;
  332. virtual void set_send_buffer_size(size32_t sz) override {};
  333. virtual void close() override {};
  334. };
  335. class CSimulatedUdpSocket : public CSocketSimulator
  336. {
  337. virtual void shutdown(unsigned mode) override { realSocket->shutdown(mode); }
  338. virtual void shutdownNoThrow(unsigned mode) override{ realSocket->shutdownNoThrow(mode); }
  339. protected:
  340. Owned<ISocket> realSocket;
  341. };
  342. class CSimulatedUdpReadSocket : public CSimulatedUdpSocket
  343. {
  344. CSimulatedUdpReadSocket(const SocketEndpoint &_me);
  345. ~CSimulatedUdpReadSocket();
  346. public:
  347. unsigned port;
  348. static CSimulatedUdpReadSocket* udp_create(const SocketEndpoint &_me);
  349. virtual size32_t get_receive_buffer_size() override;
  350. virtual void set_receive_buffer_size(size32_t sz) override;
  351. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
  352. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
  353. virtual int wait_read(unsigned timeout) override;
  354. virtual void close() override;
  355. };
  356. class CSimulatedUdpWriteSocket : public CSimulatedUdpSocket
  357. {
  358. CSimulatedUdpWriteSocket( const SocketEndpoint &ep);
  359. public:
  360. static CSimulatedUdpWriteSocket* udp_connect( const SocketEndpoint &ep);
  361. virtual size32_t write(void const* buf, size32_t size) override;
  362. virtual void set_send_buffer_size(size32_t sz) override;
  363. virtual void close() override;
  364. };
  365. #endif
  366. #endif