udptrr.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #undef new
  14. #include <string>
  15. #include <map>
  16. #include <queue>
  17. #include "jthread.hpp"
  18. #include "jlog.hpp"
  19. #include "jisem.hpp"
  20. #include "jsocket.hpp"
  21. #include "udplib.hpp"
  22. #include "udptrr.hpp"
  23. #include "udptrs.hpp"
  24. #include "roxiemem.hpp"
  25. #include "roxie.hpp"
  26. #ifdef _WIN32
  27. #include <io.h>
  28. #include <winsock2.h>
  29. #else
  30. #include <sys/socket.h>
  31. #include <sys/time.h>
  32. #include <sys/resource.h>
  33. #endif
  34. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  35. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  36. #endif
  37. using roxiemem::DataBuffer;
  38. using roxiemem::IRowManager;
  39. unsigned udpRetryBusySenders = 0; // seems faster with 0 than 1 in my testing on small clusters and sustained throughput
  40. unsigned udpInlineCollationPacketLimit;
  41. bool udpInlineCollation = false;
  42. bool udpSendCompletedInData = false;
  43. class CReceiveManager : public CInterface, implements IReceiveManager
  44. {
  45. class ReceiveFlowManager : public Thread
  46. {
  47. private:
  48. CReceiveManager &parent;
  49. class UdpSenderEntry // one per node in the system
  50. {
  51. SpinLock resendInfoLock;
  52. unsigned lastSeen; // Highest sequence ever seen
  53. unsigned missingCount; // Number of values in missing table
  54. unsigned missingIndex; // Pointer to start of values in missing circular buffer
  55. unsigned missingTableSize;
  56. unsigned *missing;
  57. unsigned destNodeIndex;
  58. unsigned myNodeIndex;
  59. ISocket *flowSocket;
  60. public:
  61. unsigned nextIndex; // Used to form list of all senders that have outstanding requests
  62. UdpSenderEntry()
  63. {
  64. nextIndex = (unsigned) -1;
  65. flowSocket = NULL;
  66. lastSeen = 0;
  67. missingCount = 0;
  68. missingIndex = 0;
  69. missingTableSize = 0;
  70. missing = NULL;
  71. destNodeIndex = (unsigned) -1;
  72. myNodeIndex = (unsigned) -1;
  73. }
  74. ~UdpSenderEntry()
  75. {
  76. if (flowSocket)
  77. {
  78. flowSocket->close();
  79. flowSocket->Release();
  80. }
  81. delete [] missing;
  82. }
  83. void init(unsigned _destNodeIndex, unsigned _myNodeIndex, unsigned port, unsigned _missingTableSize)
  84. {
  85. assert(!flowSocket);
  86. destNodeIndex = _destNodeIndex;
  87. myNodeIndex = _myNodeIndex;
  88. SocketEndpoint ep(port, getNodeAddress(destNodeIndex));
  89. flowSocket = ISocket::udp_connect(ep);
  90. missingTableSize = _missingTableSize;
  91. missing = new unsigned[_missingTableSize];
  92. }
  93. void noteRequest(const UdpRequestToSendMsg &msg)
  94. {
  95. // MORE - if we never go idle, we never get to see these in udpSendCompletedInData mode. Is that a big deal?
  96. SpinBlock b(resendInfoLock);
  97. if (msg.lastSequenceAvailable < lastSeen)
  98. {
  99. // must have wrapped or restarted. Reinitialize my view of sender missing packets.
  100. // MORE - if a restart (rather than a wrap) should also clear out any collators still hanging on for a reply
  101. // Should I on a wrap too?
  102. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  103. DBGLOG("Resetting missing list as sender %u seems to have restarted or wrapped", msg.sourceNodeIndex);
  104. lastSeen = 0;
  105. missingCount = 0;
  106. missingIndex = 0;
  107. }
  108. else
  109. {
  110. // abandon hope of receiving any that sender says they no longer have
  111. while (missingCount && missing[missingIndex] < msg.firstSequenceAvailable)
  112. {
  113. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  114. DBGLOG("Abandoning missing message %u from sender %u - sender no longer has it", missing[missingIndex], msg.sourceNodeIndex);
  115. atomic_inc(&packetsAbandoned);
  116. missingCount--;
  117. missingIndex++;
  118. if (missingIndex == missingTableSize)
  119. missingIndex = 0;
  120. }
  121. }
  122. }
  123. void noteReceived(const UdpPacketHeader &msg)
  124. {
  125. // MORE - what happens when we wrap?
  126. // MORE - what happens when a sender restarts? Probably similar to wrap...
  127. // Receiver restart is ok
  128. SpinBlock b(resendInfoLock);
  129. unsigned thisSequence = msg.udpSequence;
  130. if (thisSequence > lastSeen)
  131. {
  132. lastSeen++;
  133. if (lastSeen != thisSequence)
  134. {
  135. // Everything between lastSeen and just received needs adding to missing table.
  136. if (thisSequence - lastSeen > missingTableSize)
  137. {
  138. if (lastSeen==1)
  139. {
  140. // assume it's receiver restart and just ignore the missing values
  141. DBGLOG("Assuming receiver restart (lastseen==%u, thisSequence==%u, index==%u)", lastSeen, thisSequence, msg.nodeIndex);
  142. lastSeen = thisSequence;
  143. return;
  144. }
  145. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  146. DBGLOG("Big gap in UDP packet sequence (%u-%u) from node %u - some ignored!", lastSeen, thisSequence-1, msg.nodeIndex);
  147. lastSeen = thisSequence - missingTableSize; // avoids wasting CPU cycles writing more values than will fit after long interruption, receiver restart, or on corrupt packet
  148. }
  149. while (lastSeen < thisSequence)
  150. {
  151. if (checkTraceLevel(TRACE_RETRY_DATA, 3))
  152. DBGLOG("Adding packet sequence %u to missing list for node %u", lastSeen, msg.nodeIndex);
  153. missing[(missingIndex + missingCount) % missingTableSize] = lastSeen;
  154. if (missingCount < missingTableSize)
  155. missingCount++;
  156. else
  157. {
  158. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  159. DBGLOG("missing table overflow - packets will be lost");
  160. }
  161. lastSeen++;
  162. }
  163. }
  164. }
  165. else
  166. {
  167. if (checkTraceLevel(TRACE_RETRY_DATA, 2))
  168. DBGLOG("Out-of-sequence packet received %u", thisSequence);
  169. // Hopefully filling in a missing value, and USUALLY in sequence so no point binchopping
  170. for (unsigned i = 0; i < missingCount; i++)
  171. {
  172. unsigned idx = (missingIndex + i) % missingTableSize;
  173. if (missing[idx] == thisSequence)
  174. {
  175. if (checkTraceLevel(TRACE_RETRY_DATA, 2))
  176. DBGLOG("Formerly missing packet sequence received %u", thisSequence);
  177. while (i)
  178. {
  179. // copy up ones that are still missing
  180. unsigned idx2 = idx ? idx-1 : missingTableSize-1;
  181. missing[idx] = missing[idx2];
  182. idx = idx2;
  183. i--;
  184. }
  185. missingIndex++;
  186. missingCount--;
  187. break;
  188. }
  189. else if (missing[idx] < thisSequence)
  190. {
  191. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  192. DBGLOG("Unexpected packet sequence received %u - ignored", thisSequence);
  193. break;
  194. }
  195. else // (missing[idx] > thisSequence)
  196. {
  197. if (!i)
  198. {
  199. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  200. DBGLOG("Unrequested missing packet received %u - ignored", thisSequence);
  201. break;
  202. }
  203. }
  204. }
  205. }
  206. }
  207. void requestToSend(unsigned maxTransfer)
  208. {
  209. try
  210. {
  211. UdpPermitToSendMsg msg;
  212. {
  213. SpinBlock block(resendInfoLock);
  214. msg.hdr.length = sizeof(UdpPermitToSendMsg::MsgHeader) + missingCount*sizeof(msg.missingSequences[0]);
  215. msg.hdr.cmd = flow_t::ok_to_send;
  216. msg.hdr.destNodeIndex = myNodeIndex;
  217. msg.hdr.max_data = maxTransfer;
  218. msg.hdr.lastSequenceSeen = lastSeen;
  219. msg.hdr.missingCount = missingCount;
  220. for (unsigned i = 0; i < missingCount; i++)
  221. {
  222. unsigned idx = (missingIndex + i) % missingTableSize;
  223. msg.missingSequences[i] = missing[idx];
  224. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  225. DBGLOG("Requesting resend of packet %d", missing[idx]);
  226. }
  227. #ifdef CRC_MESSAGES
  228. msg.hdr.crc = msg.calcCRC();
  229. #endif
  230. }
  231. if (checkTraceLevel(TRACE_RETRY_DATA, 5))
  232. {
  233. StringBuffer s;
  234. DBGLOG("requestToSend %s", msg.toString(s).str());
  235. }
  236. flowSocket->write(&msg, msg.hdr.length);
  237. }
  238. catch(IException *e)
  239. {
  240. StringBuffer s;
  241. DBGLOG("UdpReceiver: send_acknowledge failed node=%u %s", destNodeIndex, e->errorMessage(s).str());
  242. e->Release();
  243. }
  244. }
  245. } *sendersTable;
  246. unsigned maxSenders;
  247. unsigned firstRequest;
  248. unsigned lastRequest;
  249. unsigned maxSlotsPerSender;
  250. bool running;
  251. SpinLock receiveFlowLock; // Protecting the currentTransfer variable and the chain of active transfers
  252. unsigned currentTransfer;
  253. Semaphore requestPending;
  254. Semaphore transferComplete;
  255. public:
  256. ReceiveFlowManager(CReceiveManager &_parent, unsigned _maxSenders, unsigned _maxSlotsPerSender)
  257. : Thread("UdpLib::ReceiveFlowManager"), parent(_parent)
  258. {
  259. firstRequest = (unsigned) -1;
  260. lastRequest = (unsigned) -1;
  261. currentTransfer = (unsigned) -1;
  262. running = false;
  263. maxSenders = _maxSenders;
  264. maxSlotsPerSender = _maxSlotsPerSender;
  265. sendersTable = new UdpSenderEntry[maxSenders];
  266. unsigned missingTableSize = maxSlotsPerSender;
  267. if (missingTableSize > MAX_RESEND_TABLE_SIZE)
  268. missingTableSize = MAX_RESEND_TABLE_SIZE;
  269. for (unsigned i = 0; i < maxSenders; i++)
  270. {
  271. sendersTable[i].init(i, parent.myNodeIndex, parent.send_flow_port, missingTableSize);
  272. }
  273. }
  274. ~ReceiveFlowManager()
  275. {
  276. running = false;
  277. requestPending.signal();
  278. transferComplete.signal();
  279. join();
  280. delete [] sendersTable;
  281. }
  282. unsigned send_acknowledge()
  283. {
  284. int timeout = 1;
  285. unsigned max_transfer;
  286. UdpSenderEntry *sender = NULL;
  287. {
  288. SpinBlock b(receiveFlowLock);
  289. if (firstRequest != (unsigned) -1)
  290. {
  291. assert(firstRequest < maxSenders);
  292. //find first non-busy sender, and move it to front of sendersTable request chain
  293. int retry = udpRetryBusySenders;
  294. unsigned finger = firstRequest;
  295. unsigned prev = -1;
  296. loop
  297. {
  298. if (udpSnifferEnabled && parent.sniffer->is_busy(finger))
  299. {
  300. prev = finger;
  301. finger = sendersTable[finger].nextIndex;
  302. if (finger==(unsigned)-1)
  303. {
  304. if (retry--)
  305. {
  306. if (udpTraceLevel > 4)
  307. DBGLOG("UdpReceive: All senders busy");
  308. MilliSleep(1);
  309. finger = firstRequest;
  310. prev = -1;
  311. }
  312. else
  313. break; // give up and use first anyway
  314. }
  315. }
  316. else
  317. {
  318. if (finger != firstRequest)
  319. {
  320. if (finger == lastRequest)
  321. lastRequest = prev;
  322. assert(prev != -1);
  323. sendersTable[prev].nextIndex = sendersTable[finger].nextIndex;
  324. sendersTable[finger].nextIndex = firstRequest;
  325. firstRequest = finger;
  326. }
  327. break;
  328. }
  329. }
  330. if (udpInlineCollation)
  331. max_transfer = udpInlineCollationPacketLimit;
  332. else
  333. max_transfer = parent.input_queue->free_slots();
  334. if (max_transfer > maxSlotsPerSender)
  335. max_transfer = maxSlotsPerSender;
  336. timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
  337. currentTransfer = firstRequest;
  338. sender = &sendersTable[firstRequest];
  339. //indicate not in queue (MORE - what if wanted to send > allowing?? Do we know how much it wanted to send?)
  340. if (firstRequest==lastRequest)
  341. lastRequest = (unsigned) -1;
  342. firstRequest = sender->nextIndex;
  343. sender->nextIndex = (unsigned) -1;
  344. }
  345. }
  346. if (sender)
  347. sender->requestToSend(max_transfer);
  348. return timeout;
  349. }
  350. void request(const UdpRequestToSendMsg &msg)
  351. {
  352. unsigned index = msg.sourceNodeIndex;
  353. assertex(index < maxSenders);
  354. UdpSenderEntry &sender = sendersTable[index];
  355. {
  356. SpinBlock b(receiveFlowLock);
  357. if ((lastRequest == index) || (sender.nextIndex != (unsigned) -1))
  358. {
  359. DBGLOG("UdpReceiver: received duplicate request_to_send msg from node=%d", index);
  360. return;
  361. }
  362. // Chain it onto list
  363. if (firstRequest != (unsigned) -1)
  364. sendersTable[lastRequest].nextIndex = index;
  365. else
  366. firstRequest = index;
  367. lastRequest = index;
  368. }
  369. sender.noteRequest(msg);
  370. requestPending.signal();
  371. }
  372. void requestMore(unsigned index) // simpler version of request since we don't get the extra info
  373. {
  374. assertex(index < maxSenders);
  375. UdpSenderEntry &sender = sendersTable[index];
  376. {
  377. SpinBlock b(receiveFlowLock);
  378. if ((lastRequest == index) || (sender.nextIndex != (unsigned) -1))
  379. {
  380. DBGLOG("UdpReceiver: received duplicate request_to_send msg from node=%d", index);
  381. return;
  382. }
  383. // Chain it onto list
  384. if (firstRequest != (unsigned) -1)
  385. sendersTable[lastRequest].nextIndex = index;
  386. else
  387. firstRequest = index;
  388. lastRequest = index;
  389. }
  390. requestPending.signal();
  391. }
  392. void completed(unsigned index)
  393. {
  394. assert(index < maxSenders);
  395. bool isCurrent;
  396. {
  397. SpinBlock b(receiveFlowLock);
  398. isCurrent = (index == currentTransfer);
  399. }
  400. if (isCurrent)
  401. transferComplete.signal();
  402. else
  403. DBGLOG("UdpReceiver: completed msg from node %u is not for current transfer (%u) ", index, currentTransfer);
  404. }
  405. inline void noteReceived(const UdpPacketHeader &msg)
  406. {
  407. sendersTable[msg.nodeIndex].noteReceived(msg);
  408. }
  409. virtual void start()
  410. {
  411. running = true;
  412. Thread::start();
  413. }
  414. virtual int run()
  415. {
  416. DBGLOG("UdpReceiver: ReceiveFlowManager started");
  417. while (running)
  418. {
  419. requestPending.wait();
  420. unsigned maxTime = send_acknowledge();
  421. if (!transferComplete.wait(maxTime) && udpTraceLevel > 0)
  422. {
  423. DBGLOG("UdpReceiver: transfer timed out after %d ms from node=%u", maxTime, currentTransfer);
  424. // MORE - a timeout here means everything stalled... look into when it can happen!
  425. }
  426. }
  427. return 0;
  428. }
  429. };
  430. class receive_sniffer : public Thread
  431. {
  432. struct SnifferEntry {
  433. time_t timeStamp;
  434. char busy;
  435. SnifferEntry() { timeStamp = 0; busy = 0; }
  436. } *snifferTable;
  437. ISocket *sniffer_socket;
  438. unsigned snifferPort;
  439. IpAddress snifferIP;
  440. CReceiveManager &parent;
  441. bool running;
  442. inline void update(unsigned index, char busy)
  443. {
  444. if (udpTraceLevel > 5)
  445. DBGLOG("UdpReceive: sniffer sets is_busy[%d} to %d", index, busy);
  446. snifferTable[index].busy = busy;
  447. if (busy)
  448. time(&snifferTable[index].timeStamp);
  449. }
  450. public:
  451. receive_sniffer(CReceiveManager &_parent, unsigned _snifferPort, const IpAddress &_snifferIP, unsigned numNodes)
  452. : Thread("udplib::receive_sniffer"), parent(_parent), snifferPort(_snifferPort), snifferIP(_snifferIP), running(false)
  453. {
  454. snifferTable = new SnifferEntry[numNodes];
  455. sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
  456. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0) {
  457. if (!enableSocketMaxSetting)
  458. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  459. check_set_max_socket_read_buffer(udpFlowSocketsSize);
  460. }
  461. sniffer_socket->set_receive_buffer_size(udpFlowSocketsSize);
  462. if (udpTraceLevel)
  463. {
  464. StringBuffer ipStr;
  465. snifferIP.getIpText(ipStr);
  466. DBGLOG("UdpReceiver: receive_sniffer port open %s:%i", ipStr.str(), snifferPort);
  467. }
  468. }
  469. ~receive_sniffer()
  470. {
  471. running = false;
  472. if (sniffer_socket) sniffer_socket->close();
  473. join();
  474. if (sniffer_socket) sniffer_socket->Release();
  475. delete [] snifferTable;
  476. }
  477. bool is_busy(unsigned index)
  478. {
  479. if (snifferTable[index].busy)
  480. {
  481. time_t now;
  482. time(&now);
  483. return (now - snifferTable[index].timeStamp) < 10;
  484. }
  485. else
  486. return false;
  487. }
  488. virtual int run()
  489. {
  490. DBGLOG("UdpReceiver: sniffer started");
  491. while (running)
  492. {
  493. try
  494. {
  495. unsigned int res;
  496. sniff_msg msg;
  497. sniffer_socket->read(&msg, 1, sizeof(msg), res, 5);
  498. update(msg.nodeIndex, msg.cmd == flow_t::busy);
  499. }
  500. catch (IException *e)
  501. {
  502. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  503. {
  504. StringBuffer s;
  505. DBGLOG("UdpReceiver: receive_sniffer::run read failed %s", e->errorMessage(s).str());
  506. MilliSleep(1000);
  507. }
  508. e->Release();
  509. }
  510. catch (...)
  511. {
  512. DBGLOG("UdpReceiver: receive_sniffer::run unknown exception port %u", parent.data_port);
  513. if (sniffer_socket) {
  514. sniffer_socket->Release();
  515. sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
  516. }
  517. MilliSleep(1000);
  518. }
  519. }
  520. return 0;
  521. }
  522. virtual void start()
  523. {
  524. if (udpSnifferEnabled)
  525. {
  526. running = true;
  527. Thread::start();
  528. }
  529. }
  530. };
  531. class receive_receive_flow : public Thread
  532. {
  533. Owned<ISocket> flow_socket;
  534. int flow_port;
  535. CReceiveManager &parent;
  536. bool running;
  537. public:
  538. receive_receive_flow(CReceiveManager &_parent, int flow_p) : Thread("UdpLib::receive_receive_flow"), parent(_parent)
  539. {
  540. flow_port = flow_p;
  541. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  542. {
  543. if (!enableSocketMaxSetting)
  544. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  545. check_set_max_socket_read_buffer(udpFlowSocketsSize);
  546. }
  547. flow_socket.setown(ISocket::udp_create(flow_port));
  548. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  549. size32_t actualSize = flow_socket->get_receive_buffer_size();
  550. DBGLOG("UdpReceiver: rcv_flow_socket created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
  551. }
  552. ~receive_receive_flow()
  553. {
  554. running = false;
  555. if (flow_socket)
  556. flow_socket->close();
  557. join();
  558. }
  559. virtual void start()
  560. {
  561. running = true;
  562. Thread::start();
  563. }
  564. virtual int run()
  565. {
  566. DBGLOG("UdpReceiver: receive_receive_flow started");
  567. #ifdef __linux__
  568. setLinuxThreadPriority(3);
  569. #else
  570. adjustPriority(1);
  571. #endif
  572. UdpRequestToSendMsg f;
  573. while (running)
  574. {
  575. try
  576. {
  577. int l = sizeof(f);
  578. unsigned int res ;
  579. flow_socket->read(&f, 1, l, res, 5);
  580. switch (f.cmd)
  581. {
  582. case flow_t::request_to_send:
  583. if (udpTraceLevel > 5)
  584. DBGLOG("UdpReceiver: received request_to_send msg from node=%u", f.sourceNodeIndex);
  585. parent.manager->request(f);
  586. break;
  587. case flow_t::send_completed:
  588. if (udpTraceLevel > 5)
  589. DBGLOG("UdpReceiver: received send_completed msg from node=%u", f.sourceNodeIndex);
  590. parent.manager->completed(f.sourceNodeIndex);
  591. break;
  592. case flow_t::request_to_send_more:
  593. if (udpTraceLevel > 5)
  594. DBGLOG("UdpReceiver: received request_to_send_more msg from node=%u", f.sourceNodeIndex);
  595. parent.manager->completed(f.sourceNodeIndex);
  596. parent.manager->request(f);
  597. break;
  598. default:
  599. DBGLOG("UdpReceiver: reveived unrecognized flow control message cmd=%i", f.cmd);
  600. }
  601. }
  602. catch (IException *e)
  603. {
  604. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  605. {
  606. StringBuffer s;
  607. DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
  608. }
  609. e->Release();
  610. }
  611. catch (...) {
  612. DBGLOG("UdpReceiver: receive_receive_flow::run unknown exception");
  613. MilliSleep(15);
  614. }
  615. }
  616. return 0;
  617. }
  618. };
  619. class receive_data : public Thread
  620. {
  621. CReceiveManager &parent;
  622. ISocket *receive_socket;
  623. bool running;
  624. Semaphore started;
  625. public:
  626. receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
  627. {
  628. unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD;
  629. if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
  630. if (check_max_socket_read_buffer(ip_buffer) < 0)
  631. {
  632. if (!enableSocketMaxSetting)
  633. throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
  634. check_set_max_socket_read_buffer(ip_buffer);
  635. }
  636. receive_socket = ISocket::udp_create(parent.data_port);
  637. receive_socket->set_receive_buffer_size(ip_buffer);
  638. size32_t actualSize = receive_socket->get_receive_buffer_size();
  639. DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
  640. }
  641. virtual void start()
  642. {
  643. running = true;
  644. Thread::start();
  645. started.wait();
  646. }
  647. ~receive_data()
  648. {
  649. running = false;
  650. if (receive_socket)
  651. receive_socket->close();
  652. join();
  653. ::Release(receive_socket);
  654. }
  655. virtual int run()
  656. {
  657. DBGLOG("UdpReceiver: receive_data started");
  658. #ifdef __linux__
  659. setLinuxThreadPriority(4);
  660. #else
  661. adjustPriority(2);
  662. #endif
  663. DataBuffer *b = NULL;
  664. started.signal();
  665. while (running)
  666. {
  667. try
  668. {
  669. unsigned int res;
  670. b = bufferManager->allocate();
  671. receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
  672. UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
  673. unsigned flowBits = hdr.udpSequence & UDP_SEQUENCE_BITS;
  674. hdr.udpSequence &= ~UDP_SEQUENCE_BITS;
  675. if (flowBits & UDP_SEQUENCE_COMPLETE)
  676. {
  677. parent.manager->completed(hdr.nodeIndex);
  678. }
  679. if (flowBits & UDP_SEQUENCE_MORE)
  680. {
  681. parent.manager->requestMore(hdr.nodeIndex); // MORE - what about the rest of request info?
  682. }
  683. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  684. DBGLOG("UdpReceiver: %u bytes received, node=%u", res, hdr.nodeIndex);
  685. if (udpInlineCollation)
  686. parent.collatePacket(b);
  687. else
  688. parent.input_queue->pushOwn(b);
  689. b = NULL;
  690. }
  691. catch (IException *e)
  692. {
  693. ::Release(b);
  694. b = NULL;
  695. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  696. {
  697. StringBuffer s;
  698. DBGLOG("UdpReceiver: receive_data::run read failed port=%u - Exp: %s", parent.data_port, e->errorMessage(s).str());
  699. MilliSleep(1000); // Give a chance for mem free
  700. }
  701. e->Release();
  702. }
  703. catch (...)
  704. {
  705. ::Release(b);
  706. b = NULL;
  707. DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
  708. MilliSleep(1000);
  709. }
  710. }
  711. ::Release(b);
  712. return 0;
  713. }
  714. };
  715. class CPacketCollator : public Thread
  716. {
  717. CReceiveManager &parent;
  718. public:
  719. CPacketCollator(CReceiveManager &_parent) : Thread("CPacketCollator"), parent(_parent) {}
  720. virtual int run()
  721. {
  722. DBGLOG("UdpReceiver: CPacketCollator::run");
  723. parent.collatePackets();
  724. return 0;
  725. }
  726. } collatorThread;
  727. friend class receive_receive_flow;
  728. friend class receive_send_flow;
  729. friend class receive_data;
  730. friend class ReceiveFlowManager;
  731. friend class receive_sniffer;
  732. queue_t *input_queue;
  733. int input_queue_size;
  734. receive_receive_flow *receive_flow;
  735. receive_data *data;
  736. ReceiveFlowManager *manager;
  737. receive_sniffer *sniffer;
  738. unsigned myNodeIndex;
  739. int send_flow_port;
  740. int receive_flow_port;
  741. int data_port;
  742. bool running;
  743. typedef std::map<ruid_t, IMessageCollator*> uid_map;
  744. Linked<IMessageCollator> defaultMessageCollator;
  745. uid_map collators; // MORE - more sensible to use a jlib mapping I would have thought
  746. SpinLock collatorsLock; // protects access to collators map and defaultMessageCollator
  747. public:
  748. IMPLEMENT_IINTERFACE;
  749. CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client, unsigned _myNodeIndex)
  750. : collatorThread(*this)
  751. {
  752. #ifndef _WIN32
  753. setpriority(PRIO_PROCESS, 0, -15);
  754. #endif
  755. myNodeIndex = _myNodeIndex;
  756. receive_flow_port = server_flow_port;
  757. send_flow_port = client_flow_port;
  758. data_port = d_port;
  759. input_queue_size = queue_size;
  760. input_queue = new queue_t(queue_size);
  761. data = new receive_data(*this);
  762. manager = new ReceiveFlowManager(*this, getNumNodes(), m_slot_pr_client);
  763. receive_flow = new receive_receive_flow(*this, server_flow_port);
  764. if (udpSnifferEnabled)
  765. sniffer = new receive_sniffer(*this, snif_port, multicast_ip, getNumNodes());
  766. running = true;
  767. collatorThread.start();
  768. data->start();
  769. manager->start();
  770. receive_flow->start();
  771. if (udpSnifferEnabled)
  772. sniffer->start();
  773. MilliSleep(15);
  774. }
  775. ~CReceiveManager()
  776. {
  777. running = false;
  778. input_queue->interrupt();
  779. collatorThread.join();
  780. delete data;
  781. delete receive_flow;
  782. delete manager;
  783. delete sniffer;
  784. delete input_queue;
  785. }
  786. virtual void detachCollator(const IMessageCollator *msgColl)
  787. {
  788. ruid_t ruid = msgColl->queryRUID();
  789. if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
  790. SpinBlock b(collatorsLock);
  791. collators.erase(ruid);
  792. msgColl->Release();
  793. }
  794. virtual void setDefaultCollator(IMessageCollator *msgColl)
  795. {
  796. if (udpTraceLevel>=5) DBGLOG("UdpReceiver: setDefaultCollator");
  797. SpinBlock b(collatorsLock);
  798. defaultMessageCollator.set(msgColl);
  799. }
  800. void collatePackets()
  801. {
  802. unsigned lastDiscardedMsgSeq = 0;
  803. while(running)
  804. {
  805. DataBuffer *dataBuff = input_queue->pop();
  806. collatePacket(dataBuff);
  807. }
  808. }
  809. void collatePacket(DataBuffer *dataBuff)
  810. {
  811. const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  812. manager->noteReceived(*pktHdr);
  813. if (udpTraceLevel >= 4)
  814. DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%u",
  815. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->nodeIndex);
  816. Linked <IMessageCollator> msgColl;
  817. {
  818. SpinBlock b(collatorsLock);
  819. try
  820. {
  821. msgColl.set(collators[pktHdr->ruid]);
  822. if (!msgColl)
  823. {
  824. if (udpTraceLevel)
  825. DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex);
  826. msgColl.set(defaultMessageCollator); // MORE - if we get a header, we can send an abort.
  827. }
  828. }
  829. catch (IException *E)
  830. {
  831. EXCLOG(E);
  832. E->Release();
  833. }
  834. catch (...)
  835. {
  836. IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
  837. EXCLOG(E);
  838. E->Release();
  839. }
  840. }
  841. if (msgColl)
  842. {
  843. if (msgColl->add_package(dataBuff))
  844. {
  845. dataBuff = 0;
  846. }
  847. }
  848. else
  849. {
  850. // MORE - tell the slave to stop sending?
  851. // if (udpTraceLevel > 1 && lastDiscardedMsgSeq != pktHdr->msgSeq)
  852. // DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - discarding packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex);
  853. // lastDiscardedMsgSeq = pktHdr->msgSeq;
  854. }
  855. if (dataBuff)
  856. {
  857. dataBuff->Release();
  858. atomic_inc(&unwantedDiscarded);
  859. }
  860. }
  861. virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
  862. {
  863. IMessageCollator *msgColl = createCMessageCollator(rowManager, ruid);
  864. if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
  865. SpinBlock b(collatorsLock);
  866. collators[ruid] = msgColl;
  867. msgColl->Link();
  868. return msgColl;
  869. }
  870. };
  871. IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
  872. int sniffer_port, const IpAddress &sniffer_multicast_ip,
  873. int udpQueueSize, unsigned maxSlotsPerSender,
  874. unsigned myNodeIndex)
  875. {
  876. assertex (maxSlotsPerSender <= udpQueueSize);
  877. return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender, myNodeIndex);
  878. }
  879. /*
  880. Thoughts on flow control / streaming:
  881. 1. The "continuation packet" mechanism does have some advantages
  882. - easy recovery from slave failures
  883. - slave recovers easily from Roxie server failures
  884. - flow control is simple (but is it effective?)
  885. 2. Abandoning the continuation packet in favour of streaming would give us the following issues:
  886. - would need some flow control to stop getting ahead of a Roxie server that consumed slowly
  887. - flow control is non trivial if you want to avoid tying up a slave thread and want slave to be able to recover from Roxie server failure
  888. - Need to work out how to do GSS - the nextGE info needs to be passed back in the flow control?
  889. - can't easily recover from slave failures if you already started processing
  890. - unless you assume that the results from slave are always deterministic and can retry and skip N
  891. - potentially ties up a slave thread for a while
  892. - do we need to have a larger thread pool but limit how many actually active?
  893. 3. Order of work
  894. - Just adding streaming while ignoring flow control and continuation stuff (i.e. we still stop for permission to continue periodically)
  895. - Shouldn't make anything any _worse_ ...
  896. - except that won't be able to recover from a slave dying mid-stream (at least not without some considerable effort)
  897. - what will happen then?
  898. - May also break server-side caching (that no-one has used AFAIK). Maybe restrict to nohits as we change....
  899. - Add some flow control
  900. - would prevent slave getting too far ahead in cases that are inadequately flow-controlled today
  901. - shouldn't make anything any worse...
  902. - Think about removing continuation mechanism from some cases
  903. Per Gavin, streaming would definitely help for the lowest frequency term. It may help for the others as well if it avoided any significant start up costs - e.g., opening the indexes,
  904. creating the segment monitors, creating the various cursors, and serialising the context (especially because there are likely to be multiple cursors).
  905. To add streaming:
  906. - Need to check for meta availability other than when first received
  907. - when ?
  908. - Need to cope with a getNext() blocking without it causing issues
  909. - perhaps should recode getNext() of variable-size rows first?
  910. More questions:
  911. - Can we afford the memory for the resend info?
  912. - Should we save maxPacketsPerSender separately for each sender?
  913. - are we really handling restart and sequence wraparound correctly?
  914. - what about server-side caching? Makes it hard
  915. - but maybe we should only cache tiny replies anyway....
  916. Problems found while testing the implementation:
  917. - the unpacker cursor read code is of poor quality and needs reworking
  918. - there is a potential to deadlock when need to make a callback slave->server during a streamed result (indexread5 illustrates)
  919. - resolution callback code doesn't really need to be query specific - could go to the default handler
  920. - but other callbacks - ALIVE, EXCEPTION, and debugger are not so clear
  921. - It's not at all clear where to move the code for processing metadata
  922. - callback paradigm would solve both - but it has to be on a client thread (e.g. from within call to next()).
  923. The following are used in "pseudo callback" mode:
  924. #define ROXIE_DEBUGREQUEST 0x3ffffff7u
  925. #define ROXIE_DEBUGCALLBACK 0x3ffffff8u
  926. #define ROXIE_PING 0x3ffffff9u
  927. - goes to own handler anyway
  928. #define ROXIE_TRACEINFO 0x3ffffffau
  929. - could go in meta? Not time critical. Could all go to single handler? (a bit hard since we want to intercept for caller...)
  930. #define ROXIE_FILECALLBACK 0x3ffffffbu
  931. - could go to single handler
  932. #define ROXIE_ALIVE 0x3ffffffcu
  933. - currently getting delayed a bit too much potentially if downstream processing is slow? Do I even need it if streaming?
  934. #define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
  935. - could go in metadata of standard response
  936. #define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
  937. - ditto
  938. #define ROXIE_EXCEPTION 0x3fffffffu
  939. - ditto
  940. And the continuation metadata.
  941. What if EVERYTHING was a callback? - here's an exception... here's some more rows... here's some tracing... here's some continuation metadata
  942. At some point I need to marshal from one thread to another though (maybe more than once, unless I can guarantee the callback is always very fast)
  943. OR (is it the same) everything is metadata ? Metadata can contain any of the above information (apart from rows - or maybe they are just another type)
  944. If I can't deal quickly with a packet of information, I queue it up? Spanning complicates things though. I need to be able to spot complete portions of metadata
  945. (and in kind-of the same way I need to be able to spot complete rows of data even when they span multiple packets.) I think data is really a bit different from the rest -
  946. you expect it to be continuous and you want the others to interrupt the flow.
  947. If continuation info was restricted to a "yes/no" (i.e. had to be continued on same node as started on) could have simple "Is there any continuation" bit. Others are sent in their
  948. own packets so are a little different. Does that make it harder to recover? Not sure that it does really (just means that the window at which a failure causes a problem starts earlier).
  949. However it may be an issue tying up slave thread for a while (and do we know when to untie it if the Roxie server abandons/restarts?)
  950. Perhaps it makes sense to pause at this point (with streaming disabled and with retry mechanism optional)
  951. */