ccdqueue.cpp 119 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include <jio.hpp>
  16. #include <jqueue.tpp>
  17. #include <jsocket.hpp>
  18. #include <jlog.hpp>
  19. #include "jisem.hpp"
  20. #include "jencrypt.hpp"
  21. #include "udplib.hpp"
  22. #include "udptopo.hpp"
  23. #include "udpsha.hpp"
  24. #include "ccd.hpp"
  25. #include "ccddebug.hpp"
  26. #include "ccdquery.hpp"
  27. #include "ccdstate.hpp"
  28. #include "ccdqueue.ipp"
  29. #include "ccdsnmp.hpp"
  30. #ifdef _USE_CPPUNIT
  31. #include <cppunit/extensions/HelperMacros.h>
  32. #endif
  33. using roxiemem::OwnedRoxieRow;
  34. using roxiemem::OwnedConstRoxieRow;
  35. using roxiemem::IRowManager;
  36. using roxiemem::DataBuffer;
  37. //============================================================================================
  38. RoxiePacketHeader::RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  39. {
  40. packetlength = sizeof(RoxiePacketHeader);
  41. #ifdef TIME_PACKETS
  42. tick = 0;
  43. #endif
  44. init(_remoteId, _uid, _channel, _overflowSequence);
  45. }
  46. RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _activityId, unsigned subChannel) : serverId(source.serverId)
  47. {
  48. // Used to create the header to send a callback to originating server or an IBYTI to a buddy
  49. activityId = _activityId;
  50. uid = source.uid;
  51. queryHash = source.queryHash;
  52. channel = source.channel;
  53. overflowSequence = source.overflowSequence;
  54. continueSequence = source.continueSequence;
  55. if (_activityId >= ROXIE_ACTIVITY_SPECIAL_FIRST && _activityId <= ROXIE_ACTIVITY_SPECIAL_LAST)
  56. overflowSequence |= OUTOFBAND_SEQUENCE; // Need to make sure it is not treated as dup of actual reply in the udp layer
  57. retries = getSubChannelMask(subChannel) | (source.retries & ~ROXIE_RETRIES_MASK);
  58. #ifdef TIME_PACKETS
  59. tick = source.tick;
  60. #endif
  61. #ifdef SUBCHANNELS_IN_HEADER
  62. memcpy(subChannels, source.subChannels, sizeof(subChannels));
  63. #endif
  64. packetlength = sizeof(RoxiePacketHeader);
  65. }
  66. unsigned RoxiePacketHeader::getSubChannelMask(unsigned subChannel)
  67. {
  68. return SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  69. }
  70. unsigned RoxiePacketHeader::priorityHash() const
  71. {
  72. // Used to determine which agent to act as primary and which as secondary for a given packet (thus spreading the load)
  73. // It's important that we do NOT include channel (since that would result in different values for the different agents responding to a broadcast)
  74. // We also don't include continueSequence since we'd prefer continuations to go the same way as original
  75. unsigned hash = serverId.hash();
  76. hash = hashc((const unsigned char *) &uid, sizeof(uid), hash);
  77. hash += overflowSequence; // MORE - is this better than hashing?
  78. if (traceLevel > 9)
  79. {
  80. StringBuffer s;
  81. DBGLOG("Calculating hash: %s hash was %d", toString(s).str(), hash);
  82. }
  83. return hash;
  84. }
  85. void RoxiePacketHeader::copy(const RoxiePacketHeader &oh)
  86. {
  87. // used for saving away kill packets for later matching by match
  88. uid = oh.uid;
  89. overflowSequence = oh.overflowSequence;
  90. continueSequence = oh.continueSequence;
  91. serverId = oh.serverId;
  92. channel = oh.channel;
  93. // MORE - would it be safer, maybe even faster to copy the rest too?
  94. }
  95. bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
  96. {
  97. // used when matching up a kill packet against a pending one...
  98. // DO NOT compare activityId - they are not supposed to match, since 0 in activityid identifies ibyti!
  99. return
  100. oh.uid==uid &&
  101. (oh.overflowSequence & ~OUTOFBAND_SEQUENCE) == (overflowSequence & ~OUTOFBAND_SEQUENCE) &&
  102. oh.continueSequence == continueSequence &&
  103. oh.serverId==serverId &&
  104. oh.channel==channel;
  105. }
  106. void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  107. {
  108. retries = 0;
  109. activityId = _remoteId.activityId;
  110. queryHash = _remoteId.queryHash;
  111. uid = _uid;
  112. serverId = myNode;
  113. channel = _channel;
  114. overflowSequence = _overflowSequence;
  115. continueSequence = 0;
  116. #ifdef SUBCHANNELS_IN_HEADER
  117. clearSubChannels();
  118. #endif
  119. }
  #ifdef SUBCHANNELS_IN_HEADER
  // Reset every entry of the subChannels table held in the header by calling clear() on it.
  void RoxiePacketHeader::clearSubChannels()
  {
      unsigned slot = 0;
      while (slot < MAX_SUBCHANNEL)
      {
          subChannels[slot].clear();
          slot++;
      }
  }
  #endif
  127. StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
  128. {
  129. const IpAddress serverIP = serverId.getIpAddress();
  130. ret.append("activityId=");
  131. switch(activityId & ~ROXIE_PRIORITY_MASK)
  132. {
  133. case 0: ret.append("IBYTI"); break;
  134. case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
  135. case ROXIE_PING: ret.append("ROXIE_PING"); break;
  136. case ROXIE_TRACEINFO: ret.append("ROXIE_TRACEINFO"); break;
  137. case ROXIE_DEBUGREQUEST: ret.append("ROXIE_DEBUGREQUEST"); break;
  138. case ROXIE_DEBUGCALLBACK: ret.append("ROXIE_DEBUGCALLBACK"); break;
  139. case ROXIE_FILECALLBACK: ret.append("ROXIE_FILECALLBACK"); break;
  140. case ROXIE_ALIVE: ret.append("ROXIE_ALIVE"); break;
  141. case ROXIE_KEYEDLIMIT_EXCEEDED: ret.append("ROXIE_KEYEDLIMIT_EXCEEDED"); break;
  142. case ROXIE_LIMIT_EXCEEDED: ret.append("ROXIE_LIMIT_EXCEEDED"); break;
  143. case ROXIE_EXCEPTION: ret.append("ROXIE_EXCEPTION"); break;
  144. default:
  145. ret.appendf("%u", (activityId & ~(ROXIE_ACTIVITY_FETCH | ROXIE_PRIORITY_MASK)));
  146. if (activityId & ROXIE_ACTIVITY_FETCH)
  147. ret.appendf(" (fetch part)");
  148. break;
  149. }
  150. ret.appendf(" uid=" RUIDF " pri=", uid);
  151. switch(activityId & ROXIE_PRIORITY_MASK)
  152. {
  153. case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
  154. case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
  155. case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
  156. default: ret.append("???"); break;
  157. }
  158. ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
  159. serverIP.getIpText(ret);
  160. if (retries)
  161. {
  162. if (retries==QUERY_ABORTED)
  163. ret.append(" retries=QUERY_ABORTED");
  164. else
  165. {
  166. if (retries & ROXIE_RETRIES_MASK)
  167. ret.appendf(" retries=%04x", retries);
  168. if (retries & ROXIE_FASTLANE)
  169. ret.appendf(" FASTLANE");
  170. if (retries & ROXIE_BROADCAST)
  171. ret.appendf(" BROADCAST");
  172. }
  173. }
  174. #ifdef SUBCHANNELS_IN_HEADER
  175. ret.append(" subchannels=");
  176. for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
  177. {
  178. if (subChannels[idx].isNull())
  179. break;
  180. if (idx)
  181. ret.append(',');
  182. subChannels[idx].getTraceText(ret);
  183. if (subChannels[idx].isMe())
  184. {
  185. ret.append("(me)");
  186. }
  187. }
  188. #endif
  189. return ret;
  190. }
  191. bool RoxiePacketHeader::allChannelsFailed()
  192. {
  193. unsigned mask = (1 << (getNumAgents(channel) * SUBCHANNEL_BITS)) - 1;
  194. return (retries & mask) == mask;
  195. }
  196. bool RoxiePacketHeader::retry()
  197. {
  198. bool worthRetrying = false;
  199. unsigned mask = SUBCHANNEL_MASK;
  200. unsigned numAgents = getNumAgents(channel);
  201. for (unsigned subChannel = 0; subChannel < numAgents; subChannel++)
  202. {
  203. unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
  204. if (subRetries != SUBCHANNEL_MASK)
  205. subRetries++;
  206. if (subRetries != SUBCHANNEL_MASK)
  207. worthRetrying = true;
  208. retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
  209. mask <<= SUBCHANNEL_BITS;
  210. }
  211. return worthRetrying;
  212. }
  213. void RoxiePacketHeader::setException(unsigned subChannel)
  214. {
  215. retries |= SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  216. }
  217. unsigned RoxiePacketHeader::thisChannelRetries(unsigned subChannel)
  218. {
  219. unsigned shift = SUBCHANNEL_BITS * subChannel;
  220. unsigned mask = SUBCHANNEL_MASK << shift;
  221. return (retries & mask) >> shift;
  222. }
  223. //============================================================================================
  224. unsigned getReplicationLevel(unsigned channel)
  225. {
  226. if (!channel)
  227. return 0;
  228. Owned<const ITopologyServer> topology = getTopology();
  229. return topology->queryChannelInfo(channel).replicationLevel();
  230. }
  231. //============================================================================================
  232. // This function maps a agent number to the multicast ip used to talk to it.
  233. IpAddress multicastBase("239.1.1.1"); // TBD IPv6 (need IPv6 multicast addresses?
  234. IpAddress multicastLast("239.1.5.254");
  235. const IpAddress &getChannelIp(IpAddress &ip, unsigned _channel)
  236. {
  237. // need to be careful to avoid the .0's and the .255's (not sure why...)
  238. ip = multicastBase;
  239. if (!ip.ipincrement(_channel,1,254,1,0xffff)
  240. ||(ip.ipcompare(multicastLast)>0))
  241. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Out-of-range multicast channel %d", _channel);
  242. return ip;
  243. }
  244. static Owned<ISocket> multicastSocket;
  245. void joinMulticastChannel(unsigned channel)
  246. {
  247. IpAddress multicastIp;
  248. getChannelIp(multicastIp, channel);
  249. SocketEndpoint ep(ccdMulticastPort, multicastIp);
  250. StringBuffer epStr;
  251. ep.getUrlStr(epStr);
  252. if (!multicastSocket->join_multicast_group(ep))
  253. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to join multicast channel %d (%s)", channel, epStr.str());
  254. if (traceLevel)
  255. DBGLOG("Joined multicast channel %d (%s)", channel, epStr.str());
  256. }
  257. static SocketEndpointArray multicastEndpoints; // indexed by channel
  258. void setMulticastEndpoints(unsigned numChannels)
  259. {
  260. for (unsigned channel = 0; channel <= numChannels; channel++) // NOTE - channel 0 is special, and numChannels does not include it
  261. {
  262. IpAddress multicastIp;
  263. getChannelIp(multicastIp, channel);
  264. multicastEndpoints.append(SocketEndpoint(ccdMulticastPort, multicastIp));
  265. }
  266. }
  267. void openMulticastSocket()
  268. {
  269. if (!multicastSocket)
  270. {
  271. multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
  272. if (multicastTTL)
  273. {
  274. multicastSocket->set_ttl(multicastTTL);
  275. DBGLOG("Roxie: multicastTTL: %u", multicastTTL);
  276. }
  277. else
  278. DBGLOG("Roxie: multicastTTL not set");
  279. multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
  280. size32_t actualSize = multicastSocket->get_receive_buffer_size();
  281. if (actualSize < udpMulticastBufferSize)
  282. {
  283. DBGLOG("Roxie: multicast socket buffer size could not be set (requested=%d actual %d", udpMulticastBufferSize, actualSize);
  284. throwUnexpected();
  285. }
  286. if (traceLevel)
  287. DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", ccdMulticastPort, udpMulticastBufferSize, actualSize);
  288. if (roxieMulticastEnabled && !localAgent)
  289. {
  290. Owned<const ITopologyServer> topology = getTopology();
  291. for (unsigned channel : topology->queryChannels())
  292. {
  293. assertex(channel);
  294. joinMulticastChannel(channel);
  295. }
  296. joinMulticastChannel(0); // all agents also listen on channel 0
  297. }
  298. }
  299. }
  300. void closeMulticastSockets()
  301. {
  302. multicastSocket.clear();
  303. }
  304. static bool channelWrite(RoxiePacketHeader &buf, bool includeSelf)
  305. {
  306. size32_t minwrote = 0;
  307. if (roxieMulticastEnabled)
  308. {
  309. return multicastSocket->udp_write_to(multicastEndpoints.item(buf.channel), &buf, buf.packetlength) == buf.packetlength;
  310. }
  311. else
  312. {
  313. #ifdef SUBCHANNELS_IN_HEADER
  314. // In the containerized system, the list of subchannel IPs is captured in the packet header to ensure everyone is using the
  315. // same snapshot of the topology state.
  316. // If the subchannel IPs are not set, fill them in now. If they are set, use them.
  317. if (buf.subChannels[0].isNull())
  318. {
  319. Owned<const ITopologyServer> topo = getTopology();
  320. const SocketEndpointArray &eps = topo->queryAgents(buf.channel);
  321. if (!eps.ordinality())
  322. throw makeStringExceptionV(0, "No agents available for channel %d", buf.channel);
  323. if (buf.channel==0)
  324. {
  325. // Note that we expand any writes on channel 0 here, since we need to capture the server's view of what agents are on each channel
  326. bool allOk = true;
  327. if (traceRoxiePackets)
  328. {
  329. StringBuffer header;
  330. DBGLOG("Translating packet sent to channel 0: %s", buf.toString(header).str());
  331. }
  332. for (unsigned channel = 0; channel < numChannels; channel++)
  333. {
  334. buf.channel = channel+1;
  335. if (!channelWrite(buf, true))
  336. allOk = false;
  337. buf.clearSubChannels();
  338. }
  339. buf.channel = 0;
  340. return allOk;
  341. }
  342. unsigned hdrHashVal = buf.priorityHash();
  343. unsigned numAgents = eps.ordinality();
  344. unsigned subChannel = (hdrHashVal % numAgents);
  345. for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
  346. {
  347. if (idx == numAgents)
  348. break;
  349. buf.subChannels[idx].setIp(eps.item(subChannel));
  350. subChannel++;
  351. if (subChannel == numAgents)
  352. subChannel = 0;
  353. }
  354. }
  355. else
  356. {
  357. assert(buf.channel != 0);
  358. }
  359. for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
  360. {
  361. if (buf.subChannels[subChannel].isNull())
  362. break;
  363. if (includeSelf || !buf.subChannels[subChannel].isMe())
  364. {
  365. if (traceRoxiePackets)
  366. {
  367. StringBuffer s, header;
  368. DBGLOG("Writing %d bytes to subchannel %d (%s) %s", buf.packetlength, subChannel, buf.subChannels[subChannel].getTraceText(s).str(), buf.toString(header).str());
  369. }
  370. SocketEndpoint ep(ccdMulticastPort, buf.subChannels[subChannel].getIpAddress());
  371. size32_t wrote = multicastSocket->udp_write_to(ep, &buf, buf.packetlength);
  372. if (!subChannel || wrote < minwrote)
  373. minwrote = wrote;
  374. if (delaySubchannelPackets)
  375. MilliSleep(100);
  376. }
  377. else if (traceRoxiePackets)
  378. {
  379. StringBuffer s, header;
  380. DBGLOG("NOT writing %d bytes to subchannel %d (%s) %s", buf.packetlength, subChannel, buf.subChannels[subChannel].getTraceText(s).str(), buf.toString(header).str());
  381. }
  382. }
  383. #else
  384. Owned<const ITopologyServer> topo = getTopology();
  385. const SocketEndpointArray &eps = topo->queryAgents(buf.channel);
  386. if (!eps.ordinality())
  387. throw makeStringExceptionV(0, "No agents available for channel %d", buf.channel);
  388. ForEachItemIn(idx, eps)
  389. {
  390. size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), &buf, buf.packetlength);
  391. if (!idx || wrote < minwrote)
  392. minwrote = wrote;
  393. }
  394. #endif
  395. }
  396. return minwrote==buf.packetlength;
  397. }
  398. //============================================================================================
  399. class CRoxieQueryPacketBase : public CInterface
  400. {
  401. protected:
  402. RoxiePacketHeader *data;
  403. const byte *traceInfo;
  404. unsigned traceLength;
  405. public:
  406. IMPLEMENT_IINTERFACE;
  407. CRoxieQueryPacketBase(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
  408. {
  409. assertex(lengthRemaining >= (int) sizeof(RoxiePacketHeader));
  410. data->packetlength = lengthRemaining;
  411. const byte *finger = (const byte *) (data + 1);
  412. lengthRemaining -= sizeof(RoxiePacketHeader);
  413. if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
  414. {
  415. traceInfo = NULL;
  416. traceLength = 0;
  417. }
  418. else
  419. {
  420. assertex(lengthRemaining > 1);
  421. traceInfo = finger;
  422. lengthRemaining--;
  423. if (*finger++ & LOGGING_DEBUGGERACTIVE)
  424. {
  425. assertex(lengthRemaining >= (int) sizeof(unsigned short));
  426. unsigned short debugLen = *(unsigned short *) finger;
  427. finger += debugLen + sizeof(unsigned short);
  428. lengthRemaining -= debugLen + sizeof(unsigned short);
  429. }
  430. for (;;)
  431. {
  432. assertex(lengthRemaining>0);
  433. if (!*finger)
  434. {
  435. lengthRemaining--;
  436. finger++;
  437. break;
  438. }
  439. lengthRemaining--;
  440. finger++;
  441. }
  442. traceLength = finger - traceInfo;
  443. }
  444. }
  445. ~CRoxieQueryPacketBase()
  446. {
  447. free(data);
  448. }
  449. };
  450. // MORE - this is for TESTING ONLY - do not release with this key here like this!
  451. static byte key[32] = {
  452. 0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
  453. 0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
  454. };
  455. class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPacket
  456. {
  457. protected:
  458. const byte *continuationData = nullptr;
  459. unsigned continuationLength = 0;
  460. const byte *smartStepInfoData = nullptr;
  461. unsigned smartStepInfoLength = 0;
  462. const byte *contextData = nullptr;
  463. unsigned contextLength = 0;
  464. public:
  465. IMPLEMENT_IINTERFACE;
  466. CRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacketBase(_data, length)
  467. {
  468. const byte *finger = (const byte *) (data + 1) + traceLength;
  469. int lengthRemaining = length - sizeof(RoxiePacketHeader) - traceLength;
  470. if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
  471. {
  472. continuationData = NULL;
  473. continuationLength = 0;
  474. smartStepInfoData = NULL;
  475. smartStepInfoLength = 0;
  476. }
  477. else
  478. {
  479. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  480. {
  481. assertex(lengthRemaining >= (int) sizeof(unsigned));
  482. continuationLength = *(unsigned *) finger;
  483. continuationData = finger + sizeof(unsigned);
  484. finger = continuationData + continuationLength;
  485. lengthRemaining -= continuationLength + sizeof(unsigned);
  486. }
  487. if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
  488. {
  489. assertex(lengthRemaining >= (int) sizeof(unsigned));
  490. smartStepInfoLength = *(unsigned *) finger;
  491. smartStepInfoData = finger + sizeof(unsigned);
  492. finger = smartStepInfoData + smartStepInfoLength;
  493. lengthRemaining -= smartStepInfoLength + sizeof(unsigned);
  494. }
  495. }
  496. assertex(lengthRemaining >= 0);
  497. contextData = finger;
  498. contextLength = lengthRemaining;
  499. }
  500. virtual RoxiePacketHeader &queryHeader() const
  501. {
  502. return *data;
  503. }
  504. virtual const byte *queryTraceInfo() const
  505. {
  506. return traceInfo;
  507. }
  508. virtual unsigned getTraceLength() const
  509. {
  510. return traceLength;
  511. }
  512. virtual const void *queryContinuationData() const
  513. {
  514. return continuationData;
  515. }
  516. virtual unsigned getContinuationLength() const
  517. {
  518. return continuationLength;
  519. }
  520. virtual const byte *querySmartStepInfoData() const
  521. {
  522. return smartStepInfoData;
  523. }
  524. virtual unsigned getSmartStepInfoLength() const
  525. {
  526. return smartStepInfoLength;
  527. }
  528. virtual const void *queryContextData() const
  529. {
  530. return contextData;
  531. }
  532. virtual unsigned getContextLength() const
  533. {
  534. return contextLength;
  535. }
  536. virtual IRoxieQueryPacket *clonePacket(unsigned channel) const
  537. {
  538. unsigned length = data->packetlength;
  539. RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
  540. memcpy(newdata, data, length);
  541. newdata->channel = channel;
  542. newdata->retries |= ROXIE_BROADCAST;
  543. return createRoxiePacket(newdata, length);
  544. }
  545. virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const
  546. {
  547. assertex((data->continueSequence & CONTINUE_SEQUENCE_SKIPTO) == 0); // Should not already be any skipto info in the source packet
  548. unsigned newDataSize = data->packetlength + sizeof(unsigned) + skipDataLen;
  549. char *newdata = (char *) malloc(newDataSize);
  550. unsigned headSize = sizeof(RoxiePacketHeader);
  551. if (traceLength)
  552. headSize += traceLength;
  553. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  554. headSize += sizeof(unsigned) + continuationLength;
  555. memcpy(newdata, data, headSize); // copy in leading part of old data
  556. ((RoxiePacketHeader *) newdata)->continueSequence |= CONTINUE_SEQUENCE_SKIPTO; // set flag indicating new data is present
  557. *(unsigned *) (newdata + headSize) = skipDataLen; // add length field for new data
  558. memcpy(newdata + headSize + sizeof(unsigned), skipData, skipDataLen); // copy in new data
  559. memcpy(newdata + headSize + sizeof(unsigned) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
  560. return createRoxiePacket(newdata, newDataSize);
  561. }
  562. virtual ISerializedRoxieQueryPacket *serialize() const override
  563. {
  564. unsigned length = data->packetlength;
  565. MemoryBuffer mb;
  566. if (encryptInTransit)
  567. {
  568. const byte *plainData = (const byte *) (data+1);
  569. plainData += traceLength;
  570. unsigned plainLen = length - sizeof(RoxiePacketHeader) - traceLength;
  571. mb.append(sizeof(RoxiePacketHeader)+traceLength, data); // Header and traceInfo are unencrypted
  572. aesEncrypt(key, sizeof(key), plainData, plainLen, mb); // Encrypt everything else
  573. RoxiePacketHeader *newHeader = (RoxiePacketHeader *) mb.toByteArray();
  574. newHeader->packetlength = mb.length();
  575. }
  576. else
  577. {
  578. mb.append(length, data);
  579. }
  580. return createSerializedRoxiePacket(mb);
  581. }
  582. };
  583. // CNocryptRoxieQueryPacket implements both serialized and deserialized packet interfaces, to avoid additional copy operations when
  584. // using localAgent mode.
  585. class CNocryptRoxieQueryPacket: public CRoxieQueryPacket, implements ISerializedRoxieQueryPacket
  586. {
  587. public:
  588. IMPLEMENT_IINTERFACE;
  589. CNocryptRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacket(_data, length)
  590. {
  591. }
  592. virtual RoxiePacketHeader &queryHeader() const
  593. {
  594. return CRoxieQueryPacket::queryHeader();
  595. }
  596. virtual const byte *queryTraceInfo() const
  597. {
  598. return traceInfo;
  599. }
  600. virtual unsigned getTraceLength() const
  601. {
  602. return traceLength;
  603. }
  604. virtual ISerializedRoxieQueryPacket *cloneSerializedPacket(unsigned channel) const
  605. {
  606. unsigned length = data->packetlength;
  607. RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
  608. memcpy(newdata, data, length);
  609. newdata->channel = channel;
  610. newdata->retries |= ROXIE_BROADCAST;
  611. return new CNocryptRoxieQueryPacket(newdata, length);
  612. }
  613. virtual ISerializedRoxieQueryPacket *serialize() const override
  614. {
  615. return const_cast<CNocryptRoxieQueryPacket *>(LINK(this));
  616. }
  617. virtual IRoxieQueryPacket *deserialize() const override
  618. {
  619. return const_cast<CNocryptRoxieQueryPacket *>(LINK(this));
  620. }
  621. };
  622. class CSerializedRoxieQueryPacket : public CRoxieQueryPacketBase, implements ISerializedRoxieQueryPacket
  623. {
  624. public:
  625. IMPLEMENT_IINTERFACE;
  626. CSerializedRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacketBase(_data, length)
  627. {
  628. }
  629. virtual RoxiePacketHeader &queryHeader() const
  630. {
  631. return *data;
  632. }
  633. virtual const byte *queryTraceInfo() const
  634. {
  635. return traceInfo;
  636. }
  637. virtual unsigned getTraceLength() const
  638. {
  639. return traceLength;
  640. }
  641. virtual ISerializedRoxieQueryPacket *cloneSerializedPacket(unsigned channel) const
  642. {
  643. unsigned length = data->packetlength;
  644. RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
  645. memcpy(newdata, data, length);
  646. newdata->channel = channel;
  647. newdata->retries |= ROXIE_BROADCAST;
  648. return new CSerializedRoxieQueryPacket(newdata, length);
  649. }
  650. virtual IRoxieQueryPacket *deserialize() const override
  651. {
  652. unsigned length = data->packetlength;
  653. MemoryBuffer mb;
  654. if (encryptInTransit)
  655. {
  656. const byte *encryptedData = (const byte *) (data+1);
  657. encryptedData += traceLength;
  658. unsigned encryptedLen = length - sizeof(RoxiePacketHeader) - traceLength;
  659. mb.append(sizeof(RoxiePacketHeader)+traceLength, data); // Header and traceInfo are unencrypted
  660. aesDecrypt(key, sizeof(key), encryptedData, encryptedLen, mb); // Decrypt everything else
  661. RoxiePacketHeader *newHeader = (RoxiePacketHeader *) mb.toByteArray();
  662. newHeader->packetlength = mb.length();
  663. }
  664. else
  665. {
  666. mb.append(length, data);
  667. }
  668. return createRoxiePacket(mb);
  669. }
  670. };
  671. extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
  672. {
  673. if (!encryptInTransit)
  674. return new CNocryptRoxieQueryPacket(_data, _len);
  675. if ((unsigned short)_len != _len)
  676. {
  677. StringBuffer s;
  678. RoxiePacketHeader *header = (RoxiePacketHeader *) _data;
  679. header->toString(s);
  680. free(_data);
  681. throw MakeStringException(ROXIE_PACKET_ERROR, "Packet length %d exceeded maximum sending packet %s", _len, s.str());
  682. }
  683. return new CRoxieQueryPacket(_data, _len);
  684. }
  685. extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
  686. {
  687. unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
  688. return createRoxiePacket(m.detachOwn(), length);
  689. }
  690. extern IRoxieQueryPacket *deserializeCallbackPacket(MemoryBuffer &m)
  691. {
  692. // Direct decryption of special packets - others are only decrypted after being dequeued
  693. if (encryptInTransit)
  694. {
  695. RoxiePacketHeader *header = (RoxiePacketHeader *) m.toByteArray();
  696. assertex(header != nullptr);
  697. assertex(header->activityId == ROXIE_FILECALLBACK || header->activityId == ROXIE_DEBUGCALLBACK);
  698. assertex(m.length() >= header->packetlength);
  699. unsigned encryptedLen = header->packetlength - sizeof(RoxiePacketHeader);
  700. const void *encryptedData = (const void *)(header+1);
  701. MemoryBuffer decrypted;
  702. decrypted.append(sizeof(RoxiePacketHeader), header);
  703. decrypted.ensureCapacity(encryptedLen); // May be up to 16 bytes smaller...
  704. aesDecrypt(key, sizeof(key), encryptedData, encryptedLen, decrypted);
  705. unsigned length = decrypted.length();
  706. RoxiePacketHeader *newHeader = (RoxiePacketHeader *) decrypted.detachOwn();
  707. newHeader->packetlength = length;
  708. return createRoxiePacket(newHeader, length);
  709. }
  710. else
  711. {
  712. unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
  713. return createRoxiePacket(m.detachOwn(), length);
  714. }
  715. }
  716. extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m)
  717. {
  718. unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
  719. return new CSerializedRoxieQueryPacket(m.detachOwn(), length);
  720. }
  721. //=================================================================================
  722. AgentContextLogger::AgentContextLogger()
  723. {
  724. GetHostIp(ip);
  725. set(NULL);
  726. }
  727. AgentContextLogger::AgentContextLogger(ISerializedRoxieQueryPacket *packet)
  728. {
  729. GetHostIp(ip);
  730. set(packet);
  731. }
  732. void AgentContextLogger::set(ISerializedRoxieQueryPacket *packet)
  733. {
  734. anyOutput = false;
  735. intercept = false;
  736. debuggerActive = false;
  737. checkingHeap = false;
  738. aborted = false;
  739. stats.reset();
  740. start = msTick();
  741. if (packet)
  742. {
  743. CriticalBlock b(crit); // Why?
  744. RoxiePacketHeader &header = packet->queryHeader();
  745. const byte *traceInfo = packet->queryTraceInfo();
  746. StringBuffer s;
  747. if (traceInfo)
  748. {
  749. unsigned traceLength = packet->getTraceLength();
  750. unsigned char loggingFlags = *traceInfo;
  751. if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL
  752. {
  753. traceInfo++;
  754. traceLength--;
  755. if (loggingFlags & LOGGING_INTERCEPTED)
  756. intercept = true;
  757. if (loggingFlags & LOGGING_TRACELEVELSET)
  758. {
  759. ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
  760. traceLength--;
  761. }
  762. if (loggingFlags & LOGGING_BLIND)
  763. blind = true;
  764. if (loggingFlags & LOGGING_CHECKINGHEAP)
  765. checkingHeap = true;
  766. if (loggingFlags & LOGGING_DEBUGGERACTIVE)
  767. {
  768. assertex(traceLength > sizeof(unsigned short));
  769. debuggerActive = true;
  770. unsigned short debugLen = *(unsigned short *) traceInfo;
  771. traceInfo += debugLen + sizeof(unsigned short);
  772. traceLength -= debugLen + sizeof(unsigned short);
  773. }
  774. // Passing the wuid via the logging context prefix is a lot of a hack...
  775. if (loggingFlags & LOGGING_WUID)
  776. {
  777. unsigned wuidLen = 0;
  778. while (wuidLen < traceLength)
  779. {
  780. if (traceInfo[wuidLen]=='@'||traceInfo[wuidLen]==':')
  781. break;
  782. wuidLen++;
  783. }
  784. wuid.set((const char *) traceInfo, wuidLen);
  785. }
  786. }
  787. s.append(traceLength, (const char *) traceInfo);
  788. s.append("|");
  789. }
  790. channel = header.channel;
  791. ip.getIpText(s);
  792. s.append(':').append(channel);
  793. StringContextLogger::set(s.str());
  794. if (intercept || mergeAgentStatistics)
  795. {
  796. RoxiePacketHeader newHeader(header, ROXIE_TRACEINFO, 0); // subchannel not relevant
  797. output.setown(ROQ->createOutputStream(newHeader, true, *this));
  798. }
  799. }
  800. else
  801. {
  802. StringContextLogger::set("");
  803. channel = 0;
  804. }
  805. }
  806. void AgentContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const
  807. {
  808. if (output && mergeAgentStatistics)
  809. {
  810. MemoryBuffer buf;
  811. buf.append((char) LOG_CHILDCOUNT); // A special log entry for the stats
  812. buf.append(subGraphId);
  813. buf.append(actId);
  814. buf.append(idx);
  815. buf.append(processed);
  816. buf.append(strands);
  817. }
  818. }
  819. void AgentContextLogger::putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const
  820. {
  821. if (output && mergeAgentStatistics)
  822. {
  823. MemoryBuffer buf;
  824. buf.append((char) LOG_CHILDSTATS); // A special log entry for the stats
  825. buf.append(subGraphId);
  826. buf.append(actId);
  827. if (stats.serialize(buf))
  828. {
  829. unsigned len = buf.length();
  830. void *ret = output->getBuffer(len, true);
  831. memcpy(ret, buf.toByteArray(), len);
  832. output->putBuffer(ret, len, true);
  833. anyOutput = true;
  834. }
  835. }
  836. }
  837. void AgentContextLogger::flush()
  838. {
  839. if (output)
  840. {
  841. CriticalBlock b(crit);
  842. if (mergeAgentStatistics)
  843. {
  844. MemoryBuffer buf;
  845. buf.append((char) LOG_STATVALUES); // A special log entry for the stats
  846. if (stats.serialize(buf))
  847. {
  848. unsigned len = buf.length();
  849. void *ret = output->getBuffer(len, true);
  850. memcpy(ret, buf.toByteArray(), len);
  851. output->putBuffer(ret, len, true);
  852. anyOutput = true;
  853. }
  854. }
  855. ForEachItemIn(idx, log)
  856. {
  857. MemoryBuffer buf;
  858. LogItem &logItem = log.item(idx);
  859. logItem.serialize(buf);
  860. unsigned len = buf.length();
  861. void *ret = output->getBuffer(len, true);
  862. memcpy(ret, buf.toByteArray(), len);
  863. output->putBuffer(ret, len, true);
  864. anyOutput = true;
  865. }
  866. log.kill();
  867. if (anyOutput)
  868. output->flush();
  869. output.clear();
  870. }
  871. }
  872. //=================================================================================
  // Cache of query factories keyed by a 64-bit hash. cacheOnDemandQuery() and doUnload() both derive
  // the key by folding the channel number into the query hash with rtlHash64Data().
  // onDemandQueriesCrit is taken (via SpinBlock) around each update of the cache in those functions.
  873. static SpinLock onDemandQueriesCrit;
  874. static MapXToMyClass<hash64_t, hash64_t, IQueryFactory> onDemandQueryCache;
  875. void sendUnloadMessage(hash64_t hash, const char *id, const IRoxieContextLogger &logctx)
  876. {
  877. RemoteActivityId unloadId(ROXIE_UNLOAD, hash);
  878. RoxiePacketHeader header(unloadId, 0, 0, 0);
  879. MemoryBuffer mb;
  880. mb.append(sizeof(RoxiePacketHeader), &header);
  881. mb.append((char) LOGGING_FLAGSPRESENT);
  882. mb.append(id);
  883. if (traceLevel > 1)
  884. DBGLOG("UNLOAD sent for query %s", id);
  885. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  886. ROQ->sendPacket(packet, logctx);
  887. }
  888. void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  889. {
  890. const RoxiePacketHeader &header = packet->queryHeader();
  891. unsigned channelNo = header.channel;
  892. if (logctx.queryTraceLevel())
  893. logctx.CTXLOG("Unload received for channel %d", channelNo);
  894. hash64_t hashValue = header.queryHash;
  895. hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  896. SpinBlock b(onDemandQueriesCrit);
  897. onDemandQueryCache.remove(hashValue);
  898. }
  899. void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
  900. {
  901. hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  902. SpinBlock b(onDemandQueriesCrit);
  903. onDemandQueryCache.setValue(hashValue, query);
  904. }
  905. //=================================================================================
  // Context payload of a ping packet. doPing() requires the packet's context data to be exactly
  // sizeof(PingRecord) bytes, checks senderIP against the header's serverId and echoes the record back.
  906. struct PingRecord
  907. {
  908. unsigned tick; // not read by doPing(); presumably the sender's msTick() timestamp - TODO confirm against the sender
  909. IpAddress senderIP; // must equal header.serverId's IP address or doPing() rejects the packet
  910. };
  911. void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  912. {
  913. const RoxiePacketHeader &header = packet->queryHeader();
  914. const IpAddress serverIP = header.serverId.getIpAddress();
  915. unsigned contextLength = packet->getContextLength();
  916. if (contextLength != sizeof(PingRecord))
  917. {
  918. StringBuffer s;
  919. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Unexpected data size %d (expected %d) in PING: %s", contextLength, (unsigned) sizeof(PingRecord), header.toString(s).str());
  920. }
  921. const PingRecord *data = (const PingRecord *) packet->queryContextData();
  922. if (!serverIP.ipequals(data->senderIP))
  923. {
  924. StringBuffer s;
  925. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(s).str());
  926. }
  927. RoxiePacketHeader newHeader(header, ROXIE_PING, 0); // subchannel not relevant
  928. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  929. void *ret = output->getBuffer(contextLength, false);
  930. memcpy(ret, data, contextLength);
  931. output->putBuffer(ret, contextLength, false);
  932. output->flush();
  933. }
  934. //=================================================================================
  // Thread id that IBYTIbuffer's assert() checks compare against GetCurrentThreadId().
  // It is initialised to 0 here; the code that assigns the real id is not in this part of the file.
  935. static ThreadId roxiePacketReaderThread = 0;
  936. class IBYTIbuffer
  937. {
  938. // This class is used to track a finite set of recently-received IBYTI messages, that may have arrived before the messages they refer to
  939. // It is accessed ONLY from the main reader thread and as such does not need to be threadsafe (but does need to be fast).
  940. // We use a circular buffer, and don't bother removing anything (just treat old items as expired). If the buffer overflows we will end up
  941. // discarding the oldest tracked orphaned IBYTI - but that's ok, no worse than if we hadn't tracked them at all.
  942. public:
  943. IBYTIbuffer(unsigned _numOrphans) : numOrphans(_numOrphans)
  944. {
  945. assertex(numOrphans);
  946. orphans = new RoxiePacketHeader[numOrphans];
  947. tail = 0;
  948. }
  949. void noteOrphan(const RoxiePacketHeader &hdr)
  950. {
  951. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  952. unsigned now = msTick();
  953. // We could trace that the buffer may be too small, if (orphans[tail].activityId >= now)
  954. orphans[tail].copy(hdr);
  955. orphans[tail].activityId = now + IBYTIbufferLifetime;
  956. tail++;
  957. if (tail == numOrphans)
  958. tail = 0;
  959. }
  960. bool lookup(const RoxiePacketHeader &hdr) const
  961. {
  962. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  963. unsigned now = msTick();
  964. unsigned lookat = tail;
  965. do
  966. {
  967. if (!lookat)
  968. lookat = numOrphans;
  969. lookat--;
  970. if ((int) (orphans[lookat].activityId - now) < 0) // Watch out for wrapping
  971. break; // expired;
  972. if (orphans[lookat].matchPacket(hdr))
  973. return true;
  974. } while (lookat != tail);
  975. return false;
  976. }
  977. private:
  978. RoxiePacketHeader *orphans = nullptr;
  979. unsigned tail = 0;
  980. unsigned numOrphans = 0;
  981. };
  982. //=================================================================================
  983. //
  984. // RoxieQueue - holds pending transactions on a roxie agent
  985. class RoxieQueue : public CInterface, implements IThreadFactory
  986. {
  987. Owned <IThreadPool> workers;
  988. QueueOf<ISerializedRoxieQueryPacket, true> waiting;
  989. Semaphore available;
  990. CriticalSection qcrit;
  991. unsigned headRegionSize;
  992. unsigned numWorkers;
  993. RelaxedAtomic<unsigned> started;
  994. std::atomic<unsigned> idle;
  995. IBYTIbuffer *myIBYTIbuffer = nullptr;
  996. void noteQueued()
  997. {
  998. maxQueueLength.store_max(++queueLength);
  999. // NOTE - there is a small race condition here - if idle is 1 but two enqueue's happen
  1000. // close enough together that the signal has not yet caused idle to come back down to zero, then the
  1001. // desired new thread may not be created. It's unlikely, and it's benign in that the query is still
  1002. // processed and the thread will be created next time the HWM is reached.
  1003. if (started < numWorkers && idle==0)
  1004. {
  1005. workers->start(this);
  1006. started++;
  1007. }
  1008. }
  1009. public:
  1010. IMPLEMENT_IINTERFACE;
  1011. RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
  1012. {
  1013. headRegionSize = _headRegionSize;
  1014. numWorkers = _numWorkers;
  1015. workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
  1016. started = 0;
  1017. idle = 0;
  1018. if (IBYTIbufferSize)
  1019. myIBYTIbuffer = new IBYTIbuffer(IBYTIbufferSize);
  1020. }
  1021. ~RoxieQueue()
  1022. {
  1023. delete myIBYTIbuffer;
  1024. }
  1025. virtual IPooledThread *createNew();
  1026. void abortChannel(unsigned channel);
  1027. void start()
  1028. {
  1029. if (prestartAgentThreads)
  1030. {
  1031. while (started < numWorkers)
  1032. {
  1033. workers->start(this);
  1034. started++;
  1035. }
  1036. }
  1037. }
  1038. IPooledThreadIterator *running()
  1039. {
  1040. return workers->running();
  1041. }
  1042. void stopAll()
  1043. {
  1044. workers->stopAll(true);
  1045. signal(workers->runningCount());
  1046. }
  1047. void join()
  1048. {
  1049. workers->joinAll(true);
  1050. workers.clear(); // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
  1051. }
  1052. void enqueue(ISerializedRoxieQueryPacket *x)
  1053. {
  1054. {
  1055. #ifdef TIME_PACKETS
  1056. x->queryHeader().tick = msTick();
  1057. #endif
  1058. CriticalBlock qc(qcrit);
  1059. waiting.enqueue(x);
  1060. noteQueued();
  1061. }
  1062. available.signal();
  1063. }
  1064. void enqueueUnique(ISerializedRoxieQueryPacket *x, unsigned subChannel)
  1065. {
  1066. RoxiePacketHeader &header = x->queryHeader();
  1067. #ifdef TIME_PACKETS
  1068. header.tick = msTick();
  1069. #endif
  1070. bool found = false;
  1071. {
  1072. CriticalBlock qc(qcrit);
  1073. unsigned len = waiting.ordinality();
  1074. unsigned i;
  1075. for (i = 0; i < len; i++)
  1076. {
  1077. ISerializedRoxieQueryPacket *queued = waiting.item(i);
  1078. if (queued && queued->queryHeader().matchPacket(header))
  1079. {
  1080. found = true;
  1081. break;
  1082. }
  1083. }
  1084. if (!found)
  1085. waiting.enqueue(x);
  1086. }
  1087. if (found)
  1088. {
  1089. if (traceLevel > 0)
  1090. {
  1091. StringBuffer xx;
  1092. AgentContextLogger l(x);
  1093. l.CTXLOG("Ignored retry on subchannel %u for queued activity %s", subChannel, header.toString(xx).str());
  1094. }
  1095. if (!subChannel)
  1096. retriesIgnoredPrm++;
  1097. else
  1098. retriesIgnoredSec++;
  1099. x->Release();
  1100. }
  1101. else
  1102. {
  1103. available.signal();
  1104. noteQueued();
  1105. if (traceLevel > 10)
  1106. {
  1107. AgentContextLogger l(x);
  1108. StringBuffer xx;
  1109. l.CTXLOG("enqueued %s", header.toString(xx).str());
  1110. }
  1111. }
  1112. }
  1113. bool remove(RoxiePacketHeader &x)
  1114. {
  1115. unsigned scanLength = 0;
  1116. ISerializedRoxieQueryPacket *found = nullptr;
  1117. {
  1118. CriticalBlock qc(qcrit);
  1119. unsigned len = waiting.ordinality();
  1120. unsigned i;
  1121. for (i = 0; i < len; i++)
  1122. {
  1123. ISerializedRoxieQueryPacket *queued = waiting.item(i);
  1124. if (queued)
  1125. {
  1126. scanLength++;
  1127. if (queued->queryHeader().matchPacket(x))
  1128. {
  1129. waiting.set(i, NULL);
  1130. found = queued;
  1131. break;
  1132. }
  1133. }
  1134. }
  1135. }
  1136. if (found)
  1137. {
  1138. #ifdef _DEBUG
  1139. RoxiePacketHeader &header = found->queryHeader();
  1140. AgentContextLogger l(found);
  1141. StringBuffer xx;
  1142. l.CTXLOG("discarded %s", header.toString(xx).str());
  1143. #endif
  1144. found->Release();
  1145. queueLength--;
  1146. if (scanLength > maxScanLength)
  1147. maxScanLength = scanLength;
  1148. totScanLength += scanLength;
  1149. totScans++;
  1150. return true;
  1151. }
  1152. else
  1153. return false;
  1154. }
  1155. void wait()
  1156. {
  1157. idle++;
  1158. available.wait();
  1159. idle--;
  1160. }
  1161. void signal(unsigned num)
  1162. {
  1163. available.signal(num);
  1164. }
  1165. ISerializedRoxieQueryPacket *dequeue()
  1166. {
  1167. CriticalBlock qc(qcrit);
  1168. unsigned lim = waiting.ordinality();
  1169. if (lim)
  1170. {
  1171. if (headRegionSize)
  1172. {
  1173. if (lim > headRegionSize)
  1174. lim = headRegionSize;
  1175. return waiting.dequeue(fastRand() % lim);
  1176. }
  1177. return waiting.dequeue();
  1178. }
  1179. else
  1180. return NULL;
  1181. }
  1182. unsigned getHeadRegionSize() const
  1183. {
  1184. return headRegionSize;
  1185. }
  1186. unsigned setHeadRegionSize(unsigned newsize)
  1187. {
  1188. unsigned ret = headRegionSize;
  1189. headRegionSize = newsize;
  1190. return ret;
  1191. }
  1192. void noteOrphanIBYTI(const RoxiePacketHeader &hdr)
  1193. {
  1194. if (myIBYTIbuffer)
  1195. myIBYTIbuffer->noteOrphan(hdr);
  1196. }
  1197. bool lookupOrphanIBYTI(const RoxiePacketHeader &hdr) const
  1198. {
  1199. if (myIBYTIbuffer)
  1200. return myIBYTIbuffer->lookup(hdr);
  1201. else
  1202. return false;
  1203. }
  1204. };
  1205. class CRoxieWorker : public CInterface, implements IPooledThread
  1206. {
  1207. RoxieQueue *queue;
  1208. CriticalSection actCrit;
  1209. #ifndef NEW_IBYTI
  1210. Semaphore ibytiSem;
  1211. #endif
  1212. bool stopped;
  1213. bool abortJob;
  1214. bool busy;
  1215. Owned<IRoxieAgentActivity> activity;
  1216. Owned<IRoxieQueryPacket> packet;
  1217. Owned<const ITopologyServer> topology;
  1218. AgentContextLogger logctx;
  1219. public:
  1220. IMPLEMENT_IINTERFACE;
  1221. CRoxieWorker()
  1222. {
  1223. queue = NULL;
  1224. stopped = false;
  1225. busy = false;
  1226. abortJob = false;
  1227. }
  1228. virtual void init(void *_r) override
  1229. {
  1230. queue = (RoxieQueue *) _r;
  1231. stopped = false;
  1232. busy = false;
  1233. abortJob = false;
  1234. }
  1235. virtual bool canReuse() const override
  1236. {
  1237. return true;
  1238. }
  1239. virtual bool stop() override
  1240. {
  1241. stopped = true;
  1242. return true;
  1243. }
  1244. inline void setActivity(IRoxieAgentActivity *act)
  1245. {
  1246. CriticalBlock b(actCrit);
  1247. activity.setown(act);
  1248. }
  1249. inline bool match(RoxiePacketHeader &h)
  1250. {
  1251. // There is a window between getting packet from queue and being able to match it.
  1252. // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
  1253. CriticalBlock b(actCrit);
  1254. return packet && packet->queryHeader().matchPacket(h);
  1255. }
  1256. void abortChannel(unsigned channel)
  1257. {
  1258. CriticalBlock b(actCrit);
  1259. if (packet && packet->queryHeader().channel==channel)
  1260. {
  1261. abortJob = true;
  1262. #ifndef NEW_IBYTI
  1263. if (doIbytiDelay)
  1264. ibytiSem.signal();
  1265. #endif
  1266. if (activity)
  1267. activity->abort();
  1268. }
  1269. }
  1270. bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
  1271. {
  1272. CriticalBlock b(actCrit);
  1273. if (packet && packet->queryHeader().matchPacket(h))
  1274. {
  1275. queryFound = true;
  1276. abortJob = true;
  1277. #ifndef NEW_IBYTI
  1278. if (doIbytiDelay)
  1279. ibytiSem.signal();
  1280. #endif
  1281. if (activity)
  1282. {
  1283. // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority agent
  1284. // (more primary in the rank). The agents with higher rank will hold the lower bits of the retries field in IBYTI packet).
  1285. #ifdef SUBCHANNELS_IN_HEADER
  1286. if (!checkRank || h.getRespondingSubChannel() < h.mySubChannel())
  1287. #else
  1288. if (!checkRank || topology->queryChannelInfo(h.channel).otherAgentHasPriority(h.priorityHash(), h.getRespondingSubChannel()))
  1289. #endif
  1290. {
  1291. activity->abort();
  1292. return true;
  1293. }
  1294. else
  1295. {
  1296. return false;
  1297. }
  1298. }
  1299. if (busy)
  1300. {
  1301. preActivity = true;
  1302. return true;
  1303. }
  1304. }
  1305. return false;
  1306. }
  1307. void throwRemoteException(IException *E, IRoxieAgentActivity *activity, IRoxieQueryPacket *packet, bool isUser)
  1308. {
  1309. try
  1310. {
  1311. if (activity && (logctx.queryTraceLevel() > 1))
  1312. {
  1313. StringBuffer act;
  1314. activity->toString(act);
  1315. logctx.CTXLOG("throwRemoteException, activity %s, isUser=%d", act.str(), (int) isUser);
  1316. if (!isUser)
  1317. EXCLOG(E, "throwRemoteException");
  1318. }
  1319. RoxiePacketHeader &header = packet->queryHeader();
  1320. #ifdef SUBCHANNELS_IN_HEADER
  1321. unsigned mySubChannel = header.mySubChannel();
  1322. #else
  1323. unsigned mySubChannel = topology->queryChannelInfo(header.channel).subChannel();
  1324. #endif
  1325. // I failed to do the query, but already sent out IBYTI - resend it so someone else can try
  1326. if (!isUser)
  1327. {
  1328. StringBuffer s;
  1329. s.append("Exception in agent for packet ");
  1330. header.toString(s);
  1331. logctx.logOperatorException(E, NULL, 0, "%s", s.str());
  1332. header.setException(mySubChannel);
  1333. if (!header.allChannelsFailed() && !localAgent)
  1334. {
  1335. if (logctx.queryTraceLevel() > 1)
  1336. logctx.CTXLOG("resending packet from agent in case others want to try it");
  1337. ROQ->sendPacket(packet, logctx);
  1338. }
  1339. }
  1340. RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION, mySubChannel);
  1341. if (isUser)
  1342. newHeader.retries = (unsigned short) -1;
  1343. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1344. StringBuffer message("<Exception>");
  1345. message.appendf("<Code>%d</Code><Message>", E->errorCode());
  1346. StringBuffer err;
  1347. E->errorMessage(err);
  1348. encodeXML(err.str(), message);
  1349. message.append("</Message></Exception>");
  1350. unsigned len = message.length();
  1351. void *ret = output->getBuffer(len+1, true);
  1352. memcpy(ret, message.str(), len+1);
  1353. output->putBuffer(ret, len+1, true);
  1354. output->flush();
  1355. E->Release();
  1356. }
  1357. catch (IException *EInE)
  1358. {
  1359. EXCLOG(EInE, "Exception during throwRemoteException");
  1360. E->Release();
  1361. EInE->Release();
  1362. }
  1363. catch (...)
  1364. {
  1365. logctx.CTXLOG("Unknown Exception during throwRemoteException");
  1366. E->Release();
  1367. }
  1368. }
  1369. void doActivity()
  1370. {
  1371. RoxiePacketHeader &header = packet->queryHeader();
  1372. unsigned channel = header.channel;
  1373. hash64_t queryHash = packet->queryHeader().queryHash;
  1374. unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
  1375. Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
  1376. #ifdef SUBCHANNELS_IN_HEADER
  1377. unsigned mySubChannel = header.mySubChannel();
  1378. #else
  1379. unsigned numAgents = topology->queryAgents(channel).ordinality();
  1380. unsigned mySubChannel = topology->queryChannelInfo(channel).subChannel();
  1381. #endif
  1382. if (!queryFactory && logctx.queryWuid())
  1383. {
  1384. Owned <IRoxieDaliHelper> daliHelper = connectToDali();
  1385. Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(logctx.queryWuid(), NULL);
  1386. queryFactory.setown(createAgentQueryFactoryFromWu(wu, channel));
  1387. if (queryFactory)
  1388. cacheOnDemandQuery(queryHash, channel, queryFactory);
  1389. }
  1390. if (!queryFactory)
  1391. {
  1392. StringBuffer hdr;
  1393. IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie agent received request for unregistered query: %s", packet->queryHeader().toString(hdr).str());
  1394. EXCLOG(E, "doActivity");
  1395. throwRemoteException(E, activity, packet, false);
  1396. return;
  1397. }
  1398. try
  1399. {
  1400. bool debugging = logctx.queryDebuggerActive();
  1401. if (debugging)
  1402. {
  1403. if (mySubChannel)
  1404. abortJob = true; // when debugging, we always run on primary only...
  1405. }
  1406. #ifndef NEW_IBYTI
  1407. #ifdef SUBCHANNELS_IN_HEADER
  1408. else if (doIbytiDelay && mySubChannel)
  1409. {
  1410. unsigned delay = 0;
  1411. for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
  1412. delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
  1413. unsigned start = 0;
  1414. if (traceRoxiePackets)
  1415. {
  1416. StringBuffer x;
  1417. DBGLOG("YES myTurnToDelay subchannel=%u delay=%u %s", mySubChannel, delay, header.toString(x).str());
  1418. start = msTick();
  1419. }
  1420. if (delay)
  1421. ibytiSem.wait(delay);
  1422. if (traceRoxiePackets)
  1423. {
  1424. StringBuffer x;
  1425. DBGLOG("Delay done, abortJob=%d, elapsed=%d", (int) abortJob, msTick()-start);
  1426. }
  1427. if (!abortJob)
  1428. {
  1429. for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
  1430. noteNodeSick(header.subChannels[subChannel]);
  1431. }
  1432. }
  1433. #else
  1434. else if (doIbytiDelay && (numAgents > 1))
  1435. {
  1436. unsigned hdrHashVal = header.priorityHash();
  1437. unsigned primarySubChannel = (hdrHashVal % numAgents);
  1438. if (primarySubChannel != mySubChannel)
  1439. {
  1440. unsigned delay = topology->queryChannelInfo(channel).getIbytiDelay(primarySubChannel);
  1441. if (logctx.queryTraceLevel() > 6)
  1442. {
  1443. StringBuffer x;
  1444. logctx.CTXLOG("YES myTurnToDelayIBYTI subchannel=%u delay=%u hash=%u %s", mySubChannel, delay, hdrHashVal, header.toString(x).str());
  1445. }
  1446. // MORE: if we are dealing with a query that was on channel 0, we may want a longer delay
  1447. // (since the theory about duplicated work not mattering when cluster is idle does not hold up)
  1448. if (delay)
  1449. {
  1450. ibytiSem.wait(delay);
  1451. if (!abortJob)
  1452. topology->queryChannelInfo(channel).noteChannelsSick(primarySubChannel);
  1453. if (logctx.queryTraceLevel() > 8)
  1454. {
  1455. StringBuffer x;
  1456. logctx.CTXLOG("Buddy did%s send IBYTI, updated delay : %s",
  1457. abortJob ? "" : " NOT", header.toString(x).str());
  1458. }
  1459. }
  1460. }
  1461. else
  1462. {
  1463. #ifndef NO_IBYTI_DELAYS_COUNT
  1464. if (!mySubChannel)
  1465. ibytiNoDelaysPrm++;
  1466. else
  1467. ibytiNoDelaysSec++;
  1468. #endif
  1469. if (logctx.queryTraceLevel() > 6)
  1470. {
  1471. StringBuffer x;
  1472. logctx.CTXLOG("NOT myTurnToDelayIBYTI subchannel=%u hash=%u %s", mySubChannel, hdrHashVal, header.toString(x).str());
  1473. }
  1474. }
  1475. }
  1476. #endif
  1477. #endif
  1478. if (abortJob)
  1479. {
  1480. CriticalBlock b(actCrit);
  1481. busy = false; // Keep order - before setActivity below
  1482. if (logctx.queryTraceLevel() > 5)
  1483. {
  1484. StringBuffer x;
  1485. logctx.CTXLOG("Stop before processing - activity aborted %s", header.toString(x).str());
  1486. }
  1487. return;
  1488. }
  1489. if (!debugging)
  1490. ROQ->sendIbyti(header, logctx, mySubChannel);
  1491. activitiesStarted++;
  1492. Owned <IAgentActivityFactory> factory = queryFactory->getAgentActivityFactory(activityId);
  1493. assertex(factory);
  1494. setActivity(factory->createActivity(logctx, packet));
  1495. Owned<IMessagePacker> output = activity->process();
  1496. if (logctx.queryTraceLevel() > 5)
  1497. {
  1498. StringBuffer x;
  1499. logctx.CTXLOG("done processing %s", header.toString(x).str());
  1500. }
  1501. if (output)
  1502. {
  1503. activitiesCompleted++;
  1504. busy = false; // Keep order - before setActivity below
  1505. setActivity(NULL); // Ensures all stats are merged from child queries etc
  1506. logctx.flush();
  1507. output->flush();
  1508. }
  1509. }
  1510. catch (IUserException *E)
  1511. {
  1512. throwRemoteException(E, activity, packet, true);
  1513. }
  1514. catch (IException *E)
  1515. {
  1516. if (E->errorCode()!=ROXIE_ABORT_ERROR)
  1517. throwRemoteException(E, activity, packet, false);
  1518. else
  1519. E->Release();
  1520. }
  1521. catch (...)
  1522. {
  1523. throwRemoteException(MakeStringException(ROXIE_MULTICAST_ERROR, "Unknown exception"), activity, packet, false);
  1524. }
  1525. busy = false; // Keep order - before setActivity below
  1526. setActivity(NULL);
  1527. }
  1528. virtual void threadmain() override
  1529. {
  1530. while (!stopped)
  1531. {
  1532. try
  1533. {
  1534. for (;;)
  1535. {
  1536. queue->wait();
  1537. if (stopped)
  1538. break;
  1539. agentsActive++;
  1540. maxAgentsActive.store_max(agentsActive);
  1541. abortJob = false;
  1542. busy = true;
  1543. #ifndef NEW_IBYTI
  1544. if (doIbytiDelay)
  1545. ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
  1546. #endif
  1547. Owned<ISerializedRoxieQueryPacket> next = queue->dequeue();
  1548. if (next)
  1549. {
  1550. logctx.set(next);
  1551. packet.setown(next->deserialize());
  1552. next.clear();
  1553. queueLength--;
  1554. RoxiePacketHeader &header = packet->queryHeader();
  1555. #ifdef TIME_PACKETS
  1556. {
  1557. unsigned now = msTick();
  1558. unsigned packetWait = now-header.tick;
  1559. header.tick = now;
  1560. packetWaitMax.store_max(packetWait);
  1561. packetWaitElapsed += packetWait;
  1562. packetWaitCount++;
  1563. }
  1564. #endif
  1565. topology.setown(getTopology());
  1566. if (logctx.queryTraceLevel() > 10)
  1567. {
  1568. StringBuffer x;
  1569. logctx.CTXLOG("dequeued %s", header.toString(x).str());
  1570. }
  1571. if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_UNLOAD)
  1572. {
  1573. doUnload(packet, logctx);
  1574. }
  1575. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_PING)
  1576. {
  1577. doPing(packet, logctx);
  1578. }
  1579. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_DEBUGREQUEST)
  1580. {
  1581. // MORE - we need to make sure only executed on primary, and that the proxyId (== pointer to DebugGraphManager) is still valid.
  1582. // It may be that there is not a lot of point using the pointer - may as well use an non-reused ID and look it up in a global hash table of active ones
  1583. doDebugRequest(packet, logctx);
  1584. }
  1585. else if (header.channel)
  1586. doActivity();
  1587. else
  1588. throwUnexpected(); // channel 0 requests translated earlier now
  1589. #ifdef TIME_PACKETS
  1590. {
  1591. unsigned now = msTick();
  1592. unsigned packetRun = now-header.tick;
  1593. packetRunMax.store_max(packetRun);
  1594. packetRunElapsed += packetRun;
  1595. packetRunCount++;
  1596. }
  1597. #endif
  1598. }
  1599. busy = false;
  1600. {
  1601. CriticalBlock b(actCrit);
  1602. packet.clear();
  1603. topology.clear();
  1604. logctx.set(NULL);
  1605. }
  1606. agentsActive--;
  1607. }
  1608. }
  1609. catch(IException *E)
  1610. {
  1611. CriticalBlock b(actCrit);
  1612. EXCLOG(E);
  1613. if (packet)
  1614. {
  1615. throwRemoteException(E, NULL, packet, false);
  1616. packet.clear();
  1617. }
  1618. else
  1619. E->Release();
  1620. topology.clear();
  1621. }
  1622. catch(...)
  1623. {
  1624. CriticalBlock b(actCrit);
  1625. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
  1626. EXCLOG(E);
  1627. if (packet)
  1628. {
  1629. throwRemoteException(E.getClear(), NULL, packet, false);
  1630. packet.clear();
  1631. }
  1632. topology.clear();
  1633. }
  1634. }
  1635. }
  1636. };
  1637. IPooledThread *RoxieQueue::createNew()
  1638. {
  1639. return new CRoxieWorker;
  1640. }
  1641. void RoxieQueue::abortChannel(unsigned channel)
  1642. {
  1643. Owned<IPooledThreadIterator> wi = workers->running();
  1644. ForEach(*wi)
  1645. {
  1646. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1647. w.abortChannel(channel);
  1648. }
  1649. }
  1650. //=================================================================================
  1651. class CallbackEntry : implements IPendingCallback, public CInterface
  1652. {
  1653. const RoxiePacketHeader &header;
  1654. StringAttr lfn;
  1655. InterruptableSemaphore ready;
  1656. MemoryBuffer data;
  1657. bool gotData;
  1658. public:
  1659. IMPLEMENT_IINTERFACE;
  1660. CallbackEntry(const RoxiePacketHeader &_header, const char *_lfn) : header(_header), lfn(_lfn)
  1661. {
  1662. gotData = false;
  1663. }
  1664. virtual bool wait(unsigned msecs)
  1665. {
  1666. return ready.wait(msecs);
  1667. }
  1668. virtual MemoryBuffer &queryData()
  1669. {
  1670. return data;
  1671. }
  1672. bool matches(RoxiePacketHeader &cand, const char *_lfn)
  1673. {
  1674. return (cand.matchPacket(header) && (!_lfn|| stricmp(_lfn, lfn)==0));
  1675. }
  1676. void doFileCallback(unsigned _len, const void *_data, bool aborted)
  1677. {
  1678. // MORE - make sure we call this for whole query abort as well as for callback abort
  1679. if (aborted)
  1680. ready.interrupt(MakeStringException(0, "Interrupted"));
  1681. else if (!gotData)
  1682. {
  1683. gotData = true;
  1684. data.append(_len, _data);
  1685. ready.signal();
  1686. }
  1687. }
  1688. };
  1689. class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
  1690. {
  1691. protected:
  1692. RoxieQueue slaQueue;
  1693. RoxieQueue hiQueue;
  1694. RoxieQueue loQueue;
  1695. unsigned numWorkers;
  1696. public:
  1697. IMPLEMENT_IINTERFACE;
  1698. RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
  1699. {
  1700. }
  1701. virtual unsigned getHeadRegionSize() const
  1702. {
  1703. return loQueue.getHeadRegionSize();
  1704. }
  1705. virtual void setHeadRegionSize(unsigned newSize)
  1706. {
  1707. slaQueue.setHeadRegionSize(newSize);
  1708. hiQueue.setHeadRegionSize(newSize);
  1709. loQueue.setHeadRegionSize(newSize);
  1710. }
  1711. virtual void start()
  1712. {
  1713. loQueue.start();
  1714. hiQueue.start();
  1715. slaQueue.start();
  1716. }
  1717. virtual void stop()
  1718. {
  1719. loQueue.stopAll();
  1720. hiQueue.stopAll();
  1721. slaQueue.stopAll();
  1722. }
  1723. virtual void join()
  1724. {
  1725. loQueue.join();
  1726. hiQueue.join();
  1727. slaQueue.join();
  1728. }
  1729. IArrayOf<CallbackEntry> callbacks;
  1730. CriticalSection callbacksCrit;
  1731. virtual IPendingCallback *notePendingCallback(const RoxiePacketHeader &header, const char *lfn)
  1732. {
  1733. CriticalBlock b(callbacksCrit);
  1734. CallbackEntry *callback = new CallbackEntry(header, lfn);
  1735. callbacks.append(*callback);
  1736. return callback;
  1737. }
  1738. virtual void removePendingCallback(IPendingCallback *goer)
  1739. {
  1740. if (goer)
  1741. {
  1742. CriticalBlock b(callbacksCrit);
  1743. callbacks.zap(static_cast<CallbackEntry &>(*goer));
  1744. }
  1745. }
  1746. protected:
  1747. void doFileCallback(IRoxieQueryPacket *packet)
  1748. {
  1749. // This is called on the main agent reader thread so needs to be as fast as possible to avoid lost packets
  1750. const char *lfn;
  1751. const char *data;
  1752. unsigned len;
  1753. RoxiePacketHeader &header = packet->queryHeader();
  1754. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK)
  1755. {
  1756. lfn = (const char *) packet->queryContextData();
  1757. unsigned namelen = strlen(lfn) + 1;
  1758. data = lfn + namelen;
  1759. len = packet->getContextLength() - namelen;
  1760. }
  1761. else
  1762. {
  1763. lfn = data = NULL; // used when query aborted
  1764. len = 0;
  1765. }
  1766. CriticalBlock b(callbacksCrit);
  1767. ForEachItemIn(idx, callbacks)
  1768. {
  1769. CallbackEntry &c = callbacks.item(idx);
  1770. if (c.matches(header, lfn))
  1771. {
  1772. if (traceLevel > 10)
  1773. DBGLOG("callback return matched a waiting query");
  1774. c.doFileCallback(len, data, header.retries==QUERY_ABORTED);
  1775. }
  1776. }
  1777. }
  1778. };
  1779. #ifdef _MSC_VER
  1780. #pragma warning ( push )
  1781. #pragma warning ( disable: 4355 )
  1782. #endif
  1783. class RoxieThrottledPacketSender : public Thread
  1784. {
  1785. TokenBucket &bucket;
  1786. InterruptableSemaphore queued;
  1787. Semaphore started;
  1788. unsigned maxPacketSize;
  1789. SafeQueueOf<IRoxieQueryPacket, false> queue;
  1790. class DECL_EXCEPTION StoppedException: public IException, public CInterface
  1791. {
  1792. public:
  1793. IMPLEMENT_IINTERFACE;
  1794. int errorCode() const { return 0; }
  1795. StringBuffer & errorMessage(StringBuffer &str) const { return str.append("Stopped"); }
  1796. MessageAudience errorAudience() const { return MSGAUD_user; }
  1797. };
  1798. void enqueue(IRoxieQueryPacket *packet)
  1799. {
  1800. packet->Link();
  1801. queue.enqueue(packet);
  1802. queued.signal();
  1803. }
  1804. IRoxieQueryPacket *dequeue()
  1805. {
  1806. queued.wait();
  1807. return queue.dequeue();
  1808. }
  1809. public:
  1810. RoxieThrottledPacketSender(TokenBucket &_bucket, unsigned _maxPacketSize)
  1811. : Thread("RoxieThrottledPacketSender"), bucket(_bucket), maxPacketSize(_maxPacketSize)
  1812. {
  1813. start();
  1814. started.wait();
  1815. }
  1816. ~RoxieThrottledPacketSender()
  1817. {
  1818. stop();
  1819. join();
  1820. }
  1821. virtual int run()
  1822. {
  1823. started.signal();
  1824. for (;;)
  1825. {
  1826. try
  1827. {
  1828. Owned<IRoxieQueryPacket> packet = dequeue();
  1829. unsigned length = packet->queryHeader().packetlength;
  1830. {
  1831. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  1832. bucket.wait((length / 1024) + 1);
  1833. }
  1834. Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
  1835. if (!channelWrite(serialized->queryHeader(), true))
  1836. DBGLOG("Roxie packet write wrote too little");
  1837. packetsSent++;
  1838. }
  1839. catch (StoppedException *E)
  1840. {
  1841. E->Release();
  1842. break;
  1843. }
  1844. catch (IException *E)
  1845. {
  1846. EXCLOG(E);
  1847. E->Release();
  1848. }
  1849. catch (...)
  1850. {
  1851. }
  1852. }
  1853. return 0;
  1854. }
  1855. void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  1856. {
  1857. RoxiePacketHeader &header = x->queryHeader();
  1858. unsigned length = x->queryHeader().packetlength;
  1859. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  1860. switch (header.retries & ROXIE_RETRIES_MASK)
  1861. {
  1862. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  1863. {
  1864. StringBuffer s;
  1865. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  1866. }
  1867. break;
  1868. default:
  1869. {
  1870. StringBuffer s;
  1871. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  1872. }
  1873. break;
  1874. case 0:
  1875. if (logctx.queryTraceLevel() > 8)
  1876. {
  1877. StringBuffer s;
  1878. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  1879. }
  1880. break;
  1881. }
  1882. if (length > maxPacketSize)
  1883. {
  1884. StringBuffer s;
  1885. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  1886. }
  1887. enqueue(x);
  1888. }
  1889. void stop()
  1890. {
  1891. // bucket.stop();
  1892. queued.interrupt(new StoppedException);
  1893. }
  1894. };
  1895. //------------------------------------------------------------------------------------------------------------
  1896. #ifdef NEW_IBYTI
  1897. class DelayedPacketQueue
  1898. {
  1899. // Used to keep a list of all recently-received packets where we are not primary subchannel. There is one queue per subchannel level
  1900. // It is accessed ONLY from the main reader thread and does not need to be threadsafe (but does need to be fast)
  1901. // We use a doubly-linked list (not std::list as not quite flexible enough).
  1902. class DelayedPacketEntry
  1903. {
  1904. DelayedPacketEntry() = delete;
  1905. DelayedPacketEntry(const DelayedPacketEntry&) = delete;
  1906. public:
  1907. DelayedPacketEntry(ISerializedRoxieQueryPacket *_packet, unsigned _waitExpires)
  1908. : packet(_packet), waitExpires(_waitExpires)
  1909. {
  1910. }
  1911. ~DelayedPacketEntry()
  1912. {
  1913. if (prev)
  1914. prev->next = next;
  1915. if (next)
  1916. next->prev = prev;
  1917. }
  1918. bool matches(const RoxiePacketHeader &ibyti) const
  1919. {
  1920. return packet->queryHeader().matchPacket(ibyti);
  1921. }
  1922. ISerializedRoxieQueryPacket *getClear()
  1923. {
  1924. return packet.getClear();
  1925. }
  1926. StringBuffer & describe(StringBuffer &ret) const
  1927. {
  1928. return packet->queryHeader().toString(ret);
  1929. }
  1930. Owned<ISerializedRoxieQueryPacket> packet;
  1931. DelayedPacketEntry *next = nullptr;
  1932. DelayedPacketEntry *prev = nullptr;
  1933. unsigned waitExpires = 0;
  1934. };
  1935. public:
  1936. DelayedPacketQueue() = default;
  1937. DelayedPacketQueue(const DelayedPacketQueue&) = delete;
  1938. ~DelayedPacketQueue()
  1939. {
  1940. while (head)
  1941. removeEntry(head);
  1942. }
  1943. bool doIBYTI(const RoxiePacketHeader &ibyti)
  1944. {
  1945. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  1946. DelayedPacketEntry *finger = head;
  1947. while (finger)
  1948. {
  1949. if (finger->matches(ibyti))
  1950. {
  1951. if (traceRoxiePackets)
  1952. {
  1953. StringBuffer s;
  1954. DBGLOG("IBYTI removing delayed packet %s", finger->describe(s).str());
  1955. }
  1956. removeEntry(finger);
  1957. return true;
  1958. }
  1959. finger = finger->next;
  1960. }
  1961. return false;
  1962. }
  1963. void append(ISerializedRoxieQueryPacket *packet, unsigned expires)
  1964. {
  1965. // Goes on the end. But percolate the expiry time backwards
  1966. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  1967. DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires);
  1968. if (traceRoxiePackets)
  1969. {
  1970. StringBuffer s;
  1971. DBGLOG("Adding delayed packet %s", packet->queryHeader().toString(s).str());
  1972. }
  1973. newEntry->prev = tail;
  1974. if (tail)
  1975. {
  1976. tail->next = newEntry;
  1977. for (DelayedPacketEntry *finger = tail; finger != nullptr; finger = finger->prev)
  1978. {
  1979. if ((int) (finger->waitExpires - expires) <= 0)
  1980. break;
  1981. finger->waitExpires = expires;
  1982. finger = finger->prev;
  1983. }
  1984. }
  1985. else
  1986. head = newEntry;
  1987. tail = newEntry;
  1988. }
  1989. // Move any that we are done waiting for our buddy onto the active queue
  1990. void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
  1991. {
  1992. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  1993. DelayedPacketEntry *finger = head;
  1994. while (finger)
  1995. {
  1996. if (((int) (finger->waitExpires - now)) <= 0) // Oddly coded to handle wrapping
  1997. {
  1998. ISerializedRoxieQueryPacket *packet = finger->getClear();
  1999. const RoxiePacketHeader &header = packet->queryHeader();
  2000. if (traceRoxiePackets)
  2001. {
  2002. StringBuffer s;
  2003. DBGLOG("No IBYTI received yet for delayed packet %s", header.toString(s).str());
  2004. }
  2005. if (header.activityId & ROXIE_SLA_PRIORITY)
  2006. slaQueue.enqueue(packet);
  2007. else if (header.activityId & ROXIE_HIGH_PRIORITY)
  2008. hiQueue.enqueue(packet);
  2009. else
  2010. loQueue.enqueue(packet);
  2011. for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
  2012. {
  2013. if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
  2014. break;
  2015. noteNodeSick(header.subChannels[subChannel]);
  2016. }
  2017. DelayedPacketEntry *goer = finger;
  2018. finger = finger->next;
  2019. removeEntry(goer);
  2020. }
  2021. else
  2022. break;
  2023. }
  2024. }
  2025. // How long until the next time we want to call checkExpires() ?
  2026. unsigned timeout(unsigned now) const
  2027. {
  2028. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  2029. if (head)
  2030. {
  2031. int delay = (int) (head->waitExpires - now);
  2032. if (delay <= 0)
  2033. return 0;
  2034. else
  2035. return (unsigned) delay;
  2036. }
  2037. else
  2038. return (unsigned) -1;
  2039. }
  2040. private:
  2041. void removeEntry(DelayedPacketEntry *goer)
  2042. {
  2043. if (goer==head)
  2044. head = goer->next;
  2045. if (goer==tail)
  2046. tail = goer->prev;
  2047. delete goer;
  2048. }
  2049. DelayedPacketEntry *head = nullptr;
  2050. DelayedPacketEntry *tail = nullptr;
  2051. };
  2052. //------------------------------------------------------------------------------------------------------------
  2053. class DelayedPacketQueueChannel : public CInterface
  2054. {
  2055. // Manages a set of DelayedPacketQueues, one for each supported subchannel level.
  2056. DelayedPacketQueueChannel() = delete;
  2057. DelayedPacketQueueChannel(const DelayedPacketQueueChannel&) = delete;
  2058. public:
  2059. DelayedPacketQueueChannel(unsigned _channel) : channel(_channel)
  2060. {
  2061. }
  2062. inline unsigned queryChannel() const { return channel; }
  2063. inline DelayedPacketQueue &queryQueue(unsigned subchannel)
  2064. {
  2065. assertex(subchannel); // Subchannel 0 means primary and is never delayed
  2066. subchannel -= 1;
  2067. if (subchannel > maxSeen)
  2068. maxSeen = subchannel;
  2069. return queues[subchannel];
  2070. }
  2071. unsigned timeout(unsigned now) const
  2072. {
  2073. unsigned min = (unsigned) -1;
  2074. for (unsigned queue = 0; queue <= maxSeen; queue++)
  2075. {
  2076. unsigned t = queues[queue].timeout(now);
  2077. if (t < min)
  2078. min = t;
  2079. }
  2080. return min;
  2081. }
  2082. void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
  2083. {
  2084. for (unsigned queue = 0; queue <= maxSeen; queue++)
  2085. {
  2086. queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue);
  2087. }
  2088. }
  2089. private:
  2090. DelayedPacketQueue queues[MAX_SUBCHANNEL-1]; // Note - primary subchannel is not included
  2091. unsigned channel = 0;
  2092. unsigned maxSeen = 0;
  2093. };
  2094. class DelayedPacketQueueManager
  2095. {
  2096. public:
  2097. DelayedPacketQueueManager() = default;
  2098. DelayedPacketQueueManager(const DelayedPacketQueueManager&) = delete;
  2099. inline DelayedPacketQueue &queryQueue(unsigned channel, unsigned subchannel)
  2100. {
  2101. // Note - there are normally no more than a couple of channels on a single agent.
  2102. // If that were to change we could make this a fixed size array
  2103. assert(GetCurrentThreadId()==roxiePacketReaderThread);
  2104. ForEachItemIn(idx, channels)
  2105. {
  2106. DelayedPacketQueueChannel &i = channels.item(idx);
  2107. if (i.queryChannel() == channel)
  2108. return i.queryQueue(subchannel);
  2109. }
  2110. channels.append(*new DelayedPacketQueueChannel(channel));
  2111. return channels.tos().queryQueue(subchannel);
  2112. }
  2113. unsigned timeout(unsigned now) const
  2114. {
  2115. unsigned ret = (unsigned) -1;
  2116. ForEachItemIn(idx, channels)
  2117. {
  2118. unsigned t = channels.item(idx).timeout(now);
  2119. if (t < ret)
  2120. ret = t;
  2121. }
  2122. return ret;
  2123. }
  2124. void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
  2125. {
  2126. ForEachItemIn(idx, channels)
  2127. {
  2128. channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue);
  2129. }
  2130. }
  2131. private:
  2132. CIArrayOf<DelayedPacketQueueChannel> channels;
  2133. };
  2134. #endif
  2135. //------------------------------------------------------------------------------------------------------------
  2136. class RoxieSocketQueueManager : public RoxieReceiverBase
  2137. {
  2138. protected:
  2139. Linked<ISendManager> sendManager;
  2140. Linked<IReceiveManager> receiveManager;
  2141. Owned<RoxieThrottledPacketSender> throttledPacketSendManager;
  2142. Owned<TokenBucket> bucket;
  2143. unsigned maxPacketSize = 0;
  2144. std::atomic<bool> running = { false };
  2145. #ifdef NEW_IBYTI
  2146. DelayedPacketQueueManager delayed;
  2147. #endif
  2148. class ReceiverThread : public Thread
  2149. {
  2150. RoxieSocketQueueManager &parent;
  2151. public:
  2152. ReceiverThread(RoxieSocketQueueManager &_parent) : Thread("RoxieSocketQueueManager"), parent(_parent) {}
  2153. int run()
  2154. {
  2155. // Raise the priority so ibyti's get through in a timely fashion
  2156. #if defined( __linux__) || defined(__APPLE__)
  2157. setLinuxThreadPriority(3);
  2158. #else
  2159. adjustPriority(1);
  2160. #endif
  2161. roxiePacketReaderThread = GetCurrentThreadId();
  2162. return parent.run();
  2163. }
  2164. } readThread;
  2165. public:
  2166. RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), readThread(*this)
  2167. {
  2168. maxPacketSize = multicastSocket->get_max_send_size();
  2169. if ((maxPacketSize==0)||(maxPacketSize>65535))
  2170. maxPacketSize = 65535;
  2171. }
  2172. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  2173. {
  2174. if (throttledPacketSendManager)
  2175. throttledPacketSendManager->sendPacket(x, logctx);
  2176. else
  2177. {
  2178. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendPacket");
  2179. RoxiePacketHeader &header = x->queryHeader();
  2180. unsigned length = x->queryHeader().packetlength;
  2181. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  2182. StringBuffer s;
  2183. switch (header.retries & ROXIE_RETRIES_MASK)
  2184. {
  2185. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  2186. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  2187. break;
  2188. default:
  2189. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  2190. break;
  2191. case 0:
  2192. if (logctx.queryTraceLevel() > 8)
  2193. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  2194. break;
  2195. }
  2196. if (length > maxPacketSize)
  2197. {
  2198. StringBuffer s;
  2199. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  2200. }
  2201. Owned <ISerializedRoxieQueryPacket> serialized = x->serialize();
  2202. if (!channelWrite(serialized->queryHeader(), true))
  2203. logctx.CTXLOG("Roxie packet write wrote too little");
  2204. packetsSent++;
  2205. }
  2206. }
  2207. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) override
  2208. {
  2209. #ifdef SUBCHANNELS_IN_HEADER
  2210. if (!header.hasBuddies())
  2211. return;
  2212. #endif
  2213. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendIbyti");
  2214. RoxiePacketHeader ibytiHeader(header, header.activityId & ROXIE_PRIORITY_MASK, subChannel);
  2215. if (logctx.queryTraceLevel() > 8)
  2216. {
  2217. StringBuffer s; logctx.CTXLOG("Sending IBYTI packet %s", ibytiHeader.toString(s).str());
  2218. }
  2219. channelWrite(ibytiHeader, false); // don't send to self
  2220. ibytiPacketsSent++;
  2221. }
  2222. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx) override
  2223. {
  2224. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbort");
  2225. RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK, 0); // subChannel irrelevant - we are about to overwrite retries anyway
  2226. abortHeader.retries = QUERY_ABORTED;
  2227. if (logctx.queryTraceLevel() > 8)
  2228. {
  2229. StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
  2230. }
  2231. if (!channelWrite(abortHeader, true))
  2232. logctx.CTXLOG("sendAbort wrote too little");
  2233. abortsSent++;
  2234. }
  2235. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx) override
  2236. {
  2237. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbortCallback");
  2238. RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK, 0); // subChannel irrelevant - we are about to overwrite retries anyway
  2239. abortHeader.retries = QUERY_ABORTED;
  2240. abortHeader.packetlength += strlen(lfn)+1;
  2241. MemoryBuffer data;
  2242. data.append(sizeof(abortHeader), &abortHeader).append(lfn);
  2243. if (logctx.queryTraceLevel() > 5)
  2244. {
  2245. StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
  2246. }
  2247. Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
  2248. Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
  2249. if (!channelWrite(serialized->queryHeader(), true))
  2250. logctx.CTXLOG("sendAbortCallback wrote too little");
  2251. abortsSent++;
  2252. }
  2253. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
  2254. {
  2255. unsigned qnum = outOfBand ? 0 : ((header.retries & ROXIE_FASTLANE) || !fastLaneQueue) ? 1 : 2;
  2256. if (logctx.queryTraceLevel() > 8)
  2257. {
  2258. StringBuffer s; logctx.CTXLOG("Creating Output Stream for reply packet on Q=%d - %s", qnum, header.toString(s).str());
  2259. }
  2260. return sendManager->createMessagePacker(header.uid, header.getSequenceId(), &header, sizeof(RoxiePacketHeader), header.serverId, qnum);
  2261. }
  2262. virtual bool replyPending(RoxiePacketHeader &header)
  2263. {
  2264. return sendManager->dataQueued(header.uid, header.getSequenceId(), header.serverId);
  2265. }
  2266. virtual bool abortCompleted(RoxiePacketHeader &header)
  2267. {
  2268. return sendManager->abortData(header.uid, header.getSequenceId(), header.serverId);
  2269. }
  2270. bool abortRunning(RoxiePacketHeader &header, RoxieQueue &queue, bool checkRank, bool &preActivity)
  2271. {
  2272. bool queryFound = false;
  2273. bool ret = false;
  2274. Owned<IPooledThreadIterator> wi = queue.running();
  2275. ForEach(*wi)
  2276. {
  2277. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  2278. if (w.checkAbort(header, checkRank, queryFound, preActivity))
  2279. {
  2280. ret = true;
  2281. break;
  2282. }
  2283. else if (queryFound)
  2284. {
  2285. ret = false;
  2286. break;
  2287. }
  2288. }
  2289. if (!checkRank)
  2290. {
  2291. if (traceLevel > 8)
  2292. DBGLOG("discarding data for aborted query");
  2293. ROQ->abortCompleted(header);
  2294. }
  2295. return ret;
  2296. }
    // Handle a control packet (activity id with only priority bits set): either a query abort
    // (header.retries == QUERY_ABORTED) or an IBYTI notification from a responding subchannel.
    // header - header of the received packet
    // queue  - the queue (selected by priority in run()) that the matching request would be on
    // Must not be called in localAgent mode.
    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue)
    {
        assert(!localAgent);
        bool preActivity = false;
#ifdef SUBCHANNELS_IN_HEADER
        unsigned mySubChannel = header.mySubChannel();
#else
        Owned<const ITopologyServer> topology = getTopology();
        const ChannelInfo &channelInfo = topology->queryChannelInfo(header.channel);
        unsigned mySubChannel = channelInfo.subChannel();
#endif
        if (header.retries == QUERY_ABORTED)
        {
            // Abort request: remove the request from the delayed queue (NEW_IBYTI, non-zero subchannel only),
            // else from the input queue, else ask any running worker to abort (checkRank = false).
            bool foundInQ = false;
#ifdef NEW_IBYTI
            foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
#endif
            if (!foundInQ)
                foundInQ = queue.remove(header);
            if (!foundInQ)
                abortRunning(header, queue, false, preActivity);
            if (traceRoxiePackets || traceLevel > 10)
            {
                StringBuffer s;
                DBGLOG("Abort activity %s", header.toString(s).str());
            }
        }
        else
        {
            ibytiPacketsReceived++;
            unsigned subChannel = header.getRespondingSubChannel();
            if (subChannel == mySubChannel)
            {
                // The IBYTI names this agent's own subchannel - just count it
                if (traceRoxiePackets || traceLevel > 10)
                    DBGLOG("doIBYTI packet was from self");
                ibytiPacketsFromSelf++;
            }
            else
            {
                // Another subchannel is responding: record that it is healthy, then try to drop our copy of the work
#ifndef SUBCHANNELS_IN_HEADER
                channelInfo.noteChannelHealthy(subChannel);
#else
                noteNodeHealthy(header.subChannels[subChannel]);
#endif
                bool foundInQ = false;
#ifdef NEW_IBYTI
                foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
#endif
                if (!foundInQ)
                    foundInQ = queue.remove(header);
                if (foundInQ)
                {
                    // Request had not started yet - removed before any work was done
                    if (traceRoxiePackets || traceLevel > 10)
                    {
                        StringBuffer s;
                        DBGLOG("Removed activity from Q : %s", header.toString(s).str());
                    }
                    ibytiPacketsWorked++;
                    return;
                }
                if (abortRunning(header, queue, true, preActivity))
                {
                    if (traceRoxiePackets || traceLevel > 10)
                    {
                        StringBuffer s;
                        DBGLOG("Aborted running activity : %s", header.toString(s).str());
                    }
                    // preActivity is set by the worker's checkAbort; it selects which counter records the abort
                    if (preActivity)
                        ibytiPacketsWorked++;
                    else
                        ibytiPacketsHalfWorked++;
                    return;
                }
                if (traceRoxiePackets || traceLevel > 10)
                {
                    StringBuffer s;
                    DBGLOG("doIBYTI packet was too late (or too early) : %s", header.toString(s).str());
                }
                ibytiPacketsTooLate++; // meaning either I started and reserve the right to finish, or I finished already
                // Remember the unmatched IBYTI so processMessage can recognise the request if it arrives afterwards
                if (IBYTIbufferSize)
                    queue.noteOrphanIBYTI(header);
            }
        }
    }
  2381. void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue)
  2382. {
  2383. // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
  2384. // DO NOT put tracing on this thread except at very high tracelevels!
  2385. if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
  2386. doIbyti(header, queue);
  2387. else
  2388. {
  2389. if (!header.channel)
  2390. {
  2391. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  2392. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  2393. // Unfortunately this is bad news for dropping packets
  2394. // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending, except for some control messages like PING or UNLOAD
  2395. Owned<const ITopologyServer> topology = getTopology();
  2396. const std::vector<unsigned> channels = topology->queryChannels();
  2397. Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
  2398. for (unsigned i = 1; i < channels.size(); i++)
  2399. queue.enqueue(packet->cloneSerializedPacket(channels[i]));
  2400. header.channel = channels[0];
  2401. queue.enqueue(packet.getClear());
  2402. return;
  2403. }
  2404. #ifdef SUBCHANNELS_IN_HEADER
  2405. unsigned mySubchannel = header.mySubChannel();
  2406. #else
  2407. Owned<const ITopologyServer> topology = getTopology();
  2408. unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
  2409. #endif
  2410. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  2411. {
  2412. Owned<IRoxieQueryPacket> packet = deserializeCallbackPacket(mb);
  2413. if (traceLevel > 10)
  2414. {
  2415. StringBuffer s;
  2416. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  2417. }
  2418. doFileCallback(packet);
  2419. }
  2420. else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
  2421. {
  2422. if (traceRoxiePackets || traceLevel > 10)
  2423. {
  2424. StringBuffer s;
  2425. DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
  2426. }
  2427. ibytiPacketsTooLate--;
  2428. ibytiPacketsTooEarly++;
  2429. }
  2430. else
  2431. {
  2432. Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
  2433. AgentContextLogger logctx(packet);
  2434. unsigned retries = header.thisChannelRetries(mySubchannel);
  2435. if (retries)
  2436. {
  2437. // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
  2438. assertex(header.channel); // should never see a retry on channel 0
  2439. if (retries >= SUBCHANNEL_MASK)
  2440. return; // someone sent a failure or something - ignore it
  2441. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  2442. if (!(testAgentFailure & 0x800))
  2443. {
  2444. RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
  2445. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  2446. output->flush();
  2447. }
  2448. // If it's a retry, look it up against already running, or output stream, or input queue
  2449. // if found, send an IBYTI and discard retry request
  2450. if (!mySubchannel)
  2451. retriesReceivedPrm++;
  2452. else
  2453. retriesReceivedSec++;
  2454. bool alreadyRunning = false;
  2455. Owned<IPooledThreadIterator> wi = queue.running();
  2456. ForEach(*wi)
  2457. {
  2458. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  2459. if (w.match(header))
  2460. {
  2461. alreadyRunning = true;
  2462. if (!mySubchannel)
  2463. retriesIgnoredPrm++;
  2464. else
  2465. retriesIgnoredSec++;
  2466. ROQ->sendIbyti(header, logctx, mySubchannel);
  2467. if (logctx.queryTraceLevel() > 10)
  2468. {
  2469. StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
  2470. }
  2471. break;
  2472. }
  2473. }
  2474. if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
  2475. {
  2476. alreadyRunning = true;
  2477. if (!mySubchannel)
  2478. retriesIgnoredPrm++;
  2479. else
  2480. retriesIgnoredSec++;
  2481. ROQ->sendIbyti(header, logctx, mySubchannel);
  2482. if (logctx.queryTraceLevel() > 10)
  2483. {
  2484. StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
  2485. }
  2486. }
  2487. if (!alreadyRunning)
  2488. {
  2489. if (logctx.queryTraceLevel() > 10)
  2490. {
  2491. StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
  2492. }
  2493. queue.enqueueUnique(packet.getClear(), mySubchannel);
  2494. }
  2495. }
  2496. else // first time (not a retry).
  2497. {
  2498. #ifdef NEW_IBYTI
  2499. if (mySubchannel != 0) // i.e. I am not the primary here
  2500. {
  2501. unsigned delay = 0;
  2502. for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++)
  2503. delay += getIbytiDelay(header.subChannels[subChannel]);
  2504. delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
  2505. }
  2506. else
  2507. #endif
  2508. queue.enqueue(packet.getClear());
  2509. }
  2510. }
  2511. }
  2512. }
    // Body of the packet read thread: repeatedly read one packet from the multicast socket and pass it
    // to processMessage on the queue selected by its priority bits. Loops until an exception is caught
    // while 'running' is false. Always returns 0.
    int run()
    {
        if (traceLevel)
            DBGLOG("RoxieSocketQueueManager::run() starting: doIbytiDelay=%s minIbytiDelay=%u initIbytiDelay=%u",
                   doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay);
        for (;;)
        {
            MemoryBuffer mb;
            try
            {
                // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
                // DO NOT put tracing on this thread except at very high tracelevels!
#ifdef NEW_IBYTI
                // Wake up in time to release the next delayed packet, but never wait more than 5 seconds
                unsigned timeout = delayed.timeout(msTick());
                if (timeout>5000)
                    timeout = 5000;
#else
                unsigned timeout = 5000;
#endif
                unsigned l;
                // Read at least a full header and at most maxPacketSize bytes; l receives the size read
                multicastSocket->readtms(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, timeout);
                mb.setLength(l);
                packetsReceived++;
                RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
                // A length mismatch is only logged; the packet is still processed
                if (l != header.packetlength)
                    DBGLOG("sock->read returned %d but packetlength was %d", l, header.packetlength);
                if (traceRoxiePackets || traceLevel > 10)
                {
                    StringBuffer s;
                    DBGLOG("Read roxie packet: %s", header.toString(s).str());
                }
                // SLA priority is tested first, then high priority; everything else goes to the low queue
                if (header.activityId & ROXIE_SLA_PRIORITY)
                    processMessage(mb, header, slaQueue);
                else if (header.activityId & ROXIE_HIGH_PRIORITY)
                    processMessage(mb, header, hiQueue);
                else
                    processMessage(mb, header, loQueue);
            }
            catch (IException *E)
            {
                if (running)
                {
                    // MORE: Maybe we should utilize IException::errorCode - not just text ??
                    if (E->errorCode()==JSOCKERR_timeout_expired)
                        E->Release(); // read timed out - nothing to report, loop again
                    else if (roxiemem::memPoolExhausted())
                    {
                        //MORE: I think this should probably be based on the error code instead.
                        EXCLOG(E, "Exception reading or processing roxie packet");
                        E->Release();
                        MilliSleep(1000); // Give a chance for mem free
                    }
                    else
                    {
                        EXCLOG(E, "Exception reading or processing roxie packet");
                        E->Release();
                        // MORE: Protect with try logic, in case udp_create throws exception ?
                        // What to do if create fails (ie exception is caught) ?
                        // Any other failure: close and reopen the multicast socket
                        if (multicastSocket)
                        {
                            multicastSocket->close();
                            multicastSocket.clear();
                            openMulticastSocket();
                        }
                    }
                }
                else
                {
                    // Exception while not running (stop() clears 'running' and closes the socket) - leave the loop
                    E->Release();
                    break;
                }
            }
#ifdef NEW_IBYTI
            // Hand any delayed packets whose time has come to the delayed-queue manager for release
            delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
#endif
        }
        return 0;
    }
    // Start the base receiver, then mark this manager as running and start the packet read thread.
    void start()
    {
        RoxieReceiverBase::start();
        running = true; // set before the read thread starts: run() consults it when handling exceptions
        readThread.start();
    }
  2597. void stop()
  2598. {
  2599. if (running)
  2600. {
  2601. running = false;
  2602. multicastSocket->close();
  2603. }
  2604. RoxieReceiverBase::stop();
  2605. }
    // Wait for the packet read thread to finish, then join the base receiver.
    void join()
    {
        readThread.join();
        RoxieReceiverBase::join();
    }
    // Return the receive manager held by this queue manager.
    virtual IReceiveManager *queryReceiveManager()
    {
        return receiveManager;
    }
  2615. };
  2616. class RoxieUdpSocketQueueManager : public RoxieSocketQueueManager
  2617. {
  2618. public:
  2619. RoxieUdpSocketQueueManager(unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers)
  2620. {
  2621. unsigned udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
  2622. unsigned udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
  2623. unsigned udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
  2624. if (topology->getPropInt("@sendMaxRate", 0))
  2625. {
  2626. unsigned sendMaxRate = topology->getPropInt("@sendMaxRate");
  2627. unsigned sendMaxRatePeriod = topology->getPropInt("@sendMaxRatePeriod", 1);
  2628. bucket.setown(new TokenBucket(sendMaxRate, sendMaxRatePeriod, sendMaxRate));
  2629. throttledPacketSendManager.setown(new RoxieThrottledPacketSender(*bucket, maxPacketSize));
  2630. }
  2631. if (udpMaxSlotsPerClient > udpQueueSize)
  2632. udpMaxSlotsPerClient = udpQueueSize;
  2633. if (udpResendEnabled && udpMaxSlotsPerClient > TRACKER_BITS)
  2634. udpMaxSlotsPerClient = TRACKER_BITS;
  2635. unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
  2636. unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
  2637. unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
  2638. receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, udpQueueSize, udpMaxSlotsPerClient, encryptionInTransit));
  2639. sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
  2640. }
  2641. };
  2642. class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager
  2643. {
  2644. public:
  2645. RoxieAeronSocketQueueManager(unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers)
  2646. {
  2647. unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
  2648. SocketEndpoint ep(dataPort, myNode.getIpAddress());
  2649. receiveManager.setown(createAeronReceiveManager(ep, encryptionInTransit));
  2650. assertex(!myNode.getIpAddress().isNull());
  2651. sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getIpAddress(), encryptionInTransit));
  2652. }
  2653. };
  2654. #ifdef _MSC_VER
  2655. #pragma warning( pop )
  2656. #endif
  2657. //==================================================================================================
// Message collator that can be fed directly, in-process, rather than from the network.
interface ILocalMessageCollator : extends IMessageCollator
{
    // Queue one complete message made of separate data, meta and header buffers (with their lengths).
    // outOfBand asks for the message to be delivered ahead of ordinary queued messages.
    // In the implementation in this file the three buffers end up owned by a CLocalMessageResult,
    // which releases them with free().
    virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0;
};
// Receive manager that lets an in-process sender find the collator for a given ruid.
interface ILocalReceiveManager : extends IReceiveManager
{
    // Look up the collator registered for id. The implementation in this file falls back to the
    // RUID_DISCARD collator when id is unknown, and returns a linked pointer (or NULL if neither exists).
    virtual ILocalMessageCollator *lookupCollator(ruid_t id) = 0;
};
  2666. class LocalMessagePacker : public CDummyMessagePacker
  2667. {
  2668. MemoryBuffer meta;
  2669. MemoryBuffer header;
  2670. Linked<ILocalReceiveManager> rm;
  2671. ruid_t id;
  2672. bool outOfBand;
  2673. public:
  2674. IMPLEMENT_IINTERFACE;
  2675. LocalMessagePacker(RoxiePacketHeader &_header, bool _outOfBand, ILocalReceiveManager *_rm) : rm(_rm), outOfBand(_outOfBand)
  2676. {
  2677. id = _header.uid;
  2678. header.append(sizeof(RoxiePacketHeader), &_header);
  2679. }
  2680. virtual void flush() override;
  2681. virtual void sendMetaInfo(const void *buf, unsigned len) override
  2682. {
  2683. meta.append(len, buf);
  2684. }
  2685. };
  2686. class CLocalMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface
  2687. {
  2688. void *data;
  2689. unsigned datalen;
  2690. unsigned pos;
  2691. Linked<IRowManager> rowManager;
  2692. public:
  2693. IMPLEMENT_IINTERFACE;
  2694. CLocalMessageUnpackCursor(IRowManager *_rowManager, void *_data, unsigned _datalen)
  2695. : rowManager(_rowManager)
  2696. {
  2697. datalen = _datalen;
  2698. data = _data;
  2699. pos = 0;
  2700. }
  2701. ~CLocalMessageUnpackCursor()
  2702. {
  2703. }
  2704. virtual bool atEOF() const
  2705. {
  2706. return datalen==pos;
  2707. }
  2708. virtual bool isSerialized() const
  2709. {
  2710. // NOTE: tempting to think that we could avoid serializing in localAgent case, but have to be careful about the lifespan of the rowManager...
  2711. return true;
  2712. }
  2713. virtual const void * getNext(int length)
  2714. {
  2715. if (pos==datalen)
  2716. return NULL;
  2717. assertex(pos + length <= datalen);
  2718. void * cur = ((char *) data) + pos;
  2719. pos += length;
  2720. void * ret = rowManager->allocate(length, 0);
  2721. memcpy(ret, cur, length);
  2722. //No need for finalize since only contains plain data.
  2723. return ret;
  2724. }
  2725. };
  2726. class CLocalMessageResult : implements IMessageResult, public CInterface
  2727. {
  2728. void *data;
  2729. void *meta;
  2730. void *header;
  2731. unsigned datalen, metalen, headerlen;
  2732. unsigned pos;
  2733. public:
  2734. IMPLEMENT_IINTERFACE;
  2735. CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen)
  2736. {
  2737. datalen = _datalen;
  2738. metalen = _metalen;
  2739. headerlen = _headerlen;
  2740. data = _data;
  2741. meta = _meta;
  2742. header = _header;
  2743. pos = 0;
  2744. }
  2745. ~CLocalMessageResult()
  2746. {
  2747. free(data);
  2748. free(meta);
  2749. free(header);
  2750. }
  2751. virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
  2752. {
  2753. return new CLocalMessageUnpackCursor(rowMgr, data, datalen);
  2754. }
  2755. virtual const void *getMessageHeader(unsigned &length) const
  2756. {
  2757. length = headerlen;
  2758. return header;
  2759. }
  2760. virtual const void *getMessageMetadata(unsigned &length) const
  2761. {
  2762. length = metalen;
  2763. return meta;
  2764. }
  2765. virtual void discard() const
  2766. {
  2767. }
  2768. };
  2769. class CLocalMessageCollator : implements ILocalMessageCollator, public CInterface
  2770. {
  2771. InterruptableSemaphore sem;
  2772. QueueOf<IMessageResult, false> pending;
  2773. CriticalSection crit;
  2774. Linked<IRowManager> rowManager; // Linked to ensure it lives longer than me
  2775. Linked<ILocalReceiveManager> receiveManager;
  2776. ruid_t id;
  2777. unsigned totalBytesReceived;
  2778. public:
  2779. IMPLEMENT_IINTERFACE;
  2780. CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid);
  2781. ~CLocalMessageCollator();
  2782. virtual ruid_t queryRUID() const
  2783. {
  2784. return id;
  2785. }
  2786. virtual IMessageResult* getNextResult(unsigned time_out, bool &anyActivity)
  2787. {
  2788. anyActivity = false;
  2789. if (!sem.wait(time_out))
  2790. return NULL;
  2791. anyActivity = true;
  2792. CriticalBlock c(crit);
  2793. return pending.dequeue();
  2794. }
  2795. virtual void interrupt(IException *E)
  2796. {
  2797. sem.interrupt(E);
  2798. }
  2799. virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen)
  2800. {
  2801. CriticalBlock c(crit);
  2802. if (outOfBand)
  2803. pending.enqueueHead(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
  2804. else
  2805. pending.enqueue(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
  2806. sem.signal();
  2807. totalBytesReceived += datalen + metalen + headerlen;
  2808. }
  2809. virtual unsigned queryBytesReceived() const
  2810. {
  2811. return totalBytesReceived;
  2812. }
  2813. virtual unsigned queryDuplicates() const
  2814. {
  2815. return 0;
  2816. }
  2817. virtual unsigned queryResends() const
  2818. {
  2819. return 0;
  2820. }
  2821. };
  2822. class RoxieLocalReceiveManager : implements ILocalReceiveManager, public CInterface
  2823. {
  2824. MapXToMyClass<ruid_t, ruid_t, ILocalMessageCollator> collators;
  2825. CriticalSection crit;
  2826. Owned<StringContextLogger> logctx;
  2827. public:
  2828. IMPLEMENT_IINTERFACE;
  2829. RoxieLocalReceiveManager() : logctx(new StringContextLogger("RoxieLocalReceiveManager"))
  2830. {
  2831. }
  2832. virtual IMessageCollator *createMessageCollator(IRowManager *manager, ruid_t ruid)
  2833. {
  2834. ILocalMessageCollator *collator = new CLocalMessageCollator(manager, ruid);
  2835. CriticalBlock b(crit);
  2836. collators.setValue(ruid, collator);
  2837. return collator;
  2838. }
  2839. virtual void detachCollator(const IMessageCollator *collator)
  2840. {
  2841. ruid_t id = collator->queryRUID();
  2842. CriticalBlock b(crit);
  2843. collators.setValue(id, NULL);
  2844. }
  2845. virtual ILocalMessageCollator *lookupCollator(ruid_t id)
  2846. {
  2847. CriticalBlock b(crit);
  2848. ILocalMessageCollator *ret = collators.getValue(id);
  2849. if (!ret)
  2850. ret = collators.getValue(RUID_DISCARD);
  2851. return LINK(ret);
  2852. }
  2853. };
  2854. void LocalMessagePacker::flush()
  2855. {
  2856. // MORE - I think this means we don't send anything until whole message available in localAgent mode, which
  2857. // may not be optimal.
  2858. data.setLength(lastput);
  2859. Owned<ILocalMessageCollator> collator = rm->lookupCollator(id);
  2860. if (collator)
  2861. {
  2862. unsigned datalen = data.length();
  2863. unsigned metalen = meta.length();
  2864. unsigned headerlen = header.length();
  2865. collator->enqueueMessage(outOfBand, data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen);
  2866. }
  2867. // otherwise Roxie server is no longer interested and we can simply discard
  2868. }
  2869. CLocalMessageCollator::CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid)
  2870. : rowManager(_rowManager), id(_ruid)
  2871. {
  2872. totalBytesReceived = 0;
  2873. }
  2874. CLocalMessageCollator::~CLocalMessageCollator()
  2875. {
  2876. IMessageResult *goer;
  2877. for (;;)
  2878. {
  2879. goer = pending.dequeue();
  2880. if (!goer)
  2881. break;
  2882. goer->Release();
  2883. }
  2884. }
  2885. class RoxieLocalQueueManager : public RoxieReceiverBase
  2886. {
  2887. Linked<RoxieLocalReceiveManager> receiveManager;
  2888. public:
  2889. RoxieLocalQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers)
  2890. {
  2891. receiveManager.setown(new RoxieLocalReceiveManager);
  2892. }
  2893. virtual void sendPacket(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx) override
  2894. {
  2895. RoxiePacketHeader &header = packet->queryHeader();
  2896. unsigned retries = header.thisChannelRetries(0);
  2897. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  2898. {
  2899. if (traceLevel > 5)
  2900. {
  2901. StringBuffer s;
  2902. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  2903. }
  2904. // MORE - do we need to encrypt these?
  2905. doFileCallback(packet);
  2906. }
  2907. else if (retries < SUBCHANNEL_MASK)
  2908. {
  2909. if (retries)
  2910. {
  2911. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  2912. RoxiePacketHeader newHeader(header, ROXIE_ALIVE, 0);
  2913. Owned<IMessagePacker> output = createOutputStream(newHeader, true, logctx);
  2914. output->flush();
  2915. return; // No point sending the retry in localAgent mode
  2916. }
  2917. RoxieQueue *targetQueue;
  2918. if (header.activityId & ROXIE_SLA_PRIORITY)
  2919. targetQueue = &slaQueue;
  2920. else if (header.activityId & ROXIE_HIGH_PRIORITY)
  2921. targetQueue = &hiQueue;
  2922. else
  2923. targetQueue = &loQueue;
  2924. Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
  2925. if (header.channel)
  2926. {
  2927. targetQueue->enqueue(serialized.getClear());
  2928. }
  2929. else
  2930. {
  2931. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  2932. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  2933. for (unsigned i = 1; i < numChannels; i++)
  2934. targetQueue->enqueue(serialized->cloneSerializedPacket(i+1));
  2935. header.channel = 1;
  2936. targetQueue->enqueue(serialized.getClear());
  2937. }
  2938. }
  2939. }
  2940. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) override
  2941. {
  2942. // Don't do IBYTI's when local agent - no buddy to talk to anyway
  2943. }
  2944. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx) override
  2945. {
  2946. MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbort");
  2947. RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK, 0);
  2948. abortHeader.retries = QUERY_ABORTED;
  2949. if (logctx.queryTraceLevel() > 8)
  2950. {
  2951. StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
  2952. }
  2953. MemoryBuffer data;
  2954. data.append(sizeof(abortHeader), &abortHeader);
  2955. Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
  2956. sendPacket(packet, logctx);
  2957. abortsSent++;
  2958. }
  2959. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx) override
  2960. {
  2961. MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbortCallback");
  2962. RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK, 0);
  2963. abortHeader.retries = QUERY_ABORTED;
  2964. MemoryBuffer data;
  2965. data.append(sizeof(abortHeader), &abortHeader).append(lfn);
  2966. if (logctx.queryTraceLevel() > 5)
  2967. {
  2968. StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
  2969. }
  2970. Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
  2971. sendPacket(packet, logctx);
  2972. abortsSent++;
  2973. }
  2974. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx) override
  2975. {
  2976. return new LocalMessagePacker(header, outOfBand, receiveManager);
  2977. }
  2978. virtual IReceiveManager *queryReceiveManager() override
  2979. {
  2980. return receiveManager;
  2981. }
  2982. virtual bool replyPending(RoxiePacketHeader &header) override
  2983. {
  2984. // MORE - should really have some code here! But returning true is a reasonable approximation.
  2985. return true;
  2986. }
  2987. virtual bool abortCompleted(RoxiePacketHeader &header) override
  2988. {
  2989. // MORE - should really have some code here!
  2990. return false;
  2991. }
  2992. };
  2993. IRoxieOutputQueueManager *ROQ;
  2994. extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned numWorkers, bool encrypted)
  2995. {
  2996. if (localAgent)
  2997. return new RoxieLocalQueueManager(numWorkers);
  2998. else if (useAeron)
  2999. return new RoxieAeronSocketQueueManager(numWorkers, encrypted);
  3000. else
  3001. return new RoxieUdpSocketQueueManager(numWorkers, encrypted);
  3002. }
  3003. //================================================================================================================================
  3004. class PacketDiscarder : public Thread, implements IPacketDiscarder
  3005. {
  3006. bool aborted;
  3007. Owned<IRowManager> rowManager; // not completely sure I need one... maybe I do
  3008. Owned<IMessageCollator> mc;
  3009. public:
  3010. IMPLEMENT_IINTERFACE;
  3011. PacketDiscarder()
  3012. {
  3013. aborted = false;
  3014. };
  3015. ~PacketDiscarder()
  3016. {
  3017. if (mc)
  3018. ROQ->queryReceiveManager()->detachCollator(mc);
  3019. mc.clear();
  3020. }
  3021. virtual int run()
  3022. {
  3023. Owned<StringContextLogger> logctx = new StringContextLogger("PacketDiscarder");
  3024. rowManager.setown(roxiemem::createRowManager(0, NULL, *logctx, NULL, false));
  3025. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  3026. try
  3027. {
  3028. while (!aborted)
  3029. {
  3030. bool anyActivity = false;
  3031. Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
  3032. if (mr)
  3033. {
  3034. if (traceLevel > 4)
  3035. DBGLOG("Discarding unwanted message");
  3036. unsigned headerLen;
  3037. const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  3038. if (headerLen)
  3039. {
  3040. switch (header.activityId)
  3041. {
  3042. case ROXIE_FILECALLBACK:
  3043. {
  3044. Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
  3045. OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
  3046. if (len)
  3047. {
  3048. RecordLengthType *rowlen = (RecordLengthType *) len.get();
  3049. OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
  3050. const char *rowdata = (const char *) row.get();
  3051. // bool isOpt = * (bool *) rowdata;
  3052. // bool isLocal = * (bool *) (rowdata+1);
  3053. ROQ->sendAbortCallback(header, rowdata+2, *logctx);
  3054. }
  3055. else
  3056. DBGLOG("Unrecognized format in discarded file callback");
  3057. break;
  3058. }
  3059. // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
  3060. }
  3061. }
  3062. else
  3063. DBGLOG("Unwanted message had no header?!");
  3064. }
  3065. else if (!anyActivity)
  3066. {
  3067. // to avoid leaking partial unwanted packets, we clear out mc periodically...
  3068. ROQ->queryReceiveManager()->detachCollator(mc);
  3069. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  3070. }
  3071. }
  3072. }
  3073. catch (IException * E)
  3074. {
  3075. if (!aborted || QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
  3076. EXCLOG(E);
  3077. ::Release(E);
  3078. }
  3079. return 0;
  3080. }
  3081. virtual void start()
  3082. {
  3083. Thread::start();
  3084. }
  3085. virtual void stop()
  3086. {
  3087. if (mc)
  3088. mc->interrupt();
  3089. aborted = true;
  3090. join();
  3091. }
  3092. };
  3093. IPacketDiscarder *createPacketDiscarder()
  3094. {
  3095. IPacketDiscarder *packetDiscarder = new PacketDiscarder;
  3096. packetDiscarder->start();
  3097. return packetDiscarder;
  3098. }
  3099. //================================================================================================================================
  3100. // There are various possibly interesting ways to reply to a ping:
  3101. // Reply as soon as receive, or put it on the queue like other messages?
  3102. // Reply for every channel, or just once for every agent?
  3103. // Should I send on channel 0 or round-robin the channels?
  3104. // My gut feeling is that knowing what channels are responding is useful so should reply on every unsuspended channel,
  3105. // and that the delay caused by queuing system is an interesting part of what we want to measure (though nice to know minimum possible too)
  3106. unsigned pingInterval = 60;
  3107. class PingTimer : public Thread
  3108. {
  3109. bool aborted;
  3110. Owned<IRowManager> rowManager;
  3111. Owned<IMessageCollator> mc;
  3112. StringContextLogger logctx;
  3113. void sendPing(unsigned priorityMask)
  3114. {
  3115. try
  3116. {
  3117. RemoteActivityId pingId(ROXIE_PING | priorityMask, 0);
  3118. RoxiePacketHeader header(pingId, 0, 0, 0);
  3119. MemoryBuffer mb;
  3120. mb.append(sizeof(RoxiePacketHeader), &header);
  3121. mb.append((char) LOGGING_FLAGSPRESENT);
  3122. mb.append("PING");
  3123. PingRecord data;
  3124. data.senderIP.ipset(myNode.getIpAddress());
  3125. data.tick = usTick();
  3126. mb.append(sizeof(PingRecord), &data);
  3127. if (traceLevel > 1)
  3128. DBGLOG("PING sent");
  3129. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  3130. ROQ->sendPacket(packet, logctx);
  3131. }
  3132. catch (IException *E)
  3133. {
  3134. EXCLOG(E);
  3135. E->Release();
  3136. }
  3137. }
  3138. public:
  3139. PingTimer() : logctx("PingTimer")
  3140. {
  3141. aborted = false;
  3142. };
  3143. ~PingTimer()
  3144. {
  3145. if (mc)
  3146. ROQ->queryReceiveManager()->detachCollator(mc);
  3147. mc.clear();
  3148. }
  3149. virtual int run()
  3150. {
  3151. rowManager.setown(roxiemem::createRowManager(1, NULL, queryDummyContextLogger(), NULL, false));
  3152. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_PING));
  3153. unsigned pingsReceived = 0;
  3154. unsigned pingsElapsed = 0;
  3155. sendPing(ROXIE_HIGH_PRIORITY);
  3156. while (!aborted)
  3157. {
  3158. bool anyActivity = false;
  3159. Owned<IMessageResult> mr = mc->getNextResult(pingInterval*1000, anyActivity);
  3160. if (mr)
  3161. {
  3162. unsigned headerLen;
  3163. const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  3164. Owned<IMessageUnpackCursor> mu = mr->getCursor(rowManager);
  3165. PingRecord *answer = (PingRecord *) mu->getNext(sizeof(PingRecord));
  3166. if (answer && mu->atEOF() && headerLen==sizeof(RoxiePacketHeader))
  3167. {
  3168. unsigned elapsed = usTick() - answer->tick;
  3169. pingsReceived++;
  3170. pingsElapsed += elapsed;
  3171. if (traceLevel > 10)
  3172. DBGLOG("PING reply channel=%d, time %d", header->channel, elapsed); // DBGLOG is slower than the pings so be careful!
  3173. }
  3174. else
  3175. DBGLOG("PING reply, garbled result");
  3176. ReleaseRoxieRow(answer);
  3177. }
  3178. else if (!anyActivity)
  3179. {
  3180. if (!pingsReceived && roxieMulticastEnabled)
  3181. DBGLOG("PING: NO replies received! Please check multicast settings, and that your network supports multicast.");
  3182. else if (traceLevel)
  3183. DBGLOG("PING: %d replies received, average delay %uus", pingsReceived, pingsReceived ? pingsElapsed / pingsReceived : 0);
  3184. pingsReceived = 0;
  3185. pingsElapsed = 0;
  3186. sendPing(ROXIE_HIGH_PRIORITY); // MORE - we could think about alternating the priority or sending pings on high and low at the same time...
  3187. }
  3188. }
  3189. return 0;
  3190. }
  3191. void stop()
  3192. {
  3193. if (mc)
  3194. mc->interrupt();
  3195. aborted = true;
  3196. }
  3197. static CriticalSection crit;
  3198. } *pingTimer;
  3199. CriticalSection PingTimer::crit;
  3200. extern void startPingTimer()
  3201. {
  3202. CriticalBlock b(PingTimer::crit);
  3203. if (!pingTimer)
  3204. {
  3205. pingTimer = new PingTimer();
  3206. pingTimer->start();
  3207. }
  3208. }
  3209. extern void stopPingTimer()
  3210. {
  3211. CriticalBlock b(PingTimer::crit);
  3212. if (pingTimer)
  3213. {
  3214. pingTimer->stop();
  3215. pingTimer->join();
  3216. delete pingTimer;
  3217. pingTimer = NULL;
  3218. }
  3219. }