udpsha.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef updsha_include
  14. #define updsha_include
  15. #include "jmutex.hpp"
  16. #include "roxiemem.hpp"
  17. #include "jcrc.hpp"
  18. #include <limits>
  19. #include <queue>
  20. #include <map>
  21. typedef unsigned sequence_t;
  22. #define SEQF
  23. extern roxiemem::IDataBufferManager *bufferManager;
  24. typedef bool (*PKT_CMP_FUN) (const void *pkData, const void *key);
  25. // Flag bits in pktSeq field
  26. #define UDP_PACKET_COMPLETE 0x80000000 // Packet completes a single agent request
  27. #define UDP_PACKET_RESENT 0x40000000 // Packet is a repeat of one that the server may have missed
  28. #define UDP_PACKET_SEQUENCE_MASK 0x3fffffff
  29. struct UdpPacketHeader
  30. {
  31. unsigned short length; // total length of packet including the header, data, and meta
  32. unsigned short metalength; // length of metadata (comes after header and data)
  33. ServerIdentifier node; // Node this message came from
  34. unsigned msgSeq; // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
  35. unsigned pktSeq; // sequence number of this packet within the message (top bit signifies final packet)
  36. sequence_t sendSeq; // sequence number of this packet among all those send from this node to this target
  37. // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
  38. ruid_t ruid; // The uid allocated by the server to this agent transaction
  39. unsigned msgId; // sub-id allocated by the server to this request within the transaction
  40. };
  41. constexpr unsigned TRACKER_BITS=1024; // Power of two recommended
  42. constexpr unsigned TRACKER_DWORDS=(TRACKER_BITS+63)/64;
  43. // Some more things we can consider:
  44. // 1. sendSeq gives us some insight into lost packets that might help is get inflight calcuation right (if it is still needed)
  45. // 2. If we can definitively declare that a packet is lost, we can fail that messageCollator earlier (and thus get the resend going earlier)
  46. // 3. Worth seeing why resend doesn't use same collator. We could skip sending (though would still need to calculate) the bit we already had...
  47. class PacketTracker
  48. {
  49. // This uses a circular buffer indexed by seq to store information about which packets we have seen
  50. private:
  51. sequence_t base = 0; // Sequence number of first packet represented in the array
  52. sequence_t hwm = (sequence_t) -1; // Sequence number of highest sequence number ever seen
  53. unsigned __int64 seen[TRACKER_DWORDS] = {0}; // bitmask representing whether we have seen (base+n)
  54. void dump() const;
  55. public:
  56. // Note that we have seen this packet, and return indicating whether we had already seen it
  57. bool noteSeen(UdpPacketHeader &hdr);
  58. const PacketTracker copy() const;
  59. inline sequence_t lastSeen() const { return hwm; }
  60. bool hasSeen(sequence_t seq) const;
  61. bool canRecord(sequence_t seq) const;
  62. bool hasGaps() const;
  63. };
  64. using roxiemem::DataBuffer;
  65. // queue_t is used to hold a fifo queue of DataBuffer elements to be sent or collated.
  66. // Originally implemented as a circular buffer, but we don't want to block adding even if full (we do however want to avoid requesting more if full)
  67. // so now reimplemented as a single-linked list. There is a field in the DataBuffers that can be used for chaining them together that is used
  68. // in a few other places, e.g. collator
  69. class queue_t
  70. {
  71. DataBuffer *head = nullptr; // We add at tail and remove from head
  72. DataBuffer *tail = nullptr;
  73. RelaxedAtomic<unsigned> count{0}; // always updated inside a critical section, only atomic to guarantee it can be read outside the crit sec.
  74. unsigned limit = 0;
  75. CriticalSection c_region;
  76. InterruptableSemaphore data_avail;
  77. Semaphore free_sl; // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
  78. unsigned signal_free_sl = 0; // Number of people waiting in free_sl. Only updated within critical section
  79. public:
  80. void interrupt();
  81. void pushOwn(DataBuffer *buffer); // Non blocking enqueue (used by receiver)
  82. void pushOwnWait(DataBuffer *buffer); // Blocking enqueue (used by the sender)
  83. DataBuffer *pop(bool block);
  84. bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
  85. unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
  86. unsigned available() const // non-blocking, no critical section
  87. {
  88. unsigned num = count.load();
  89. return likely(num < limit) ? limit - num : 0;
  90. }
  91. void set_queue_size(unsigned limit); //must be called immediately after constructor if default constructor is used
  92. queue_t(unsigned int queue_size);
  93. queue_t() {};
  94. ~queue_t();
  95. inline int capacity() const { return limit; }
  96. protected:
  97. void doEnqueue(DataBuffer *buf); // internal function to add the item to the queue, but not signal
  98. };
  99. template < class _et >
  100. class simple_queue
  101. {
  102. _et *elements;
  103. unsigned int element_count;
  104. int first;
  105. int last;
  106. int active_buffers;
  107. CriticalSection c_region;
  108. Semaphore data_avail;
  109. Semaphore free_space;
  110. public:
  111. void push(const _et &element)
  112. {
  113. free_space.wait();
  114. c_region.enter();
  115. int next = (last + 1) % element_count;
  116. elements[last] = element;
  117. last = next;
  118. active_buffers++;
  119. c_region.leave();
  120. data_avail.signal();
  121. }
  122. bool push(const _et &element,long timeout)
  123. {
  124. if (free_space.wait(timeout) ) {
  125. c_region.enter();
  126. int next = (last + 1) % element_count;
  127. elements[last] = element;
  128. last = next;
  129. active_buffers++;
  130. c_region.leave();
  131. data_avail.signal();
  132. return true;
  133. }
  134. return false;
  135. }
  136. void pop (_et &element)
  137. {
  138. data_avail.wait();
  139. c_region.enter();
  140. element = elements[first];
  141. first = (first + 1) % element_count;
  142. active_buffers--;
  143. c_region.leave();
  144. free_space.signal();
  145. }
  146. unsigned in_queue() {
  147. c_region.enter();
  148. unsigned res = active_buffers;
  149. c_region.leave();
  150. return res;
  151. }
  152. bool empty()
  153. {
  154. c_region.enter();
  155. bool res = (active_buffers == 0);
  156. c_region.leave();
  157. return res;
  158. }
  159. simple_queue(unsigned int queue_size)
  160. {
  161. element_count = queue_size;
  162. elements = new _et[element_count];
  163. free_space.signal(element_count);
  164. active_buffers = 0;
  165. first = 0;
  166. last = 0;
  167. }
  168. ~simple_queue() {
  169. delete [] elements;
  170. }
  171. };
  172. #ifndef _WIN32
  173. #define HANDLE_PRAGMA_PACK_PUSH_POP
  174. #endif
  // Namespace-like holder for the flow-control command codes and their printable names.
  class flowType {
  public:
      // Flow-control commands, stored as unsigned short. max_flow_cmd is not a command:
      // it is the count of real commands and is used to size per-command arrays.
      enum flowCmd : unsigned short { ok_to_send, request_received, request_to_send, send_completed, request_to_send_more, max_flow_cmd };

      // Returns the printable name of a flow command.
      // Any value that is not a real command (including max_flow_cmd) trips an assert in debug
      // builds and yields "??" otherwise.
      static const char *name(flowCmd m)
      {
          // Indexed by flowCmd value - keep in the same order as the enum.
          static const char *const names[] =
          {
              "ok_to_send",
              "request_received",
              "request_to_send",
              "send_completed",
              "request_to_send_more",
          };
          // Adding a command without adding its name is caught at compile time.
          static_assert(sizeof(names) / sizeof(names[0]) == max_flow_cmd, "flowType::name table out of step with flowCmd");
          if (m < max_flow_cmd)
              return names[m];
          assert(false);
          return "??";
      }
  };
  // Namespace-like holder for the command codes carried in sniff_msg::cmd.
  class sniffType {
  public:
      // Stored as unsigned short; busy is 0 and idle is 1.
      enum sniffCmd : unsigned short
      {
          busy,
          idle
      };
  };
  197. #pragma pack(push,1)
  198. struct UdpPermitToSendMsg
  199. {
  200. flowType::flowCmd cmd;
  201. unsigned short max_data;
  202. sequence_t flowSeq;
  203. ServerIdentifier destNode;
  204. PacketTracker seen;
  205. };
  206. struct UdpRequestToSendMsg
  207. {
  208. flowType::flowCmd cmd;
  209. unsigned short packets;
  210. sequence_t sendSeq;
  211. sequence_t flowSeq;
  212. ServerIdentifier sourceNode;
  213. };
  214. struct sniff_msg
  215. {
  216. sniffType::sniffCmd cmd;
  217. ServerIdentifier nodeIp;
  218. };
  219. #pragma pack(pop)
  220. int check_max_socket_read_buffer(int size);
  221. int check_max_socket_write_buffer(int size);
  222. int check_set_max_socket_read_buffer(int size);
  223. int check_set_max_socket_write_buffer(int size);
  224. #define TRACE_RETRY_DATA 0x08
  225. #define TRACE_MSGPACK 0x10
  226. inline bool checkTraceLevel(unsigned category, unsigned level)
  227. {
  228. return (udpTraceLevel >= level);
  229. }
  230. #define SOCKET_SIMULATION
  231. #ifdef SOCKET_SIMULATION
  232. #ifdef _DEBUG
  233. #define TEST_DROPPED_PACKETS
  234. #endif
  235. #ifdef TEST_DROPPED_PACKETS
  236. extern UDPLIB_API bool udpDropDataPackets;
  237. extern UDPLIB_API unsigned udpDropFlowPackets[flowType::max_flow_cmd];
  238. extern unsigned flowPacketsSent[flowType::max_flow_cmd];
  239. #endif
  240. extern UDPLIB_API bool isUdpTestMode;
  241. extern UDPLIB_API bool udpTestUseUdpSockets;
  242. extern UDPLIB_API bool udpTestSocketJitter;
  243. extern UDPLIB_API unsigned udpTestSocketDelay;
  244. extern UDPLIB_API bool udpTestVariableDelay;
  245. class CSocketSimulator : public CInterfaceOf<ISocket>
  246. {
  247. private:
  248. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  249. unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
  250. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  251. unsigned timeout) override { UNIMPLEMENTED; }
  252. virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; }
  253. virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
  254. virtual size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
  255. virtual size32_t get_max_send_size() override { UNIMPLEMENTED; }
  256. virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) override { UNIMPLEMENTED; }
  257. virtual int logPollError(unsigned revents, const char *rwstr) override { UNIMPLEMENTED; }
  258. virtual int wait_read(unsigned timeout) override { UNIMPLEMENTED; }
  259. virtual int wait_write(unsigned timeout) override { UNIMPLEMENTED; }
  260. virtual bool set_nonblock(bool on) override { UNIMPLEMENTED; }
  261. virtual bool set_nagle(bool on) override { UNIMPLEMENTED; }
  262. virtual void set_linger(int lingersecs) override { UNIMPLEMENTED; }
  263. virtual void cancel_accept() override { UNIMPLEMENTED; }
  264. virtual void shutdown(unsigned mode) override { UNIMPLEMENTED; }
  265. virtual void shutdownNoThrow(unsigned mode) override { UNIMPLEMENTED; }
  266. virtual int name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  267. virtual int peer_name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
  268. virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  269. virtual IpAddress &getPeerAddress(IpAddress &addr) override { UNIMPLEMENTED; }
  270. virtual SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override { UNIMPLEMENTED; }
  271. virtual bool connectionless() override { UNIMPLEMENTED; }
  272. virtual void set_return_addr(int port,const char *name) override { UNIMPLEMENTED; }
  273. virtual void set_block_mode ( // must be called before block operations
  274. unsigned flags, // BF_* flags (must match receive_block)
  275. size32_t recsize=0, // record size (required for rec compression)
  276. unsigned timeoutms=0 // timeout in milisecs (0 for no timeout)
  277. ) override { UNIMPLEMENTED; }
  278. virtual bool send_block(
  279. const void *blk, // data to send
  280. size32_t sz // size to send (0 for eof)
  281. ) override { UNIMPLEMENTED; }
  282. virtual size32_t receive_block_size () override { UNIMPLEMENTED; }
  283. virtual size32_t receive_block(
  284. void *blk, // receive pointer
  285. size32_t sz // max size to read (0 for sync eof)
  286. // if less than block size truncates block
  287. ) override { UNIMPLEMENTED; }
  288. virtual void close() override { UNIMPLEMENTED; }
  289. virtual unsigned OShandle() const override { UNIMPLEMENTED; }
  290. virtual size32_t avail_read() override { UNIMPLEMENTED; }
  291. virtual size32_t write_multiple(unsigned num,void const**buf, size32_t *size) override { UNIMPLEMENTED; }
  292. virtual size32_t get_send_buffer_size() override { UNIMPLEMENTED; }
  293. virtual void set_send_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  294. virtual bool join_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  295. virtual bool leave_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
  296. virtual void set_ttl(unsigned _ttl) override { UNIMPLEMENTED; }
  297. virtual size32_t get_receive_buffer_size() override { UNIMPLEMENTED; }
  298. virtual void set_receive_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
  299. virtual void set_keep_alive(bool set) override { UNIMPLEMENTED; }
  300. virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size) override { UNIMPLEMENTED; }
  301. virtual bool check_connection() override { UNIMPLEMENTED; }
  302. virtual bool isSecure() const override { UNIMPLEMENTED; }
  303. virtual bool isValid() const override { UNIMPLEMENTED; }
  304. };
  305. class CSimulatedQueueReadSocket : public CSocketSimulator
  306. {
  307. friend class CSimulatedQueueWriteSocket;
  308. CSimulatedQueueReadSocket(const SocketEndpoint &_me);
  309. ~CSimulatedQueueReadSocket();
  310. std::queue<unsigned> packetSizes;
  311. std::queue<const void *> packets;
  312. unsigned max = 131072;
  313. unsigned used = 0;
  314. SocketEndpoint me;
  315. CriticalSection crit;
  316. Semaphore avail;
  317. void writeSimulatedPacket(void const* buf, size32_t size);
  318. void writeOwnSimulatedPacket(void const* buf, size32_t size);
  319. static std::map<SocketEndpoint, CSimulatedQueueReadSocket *> allReaders;
  320. static CriticalSection allReadersCrit;
  321. public:
  322. static CSimulatedQueueReadSocket* udp_create(const SocketEndpoint &_me);
  323. static CSimulatedQueueReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
  324. virtual size32_t get_receive_buffer_size() override { return max; }
  325. virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
  326. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  327. unsigned timeoutsecs = WAIT_FOREVER) override;
  328. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  329. unsigned timeout) override;
  330. virtual int wait_read(unsigned timeout) override;
  331. virtual void close() override {}
  332. virtual void shutdown(unsigned mode) override { }
  333. virtual void shutdownNoThrow(unsigned mode) override{ }
  334. };
  335. class CSimulatedQueueWriteSocket : public CSocketSimulator
  336. {
  337. CSimulatedQueueWriteSocket(const SocketEndpoint &ep);
  338. ~CSimulatedQueueWriteSocket();
  339. const SocketEndpoint destEp;
  340. CriticalSection crit;
  341. std::queue<unsigned> dueTimes;
  342. std::queue<unsigned> packetSizes;
  343. std::queue<const void *> packets;
  344. unsigned delay = 0;
  345. bool jitter = false;
  346. public:
  347. static CSimulatedQueueWriteSocket* udp_connect( const SocketEndpoint &ep);
  348. virtual size32_t write(void const* buf, size32_t size) override;
  349. virtual void set_send_buffer_size(size32_t sz) override {};
  350. virtual void close() override {};
  351. unsigned writeDelayed(unsigned now);
  352. };
  353. class CSimulatedUdpSocket : public CSocketSimulator
  354. {
  355. virtual void shutdown(unsigned mode) override { realSocket->shutdown(mode); }
  356. virtual void shutdownNoThrow(unsigned mode) override{ realSocket->shutdownNoThrow(mode); }
  357. protected:
  358. Owned<ISocket> realSocket;
  359. };
  360. class CSimulatedUdpReadSocket : public CSimulatedUdpSocket
  361. {
  362. CSimulatedUdpReadSocket(const SocketEndpoint &_me);
  363. ~CSimulatedUdpReadSocket();
  364. public:
  365. unsigned port;
  366. static CSimulatedUdpReadSocket* udp_create(const SocketEndpoint &_me);
  367. virtual size32_t get_receive_buffer_size() override;
  368. virtual void set_receive_buffer_size(size32_t sz) override;
  369. virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
  370. virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
  371. virtual int wait_read(unsigned timeout) override;
  372. virtual void close() override;
  373. };
  374. class CSimulatedUdpWriteSocket : public CSimulatedUdpSocket
  375. {
  376. CSimulatedUdpWriteSocket( const SocketEndpoint &ep);
  377. public:
  378. static CSimulatedUdpWriteSocket* udp_connect( const SocketEndpoint &ep);
  379. virtual size32_t write(void const* buf, size32_t size) override;
  380. virtual void set_send_buffer_size(size32_t sz) override;
  381. virtual void close() override;
  382. };
  383. #endif
  384. #endif