udpsha.cpp 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "jsocket.hpp"
  16. #include "jlog.hpp"
  17. #include "roxie.hpp"
  18. #include "roxiemem.hpp"
  19. #include "portlist.h"
  20. #ifdef _WIN32
  21. #include <winsock2.h>
  22. #else
  23. #include <sys/socket.h>
  24. #endif
  25. using roxiemem::DataBuffer;
  26. using roxiemem::IDataBufferManager;
// Library-wide data buffer manager: created in MODULE_INIT and released in MODULE_EXIT.
IDataBufferManager *bufferManager;
// Tracing / tuning knobs for the UDP layer. These are the defaults; presumably the hosting
// process overrides them from its configuration — confirm against callers.
bool udpTraceFlow = false;
bool udpTraceTimeouts = false;
unsigned udpTraceLevel = 0;                 // higher = more verbose (e.g. >5 dumps PacketTracker state in this file)
unsigned udpFlowSocketsSize = 131072;       // NOTE(review): presumably a socket buffer size in bytes — confirm
unsigned udpLocalWriteSocketSize = 1024000; // NOTE(review): presumably bytes — confirm
unsigned udpStatsReportInterval = 60000;    // NOTE(review): presumably milliseconds — confirm
unsigned multicastTTL = 1;
  35. MODULE_INIT(INIT_PRIORITY_STANDARD)
  36. {
  37. bufferManager = roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE);
  38. return true;
  39. }
  40. MODULE_EXIT()
  41. {
  42. bufferManager->Release();
  43. }
  44. const IpAddress ServerIdentifier::getIpAddress() const
  45. {
  46. IpAddress ret;
  47. ret.setIP4(netAddress);
  48. return ret;
  49. }
  50. bool ServerIdentifier::isMe() const
  51. {
  52. return *this==myNode;
  53. }
// Identity of the local node; ServerIdentifier::isMe() compares against it.
// It is assigned elsewhere (the simulated stress test in this file calls myNode.setIp()).
ServerIdentifier myNode;
  55. //---------------------------------------------------------------------------------------------
  56. void queue_t::set_queue_size(unsigned _limit)
  57. {
  58. limit = _limit;
  59. }
  60. queue_t::queue_t(unsigned _limit)
  61. {
  62. set_queue_size(_limit);
  63. }
  64. queue_t::~queue_t()
  65. {
  66. while (head)
  67. {
  68. auto p = head;
  69. head = head->msgNext;
  70. ::Release(p);
  71. }
  72. }
  73. unsigned queue_t::available()
  74. {
  75. CriticalBlock b(c_region);
  76. if (count < limit)
  77. return limit - count;
  78. return 0;
  79. }
// Block until the queue has spare capacity, then return how many slots are free
// (limit - count, always > 0 on return).
// While the queue is full, the caller registers itself as a waiter (signal_free_sl) and sleeps
// on free_sl. pop() and removeData() signal free_sl for registered waiters once count < limit.
// A debug message is logged for every 3 seconds spent waiting when udpTraceLevel >= 1.
// NOTE: the returned value is only a snapshot taken under c_region; other producers may use
// those slots before this caller pushes.
int queue_t::free_slots()
{
    int res=0;
    while (res <= 0)
    {
        c_region.enter();
        // NOTE(review): limit/count look unsigned, so an over-full queue becomes a negative int here — confirm types
        res = limit - count;
        if (res <= 0)
            signal_free_sl++;   // ask a later pop()/removeData() to signal free_sl for us
        c_region.leave();
        if (res <= 0)
        {
            // Wait (outside the lock) for a consumer to make room, then re-check from the top
            while (!free_sl.wait(3000))
            {
                if (udpTraceLevel >= 1)
                    DBGLOG("queue_t::free_slots blocked for 3 seconds waiting for free_sl semaphore");
            }
        }
    }
    return res;
}
  101. void queue_t::interrupt()
  102. {
  103. data_avail.interrupt();
  104. }
  105. void queue_t::pushOwn(DataBuffer *buf)
  106. {
  107. // Could probably be done lock-free, which given one thread using this is high priority might avoid some
  108. // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
  109. assert(!buf->msgNext);
  110. {
  111. CriticalBlock b(c_region);
  112. if (tail)
  113. {
  114. assert(head);
  115. assert(!tail->msgNext);
  116. tail->msgNext = buf;
  117. }
  118. else
  119. {
  120. assert(!head);
  121. head = buf;
  122. }
  123. tail = buf;
  124. count++;
  125. #ifdef _DEBUG
  126. if (count > limit)
  127. DBGLOG("queue_t::pushOwn set count to %u", count);
  128. #endif
  129. }
  130. data_avail.signal();
  131. }
// Remove and return the buffer at the head of the queue.
// block: if true, wait indefinitely on data_avail; if false, poll it and return nullptr when
//        nothing has been signalled.
// Returns nullptr when no data was signalled, or when a signal was consumed but the queue is
// empty (removeData() drops entries without consuming data_avail, so signals can outnumber entries).
// The returned buffer is unlinked (msgNext cleared); presumably the caller takes over the queue's
// reference — confirm against callers.
// If a producer is blocked in free_slots() and there is now room, one such waiter is woken.
DataBuffer *queue_t::pop(bool block)
{
    if (!data_avail.wait(block ? INFINITE : 0))
        return nullptr;
    DataBuffer *ret = nullptr;
    unsigned signalFreeSlots = 0;
    {
        CriticalBlock b(c_region);
        if (unlikely(!count))
            return nullptr;
        count--;
        ret = head;
        head = ret->msgNext;
        if (!head)
        {
            assert(!count);
            tail = nullptr;
        }
        // Hand one waiting producer (registered via signal_free_sl) a wake-up now that there is room
        if (count < limit && signal_free_sl)
        {
            signal_free_sl--;
            signalFreeSlots++;
        }
    }
    ret->msgNext = nullptr;
    // Signalled after leaving c_region (presumably so the woken producer does not immediately block on it)
    if (signalFreeSlots)
        free_sl.signal(signalFreeSlots);
    return ret;
}
// Unlink and release every queued buffer for which pkCmpFn(buffer, key) returns true.
// If key or pkCmpFn is null, every buffer matches and the queue is emptied.
// Returns the number of buffers removed.
// For each removal that leaves count < limit, one producer registered in free_slots() is woken
// (signalled after the lock is dropped).
// NOTE: data_avail is not decremented here, so pop() may later consume a signal with count == 0;
// pop() handles that by returning nullptr.
unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
{
    unsigned removed = 0;
    unsigned signalFreeSlots = 0;
    {
        CriticalBlock b(c_region);
        if (count)
        {
            DataBuffer *prev = nullptr;   // last kept node, or nullptr while still at the head
            DataBuffer *finger = head;
            while (finger)
            {
                if (!key || !pkCmpFn || pkCmpFn((const void*) finger, key))
                {
                    // Unlink finger: advance first, then patch either head or the previous kept node
                    auto temp = finger;
                    finger = finger->msgNext;
                    if (prev==nullptr)
                    {
                        assert(head==temp);
                        head = finger;
                    }
                    else
                        prev->msgNext = finger;
                    // Removing the last node makes the previous kept node (or nullptr) the new tail
                    if (temp==tail)
                        tail = prev;
                    ::Release(temp);
                    count--;
                    if (count < limit && signal_free_sl)
                    {
                        signal_free_sl--;
                        signalFreeSlots++;
                    }
                    removed++;
                }
                else
                {
                    prev = finger;
                    finger = finger->msgNext;
                }
            }
        }
    }
    if (signalFreeSlots)
        free_sl.signal(signalFreeSlots);
    return removed;
}
  207. bool queue_t::dataQueued(const void *key, PKT_CMP_FUN pkCmpFn)
  208. {
  209. CriticalBlock b(c_region);
  210. DataBuffer *finger = head;
  211. while (finger)
  212. {
  213. if (pkCmpFn((const void*) finger, key))
  214. return true;
  215. finger = finger->msgNext;
  216. }
  217. return false;
  218. }
  219. #ifndef _WIN32
  220. #define HOSTENT hostent
  221. #include <netdb.h>
  222. #endif
  223. int check_set(const char *path, int value)
  224. {
  225. #ifdef __linux__
  226. FILE *f = fopen(path,"r");
  227. char res[32];
  228. char *r = 0;
  229. int si = 0;
  230. if (f) {
  231. r = fgets(res, sizeof(res), f);
  232. fclose(f);
  233. }
  234. if (r)
  235. si = atoi(r);
  236. if (!si)
  237. {
  238. OWARNLOG("WARNING: Failed to read value for %s", path);
  239. return 0;
  240. }
  241. else if (si<value)
  242. return -1;
  243. #endif
  244. return 0;
  245. }
  246. int check_max_socket_read_buffer(int size) {
  247. return check_set("/proc/sys/net/core/rmem_max", size);
  248. }
  249. int check_max_socket_write_buffer(int size) {
  250. return check_set("/proc/sys/net/core/wmem_max", size);
  251. }
  252. #if defined( __linux__) || defined(__APPLE__)
  253. void setLinuxThreadPriority(int level)
  254. {
  255. pthread_t self = pthread_self();
  256. int policy;
  257. sched_param param;
  258. int rc;
  259. if (( rc = pthread_getschedparam(self, &policy, &param)) != 0)
  260. DBGLOG("pthread_getschedparam error: %d", rc);
  261. if (level < 0)
  262. UNIMPLEMENTED;
  263. else if (!level)
  264. {
  265. param.sched_priority = 0;
  266. policy = SCHED_OTHER;
  267. }
  268. else
  269. {
  270. policy = SCHED_RR;
  271. param.sched_priority = level;
  272. }
  273. if(( rc = pthread_setschedparam(self, policy, &param)) != 0)
  274. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "i TID=%i", rc, policy, param.sched_priority, (unsigned __int64) self, threadLogID());
  275. else
  276. DBGLOG("priority set id=%" I64F "i policy=%i pri=%i TID=%i", (unsigned __int64) self, policy, param.sched_priority, threadLogID());
  277. }
  278. #endif
  279. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
  280. {
  281. if (bufferManager)
  282. bufferManager->poolStats(memStats);
  283. }
// Count of out-of-order arrivals: PacketTracker::noteSeen() increments it when a packet that is
// not flagged as a resend arrives within the tracked window but does not advance hwm.
RelaxedAtomic<unsigned> packetsOOO;
// Record that the packet described by hdr has arrived, and report whether it is a duplicate.
// Returns true if hdr.sendSeq was already marked as seen, or if it is a resend
// (UDP_PACKET_RESENT set in pktSeq) that falls outside the tracked window; false otherwise.
// Side effects: updates seen[], base and hwm, and may increment packetsOOO. A non-resent packet
// outside the window resets the tracker so that it starts again just after that packet.
bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
{
    bool resent = false;
    sequence_t seq = hdr.sendSeq;
    if (hdr.pktSeq & UDP_PACKET_RESENT)
        resent = true;
    // Four cases: less than lastUnseen, equal to, within TRACKER_BITS of, or higher
    // Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
    bool duplicate = false;
    // Unsigned difference: a seq "behind" base wraps to a huge value and so fails the window test below
    unsigned delta = seq - base;
    if (udpTraceLevel > 5)
    {
        DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
        dump();
    }
    if (delta < TRACKER_BITS)
    {
        // seen[] is used as a circular bitmap: bit (seq % 64) of word (seq / 64) % TRACKER_DWORDS
        unsigned idx = (seq / 64) % TRACKER_DWORDS;
        unsigned bit = seq % 64;
        __uint64 bitm = U64C(1)<<bit;
        duplicate = (seen[idx] & bitm) != 0;
        seen[idx] |= bitm;
        if (seq==base)
        {
            // Slide base forward over the contiguous run of seen packets, clearing their bits
            // so the slots can be reused by later sequence numbers.
            while (seen[idx] & bitm)
            {
                // Important to update in this order, so that during the window where they are inconsistent we have
                // false negatives rather than false positives
                seen[idx] &= ~bitm;
                base++;
                idx = (base / 64) % TRACKER_DWORDS;
                bit = base % 64;
                bitm = U64C(1)<<bit;
            }
        }
        // calculate new hwm, with some care for wrapping
        if ((int) (seq - hwm) > 0)
            hwm = seq;
        else if (!resent)
            packetsOOO++;   // an original (non-resent) packet that arrived at or below the high-water mark
    }
    else if (resent)
        // Don't treat a resend that goes out of range as indicative of a restart - it probably just means
        // that the resend was not needed and the original moved things on when it arrived
        duplicate = true;
    else
    {
        // We've gone forwards too far to track, or backwards because server restarted
        // We have taken steps to try to avoid the former...
        // In theory could try to preserve SOME information in the former case, but as it shouldn't happen, can we be bothered?
#ifdef _DEBUG
        if (udpResendLostPackets)
        {
            DBGLOG("Received packet %" SEQF "u will cause loss of information in PacketTracker", seq);
            dump();
        }
        //assert(false);
#endif
        // Restart tracking: everything up to and including seq now counts as seen
        memset(seen, 0, sizeof(seen));
        base = seq+1;
        hwm = seq;
    }
    return duplicate;
}
  349. const PacketTracker PacketTracker::copy() const
  350. {
  351. // This is called within a critical section. Would be better if we could avoid having to do so,
  352. // but we want to be able to read a consistent set of values
  353. PacketTracker ret;
  354. ret.base = base;
  355. ret.hwm = hwm;
  356. memcpy(ret.seen, seen, sizeof(seen));
  357. return ret;
  358. }
  359. bool PacketTracker::hasSeen(sequence_t seq) const
  360. {
  361. // Accessed only on sender side where these are not modified, so no need for locking
  362. // Careful about wrapping!
  363. unsigned delta = seq - base;
  364. if (udpTraceLevel > 5)
  365. {
  366. DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
  367. dump();
  368. }
  369. if (delta < TRACKER_BITS)
  370. {
  371. unsigned idx = (seq / 64) % TRACKER_DWORDS;
  372. unsigned bit = seq % 64;
  373. return (seen[idx] & (U64C(1)<<bit)) != 0;
  374. }
  375. else if (delta > INT_MAX) // Or we could just make delta a signed int? But code above would have to check >0
  376. return true;
  377. else
  378. return false;
  379. }
  380. bool PacketTracker::canRecord(sequence_t seq) const
  381. {
  382. // Careful about wrapping!
  383. unsigned delta = seq - base;
  384. if (udpTraceLevel > 5)
  385. {
  386. DBGLOG("PacketTracker::hasSeen - can I record %" SEQF "u, %d", seq, delta);
  387. dump();
  388. }
  389. return (delta < TRACKER_BITS);
  390. }
  391. bool PacketTracker::hasGaps() const
  392. {
  393. return base!=hwm+1;
  394. }
  395. void PacketTracker::dump() const
  396. {
  397. DBGLOG("PacketTracker base=%" SEQF "u, hwm=%" SEQF "u, seen[0]=%" I64F "x", base, hwm, seen[0]);
  398. }
  399. #ifdef _USE_CPPUNIT
  400. #include "unittests.hpp"
  401. class PacketTrackerTest : public CppUnit::TestFixture
  402. {
  403. CPPUNIT_TEST_SUITE(PacketTrackerTest);
  404. CPPUNIT_TEST(testNoteSeen);
  405. CPPUNIT_TEST(testReplay);
  406. CPPUNIT_TEST_SUITE_END();
  407. void testNoteSeen()
  408. {
  409. PacketTracker p;
  410. UdpPacketHeader hdr;
  411. hdr.pktSeq = 0;
  412. // Some simple tests
  413. CPPUNIT_ASSERT(!p.hasSeen(0));
  414. CPPUNIT_ASSERT(!p.hasSeen(1));
  415. hdr.sendSeq = 0;
  416. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  417. CPPUNIT_ASSERT(p.hasSeen(0));
  418. CPPUNIT_ASSERT(!p.hasSeen(1));
  419. CPPUNIT_ASSERT(!p.hasSeen(2000));
  420. CPPUNIT_ASSERT(!p.hasSeen(2001));
  421. hdr.pktSeq = UDP_PACKET_RESENT;
  422. CPPUNIT_ASSERT(p.noteSeen(hdr));
  423. hdr.pktSeq = 0;
  424. hdr.sendSeq = 2000;
  425. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  426. CPPUNIT_ASSERT(p.hasSeen(0));
  427. CPPUNIT_ASSERT(p.hasSeen(1));
  428. CPPUNIT_ASSERT(p.hasSeen(2000));
  429. CPPUNIT_ASSERT(!p.hasSeen(2001));
  430. hdr.sendSeq = 0;
  431. CPPUNIT_ASSERT(!p.noteSeen(hdr));
  432. CPPUNIT_ASSERT(p.hasSeen(0));
  433. CPPUNIT_ASSERT(!p.hasSeen(1));
  434. CPPUNIT_ASSERT(!p.hasSeen(2000));
  435. CPPUNIT_ASSERT(!p.hasSeen(2001));
  436. PacketTracker p2;
  437. hdr.sendSeq = 1;
  438. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  439. CPPUNIT_ASSERT(!p2.hasSeen(0));
  440. CPPUNIT_ASSERT(p2.hasSeen(1));
  441. hdr.sendSeq = TRACKER_BITS-1; // This is the highest value we can record without losing information
  442. CPPUNIT_ASSERT(!p2.noteSeen(hdr));
  443. CPPUNIT_ASSERT(!p2.hasSeen(0));
  444. CPPUNIT_ASSERT(p2.hasSeen(1));
  445. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  446. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS));
  447. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  448. hdr.sendSeq = TRACKER_BITS;
  449. p2.noteSeen(hdr);
  450. CPPUNIT_ASSERT(p2.hasSeen(0));
  451. CPPUNIT_ASSERT(p2.hasSeen(1));
  452. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
  453. CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS));
  454. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
  455. CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+2));
  456. }
  457. void t(PacketTracker &p, sequence_t seq, unsigned pseq)
  458. {
  459. UdpPacketHeader hdr;
  460. hdr.sendSeq = seq;
  461. hdr.pktSeq = pseq;
  462. if (seq==29)
  463. CPPUNIT_ASSERT(p.noteSeen(hdr) == false);
  464. else
  465. p.noteSeen(hdr);
  466. }
  467. void testReplay()
  468. {
  469. PacketTracker p;
  470. t(p,1,0x1);
  471. t(p,2,0x2);
  472. t(p,3,0x3);
  473. t(p,4,0x4);
  474. t(p,5,0x5);
  475. t(p,6,0x6);
  476. t(p,7,0x7);
  477. t(p,8,0x8);
  478. t(p,9,0x9);
  479. t(p,11,0xb);
  480. t(p,12,0xc);
  481. t(p,13,0xd);
  482. t(p,14,0xe);
  483. t(p,15,0xf);
  484. t(p,16,0x10);
  485. t(p,17,0x11);
  486. t(p,18,0x12);
  487. t(p,19,0x13);
  488. t(p,20,0x14);
  489. t(p,21,0x15);
  490. t(p,22,0x16);
  491. t(p,23,0x17);
  492. t(p,24,0x18);
  493. t(p,25,0x19);
  494. t(p,26,0x1a);
  495. t(p,27,0x1b);
  496. t(p,28,0x1c);
  497. t(p,50,0x40000032);
  498. t(p,51,0x40000033);
  499. t(p,52,0x40000034);
  500. t(p,53,0x40000035);
  501. t(p,54,0x40000036);
  502. t(p,55,0x40000037);
  503. t(p,56,0x40000038);
  504. t(p,57,0x40000039);
  505. t(p,58,0x4000003a);
  506. t(p,59,0x4000003b);
  507. t(p,60,0x4000003c);
  508. t(p,61,0x4000003d);
  509. t(p,62,0xc0000000);
  510. t(p,63,0x4000003e);
  511. t(p,64,0x4000003f);
  512. t(p,65,0x40000040);
  513. t(p,66,0x40000041);
  514. t(p,67,0x40000042);
  515. t(p,68,0x40000043);
  516. t(p,69,0x40000044);
  517. t(p,70,0x40000045);
  518. t(p,71,0x40000046);
  519. t(p,72,0x40000047);
  520. t(p,73,0x40000048);
  521. t(p,74,0x40000049);
  522. t(p,75,0x4000004a);
  523. t(p,76,0x4000004b);
  524. t(p,77,0x4000004c);
  525. t(p,78,0x4000004d);
  526. t(p,79,0x4000004e);
  527. t(p,80,0x4000004f);
  528. t(p,81,0x40000050);
  529. t(p,82,0x40000051);
  530. t(p,83,0x40000052);
  531. t(p,84,0x40000053);
  532. t(p,85,0x40000054);
  533. t(p,86,0x40000055);
  534. t(p,87,0x40000056);
  535. t(p,88,0x40000057);
  536. t(p,89,0x40000058);
  537. t(p,90,0x40000059);
  538. t(p,91,0x4000005a);
  539. t(p,92,0x4000005b);
  540. t(p,93,0x4000005c);
  541. t(p,0,0x40000000);
  542. t(p,1,0x40000001);
  543. t(p,2,0x40000002);
  544. t(p,3,0x40000003);
  545. t(p,4,0x40000004);
  546. t(p,5,0x40000005);
  547. t(p,6,0x40000006);
  548. t(p,7,0x40000007);
  549. t(p,8,0x40000008);
  550. t(p,9,0x40000009);
  551. t(p,10,0x4000000a);
  552. t(p,11,0x4000000b);
  553. t(p,12,0x4000000c);
  554. t(p,13,0x4000000d);
  555. t(p,14,0x4000000e);
  556. t(p,15,0x4000000f);
  557. t(p,16,0x40000010);
  558. t(p,17,0x40000011);
  559. t(p,18,0x40000012);
  560. t(p,19,0x40000013);
  561. t(p,20,0x40000014);
  562. t(p,21,0x40000015);
  563. t(p,22,0x40000016);
  564. t(p,23,0x40000017);
  565. t(p,24,0x40000018);
  566. t(p,25,0x40000019);
  567. t(p,26,0x4000001a);
  568. t(p,27,0x4000001b);
  569. t(p,28,0x4000001c);
  570. t(p,29,0x4000001d);
  571. }
  572. };
  573. CPPUNIT_TEST_SUITE_REGISTRATION( PacketTrackerTest );
  574. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PacketTrackerTest, "PacketTrackerTest" );
  575. #endif
  576. /*
  577. Crazy thoughts on network-wide flow control
  578. Avoid sending data that clashes with other outbound or inbound data
  579. is outbound really an issue?
  580. if only inbound, should be easier
  581. can have each inbound node police its own, for a start
  582. udplib already tries to do this
  583. when sending permission to send, best to pick someone that is not sending to anyone else
  584. udplib already tries to do this
  585. but it can still lead to idleness - id node 1 sending to node 2, and node2 to node 1, node3 can't find anyone idle.
  586. If you do need global:
  587. Every bit of data getting sent (perhaps over a certain size threshold?) gets permission from central traffic cop
  588. Outbound packet says source node, target node, size
  589. Reply says source,target,size
  590. Cop allows immediately if nothing inflight between those pairs
  591. Cop assumes completion
  592. Cop redundancy
  593. - a backup cop is listening in?
  594. - use multicast for requests and replies?
  595. - no reply implies what?
  596. - backup cop just needs heartbeat from active cop
  597. - permission expires
  598. - multiple cops for blocks of targets?
  599. - but I want global view of who is sending
  600. */
  601. /* Simulating traffic flow
  602. N threads that simulate behaviour of agents
  603. fake write socket that
  604. - accepts data pushed to it
  605. - moves it to a read socket
  606. fake read socket that
  607. - accepts packets from write sockets
  608. - discards when full (but tells you)
  609. - delivers packets to consumer
  610. */
  611. #ifdef SOCKET_SIMULATION
// Flag set to true by SimulatedUdpStressTest::testInit() in this file.
// NOTE(review): presumably consulted elsewhere to switch the UDP layer onto the simulated sockets — confirm.
bool isUdpTestMode = false;
  613. CSimulatedWriteSocket* CSimulatedWriteSocket::udp_connect(const SocketEndpoint &ep)
  614. {
  615. return new CSimulatedWriteSocket(ep);
  616. }
  617. size32_t CSimulatedWriteSocket::write(void const* buf, size32_t size)
  618. {
  619. CriticalBlock b(CSimulatedReadSocket::allReadersCrit);
  620. CSimulatedReadSocket *dest = CSimulatedReadSocket::connectSimulatedSocket(destEp);
  621. if (dest)
  622. dest->writeSimulatedPacket(buf, size);
  623. else
  624. {
  625. StringBuffer s;
  626. DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
  627. }
  628. return size;
  629. }
// Registry of live simulated read sockets keyed by their bound endpoint: constructors add
// themselves and destructors remove themselves. Guarded by allReadersCrit.
std::map<SocketEndpoint, CSimulatedReadSocket *> CSimulatedReadSocket::allReaders;
CriticalSection CSimulatedReadSocket::allReadersCrit;
  632. CSimulatedReadSocket::CSimulatedReadSocket(const SocketEndpoint &_me) : me(_me)
  633. {
  634. StringBuffer s;
  635. DBGLOG("Creating fake socket %s", me.getUrlStr(s).str());
  636. CriticalBlock b(allReadersCrit);
  637. allReaders[me] = this;
  638. }
  639. CSimulatedReadSocket::~CSimulatedReadSocket()
  640. {
  641. StringBuffer s;
  642. DBGLOG("Closing fake socket %s", me.getUrlStr(s).str());
  643. CriticalBlock b(allReadersCrit);
  644. allReaders.erase(me);
  645. }
  646. CSimulatedReadSocket* CSimulatedReadSocket::udp_create(const SocketEndpoint &_me)
  647. {
  648. return new CSimulatedReadSocket(_me);
  649. }
  650. CSimulatedReadSocket* CSimulatedReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
  651. {
  652. CriticalBlock b(allReadersCrit);
  653. return allReaders[ep];
  654. }
  655. void CSimulatedReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
  656. {
  657. {
  658. CriticalBlock b(crit);
  659. if (size+used > max)
  660. {
  661. DBGLOG("Lost packet");
  662. return;
  663. }
  664. packetSizes.push(size);
  665. packets.push(memcpy(malloc(size), buf, size));
  666. used += size;
  667. }
  668. // StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
  669. avail.signal();
  670. }
  671. void CSimulatedReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
  672. {
  673. unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
  674. readtms(buf, min_size, max_size, size_read, tms);
  675. }
  676. void CSimulatedReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  677. unsigned timeout)
  678. {
  679. size_read = 0;
  680. if (!timeout || wait_read(timeout))
  681. {
  682. CriticalBlock b(crit);
  683. const void *thisData = packets.front();
  684. unsigned thisSize = packetSizes.front();
  685. if (thisSize > max_size)
  686. {
  687. assert(false);
  688. UNIMPLEMENTED; // Partial packet read not supported yet - add if needed
  689. }
  690. else
  691. {
  692. packets.pop();
  693. packetSizes.pop();
  694. used -= thisSize;
  695. }
  696. size_read = thisSize;
  697. memcpy(buf, thisData, thisSize);
  698. free((void *) thisData);
  699. }
  700. else
  701. throw makeStringException(JSOCKERR_timeout_expired, "");
  702. }
  703. int CSimulatedReadSocket::wait_read(unsigned timeout)
  704. {
  705. bool ret = avail.wait(timeout);
  706. return ret;
  707. }
  708. #ifdef _USE_CPPUNIT
  709. class SimulatedUdpStressTest : public CppUnit::TestFixture
  710. {
  711. CPPUNIT_TEST_SUITE(SimulatedUdpStressTest);
  712. CPPUNIT_TEST(simulateTraffic);
  713. CPPUNIT_TEST_SUITE_END();
  714. Owned<IDataBufferManager> dbm;
  715. bool initialized = false;
  716. void testInit()
  717. {
  718. if (!initialized)
  719. {
  720. udpTraceLevel = 1;
  721. udpTraceTimeouts = true;
  722. udpResendLostPackets = true;
  723. udpRequestToSendTimeout = 10000;
  724. udpRequestToSendAckTimeout = 10000;
  725. isUdpTestMode = true;
  726. roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
  727. dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
  728. initialized = true;
  729. }
  730. }
  731. void simulateTraffic()
  732. {
  733. testInit();
  734. myNode.setIp(IpAddress("1.2.3.4"));
  735. Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 2, 2, false);
  736. printf("Start test\n");
  737. asyncFor(20, 20, [](unsigned i)
  738. {
  739. unsigned header = 0;
  740. IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
  741. // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
  742. Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 100, 3, pretendIP, nullptr, false);
  743. Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
  744. for (unsigned i = 0; i < 10000; i++)
  745. {
  746. void *buf = mp->getBuffer(500, false);
  747. memset(buf, i, 500);
  748. mp->putBuffer(buf, 500, false);
  749. }
  750. mp->flush();
  751. Sleep(1000);
  752. });
  753. printf("End test\n");
  754. }
  755. };
  756. CPPUNIT_TEST_SUITE_REGISTRATION( SimulatedUdpStressTest );
  757. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( SimulatedUdpStressTest, "SimulatedUdpStressTest" );
  758. #endif
  759. #endif