udpsha.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef updsha_include
  14. #define updsha_include
  15. #include "jmutex.hpp"
  16. #include "roxiemem.hpp"
  17. #include "jcrc.hpp"
  18. #include <limits>
  19. #include <queue>
  20. #include <map>
  21. typedef unsigned sequence_t;
  22. #define SEQF
  23. extern roxiemem::IDataBufferManager *bufferManager;
  24. typedef bool (*PKT_CMP_FUN) (const void *pkData, const void *key);
  25. // Flag bits in pktSeq field
  26. #define UDP_PACKET_COMPLETE 0x80000000 // Packet completes a single agent request
  27. #define UDP_PACKET_RESENT 0x40000000 // Packet is a repeat of one that the server may have missed
  28. #define UDP_PACKET_SEQUENCE_MASK 0x3fffffff
  29. struct UdpPacketHeader
  30. {
  31. unsigned short length; // total length of packet including the header, data, and meta
  32. unsigned short metalength; // length of metadata (comes after header and data)
  33. ServerIdentifier node; // Node this message came from
  34. unsigned msgSeq; // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
  35. unsigned pktSeq; // sequence number of this packet within the message (top bit signifies final packet)
  36. sequence_t sendSeq; // sequence number of this packet among all those send from this node to this target
  37. // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
  38. ruid_t ruid; // The uid allocated by the server to this agent transaction
  39. unsigned msgId; // sub-id allocated by the server to this request within the transaction
  40. };
  41. constexpr unsigned TRACKER_BITS=1024; // Power of two recommended
  42. constexpr unsigned TRACKER_DWORDS=(TRACKER_BITS+63)/64;
  43. // Some more things we can consider:
  44. // 1. sendSeq gives us some insight into lost packets that might help is get inflight calculation right (if it is still needed)
  45. // 2. If we can definitively declare that a packet is lost, we can fail that messageCollator earlier (and thus get the resend going earlier)
  46. // 3. Worth seeing why resend doesn't use same collator. We could skip sending (though would still need to calculate) the bit we already had...
  47. class PacketTracker
  48. {
  49. // This uses a circular buffer indexed by seq to store information about which packets we have seen
  50. private:
  51. sequence_t base = 0; // Sequence number of first packet represented in the array
  52. sequence_t hwm = (sequence_t) -1; // Sequence number of highest sequence number ever seen
  53. unsigned __int64 seen[TRACKER_DWORDS] = {0}; // bitmask representing whether we have seen (base+n)
  54. public:
  55. void dump() const;
  56. // Note that we have seen this packet, and return indicating whether we had already seen it
  57. bool noteSeen(UdpPacketHeader &hdr);
  58. const PacketTracker copy() const;
  59. inline sequence_t lastSeen() const { return hwm; }
  60. bool hasSeen(sequence_t seq) const;
  61. bool canRecord(sequence_t seq) const;
  62. bool hasGaps() const;
  63. };
  64. using roxiemem::DataBuffer;
  65. // queue_t is used to hold a fifo queue of DataBuffer elements to be sent or collated.
  66. // Originally implemented as a circular buffer, but we don't want to block adding even if full (we do however want to avoid requesting more if full)
  67. // so now reimplemented as a single-linked list. There is a field in the DataBuffers that can be used for chaining them together that is used
  68. // in a few other places, e.g. collator
  69. class queue_t
  70. {
  71. DataBuffer *head = nullptr; // We add at tail and remove from head
  72. DataBuffer *tail = nullptr;
  73. RelaxedAtomic<unsigned> count{0}; // always updated inside a critical section, only atomic to guarantee it can be read outside the crit sec.
  74. unsigned limit = 0;
  75. CriticalSection c_region;
  76. InterruptableSemaphore data_avail;
  77. Semaphore free_sl; // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
  78. unsigned signal_free_sl = 0; // Number of people waiting in free_sl. Only updated within critical section
  79. public:
  80. void interrupt();
  81. void pushOwn(DataBuffer *buffer); // Non blocking enqueue (used by receiver)
  82. void pushOwnWait(DataBuffer *buffer); // Blocking enqueue (used by the sender)
  83. DataBuffer *pop(bool block);
  84. bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
  85. unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
  86. unsigned available() const // non-blocking, no critical section
  87. {
  88. unsigned num = count.load();
  89. return likely(num < limit) ? limit - num : 0;
  90. }
  91. void set_queue_size(unsigned limit); //must be called immediately after constructor if default constructor is used
  92. queue_t(unsigned int queue_size);
  93. queue_t() {};
  94. ~queue_t();
  95. inline int capacity() const { return limit; }
  96. protected:
  97. void doEnqueue(DataBuffer *buf); // internal function to add the item to the queue, but not signal
  98. };
  99. template < class _et >
  100. class simple_queue
  101. {
  102. _et *elements;
  103. unsigned int element_count;
  104. int first;
  105. int last;
  106. int active_buffers;
  107. CriticalSection c_region;
  108. Semaphore data_avail;
  109. Semaphore free_space;
  110. public:
  111. void push(const _et &element)
  112. {
  113. free_space.wait();
  114. c_region.enter();
  115. int next = (last + 1) % element_count;
  116. elements[last] = element;
  117. last = next;
  118. active_buffers++;
  119. c_region.leave();
  120. data_avail.signal();
  121. }
  122. bool push(const _et &element,long timeout)
  123. {
  124. if (free_space.wait(timeout) ) {
  125. c_region.enter();
  126. int next = (last + 1) % element_count;
  127. elements[last] = element;
  128. last = next;
  129. active_buffers++;
  130. c_region.leave();
  131. data_avail.signal();
  132. return true;
  133. }
  134. return false;
  135. }
  136. void pop (_et &element)
  137. {
  138. data_avail.wait();
  139. c_region.enter();
  140. element = elements[first];
  141. first = (first + 1) % element_count;
  142. active_buffers--;
  143. c_region.leave();
  144. free_space.signal();
  145. }
  146. unsigned in_queue() {
  147. c_region.enter();
  148. unsigned res = active_buffers;
  149. c_region.leave();
  150. return res;
  151. }
  152. bool empty()
  153. {
  154. c_region.enter();
  155. bool res = (active_buffers == 0);
  156. c_region.leave();
  157. return res;
  158. }
  159. simple_queue(unsigned int queue_size)
  160. {
  161. element_count = queue_size;
  162. elements = new _et[element_count];
  163. free_space.signal(element_count);
  164. active_buffers = 0;
  165. first = 0;
  166. last = 0;
  167. }
  168. ~simple_queue() {
  169. delete [] elements;
  170. }
  171. };
  172. #ifndef _WIN32
  173. #define HANDLE_PRAGMA_PACK_PUSH_POP
  174. #endif
  175. class flowType {
  176. public:
  177. enum flowCmd : unsigned short
  178. {
  179. //Messages sent from receiver to sender
  180. ok_to_send, // permit has been granted.
  181. request_received, // acknowledge request to send from the sender
  182. //messages sent from the sender to the receiver
  183. request_to_send, // request permit to send some data.
  184. send_start, // about to send data - indicate how many packets are actually going to be sent
  185. send_completed, // all data sent (and no data to potentially be resent).
  186. request_to_send_more, // equivalent to send_completed followed by a request_to_send (used if no async permits)
  187. // A marker for the number of flow commands:
  188. max_flow_cmd
  189. };
  190. static const char *name(flowCmd m)
  191. {
  192. switch (m)
  193. {
  194. case ok_to_send: return "ok_to_send";
  195. case request_received: return "request_received";
  196. case request_to_send: return "request_to_send";
  197. case send_start: return "send_start";
  198. case send_completed: return "send_completed";
  199. case request_to_send_more: return "request_to_send_more";
  200. default:
  201. assert(false);
  202. return "??";
  203. }
  204. };
  205. };
  206. #pragma pack(push,1)
  207. struct UdpPermitToSendMsg
  208. {
  209. flowType::flowCmd cmd;
  210. unsigned short max_data;
  211. sequence_t flowSeq;
  212. ServerIdentifier destNode;
  213. PacketTracker seen;
  214. };
  215. struct UdpRequestToSendMsg
  216. {
  217. flowType::flowCmd cmd;
  218. unsigned short packets; // Number about to send (send_start), or just sent (request_to_send_more or send_completed). Not used (0) for request_to_send
  219. sequence_t sendSeq;
  220. sequence_t flowSeq;
  221. ServerIdentifier sourceNode;
  222. };
  223. #pragma pack(pop)
  224. int check_max_socket_read_buffer(int size);
  225. int check_max_socket_write_buffer(int size);
  226. int check_set_max_socket_read_buffer(int size);
  227. int check_set_max_socket_write_buffer(int size);
  228. #define TRACE_RETRY_DATA 0x08
  229. #define TRACE_MSGPACK 0x10
  230. inline bool checkTraceLevel(unsigned category, unsigned level)
  231. {
  232. return (udpTraceLevel >= level);
  233. }
  234. extern UDPLIB_API void sanityCheckUdpSettings(unsigned receiveQueueSize, unsigned numSenders, __uint64 networkSpeedBitsPerSecond);
  235. #define SOCKET_SIMULATION
  236. #ifdef SOCKET_SIMULATION
  237. #ifdef _DEBUG
  238. #define TEST_DROPPED_PACKETS
  239. #endif
  240. #ifdef TEST_DROPPED_PACKETS
  241. extern UDPLIB_API bool udpDropDataPackets;
  242. extern UDPLIB_API unsigned udpDropDataPacketsPercent;
  243. extern UDPLIB_API unsigned udpDropFlowPackets[flowType::max_flow_cmd];
  244. extern unsigned flowPacketsSent[flowType::max_flow_cmd];
  245. #endif
  246. extern UDPLIB_API bool isUdpTestMode;
  247. extern UDPLIB_API bool udpTestUseUdpSockets;
  248. extern UDPLIB_API bool udpTestSocketJitter;
  249. extern UDPLIB_API unsigned udpTestSocketDelay;
  250. extern UDPLIB_API bool udpTestVariableDelay;
  251. class CSocketSimulator : public CInterfaceOf<ISocket>
  252. {
  253. private:
  254. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  255. unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
  256. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  257. unsigned timeout) override { UNIMPLEMENTED; }
  258. virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; }
  259. virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
  260. virtual size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
  261. virtual size32_t get_max_send_size() override { UNIMPLEMENTED; }
  262. virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) override { UNIMPLEMENTED; }
  263. virtual int logPollError(unsigned revents, const char *rwstr) override { UNIMPLEMENTED; }
  264. virtual int wait_read(unsigned timeout) override { UNIMPLEMENTED; }
  265. virtual int wait_write(unsigned timeout) override { UNIMPLEMENTED; }
  266. virtual bool set_nonblock(bool on) override { UNIMPLEMENTED; }
  267. virtual bool set_nagle(bool on) override { UNIMPLEMENTED; }
  268. virtual void set_linger(int lingersecs) override { UNIMPLEMENTED; }
  269. virtual void cancel_accept() override { UNIMPLEMENTED; }
  270. virtual void shutdown(unsigned mode) override { UNIMPLEMENTED; }
  271. virtual void shutdownNoThrow(unsigned mode) override { UNIMPLEMENTED; }
  272. virtual int name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  273. virtual int peer_name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  274. virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  275. virtual IpAddress &getPeerAddress(IpAddress &addr) override { UNIMPLEMENTED; }
  276. virtual SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override { UNIMPLEMENTED; }
  277. virtual bool connectionless() override { UNIMPLEMENTED; }
  278. virtual void set_return_addr(int port,const char *name) override { UNIMPLEMENTED; }
  279. virtual void set_block_mode ( // must be called before block operations
  280. unsigned flags, // BF_* flags (must match receive_block)
  281. size32_t recsize=0, // record size (required for rec compression)
  282. unsigned timeoutms=0 // timeout in milisecs (0 for no timeout)
  283. ) override { UNIMPLEMENTED; }
  284. virtual bool send_block(
  285. const void *blk, // data to send
  286. size32_t sz // size to send (0 for eof)
  287. ) override { UNIMPLEMENTED; }
  288. virtual size32_t receive_block_size () override { UNIMPLEMENTED; }
  289. virtual size32_t receive_block(
  290. void *blk, // receive pointer
  291. size32_t sz // max size to read (0 for sync eof)
  292. // if less than block size truncates block
  293. ) override { UNIMPLEMENTED; }
  294. virtual void close() override { UNIMPLEMENTED; }
  295. virtual unsigned OShandle() const override { UNIMPLEMENTED; }
  296. virtual size32_t avail_read() override { UNIMPLEMENTED; }
  297. virtual size32_t write_multiple(unsigned num,void const**buf, size32_t *size) override { UNIMPLEMENTED; }
  298. virtual size32_t get_send_buffer_size() override { UNIMPLEMENTED; }
  299. virtual void set_send_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  300. virtual bool join_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  301. virtual bool leave_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  302. virtual void set_ttl(unsigned _ttl) override { UNIMPLEMENTED; }
  303. virtual size32_t get_receive_buffer_size() override { UNIMPLEMENTED; }
  304. virtual void set_receive_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  305. virtual void set_keep_alive(bool set) override { UNIMPLEMENTED; }
  306. virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size) override { UNIMPLEMENTED; }
  307. virtual bool check_connection() override { UNIMPLEMENTED; }
  308. virtual bool isSecure() const override { UNIMPLEMENTED; }
  309. virtual bool isValid() const override { UNIMPLEMENTED; }
  310. };
  311. class CSimulatedQueueReadSocket : public CSocketSimulator
  312. {
  313. friend class CSimulatedQueueWriteSocket;
  314. CSimulatedQueueReadSocket(const SocketEndpoint &_me);
  315. ~CSimulatedQueueReadSocket();
  316. std::queue<unsigned> packetSizes;
  317. std::queue<const void *> packets;
  318. unsigned max = 131072;
  319. unsigned used = 0;
  320. SocketEndpoint me;
  321. CriticalSection crit;
  322. Semaphore avail;
  323. void writeSimulatedPacket(void const* buf, size32_t size);
  324. void writeOwnSimulatedPacket(void const* buf, size32_t size);
  325. static std::map<SocketEndpoint, CSimulatedQueueReadSocket *> allReaders;
  326. static CriticalSection allReadersCrit;
  327. public:
  328. static CSimulatedQueueReadSocket* udp_create(const SocketEndpoint &_me);
  329. static CSimulatedQueueReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
  330. virtual size32_t get_receive_buffer_size() override { return max; }
  331. virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
  332. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  333. unsigned timeoutsecs = WAIT_FOREVER) override;
  334. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  335. unsigned timeout) override;
  336. virtual int wait_read(unsigned timeout) override;
  337. virtual void close() override {}
  338. virtual void shutdown(unsigned mode) override { }
  339. virtual void shutdownNoThrow(unsigned mode) override{ }
  340. };
  341. class CSimulatedQueueWriteSocket : public CSocketSimulator
  342. {
  343. CSimulatedQueueWriteSocket(const SocketEndpoint &ep);
  344. ~CSimulatedQueueWriteSocket();
  345. const SocketEndpoint destEp;
  346. CriticalSection crit;
  347. std::queue<unsigned> dueTimes;
  348. std::queue<unsigned> packetSizes;
  349. std::queue<const void *> packets;
  350. unsigned delay = 0;
  351. bool jitter = false;
  352. public:
  353. static CSimulatedQueueWriteSocket* udp_connect( const SocketEndpoint &ep);
  354. virtual size32_t write(void const* buf, size32_t size) override;
  355. virtual void set_send_buffer_size(size32_t sz) override {};
  356. virtual void close() override {};
  357. unsigned writeDelayed(unsigned now);
  358. };
  359. class CSimulatedUdpSocket : public CSocketSimulator
  360. {
  361. virtual void shutdown(unsigned mode) override { realSocket->shutdown(mode); }
  362. virtual void shutdownNoThrow(unsigned mode) override{ realSocket->shutdownNoThrow(mode); }
  363. protected:
  364. Owned<ISocket> realSocket;
  365. };
  366. class CSimulatedUdpReadSocket : public CSimulatedUdpSocket
  367. {
  368. CSimulatedUdpReadSocket(const SocketEndpoint &_me);
  369. ~CSimulatedUdpReadSocket();
  370. public:
  371. unsigned port;
  372. static CSimulatedUdpReadSocket* udp_create(const SocketEndpoint &_me);
  373. virtual size32_t get_receive_buffer_size() override;
  374. virtual void set_receive_buffer_size(size32_t sz) override;
  375. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
  376. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
  377. virtual int wait_read(unsigned timeout) override;
  378. virtual void close() override;
  379. };
  380. class CSimulatedUdpWriteSocket : public CSimulatedUdpSocket
  381. {
  382. CSimulatedUdpWriteSocket( const SocketEndpoint &ep);
  383. public:
  384. static CSimulatedUdpWriteSocket* udp_connect( const SocketEndpoint &ep);
  385. virtual size32_t write(void const* buf, size32_t size) override;
  386. virtual void set_send_buffer_size(size32_t sz) override;
  387. virtual void close() override;
  388. };
  389. #endif
  390. #endif