udptrs.cpp 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "jsocket.hpp"
  17. #include "jlog.hpp"
  18. #include "roxie.hpp"
  19. #ifdef _WIN32
  20. #include <winsock.h>
  21. #else
  22. #include <sys/socket.h>
  23. #include <sys/time.h>
  24. #include <sys/resource.h>
  25. #endif
  26. #include <math.h>
  27. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  28. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  29. #endif
  // Base of the per-queue burst weighting applied when packets are popped from the
  // output queues; 0 disables the weighting and the queues are visited round-robin.
  30. unsigned udpOutQsPriority = 0;
  31. unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
  32. unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
  // When true the sender multicasts a busy/idle sniff message around each batch of data it sends.
  33. bool udpSnifferEnabled = true;
  34. #ifdef _DEBUG
  35. //#define _SIMULATE_LOST_PACKETS
  36. #endif
  37. using roxiemem::DataBuffer;
  38. // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
  39. class UdpReceiverEntry
  40. {
  41. queue_t *output_queue;
  42. bool initialized;
  43. public:
  44. ISocket *send_flow_socket;
  45. ISocket *data_socket;
  46. unsigned numQueues;
  47. int current_q;
  48. int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
  49. int *maxPktsPerQ; // to minimise power function re-calc for evey packet
  50. // MORE - consider where we need critsecs in here!
  51. void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
  52. {
  53. UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
  54. try
  55. {
  56. send_flow_socket->write(&msg, msg.length);
  57. }
  58. catch(IException *e)
  59. {
  60. StringBuffer s;
  61. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  62. e->Release();
  63. }
  64. catch (...)
  65. {
  66. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  67. }
  68. }
  69. unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
  70. {
  71. moreRequested = false;
  72. maxPackets = permit.max_data;
  73. PointerArray toSend;
  74. unsigned totalSent = 0;
  75. while (toSend.length() < maxPackets && dataQueued())
  76. {
  77. DataBuffer *buffer = popQueuedData();
  78. if (buffer) // Aborted slave queries leave NULL records on queue
  79. {
  80. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  81. toSend.append(buffer);
  82. totalSent += header->length;
  83. #ifdef __linux__
  84. if (isLocal && (totalSent> 100000))
  85. break;
  86. #endif
  87. }
  88. }
  89. maxPackets = toSend.length();
  90. for (unsigned idx = 0; idx < maxPackets; idx++)
  91. {
  92. DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
  93. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  94. unsigned length = header->length;
  95. if (bucket)
  96. {
  97. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  98. bucket->wait((length / 1024)+1);
  99. }
  100. try
  101. {
  102. if (udpSendCompletedInData)
  103. {
  104. if (idx == maxPackets-1)
  105. {
  106. // MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
  107. header->udpSequence = UDP_SEQUENCE_COMPLETE;
  108. }
  109. }
  110. data_socket->write(buffer->data, length);
  111. }
  112. catch(IException *e)
  113. {
  114. StringBuffer s;
  115. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  116. e->Release();
  117. }
  118. catch(...)
  119. {
  120. DBGLOG("UdpSender: write exception - unknown exception");
  121. }
  122. ::Release(buffer);
  123. }
  124. return totalSent;
  125. }
  126. bool dataQueued()
  127. {
  128. for (unsigned i = 0; i < numQueues; i++)
  129. {
  130. if (!output_queue[i].empty())
  131. return true;
  132. }
  133. return false;
  134. }
  135. bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
  136. {
  137. for (unsigned i = 0; i < numQueues; i++)
  138. {
  139. if (output_queue[i].dataQueued(key, pkCmpFn))
  140. return true;
  141. }
  142. return false;
  143. }
  144. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  145. {
  146. bool anyRemoved = false;
  147. for (unsigned i = 0; i < numQueues; i++)
  148. {
  149. if (output_queue[i].removeData(key, pkCmpFn))
  150. anyRemoved = true;
  151. }
  152. return anyRemoved;
  153. }
  154. inline void pushData(unsigned queue, DataBuffer *buffer)
  155. {
  156. output_queue[queue].pushOwn(buffer);
  157. }
  158. DataBuffer *popQueuedData()
  159. {
  160. DataBuffer *buffer;
  161. while (1)
  162. {
  163. for (unsigned i = 0; i < numQueues; i++)
  164. {
  165. if (udpOutQsPriority)
  166. {
  167. if (output_queue[current_q].empty())
  168. {
  169. if (udpTraceLevel >= 5)
  170. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  171. currentQNumPkts = 0;
  172. current_q = (current_q + 1) % numQueues;
  173. }
  174. else
  175. {
  176. buffer = output_queue[current_q].pop();
  177. currentQNumPkts++;
  178. if (udpTraceLevel >= 5)
  179. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  180. if (currentQNumPkts >= maxPktsPerQ[current_q])
  181. {
  182. currentQNumPkts = 0;
  183. current_q = (current_q + 1) % numQueues;
  184. }
  185. return buffer;
  186. }
  187. }
  188. else
  189. {
  190. current_q = (current_q + 1) % numQueues;
  191. if (!output_queue[current_q].empty())
  192. {
  193. return output_queue[current_q].pop();
  194. }
  195. }
  196. }
  197. MilliSleep(10);
  198. DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
  199. }
  200. }
  201. UdpReceiverEntry()
  202. {
  203. send_flow_socket = data_socket = NULL;
  204. numQueues = 0;
  205. current_q = 0;
  206. initialized = false;
  207. output_queue = 0;
  208. currentQNumPkts = 0;
  209. maxPktsPerQ = 0;
  210. }
  211. void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
  212. {
  213. assert(!initialized);
  214. numQueues = _numQueues;
  215. const IpAddress &ip = getNodeAddress(destNodeIndex);
  216. if (!ip.isNull())
  217. {
  218. try
  219. {
  220. SocketEndpoint sendFlowEp(sendFlowPort, ip);
  221. SocketEndpoint dataEp(dataPort, ip);
  222. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  223. data_socket = ISocket::udp_connect(dataEp);
  224. if (isLocal)
  225. {
  226. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  227. if (udpTraceLevel > 0)
  228. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  229. }
  230. }
  231. catch(IException *e)
  232. {
  233. StringBuffer error, ipstr;
  234. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  235. throw;
  236. }
  237. catch(...)
  238. {
  239. StringBuffer ipstr;
  240. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  241. throw;
  242. }
  243. output_queue = new queue_t[numQueues];
  244. maxPktsPerQ = new int[numQueues];
  245. for (unsigned j = 0; j < numQueues; j++)
  246. {
  247. output_queue[j].set_queue_size(queueSize);
  248. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  249. }
  250. initialized = true;
  251. if (udpTraceLevel > 0)
  252. {
  253. StringBuffer ipStr;
  254. DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
  255. }
  256. }
  257. }
  258. ~UdpReceiverEntry()
  259. {
  260. if (send_flow_socket) send_flow_socket->Release();
  261. if (data_socket) data_socket->Release();
  262. if (output_queue) delete [] output_queue;
  263. if (maxPktsPerQ) delete [] maxPktsPerQ;
  264. }
  265. };
  266. class CSendManager : implements ISendManager, public CInterface
  267. {
  268. friend class send_send_flow;
  269. class StartedThread : public Thread
  270. {
  271. private:
  272. Semaphore started;
  273. virtual int run()
  274. {
  275. started.signal();
  276. return doRun();
  277. }
  278. protected:
  279. bool running;
  280. public:
  281. StartedThread(const char *name) : Thread(name)
  282. {
  283. running = false;
  284. }
  285. ~StartedThread()
  286. {
  287. running = false;
  288. join();
  289. }
  290. virtual void start()
  291. {
  292. running = true;
  293. Thread::start();
  294. started.wait();
  295. }
  296. virtual int doRun() = 0;
  297. };
  298. class send_send_flow : public StartedThread
  299. {
  300. /*
  301. I don't like this code much at all
  302. Looping round all every time status of any changes seems like a bad thing especially as scale goes up
  303. Even though these look like a bitmap they are not used as such presently
  304. - as a result, if data_added() is called while state is completed, we lose the request I think
  305. - probably get away with it because of the dataqueued check
  306. doRun() uses state bits without protection
  307. A count of number pending for each might be better than just a flag
  308. Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
  309. - normally MANY in pending (and order is interesting)
  310. - normally few in any other state (only 1 if thread keeping up), order not really very interesting
  311. - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
  312. - in particular don't lock while processing the chain
  313. - Never need to be in >1 chain
  314. msTick() probably better than time() for detecting timeouts
  315. */
  316. enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
  317. unsigned target_count;
  318. char *state;
  319. unsigned char *timeouts; // Number of consecutive timeouts
  320. unsigned *request_time;
  321. CriticalSection cr;
  322. Semaphore sem;
  323. CSendManager &parent;
  324. virtual int doRun()
  325. {
  326. // MORE - this is reading the state values unprotected
  327. // Not sure that this represents any issue in practice...
  328. if (udpTraceLevel > 0)
  329. DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
  330. while (running)
  331. {
  332. bool idle = false;
  333. if (sem.wait(1000))
  334. {
  335. if (udpTraceLevel > 4)
  336. DBGLOG("UdpSender: send_send_flow::doRun signal received");
  337. }
  338. else
  339. idle = true;
  340. if (!running) return 0;
  341. unsigned now = msTick();
  342. // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
  343. // In a typical heavy load scenario most will be pending
  344. // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
  345. // Separate lists for each state (don't need one for sending) ?
  346. for (unsigned i = 0; i < target_count; i++)
  347. {
  348. switch (state[i]) // MORE - should really protect it?
  349. {
  350. case completed:
  351. done(i, false);
  352. break;
  353. case completed_more:
  354. done(i, true);
  355. break;
  356. case pending_request:
  357. if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
  358. break;
  359. timeouts[i]++;
  360. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
  361. timeouts[i], udpMaxRetryTimedoutReqs,
  362. i, (int) (now - request_time[i]), udpRequestToSendTimeout);
  363. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  364. if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
  365. {
  366. abort(i);
  367. break;
  368. }
  369. // fall into...
  370. case new_request:
  371. sendRequest(i);
  372. break;
  373. default:
  374. if (idle && parent.dataQueued(i))
  375. {
  376. EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
  377. data_added(i);
  378. }
  379. }
  380. }
  381. }
  382. return 0;
  383. }
  384. void done(unsigned index, bool moreRequested)
  385. {
  386. bool dataRemaining;
  387. {
  388. CriticalBlock b(cr);
  389. dataRemaining = parent.dataQueued(index);
  390. if (dataRemaining)
  391. {
  392. state[index] = pending_request;
  393. request_time[index] = msTick();
  394. }
  395. else
  396. {
  397. state[index] = 0;
  398. timeouts[index] = 0;
  399. }
  400. }
  401. if (udpTraceLevel > 3)
  402. DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
  403. if (udpSendCompletedInData)
  404. {
  405. if (dataRemaining)
  406. {
  407. // MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
  408. // though there is a possible race to consider
  409. if (!moreRequested)
  410. parent.sendRequest(index, flow_t::request_to_send);
  411. }
  412. }
  413. else
  414. parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
  415. }
  416. void sendRequest(unsigned index)
  417. {
  418. if (udpTraceLevel > 3)
  419. DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
  420. CriticalBlock b(cr);
  421. parent.sendRequest(index, flow_t::request_to_send);
  422. state[index] = pending_request;
  423. request_time[index] = msTick();
  424. }
  425. void abort(unsigned index)
  426. {
  427. if (udpTraceLevel > 3)
  428. DBGLOG("UdpSender: abort sending queued data to node=%u", index);
  429. CriticalBlock b(cr);
  430. state[index] = 0;
  431. timeouts[index] = 0;
  432. parent.abortData(index);
  433. }
  434. public:
  435. send_send_flow(CSendManager &_parent, unsigned numNodes)
  436. : StartedThread("UdpLib::send_send_flow"), parent(_parent)
  437. {
  438. target_count = numNodes;
  439. state = new char [target_count];
  440. memset(state, 0, target_count);
  441. timeouts = new unsigned char [target_count];
  442. memset(timeouts, 0, target_count);
  443. request_time = new unsigned [target_count];
  444. memset(request_time, 0, sizeof(unsigned) * target_count);
  445. start();
  446. }
  447. ~send_send_flow()
  448. {
  449. running = false;
  450. sem.signal();
  451. join();
  452. delete [] state;
  453. delete [] timeouts;
  454. delete [] request_time;
  455. }
  456. void clear_to_send_received(unsigned index)
  457. {
  458. CriticalBlock b(cr);
  459. state[index] = sending_data;
  460. }
  461. void send_done(unsigned index, bool moreRequested)
  462. {
  463. CriticalBlock b(cr);
  464. state[index] = moreRequested ? completed_more : completed;
  465. sem.signal();
  466. }
  467. void data_added(unsigned index)
  468. {
  469. CriticalBlock b(cr);
  470. // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
  471. // Because done() checks to see if any data pending and re-calls data_added, we get away with it
  472. // Using bits sounds more sensible though?
  473. // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
  474. if (!state[index]) // MORE - should just test the bit?
  475. {
  476. state[index] = new_request;
  477. if (udpTraceLevel > 3)
  478. DBGLOG("UdpSender: state set to new_request for node=%u", index);
  479. sem.signal();
  480. }
  481. }
  482. };
  483. class send_receive_flow : public StartedThread
  484. {
  485. CSendManager &parent;
  486. int receive_port;
  487. Owned<ISocket> flow_socket;
  488. public:
  489. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  490. {
  491. receive_port = r_port;
  492. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  493. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  494. flow_socket.setown(ISocket::udp_create(receive_port));
  495. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  496. size32_t actualSize = flow_socket->get_receive_buffer_size();
  497. DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
  498. start();
  499. }
  500. ~send_receive_flow()
  501. {
  502. running = false;
  503. if (flow_socket)
  504. flow_socket->close();
  505. join();
  506. }
  507. virtual int doRun()
  508. {
  509. if (udpTraceLevel > 0)
  510. DBGLOG("UdpSender: send_receive_flow started");
  511. #ifdef __linux__
  512. setLinuxThreadPriority(2);
  513. #endif
  514. while(running)
  515. {
  516. UdpPermitToSendMsg f;
  517. while (running)
  518. {
  519. try
  520. {
  521. unsigned int res ;
  522. flow_socket->read(&f, 1, sizeof(f), res, 5);
  523. assertex(res == f.length);
  524. #ifdef CRC_MESSAGES
  525. assertex(f.hdr.crc == f.calcCRC());
  526. #endif
  527. switch (f.cmd)
  528. {
  529. case flow_t::ok_to_send:
  530. if (udpTraceLevel > 1)
  531. DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
  532. parent.data->ok_to_send(f);
  533. break;
  534. default:
  535. DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
  536. }
  537. }
  538. catch (IException *e)
  539. {
  540. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  541. {
  542. StringBuffer s;
  543. DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
  544. }
  545. e->Release();
  546. }
  547. catch (...)
  548. {
  549. if (running)
  550. DBGLOG("UdpSender: send_receive_flow::unknown exception");
  551. MilliSleep(0);
  552. }
  553. }
  554. }
  555. return 0;
  556. }
  557. };
  558. class send_data : public StartedThread
  559. {
  560. CSendManager &parent;
  561. ISocket *sniffer_socket;
  562. SocketEndpoint ep;
  563. simple_queue<UdpPermitToSendMsg> send_queue;
  564. Linked<TokenBucket> bucket;
  565. void send_sniff(bool busy)
  566. {
  567. unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
  568. sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
  569. try
  570. {
  571. if (!sniffer_socket)
  572. {
  573. sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
  574. if (udpTraceLevel > 1)
  575. {
  576. StringBuffer url;
  577. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  578. }
  579. }
  580. sniffer_socket->write(&msg, sizeof(msg));
  581. if (udpTraceLevel > 1)
  582. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  583. }
  584. catch(IException *e)
  585. {
  586. StringBuffer s;
  587. StringBuffer url;
  588. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
  589. e->Release();
  590. }
  591. catch(...)
  592. {
  593. StringBuffer url;
  594. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  595. if (sniffer_socket)
  596. {
  597. sniffer_socket->Release();
  598. sniffer_socket = NULL;
  599. }
  600. }
  601. }
  602. public:
  603. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  604. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  605. {
  606. sniffer_socket = NULL;
  607. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  608. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  609. start();
  610. }
  611. ~send_data()
  612. {
  613. running = false;
  614. UdpPermitToSendMsg dummy;
  615. send_queue.push(dummy);
  616. join();
  617. if (sniffer_socket)
  618. sniffer_socket->Release();
  619. }
  620. bool ok_to_send(const UdpPermitToSendMsg &msg)
  621. {
  622. if (send_queue.push(msg, 15))
  623. return true;
  624. else
  625. {
  626. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
  627. return false;
  628. }
  629. }
  630. virtual int doRun()
  631. {
  632. if (udpTraceLevel > 0)
  633. DBGLOG("UdpSender: send_data started");
  634. #ifdef __linux__
  635. setLinuxThreadPriority(1); // MORE - windows?
  636. #endif
  637. UdpPermitToSendMsg permit;
  638. while (running)
  639. {
  640. send_queue.pop(permit);
  641. if (!running)
  642. return 0;
  643. if (udpSnifferEnabled)
  644. send_sniff(true);
  645. parent.send_flow->clear_to_send_received(permit.destNodeIndex);
  646. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
  647. bool moreRequested;
  648. unsigned maxPackets;
  649. unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
  650. if (udpSendCompletedInData && !maxPackets)
  651. parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
  652. parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
  653. if (udpSnifferEnabled)
  654. send_sniff(false);
  655. if (udpTraceLevel > 1)
  656. DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
  657. }
  658. if (udpTraceLevel > 0)
  659. DBGLOG("UdpSender: send_data stopped");
  660. return 0;
  661. }
  662. };
  663. friend class send_send_flow;
  664. friend class send_receive_flow;
  665. friend class send_data;
  666. unsigned numNodes;
  667. int receive_flow_port;
  668. int send_flow_port;
  669. int data_port;
  670. unsigned myNodeIndex;
  671. unsigned numQueues;
  672. UdpReceiverEntry *receiversTable;
  673. send_send_flow *send_flow;
  674. send_receive_flow *receive_flow;
  675. send_data *data;
  676. Linked<TokenBucket> bucket;
  677. SpinLock msgSeqLock;
  678. unsigned msgSeq;
  679. static bool comparePacket(void *pkData, void *key)
  680. {
  681. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  682. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  683. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  684. }
  685. inline unsigned getNextMessageSequence()
  686. {
  687. SpinBlock b(msgSeqLock);
  688. unsigned res = ++msgSeq;
  689. if (!res)
  690. res = ++msgSeq;
  691. return res;
  692. }
  693. public:
  694. IMPLEMENT_IINTERFACE;
  695. CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
  696. : bucket(_bucket)
  697. {
  698. #ifndef _WIN32
  699. setpriority(PRIO_PROCESS, 0, -3);
  700. #endif
  701. numNodes = getNumNodes();
  702. receive_flow_port = client_flow_port;
  703. send_flow_port = server_flow_port;
  704. data_port = d_port;
  705. myNodeIndex = _myNodeIndex;
  706. numQueues = _numQueues;
  707. receiversTable = new UdpReceiverEntry[numNodes];
  708. for (unsigned i = 0; i < numNodes; i++)
  709. receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
  710. data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
  711. send_flow = new send_send_flow(*this, numNodes);
  712. receive_flow = new send_receive_flow(*this, client_flow_port);
  713. msgSeq = 0;
  714. }
  715. ~CSendManager()
  716. {
  717. delete []receiversTable;
  718. delete send_flow;
  719. delete receive_flow;
  720. delete data;
  721. }
  722. void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
  723. {
  724. // NOTE: takes ownership of the DataBuffer
  725. assert(queue < numQueues);
  726. assert(destNodeIndex < numNodes);
  727. receiversTable[destNodeIndex].pushData(queue, buffer);
  728. send_flow->data_added(destNodeIndex);
  729. }
  730. inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
  731. {
  732. receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
  733. }
  734. bool dataQueued(unsigned destIndex)
  735. {
  736. return receiversTable[destIndex].dataQueued();
  737. }
  738. bool abortData(unsigned destIndex)
  739. {
  740. return receiversTable[destIndex].removeData(NULL, NULL);
  741. }
  742. // Interface ISendManager
  743. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
  744. {
  745. if (destNodeIndex >= numNodes)
  746. throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
  747. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
  748. }
  749. virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
  750. {
  751. UdpPacketHeader pkHdr;
  752. pkHdr.ruid = ruid;
  753. pkHdr.msgId = msgId;
  754. return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
  755. }
  756. virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
  757. {
  758. UdpPacketHeader pkHdr;
  759. pkHdr.ruid = ruid;
  760. pkHdr.msgId = msgId;
  761. return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
  762. }
  763. virtual bool allDone()
  764. {
  765. for (unsigned i = 0; i < numNodes; i++)
  766. {
  767. if (receiversTable[i].dataQueued())
  768. return false;
  769. }
  770. return true;
  771. }
  772. };
  773. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
  774. {
  775. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
  776. }
  777. class CMessagePacker : implements IMessagePacker, public CInterface
  778. {
  779. ISendManager &parent;
  780. unsigned destNodeIndex;
  781. UdpPacketHeader package_header;
  782. DataBuffer *part_buffer;
  783. unsigned data_buffer_size;
  784. unsigned data_used;
  785. void *mem_buffer;
  786. unsigned mem_buffer_size;
  787. unsigned totalSize;
  788. bool packed_request;
  789. MemoryBuffer metaInfo;
  790. bool last_message_done;
  791. int queue_number;
  792. public:
  793. IMPLEMENT_IINTERFACE;
  794. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  795. : parent(_parent)
  796. {
  797. queue_number = _queue;
  798. destNodeIndex = _destNode;
  799. package_header.length = 0; // filled in with proper value later
  800. package_header.metalength = 0;
  801. package_header.ruid = ruid;
  802. package_header.msgId = msgId;
  803. package_header.pktSeq = 0;
  804. package_header.nodeIndex = _sourceNode;
  805. package_header.msgSeq = _msgSeq;
  806. package_header.udpSequence = 0; // these are allocated when transmitted
  807. packed_request = false;
  808. part_buffer = bufferManager->allocate();
  809. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  810. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  811. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  812. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  813. data_used = headerSize + sizeof(unsigned short);
  814. mem_buffer = 0;
  815. mem_buffer_size = 0;
  816. last_message_done = false;
  817. totalSize = 0;
  818. if (udpTraceLevel >= 40)
  819. DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
  820. }
  821. ~CMessagePacker()
  822. {
  823. if (part_buffer)
  824. part_buffer->Release();
  825. if (mem_buffer) free (mem_buffer);
  826. if (udpTraceLevel >= 40)
  827. {
  828. DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
  829. package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
  830. }
  831. }
  832. virtual void *getBuffer(unsigned len, bool variable)
  833. {
  834. if (variable)
  835. len += sizeof(RecordLengthType);
  836. if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
  837. {
  838. // Won't fit in one, so allocate temp location
  839. if (mem_buffer_size < len)
  840. {
  841. free(mem_buffer);
  842. mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
  843. mem_buffer_size = len;
  844. }
  845. packed_request = false;
  846. if (variable)
  847. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  848. else
  849. return mem_buffer;
  850. }
  851. if (part_buffer && ((data_buffer_size - data_used) < len))
  852. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  853. if (!part_buffer)
  854. {
  855. part_buffer = bufferManager->allocate();
  856. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  857. }
  858. packed_request = true;
  859. if (variable)
  860. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  861. else
  862. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  863. }
  864. virtual void putBuffer(const void *buf, unsigned len, bool variable)
  865. {
  866. if (variable)
  867. {
  868. assertex(len < MAX_RECORD_LENGTH);
  869. buf = ((char *) buf) - sizeof(RecordLengthType);
  870. *(RecordLengthType *) buf = len;
  871. len += sizeof(RecordLengthType);
  872. }
  873. totalSize += len;
  874. if (packed_request)
  875. {
  876. assert(len <= (data_buffer_size - data_used));
  877. data_used += len;
  878. }
  879. else
  880. {
  881. while (len)
  882. {
  883. if (!part_buffer)
  884. {
  885. part_buffer = bufferManager->allocate();
  886. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  887. data_used = 0;
  888. }
  889. unsigned chunkLen = data_buffer_size - data_used;
  890. if (chunkLen > len)
  891. chunkLen = len;
  892. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  893. data_used += chunkLen;
  894. len -= chunkLen;
  895. buf = &(((char*)buf)[chunkLen]);
  896. if (len)
  897. flush(false);
  898. }
  899. }
  900. }
  901. virtual void sendMetaInfo(const void *buf, unsigned len) {
  902. metaInfo.append(len, buf);
  903. }
  904. virtual void flush(bool last_msg = false)
  905. {
  906. if (!last_message_done && last_msg)
  907. {
  908. last_message_done = true;
  909. if (!part_buffer)
  910. part_buffer = bufferManager->allocate();
  911. const char *metaData = metaInfo.toByteArray();
  912. unsigned metaLength = metaInfo.length();
  913. unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
  914. while (metaLength > maxMetaLength)
  915. {
  916. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  917. put_package(part_buffer, data_used, maxMetaLength);
  918. metaLength -= maxMetaLength;
  919. metaData += maxMetaLength;
  920. data_used = 0;
  921. maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  922. part_buffer = bufferManager->allocate();
  923. }
  924. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  925. package_header.pktSeq |= UDP_PACKET_COMPLETE;
  926. put_package(part_buffer, data_used, metaLength);
  927. }
  928. else if (part_buffer)
  929. {
  930. // Just flush current - used when no room for current row
  931. if (data_used)
  932. put_package(part_buffer, data_used, 0); // buffer released in put_package
  933. else
  934. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  935. }
  936. part_buffer = 0;
  937. data_buffer_size = 0;
  938. data_used = 0;
  939. }
  940. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  941. {
  942. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  943. package_header.metalength = metalength;
  944. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  945. parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
  946. if (udpTraceLevel >= 50)
  947. {
  948. if (package_header.length==991)
  949. DBGLOG("NEarly");
  950. DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
  951. package_header.ruid, package_header.msgId, package_header.msgSeq,
  952. package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
  953. }
  954. package_header.pktSeq++;
  955. }
  956. virtual bool dataQueued()
  957. {
  958. return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
  959. }
  960. virtual unsigned size() const
  961. {
  962. return totalSize;
  963. }
  964. };
  965. IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  966. {
  967. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
  968. }