udpsha.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "jsocket.hpp"
  16. #include "jlog.hpp"
  17. #include "roxie.hpp"
  18. #include "roxiemem.hpp"
  19. #include "portlist.h"
  20. #ifdef _WIN32
  21. #include <winsock2.h>
  22. #else
  23. #include <sys/socket.h>
  24. #endif
  25. using roxiemem::DataBuffer;
  26. using roxiemem::IDataBufferManager;
  // Shared data buffer manager: assigned in MODULE_INIT and released in MODULE_EXIT in this file.
  27. IDataBufferManager *bufferManager;
  // Tracing switches. NOTE(review): udpTraceFlow / udpTraceTimeouts are not read in this file -
  // presumably they are consumed by the send/receive managers; confirm against their users.
  28. bool udpTraceFlow = false;
  29. bool udpTraceTimeouts = false;
  // Verbosity threshold tested before DBGLOG tracing in this file
  // (>= 1 in queue_t::free_slots, > 5 in the PacketTracker methods).
  30. unsigned udpTraceLevel = 0;
  // Tunables that are only defined here, not read in this file.
  // NOTE(review): names suggest socket buffer sizes (bytes) and a reporting period - confirm units with the users.
  31. unsigned udpFlowSocketsSize = 131072;
  32. unsigned udpLocalWriteSocketSize = 1024000;
  33. unsigned udpStatsReportInterval = 60000;
  // Fault-injection state, only compiled when TEST_DROPPED_PACKETS is defined; both arrays are
  // sized by flowType::max_flow_cmd and zero-initialised.
  34. #ifdef TEST_DROPPED_PACKETS
  35. bool udpDropDataPackets = false;
  36. unsigned udpDropFlowPackets[flowType::max_flow_cmd] = {};
  37. unsigned flowPacketsSent[flowType::max_flow_cmd] = {};
  38. #endif
  // NOTE(review): presumably the TTL applied to multicast sockets - not used in this file; confirm.
  39. unsigned multicastTTL = 1;
  40. MODULE_INIT(INIT_PRIORITY_STANDARD)
  41. {
  42. bufferManager = roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE);
  43. return true;
  44. }
  45. MODULE_EXIT()
  46. {
  47. bufferManager->Release();
  48. }
  49. const IpAddress ServerIdentifier::getIpAddress() const
  50. {
  51. IpAddress ret;
  52. ret.setIP4(netAddress);
  53. return ret;
  54. }
  55. bool ServerIdentifier::isMe() const
  56. {
  57. return *this==myNode;
  58. }
  59. ServerIdentifier myNode;
  60. //---------------------------------------------------------------------------------------------
  61. void queue_t::set_queue_size(unsigned _limit)
  62. {
  63. limit = _limit;
  64. }
  65. queue_t::queue_t(unsigned _limit)
  66. {
  67. set_queue_size(_limit);
  68. }
  69. queue_t::~queue_t()
  70. {
  71. while (head)
  72. {
  73. auto p = head;
  74. head = head->msgNext;
  75. ::Release(p);
  76. }
  77. }
  78. unsigned queue_t::available()
  79. {
  80. CriticalBlock b(c_region);
  81. if (count < limit)
  82. return limit - count;
  83. return 0;
  84. }
  85. int queue_t::free_slots()
  86. {
  87. int res=0;
  88. while (res <= 0)
  89. {
  90. c_region.enter();
  91. res = limit - count;
  92. if (res <= 0)
  93. signal_free_sl++;
  94. c_region.leave();
  95. if (res <= 0)
  96. {
  97. while (!free_sl.wait(3000))
  98. {
  99. if (udpTraceLevel >= 1)
  100. DBGLOG("queue_t::free_slots blocked for 3 seconds waiting for free_sl semaphore");
  101. }
  102. }
  103. }
  104. return res;
  105. }
  106. void queue_t::interrupt()
  107. {
  108. data_avail.interrupt();
  109. }
  110. void queue_t::pushOwn(DataBuffer *buf)
  111. {
  112. // Could probably be done lock-free, which given one thread using this is high priority might avoid some
  113. // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
  114. assert(!buf->msgNext);
  115. {
  116. CriticalBlock b(c_region);
  117. if (tail)
  118. {
  119. assert(head);
  120. assert(!tail->msgNext);
  121. tail->msgNext = buf;
  122. }
  123. else
  124. {
  125. assert(!head);
  126. head = buf;
  127. }
  128. tail = buf;
  129. count++;
  130. #ifdef _DEBUG
  131. if (count > limit)
  132. DBGLOG("queue_t::pushOwn set count to %u", count);
  133. #endif
  134. }
  135. data_avail.signal();
  136. }
  137. DataBuffer *queue_t::pop(bool block)
  138. {
  139. if (!data_avail.wait(block ? INFINITE : 0))
  140. return nullptr;
  141. DataBuffer *ret = nullptr;
  142. unsigned signalFreeSlots = 0;
  143. {
  144. CriticalBlock b(c_region);
  145. if (unlikely(!count))
  146. return nullptr;
  147. count--;
  148. ret = head;
  149. head = ret->msgNext;
  150. if (!head)
  151. {
  152. assert(!count);
  153. tail = nullptr;
  154. }
  155. if (count < limit && signal_free_sl)
  156. {
  157. signal_free_sl--;
  158. signalFreeSlots++;
  159. }
  160. }
  161. ret->msgNext = nullptr;
  162. if (signalFreeSlots)
  163. free_sl.signal(signalFreeSlots);
  164. return ret;
  165. }
  166. unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
  167. {
  168. unsigned removed = 0;
  169. unsigned signalFreeSlots = 0;
  170. {
  171. CriticalBlock b(c_region);
  172. if (count)
  173. {
  174. DataBuffer *prev = nullptr;
  175. DataBuffer *finger = head;
  176. while (finger)
  177. {
  178. if (!key || !pkCmpFn || pkCmpFn((const void*) finger, key))
  179. {
  180. auto temp = finger;
  181. finger = finger->msgNext;
  182. if (prev==nullptr)
  183. {
  184. assert(head==temp);
  185. head = finger;
  186. }
  187. else
  188. prev->msgNext = finger;
  189. if (temp==tail)
  190. tail = prev;
  191. ::Release(temp);
  192. count--;
  193. if (count < limit && signal_free_sl)
  194. {
  195. signal_free_sl--;
  196. signalFreeSlots++;
  197. }
  198. removed++;
  199. }
  200. else
  201. {
  202. prev = finger;
  203. finger = finger->msgNext;
  204. }
  205. }
  206. }
  207. }
  208. if (signalFreeSlots)
  209. free_sl.signal(signalFreeSlots);
  210. return removed;
  211. }
  212. bool queue_t::dataQueued(const void *key, PKT_CMP_FUN pkCmpFn)
  213. {
  214. CriticalBlock b(c_region);
  215. DataBuffer *finger = head;
  216. while (finger)
  217. {
  218. if (pkCmpFn((const void*) finger, key))
  219. return true;
  220. finger = finger->msgNext;
  221. }
  222. return false;
  223. }
  224. #ifndef _WIN32
  225. #define HOSTENT hostent
  226. #include <netdb.h>
  227. #endif
  228. int check_set(const char *path, int value)
  229. {
  230. #ifdef __linux__
  231. FILE *f = fopen(path,"r");
  232. char res[32];
  233. char *r = 0;
  234. int si = 0;
  235. if (f) {
  236. r = fgets(res, sizeof(res), f);
  237. fclose(f);
  238. }
  239. if (r)
  240. si = atoi(r);
  241. if (!si)
  242. {
  243. OWARNLOG("WARNING: Failed to read value for %s", path);
  244. return 0;
  245. }
  246. else if (si<value)
  247. return -1;
  248. #endif
  249. return 0;
  250. }
  251. int check_max_socket_read_buffer(int size) {
  252. return check_set("/proc/sys/net/core/rmem_max", size);
  253. }
  254. int check_max_socket_write_buffer(int size) {
  255. return check_set("/proc/sys/net/core/wmem_max", size);
  256. }
  257. #if defined( __linux__) || defined(__APPLE__)
  258. void setLinuxThreadPriority(int level)
  259. {
  260. pthread_t self = pthread_self();
  261. int policy;
  262. sched_param param;
  263. int rc;
  264. if (( rc = pthread_getschedparam(self, &policy, &param)) != 0)
  265. DBGLOG("pthread_getschedparam error: %d", rc);
  266. if (level < 0)
  267. UNIMPLEMENTED;
  268. else if (!level)
  269. {
  270. param.sched_priority = 0;
  271. policy = SCHED_OTHER;
  272. }
  273. else
  274. {
  275. policy = SCHED_RR;
  276. param.sched_priority = level;
  277. }
  278. if(( rc = pthread_setschedparam(self, policy, &param)) != 0)
  279. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "i TID=%i", rc, policy, param.sched_priority, (unsigned __int64) self, threadLogID());
  280. else
  281. DBGLOG("priority set id=%" I64F "i policy=%i pri=%i TID=%i", (unsigned __int64) self, policy, param.sched_priority, threadLogID());
  282. }
  283. #endif
  284. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
  285. {
  286. if (bufferManager)
  287. bufferManager->poolStats(memStats);
  288. }
  289. RelaxedAtomic<unsigned> packetsOOO;
  290. bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
  291. {
  292. bool resent = false;
  293. sequence_t seq = hdr.sendSeq;
  294. if (hdr.pktSeq & UDP_PACKET_RESENT)
  295. resent = true;
  296. // Four cases: less than lastUnseen, equal to, within TRACKER_BITS of, or higher
  297. // Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
  298. bool duplicate = false;
  299. unsigned delta = seq - base;
  300. if (udpTraceLevel > 5)
  301. {
  302. DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
  303. dump();
  304. }
  305. if (delta < TRACKER_BITS)
  306. {
  307. unsigned idx = (seq / 64) % TRACKER_DWORDS;
  308. unsigned bit = seq % 64;
  309. __uint64 bitm = U64C(1)<<bit;
  310. duplicate = (seen[idx] & bitm) != 0;
  311. seen[idx] |= bitm;
  312. if (seq==base)
  313. {
  314. while (seen[idx] & bitm)
  315. {
  316. // Important to update in this order, so that during the window where they are inconsistent we have
  317. // false negatives rather than false positives
  318. seen[idx] &= ~bitm;
  319. base++;
  320. idx = (base / 64) % TRACKER_DWORDS;
  321. bit = base % 64;
  322. bitm = U64C(1)<<bit;
  323. }
  324. }
  325. // calculate new hwm, with some care for wrapping
  326. if ((int) (seq - hwm) > 0)
  327. hwm = seq;
  328. else if (!resent)
  329. packetsOOO++;
  330. }
  331. else if (resent)
  332. // Don't treat a resend that goes out of range as indicative of a restart - it probably just means
  333. // that the resend was not needed and the original moved things on when it arrived
  334. duplicate = true;
  335. else
  336. {
  337. // We've gone forwards too far to track, or backwards because server restarted
  338. // We have taken steps to try to avoid the former...
  339. // In theory could try to preserve SOME information in the former case, but as it shouldn't happen, can we be bothered?
  340. #ifdef _DEBUG
  341. if (udpResendLostPackets)
  342. {
  343. DBGLOG("Received packet %" SEQF "u will cause loss of information in PacketTracker", seq);
  344. dump();
  345. }
  346. //assert(false);
  347. #endif
  348. memset(seen, 0, sizeof(seen));
  349. base = seq+1;
  350. hwm = seq;
  351. }
  352. return duplicate;
  353. }
  354. const PacketTracker PacketTracker::copy() const
  355. {
  356. // This is called within a critical section. Would be better if we could avoid having to do so,
  357. // but we want to be able to read a consistent set of values
  358. PacketTracker ret;
  359. ret.base = base;
  360. ret.hwm = hwm;
  361. memcpy(ret.seen, seen, sizeof(seen));
  362. return ret;
  363. }
  364. bool PacketTracker::hasSeen(sequence_t seq) const
  365. {
  366. // Accessed only on sender side where these are not modified, so no need for locking
  367. // Careful about wrapping!
  368. unsigned delta = seq - base;
  369. if (udpTraceLevel > 5)
  370. {
  371. DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
  372. dump();
  373. }
  374. if (delta < TRACKER_BITS)
  375. {
  376. unsigned idx = (seq / 64) % TRACKER_DWORDS;
  377. unsigned bit = seq % 64;
  378. return (seen[idx] & (U64C(1)<<bit)) != 0;
  379. }
  380. else if (delta > INT_MAX) // Or we could just make delta a signed int? But code above would have to check >0
  381. return true;
  382. else
  383. return false;
  384. }
  385. bool PacketTracker::canRecord(sequence_t seq) const
  386. {
  387. // Careful about wrapping!
  388. unsigned delta = seq - base;
  389. if (udpTraceLevel > 5)
  390. {
  391. DBGLOG("PacketTracker::hasSeen - can I record %" SEQF "u, %d", seq, delta);
  392. dump();
  393. }
  394. return (delta < TRACKER_BITS);
  395. }
  396. bool PacketTracker::hasGaps() const
  397. {
  398. return base!=hwm+1;
  399. }
  400. void PacketTracker::dump() const
  401. {
  402. DBGLOG("PacketTracker base=%" SEQF "u, hwm=%" SEQF "u, seen[0]=%" I64F "x", base, hwm, seen[0]);
  403. }
  404. #ifdef _USE_CPPUNIT
  405. #include "unittests.hpp"
  406. class PacketTrackerTest : public CppUnit::TestFixture
  407. {
  408. CPPUNIT_TEST_SUITE(PacketTrackerTest);
  409. CPPUNIT_TEST(testNoteSeen);
  410. CPPUNIT_TEST(testReplay);
  411. CPPUNIT_TEST_SUITE_END();
  412. void testNoteSeen()
  413. {
  414. PacketTracker p;
  415. UdpPacketHeader hdr;
  416. hdr.pktSeq = 0;
  417. // Some simple tests
  418. CPPUNIT_ASSERT(!p.hasSeen(0));
  419. CPPUNIT_ASSERT(!p.hasSeen(1));
  420. hdr.sendSeq = 0;
  421. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  422. CPPUNIT_ASSERT(p.hasSeen(0));
  423. CPPUNIT_ASSERT(!p.hasSeen(1));
  424. CPPUNIT_ASSERT(!p.hasSeen(2000));
  425. CPPUNIT_ASSERT(!p.hasSeen(2001));
  426. hdr.pktSeq = UDP_PACKET_RESENT;
  427. CPPUNIT_ASSERT(p.noteSeen(hdr));
  428. hdr.pktSeq = 0;
  429. hdr.sendSeq = 2000;
  430. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  431. CPPUNIT_ASSERT(p.hasSeen(0));
  432. CPPUNIT_ASSERT(p.hasSeen(1));
  433. CPPUNIT_ASSERT(p.hasSeen(2000));
  434. CPPUNIT_ASSERT(!p.hasSeen(2001));
  435. hdr.sendSeq = 0;
  436. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  437. CPPUNIT_ASSERT(p.hasSeen(0));
  438. CPPUNIT_ASSERT(!p.hasSeen(1));
  439. CPPUNIT_ASSERT(!p.hasSeen(2000));
  440. CPPUNIT_ASSERT(!p.hasSeen(2001));
  441. PacketTracker p2;
  442. hdr.sendSeq = 1;
  443. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  444. CPPUNIT_ASSERT(!p2.hasSeen(0));
  445. CPPUNIT_ASSERT(p2.hasSeen(1));
  446. hdr.sendSeq = TRACKER_BITS-1; // This is the highest value we can record without losing information
  447. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  448. CPPUNIT_ASSERT(!p2.hasSeen(0));
  449. CPPUNIT_ASSERT(p2.hasSeen(1));
  450. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  451. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS));
  452. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  453. hdr.sendSeq = TRACKER_BITS;
  454. p2.noteSeen(hdr);
  455. CPPUNIT_ASSERT(p2.hasSeen(0));
  456. CPPUNIT_ASSERT(p2.hasSeen(1));
  457. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  458. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS));
  459. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  460. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+2));
  461. }
  462. void t(PacketTracker &p, sequence_t seq, unsigned pseq)
  463. {
  464. UdpPacketHeader hdr;
  465. hdr.sendSeq = seq;
  466. hdr.pktSeq = pseq;
  467. if (seq==29)
  468. CPPUNIT_ASSERT(p.noteSeen(hdr) == false);
  469. else
  470. p.noteSeen(hdr);
  471. }
  472. void testReplay()
  473. {
  474. PacketTracker p;
  475. t(p,1,0x1);
  476. t(p,2,0x2);
  477. t(p,3,0x3);
  478. t(p,4,0x4);
  479. t(p,5,0x5);
  480. t(p,6,0x6);
  481. t(p,7,0x7);
  482. t(p,8,0x8);
  483. t(p,9,0x9);
  484. t(p,11,0xb);
  485. t(p,12,0xc);
  486. t(p,13,0xd);
  487. t(p,14,0xe);
  488. t(p,15,0xf);
  489. t(p,16,0x10);
  490. t(p,17,0x11);
  491. t(p,18,0x12);
  492. t(p,19,0x13);
  493. t(p,20,0x14);
  494. t(p,21,0x15);
  495. t(p,22,0x16);
  496. t(p,23,0x17);
  497. t(p,24,0x18);
  498. t(p,25,0x19);
  499. t(p,26,0x1a);
  500. t(p,27,0x1b);
  501. t(p,28,0x1c);
  502. t(p,50,0x40000032);
  503. t(p,51,0x40000033);
  504. t(p,52,0x40000034);
  505. t(p,53,0x40000035);
  506. t(p,54,0x40000036);
  507. t(p,55,0x40000037);
  508. t(p,56,0x40000038);
  509. t(p,57,0x40000039);
  510. t(p,58,0x4000003a);
  511. t(p,59,0x4000003b);
  512. t(p,60,0x4000003c);
  513. t(p,61,0x4000003d);
  514. t(p,62,0xc0000000);
  515. t(p,63,0x4000003e);
  516. t(p,64,0x4000003f);
  517. t(p,65,0x40000040);
  518. t(p,66,0x40000041);
  519. t(p,67,0x40000042);
  520. t(p,68,0x40000043);
  521. t(p,69,0x40000044);
  522. t(p,70,0x40000045);
  523. t(p,71,0x40000046);
  524. t(p,72,0x40000047);
  525. t(p,73,0x40000048);
  526. t(p,74,0x40000049);
  527. t(p,75,0x4000004a);
  528. t(p,76,0x4000004b);
  529. t(p,77,0x4000004c);
  530. t(p,78,0x4000004d);
  531. t(p,79,0x4000004e);
  532. t(p,80,0x4000004f);
  533. t(p,81,0x40000050);
  534. t(p,82,0x40000051);
  535. t(p,83,0x40000052);
  536. t(p,84,0x40000053);
  537. t(p,85,0x40000054);
  538. t(p,86,0x40000055);
  539. t(p,87,0x40000056);
  540. t(p,88,0x40000057);
  541. t(p,89,0x40000058);
  542. t(p,90,0x40000059);
  543. t(p,91,0x4000005a);
  544. t(p,92,0x4000005b);
  545. t(p,93,0x4000005c);
  546. t(p,0,0x40000000);
  547. t(p,1,0x40000001);
  548. t(p,2,0x40000002);
  549. t(p,3,0x40000003);
  550. t(p,4,0x40000004);
  551. t(p,5,0x40000005);
  552. t(p,6,0x40000006);
  553. t(p,7,0x40000007);
  554. t(p,8,0x40000008);
  555. t(p,9,0x40000009);
  556. t(p,10,0x4000000a);
  557. t(p,11,0x4000000b);
  558. t(p,12,0x4000000c);
  559. t(p,13,0x4000000d);
  560. t(p,14,0x4000000e);
  561. t(p,15,0x4000000f);
  562. t(p,16,0x40000010);
  563. t(p,17,0x40000011);
  564. t(p,18,0x40000012);
  565. t(p,19,0x40000013);
  566. t(p,20,0x40000014);
  567. t(p,21,0x40000015);
  568. t(p,22,0x40000016);
  569. t(p,23,0x40000017);
  570. t(p,24,0x40000018);
  571. t(p,25,0x40000019);
  572. t(p,26,0x4000001a);
  573. t(p,27,0x4000001b);
  574. t(p,28,0x4000001c);
  575. t(p,29,0x4000001d);
  576. }
  577. };
  578. CPPUNIT_TEST_SUITE_REGISTRATION( PacketTrackerTest );
  579. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PacketTrackerTest, "PacketTrackerTest" );
  580. #endif
  581. /*
  582. Crazy thoughts on network-wide flow control
  583. Avoid sending data that clashes with other outbound or inbound data
  584. is outbound really an issue?
  585. if only inbound, should be easier
  586. can have each inbound node police its own, for a start
  587. udplib already tries to do this
  588. when sending permission to send, best to pick someone that is not sending to anyone else
  589. udplib already tries to do this
  590. but it can still lead to idleness - if node 1 is sending to node 2, and node 2 to node 1, node 3 can't find anyone idle.
  591. If you do need global:
  592. Every bit of data getting sent (perhaps over a certain size threshold?) gets permission from central traffic cop
  593. Outbound packet says source node, target node size
  594. Reply says source,target,size
  595. Cop allows immediately if nothing inflight between those pairs
  596. Cop assumes completion
  597. Cop redundancy
  598. - a backup cop is listening in?
  599. - use multicast for requests and replies?
  600. - no reply implies what?
  601. - backup cop just needs heartbeat from active cop
  602. - permission expires
  603. - multiple cops for blocks of targets?
  604. - but I want global view of who is sending
  605. */
  606. /* Simulating traffic flow
  607. N threads that simulate behaviour of agents
  608. fake write socket that
  609. - accepts data pushed to it
  610. - moves it to a read socket
  611. fake read socket that
  612. - accepts packets from write sockets
  613. - discards when full (but tells you)
  614. - delivers packets to consumer
  615. */
  616. #ifdef SOCKET_SIMULATION
  // Global switches for the socket simulation (only compiled when SOCKET_SIMULATION is defined).
  // NOTE(review): neither flag is read in this file. Presumably isUdpTestMode turns the simulated
  // sockets on and udpTestUseUdpSockets picks the real-UDP-backed simulation (CSimulatedUdp*)
  // over the in-memory queue one (CSimulatedQueue*) - confirm against the callers.
  617. bool isUdpTestMode = false;
  618. bool udpTestUseUdpSockets = true;
  619. CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
  620. {
  621. return new CSimulatedQueueWriteSocket(ep);
  622. }
  623. size32_t CSimulatedQueueWriteSocket::write(void const* buf, size32_t size)
  624. {
  625. CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
  626. CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
  627. if (dest)
  628. dest->writeSimulatedPacket(buf, size);
  629. else
  630. {
  631. StringBuffer s;
  632. DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
  633. }
  634. return size;
  635. }
  // Registry of simulated read sockets keyed by endpoint: the constructor adds an entry, the
  // destructor erases it, and connectSimulatedSocket() looks entries up.
  636. std::map<SocketEndpoint, CSimulatedQueueReadSocket *> CSimulatedQueueReadSocket::allReaders;
  // Lock taken around every access to allReaders in this file.
  637. CriticalSection CSimulatedQueueReadSocket::allReadersCrit;
  638. CSimulatedQueueReadSocket::CSimulatedQueueReadSocket(const SocketEndpoint &_me) : me(_me)
  639. {
  640. StringBuffer s;
  641. DBGLOG("Creating fake socket %s", me.getUrlStr(s).str());
  642. CriticalBlock b(allReadersCrit);
  643. allReaders[me] = this;
  644. }
  645. CSimulatedQueueReadSocket::~CSimulatedQueueReadSocket()
  646. {
  647. StringBuffer s;
  648. DBGLOG("Closing fake socket %s", me.getUrlStr(s).str());
  649. CriticalBlock b(allReadersCrit);
  650. allReaders.erase(me);
  651. }
  652. CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::udp_create(const SocketEndpoint &_me)
  653. {
  654. return new CSimulatedQueueReadSocket(_me);
  655. }
  656. CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
  657. {
  658. CriticalBlock b(allReadersCrit);
  659. return allReaders[ep];
  660. }
  661. void CSimulatedQueueReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
  662. {
  663. {
  664. CriticalBlock b(crit);
  665. if (size+used > max)
  666. {
  667. DBGLOG("Lost packet");
  668. return;
  669. }
  670. packetSizes.push(size);
  671. packets.push(memcpy(malloc(size), buf, size));
  672. used += size;
  673. }
  674. // StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
  675. avail.signal();
  676. }
  677. void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
  678. {
  679. unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
  680. readtms(buf, min_size, max_size, size_read, tms);
  681. }
  682. void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  683. unsigned timeout)
  684. {
  685. size_read = 0;
  686. if (!timeout || wait_read(timeout))
  687. {
  688. CriticalBlock b(crit);
  689. const void *thisData = packets.front();
  690. unsigned thisSize = packetSizes.front();
  691. if (thisSize > max_size)
  692. {
  693. assert(false);
  694. UNIMPLEMENTED; // Partial packet read not supported yet - add if needed
  695. }
  696. else
  697. {
  698. packets.pop();
  699. packetSizes.pop();
  700. used -= thisSize;
  701. }
  702. size_read = thisSize;
  703. memcpy(buf, thisData, thisSize);
  704. free((void *) thisData);
  705. }
  706. else
  707. throw makeStringException(JSOCKERR_timeout_expired, "");
  708. }
  709. int CSimulatedQueueReadSocket::wait_read(unsigned timeout)
  710. {
  711. bool ret = avail.wait(timeout);
  712. return ret;
  713. }
  714. //------------------------------------------------------------------------------------------------------------------------------
  715. //Hash the ip and port and map that to a local port - complain if more than one combination is hashed to the same port.
  // First local port of the range that simulated endpoints are mapped onto (see getMappedSocketPort).
  716. constexpr unsigned basePort = 9010;
  // Number of ports in that range, i.e. mapped ports are basePort .. basePort+maxPorts-1.
  717. constexpr unsigned maxPorts = 980;
  // One flag per mapped port: set by CSimulatedUdpReadSocket's constructor, cleared by its
  // destructor, and used to detect two endpoints hashing onto the same port.
  718. static std::atomic_bool connected[maxPorts];
  719. unsigned getMappedSocketPort(const SocketEndpoint & ep)
  720. {
  721. unsigned hash = ep.hash(0x31415926);
  722. return basePort + hash % maxPorts;
  723. }
  724. CSimulatedUdpReadSocket::CSimulatedUdpReadSocket(const SocketEndpoint &_me)
  725. {
  726. port = getMappedSocketPort(_me);
  727. if (connected[port-basePort].exchange(true))
  728. throw makeStringException(0, "Two ip/ports mapped to the same port - improve the hash (or change maxPorts)!");
  729. realSocket.setown(ISocket::udp_create(port));
  730. }
  731. CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket()
  732. {
  733. connected[port-basePort].exchange(false);
  734. }
  735. size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
  736. void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
  737. void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
  738. {
  739. realSocket->read(buf, min_size, max_size, size_read, timeoutsecs);
  740. }
  741. void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
  742. {
  743. realSocket->readtms(buf, min_size, max_size, size_read, timeout);
  744. }
  745. int CSimulatedUdpReadSocket::wait_read(unsigned timeout)
  746. {
  747. return realSocket->wait_read(timeout);
  748. }
  749. void CSimulatedUdpReadSocket::close()
  750. {
  751. realSocket->close();
  752. }
  753. CSimulatedUdpReadSocket* CSimulatedUdpReadSocket::udp_create(const SocketEndpoint &_me) { return new CSimulatedUdpReadSocket(_me); }
  754. CSimulatedUdpWriteSocket::CSimulatedUdpWriteSocket( const SocketEndpoint &ep)
  755. {
  756. unsigned port = getMappedSocketPort(ep);
  757. SocketEndpoint localEp(port, queryLocalIP());
  758. realSocket.setown(ISocket::udp_connect(localEp));
  759. }
  760. CSimulatedUdpWriteSocket* CSimulatedUdpWriteSocket::udp_connect( const SocketEndpoint &ep) { return new CSimulatedUdpWriteSocket(ep); }
  761. size32_t CSimulatedUdpWriteSocket::write(void const* buf, size32_t size)
  762. {
  763. return realSocket->write(buf, size);
  764. }
  765. void CSimulatedUdpWriteSocket::set_send_buffer_size(size32_t sz)
  766. {
  767. realSocket->set_send_buffer_size(sz);
  768. }
  769. void CSimulatedUdpWriteSocket::close()
  770. {
  771. realSocket->close();
  772. }
  773. #endif