udptrs.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "udpipmap.hpp"
  17. #include "jsocket.hpp"
  18. #include "jlog.hpp"
  19. #include "jencrypt.hpp"
  20. #include "roxie.hpp"
  21. #ifdef _WIN32
  22. #include <winsock.h>
  23. #else
  24. #include <sys/socket.h>
  25. #include <sys/time.h>
  26. #include <sys/resource.h>
  27. #endif
  28. #include <math.h>
  29. #include <atomic>
  // ---- Sender tunables (plain globals, defined here) ----

  // Non-zero selects the weighted queue servicing in UdpReceiverEntry::popQueuedData()
  // and is the base used to compute each queue's consecutive-packet quota.
  unsigned udpOutQsPriority = 0;

  // 0 means off (keep retrying forever)
  unsigned udpMaxRetryTimedoutReqs = 0;

  // value in milliseconds - 0 means calculate from query timeouts
  unsigned udpRequestToSendTimeout = 0;

  // value in milliseconds
  unsigned udpRequestToSendAckTimeout = 10;

  // When true, send_data brackets each transfer with busy/idle sniffer messages.
  bool udpSnifferEnabled = false;
  35. using roxiemem::DataBuffer;
  36. // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
  37. // But using them on this side means we guarantee that the packets fit into databuffers on the other side... But so would matching their size
  38. /*
  39. *
  40. * There are 3 threads running to manage the data transfer from agent back to server:
  41. * send_resend_flow
  42. * - checks periodically that nothing is waiting for a "request to send" that timed out
  43. * send_receive_flow
  44. * - waits on socket receiving "ok_to_send" packets from servers
  45. * - updates state of relevant receivers
  46. * - pushes permission tokens to a queue
  47. * send_data
  48. * - waits on queue of permission tokens
  49. * - broadcasts "busy"
  50. * - writes data to server
  51. * - broadcasts "no longer busy"
  52. * - sends "completed" or "completed but I want to send more" flow message to server
  53. *
  54. * Queueing up data packets is done by the agent worker threads.
  55. * *
  56. *
  57. * Data races to watch for
  58. * 1. Two agent threads add data at same time - only one should send rts (use atomic_inc for the count)
  59. * 2. We check for timeout and resend rts or fail just as permission comes in
  60. * - resend rts is harmless ?
  61. * - fail is acceptable
  62. * 3. After sending data, we need to decide whether to set state to 'pending' (and send rts) or empty. If we read count, decide it's zero
  63. * and then (before we set state) someone adds data (and sends rts), we must not set state to empty. CAS to set state empty only if
  64. * it's sending_data perhaps?
  65. * 4. While sending data, someone adds new data. They need to send rts and set state to pending whether empty or sending_data
  66. * 5. Do we need sending_data state? Is it the same as empty, really? Is empty the same as 'count==0' ? Do we even need state?
  67. * - send rts whenever incrementing count from zero
  68. * - resend rts if count is non-zero and timed out
  69. * - resend rts if we send data but there is some remaining
  70. */
  71. static byte key[32] = {
  72. 0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
  73. 0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
  74. };
  75. class UdpReceiverEntry : public IUdpReceiverEntry
  76. {
  77. private:
  78. queue_t *output_queue = nullptr;
  79. bool initialized = false;
  80. const bool isLocal = false;
  81. const bool encrypted = false;
  82. ISocket *send_flow_socket = nullptr;
  83. ISocket *data_socket = nullptr;
  84. const unsigned numQueues;
  85. int current_q = 0;
  86. int currentQNumPkts = 0; // Current Queue Number of Consecutive Processed Packets.
  87. int *maxPktsPerQ = nullptr; // to minimise power function re-calc for every packet
  88. void sendRequest(flowType::flowCmd cmd, unsigned packets )
  89. {
  90. UdpRequestToSendMsg msg = { cmd, static_cast<unsigned short>(packets), sourceIP };
  91. try
  92. {
  93. if (udpTraceLevel > 3)
  94. {
  95. StringBuffer s;
  96. DBGLOG("UdpSender: sending flowType::%s msg to node=%s", flowType::name(cmd), ip.getIpText(s).str());
  97. }
  98. send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
  99. }
  100. catch(IException *e)
  101. {
  102. StringBuffer s;
  103. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  104. e->Release();
  105. }
  106. catch (...)
  107. {
  108. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  109. }
  110. }
  111. const IpAddress sourceIP;
  112. public:
  113. const IpAddress ip;
  114. unsigned timeouts = 0; // Number of consecutive timeouts
  115. unsigned requestExpiryTime = 0;
  116. static bool comparePacket(const void *pkData, const void *key)
  117. {
  118. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  119. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  120. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  121. }
  122. std::atomic<unsigned> packetsQueued = { 0 };
  123. void sendDone(unsigned packets)
  124. {
  125. bool dataRemaining = packetsQueued.load(std::memory_order_relaxed);
  126. // If dataRemaining says 0, but someone adds a row in this window, the request_to_send will be sent BEFORE the send_completed
  127. // So long as receiver handles that, are we good?
  128. if (dataRemaining)
  129. {
  130. requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
  131. sendRequest(flowType::request_to_send_more, packets);
  132. }
  133. else
  134. {
  135. requestExpiryTime = 0;
  136. sendRequest(flowType::send_completed, packets);
  137. }
  138. timeouts = 0;
  139. }
  140. void requestToSend()
  141. {
  142. requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
  143. sendRequest(flowType::request_to_send, 0);
  144. }
  145. void requestAcknowledged()
  146. {
  147. if (requestExpiryTime)
  148. requestExpiryTime = msTick() + udpRequestToSendTimeout;
  149. }
  150. // MORE - consider where/if we need critsecs in here!
  151. unsigned sendData(const UdpPermitToSendMsg &permit, TokenBucket *bucket)
  152. {
  153. requestExpiryTime = 0;
  154. unsigned maxPackets = permit.max_data;
  155. std::vector<DataBuffer *> toSend;
  156. unsigned totalSent = 0;
  157. while (toSend.size() < maxPackets && packetsQueued.load(std::memory_order_relaxed))
  158. {
  159. DataBuffer *buffer = popQueuedData();
  160. if (!buffer)
  161. break; // Suggests data was aborted before we got to pop it
  162. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  163. toSend.push_back(buffer);
  164. totalSent += header->length;
  165. #if defined(__linux__) || defined(__APPLE__)
  166. if (isLocal && (totalSent> 100000)) // Avoids sending too fast to local node, for reasons lost in the mists of time
  167. break;
  168. #endif
  169. }
  170. MemoryBuffer encryptBuffer;
  171. for (DataBuffer *buffer: toSend)
  172. {
  173. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  174. unsigned length = header->length;
  175. if (bucket)
  176. {
  177. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  178. bucket->wait((length / 1024)+1);
  179. }
  180. try
  181. {
  182. if (encrypted)
  183. {
  184. aesEncrypt(key, sizeof(key), buffer->data, length, encryptBuffer.clear());
  185. data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
  186. }
  187. else
  188. data_socket->write(buffer->data, length);
  189. }
  190. catch(IException *e)
  191. {
  192. StringBuffer s;
  193. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  194. e->Release();
  195. }
  196. catch(...)
  197. {
  198. DBGLOG("UdpSender: write exception - unknown exception");
  199. }
  200. ::Release(buffer);
  201. }
  202. sendDone(toSend.size());
  203. return totalSent;
  204. }
  205. bool dataQueued(const UdpPacketHeader &key)
  206. {
  207. // Used when a retry packet is received, to determine whether the query is in fact completed
  208. // but just stuck in transit queues
  209. if (packetsQueued.load(std::memory_order_relaxed))
  210. {
  211. for (unsigned i = 0; i < numQueues; i++)
  212. {
  213. if (output_queue[i].dataQueued(&key, &comparePacket))
  214. return true;
  215. }
  216. }
  217. return false;
  218. }
  219. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  220. {
  221. // Used after receiving an abort, to avoid sending data that is no longer required
  222. unsigned removed = 0;
  223. if (packetsQueued.load(std::memory_order_relaxed))
  224. {
  225. for (unsigned i = 0; i < numQueues; i++)
  226. {
  227. removed += output_queue[i].removeData(key, pkCmpFn);
  228. }
  229. packetsQueued -= removed;
  230. }
  231. return removed > 0;
  232. }
  233. void abort()
  234. {
  235. // Called if too many timeouts on a request to send
  236. if (udpTraceLevel > 3)
  237. {
  238. StringBuffer s;
  239. DBGLOG("UdpSender: abort sending queued data to node=%s", ip.getIpText(s).str());
  240. }
  241. timeouts = 0;
  242. requestExpiryTime = 0;
  243. removeData(nullptr, nullptr);
  244. }
  245. inline void pushData(unsigned queue, DataBuffer *buffer)
  246. {
  247. output_queue[queue].pushOwn(buffer);
  248. if (!packetsQueued++)
  249. requestToSend();
  250. }
  251. DataBuffer *popQueuedData()
  252. {
  253. DataBuffer *buffer;
  254. for (unsigned i = 0; i < numQueues; i++)
  255. {
  256. if (udpOutQsPriority)
  257. {
  258. buffer = output_queue[current_q].pop(false);
  259. if (!buffer)
  260. {
  261. if (udpTraceLevel >= 5)
  262. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  263. currentQNumPkts = 0;
  264. current_q = (current_q + 1) % numQueues;
  265. }
  266. else
  267. {
  268. currentQNumPkts++;
  269. if (udpTraceLevel >= 5)
  270. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  271. if (currentQNumPkts >= maxPktsPerQ[current_q])
  272. {
  273. currentQNumPkts = 0;
  274. current_q = (current_q + 1) % numQueues;
  275. }
  276. packetsQueued--;
  277. return buffer;
  278. }
  279. }
  280. else
  281. {
  282. current_q = (current_q + 1) % numQueues;
  283. buffer = output_queue[current_q].pop(false);
  284. if (buffer)
  285. {
  286. packetsQueued--;
  287. return buffer;
  288. }
  289. }
  290. }
  291. // If we get here, it suggests we were told to get a buffer but no queue has one.
  292. // This should be rare but possible if data gets removed following an abort, as
  293. // there is a window in abort() between the remove and the decrement of packetsQueued.
  294. return nullptr;
  295. }
  296. UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort, bool _encrypted)
  297. : ip (_ip), sourceIP(_sourceIP), numQueues(_numQueues), isLocal(_ip.isLocal()), encrypted(_encrypted)
  298. {
  299. assert(!initialized);
  300. assert(numQueues > 0);
  301. if (!ip.isNull())
  302. {
  303. try
  304. {
  305. SocketEndpoint sendFlowEp(_sendFlowPort, ip);
  306. SocketEndpoint dataEp(_dataPort, ip);
  307. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  308. data_socket = ISocket::udp_connect(dataEp);
  309. if (isLocal)
  310. {
  311. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  312. if (udpTraceLevel > 0)
  313. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  314. }
  315. }
  316. catch(IException *e)
  317. {
  318. StringBuffer error, ipstr;
  319. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  320. throw;
  321. }
  322. catch(...)
  323. {
  324. StringBuffer ipstr;
  325. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  326. throw;
  327. }
  328. output_queue = new queue_t[numQueues];
  329. maxPktsPerQ = new int[numQueues];
  330. for (unsigned j = 0; j < numQueues; j++)
  331. {
  332. output_queue[j].set_queue_size(_queueSize);
  333. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  334. }
  335. initialized = true;
  336. if (udpTraceLevel > 0)
  337. {
  338. StringBuffer ipStr;
  339. DBGLOG("UdpSender: added entry for ip=%s to receivers table - send_flow_port=%d", ip.getIpText(ipStr).str(), _sendFlowPort);
  340. }
  341. }
  342. }
  343. ~UdpReceiverEntry()
  344. {
  345. if (send_flow_socket) send_flow_socket->Release();
  346. if (data_socket) data_socket->Release();
  347. if (output_queue) delete [] output_queue;
  348. if (maxPktsPerQ) delete [] maxPktsPerQ;
  349. }
  350. };
  351. class CSendManager : implements ISendManager, public CInterface
  352. {
  353. class StartedThread : public Thread
  354. {
  355. private:
  356. Semaphore started;
  357. virtual int run()
  358. {
  359. started.signal();
  360. return doRun();
  361. }
  362. protected:
  363. bool running;
  364. public:
  365. StartedThread(const char *name) : Thread(name)
  366. {
  367. running = false;
  368. }
  369. ~StartedThread()
  370. {
  371. running = false;
  372. join();
  373. }
  374. virtual void start()
  375. {
  376. running = true;
  377. Thread::start();
  378. started.wait();
  379. }
  380. virtual int doRun() = 0;
  381. };
  382. class send_resend_flow : public StartedThread
  383. {
  384. // Check if any senders have timed out
  385. CSendManager &parent;
  386. Semaphore terminated;
  387. virtual int doRun() override
  388. {
  389. if (udpTraceLevel > 0)
  390. DBGLOG("UdpSender: send_resend_flow started");
  391. unsigned timeout = udpRequestToSendTimeout;
  392. while (running)
  393. {
  394. if (terminated.wait(timeout) || !running)
  395. break;
  396. unsigned now = msTick();
  397. timeout = udpRequestToSendTimeout;
  398. for (auto&& dest: parent.receiversTable)
  399. {
  400. unsigned expireTime = dest.requestExpiryTime;
  401. if (expireTime)
  402. {
  403. if (expireTime <= now)
  404. {
  405. dest.timeouts++;
  406. {
  407. StringBuffer s;
  408. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%s",
  409. dest.timeouts, udpMaxRetryTimedoutReqs, dest.ip.getIpText(s).str());
  410. }
  411. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  412. if (udpMaxRetryTimedoutReqs && (dest.timeouts >= udpMaxRetryTimedoutReqs))
  413. dest.abort();
  414. else
  415. dest.requestToSend();
  416. }
  417. else if (expireTime-now < timeout)
  418. timeout = expireTime-now;
  419. }
  420. }
  421. }
  422. return 0;
  423. }
  424. public:
  425. send_resend_flow(CSendManager &_parent)
  426. : StartedThread("UdpLib::send_resend_flow"), parent(_parent)
  427. {
  428. start();
  429. }
  430. ~send_resend_flow()
  431. {
  432. running = false;
  433. terminated.signal();
  434. join();
  435. }
  436. };
  437. class send_receive_flow : public StartedThread
  438. {
  439. CSendManager &parent;
  440. int receive_port;
  441. Owned<ISocket> flow_socket;
  442. public:
  443. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  444. {
  445. receive_port = r_port;
  446. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  447. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  448. flow_socket.setown(ISocket::udp_create(receive_port));
  449. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  450. size32_t actualSize = flow_socket->get_receive_buffer_size();
  451. DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
  452. start();
  453. }
  454. ~send_receive_flow()
  455. {
  456. running = false;
  457. if (flow_socket)
  458. flow_socket->close();
  459. join();
  460. }
  461. virtual int doRun()
  462. {
  463. if (udpTraceLevel > 0)
  464. DBGLOG("UdpSender: send_receive_flow started");
  465. #ifdef __linux__
  466. setLinuxThreadPriority(2);
  467. #endif
  468. while(running)
  469. {
  470. UdpPermitToSendMsg f = { flowType::ok_to_send, 0, { } };
  471. while (running)
  472. {
  473. try
  474. {
  475. unsigned int res ;
  476. flow_socket->read(&f, sizeof(f), sizeof(f), res, 5);
  477. assert(res==sizeof(f));
  478. switch (f.cmd)
  479. {
  480. case flowType::ok_to_send:
  481. if (udpTraceLevel > 1)
  482. {
  483. StringBuffer s;
  484. DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%s", f.max_data, f.destNode.getTraceText(s).str());
  485. }
  486. parent.data->ok_to_send(f);
  487. break;
  488. case flowType::request_received:
  489. if (udpTraceLevel > 1)
  490. {
  491. StringBuffer s;
  492. DBGLOG("UdpSender: received request_received msg from node=%s", f.destNode.getTraceText(s).str());
  493. }
  494. parent.receiversTable[f.destNode].requestAcknowledged();
  495. break;
  496. default:
  497. DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
  498. }
  499. }
  500. catch (IException *e)
  501. {
  502. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  503. {
  504. StringBuffer s;
  505. DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
  506. }
  507. e->Release();
  508. }
  509. catch (...)
  510. {
  511. if (running)
  512. DBGLOG("UdpSender: send_receive_flow::unknown exception");
  513. MilliSleep(0);
  514. }
  515. }
  516. }
  517. return 0;
  518. }
  519. };
  520. class send_data : public StartedThread
  521. {
  522. CSendManager &parent;
  523. ISocket *sniffer_socket;
  524. SocketEndpoint ep;
  525. simple_queue<UdpPermitToSendMsg> send_queue;
  526. Linked<TokenBucket> bucket;
  527. void send_sniff(sniffType::sniffCmd busy)
  528. {
  529. sniff_msg msg = { busy, parent.myIP};
  530. try
  531. {
  532. if (!sniffer_socket)
  533. {
  534. sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
  535. if (udpTraceLevel > 1)
  536. {
  537. StringBuffer url;
  538. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  539. }
  540. }
  541. sniffer_socket->write(&msg, sizeof(msg));
  542. if (udpTraceLevel > 1)
  543. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  544. }
  545. catch(IException *e)
  546. {
  547. StringBuffer s;
  548. StringBuffer url;
  549. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
  550. e->Release();
  551. }
  552. catch(...)
  553. {
  554. StringBuffer url;
  555. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  556. if (sniffer_socket)
  557. {
  558. sniffer_socket->Release();
  559. sniffer_socket = NULL;
  560. }
  561. }
  562. }
  563. public:
  564. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  565. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  566. {
  567. sniffer_socket = NULL;
  568. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  569. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  570. start();
  571. }
  572. ~send_data()
  573. {
  574. running = false;
  575. UdpPermitToSendMsg dummy = {};
  576. send_queue.push(dummy);
  577. join();
  578. if (sniffer_socket)
  579. sniffer_socket->Release();
  580. }
  581. bool ok_to_send(const UdpPermitToSendMsg &msg)
  582. {
  583. if (send_queue.push(msg, 15))
  584. return true;
  585. else
  586. {
  587. StringBuffer s;
  588. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - node=%s, maxData=%u", msg.destNode.getTraceText(s).str(), msg.max_data);
  589. return false;
  590. }
  591. }
  592. virtual int doRun()
  593. {
  594. if (udpTraceLevel > 0)
  595. DBGLOG("UdpSender: send_data started");
  596. #ifdef __linux__
  597. setLinuxThreadPriority(1); // MORE - windows? Is this even a good idea? Must not send faster than receiver can pull off the socket
  598. #endif
  599. UdpPermitToSendMsg permit;
  600. while (running)
  601. {
  602. send_queue.pop(permit);
  603. if (!running)
  604. return 0;
  605. if (udpSnifferEnabled)
  606. send_sniff(sniffType::busy);
  607. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode];
  608. unsigned payload = receiverInfo.sendData(permit, bucket);
  609. if (udpSnifferEnabled)
  610. send_sniff(sniffType::idle);
  611. if (udpTraceLevel > 1)
  612. {
  613. StringBuffer s;
  614. DBGLOG("UdpSender: sent %u bytes to node=%s", payload, permit.destNode.getTraceText(s).str());
  615. }
  616. }
  617. if (udpTraceLevel > 0)
  618. DBGLOG("UdpSender: send_data stopped");
  619. return 0;
  620. }
  621. };
  622. friend class send_resend_flow;
  623. friend class send_receive_flow;
  624. friend class send_data;
  625. unsigned numQueues;
  626. IpMapOf<UdpReceiverEntry> receiversTable;
  627. send_resend_flow *resend_flow;
  628. send_receive_flow *receive_flow;
  629. send_data *data;
  630. Linked<TokenBucket> bucket;
  631. IpAddress myIP;
  632. std::atomic<unsigned> msgSeq{0};
  633. inline unsigned getNextMessageSequence()
  634. {
  635. unsigned res;
  636. do
  637. {
  638. res = ++msgSeq;
  639. } while (unlikely(!res));
  640. return res;
  641. }
  642. public:
  643. IMPLEMENT_IINTERFACE;
  644. CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool encrypted)
  645. : bucket(_bucket),
  646. myIP(_myIP),
  647. receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port, encrypted](const ServerIdentifier &ip) { return new UdpReceiverEntry(ip.getIpAddress(), _myIP, _numQueues, q_size, server_flow_port, data_port, encrypted);})
  648. {
  649. #ifndef _WIN32
  650. setpriority(PRIO_PROCESS, 0, -3);
  651. #endif
  652. numQueues = _numQueues;
  653. data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
  654. resend_flow = new send_resend_flow(*this);
  655. receive_flow = new send_receive_flow(*this, client_flow_port);
  656. }
  657. ~CSendManager()
  658. {
  659. delete resend_flow;
  660. delete receive_flow;
  661. delete data;
  662. }
  663. // Interface ISendManager
  664. virtual void writeOwn(IUdpReceiverEntry &receiver, DataBuffer *buffer, unsigned len, unsigned queue) override
  665. {
  666. // NOTE: takes ownership of the DataBuffer
  667. assert(queue < numQueues);
  668. static_cast<UdpReceiverEntry &>(receiver).pushData(queue, buffer);
  669. }
  670. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
  671. {
  672. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[destNode], myIP, getNextMessageSequence(), queue);
  673. }
  674. virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
  675. {
  676. UdpPacketHeader pkHdr;
  677. pkHdr.ruid = ruid;
  678. pkHdr.msgId = msgId;
  679. return receiversTable[destNode].dataQueued(pkHdr);
  680. }
  681. virtual bool abortData(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode)
  682. {
  683. UdpPacketHeader pkHdr;
  684. pkHdr.ruid = ruid;
  685. pkHdr.msgId = msgId;
  686. return receiversTable[destNode].removeData((void*) &pkHdr, &UdpReceiverEntry::comparePacket);
  687. }
  688. virtual bool allDone()
  689. {
  690. // Used for some timing tests only
  691. for (auto&& dest: receiversTable)
  692. {
  693. if (dest.packetsQueued.load(std::memory_order_relaxed))
  694. return false;
  695. }
  696. return true;
  697. }
  698. };
  699. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit)
  700. {
  701. assertex(!myNode.getIpAddress().isNull());
  702. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter, encryptionInTransit);
  703. }
  704. class CMessagePacker : implements IMessagePacker, public CInterface
  705. {
  706. ISendManager &parent;
  707. IUdpReceiverEntry &receiver;
  708. UdpPacketHeader package_header;
  709. DataBuffer *part_buffer;
  710. unsigned data_buffer_size;
  711. unsigned data_used;
  712. void *mem_buffer;
  713. unsigned mem_buffer_size;
  714. unsigned totalSize;
  715. bool packed_request;
  716. MemoryBuffer metaInfo;
  717. bool last_message_done;
  718. int queue_number;
  719. public:
  720. IMPLEMENT_IINTERFACE;
  721. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
  722. : parent(_parent), receiver(_receiver)
  723. {
  724. queue_number = _queue;
  725. package_header.length = 0; // filled in with proper value later
  726. package_header.metalength = 0;
  727. package_header.ruid = ruid;
  728. package_header.msgId = msgId;
  729. package_header.pktSeq = 0;
  730. package_header.node.setIp(_sourceNode);
  731. package_header.msgSeq = _msgSeq;
  732. packed_request = false;
  733. part_buffer = bufferManager->allocate();
  734. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  735. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  736. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  737. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  738. data_used = headerSize + sizeof(unsigned short);
  739. mem_buffer = 0;
  740. mem_buffer_size = 0;
  741. last_message_done = false;
  742. totalSize = 0;
  743. }
  744. ~CMessagePacker()
  745. {
  746. if (part_buffer)
  747. part_buffer->Release();
  748. if (mem_buffer) free (mem_buffer);
  749. }
  750. virtual void *getBuffer(unsigned len, bool variable) override
  751. {
  752. if (variable)
  753. len += sizeof(RecordLengthType);
  754. if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
  755. {
  756. // Won't fit in one, so allocate temp location
  757. if (mem_buffer_size < len)
  758. {
  759. free(mem_buffer);
  760. mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
  761. mem_buffer_size = len;
  762. }
  763. packed_request = false;
  764. if (variable)
  765. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  766. else
  767. return mem_buffer;
  768. }
  769. if (part_buffer && ((data_buffer_size - data_used) < len))
  770. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  771. if (!part_buffer)
  772. {
  773. part_buffer = bufferManager->allocate();
  774. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  775. }
  776. packed_request = true;
  777. if (variable)
  778. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  779. else
  780. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  781. }
  782. virtual void putBuffer(const void *buf, unsigned len, bool variable) override
  783. {
  784. if (variable)
  785. {
  786. assertex(len < MAX_RECORD_LENGTH);
  787. buf = ((char *) buf) - sizeof(RecordLengthType);
  788. *(RecordLengthType *) buf = len;
  789. len += sizeof(RecordLengthType);
  790. }
  791. totalSize += len;
  792. if (packed_request)
  793. {
  794. assert(len <= (data_buffer_size - data_used));
  795. data_used += len;
  796. }
  797. else
  798. {
  799. while (len)
  800. {
  801. if (!part_buffer)
  802. {
  803. part_buffer = bufferManager->allocate();
  804. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  805. data_used = 0;
  806. }
  807. unsigned chunkLen = data_buffer_size - data_used;
  808. if (chunkLen > len)
  809. chunkLen = len;
  810. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  811. data_used += chunkLen;
  812. len -= chunkLen;
  813. buf = &(((char*)buf)[chunkLen]);
  814. if (len)
  815. flush(false);
  816. }
  817. }
  818. }
  819. virtual void sendMetaInfo(const void *buf, unsigned len) override {
  820. metaInfo.append(len, buf);
  821. }
  822. virtual void flush() override { flush(true); }
  823. virtual unsigned size() const override
  824. {
  825. return totalSize;
  826. }
  827. private:
  828. void flush(bool last_msg)
  829. {
  830. if (!last_message_done && last_msg)
  831. {
  832. last_message_done = true;
  833. if (!part_buffer)
  834. part_buffer = bufferManager->allocate();
  835. const char *metaData = metaInfo.toByteArray();
  836. unsigned metaLength = metaInfo.length();
  837. unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
  838. while (metaLength > maxMetaLength)
  839. {
  840. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  841. put_package(part_buffer, data_used, maxMetaLength);
  842. metaLength -= maxMetaLength;
  843. metaData += maxMetaLength;
  844. data_used = 0;
  845. maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  846. part_buffer = bufferManager->allocate();
  847. }
  848. if (metaLength)
  849. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  850. package_header.pktSeq |= UDP_PACKET_COMPLETE;
  851. put_package(part_buffer, data_used, metaLength);
  852. }
  853. else if (part_buffer)
  854. {
  855. // Just flush current - used when no room for current row
  856. if (data_used)
  857. put_package(part_buffer, data_used, 0); // buffer released in put_package
  858. else
  859. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  860. }
  861. part_buffer = 0;
  862. data_buffer_size = 0;
  863. data_used = 0;
  864. }
  865. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  866. {
  867. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  868. package_header.metalength = metalength;
  869. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  870. parent.writeOwn(receiver, dataBuff, package_header.length, queue_number);
  871. package_header.pktSeq++;
  872. }
  873. };
  874. extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
  875. {
  876. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _receiver, _sourceNode, _msgSeq, _queue);
  877. }