// udptrr.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #undef new
  14. #include <string>
  15. #include <map>
  16. #include <queue>
  17. #include "jthread.hpp"
  18. #include "jlog.hpp"
  19. #include "jisem.hpp"
  20. #include "jsocket.hpp"
  21. #include "udplib.hpp"
  22. #include "udptrr.hpp"
  23. #include "udptrs.hpp"
  24. #include "roxiemem.hpp"
  25. #include "roxie.hpp"
  26. #ifdef _WIN32
  27. #include <io.h>
  28. #include <winsock2.h>
  29. #else
  30. #include <sys/socket.h>
  31. #include <sys/time.h>
  32. #include <sys/resource.h>
  33. #endif
  34. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  35. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  36. #endif
  37. using roxiemem::DataBuffer;
  38. using roxiemem::IRowManager;
  39. unsigned udpRetryBusySenders = 0; // seems faster with 0 than 1 in my testing on small clusters and sustained throughput
  40. unsigned udpInlineCollationPacketLimit;
  41. bool udpInlineCollation = false;
  42. bool udpSendCompletedInData = false;
  43. class CReceiveManager : implements IReceiveManager, public CInterface
  44. {
  45. class ReceiveFlowManager : public Thread
  46. {
  47. private:
  48. CReceiveManager &parent;
  49. class UdpSenderEntry // one per node in the system
  50. {
  51. unsigned destNodeIndex;
  52. unsigned myNodeIndex;
  53. ISocket *flowSocket;
  54. public:
  55. unsigned nextIndex; // Used to form list of all senders that have outstanding requests
  56. UdpSenderEntry()
  57. {
  58. nextIndex = (unsigned) -1;
  59. flowSocket = NULL;
  60. destNodeIndex = (unsigned) -1;
  61. myNodeIndex = (unsigned) -1;
  62. }
  63. ~UdpSenderEntry()
  64. {
  65. if (flowSocket)
  66. {
  67. flowSocket->close();
  68. flowSocket->Release();
  69. }
  70. }
  71. void init(unsigned _destNodeIndex, unsigned _myNodeIndex, unsigned port)
  72. {
  73. assert(!flowSocket);
  74. destNodeIndex = _destNodeIndex;
  75. myNodeIndex = _myNodeIndex;
  76. SocketEndpoint ep(port, getNodeAddress(destNodeIndex));
  77. flowSocket = ISocket::udp_connect(ep);
  78. }
  79. void requestToSend(unsigned maxTransfer)
  80. {
  81. try
  82. {
  83. UdpPermitToSendMsg msg;
  84. msg.length = sizeof(UdpPermitToSendMsg);
  85. msg.cmd = flow_t::ok_to_send;
  86. msg.destNodeIndex = myNodeIndex;
  87. msg.max_data = maxTransfer;
  88. #ifdef CRC_MESSAGES
  89. msg.crc = msg.calcCRC();
  90. #endif
  91. flowSocket->write(&msg, msg.length);
  92. }
  93. catch(IException *e)
  94. {
  95. StringBuffer s;
  96. DBGLOG("UdpReceiver: send_acknowledge failed node=%u %s", destNodeIndex, e->errorMessage(s).str());
  97. e->Release();
  98. }
  99. }
  100. } *sendersTable;
  101. unsigned maxSenders;
  102. unsigned firstRequest;
  103. unsigned lastRequest;
  104. unsigned maxSlotsPerSender;
  105. bool running;
  106. SpinLock receiveFlowLock; // Protecting the currentTransfer variable and the chain of active transfers
  107. unsigned currentTransfer;
  108. Semaphore requestPending;
  109. Semaphore transferComplete;
  110. public:
  111. ReceiveFlowManager(CReceiveManager &_parent, unsigned _maxSenders, unsigned _maxSlotsPerSender)
  112. : Thread("UdpLib::ReceiveFlowManager"), parent(_parent)
  113. {
  114. firstRequest = (unsigned) -1;
  115. lastRequest = (unsigned) -1;
  116. currentTransfer = (unsigned) -1;
  117. running = false;
  118. maxSenders = _maxSenders;
  119. maxSlotsPerSender = _maxSlotsPerSender;
  120. sendersTable = new UdpSenderEntry[maxSenders];
  121. for (unsigned i = 0; i < maxSenders; i++)
  122. {
  123. sendersTable[i].init(i, parent.myNodeIndex, parent.send_flow_port);
  124. }
  125. }
  126. ~ReceiveFlowManager()
  127. {
  128. running = false;
  129. requestPending.signal();
  130. transferComplete.signal();
  131. join();
  132. delete [] sendersTable;
  133. }
  134. unsigned send_acknowledge()
  135. {
  136. int timeout = 1;
  137. unsigned max_transfer;
  138. UdpSenderEntry *sender = NULL;
  139. {
  140. SpinBlock b(receiveFlowLock);
  141. if (firstRequest != (unsigned) -1)
  142. {
  143. assert(firstRequest < maxSenders);
  144. //find first non-busy sender, and move it to front of sendersTable request chain
  145. int retry = udpRetryBusySenders;
  146. unsigned finger = firstRequest;
  147. unsigned prev = -1;
  148. for (;;)
  149. {
  150. if (udpSnifferEnabled && parent.sniffer->is_busy(finger))
  151. {
  152. prev = finger;
  153. finger = sendersTable[finger].nextIndex;
  154. if (finger==(unsigned)-1)
  155. {
  156. if (retry--)
  157. {
  158. if (udpTraceLevel > 4)
  159. DBGLOG("UdpReceive: All senders busy");
  160. MilliSleep(1);
  161. finger = firstRequest;
  162. prev = -1;
  163. }
  164. else
  165. break; // give up and use first anyway
  166. }
  167. }
  168. else
  169. {
  170. if (finger != firstRequest)
  171. {
  172. if (finger == lastRequest)
  173. lastRequest = prev;
  174. assert(prev != -1);
  175. sendersTable[prev].nextIndex = sendersTable[finger].nextIndex;
  176. sendersTable[finger].nextIndex = firstRequest;
  177. firstRequest = finger;
  178. }
  179. break;
  180. }
  181. }
  182. if (udpInlineCollation)
  183. max_transfer = udpInlineCollationPacketLimit;
  184. else
  185. max_transfer = parent.input_queue->free_slots();
  186. if (max_transfer > maxSlotsPerSender)
  187. max_transfer = maxSlotsPerSender;
  188. timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
  189. currentTransfer = firstRequest;
  190. sender = &sendersTable[firstRequest];
  191. //indicate not in queue (MORE - what if wanted to send > allowing?? Do we know how much it wanted to send?)
  192. if (firstRequest==lastRequest)
  193. lastRequest = (unsigned) -1;
  194. firstRequest = sender->nextIndex;
  195. sender->nextIndex = (unsigned) -1;
  196. }
  197. }
  198. if (sender)
  199. sender->requestToSend(max_transfer);
  200. return timeout;
  201. }
  202. void request(const UdpRequestToSendMsg &msg)
  203. {
  204. unsigned index = msg.sourceNodeIndex;
  205. assertex(index < maxSenders);
  206. UdpSenderEntry &sender = sendersTable[index];
  207. {
  208. SpinBlock b(receiveFlowLock);
  209. if ((lastRequest == index) || (sender.nextIndex != (unsigned) -1))
  210. {
  211. DBGLOG("UdpReceiver: received duplicate request_to_send msg from node=%d", index);
  212. return;
  213. }
  214. // Chain it onto list
  215. if (firstRequest != (unsigned) -1)
  216. sendersTable[lastRequest].nextIndex = index;
  217. else
  218. firstRequest = index;
  219. lastRequest = index;
  220. }
  221. requestPending.signal();
  222. }
  223. void completed(unsigned index)
  224. {
  225. assert(index < maxSenders);
  226. bool isCurrent;
  227. {
  228. SpinBlock b(receiveFlowLock);
  229. isCurrent = (index == currentTransfer);
  230. }
  231. if (isCurrent)
  232. transferComplete.signal();
  233. else
  234. DBGLOG("UdpReceiver: completed msg from node %u is not for current transfer (%u) ", index, currentTransfer);
  235. }
  236. virtual void start()
  237. {
  238. running = true;
  239. Thread::start();
  240. }
  241. virtual int run()
  242. {
  243. DBGLOG("UdpReceiver: ReceiveFlowManager started");
  244. if (udpSnifferSendThreadPriority)
  245. {
  246. #ifdef __linux__
  247. setLinuxThreadPriority(udpSnifferSendThreadPriority);
  248. #else
  249. adjustPriority(1);
  250. #endif
  251. }
  252. while (running)
  253. {
  254. requestPending.wait();
  255. unsigned maxTime = send_acknowledge();
  256. if (!transferComplete.wait(maxTime) && udpTraceLevel > 0)
  257. {
  258. DBGLOG("UdpReceiver: transfer timed out after %d ms from node=%u", maxTime, currentTransfer);
  259. // MORE - a timeout here means everything stalled... look into when it can happen!
  260. }
  261. }
  262. return 0;
  263. }
  264. };
  265. class receive_sniffer : public Thread
  266. {
  267. struct SnifferEntry {
  268. time_t timeStamp;
  269. char busy;
  270. SnifferEntry() { timeStamp = 0; busy = 0; }
  271. } *snifferTable;
  272. ISocket *sniffer_socket;
  273. unsigned snifferPort;
  274. IpAddress snifferIP;
  275. CReceiveManager &parent;
  276. bool running;
  277. inline void update(unsigned index, char busy)
  278. {
  279. if (udpTraceLevel > 5)
  280. DBGLOG("UdpReceive: sniffer sets is_busy[%d} to %d", index, busy);
  281. snifferTable[index].busy = busy;
  282. if (busy)
  283. time(&snifferTable[index].timeStamp);
  284. }
  285. public:
  286. receive_sniffer(CReceiveManager &_parent, unsigned _snifferPort, const IpAddress &_snifferIP, unsigned numNodes)
  287. : Thread("udplib::receive_sniffer"), parent(_parent), snifferPort(_snifferPort), snifferIP(_snifferIP), running(false)
  288. {
  289. snifferTable = new SnifferEntry[numNodes];
  290. sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
  291. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  292. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  293. sniffer_socket->set_receive_buffer_size(udpFlowSocketsSize);
  294. if (udpTraceLevel)
  295. {
  296. StringBuffer ipStr;
  297. snifferIP.getIpText(ipStr);
  298. size32_t actualSize = sniffer_socket->get_receive_buffer_size();
  299. DBGLOG("UdpReceiver: receive_sniffer port open %s:%i sockbuffsize=%d actual %d", ipStr.str(), snifferPort, udpFlowSocketsSize, actualSize);
  300. }
  301. }
  302. ~receive_sniffer()
  303. {
  304. running = false;
  305. if (sniffer_socket) sniffer_socket->close();
  306. join();
  307. if (sniffer_socket) sniffer_socket->Release();
  308. delete [] snifferTable;
  309. }
  310. bool is_busy(unsigned index)
  311. {
  312. if (snifferTable[index].busy)
  313. {
  314. time_t now;
  315. time(&now);
  316. return (now - snifferTable[index].timeStamp) < 10;
  317. }
  318. else
  319. return false;
  320. }
  321. virtual int run()
  322. {
  323. DBGLOG("UdpReceiver: sniffer started");
  324. if (udpSnifferReadThreadPriority)
  325. {
  326. #ifdef __linux__
  327. setLinuxThreadPriority(udpSnifferReadThreadPriority);
  328. #else
  329. adjustPriority(1);
  330. #endif
  331. }
  332. while (running)
  333. {
  334. try
  335. {
  336. unsigned int res;
  337. sniff_msg msg;
  338. sniffer_socket->read(&msg, 1, sizeof(msg), res, 5);
  339. update(msg.nodeIndex, msg.cmd == flow_t::busy);
  340. }
  341. catch (IException *e)
  342. {
  343. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  344. {
  345. StringBuffer s;
  346. DBGLOG("UdpReceiver: receive_sniffer::run read failed %s", e->errorMessage(s).str());
  347. MilliSleep(1000);
  348. }
  349. e->Release();
  350. }
  351. catch (...)
  352. {
  353. DBGLOG("UdpReceiver: receive_sniffer::run unknown exception port %u", parent.data_port);
  354. if (sniffer_socket) {
  355. sniffer_socket->Release();
  356. sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
  357. }
  358. MilliSleep(1000);
  359. }
  360. }
  361. return 0;
  362. }
  363. virtual void start()
  364. {
  365. if (udpSnifferEnabled)
  366. {
  367. running = true;
  368. Thread::start();
  369. }
  370. }
  371. };
  372. class receive_receive_flow : public Thread
  373. {
  374. Owned<ISocket> flow_socket;
  375. int flow_port;
  376. CReceiveManager &parent;
  377. bool running;
  378. public:
  379. receive_receive_flow(CReceiveManager &_parent, int flow_p) : Thread("UdpLib::receive_receive_flow"), parent(_parent)
  380. {
  381. flow_port = flow_p;
  382. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  383. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  384. flow_socket.setown(ISocket::udp_create(flow_port));
  385. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  386. size32_t actualSize = flow_socket->get_receive_buffer_size();
  387. DBGLOG("UdpReceiver: rcv_flow_socket created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
  388. running = false;
  389. }
  390. ~receive_receive_flow()
  391. {
  392. running = false;
  393. if (flow_socket)
  394. flow_socket->close();
  395. join();
  396. }
  397. virtual void start()
  398. {
  399. running = true;
  400. Thread::start();
  401. }
  402. virtual int run()
  403. {
  404. DBGLOG("UdpReceiver: receive_receive_flow started");
  405. #ifdef __linux__
  406. setLinuxThreadPriority(3);
  407. #else
  408. adjustPriority(1);
  409. #endif
  410. UdpRequestToSendMsg f;
  411. while (running)
  412. {
  413. try
  414. {
  415. int l = sizeof(f);
  416. unsigned int res ;
  417. flow_socket->read(&f, 1, l, res, 5);
  418. switch (f.cmd)
  419. {
  420. case flow_t::request_to_send:
  421. if (udpTraceLevel > 5)
  422. DBGLOG("UdpReceiver: received request_to_send msg from node=%u", f.sourceNodeIndex);
  423. parent.manager->request(f);
  424. break;
  425. case flow_t::send_completed:
  426. if (udpTraceLevel > 5)
  427. DBGLOG("UdpReceiver: received send_completed msg from node=%u", f.sourceNodeIndex);
  428. parent.manager->completed(f.sourceNodeIndex);
  429. break;
  430. case flow_t::request_to_send_more:
  431. if (udpTraceLevel > 5)
  432. DBGLOG("UdpReceiver: received request_to_send_more msg from node=%u", f.sourceNodeIndex);
  433. parent.manager->completed(f.sourceNodeIndex);
  434. parent.manager->request(f);
  435. break;
  436. default:
  437. DBGLOG("UdpReceiver: reveived unrecognized flow control message cmd=%i", f.cmd);
  438. }
  439. }
  440. catch (IException *e)
  441. {
  442. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  443. {
  444. StringBuffer s;
  445. DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
  446. }
  447. e->Release();
  448. }
  449. catch (...) {
  450. DBGLOG("UdpReceiver: receive_receive_flow::run unknown exception");
  451. MilliSleep(15);
  452. }
  453. }
  454. return 0;
  455. }
  456. };
  457. class receive_data : public Thread
  458. {
  459. CReceiveManager &parent;
  460. ISocket *receive_socket;
  461. bool running;
  462. Semaphore started;
  463. public:
  464. receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
  465. {
  466. unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD;
  467. if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
  468. if (check_max_socket_read_buffer(ip_buffer) < 0)
  469. throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
  470. receive_socket = ISocket::udp_create(parent.data_port);
  471. receive_socket->set_receive_buffer_size(ip_buffer);
  472. size32_t actualSize = receive_socket->get_receive_buffer_size();
  473. DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
  474. running = false;
  475. }
  476. virtual void start()
  477. {
  478. running = true;
  479. Thread::start();
  480. started.wait();
  481. }
  482. ~receive_data()
  483. {
  484. running = false;
  485. if (receive_socket)
  486. receive_socket->close();
  487. join();
  488. ::Release(receive_socket);
  489. }
  490. virtual int run()
  491. {
  492. DBGLOG("UdpReceiver: receive_data started");
  493. #ifdef __linux__
  494. setLinuxThreadPriority(4);
  495. #else
  496. adjustPriority(2);
  497. #endif
  498. DataBuffer *b = NULL;
  499. started.signal();
  500. while (running)
  501. {
  502. try
  503. {
  504. unsigned int res;
  505. b = bufferManager->allocate();
  506. receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
  507. UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
  508. unsigned flowBits = hdr.udpSequence;
  509. if (flowBits & UDP_SEQUENCE_COMPLETE)
  510. {
  511. parent.manager->completed(hdr.nodeIndex);
  512. }
  513. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  514. DBGLOG("UdpReceiver: %u bytes received, node=%u", res, hdr.nodeIndex);
  515. if (udpInlineCollation)
  516. parent.collatePacket(b);
  517. else
  518. parent.input_queue->pushOwn(b);
  519. b = NULL;
  520. }
  521. catch (IException *e)
  522. {
  523. ::Release(b);
  524. b = NULL;
  525. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  526. {
  527. StringBuffer s;
  528. DBGLOG("UdpReceiver: receive_data::run read failed port=%u - Exp: %s", parent.data_port, e->errorMessage(s).str());
  529. MilliSleep(1000); // Give a chance for mem free
  530. }
  531. e->Release();
  532. }
  533. catch (...)
  534. {
  535. ::Release(b);
  536. b = NULL;
  537. DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
  538. MilliSleep(1000);
  539. }
  540. }
  541. ::Release(b);
  542. return 0;
  543. }
  544. };
  545. class CPacketCollator : public Thread
  546. {
  547. CReceiveManager &parent;
  548. public:
  549. CPacketCollator(CReceiveManager &_parent) : Thread("CPacketCollator"), parent(_parent) {}
  550. virtual int run()
  551. {
  552. DBGLOG("UdpReceiver: CPacketCollator::run");
  553. parent.collatePackets();
  554. return 0;
  555. }
  556. } collatorThread;
  557. friend class receive_receive_flow;
  558. friend class receive_send_flow;
  559. friend class receive_data;
  560. friend class ReceiveFlowManager;
  561. friend class receive_sniffer;
  562. queue_t *input_queue;
  563. int input_queue_size;
  564. receive_receive_flow *receive_flow;
  565. receive_data *data;
  566. ReceiveFlowManager *manager;
  567. receive_sniffer *sniffer;
  568. unsigned myNodeIndex;
  569. int send_flow_port;
  570. int receive_flow_port;
  571. int data_port;
  572. bool running;
  573. typedef std::map<ruid_t, IMessageCollator*> uid_map;
  574. Linked<IMessageCollator> defaultMessageCollator;
  575. uid_map collators; // MORE - more sensible to use a jlib mapping I would have thought
  576. SpinLock collatorsLock; // protects access to collators map and defaultMessageCollator (note that defaultMessageCollator is not just set at startup)
  577. public:
  578. IMPLEMENT_IINTERFACE;
  579. CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client, unsigned _myNodeIndex)
  580. : collatorThread(*this)
  581. {
  582. #ifndef _WIN32
  583. setpriority(PRIO_PROCESS, 0, -15);
  584. #endif
  585. myNodeIndex = _myNodeIndex;
  586. receive_flow_port = server_flow_port;
  587. send_flow_port = client_flow_port;
  588. data_port = d_port;
  589. input_queue_size = queue_size;
  590. input_queue = new queue_t(queue_size);
  591. data = new receive_data(*this);
  592. manager = new ReceiveFlowManager(*this, getNumNodes(), m_slot_pr_client);
  593. receive_flow = new receive_receive_flow(*this, server_flow_port);
  594. if (udpSnifferEnabled)
  595. sniffer = new receive_sniffer(*this, snif_port, multicast_ip, getNumNodes());
  596. else
  597. sniffer = nullptr;
  598. running = true;
  599. collatorThread.start();
  600. data->start();
  601. manager->start();
  602. receive_flow->start();
  603. if (udpSnifferEnabled)
  604. sniffer->start();
  605. MilliSleep(15);
  606. }
  607. ~CReceiveManager()
  608. {
  609. running = false;
  610. input_queue->interrupt();
  611. collatorThread.join();
  612. delete data;
  613. delete receive_flow;
  614. delete manager;
  615. delete sniffer;
  616. delete input_queue;
  617. }
  618. virtual void detachCollator(const IMessageCollator *msgColl)
  619. {
  620. ruid_t ruid = msgColl->queryRUID();
  621. if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
  622. {
  623. SpinBlock b(collatorsLock);
  624. collators.erase(ruid);
  625. }
  626. msgColl->Release();
  627. }
  628. virtual void setDefaultCollator(IMessageCollator *msgColl)
  629. {
  630. if (udpTraceLevel>=5) DBGLOG("UdpReceiver: setDefaultCollator");
  631. SpinBlock b(collatorsLock);
  632. defaultMessageCollator.set(msgColl);
  633. }
  634. void collatePackets()
  635. {
  636. unsigned lastDiscardedMsgSeq = 0;
  637. while(running)
  638. {
  639. DataBuffer *dataBuff = input_queue->pop();
  640. collatePacket(dataBuff);
  641. }
  642. }
  643. void collatePacket(DataBuffer *dataBuff)
  644. {
  645. const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  646. if (udpTraceLevel >= 4)
  647. DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%u",
  648. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->nodeIndex);
  649. Linked <IMessageCollator> msgColl;
  650. bool isDefault = false;
  651. {
  652. SpinBlock b(collatorsLock);
  653. try
  654. {
  655. msgColl.set(collators[pktHdr->ruid]);
  656. if (!msgColl)
  657. {
  658. msgColl.set(defaultMessageCollator); // MORE - if we get a header, we can send an abort.
  659. isDefault = true;
  660. }
  661. }
  662. catch (IException *E)
  663. {
  664. EXCLOG(E);
  665. E->Release();
  666. }
  667. catch (...)
  668. {
  669. IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
  670. EXCLOG(E);
  671. E->Release();
  672. }
  673. }
  674. if (msgColl)
  675. {
  676. if (udpTraceLevel && isDefault)
  677. DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex);
  678. if (msgColl->add_package(dataBuff))
  679. {
  680. dataBuff = 0;
  681. }
  682. }
  683. else
  684. {
  685. // MORE - tell the slave to stop sending?
  686. // if (udpTraceLevel > 1 && lastDiscardedMsgSeq != pktHdr->msgSeq)
  687. // DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - discarding packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex);
  688. // lastDiscardedMsgSeq = pktHdr->msgSeq;
  689. }
  690. if (dataBuff)
  691. {
  692. dataBuff->Release();
  693. atomic_inc(&unwantedDiscarded);
  694. }
  695. }
  696. virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
  697. {
  698. IMessageCollator *msgColl = createCMessageCollator(rowManager, ruid);
  699. if (udpTraceLevel >= 2)
  700. DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
  701. {
  702. SpinBlock b(collatorsLock);
  703. collators[ruid] = msgColl;
  704. }
  705. msgColl->Link();
  706. return msgColl;
  707. }
  708. };
  709. IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
  710. int sniffer_port, const IpAddress &sniffer_multicast_ip,
  711. int udpQueueSize, unsigned maxSlotsPerSender,
  712. unsigned myNodeIndex)
  713. {
  714. assertex (maxSlotsPerSender <= udpQueueSize);
  715. return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender, myNodeIndex);
  716. }
  717. /*
  718. Thoughts on flow control / streaming:
  719. 1. The "continuation packet" mechanism does have some advantages
  720. - easy recovery from slave failures
  721. - slave recovers easily from Roxie server failures
  722. - flow control is simple (but is it effective?)
  723. 2. Abandoning continuation packet in favour of streaming would give us the following issues:
  724. - would need some flow control to stop getting ahead of a Roxie server that consumed slowly
  725. - flow control is non trivial if you want to avoid tying up a slave thread and want slave to be able to recover from Roxie server failure
  726. - Need to work out how to do GSS - the nextGE info needs to be passed back in the flow control?
  727. - can't easily recover from slave failures if you already started processing
  728. - unless you assume that the results from slave are always deterministic and can retry and skip N
  729. - potentially ties up a slave thread for a while
  730. - do we need to have a larger thread pool but limit how many actually active?
  731. 3. Order of work
  732. - Just adding streaming while ignoring flow control and continuation stuff (i.e. we still stop for permission to continue periodically)
  733. - Shouldn't make anything any _worse_ ...
  734. - except that won't be able to recover from a slave dying mid-stream (at least not without some considerable effort)
  735. - what will happen then?
  736. - May also break server-side caching (that no-one has used AFAIK). Maybe restrict to nohits as we change....
  737. - Add some flow control
  738. - would prevent slave getting too far ahead in cases that are inadequately flow-controlled today
  739. - shouldn't make anything any worse...
  740. - Think about removing continuation mechanism from some cases
  741. Per Gavin, streaming would definitely help for the lowest frequency term. It may help for the others as well if it avoided any significant start up costs - e.g., opening the indexes,
  742. creating the segment monitors, creating the various cursors, and serialising the context (especially because there are likely to be multiple cursors).
  743. To add streaming:
  744. - Need to check for meta availability other than when first received
  745. - when ?
  746. - Need to cope with a getNext() blocking without it causing issues
  747. - perhaps should recode getNext() of variable-size rows first?
  748. More questions:
  749. - Can we afford the memory for the resend info?
  750. - Save maxPacketsPerSender per sender ?
  751. - are we really handling restart and sequence wraparound correctly?
  752. - what about server-side caching? Makes it hard
  753. - but maybe we should only cache tiny replies anyway....
  754. Problems found while testing implementation:
  755. - the unpacker cursor read code is crap
  756. - there is a potential to deadlock when need to make a callback slave->server during a streamed result (indexread5 illustrates)
  757. - resolution callback code doesn't really need to be query specific - could go to the default handler
  758. - but other callbacks - ALIVE, EXCEPTION, and debugger are not so clear
  759. - It's not at all clear where to move the code for processing metadata
  760. - callback paradigm would solve both - but it has to be on a client thread (e.g. from within call to next()).
  761. The following are used in "pseudo callback" mode:
  762. #define ROXIE_DEBUGREQUEST 0x3ffffff7u
  763. #define ROXIE_DEBUGCALLBACK 0x3ffffff8u
  764. #define ROXIE_PING 0x3ffffff9u
  765. - goes to own handler anyway
  766. #define ROXIE_TRACEINFO 0x3ffffffau
  767. - could go in meta? Not time critical. Could all go to single handler? (a bit hard since we want to intercept for caller...)
  768. #define ROXIE_FILECALLBACK 0x3ffffffbu
  769. - could go to single handler
  770. #define ROXIE_ALIVE 0x3ffffffcu
  771. - currently getting delayed a bit too much potentially if downstream processing is slow? Do I even need it if streaming?
  772. #define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
  773. - could go in metadata of standard response
  774. #define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
  775. - ditto
  776. #define ROXIE_EXCEPTION 0x3fffffffu
  777. - ditto
  778. And the continuation metadata.
  779. What if EVERYTHING was a callback? - here's an exception... here's some more rows... here's some tracing... here's some continuation metadata
  780. Somewhere sometime I need to marshall from one thread to another though (maybe more than once unless I can guarantee callback is always very fast)
  781. OR (is it the same) everything is metadata ? Metadata can contain any of the above information (apart from rows - or maybe they are just another type)
  782. If I can't deal quickly with a packet of information, I queue it up? Spanning complicates things though. I need to be able to spot complete portions of metadata
  783. (and in kind-of the same way I need to be able to spot complete rows of data even when they span multiple packets.) I think data is really a bit different from the rest -
  784. you expect it to be continuous and you want the others to interrupt the flow.
  785. If continuation info was restricted to a "yes/no" (i.e. had to be continued on same node as started on) could have simple "Is there any continuation" bit. Others are sent in their
  786. own packets so are a little different. Does that make it harder to recover? Not sure that it does really (just means that the window at which a failure causes a problem starts earlier).
  787. However it may be an issue tying up slave thread for a while (and do we know when to untie it if the Roxie server abandons/restarts?)
  788. Perhaps it makes sense to pause at this point (with streaming disabled and with retry mechanism optional)
  789. */