udptrr.cpp

/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include <string>
#include <map>
#include <queue>

#include "jthread.hpp"
#include "jlog.hpp"
#include "jisem.hpp"
#include "jsocket.hpp"
#include "jencrypt.hpp"
#include "udplib.hpp"
#include "udptrr.hpp"
#include "udptrs.hpp"
#include "udpipmap.hpp"
#include "udpmsgpk.hpp"
#include "roxiemem.hpp"
#include "roxie.hpp"

#ifdef _WIN32
#include <io.h>
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#endif

#include <thread>

using roxiemem::DataBuffer;
using roxiemem::IRowManager;

unsigned udpRetryBusySenders = 0; // seems faster with 0 than 1 in my testing on small clusters and sustained throughput

static byte key[32] = {
    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
};
class CReceiveManager : implements IReceiveManager, public CInterface
{
    /*
     * The ReceiveManager has several threads:
     * 1. receive_receive_flow (priority 3)
     *     - waits for packets on flow port
     *     - maintains list of nodes that have pending requests
     *     - sends ok_to_send to one sender at a time
     * 2. receive_sniffer (default priority 3, configurable)
     *     - waits for packets on sniffer port
     *     - updates information about what other nodes are currently up to
     *     - idea is to preferentially send "ok_to_send" to nodes that are not currently sending to someone else
     *     - doesn't run if no multicast
     *     - can I instead say "If I get a request to send and I'm sending to someone else, send a 'later'"?
     * 3. receive_data (priority 4)
     *     - reads data packets off data socket
     *     - runs at very high priority
     *     - used to have an option to perform collation on this thread, but it was a bad idea:
     *         - can block (ends up in memory manager via attachDataBuffer).
     *         - does not apply back pressure
     *     - Just enqueues them. We don't give permission to send more than the queue can hold.
     * 4. PacketCollator (standard priority)
     *     - dequeues packets
     *     - collates packets
     *
     */
    /*
     * Handling lost packets
     *
     * We try to make lost packets unlikely by telling agents when to send (and making sure they don't send unless
     * there's a good chance that the socket buffer will have room). But we can't legislate for network issues.
     *
     * What packets can be lost?
     * 1. Data packets - handled via retrying the whole query (not ideal). But this will also leave the inflight count wrong. We correct it any time
     *    the data socket times out, but that may not be good enough.
     * 2. RequestToSend - the sender's resend thread checks periodically. There's a short initial timeout for getting a reply (either "request_received"
     *    or "okToSend"), then a longer timeout for actually sending.
     * 3. OkToSend - there is a timeout after which the permission is considered invalid (based on how long it SHOULD take to send them).
     *    The requestToSend retry mechanism would then make sure the request is retried.
     *    MORE - if I don't get a response from OkToSend I should assume it was lost and requeue it.
     * 4. complete - covered by the same timeout as okToSend. A lost complete will mean incoming data to that node stalls for the duration of this timeout,
     *    and will also leave the inflight count out-of-whack.
     * 5. Sniffers - expire anyway
     *
     */
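    /*
     * Flow-control exchange as implemented below: a sender issues request_to_send on the flow port;
     * the receiver answers with request_received (request queued, no permission yet) or ok_to_send
     * (granted, with a maximum packet allowance); the sender then transmits on the data port and
     * finishes with send_completed, or with request_to_send_more if it still has data queued.
     */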
    class UdpSenderEntry  // one per node in the system
    {
        // This is created the first time a message from a previously unseen IP arrives, and remains alive indefinitely
        // Note that the various members are accessed by different threads, but no member is accessed from more than one thread
        // (except where noted) so protection is not required
        // Note that UDP ordering rules mean we can't guarantee that we don't see a "request_to_send" for the next transfer before
        // we see the "complete" for the current one. Even if we were sure the network stack would not reorder, these come from different
        // threads on the sender side and the order is not 100% guaranteed, so we need to cope with it.
        // We also need to recover gracefully (and preferably quickly) if any flow or data messages go missing. Currently the sender
        // will resend the rts if no ok_to_send arrives within the timeout, but there may be a better way?
    public:
        // Used only by receive_flow thread
        IpAddress dest;
        ISocket *flowSocket = nullptr;
        UdpSenderEntry *nextSender = nullptr;  // Used to form list of all senders that have outstanding requests
        unsigned timeouts = 0;

        // Set by sniffer, used by receive_flow. But races are unimportant
        unsigned timeStamp = 0;                // When it was marked busy (0 means not busy)

        UdpSenderEntry(const IpAddress &_dest, unsigned port) : dest(_dest)
        {
            SocketEndpoint ep(port, dest);
            flowSocket = ISocket::udp_connect(ep);
        }

        ~UdpSenderEntry()
        {
            if (flowSocket)
            {
                flowSocket->close();
                flowSocket->Release();
            }
        }

        inline void noteDone()
        {
            timeouts = 0;
        }

        inline bool retryOnTimeout()
        {
            ++timeouts;
            if (udpTraceLevel)
            {
                StringBuffer s;
                DBGLOG("Timed out %d times waiting for send_done from %s", timeouts, dest.getIpText(s).str());
            }
            if (udpMaxRetryTimedoutReqs && (timeouts >= udpMaxRetryTimedoutReqs))
            {
                if (udpTraceLevel)
                    DBGLOG("Abandoning");
                timeouts = 0;
                return false;
            }
            else
            {
                if (udpTraceLevel)
                    DBGLOG("Retrying");
                return true;
            }
        }
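
        // Sends a flow-control reply back to this sender: a maxTransfer of 0 is sent as
        // request_received (acknowledgement only); a non-zero value is sent as ok_to_send,
        // granting permission to transmit up to maxTransfer packets back to this node (returnAddress).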
        void requestToSend(unsigned maxTransfer, const IpAddress &returnAddress)
        {
            try
            {
                UdpPermitToSendMsg msg;
                msg.cmd = maxTransfer ? flowType::ok_to_send : flowType::request_received;
                msg.destNode = returnAddress;
                msg.max_data = maxTransfer;
                if (udpTraceLevel > 1)
                {
                    StringBuffer ipStr;
                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, returnAddress.getIpText(ipStr).str());
                }
                flowSocket->write(&msg, sizeof(UdpPermitToSendMsg));
            }
            catch (IException *e)
            {
                StringBuffer d, s;
                DBGLOG("UdpReceiver: requestToSend failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
                e->Release();
            }
        }

        bool is_busy()
        {
            if (timeStamp)
            {
                unsigned now = msTick();
                if ((now - timeStamp) < 10)
                    return true;
                // MORE - might be interesting to note how often this happens. Why 10 milliseconds?
                timeStamp = 0;  // No longer considered busy
            }
            return false;
        }

        void update(bool busy)
        {
            if (busy)
                timeStamp = msTick();
            else
                timeStamp = 0;
        }
    };
    IpMapOf<UdpSenderEntry> sendersTable;

    class receive_sniffer : public Thread
    {
        ISocket *sniffer_socket;
        unsigned snifferPort;
        IpAddress snifferIP;
        CReceiveManager &parent;
        std::atomic<bool> running = { false };

        inline void update(const IpAddress &ip, bool busy)
        {
            if (udpTraceLevel > 5)
            {
                StringBuffer s;
                DBGLOG("UdpReceive: sniffer sets is_busy[%s] to %d", ip.getIpText(s).str(), busy);
            }
            parent.sendersTable[ip].update(busy);
        }

    public:
        receive_sniffer(CReceiveManager &_parent, unsigned _snifferPort, const IpAddress &_snifferIP)
            : Thread("udplib::receive_sniffer"), parent(_parent), snifferPort(_snifferPort), snifferIP(_snifferIP), running(false)
        {
            sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
            if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
                throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
            sniffer_socket->set_receive_buffer_size(udpFlowSocketsSize);
            if (udpTraceLevel)
            {
                StringBuffer ipStr;
                snifferIP.getIpText(ipStr);
                size32_t actualSize = sniffer_socket->get_receive_buffer_size();
                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i sockbuffsize=%d actual %d", ipStr.str(), snifferPort, udpFlowSocketsSize, actualSize);
            }
        }

        ~receive_sniffer()
        {
            running = false;
            if (sniffer_socket) sniffer_socket->close();
            join();
            if (sniffer_socket) sniffer_socket->Release();
        }
        virtual int run()
        {
            DBGLOG("UdpReceiver: sniffer started");
            if (udpSnifferReadThreadPriority)
            {
#ifdef __linux__
                setLinuxThreadPriority(udpSnifferReadThreadPriority);
#else
                adjustPriority(1);
#endif
            }
            while (running)
            {
                try
                {
                    unsigned int res;
                    sniff_msg msg;
                    sniffer_socket->read(&msg, 1, sizeof(msg), res, 5);
                    update(msg.nodeIp.getIpAddress(), msg.cmd == sniffType::busy);
                }
                catch (IException *e)
                {
                    if (running && e->errorCode() != JSOCKERR_timeout_expired)
                    {
                        StringBuffer s;
                        DBGLOG("UdpReceiver: receive_sniffer::run read failed %s", e->errorMessage(s).str());
                        MilliSleep(1000);
                    }
                    e->Release();
                }
                catch (...)
                {
                    DBGLOG("UdpReceiver: receive_sniffer::run unknown exception port %u", parent.data_port);
                    if (sniffer_socket) {
                        sniffer_socket->Release();
                        sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
                    }
                    MilliSleep(1000);
                }
            }
            return 0;
        }

        virtual void start()
        {
            if (udpSnifferEnabled)
            {
                running = true;
                Thread::start();
            }
        }
    };
    class receive_receive_flow : public Thread
    {
        CReceiveManager &parent;
        Owned<ISocket> flow_socket;
        const unsigned flow_port;
        const unsigned maxSlotsPerSender;
        std::atomic<bool> running = { false };

        UdpSenderEntry *pendingRequests = nullptr;   // Head of list of senders wanting permission to send
        UdpSenderEntry *lastPending = nullptr;       // Tail of list
        UdpSenderEntry *currentRequester = nullptr;  // Who currently has permission to send

        void enqueueRequest(UdpSenderEntry *requester)
        {
            if ((lastPending == requester) || (requester->nextSender != nullptr))  // Already on queue
            {
                if (udpTraceLevel > 1)
                {
                    StringBuffer s;
                    DBGLOG("UdpReceive: received duplicate request_to_send from node %s", requester->dest.getIpText(s).str());
                }
                // We can safely ignore these
            }
            else
            {
                // Chain it onto list
                if (pendingRequests != nullptr)
                    lastPending->nextSender = requester;
                else
                    pendingRequests = requester;
                lastPending = requester;
            }
            requester->requestToSend(0, myNode.getIpAddress());  // Acknowledge receipt of the request
        }

        unsigned okToSend(UdpSenderEntry *requester)
        {
            assert(!currentRequester);
            unsigned max_transfer = parent.free_slots();
            if (max_transfer > maxSlotsPerSender)
                max_transfer = maxSlotsPerSender;
            unsigned timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10;  // in ms, assuming MTU packet size with ~100x margin on a 100Mbit network // MORE - hideous!
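            // Illustrative arithmetic (values assumed for the example only): with a DATA_PAYLOAD of
            // roughly 8K bytes and a grant of 100 slots, this yields (100 * 8192) / 100 + 10 ~= 8202 ms,
            // i.e. on the order of 100x the nominal time needed to push that much data down a
            // 100Mbit link, plus a small constant.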
            currentRequester = requester;
            requester->requestToSend(max_transfer, myNode.getIpAddress());
            return timeout;
        }
        bool noteDone(UdpSenderEntry *requester)
        {
            if (requester != currentRequester)
            {
                // This should not happen - I suppose it COULD if we receive a delayed message for a transfer we had earlier given up on.
                // Best response is to ignore it if so
                DBGLOG("Received completed message is not from current sender!");
                // MORE - should we set currentRequester NULL here? debatable.
                return false;
            }
            currentRequester->noteDone();
            currentRequester = nullptr;
            return true;
        }

        unsigned timedOut(UdpSenderEntry *requester)
        {
            // MORE - this will retry indefinitely if the agent in question is dead
            currentRequester = nullptr;
            if (requester->retryOnTimeout())
                enqueueRequest(requester);
            if (pendingRequests)
                return sendNextOk();
            else
                return 5000;
        }
        unsigned sendNextOk()
        {
            assert(pendingRequests != nullptr);
            if (udpSnifferEnabled)
            {
                // find the first non-busy sender, and move it to the front of the pending request chain
                int retry = udpRetryBusySenders;
                UdpSenderEntry *finger = pendingRequests;
                UdpSenderEntry *prev = nullptr;
                for (;;)
                {
                    if (finger->is_busy())
                    {
                        prev = finger;
                        finger = finger->nextSender;
                        if (finger == nullptr)
                        {
                            if (retry--)
                            {
                                if (udpTraceLevel > 4)
                                    DBGLOG("UdpReceive: All senders busy");
                                MilliSleep(1);
                                finger = pendingRequests;
                                prev = nullptr;
                            }
                            else
                                break;  // give up and use the first anyway
                        }
                    }
                    else
                    {
                        if (finger != pendingRequests)
                        {
                            if (finger == lastPending)
                                lastPending = prev;
                            assert(prev != nullptr);
                            prev->nextSender = finger->nextSender;
                            finger->nextSender = pendingRequests;
                            pendingRequests = finger;
                        }
                        break;
                    }
                }
            }
            UdpSenderEntry *nextSender = pendingRequests;
            // remove from front of queue
            if (pendingRequests == lastPending)
                lastPending = nullptr;
            pendingRequests = nextSender->nextSender;
            nextSender->nextSender = nullptr;
            return okToSend(nextSender);
        }
    public:
        receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
            : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender)
        {
            if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
                throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
            flow_socket.setown(ISocket::udp_create(flow_port));
            flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
            size32_t actualSize = flow_socket->get_receive_buffer_size();
            DBGLOG("UdpReceiver: receive_receive_flow created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
        }

        ~receive_receive_flow()
        {
            running = false;
            if (flow_socket)
                flow_socket->close();
            join();
        }

        virtual void start()
        {
            running = true;
            Thread::start();
        }

        virtual int run() override
        {
            DBGLOG("UdpReceiver: receive_receive_flow started");
#ifdef __linux__
            setLinuxThreadPriority(3);
#else
            adjustPriority(1);
#endif
            UdpRequestToSendMsg msg;
            unsigned timeoutExpires = msTick() + 5000;
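            // Main flow-control loop: timeoutExpires is the deadline for the current state (an
            // outstanding permission to send, or simply an idle poll). Each message handled below
            // may grant permission to another sender and pushes the deadline out accordingly;
            // reaching the deadline while currentRequester is set is treated as a timed-out transfer.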
            while (running)
            {
                try
                {
                    const unsigned l = sizeof(msg);
                    unsigned int res;
                    unsigned now = msTick();
                    if (now >= timeoutExpires)
                    {
                        if (currentRequester)
                            timeoutExpires = now + timedOut(currentRequester);
                        else
                            timeoutExpires = now + 5000;
                    }
                    else
                    {
                        flow_socket->readtms(&msg, l, l, res, timeoutExpires-now);
                        now = msTick();
                        assert(res==l);
                        if (udpTraceLevel > 5)
                        {
                            StringBuffer ipStr;
                            DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
                        }
                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode];
                        switch (msg.cmd)
                        {
                        case flowType::request_to_send:
                            if (pendingRequests || currentRequester)
                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request
                            else
                                timeoutExpires = now + okToSend(sender);
                            break;

                        case flowType::send_completed:
                            parent.inflight += msg.packets;
                            if (noteDone(sender) && pendingRequests)
                                timeoutExpires = now + sendNextOk();
                            else
                                timeoutExpires = now + 5000;
                            break;

                        case flowType::request_to_send_more:
                            parent.inflight += msg.packets;
                            if (noteDone(sender))
                            {
                                if (pendingRequests)
                                {
                                    enqueueRequest(sender);
                                    timeoutExpires = now + sendNextOk();
                                }
                                else
                                    timeoutExpires = now + okToSend(sender);
                            }
                            break;

                        default:
                            DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
                        }
                    }
                }
                catch (IException *e)
                {
                    // MORE - timeouts need some attention
                    if (e->errorCode() == JSOCKERR_timeout_expired)
                    {
                        // A timeout implies that there is an active permission to send, but nothing has happened.
                        // Could be a really busy (or crashed) agent, could be a lost packet
                        if (currentRequester)
                            timeoutExpires = msTick() + timedOut(currentRequester);
                    }
                    else if (running)
                    {
                        StringBuffer s;
                        DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
                    }
                    e->Release();
                }
                catch (...)
                {
                    DBGLOG("UdpReceiver: receive_receive_flow::run unknown exception");
                }
            }
            return 0;
        }
    };
    class receive_data : public Thread
    {
        CReceiveManager &parent;
        ISocket *receive_socket;
        std::atomic<bool> running = { false };
        Semaphore started;

    public:
        receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
        {
            unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2;
            if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
            if (check_max_socket_read_buffer(ip_buffer) < 0)
                throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
            receive_socket = ISocket::udp_create(parent.data_port);
            receive_socket->set_receive_buffer_size(ip_buffer);
            size32_t actualSize = receive_socket->get_receive_buffer_size();
            DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
            running = false;
        }

        virtual void start()
        {
            running = true;
            Thread::start();
            started.wait();
        }

        ~receive_data()
        {
            running = false;
            if (receive_socket)
                receive_socket->close();
            join();
            ::Release(receive_socket);
        }

        virtual int run()
        {
            DBGLOG("UdpReceiver: receive_data started");
#ifdef __linux__
            setLinuxThreadPriority(4);
#else
            adjustPriority(2);
#endif
            DataBuffer *b = NULL;
            started.signal();
            MemoryBuffer encryptData;
            size32_t max_payload = DATA_PAYLOAD;
            void *encryptedBuffer = nullptr;
            if (parent.encrypted)
            {
                max_payload = DATA_PAYLOAD + 16;  // AES function may add up to 16 bytes of padding
                encryptedBuffer = encryptData.reserveTruncate(max_payload);
            }
            while (running)
            {
                try
                {
                    unsigned int res;
                    b = bufferManager->allocate();
                    if (parent.encrypted)
                    {
                        receive_socket->read(encryptedBuffer, 1, max_payload, res, 5);
                        res = aesDecrypt(key, sizeof(key), encryptedBuffer, res, b->data, DATA_PAYLOAD);
                    }
                    else
                        receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
                    parent.inflight--;
                    // MORE - reset it to zero if we fail to read data, or if avail_read returns 0.
                    UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
                    assert(hdr.length == res && hdr.length > sizeof(hdr));
                    if (udpTraceLevel > 5)  // don't want to interrupt this thread if we can help it
                    {
                        StringBuffer s;
                        DBGLOG("UdpReceiver: %u bytes received, node=%s", res, hdr.node.getTraceText(s).str());
                    }
                    parent.input_queue->pushOwn(b);
                    b = NULL;
                }
                catch (IException *e)
                {
                    ::Release(b);
                    b = NULL;
                    if (udpTraceLevel > 1 && parent.inflight)
                    {
                        DBGLOG("resetting inflight to 0 (was %d)", parent.inflight.load(std::memory_order_relaxed));
                    }
                    parent.inflight = 0;
                    if (running && e->errorCode() != JSOCKERR_timeout_expired)
                    {
                        StringBuffer s;
                        DBGLOG("UdpReceiver: receive_data::run read failed port=%u - Exp: %s", parent.data_port, e->errorMessage(s).str());
                        MilliSleep(1000);  // Give a chance for mem free
                    }
                    e->Release();
                }
                catch (...)
                {
                    ::Release(b);
                    b = NULL;
                    DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
                    MilliSleep(1000);
                }
            }
            ::Release(b);
            return 0;
        }
    };
    class CPacketCollator : public Thread
    {
        CReceiveManager &parent;
    public:
        CPacketCollator(CReceiveManager &_parent) : Thread("CPacketCollator"), parent(_parent) {}

        virtual int run()
        {
            DBGLOG("UdpReceiver: CPacketCollator::run");
            parent.collatePackets();
            return 0;
        }
    } collatorThread;

    friend class receive_receive_flow;
    friend class receive_send_flow;
    friend class receive_data;
    friend class ReceiveFlowManager;
    friend class receive_sniffer;

    queue_t *input_queue;
    int input_queue_size;
    receive_receive_flow *receive_flow;
    receive_data *data;
    receive_sniffer *sniffer;
    int receive_flow_port;
    int data_port;
    std::atomic<bool> running = { false };
    bool encrypted = false;

    typedef std::map<ruid_t, CMessageCollator*> uid_map;
    uid_map collators;
    SpinLock collatorsLock;  // protects access to collators map

    // inflight is my best guess at how many packets may be sitting in socket buffers somewhere.
    // Incremented when I am notified about packets having been sent, decremented as they are read off the socket.
    std::atomic<int> inflight = {0};
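
    // Estimate how many more packets we can safely invite senders to transmit: the input queue's
    // free slots less our best guess at packets already sent but not yet read off the socket.
    // For example, 80 free slots with 30 packets believed in flight yields a grant of 50.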
    int free_slots()
    {
        int free = input_queue->free_slots();  // May block if collator thread is not removing from my queue fast enough
        // Ignore inflight if negative (can happen because we read some inflight before we see the send_done)
        int i = inflight.load(std::memory_order_relaxed);
        if (i < 0)
        {
            if (i < -input_queue->capacity())
            {
                if (udpTraceLevel)
                    DBGLOG("UdpReceiver: ERROR: inflight has more packets in queue but not counted (%d) than queue capacity (%d)", -i, input_queue->capacity());  // Should never happen
                inflight = -input_queue->capacity();
            }
            i = 0;
        }
        else if (i >= free)
        {
            if ((i > free) && (udpTraceLevel))
                DBGLOG("UdpReceiver: ERROR: more packets in flight (%d) than slots free (%d)", i, free);  // Should never happen
            inflight = i = free-1;
        }
        if (i && udpTraceLevel > 1)
            DBGLOG("UdpReceiver: adjusting free_slots to allow for %d in flight", i);
        return free - i;
    }
public:
    IMPLEMENT_IINTERFACE;

    CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client, bool _encrypted)
        : collatorThread(*this), sendersTable([client_flow_port](const ServerIdentifier &ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port); })
    {
#ifndef _WIN32
        setpriority(PRIO_PROCESS, 0, -15);
#endif
        encrypted = _encrypted;
        receive_flow_port = server_flow_port;
        data_port = d_port;
        input_queue_size = queue_size;
        input_queue = new queue_t(queue_size);
        data = new receive_data(*this);
        receive_flow = new receive_receive_flow(*this, server_flow_port, m_slot_pr_client);
        if (udpSnifferEnabled)
            sniffer = new receive_sniffer(*this, snif_port, multicast_ip);
        else
            sniffer = nullptr;

        running = true;
        collatorThread.start();
        data->start();
        receive_flow->start();
        if (udpSnifferEnabled)
            sniffer->start();
        MilliSleep(15);
    }

    ~CReceiveManager()
    {
        running = false;
        input_queue->interrupt();
        collatorThread.join();
        delete data;
        delete receive_flow;
        delete sniffer;
        delete input_queue;
    }
    virtual void detachCollator(const IMessageCollator *msgColl)
    {
        ruid_t ruid = msgColl->queryRUID();
        if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
        {
            SpinBlock b(collatorsLock);
            collators.erase(ruid);
        }
        msgColl->Release();
    }

    void collatePackets()
    {
        while (running)
        {
            DataBuffer *dataBuff = input_queue->pop(true);
            collatePacket(dataBuff);
        }
    }
    void collatePacket(DataBuffer *dataBuff)
    {
        const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;

        if (udpTraceLevel >= 4)
        {
            StringBuffer s;
            DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
                   pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
        }

        Linked<CMessageCollator> msgColl;
        bool isDefault = false;
        {
            SpinBlock b(collatorsLock);
            try
            {
                msgColl.set(collators[pktHdr->ruid]);
                if (!msgColl)
                {
                    msgColl.set(collators[RUID_DISCARD]);
                    isDefault = true;
                    unwantedDiscarded++;
                }
            }
            catch (IException *E)
            {
                EXCLOG(E);
                E->Release();
            }
            catch (...)
            {
                IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
                EXCLOG(E);
                E->Release();
            }
        }
        if (udpTraceLevel && isDefault)
        {
            StringBuffer s;
            DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str());
        }
        if (msgColl && msgColl->attach_databuffer(dataBuff))
            dataBuff = nullptr;
        else
            dataBuff->Release();
    }
    virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
    {
        CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid);
        if (udpTraceLevel >= 2)
            DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
        {
            SpinBlock b(collatorsLock);
            collators[ruid] = msgColl;
        }
        msgColl->Link();
        return msgColl;
    }
};
IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
                                      int sniffer_port, const IpAddress &sniffer_multicast_ip,
                                      int udpQueueSize, unsigned maxSlotsPerSender,
                                      bool encrypted)
{
    assertex(maxSlotsPerSender <= (unsigned) udpQueueSize);
    return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender, encrypted);
}
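
/* Illustrative usage (a minimal sketch, not part of this file): a caller would typically create
   one receive manager for the configured ports, register a collator for each query's ruid, and
   detach it when that query completes. The port/queue variables, rowManager and myRuid below are
   placeholders, not names defined in this file.

    Owned<IReceiveManager> rm = createReceiveManager(serverFlowPort, dataPort, clientFlowPort,
                                                     snifferPort, snifferIp, udpQueueSize,
                                                     maxSlotsPerSender, false);
    Owned<IMessageCollator> collator = rm->createMessageCollator(rowManager, myRuid);
    // ... consume collated messages addressed to myRuid ...
    rm->detachCollator(collator);   // removes it from the manager's map and releases one link
*/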
/*
Thoughts on flow control / streaming:
1. The "continuation packet" mechanism does have some advantages
    - easy recovery from agent failures
    - agent recovers easily from Roxie server failures
    - flow control is simple (but is it effective?)

2. Abandoning continuation packets in favour of streaming would give us the following issues:
    - would need some flow control to stop getting ahead of a Roxie server that consumed slowly
    - flow control is non-trivial if you want to avoid tying up an agent thread and want the agent to be able to recover from Roxie server failure
    - Need to work out how to do GSS - the nextGE info needs to be passed back in the flow control?
    - can't easily recover from agent failures if you already started processing
      - unless you assume that the results from the agent are always deterministic and can retry and skip N
    - potentially ties up an agent thread for a while
      - do we need to have a larger thread pool but limit how many are actually active?

3. Order of work
    - Just adding streaming while ignoring flow control and continuation stuff (i.e. we still stop for permission to continue periodically)
      - Shouldn't make anything any _worse_ ...
        - except that we won't be able to recover from an agent dying mid-stream (at least not without some considerable effort)
          - what will happen then?
        - May also break server-side caching (that no-one has used AFAIK). Maybe restrict to nohits as we change....
    - Add some flow control
      - would prevent the agent getting too far ahead in cases that are inadequately flow-controlled today
      - shouldn't make anything any worse...
    - Think about removing the continuation mechanism from some cases

Per Gavin, streaming would definitely help for the lowest frequency term. It may help for the others as well if it avoided any significant start-up costs - e.g., opening the indexes,
creating the segment monitors, creating the various cursors, and serialising the context (especially because there are likely to be multiple cursors).

To add streaming:
    - Need to check for meta availability other than when first received
      - when?
    - Need to cope with a getNext() blocking without it causing issues
      - perhaps should recode getNext() of variable-size rows first?

More questions:
    - Can we afford the memory for the resend info?
      - Save maxPacketsPerSender per sender?
    - are we really handling restart and sequence wraparound correctly?
    - what about server-side caching? Makes it hard
      - but maybe we should only cache tiny replies anyway....

Problems found while testing the implementation:
    - the unpacker cursor read code is crap
    - there is a potential to deadlock when we need to make a callback agent->server during a streamed result (indexread5 illustrates)
      - resolution callback code doesn't really need to be query specific - could go to the default handler
      - but other callbacks - ALIVE, EXCEPTION, and debugger - are not so clear
    - It's not at all clear where to move the code for processing metadata
    - the callback paradigm would solve both - but it has to be on a client thread (e.g. from within a call to next()).

The following are used in "pseudo callback" mode:
#define ROXIE_DEBUGREQUEST 0x3ffffff7u
#define ROXIE_DEBUGCALLBACK 0x3ffffff8u
#define ROXIE_PING 0x3ffffff9u
    - goes to own handler anyway
#define ROXIE_TRACEINFO 0x3ffffffau
    - could go in meta? Not time critical. Could all go to a single handler? (a bit hard since we want to intercept for caller...)
#define ROXIE_FILECALLBACK 0x3ffffffbu
    - could go to a single handler
#define ROXIE_ALIVE 0x3ffffffcu
    - currently getting delayed a bit too much potentially if downstream processing is slow? Do I even need it if streaming?
#define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
    - could go in metadata of standard response
#define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
    - ditto
#define ROXIE_EXCEPTION 0x3fffffffu
    - ditto
And the continuation metadata.

What if EVERYTHING was a callback? - here's an exception... here's some more rows... here's some tracing... here's some continuation metadata.
Somewhere, sometime, I need to marshall from one thread to another though (maybe more than once, unless I can guarantee the callback is always very fast).

OR (is it the same?) everything is metadata? Metadata can contain any of the above information (apart from rows - or maybe they are just another type).
If I can't deal quickly with a packet of information, I queue it up? Spanning complicates things though. I need to be able to spot complete portions of metadata
(and in kind-of the same way I need to be able to spot complete rows of data even when they span multiple packets.) I think data is really a bit different from the rest -
you expect it to be continuous and you want the others to interrupt the flow.

If continuation info was restricted to a "yes/no" (i.e. it had to be continued on the same node it started on) we could have a simple "Is there any continuation" bit. The others are sent in their
own packets so are a little different. Does that make it harder to recover? Not sure that it does really (it just means that the window during which a failure causes a problem starts earlier).
However it may be an issue tying up an agent thread for a while (and do we know when to untie it if the Roxie server abandons/restarts?)

Perhaps it makes sense to pause at this point (with streaming disabled and with the retry mechanism optional).
*/