udptrr.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <string>
  14. #include <map>
  15. #include <queue>
  16. #include "jthread.hpp"
  17. #include "jlog.hpp"
  18. #include "jisem.hpp"
  19. #include "jsocket.hpp"
  20. #include "udplib.hpp"
  21. #include "udptrr.hpp"
  22. #include "udptrs.hpp"
  23. #include "udpipmap.hpp"
  24. #include "udpmsgpk.hpp"
  25. #include "roxiemem.hpp"
  26. #include "roxie.hpp"
  27. #ifdef _WIN32
  28. #include <io.h>
  29. #include <winsock2.h>
  30. #else
  31. #include <sys/socket.h>
  32. #include <sys/time.h>
  33. #include <sys/resource.h>
  34. #endif
  35. #include <thread>
  36. using roxiemem::DataBuffer;
  37. using roxiemem::IRowManager;
  38. unsigned udpMaxPendingPermits = 1;
  39. RelaxedAtomic<unsigned> flowPermitsSent = {0};
  40. RelaxedAtomic<unsigned> flowRequestsReceived = {0};
  41. RelaxedAtomic<unsigned> dataPacketsReceived = {0};
  42. static unsigned lastFlowPermitsSent = 0;
  43. static unsigned lastFlowRequestsReceived = 0;
  44. static unsigned lastDataPacketsReceived = 0;
  45. // The code that redirects flow messages from data socket to flow socket relies on the assumption tested here
  46. static_assert(sizeof(UdpRequestToSendMsg) < sizeof(UdpPacketHeader), "Expected UDP rts size to be less than packet header");
  47. class CReceiveManager : implements IReceiveManager, public CInterface
  48. {
  49. /*
  50. * The ReceiveManager has several threads:
  51. * 1. receive_receive_flow (priority 3)
  52. * - waits for packets on flow port
  53. * - maintains list of nodes that have pending requests
  54. * - sends ok_to_send to one sender (or more) at a time
  55. * 2. receive_data (priority 4)
  56. * - reads data packets off data socket
  57. * - runs at v. high priority
  58. * - used to have an option to perform collation on this thread but a bad idea:
  59. * - can block (ends up in memory manager via attachDataBuffer).
  60. * - Does not apply back pressure
  61. * - Just enqueues them. We don't give permission to send more than the queue can hold, but it's a soft limit
  62. * 3. PacketCollator (standard priority)
  63. * - dequeues packets
  64. * - collates packets
  65. *
  66. */
  67. /*
  68. * Handling lost packets
  69. *
  70. * We try to make lost packets unlikely by telling agents when to send (and making sure they don't send unless
  71. * there's a good chance that socket buffer will have room). But we can't legislate for network issues.
  72. *
  73. * What packets can be lost?
  74. * 1. Data packets - handled via sliding window of resendable packets (or by retrying whole query after a timeout, of resend logic disabled)
  75. * 2. RequestToSend - the sender's resend thread checks periodically. There's a short initial timeout for getting a reply (either "request_received"
  76. * or "okToSend"), then a longer timeout for actually sending.
  77. * 3. OkToSend - there is a timeout after which the permission is considered invalid (based on how long it SHOULD take to send them).
  78. * The requestToSend retry mechanism would then make sure retried.
  79. * MORE - if I don't get a response from OkToSend I should assume lost and requeue it.
  80. * 4. complete - covered by same timeout as okToSend. A lost complete will mean incoming data to that node stalls for the duration of this timeout,
  81. *
  82. */
  83. class UdpSenderEntry // one per node in the system
  84. {
  85. // This is created the first time a message from a previously unseen IP arrives, and remains alive indefinitely
  86. // Note that the various members are accessed by different threads, but no member is accessed from more than one thread
  87. // (except where noted) so protection is not required
  88. // Note that UDP ordering rules mean we can't guarantee that we don't see a "request_to_send" for the next transfer before
  89. // we see the "complete" for the current one. Even if we were sure network stack would not reorder, these come from different
  90. // threads on the sender side and the order is not 100% guaranteed, so we need to cope with it.
  91. // We also need to recover gracefully (and preferably quickly) if any flow or data messages go missing. Currently the sender
  92. // will resend the rts if no ok_to_send within timeout, but there may be a better way?
  93. public:
  94. // Used only by receive_flow thread
  95. IpAddress dest;
  96. ISocket *flowSocket = nullptr;
  97. UdpSenderEntry *prevSender = nullptr; // Used to form list of all senders that have outstanding requests
  98. UdpSenderEntry *nextSender = nullptr; // Used to form list of all senders that have outstanding requests
  99. flowType::flowCmd state = flowType::send_completed; // Meaning I'm not on any queue
  100. sequence_t flowSeq = 0; // the sender's most recent flow sequence number
  101. sequence_t sendSeq = 0; // the sender's most recent sequence number from request-to-send, representing sequence number of next packet it will send
  102. unsigned timeouts = 0; // How many consecutive timeouts have happened on the current request
  103. unsigned requestTime = 0; // When we received the active requestToSend
  104. unsigned timeStamp = 0; // When we last sent okToSend
  105. private:
  106. // Updated by receive_data thread, read atomically by receive_flow
  107. mutable CriticalSection psCrit;
  108. PacketTracker packetsSeen;
  109. public:
  110. UdpSenderEntry(const IpAddress &_dest, unsigned port) : dest(_dest)
  111. {
  112. SocketEndpoint ep(port, dest);
  113. #ifdef SOCKET_SIMULATION
  114. if (isUdpTestMode)
  115. flowSocket = CSimulatedWriteSocket::udp_connect(ep);
  116. else
  117. #endif
  118. flowSocket = ISocket::udp_connect(ep);
  119. }
  120. ~UdpSenderEntry()
  121. {
  122. if (flowSocket)
  123. {
  124. flowSocket->close();
  125. flowSocket->Release();
  126. }
  127. }
  128. bool noteSeen(UdpPacketHeader &hdr)
  129. {
  130. if (udpResendLostPackets)
  131. {
  132. CriticalBlock b(psCrit);
  133. return packetsSeen.noteSeen(hdr);
  134. }
  135. else
  136. return false;
  137. }
  138. bool canSendAny() const
  139. {
  140. // We can send some if (a) the first available new packet is less than TRACKER_BITS above the first unreceived packet or
  141. // (b) we are assuming arrival in order, and there are some marked seen that are > first unseen OR
  142. // (c) the oldest in-flight packet has expired
  143. if (!udpResendLostPackets)
  144. return true;
  145. {
  146. CriticalBlock b(psCrit);
  147. if (packetsSeen.canRecord(sendSeq))
  148. return true;
  149. if (udpAssumeSequential && packetsSeen.hasGaps())
  150. return true;
  151. }
  152. if (msTick()-requestTime > udpResendTimeout)
  153. return true;
  154. return false;
  155. }
  156. void acknowledgeRequest(const IpAddress &returnAddress, sequence_t _flowSeq, sequence_t _sendSeq)
  157. {
  158. if (flowSeq==_flowSeq)
  159. {
  160. // It's a duplicate request-to-send - ignore it? Or assume it means they lost our ok-to-send ? MORE - probably depends on state
  161. if (udpTraceLevel || udpTraceFlow)
  162. {
  163. StringBuffer s;
  164. DBGLOG("UdpFlow: ignoring duplicate requestToSend %" SEQF "u from node %s", _flowSeq, dest.getIpText(s).str());
  165. }
  166. return;
  167. }
  168. flowSeq = _flowSeq;
  169. sendSeq = _sendSeq;
  170. requestTime = msTick();
  171. timeouts = 0;
  172. try
  173. {
  174. UdpPermitToSendMsg msg;
  175. msg.cmd = flowType::request_received;
  176. msg.flowSeq = _flowSeq;
  177. msg.destNode = returnAddress;
  178. msg.max_data = 0;
  179. if (udpResendLostPackets)
  180. {
  181. CriticalBlock b(psCrit);
  182. msg.seen = packetsSeen.copy();
  183. }
  184. if (udpTraceLevel > 3 || udpTraceFlow)
  185. {
  186. StringBuffer ipStr;
  187. DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
  188. }
  189. flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
  190. flowPermitsSent++;
  191. }
  192. catch(IException *e)
  193. {
  194. StringBuffer d, s;
  195. DBGLOG("UdpReceiver: acknowledgeRequest failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
  196. e->Release();
  197. }
  198. }
  199. void requestToSend(unsigned maxTransfer, const IpAddress &returnAddress)
  200. {
  201. try
  202. {
  203. UdpPermitToSendMsg msg;
  204. msg.cmd = maxTransfer ? flowType::ok_to_send : flowType::request_received;
  205. msg.flowSeq = flowSeq;
  206. msg.destNode = returnAddress;
  207. msg.max_data = maxTransfer;
  208. if (udpResendLostPackets)
  209. {
  210. CriticalBlock b(psCrit);
  211. msg.seen = packetsSeen.copy();
  212. }
  213. if (udpTraceLevel > 3 || udpTraceFlow)
  214. {
  215. StringBuffer ipStr;
  216. DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
  217. }
  218. flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
  219. flowPermitsSent++;
  220. }
  221. catch(IException *e)
  222. {
  223. StringBuffer d, s;
  224. DBGLOG("UdpReceiver: requestToSend failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
  225. e->Release();
  226. }
  227. }
  228. };
  229. class SenderList
  230. {
  231. UdpSenderEntry *head = nullptr;
  232. UdpSenderEntry *tail = nullptr;
  233. unsigned numEntries = 0;
  234. void checkListIsValid(UdpSenderEntry *lookfor)
  235. {
  236. #ifdef _DEBUG
  237. UdpSenderEntry *prev = nullptr;
  238. UdpSenderEntry *finger = head;
  239. unsigned length = 0;
  240. while (finger)
  241. {
  242. if (finger==lookfor)
  243. lookfor = nullptr;
  244. prev = finger;
  245. finger = finger->nextSender;
  246. length++;
  247. }
  248. assert(prev == tail);
  249. assert(lookfor==nullptr);
  250. assert(numEntries==length);
  251. #endif
  252. }
  253. public:
  254. unsigned length() const { return numEntries; }
  255. operator UdpSenderEntry *() const
  256. {
  257. return head;
  258. }
  259. void append(UdpSenderEntry *sender)
  260. {
  261. if (tail)
  262. {
  263. tail->nextSender = sender;
  264. sender->prevSender = tail;
  265. tail = sender;
  266. }
  267. else
  268. {
  269. head = tail = sender;
  270. }
  271. numEntries++;
  272. checkListIsValid(sender);
  273. }
  274. void remove(UdpSenderEntry *sender)
  275. {
  276. if (sender->prevSender)
  277. sender->prevSender->nextSender = sender->nextSender;
  278. else
  279. head = sender->nextSender;
  280. if (sender->nextSender)
  281. sender->nextSender->prevSender = sender->prevSender;
  282. else
  283. tail = sender->prevSender;
  284. sender->prevSender = nullptr;
  285. sender->nextSender = nullptr;
  286. numEntries--;
  287. checkListIsValid(nullptr);
  288. }
  289. };
  290. IpMapOf<UdpSenderEntry> sendersTable;
  291. class receive_receive_flow : public Thread
  292. {
  293. CReceiveManager &parent;
  294. Owned<ISocket> flow_socket;
  295. const unsigned flow_port;
  296. const unsigned maxSlotsPerSender;
  297. std::atomic<bool> running = { false };
  298. SenderList pendingRequests; // List of people wanting permission to send
  299. SenderList pendingPermits; // List of people given permission to send
  300. void enqueueRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq)
  301. {
  302. switch (requester->state)
  303. {
  304. case flowType::ok_to_send:
  305. pendingPermits.remove(requester);
  306. // Fall through
  307. case flowType::send_completed:
  308. pendingRequests.append(requester);
  309. requester->state = flowType::request_to_send;
  310. break;
  311. case flowType::request_to_send:
  312. // Perhaps the sender never saw our permission? Already on queue...
  313. break;
  314. default:
  315. // Unexpected state, should never happen!
  316. DBGLOG("ERROR: Unexpected state %s in enqueueRequest", flowType::name(requester->state));
  317. throwUnexpected();
  318. break;
  319. }
  320. requester->acknowledgeRequest(myNode.getIpAddress(), flowSeq, sendSeq); // Acknowledge receipt of the request
  321. }
  322. void okToSend(UdpSenderEntry *requester, unsigned slots)
  323. {
  324. switch (requester->state)
  325. {
  326. case flowType::request_to_send:
  327. pendingRequests.remove(requester);
  328. // Fall through
  329. case flowType::send_completed:
  330. pendingPermits.append(requester);
  331. requester->state = flowType::ok_to_send;
  332. break;
  333. case flowType::ok_to_send:
  334. // Perhaps the sender never saw our permission? Already on queue...
  335. break;
  336. default:
  337. // Unexpected state, should never happen!
  338. DBGLOG("ERROR: Unexpected state %s in okToSend", flowType::name(requester->state));
  339. throwUnexpected();
  340. break;
  341. }
  342. requester->timeStamp = msTick();
  343. requester->requestToSend(slots, myNode.getIpAddress());
  344. }
  345. void noteDone(UdpSenderEntry *requester, UdpRequestToSendMsg &msg)
  346. {
  347. switch (requester->state)
  348. {
  349. case flowType::request_to_send:
  350. // A bit unexpected but will happen if our previous permission timed out and we pushed to back of the requests queue
  351. pendingRequests.remove(requester);
  352. break;
  353. case flowType::ok_to_send:
  354. pendingPermits.remove(requester);
  355. break;
  356. case flowType::send_completed:
  357. DBGLOG("Duplicate completed message received: msg %s flowSeq %" SEQF "u sendSeq %" SEQF "u. Ignoring", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq);
  358. break;
  359. default:
  360. // Unexpected state, should never happen! Ignore.
  361. DBGLOG("ERROR: Unexpected state %s in noteDone", flowType::name(requester->state));
  362. break;
  363. }
  364. requester->state = flowType::send_completed;
  365. }
  366. public:
  367. receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
  368. : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender)
  369. {
  370. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  371. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  372. #ifdef SOCKET_SIMULATION
  373. if (isUdpTestMode)
  374. flow_socket.setown(CSimulatedReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
  375. else
  376. #endif
  377. flow_socket.setown(ISocket::udp_create(flow_port));
  378. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  379. size32_t actualSize = flow_socket->get_receive_buffer_size();
  380. DBGLOG("UdpReceiver: receive_receive_flow created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
  381. }
  382. ~receive_receive_flow()
  383. {
  384. running = false;
  385. if (flow_socket)
  386. flow_socket->close();
  387. join();
  388. }
  389. virtual void start()
  390. {
  391. running = true;
  392. Thread::start();
  393. }
  394. virtual int run() override
  395. {
  396. DBGLOG("UdpReceiver: receive_receive_flow started");
  397. #ifdef __linux__
  398. setLinuxThreadPriority(3);
  399. #else
  400. adjustPriority(1);
  401. #endif
  402. UdpRequestToSendMsg msg;
  403. unsigned timeout = 5000;
  404. while (running)
  405. {
  406. try
  407. {
  408. if (udpTraceLevel > 5 || udpTraceFlow)
  409. {
  410. DBGLOG("UdpReceiver: wait_read(%u)", timeout);
  411. }
  412. bool dataAvail = flow_socket->wait_read(timeout);
  413. if (dataAvail)
  414. {
  415. const unsigned l = sizeof(msg);
  416. unsigned int res ;
  417. flow_socket->readtms(&msg, l, l, res, 0);
  418. flowRequestsReceived++;
  419. assert(res==l);
  420. if (udpTraceLevel > 5 || udpTraceFlow)
  421. {
  422. StringBuffer ipStr;
  423. DBGLOG("UdpReceiver: received %s msg flowSeq %" SEQF "u sendSeq %" SEQF "u from node=%s", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq, msg.sourceNode.getTraceText(ipStr).str());
  424. }
  425. UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode];
  426. switch (msg.cmd)
  427. {
  428. case flowType::request_to_send:
  429. enqueueRequest(sender, msg.flowSeq, msg.sendSeq);
  430. break;
  431. case flowType::send_completed:
  432. noteDone(sender, msg);
  433. break;
  434. case flowType::request_to_send_more:
  435. noteDone(sender, msg);
  436. enqueueRequest(sender, msg.flowSeq+1, msg.sendSeq);
  437. break;
  438. default:
  439. DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
  440. }
  441. }
  442. timeout = 5000; // The default timeout is 5 seconds if nothing is waiting for response...
  443. if (pendingPermits)
  444. {
  445. unsigned now = msTick();
  446. for (UdpSenderEntry *finger = pendingPermits; finger != nullptr; )
  447. {
  448. if (now - finger->timeStamp >= udpRequestToSendAckTimeout)
  449. {
  450. if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
  451. {
  452. StringBuffer s;
  453. DBGLOG("permit to send %" SEQF "u to node %s timed out after %u ms, rescheduling", finger->flowSeq, finger->dest.getIpText(s).str(), udpRequestToSendAckTimeout);
  454. }
  455. UdpSenderEntry *next = finger->nextSender;
  456. pendingPermits.remove(finger);
  457. if (++finger->timeouts > udpMaxRetryTimedoutReqs && udpMaxRetryTimedoutReqs != 0)
  458. {
  459. if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
  460. {
  461. StringBuffer s;
  462. DBGLOG("permit to send %" SEQF "u to node %s timed out %u times - abandoning", finger->flowSeq, finger->dest.getIpText(s).str(), finger->timeouts);
  463. }
  464. }
  465. else
  466. {
  467. // Put it back on the queue (at the back)
  468. finger->timeStamp = now;
  469. pendingRequests.append(finger);
  470. finger->state = flowType::request_to_send;
  471. }
  472. finger = next;
  473. }
  474. else
  475. {
  476. timeout = finger->timeStamp + udpRequestToSendAckTimeout - now;
  477. break;
  478. }
  479. }
  480. }
  481. unsigned slots = parent.input_queue->available();
  482. bool anyCanSend = false;
  483. for (UdpSenderEntry *finger = pendingRequests; finger != nullptr; finger = finger->nextSender)
  484. {
  485. if (pendingPermits.length()>=udpMaxPendingPermits)
  486. break;
  487. if (!slots) // || slots<minSlotsPerSender)
  488. {
  489. timeout = 1; // Slots should free up very soon!
  490. break;
  491. }
  492. // If requester would not be able to send me any (because of the ones in flight) then wait
  493. if (finger->canSendAny())
  494. {
  495. unsigned requestSlots = slots;
  496. if (requestSlots>maxSlotsPerSender)
  497. requestSlots = maxSlotsPerSender;
  498. okToSend(finger, requestSlots);
  499. slots -= requestSlots;
  500. if (timeout > udpRequestToSendAckTimeout)
  501. timeout = udpRequestToSendAckTimeout;
  502. anyCanSend = true;
  503. }
  504. else
  505. {
  506. if (udpTraceFlow)
  507. {
  508. StringBuffer s;
  509. DBGLOG("Sender %s can't be given permission to send yet as resend buffer full", finger->dest.getIpText(s).str());
  510. }
  511. }
  512. }
  513. if (slots && pendingRequests.length() && pendingPermits.length()<udpMaxPendingPermits && !anyCanSend)
  514. {
  515. if (udpTraceFlow)
  516. {
  517. StringBuffer s;
  518. DBGLOG("All senders blocked by resend buffers");
  519. }
  520. timeout = 1; // Hopefully one of the senders should unblock soon
  521. }
  522. }
  523. catch (IException *e)
  524. {
  525. if (running)
  526. {
  527. StringBuffer s;
  528. DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
  529. }
  530. e->Release();
  531. }
  532. catch (...)
  533. {
  534. DBGLOG("UdpReceiver: receive_receive_flow::run unknown exception");
  535. }
  536. }
  537. return 0;
  538. }
  539. };
  540. class receive_data : public Thread
  541. {
  542. CReceiveManager &parent;
  543. ISocket *receive_socket = nullptr;
  544. ISocket *selfFlowSocket = nullptr;
  545. std::atomic<bool> running = { false };
  546. Semaphore started;
  547. public:
  548. receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
  549. {
  550. unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2;
  551. if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
  552. if (check_max_socket_read_buffer(ip_buffer) < 0)
  553. throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
  554. #ifdef SOCKET_SIMULATION
  555. if (isUdpTestMode)
  556. {
  557. receive_socket = CSimulatedReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
  558. selfFlowSocket = CSimulatedWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
  559. }
  560. else
  561. #endif
  562. {
  563. receive_socket = ISocket::udp_create(parent.data_port);
  564. selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
  565. }
  566. receive_socket->set_receive_buffer_size(ip_buffer);
  567. size32_t actualSize = receive_socket->get_receive_buffer_size();
  568. DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
  569. running = false;
  570. }
  571. virtual void start()
  572. {
  573. running = true;
  574. Thread::start();
  575. started.wait();
  576. }
  577. ~receive_data()
  578. {
  579. running = false;
  580. if (receive_socket)
  581. receive_socket->close();
  582. if (selfFlowSocket)
  583. selfFlowSocket->close();
  584. join();
  585. ::Release(receive_socket);
  586. ::Release(selfFlowSocket);
  587. }
  588. virtual int run()
  589. {
  590. DBGLOG("UdpReceiver: receive_data started");
  591. #ifdef __linux__
  592. setLinuxThreadPriority(4);
  593. #else
  594. adjustPriority(2);
  595. #endif
  596. DataBuffer *b = NULL;
  597. started.signal();
  598. unsigned lastOOOReport = 0;
  599. unsigned lastPacketsOOO = 0;
  600. while (running)
  601. {
  602. try
  603. {
  604. unsigned int res;
  605. b = bufferManager->allocate();
  606. while (true)
  607. {
  608. receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
  609. if (res!=sizeof(UdpRequestToSendMsg))
  610. break;
  611. selfFlowSocket->write(b->data, res);
  612. }
  613. dataPacketsReceived++;
  614. UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
  615. assert(hdr.length == res && hdr.length > sizeof(hdr));
  616. UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
  617. if (sender->noteSeen(hdr))
  618. {
  619. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  620. {
  621. StringBuffer s;
  622. DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
  623. }
  624. hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
  625. }
  626. else
  627. {
  628. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  629. {
  630. StringBuffer s;
  631. DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
  632. }
  633. }
  634. parent.input_queue->pushOwn(b);
  635. b = NULL;
  636. }
  637. catch (IException *e)
  638. {
  639. ::Release(b);
  640. b = NULL;
  641. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  642. {
  643. StringBuffer s;
  644. DBGLOG("UdpReceiver: receive_data::run read failed port=%u - Exp: %s", parent.data_port, e->errorMessage(s).str());
  645. MilliSleep(1000); // Give a chance for mem free
  646. }
  647. e->Release();
  648. }
  649. catch (...)
  650. {
  651. ::Release(b);
  652. b = NULL;
  653. DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
  654. MilliSleep(1000);
  655. }
  656. if (udpStatsReportInterval)
  657. {
  658. unsigned now = msTick();
  659. if (now-lastOOOReport > udpStatsReportInterval)
  660. {
  661. lastOOOReport = now;
  662. if (packetsOOO > lastPacketsOOO)
  663. {
  664. DBGLOG("%u more packets received out-of-order by this server (%u total)", packetsOOO-lastPacketsOOO, packetsOOO-0);
  665. lastPacketsOOO = packetsOOO;
  666. }
  667. if (flowRequestsReceived > lastFlowRequestsReceived)
  668. {
  669. DBGLOG("%u more flow requests received by this server (%u total)", flowRequestsReceived-lastFlowRequestsReceived, flowRequestsReceived-0);
  670. lastFlowRequestsReceived = flowRequestsReceived;
  671. }
  672. if (flowPermitsSent > lastFlowPermitsSent)
  673. {
  674. DBGLOG("%u more flow permits sent by this server (%u total)", flowPermitsSent-lastFlowPermitsSent, flowPermitsSent-0);
  675. lastFlowPermitsSent = flowPermitsSent;
  676. }
  677. if (dataPacketsReceived > lastDataPacketsReceived)
  678. {
  679. DBGLOG("%u more data packets received by this server (%u total)", dataPacketsReceived-lastDataPacketsReceived, dataPacketsReceived-0);
  680. lastDataPacketsReceived = dataPacketsReceived;
  681. }
  682. }
  683. }
  684. }
  685. ::Release(b);
  686. return 0;
  687. }
  688. };
  689. class CPacketCollator : public Thread
  690. {
  691. CReceiveManager &parent;
  692. public:
  693. CPacketCollator(CReceiveManager &_parent) : Thread("CPacketCollator"), parent(_parent) {}
  694. virtual int run()
  695. {
  696. DBGLOG("UdpReceiver: CPacketCollator::run");
  697. parent.collatePackets();
  698. return 0;
  699. }
  700. } collatorThread;
    // NOTE(review): receive_send_flow and ReceiveFlowManager are not defined in this file - these friend
    // declarations may be stale. The nested classes already have access to private members as class members.
    friend class receive_receive_flow;
    friend class receive_send_flow;
    friend class receive_data;
    friend class ReceiveFlowManager;
    queue_t *input_queue;               // Filled by receive_data (pushOwn), drained by collatePackets (pop); deleted in the destructor
    int input_queue_size;               // Capacity given to input_queue; receive_data also uses it to size its socket buffer
    receive_receive_flow *receive_flow; // Flow-control thread; deleted in the destructor
    receive_data *data;                 // Data-socket reader thread; deleted in the destructor
    int receive_flow_port;              // Local port for flow-control messages (receive_data forwards stray ones here)
    int data_port;                      // Local port for data packets
    std::atomic<bool> running = { false }; // collatePackets loops while this is set; cleared by the destructor
    bool encrypted = false;             // Passed to every CMessageCollator created by createMessageCollator
    typedef std::map<ruid_t, CMessageCollator*> uid_map;
    uid_map collators;                  // Collator per ruid; the map's reference is released in detachCollator
    SpinLock collatorsLock; // protects access to collators map
public:
    IMPLEMENT_IINTERFACE;
  718. CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int queue_size, int m_slot_pr_client, bool _encrypted)
  719. : collatorThread(*this), encrypted(_encrypted), sendersTable([client_flow_port](const ServerIdentifier ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);})
  720. {
  721. #ifndef _WIN32
  722. setpriority(PRIO_PROCESS, 0, -15);
  723. #endif
  724. receive_flow_port = server_flow_port;
  725. data_port = d_port;
  726. input_queue_size = queue_size;
  727. input_queue = new queue_t(queue_size);
  728. data = new receive_data(*this);
  729. receive_flow = new receive_receive_flow(*this, server_flow_port, m_slot_pr_client);
  730. running = true;
  731. collatorThread.start();
  732. data->start();
  733. receive_flow->start();
  734. MilliSleep(15);
  735. }
    // Shut down in a fixed order: stop the collator first (clear the flag collatePackets polls and interrupt
    // the queue so a blocked pop() returns), then destroy the receive threads (their destructors close their
    // sockets and join), and delete the queue last since both receive threads reference it.
    ~CReceiveManager()
    {
        running = false;
        input_queue->interrupt();
        collatorThread.join();
        delete data;
        delete receive_flow;
        delete input_queue;
    }
  745. virtual void detachCollator(const IMessageCollator *msgColl)
  746. {
  747. ruid_t ruid = msgColl->queryRUID();
  748. if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
  749. {
  750. SpinBlock b(collatorsLock);
  751. collators.erase(ruid);
  752. }
  753. msgColl->Release();
  754. }
  755. void collatePackets()
  756. {
  757. while(running)
  758. {
  759. try
  760. {
  761. DataBuffer *dataBuff = input_queue->pop(true);
  762. collatePacket(dataBuff);
  763. }
  764. catch (IException * e)
  765. {
  766. //An interrupted semaphore exception is expected at closedown - anything else should be reported
  767. if (!dynamic_cast<InterruptedSemaphoreException *>(e))
  768. EXCLOG(e);
  769. e->Release();
  770. }
  771. }
  772. }
  773. void collatePacket(DataBuffer *dataBuff)
  774. {
  775. const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  776. if (udpTraceLevel >= 4)
  777. {
  778. StringBuffer s;
  779. DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
  780. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
  781. }
  782. Linked <CMessageCollator> msgColl;
  783. bool isDefault = false;
  784. {
  785. SpinBlock b(collatorsLock);
  786. try
  787. {
  788. msgColl.set(collators[pktHdr->ruid]);
  789. if (!msgColl)
  790. {
  791. msgColl.set(collators[RUID_DISCARD]);
  792. isDefault = true;
  793. unwantedDiscarded++;
  794. }
  795. }
  796. catch (IException *E)
  797. {
  798. EXCLOG(E);
  799. E->Release();
  800. }
  801. catch (...)
  802. {
  803. IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
  804. EXCLOG(E);
  805. E->Release();
  806. }
  807. }
  808. if (udpTraceLevel && isDefault && !isUdpTestMode)
  809. {
  810. StringBuffer s;
  811. DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str());
  812. }
  813. if (msgColl && msgColl->attach_databuffer(dataBuff))
  814. dataBuff = nullptr;
  815. else
  816. dataBuff->Release();
  817. }
  818. virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
  819. {
  820. CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
  821. if (udpTraceLevel > 2)
  822. DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
  823. {
  824. SpinBlock b(collatorsLock);
  825. collators[ruid] = msgColl;
  826. }
  827. msgColl->Link();
  828. return msgColl;
  829. }
  830. };
  831. IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
  832. int udpQueueSize, unsigned maxSlotsPerSender,
  833. bool encrypted)
  834. {
  835. assertex (maxSlotsPerSender <= (unsigned) udpQueueSize);
  836. assertex (maxSlotsPerSender <= (unsigned) TRACKER_BITS);
  837. return new CReceiveManager(server_flow_port, data_port, client_flow_port, udpQueueSize, maxSlotsPerSender, encrypted);
  838. }
  839. /*
  840. Thoughts on flow control / streaming:
  841. 1. The "continuation packet" mechanism does have some advantages
  842. - easy recovery from agent failures
  843. - agent recovers easily from Roxie server failures
  844. - flow control is simple (but is it effective?)
  845. 2. Abandoning continuation packet in favour of streaming would give us the following issues:
  846. - would need some flow control to stop getting ahead of a Roxie server that consumed slowly
847. - flow control is non-trivial if you want to avoid tying up an agent thread and want the agent to be able to recover from Roxie server failure
  848. - Need to work out how to do GSS - the nextGE info needs to be passed back in the flow control?
  849. - can't easily recover from agent failures if you already started processing
  850. - unless you assume that the results from agent are always deterministic and can retry and skip N
851. - potentially ties up an agent thread for a while
  852. - do we need to have a larger thread pool but limit how many actually active?
  853. 3. Order of work
  854. - Just adding streaming while ignoring flow control and continuation stuff (i.e. we still stop for permission to continue periodically)
  855. - Shouldn't make anything any _worse_ ...
856. - except that we won't be able to recover from an agent dying mid-stream (at least not without some considerable effort)
  857. - what will happen then?
  858. - May also break server-side caching (that no-one has used AFAIK). Maybe restrict to nohits as we change....
  859. - Add some flow control
  860. - would prevent agent getting too far ahead in cases that are inadequately flow-controlled today
  861. - shouldn't make anything any worse...
  862. - Think about removing continuation mechanism from some cases
  863. Per Gavin, streaming would definitely help for the lowest frequency term. It may help for the others as well if it avoided any significant start up costs - e.g., opening the indexes,
  864. creating the segment monitors, creating the various cursors, and serialising the context (especially because there are likely to be multiple cursors).
  865. To add streaming:
  866. - Need to check for meta availability other than when first received
  867. - when ?
  868. - Need to cope with a getNext() blocking without it causing issues
  869. - perhaps should recode getNext() of variable-size rows first?
  870. More questions:
  871. - Can we afford the memory for the resend info?
  872. - Save maxPacketsPerSender per sender ?
  873. - are we really handling restart and sequence wraparound correctly?
  874. - what about server-side caching? Makes it hard
  875. - but maybe we should only cache tiny replies anyway....
876. Problems found while testing implementation:
877. - the unpacker cursor read code is of poor quality and needs reworking
878. - there is a potential to deadlock when we need to make a callback agent->server during a streamed result (indexread5 illustrates this)
  879. - resolution callback code doesn't really need to be query specific - could go to the default handler
  880. - but other callbacks - ALIVE, EXCEPTION, and debugger are not so clear
  881. - It's not at all clear where to move the code for processing metadata
  882. - callback paradigm would solve both - but it has to be on a client thread (e.g. from within call to next()).
  883. The following are used in "pseudo callback" mode:
  884. #define ROXIE_DEBUGREQUEST 0x3ffffff7u
  885. #define ROXIE_DEBUGCALLBACK 0x3ffffff8u
  886. #define ROXIE_PING 0x3ffffff9u
  887. - goes to own handler anyway
  888. #define ROXIE_TRACEINFO 0x3ffffffau
  889. - could go in meta? Not time critical. Could all go to single handler? (a bit hard since we want to intercept for caller...)
  890. #define ROXIE_FILECALLBACK 0x3ffffffbu
  891. - could go to single handler
  892. #define ROXIE_ALIVE 0x3ffffffcu
  893. - currently getting delayed a bit too much potentially if downstream processing is slow? Do I even need it if streaming?
  894. #define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
  895. - could go in metadata of standard response
  896. #define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
  897. - ditto
  898. #define ROXIE_EXCEPTION 0x3fffffffu
  899. - ditto
  900. And the continuation metadata.
  901. What if EVERYTHING was a callback? - here's an exception... here's some more rows... here's some tracing... here's some continuation metadata
902. Somewhere, at some point, I need to marshal from one thread to another though (maybe more than once unless I can guarantee the callback is always very fast)
  903. OR (is it the same) everything is metadata ? Metadata can contain any of the above information (apart from rows - or maybe they are just another type)
  904. If I can't deal quickly with a packet of information, I queue it up? Spanning complicates things though. I need to be able to spot complete portions of metadata
  905. (and in kind-of the same way I need to be able to spot complete rows of data even when they span multiple packets.) I think data is really a bit different from the rest -
  906. you expect it to be continuous and you want the others to interrupt the flow.
  907. If continuation info was restricted to a "yes/no" (i.e. had to be continued on same node as started on) could have simple "Is there any continuation" bit. Others are sent in their
  908. own packets so are a little different. Does that make it harder to recover? Not sure that it does really (just means that the window at which a failure causes a problem starts earlier).
909. However, it may be an issue tying up an agent thread for a while (and do we know when to untie it if the Roxie server abandons/restarts?)
  910. Perhaps it makes sense to pause at this point (with streaming disabled and with retry mechanism optional)
  911. */