ccdqueue.cpp 108 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include <jio.hpp>
  16. #include <jqueue.tpp>
  17. #include <jsocket.hpp>
  18. #include <jlog.hpp>
  19. #include "jisem.hpp"
  20. #include "udplib.hpp"
  21. #include "ccd.hpp"
  22. #include "ccddebug.hpp"
  23. #include "ccdquery.hpp"
  24. #include "ccdstate.hpp"
  25. #include "ccdqueue.ipp"
  26. #include "ccdsnmp.hpp"
  27. #ifdef _USE_CPPUNIT
  28. #include <cppunit/extensions/HelperMacros.h>
  29. #endif
  30. static unsigned channels[MAX_CLUSTER_SIZE]; // Array [0..channelCount-1] of channels supported by this slave
  31. static unsigned channelCount; // number of channels supported by this slave
  32. //The following are all indexed by the channel number [1..numChannels]
  33. static unsigned subChannels[MAX_CLUSTER_SIZE]; // The subchannel that this node supports on the specified channel. NOTE - values in this array are 1-based
  34. static unsigned numSlaves[MAX_CLUSTER_SIZE]; // The total number of slaves in the system that process this channel
  35. static unsigned replicationLevel[MAX_CLUSTER_SIZE]; // Which copy of the data is held by this node, for a given channel - MORE - is this the same as subChannels[n]-1 ? I suspect it is.
  36. static std::atomic<bool> suspendedChannels[MAX_CLUSTER_SIZE];
  37. using roxiemem::OwnedRoxieRow;
  38. using roxiemem::OwnedConstRoxieRow;
  39. using roxiemem::IRowManager;
  40. using roxiemem::DataBuffer;
  41. //============================================================================================
  42. RoxiePacketHeader::RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  43. {
  44. packetlength = sizeof(RoxiePacketHeader);
  45. #ifdef TIME_PACKETS
  46. tick = 0;
  47. #endif
  48. init(_remoteId, _uid, _channel, _overflowSequence);
  49. }
  50. RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _activityId)
  51. {
  52. // Used to create the header to send a callback to originating server or an IBYTI to a buddy
  53. activityId = _activityId;
  54. uid = source.uid;
  55. queryHash = source.queryHash;
  56. serverIdx = source.serverIdx;
  57. channel = source.channel;
  58. overflowSequence = source.overflowSequence;
  59. continueSequence = source.continueSequence;
  60. if (_activityId >= ROXIE_ACTIVITY_SPECIAL_FIRST && _activityId <= ROXIE_ACTIVITY_SPECIAL_LAST)
  61. overflowSequence |= OUTOFBAND_SEQUENCE; // Need to make sure it is not treated as dup of actual reply in the udp layer
  62. retries = getSubChannelMask(channel) | (source.retries & ~ROXIE_RETRIES_MASK);
  63. #ifdef TIME_PACKETS
  64. tick = source.tick;
  65. #endif
  66. packetlength = sizeof(RoxiePacketHeader);
  67. }
  68. unsigned RoxiePacketHeader::getSubChannelMask(unsigned channel)
  69. {
  70. unsigned subChannel = subChannels[channel] - 1;
  71. return SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  72. }
  73. unsigned RoxiePacketHeader::priorityHash() const
  74. {
  75. // Used to determine which slave to act as primary and which as secondary for a given packet (thus spreading the load)
  76. // It's important that we do NOT include channel (since that would result in different values for the different slaves responding to a broadcast)
  77. // We also don't include continueSequence since we'd prefer continuations to go the same way as original
  78. unsigned hash = hashc((const unsigned char *) &serverIdx, sizeof(serverIdx), 0);
  79. hash = hashc((const unsigned char *) &uid, sizeof(uid), hash);
  80. hash += overflowSequence; // MORE - is this better than hashing?
  81. if (traceLevel > 9)
  82. {
  83. StringBuffer s;
  84. DBGLOG("Calculating hash: %s hash was %d", toString(s).str(), hash);
  85. }
  86. return hash;
  87. }
  88. bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
  89. {
  90. // used when matching up a kill packet against a pending one...
  91. // DO NOT compare activityId - they are not supposed to match, since 0 in activityid identifies ibyti!
  92. return
  93. oh.uid==uid &&
  94. (oh.overflowSequence & ~OUTOFBAND_SEQUENCE) == (overflowSequence & ~OUTOFBAND_SEQUENCE) &&
  95. oh.continueSequence == continueSequence &&
  96. oh.serverIdx==serverIdx &&
  97. oh.channel==channel;
  98. }
  99. void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
  100. {
  101. retries = 0;
  102. activityId = _remoteId.activityId;
  103. queryHash = _remoteId.queryHash;
  104. uid = _uid;
  105. serverIdx = myNodeIndex;
  106. channel = _channel;
  107. overflowSequence = _overflowSequence;
  108. continueSequence = 0;
  109. }
  110. StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
  111. {
  112. const IpAddress &serverIP = getNodeAddress(serverIdx);
  113. ret.appendf("uid=" RUIDF " activityId=", uid);
  114. switch(activityId & ~ROXIE_PRIORITY_MASK)
  115. {
  116. case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
  117. case ROXIE_PING: ret.append("ROXIE_PING"); break;
  118. case ROXIE_TRACEINFO: ret.append("ROXIE_TRACEINFO"); break;
  119. case ROXIE_DEBUGREQUEST: ret.append("ROXIE_DEBUGREQUEST"); break;
  120. case ROXIE_DEBUGCALLBACK: ret.append("ROXIE_DEBUGCALLBACK"); break;
  121. case ROXIE_FILECALLBACK: ret.append("ROXIE_FILECALLBACK"); break;
  122. case ROXIE_ALIVE: ret.append("ROXIE_ALIVE"); break;
  123. case ROXIE_KEYEDLIMIT_EXCEEDED: ret.append("ROXIE_KEYEDLIMIT_EXCEEDED"); break;
  124. case ROXIE_LIMIT_EXCEEDED: ret.append("ROXIE_LIMIT_EXCEEDED"); break;
  125. case ROXIE_EXCEPTION: ret.append("ROXIE_EXCEPTION"); break;
  126. default:
  127. ret.appendf("%u", (activityId & ~(ROXIE_ACTIVITY_FETCH | ROXIE_PRIORITY_MASK)));
  128. if (activityId & ROXIE_ACTIVITY_FETCH)
  129. ret.appendf(" (fetch part)");
  130. break;
  131. }
  132. ret.append(" pri=");
  133. switch(activityId & ROXIE_PRIORITY_MASK)
  134. {
  135. case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
  136. case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
  137. case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
  138. default: ret.append("???"); break;
  139. }
  140. ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
  141. serverIP.getIpText(ret);
  142. if (retries)
  143. {
  144. if (retries==QUERY_ABORTED)
  145. ret.append(" retries=QUERY_ABORTED");
  146. else
  147. {
  148. if (retries & ROXIE_RETRIES_MASK)
  149. ret.appendf(" retries=%04x", retries);
  150. if (retries & ROXIE_FASTLANE)
  151. ret.appendf(" FASTLANE");
  152. if (retries & ROXIE_BROADCAST)
  153. ret.appendf(" BROADCAST");
  154. }
  155. }
  156. return ret;
  157. }
  158. bool RoxiePacketHeader::allChannelsFailed()
  159. {
  160. unsigned mask = (1 << (numSlaves[channel] * SUBCHANNEL_BITS)) - 1;
  161. return (retries & mask) == mask;
  162. }
  163. bool RoxiePacketHeader::retry()
  164. {
  165. bool worthRetrying = false;
  166. unsigned mask = SUBCHANNEL_MASK;
  167. for (unsigned subChannel = 0; subChannel < numSlaves[channel]; subChannel++)
  168. {
  169. unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
  170. if (subRetries != SUBCHANNEL_MASK)
  171. subRetries++;
  172. if (subRetries != SUBCHANNEL_MASK)
  173. worthRetrying = true;
  174. retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
  175. mask <<= SUBCHANNEL_BITS;
  176. }
  177. return worthRetrying;
  178. }
  179. void RoxiePacketHeader::setException()
  180. {
  181. unsigned subChannel = subChannels[channel] - 1;
  182. retries |= SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
  183. }
  184. unsigned RoxiePacketHeader::thisChannelRetries()
  185. {
  186. unsigned shift = SUBCHANNEL_BITS * (subChannels[channel] - 1);
  187. unsigned mask = SUBCHANNEL_MASK << shift;
  188. return (retries & mask) >> shift;
  189. }
  190. //============================================================================================
  191. void addSlaveChannel(unsigned channel, unsigned level)
  192. {
  193. StringBuffer xpath;
  194. xpath.appendf("RoxieSlaveProcess[@channel=\"%d\"]", channel);
  195. if (ccdChannels->hasProp(xpath.str()))
  196. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated", channel);
  197. IPropertyTree *ci = ccdChannels->addPropTree("RoxieSlaveProcess");
  198. ci->setPropInt("@channel", channel);
  199. ci->setPropInt("@subChannel", numSlaves[channel]);
  200. }
  201. void addChannel(unsigned nodeNumber, unsigned channel, unsigned level)
  202. {
  203. numSlaves[channel]++;
  204. if (nodeNumber == myNodeIndex && channel > 0)
  205. {
  206. assertex(channel <= numChannels);
  207. assertex(!replicationLevel[channel]);
  208. replicationLevel[channel] = level;
  209. addSlaveChannel(channel, level);
  210. }
  211. if (!localSlave)
  212. {
  213. addEndpoint(channel, getNodeAddress(nodeNumber), ccdMulticastPort);
  214. }
  215. }
  216. unsigned getReplicationLevel(unsigned channel)
  217. {
  218. return replicationLevel[channel];
  219. }
  220. //============================================================================================
  221. // This function maps a slave number to the multicast ip used to talk to it.
  222. IpAddress multicastBase("239.1.1.1"); // TBD IPv6 (need IPv6 multicast addresses?
  223. IpAddress multicastLast("239.1.5.254");
  224. const IpAddress &getChannelIp(IpAddress &ip, unsigned _channel)
  225. {
  226. // need to be careful to avoid the .0's and the .255's (not sure why...)
  227. ip = multicastBase;
  228. if (!ip.ipincrement(_channel,1,254,1,0xffff)
  229. ||(ip.ipcompare(multicastLast)>0))
  230. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Out-of-range multicast channel %d", _channel);
  231. return ip;
  232. }
  233. Owned<ISocket> multicastSocket;
  234. SocketEndpointArray *slaveEndpoints; // indexed by channel
  235. bool isSlaveEndpoint(unsigned channel, const IpAddress &slaveIp)
  236. {
  237. SocketEndpointArray &eps = slaveEndpoints[channel];
  238. ForEachItemIn(idx, eps)
  239. {
  240. if (eps.item(idx).ipequals(slaveIp))
  241. return true;
  242. }
  243. return false;
  244. }
  245. void joinMulticastChannel(unsigned channel)
  246. {
  247. if (roxieMulticastEnabled && !localSlave)
  248. {
  249. IpAddress multicastIp;
  250. getChannelIp(multicastIp, channel);
  251. SocketEndpoint ep(ccdMulticastPort, multicastIp);
  252. StringBuffer epStr;
  253. ep.getUrlStr(epStr);
  254. if (!multicastSocket->join_multicast_group(ep))
  255. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to join multicast channel %d (%s)", channel, epStr.str());
  256. if (traceLevel)
  257. DBGLOG("Joined multicast channel %d (%s)", channel, epStr.str());
  258. }
  259. }
  260. void openMulticastSocket()
  261. {
  262. if (!multicastSocket)
  263. {
  264. multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
  265. if (multicastTTL)
  266. {
  267. multicastSocket->set_ttl(multicastTTL);
  268. DBGLOG("Roxie: multicastTTL: %u", multicastTTL);
  269. }
  270. else
  271. DBGLOG("Roxie: multicastTTL not set");
  272. multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
  273. size32_t actualSize = multicastSocket->get_receive_buffer_size();
  274. if (actualSize < udpMulticastBufferSize)
  275. {
  276. DBGLOG("Roxie: multicast socket buffer size could not be set (requested=%d actual %d", udpMulticastBufferSize, actualSize);
  277. throwUnexpected();
  278. }
  279. if (traceLevel)
  280. DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", ccdMulticastPort, udpMulticastBufferSize, actualSize);
  281. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
  282. ForEach(*it)
  283. {
  284. unsigned channel = it->query().getPropInt("@channel", 0);
  285. assertex(channel);
  286. joinMulticastChannel(channel);
  287. }
  288. joinMulticastChannel(0); // all slaves also listen on channel 0
  289. }
  290. }
  291. void addEndpoint(unsigned channel, const IpAddress &slaveIp, unsigned port)
  292. {
  293. if (!slaveEndpoints)
  294. slaveEndpoints = new SocketEndpointArray[numChannels + 1];
  295. IpAddress multicastIp;
  296. if (roxieMulticastEnabled)
  297. getChannelIp(multicastIp, channel);
  298. else
  299. multicastIp = slaveIp;
  300. if (!isSlaveEndpoint(channel, multicastIp))
  301. {
  302. SocketEndpoint &ep = *new SocketEndpoint(ccdMulticastPort, multicastIp);
  303. slaveEndpoints[channel].append(ep);
  304. }
  305. if (channel)
  306. addEndpoint(0, slaveIp, port);
  307. }
  308. void closeMulticastSockets()
  309. {
  310. delete[] slaveEndpoints;
  311. slaveEndpoints = NULL;
  312. multicastSocket.clear();
  313. }
  314. size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
  315. {
  316. size32_t minwrote = 0;
  317. SocketEndpointArray &eps = slaveEndpoints[channel]; // if multicast is enabled, this will have a single multicast endpoint in it.
  318. assertex(eps.ordinality());
  319. ForEachItemIn(idx, eps)
  320. {
  321. size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), buf, size);
  322. if (!idx || wrote < minwrote)
  323. minwrote = wrote;
  324. }
  325. return minwrote;
  326. }
  327. // #define TEST_SLAVE_FAILURE
  328. //============================================================================================
  329. class CRoxieQueryPacket : implements IRoxieQueryPacket, public CInterface
  330. {
  331. protected:
  332. RoxiePacketHeader *data;
  333. const byte *continuationData;
  334. unsigned continuationLength;
  335. const byte *smartStepInfoData;
  336. unsigned smartStepInfoLength;
  337. const byte *contextData;
  338. unsigned contextLength;
  339. const byte *traceInfo;
  340. unsigned traceLength;
  341. public:
  342. IMPLEMENT_IINTERFACE;
  343. CRoxieQueryPacket(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
  344. {
  345. assertex(lengthRemaining >= (int) sizeof(RoxiePacketHeader));
  346. data->packetlength = lengthRemaining;
  347. const byte *finger = (const byte *) (data + 1);
  348. lengthRemaining -= sizeof(RoxiePacketHeader);
  349. if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
  350. {
  351. continuationData = NULL;
  352. continuationLength = 0;
  353. smartStepInfoData = NULL;
  354. smartStepInfoLength = 0;
  355. traceInfo = NULL;
  356. traceLength = 0;
  357. }
  358. else
  359. {
  360. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  361. {
  362. assertex(lengthRemaining >= (int) sizeof(unsigned short));
  363. continuationLength = *(unsigned short *) finger;
  364. continuationData = finger + sizeof(unsigned short);
  365. finger = continuationData + continuationLength;
  366. lengthRemaining -= continuationLength + sizeof(unsigned short);
  367. }
  368. else
  369. {
  370. continuationData = NULL;
  371. continuationLength = 0;
  372. }
  373. if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
  374. {
  375. assertex(lengthRemaining >= (int) sizeof(unsigned short));
  376. smartStepInfoLength = *(unsigned short *) finger;
  377. smartStepInfoData = finger + sizeof(unsigned short);
  378. finger = smartStepInfoData + smartStepInfoLength;
  379. lengthRemaining -= smartStepInfoLength + sizeof(unsigned short);
  380. }
  381. else
  382. {
  383. smartStepInfoData = NULL;
  384. smartStepInfoLength = 0;
  385. }
  386. assertex(lengthRemaining > 1);
  387. traceInfo = finger;
  388. lengthRemaining--;
  389. if (*finger++ & LOGGING_DEBUGGERACTIVE)
  390. {
  391. assertex(lengthRemaining >= (int) sizeof(unsigned short));
  392. unsigned short debugLen = *(unsigned short *) finger;
  393. finger += debugLen + sizeof(unsigned short);
  394. lengthRemaining -= debugLen + sizeof(unsigned short);
  395. }
  396. for (;;)
  397. {
  398. assertex(lengthRemaining>0);
  399. if (!*finger)
  400. {
  401. lengthRemaining--;
  402. finger++;
  403. break;
  404. }
  405. lengthRemaining--;
  406. finger++;
  407. }
  408. traceLength = finger - traceInfo;
  409. }
  410. assertex(lengthRemaining >= 0);
  411. contextData = finger;
  412. contextLength = lengthRemaining;
  413. }
  414. ~CRoxieQueryPacket()
  415. {
  416. free(data);
  417. }
  418. virtual RoxiePacketHeader &queryHeader() const
  419. {
  420. return *data;
  421. }
  422. virtual const void *queryContinuationData() const
  423. {
  424. return continuationData;
  425. }
  426. virtual unsigned getContinuationLength() const
  427. {
  428. return continuationLength;
  429. }
  430. virtual const byte *querySmartStepInfoData() const
  431. {
  432. return smartStepInfoData;
  433. }
  434. virtual unsigned getSmartStepInfoLength() const
  435. {
  436. return smartStepInfoLength;
  437. }
  438. virtual const byte *queryTraceInfo() const
  439. {
  440. return traceInfo;
  441. }
  442. virtual unsigned getTraceLength() const
  443. {
  444. return traceLength;
  445. }
  446. virtual const void *queryContextData() const
  447. {
  448. return contextData;
  449. }
  450. virtual unsigned getContextLength() const
  451. {
  452. return contextLength;
  453. }
  454. virtual IRoxieQueryPacket *clonePacket(unsigned channel) const
  455. {
  456. unsigned length = data->packetlength;
  457. RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
  458. memcpy(newdata, data, length);
  459. newdata->channel = channel;
  460. newdata->retries |= ROXIE_BROADCAST;
  461. return createRoxiePacket(newdata, length);
  462. }
  463. virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const
  464. {
  465. assertex((data->continueSequence & CONTINUE_SEQUENCE_SKIPTO) == 0); // Should not already be any skipto info in the source packet
  466. unsigned newDataSize = data->packetlength + sizeof(unsigned short) + skipDataLen;
  467. char *newdata = (char *) malloc(newDataSize);
  468. unsigned headSize = sizeof(RoxiePacketHeader);
  469. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  470. headSize += sizeof(unsigned short) + continuationLength;
  471. memcpy(newdata, data, headSize); // copy in leading part of old data
  472. ((RoxiePacketHeader *) newdata)->continueSequence |= CONTINUE_SEQUENCE_SKIPTO; // set flag indicating new data is present
  473. *(unsigned short *) (newdata + headSize) = skipDataLen; // add length field for new data
  474. memcpy(newdata + headSize + sizeof(unsigned short), skipData, skipDataLen); // copy in new data
  475. memcpy(newdata + headSize + sizeof(unsigned short) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
  476. return createRoxiePacket(newdata, newDataSize);
  477. }
  478. virtual unsigned hash() const
  479. {
  480. // This is used for Roxie server-side caching. The hash includes some of the header and all of the payload.
  481. unsigned hash = 0;
  482. if (continuationLength)
  483. hash = hashc((const unsigned char *) continuationData, continuationLength, hash);
  484. if (smartStepInfoLength)
  485. hash = hashc((const unsigned char *) smartStepInfoData, smartStepInfoLength, hash);
  486. // NOTE - don't hash the trace info!
  487. hash = hashc((const unsigned char *) contextData, contextLength, hash);
  488. hash = hashc((const unsigned char *) &data->channel, sizeof(data->channel), hash);
  489. hash = hashc((const unsigned char *) &data->overflowSequence, sizeof(data->overflowSequence), hash);
  490. hash = hashc((const unsigned char *) &data->continueSequence, sizeof(data->continueSequence), hash);
  491. // MORE - sequence fields should always be zero for anything we are caching I think... (?)
  492. // Note - no point hashing activityId (as cache is local to one activity) or serverIP (likewise)
  493. return hash;
  494. }
  495. virtual bool cacheMatch(const IRoxieQueryPacket *c) const
  496. {
  497. // note - this checks whether it's a repeat from Roxie server's point-of-view
  498. // So fields that are compared are the same as the ones that are hashed....
  499. RoxiePacketHeader &h = c->queryHeader();
  500. if (data->channel == h.channel && data->overflowSequence == h.overflowSequence && data->continueSequence == h.continueSequence)
  501. {
  502. if (continuationLength) // note - we already checked that sequences match
  503. {
  504. if (continuationLength != c->getContinuationLength())
  505. return false;
  506. if (memcmp(continuationData,c->queryContinuationData(),continuationLength)!=0)
  507. return false;
  508. }
  509. if (smartStepInfoLength)
  510. {
  511. if (smartStepInfoLength != c->getSmartStepInfoLength())
  512. return false;
  513. if (memcmp(smartStepInfoData,c->querySmartStepInfoData(),smartStepInfoLength)!=0)
  514. return false;
  515. }
  516. // NOTE - trace info NOT compared
  517. if (contextLength == c->getContextLength() && memcmp(contextData, c->queryContextData(), contextLength)==0)
  518. return true;
  519. }
  520. return false;
  521. }
  522. };
  523. extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
  524. {
  525. if ((unsigned short)_len != _len && !localSlave)
  526. {
  527. StringBuffer s;
  528. RoxiePacketHeader *header = (RoxiePacketHeader *) _data;
  529. header->toString(s);
  530. free(_data);
  531. throw MakeStringException(ROXIE_PACKET_ERROR, "Packet length %d exceeded maximum sending packet %s", _len, s.str());
  532. }
  533. return new CRoxieQueryPacket(_data, _len);
  534. }
  535. extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
  536. {
  537. unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
  538. return createRoxiePacket(m.detachOwn(), length);
  539. }
  540. //=================================================================================
  541. SlaveContextLogger::SlaveContextLogger()
  542. {
  543. GetHostIp(ip);
  544. set(NULL);
  545. }
  546. SlaveContextLogger::SlaveContextLogger(IRoxieQueryPacket *packet)
  547. {
  548. GetHostIp(ip);
  549. set(packet);
  550. }
  551. void SlaveContextLogger::set(IRoxieQueryPacket *packet)
  552. {
  553. anyOutput = false;
  554. intercept = false;
  555. debuggerActive = false;
  556. checkingHeap = false;
  557. aborted = false;
  558. stats.reset();
  559. start = msTick();
  560. if (packet)
  561. {
  562. CriticalBlock b(crit);
  563. RoxiePacketHeader &header = packet->queryHeader();
  564. const byte *traceInfo = packet->queryTraceInfo();
  565. unsigned traceLength = packet->getTraceLength();
  566. unsigned char loggingFlags = *traceInfo;
  567. if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL
  568. {
  569. traceInfo++;
  570. traceLength--;
  571. if (loggingFlags & LOGGING_INTERCEPTED)
  572. intercept = true;
  573. if (loggingFlags & LOGGING_TRACELEVELSET)
  574. {
  575. ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
  576. traceLength--;
  577. }
  578. if (loggingFlags & LOGGING_BLIND)
  579. blind = true;
  580. if (loggingFlags & LOGGING_CHECKINGHEAP)
  581. checkingHeap = true;
  582. if (loggingFlags & LOGGING_DEBUGGERACTIVE)
  583. {
  584. assertex(traceLength > sizeof(unsigned short));
  585. debuggerActive = true;
  586. unsigned short debugLen = *(unsigned short *) traceInfo;
  587. traceInfo += debugLen + sizeof(unsigned short);
  588. traceLength -= debugLen + sizeof(unsigned short);
  589. }
  590. // Passing the wuid via the logging context prefix is a bit of a hack...
  591. if (loggingFlags & LOGGING_WUID)
  592. {
  593. unsigned wuidLen = 0;
  594. while (wuidLen < traceLength)
  595. {
  596. if (traceInfo[wuidLen]=='@')
  597. break;
  598. wuidLen++;
  599. }
  600. wuid.set((const char *) traceInfo, wuidLen);
  601. }
  602. }
  603. channel = header.channel;
  604. StringBuffer s(traceLength, (const char *) traceInfo);
  605. s.append("|");
  606. ip.getIpText(s);
  607. s.append(':').append(channel);
  608. StringContextLogger::set(s.str());
  609. if (intercept || mergeSlaveStatistics)
  610. {
  611. RoxiePacketHeader newHeader(header, ROXIE_TRACEINFO);
  612. output.setown(ROQ->createOutputStream(newHeader, true, *this));
  613. }
  614. }
  615. else
  616. {
  617. StringContextLogger::set("");
  618. channel = 0;
  619. }
  620. }
  621. void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const
  622. {
  623. if (output && mergeSlaveStatistics)
  624. {
  625. MemoryBuffer buf;
  626. buf.append((char) LOG_CHILDCOUNT); // A special log entry for the stats
  627. buf.append(subGraphId);
  628. buf.append(actId);
  629. buf.append(idx);
  630. buf.append(processed);
  631. buf.append(strands);
  632. }
  633. }
  634. void SlaveContextLogger::putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const
  635. {
  636. if (output && mergeSlaveStatistics)
  637. {
  638. MemoryBuffer buf;
  639. buf.append((char) LOG_CHILDSTATS); // A special log entry for the stats
  640. buf.append(subGraphId);
  641. buf.append(actId);
  642. if (stats.serialize(buf))
  643. {
  644. unsigned len = buf.length();
  645. void *ret = output->getBuffer(len, true);
  646. memcpy(ret, buf.toByteArray(), len);
  647. output->putBuffer(ret, len, true);
  648. anyOutput = true;
  649. }
  650. }
  651. }
  652. void SlaveContextLogger::flush()
  653. {
  654. if (output)
  655. {
  656. CriticalBlock b(crit);
  657. if (mergeSlaveStatistics)
  658. {
  659. MemoryBuffer buf;
  660. buf.append((char) LOG_STATVALUES); // A special log entry for the stats
  661. if (stats.serialize(buf))
  662. {
  663. unsigned len = buf.length();
  664. void *ret = output->getBuffer(len, true);
  665. memcpy(ret, buf.toByteArray(), len);
  666. output->putBuffer(ret, len, true);
  667. anyOutput = true;
  668. }
  669. }
  670. ForEachItemIn(idx, log)
  671. {
  672. MemoryBuffer buf;
  673. LogItem &logItem = log.item(idx);
  674. logItem.serialize(buf);
  675. unsigned len = buf.length();
  676. void *ret = output->getBuffer(len, true);
  677. memcpy(ret, buf.toByteArray(), len);
  678. output->putBuffer(ret, len, true);
  679. anyOutput = true;
  680. }
  681. log.kill();
  682. if (anyOutput)
  683. output->flush(true);
  684. output.clear();
  685. }
  686. }
  687. //=================================================================================
  688. /*
  689. * IBYTI handling
  690. *
  691. * IBYTI (I beat you to it) messages are sent by the slave that is going to process a particular request,
  692. * to tell the other slaves on the same channel not to bother.
  693. *
  694. * In order to reduce wasted work, for each request a "primary" subchannel is selected (based on a hash of the
  695. * packet's RUID) - this channel will process the request immediately, but others will delay a little while
  696. * in order to give the expected IBYTI time to arrive.
  697. *
  698. * The decision on how long to delay is a little complex - too long, and you end up losing the ability for a
  699. * backup slave to step in when primary is under load (or dead); too short and you end up duplicating work.
  700. * It's also important for the delay to be adaptive so that if a slave goes offline, the other slaves on the
  701. * subchannel don't keep waiting for it to take its turn.
  702. *
  703. * The adaptiveness is handled by noting any time that we delay waiting for an IBYTI that does not arrive - this
  704. * may mean that the slave(s) we expected to get there first are offline, and thus next time we don't wait quite
  705. * so long for them. Conversely, any time an IBYTI does arrive from another slave on your channel, you know that
  706. * it is online and so can reset the delay to its original value.
  707. *
  708. * A previous version of this code assumed a single missed IBYTI was enough to assume that a slave was dead and drop the
  709. * delay for that slave to zero - this turned out to behave pretty poorly when under load, with much duplicated work.
  710. * Thus we take care to adjust the delay more gradually, while still ending up with a zero delay if the buddy does not respond
  711. * several times in a row.
  712. */
  713. /*
  714. * A "subchannel" is a value from 1 to 7 (with current settings) that indicates which "copy" of the data for this channel
  715. * is being processed by this slave. A value of 0 would indicate that this slave does not have any data for this channel.
  716. * In a typical 100-way roxie with cyclic redundancy, node 1 would be processing channel 1, subchannel 1, and channel 2,
 * subchannel 2, node 2 would be processing channel 2, subchannel 1 and channel 3, subchannel 2, and so on up to node 100,
  718. * which would process channel 100, subchannel 1 and channel 1, subchannel 2. The subChannels array tells me my subchannel
  719. * for any given channel.
  720. *
  721. * To determine which subchannel is the "primary" for a given query packet, a hash value of fields from the packet header
  722. * is used, modulo the number of subchannels on this channel. The slave on this subchannel will respond immediately.
 * Slaves on other subchannels delay according to the subchannel number - so on a 4-way redundant system, if the primary
  724. * subchannel is decided to be 2, the slave on subchannel 3 will delay by 1 ibytiDelay value, the slave on subchannel 4 by
  725. * 2 values, and the slave on subchannel 1 by 3 values (this assumes all slaves are responding normally).
  726. *
  727. * In fact, the calculation is a little more complex, in that the "units" are adjusted per subchannel to take into account
  728. * the responsiveness or otherwise of a subchannel. Initially, the delay value for each subchannel is the same, but any time
  729. * a slave waits for an IBYTI that does not arrive on time, the delay value for any slave that is "more primary" than me for
  730. * this packet is reduced. Any time an IBYTI _does_ arrive on time, the delay is reset to its initial value.
  731. */
  732. class ChannelInfo
  733. {
  734. public:
  735. unsigned getIbytiDelay(unsigned primarySubChannel) const // NOTE - zero-based
  736. {
  737. unsigned delay = 0;
  738. unsigned subChannel = primarySubChannel;
  739. while (subChannel != mySubChannel)
  740. {
  741. delay += currentDelay[subChannel];
  742. subChannel++;
  743. if (subChannel == numSubChannels)
  744. subChannel = 0;
  745. }
  746. return delay;
  747. }
  748. void noteChannelsSick(unsigned primarySubChannel)
  749. {
  750. unsigned subChannel = primarySubChannel;
  751. while (subChannel != mySubChannel)
  752. {
  753. unsigned newDelay = currentDelay[subChannel] / 2;
  754. if (newDelay < minIbytiDelay)
  755. newDelay = minIbytiDelay;
  756. currentDelay[subChannel] = newDelay;
  757. subChannel++;
  758. if (subChannel == numSubChannels)
  759. subChannel = 0;
  760. }
  761. }
  762. void noteChannelHealthy(unsigned subChannel)
  763. {
  764. currentDelay[subChannel] = initIbytiDelay;
  765. }
  766. void init(unsigned _subChannel, unsigned _numSubChannels)
  767. {
  768. mySubChannel = _subChannel;
  769. numSubChannels = _numSubChannels;
  770. for (unsigned i = 0; i < numSubChannels; i++)
  771. currentDelay[i] = initIbytiDelay;
  772. }
  773. private:
  774. unsigned mySubChannel = 0; // Which subChannel does this node implement for this channel - zero-based
  775. unsigned numSubChannels = 0; // How many subchannels are there for this channel, across all slaves
  776. unsigned currentDelay[MAX_SUBCHANNEL] = {0}; // NOTE - technically should be atomic, but in the event of a race we don't really care who wins
  777. };
  778. static ChannelInfo channelInfo[MAX_CLUSTER_SIZE];
  779. /*
  780. * Determine whether to abort on receipt of an IBYTI for a packet which I have already started processing
  781. * As I will also have sent out an IBYTI, I should only abort if the sender of the IBYTI has higher priority
  782. * for this packet than I do.
  783. *
  784. * MORE - move this function into ChannelInfo class.
  785. */
  786. bool otherSlaveHasPriority(const RoxiePacketHeader &h)
  787. {
  788. unsigned hash = h.priorityHash();
  789. unsigned primarySubChannel = (hash % numSlaves[h.channel]);
  790. unsigned mySubChannel = subChannels[h.channel] - 1;
  791. unsigned otherSlaveSubChannel = h.getRespondingSubChannel();
  792. // could be coded smarter! Basically mysub - prim < theirsub - prim using modulo arithmetic, I think
  793. while (primarySubChannel != mySubChannel)
  794. {
  795. if (primarySubChannel == otherSlaveSubChannel)
  796. return true;
  797. primarySubChannel++;
  798. if (primarySubChannel >= numSlaves[h.channel])
  799. primarySubChannel = 0;
  800. }
  801. return false;
  802. }
  803. //=================================================================================
  804. static SpinLock onDemandQueriesCrit;
  805. static MapXToMyClass<hash64_t, hash64_t, IQueryFactory> onDemandQueryCache;
  806. void sendUnloadMessage(hash64_t hash, const char *id, const IRoxieContextLogger &logctx)
  807. {
  808. unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen(id) + 1;
  809. void *packetData = malloc(packetSize);
  810. RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
  811. RemoteActivityId unloadId(ROXIE_UNLOAD, hash);
  812. header->init(unloadId, 0, 0, 0);
  813. char *finger = (char *) (header + 1);
  814. *finger++ = (char) LOGGING_FLAGSPRESENT;
  815. strcpy(finger, id);
  816. finger += strlen(id)+1;
  817. if (traceLevel > 1)
  818. DBGLOG("UNLOAD sent for query %s", id);
  819. Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
  820. ROQ->sendPacket(packet, logctx);
  821. }
  822. void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  823. {
  824. const RoxiePacketHeader &header = packet->queryHeader();
  825. unsigned channelNo = header.channel;
  826. logctx.CTXLOG("Unload received for channel %d", channelNo);
  827. hash64_t hashValue = header.queryHash;
  828. hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  829. SpinBlock b(onDemandQueriesCrit);
  830. onDemandQueryCache.remove(hashValue);
  831. }
  832. void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
  833. {
  834. hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  835. SpinBlock b(onDemandQueriesCrit);
  836. onDemandQueryCache.setValue(hashValue, query);
  837. }
  838. //=================================================================================
  839. struct PingRecord
  840. {
  841. unsigned tick;
  842. IpAddress senderIP;
  843. };
  844. void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  845. {
  846. const RoxiePacketHeader &header = packet->queryHeader();
  847. const IpAddress &serverIP = getNodeAddress(header.serverIdx);
  848. unsigned contextLength = packet->getContextLength();
  849. if (contextLength != sizeof(PingRecord))
  850. {
  851. StringBuffer s;
  852. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Unexpected data size %d (expected %d) in PING: %s", contextLength, (unsigned) sizeof(PingRecord), header.toString(s).str());
  853. }
  854. const PingRecord *data = (const PingRecord *) packet->queryContextData();
  855. if (!serverIP.ipequals(data->senderIP))
  856. {
  857. StringBuffer s;
  858. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(s).str());
  859. }
  860. RoxiePacketHeader newHeader(header, ROXIE_PING);
  861. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  862. void *ret = output->getBuffer(contextLength, false);
  863. memcpy(ret, data, contextLength);
  864. output->putBuffer(ret, contextLength, false);
  865. output->flush(true);
  866. }
  867. //=================================================================================
  868. //
  869. // RoxieQueue - holds pending transactions on a roxie agent
  870. class RoxieQueue : public CInterface, implements IThreadFactory
  871. {
  872. Owned <IThreadPool> workers;
  873. QueueOf<IRoxieQueryPacket, true> waiting;
  874. Semaphore available;
  875. CriticalSection qcrit;
  876. unsigned headRegionSize;
  877. unsigned numWorkers;
  878. RelaxedAtomic<unsigned> started;
  879. std::atomic<unsigned> idle;
  880. void noteQueued()
  881. {
  882. maxQueueLength.store_max(++queueLength);
  883. // NOTE - there is a small race condition here - if idle is 1 but two enqueue's happen
  884. // close enough together that the signal has not yet caused idle to come back down to zero, then the
  885. // desired new thread may not be created. It's unlikely, and it's benign in that the query is still
  886. // processed and the thread will be created next time the HWM is reached.
  887. if (started < numWorkers && idle==0)
  888. {
  889. workers->start(this);
  890. started++;
  891. }
  892. }
  893. public:
  894. IMPLEMENT_IINTERFACE;
  895. RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
  896. {
  897. headRegionSize = _headRegionSize;
  898. numWorkers = _numWorkers;
  899. workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
  900. started = 0;
  901. idle = 0;
  902. }
  903. virtual IPooledThread *createNew();
  904. void abortChannel(unsigned channel);
  905. void start()
  906. {
  907. if (prestartSlaveThreads)
  908. {
  909. while (started < numWorkers)
  910. {
  911. workers->start(this);
  912. started++;
  913. }
  914. }
  915. }
  916. IPooledThreadIterator *running()
  917. {
  918. return workers->running();
  919. }
  920. void stopAll()
  921. {
  922. workers->stopAll(true);
  923. signal(workers->runningCount());
  924. }
  925. void join()
  926. {
  927. workers->joinAll(true);
  928. workers.clear(); // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
  929. }
  930. void enqueue(IRoxieQueryPacket *x)
  931. {
  932. {
  933. #ifdef TIME_PACKETS
  934. x->queryHeader().tick = msTick();
  935. #endif
  936. CriticalBlock qc(qcrit);
  937. waiting.enqueue(x);
  938. noteQueued();
  939. }
  940. available.signal();
  941. }
  942. void enqueueUnique(IRoxieQueryPacket *x)
  943. {
  944. RoxiePacketHeader &header = x->queryHeader();
  945. #ifdef TIME_PACKETS
  946. header.tick = msTick();
  947. #endif
  948. bool found = false;
  949. {
  950. CriticalBlock qc(qcrit);
  951. unsigned len = waiting.ordinality();
  952. unsigned i;
  953. for (i = 0; i < len; i++)
  954. {
  955. IRoxieQueryPacket *queued = waiting.item(i);
  956. if (queued && queued->queryHeader().matchPacket(header))
  957. {
  958. found = true;
  959. break;
  960. }
  961. }
  962. if (!found)
  963. waiting.enqueue(x);
  964. }
  965. if (found)
  966. {
  967. bool primChannel = true;
  968. if (subChannels[header.channel] != 1) primChannel = false;
  969. if (traceLevel > 0)
  970. {
  971. StringBuffer xx;
  972. SlaveContextLogger l(x);
  973. l.CTXLOG("Ignored retry on %s channel for queued activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  974. }
  975. if (primChannel)
  976. retriesIgnoredPrm++;
  977. else
  978. retriesIgnoredSec++;
  979. x->Release();
  980. }
  981. else
  982. {
  983. available.signal();
  984. noteQueued();
  985. if (traceLevel > 10)
  986. {
  987. SlaveContextLogger l(x);
  988. StringBuffer xx;
  989. l.CTXLOG("enqueued %s", header.toString(xx).str());
  990. }
  991. }
  992. }
  993. bool remove(RoxiePacketHeader &x)
  994. {
  995. unsigned scanLength = 0;
  996. IRoxieQueryPacket *found = nullptr;
  997. {
  998. CriticalBlock qc(qcrit);
  999. unsigned len = waiting.ordinality();
  1000. unsigned i;
  1001. for (i = 0; i < len; i++)
  1002. {
  1003. IRoxieQueryPacket *queued = waiting.item(i);
  1004. if (queued)
  1005. {
  1006. scanLength++;
  1007. if (queued->queryHeader().matchPacket(x))
  1008. {
  1009. waiting.set(i, NULL);
  1010. found = queued;
  1011. break;
  1012. }
  1013. }
  1014. }
  1015. }
  1016. if (found)
  1017. {
  1018. #ifdef _DEBUG
  1019. RoxiePacketHeader &header = found->queryHeader();
  1020. SlaveContextLogger l(found);
  1021. StringBuffer xx;
  1022. l.CTXLOG("discarded %s", header.toString(xx).str());
  1023. #endif
  1024. found->Release();
  1025. queueLength--;
  1026. if (scanLength > maxScanLength)
  1027. maxScanLength = scanLength;
  1028. totScanLength += scanLength;
  1029. totScans++;
  1030. return true;
  1031. }
  1032. else
  1033. return false;
  1034. }
  1035. void wait()
  1036. {
  1037. idle++;
  1038. available.wait();
  1039. idle--;
  1040. }
  1041. void signal(unsigned num)
  1042. {
  1043. available.signal(num);
  1044. }
  1045. IRoxieQueryPacket *dequeue()
  1046. {
  1047. CriticalBlock qc(qcrit);
  1048. unsigned lim = waiting.ordinality();
  1049. if (lim)
  1050. {
  1051. if (headRegionSize)
  1052. {
  1053. if (lim > headRegionSize)
  1054. lim = headRegionSize;
  1055. return waiting.dequeue(fastRand() % lim);
  1056. }
  1057. return waiting.dequeue();
  1058. }
  1059. else
  1060. return NULL;
  1061. }
  1062. unsigned getHeadRegionSize() const
  1063. {
  1064. return headRegionSize;
  1065. }
  1066. unsigned setHeadRegionSize(unsigned newsize)
  1067. {
  1068. unsigned ret = headRegionSize;
  1069. headRegionSize = newsize;
  1070. return ret;
  1071. }
  1072. };
  1073. class CRoxieWorker : public CInterface, implements IPooledThread
  1074. {
  1075. RoxieQueue *queue;
  1076. CriticalSection actCrit;
  1077. Semaphore ibytiSem;
  1078. bool stopped;
  1079. bool abortJob;
  1080. bool busy;
  1081. Owned<IRoxieSlaveActivity> activity;
  1082. Owned<IRoxieQueryPacket> packet;
  1083. SlaveContextLogger logctx;
  1084. public:
  1085. IMPLEMENT_IINTERFACE;
  1086. CRoxieWorker()
  1087. {
  1088. queue = NULL;
  1089. stopped = false;
  1090. busy = false;
  1091. abortJob = false;
  1092. }
  1093. virtual void init(void *_r) override
  1094. {
  1095. queue = (RoxieQueue *) _r;
  1096. stopped = false;
  1097. busy = false;
  1098. abortJob = false;
  1099. }
  1100. virtual bool canReuse() const override
  1101. {
  1102. return true;
  1103. }
  1104. virtual bool stop() override
  1105. {
  1106. stopped = true;
  1107. return true;
  1108. }
  1109. inline void setActivity(IRoxieSlaveActivity *act)
  1110. {
  1111. CriticalBlock b(actCrit);
  1112. activity.setown(act);
  1113. }
  1114. inline bool match(RoxiePacketHeader &h)
  1115. {
  1116. // There is a window between getting packet from queue and being able to match it.
  1117. // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
  1118. CriticalBlock b(actCrit);
  1119. return packet && packet->queryHeader().matchPacket(h);
  1120. }
  1121. void abortChannel(unsigned channel)
  1122. {
  1123. CriticalBlock b(actCrit);
  1124. if (packet && packet->queryHeader().channel==channel)
  1125. {
  1126. abortJob = true;
  1127. if (doIbytiDelay)
  1128. ibytiSem.signal();
  1129. if (activity)
  1130. activity->abort();
  1131. }
  1132. }
  1133. bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
  1134. {
  1135. CriticalBlock b(actCrit);
  1136. if (packet && packet->queryHeader().matchPacket(h))
  1137. {
  1138. queryFound = true;
  1139. abortJob = true;
  1140. if (doIbytiDelay)
  1141. ibytiSem.signal();
  1142. if (activity)
  1143. {
  1144. // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority slave
  1145. // (more primary in the rank). The slaves with higher rank will hold the lower bits of the retries field in IBYTI packet).
  1146. if (!checkRank || otherSlaveHasPriority(h))
  1147. {
  1148. activity->abort();
  1149. return true;
  1150. }
  1151. else
  1152. {
  1153. return false;
  1154. }
  1155. }
  1156. if (busy)
  1157. {
  1158. preActivity = true;
  1159. return true;
  1160. }
  1161. }
  1162. return false;
  1163. }
  1164. void throwRemoteException(IException *E, IRoxieSlaveActivity *activity, IRoxieQueryPacket *packet, bool isUser)
  1165. {
  1166. try
  1167. {
  1168. if (activity && (logctx.queryTraceLevel() > 1))
  1169. {
  1170. StringBuffer act;
  1171. activity->toString(act);
  1172. logctx.CTXLOG("throwRemoteException, activity %s, isUser=%d", act.str(), (int) isUser);
  1173. if (!isUser)
  1174. EXCLOG(E, "throwRemoteException");
  1175. }
  1176. RoxiePacketHeader &header = packet->queryHeader();
  1177. // I failed to do the query, but already sent out IBYTI - resend it so someone else can try
  1178. if (!isUser)
  1179. {
  1180. StringBuffer s;
  1181. s.append("Exception in slave for packet ");
  1182. header.toString(s);
  1183. logctx.logOperatorException(E, NULL, 0, "%s", s.str());
  1184. header.setException();
  1185. if (!header.allChannelsFailed() && !localSlave)
  1186. {
  1187. if (logctx.queryTraceLevel() > 1)
  1188. logctx.CTXLOG("resending packet from slave in case others want to try it");
  1189. ROQ->sendPacket(packet, logctx);
  1190. }
  1191. }
  1192. RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION);
  1193. if (isUser)
  1194. newHeader.retries = (unsigned short) -1;
  1195. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1196. StringBuffer message("<Exception>");
  1197. message.appendf("<Code>%d</Code><Message>", E->errorCode());
  1198. StringBuffer err;
  1199. E->errorMessage(err);
  1200. encodeXML(err.str(), message);
  1201. message.append("</Message></Exception>");
  1202. unsigned len = message.length();
  1203. void *ret = output->getBuffer(len+1, true);
  1204. memcpy(ret, message.str(), len+1);
  1205. output->putBuffer(ret, len+1, true);
  1206. output->flush(true);
  1207. E->Release();
  1208. }
  1209. catch (IException *EInE)
  1210. {
  1211. EXCLOG(EInE, "Exception during throwRemoteException");
  1212. E->Release();
  1213. EInE->Release();
  1214. }
  1215. catch (...)
  1216. {
  1217. logctx.CTXLOG("Unknown Exception during throwRemoteException");
  1218. E->Release();
  1219. }
  1220. }
  1221. void doActivity()
  1222. {
  1223. RoxiePacketHeader &header = packet->queryHeader();
  1224. unsigned channel = header.channel;
  1225. hash64_t queryHash = packet->queryHeader().queryHash;
  1226. unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
  1227. Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
  1228. if (!queryFactory && logctx.queryWuid())
  1229. {
  1230. Owned <IRoxieDaliHelper> daliHelper = connectToDali();
  1231. Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(logctx.queryWuid(), NULL);
  1232. queryFactory.setown(createSlaveQueryFactoryFromWu(wu, channel));
  1233. if (queryFactory)
  1234. cacheOnDemandQuery(queryHash, channel, queryFactory);
  1235. }
  1236. if (!queryFactory)
  1237. {
  1238. StringBuffer hdr;
  1239. IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie slave received request for unregistered query: %s", packet->queryHeader().toString(hdr).str());
  1240. EXCLOG(E, "doActivity");
  1241. throwRemoteException(E, activity, packet, false);
  1242. return;
  1243. }
  1244. try
  1245. {
  1246. if (logctx.queryTraceLevel() > 8)
  1247. {
  1248. StringBuffer x;
  1249. logctx.CTXLOG("IBYTI delay controls : doIbytiDelay=%s numslaves=%u subchnl=%u : %s",
  1250. doIbytiDelay?"YES":"NO",
  1251. numSlaves[channel], subChannels[channel],
  1252. header.toString(x).str());
  1253. }
  1254. bool debugging = logctx.queryDebuggerActive();
  1255. if (debugging)
  1256. {
  1257. if (subChannels[channel] != 1)
  1258. abortJob = true; // when debugging, we always run on primary only...
  1259. }
  1260. else if (doIbytiDelay && (numSlaves[channel] > 1))
  1261. {
  1262. bool primChannel = true;
  1263. if (subChannels[channel] != 1)
  1264. primChannel = false;
  1265. unsigned hdrHashVal = header.priorityHash();
  1266. unsigned primarySubChannel = (hdrHashVal % numSlaves[channel]);
  1267. if (primarySubChannel != subChannels[channel]-1)
  1268. {
  1269. unsigned delay = channelInfo[channel].getIbytiDelay(primarySubChannel);
  1270. if (logctx.queryTraceLevel() > 6)
  1271. {
  1272. StringBuffer x;
  1273. logctx.CTXLOG("YES myTurnToDelayIBYTI channel=%s delay=%u hash=%u %s", primChannel?"primary":"secondary", delay, hdrHashVal, header.toString(x).str());
  1274. }
  1275. // MORE: if we are dealing with a query that was on channel 0, we may want a longer delay
  1276. // (since the theory about duplicated work not mattering when cluster is idle does not hold up)
  1277. if (delay)
  1278. {
  1279. ibytiSem.wait(delay);
  1280. if (!abortJob)
  1281. channelInfo[channel].noteChannelsSick(primarySubChannel);
  1282. if (logctx.queryTraceLevel() > 8)
  1283. {
  1284. StringBuffer x;
  1285. logctx.CTXLOG("Buddy did%s send IBYTI, updated delay : %s",
  1286. abortJob ? "" : " NOT", header.toString(x).str());
  1287. }
  1288. }
  1289. }
  1290. else
  1291. {
  1292. #ifndef NO_IBYTI_DELAYS_COUNT
  1293. if (primChannel)
  1294. ibytiNoDelaysPrm++;
  1295. else
  1296. ibytiNoDelaysSec++;
  1297. #endif
  1298. if (logctx.queryTraceLevel() > 6)
  1299. {
  1300. StringBuffer x;
  1301. logctx.CTXLOG("NOT myTurnToDelayIBYTI channel=%s hash=%u %s", primChannel?"primary":"secondary", hdrHashVal, header.toString(x).str());
  1302. }
  1303. }
  1304. }
  1305. if (abortJob)
  1306. {
  1307. CriticalBlock b(actCrit);
  1308. busy = false; // Keep order - before setActivity below
  1309. if (logctx.queryTraceLevel() > 5)
  1310. {
  1311. StringBuffer x;
  1312. logctx.CTXLOG("Stop before processing - activity aborted %s", header.toString(x).str());
  1313. }
  1314. return;
  1315. }
  1316. if (!debugging)
  1317. ROQ->sendIbyti(header, logctx);
  1318. activitiesStarted++;
  1319. Owned <ISlaveActivityFactory> factory = queryFactory->getSlaveActivityFactory(activityId);
  1320. assertex(factory);
  1321. setActivity(factory->createActivity(logctx, packet));
  1322. #ifdef TEST_SLAVE_FAILURE
  1323. bool skip = false;
  1324. if (testSlaveFailure)
  1325. {
  1326. // Meaning of each byte in testSlaveFailure
  1327. // bits 1 -> 4 : test cast type (4 bits)
  1328. // bits 5 -> 11 : test case Freq (7 bits)
  1329. // bit 12 : : 1 Dot NOT Send ROXIE_ALIVE message - 0: send it
  1330. // bits 13 -> 32 : test case parameter (20 bits), if any
  1331. unsigned testCaseType = (testSlaveFailure & 0x0000000F);
  1332. unsigned testCaseFreq = (testSlaveFailure & 0x000007F0) >> 4;
  1333. unsigned testCaseParm = (testSlaveFailure & 0xFFFFF000) >> 12;
  1334. if (testCaseFreq && (atomic_read(&activitiesStarted) % testCaseFreq == 0))
  1335. {
  1336. StringBuffer x;
  1337. logctx.CTXLOG("------------ TestSlaveFailure to do the following (testCase=%u freq=%u tot=%u parm=%u ROXIE_ALIVE is %s - val=0x%.8X) for %s",
  1338. testCaseType, testCaseFreq, (unsigned) atomic_read(&activitiesStarted), testCaseParm, (testSlaveFailure & 0x800) ? "OFF" : "ON",
  1339. testSlaveFailure, header.toString(x).str());
  1340. switch (testCaseType)
  1341. {
  1342. case 1:
  1343. if (testCaseParm == 0) testCaseParm = 10000;
  1344. logctx.CTXLOG("--------------- Sleeping for %u ms - testCase=%u", testCaseParm, testCaseType);
  1345. Sleep(testCaseParm);
  1346. break;
  1347. case 2:
  1348. logctx.CTXLOG("--------------- Skip processing - testCase=%u ------", testCaseType);
  1349. skip = true;
  1350. break;
  1351. case 3:
  1352. logctx.CTXLOG("--------------- Throwing Exception String number %u NOW - testCase=%u -----", ROXIE_FILE_ERROR, testCaseType);
  1353. throw MakeStringException(ROXIE_FILE_ERROR, "Simulate File Exception in slave NOW");
  1354. break;
  1355. case 4:
  1356. if (numSlaves[channel] == 1)
  1357. {
  1358. logctx.CTXLOG("--------------- Setting numSlaves[channel=%u] to 2 to force one way to act as two way for ibyti logic testing - testCase=%u ------", channel, testCaseType);
  1359. numSlaves[channel] = 2;
  1360. }
  1361. testSlaveFailure = 0;
  1362. break;
  1363. }
  1364. }
  1365. }
  1366. if (!skip)
  1367. {
  1368. #endif
  1369. Owned<IMessagePacker> output = activity->process();
  1370. if (logctx.queryTraceLevel() > 5)
  1371. {
  1372. StringBuffer x;
  1373. logctx.CTXLOG("done processing %s", header.toString(x).str());
  1374. }
  1375. if (output)
  1376. {
  1377. activitiesCompleted++;
  1378. busy = false; // Keep order - before setActivity below
  1379. setActivity(NULL); // Ensures all stats are merged from child queries etc
  1380. logctx.flush();
  1381. output->flush(true);
  1382. }
  1383. #ifdef TEST_SLAVE_FAILURE
  1384. }
  1385. #endif
  1386. }
  1387. catch (IUserException *E)
  1388. {
  1389. throwRemoteException(E, activity, packet, true);
  1390. }
  1391. catch (IException *E)
  1392. {
  1393. if (E->errorCode()!=ROXIE_ABORT_ERROR)
  1394. throwRemoteException(E, activity, packet, false);
  1395. }
  1396. catch (...)
  1397. {
  1398. throwRemoteException(MakeStringException(ROXIE_MULTICAST_ERROR, "Unknown exception"), activity, packet, false);
  1399. }
  1400. busy = false; // Keep order - before setActivity below
  1401. setActivity(NULL);
  1402. }
  1403. virtual void threadmain() override
  1404. {
  1405. while (!stopped)
  1406. {
  1407. try
  1408. {
  1409. for (;;)
  1410. {
  1411. queue->wait();
  1412. if (stopped)
  1413. break;
  1414. slavesActive++;
  1415. maxSlavesActive.store_max(slavesActive);
  1416. abortJob = false;
  1417. busy = true;
  1418. if (doIbytiDelay)
  1419. ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
  1420. packet.setown(queue->dequeue());
  1421. if (packet)
  1422. {
  1423. queueLength--;
  1424. RoxiePacketHeader &header = packet->queryHeader();
  1425. logctx.set(packet);
  1426. #ifdef TIME_PACKETS
  1427. {
  1428. unsigned now = msTick();
  1429. unsigned packetWait = now-header.tick;
  1430. header.tick = now;
  1431. packetWaitMax.store_max(packetWait);
  1432. packetWaitElapsed += packetWait;
  1433. packetWaitCount++;
  1434. }
  1435. #endif
  1436. if (logctx.queryTraceLevel() > 10)
  1437. {
  1438. StringBuffer x;
  1439. logctx.CTXLOG("dequeued %s", header.toString(x).str());
  1440. }
  1441. if (ROQ->checkSuspended(header, logctx))
  1442. {
  1443. StringBuffer s;
  1444. logctx.CTXLOG("Ignoring packet for suspended channel %d: %s", header.channel, header.toString(s).str());
  1445. }
  1446. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_UNLOAD)
  1447. {
  1448. doUnload(packet, logctx);
  1449. }
  1450. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_PING)
  1451. {
  1452. doPing(packet, logctx);
  1453. }
  1454. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_DEBUGREQUEST)
  1455. {
  1456. // MORE - we need to make sure only executed on primary, and that the proxyId (== pointer to DebugGraphManager) is still valid.
  1457. // It may be that there is not a lot of point using the pointer - may as well use an non-reused ID and look it up in a global hash table of active ones
  1458. doDebugRequest(packet, logctx);
  1459. }
  1460. else if (header.channel)
  1461. doActivity();
  1462. else
  1463. throwUnexpected(); // channel 0 requests translated earlier now
  1464. #ifdef TIME_PACKETS
  1465. {
  1466. unsigned now = msTick();
  1467. unsigned packetRun = now-header.tick;
  1468. packetRunMax.store_max(packetRun);
  1469. packetRunElapsed += packetRun;
  1470. packetRunCount++;
  1471. }
  1472. #endif
  1473. }
  1474. busy = false;
  1475. {
  1476. CriticalBlock b(actCrit);
  1477. packet.clear();
  1478. logctx.set(NULL);
  1479. }
  1480. slavesActive--;
  1481. }
  1482. }
  1483. catch(...)
  1484. {
  1485. CriticalBlock b(actCrit);
  1486. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
  1487. EXCLOG(E);
  1488. if (packet)
  1489. throwRemoteException(E.getClear(), NULL, packet, false);
  1490. packet.clear();
  1491. }
  1492. }
  1493. }
  1494. };
  1495. IPooledThread *RoxieQueue::createNew()
  1496. {
  1497. return new CRoxieWorker;
  1498. }
  1499. void RoxieQueue::abortChannel(unsigned channel)
  1500. {
  1501. Owned<IPooledThreadIterator> wi = workers->running();
  1502. ForEach(*wi)
  1503. {
  1504. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1505. w.abortChannel(channel);
  1506. }
  1507. }
  1508. //=================================================================================
  1509. class CallbackEntry : implements IPendingCallback, public CInterface
  1510. {
  1511. const RoxiePacketHeader &header;
  1512. StringAttr lfn;
  1513. InterruptableSemaphore ready;
  1514. MemoryBuffer data;
  1515. bool gotData;
  1516. public:
  1517. IMPLEMENT_IINTERFACE;
  1518. CallbackEntry(const RoxiePacketHeader &_header, const char *_lfn) : header(_header), lfn(_lfn)
  1519. {
  1520. gotData = false;
  1521. }
  1522. virtual bool wait(unsigned msecs)
  1523. {
  1524. return ready.wait(msecs);
  1525. }
  1526. virtual MemoryBuffer &queryData()
  1527. {
  1528. return data;
  1529. }
  1530. bool matches(RoxiePacketHeader &cand, const char *_lfn)
  1531. {
  1532. return (cand.matchPacket(header) && (!_lfn|| stricmp(_lfn, lfn)==0));
  1533. }
  1534. void doFileCallback(unsigned _len, const void *_data, bool aborted)
  1535. {
  1536. // MORE - make sure we call this for whole query abort as well as for callback abort
  1537. if (aborted)
  1538. ready.interrupt(MakeStringException(0, "Interrupted"));
  1539. else if (!gotData)
  1540. {
  1541. gotData = true;
  1542. data.append(_len, _data);
  1543. ready.signal();
  1544. }
  1545. }
  1546. };
  1547. class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
  1548. {
  1549. protected:
  1550. #ifdef ROXIE_SLA_LOGIC
  1551. RoxieQueue slaQueue;
  1552. #endif
  1553. RoxieQueue hiQueue;
  1554. RoxieQueue loQueue;
  1555. unsigned numWorkers;
  1556. public:
  1557. IMPLEMENT_IINTERFACE;
  1558. #ifdef ROXIE_SLA_LOGIC
  1559. RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
  1560. #else
  1561. RoxieReceiverBase(unsigned _numWorkers) : hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
  1562. #endif
  1563. {
  1564. CriticalBlock b(ccdChannelsCrit);
  1565. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
  1566. ForEach(*it)
  1567. {
  1568. unsigned channel = it->query().getPropInt("@channel", 0);
  1569. unsigned subChannel = it->query().getPropInt("@subChannel", 0);
  1570. assertex(channel <= numChannels);
  1571. assertex(subChannels[channel] == 0);
  1572. assertex(subChannel != 0);
  1573. subChannels[channel] = subChannel;
  1574. channels[channelCount++] = channel;
  1575. channelInfo[channel].init(subChannel-1, numSlaves[channel]);
  1576. }
  1577. }
  1578. virtual bool checkSuspended(const RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1579. {
  1580. bool suspended = suspendedChannels[header.channel];
  1581. if (suspended)
  1582. {
  1583. try
  1584. {
  1585. RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION);
  1586. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1587. StringBuffer message;
  1588. message.appendf("<Exception><Code>%d</Code><Message>Channel %d is suspended</Message></Exception>", ROXIE_CHANNEL_SUSPENDED, header.channel);
  1589. unsigned len = message.length();
  1590. void *ret = output->getBuffer(len+1, true);
  1591. memcpy(ret, message.str(), len+1);
  1592. output->putBuffer(ret, len+1, true);
  1593. output->flush(true);
  1594. }
  1595. catch (IException *EInE)
  1596. {
  1597. EXCLOG(EInE, "Exception during checkSuspended");
  1598. EInE->Release();
  1599. }
  1600. catch (...)
  1601. {
  1602. logctx.CTXLOG("Unknown Exception during checkSuspended");
  1603. }
  1604. }
  1605. return suspended;
  1606. }
  1607. virtual bool suspendChannel(unsigned channel, bool suspend, const IRoxieContextLogger &logctx)
  1608. {
  1609. assertex(channel < MAX_CLUSTER_SIZE);
  1610. bool prev = suspendedChannels[channel];
  1611. suspendedChannels[channel] = suspend;
  1612. if (suspend && subChannels[channel] && !prev)
  1613. {
  1614. logctx.CTXLOG("ERROR: suspending channel %d - aborting active queries", channel);
  1615. #ifdef ROXIE_SLA_LOGIC
  1616. slaQueue.abortChannel(channel);
  1617. #endif
  1618. hiQueue.abortChannel(channel);
  1619. loQueue.abortChannel(channel);
  1620. }
  1621. return prev;
  1622. }
  1623. virtual unsigned getHeadRegionSize() const
  1624. {
  1625. return loQueue.getHeadRegionSize();
  1626. }
  1627. virtual void setHeadRegionSize(unsigned newSize)
  1628. {
  1629. #ifdef ROXIE_SLA_LOGIC
  1630. slaQueue.setHeadRegionSize(newSize);
  1631. #endif
  1632. hiQueue.setHeadRegionSize(newSize);
  1633. loQueue.setHeadRegionSize(newSize);
  1634. }
  1635. virtual void start()
  1636. {
  1637. loQueue.start();
  1638. hiQueue.start();
  1639. #ifdef ROXIE_SLA_LOGIC
  1640. slaQueue.start();
  1641. #endif
  1642. }
  1643. virtual void stop()
  1644. {
  1645. loQueue.stopAll();
  1646. hiQueue.stopAll();
  1647. #ifdef ROXIE_SLA_LOGIC
  1648. slaQueue.stopAll();
  1649. #endif
  1650. }
  1651. virtual void join()
  1652. {
  1653. loQueue.join();
  1654. hiQueue.join();
  1655. #ifdef ROXIE_SLA_LOGIC
  1656. slaQueue.join();
  1657. #endif
  1658. }
  1659. IArrayOf<CallbackEntry> callbacks;
  1660. CriticalSection callbacksCrit;
  1661. virtual IPendingCallback *notePendingCallback(const RoxiePacketHeader &header, const char *lfn)
  1662. {
  1663. CriticalBlock b(callbacksCrit);
  1664. CallbackEntry *callback = new CallbackEntry(header, lfn);
  1665. callbacks.append(*callback);
  1666. return callback;
  1667. }
  1668. virtual void removePendingCallback(IPendingCallback *goer)
  1669. {
  1670. if (goer)
  1671. {
  1672. CriticalBlock b(callbacksCrit);
  1673. callbacks.zap(static_cast<CallbackEntry &>(*goer));
  1674. }
  1675. }
  1676. protected:
  1677. void doFileCallback(IRoxieQueryPacket *packet)
  1678. {
  1679. // This is called on the main slave reader thread so needs to be as fast as possible to avoid lost packets
  1680. const char *lfn;
  1681. const char *data;
  1682. unsigned len;
  1683. RoxiePacketHeader &header = packet->queryHeader();
  1684. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK)
  1685. {
  1686. lfn = (const char *) packet->queryContextData();
  1687. unsigned namelen = strlen(lfn) + 1;
  1688. data = lfn + namelen;
  1689. len = packet->getContextLength() - namelen;
  1690. }
  1691. else
  1692. {
  1693. lfn = data = NULL; // used when query aborted
  1694. len = 0;
  1695. }
  1696. CriticalBlock b(callbacksCrit);
  1697. ForEachItemIn(idx, callbacks)
  1698. {
  1699. CallbackEntry &c = callbacks.item(idx);
  1700. if (c.matches(header, lfn))
  1701. {
  1702. if (traceLevel > 10)
  1703. DBGLOG("callback return matched a waiting query");
  1704. c.doFileCallback(len, data, header.retries==QUERY_ABORTED);
  1705. }
  1706. }
  1707. }
  1708. };
  1709. #ifdef _MSC_VER
  1710. #pragma warning ( push )
  1711. #pragma warning ( disable: 4355 )
  1712. #endif
  1713. class RoxieThrottledPacketSender : public Thread
  1714. {
  1715. TokenBucket &bucket;
  1716. InterruptableSemaphore queued;
  1717. Semaphore started;
  1718. unsigned maxPacketSize;
  1719. SafeQueueOf<IRoxieQueryPacket, false> queue;
  1720. class DECL_EXCEPTION StoppedException: public IException, public CInterface
  1721. {
  1722. public:
  1723. IMPLEMENT_IINTERFACE;
  1724. int errorCode() const { return 0; }
  1725. StringBuffer & errorMessage(StringBuffer &str) const { return str.append("Stopped"); }
  1726. MessageAudience errorAudience() const { return MSGAUD_user; }
  1727. };
  1728. void enqueue(IRoxieQueryPacket *packet)
  1729. {
  1730. packet->Link();
  1731. queue.enqueue(packet);
  1732. queued.signal();
  1733. }
  1734. IRoxieQueryPacket *dequeue()
  1735. {
  1736. queued.wait();
  1737. return queue.dequeue();
  1738. }
  1739. public:
  1740. RoxieThrottledPacketSender(TokenBucket &_bucket, unsigned _maxPacketSize)
  1741. : Thread("RoxieThrottledPacketSender"), bucket(_bucket), maxPacketSize(_maxPacketSize)
  1742. {
  1743. start();
  1744. started.wait();
  1745. }
  1746. ~RoxieThrottledPacketSender()
  1747. {
  1748. stop();
  1749. join();
  1750. }
  1751. virtual int run()
  1752. {
  1753. started.signal();
  1754. for (;;)
  1755. {
  1756. try
  1757. {
  1758. Owned<IRoxieQueryPacket> packet = dequeue();
  1759. RoxiePacketHeader &header = packet->queryHeader();
  1760. unsigned length = packet->queryHeader().packetlength;
  1761. {
  1762. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  1763. bucket.wait((length / 1024) + 1);
  1764. }
  1765. if (channelWrite(header.channel, &header, length) != length)
  1766. DBGLOG("multicast write wrote too little");
  1767. packetsSent++;
  1768. }
  1769. catch (StoppedException *E)
  1770. {
  1771. E->Release();
  1772. break;
  1773. }
  1774. catch (IException *E)
  1775. {
  1776. EXCLOG(E);
  1777. E->Release();
  1778. }
  1779. catch (...)
  1780. {
  1781. }
  1782. }
  1783. return 0;
  1784. }
  1785. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  1786. {
  1787. RoxiePacketHeader &header = x->queryHeader();
  1788. unsigned length = x->queryHeader().packetlength;
  1789. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  1790. switch (header.retries & ROXIE_RETRIES_MASK)
  1791. {
  1792. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  1793. {
  1794. StringBuffer s;
  1795. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  1796. }
  1797. break;
  1798. default:
  1799. {
  1800. StringBuffer s;
  1801. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  1802. }
  1803. break;
  1804. case 0:
  1805. if (logctx.queryTraceLevel() > 8)
  1806. {
  1807. StringBuffer s;
  1808. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  1809. }
  1810. break;
  1811. }
  1812. if (length > maxPacketSize)
  1813. {
  1814. StringBuffer s;
  1815. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  1816. }
  1817. enqueue(x);
  1818. }
  1819. void stop()
  1820. {
  1821. // bucket.stop();
  1822. queued.interrupt(new StoppedException);
  1823. }
  1824. };
  1825. class RoxieSocketQueueManager : public RoxieReceiverBase
  1826. {
  1827. unsigned maxPacketSize;
  1828. bool running;
  1829. Linked<ISendManager> sendManager;
  1830. Linked<IReceiveManager> receiveManager;
  1831. Owned<TokenBucket> bucket;
  1832. class ReceiverThread : public Thread
  1833. {
  1834. RoxieSocketQueueManager &parent;
  1835. public:
  1836. ReceiverThread(RoxieSocketQueueManager &_parent) : Thread("RoxieSocketQueueManager"), parent(_parent) {}
  1837. int run()
  1838. {
  1839. // Raise the priority so ibyti's get through in a timely fashion
  1840. #ifdef __linux__
  1841. setLinuxThreadPriority(3);
  1842. #else
  1843. adjustPriority(1);
  1844. #endif
  1845. return parent.run();
  1846. }
  1847. } readThread;
  1848. public:
  1849. RoxieSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), readThread(*this)
  1850. {
  1851. int udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
  1852. int udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
  1853. int udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
  1854. maxPacketSize = multicastSocket->get_max_send_size();
  1855. if ((maxPacketSize==0)||(maxPacketSize>65535))
  1856. maxPacketSize = 65535;
  1857. if (topology->getPropInt("@sendMaxRate", 0))
  1858. {
  1859. unsigned sendMaxRate = topology->getPropInt("@sendMaxRate");
  1860. unsigned sendMaxRatePeriod = topology->getPropInt("@sendMaxRatePeriod", 1);
  1861. bucket.setown(new TokenBucket(sendMaxRate, sendMaxRatePeriod, sendMaxRate));
  1862. throttledPacketSendManager.setown(new RoxieThrottledPacketSender(*bucket, maxPacketSize));
  1863. }
  1864. IpAddress snifferIp;
  1865. getChannelIp(snifferIp, snifferChannel);
  1866. if (udpMaxSlotsPerClient > udpQueueSize)
  1867. udpMaxSlotsPerClient = udpQueueSize;
  1868. unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
  1869. unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
  1870. unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
  1871. unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
  1872. receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient, myNodeIndex));
  1873. sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, myNodeIndex));
  1874. running = false;
  1875. }
  1876. Owned<RoxieThrottledPacketSender> throttledPacketSendManager;
  1877. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  1878. {
  1879. if (throttledPacketSendManager)
  1880. throttledPacketSendManager->sendPacket(x, logctx);
  1881. else
  1882. {
  1883. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendPacket");
  1884. RoxiePacketHeader &header = x->queryHeader();
  1885. unsigned length = x->queryHeader().packetlength;
  1886. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  1887. StringBuffer s;
  1888. switch (header.retries & ROXIE_RETRIES_MASK)
  1889. {
  1890. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  1891. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  1892. break;
  1893. default:
  1894. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  1895. break;
  1896. case 0:
  1897. if (logctx.queryTraceLevel() > 8)
  1898. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  1899. break;
  1900. }
  1901. if (length > maxPacketSize)
  1902. {
  1903. StringBuffer s;
  1904. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  1905. }
  1906. if (channelWrite(header.channel, &header, length) != length)
  1907. logctx.CTXLOG("multicast write wrote too little");
  1908. packetsSent++;
  1909. }
  1910. }
  1911. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1912. {
  1913. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendIbyti");
  1914. RoxiePacketHeader ibytiHeader(header, header.activityId & ROXIE_PRIORITY_MASK);
  1915. if (logctx.queryTraceLevel() > 8)
  1916. {
  1917. StringBuffer s; logctx.CTXLOG("Sending IBYTI packet %s", ibytiHeader.toString(s).str());
  1918. }
  1919. if (channelWrite(header.channel, &ibytiHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
  1920. logctx.CTXLOG("sendIbyti wrote too little");
  1921. ibytiPacketsSent++;
  1922. }
  1923. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1924. {
  1925. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbort");
  1926. RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK);
  1927. abortHeader.retries = QUERY_ABORTED;
  1928. if (logctx.queryTraceLevel() > 8)
  1929. {
  1930. StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
  1931. }
  1932. if (channelWrite(header.channel, &abortHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
  1933. logctx.CTXLOG("sendAbort wrote too little");
  1934. abortsSent++;
  1935. }
  1936. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx)
  1937. {
  1938. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbortCallback");
  1939. RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK);
  1940. abortHeader.retries = QUERY_ABORTED;
  1941. MemoryBuffer data;
  1942. data.append(sizeof(abortHeader), &abortHeader).append(lfn);
  1943. if (logctx.queryTraceLevel() > 5)
  1944. {
  1945. StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
  1946. }
  1947. if (channelWrite(header.channel, data.toByteArray(), data.length()) != data.length())
  1948. logctx.CTXLOG("tr->write wrote too little");
  1949. abortsSent++;
  1950. }
  1951. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
  1952. {
  1953. unsigned qnum = outOfBand ? 0 : ((header.retries & ROXIE_FASTLANE) || !fastLaneQueue) ? 1 : 2;
  1954. if (logctx.queryTraceLevel() > 8)
  1955. {
  1956. StringBuffer s; logctx.CTXLOG("Creating Output Stream for reply packet on Q=%d - %s", qnum, header.toString(s).str());
  1957. }
  1958. return sendManager->createMessagePacker(header.uid, header.getSequenceId(), &header, sizeof(RoxiePacketHeader), header.serverIdx, qnum);
  1959. }
  1960. virtual bool replyPending(RoxiePacketHeader &header)
  1961. {
  1962. return sendManager->dataQueued(header.uid, header.getSequenceId(), header.serverIdx);
  1963. }
  1964. virtual bool abortCompleted(RoxiePacketHeader &header)
  1965. {
  1966. return sendManager->abortData(header.uid, header.getSequenceId(), header.serverIdx);
  1967. }
  1968. bool abortRunning(RoxiePacketHeader &header, RoxieQueue &queue, bool checkRank, bool &preActivity)
  1969. {
  1970. bool queryFound = false;
  1971. bool ret = false;
  1972. Owned<IPooledThreadIterator> wi = queue.running();
  1973. ForEach(*wi)
  1974. {
  1975. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1976. if (w.checkAbort(header, checkRank, queryFound, preActivity))
  1977. {
  1978. ret = true;
  1979. break;
  1980. }
  1981. else if (queryFound)
  1982. {
  1983. ret = false;
  1984. break;
  1985. }
  1986. }
  1987. if (!checkRank)
  1988. {
  1989. if (traceLevel > 8)
  1990. DBGLOG("discarding data for aborted query");
  1991. ROQ->abortCompleted(header);
  1992. }
  1993. return ret;
  1994. }
  1995. void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue)
  1996. {
  1997. assertex(!localSlave);
  1998. bool preActivity = false;
  1999. if (traceLevel > 10)
  2000. {
  2001. IpAddress peer;
  2002. StringBuffer s, s1;
  2003. multicastSocket->getPeerAddress(peer).getIpText(s);
  2004. header.toString(s1);
  2005. DBGLOG("doIBYTI %s from %s", s1.str(), s.str());
  2006. DBGLOG("header.retries=%x header.getSubChannelMask(header.channel)=%x", header.retries, header.getSubChannelMask(header.channel));
  2007. }
  2008. if (header.retries == QUERY_ABORTED)
  2009. {
  2010. abortRunning(header, queue, false, preActivity);
  2011. queue.remove(header);
  2012. if (traceLevel > 10)
  2013. {
  2014. StringBuffer s;
  2015. DBGLOG("Abort activity %s", header.toString(s).str());
  2016. }
  2017. }
  2018. else
  2019. {
  2020. ibytiPacketsReceived++;
  2021. unsigned subChannel = header.getRespondingSubChannel();
  2022. if (subChannel == subChannels[header.channel] - 1)
  2023. {
  2024. if (traceLevel > 10)
  2025. DBGLOG("doIBYTI packet was from self");
  2026. ibytiPacketsFromSelf++;
  2027. }
  2028. else
  2029. {
  2030. channelInfo[header.channel].noteChannelHealthy(subChannel);
  2031. bool foundInQ = queue.remove(header);
  2032. if (foundInQ)
  2033. {
  2034. if (traceLevel > 10)
  2035. {
  2036. StringBuffer s;
  2037. DBGLOG("Removed activity from Q : %s", header.toString(s).str());
  2038. }
  2039. ibytiPacketsWorked++;
  2040. return;
  2041. }
  2042. if (abortRunning(header, queue, true, preActivity))
  2043. {
  2044. if (preActivity)
  2045. ibytiPacketsWorked++;
  2046. else
  2047. ibytiPacketsHalfWorked++;
  2048. return;
  2049. }
  2050. if (traceLevel > 10)
  2051. DBGLOG("doIBYTI packet was too late");
  2052. ibytiPacketsTooLate++; // meaning either I started and reserve the right to finish, or I finished already
  2053. }
  2054. }
  2055. }
  2056. void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue)
  2057. {
  2058. // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
  2059. // DO NOT put tracing on this thread except at very high tracelevels!
  2060. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  2061. {
  2062. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  2063. if (traceLevel > 10)
  2064. {
  2065. StringBuffer s;
  2066. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  2067. }
  2068. doFileCallback(packet);
  2069. }
  2070. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
  2071. doIbyti(header, queue); // MORE - check how fast this is!
  2072. else
  2073. {
  2074. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  2075. SlaveContextLogger logctx(packet);
  2076. unsigned retries = header.thisChannelRetries();
  2077. if (retries)
  2078. {
  2079. // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
  2080. assertex(header.channel); // should never see a retry on channel 0
  2081. if (retries >= SUBCHANNEL_MASK)
  2082. return; // someone sent a failure or something - ignore it
  2083. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  2084. if (!(testSlaveFailure & 0x800))
  2085. {
  2086. RoxiePacketHeader newHeader(header, ROXIE_ALIVE);
  2087. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  2088. output->flush(true);
  2089. }
  2090. // If it's a retry, look it up against already running, or output stream, or input queue
  2091. // if found, send an IBYTI and discard retry request
  2092. bool primChannel = true;
  2093. if (subChannels[header.channel] != 1) primChannel = false;
  2094. if (primChannel)
  2095. retriesReceivedPrm++;
  2096. else
  2097. retriesReceivedSec++;
  2098. bool alreadyRunning = false;
  2099. Owned<IPooledThreadIterator> wi = queue.running();
  2100. ForEach(*wi)
  2101. {
  2102. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  2103. if (w.match(header))
  2104. {
  2105. alreadyRunning = true;
  2106. if (primChannel)
  2107. retriesIgnoredPrm++;
  2108. else
  2109. retriesIgnoredSec++;
  2110. ROQ->sendIbyti(header, logctx);
  2111. if (logctx.queryTraceLevel() > 10)
  2112. {
  2113. StringBuffer xx; logctx.CTXLOG("Ignored retry on %s channel for running activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  2114. }
  2115. break;
  2116. }
  2117. }
  2118. if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
  2119. {
  2120. alreadyRunning = true;
  2121. if (primChannel)
  2122. retriesIgnoredPrm++;
  2123. else
  2124. retriesIgnoredSec++;
  2125. ROQ->sendIbyti(header, logctx);
  2126. if (logctx.queryTraceLevel() > 10)
  2127. {
  2128. StringBuffer xx; logctx.CTXLOG("Ignored retry on %s channel for completed activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  2129. }
  2130. }
  2131. if (!alreadyRunning)
  2132. {
  2133. if (logctx.queryTraceLevel() > 10)
  2134. {
  2135. StringBuffer xx; logctx.CTXLOG("Retry %d received on %s channel for %s", retries+1, primChannel?"primary":"secondary", header.toString(xx).str());
  2136. }
  2137. queue.enqueueUnique(packet.getClear());
  2138. }
  2139. }
  2140. else // first time (not a retry).
  2141. {
  2142. if (header.channel)
  2143. {
  2144. queue.enqueue(packet.getClear());
  2145. }
  2146. else
  2147. {
  2148. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  2149. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  2150. // Unfortunately this is bad news for dropping packets
  2151. for (unsigned i = 1; i < channelCount; i++)
  2152. queue.enqueue(packet->clonePacket(channels[i]));
  2153. header.channel = channels[0];
  2154. queue.enqueue(packet.getClear());
  2155. }
  2156. }
  2157. }
  2158. }
  2159. int run()
  2160. {
  2161. if (traceLevel)
  2162. DBGLOG("RoxieSocketQueueManager::run() starting: doIbytiDelay=%s minIbytiDelay=%u initIbytiDelay=%u",
  2163. doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay);
  2164. for (;;)
  2165. {
  2166. MemoryBuffer mb;
  2167. try
  2168. {
  2169. // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
  2170. // DO NOT put tracing on this thread except at very high tracelevels!
  2171. unsigned l;
  2172. multicastSocket->read(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, 5);
  2173. mb.setLength(l);
  2174. packetsReceived++;
  2175. RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
  2176. if (l != header.packetlength)
  2177. DBGLOG("sock->read returned %d but packetlength was %d", l, header.packetlength);
  2178. if (traceLevel > 10)
  2179. {
  2180. StringBuffer s;
  2181. DBGLOG("Read from multicast: %s", header.toString(s).str());
  2182. }
  2183. #ifdef ROXIE_SLA_LOGIC
  2184. if (header.activityId & ROXIE_SLA_PRIORITY)
  2185. processMessage(mb, header, slaQueue);
  2186. else
  2187. #endif
  2188. if (header.activityId & ROXIE_HIGH_PRIORITY)
  2189. processMessage(mb, header, hiQueue);
  2190. else
  2191. processMessage(mb, header, loQueue);
  2192. }
  2193. catch (IException *E)
  2194. {
  2195. if (running)
  2196. {
  2197. // MORE: Maybe we should utilize IException::errorCode - not just text ??
  2198. if (E->errorCode()==JSOCKERR_timeout_expired)
  2199. E->Release();
  2200. else if (roxiemem::memPoolExhausted())
  2201. {
  2202. //MORE: I think this should probably be based on the error code instead.
  2203. EXCLOG(E, "Exception reading or processing multicast msg");
  2204. E->Release();
  2205. MilliSleep(1000); // Give a chance for mem free
  2206. }
  2207. else
  2208. {
  2209. EXCLOG(E, "Exception reading or processing multicast msg");
  2210. E->Release();
  2211. // MORE: Protect with try logic, in case udp_create throws exception ?
  2212. // What to do if create fails (ie exception is caught) ?
  2213. if (multicastSocket)
  2214. {
  2215. multicastSocket->close();
  2216. multicastSocket.clear();
  2217. openMulticastSocket();
  2218. }
  2219. }
  2220. }
  2221. else
  2222. {
  2223. E->Release();
  2224. break;
  2225. }
  2226. }
  2227. }
  2228. return 0;
  2229. }
  2230. void start()
  2231. {
  2232. RoxieReceiverBase::start();
  2233. running = true;
  2234. readThread.start();
  2235. }
  2236. void stop()
  2237. {
  2238. if (running)
  2239. {
  2240. running = false;
  2241. multicastSocket->close();
  2242. }
  2243. RoxieReceiverBase::stop();
  2244. }
  2245. void join()
  2246. {
  2247. readThread.join();
  2248. RoxieReceiverBase::join();
  2249. }
  2250. virtual IReceiveManager *queryReceiveManager()
  2251. {
  2252. return receiveManager;
  2253. }
  2254. };
  2255. #ifdef _MSC_VER
  2256. #pragma warning( pop )
  2257. #endif
  2258. //==================================================================================================
  2259. interface ILocalMessageCollator : extends IMessageCollator
  2260. {
  2261. virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0;
  2262. };
  2263. interface ILocalReceiveManager : extends IReceiveManager
  2264. {
  2265. virtual ILocalMessageCollator *lookupCollator(ruid_t id) = 0;
  2266. };
  2267. class LocalMessagePacker : public CDummyMessagePacker
  2268. {
  2269. MemoryBuffer meta;
  2270. MemoryBuffer header;
  2271. Linked<ILocalReceiveManager> rm;
  2272. ruid_t id;
  2273. bool outOfBand;
  2274. public:
  2275. IMPLEMENT_IINTERFACE;
  2276. LocalMessagePacker(RoxiePacketHeader &_header, bool _outOfBand, ILocalReceiveManager *_rm) : rm(_rm), outOfBand(_outOfBand)
  2277. {
  2278. id = _header.uid;
  2279. header.append(sizeof(RoxiePacketHeader), &_header);
  2280. }
  2281. virtual void flush(bool last_message);
  2282. virtual void sendMetaInfo(const void *buf, unsigned len)
  2283. {
  2284. meta.append(len, buf);
  2285. }
  2286. };
  2287. class CLocalMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface
  2288. {
  2289. void *data;
  2290. unsigned datalen;
  2291. unsigned pos;
  2292. Linked<IRowManager> rowManager;
  2293. public:
  2294. IMPLEMENT_IINTERFACE;
  2295. CLocalMessageUnpackCursor(IRowManager *_rowManager, void *_data, unsigned _datalen)
  2296. : rowManager(_rowManager)
  2297. {
  2298. datalen = _datalen;
  2299. data = _data;
  2300. pos = 0;
  2301. }
  2302. ~CLocalMessageUnpackCursor()
  2303. {
  2304. }
  2305. virtual bool atEOF() const
  2306. {
  2307. return datalen==pos;
  2308. }
  2309. virtual bool isSerialized() const
  2310. {
  2311. // NOTE: tempting to think that we could avoid serializing in localSlave case, but have to be careful about the lifespan of the rowManager...
  2312. return true;
  2313. }
  2314. virtual const void * getNext(int length)
  2315. {
  2316. if (pos==datalen)
  2317. return NULL;
  2318. assertex(pos + length <= datalen);
  2319. void * cur = ((char *) data) + pos;
  2320. pos += length;
  2321. void * ret = rowManager->allocate(length, 0);
  2322. memcpy(ret, cur, length);
  2323. //No need for finalize since only contains plain data.
  2324. return ret;
  2325. }
  2326. };
  2327. class CLocalMessageResult : implements IMessageResult, public CInterface
  2328. {
  2329. void *data;
  2330. void *meta;
  2331. void *header;
  2332. unsigned datalen, metalen, headerlen;
  2333. unsigned pos;
  2334. public:
  2335. IMPLEMENT_IINTERFACE;
  2336. CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen)
  2337. {
  2338. datalen = _datalen;
  2339. metalen = _metalen;
  2340. headerlen = _headerlen;
  2341. data = _data;
  2342. meta = _meta;
  2343. header = _header;
  2344. pos = 0;
  2345. }
  2346. ~CLocalMessageResult()
  2347. {
  2348. free(data);
  2349. free(meta);
  2350. free(header);
  2351. }
  2352. virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
  2353. {
  2354. return new CLocalMessageUnpackCursor(rowMgr, data, datalen);
  2355. }
  2356. virtual const void *getMessageHeader(unsigned &length) const
  2357. {
  2358. length = headerlen;
  2359. return header;
  2360. }
  2361. virtual const void *getMessageMetadata(unsigned &length) const
  2362. {
  2363. length = metalen;
  2364. return meta;
  2365. }
  2366. virtual void discard() const
  2367. {
  2368. }
  2369. };
  2370. class CLocalMessageCollator : implements ILocalMessageCollator, public CInterface
  2371. {
  2372. InterruptableSemaphore sem;
  2373. QueueOf<IMessageResult, false> pending;
  2374. CriticalSection crit;
  2375. Linked<IRowManager> rowManager;
  2376. Linked<ILocalReceiveManager> receiveManager;
  2377. ruid_t id;
  2378. unsigned totalBytesReceived;
  2379. public:
  2380. IMPLEMENT_IINTERFACE;
  2381. CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid);
  2382. ~CLocalMessageCollator();
  2383. virtual ruid_t queryRUID() const
  2384. {
  2385. return id;
  2386. }
  2387. virtual bool add_package(DataBuffer *dataBuff)
  2388. {
  2389. throwUnexpected(); // internal use in udp layer...
  2390. }
  2391. virtual IMessageResult* getNextResult(unsigned time_out, bool &anyActivity)
  2392. {
  2393. anyActivity = false;
  2394. if (!sem.wait(time_out))
  2395. return NULL;
  2396. anyActivity = true;
  2397. CriticalBlock c(crit);
  2398. return pending.dequeue();
  2399. }
  2400. virtual void interrupt(IException *E)
  2401. {
  2402. sem.interrupt(E);
  2403. }
  2404. virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen)
  2405. {
  2406. CriticalBlock c(crit);
  2407. if (outOfBand)
  2408. pending.enqueueHead(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
  2409. else
  2410. pending.enqueue(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
  2411. sem.signal();
  2412. totalBytesReceived += datalen + metalen + headerlen;
  2413. }
  2414. virtual unsigned queryBytesReceived() const
  2415. {
  2416. return totalBytesReceived;
  2417. }
  2418. };
  2419. class RoxieLocalReceiveManager : implements ILocalReceiveManager, public CInterface
  2420. {
  2421. MapXToMyClass<ruid_t, ruid_t, ILocalMessageCollator> collators;
  2422. CriticalSection crit;
  2423. Owned<StringContextLogger> logctx;
  2424. Linked<IMessageCollator> defaultCollator;
  2425. public:
  2426. IMPLEMENT_IINTERFACE;
  2427. RoxieLocalReceiveManager() : logctx(new StringContextLogger("RoxieLocalReceiveManager"))
  2428. {
  2429. }
  2430. virtual IMessageCollator *createMessageCollator(IRowManager *manager, ruid_t ruid)
  2431. {
  2432. IMessageCollator *collator = new CLocalMessageCollator(manager, ruid);
  2433. CriticalBlock b(crit);
  2434. collators.setValue(ruid, collator);
  2435. return collator;
  2436. }
  2437. virtual void detachCollator(const IMessageCollator *collator)
  2438. {
  2439. ruid_t id = collator->queryRUID();
  2440. CriticalBlock b(crit);
  2441. collators.setValue(id, NULL);
  2442. }
  2443. virtual void setDefaultCollator(IMessageCollator *collator)
  2444. {
  2445. CriticalBlock b(crit);
  2446. defaultCollator.set(collator);
  2447. }
  2448. virtual ILocalMessageCollator *lookupCollator(ruid_t id)
  2449. {
  2450. CriticalBlock b(crit);
  2451. IMessageCollator *ret = collators.getValue(id);
  2452. if (!ret)
  2453. ret = defaultCollator;
  2454. return LINK(QUERYINTERFACE(ret, ILocalMessageCollator));
  2455. }
  2456. };
  2457. void LocalMessagePacker::flush(bool last_message)
  2458. {
  2459. data.setLength(lastput);
  2460. if (last_message)
  2461. {
  2462. Owned<ILocalMessageCollator> collator = rm->lookupCollator(id);
  2463. if (collator)
  2464. {
  2465. unsigned datalen = data.length();
  2466. unsigned metalen = meta.length();
  2467. unsigned headerlen = header.length();
  2468. collator->enqueueMessage(outOfBand, data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen);
  2469. }
  2470. // otherwise Roxie server is no longer interested and we can simply discard
  2471. }
  2472. }
  2473. CLocalMessageCollator::CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid)
  2474. : rowManager(_rowManager), id(_ruid)
  2475. {
  2476. totalBytesReceived = 0;
  2477. }
  2478. CLocalMessageCollator::~CLocalMessageCollator()
  2479. {
  2480. IMessageResult *goer;
  2481. for (;;)
  2482. {
  2483. goer = pending.dequeue();
  2484. if (!goer)
  2485. break;
  2486. goer->Release();
  2487. }
  2488. }
  2489. class RoxieLocalQueueManager : public RoxieReceiverBase
  2490. {
  2491. Linked<RoxieLocalReceiveManager> receiveManager;
  2492. public:
  2493. RoxieLocalQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieReceiverBase(_numWorkers)
  2494. {
  2495. receiveManager.setown(new RoxieLocalReceiveManager);
  2496. }
  2497. virtual bool suspendChannel(unsigned channel, bool suspend, const IRoxieContextLogger &logctx)
  2498. {
  2499. if (suspend)
  2500. UNIMPLEMENTED;
  2501. return false;
  2502. }
  2503. virtual void sendPacket(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  2504. {
  2505. RoxiePacketHeader &header = packet->queryHeader();
  2506. unsigned retries = header.thisChannelRetries();
  2507. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  2508. {
  2509. if (traceLevel > 5)
  2510. {
  2511. StringBuffer s;
  2512. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  2513. }
  2514. doFileCallback(packet);
  2515. }
  2516. else if (retries < SUBCHANNEL_MASK)
  2517. {
  2518. if (retries)
  2519. {
  2520. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  2521. RoxiePacketHeader newHeader(header, ROXIE_ALIVE);
  2522. Owned<IMessagePacker> output = createOutputStream(newHeader, true, logctx);
  2523. output->flush(true);
  2524. return; // No point sending the retry in localSlave mode
  2525. }
  2526. RoxieQueue *targetQueue;
  2527. #ifdef ROXIE_SLA_LOGIC
  2528. if (header.activityId & ROXIE_SLA_PRIORITY)
  2529. targetQueue = &slaQueue;
  2530. else
  2531. #endif
  2532. if (header.activityId & ROXIE_HIGH_PRIORITY)
  2533. targetQueue = &hiQueue;
  2534. else
  2535. targetQueue = &loQueue;
  2536. if (header.channel)
  2537. {
  2538. targetQueue->enqueue(LINK(packet));
  2539. }
  2540. else
  2541. {
  2542. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  2543. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  2544. for (unsigned i = 0; i < channelCount; i++)
  2545. {
  2546. targetQueue->enqueue(packet->clonePacket(channels[i]));
  2547. }
  2548. }
  2549. }
  2550. }
  2551. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  2552. {
  2553. // Don't do IBYTI's when local slave - no buddy to talk to anyway
  2554. }
  2555. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  2556. {
  2557. MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbort");
  2558. RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK);
  2559. abortHeader.retries = QUERY_ABORTED;
  2560. if (logctx.queryTraceLevel() > 8)
  2561. {
  2562. StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
  2563. }
  2564. MemoryBuffer data;
  2565. data.append(sizeof(abortHeader), &abortHeader);
  2566. Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
  2567. sendPacket(packet, logctx);
  2568. abortsSent++;
  2569. }
  2570. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx)
  2571. {
  2572. MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbortCallback");
  2573. RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK);
  2574. abortHeader.retries = QUERY_ABORTED;
  2575. MemoryBuffer data;
  2576. data.append(sizeof(abortHeader), &abortHeader).append(lfn);
  2577. if (logctx.queryTraceLevel() > 5)
  2578. {
  2579. StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
  2580. }
  2581. Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
  2582. sendPacket(packet, logctx);
  2583. abortsSent++;
  2584. }
  2585. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
  2586. {
  2587. return new LocalMessagePacker(header, outOfBand, receiveManager);
  2588. }
  2589. virtual IReceiveManager *queryReceiveManager()
  2590. {
  2591. return receiveManager;
  2592. }
  2593. virtual bool replyPending(RoxiePacketHeader &header)
  2594. {
  2595. // MORE - should really have some code here! But returning true is a reasonable approximation.
  2596. return true;
  2597. }
  2598. virtual bool abortCompleted(RoxiePacketHeader &header)
  2599. {
  2600. // MORE - should really have some code here!
  2601. return false;
  2602. }
  2603. };
  2604. IRoxieOutputQueueManager *ROQ;
  2605. extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers)
  2606. {
  2607. if (localSlave)
  2608. return new RoxieLocalQueueManager(snifferChannel, numWorkers);
  2609. else
  2610. return new RoxieSocketQueueManager(snifferChannel, numWorkers);
  2611. }
  2612. //================================================================================================================================
  2613. class PacketDiscarder : public Thread, implements IPacketDiscarder
  2614. {
  2615. bool aborted;
  2616. Owned<IRowManager> rowManager; // not completely sure I need one... maybe I do
  2617. Owned<IMessageCollator> mc;
  2618. public:
  2619. IMPLEMENT_IINTERFACE;
  2620. PacketDiscarder()
  2621. {
  2622. aborted = false;
  2623. };
  2624. ~PacketDiscarder()
  2625. {
  2626. if (mc)
  2627. ROQ->queryReceiveManager()->detachCollator(mc);
  2628. mc.clear();
  2629. }
  2630. virtual int run()
  2631. {
  2632. Owned<StringContextLogger> logctx = new StringContextLogger("PacketDiscarder");
  2633. rowManager.setown(roxiemem::createRowManager(0, NULL, *logctx, NULL, false));
  2634. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  2635. ROQ->queryReceiveManager()->setDefaultCollator(mc);
  2636. while (!aborted)
  2637. {
  2638. bool anyActivity = false;
  2639. Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
  2640. if (mr)
  2641. {
  2642. if (traceLevel > 4)
  2643. DBGLOG("Discarding unwanted message");
  2644. unsigned headerLen;
  2645. const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  2646. if (headerLen)
  2647. {
  2648. switch (header.activityId)
  2649. {
  2650. case ROXIE_FILECALLBACK:
  2651. {
  2652. Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
  2653. OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
  2654. if (len)
  2655. {
  2656. RecordLengthType *rowlen = (RecordLengthType *) len.get();
  2657. OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
  2658. const char *rowdata = (const char *) row.get();
  2659. // bool isOpt = * (bool *) rowdata;
  2660. // bool isLocal = * (bool *) (rowdata+1);
  2661. ROQ->sendAbortCallback(header, rowdata+2, *logctx);
  2662. }
  2663. else
  2664. DBGLOG("Unrecognized format in discarded file callback");
  2665. break;
  2666. }
  2667. // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
  2668. }
  2669. }
  2670. else
  2671. DBGLOG("Unwanted message had no header?!");
  2672. }
  2673. else if (!anyActivity)
  2674. {
  2675. // to avoid leaking partial unwanted packets, we clear out mc periodically...
  2676. ROQ->queryReceiveManager()->detachCollator(mc);
  2677. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  2678. ROQ->queryReceiveManager()->setDefaultCollator(mc);
  2679. }
  2680. }
  2681. return 0;
  2682. }
  2683. virtual void start()
  2684. {
  2685. Thread::start();
  2686. }
  2687. virtual void stop()
  2688. {
  2689. if (mc)
  2690. mc->interrupt();
  2691. aborted = true;
  2692. join();
  2693. }
  2694. };
  2695. IPacketDiscarder *createPacketDiscarder()
  2696. {
  2697. IPacketDiscarder *packetDiscarder = new PacketDiscarder;
  2698. packetDiscarder->start();
  2699. return packetDiscarder;
  2700. }
  2701. //================================================================================================================================
  2702. // There are various possibly interesting ways to reply to a ping:
  2703. // Reply as soon as receive, or put it on the queue like other messages?
  2704. // Reply for every channel, or just once for every slave?
  2705. // Should I send on channel 0 or round-robin the channels?
  2706. // My gut feeling is that knowing what channels are responding is useful so should reply on every unsuspended channel,
  2707. // and that the delay caused by queuing system is an interesting part of what we want to measure (though nice to know minimum possible too)
  2708. unsigned pingInterval = 60;
  2709. class PingTimer : public Thread
  2710. {
  2711. bool aborted;
  2712. Owned<IRowManager> rowManager;
  2713. Owned<IMessageCollator> mc;
  2714. StringContextLogger logctx;
  2715. void sendPing(unsigned priorityMask)
  2716. {
  2717. unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen("PING") + 1 + sizeof(PingRecord);
  2718. void *packetData = malloc(packetSize);
  2719. RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
  2720. RemoteActivityId pingId(ROXIE_PING | priorityMask, 0);
  2721. header->init(pingId, 0, 0, 0);
  2722. char *finger = (char *) (header + 1);
  2723. *finger++ = (char) LOGGING_FLAGSPRESENT;
  2724. strcpy(finger, "PING");
  2725. finger += strlen("PING")+1;
  2726. if (traceLevel > 1)
  2727. DBGLOG("PING sent");
  2728. PingRecord data;
  2729. data.senderIP.ipset(getNodeAddress(myNodeIndex));
  2730. data.tick = usTick();
  2731. memcpy(finger, &data, sizeof(PingRecord));
  2732. Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
  2733. ROQ->sendPacket(packet, logctx);
  2734. }
  2735. public:
  2736. PingTimer() : logctx("PingTimer")
  2737. {
  2738. aborted = false;
  2739. };
  2740. ~PingTimer()
  2741. {
  2742. if (mc)
  2743. ROQ->queryReceiveManager()->detachCollator(mc);
  2744. mc.clear();
  2745. }
  2746. virtual int run()
  2747. {
  2748. rowManager.setown(roxiemem::createRowManager(1, NULL, queryDummyContextLogger(), NULL, false));
  2749. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_PING));
  2750. unsigned pingsReceived = 0;
  2751. unsigned pingsElapsed = 0;
  2752. sendPing(ROXIE_HIGH_PRIORITY);
  2753. while (!aborted)
  2754. {
  2755. bool anyActivity = false;
  2756. Owned<IMessageResult> mr = mc->getNextResult(pingInterval*1000, anyActivity);
  2757. if (mr)
  2758. {
  2759. unsigned headerLen;
  2760. const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  2761. Owned<IMessageUnpackCursor> mu = mr->getCursor(rowManager);
  2762. PingRecord *answer = (PingRecord *) mu->getNext(sizeof(PingRecord));
  2763. if (answer && mu->atEOF() && headerLen==sizeof(RoxiePacketHeader))
  2764. {
  2765. unsigned elapsed = usTick() - answer->tick;
  2766. pingsReceived++;
  2767. pingsElapsed += elapsed;
  2768. if (traceLevel > 10)
  2769. DBGLOG("PING reply channel=%d, time %d", header->channel, elapsed); // DBGLOG is slower than the pings so be careful!
  2770. }
  2771. else
  2772. DBGLOG("PING reply, garbled result");
  2773. ReleaseRoxieRow(answer);
  2774. }
  2775. else if (!anyActivity)
  2776. {
  2777. if (!pingsReceived && roxieMulticastEnabled)
  2778. DBGLOG("PING: NO replies received! Please check multicast settings, and that your network supports multicast.");
  2779. else if (traceLevel)
  2780. DBGLOG("PING: %d replies received, average delay %uus", pingsReceived, pingsReceived ? pingsElapsed / pingsReceived : 0);
  2781. pingsReceived = 0;
  2782. pingsElapsed = 0;
  2783. sendPing(ROXIE_HIGH_PRIORITY); // MORE - we could think about alternating the priority or sending pings on high and low at the same time...
  2784. }
  2785. }
  2786. return 0;
  2787. }
  2788. void stop()
  2789. {
  2790. if (mc)
  2791. mc->interrupt();
  2792. aborted = true;
  2793. }
  2794. static CriticalSection crit;
  2795. } *pingTimer;
  2796. CriticalSection PingTimer::crit;
  2797. extern void startPingTimer()
  2798. {
  2799. CriticalBlock b(PingTimer::crit);
  2800. if (!pingTimer)
  2801. {
  2802. pingTimer = new PingTimer();
  2803. pingTimer->start();
  2804. }
  2805. }
  2806. extern void stopPingTimer()
  2807. {
  2808. CriticalBlock b(PingTimer::crit);
  2809. if (pingTimer)
  2810. {
  2811. pingTimer->stop();
  2812. pingTimer->join();
  2813. delete pingTimer;
  2814. pingTimer = NULL;
  2815. }
  2816. }