udpsha.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "jsocket.hpp"
  16. #include "jlog.hpp"
  17. #include "roxie.hpp"
  18. #include "roxiemem.hpp"
  19. #include "portlist.h"
  20. #ifdef _WIN32
  21. #include <winsock2.h>
  22. #else
  23. #include <sys/socket.h>
  24. #endif
  25. using roxiemem::DataBuffer;
  26. using roxiemem::IDataBufferManager;
  27. IDataBufferManager *bufferManager;
  28. // All exported udp configuration options - these values provide the default values
  29. #ifdef TEST_DROPPED_PACKETS
  30. bool udpDropDataPackets = false;
  31. unsigned udpDropDataPacketsPercent = 0;
  32. unsigned udpDropFlowPackets[flowType::max_flow_cmd] = {};
  33. unsigned flowPacketsSent[flowType::max_flow_cmd] = {};
  34. #endif
  35. bool udpTraceFlow = false;
  36. bool udpTraceTimeouts = false;
  37. unsigned udpTraceLevel = 0;
  38. unsigned udpFlowSocketsSize = 131072;
  39. unsigned udpLocalWriteSocketSize = 1024000;
  40. unsigned udpStatsReportInterval = 60000;
  41. unsigned udpOutQsPriority = 0;
  42. unsigned udpMaxPermitDeadTimeouts = 5; // How many permit grants are allowed to expire (with no flow message) until request is ignored
  43. unsigned udpRequestDeadTimeout = 10000; // Timeout for sender getting no response to request to send before assuming that the receiver is dead.
  44. //The following control the timeouts within the udp layer. All timings are in milliseconds, but I suspect some of these should possibly be sub-millisecond
  45. //The following timeouts are described in more detail in a comment at the head of udptrr.cpp
  46. unsigned udpFlowAckTimeout = 2; // [sender] the maximum time that it is expected to take to receive an acknowledgement of a flow message (when one is sent) - should be small
  47. unsigned updDataSendTimeout = 20; // [sender+receiver] how long to receive the maximum amount of data, ~100 packets of 8K should take 10ms on a 1Gb network. Timeout for assuming send_complete has been lost
  48. unsigned udpRequestTimeout = 20; // [sender] A reasonable expected time between a request for a permit until the permit is granted - used as a timeout to guard against an ok_to_send has been lost.
  49. unsigned udpPermitTimeout = 50; // [receiver] How long is a grant expected to last before it is assumed lost?
  50. unsigned udpResendDelay = 0; // [sender+receiver] How long should elapse after a data packet has been sent before we assume it is lost.
  51. // 0 means they are unlikely to be lost, so worth resending as soon as it appears to be missing - trading duplicate packets for delays (good if allowasync=false)
  52. unsigned udpMaxPendingPermits = 10; // This seems like a reasonable compromise - each sender will be able to send up to 20% of the input queue each request.
  53. unsigned udpMaxClientPercent = 600; // What percentage of (queueSize/maxPendingPermits) should be granted to each sender.
  54. unsigned udpMinSlotsPerSender = 1; // The smallest number of slots to assign to a sender
  55. bool udpResendAllMissingPackets = true; // If set do not limit the number of missing packets sent to the size of the permit.
  56. bool udpResendLostPackets = true; // is the code to resend lost data packets enabled?
  57. bool udpAssumeSequential = false; // If a data packet with a later sequence has been received is it reasonable to assume it has been lost?
  58. bool udpAdjustThreadPriorities = false; // Adjust the priorities for the UDP receiving and sending threads so they have priority.
  59. // Enabling tends to cause a big rise in context switches from other threads, so disabled by default
  60. bool udpAllowAsyncPermits = false; // Allow requests to send more data to overtake the data packets that are being sent.
  61. unsigned multicastTTL = 1;
  62. MODULE_INIT(INIT_PRIORITY_STANDARD)
  63. {
  64. bufferManager = roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE);
  65. return true;
  66. }
  67. MODULE_EXIT()
  68. {
  69. bufferManager->Release();
  70. }
  71. const IpAddress ServerIdentifier::getIpAddress() const
  72. {
  73. IpAddress ret;
  74. ret.setIP4(netAddress);
  75. return ret;
  76. }
  77. bool ServerIdentifier::isMe() const
  78. {
  79. return *this==myNode;
  80. }
  81. ServerIdentifier myNode;
  82. //---------------------------------------------------------------------------------------------
  83. void queue_t::set_queue_size(unsigned _limit)
  84. {
  85. limit = _limit;
  86. }
  87. queue_t::queue_t(unsigned _limit)
  88. {
  89. set_queue_size(_limit);
  90. }
  91. queue_t::~queue_t()
  92. {
  93. while (head)
  94. {
  95. auto p = head;
  96. head = head->msgNext;
  97. ::Release(p);
  98. }
  99. }
  100. void queue_t::interrupt()
  101. {
  102. data_avail.interrupt();
  103. }
  104. void queue_t::doEnqueue(DataBuffer *buf)
  105. {
  106. // Must currently be called within a critical section. Does not signal - that should be done outside the CS.
  107. // Could probably be done lock-free, which given one thread using this is high priority might avoid some
  108. // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
  109. if (tail)
  110. {
  111. assert(head);
  112. assert(!tail->msgNext);
  113. tail->msgNext = buf;
  114. }
  115. else
  116. {
  117. assert(!head);
  118. head = buf;
  119. }
  120. tail = buf;
  121. count.fastAdd(1); // inside a critical section, so no need for atomic inc.
  122. #ifdef _DEBUG
  123. if (count > limit)
  124. DBGLOG("queue_t::pushOwn set count to %u", count.load());
  125. #endif
  126. }
  127. void queue_t::pushOwnWait(DataBuffer * buf)
  128. {
  129. assert(!buf->msgNext);
  130. for (;;)
  131. {
  132. {
  133. CriticalBlock b(c_region);
  134. if (count < limit)
  135. {
  136. doEnqueue(buf);
  137. break; // signal outside the critical section, rather than here, so the waiting thread can progress
  138. }
  139. signal_free_sl++;
  140. }
  141. while (!free_sl.wait(3000))
  142. {
  143. if (udpTraceLevel >= 1)
  144. DBGLOG("queue_t::pushOwnWait blocked for 3 seconds waiting for free_sl semaphore");
  145. }
  146. }
  147. data_avail.signal();
  148. }
  149. void queue_t::pushOwn(DataBuffer *buf)
  150. {
  151. // Could probably be done lock-free, which given one thread using this is high priority might avoid some
  152. // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
  153. assert(!buf->msgNext);
  154. {
  155. CriticalBlock b(c_region);
  156. doEnqueue(buf);
  157. }
  158. data_avail.signal();
  159. }
  160. DataBuffer *queue_t::pop(bool block)
  161. {
  162. if (!data_avail.wait(block ? INFINITE : 0))
  163. return nullptr;
  164. DataBuffer *ret = nullptr;
  165. unsigned signalFreeSlots = 0;
  166. {
  167. CriticalBlock b(c_region);
  168. if (unlikely(!count))
  169. return nullptr;
  170. count.fastAdd(-1); // inside a critical section => not atomic
  171. ret = head;
  172. head = ret->msgNext;
  173. if (!head)
  174. {
  175. assert(!count);
  176. tail = nullptr;
  177. }
  178. if (count < limit && signal_free_sl)
  179. {
  180. signal_free_sl--;
  181. signalFreeSlots++;
  182. }
  183. }
  184. ret->msgNext = nullptr;
  185. if (signalFreeSlots)
  186. free_sl.signal(signalFreeSlots);
  187. return ret;
  188. }
  189. unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
  190. {
  191. unsigned removed = 0;
  192. unsigned signalFreeSlots = 0;
  193. {
  194. CriticalBlock b(c_region);
  195. if (count)
  196. {
  197. DataBuffer *prev = nullptr;
  198. DataBuffer *finger = head;
  199. while (finger)
  200. {
  201. if (!key || !pkCmpFn || pkCmpFn((const void*) finger, key))
  202. {
  203. auto temp = finger;
  204. finger = finger->msgNext;
  205. if (prev==nullptr)
  206. {
  207. assert(head==temp);
  208. head = finger;
  209. }
  210. else
  211. prev->msgNext = finger;
  212. if (temp==tail)
  213. tail = prev;
  214. ::Release(temp);
  215. count.fastAdd(-1);
  216. if (count < limit && signal_free_sl)
  217. {
  218. signal_free_sl--;
  219. signalFreeSlots++;
  220. }
  221. removed++;
  222. }
  223. else
  224. {
  225. prev = finger;
  226. finger = finger->msgNext;
  227. }
  228. }
  229. }
  230. }
  231. if (signalFreeSlots)
  232. free_sl.signal(signalFreeSlots);
  233. return removed;
  234. }
  235. bool queue_t::dataQueued(const void *key, PKT_CMP_FUN pkCmpFn)
  236. {
  237. CriticalBlock b(c_region);
  238. DataBuffer *finger = head;
  239. while (finger)
  240. {
  241. if (pkCmpFn((const void*) finger, key))
  242. return true;
  243. finger = finger->msgNext;
  244. }
  245. return false;
  246. }
  247. #ifndef _WIN32
  248. #define HOSTENT hostent
  249. #include <netdb.h>
  250. #endif
  251. int check_set(const char *path, int value)
  252. {
  253. #ifdef __linux__
  254. FILE *f = fopen(path,"r");
  255. char res[32];
  256. char *r = 0;
  257. int si = 0;
  258. if (f) {
  259. r = fgets(res, sizeof(res), f);
  260. fclose(f);
  261. }
  262. if (r)
  263. si = atoi(r);
  264. if (!si)
  265. {
  266. OWARNLOG("WARNING: Failed to read value for %s", path);
  267. return 0;
  268. }
  269. else if (si<value)
  270. return -1;
  271. #endif
  272. return 0;
  273. }
  274. int check_max_socket_read_buffer(int size) {
  275. return check_set("/proc/sys/net/core/rmem_max", size);
  276. }
  277. int check_max_socket_write_buffer(int size) {
  278. return check_set("/proc/sys/net/core/wmem_max", size);
  279. }
  280. #if defined( __linux__) || defined(__APPLE__)
  281. void setLinuxThreadPriority(int level)
  282. {
  283. if (!udpAdjustThreadPriorities)
  284. return;
  285. pthread_t self = pthread_self();
  286. int policy;
  287. sched_param param;
  288. int rc;
  289. if (( rc = pthread_getschedparam(self, &policy, &param)) != 0)
  290. DBGLOG("pthread_getschedparam error: %d", rc);
  291. if (level < 0)
  292. UNIMPLEMENTED;
  293. else if (!level)
  294. {
  295. param.sched_priority = 0;
  296. policy = SCHED_OTHER;
  297. }
  298. else
  299. {
  300. policy = SCHED_RR;
  301. param.sched_priority = level;
  302. }
  303. if(( rc = pthread_setschedparam(self, policy, &param)) != 0)
  304. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "i TID=%i", rc, policy, param.sched_priority, (unsigned __int64) self, threadLogID());
  305. else
  306. DBGLOG("priority set id=%" I64F "i policy=%i pri=%i TID=%i", (unsigned __int64) self, policy, param.sched_priority, threadLogID());
  307. }
  308. #endif
  309. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
  310. {
  311. if (bufferManager)
  312. bufferManager->poolStats(memStats);
  313. }
  314. RelaxedAtomic<unsigned> packetsOOO;
  315. bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
  316. {
  317. bool resent = false;
  318. sequence_t seq = hdr.sendSeq;
  319. if (hdr.pktSeq & UDP_PACKET_RESENT)
  320. resent = true;
  321. // Four cases: less than lastUnseen, equal to, within TRACKER_BITS of, or higher
  322. // Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
  323. bool duplicate = false;
  324. unsigned delta = seq - base;
  325. if (udpTraceLevel > 5)
  326. {
  327. DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
  328. dump();
  329. }
  330. if (delta < TRACKER_BITS)
  331. {
  332. unsigned idx = (seq / 64) % TRACKER_DWORDS;
  333. unsigned bit = seq % 64;
  334. __uint64 bitm = U64C(1)<<bit;
  335. duplicate = (seen[idx] & bitm) != 0;
  336. seen[idx] |= bitm;
  337. if (seq==base)
  338. {
  339. while (seen[idx] & bitm)
  340. {
  341. // Important to update in this order, so that during the window where they are inconsistent we have
  342. // false negatives rather than false positives
  343. seen[idx] &= ~bitm;
  344. base++;
  345. idx = (base / 64) % TRACKER_DWORDS;
  346. bit = base % 64;
  347. bitm = U64C(1)<<bit;
  348. }
  349. }
  350. // calculate new hwm, with some care for wrapping
  351. if ((int) (seq - hwm) > 0)
  352. hwm = seq;
  353. else if (!resent)
  354. packetsOOO++;
  355. }
  356. else if (resent)
  357. // Don't treat a resend that goes out of range as indicative of a restart - it probably just means
  358. // that the resend was not needed and the original moved things on when it arrived
  359. duplicate = true;
  360. else
  361. {
  362. // We've gone forwards too far to track, or backwards because server restarted
  363. // We have taken steps to try to avoid the former...
  364. // In theory could try to preserve SOME information in the former case, but as it shouldn't happen, can we be bothered?
  365. #ifdef _DEBUG
  366. if (udpResendLostPackets)
  367. {
  368. DBGLOG("Received packet %" SEQF "u will cause loss of information in PacketTracker", seq);
  369. dump();
  370. }
  371. //assert(false);
  372. #endif
  373. memset(seen, 0, sizeof(seen));
  374. base = seq+1;
  375. hwm = seq;
  376. }
  377. return duplicate;
  378. }
  379. const PacketTracker PacketTracker::copy() const
  380. {
  381. // This is called within a critical section. Would be better if we could avoid having to do so,
  382. // but we want to be able to read a consistent set of values
  383. PacketTracker ret;
  384. ret.base = base;
  385. ret.hwm = hwm;
  386. memcpy(ret.seen, seen, sizeof(seen));
  387. return ret;
  388. }
  389. bool PacketTracker::hasSeen(sequence_t seq) const
  390. {
  391. // Accessed only on sender side where these are not modified, so no need for locking
  392. // Careful about wrapping!
  393. unsigned delta = seq - base;
  394. if (udpTraceLevel > 5)
  395. {
  396. DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
  397. dump();
  398. }
  399. if (delta < TRACKER_BITS)
  400. {
  401. unsigned idx = (seq / 64) % TRACKER_DWORDS;
  402. unsigned bit = seq % 64;
  403. return (seen[idx] & (U64C(1)<<bit)) != 0;
  404. }
  405. else if (delta > INT_MAX) // Or we could just make delta a signed int? But code above would have to check >0
  406. return true;
  407. else
  408. return false;
  409. }
  410. bool PacketTracker::canRecord(sequence_t seq) const
  411. {
  412. // Careful about wrapping!
  413. unsigned delta = seq - base;
  414. if (udpTraceLevel > 5)
  415. {
  416. DBGLOG("PacketTracker::hasSeen - can I record %" SEQF "u, %d", seq, delta);
  417. dump();
  418. }
  419. return (delta < TRACKER_BITS);
  420. }
  421. bool PacketTracker::hasGaps() const
  422. {
  423. return base!=hwm+1;
  424. }
  425. void PacketTracker::dump() const
  426. {
  427. DBGLOG("PacketTracker base=%" SEQF "u, hwm=%" SEQF "u, seen[0]=%" I64F "x", base, hwm, seen[0]);
  428. }
  429. //---------------------------------------------------------------------------------------------------------------------
  430. void sanityCheckUdpSettings(unsigned receiveQueueSize, unsigned numSenders, __uint64 networkSpeedBitsPerSecond)
  431. {
  432. unsigned maxDataPacketSize = 0x2000; // assume jumbo frames roxiemem::DATA_ALIGNMENT_SIZE;
  433. __uint64 bytesPerSecond = networkSpeedBitsPerSecond / 10;
  434. unsigned __int64 minPacketTimeNs = (maxDataPacketSize * U64C(1000000000)) / bytesPerSecond;
  435. unsigned __int64 minLatencyNs = 50000;
  436. unsigned maxSlotsPerClient = (udpMaxPendingPermits == 1) ? receiveQueueSize : (udpMaxClientPercent * receiveQueueSize) / (udpMaxPendingPermits * 100);
  437. unsigned __int64 minTimeForAllPackets = receiveQueueSize * minPacketTimeNs;
  438. //The data for a permit may arrive after the data from all the other senders => need to take the entire queue into account
  439. unsigned __int64 minTimeForPermitPackets = minTimeForAllPackets;
  440. auto trace = [](const char * title, unsigned value, unsigned __int64 minValue, unsigned maxFactor)
  441. {
  442. DBGLOG("%s: %u [%u..%u] us: %u [%u..%u]", title, value, (unsigned)(minValue/1000000), (unsigned)(minValue*maxFactor/1000000),
  443. value*1000, (unsigned)(minValue/1000), (unsigned)(minValue*maxFactor/1000));
  444. };
  445. //MORE: Allow the udpReceiverSize to be defined and finish implementing the following, with some comments to describe the thinking
  446. // All in milliseconds
  447. if (udpTraceTimeouts || udpTraceLevel >= 1)
  448. {
  449. DBGLOG("udpAssumeSequential: %s", boolToStr(udpAssumeSequential));
  450. DBGLOG("udpResendLostPackets: %s", boolToStr(udpResendLostPackets));
  451. DBGLOG("udpResendAllMissingPackets: %s", boolToStr(udpResendAllMissingPackets));
  452. DBGLOG("udpAdjustThreadPriorities: %s", boolToStr(udpAdjustThreadPriorities));
  453. DBGLOG("udpAllowAsyncPermits: %s", boolToStr(udpAllowAsyncPermits));
  454. trace("udpFlowAckTimeout", udpFlowAckTimeout, minLatencyNs*2, 20);
  455. trace("updDataSendTimeout", updDataSendTimeout, minTimeForAllPackets, 10);
  456. trace("udpPermitTimeout", udpPermitTimeout, 2 * minLatencyNs + minTimeForPermitPackets, 10);
  457. trace("udpRequestTimeout", udpRequestTimeout, (2 * minLatencyNs + minTimeForPermitPackets) * 2 / 5, 10);
  458. trace("udpResendDelay", udpResendDelay, minTimeForAllPackets, 10);
  459. DBGLOG("udpMaxPendingPermits: %u [%u..%u]", udpMaxPendingPermits, udpMaxPendingPermits, udpMaxPendingPermits);
  460. DBGLOG("udpMaxClientPercent: %u [%u..%u]", udpMaxClientPercent, 100, 500);
  461. DBGLOG("udpMaxPermitDeadTimeouts: %u [%u..%u]", udpMaxPermitDeadTimeouts, 2, 10);
  462. DBGLOG("udpRequestDeadTimeout: %u [%u..%u]", udpRequestDeadTimeout, 10000, 120000);
  463. DBGLOG("udpMinSlotsPerSender: %u [%u..%u]", udpMinSlotsPerSender, 1, 5);
  464. }
  465. // Some sanity checks
  466. if (!udpResendLostPackets)
  467. WARNLOG("udpResendLostPackets is currently disabled - only viable on a very reliable network");
  468. if (udpAllowAsyncPermits)
  469. {
  470. if (udpResendDelay == 0)
  471. ERRLOG("udpResendDelay of 0 should not be used if udpAllowAsyncPermits=true");
  472. }
  473. else
  474. {
  475. if (udpResendDelay != 0)
  476. WARNLOG("udpResendDelay of 0 is recommended if udpAllowAsyncPermits=false");
  477. }
  478. if (udpFlowAckTimeout == 0)
  479. {
  480. ERRLOG("udpFlowAckTimeout should not be set to 0");
  481. udpFlowAckTimeout = 1;
  482. }
  483. if (udpRequestTimeout == 0)
  484. {
  485. ERRLOG("udpRequestTimeout should not be set to 0");
  486. udpFlowAckTimeout = 10;
  487. }
  488. if (udpMaxPendingPermits > receiveQueueSize)
  489. throwUnexpectedX("udpMaxPendingPermits > receiveQueueSize");
  490. if (maxSlotsPerClient == 0)
  491. throwUnexpectedX("maxSlotsPerClient == 0");
  492. if (udpFlowAckTimeout * 10 > udpRequestTimeout)
  493. WARNLOG("udpFlowAckTimeout should be significantly smaller than udpRequestTimeout");
  494. if (udpRequestTimeout >= udpPermitTimeout)
  495. WARNLOG("udpRequestTimeout should be lower than udpPermitTimeout, otherwise dropped ok_to_send will not be spotted early enough");
  496. if (udpMaxPendingPermits == 1)
  497. WARNLOG("udpMaxPendingPermits=1: only one sender can send at a time");
  498. if (udpMaxClientPercent < 100)
  499. ERRLOG("udpMaxClientPercent should be >= 100");
  500. else if (maxSlotsPerClient * udpMaxClientPercent / 100 > receiveQueueSize)
  501. ERRLOG("maxSlotsPerClient * udpMaxClientPercent exceeds the queue size => all slots will be initially allocated to the first sender");
  502. if (udpMinSlotsPerSender > 10)
  503. ERRLOG("udpMinSlotsPerSender of %u is higher than recommended", udpMinSlotsPerSender);
  504. }
  505. //---------------------------------------------------------------------------------------------------------------------
  506. #ifdef _USE_CPPUNIT
  507. #include "unittests.hpp"
  508. class PacketTrackerTest : public CppUnit::TestFixture
  509. {
  510. CPPUNIT_TEST_SUITE(PacketTrackerTest);
  511. CPPUNIT_TEST(testNoteSeen);
  512. CPPUNIT_TEST(testReplay);
  513. CPPUNIT_TEST_SUITE_END();
  514. void testNoteSeen()
  515. {
  516. PacketTracker p;
  517. UdpPacketHeader hdr;
  518. hdr.pktSeq = 0;
  519. // Some simple tests
  520. CPPUNIT_ASSERT(!p.hasSeen(0));
  521. CPPUNIT_ASSERT(!p.hasSeen(1));
  522. hdr.sendSeq = 0;
  523. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  524. CPPUNIT_ASSERT(p.hasSeen(0));
  525. CPPUNIT_ASSERT(!p.hasSeen(1));
  526. CPPUNIT_ASSERT(!p.hasSeen(2000));
  527. CPPUNIT_ASSERT(!p.hasSeen(2001));
  528. hdr.pktSeq = UDP_PACKET_RESENT;
  529. CPPUNIT_ASSERT(p.noteSeen(hdr));
  530. hdr.pktSeq = 0;
  531. hdr.sendSeq = 2000;
  532. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  533. CPPUNIT_ASSERT(p.hasSeen(0));
  534. CPPUNIT_ASSERT(p.hasSeen(1));
  535. CPPUNIT_ASSERT(p.hasSeen(2000));
  536. CPPUNIT_ASSERT(!p.hasSeen(2001));
  537. hdr.sendSeq = 0;
  538. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  539. CPPUNIT_ASSERT(p.hasSeen(0));
  540. CPPUNIT_ASSERT(!p.hasSeen(1));
  541. CPPUNIT_ASSERT(!p.hasSeen(2000));
  542. CPPUNIT_ASSERT(!p.hasSeen(2001));
  543. PacketTracker p2;
  544. hdr.sendSeq = 1;
  545. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  546. CPPUNIT_ASSERT(!p2.hasSeen(0));
  547. CPPUNIT_ASSERT(p2.hasSeen(1));
  548. hdr.sendSeq = TRACKER_BITS-1; // This is the highest value we can record without losing information
  549. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  550. CPPUNIT_ASSERT(!p2.hasSeen(0));
  551. CPPUNIT_ASSERT(p2.hasSeen(1));
  552. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  553. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS));
  554. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  555. hdr.sendSeq = TRACKER_BITS;
  556. p2.noteSeen(hdr);
  557. CPPUNIT_ASSERT(p2.hasSeen(0));
  558. CPPUNIT_ASSERT(p2.hasSeen(1));
  559. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  560. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS));
  561. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  562. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+2));
  563. }
  564. void t(PacketTracker &p, sequence_t seq, unsigned pseq)
  565. {
  566. UdpPacketHeader hdr;
  567. hdr.sendSeq = seq;
  568. hdr.pktSeq = pseq;
  569. if (seq==29)
  570. CPPUNIT_ASSERT(p.noteSeen(hdr) == false);
  571. else
  572. p.noteSeen(hdr);
  573. }
  574. void testReplay()
  575. {
  576. PacketTracker p;
  577. t(p,1,0x1);
  578. t(p,2,0x2);
  579. t(p,3,0x3);
  580. t(p,4,0x4);
  581. t(p,5,0x5);
  582. t(p,6,0x6);
  583. t(p,7,0x7);
  584. t(p,8,0x8);
  585. t(p,9,0x9);
  586. t(p,11,0xb);
  587. t(p,12,0xc);
  588. t(p,13,0xd);
  589. t(p,14,0xe);
  590. t(p,15,0xf);
  591. t(p,16,0x10);
  592. t(p,17,0x11);
  593. t(p,18,0x12);
  594. t(p,19,0x13);
  595. t(p,20,0x14);
  596. t(p,21,0x15);
  597. t(p,22,0x16);
  598. t(p,23,0x17);
  599. t(p,24,0x18);
  600. t(p,25,0x19);
  601. t(p,26,0x1a);
  602. t(p,27,0x1b);
  603. t(p,28,0x1c);
  604. t(p,50,0x40000032);
  605. t(p,51,0x40000033);
  606. t(p,52,0x40000034);
  607. t(p,53,0x40000035);
  608. t(p,54,0x40000036);
  609. t(p,55,0x40000037);
  610. t(p,56,0x40000038);
  611. t(p,57,0x40000039);
  612. t(p,58,0x4000003a);
  613. t(p,59,0x4000003b);
  614. t(p,60,0x4000003c);
  615. t(p,61,0x4000003d);
  616. t(p,62,0xc0000000);
  617. t(p,63,0x4000003e);
  618. t(p,64,0x4000003f);
  619. t(p,65,0x40000040);
  620. t(p,66,0x40000041);
  621. t(p,67,0x40000042);
  622. t(p,68,0x40000043);
  623. t(p,69,0x40000044);
  624. t(p,70,0x40000045);
  625. t(p,71,0x40000046);
  626. t(p,72,0x40000047);
  627. t(p,73,0x40000048);
  628. t(p,74,0x40000049);
  629. t(p,75,0x4000004a);
  630. t(p,76,0x4000004b);
  631. t(p,77,0x4000004c);
  632. t(p,78,0x4000004d);
  633. t(p,79,0x4000004e);
  634. t(p,80,0x4000004f);
  635. t(p,81,0x40000050);
  636. t(p,82,0x40000051);
  637. t(p,83,0x40000052);
  638. t(p,84,0x40000053);
  639. t(p,85,0x40000054);
  640. t(p,86,0x40000055);
  641. t(p,87,0x40000056);
  642. t(p,88,0x40000057);
  643. t(p,89,0x40000058);
  644. t(p,90,0x40000059);
  645. t(p,91,0x4000005a);
  646. t(p,92,0x4000005b);
  647. t(p,93,0x4000005c);
  648. t(p,0,0x40000000);
  649. t(p,1,0x40000001);
  650. t(p,2,0x40000002);
  651. t(p,3,0x40000003);
  652. t(p,4,0x40000004);
  653. t(p,5,0x40000005);
  654. t(p,6,0x40000006);
  655. t(p,7,0x40000007);
  656. t(p,8,0x40000008);
  657. t(p,9,0x40000009);
  658. t(p,10,0x4000000a);
  659. t(p,11,0x4000000b);
  660. t(p,12,0x4000000c);
  661. t(p,13,0x4000000d);
  662. t(p,14,0x4000000e);
  663. t(p,15,0x4000000f);
  664. t(p,16,0x40000010);
  665. t(p,17,0x40000011);
  666. t(p,18,0x40000012);
  667. t(p,19,0x40000013);
  668. t(p,20,0x40000014);
  669. t(p,21,0x40000015);
  670. t(p,22,0x40000016);
  671. t(p,23,0x40000017);
  672. t(p,24,0x40000018);
  673. t(p,25,0x40000019);
  674. t(p,26,0x4000001a);
  675. t(p,27,0x4000001b);
  676. t(p,28,0x4000001c);
  677. t(p,29,0x4000001d);
  678. }
  679. };
  680. CPPUNIT_TEST_SUITE_REGISTRATION( PacketTrackerTest );
  681. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PacketTrackerTest, "PacketTrackerTest" );
  682. #endif
  683. /*
  684. Crazy thoughts on network-wide flow control
  685. Avoid sending data that clashes with other outbound or inbound data
  686. is outbound really an issue?
  687. if only inbound, should be easier
  688. can have each inbound node police its own, for a start
  689. udplib already tries to do this
  690. when sending permission to send, best to pick someone that is not sending to anyone else
  691. udplib already tries to do this
but it can still lead to idleness - if node 1 is sending to node 2, and node 2 to node 1, node 3 can't find anyone idle.
  693. If you do need global:
  694. Every bit of data getting sent (perhaps over a certain size threshold?) gets permission from central traffic cop
  695. Outbound packet says source node, target node size
  696. Reply says source,target,size
  697. Cop allows immediately if nothing inflight between those pairs
  698. Cop assumes completion
  699. Cop redundancy
  700. - a backup cop is listening in?
  701. - use multicast for requests and replies?
  702. - no reply implies what?
  703. - backup cop just needs heartbeat from active cop
  704. - permission expires
  705. - multiple cops for blocks of targets?
  706. - but I want global view of who is sending
  707. */
  708. /* Simulating traffic flow
  709. N threads that simulate behaviour of agents
  710. fake write socket that
  711. - accepts data pushed to it
  712. - moves it to a read socket
  713. fake read socket that
  714. - accepts packets from write sockets
  715. - discards when full (but tells you)
  716. - delivers packets to consumer
  717. */
  718. #ifdef SOCKET_SIMULATION
  719. bool isUdpTestMode = false;
  720. bool udpTestUseUdpSockets = true;
  721. bool udpTestSocketJitter = false;
  722. unsigned udpTestSocketDelay = 0;
  723. bool udpTestVariableDelay = false;
  724. static CriticalSection allWriteSocketsCrit;
  725. static ICopyArrayOf<CSimulatedQueueWriteSocket> allWriteSockets;
  726. class DelayedSocketWriter : public Thread
  727. {
  728. public:
  729. virtual int run() override
  730. {
  731. while (running)
  732. {
  733. unsigned shortestDelay = udpTestSocketDelay;
  734. unsigned now = msTick();
  735. {
  736. CriticalBlock b(allWriteSocketsCrit);
  737. ForEachItemIn(idx, allWriteSockets)
  738. {
  739. CSimulatedQueueWriteSocket &ws = allWriteSockets.item(idx);
  740. shortestDelay = std::min(shortestDelay, ws.writeDelayed(now));
  741. }
  742. }
  743. MilliSleep(shortestDelay);
  744. }
  745. return 0;
  746. }
  747. virtual void start()
  748. {
  749. running = true;
  750. Thread::start();
  751. }
  752. void stop()
  753. {
  754. running = false;
  755. join();
  756. }
  757. private:
  758. std::atomic<bool> running = { false };
  759. } delayedWriter;
  760. CSimulatedQueueWriteSocket::CSimulatedQueueWriteSocket(const SocketEndpoint &ep) : destEp(ep), delay(udpTestSocketDelay), jitter(udpTestSocketJitter)
  761. {
  762. if (delay)
  763. {
  764. CriticalBlock b(allWriteSocketsCrit);
  765. if (!allWriteSockets.length())
  766. delayedWriter.start();
  767. allWriteSockets.append(*this);
  768. }
  769. }
  770. CSimulatedQueueWriteSocket::~CSimulatedQueueWriteSocket()
  771. {
  772. if (delay)
  773. {
  774. CriticalBlock b(allWriteSocketsCrit);
  775. allWriteSockets.zap(*this);
  776. if (!allWriteSockets.length())
  777. delayedWriter.stop();
  778. }
  779. }
  780. CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
  781. {
  782. return new CSimulatedQueueWriteSocket(ep);
  783. }
  784. unsigned CSimulatedQueueWriteSocket::writeDelayed(unsigned now)
  785. {
  786. CriticalBlock b(crit);
  787. while (dueTimes.size())
  788. {
  789. int delay = dueTimes.front() - now;
  790. if (delay > 0)
  791. return delay;
  792. unsigned jitteredSize = 0;
  793. const void *jitteredBuff = nullptr;
  794. if (jitter && dueTimes.size()>1 && rand() % 100 == 0)
  795. {
  796. jitteredSize = packetSizes.front();
  797. jitteredBuff = packets.front();
  798. dueTimes.pop();
  799. packets.pop();
  800. packetSizes.pop();
  801. }
  802. CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
  803. CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
  804. if (dest)
  805. {
  806. dest->writeOwnSimulatedPacket(packets.front(), packetSizes.front());
  807. if (jitteredBuff)
  808. dest->writeOwnSimulatedPacket(jitteredBuff, jitteredSize);
  809. }
  810. else
  811. {
  812. StringBuffer s;
  813. free((void *) packets.front());
  814. if (jitteredBuff)
  815. free((void *) jitteredBuff);
  816. DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
  817. }
  818. dueTimes.pop();
  819. packets.pop();
  820. packetSizes.pop();
  821. }
  822. return (unsigned) -1;
  823. }
  824. size32_t CSimulatedQueueWriteSocket::write(void const* buf, size32_t size)
  825. {
  826. if (delay)
  827. {
  828. CriticalBlock b(crit);
  829. packetSizes.push(size);
  830. packets.push(memcpy(malloc(size), buf, size));
  831. dueTimes.push(msTick() + delay * (udpTestVariableDelay && size>200 ? 1 : 3));
  832. }
  833. else
  834. {
  835. CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
  836. CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
  837. if (dest)
  838. dest->writeSimulatedPacket(buf, size);
  839. else
  840. {
  841. StringBuffer s;
  842. DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
  843. }
  844. }
  845. return size;
  846. }
  // Registry of live simulated read sockets keyed by endpoint; writers look up their destination here.
  // allReadersCrit guards allReaders, and the writers in this file also hold it while pushing into a reader's queue.
  std::map<SocketEndpoint, CSimulatedQueueReadSocket *> CSimulatedQueueReadSocket::allReaders;
  CriticalSection CSimulatedQueueReadSocket::allReadersCrit;
  849. CSimulatedQueueReadSocket::CSimulatedQueueReadSocket(const SocketEndpoint &_me) : me(_me)
  850. {
  851. StringBuffer s;
  852. DBGLOG("Creating fake socket %s", me.getUrlStr(s).str());
  853. CriticalBlock b(allReadersCrit);
  854. allReaders[me] = this;
  855. }
  856. CSimulatedQueueReadSocket::~CSimulatedQueueReadSocket()
  857. {
  858. StringBuffer s;
  859. DBGLOG("Closing fake socket %s", me.getUrlStr(s).str());
  860. CriticalBlock b(allReadersCrit);
  861. allReaders.erase(me);
  862. }
  863. CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::udp_create(const SocketEndpoint &_me)
  864. {
  865. return new CSimulatedQueueReadSocket(_me);
  866. }
  867. CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
  868. {
  869. CriticalBlock b(allReadersCrit);
  870. return allReaders[ep];
  871. }
  872. void CSimulatedQueueReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
  873. {
  874. {
  875. CriticalBlock b(crit);
  876. if (size+used > max)
  877. {
  878. DBGLOG("Lost packet");
  879. return;
  880. }
  881. packetSizes.push(size);
  882. packets.push(memcpy(malloc(size), buf, size));
  883. used += size;
  884. }
  885. // StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
  886. avail.signal();
  887. }
  888. void CSimulatedQueueReadSocket::writeOwnSimulatedPacket(void const* buf, size32_t size)
  889. {
  890. {
  891. CriticalBlock b(crit);
  892. if (size+used > max)
  893. {
  894. DBGLOG("Lost packet");
  895. free((void *) buf);
  896. return;
  897. }
  898. packetSizes.push(size);
  899. packets.push(buf);
  900. used += size;
  901. }
  902. // StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
  903. avail.signal();
  904. }
  905. void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
  906. {
  907. unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
  908. readtms(buf, min_size, max_size, size_read, tms);
  909. }
  910. void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  911. unsigned timeout)
  912. {
  913. size_read = 0;
  914. if (!timeout || wait_read(timeout))
  915. {
  916. CriticalBlock b(crit);
  917. const void *thisData = packets.front();
  918. unsigned thisSize = packetSizes.front();
  919. if (thisSize > max_size)
  920. {
  921. assert(false);
  922. UNIMPLEMENTED; // Partial packet read not supported yet - add if needed
  923. }
  924. else
  925. {
  926. packets.pop();
  927. packetSizes.pop();
  928. used -= thisSize;
  929. }
  930. size_read = thisSize;
  931. memcpy(buf, thisData, thisSize);
  932. free((void *) thisData);
  933. }
  934. else
  935. throw makeStringException(JSOCKERR_timeout_expired, "");
  936. }
  937. int CSimulatedQueueReadSocket::wait_read(unsigned timeout)
  938. {
  939. bool ret = avail.wait(timeout);
  940. return ret;
  941. }
  942. //------------------------------------------------------------------------------------------------------------------------------
  943. //Hash the ip and port and map that to a local port - complain if more than one combination is hashed to the same port.
  944. constexpr unsigned basePort = 9010;
  945. constexpr unsigned maxPorts = 980;
  946. static std::atomic_bool connected[maxPorts];
  947. unsigned getMappedSocketPort(const SocketEndpoint & ep)
  948. {
  949. unsigned hash = ep.hash(0x31415926);
  950. return basePort + hash % maxPorts;
  951. }
  952. CSimulatedUdpReadSocket::CSimulatedUdpReadSocket(const SocketEndpoint &_me)
  953. {
  954. port = getMappedSocketPort(_me);
  955. if (connected[port-basePort].exchange(true))
  956. throw makeStringException(0, "Two ip/ports mapped to the same port - improve the hash (or change maxPorts)!");
  957. realSocket.setown(ISocket::udp_create(port));
  958. }
  959. CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket()
  960. {
  961. connected[port-basePort].exchange(false);
  962. }
  963. size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
  964. void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
  965. void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
  966. {
  967. realSocket->read(buf, min_size, max_size, size_read, timeoutsecs);
  968. }
  969. void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
  970. {
  971. realSocket->readtms(buf, min_size, max_size, size_read, timeout);
  972. }
  973. int CSimulatedUdpReadSocket::wait_read(unsigned timeout)
  974. {
  975. return realSocket->wait_read(timeout);
  976. }
  977. void CSimulatedUdpReadSocket::close()
  978. {
  979. realSocket->close();
  980. }
  981. CSimulatedUdpReadSocket* CSimulatedUdpReadSocket::udp_create(const SocketEndpoint &_me) { return new CSimulatedUdpReadSocket(_me); }
  982. CSimulatedUdpWriteSocket::CSimulatedUdpWriteSocket( const SocketEndpoint &ep)
  983. {
  984. unsigned port = getMappedSocketPort(ep);
  985. SocketEndpoint localEp(port, queryLocalIP());
  986. realSocket.setown(ISocket::udp_connect(localEp));
  987. }
  988. CSimulatedUdpWriteSocket* CSimulatedUdpWriteSocket::udp_connect( const SocketEndpoint &ep) { return new CSimulatedUdpWriteSocket(ep); }
  989. size32_t CSimulatedUdpWriteSocket::write(void const* buf, size32_t size)
  990. {
  991. return realSocket->write(buf, size);
  992. }
  993. void CSimulatedUdpWriteSocket::set_send_buffer_size(size32_t sz)
  994. {
  995. realSocket->set_send_buffer_size(sz);
  996. }
  997. void CSimulatedUdpWriteSocket::close()
  998. {
  999. realSocket->close();
  1000. }
  1001. #endif