udptrs.cpp 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "udpipmap.hpp"
  17. #include "jsocket.hpp"
  18. #include "jlog.hpp"
  19. #include "jencrypt.hpp"
  20. #include "jsecrets.hpp"
  21. #include "roxie.hpp"
  22. #ifdef _WIN32
  23. #include <winsock.h>
  24. #else
  25. #include <sys/socket.h>
  26. #include <sys/time.h>
  27. #include <sys/resource.h>
  28. #endif
  29. #include <math.h>
  30. #include <atomic>
  // Sender flow-control tuning knobs.
  // udpOutQsPriority: 0 = visit the output queues round-robin; otherwise the base used to weight
  // how many consecutive packets each queue may supply.
  unsigned udpOutQsPriority{0};
  // Consecutive request_to_send timeouts tolerated before queued data is abandoned; 0 means off (keep retrying forever).
  unsigned udpMaxRetryTimedoutReqs{0};
  // Milliseconds to wait for a permit once a request has been acknowledged; 0 means calculate from query timeouts.
  unsigned udpRequestToSendTimeout{0};
  // Milliseconds to wait for a request_to_send to be acknowledged before it is re-sent.
  unsigned udpRequestToSendAckTimeout{10};
  35. using roxiemem::DataBuffer;
  36. /*
  37. *
  38. * There are 3 threads running to manage the data transfer from agent back to server:
  39. * send_resend_flow
  40. * - checks periodically that nothing is waiting for a "request to send" that timed out
  41. * send_receive_flow
  42. * - waits on socket receiving "ok_to_send" packets from servers
  43. * - updates state of relevant receivers
  44. * - pushes permission tokens to a queue
  45. * send_data
  46. * - waits on queue of permission tokens
  47. * - broadcasts "busy"
  48. * - writes data to server
  49. * - broadcasts "no longer busy"
  50. * - sends "completed" or "completed but I want to send more" flow message to server
  51. *
  52. * Queueing up data packets is done by the agent worker threads.
  53. * *
  54. *
  55. * Data races to watch for
  56. * 1. Two agent threads add data at same time - only one should send rts (use atomic inc for the count)
  57. * 2. We check for timeout and resend rts or fail just as permission comes in
  58. * - resend rts is harmless ?
  59. * - fail is acceptable
  60. * 3. After sending data, we need to decide whether to set state to 'pending' (and send rts) or empty. If we read count, decide it's zero
  61. * and then (before we set state) someone adds data (and sends rts), we must not set state to empty. CAS to set state empty only if
  62. * it's sending_data perhaps?
  63. * 4. While sending data, someone adds new data. They need to send rts and set state to pending whether empty or sending_data
  64. * 5. Do we need sending_data state? Is it the same as empty, really? Is empty the same as 'count==0' ? Do we even need state?
  65. * - send rts whenever incrementing count from zero
  66. * - resend rts if count is non-zero and timed out
  67. * - resend rts if we send data but there is some remaining
  68. */
  69. // UdpResendList keeps a copy of up to TRACKER_BITS previously sent packets so we can send them again
  70. RelaxedAtomic<unsigned> packetsResent;
  71. RelaxedAtomic<unsigned> flowRequestsSent;
  72. RelaxedAtomic<unsigned> flowPermitsReceived;
  73. RelaxedAtomic<unsigned> dataPacketsSent;
  74. unsigned udpResendTimeout = 10; // in millseconds
  75. bool udpResendLostPackets = true;
  76. bool udpAssumeSequential = false;
  77. static unsigned lastResentReport = 0;
  78. static unsigned lastPacketsResent = 0;
  79. static unsigned lastFlowRequestsSent = 0;
  80. static unsigned lastFlowPermitsReceived = 0;
  81. static unsigned lastDataPacketsSent = 0;
  82. class UdpResendList
  83. {
  84. private:
  85. DataBuffer *entries[TRACKER_BITS] = { nullptr };
  86. unsigned timeSent[TRACKER_BITS] = { 0 };
  87. sequence_t first = 0;
  88. unsigned count = 0; // number of non-null entries
  89. public:
  90. void append(DataBuffer *buf)
  91. {
  92. UdpPacketHeader *header = (UdpPacketHeader*) buf->data;
  93. sequence_t seq = header->sendSeq;
  94. header->pktSeq |= UDP_PACKET_RESENT;
  95. if (!count)
  96. {
  97. first = seq;
  98. }
  99. else if (seq - first >= TRACKER_BITS)
  100. {
  101. // This shouldn't happen if we have steps in place to block ending new until we are sure old have been delivered.
  102. throwUnexpected();
  103. }
  104. unsigned idx = seq % TRACKER_BITS;
  105. assert(entries[idx] == nullptr);
  106. entries[idx] = buf;
  107. timeSent[idx] = msTick();
  108. count++;
  109. }
  110. // This function does two things:
  111. // 1. Updates the circular buffer to release any packets that are confirmed delivered
  112. // 2. Appends any packets that need resending to the toSend list
  113. void noteRead(const PacketTracker &seen, std::vector<DataBuffer *> &toSend, unsigned space, unsigned nextSendSequence)
  114. {
  115. if (!count)
  116. return;
  117. unsigned now = msTick();
  118. sequence_t seq = first;
  119. unsigned checked = 0;
  120. bool released = false;
  121. while (checked < count && space)
  122. {
  123. unsigned idx = seq % TRACKER_BITS;
  124. if (entries[idx])
  125. {
  126. UdpPacketHeader *header = (UdpPacketHeader*) entries[idx]->data;
  127. assert(seq == header->sendSeq);
  128. if (seen.hasSeen(header->sendSeq))
  129. {
  130. ::Release(entries[idx]);
  131. entries[idx] = nullptr;
  132. count--;
  133. released = true;
  134. }
  135. else
  136. {
  137. // The current table entry is not marked as seen by receiver. Should we resend it?
  138. if (now-timeSent[idx] >= udpResendTimeout || // Note that this will block us from sending newer packets, if we have reached limit of tracking.
  139. (udpAssumeSequential && (int)(seq - seen.lastSeen()) < 0)) // so we (optionally) assume any packet not received that is EARLIER than one that HAS been received is lost.
  140. {
  141. if (udpTraceLevel > 1 || udpTraceTimeouts)
  142. DBGLOG("Resending %" SEQF "u last sent %u ms ago", seq, now-timeSent[idx]);
  143. timeSent[idx] = now;
  144. packetsResent++;
  145. toSend.push_back(entries[idx]);
  146. space--;
  147. }
  148. checked++;
  149. }
  150. }
  151. seq++;
  152. }
  153. if (released && count)
  154. {
  155. while (entries[first % TRACKER_BITS] == nullptr)
  156. first++;
  157. }
  158. }
  159. unsigned firstTracked() const
  160. {
  161. assert(count); // Meaningless to call this if count is 0
  162. return first;
  163. }
  164. unsigned numActive() const
  165. {
  166. return count;
  167. }
  168. bool canRecord(unsigned seq) const
  169. {
  170. return (count==0 || seq - first < TRACKER_BITS);
  171. }
  172. };
  173. class UdpReceiverEntry : public IUdpReceiverEntry
  174. {
  175. UdpReceiverEntry() = delete;
  176. UdpReceiverEntry ( const UdpReceiverEntry & ) = delete;
  177. private:
  178. queue_t *output_queue = nullptr;
  179. bool initialized = false;
  180. const bool isLocal = false;
  181. const bool encrypted = false;
  182. ISocket *send_flow_socket = nullptr;
  183. ISocket *data_socket = nullptr;
  184. const unsigned numQueues;
  185. int current_q = 0;
  186. int currentQNumPkts = 0; // Current Queue Number of Consecutive Processed Packets.
  187. int *maxPktsPerQ = nullptr; // to minimise power function re-calc for every packet
  188. void sendRequest(UdpRequestToSendMsg &msg)
  189. {
  190. try
  191. {
  192. if (udpTraceLevel > 3 || udpTraceFlow)
  193. {
  194. StringBuffer s, s2;
  195. DBGLOG("UdpSender[%s]: sending flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", msg.sourceNode.getTraceText(s2).str(), flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
  196. }
  197. #ifdef TEST_DROPPED_PACKETS
  198. flowPacketsSent[msg.cmd]++;
  199. if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd] == 0)
  200. {
  201. StringBuffer s, s2;
  202. DBGLOG("UdpSender[%s]: deliberately dropping flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", msg.sourceNode.getTraceText(s2).str(), flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
  203. }
  204. else
  205. #endif
  206. send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
  207. flowRequestsSent++;
  208. }
  209. catch(IException *e)
  210. {
  211. StringBuffer s;
  212. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  213. e->Release();
  214. }
  215. catch (...)
  216. {
  217. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  218. }
  219. }
  220. const IpAddress sourceIP;
  221. UdpResendList *resendList = nullptr;
  222. public:
  223. const IpAddress ip;
  224. unsigned timeouts = 0; // Number of consecutive timeouts
  225. std::atomic<unsigned> requestExpiryTime = { 0 }; // Updated by send_flow thread, read by send_resend thread and send_data thread
  226. static bool comparePacket(const void *pkData, const void *key)
  227. {
  228. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  229. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  230. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  231. }
  232. std::atomic<unsigned> packetsQueued = { 0 };
  233. std::atomic<sequence_t> nextSendSequence = {0};
  234. std::atomic<sequence_t> activeFlowSequence = {0};
  235. CriticalSection activeCrit;
  236. bool hasDataToSend() const
  237. {
  238. return (packetsQueued.load(std::memory_order_relaxed) || (resendList && resendList->numActive()));
  239. }
  240. void sendDone(unsigned packets)
  241. {
  242. //This function has a potential race condition with requestToSendNew:
  243. //packetsQueued must be checked within the critical section to ensure that requestToSend hasn't been called
  244. //between retrieving the count and entering the critical section, otherwise this function will set
  245. //requestExpiryTime to 0 (and indicate the operation is done)even though there packetsQueued is non-zero.
  246. CriticalBlock b(activeCrit);
  247. bool dataRemaining;
  248. if (resendList)
  249. dataRemaining = (packetsQueued.load(std::memory_order_relaxed) && resendList->canRecord(nextSendSequence)) || resendList->numActive();
  250. else
  251. dataRemaining = packetsQueued.load(std::memory_order_relaxed);
  252. // If dataRemaining says 0, but someone adds a row in this window, the request_to_send will be sent BEFORE the send_completed
  253. // So long as receiver handles that, are we good?
  254. UdpRequestToSendMsg msg;
  255. msg.packets = packets; // Note this is how many we sent
  256. msg.sendSeq = nextSendSequence;
  257. msg.sourceNode = sourceIP;
  258. if (dataRemaining && requestExpiryTime) // requestExpiryTime will be non-zero UNLESS someone called abort() just before I got here
  259. {
  260. msg.flowSeq = activeFlowSequence++;
  261. msg.cmd = flowType::request_to_send_more;
  262. requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
  263. }
  264. else
  265. {
  266. msg.flowSeq = activeFlowSequence;
  267. msg.cmd = flowType::send_completed;
  268. requestExpiryTime = 0;
  269. }
  270. sendRequest(msg);
  271. timeouts = 0;
  272. }
  273. void requestToSendNew()
  274. {
  275. //See comment in sendDone() on a potential race condition.
  276. CriticalBlock b(activeCrit);
  277. // This is called from data thread when new data added to a previously-empty list
  278. if (!requestExpiryTime)
  279. {
  280. // If there's already an active request - no need to create a new one
  281. UdpRequestToSendMsg msg;
  282. msg.cmd = flowType::request_to_send;
  283. msg.packets = 0;
  284. msg.sendSeq = nextSendSequence;
  285. msg.flowSeq = ++activeFlowSequence;
  286. msg.sourceNode = sourceIP;
  287. requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
  288. sendRequest(msg);
  289. }
  290. }
  291. void resendRequestToSend()
  292. {
  293. // This is called from timeout thread when a previously-send request has had no response
  294. timeouts++;
  295. if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
  296. {
  297. StringBuffer s;
  298. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (flow=%u, max=%i, timeout=%u, expiryTime=%u) waiting ok_to_send msg from node=%s",
  299. timeouts, activeFlowSequence.load(), udpMaxRetryTimedoutReqs, udpRequestToSendAckTimeout, requestExpiryTime.load(), ip.getIpText(s).str());
  300. }
  301. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  302. CriticalBlock b(activeCrit);
  303. if (udpMaxRetryTimedoutReqs && (timeouts >= udpMaxRetryTimedoutReqs))
  304. {
  305. abort();
  306. return;
  307. }
  308. if (requestExpiryTime)
  309. {
  310. UdpRequestToSendMsg msg;
  311. msg.cmd = flowType::request_to_send;
  312. msg.packets = 0;
  313. msg.sendSeq = nextSendSequence;
  314. msg.flowSeq = activeFlowSequence;
  315. msg.sourceNode = sourceIP;
  316. requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
  317. sendRequest(msg);
  318. }
  319. }
  320. void requestAcknowledged()
  321. {
  322. CriticalBlock b(activeCrit);
  323. if (requestExpiryTime)
  324. requestExpiryTime = msTick() + udpRequestToSendTimeout;
  325. }
  326. unsigned sendData(const UdpPermitToSendMsg &permit, TokenBucket *bucket)
  327. {
  328. #ifdef _DEBUG
  329. // Consistency check
  330. if (permit.destNode.getIpAddress().ipcompare(ip) != 0)
  331. {
  332. StringBuffer p, s;
  333. DBGLOG("UdpFlow: permit ip %s does not match receiver table ip %s", permit.destNode.getTraceText(p).str(), ip.getIpText(s).str());
  334. printStackReport();
  335. }
  336. #endif
  337. if (permit.flowSeq != activeFlowSequence)
  338. {
  339. if (udpTraceLevel>1 || udpTraceFlow)
  340. {
  341. StringBuffer s;
  342. DBGLOG("UdpFlow: ignoring out-of-date permit_to_send seq %" SEQF "u (expected %" SEQF "u) to node %s", permit.flowSeq, activeFlowSequence+0, permit.destNode.getTraceText(s).str());
  343. }
  344. return 0;
  345. }
  346. unsigned maxPackets = permit.max_data;
  347. std::vector<DataBuffer *> toSend;
  348. unsigned totalSent = 0;
  349. unsigned resending = 0;
  350. if (resendList)
  351. {
  352. resendList->noteRead(permit.seen, toSend, maxPackets, nextSendSequence.load(std::memory_order_relaxed));
  353. resending = toSend.size();
  354. maxPackets -= resending;
  355. // Don't send any packet that would end up overwriting an active packet in our resend list
  356. if (resendList->numActive())
  357. {
  358. unsigned inflight = nextSendSequence - resendList->firstTracked();
  359. assert(inflight <= TRACKER_BITS);
  360. if (maxPackets > TRACKER_BITS-inflight)
  361. {
  362. maxPackets = TRACKER_BITS-inflight;
  363. if (udpTraceLevel>2 || maxPackets == 0)
  364. DBGLOG("Can't send more than %d new packets or we will overwrite unreceived packets (%u in flight, %u active %u resending now)", maxPackets, inflight, resendList->numActive(), resending);
  365. // Note that this may mean we can't send any packets, despite having asked for permission to do so
  366. // We will keep on asking.
  367. }
  368. }
  369. }
  370. if (udpTraceLevel>2)
  371. DBGLOG("Resending %u packets", (unsigned) toSend.size());
  372. while (maxPackets && packetsQueued.load(std::memory_order_relaxed))
  373. {
  374. DataBuffer *buffer = popQueuedData();
  375. if (!buffer)
  376. break; // Suggests data was aborted before we got to pop it
  377. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  378. header->sendSeq = nextSendSequence++;
  379. toSend.push_back(buffer);
  380. maxPackets--;
  381. totalSent += header->length;
  382. #if defined(__linux__) || defined(__APPLE__)
  383. if (isLocal && (totalSent> 100000)) // Avoids sending too fast to local node, for reasons lost in the mists of time
  384. break;
  385. #endif
  386. }
  387. MemoryBuffer encryptBuffer;
  388. for (DataBuffer *buffer: toSend)
  389. {
  390. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  391. unsigned length = header->length;
  392. if (bucket)
  393. {
  394. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  395. bucket->wait((length / 1024)+1);
  396. }
  397. try
  398. {
  399. #ifdef TEST_DROPPED_PACKETS
  400. if (udpDropDataPackets && ((header->pktSeq & UDP_PACKET_RESENT)==0) && (header->pktSeq==0 || header->pktSeq==10 || ((header->pktSeq&UDP_PACKET_COMPLETE) != 0)))
  401. DBGLOG("Deliberately dropping packet %" SEQF "u", header->sendSeq);
  402. else
  403. #endif
  404. if (encrypted)
  405. {
  406. encryptBuffer.clear();
  407. encryptBuffer.append(sizeof(UdpPacketHeader), header); // We don't encrypt the header
  408. length -= sizeof(UdpPacketHeader);
  409. const char *data = buffer->data + sizeof(UdpPacketHeader);
  410. const MemoryAttr &udpkey = getSecretUdpKey(true);
  411. aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer);
  412. header->length = encryptBuffer.length();
  413. encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating
  414. assert(length <= DATA_PAYLOAD);
  415. if (udpTraceLevel > 5)
  416. DBGLOG("ENCRYPT: Writing %u bytes to data socket", encryptBuffer.length());
  417. data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
  418. }
  419. else
  420. data_socket->write(buffer->data, length);
  421. dataPacketsSent++;
  422. }
  423. catch(IException *e)
  424. {
  425. StringBuffer s;
  426. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  427. e->Release();
  428. }
  429. catch(...)
  430. {
  431. DBGLOG("UdpSender: write exception - unknown exception");
  432. }
  433. if (resendList)
  434. {
  435. if (resending)
  436. resending--; //Don't add the ones I am resending back onto list - they are still there!
  437. else
  438. resendList->append(buffer);
  439. }
  440. else
  441. ::Release(buffer);
  442. }
  443. sendDone(toSend.size());
  444. return totalSent;
  445. }
  446. bool dataQueued(const UdpPacketHeader &key)
  447. {
  448. // Used when a retry packet is received, to determine whether the query is in fact completed
  449. // but just stuck in transit queues
  450. if (packetsQueued.load(std::memory_order_relaxed))
  451. {
  452. for (unsigned i = 0; i < numQueues; i++)
  453. {
  454. if (output_queue[i].dataQueued(&key, &comparePacket))
  455. return true;
  456. }
  457. }
  458. return false;
  459. }
  460. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  461. {
  462. // Used after receiving an abort, to avoid sending data that is no longer required
  463. // Note that we don't attempt to remove packets that have already been sent from the resend list
  464. unsigned removed = 0;
  465. if (packetsQueued.load(std::memory_order_relaxed))
  466. {
  467. for (unsigned i = 0; i < numQueues; i++)
  468. {
  469. removed += output_queue[i].removeData(key, pkCmpFn);
  470. }
  471. packetsQueued -= removed;
  472. }
  473. return removed > 0;
  474. }
  475. void abort()
  476. {
  477. // Called if too many timeouts on a request to send
  478. if (udpTraceLevel > 3)
  479. {
  480. StringBuffer s;
  481. DBGLOG("UdpSender: abort sending queued data to node=%s", ip.getIpText(s).str());
  482. }
  483. timeouts = 0;
  484. requestExpiryTime = 0;
  485. removeData(nullptr, nullptr);
  486. }
  487. inline void pushData(unsigned queue, DataBuffer *buffer)
  488. {
  489. output_queue[queue].free_slots(); // block until at least one free space
  490. output_queue[queue].pushOwn(buffer);
  491. if (!packetsQueued++)
  492. requestToSendNew();
  493. }
  494. DataBuffer *popQueuedData()
  495. {
  496. DataBuffer *buffer;
  497. for (unsigned i = 0; i < numQueues; i++)
  498. {
  499. if (udpOutQsPriority)
  500. {
  501. buffer = output_queue[current_q].pop(false);
  502. if (!buffer)
  503. {
  504. if (udpTraceLevel >= 5)
  505. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  506. currentQNumPkts = 0;
  507. current_q = (current_q + 1) % numQueues;
  508. }
  509. else
  510. {
  511. currentQNumPkts++;
  512. if (udpTraceLevel >= 5)
  513. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  514. if (currentQNumPkts >= maxPktsPerQ[current_q])
  515. {
  516. currentQNumPkts = 0;
  517. current_q = (current_q + 1) % numQueues;
  518. }
  519. packetsQueued--;
  520. return buffer;
  521. }
  522. }
  523. else
  524. {
  525. current_q = (current_q + 1) % numQueues;
  526. buffer = output_queue[current_q].pop(false);
  527. if (buffer)
  528. {
  529. packetsQueued--;
  530. return buffer;
  531. }
  532. }
  533. }
  534. // If we get here, it suggests we were told to get a buffer but no queue has one.
  535. // This should be rare but possible if data gets removed following an abort, as
  536. // there is a window in abort() between the remove and the decrement of packetsQueued.
  537. return nullptr;
  538. }
  539. UdpReceiverEntry(const IpAddress _ip, const IpAddress _myIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort, bool _encrypted)
  540. : ip (_ip), sourceIP(_myIP), numQueues(_numQueues), isLocal(_ip.isLocal()), encrypted(_encrypted)
  541. {
  542. assert(!initialized);
  543. assert(numQueues > 0);
  544. if (!ip.isNull())
  545. {
  546. try
  547. {
  548. SocketEndpoint sendFlowEp(_sendFlowPort, ip);
  549. SocketEndpoint dataEp(_dataPort, ip);
  550. #ifdef SOCKET_SIMULATION
  551. if (isUdpTestMode)
  552. {
  553. if (udpTestUseUdpSockets)
  554. {
  555. send_flow_socket = CSimulatedUdpWriteSocket::udp_connect(sendFlowEp);
  556. data_socket = CSimulatedUdpWriteSocket::udp_connect(dataEp);
  557. }
  558. else
  559. {
  560. send_flow_socket = CSimulatedQueueWriteSocket::udp_connect(sendFlowEp);
  561. data_socket = CSimulatedQueueWriteSocket::udp_connect(dataEp);
  562. }
  563. }
  564. else
  565. #endif
  566. {
  567. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  568. data_socket = ISocket::udp_connect(dataEp);
  569. }
  570. if (isLocal)
  571. {
  572. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  573. if (udpTraceLevel > 0)
  574. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  575. }
  576. }
  577. catch(IException *e)
  578. {
  579. StringBuffer error, ipstr;
  580. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  581. throw;
  582. }
  583. catch(...)
  584. {
  585. StringBuffer ipstr;
  586. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  587. throw;
  588. }
  589. output_queue = new queue_t[numQueues];
  590. maxPktsPerQ = new int[numQueues];
  591. for (unsigned j = 0; j < numQueues; j++)
  592. {
  593. output_queue[j].set_queue_size(_queueSize);
  594. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  595. }
  596. initialized = true;
  597. if (udpTraceLevel > 0)
  598. {
  599. StringBuffer ipStr, myIpStr;
  600. DBGLOG("UdpSender[%s]: added entry for ip=%s to receivers table - send_flow_port=%d", _myIP.getIpText(myIpStr).str(), ip.getIpText(ipStr).str(), _sendFlowPort);
  601. }
  602. }
  603. if (udpResendLostPackets)
  604. {
  605. DBGLOG("UdpSender: created resend list with %u entries", TRACKER_BITS);
  606. resendList = new UdpResendList;
  607. }
  608. else
  609. DBGLOG("UdpSender: resend list disabled");
  610. }
  611. ~UdpReceiverEntry()
  612. {
  613. if (send_flow_socket) send_flow_socket->Release();
  614. if (data_socket) data_socket->Release();
  615. if (output_queue) delete [] output_queue;
  616. if (maxPktsPerQ) delete [] maxPktsPerQ;
  617. delete resendList;
  618. }
  619. };
  620. class CSendManager : implements ISendManager, public CInterface
  621. {
  622. class StartedThread : public Thread
  623. {
  624. private:
  625. Semaphore started;
  626. virtual int run()
  627. {
  628. started.signal();
  629. return doRun();
  630. }
  631. protected:
  632. std::atomic<bool> running;
  633. public:
  634. StartedThread(const char *name) : Thread(name)
  635. {
  636. running = false;
  637. }
  638. ~StartedThread()
  639. {
  640. if (running)
  641. {
  642. running = false;
  643. join();
  644. }
  645. }
  646. virtual void start()
  647. {
  648. running = true;
  649. Thread::start();
  650. started.wait();
  651. }
  652. virtual int doRun() = 0;
  653. };
  654. class send_resend_flow : public StartedThread
  655. {
  656. // Check if any senders have timed out
  657. CSendManager &parent;
  658. Semaphore terminated;
  659. virtual int doRun() override
  660. {
  661. if (udpTraceLevel > 0)
  662. DBGLOG("UdpSender[%s]: send_resend_flow started", parent.myId);
  663. unsigned timeout = udpRequestToSendTimeout;
  664. while (running)
  665. {
  666. if (terminated.wait(timeout) || !running)
  667. break;
  668. unsigned now = msTick();
  669. timeout = udpRequestToSendTimeout;
  670. for (auto&& dest: parent.receiversTable)
  671. {
  672. #ifdef _DEBUG
  673. // Consistency check
  674. UdpReceiverEntry &receiverInfo = parent.receiversTable[dest.ip];
  675. if (&receiverInfo != &dest)
  676. {
  677. StringBuffer s;
  678. DBGLOG("UdpSender[%s]: table entry %s does not find itself", parent.myId, dest.ip.getIpText(s).str());
  679. printStackReport();
  680. }
  681. #endif
  682. unsigned expireTime = dest.requestExpiryTime;
  683. if (expireTime)
  684. {
  685. int timeToGo = expireTime-now;
  686. if (timeToGo <= 0)
  687. dest.resendRequestToSend();
  688. else if ((unsigned) timeToGo < timeout)
  689. timeout = timeToGo;
  690. }
  691. }
  692. if (udpStatsReportInterval && (now-lastResentReport > udpStatsReportInterval))
  693. {
  694. // MORE - some of these should really be tracked per destination
  695. lastResentReport = now;
  696. if (packetsResent > lastPacketsResent)
  697. {
  698. DBGLOG("%u more packets resent by this agent (%u total)", packetsResent-lastPacketsResent, packetsResent-0);
  699. lastPacketsResent = packetsResent;
  700. }
  701. if (flowRequestsSent > lastFlowRequestsSent)
  702. {
  703. DBGLOG("%u more flow request packets sent by this agent (%u total)", flowRequestsSent - lastFlowRequestsSent, flowRequestsSent-0);
  704. lastFlowRequestsSent = flowRequestsSent;
  705. }
  706. if (flowPermitsReceived > lastFlowPermitsReceived)
  707. {
  708. DBGLOG("%u more flow control packets recived by this agent (%u total)", flowPermitsReceived - lastFlowPermitsReceived, flowPermitsReceived-0);
  709. lastFlowPermitsReceived = flowPermitsReceived;
  710. }
  711. if (dataPacketsSent > lastDataPacketsSent)
  712. {
  713. DBGLOG("%u more data packets sent by this agent (%u total)", dataPacketsSent - lastDataPacketsSent, dataPacketsSent-0);
  714. lastDataPacketsSent = dataPacketsSent;
  715. }
  716. }
  717. }
  718. return 0;
  719. }
  720. public:
  721. send_resend_flow(CSendManager &_parent)
  722. : StartedThread("UdpLib::send_resend_flow"), parent(_parent)
  723. {
  724. start();
  725. }
  726. ~send_resend_flow()
  727. {
  728. running = false;
  729. terminated.signal();
  730. join();
  731. }
  732. };
  733. class send_receive_flow : public StartedThread
  734. {
  735. CSendManager &parent;
  736. int receive_port;
  737. Owned<ISocket> flow_socket;
  738. public:
  739. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  740. {
  741. receive_port = r_port;
  742. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  743. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  744. #ifdef SOCKET_SIMULATION
  745. if (isUdpTestMode)
  746. {
  747. if (udpTestUseUdpSockets)
  748. flow_socket.setown(CSimulatedUdpReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
  749. else
  750. flow_socket.setown(CSimulatedQueueReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
  751. }
  752. else
  753. #endif
  754. flow_socket.setown(ISocket::udp_create(receive_port));
  755. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  756. size32_t actualSize = flow_socket->get_receive_buffer_size();
  757. DBGLOG("UdpSender[%s]: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", parent.myId, receive_port, udpFlowSocketsSize, actualSize);
  758. start();
  759. }
  760. ~send_receive_flow()
  761. {
  762. running = false;
  763. if (flow_socket)
  764. flow_socket->shutdownNoThrow();
  765. join();
  766. }
  767. virtual int doRun()
  768. {
  769. if (udpTraceLevel > 0)
  770. DBGLOG("UdpSender[%s]: send_receive_flow started", parent.myId);
  771. #ifdef __linux__
  772. setLinuxThreadPriority(2);
  773. #endif
  774. while(running)
  775. {
  776. UdpPermitToSendMsg f = { flowType::ok_to_send, 0, { } };
  777. unsigned readsize = udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen);
  778. while (running)
  779. {
  780. try
  781. {
  782. unsigned int res;
  783. flow_socket->readtms(&f, readsize, readsize, res, 5000);
  784. flowPermitsReceived++;
  785. assert(res==readsize);
  786. switch (f.cmd)
  787. {
  788. case flowType::ok_to_send:
  789. if (udpTraceLevel > 2 || udpTraceFlow)
  790. {
  791. StringBuffer s;
  792. DBGLOG("UdpSender[%s]: received ok_to_send msg max %d packets from node=%s seq %" SEQF "u", parent.myId, f.max_data, f.destNode.getTraceText(s).str(), f.flowSeq);
  793. }
  794. parent.data->ok_to_send(f);
  795. break;
  796. case flowType::request_received:
  797. if (udpTraceLevel > 2 || udpTraceFlow)
  798. {
  799. StringBuffer s;
  800. DBGLOG("UdpSender[%s]: received request_received msg from node=%s seq %" SEQF "u", parent.myId, f.destNode.getTraceText(s).str(), f.flowSeq);
  801. }
  802. parent.receiversTable[f.destNode].requestAcknowledged();
  803. break;
  804. default:
  805. DBGLOG("UdpSender[%s]: received unknown flow message type=%d", parent.myId, f.cmd);
  806. }
  807. }
  808. catch (IException *e)
  809. {
  810. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  811. {
  812. StringBuffer s;
  813. DBGLOG("UdpSender[%s]: send_receive_flow::read failed port=%i %s", parent.myId, receive_port, e->errorMessage(s).str());
  814. }
  815. e->Release();
  816. }
  817. catch (...)
  818. {
  819. if (running)
  820. DBGLOG("UdpSender[%s]: send_receive_flow::unknown exception", parent.myId);
  821. MilliSleep(0);
  822. }
  823. }
  824. }
  825. return 0;
  826. }
  827. };
  828. class send_data : public StartedThread
  829. {
  830. CSendManager &parent;
  831. simple_queue<UdpPermitToSendMsg> send_queue;
  832. Linked<TokenBucket> bucket;
  833. public:
  834. send_data(CSendManager &_parent, TokenBucket *_bucket)
  835. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  836. {
  837. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  838. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  839. start();
  840. }
  841. ~send_data()
  842. {
  843. running = false;
  844. UdpPermitToSendMsg dummy = {};
  845. send_queue.push(dummy);
  846. join();
  847. }
  848. bool ok_to_send(const UdpPermitToSendMsg &msg)
  849. {
  850. if (send_queue.push(msg, 15))
  851. return true;
  852. else
  853. {
  854. StringBuffer s;
  855. DBGLOG("UdpSender[%s]: push() failed - ignored ok_to_send msg - node=%s, maxData=%u", parent.myId, msg.destNode.getTraceText(s).str(), msg.max_data);
  856. return false;
  857. }
  858. }
  859. virtual int doRun()
  860. {
  861. if (udpTraceLevel > 0)
  862. DBGLOG("UdpSender[%s]: send_data started", parent.myId);
  863. #ifdef __linux__
  864. setLinuxThreadPriority(1); // MORE - windows? Is this even a good idea? Must not send faster than receiver can pull off the socket
  865. #endif
  866. UdpPermitToSendMsg permit;
  867. while (running)
  868. {
  869. send_queue.pop(permit);
  870. if (!running)
  871. return 0;
  872. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode];
  873. unsigned payload = receiverInfo.sendData(permit, bucket);
  874. if (udpTraceLevel > 2)
  875. {
  876. StringBuffer s;
  877. DBGLOG("UdpSender[%s]: sent %u bytes to node=%s under permit %" SEQF "u", parent.myId, payload, permit.destNode.getTraceText(s).str(), permit.flowSeq);
  878. }
  879. }
  880. if (udpTraceLevel > 0)
  881. DBGLOG("UdpSender[%s]: send_data stopped", parent.myId);
  882. return 0;
  883. }
  884. };
// The nested worker threads read private state of their parent (myId, myIP, receiversTable, data).
friend class send_resend_flow;
friend class send_receive_flow;
friend class send_data;
unsigned numQueues;                       // queues per receiver; writeOwn() asserts queue < numQueues
IpAddress myIP;                           // this sender's own address
StringBuffer myIdStr;                     // backing storage for myId
const char *myId;                         // text form of myIP, used as the tag in trace messages
IpMapOf<UdpReceiverEntry> receiversTable; // per-destination receiver state, looked up by ServerIdentifier (entries presumably created on demand by the ctor's factory lambda - TODO confirm)
send_resend_flow *resend_flow;            // worker thread, owned; deleted in ~CSendManager
send_receive_flow *receive_flow;          // worker thread, owned; deleted in ~CSendManager
send_data *data;                          // worker thread, owned; deleted in ~CSendManager
Linked<TokenBucket> bucket;               // rate limiter handed to the send_data thread
bool encrypted;                           // passed on to receiver entries and message packers
std::atomic<unsigned> msgSeq{0};          // last value handed out by getNextMessageSequence()
  899. inline unsigned getNextMessageSequence()
  900. {
  901. unsigned res;
  902. do
  903. {
  904. res = ++msgSeq;
  905. } while (unlikely(!res));
  906. return res;
  907. }
  908. public:
  909. IMPLEMENT_IINTERFACE;
  910. CSendManager(int server_flow_port, int data_port, int client_flow_port, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool _encrypted)
  911. : bucket(_bucket),
  912. myIP(_myIP),
  913. receiversTable([_numQueues, q_size, server_flow_port, data_port, _encrypted, this](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), myIP, _numQueues, q_size, server_flow_port, data_port, _encrypted);}),
  914. encrypted(_encrypted)
  915. {
  916. myId = myIP.getIpText(myIdStr).str();
  917. #ifndef _WIN32
  918. setpriority(PRIO_PROCESS, 0, -3);
  919. #endif
  920. numQueues = _numQueues;
  921. data = new send_data(*this, bucket);
  922. resend_flow = new send_resend_flow(*this);
  923. receive_flow = new send_receive_flow(*this, client_flow_port);
  924. }
// Stops and frees the worker threads. Order matters: receive_flow's thread
// calls data->ok_to_send(), so receive_flow is deleted before data.
~CSendManager()
{
    delete resend_flow;
    delete receive_flow;
    delete data;
}
  931. // Interface ISendManager
  932. virtual void writeOwn(IUdpReceiverEntry &receiver, DataBuffer *buffer, unsigned len, unsigned queue) override
  933. {
  934. // NOTE: takes ownership of the DataBuffer
  935. assert(queue < numQueues);
  936. assert(buffer);
  937. static_cast<UdpReceiverEntry &>(receiver).pushData(queue, buffer);
  938. }
  939. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
  940. {
  941. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[destNode], myIP, getNextMessageSequence(), queue, encrypted);
  942. }
  943. virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
  944. {
  945. UdpPacketHeader pkHdr;
  946. pkHdr.ruid = ruid;
  947. pkHdr.msgId = msgId;
  948. return receiversTable[destNode].dataQueued(pkHdr);
  949. }
  950. virtual bool abortData(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode)
  951. {
  952. UdpPacketHeader pkHdr;
  953. pkHdr.ruid = ruid;
  954. pkHdr.msgId = msgId;
  955. return receiversTable[destNode].removeData((void*) &pkHdr, &UdpReceiverEntry::comparePacket);
  956. }
  957. virtual void abortAll(const ServerIdentifier &destNode)
  958. {
  959. receiversTable[destNode].abort();
  960. }
  961. virtual bool allDone()
  962. {
  963. // Used for some timing tests only
  964. for (auto&& dest: receiversTable)
  965. {
  966. if (dest.hasDataToSend())
  967. return false;
  968. }
  969. return true;
  970. }
  971. };
  972. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, const IpAddress &_myIP, TokenBucket *rateLimiter, bool encryptionInTransit)
  973. {
  974. assertex(!_myIP.isNull());
  975. return new CSendManager(server_flow_port, data_port, client_flow_port, queue_size_pr_server, queues_pr_server, _myIP, rateLimiter, encryptionInTransit);
  976. }
  977. class CMessagePacker : implements IMessagePacker, public CInterface
  978. {
  979. ISendManager &parent;
  980. IUdpReceiverEntry &receiver;
  981. UdpPacketHeader package_header;
  982. DataBuffer *part_buffer;
  983. const unsigned data_buffer_size;
  984. unsigned data_used;
  985. void *mem_buffer;
  986. unsigned mem_buffer_size;
  987. unsigned totalSize;
  988. bool packed_request;
  989. MemoryBuffer metaInfo;
  990. bool last_message_done;
  991. int queue_number;
  992. public:
  993. IMPLEMENT_IINTERFACE;
  994. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue, bool _encrypted)
  995. : parent(_parent), receiver(_receiver), data_buffer_size(DATA_PAYLOAD - sizeof(UdpPacketHeader) - (_encrypted ? 16 : 0))
  996. {
  997. queue_number = _queue;
  998. package_header.length = 0; // filled in with proper value later
  999. package_header.metalength = 0;
  1000. package_header.ruid = ruid;
  1001. package_header.msgId = msgId;
  1002. package_header.pktSeq = 0;
  1003. package_header.node.setIp(_sourceNode);
  1004. package_header.msgSeq = _msgSeq;
  1005. packed_request = false;
  1006. part_buffer = bufferManager->allocate();
  1007. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  1008. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  1009. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  1010. data_used = headerSize + sizeof(unsigned short);
  1011. mem_buffer = 0;
  1012. mem_buffer_size = 0;
  1013. last_message_done = false;
  1014. totalSize = 0;
  1015. }
  1016. ~CMessagePacker()
  1017. {
  1018. if (part_buffer)
  1019. part_buffer->Release();
  1020. if (mem_buffer) free (mem_buffer);
  1021. }
  1022. virtual void *getBuffer(unsigned len, bool variable) override
  1023. {
  1024. if (variable)
  1025. len += sizeof(RecordLengthType);
  1026. if (data_buffer_size < len)
  1027. {
  1028. // Won't fit in one, so allocate temp location
  1029. if (mem_buffer_size < len)
  1030. {
  1031. free(mem_buffer);
  1032. mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
  1033. mem_buffer_size = len;
  1034. }
  1035. packed_request = false;
  1036. if (variable)
  1037. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  1038. else
  1039. return mem_buffer;
  1040. }
  1041. if (part_buffer && ((data_buffer_size - data_used) < len))
  1042. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  1043. if (!part_buffer)
  1044. {
  1045. part_buffer = bufferManager->allocate();
  1046. }
  1047. packed_request = true;
  1048. if (variable)
  1049. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  1050. else
  1051. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  1052. }
  1053. virtual void putBuffer(const void *buf, unsigned len, bool variable) override
  1054. {
  1055. if (variable)
  1056. {
  1057. assertex(len < MAX_RECORD_LENGTH);
  1058. buf = ((char *) buf) - sizeof(RecordLengthType);
  1059. *(RecordLengthType *) buf = len;
  1060. len += sizeof(RecordLengthType);
  1061. }
  1062. totalSize += len;
  1063. if (packed_request)
  1064. {
  1065. assert(len <= (data_buffer_size - data_used));
  1066. data_used += len;
  1067. }
  1068. else
  1069. {
  1070. while (len)
  1071. {
  1072. if (!part_buffer)
  1073. {
  1074. part_buffer = bufferManager->allocate();
  1075. data_used = 0;
  1076. }
  1077. unsigned chunkLen = data_buffer_size - data_used;
  1078. if (chunkLen > len)
  1079. chunkLen = len;
  1080. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  1081. data_used += chunkLen;
  1082. len -= chunkLen;
  1083. buf = &(((char*)buf)[chunkLen]);
  1084. if (len)
  1085. flush(false);
  1086. }
  1087. }
  1088. }
  1089. virtual void sendMetaInfo(const void *buf, unsigned len) override {
  1090. metaInfo.append(len, buf);
  1091. }
  1092. virtual void flush() override { flush(true); }
  1093. virtual unsigned size() const override
  1094. {
  1095. return totalSize;
  1096. }
  1097. private:
  1098. void flush(bool last_msg)
  1099. {
  1100. if (!last_message_done && last_msg)
  1101. {
  1102. last_message_done = true;
  1103. if (!part_buffer)
  1104. part_buffer = bufferManager->allocate();
  1105. const char *metaData = metaInfo.toByteArray();
  1106. unsigned metaLength = metaInfo.length();
  1107. unsigned maxMetaLength = data_buffer_size - data_used;
  1108. while (metaLength > maxMetaLength)
  1109. {
  1110. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  1111. put_package(part_buffer, data_used, maxMetaLength);
  1112. metaLength -= maxMetaLength;
  1113. metaData += maxMetaLength;
  1114. data_used = 0;
  1115. maxMetaLength = data_buffer_size;
  1116. part_buffer = bufferManager->allocate();
  1117. }
  1118. if (metaLength)
  1119. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  1120. package_header.pktSeq |= UDP_PACKET_COMPLETE;
  1121. put_package(part_buffer, data_used, metaLength);
  1122. }
  1123. else if (part_buffer)
  1124. {
  1125. // Just flush current - used when no room for current row
  1126. if (data_used)
  1127. put_package(part_buffer, data_used, 0); // buffer released in put_package
  1128. else
  1129. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  1130. }
  1131. part_buffer = 0;
  1132. data_used = 0;
  1133. }
  1134. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  1135. {
  1136. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  1137. package_header.metalength = metalength;
  1138. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  1139. parent.writeOwn(receiver, dataBuff, package_header.length, queue_number);
  1140. package_header.pktSeq++;
  1141. }
  1142. };
  1143. extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue, bool _encrypted)
  1144. {
  1145. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _receiver, _sourceNode, _msgSeq, _queue, _encrypted);
  1146. }
// Global output queue manager pointer; starts as nullptr here (presumably assigned during startup elsewhere - TODO confirm).
IRoxieOutputQueueManager *ROQ = nullptr;