udptrr.cpp 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <string>
  14. #include <map>
  15. #include <queue>
  16. #include <algorithm>
  17. #include "jthread.hpp"
  18. #include "jlog.hpp"
  19. #include "jisem.hpp"
  20. #include "jsocket.hpp"
  21. #include "udplib.hpp"
  22. #include "udptrr.hpp"
  23. #include "udptrs.hpp"
  24. #include "udpipmap.hpp"
  25. #include "udpmsgpk.hpp"
  26. #include "roxiemem.hpp"
  27. #include "roxie.hpp"
  28. #ifdef _WIN32
  29. #include <io.h>
  30. #include <winsock2.h>
  31. #else
  32. #include <sys/socket.h>
  33. #include <sys/time.h>
  34. #include <sys/resource.h>
  35. #endif
  36. #include <thread>
  37. /*
  38. The UDP transport layer uses the following timeouts:
  39. Timeouts:
  40. udpFlowAckTimeout - the maximum time that it is expected to take to receive an acknowledgement of a flow message (when one is
  41. sent) - should be small
  42. => timeout for request to send before re-requesting
  43. [sender] resend the request to send
  44. udpDataSendTimeout - the maximum time that it is expected to take to send the data once a permit has been granted.
  45. => timeout for assuming send_complete has been lost
  46. [sender] the timeout before re-requesting a request-to-send after sending a block of data
  47. (unused if permits are asynchronous)
  48. [receiver] Used to estimate a timeout if there are active requests and not enough active slots
  49. udpRequestTimeout - A reasonable expected time between a request for a permit until the permit is granted
  50. => timeout for guarding against an ok_to_send being lost.
  51. [sender] if no permit granted within timeout re-request to send
  52. udpPermitTimeout - the maximum time that it is expected to take to send and receive the data once a permit is granted.
  53. => Timeout for a permit before it is assumed lost
  54. [receiver] If rts received while permit is active, permit is resent. If not complete within timeout,
  55. revoke the permit.
  56. udpResendDelay - the time that should have elapsed before a missing data packet is resent
  57. (I think this only makes sense if a new permit can be granted before all the data has been received,
  58. so the request to send more is sent to the flow port.)
  59. 0 means they are unlikely to be lost, so worth resending as soon as it appears to be missing - trading
  60. duplicate packets for delays
  61. [sender] minimum time to elapse from initial send before sending packets that are assumed lost
  62. [receiver] minimum time to elapse before receiver assumes that a sender will send missing packets.
  63. Also:
  64. udpMaxPermitDeadTimeouts - How many permit grants are allowed to expire (with no flow message) until sender is assumed down
  65. udpRequestDeadTimeout - Timeout for sender getting no response to request to send before assuming that the receiver is dead
  66. General flow:
  67. -------------
  68. The general flow is as follows:
  69. On the sender:
  70. * When data becomes available (and none was previously present)
  71. - Send a request to send to the receiver flow port. Set timeout to ack timeout.
  72. * When receive "request_received"
  73. - Set timeout to udpRequestTimeout
  74. * When receive ok_to_send, add the permit to a permit queue.
  75. - Mark target as permit pending (to avoid resending requests)
  76. * Periodically:
  77. If permit requested, timeout has expired, and permit not received, then resubmit the request (with ack timeout)
  78. * When a permit is popped from the queue
  79. - gather packets to resend that are not recorded as received in the header sent by the receiver
  80. - gather any extra data packets to send up to the permit size.
  81. - Send a begin_send [to the <flow> port - so the permit is adjusted early]
  82. - Send the data packets
  83. - if no more data (and nothing in the resend list) send send_completed to data port
  84. if async permits:
  85. send send_completed to data port, and request_to_send to the flow port (timeout = udpFlowAckTimeout).
  86. else
  87. send request_to_send_more to the data port, and set re-request timeout to udpDataSendTimeout.
  88. On the receiver:
  89. * When receive request_to_send:
  90. - If the flowSeq matches an active permit resend the ok_to_send
  91. - otherwise add to requests list (if not already present) and send an acknowledgement
  92. - check to grant new permits
  93. * When receive begin_send
  94. - Adjust the permit to the actual number of packets being sent
  95. - check to grant new permits
  96. * When receive send_completed:
  97. - remove from permits list (and free up the permit)
  98. - check to grant new permits
  99. * When receive request_to_send_more:
  100. - Treat as send_complete, followed by a request_to_send.
  101. Behaviour on lost flow messages
  102. -------------------------------
  103. * request_to_send.
  104. - sender will re-request fairly quickly
  105. - receiver needs no special support
  106. => delay of ack timeout for this sender to start sending data
  107. * request_received
  108. - sender will re-request fairly quickly
  109. - receiver needs to acknowledge duplicate requests to send (but ignore requests with a lower flow id, since they have arrived out of order)
  110. => extra flow message, but no delay since receiver will still go ahead with allocating permits.
  111. * ok_to_send
  112. - sender will re-request if not received within a time limit
  113. - receiver will remove permit after timeout, and grant new permits
  114. - if a receiver gets a request to send for an active permit, it resends the ok_to_send (but retains the original timeout)
  115. => the available permits will be reduced by the number allocated to the sender until the permit expires.
  116. If multiple permits are not supported no data will be received by this node.
  117. if (udpRequestTimeout < udpPermitTimeout) the sender will re-request the permit, and potentially be re-sent ok_to_send
  118. * send_start
  119. - allocated permits are not reduced as quickly as they could be
  120. => input queue will not have as much data sent to it, (reducing the number of parallel sends?)
  121. * send_completed
  122. - allocated permit will last longer than it needs to.
  123. => similar to ok_to_send: reduced permits and delay in receiving any extra data if only a single permit is allowed.
  124. any miscalculated permits will be returned when the permit expires.
  125. * request_to_send_more
  126. - allocated permit will last longer than it needs to
  127. - sender will eventually send a new request-to-send after the DataSend timeout
  128. => reduced permits and no data received for a while if a single permit
  129. * data packet
  130. - the next flow message from the receiver will contain details of which packets have been received.
  131. - the next permit will possibly be used to send some of the missing packets (see suggestions for changes from current)
  132. => collator will not be able to combine and pass data stream onto the activities. Receiver memory consumption will go up.
  133. Reordering problems:
  134. - send_started processed after data received - fewer permits issued and possible over-commit on the number of permits, take care it cannot persist
  135. - send_completed processed after next request_to_send - ignored because flow seq is previous seq
  136. - request_received processed after ok_to_send - permit still pushed, unlikely to cause any problems.
  137. Timeout problems
  138. ----------------
  139. For each of the timeouts, what happens if they are set too high, or too low, and what is an estimate for a "good" value? In general, setting a
  140. timeout too high will affect the time to recover from lost packets. Setting a timeout too low may cause catastrophic degradation under load -
  141. because the extra flow messages or data packets will reduce the overall capacity.
  142. * udpFlowAckTimeout
  143. Too high: delay in sending data if a request_to_send/request_received is lost
  144. Too low: flow control cannot acknowledge quickly enough, and receiver flow control is flooded.
  145. Suggestion: Should keep low, but avoid any risk of flooding. 10 times the typical time to process a request?
  146. * udpDataSendTimeout
  147. Too high: lost send_begin will reduce the number of permits, lost request_to_send_more will delay the sender
  148. Too low: permits will expire while the data is being transferred - slots will be over-committed.
  149. sender will potentially re-request to send before all the data has been sent over the wire
  150. which will cause all "missing" packets to be resent if udpResendDelay is low.
  151. Suggestion: If multiple permits, probably better to be too high than too low.
  152. E.g. The time to send and receive the data for all/half the slots?
  153. * udpRequestTimeout
  154. Too high: if (>udpPermitTimeout) then sender will need to wait for the permit to be regranted
  155. Too low: could flood receiver with requests if it is very busy
  156. Suggestion: A fraction of the udpPermitTimeout (e.g. 1/2) so a missing ok_to_send will be spotted without losing the permit
  157. * udpPermitTimeout
  158. Too high: Lost ok_to_send messages will reduce the number of permits for a long time
  159. Too low: Receiver will be flooded with requests to send when large numbers of nodes want to send.
  160. Suggestion: Better to be too low than too high. Similar to udpDataSendTimeout? 10 * the ack timeout?
  161. (If lower than the udpDataSendTimeout then the permit could be resent)
  162. * udpResendDelay
  163. Too high: Missing packets will take a long time to be resent.
  164. Too low: If a large proportion of packets are reordered or dropped by the network, packets will be sent unnecessarily
  165. Suggestion: Set it to lower than the permit timeout, I'm not sure we want to run on a network where that many packets are being lost!
  166. Timeouts for assuming sender/receiver is dead
  167. Note: A sender/receiver is never really considered dead - it only affects active requests. If a new request is
  168. received then the communication will be restarted.
  169. * udpMaxPermitDeadTimeouts
  170. Too high: The number of available permits will be artificially reduced for too long. Another reason for supporting multiple permits.
  171. Too low: Unlikely to cause many problems. The sender should re-request anyway. (Currently doesn't throw away any data)
  172. Suggestion: ~5?. Better to err low. Enough so that the loss of an ok_to_send or start/complete is unlikely within the time.
  173. * udpRequestDeadTimeout
  174. Too high: Packets held in memory for too long, lots of re-requests sent (and ignored)
  175. Too low: Valid data may be lost when the network gets busy.
  176. Suggestion: 10s?? Better to err high, but I suspect this is much too high.... 50x the Ack timeout should really be enough (which would be 100ms)
  177. Other udp settings
  178. ------------------
  179. * udpResendLostPackets
  180. Enable the code to support resending missing packets. On a completely reliable network, disabling it would
  181. reduce the time that blocks were held onto in the sender, reducing the maximum memory footprint.
  182. * udpMaxPendingPermits
  183. How many permits can be granted at the same time. Larger numbers will cope better with large numbers of senders
  184. sending small amounts of data. Small numbers will increase the number of packets each sender can send. only 1 or 2 are not recommended...
  185. * udpMaxClientPercent
  186. The base number of slots allocated to each sender is (queueSize/maxPendingPermits). This allows a larger proportion
  187. to be initially granted (on the assumption that many senders will then update the actual number in use with a
  188. smaller number).
  189. * udpMinSlotsPerSender
  190. The smallest number of slots to assign to a sender, defaults to 1. Could increase to prevent lots of small permits being granted.
  191. * udpAssumeSequential
  192. If the sender has received a later sequence data packet, then resend a non-received packet - regardless of the udpResendDelay
  193. * udpResendAllMissingPackets
  194. Do not limit the number of missing packets sent to the size of the permit - send all that are ok to send. The rationale is that
  195. it is best to get all missing packets sent as quickly as possible to avoid the receiver being blocked.
  196. * udpAllowAsyncPermits
  197. If set it allows a new permit to be sent before the receiver has read all the data for the previous permit. This allows multiple
  198. permits to be issued to the same client at the same time (the data could have been sent but not yet received, or the completed
  199. message from a previous permit may have been lost.)
  200. * udpStatsReportInterval
  201. * udpAdjustThreadPriorities
  202. Used for experimentation. Unlikely to be useful to set to false.
  203. * udpTraceFlow
  204. * udpTraceTimeouts
  205. * udpTraceLevel
  206. * udpOutQsPriority
  207. * udpFlowSocketsSize
  208. * udpLocalWriteSocketSize
  209. Behaviour on lost servers
  210. -------------------------
  211. Receiver:
  212. - Each time a permit expires (i.e. no completed message) the number of timeouts is increased.
  213. - If it exceeds a limit then the request is removed.
  214. - The number of timeouts is reset whenever any message is received from the sender.
  215. => If a dead sender isn't spotted then the number of active permits may be artificially reduced while the sender is granted a
  216. permit. If a single sender has all the permits then the node will not receive any data while that sender has a permit.
  217. Sender:
  218. - Each time a request to send is sent the number of timeouts is increased
  219. - If the number exceeds the timeout threshold all pending data for that target is thrown away and the timeout is reset
  220. - Timeout count is reset when an acknowledgement or permit is received from the receiver
  221. Conclusions
  222. -----------
  223. There are a few conclusions that are worth recording
  224. * We need to support multiple permits, otherwise lost send_completed or ok_to_send flow messages will lead to periods when
  225. no data is being received. (Unless we add acknowledgements for those messages.)
  226. * Separate data and flow ports are necessary - otherwise flow requests from other senders will be held up by data.
  227. send_completed and request_to_send_more should be sent on the data port though (so they don't overtake the data and release the
  228. permits too early).
  229. * keep the send_completed message - rather than using the seq bit from the last packet (if only for the case where no data is sent)
  230. Which socket should flow messages go to?
  231. ----------------------------------------
  232. Most messages need to go the flow port, otherwise they will be held up by the data packets. There are a couple of
  233. messages that are less obvious:
  234. send_start:
  235. flow socket is likely to be best - because the message will not be held up by data packets being sent by *other*
  236. roxie nodes.
  237. send_completed:
  238. data socket makes sense since it indicates that the sender has finished sending data. There is no advantage (and
  239. some disadvantage) to it arriving early.
  240. request_to_send:
  241. send to the flow socket - otherwise a sender would need to wait for data from this or other senders before permission
  242. could be granted.
  243. request_to_send_more:
  244. Really two messages (send_complete and request_to_send), and only used if asynchronous permits are disabled. It needs
  245. to go to the data socket for the same reason as send_completed.
  246. Questions/suggestions/future
  247. ----------------------------
  248. - What should the relative priorities of the receive flow and data threads be? [ I think should probably be the same ]
  249. - Should the receiver immediately send a permit of 0 blocks to the sender on send_complete/request_to_send_more to ensure missing
  250. blocks are sent as quickly as possible (and sender memory is freed up)? [ I suspect yes if request_to_send goes to the data port ]
  251. - Should ok_to_send also have an acknowledgement? [ The udpRequestTimeout provides a mechanism for spotting missing packets ]
  252. - Switch to using ns for the timeout values - so more detailed response timings can be gathered/reported
  253. Supporting multiple permits:
  254. ----------------------------
  255. The aim is to allow multiple senders to stream packets at the same time. The code should aim to not allocate more
  256. permits than there are currently slots available on the receive queue, but a slight temporary over-commit is not
  257. a problem.
  258. The algorithm needs to be resilient when flow control messages are lost.
  259. Approach:
  260. * Add a SendPermit member to the UdpSender class, including a numPackets reservation
  261. * When a permit is granted, set numPackets to the number of slots
  262. * Before a client sends a sequence of data packets it sends a start_send flow control message with the number of packets it is sending.
  263. * When the receiver gets that flow control message it sets numPackets to the number of packets being sent.
  264. * When the receiver receives a non-duplicate data packet it decrements numPackets.
  265. * The slots allocated in a permit are limited by the queue space and the sum of all numPackets values for the active permits. (Ignore overcommit)
  266. This is then further extended to allow multiple permits per sender....
  267. Supporting Asynchronous request_to_send
  268. ---------------------------------------
  269. Each sender can have up to MaxPermitsPerSender permits active at the same time. They are revoked when a completed
  270. message is received (for that permit, or a later permit). The permits are implemented as an array within the sender
  271. to avoid any dynamic memory allocation.
  272. The main difference for asynchronous requests is that instead of sending request_to_send_more to the data socket, it is split
  273. into two messages - send_complete sent to the data port, and a request_to_send_more to the flow port.
  274. What is the trade off for asynchronous requests?
  275. - Synchronous requests are received after the data, so it makes sense for udpResendDelay to be 0 (i.e. send immediately).
  276. This means that missing data packets are likely to be sent much more quickly.
  277. - Asynchronous requests allow a sender to start sending more data before the previous data has been read. When
  278. there is a single sender this will significantly reduce the latency.
  279. - Asynchronous requests also ensure that requests to send more are not held up by data being sent by other nodes.
  280. It *might* be possible to treat the permits as a circular buffer, but I don't think it would significantly
  281. improve the efficiency. (Minor when allocating a permit.)
  282. * If a sender has a permit for 0 packets it should only send appropriate "missing" packets, possibly none.
  283. Race conditions:
  284. update of flowid on flow thread may clash with access to conditional decrement from the data thread.
  285. - fewer problems if check is prevPermits>0 rather than != 0
  286. - will eventually (quickly?) recover since no data will be sent, and the done will clear the counters
  287. request_to_send while previous data has not yet been read and processed
  288. - as long as subsequent sends don't send any new data, eventually a send_complete will get through, allowing more data to be sent.
  289. */
  290. using roxiemem::DataBuffer;
  291. using roxiemem::IRowManager;
  292. RelaxedAtomic<unsigned> flowPermitsSent = {0};
  293. RelaxedAtomic<unsigned> flowRequestsReceived = {0};
  294. RelaxedAtomic<unsigned> dataPacketsReceived = {0};
  295. static unsigned lastFlowPermitsSent = 0;
  296. static unsigned lastFlowRequestsReceived = 0;
  297. static unsigned lastDataPacketsReceived = 0;
  298. // The code that redirects flow messages from data socket to flow socket relies on the assumption tested here
  299. static_assert(sizeof(UdpRequestToSendMsg) < sizeof(UdpPacketHeader), "Expected UDP rts size to be less than packet header");
  300. // The following enum is used for the current state of each sender within the udp receiving code
  301. enum class ReceiveState {
  302. idle, // no activity from the sender - wating for a request to send
  303. requested, // permit to be send has been requested but not granted (other permits may have been granted)
  304. granted, // at least one permit granted and no pending request, waiting for data to be sent
  305. max
  306. };
  307. constexpr const char * receiveStateNameText[(unsigned)ReceiveState::max] = { "idle", "requested", "granted" };
  308. const char * receiveStateName(ReceiveState idx) { return receiveStateNameText[(unsigned)idx]; }
  309. template <class T>
  310. class LinkedListOf
  311. {
  312. T *head = nullptr;
  313. T *tail = nullptr;
  314. unsigned numEntries = 0;
  315. void checkListIsValid(T *lookfor)
  316. {
  317. #ifdef _DEBUG
  318. T *prev = nullptr;
  319. T *finger = head;
  320. unsigned length = 0;
  321. while (finger)
  322. {
  323. if (finger==lookfor)
  324. lookfor = nullptr;
  325. prev = finger;
  326. finger = finger->next;
  327. length++;
  328. }
  329. assert(prev == tail);
  330. assert(lookfor==nullptr);
  331. assert(numEntries==length);
  332. #endif
  333. }
  334. public:
  335. unsigned length() const { return numEntries; }
  336. operator T *() const
  337. {
  338. return head;
  339. }
  340. void append(T *sender)
  341. {
  342. assertex(!sender->next && (sender != tail));
  343. if (tail)
  344. {
  345. tail->next = sender;
  346. sender->prev = tail;
  347. tail = sender;
  348. }
  349. else
  350. {
  351. head = tail = sender;
  352. }
  353. numEntries++;
  354. checkListIsValid(sender);
  355. }
  356. void remove(T *sender)
  357. {
  358. if (sender->prev)
  359. sender->prev->next = sender->next;
  360. else
  361. head = sender->next;
  362. if (sender->next)
  363. sender->next->prev = sender->prev;
  364. else
  365. tail = sender->prev;
  366. sender->prev = nullptr;
  367. sender->next = nullptr;
  368. numEntries--;
  369. checkListIsValid(nullptr);
  370. }
  371. };
  372. class CReceiveManager : implements IReceiveManager, public CInterface
  373. {
  374. /*
  375. * The ReceiveManager has several threads:
  376. * 1. receive_receive_flow (priority 3)
  377. * - waits for packets on flow port
  378. * - maintains list of nodes that have pending requests
  379. * - sends ok_to_send to one sender (or more) at a time
  380. * 2. receive_data (priority 4)
  381. * - reads data packets off data socket
  382. * - runs at v. high priority
  383. * - used to have an option to perform collation on this thread but a bad idea:
  384. * - can block (ends up in memory manager via attachDataBuffer).
  385. * - Does not apply back pressure
  386. * - Just enqueues them. We don't give permission to send more than the queue can hold, but it's a soft limit
  387. * 3. PacketCollator (standard priority)
  388. * - dequeues packets
  389. * - collates packets
  390. *
  391. */
  392. /*
  393. * Handling lost packets
  394. *
  395. * We try to make lost packets unlikely by telling agents when to send (and making sure they don't send unless
  396. * there's a good chance that socket buffer will have room). But we can't legislate for network issues.
  397. *
  398. * What packets can be lost?
  399. * 1. Data packets - handled via sliding window of resendable packets (or by retrying whole query after a timeout, of resend logic disabled)
  400. * 2. RequestToSend - the sender's resend thread checks periodically. There's a short initial timeout for getting a reply (either "request_received"
  401. * or "okToSend"), then a longer timeout for actually sending.
  402. * 3. OkToSend - there is a timeout after which the permission is considered invalid (based on how long it SHOULD take to send them).
  403. * The requestToSend retry mechanism would then make sure retried.
  404. * MORE - if I don't get a response from OkToSend I should assume lost and requeue it.
  405. * 4. complete - covered by same timeout as okToSend. A lost complete will mean incoming data to that node stalls for the duration of this timeout,
  406. *
  407. */
  408. class UdpSenderEntry;
  409. class SendPermit
  410. {
  411. public:
  412. SendPermit * prev = nullptr;
  413. SendPermit * next = nullptr;
  414. UdpSenderEntry * owner = nullptr;
  415. std::atomic<unsigned> flowSeq{0}; // The flow id of the request to send data. Atomic since read from data thead.
  416. std::atomic<unsigned> numPackets{0};// Updated by receive_data thread, read atomically by receive_flow
  417. std::atomic<unsigned> sendSeq{0}; // the send sequence when the request - will be <= all datapackets sent for that permit
  418. unsigned permitTime = 0; // when was the permit issued?
  419. public:
  420. bool isActive() const
  421. {
  422. //NOTE: a flowSeq if 0 is not a valid flowSeq (sender ensures that it is never used)
  423. return flowSeq.load(std::memory_order_acquire) != 0;
  424. }
  425. //How many are reserved - never return < 0 to avoid race condition where permit is being expired when a data packet for that permit
  426. //arrives.
  427. unsigned getNumReserved() const
  428. {
  429. int permits = numPackets.load(std::memory_order_acquire);
  430. return (unsigned)std::max(permits, 0);
  431. }
  432. void grantPermit(unsigned _flowSeq, unsigned _sendSeq, unsigned num, unsigned _permitTime)
  433. {
  434. flowSeq = _flowSeq;
  435. sendSeq = _sendSeq;
  436. numPackets.store(num, std::memory_order_release);
  437. permitTime = _permitTime;
  438. }
  439. void revokePermit()
  440. {
  441. flowSeq = 0;
  442. sendSeq = 0;
  443. permitTime = 0;
  444. numPackets.store(0, std::memory_order_release);
  445. }
  446. };
  447. //Increasing this number, increases the number of concurrent permits a sender may have (and its resilience to lost flow messages),
  448. //but also increases the processing cost since code often iterates through all the permits. 2..4 likely to be good values.
  449. static constexpr unsigned MaxPermitsPerSender = 4;
  450. class UdpSenderEntry // one per node in the system
  451. {
  452. // This is created the first time a message from a previously unseen IP arrives, and remains alive indefinitely
  453. // Note that the various members are accessed by different threads, but no member is accessed from more than one thread
  454. // (except where noted) so protection is not required
  455. // Note that UDP ordering rules mean we can't guarantee that we don't see a "request_to_send" for the next transfer before
  456. // we see the "complete" for the current one. Even if we were sure network stack would not reorder, these come from different
  457. // threads on the sender side and the order is not 100% guaranteed, so we need to cope with it.
  458. // We also need to recover gracefully (and preferably quickly) if any flow or data messages go missing. Currently the sender
  459. // will resend the rts if no ok_to_send within timeout, but there may be a better way?
  460. public:
  461. // Used only by receive_flow thread
  462. IpAddress dest;
  463. ISocket *flowSocket = nullptr;
  464. UdpSenderEntry *prev = nullptr; // Used to form list of all senders that have outstanding requests
  465. UdpSenderEntry *next = nullptr; // Used to form list of all senders that have outstanding requests
  466. ReceiveState state = ReceiveState::idle; // Meaning I'm not on any queue
  467. sequence_t flowSeq = 0; // the sender's most recent flow sequence number
  468. sequence_t sendSeq = 0; // the sender's most recent sequence number from request-to-send, representing
  469. // sequence number of next packet it will send
  470. unsigned timeouts = 0; // How many consecutive timeouts have happened on the current request
  471. unsigned requestTime = 0; // When we received the active requestToSend
  472. unsigned lastPermitTime = 0; // When was the last permit granted?
  473. unsigned numPermits = 0; // How many permits allocated?
  474. mutable CriticalSection psCrit;
  475. PacketTracker packetsSeen;
  476. SendPermit permits[MaxPermitsPerSender];
  477. SendPermit * lastDataPermit = permits; // optimize data packet->permit mapping. Initialise by pointing at the first permit
  478. public:
  479. UdpSenderEntry(const IpAddress &_dest, unsigned port) : dest(_dest)
  480. {
  481. SocketEndpoint ep(port, dest);
  482. #ifdef SOCKET_SIMULATION
  483. if (isUdpTestMode)
  484. if (udpTestUseUdpSockets)
  485. flowSocket = CSimulatedUdpWriteSocket::udp_connect(ep);
  486. else
  487. flowSocket = CSimulatedQueueWriteSocket::udp_connect(ep);
  488. else
  489. #endif
  490. flowSocket = ISocket::udp_connect(ep);
  491. for (SendPermit & permit : permits)
  492. permit.owner = this;
  493. }
  494. ~UdpSenderEntry()
  495. {
  496. if (flowSocket)
  497. {
  498. shutdownAndCloseNoThrow(flowSocket);
  499. flowSocket->Release();
  500. }
  501. }
  502. bool noteSeen(UdpPacketHeader &hdr)
  503. {
  504. if (udpResendLostPackets)
  505. {
  506. CriticalBlock b(psCrit);
  507. return packetsSeen.noteSeen(hdr);
  508. }
  509. else
  510. return false;
  511. }
  512. bool canSendAny() const
  513. {
  514. // We can send some if (a) the first available new packet is less than TRACKER_BITS above the first unreceived packet or
  515. // (b) we are assuming arrival in order, and there are some marked seen that are > first unseen OR
  516. // (c) the oldest in-flight packet has expired
  517. if (!udpResendLostPackets || (udpResendDelay == 0))
  518. return true;
  519. {
  520. CriticalBlock b(psCrit);
  521. if (packetsSeen.canRecord(sendSeq))
  522. return true;
  523. if (udpAssumeSequential && packetsSeen.hasGaps())
  524. return true;
  525. }
  526. //The best approximation to the oldest-inflight packet - because permits may have expired...
  527. return (msTick()-lastPermitTime > udpResendDelay);
  528. }
  529. void acknowledgeRequest(const IpAddress &returnAddress, sequence_t _flowSeq, sequence_t _sendSeq)
  530. {
  531. if (flowSeq==_flowSeq)
  532. {
  533. // It's a duplicate request-to-send - either they lost the request_received, or the ok_to_send (which has timed out)
  534. // whichever is the case we should resend the acknowledgement to prevent the sender flooding us with requests
  535. if (udpTraceLevel || udpTraceFlow)
  536. {
  537. StringBuffer s;
  538. DBGLOG("UdpFlow: Duplicate requestToSend %" SEQF "u from node %s", _flowSeq, dest.getIpText(s).str());
  539. }
  540. }
  541. flowSeq = _flowSeq;
  542. sendSeq = _sendSeq;
  543. requestTime = msTick();
  544. timeouts = 0;
  545. try
  546. {
  547. UdpPermitToSendMsg msg;
  548. msg.cmd = flowType::request_received;
  549. msg.flowSeq = _flowSeq;
  550. msg.destNode = returnAddress;
  551. msg.max_data = 0;
  552. if (udpResendLostPackets)
  553. {
  554. CriticalBlock b(psCrit);
  555. msg.seen = packetsSeen.copy();
  556. }
  557. if (udpTraceLevel > 3 || udpTraceFlow)
  558. {
  559. StringBuffer ipStr;
  560. DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
  561. }
  562. #ifdef TEST_DROPPED_PACKETS
  563. flowPacketsSent[msg.cmd]++;
  564. if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd]==0)
  565. {
  566. StringBuffer ipStr;
  567. DBGLOG("UdpReceiver: deliberately dropping request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
  568. }
  569. else
  570. #endif
  571. flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
  572. flowPermitsSent++;
  573. }
  574. catch(IException *e)
  575. {
  576. StringBuffer d, s;
  577. DBGLOG("UdpReceiver: acknowledgeRequest failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
  578. e->Release();
  579. }
  580. }
  581. void sendPermitToSend(unsigned maxTransfer, const IpAddress &returnAddress)
  582. {
  583. try
  584. {
  585. UdpPermitToSendMsg msg;
  586. msg.cmd = flowType::ok_to_send;
  587. msg.flowSeq = flowSeq;
  588. msg.destNode = returnAddress;
  589. msg.max_data = maxTransfer;
  590. if (udpResendLostPackets)
  591. {
  592. CriticalBlock b(psCrit);
  593. msg.seen = packetsSeen.copy();
  594. }
  595. if (udpTraceLevel > 3 || udpTraceFlow)
  596. {
  597. StringBuffer ipStr;
  598. DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
  599. }
  600. #ifdef TEST_DROPPED_PACKETS
  601. flowPacketsSent[msg.cmd]++;
  602. if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd]==0)
  603. {
  604. StringBuffer ipStr;
  605. DBGLOG("UdpReceiver: deliberately dropping ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
  606. }
  607. else
  608. #endif
  609. flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
  610. flowPermitsSent++;
  611. }
  612. catch(IException *e)
  613. {
  614. StringBuffer d, s;
  615. DBGLOG("UdpReceiver: requestToSend failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
  616. e->Release();
  617. }
  618. }
  619. // code to track the number of permits - all functions are called from the flow control thread, except for decPermit() from the data thread
  620. // need to be careful about concurent modifications. The exact number isn't critical, but
  621. // we should never return a -ve number. Simplest to implement by checking in getNumReserved() rather than using a cas in decPermit()
  622. // How many permits outstanding for a given flowSeq?
  623. inline unsigned getNumReserved(unsigned flowSeq) const
  624. {
  625. for (const SendPermit & permit : permits)
  626. {
  627. if (permit.flowSeq == flowSeq)
  628. return permit.getNumReserved();
  629. }
  630. return 0;
  631. }
  632. //Total reservations outstanding for the sender
  633. inline unsigned getTotalReserved() const
  634. {
  635. unsigned total = 0;
  636. for (const SendPermit & permit : permits)
  637. {
  638. total += permit.getNumReserved();
  639. }
  640. return total;
  641. }
  642. inline bool hasActivePermit() const
  643. {
  644. return (numPermits != 0);
  645. }
  646. bool hasUnusedPermit() const
  647. {
  648. return (numPermits != MaxPermitsPerSender);
  649. }
  650. inline SendPermit * queryPermit(unsigned flowSeq)
  651. {
  652. for (SendPermit & permit : permits)
  653. {
  654. if (permit.flowSeq == flowSeq)
  655. return &permit;
  656. }
  657. return nullptr;
  658. }
  659. inline SendPermit & allocatePermit(unsigned permitTime, unsigned num)
  660. {
  661. for (SendPermit & permit : permits)
  662. {
  663. if (!permit.isActive())
  664. {
  665. numPermits++;
  666. lastPermitTime = permitTime;
  667. permit.grantPermit(flowSeq, sendSeq, num, permitTime);
  668. return permit;
  669. }
  670. }
  671. throwUnexpected();
  672. }
  673. void revokePermit(SendPermit & permit)
  674. {
  675. permit.revokePermit();
  676. numPermits--;
  677. }
  678. inline void updateNumReserved(unsigned flowSeq, unsigned num)
  679. {
  680. for (SendPermit & permit : permits)
  681. {
  682. if (permit.flowSeq == flowSeq)
  683. {
  684. permit.numPackets.store(num, std::memory_order_release);
  685. return;
  686. }
  687. }
  688. }
  689. inline void decPermit(unsigned msgSeq)
  690. {
  691. if (lastDataPermit->isActive())
  692. {
  693. //If the message sequence is still larger than the lastDataPermit sequence, then the permit will not have been reallocated, so ok to decrement
  694. if ((int)(msgSeq - lastDataPermit->sendSeq) >= 0)
  695. {
  696. lastDataPermit->numPackets.fetch_sub(1, std::memory_order_acq_rel);
  697. return;
  698. }
  699. }
  700. //Although this is a bit more work than matching by flowSeq it shouldn't be too inefficient.
  701. SendPermit * bestPermit = nullptr;
  702. int bestDelta = INT_MAX;
  703. for (SendPermit & permit : permits)
  704. {
  705. if (permit.isActive())
  706. {
  707. int delta = (int)msgSeq - permit.sendSeq;
  708. //Check if this message sequence could belong to this permit (sequence number is larger)
  709. if (delta >= 0)
  710. {
  711. if (delta < bestDelta)
  712. {
  713. bestPermit = &permit;
  714. bestDelta = delta;
  715. }
  716. }
  717. }
  718. }
  719. if (bestPermit)
  720. {
  721. bestPermit->numPackets.fetch_sub(1, std::memory_order_acq_rel);
  722. lastDataPermit = bestPermit;
  723. }
  724. }
  725. };
  726. using SenderList = LinkedListOf<UdpSenderEntry>;
  727. using PermitList = LinkedListOf<SendPermit>;
  728. IpMapOf<UdpSenderEntry> sendersTable;
  729. class receive_receive_flow : public Thread
  730. {
  731. CReceiveManager &parent;
  732. Owned<ISocket> flow_socket;
  733. const unsigned flow_port;
  734. const unsigned maxSlotsPerSender;
  735. const unsigned maxPermits; // Must be provided in the constructor
  736. std::atomic<bool> running = { false };
  737. SenderList pendingRequests; // List of senders requesting permission to send
  738. PermitList pendingPermits; // List of active permits
  739. private:
  740. void noteRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq)
  741. {
  742. //Check for a permit that is still live, if found it is likely to ok_to_send was lost.
  743. SendPermit * permit = requester->queryPermit(flowSeq);
  744. if (permit)
  745. {
  746. //if present resend the ok_to_send with the size that was granted
  747. unsigned slots = permit->getNumReserved();
  748. requester->sendPermitToSend(slots, myNode.getIpAddress());
  749. return;
  750. }
  751. //One of
  752. //a) A new request has arrived
  753. //b) The sender has restarted
  754. // The receiver will eventually time out the old permits, and a new ok_to_send will be sent.
  755. //c) Messages have been received out of order (e.g. request_to_send_more after a request_to_send?)
  756. // Almost impossible - it would need to be a very delayed resend. The sender will ignore, and resend
  757. // a new request_to_send if necessary.
  758. switch (requester->state)
  759. {
  760. case ReceiveState::granted:
  761. case ReceiveState::idle:
  762. pendingRequests.append(requester);
  763. requester->state = ReceiveState::requested;
  764. break;
  765. case ReceiveState::requested:
  766. // Perhaps the sender never saw our acknowledgement? Already on queue... resend an acknowledgement
  767. break;
  768. default:
  769. // Unexpected state, should never happen!
  770. ERRLOG("ERROR: Unexpected state %s in noteRequest", receiveStateName(requester->state));
  771. throwUnexpected();
  772. break;
  773. }
  774. requester->acknowledgeRequest(myNode.getIpAddress(), flowSeq, sendSeq); // Acknowledge receipt of the request
  775. }
  776. void grantPermit(UdpSenderEntry *requester, unsigned slots)
  777. {
  778. //State must be 'requested' if it is on the pendingRequests list
  779. if (requester->state != ReceiveState::requested)
  780. {
  781. // Unexpected state, should never happen!
  782. ERRLOG("ERROR: Unexpected state %s in grantPermit", receiveStateName(requester->state));
  783. throwUnexpected();
  784. }
  785. pendingRequests.remove(requester);
  786. unsigned now = msTick();
  787. SendPermit & permit = requester->allocatePermit(now, slots);
  788. pendingPermits.append(&permit);
  789. requester->state = ReceiveState::granted;
  790. requester->requestTime = now;
  791. requester->sendPermitToSend(slots, myNode.getIpAddress());
  792. }
  793. void noteDone(UdpSenderEntry *requester, const UdpRequestToSendMsg &msg)
  794. {
  795. const unsigned flowSeq = msg.flowSeq;
  796. SendPermit * permit = requester->queryPermit(flowSeq);
  797. //A completed message, on the data flow, may often be received after the next request to send.
  798. //If so it should not update the state, but it should clear all grants with a flowid <= the new flowid
  799. //since all the data will have been sent. (If it has not been received it is either lost or OOO (unlikely).)
  800. for (SendPermit & permit : requester->permits)
  801. {
  802. if (permit.isActive() && ((int)(permit.flowSeq - flowSeq) <= 0))
  803. {
  804. pendingPermits.remove(&permit);
  805. requester->revokePermit(permit);
  806. }
  807. }
  808. //If it matches the current flowSeq, then we can assume everything is complete, otherwise leave the state as it is
  809. if (flowSeq != requester->flowSeq)
  810. return;
  811. switch (requester->state)
  812. {
  813. case ReceiveState::requested:
  814. // A bit unexpected but will happen if the permission timed out and the request was added to the requests queue
  815. pendingRequests.remove(requester);
  816. break;
  817. case ReceiveState::granted:
  818. break;
  819. case ReceiveState::idle:
  820. DBGLOG("Duplicate completed message received: msg %s flowSeq %" SEQF "u sendSeq %" SEQF "u. Ignoring", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq);
  821. break;
  822. default:
  823. // Unexpected state, should never happen! Ignore.
  824. ERRLOG("ERROR: Unexpected state %s in noteDone", receiveStateName(requester->state));
  825. break;
  826. }
  827. requester->state = ReceiveState::idle;
  828. }
  829. public:
  830. receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
  831. : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size)
  832. {
  833. }
  834. ~receive_receive_flow()
  835. {
  836. if (running)
  837. {
  838. running = false;
  839. shutdownAndCloseNoThrow(flow_socket);
  840. join();
  841. }
  842. }
  843. virtual void start()
  844. {
  845. running = true;
  846. if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
  847. throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
  848. #ifdef SOCKET_SIMULATION
  849. if (isUdpTestMode)
  850. if (udpTestUseUdpSockets)
  851. flow_socket.setown(CSimulatedUdpReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
  852. else
  853. flow_socket.setown(CSimulatedQueueReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
  854. else
  855. #endif
  856. flow_socket.setown(ISocket::udp_create(flow_port));
  857. flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
  858. size32_t actualSize = flow_socket->get_receive_buffer_size();
  859. DBGLOG("UdpReceiver: receive_receive_flow created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
  860. Thread::start();
  861. }
  862. void doFlowRequest(const UdpRequestToSendMsg &msg)
  863. {
  864. flowRequestsReceived++;
  865. if (udpTraceLevel > 5 || udpTraceFlow)
  866. {
  867. StringBuffer ipStr;
  868. DBGLOG("UdpReceiver: received %s msg flowSeq %" SEQF "u sendSeq %" SEQF "u from node=%s", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq, msg.sourceNode.getTraceText(ipStr).str());
  869. }
  870. UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode];
  871. unsigned flowSeq = msg.flowSeq;
  872. switch (msg.cmd)
  873. {
  874. case flowType::request_to_send:
  875. noteRequest(sender, flowSeq, msg.sendSeq);
  876. break;
  877. case flowType::send_start:
  878. // Could potentially go up if the sender sends more missing packets than the receiver granted, or if
  879. // the permit has timed out.
  880. sender->updateNumReserved(msg.flowSeq, msg.packets);
  881. break;
  882. case flowType::send_completed:
  883. noteDone(sender, msg);
  884. break;
  885. case flowType::request_to_send_more:
  886. {
  887. noteDone(sender, msg);
  888. unsigned nextFlowSeq = std::max(flowSeq+1, 1U); // protect against a flowSeq of 0
  889. noteRequest(sender, nextFlowSeq, msg.sendSeq);
  890. break;
  891. }
  892. default:
  893. DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
  894. }
  895. }
  896. unsigned checkPendingRequests()
  897. {
  898. unsigned timeout = 5000; // The default timeout is 5 seconds if nothing is waiting for response...
  899. unsigned permitsIssued = 0;
  900. if (pendingPermits)
  901. {
  902. unsigned now = msTick();
  903. //First remove any expired permits (stored in expiry-order in the permit list)
  904. SendPermit *finger = pendingPermits;
  905. while (finger)
  906. {
  907. unsigned elapsed = now - finger->permitTime;
  908. if (elapsed >= udpPermitTimeout)
  909. {
  910. UdpSenderEntry * sender = finger->owner;
  911. if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
  912. {
  913. StringBuffer s;
  914. DBGLOG("permit %" SEQF "u to node %s (%u packets) timed out after %u ms, rescheduling", sender->flowSeq, sender->dest.getIpText(s).str(), sender->getTotalReserved(), elapsed);
  915. }
  916. SendPermit *next = finger->next;
  917. pendingPermits.remove(finger);
  918. sender->revokePermit(*finger);
  919. if (++sender->timeouts > udpMaxPermitDeadTimeouts && udpMaxPermitDeadTimeouts != 0)
  920. {
  921. if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
  922. {
  923. StringBuffer s;
  924. DBGLOG("permit to send %" SEQF "u to node %s timed out %u times - abandoning", sender->flowSeq, sender->dest.getIpText(s).str(), sender->timeouts);
  925. }
  926. //Currently this is benign. If the sender really is alive it will send another request.
  927. //Should this have a more significant effect and throw away any data that has been received from that sender??
  928. //Only change the state if there are no other active permits. Only the last request will be re-sent.
  929. if (!sender->hasActivePermit())
  930. sender->state = ReceiveState::idle;
  931. }
  932. else if (sender->state != ReceiveState::requested)
  933. {
  934. // Put it back on the request queue (at the back) - even if there are other active permits
  935. pendingRequests.append(sender);
  936. sender->state = ReceiveState::requested;
  937. }
  938. finger = next;
  939. }
  940. else
  941. {
  942. timeout = udpPermitTimeout - elapsed;
  943. break;
  944. }
  945. }
  946. // Sum the number of reserved slots assigned to active permits
  947. while (finger)
  948. {
  949. permitsIssued += finger->getNumReserved();
  950. finger = finger->next;
  951. }
  952. }
  953. // Aim is to issue enough permits to use all available the space in the queue. Adjust available by the
  954. // number already issued (avoid underflow if over-committed).
  955. unsigned slots = parent.input_queue->available();
  956. if (slots >= permitsIssued)
  957. slots -= permitsIssued;
  958. else
  959. slots = 0;
  960. bool anyCanSend = false;
  961. bool anyCannotSend = false;
  962. //Note, iterate if slots==0 so that timeout code is processed.
  963. for (UdpSenderEntry *finger = pendingRequests; finger != nullptr; finger = finger->next)
  964. {
  965. if (pendingPermits.length()>=udpMaxPendingPermits)
  966. break;
  967. if (slots < udpMinSlotsPerSender)
  968. {
  969. //The number of slots may increase if (a) data is read off the input queue, or (b) a send_start adjusts the number of permits
  970. //(b) will result on a read on this thread so no need to adjust timeout.
  971. //(a) requires some data to be read, so assume a tenth of the time to read all data
  972. const unsigned udpWaitForSlotTimeout = std::max(updDataSendTimeout/10, 1U);
  973. if (timeout > udpWaitForSlotTimeout)
  974. timeout = udpWaitForSlotTimeout; // Slots should free up very soon!
  975. break;
  976. }
  977. // If requester would not be able to send me any (because of the ones in flight) then wait
  978. if (finger->canSendAny())
  979. {
  980. //If multiple done messages are lost and an rts has been processed there may be no permits free
  981. //
  982. //The transfer will recover once the permit expires. Could consider expiring the oldest permit, but it
  983. //is possible that the data is still in transit, and the done may be about to appear soon.
  984. //Waiting is likely to be the better option
  985. if (finger->hasUnusedPermit())
  986. {
  987. unsigned requestSlots = slots;
  988. //If already 2 outstanding permits, grant a new permit for 0 slots to send any missing packets, but nothing else.
  989. if (requestSlots>maxSlotsPerSender)
  990. requestSlots = maxSlotsPerSender;
  991. if (requestSlots > maxPermits-permitsIssued)
  992. requestSlots = maxPermits-permitsIssued;
  993. grantPermit(finger, requestSlots);
  994. slots -= requestSlots;
  995. anyCanSend = true;
  996. if (timeout > udpPermitTimeout)
  997. timeout = udpPermitTimeout;
  998. }
  999. else
  1000. {
  1001. //Sender has a request to send, but all permits are active - suggests a previous done has been lost/not received yet
  1002. //Do not set anyCannotSend - because a permit being freed will be triggered by a flow message - so no need to
  1003. //adjust the timeout. (A different situation from waiting for a data packet to allow a sender to proceed.)
  1004. if (udpTraceFlow)
  1005. {
  1006. StringBuffer s;
  1007. DBGLOG("Sender %s can't be given permission to send yet as all permits active", finger->dest.getIpText(s).str());
  1008. }
  1009. }
  1010. }
  1011. else
  1012. {
  1013. anyCannotSend = true;
  1014. if (udpTraceFlow)
  1015. {
  1016. StringBuffer s;
  1017. DBGLOG("Sender %s can't be given permission to send yet as resend buffer full", finger->dest.getIpText(s).str());
  1018. }
  1019. }
  1020. }
  1021. if (anyCannotSend && !anyCanSend)
  1022. {
  1023. // A very unusual situation - all potential readers cannot send any extra packets because there are significant missing packets
  1024. if (udpTraceFlow)
  1025. {
  1026. StringBuffer s;
  1027. DBGLOG("All senders blocked by resend buffers");
  1028. }
  1029. //Hard to tell what should happen to the timeout - try again when the resend timeout will allow missing packets to be sent
  1030. unsigned missingPacketTimeout = std::max(udpResendDelay, 1U);
  1031. if (timeout > missingPacketTimeout)
  1032. timeout = missingPacketTimeout; // Hopefully one of the senders should unblock soon
  1033. }
  1034. return timeout;
  1035. }
  1036. virtual int run() override
  1037. {
  1038. DBGLOG("UdpReceiver: receive_receive_flow started");
  1039. #ifdef __linux__
  1040. setLinuxThreadPriority(3);
  1041. #else
  1042. adjustPriority(1);
  1043. #endif
  1044. UdpRequestToSendMsg msg;
  1045. unsigned timeout = 5000;
  1046. while (running)
  1047. {
  1048. try
  1049. {
  1050. if (udpTraceLevel > 5 || udpTraceFlow)
  1051. {
  1052. DBGLOG("UdpReceiver: wait_read(%u)", timeout);
  1053. }
  1054. bool dataAvail = flow_socket->wait_read(timeout);
  1055. if (dataAvail)
  1056. {
  1057. const unsigned l = sizeof(msg);
  1058. unsigned int res ;
  1059. flow_socket->readtms(&msg, l, l, res, 0);
  1060. assert(res==l);
  1061. doFlowRequest(msg);
  1062. }
  1063. timeout = checkPendingRequests();
  1064. }
  1065. catch (IException *e)
  1066. {
  1067. if (running)
  1068. {
  1069. StringBuffer s;
  1070. DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
  1071. }
  1072. e->Release();
  1073. }
  1074. catch (...)
  1075. {
  1076. DBGLOG("UdpReceiver: receive_receive_flow::run unknown exception");
  1077. }
  1078. }
  1079. return 0;
  1080. }
  1081. };
  1082. class receive_data : public Thread
  1083. {
  1084. CReceiveManager &parent;
  1085. ISocket *receive_socket = nullptr;
  1086. ISocket *selfFlowSocket = nullptr;
  1087. std::atomic<bool> running = { false };
  1088. Semaphore started;
  1089. public:
  1090. receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
  1091. {
  1092. unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2;
  1093. if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
  1094. if (check_max_socket_read_buffer(ip_buffer) < 0)
  1095. throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
  1096. #ifdef SOCKET_SIMULATION
  1097. if (isUdpTestMode)
  1098. {
  1099. if (udpTestUseUdpSockets)
  1100. {
  1101. receive_socket = CSimulatedUdpReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
  1102. selfFlowSocket = CSimulatedUdpWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
  1103. }
  1104. else
  1105. {
  1106. receive_socket = CSimulatedQueueReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
  1107. selfFlowSocket = CSimulatedQueueWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
  1108. }
  1109. }
  1110. else
  1111. #endif
  1112. {
  1113. receive_socket = ISocket::udp_create(parent.data_port);
  1114. selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
  1115. }
  1116. receive_socket->set_receive_buffer_size(ip_buffer);
  1117. size32_t actualSize = receive_socket->get_receive_buffer_size();
  1118. DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
  1119. running = false;
  1120. }
  1121. virtual void start()
  1122. {
  1123. running = true;
  1124. Thread::start();
  1125. started.wait();
  1126. }
  1127. ~receive_data()
  1128. {
  1129. DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
  1130. running = false;
  1131. shutdownAndCloseNoThrow(receive_socket);
  1132. shutdownAndCloseNoThrow(selfFlowSocket);
  1133. join();
  1134. ::Release(receive_socket);
  1135. ::Release(selfFlowSocket);
  1136. }
  1137. virtual int run()
  1138. {
  1139. DBGLOG("UdpReceiver: receive_data started");
  1140. #ifdef __linux__
  1141. setLinuxThreadPriority(4);
  1142. #else
  1143. adjustPriority(2);
  1144. #endif
  1145. started.signal();
  1146. unsigned lastOOOReport = 0;
  1147. unsigned lastPacketsOOO = 0;
  1148. unsigned timeout = 5000;
  1149. DataBuffer *b = nullptr;
  1150. while (running)
  1151. {
  1152. try
  1153. {
  1154. if (!b)
  1155. b = bufferManager->allocate();
  1156. unsigned int res;
  1157. while (true)
  1158. {
  1159. receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout);
  1160. if (res!=sizeof(UdpRequestToSendMsg))
  1161. break;
  1162. //Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data
  1163. //Redirect them to the flow thread to process them.
  1164. selfFlowSocket->write(b->data, res);
  1165. }
  1166. dataPacketsReceived++;
  1167. UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
  1168. assert(hdr.length == res && hdr.length > sizeof(hdr));
  1169. UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
  1170. if (sender->noteSeen(hdr))
  1171. {
  1172. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  1173. {
  1174. StringBuffer s;
  1175. DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
  1176. }
  1177. hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
  1178. }
  1179. else
  1180. {
  1181. //Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted)
  1182. sender->decPermit(hdr.msgSeq);
  1183. if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
  1184. {
  1185. StringBuffer s;
  1186. DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
  1187. }
  1188. }
  1189. parent.input_queue->pushOwn(b);
  1190. b = nullptr;
  1191. if (udpStatsReportInterval)
  1192. {
  1193. unsigned now = msTick();
  1194. if (now-lastOOOReport > udpStatsReportInterval)
  1195. {
  1196. lastOOOReport = now;
  1197. if (packetsOOO > lastPacketsOOO)
  1198. {
  1199. DBGLOG("%u more packets received out-of-order by this server (%u total)", packetsOOO-lastPacketsOOO, packetsOOO-0);
  1200. lastPacketsOOO = packetsOOO;
  1201. }
  1202. if (flowRequestsReceived > lastFlowRequestsReceived)
  1203. {
  1204. DBGLOG("%u more flow requests received by this server (%u total)", flowRequestsReceived-lastFlowRequestsReceived, flowRequestsReceived-0);
  1205. lastFlowRequestsReceived = flowRequestsReceived;
  1206. }
  1207. if (flowPermitsSent > lastFlowPermitsSent)
  1208. {
  1209. DBGLOG("%u more flow permits sent by this server (%u total)", flowPermitsSent-lastFlowPermitsSent, flowPermitsSent-0);
  1210. lastFlowPermitsSent = flowPermitsSent;
  1211. }
  1212. if (dataPacketsReceived > lastDataPacketsReceived)
  1213. {
  1214. DBGLOG("%u more data packets received by this server (%u total)", dataPacketsReceived-lastDataPacketsReceived, dataPacketsReceived-0);
  1215. lastDataPacketsReceived = dataPacketsReceived;
  1216. }
  1217. }
  1218. }
  1219. }
  1220. catch (IException *e)
  1221. {
  1222. if (running && e->errorCode() != JSOCKERR_timeout_expired)
  1223. {
  1224. StringBuffer s;
  1225. DBGLOG("UdpReceiver: receive_data::run read failed port=%u - Exp: %s", parent.data_port, e->errorMessage(s).str());
  1226. MilliSleep(1000); // Give a chance for mem free
  1227. }
  1228. e->Release();
  1229. }
  1230. catch (...)
  1231. {
  1232. DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
  1233. MilliSleep(1000);
  1234. }
  1235. }
  1236. ::Release(b);
  1237. return 0;
  1238. }
  1239. };
  1240. class CPacketCollator : public Thread
  1241. {
  1242. CReceiveManager &parent;
  1243. public:
  1244. CPacketCollator(CReceiveManager &_parent) : Thread("CPacketCollator"), parent(_parent) {}
  1245. virtual int run()
  1246. {
  1247. DBGLOG("UdpReceiver: CPacketCollator::run");
  1248. parent.collatePackets();
  1249. return 0;
  1250. }
  1251. } collatorThread;
  1252. friend class receive_receive_flow;
  1253. friend class receive_send_flow;
  1254. friend class receive_data;
  1255. friend class ReceiveFlowManager;
  1256. queue_t *input_queue;
  1257. receive_receive_flow *receive_flow;
  1258. receive_data *data;
  1259. const int input_queue_size;
  1260. const int receive_flow_port;
  1261. const int data_port;
  1262. std::atomic<bool> running = { false };
  1263. bool encrypted = false;
  1264. typedef std::map<ruid_t, CMessageCollator*> uid_map;
  1265. uid_map collators;
  1266. CriticalSection collatorsLock; // protects access to collators map
  1267. public:
  1268. IMPLEMENT_IINTERFACE;
  1269. CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int queue_size, bool _encrypted)
  1270. : collatorThread(*this), encrypted(_encrypted),
  1271. sendersTable([client_flow_port](const ServerIdentifier ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);}),
  1272. input_queue_size(queue_size), receive_flow_port(server_flow_port), data_port(d_port)
  1273. {
  1274. #ifndef _WIN32
  1275. if (udpAdjustThreadPriorities)
  1276. setpriority(PRIO_PROCESS, 0, -15);
  1277. #endif
  1278. assertex(data_port != receive_flow_port);
  1279. input_queue = new queue_t(queue_size);
  1280. data = new receive_data(*this);
  1281. //NOTE: If all slots are allocated to a single client, then if that server goes down it will prevent any data being received from
  1282. //any other sender for the udpPermitTimeout period
  1283. unsigned maxSlotsPerClient = (udpMaxPendingPermits == 1) ? queue_size : (udpMaxClientPercent * queue_size) / (udpMaxPendingPermits * 100);
  1284. assertex(maxSlotsPerClient != 0);
  1285. if (maxSlotsPerClient > queue_size)
  1286. maxSlotsPerClient = queue_size;
  1287. if (udpResendLostPackets && maxSlotsPerClient > TRACKER_BITS)
  1288. maxSlotsPerClient = TRACKER_BITS;
  1289. receive_flow = new receive_receive_flow(*this, server_flow_port, maxSlotsPerClient);
  1290. running = true;
  1291. collatorThread.start();
  1292. data->start();
  1293. receive_flow->start();
  1294. MilliSleep(15);
  1295. }
  1296. ~CReceiveManager()
  1297. {
  1298. running = false;
  1299. input_queue->interrupt();
  1300. collatorThread.join();
  1301. delete data;
  1302. delete receive_flow;
  1303. delete input_queue;
  1304. }
  1305. virtual void detachCollator(const IMessageCollator *msgColl)
  1306. {
  1307. ruid_t ruid = msgColl->queryRUID();
  1308. if (udpTraceLevel >= 2) DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
  1309. {
  1310. CriticalBlock b(collatorsLock);
  1311. collators.erase(ruid);
  1312. }
  1313. msgColl->Release();
  1314. }
  1315. void collatePackets()
  1316. {
  1317. while(running)
  1318. {
  1319. try
  1320. {
  1321. DataBuffer *dataBuff = input_queue->pop(true);
  1322. collatePacket(dataBuff);
  1323. }
  1324. catch (IException * e)
  1325. {
  1326. //An interrupted semaphore exception is expected at closedown - anything else should be reported
  1327. if (!dynamic_cast<InterruptedSemaphoreException *>(e))
  1328. EXCLOG(e);
  1329. e->Release();
  1330. }
  1331. }
  1332. }
  1333. void collatePacket(DataBuffer *dataBuff)
  1334. {
  1335. const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  1336. if (udpTraceLevel >= 4)
  1337. {
  1338. StringBuffer s;
  1339. DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
  1340. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
  1341. }
  1342. Linked <CMessageCollator> msgColl;
  1343. bool isDefault = false;
  1344. {
  1345. try
  1346. {
  1347. CriticalBlock b(collatorsLock);
  1348. msgColl.set(collators[pktHdr->ruid]);
  1349. if (!msgColl)
  1350. {
  1351. msgColl.set(collators[RUID_DISCARD]);
  1352. isDefault = true;
  1353. unwantedDiscarded++;
  1354. }
  1355. }
  1356. catch (IException *E)
  1357. {
  1358. EXCLOG(E);
  1359. E->Release();
  1360. }
  1361. catch (...)
  1362. {
  1363. IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
  1364. EXCLOG(E);
  1365. E->Release();
  1366. }
  1367. }
  1368. if (udpTraceLevel && isDefault && !isUdpTestMode)
  1369. {
  1370. StringBuffer s;
  1371. DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str());
  1372. }
  1373. if (msgColl && msgColl->attach_databuffer(dataBuff))
  1374. dataBuff = nullptr;
  1375. else
  1376. dataBuff->Release();
  1377. }
  1378. virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
  1379. {
  1380. CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
  1381. if (udpTraceLevel > 2)
  1382. DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
  1383. {
  1384. CriticalBlock b(collatorsLock);
  1385. collators[ruid] = msgColl;
  1386. }
  1387. msgColl->Link();
  1388. return msgColl;
  1389. }
  1390. };
  1391. IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
  1392. int udpQueueSize, bool encrypted)
  1393. {
  1394. return new CReceiveManager(server_flow_port, data_port, client_flow_port, udpQueueSize, encrypted);
  1395. }
  1396. /*
  1397. Thoughts on flow control / streaming:
  1398. 1. The "continuation packet" mechanism does have some advantages
  1399. - easy recovery from agent failures
  1400. - agent recovers easily from Roxie server failures
  1401. - flow control is simple (but is it effective?)
  1402. 2. Abandoning continuation packet in favour of streaming would give us the following issues:
  1403. - would need some flow control to stop getting ahead of a Roxie server that consumed slowly
1404. - flow control is non-trivial if you want to avoid tying up an agent thread and want the agent to be able to recover from Roxie server failure
  1405. - Need to work out how to do GSS - the nextGE info needs to be passed back in the flow control?
  1406. - can't easily recover from agent failures if you already started processing
  1407. - unless you assume that the results from agent are always deterministic and can retry and skip N
1408. - potentially ties up an agent thread for a while
  1409. - do we need to have a larger thread pool but limit how many actually active?
  1410. 3. Order of work
  1411. - Just adding streaming while ignoring flow control and continuation stuff (i.e. we still stop for permission to continue periodically)
  1412. - Shouldn't make anything any _worse_ ...
1413. - except that we won't be able to recover from an agent dying mid-stream (at least not without some considerable effort)
  1414. - what will happen then?
  1415. - May also break server-side caching (that no-one has used AFAIK). Maybe restrict to nohits as we change....
  1416. - Add some flow control
  1417. - would prevent agent getting too far ahead in cases that are inadequately flow-controlled today
  1418. - shouldn't make anything any worse...
  1419. - Think about removing continuation mechanism from some cases
  1420. Per Gavin, streaming would definitely help for the lowest frequency term. It may help for the others as well if it avoided any significant start up costs - e.g., opening the indexes,
  1421. creating the segment monitors, creating the various cursors, and serialising the context (especially because there are likely to be multiple cursors).
  1422. To add streaming:
  1423. - Need to check for meta availability other than when first received
  1424. - when ?
  1425. - Need to cope with a getNext() blocking without it causing issues
  1426. - perhaps should recode getNext() of variable-size rows first?
  1427. More questions:
  1428. - Can we afford the memory for the resend info?
  1429. - Save maxPacketsPerSender per sender ?
  1430. - are we really handling restart and sequence wraparound correctly?
  1431. - what about server-side caching? Makes it hard
  1432. - but maybe we should only cache tiny replies anyway....
1433. Problems found while testing the implementation:
  1434. - the unpacker cursor read code is crap
  1435. - there is a potential to deadlock when need to make a callback agent->server during a streamed result (indexread5 illustrates)
  1436. - resolution callback code doesn't really need to be query specific - could go to the default handler
  1437. - but other callbacks - ALIVE, EXCEPTION, and debugger are not so clear
  1438. - It's not at all clear where to move the code for processing metadata
  1439. - callback paradigm would solve both - but it has to be on a client thread (e.g. from within call to next()).
  1440. The following are used in "pseudo callback" mode:
  1441. #define ROXIE_DEBUGREQUEST 0x3ffffff7u
  1442. #define ROXIE_DEBUGCALLBACK 0x3ffffff8u
  1443. #define ROXIE_PING 0x3ffffff9u
  1444. - goes to own handler anyway
  1445. #define ROXIE_TRACEINFO 0x3ffffffau
  1446. - could go in meta? Not time critical. Could all go to single handler? (a bit hard since we want to intercept for caller...)
  1447. #define ROXIE_FILECALLBACK 0x3ffffffbu
  1448. - could go to single handler
  1449. #define ROXIE_ALIVE 0x3ffffffcu
  1450. - currently getting delayed a bit too much potentially if downstream processing is slow? Do I even need it if streaming?
  1451. #define ROXIE_KEYEDLIMIT_EXCEEDED 0x3ffffffdu
  1452. - could go in metadata of standard response
  1453. #define ROXIE_LIMIT_EXCEEDED 0x3ffffffeu
  1454. - ditto
  1455. #define ROXIE_EXCEPTION 0x3fffffffu
  1456. - ditto
  1457. And the continuation metadata.
  1458. What if EVERYTHING was a callback? - here's an exception... here's some more rows... here's some tracing... here's some continuation metadata
1459. Somewhere, sometime I need to marshal from one thread to another though (maybe more than once unless I can guarantee the callback is always very fast)
  1460. OR (is it the same) everything is metadata ? Metadata can contain any of the above information (apart from rows - or maybe they are just another type)
  1461. If I can't deal quickly with a packet of information, I queue it up? Spanning complicates things though. I need to be able to spot complete portions of metadata
  1462. (and in kind-of the same way I need to be able to spot complete rows of data even when they span multiple packets.) I think data is really a bit different from the rest -
  1463. you expect it to be continuous and you want the others to interrupt the flow.
1464. If continuation info were restricted to a "yes/no" (i.e. it had to be continued on the same node it started on), we could have a simple "Is there any continuation" bit. Others are sent in their
  1465. own packets so are a little different. Does that make it harder to recover? Not sure that it does really (just means that the window at which a failure causes a problem starts earlier).
1466. However, tying up an agent thread for a while may be an issue (and do we know when to untie it if the Roxie server abandons/restarts?)
  1467. Perhaps it makes sense to pause at this point (with streaming disabled and with retry mechanism optional)
  1468. */