// udptrs.cpp
/*##############################################################################
Copyright (C) 2011 HPCC Systems.

All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "udplib.hpp"
#include "udpsha.hpp"
#include "udptrs.hpp"
#include "jsocket.hpp"
#include "jlog.hpp"
#ifdef _WIN32
#include <winsock.h>
#else
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#endif
#include <math.h>

#if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
#define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
#endif

// Sender-side tunables; expected to be set before the sender threads start.
unsigned udpOutQsPriority = 0;        // 0 = plain round-robin across output queues; >0 = weighted priority (see maxPktsPerQ)
unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
unsigned udpRequestToSendTimeout = 5; // value in sec
bool udpSnifferEnabled = true;

#ifdef _DEBUG
//#define _SIMULATE_LOST_PACKETS
#endif

using roxiemem::DataBuffer;

// MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
// One entry per destination (receiving) node. Owns:
//  - the per-priority output queues of packets waiting to go to that node,
//  - the UDP sockets used to reach that node (flow-control port and data port),
//  - optionally a circular buffer of already-sent packets retained for retransmission.
// Thread-safety: sequence allocation and retry-buffer manipulation are guarded by
// 'lock' (a SpinLock); several other members are accessed without it — the MORE
// comments below are the original author's own notes on that.
class UdpReceiverEntry
{
    queue_t *output_queue;      // array of numQueues packet queues, one per priority level
    bool initialized;           // set by init(); ctor leaves the entry unusable until then
    // Circular buffer of packets available for resending
    unsigned retryDataCount;    // number of live slots in the circular buffer (may include NULL holes)
    unsigned retryDataIdx;      // index of the oldest slot
    unsigned maxRetryData;      // capacity; 0 disables the retry mechanism entirely
    DataBuffer **retryData;     // the circular buffer itself (owns the DataBuffers it holds)
public:
    ISocket *send_flow_socket;  // connected UDP socket to the receiver's flow-control port
    ISocket *data_socket;       // connected UDP socket to the receiver's data port
    unsigned numQueues;
    int current_q;
    int currentQNumPkts;        // Current Queue Number of Consecutive Processed Packets.
    int *maxPktsPerQ;           // to minimise power function re-calc for every packet
    SpinLock lock;
    unsigned maxUdpSequence;    // last sequence number handed out

    // Allocate the next UDP sequence number (1-based; wraps back to 1 when the
    // sequence field would overflow into the flag bits).
    unsigned nextUdpSequence()
    {
        SpinBlock b(lock); // MORE - is this needed?
        unsigned ret = ++maxUdpSequence;
        if (ret & UDP_SEQUENCE_BITS) // overflowed
            ret = 1;
        return ret;
    }

    // Send a flow-control message (request_to_send etc.) to this node's flow port.
    // The min/max sequence range tells the receiver which packets it may still see.
    // MORE - consider where we need critsecs in here!
    void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
    {
        unsigned minUdpSequence;
        SpinBlock b(lock);
        {
            // Oldest retained packet (if any) bounds the low end of the window
            if (retryDataCount)
                minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
            else
                minUdpSequence = maxUdpSequence;
        }
        UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), cmd, myNodeIndex, 0, minUdpSequence, maxUdpSequence};
        try
        {
            send_flow_socket->write(&msg, msg.length);
        }
        catch(IException *e)
        {
            StringBuffer s;
            DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).toCharArray());
            e->Release();
        }
        catch (...)
        {
            DBGLOG("UdpSender: sendRequest write failed - unknown error");
        }
    }

    // Walk the retry buffer against the receiver's permit: packets the receiver has
    // acknowledged (sequence <= lastSequenceSeen and not in its missing list) are
    // released; packets listed as missing are appended to 'retries' for resend.
    // Returns the total payload bytes queued for resend.
    unsigned cleanRetryData(const UdpPermitToSendMsg &permit, PointerArray &retries)
    {
        // Any saved packets < lastReceived that are not listed as missing can be deleted
        SpinBlock b(lock);
        unsigned totalData = 0;
        if (checkTraceLevel(TRACE_RETRY_DATA, 3))
        {
            unsigned minUdpSequence;
            if (retryDataCount)
                minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
            else
                minUdpSequence = maxUdpSequence;
            StringBuffer permitStr;
            permit.toString(permitStr);
            DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence);
        }
        unsigned lastReceived = permit.lastSequenceSeen;
        unsigned missingIndex = 0;     // cursor into permit.missingSequences (assumed sorted ascending - matches scan order)
        unsigned missingCount = permit.missingCount;
        unsigned i = 0;                // offset from retryDataIdx within the circular buffer
        if (maxRetryData)
        {
            while (i < retryDataCount && retries.length() < permit.max_data)
            {
                unsigned idx = (retryDataIdx + i) % maxRetryData;
                DataBuffer *buffer = retryData[idx];
                if (buffer)
                {
                    UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
                    unsigned thisSequence = header->udpSequence;
                    if (thisSequence > lastReceived)
                        break; // everything beyond here is newer than the receiver has seen
                    if (!missingCount || thisSequence < permit.missingSequences[missingIndex])
                    {
                        // Acknowledged - release it. Only advance retryDataIdx when the
                        // released slot is at the head; otherwise a NULL hole is left.
                        ::Release(buffer);
                        retryData[idx] = NULL;
                        if (i)
                            i++; // MORE - leaves holes - is this smart? Alternatively could close up... Should be rare anyway
                        else
                        {
                            retryDataIdx = (retryDataIdx + 1) % maxRetryData;
                            retryDataCount--;
                        }
                    }
                    else if (thisSequence == permit.missingSequences[missingIndex])
                    {
                        // Receiver reports it missing - schedule for resend (stays in buffer)
                        totalData += header->length;
                        retries.append(buffer);
                        i++;
                        missingIndex++;
                        missingCount--;
                    }
                    else
                    {
                        // Missing sequence not present in our buffer (already overwritten?) - skip it
                        missingIndex++;
                        missingCount--;
                    }
                }
                else
                {
                    if (i)
                        i++;
                    else
                    {
                        // Removing leading nulls
                        retryDataCount--;
                        retryDataIdx = (retryDataIdx + 1) % maxRetryData;
                    }
                }
            }
        }
        if (checkTraceLevel(TRACE_RETRY_DATA, 3))
            DBGLOG("UdpSender: cleanRetryData found %u to resend total size %u, total %u still available", retries.length(), totalData, retryDataCount);
        return totalData;
    }

    // Send up to permit.max_data packets to this node: first any retries the permit
    // asked for, then fresh packets popped from the output queues. Newly sent packets
    // are stamped with a sequence number and (if enabled) parked in the retry buffer.
    // Returns total bytes sent; sets moreRequested if a piggy-backed "more" flag was
    // sent in the last packet, and maxPackets to the number actually sent.
    unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
    {
        moreRequested = false;
        maxPackets = permit.max_data;
        PointerArray toSend;
        unsigned totalSent = cleanRetryData(permit, toSend);
        while (toSend.length() < maxPackets && dataQueued())
        {
            DataBuffer *buffer = popQueuedData();
            if (buffer) // Aborted slave queries leave NULL records on queue
            {
                UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
                toSend.append(buffer);
                totalSent += header->length;
#ifdef __linux__
                // Cap burst size on local (loopback) sends - presumably to avoid
                // overrunning the local receive buffer; TODO confirm rationale
                if (isLocal && (totalSent> 100000))
                    break;
#endif
            }
        }
        maxPackets = toSend.length();
        for (unsigned idx = 0; idx < maxPackets; idx++)
        {
            DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
            UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
            bool isRetry = (header->udpSequence != 0); // retries already carry a sequence number
            if (isRetry)
            {
                if (checkTraceLevel(TRACE_RETRY_DATA, 1))
                    DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.destNodeIndex, header->udpSequence);
                atomic_inc(&packetsRetried);
            }
            else
                header->udpSequence = nextUdpSequence();
            unsigned length = header->length;
            if (bucket)
            {
                // Rate-limit: one token per KB (rounded up)
                MTIME_SECTION(timer, "bucket_wait");
                bucket->wait((length / 1024)+1);
            }
            try
            {
                if (udpSendCompletedInData)
                {
                    // Piggy-back the "send completed" flag on the final packet rather
                    // than sending a separate flow message
                    if (idx == maxPackets-1)
                    {
                        // MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
                        if (false && dataQueued()) // Causes some problems because no flow control info gets through at all
                        {
                            moreRequested = true;
                            header->udpSequence |= (UDP_SEQUENCE_COMPLETE|UDP_SEQUENCE_MORE);
                        }
                        else
                            header->udpSequence |= UDP_SEQUENCE_COMPLETE;
                    }
                }
#ifdef _SIMULATE_LOST_PACKETS
                if (isRetry || (header->udpSequence % 100) != 0) // deliberately drop 1% of first-time sends
#endif
                data_socket->write(buffer->data, length);
                // Strip the flag bits again so the copy kept for retry has a clean sequence
                header->udpSequence &= ~UDP_SEQUENCE_BITS;
            }
            catch(IException *e)
            {
                StringBuffer s;
                DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
                e->Release();
            }
            catch(...)
            {
                DBGLOG("UdpSender: write exception - unknown exception");
            }
            if (!isRetry && maxRetryData)
            {
                // Park the freshly-sent packet in the circular retry buffer,
                // discarding the oldest entry if the buffer is full
                unsigned slot = (retryDataIdx + retryDataCount) % maxRetryData;
                if (retryDataCount < maxRetryData)
                    retryDataCount++;
                else
                {
                    if (udpTraceLevel > 0)
                        DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.destNodeIndex, header->udpSequence);
                    ::Release(retryData[slot]);
                }
                retryData[slot] = buffer;
            }
            else
            {
                ::Release(buffer);
            }
        }
        return totalSent;
    }

    // True if any output queue holds at least one packet.
    bool dataQueued()
    {
        for (unsigned i = 0; i < numQueues; i++)
        {
            if (!output_queue[i].empty())
                return true;
        }
        return false;
    }

    // True if any output queue holds a packet matching 'key' under pkCmpFn.
    bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
    {
        for (unsigned i = 0; i < numQueues; i++)
        {
            if (output_queue[i].dataQueued(key, pkCmpFn))
                return true;
        }
        return false;
    }

    // Remove all queued packets matching 'key' under pkCmpFn; true if any removed.
    bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
    {
        bool anyRemoved = false;
        for (unsigned i = 0; i < numQueues; i++)
        {
            if (output_queue[i].removeData(key, pkCmpFn))
                anyRemoved = true;
        }
        return anyRemoved;
    }

    // Queue a packet (ownership transferred) on the given priority queue.
    inline void pushData(unsigned queue, DataBuffer *buffer)
    {
        output_queue[queue].pushOwn(buffer);
    }

    // Pop the next packet to send, honouring the priority scheme:
    // with udpOutQsPriority set, each queue gets up to maxPktsPerQ consecutive
    // packets before yielding to the next; otherwise plain round-robin.
    // Callers only invoke this when dataQueued() is true, so the inner scan is
    // expected to return - the sleep/log after it is a should-never-happen guard.
    DataBuffer *popQueuedData()
    {
        DataBuffer *buffer;
        while (1)
        {
            for (unsigned i = 0; i < numQueues; i++)
            {
                if (udpOutQsPriority)
                {
                    if (output_queue[current_q].empty())
                    {
                        if (udpTraceLevel >= 5)
                            DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
                        currentQNumPkts = 0;
                        current_q = (current_q + 1) % numQueues;
                    }
                    else
                    {
                        buffer = output_queue[current_q].pop();
                        currentQNumPkts++;
                        if (udpTraceLevel >= 5)
                            DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
                        if (currentQNumPkts >= maxPktsPerQ[current_q])
                        {
                            currentQNumPkts = 0;
                            current_q = (current_q + 1) % numQueues;
                        }
                        return buffer;
                    }
                }
                else
                {
                    current_q = (current_q + 1) % numQueues;
                    if (!output_queue[current_q].empty())
                    {
                        return output_queue[current_q].pop();
                    }
                }
            }
            MilliSleep(10);
            DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
        }
    }

    // Default-construct an inert entry; init() must be called before use.
    // Note: maxRetryData is not initialized here - it is set in init().
    UdpReceiverEntry()
    {
        send_flow_socket = data_socket = NULL;
        numQueues = 0;
        current_q = 0;
        initialized = false;
        output_queue = 0;
        currentQNumPkts = 0;
        maxPktsPerQ = 0;
        maxUdpSequence = 0;
        retryDataCount = 0;
        retryDataIdx = 0;
        retryData = NULL;
    }

    // Connect the sockets and allocate queues/retry buffer for the destination node.
    // If the node's IP is null the entry is left uninitialized (initialized stays false).
    void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
    {
        assert(!initialized);
        maxRetryData = _maxRetryData;
        numQueues = _numQueues;
        const IpAddress &ip = getNodeAddress(destNodeIndex);
        if (!ip.isNull())
        {
            try
            {
                SocketEndpoint sendFlowEp(sendFlowPort, ip);
                SocketEndpoint dataEp(dataPort, ip);
                send_flow_socket = ISocket::udp_connect(sendFlowEp);
                data_socket = ISocket::udp_connect(dataEp);
                if (isLocal)
                {
                    data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
                    if (udpTraceLevel > 0)
                        DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
                }
            }
            catch(IException *e)
            {
                StringBuffer error, ipstr;
                DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
                throw;
            }
            catch(...)
            {
                StringBuffer ipstr;
                DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
                throw;
            }
            output_queue = new queue_t[numQueues];
            maxPktsPerQ = new int[numQueues];
            for (unsigned j = 0; j < numQueues; j++)
            {
                output_queue[j].set_queue_size(queueSize);
                // Higher-priority (lower-index) queues get exponentially larger bursts
                maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
            }
            if (maxRetryData)
            {
                retryData = new DataBuffer *[maxRetryData];
                memset(retryData, 0, maxRetryData * sizeof(DataBuffer *));
            }
            initialized = true;
            if (udpTraceLevel > 0)
            {
                StringBuffer ipStr;
                DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
            }
        }
    }

    ~UdpReceiverEntry()
    {
        if (retryData) delete[] retryData; // MORE - should I release the values pointed to as well - not a big deal as quitting anyway...
        if (send_flow_socket) send_flow_socket->Release();
        if (data_socket) data_socket->Release();
        if (output_queue) delete [] output_queue;
        if (maxPktsPerQ) delete [] maxPktsPerQ;
    }
};
  410. class CSendManager : public CInterface, implements ISendManager
  411. {
  412. friend class send_send_flow;
    // Packs one logical roxie message into a stream of UDP-sized DataBuffers.
    // Rows are appended via getBuffer()/putBuffer(); full buffers are handed to
    // parent.writeOwn() via put_package(). Rows too large for a single packet are
    // staged in mem_buffer and then split across buffers in putBuffer().
    // Not thread-safe: assumed to be used from a single producer thread.
    class CMessagePacker : public CInterface, implements IMessagePacker
    {
        CSendManager &parent;
        unsigned destNodeIndex;
        UdpPacketHeader package_header;  // template header copied into every packet; pktSeq increments per packet
        DataBuffer *part_buffer;         // current packet under construction (NULL once flushed)
        unsigned data_buffer_size;       // usable payload bytes in part_buffer (DATA_PAYLOAD minus header)
        unsigned data_used;              // payload bytes already written to part_buffer
        void *mem_buffer;                // overflow staging area for rows larger than one packet
        unsigned mem_buffer_size;
        unsigned totalSize;              // total payload bytes put so far (for size())
        bool packed_request;             // true when the last getBuffer() handed out space inside part_buffer
        unsigned requested_size;
        MemoryBuffer metaInfo;           // trailing metadata appended after the last row
        bool last_message_done;
        bool aborted;
        int queue_number;                // priority queue on which packets are sent
    public:
        IMPLEMENT_IINTERFACE;

        CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, CSendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
            : parent(_parent)
        {
            queue_number = _queue;
            destNodeIndex = _destNode;
            package_header.length = 0;          // filled in with proper value later
            package_header.metalength = 0;
            package_header.ruid = ruid;
            package_header.msgId = msgId;
            package_header.pktSeq = 0;
            package_header.nodeIndex = _sourceNode;
            package_header.msgSeq = _msgSeq;
            package_header.udpSequence = 0;     // these are allocated when transmitted
            aborted = false;
            packed_request = false;
            part_buffer = bufferManager->allocate();
            data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
            assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
            // First packet starts with a 2-byte header length followed by the message header
            *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
            memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
            data_used = headerSize + sizeof(unsigned short);
            mem_buffer = 0;
            mem_buffer_size = 0;
            last_message_done = false;
            totalSize = 0;
            if (udpTraceLevel >= 40)
                DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid="RUIDF" id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
        }

        ~CMessagePacker()
        {
            if (part_buffer)
                part_buffer->Release();
            if (mem_buffer) free (mem_buffer);
            if (udpTraceLevel >= 40)
            {
                DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid="RUIDF" id=0x%.8x mseq=%u pktSeq=%x node=%u",
                        package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
            }
        }

        virtual void abort()
        {
            aborted = true;
        }

        // Reserve space for a row of (up to) len bytes; the caller writes into the
        // returned pointer then calls putBuffer() with the actual length.
        // Variable-size rows get an extra RecordLengthType prefix slot.
        // Rows that cannot fit in a single packet are staged in mem_buffer instead.
        virtual void *getBuffer(unsigned len, bool variable)
        {
            if (variable)
                len += sizeof(RecordLengthType);
            if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
            {
                // Won't fit in one, so allocate temp location
                if (mem_buffer_size < len)
                {
                    free(mem_buffer);
                    mem_buffer = malloc(len);
                    mem_buffer_size = len;
                }
                packed_request = false;
                if (variable)
                    return ((char *) mem_buffer) + sizeof(RecordLengthType);
                else
                    return mem_buffer;
            }
            if (part_buffer && ((data_buffer_size - data_used) < len))
                flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
            if (!part_buffer)
            {
                part_buffer = bufferManager->allocate();
                data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
            }
            packed_request = true;
            if (variable)
                return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
            else
                return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
        }

        // Commit a row previously reserved by getBuffer(). If the row was written
        // directly into part_buffer (packed_request) just advance data_used;
        // otherwise copy it from mem_buffer in packet-sized chunks, flushing between.
        virtual void putBuffer(const void *buf, unsigned len, bool variable)
        {
            if (variable)
            {
                assertex(len < MAX_RECORD_LENGTH);
                // Back up over the length-prefix slot reserved by getBuffer and fill it in
                buf = ((char *) buf) - sizeof(RecordLengthType);
                *(RecordLengthType *) buf = len;
                len += sizeof(RecordLengthType);
            }
            totalSize += len;
            if (packed_request)
            {
                assert(len <= (data_buffer_size - data_used));
                data_used += len;
            }
            else
            {
                while (len)
                {
                    if (!part_buffer)
                    {
                        part_buffer = bufferManager->allocate();
                        data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
                        data_used = 0;
                    }
                    unsigned chunkLen = data_buffer_size - data_used;
                    if (chunkLen > len)
                        chunkLen = len;
                    memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
                    data_used += chunkLen;
                    len -= chunkLen;
                    buf = &(((char*)buf)[chunkLen]);
                    if (len)
                        flush(false);
                }
            }
        }

        // Accumulate metadata to be appended after the final row (sent with flush(true)).
        virtual void sendMetaInfo(const void *buf, unsigned len) {
            metaInfo.append(len, buf);
        }

        // Send the current partial packet. With last_msg true, also append the
        // accumulated metaInfo (spilling across extra packets if needed) and mark
        // the final packet's pktSeq high bit as the end-of-message flag.
        virtual void flush(bool last_msg = false)
        {
            assert(!aborted);
            if (!last_message_done && last_msg)
            {
                last_message_done = true;
                if (!part_buffer)
                    part_buffer = bufferManager->allocate();
                const char *metaData = metaInfo.toByteArray();
                unsigned metaLength = metaInfo.length();
                unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
                while (metaLength > maxMetaLength)
                {
                    memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
                    put_package(part_buffer, data_used, maxMetaLength);
                    metaLength -= maxMetaLength;
                    metaData += maxMetaLength;
                    data_used = 0;
                    maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
                    part_buffer = bufferManager->allocate();
                }
                memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
                package_header.pktSeq |= 0x80000000; // high bit marks last packet of the message
                put_package(part_buffer, data_used, metaLength);
            }
            else if (part_buffer)
            {
                // Just flush current - used when no room for current row
                if (data_used)
                    put_package(part_buffer, data_used, 0); // buffer released in put_package
                else
                    part_buffer->Release(); // If NO data in buffer, release buffer back to pool
            }
            part_buffer = 0;
            data_buffer_size = 0;
            data_used = 0;
        }

        // Stamp the header into the buffer and hand ownership to the send manager.
        void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
        {
            package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
            package_header.metalength = metalength;
            memcpy(dataBuff->data, &package_header, sizeof(package_header));
            parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
            if (udpTraceLevel >= 50)
            {
                // NOTE(review): looks like a leftover debugging breakpoint aid
                if (package_header.length==991)
                    DBGLOG("NEarly");
                DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid="RUIDF" id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
                    package_header.ruid, package_header.msgId, package_header.msgSeq,
                    package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
            }
            package_header.pktSeq++;
        }

        // True while any packet of this message is still queued for sending.
        virtual bool dataQueued()
        {
            return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
        }

        virtual unsigned size() const
        {
            return totalSize;
        }
    };
  609. class StartedThread : public Thread
  610. {
  611. private:
  612. Semaphore started;
  613. virtual int run()
  614. {
  615. started.signal();
  616. return doRun();
  617. }
  618. protected:
  619. bool running;
  620. public:
  621. StartedThread(const char *name) : Thread(name)
  622. {
  623. running = false;
  624. }
  625. ~StartedThread()
  626. {
  627. running = false;
  628. join();
  629. }
  630. virtual void start()
  631. {
  632. running = true;
  633. Thread::start();
  634. started.wait();
  635. }
  636. virtual int doRun() = 0;
  637. };
    // Per-destination request-to-send state machine. A background thread scans all
    // destinations, issuing request_to_send messages, retrying on timeout, and
    // completing/aborting transfers. Callers poke it via data_added(),
    // clear_to_send_received(), and send_done().
    class send_send_flow : public StartedThread
    {
        /*
        I don't like this code much at all
        Looping round all every time status of any changes seems like a bad thing especially as scale goes up
        Even though these look like a bitmap they are not used as such presently
         - as a result, if data_added() is called while state is completed, we lose the request I think
         - probably get away with it because of the dataqueued check
        doRun() uses state bits without protection
        A count of number pending for each might be better than just a flag
        Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
         - normally MANY in pending (and order is interesting)
         - normally few in any other state (only 1 if thread keeping up), order not really very interesting
         - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
         - in particular don't lock while processing the chain
         - Never need to be in >1 chain
        msTick() probably better than time() for detecting timeouts
        */
        enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
        unsigned target_count;        // number of destination nodes
        char *state;                  // per-node state (one of 'bits', or 0 = idle); written under cr, read unprotected in doRun
        unsigned char *timeouts;      // Number of consecutive timeouts
        time_t *request_time;         // when the outstanding request_to_send was issued
        CriticalSection cr;
        Semaphore sem;                // poked whenever a state changes that doRun should look at
        CSendManager &parent;

        // Background scan loop: wakes on signal or every second, and walks all
        // destinations acting on their current state.
        virtual int doRun()
        {
            // MORE - this is reading the state values unprotected
            // Not sure that this represents any issue in practice...
            if (udpTraceLevel > 0)
                DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
            while (running)
            {
                bool idle = false;
                if (sem.wait(1000))
                {
                    if (udpTraceLevel > 4)
                        DBGLOG("UdpSender: send_send_flow::doRun signal received");
                }
                else
                    idle = true;  // timed out - use the pass for consistency checking below
                if (!running) return 0;
                time_t now;
                time(&now);
                // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
                // In a typical heavy load scenario most will be pending
                // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
                // Separate lists for each state (don't need one for sending) ?
                for (unsigned i = 0; i < target_count; i++)
                {
                    switch (state[i]) // MORE - should really protect it?
                    {
                    case completed:
                        done(i, false);
                        break;
                    case completed_more:
                        done(i, true);
                        break;
                    case pending_request:
                        if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
                            break;
                        // Request timed out - log and retry (or abort once the retry limit is hit)
                        timeouts[i]++;
                        EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i sec max=%i sec",
                               timeouts[i], udpMaxRetryTimedoutReqs,
                               i, (int) (now - request_time[i]), udpRequestToSendTimeout);
                        // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
                        if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
                        {
                            abort(i);
                            break;
                        }
                        // fall into...
                    case new_request:
                        sendRequest(i);
                        break;
                    default:
                        // idle state - sanity check: queued data with no pending request means
                        // a lost data_added() notification; recover by re-raising it
                        if (idle && parent.dataQueued(i))
                        {
                            EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
                            data_added(i);
                        }
                    }
                }
            }
            return 0;
        }

        // A send to 'index' completed. If more data is queued, go straight back to
        // pending; otherwise return to idle. Then tell the receiver what happened.
        void done(unsigned index, bool moreRequested)
        {
            bool dataRemaining;
            {
                CriticalBlock b(cr);
                dataRemaining = parent.dataQueued(index);
                if (dataRemaining)
                {
                    state[index] = pending_request;
                    time(&request_time[index]);
                }
                else
                {
                    state[index] = 0;
                    timeouts[index] = 0;
                }
            }
            if (udpTraceLevel > 3)
                DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
            if (udpSendCompletedInData)
            {
                if (dataRemaining)
                {
                    // MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
                    // though there is a possible race to consider
                    if (!moreRequested)
                        parent.sendRequest(index, flow_t::request_to_send);
                }
            }
            else
                parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
        }

        // Issue (or re-issue) a request_to_send and start the timeout clock.
        void sendRequest(unsigned index)
        {
            if (udpTraceLevel > 3)
                DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
            CriticalBlock b(cr);
            parent.sendRequest(index, flow_t::request_to_send);
            state[index] = pending_request;
            time(&request_time[index]);
        }

        // Give up on 'index': reset its state and discard its queued data.
        void abort(unsigned index)
        {
            if (udpTraceLevel > 3)
                DBGLOG("UdpSender: abort sending queued data to node=%u", index);
            CriticalBlock b(cr);
            state[index] = 0;
            timeouts[index] = 0;
            parent.abortData(index);
        }

    public:
        send_send_flow(CSendManager &_parent, unsigned numNodes)
            : StartedThread("UdpLib::send_send_flow"), parent(_parent)
        {
            target_count = numNodes;
            state = new char [target_count];
            memset(state, 0, target_count);
            timeouts = new unsigned char [target_count];
            memset(timeouts, 0, target_count);
            request_time = new time_t [target_count];
            memset(request_time, 0, sizeof(time_t) * target_count);
            start();
        }

        ~send_send_flow()
        {
            running = false;
            sem.signal(); // wake doRun so it notices 'running' is false
            join();
            delete [] state;
            delete [] timeouts;
            delete [] request_time;
        }

        // Receiver granted permission: mark this node as actively sending.
        void clear_to_send_received(unsigned index)
        {
            CriticalBlock b(cr);
            state[index] = sending_data;
        }

        // Data thread finished a send burst; doRun will call done() for this node.
        void send_done(unsigned index, bool moreRequested)
        {
            CriticalBlock b(cr);
            state[index] = moreRequested ? completed_more : completed;
            sem.signal();
        }

        // New data queued for 'index'; raise a request if the node is currently idle.
        void data_added(unsigned index)
        {
            CriticalBlock b(cr);
            // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
            // Because done() checks to see if any data pending and re-calls data_added, we get away with it
            // Using bits sounds more sensible though?
            // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
            if (!state[index]) // MORE - should just test the bit?
            {
                state[index] = new_request;
                if (udpTraceLevel > 3)
                    DBGLOG("UdpSender: state set to new_request for node=%u", index);
                sem.signal();
            }
        }
    };
// Thread that listens on the client flow port for flow-control messages
// coming back from receivers (currently only ok_to_send permits are handled)
// and forwards them to the send_data thread via parent.data->ok_to_send().
class send_receive_flow : public StartedThread
{
    CSendManager &parent;
    int receive_port;               // UDP port we listen on for flow messages
    Owned<ISocket> flow_socket;     // owned UDP socket; closed by dtor to abort a blocked read
public:
    // Creates the flow socket (raising the system read-buffer cap first when
    // permitted) and starts the listener thread.  Throws if the system cap is
    // too small and enableSocketMaxSetting is off.
    send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
    {
        receive_port = r_port;
        if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
        {
            if (!enableSocketMaxSetting)
                throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
            check_set_max_socket_read_buffer(udpFlowSocketsSize);
        }
        flow_socket.setown(ISocket::udp_create(receive_port));
        flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
        size32_t actualSize = flow_socket->get_receive_buffer_size();
        DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
        start();
    }
    ~send_receive_flow()
    {
        // NOTE: the socket is closed BEFORE join() so a read() blocked in
        // doRun() fails promptly and the thread can exit.
        running = false;
        if (flow_socket)
            flow_socket->close();
        join();
    }
    virtual int doRun()
    {
        if (udpTraceLevel > 0)
            DBGLOG("UdpSender: send_receive_flow started");
#ifdef __linux__
        setLinuxThreadPriority(2);
#endif
        while(running)
        {
            UdpPermitToSendMsg f;
            while (running)
            {
                try
                {
                    // presumably 5 is the read timeout - a timeout exception
                    // simply re-loops; TODO confirm units against ISocket::read
                    unsigned int res ;
                    flow_socket->read(&f, 1, sizeof(f), res, 5);
                    assertex(res == f.length);
#ifdef CRC_MESSAGES
                    assertex(f.crc == f.calcCRC());
#endif
                    switch (f.cmd)
                    {
                    case flow_t::ok_to_send:
                        if (udpTraceLevel > 1)
                            DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
                        parent.data->ok_to_send(f);
                        break;
                    default:
                        // Unknown command - log and keep listening.
                        DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
                    }
                }
                catch (IException *e)
                {
                    // Timeouts while running are routine; anything else is logged.
                    if (running && e->errorCode() != JSOCKERR_timeout_expired)
                    {
                        StringBuffer s;
                        DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).toCharArray());
                    }
                    e->Release();
                }
                catch (...)
                {
                    if (running)
                        DBGLOG("UdpSender: send_receive_flow::unknown exception");
                    MilliSleep(0);  // yield before retrying
                }
            }
        }
        return 0;
    }
};
  903. class send_data : public StartedThread
  904. {
  905. CSendManager &parent;
  906. ISocket *sniffer_socket;
  907. SocketEndpoint ep;
  908. simple_queue<UdpPermitToSendMsg> send_queue;
  909. Linked<TokenBucket> bucket;
  910. void send_sniff(bool busy)
  911. {
  912. sniff_msg msg = {sizeof(sniff_msg), busy ? flow_t::busy : flow_t::idle, parent.myNodeIndex};
  913. try
  914. {
  915. if (!sniffer_socket)
  916. {
  917. sniffer_socket = ISocket::multicast_connect(ep, 3);
  918. if (udpTraceLevel > 1)
  919. {
  920. StringBuffer url;
  921. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  922. }
  923. }
  924. sniffer_socket->write(&msg, sizeof(msg));
  925. if (udpTraceLevel > 1)
  926. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  927. }
  928. catch(IException *e)
  929. {
  930. StringBuffer s;
  931. StringBuffer url;
  932. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).toCharArray());
  933. e->Release();
  934. }
  935. catch(...)
  936. {
  937. StringBuffer url;
  938. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  939. if (sniffer_socket)
  940. {
  941. sniffer_socket->Release();
  942. sniffer_socket = NULL;
  943. }
  944. }
  945. }
  946. public:
  947. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  948. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  949. {
  950. sniffer_socket = NULL;
  951. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  952. {
  953. if (!enableSocketMaxSetting)
  954. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  955. check_set_max_socket_write_buffer(udpLocalWriteSocketSize);
  956. }
  957. start();
  958. }
  959. ~send_data()
  960. {
  961. running = false;
  962. UdpPermitToSendMsg dummy;
  963. send_queue.push(dummy);
  964. join();
  965. if (sniffer_socket)
  966. sniffer_socket->Release();
  967. }
  968. bool ok_to_send(const UdpPermitToSendMsg &msg)
  969. {
  970. if (send_queue.push(msg, 15))
  971. return true;
  972. else
  973. {
  974. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
  975. return false;
  976. }
  977. }
  978. virtual int doRun()
  979. {
  980. if (udpTraceLevel > 0)
  981. DBGLOG("UdpSender: send_data started");
  982. #ifdef __linux__
  983. setLinuxThreadPriority(1); // MORE - windows?
  984. #endif
  985. UdpPermitToSendMsg permit;
  986. while (running)
  987. {
  988. send_queue.pop(permit);
  989. if (!running)
  990. return 0;
  991. if (udpSnifferEnabled)
  992. send_sniff(true);
  993. parent.send_flow->clear_to_send_received(permit.destNodeIndex);
  994. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
  995. bool moreRequested;
  996. unsigned maxPackets;
  997. unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
  998. if (udpSendCompletedInData && !maxPackets)
  999. parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
  1000. parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
  1001. if (udpSnifferEnabled)
  1002. send_sniff(false);
  1003. if (udpTraceLevel > 1)
  1004. DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
  1005. }
  1006. if (udpTraceLevel > 0)
  1007. DBGLOG("UdpSender: send_data stopped");
  1008. return 0;
  1009. }
  1010. };
  1011. friend class send_send_flow;
  1012. friend class send_receive_flow;
  1013. friend class send_data;
  1014. unsigned numNodes;
  1015. int receive_flow_port;
  1016. int send_flow_port;
  1017. int data_port;
  1018. unsigned myNodeIndex;
  1019. unsigned numQueues;
  1020. UdpReceiverEntry *receiversTable;
  1021. send_send_flow *send_flow;
  1022. send_receive_flow *receive_flow;
  1023. send_data *data;
  1024. Linked<TokenBucket> bucket;
  1025. SpinLock msgSeqLock;
  1026. unsigned msgSeq;
  1027. static bool comparePacket(void *pkData, void *key)
  1028. {
  1029. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  1030. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  1031. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  1032. }
  1033. inline unsigned getNextMessageSequence()
  1034. {
  1035. SpinBlock b(msgSeqLock);
  1036. unsigned res = ++msgSeq;
  1037. if (!res)
  1038. res = ++msgSeq;
  1039. return res;
  1040. }
  1041. public:
  1042. IMPLEMENT_IINTERFACE;
    // Builds the per-node receiver table and spins up the three worker threads.
    // NOTE(review): thread creation order looks deliberate - receive_flow,
    // which feeds permits into 'data', is created last, after 'data' and
    // 'send_flow' already exist.  Confirm before reordering.
    CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned maxRetryData, unsigned _myNodeIndex, TokenBucket *_bucket)
        : bucket(_bucket)
    {
#ifndef _WIN32
        setpriority(PRIO_PROCESS, 0, -3);   // raise process priority (non-Windows only)
#endif
        numNodes = getNumNodes();
        receive_flow_port = client_flow_port;
        send_flow_port = server_flow_port;
        data_port = d_port;
        myNodeIndex = _myNodeIndex;
        numQueues = _numQueues;
        receiversTable = new UdpReceiverEntry[numNodes];
        if (maxRetryData > MAX_RESEND_TABLE_SIZE)
            maxRetryData = MAX_RESEND_TABLE_SIZE;   // clamp to the compiled-in resend-table maximum
        for (unsigned i = 0; i < numNodes; i++)
            receiversTable[i].init(i, numQueues, q_size, maxRetryData, send_flow_port, data_port, i==myNodeIndex);
        data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
        send_flow = new send_send_flow(*this, numNodes);
        receive_flow = new send_receive_flow(*this, client_flow_port);
        msgSeq = 0;
    }
  1065. ~CSendManager()
  1066. {
  1067. delete []receiversTable;
  1068. delete send_flow;
  1069. delete receive_flow;
  1070. delete data;
  1071. }
  1072. void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
  1073. {
  1074. // NOTE: takes ownership of the DataBuffer
  1075. assert(queue < numQueues);
  1076. assert(destNodeIndex < numNodes);
  1077. receiversTable[destNodeIndex].pushData(queue, buffer);
  1078. send_flow->data_added(destNodeIndex);
  1079. }
  1080. inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
  1081. {
  1082. receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
  1083. }
  1084. bool dataQueued(unsigned destIndex)
  1085. {
  1086. return receiversTable[destIndex].dataQueued();
  1087. }
  1088. bool abortData(unsigned destIndex)
  1089. {
  1090. return receiversTable[destIndex].removeData(NULL, NULL);
  1091. }
  1092. // Interface ISendManager
  1093. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
  1094. {
  1095. if (destNodeIndex >= numNodes)
  1096. throw MakeStringException(ROXIE_UDP_ERROR, "createMesagePacker: invalid destination node index %i", destNodeIndex);
  1097. return new CMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
  1098. }
  1099. virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
  1100. {
  1101. UdpPacketHeader pkHdr;
  1102. pkHdr.ruid = ruid;
  1103. pkHdr.msgId = msgId;
  1104. return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
  1105. }
  1106. virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
  1107. {
  1108. UdpPacketHeader pkHdr;
  1109. pkHdr.ruid = ruid;
  1110. pkHdr.msgId = msgId;
  1111. return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
  1112. }
  1113. virtual bool allDone()
  1114. {
  1115. for (unsigned i = 0; i < numNodes; i++)
  1116. {
  1117. if (receiversTable[i].dataQueued())
  1118. return false;
  1119. }
  1120. return true;
  1121. }
  1122. };
  1123. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, unsigned maxRetryData, TokenBucket *rateLimiter, unsigned myNodeIndex)
  1124. {
  1125. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, maxRetryData, myNodeIndex, rateLimiter);
  1126. }