udptrs.cpp 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "jsocket.hpp"
  17. #include "jlog.hpp"
  18. #include "roxie.hpp"
  19. #ifdef _WIN32
  20. #include <winsock.h>
  21. #else
  22. #include <sys/socket.h>
  23. #include <sys/time.h>
  24. #include <sys/resource.h>
  25. #endif
  26. #include <math.h>
  27. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  28. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  29. #endif
  30. unsigned udpOutQsPriority = 0;
  31. unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
  32. unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
  33. bool udpSnifferEnabled = true;
  34. #ifdef _DEBUG
  35. //#define _SIMULATE_LOST_PACKETS
  36. #endif
  37. using roxiemem::DataBuffer;
  38. // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
  39. class UdpReceiverEntry
  40. {
  41. queue_t *output_queue;
  42. bool initialized;
  43. public:
  44. ISocket *send_flow_socket;
  45. ISocket *data_socket;
  46. unsigned numQueues;
  47. int current_q;
  48. int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
  49. int *maxPktsPerQ; // to minimise power function re-calc for evey packet
  50. // MORE - consider where we need critsecs in here!
  51. void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
  52. {
  53. UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
  54. try
  55. {
  56. send_flow_socket->write(&msg, msg.length);
  57. }
  58. catch(IException *e)
  59. {
  60. StringBuffer s;
  61. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  62. e->Release();
  63. }
  64. catch (...)
  65. {
  66. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  67. }
  68. }
  69. unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
  70. {
  71. moreRequested = false;
  72. maxPackets = permit.max_data;
  73. PointerArray toSend;
  74. unsigned totalSent = 0;
  75. while (toSend.length() < maxPackets && dataQueued())
  76. {
  77. DataBuffer *buffer = popQueuedData();
  78. if (buffer) // Aborted slave queries leave NULL records on queue
  79. {
  80. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  81. toSend.append(buffer);
  82. totalSent += header->length;
  83. #ifdef __linux__
  84. if (isLocal && (totalSent> 100000))
  85. break;
  86. #endif
  87. }
  88. }
  89. maxPackets = toSend.length();
  90. for (unsigned idx = 0; idx < maxPackets; idx++)
  91. {
  92. DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
  93. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  94. unsigned length = header->length;
  95. if (bucket)
  96. {
  97. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  98. bucket->wait((length / 1024)+1);
  99. }
  100. if (udpSendCompletedInData && idx == maxPackets-1)
  101. header->pktSeq |= UDP_PACKET_ENDBURST;
  102. try
  103. {
  104. data_socket->write(buffer->data, length);
  105. }
  106. catch(IException *e)
  107. {
  108. StringBuffer s;
  109. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  110. e->Release();
  111. }
  112. catch(...)
  113. {
  114. DBGLOG("UdpSender: write exception - unknown exception");
  115. }
  116. ::Release(buffer);
  117. }
  118. return totalSent;
  119. }
  120. bool dataQueued()
  121. {
  122. for (unsigned i = 0; i < numQueues; i++)
  123. {
  124. if (!output_queue[i].empty())
  125. return true;
  126. }
  127. return false;
  128. }
  129. bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
  130. {
  131. for (unsigned i = 0; i < numQueues; i++)
  132. {
  133. if (output_queue[i].dataQueued(key, pkCmpFn))
  134. return true;
  135. }
  136. return false;
  137. }
  138. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  139. {
  140. bool anyRemoved = false;
  141. for (unsigned i = 0; i < numQueues; i++)
  142. {
  143. if (output_queue[i].removeData(key, pkCmpFn))
  144. anyRemoved = true;
  145. }
  146. return anyRemoved;
  147. }
  148. inline void pushData(unsigned queue, DataBuffer *buffer)
  149. {
  150. output_queue[queue].pushOwn(buffer);
  151. }
  152. DataBuffer *popQueuedData()
  153. {
  154. DataBuffer *buffer;
  155. while (1)
  156. {
  157. for (unsigned i = 0; i < numQueues; i++)
  158. {
  159. if (udpOutQsPriority)
  160. {
  161. if (output_queue[current_q].empty())
  162. {
  163. if (udpTraceLevel >= 5)
  164. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  165. currentQNumPkts = 0;
  166. current_q = (current_q + 1) % numQueues;
  167. }
  168. else
  169. {
  170. buffer = output_queue[current_q].pop();
  171. currentQNumPkts++;
  172. if (udpTraceLevel >= 5)
  173. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  174. if (currentQNumPkts >= maxPktsPerQ[current_q])
  175. {
  176. currentQNumPkts = 0;
  177. current_q = (current_q + 1) % numQueues;
  178. }
  179. return buffer;
  180. }
  181. }
  182. else
  183. {
  184. current_q = (current_q + 1) % numQueues;
  185. if (!output_queue[current_q].empty())
  186. {
  187. return output_queue[current_q].pop();
  188. }
  189. }
  190. }
  191. MilliSleep(10);
  192. DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
  193. }
  194. }
  195. UdpReceiverEntry()
  196. {
  197. send_flow_socket = data_socket = NULL;
  198. numQueues = 0;
  199. current_q = 0;
  200. initialized = false;
  201. output_queue = 0;
  202. currentQNumPkts = 0;
  203. maxPktsPerQ = 0;
  204. }
  205. void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
  206. {
  207. assert(!initialized);
  208. numQueues = _numQueues;
  209. const IpAddress &ip = getNodeAddress(destNodeIndex);
  210. if (!ip.isNull())
  211. {
  212. try
  213. {
  214. SocketEndpoint sendFlowEp(sendFlowPort, ip);
  215. SocketEndpoint dataEp(dataPort, ip);
  216. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  217. data_socket = ISocket::udp_connect(dataEp);
  218. if (isLocal)
  219. {
  220. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  221. if (udpTraceLevel > 0)
  222. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  223. }
  224. }
  225. catch(IException *e)
  226. {
  227. StringBuffer error, ipstr;
  228. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  229. throw;
  230. }
  231. catch(...)
  232. {
  233. StringBuffer ipstr;
  234. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  235. throw;
  236. }
  237. output_queue = new queue_t[numQueues];
  238. maxPktsPerQ = new int[numQueues];
  239. for (unsigned j = 0; j < numQueues; j++)
  240. {
  241. output_queue[j].set_queue_size(queueSize);
  242. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  243. }
  244. initialized = true;
  245. if (udpTraceLevel > 0)
  246. {
  247. StringBuffer ipStr;
  248. DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
  249. }
  250. }
  251. }
  252. ~UdpReceiverEntry()
  253. {
  254. if (send_flow_socket) send_flow_socket->Release();
  255. if (data_socket) data_socket->Release();
  256. if (output_queue) delete [] output_queue;
  257. if (maxPktsPerQ) delete [] maxPktsPerQ;
  258. }
  259. };
  260. class CSendManager : implements ISendManager, public CInterface
  261. {
  262. friend class send_send_flow;
  263. class StartedThread : public Thread
  264. {
  265. private:
  266. Semaphore started;
  267. virtual int run()
  268. {
  269. started.signal();
  270. return doRun();
  271. }
  272. protected:
  273. bool running;
  274. public:
  275. StartedThread(const char *name) : Thread(name)
  276. {
  277. running = false;
  278. }
  279. ~StartedThread()
  280. {
  281. running = false;
  282. join();
  283. }
  284. virtual void start()
  285. {
  286. running = true;
  287. Thread::start();
  288. started.wait();
  289. }
  290. virtual int doRun() = 0;
  291. };
  292. class send_send_flow : public StartedThread
  293. {
  294. /*
  295. I don't like this code much at all
  296. Looping round all every time status of any changes seems like a bad thing especially as scale goes up
  297. Even though these look like a bitmap they are not used as such presently
  298. - as a result, if data_added() is called while state is completed, we lose the request I think
  299. - probably get away with it because of the dataqueued check
  300. doRun() uses state bits without protection
  301. A count of number pending for each might be better than just a flag
  302. Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
  303. - normally MANY in pending (and order is interesting)
  304. - normally few in any other state (only 1 if thread keeping up), order not really very interesting
  305. - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
  306. - in particular don't lock while processing the chain
  307. - Never need to be in >1 chain
  308. msTick() probably better than time() for detecting timeouts
  309. */
  310. enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
  311. unsigned target_count;
  312. char *state;
  313. unsigned char *timeouts; // Number of consecutive timeouts
  314. unsigned *request_time;
  315. CriticalSection cr;
  316. Semaphore sem;
  317. CSendManager &parent;
  318. virtual int doRun()
  319. {
  320. // MORE - this is reading the state values unprotected
  321. // Not sure that this represents any issue in practice...
  322. if (udpTraceLevel > 0)
  323. DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
  324. while (running)
  325. {
  326. bool idle = false;
  327. if (sem.wait(1000))
  328. {
  329. if (udpTraceLevel > 4)
  330. DBGLOG("UdpSender: send_send_flow::doRun signal received");
  331. }
  332. else
  333. idle = true;
  334. if (!running) return 0;
  335. unsigned now = msTick();
  336. // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
  337. // In a typical heavy load scenario most will be pending
  338. // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
  339. // Separate lists for each state (don't need one for sending) ?
  340. for (unsigned i = 0; i < target_count; i++)
  341. {
  342. switch (state[i]) // MORE - should really protect it?
  343. {
  344. case completed:
  345. done(i, false);
  346. break;
  347. case completed_more:
  348. done(i, true);
  349. break;
  350. case pending_request:
  351. if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
  352. break;
  353. timeouts[i]++;
  354. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
  355. timeouts[i], udpMaxRetryTimedoutReqs,
  356. i, (int) (now - request_time[i]), udpRequestToSendTimeout);
  357. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  358. if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
  359. {
  360. abort(i);
  361. break;
  362. }
  363. // fall into...
  364. case new_request:
  365. sendRequest(i);
  366. break;
  367. default:
  368. if (idle && parent.dataQueued(i))
  369. {
  370. EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
  371. data_added(i);
  372. }
  373. }
  374. }
  375. }
  376. return 0;
  377. }
  378. void done(unsigned index, bool moreRequested)
  379. {
  380. bool dataRemaining;
  381. {
  382. CriticalBlock b(cr);
  383. dataRemaining = parent.dataQueued(index);
  384. if (dataRemaining)
  385. {
  386. state[index] = pending_request;
  387. request_time[index] = msTick();
  388. }
  389. else
  390. {
  391. state[index] = 0;
  392. timeouts[index] = 0;
  393. }
  394. }
  395. if (udpTraceLevel > 3)
  396. DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
  397. if (udpSendCompletedInData)
  398. {
  399. if (dataRemaining)
  400. {
  401. // MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
  402. // though there is a possible race to consider
  403. if (!moreRequested)
  404. parent.sendRequest(index, flow_t::request_to_send);
  405. }
  406. }
  407. else
  408. parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
  409. }
  410. void sendRequest(unsigned index)
  411. {
  412. if (udpTraceLevel > 3)
  413. DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
  414. CriticalBlock b(cr);
  415. parent.sendRequest(index, flow_t::request_to_send);
  416. state[index] = pending_request;
  417. request_time[index] = msTick();
  418. }
  419. void abort(unsigned index)
  420. {
  421. if (udpTraceLevel > 3)
  422. DBGLOG("UdpSender: abort sending queued data to node=%u", index);
  423. CriticalBlock b(cr);
  424. state[index] = 0;
  425. timeouts[index] = 0;
  426. parent.abortData(index);
  427. }
  428. public:
  429. send_send_flow(CSendManager &_parent, unsigned numNodes)
  430. : StartedThread("UdpLib::send_send_flow"), parent(_parent)
  431. {
  432. target_count = numNodes;
  433. state = new char [target_count];
  434. memset(state, 0, target_count);
  435. timeouts = new unsigned char [target_count];
  436. memset(timeouts, 0, target_count);
  437. request_time = new unsigned [target_count];
  438. memset(request_time, 0, sizeof(unsigned) * target_count);
  439. start();
  440. }
  441. ~send_send_flow()
  442. {
  443. running = false;
  444. sem.signal();
  445. join();
  446. delete [] state;
  447. delete [] timeouts;
  448. delete [] request_time;
  449. }
  450. void clear_to_send_received(unsigned index)
  451. {
  452. CriticalBlock b(cr);
  453. state[index] = sending_data;
  454. }
  455. void send_done(unsigned index, bool moreRequested)
  456. {
  457. CriticalBlock b(cr);
  458. state[index] = moreRequested ? completed_more : completed;
  459. sem.signal();
  460. }
  461. void data_added(unsigned index)
  462. {
  463. CriticalBlock b(cr);
  464. // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
  465. // Because done() checks to see if any data pending and re-calls data_added, we get away with it
  466. // Using bits sounds more sensible though?
  467. // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
  468. if (!state[index]) // MORE - should just test the bit?
  469. {
  470. state[index] = new_request;
  471. if (udpTraceLevel > 3)
  472. DBGLOG("UdpSender: state set to new_request for node=%u", index);
  473. sem.signal();
  474. }
  475. }
  476. };
  477. class send_receive_flow : public StartedThread
  478. {
  479. CSendManager &parent;
  480. int receive_port;
  481. Owned<ISocket> flow_socket;
  482. public:
  483. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  484. {
  485. receive_port = r_port;
  486. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  487. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  488. flow_socket.setown(ISocket::udp_create(receive_port));
  489. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  490. size32_t actualSize = flow_socket->get_receive_buffer_size();
  491. DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
  492. start();
  493. }
  494. ~send_receive_flow()
  495. {
  496. running = false;
  497. if (flow_socket)
  498. flow_socket->close();
  499. join();
  500. }
  501. virtual int doRun()
  502. {
  503. if (udpTraceLevel > 0)
  504. DBGLOG("UdpSender: send_receive_flow started");
  505. #ifdef __linux__
  506. setLinuxThreadPriority(2);
  507. #endif
  508. while(running)
  509. {
  510. UdpPermitToSendMsg f;
  511. while (running)
  512. {
  513. try
  514. {
  515. unsigned int res ;
  516. flow_socket->read(&f, 1, sizeof(f), res, 5);
  517. assertex(res == f.length);
  518. #ifdef CRC_MESSAGES
  519. assertex(f.hdr.crc == f.calcCRC());
  520. #endif
  521. switch (f.cmd)
  522. {
  523. case flow_t::ok_to_send:
  524. if (udpTraceLevel > 1)
  525. DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
  526. parent.data->ok_to_send(f);
  527. break;
  528. default:
  529. DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
  530. }
  531. }
  532. catch (IException *e)
  533. {
  534. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  535. {
  536. StringBuffer s;
  537. DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
  538. }
  539. e->Release();
  540. }
  541. catch (...)
  542. {
  543. if (running)
  544. DBGLOG("UdpSender: send_receive_flow::unknown exception");
  545. MilliSleep(0);
  546. }
  547. }
  548. }
  549. return 0;
  550. }
  551. };
  552. class send_data : public StartedThread
  553. {
  554. CSendManager &parent;
  555. ISocket *sniffer_socket;
  556. SocketEndpoint ep;
  557. simple_queue<UdpPermitToSendMsg> send_queue;
  558. Linked<TokenBucket> bucket;
  559. void send_sniff(bool busy)
  560. {
  561. unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
  562. sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
  563. try
  564. {
  565. if (!sniffer_socket)
  566. {
  567. sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
  568. if (udpTraceLevel > 1)
  569. {
  570. StringBuffer url;
  571. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  572. }
  573. }
  574. sniffer_socket->write(&msg, sizeof(msg));
  575. if (udpTraceLevel > 1)
  576. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  577. }
  578. catch(IException *e)
  579. {
  580. StringBuffer s;
  581. StringBuffer url;
  582. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
  583. e->Release();
  584. }
  585. catch(...)
  586. {
  587. StringBuffer url;
  588. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  589. if (sniffer_socket)
  590. {
  591. sniffer_socket->Release();
  592. sniffer_socket = NULL;
  593. }
  594. }
  595. }
  596. public:
  597. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  598. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  599. {
  600. sniffer_socket = NULL;
  601. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  602. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  603. start();
  604. }
  605. ~send_data()
  606. {
  607. running = false;
  608. UdpPermitToSendMsg dummy;
  609. send_queue.push(dummy);
  610. join();
  611. if (sniffer_socket)
  612. sniffer_socket->Release();
  613. }
  614. bool ok_to_send(const UdpPermitToSendMsg &msg)
  615. {
  616. if (send_queue.push(msg, 15))
  617. return true;
  618. else
  619. {
  620. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
  621. return false;
  622. }
  623. }
  624. virtual int doRun()
  625. {
  626. if (udpTraceLevel > 0)
  627. DBGLOG("UdpSender: send_data started");
  628. #ifdef __linux__
  629. setLinuxThreadPriority(1); // MORE - windows?
  630. #endif
  631. UdpPermitToSendMsg permit;
  632. while (running)
  633. {
  634. send_queue.pop(permit);
  635. if (!running)
  636. return 0;
  637. if (udpSnifferEnabled)
  638. send_sniff(true);
  639. parent.send_flow->clear_to_send_received(permit.destNodeIndex);
  640. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
  641. bool moreRequested;
  642. unsigned maxPackets;
  643. unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
  644. if (udpSendCompletedInData && !maxPackets)
  645. parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
  646. parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
  647. if (udpSnifferEnabled)
  648. send_sniff(false);
  649. if (udpTraceLevel > 1)
  650. DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
  651. }
  652. if (udpTraceLevel > 0)
  653. DBGLOG("UdpSender: send_data stopped");
  654. return 0;
  655. }
  656. };
  657. friend class send_send_flow;
  658. friend class send_receive_flow;
  659. friend class send_data;
  660. unsigned numNodes;
  661. int receive_flow_port;
  662. int send_flow_port;
  663. int data_port;
  664. unsigned myNodeIndex;
  665. unsigned numQueues;
  666. UdpReceiverEntry *receiversTable;
  667. send_send_flow *send_flow;
  668. send_receive_flow *receive_flow;
  669. send_data *data;
  670. Linked<TokenBucket> bucket;
  671. SpinLock msgSeqLock;
  672. unsigned msgSeq;
  673. static bool comparePacket(void *pkData, void *key)
  674. {
  675. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  676. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  677. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  678. }
  679. inline unsigned getNextMessageSequence()
  680. {
  681. SpinBlock b(msgSeqLock);
  682. unsigned res = ++msgSeq;
  683. if (!res)
  684. res = ++msgSeq;
  685. return res;
  686. }
  687. public:
  688. IMPLEMENT_IINTERFACE;
  689. CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
  690. : bucket(_bucket)
  691. {
  692. #ifndef _WIN32
  693. setpriority(PRIO_PROCESS, 0, -3);
  694. #endif
  695. numNodes = getNumNodes();
  696. receive_flow_port = client_flow_port;
  697. send_flow_port = server_flow_port;
  698. data_port = d_port;
  699. myNodeIndex = _myNodeIndex;
  700. numQueues = _numQueues;
  701. receiversTable = new UdpReceiverEntry[numNodes];
  702. for (unsigned i = 0; i < numNodes; i++)
  703. receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
  704. data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
  705. send_flow = new send_send_flow(*this, numNodes);
  706. receive_flow = new send_receive_flow(*this, client_flow_port);
  707. msgSeq = 0;
  708. }
  709. ~CSendManager()
  710. {
  711. delete []receiversTable;
  712. delete send_flow;
  713. delete receive_flow;
  714. delete data;
  715. }
  716. void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
  717. {
  718. // NOTE: takes ownership of the DataBuffer
  719. assert(queue < numQueues);
  720. assert(destNodeIndex < numNodes);
  721. receiversTable[destNodeIndex].pushData(queue, buffer);
  722. send_flow->data_added(destNodeIndex);
  723. }
  724. inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
  725. {
  726. receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
  727. }
  728. bool dataQueued(unsigned destIndex)
  729. {
  730. return receiversTable[destIndex].dataQueued();
  731. }
  732. bool abortData(unsigned destIndex)
  733. {
  734. return receiversTable[destIndex].removeData(NULL, NULL);
  735. }
  736. // Interface ISendManager
  737. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
  738. {
  739. if (destNodeIndex >= numNodes)
  740. throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
  741. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
  742. }
  743. virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
  744. {
  745. UdpPacketHeader pkHdr;
  746. pkHdr.ruid = ruid;
  747. pkHdr.msgId = msgId;
  748. return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
  749. }
  750. virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
  751. {
  752. UdpPacketHeader pkHdr;
  753. pkHdr.ruid = ruid;
  754. pkHdr.msgId = msgId;
  755. return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
  756. }
  757. virtual bool allDone()
  758. {
  759. for (unsigned i = 0; i < numNodes; i++)
  760. {
  761. if (receiversTable[i].dataQueued())
  762. return false;
  763. }
  764. return true;
  765. }
  766. };
  767. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
  768. {
  769. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
  770. }
  771. class CMessagePacker : implements IMessagePacker, public CInterface
  772. {
  773. ISendManager &parent;
  774. unsigned destNodeIndex;
  775. UdpPacketHeader package_header;
  776. DataBuffer *part_buffer;
  777. unsigned data_buffer_size;
  778. unsigned data_used;
  779. void *mem_buffer;
  780. unsigned mem_buffer_size;
  781. unsigned totalSize;
  782. bool packed_request;
  783. MemoryBuffer metaInfo;
  784. bool last_message_done;
  785. int queue_number;
  786. public:
  787. IMPLEMENT_IINTERFACE;
  788. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  789. : parent(_parent)
  790. {
  791. queue_number = _queue;
  792. destNodeIndex = _destNode;
  793. package_header.length = 0; // filled in with proper value later
  794. package_header.metalength = 0;
  795. package_header.ruid = ruid;
  796. package_header.msgId = msgId;
  797. package_header.pktSeq = 0;
  798. package_header.nodeIndex = _sourceNode;
  799. package_header.msgSeq = _msgSeq;
  800. packed_request = false;
  801. part_buffer = bufferManager->allocate();
  802. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  803. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  804. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  805. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  806. data_used = headerSize + sizeof(unsigned short);
  807. mem_buffer = 0;
  808. mem_buffer_size = 0;
  809. last_message_done = false;
  810. totalSize = 0;
  811. if (udpTraceLevel >= 40)
  812. DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
  813. }
  814. ~CMessagePacker()
  815. {
  816. if (part_buffer)
  817. part_buffer->Release();
  818. if (mem_buffer) free (mem_buffer);
  819. if (udpTraceLevel >= 40)
  820. {
  821. DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
  822. package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
  823. }
  824. }
  825. virtual void *getBuffer(unsigned len, bool variable)
  826. {
  827. if (variable)
  828. len += sizeof(RecordLengthType);
  829. if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
  830. {
  831. // Won't fit in one, so allocate temp location
  832. if (mem_buffer_size < len)
  833. {
  834. free(mem_buffer);
  835. mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
  836. mem_buffer_size = len;
  837. }
  838. packed_request = false;
  839. if (variable)
  840. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  841. else
  842. return mem_buffer;
  843. }
  844. if (part_buffer && ((data_buffer_size - data_used) < len))
  845. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  846. if (!part_buffer)
  847. {
  848. part_buffer = bufferManager->allocate();
  849. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  850. }
  851. packed_request = true;
  852. if (variable)
  853. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  854. else
  855. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  856. }
  857. virtual void putBuffer(const void *buf, unsigned len, bool variable)
  858. {
  859. if (variable)
  860. {
  861. assertex(len < MAX_RECORD_LENGTH);
  862. buf = ((char *) buf) - sizeof(RecordLengthType);
  863. *(RecordLengthType *) buf = len;
  864. len += sizeof(RecordLengthType);
  865. }
  866. totalSize += len;
  867. if (packed_request)
  868. {
  869. assert(len <= (data_buffer_size - data_used));
  870. data_used += len;
  871. }
  872. else
  873. {
  874. while (len)
  875. {
  876. if (!part_buffer)
  877. {
  878. part_buffer = bufferManager->allocate();
  879. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  880. data_used = 0;
  881. }
  882. unsigned chunkLen = data_buffer_size - data_used;
  883. if (chunkLen > len)
  884. chunkLen = len;
  885. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  886. data_used += chunkLen;
  887. len -= chunkLen;
  888. buf = &(((char*)buf)[chunkLen]);
  889. if (len)
  890. flush(false);
  891. }
  892. }
  893. }
  894. virtual void sendMetaInfo(const void *buf, unsigned len) {
  895. metaInfo.append(len, buf);
  896. }
  897. virtual void flush(bool last_msg = false)
  898. {
  899. if (!last_message_done && last_msg)
  900. {
  901. last_message_done = true;
  902. if (!part_buffer)
  903. part_buffer = bufferManager->allocate();
  904. const char *metaData = metaInfo.toByteArray();
  905. unsigned metaLength = metaInfo.length();
  906. unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
  907. while (metaLength > maxMetaLength)
  908. {
  909. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  910. put_package(part_buffer, data_used, maxMetaLength);
  911. metaLength -= maxMetaLength;
  912. metaData += maxMetaLength;
  913. data_used = 0;
  914. maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  915. part_buffer = bufferManager->allocate();
  916. }
  917. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  918. package_header.pktSeq |= UDP_PACKET_COMPLETE;
  919. put_package(part_buffer, data_used, metaLength);
  920. }
  921. else if (part_buffer)
  922. {
  923. // Just flush current - used when no room for current row
  924. if (data_used)
  925. put_package(part_buffer, data_used, 0); // buffer released in put_package
  926. else
  927. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  928. }
  929. part_buffer = 0;
  930. data_buffer_size = 0;
  931. data_used = 0;
  932. }
  933. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  934. {
  935. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  936. package_header.metalength = metalength;
  937. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  938. parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
  939. if (udpTraceLevel >= 50)
  940. {
  941. if (package_header.length==991)
  942. DBGLOG("NEarly");
  943. DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
  944. package_header.ruid, package_header.msgId, package_header.msgSeq,
  945. package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
  946. }
  947. package_header.pktSeq++;
  948. }
  949. virtual bool dataQueued()
  950. {
  951. return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
  952. }
  953. virtual unsigned size() const
  954. {
  955. return totalSize;
  956. }
  957. };
  958. IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  959. {
  960. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
  961. }