udptrs.cpp 45 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "jsocket.hpp"
  17. #include "jlog.hpp"
  18. #include "roxie.hpp"
  19. #ifdef _WIN32
  20. #include <winsock.h>
  21. #else
  22. #include <sys/socket.h>
  23. #include <sys/time.h>
  24. #include <sys/resource.h>
  25. #endif
  26. #include <math.h>
  27. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  28. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  29. #endif
  30. unsigned udpOutQsPriority = 0;
  31. unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
  32. unsigned udpRequestToSendTimeout = 5; // value in sec
  33. bool udpSnifferEnabled = true;
  34. #ifdef _DEBUG
  35. //#define _SIMULATE_LOST_PACKETS
  36. #endif
  37. using roxiemem::DataBuffer;
  38. // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
  39. class UdpReceiverEntry
  40. {
  41. queue_t *output_queue;
  42. bool initialized;
  43. // Circular buffer of packets available for resending
  44. unsigned retryDataCount;
  45. unsigned retryDataIdx;
  46. unsigned maxRetryData;
  47. DataBuffer **retryData;
  48. public:
  49. ISocket *send_flow_socket;
  50. ISocket *data_socket;
  51. unsigned numQueues;
  52. int current_q;
  53. int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
  54. int *maxPktsPerQ; // to minimise power function re-calc for evey packet
  55. SpinLock lock;
  56. unsigned maxUdpSequence;
  57. unsigned nextUdpSequence()
  58. {
  59. SpinBlock b(lock); // MORE - is this needed?
  60. unsigned ret = ++maxUdpSequence;
  61. if (ret & UDP_SEQUENCE_BITS) // overflowed
  62. ret = 1;
  63. return ret;
  64. }
  65. // MORE - consider where we need critsecs in here!
  66. void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
  67. {
  68. unsigned minUdpSequence;
  69. SpinBlock b(lock);
  70. {
  71. if (retryDataCount)
  72. minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
  73. else
  74. minUdpSequence = maxUdpSequence;
  75. }
  76. UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0, minUdpSequence, maxUdpSequence};
  77. try
  78. {
  79. send_flow_socket->write(&msg, msg.length);
  80. }
  81. catch(IException *e)
  82. {
  83. StringBuffer s;
  84. DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
  85. e->Release();
  86. }
  87. catch (...)
  88. {
  89. DBGLOG("UdpSender: sendRequest write failed - unknown error");
  90. }
  91. }
  92. unsigned cleanRetryData(const UdpPermitToSendMsg &permit, PointerArray &retries)
  93. {
  94. // Any saved packets < lastReceived that are not listed as missing can be deleted
  95. SpinBlock b(lock);
  96. unsigned totalData = 0;
  97. if (checkTraceLevel(TRACE_RETRY_DATA, 3))
  98. {
  99. unsigned minUdpSequence;
  100. if (retryDataCount)
  101. minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
  102. else
  103. minUdpSequence = maxUdpSequence;
  104. StringBuffer permitStr;
  105. permit.toString(permitStr);
  106. DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence);
  107. }
  108. unsigned lastReceived = permit.hdr.lastSequenceSeen;
  109. unsigned missingIndex = 0;
  110. unsigned missingCount = permit.hdr.missingCount;
  111. unsigned i = 0;
  112. if (maxRetryData)
  113. {
  114. while (i < retryDataCount && retries.length() < permit.hdr.max_data)
  115. {
  116. unsigned idx = (retryDataIdx + i) % maxRetryData;
  117. DataBuffer *buffer = retryData[idx];
  118. if (buffer)
  119. {
  120. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  121. unsigned thisSequence = header->udpSequence;
  122. if (thisSequence > lastReceived)
  123. break;
  124. if (!missingCount || thisSequence < permit.missingSequences[missingIndex])
  125. {
  126. ::Release(buffer);
  127. retryData[idx] = NULL;
  128. if (i)
  129. i++; // MORE - leaves holes - is this smart? Alternatively could close up... Should be rare anyway
  130. else
  131. {
  132. retryDataIdx = (retryDataIdx + 1) % maxRetryData;
  133. retryDataCount--;
  134. }
  135. }
  136. else if (thisSequence == permit.missingSequences[missingIndex])
  137. {
  138. totalData += header->length;
  139. retries.append(buffer);
  140. i++;
  141. missingIndex++;
  142. missingCount--;
  143. }
  144. else
  145. {
  146. missingIndex++;
  147. missingCount--;
  148. }
  149. }
  150. else
  151. {
  152. if (i)
  153. i++;
  154. else
  155. {
  156. // Removing leading nulls
  157. retryDataCount--;
  158. retryDataIdx = (retryDataIdx + 1) % maxRetryData;
  159. }
  160. }
  161. }
  162. }
  163. if (checkTraceLevel(TRACE_RETRY_DATA, 3))
  164. DBGLOG("UdpSender: cleanRetryData found %u to resend total size %u, total %u still available", retries.length(), totalData, retryDataCount);
  165. return totalData;
  166. }
  167. unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
  168. {
  169. moreRequested = false;
  170. maxPackets = permit.hdr.max_data;
  171. PointerArray toSend;
  172. unsigned totalSent = cleanRetryData(permit, toSend);
  173. while (toSend.length() < maxPackets && dataQueued())
  174. {
  175. DataBuffer *buffer = popQueuedData();
  176. if (buffer) // Aborted slave queries leave NULL records on queue
  177. {
  178. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  179. toSend.append(buffer);
  180. totalSent += header->length;
  181. #ifdef __linux__
  182. if (isLocal && (totalSent> 100000))
  183. break;
  184. #endif
  185. }
  186. }
  187. maxPackets = toSend.length();
  188. for (unsigned idx = 0; idx < maxPackets; idx++)
  189. {
  190. DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
  191. UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
  192. bool isRetry = (header->udpSequence != 0);
  193. if (isRetry)
  194. {
  195. if (checkTraceLevel(TRACE_RETRY_DATA, 1))
  196. DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.hdr.destNodeIndex, header->udpSequence);
  197. atomic_inc(&packetsRetried);
  198. }
  199. else
  200. header->udpSequence = nextUdpSequence();
  201. unsigned length = header->length;
  202. if (bucket)
  203. {
  204. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  205. bucket->wait((length / 1024)+1);
  206. }
  207. try
  208. {
  209. if (udpSendCompletedInData)
  210. {
  211. if (idx == maxPackets-1)
  212. {
  213. // MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
  214. if (false && dataQueued()) // Causes some problems because no flow control info gets through at all
  215. {
  216. moreRequested = true;
  217. header->udpSequence |= (UDP_SEQUENCE_COMPLETE|UDP_SEQUENCE_MORE);
  218. }
  219. else
  220. header->udpSequence |= UDP_SEQUENCE_COMPLETE;
  221. }
  222. }
  223. #ifdef _SIMULATE_LOST_PACKETS
  224. if (isRetry || (header->udpSequence % 100) != 0)
  225. #endif
  226. data_socket->write(buffer->data, length);
  227. header->udpSequence &= ~UDP_SEQUENCE_BITS;
  228. }
  229. catch(IException *e)
  230. {
  231. StringBuffer s;
  232. DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
  233. e->Release();
  234. }
  235. catch(...)
  236. {
  237. DBGLOG("UdpSender: write exception - unknown exception");
  238. }
  239. if (!isRetry && maxRetryData)
  240. {
  241. unsigned slot = (retryDataIdx + retryDataCount) % maxRetryData;
  242. if (retryDataCount < maxRetryData)
  243. retryDataCount++;
  244. else
  245. {
  246. if (udpTraceLevel > 0)
  247. DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.hdr.destNodeIndex, header->udpSequence);
  248. ::Release(retryData[slot]);
  249. }
  250. retryData[slot] = buffer;
  251. }
  252. else
  253. {
  254. ::Release(buffer);
  255. }
  256. }
  257. return totalSent;
  258. }
  259. bool dataQueued()
  260. {
  261. for (unsigned i = 0; i < numQueues; i++)
  262. {
  263. if (!output_queue[i].empty())
  264. return true;
  265. }
  266. return false;
  267. }
  268. bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
  269. {
  270. for (unsigned i = 0; i < numQueues; i++)
  271. {
  272. if (output_queue[i].dataQueued(key, pkCmpFn))
  273. return true;
  274. }
  275. return false;
  276. }
  277. bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
  278. {
  279. bool anyRemoved = false;
  280. for (unsigned i = 0; i < numQueues; i++)
  281. {
  282. if (output_queue[i].removeData(key, pkCmpFn))
  283. anyRemoved = true;
  284. }
  285. return anyRemoved;
  286. }
  287. inline void pushData(unsigned queue, DataBuffer *buffer)
  288. {
  289. output_queue[queue].pushOwn(buffer);
  290. }
  291. DataBuffer *popQueuedData()
  292. {
  293. DataBuffer *buffer;
  294. while (1)
  295. {
  296. for (unsigned i = 0; i < numQueues; i++)
  297. {
  298. if (udpOutQsPriority)
  299. {
  300. if (output_queue[current_q].empty())
  301. {
  302. if (udpTraceLevel >= 5)
  303. DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
  304. currentQNumPkts = 0;
  305. current_q = (current_q + 1) % numQueues;
  306. }
  307. else
  308. {
  309. buffer = output_queue[current_q].pop();
  310. currentQNumPkts++;
  311. if (udpTraceLevel >= 5)
  312. DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
  313. if (currentQNumPkts >= maxPktsPerQ[current_q])
  314. {
  315. currentQNumPkts = 0;
  316. current_q = (current_q + 1) % numQueues;
  317. }
  318. return buffer;
  319. }
  320. }
  321. else
  322. {
  323. current_q = (current_q + 1) % numQueues;
  324. if (!output_queue[current_q].empty())
  325. {
  326. return output_queue[current_q].pop();
  327. }
  328. }
  329. }
  330. MilliSleep(10);
  331. DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
  332. }
  333. }
  334. UdpReceiverEntry()
  335. {
  336. send_flow_socket = data_socket = NULL;
  337. numQueues = 0;
  338. current_q = 0;
  339. initialized = false;
  340. output_queue = 0;
  341. currentQNumPkts = 0;
  342. maxPktsPerQ = 0;
  343. maxUdpSequence = 0;
  344. retryDataCount = 0;
  345. retryDataIdx = 0;
  346. retryData = NULL;
  347. }
  348. void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
  349. {
  350. assert(!initialized);
  351. maxRetryData = _maxRetryData;
  352. numQueues = _numQueues;
  353. const IpAddress &ip = getNodeAddress(destNodeIndex);
  354. if (!ip.isNull())
  355. {
  356. try
  357. {
  358. SocketEndpoint sendFlowEp(sendFlowPort, ip);
  359. SocketEndpoint dataEp(dataPort, ip);
  360. send_flow_socket = ISocket::udp_connect(sendFlowEp);
  361. data_socket = ISocket::udp_connect(dataEp);
  362. if (isLocal)
  363. {
  364. data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
  365. if (udpTraceLevel > 0)
  366. DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
  367. }
  368. }
  369. catch(IException *e)
  370. {
  371. StringBuffer error, ipstr;
  372. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
  373. throw;
  374. }
  375. catch(...)
  376. {
  377. StringBuffer ipstr;
  378. DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
  379. throw;
  380. }
  381. output_queue = new queue_t[numQueues];
  382. maxPktsPerQ = new int[numQueues];
  383. for (unsigned j = 0; j < numQueues; j++)
  384. {
  385. output_queue[j].set_queue_size(queueSize);
  386. maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
  387. }
  388. if (maxRetryData)
  389. {
  390. retryData = new DataBuffer *[maxRetryData];
  391. memset(retryData, 0, maxRetryData * sizeof(DataBuffer *));
  392. }
  393. initialized = true;
  394. if (udpTraceLevel > 0)
  395. {
  396. StringBuffer ipStr;
  397. DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
  398. }
  399. }
  400. }
  401. ~UdpReceiverEntry()
  402. {
  403. if (retryData) delete[] retryData; // MORE - should I release the values pointed to as well - not a big deal as quitting anyway...
  404. if (send_flow_socket) send_flow_socket->Release();
  405. if (data_socket) data_socket->Release();
  406. if (output_queue) delete [] output_queue;
  407. if (maxPktsPerQ) delete [] maxPktsPerQ;
  408. }
  409. };
  410. class CSendManager : public CInterface, implements ISendManager
  411. {
  412. friend class send_send_flow;
  413. class StartedThread : public Thread
  414. {
  415. private:
  416. Semaphore started;
  417. virtual int run()
  418. {
  419. started.signal();
  420. return doRun();
  421. }
  422. protected:
  423. bool running;
  424. public:
  425. StartedThread(const char *name) : Thread(name)
  426. {
  427. running = false;
  428. }
  429. ~StartedThread()
  430. {
  431. running = false;
  432. join();
  433. }
  434. virtual void start()
  435. {
  436. running = true;
  437. Thread::start();
  438. started.wait();
  439. }
  440. virtual int doRun() = 0;
  441. };
  442. class send_send_flow : public StartedThread
  443. {
  444. /*
  445. I don't like this code much at all
  446. Looping round all every time status of any changes seems like a bad thing especially as scale goes up
  447. Even though these look like a bitmap they are not used as such presently
  448. - as a result, if data_added() is called while state is completed, we lose the request I think
  449. - probably get away with it because of the dataqueued check
  450. doRun() uses state bits without protection
  451. A count of number pending for each might be better than just a flag
  452. Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
  453. - normally MANY in pending (and order is interesting)
  454. - normally few in any other state (only 1 if thread keeping up), order not really very interesting
  455. - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
  456. - in particular don't lock while processing the chain
  457. - Never need to be in >1 chain
  458. msTick() probably better than time() for detecting timeouts
  459. */
  460. enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
  461. unsigned target_count;
  462. char *state;
  463. unsigned char *timeouts; // Number of consecutive timeouts
  464. time_t *request_time;
  465. CriticalSection cr;
  466. Semaphore sem;
  467. CSendManager &parent;
  468. virtual int doRun()
  469. {
  470. // MORE - this is reading the state values unprotected
  471. // Not sure that this represents any issue in practice...
  472. if (udpTraceLevel > 0)
  473. DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
  474. while (running)
  475. {
  476. bool idle = false;
  477. if (sem.wait(1000))
  478. {
  479. if (udpTraceLevel > 4)
  480. DBGLOG("UdpSender: send_send_flow::doRun signal received");
  481. }
  482. else
  483. idle = true;
  484. if (!running) return 0;
  485. time_t now;
  486. time(&now);
  487. // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
  488. // In a typical heavy load scenario most will be pending
  489. // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
  490. // Separate lists for each state (don't need one for sending) ?
  491. for (unsigned i = 0; i < target_count; i++)
  492. {
  493. switch (state[i]) // MORE - should really protect it?
  494. {
  495. case completed:
  496. done(i, false);
  497. break;
  498. case completed_more:
  499. done(i, true);
  500. break;
  501. case pending_request:
  502. if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
  503. break;
  504. timeouts[i]++;
  505. EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i sec max=%i sec",
  506. timeouts[i], udpMaxRetryTimedoutReqs,
  507. i, (int) (now - request_time[i]), udpRequestToSendTimeout);
  508. // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
  509. if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
  510. {
  511. abort(i);
  512. break;
  513. }
  514. // fall into...
  515. case new_request:
  516. sendRequest(i);
  517. break;
  518. default:
  519. if (idle && parent.dataQueued(i))
  520. {
  521. EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
  522. data_added(i);
  523. }
  524. }
  525. }
  526. }
  527. return 0;
  528. }
  529. void done(unsigned index, bool moreRequested)
  530. {
  531. bool dataRemaining;
  532. {
  533. CriticalBlock b(cr);
  534. dataRemaining = parent.dataQueued(index);
  535. if (dataRemaining)
  536. {
  537. state[index] = pending_request;
  538. time(&request_time[index]);
  539. }
  540. else
  541. {
  542. state[index] = 0;
  543. timeouts[index] = 0;
  544. }
  545. }
  546. if (udpTraceLevel > 3)
  547. DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
  548. if (udpSendCompletedInData)
  549. {
  550. if (dataRemaining)
  551. {
  552. // MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
  553. // though there is a possible race to consider
  554. if (!moreRequested)
  555. parent.sendRequest(index, flow_t::request_to_send);
  556. }
  557. }
  558. else
  559. parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
  560. }
  561. void sendRequest(unsigned index)
  562. {
  563. if (udpTraceLevel > 3)
  564. DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
  565. CriticalBlock b(cr);
  566. parent.sendRequest(index, flow_t::request_to_send);
  567. state[index] = pending_request;
  568. time(&request_time[index]);
  569. }
  570. void abort(unsigned index)
  571. {
  572. if (udpTraceLevel > 3)
  573. DBGLOG("UdpSender: abort sending queued data to node=%u", index);
  574. CriticalBlock b(cr);
  575. state[index] = 0;
  576. timeouts[index] = 0;
  577. parent.abortData(index);
  578. }
  579. public:
  580. send_send_flow(CSendManager &_parent, unsigned numNodes)
  581. : StartedThread("UdpLib::send_send_flow"), parent(_parent)
  582. {
  583. target_count = numNodes;
  584. state = new char [target_count];
  585. memset(state, 0, target_count);
  586. timeouts = new unsigned char [target_count];
  587. memset(timeouts, 0, target_count);
  588. request_time = new time_t [target_count];
  589. memset(request_time, 0, sizeof(time_t) * target_count);
  590. start();
  591. }
  592. ~send_send_flow()
  593. {
  594. running = false;
  595. sem.signal();
  596. join();
  597. delete [] state;
  598. delete [] timeouts;
  599. delete [] request_time;
  600. }
  601. void clear_to_send_received(unsigned index)
  602. {
  603. CriticalBlock b(cr);
  604. state[index] = sending_data;
  605. }
  606. void send_done(unsigned index, bool moreRequested)
  607. {
  608. CriticalBlock b(cr);
  609. state[index] = moreRequested ? completed_more : completed;
  610. sem.signal();
  611. }
  612. void data_added(unsigned index)
  613. {
  614. CriticalBlock b(cr);
  615. // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
  616. // Because done() checks to see if any data pending and re-calls data_added, we get away with it
  617. // Using bits sounds more sensible though?
  618. // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
  619. if (!state[index]) // MORE - should just test the bit?
  620. {
  621. state[index] = new_request;
  622. if (udpTraceLevel > 3)
  623. DBGLOG("UdpSender: state set to new_request for node=%u", index);
  624. sem.signal();
  625. }
  626. }
  627. };
  628. class send_receive_flow : public StartedThread
  629. {
  630. CSendManager &parent;
  631. int receive_port;
  632. Owned<ISocket> flow_socket;
  633. public:
  634. send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
  635. {
  636. receive_port = r_port;
  637. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  638. {
  639. if (!enableSocketMaxSetting)
  640. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  641. check_set_max_socket_read_buffer(udpFlowSocketsSize);
  642. }
  643. flow_socket.setown(ISocket::udp_create(receive_port));
  644. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  645. size32_t actualSize = flow_socket->get_receive_buffer_size();
  646. DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
  647. start();
  648. }
  649. ~send_receive_flow()
  650. {
  651. running = false;
  652. if (flow_socket)
  653. flow_socket->close();
  654. join();
  655. }
  656. virtual int doRun()
  657. {
  658. if (udpTraceLevel > 0)
  659. DBGLOG("UdpSender: send_receive_flow started");
  660. #ifdef __linux__
  661. setLinuxThreadPriority(2);
  662. #endif
  663. while(running)
  664. {
  665. UdpPermitToSendMsg f;
  666. while (running)
  667. {
  668. try
  669. {
  670. unsigned int res ;
  671. flow_socket->read(&f, 1, sizeof(f), res, 5);
  672. assertex(res == f.hdr.length);
  673. #ifdef CRC_MESSAGES
  674. assertex(f.hdr.crc == f.calcCRC());
  675. #endif
  676. switch (f.hdr.cmd)
  677. {
  678. case flow_t::ok_to_send:
  679. if (udpTraceLevel > 1)
  680. DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.hdr.max_data, f.hdr.destNodeIndex, res);
  681. parent.data->ok_to_send(f);
  682. break;
  683. default:
  684. DBGLOG("UdpSender: received unknown flow message type=%d", f.hdr.cmd);
  685. }
  686. }
  687. catch (IException *e)
  688. {
  689. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  690. {
  691. StringBuffer s;
  692. DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
  693. }
  694. e->Release();
  695. }
  696. catch (...)
  697. {
  698. if (running)
  699. DBGLOG("UdpSender: send_receive_flow::unknown exception");
  700. MilliSleep(0);
  701. }
  702. }
  703. }
  704. return 0;
  705. }
  706. };
  707. class send_data : public StartedThread
  708. {
  709. CSendManager &parent;
  710. ISocket *sniffer_socket;
  711. SocketEndpoint ep;
  712. simple_queue<UdpPermitToSendMsg> send_queue;
  713. Linked<TokenBucket> bucket;
  714. void send_sniff(bool busy)
  715. {
  716. unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
  717. sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
  718. try
  719. {
  720. if (!sniffer_socket)
  721. {
  722. sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
  723. if (udpTraceLevel > 1)
  724. {
  725. StringBuffer url;
  726. DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
  727. }
  728. }
  729. sniffer_socket->write(&msg, sizeof(msg));
  730. if (udpTraceLevel > 1)
  731. DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
  732. }
  733. catch(IException *e)
  734. {
  735. StringBuffer s;
  736. StringBuffer url;
  737. DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
  738. e->Release();
  739. }
  740. catch(...)
  741. {
  742. StringBuffer url;
  743. DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
  744. if (sniffer_socket)
  745. {
  746. sniffer_socket->Release();
  747. sniffer_socket = NULL;
  748. }
  749. }
  750. }
  751. public:
  752. send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
  753. : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
  754. {
  755. sniffer_socket = NULL;
  756. if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
  757. {
  758. if (!enableSocketMaxSetting)
  759. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
  760. check_set_max_socket_write_buffer(udpLocalWriteSocketSize);
  761. }
  762. start();
  763. }
  764. ~send_data()
  765. {
  766. running = false;
  767. UdpPermitToSendMsg dummy;
  768. send_queue.push(dummy);
  769. join();
  770. if (sniffer_socket)
  771. sniffer_socket->Release();
  772. }
  773. bool ok_to_send(const UdpPermitToSendMsg &msg)
  774. {
  775. if (send_queue.push(msg, 15))
  776. return true;
  777. else
  778. {
  779. DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.hdr.destNodeIndex, msg.hdr.max_data);
  780. return false;
  781. }
  782. }
  783. virtual int doRun()
  784. {
  785. if (udpTraceLevel > 0)
  786. DBGLOG("UdpSender: send_data started");
  787. #ifdef __linux__
  788. setLinuxThreadPriority(1); // MORE - windows?
  789. #endif
  790. UdpPermitToSendMsg permit;
  791. while (running)
  792. {
  793. send_queue.pop(permit);
  794. if (!running)
  795. return 0;
  796. if (udpSnifferEnabled)
  797. send_sniff(true);
  798. parent.send_flow->clear_to_send_received(permit.hdr.destNodeIndex);
  799. UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.hdr.destNodeIndex];
  800. bool moreRequested;
  801. unsigned maxPackets;
  802. unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.hdr.destNodeIndex), bucket, moreRequested, maxPackets);
  803. if (udpSendCompletedInData && !maxPackets)
  804. parent.sendRequest(permit.hdr.destNodeIndex, flow_t::send_completed);
  805. parent.send_flow->send_done(permit.hdr.destNodeIndex, moreRequested);
  806. if (udpSnifferEnabled)
  807. send_sniff(false);
  808. if (udpTraceLevel > 1)
  809. DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.hdr.destNodeIndex);
  810. }
  811. if (udpTraceLevel > 0)
  812. DBGLOG("UdpSender: send_data stopped");
  813. return 0;
  814. }
  815. };
  816. friend class send_send_flow;
  817. friend class send_receive_flow;
  818. friend class send_data;
  819. unsigned numNodes;
  820. int receive_flow_port;
  821. int send_flow_port;
  822. int data_port;
  823. unsigned myNodeIndex;
  824. unsigned numQueues;
  825. UdpReceiverEntry *receiversTable;
  826. send_send_flow *send_flow;
  827. send_receive_flow *receive_flow;
  828. send_data *data;
  829. Linked<TokenBucket> bucket;
  830. SpinLock msgSeqLock;
  831. unsigned msgSeq;
  832. static bool comparePacket(void *pkData, void *key)
  833. {
  834. UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
  835. UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
  836. return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
  837. }
  838. inline unsigned getNextMessageSequence()
  839. {
  840. SpinBlock b(msgSeqLock);
  841. unsigned res = ++msgSeq;
  842. if (!res)
  843. res = ++msgSeq;
  844. return res;
  845. }
  846. public:
  847. IMPLEMENT_IINTERFACE;
  848. CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned maxRetryData, unsigned _myNodeIndex, TokenBucket *_bucket)
  849. : bucket(_bucket)
  850. {
  851. #ifndef _WIN32
  852. setpriority(PRIO_PROCESS, 0, -3);
  853. #endif
  854. numNodes = getNumNodes();
  855. receive_flow_port = client_flow_port;
  856. send_flow_port = server_flow_port;
  857. data_port = d_port;
  858. myNodeIndex = _myNodeIndex;
  859. numQueues = _numQueues;
  860. receiversTable = new UdpReceiverEntry[numNodes];
  861. if (maxRetryData > MAX_RESEND_TABLE_SIZE)
  862. maxRetryData = MAX_RESEND_TABLE_SIZE;
  863. for (unsigned i = 0; i < numNodes; i++)
  864. receiversTable[i].init(i, numQueues, q_size, maxRetryData, send_flow_port, data_port, i==myNodeIndex);
  865. data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
  866. send_flow = new send_send_flow(*this, numNodes);
  867. receive_flow = new send_receive_flow(*this, client_flow_port);
  868. msgSeq = 0;
  869. }
  870. ~CSendManager()
  871. {
  872. delete []receiversTable;
  873. delete send_flow;
  874. delete receive_flow;
  875. delete data;
  876. }
  877. void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
  878. {
  879. // NOTE: takes ownership of the DataBuffer
  880. assert(queue < numQueues);
  881. assert(destNodeIndex < numNodes);
  882. receiversTable[destNodeIndex].pushData(queue, buffer);
  883. send_flow->data_added(destNodeIndex);
  884. }
  885. inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
  886. {
  887. receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
  888. }
  889. bool dataQueued(unsigned destIndex)
  890. {
  891. return receiversTable[destIndex].dataQueued();
  892. }
  893. bool abortData(unsigned destIndex)
  894. {
  895. return receiversTable[destIndex].removeData(NULL, NULL);
  896. }
  897. // Interface ISendManager
  898. virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
  899. {
  900. if (destNodeIndex >= numNodes)
  901. throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
  902. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
  903. }
  904. virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
  905. {
  906. UdpPacketHeader pkHdr;
  907. pkHdr.ruid = ruid;
  908. pkHdr.msgId = msgId;
  909. return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
  910. }
  911. virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
  912. {
  913. UdpPacketHeader pkHdr;
  914. pkHdr.ruid = ruid;
  915. pkHdr.msgId = msgId;
  916. return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
  917. }
  918. virtual bool allDone()
  919. {
  920. for (unsigned i = 0; i < numNodes; i++)
  921. {
  922. if (receiversTable[i].dataQueued())
  923. return false;
  924. }
  925. return true;
  926. }
  927. };
  928. ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, unsigned maxRetryData, TokenBucket *rateLimiter, unsigned myNodeIndex)
  929. {
  930. return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, maxRetryData, myNodeIndex, rateLimiter);
  931. }
  932. class CMessagePacker : public CInterface, implements IMessagePacker
  933. {
  934. ISendManager &parent;
  935. unsigned destNodeIndex;
  936. UdpPacketHeader package_header;
  937. DataBuffer *part_buffer;
  938. unsigned data_buffer_size;
  939. unsigned data_used;
  940. void *mem_buffer;
  941. unsigned mem_buffer_size;
  942. unsigned totalSize;
  943. bool packed_request;
  944. unsigned requested_size;
  945. MemoryBuffer metaInfo;
  946. bool last_message_done;
  947. int queue_number;
  948. public:
  949. IMPLEMENT_IINTERFACE;
  950. CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  951. : parent(_parent)
  952. {
  953. queue_number = _queue;
  954. destNodeIndex = _destNode;
  955. package_header.length = 0; // filled in with proper value later
  956. package_header.metalength = 0;
  957. package_header.ruid = ruid;
  958. package_header.msgId = msgId;
  959. package_header.pktSeq = 0;
  960. package_header.nodeIndex = _sourceNode;
  961. package_header.msgSeq = _msgSeq;
  962. package_header.udpSequence = 0; // these are allocated when transmitted
  963. packed_request = false;
  964. part_buffer = bufferManager->allocate();
  965. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  966. assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
  967. *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
  968. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
  969. data_used = headerSize + sizeof(unsigned short);
  970. mem_buffer = 0;
  971. mem_buffer_size = 0;
  972. last_message_done = false;
  973. totalSize = 0;
  974. if (udpTraceLevel >= 40)
  975. DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
  976. }
  977. ~CMessagePacker()
  978. {
  979. if (part_buffer)
  980. part_buffer->Release();
  981. if (mem_buffer) free (mem_buffer);
  982. if (udpTraceLevel >= 40)
  983. {
  984. DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
  985. package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
  986. }
  987. }
  988. virtual void *getBuffer(unsigned len, bool variable)
  989. {
  990. if (variable)
  991. len += sizeof(RecordLengthType);
  992. if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
  993. {
  994. // Won't fit in one, so allocate temp location
  995. if (mem_buffer_size < len)
  996. {
  997. free(mem_buffer);
  998. mem_buffer = malloc(len);
  999. mem_buffer_size = len;
  1000. }
  1001. packed_request = false;
  1002. if (variable)
  1003. return ((char *) mem_buffer) + sizeof(RecordLengthType);
  1004. else
  1005. return mem_buffer;
  1006. }
  1007. if (part_buffer && ((data_buffer_size - data_used) < len))
  1008. flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
  1009. if (!part_buffer)
  1010. {
  1011. part_buffer = bufferManager->allocate();
  1012. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  1013. }
  1014. packed_request = true;
  1015. if (variable)
  1016. return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
  1017. else
  1018. return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
  1019. }
  1020. virtual void putBuffer(const void *buf, unsigned len, bool variable)
  1021. {
  1022. if (variable)
  1023. {
  1024. assertex(len < MAX_RECORD_LENGTH);
  1025. buf = ((char *) buf) - sizeof(RecordLengthType);
  1026. *(RecordLengthType *) buf = len;
  1027. len += sizeof(RecordLengthType);
  1028. }
  1029. totalSize += len;
  1030. if (packed_request)
  1031. {
  1032. assert(len <= (data_buffer_size - data_used));
  1033. data_used += len;
  1034. }
  1035. else
  1036. {
  1037. while (len)
  1038. {
  1039. if (!part_buffer)
  1040. {
  1041. part_buffer = bufferManager->allocate();
  1042. data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  1043. data_used = 0;
  1044. }
  1045. unsigned chunkLen = data_buffer_size - data_used;
  1046. if (chunkLen > len)
  1047. chunkLen = len;
  1048. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
  1049. data_used += chunkLen;
  1050. len -= chunkLen;
  1051. buf = &(((char*)buf)[chunkLen]);
  1052. if (len)
  1053. flush(false);
  1054. }
  1055. }
  1056. }
  1057. virtual void sendMetaInfo(const void *buf, unsigned len) {
  1058. metaInfo.append(len, buf);
  1059. }
  1060. virtual void flush(bool last_msg = false)
  1061. {
  1062. if (!last_message_done && last_msg)
  1063. {
  1064. last_message_done = true;
  1065. if (!part_buffer)
  1066. part_buffer = bufferManager->allocate();
  1067. const char *metaData = metaInfo.toByteArray();
  1068. unsigned metaLength = metaInfo.length();
  1069. unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
  1070. while (metaLength > maxMetaLength)
  1071. {
  1072. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
  1073. put_package(part_buffer, data_used, maxMetaLength);
  1074. metaLength -= maxMetaLength;
  1075. metaData += maxMetaLength;
  1076. data_used = 0;
  1077. maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
  1078. part_buffer = bufferManager->allocate();
  1079. }
  1080. memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
  1081. package_header.pktSeq |= 0x80000000;
  1082. put_package(part_buffer, data_used, metaLength);
  1083. }
  1084. else if (part_buffer)
  1085. {
  1086. // Just flush current - used when no room for current row
  1087. if (data_used)
  1088. put_package(part_buffer, data_used, 0); // buffer released in put_package
  1089. else
  1090. part_buffer->Release(); // If NO data in buffer, release buffer back to pool
  1091. }
  1092. part_buffer = 0;
  1093. data_buffer_size = 0;
  1094. data_used = 0;
  1095. }
  1096. void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
  1097. {
  1098. package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
  1099. package_header.metalength = metalength;
  1100. memcpy(dataBuff->data, &package_header, sizeof(package_header));
  1101. parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
  1102. if (udpTraceLevel >= 50)
  1103. {
  1104. if (package_header.length==991)
  1105. DBGLOG("NEarly");
  1106. DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
  1107. package_header.ruid, package_header.msgId, package_header.msgSeq,
  1108. package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
  1109. }
  1110. package_header.pktSeq++;
  1111. }
  1112. virtual bool dataQueued()
  1113. {
  1114. return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
  1115. }
  1116. virtual unsigned size() const
  1117. {
  1118. return totalSize;
  1119. }
  1120. };
  1121. IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
  1122. {
  1123. return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
  1124. }