udptrs.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "jsocket.hpp"
  17. #include "jlog.hpp"
  18. #include "roxie.hpp"
  19. #ifdef _WIN32
  20. #include <winsock.h>
  21. #else
  22. #include <sys/socket.h>
  23. #include <sys/time.h>
  24. #include <sys/resource.h>
  25. #endif
  26. #include <math.h>
  27. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  28. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  29. #endif
  30. unsigned udpOutQsPriority = 0; // 0 = plain round-robin over the output queues; non-zero = weighted servicing, and the value is the base used to compute each queue's consecutive-packet quota
  31. unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
  32. unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
  33. bool udpSnifferEnabled = true; // when true, the send_data thread multicasts a busy message before and an idle message after each batch of packets it sends
  34. #ifdef _DEBUG
  35. //#define _SIMULATE_LOST_PACKETS
  36. #endif
  37. using roxiemem::DataBuffer;
  38. // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
  39. class UdpReceiverEntry
  40. {
  41. queue_t *output_queue;
  42. bool initialized;
  43. public:
  44. ISocket *send_flow_socket;
  45. ISocket *data_socket;
  46. unsigned numQueues;
  47. int current_q;
  48. int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
  49. int *maxPktsPerQ; // to minimise power function re-calc for evey packet
  50. // MORE - consider where we need critsecs in here!
  51. void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
  52. {
  53. UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
  54. try
  55. {
  56. send_flow_socket->write(&msg, msg.length);
  57. }
  58. catch(IException *e)
  59. {
  60. StringBuffer s;
  61. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  62. e->Release();
  63. }
  64. catch (...)
  65. {
  66. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  67. }
  68. }
  69. unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
  70. {
  71. moreRequested = false;
  72. maxPackets = permit.max_data;
  73. PointerArray toSend;
  74. unsigned totalSent = 0;
  75. while (toSend.length() < maxPackets && dataQueued())
  76. {
  77. DataBuffer *buffer = popQueuedData();
  78. if (buffer) // Aborted slave queries leave NULL records on queue
  79. {
  80. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  81. toSend.append(buffer);
  82. totalSent += header->length;
  83. #ifdef __linux__
  84. if (isLocal && (totalSent> 100000))
  85. break;
  86. #endif
  87. }
  88. }
  89. maxPackets = toSend.length();
  90. for (unsigned idx = 0; idx < maxPackets; idx++)
  91. {
  92. DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
  93. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  94. unsigned length = header->length;
  95. if (bucket)
  96. {
  97. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  98. bucket->wait((length / 1024)+1);
  99. }
  100. try
  101. {
  102. data_socket->write(buffer->data, length);
  103. }
  104. catch(IException *e)
  105. {
  106. StringBuffer s;
  107. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  108. e->Release();
  109. }
  110. catch(...)
  111. {
  112. DBGLOG("UdpSender: write exception - unknown exception");
  113. }
  114. ::Release(buffer);
  115. }
  116. return totalSent;
  117. }
  118. bool dataQueued()
  119. {
  120. for (unsigned i = 0; i < numQueues; i++)
  121. {
  122. if (!output_queue[i].empty())
  123. return true;
  124. }
  125. return false;
  126. }
  127. bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
  128. {
  129. for (unsigned i = 0; i < numQueues; i++)
  130. {
  131. if (output_queue[i].dataQueued(key, pkCmpFn))
  132. return true;
  133. }
  134. return false;
  135. }
  136. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  137. {
  138. bool anyRemoved = false;
  139. for (unsigned i = 0; i < numQueues; i++)
  140. {
  141. if (output_queue[i].removeData(key, pkCmpFn))
  142. anyRemoved = true;
  143. }
  144. return anyRemoved;
  145. }
  146. inline void pushData(unsigned queue, DataBuffer *buffer)
  147. {
  148. output_queue[queue].pushOwn(buffer);
  149. }
  150. DataBuffer *popQueuedData()
  151. {
  152. DataBuffer *buffer;
  153. while (1)
  154. {
  155. for (unsigned i = 0; i < numQueues; i++)
  156. {
  157. if (udpOutQsPriority)
  158. {
  159. if (output_queue[current_q].empty())
  160. {
  161. if (udpTraceLevel >= 5)
  162. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  163. currentQNumPkts = 0;
  164. current_q = (current_q + 1) % numQueues;
  165. }
  166. else
  167. {
  168. buffer = output_queue[current_q].pop();
  169. currentQNumPkts++;
  170. if (udpTraceLevel >= 5)
  171. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  172. if (currentQNumPkts >= maxPktsPerQ[current_q])
  173. {
  174. currentQNumPkts = 0;
  175. current_q = (current_q + 1) % numQueues;
  176. }
  177. return buffer;
  178. }
  179. }
  180. else
  181. {
  182. current_q = (current_q + 1) % numQueues;
  183. if (!output_queue[current_q].empty())
  184. {
  185. return output_queue[current_q].pop();
  186. }
  187. }
  188. }
  189. MilliSleep(10);
  190. DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
  191. }
  192. }
  193. UdpReceiverEntry()
  194. {
  195. send_flow_socket = data_socket = NULL;
  196. numQueues = 0;
  197. current_q = 0;
  198. initialized = false;
  199. output_queue = 0;
  200. currentQNumPkts = 0;
  201. maxPktsPerQ = 0;
  202. }
  203. void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
  204. {
  205. assert(!initialized);
  206. numQueues = _numQueues;
  207. const IpAddress &ip = getNodeAddress(destNodeIndex);
  208. if (!ip.isNull())
  209. {
  210. try
  211. {
  212. SocketEndpoint sendFlowEp(sendFlowPort, ip);
  213. SocketEndpoint dataEp(dataPort, ip);
  214. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  215. data_socket = ISocket::udp_connect(dataEp);
  216. if (isLocal)
  217. {
  218. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  219. if (udpTraceLevel > 0)
  220. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  221. }
  222. }
  223. catch(IException *e)
  224. {
  225. StringBuffer error, ipstr;
  226. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  227. throw;
  228. }
  229. catch(...)
  230. {
  231. StringBuffer ipstr;
  232. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  233. throw;
  234. }
  235. output_queue = new queue_t[numQueues];
  236. maxPktsPerQ = new int[numQueues];
  237. for (unsigned j = 0; j < numQueues; j++)
  238. {
  239. output_queue[j].set_queue_size(queueSize);
  240. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  241. }
  242. initialized = true;
  243. if (udpTraceLevel > 0)
  244. {
  245. StringBuffer ipStr;
  246. DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
  247. }
  248. }
  249. }
  250. ~UdpReceiverEntry()
  251. {
  252. if (send_flow_socket) send_flow_socket->Release();
  253. if (data_socket) data_socket->Release();
  254. if (output_queue) delete [] output_queue;
  255. if (maxPktsPerQ) delete [] maxPktsPerQ;
  256. }
  257. };
  258. class CSendManager : implements ISendManager, public CInterface
  259. {
  260. friend class send_send_flow;
  261. class StartedThread : public Thread
  262. {
  263. private:
  264. Semaphore started;
  265. virtual int run()
  266. {
  267. started.signal();
  268. return doRun();
  269. }
  270. protected:
  271. bool running;
  272. public:
  273. StartedThread(const char *name) : Thread(name)
  274. {
  275. running = false;
  276. }
  277. ~StartedThread()
  278. {
  279. running = false;
  280. join();
  281. }
  282. virtual void start()
  283. {
  284. running = true;
  285. Thread::start();
  286. started.wait();
  287. }
  288. virtual int doRun() = 0;
  289. };
  290. class send_send_flow : public StartedThread
  291. {
  292. /*
  293. I don't like this code much at all
  294. Looping round all every time status of any changes seems like a bad thing especially as scale goes up
  295. Even though these look like a bitmap they are not used as such presently
  296. - as a result, if data_added() is called while state is completed, we lose the request I think
  297. - probably get away with it because of the dataqueued check
  298. doRun() uses state bits without protection
  299. A count of number pending for each might be better than just a flag
  300. Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
  301. - normally MANY in pending (and order is interesting)
  302. - normally few in any other state (only 1 if thread keeping up), order not really very interesting
  303. - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
  304. - in particular don't lock while processing the chain
  305. - Never need to be in >1 chain
  306. msTick() probably better than time() for detecting timeouts
  307. */
  308. enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
  309. unsigned target_count;
  310. char *state;
  311. unsigned char *timeouts; // Number of consecutive timeouts
  312. unsigned *request_time;
  313. CriticalSection cr;
  314. Semaphore sem;
  315. CSendManager &parent;
  316. virtual int doRun()
  317. {
  318. // MORE - this is reading the state values unprotected
  319. // Not sure that this represents any issue in practice...
  320. if (udpTraceLevel > 0)
  321. DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
  322. while (running)
  323. {
  324. bool idle = false;
  325. if (sem.wait(1000))
  326. {
  327. if (udpTraceLevel > 4)
  328. DBGLOG("UdpSender: send_send_flow::doRun signal received");
  329. }
  330. else
  331. idle = true;
  332. if (!running) return 0;
  333. unsigned now = msTick();
  334. // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
  335. // In a typical heavy load scenario most will be pending
  336. // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
  337. // Separate lists for each state (don't need one for sending) ?
  338. for (unsigned i = 0; i < target_count; i++)
  339. {
  340. switch (state[i]) // MORE - should really protect it?
  341. {
  342. case completed:
  343. done(i, false);
  344. break;
  345. case completed_more:
  346. done(i, true);
  347. break;
  348. case pending_request:
  349. if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
  350. break;
  351. timeouts[i]++;
  352. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
  353. timeouts[i], udpMaxRetryTimedoutReqs,
  354. i, (int) (now - request_time[i]), udpRequestToSendTimeout);
  355. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  356. if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
  357. {
  358. abort(i);
  359. break;
  360. }
  361. // fall into...
  362. case new_request:
  363. sendRequest(i);
  364. break;
  365. default:
  366. if (idle && parent.dataQueued(i))
  367. {
  368. EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
  369. data_added(i);
  370. }
  371. }
  372. }
  373. }
  374. return 0;
  375. }
  376. void done(unsigned index, bool moreRequested)
  377. {
  378. bool dataRemaining;
  379. {
  380. CriticalBlock b(cr);
  381. dataRemaining = parent.dataQueued(index);
  382. if (dataRemaining)
  383. {
  384. state[index] = pending_request;
  385. request_time[index] = msTick();
  386. }
  387. else
  388. {
  389. state[index] = 0;
  390. timeouts[index] = 0;
  391. }
  392. }
  393. if (udpTraceLevel > 3)
  394. DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
  395. parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
  396. }
  397. void sendRequest(unsigned index)
  398. {
  399. if (udpTraceLevel > 3)
  400. DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
  401. CriticalBlock b(cr);
  402. parent.sendRequest(index, flow_t::request_to_send);
  403. state[index] = pending_request;
  404. request_time[index] = msTick();
  405. }
  406. void abort(unsigned index)
  407. {
  408. if (udpTraceLevel > 3)
  409. DBGLOG("UdpSender: abort sending queued data to node=%u", index);
  410. CriticalBlock b(cr);
  411. state[index] = 0;
  412. timeouts[index] = 0;
  413. parent.abortData(index);
  414. }
  415. public:
  416. send_send_flow(CSendManager &_parent, unsigned numNodes)
  417. : StartedThread("UdpLib::send_send_flow"), parent(_parent)
  418. {
  419. target_count = numNodes;
  420. state = new char [target_count];
  421. memset(state, 0, target_count);
  422. timeouts = new unsigned char [target_count];
  423. memset(timeouts, 0, target_count);
  424. request_time = new unsigned [target_count];
  425. memset(request_time, 0, sizeof(unsigned) * target_count);
  426. start();
  427. }
  428. ~send_send_flow()
  429. {
  430. running = false;
  431. sem.signal();
  432. join();
  433. delete [] state;
  434. delete [] timeouts;
  435. delete [] request_time;
  436. }
  437. void clear_to_send_received(unsigned index)
  438. {
  439. CriticalBlock b(cr);
  440. state[index] = sending_data;
  441. }
  442. void send_done(unsigned index, bool moreRequested)
  443. {
  444. CriticalBlock b(cr);
  445. state[index] = moreRequested ? completed_more : completed;
  446. sem.signal();
  447. }
  448. void data_added(unsigned index)
  449. {
  450. CriticalBlock b(cr);
  451. // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
  452. // Because done() checks to see if any data pending and re-calls data_added, we get away with it
  453. // Using bits sounds more sensible though?
  454. // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
  455. if (!state[index]) // MORE - should just test the bit?
  456. {
  457. state[index] = new_request;
  458. if (udpTraceLevel > 3)
  459. DBGLOG("UdpSender: state set to new_request for node=%u", index);
  460. sem.signal();
  461. }
  462. }
  463. };
  464. class send_receive_flow : public StartedThread
  465. {
  466. CSendManager &parent;
  467. int receive_port;
  468. Owned<ISocket> flow_socket;
  469. public:
  470. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  471. {
  472. receive_port = r_port;
  473. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  474. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  475. flow_socket.setown(ISocket::udp_create(receive_port));
  476. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  477. size32_t actualSize = flow_socket->get_receive_buffer_size();
  478. DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
  479. start();
  480. }
  481. ~send_receive_flow()
  482. {
  483. running = false;
  484. if (flow_socket)
  485. flow_socket->close();
  486. join();
  487. }
  488. virtual int doRun()
  489. {
  490. if (udpTraceLevel > 0)
  491. DBGLOG("UdpSender: send_receive_flow started");
  492. #ifdef __linux__
  493. setLinuxThreadPriority(2);
  494. #endif
  495. while(running)
  496. {
  497. UdpPermitToSendMsg f;
  498. while (running)
  499. {
  500. try
  501. {
  502. unsigned int res ;
  503. flow_socket->read(&f, 1, sizeof(f), res, 5);
  504. assertex(res == f.length);
  505. #ifdef CRC_MESSAGES
  506. assertex(f.hdr.crc == f.calcCRC());
  507. #endif
  508. switch (f.cmd)
  509. {
  510. case flow_t::ok_to_send:
  511. if (udpTraceLevel > 1)
  512. DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
  513. parent.data->ok_to_send(f);
  514. break;
  515. default:
  516. DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
  517. }
  518. }
  519. catch (IException *e)
  520. {
  521. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  522. {
  523. StringBuffer s;
  524. DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
  525. }
  526. e->Release();
  527. }
  528. catch (...)
  529. {
  530. if (running)
  531. DBGLOG("UdpSender: send_receive_flow::unknown exception");
  532. MilliSleep(0);
  533. }
  534. }
  535. }
  536. return 0;
  537. }
  538. };
  539. class send_data : public StartedThread
  540. {
  541. CSendManager &parent;
  542. ISocket *sniffer_socket;
  543. SocketEndpoint ep;
  544. simple_queue<UdpPermitToSendMsg> send_queue;
  545. Linked<TokenBucket> bucket;
  546. void send_sniff(bool busy)
  547. {
  548. unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
  549. sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
  550. try
  551. {
  552. if (!sniffer_socket)
  553. {
  554. sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
  555. if (udpTraceLevel > 1)
  556. {
  557. StringBuffer url;
  558. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  559. }
  560. }
  561. sniffer_socket->write(&msg, sizeof(msg));
  562. if (udpTraceLevel > 1)
  563. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  564. }
  565. catch(IException *e)
  566. {
  567. StringBuffer s;
  568. StringBuffer url;
  569. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
  570. e->Release();
  571. }
  572. catch(...)
  573. {
  574. StringBuffer url;
  575. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  576. if (sniffer_socket)
  577. {
  578. sniffer_socket->Release();
  579. sniffer_socket = NULL;
  580. }
  581. }
  582. }
  583. public:
  584. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  585. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  586. {
  587. sniffer_socket = NULL;
  588. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  589. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  590. start();
  591. }
  592. ~send_data()
  593. {
  594. running = false;
  595. UdpPermitToSendMsg dummy;
  596. send_queue.push(dummy);
  597. join();
  598. if (sniffer_socket)
  599. sniffer_socket->Release();
  600. }
  601. bool ok_to_send(const UdpPermitToSendMsg &msg)
  602. {
  603. if (send_queue.push(msg, 15))
  604. return true;
  605. else
  606. {
  607. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
  608. return false;
  609. }
  610. }
  611. virtual int doRun()
  612. {
  613. if (udpTraceLevel > 0)
  614. DBGLOG("UdpSender: send_data started");
  615. #ifdef __linux__
  616. setLinuxThreadPriority(1); // MORE - windows?
  617. #endif
  618. UdpPermitToSendMsg permit;
  619. while (running)
  620. {
  621. send_queue.pop(permit);
  622. if (!running)
  623. return 0;
  624. if (udpSnifferEnabled)
  625. send_sniff(true);
  626. parent.send_flow->clear_to_send_received(permit.destNodeIndex);
  627. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
  628. bool moreRequested;
  629. unsigned maxPackets;
  630. unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
  631. parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
  632. if (udpSnifferEnabled)
  633. send_sniff(false);
  634. if (udpTraceLevel > 1)
  635. DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
  636. }
  637. if (udpTraceLevel > 0)
  638. DBGLOG("UdpSender: send_data stopped");
  639. return 0;
  640. }
  641. };
  642. friend class send_send_flow;
  643. friend class send_receive_flow;
  644. friend class send_data;
  645. unsigned numNodes;
  646. int receive_flow_port;
  647. int send_flow_port;
  648. int data_port;
  649. unsigned myNodeIndex;
  650. unsigned numQueues;
  651. UdpReceiverEntry *receiversTable;
  652. send_send_flow *send_flow;
  653. send_receive_flow *receive_flow;
  654. send_data *data;
  655. Linked<TokenBucket> bucket;
  656. SpinLock msgSeqLock;
  657. unsigned msgSeq;
  658. static bool comparePacket(void *pkData, void *key)
  659. {
  660. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  661. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  662. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  663. }
  664. inline unsigned getNextMessageSequence()
  665. {
  666. SpinBlock b(msgSeqLock);
  667. unsigned res = ++msgSeq;
  668. if (!res)
  669. res = ++msgSeq;
  670. return res;
  671. }
  672. public:
  673. IMPLEMENT_IINTERFACE;
  674. CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
  675. : bucket(_bucket)
  676. {
  677. #ifndef _WIN32
  678. setpriority(PRIO_PROCESS, 0, -3);
  679. #endif
  680. numNodes = getNumNodes();
  681. receive_flow_port = client_flow_port;
  682. send_flow_port = server_flow_port;
  683. data_port = d_port;
  684. myNodeIndex = _myNodeIndex;
  685. numQueues = _numQueues;
  686. receiversTable = new UdpReceiverEntry[numNodes];
  687. for (unsigned i = 0; i < numNodes; i++)
  688. receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
  689. data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
  690. send_flow = new send_send_flow(*this, numNodes);
  691. receive_flow = new send_receive_flow(*this, client_flow_port);
  692. msgSeq = 0;
  693. }
  694. ~CSendManager()
  695. {
  696. delete []receiversTable;
  697. delete send_flow;
  698. delete receive_flow;
  699. delete data;
  700. }
  701. void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
  702. {
  703. // NOTE: takes ownership of the DataBuffer
  704. assert(queue < numQueues);
  705. assert(destNodeIndex < numNodes);
  706. receiversTable[destNodeIndex].pushData(queue, buffer);
  707. send_flow->data_added(destNodeIndex);
  708. }
  709. inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
  710. {
  711. receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
  712. }
  713. bool dataQueued(unsigned destIndex)
  714. {
  715. return receiversTable[destIndex].dataQueued();
  716. }
  717. bool abortData(unsigned destIndex)
  718. {
  719. return receiversTable[destIndex].removeData(NULL, NULL);
  720. }
  721. // Interface ISendManager
  722. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
  723. {
  724. if (destNodeIndex >= numNodes)
  725. throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
  726. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
  727. }
  728. virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
  729. {
  730. UdpPacketHeader pkHdr;
  731. pkHdr.ruid = ruid;
  732. pkHdr.msgId = msgId;
  733. return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
  734. }
  735. virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
  736. {
  737. UdpPacketHeader pkHdr;
  738. pkHdr.ruid = ruid;
  739. pkHdr.msgId = msgId;
  740. return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
  741. }
  742. virtual bool allDone()
  743. {
  744. for (unsigned i = 0; i < numNodes; i++)
  745. {
  746. if (receiversTable[i].dataQueued())
  747. return false;
  748. }
  749. return true;
  750. }
  751. };
  752. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
  753. {
  754. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
  755. }
  756. class CMessagePacker : implements IMessagePacker, public CInterface
  757. {
  758. ISendManager &parent;
  759. unsigned destNodeIndex;
  760. UdpPacketHeader package_header;
  761. DataBuffer *part_buffer;
  762. unsigned data_buffer_size;
  763. unsigned data_used;
  764. void *mem_buffer;
  765. unsigned mem_buffer_size;
  766. unsigned totalSize;
  767. bool packed_request;
  768. MemoryBuffer metaInfo;
  769. bool last_message_done;
  770. int queue_number;
  771. public:
  772. IMPLEMENT_IINTERFACE;
  773. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  774. : parent(_parent)
  775. {
  776. queue_number = _queue;
  777. destNodeIndex = _destNode;
  778. package_header.length = 0; // filled in with proper value later
  779. package_header.metalength = 0;
  780. package_header.ruid = ruid;
  781. package_header.msgId = msgId;
  782. package_header.pktSeq = 0;
  783. package_header.nodeIndex = _sourceNode;
  784. package_header.msgSeq = _msgSeq;
  785. package_header.udpSequence = 0; // these are allocated when transmitted
  786. packed_request = false;
  787. part_buffer = bufferManager->allocate();
  788. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  789. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  790. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  791. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  792. data_used = headerSize + sizeof(unsigned short);
  793. mem_buffer = 0;
  794. mem_buffer_size = 0;
  795. last_message_done = false;
  796. totalSize = 0;
  797. if (udpTraceLevel >= 40)
  798. DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
  799. }
  800. ~CMessagePacker()
  801. {
  802. if (part_buffer)
  803. part_buffer->Release();
  804. if (mem_buffer) free (mem_buffer);
  805. if (udpTraceLevel >= 40)
  806. {
  807. DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
  808. package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
  809. }
  810. }
  811. virtual void *getBuffer(unsigned len, bool variable)
  812. {
  813. if (variable)
  814. len += sizeof(RecordLengthType);
  815. if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
  816. {
  817. // Won't fit in one, so allocate temp location
  818. if (mem_buffer_size < len)
  819. {
  820. free(mem_buffer);
  821. mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
  822. mem_buffer_size = len;
  823. }
  824. packed_request = false;
  825. if (variable)
  826. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  827. else
  828. return mem_buffer;
  829. }
  830. if (part_buffer && ((data_buffer_size - data_used) < len))
  831. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  832. if (!part_buffer)
  833. {
  834. part_buffer = bufferManager->allocate();
  835. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  836. }
  837. packed_request = true;
  838. if (variable)
  839. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  840. else
  841. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  842. }
  843. virtual void putBuffer(const void *buf, unsigned len, bool variable)
  844. {
  845. if (variable)
  846. {
  847. assertex(len < MAX_RECORD_LENGTH);
  848. buf = ((char *) buf) - sizeof(RecordLengthType);
  849. *(RecordLengthType *) buf = len;
  850. len += sizeof(RecordLengthType);
  851. }
  852. totalSize += len;
  853. if (packed_request)
  854. {
  855. assert(len <= (data_buffer_size - data_used));
  856. data_used += len;
  857. }
  858. else
  859. {
  860. while (len)
  861. {
  862. if (!part_buffer)
  863. {
  864. part_buffer = bufferManager->allocate();
  865. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  866. data_used = 0;
  867. }
  868. unsigned chunkLen = data_buffer_size - data_used;
  869. if (chunkLen > len)
  870. chunkLen = len;
  871. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  872. data_used += chunkLen;
  873. len -= chunkLen;
  874. buf = &(((char*)buf)[chunkLen]);
  875. if (len)
  876. flush(false);
  877. }
  878. }
  879. }
  880. virtual void sendMetaInfo(const void *buf, unsigned len) {
  881. metaInfo.append(len, buf);
  882. }
  883. virtual void flush(bool last_msg = false)
  884. {
  885. if (!last_message_done && last_msg)
  886. {
  887. last_message_done = true;
  888. if (!part_buffer)
  889. part_buffer = bufferManager->allocate();
  890. const char *metaData = metaInfo.toByteArray();
  891. unsigned metaLength = metaInfo.length();
  892. unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
  893. while (metaLength > maxMetaLength)
  894. {
  895. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  896. put_package(part_buffer, data_used, maxMetaLength);
  897. metaLength -= maxMetaLength;
  898. metaData += maxMetaLength;
  899. data_used = 0;
  900. maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  901. part_buffer = bufferManager->allocate();
  902. }
  903. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  904. package_header.pktSeq |= UDP_PACKET_COMPLETE;
  905. put_package(part_buffer, data_used, metaLength);
  906. }
  907. else if (part_buffer)
  908. {
  909. // Just flush current - used when no room for current row
  910. if (data_used)
  911. put_package(part_buffer, data_used, 0); // buffer released in put_package
  912. else
  913. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  914. }
  915. part_buffer = 0;
  916. data_buffer_size = 0;
  917. data_used = 0;
  918. }
  919. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  920. {
  921. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  922. package_header.metalength = metalength;
  923. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  924. parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
  925. if (udpTraceLevel >= 50)
  926. {
  927. if (package_header.length==991)
  928. DBGLOG("NEarly");
  929. DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
  930. package_header.ruid, package_header.msgId, package_header.msgSeq,
  931. package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
  932. }
  933. package_header.pktSeq++;
  934. }
  935. virtual bool dataQueued()
  936. {
  937. return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
  938. }
  939. virtual unsigned size() const
  940. {
  941. return totalSize;
  942. }
  943. };
  944. IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  945. {
  946. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
  947. }