ccdqueue.cpp 99 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include <jio.hpp>
  16. #include <jqueue.tpp>
  17. #include <jsocket.hpp>
  18. #include <jlog.hpp>
  19. #include "jisem.hpp"
  20. #include "udplib.hpp"
  21. #include "ccd.hpp"
  22. #include "ccddebug.hpp"
  23. #include "ccdquery.hpp"
  24. #include "ccdstate.hpp"
  25. #include "ccdqueue.ipp"
  26. #include "ccdsnmp.hpp"
  27. #ifdef _USE_CPPUNIT
  28. #include <cppunit/extensions/HelperMacros.h>
  29. #endif
  30. CriticalSection ibytiCrit; // CAUTION - not safe to use spinlocks as real-time thread accesses
  31. CriticalSection queueCrit;
  32. unsigned channels[MAX_CLUSTER_SIZE];
  33. unsigned channelCount;
  34. unsigned subChannels[MAX_CLUSTER_SIZE];
  35. unsigned numSlaves[MAX_CLUSTER_SIZE];
  36. unsigned replicationLevel[MAX_CLUSTER_SIZE];
  37. unsigned IBYTIDelays[MAX_CLUSTER_SIZE]; // MORE: this will cover only 2 slaves per channel, change to cover all.
  38. SpinLock suspendCrit;
  39. bool suspendedChannels[MAX_CLUSTER_SIZE];
  40. using roxiemem::OwnedRoxieRow;
  41. using roxiemem::OwnedConstRoxieRow;
  42. using roxiemem::IRowManager;
  43. using roxiemem::DataBuffer;
  44. //============================================================================================
  45. // This function maps a slave number to the multicast ip used to talk to it.
  46. IpAddress multicastBase("239.1.1.1"); // TBD IPv6 (need IPv6 multicast addresses?
  47. IpAddress multicastLast("239.1.5.254");
  48. const IpAddress &getChannelIp(IpAddress &ip, unsigned _channel)
  49. {
  50. // need to be careful to avoid the .0's and the .255's (not sure why...)
  51. ip = multicastBase;
  52. if (!ip.ipincrement(_channel,1,254,1,0xffff)
  53. ||(ip.ipcompare(multicastLast)>0))
  54. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Out-of-range multicast channel %d", _channel);
  55. return ip;
  56. }
  57. Owned<ISocket> multicastSocket;
  58. SocketEndpointArray *slaveEndpoints; // indexed by channel
  59. bool isSlaveEndpoint(unsigned channel, const IpAddress &slaveIp)
  60. {
  61. SocketEndpointArray &eps = slaveEndpoints[channel];
  62. ForEachItemIn(idx, eps)
  63. {
  64. if (eps.item(idx).ipequals(slaveIp))
  65. return true;
  66. }
  67. return false;
  68. }
  69. void joinMulticastChannel(unsigned channel)
  70. {
  71. if (roxieMulticastEnabled && !localSlave)
  72. {
  73. IpAddress multicastIp;
  74. getChannelIp(multicastIp, channel);
  75. SocketEndpoint ep(ccdMulticastPort, multicastIp);
  76. StringBuffer epStr;
  77. ep.getUrlStr(epStr);
  78. if (!multicastSocket->join_multicast_group(ep))
  79. throw MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to join multicast channel %d (%s)", channel, epStr.str());
  80. if (traceLevel)
  81. DBGLOG("Joined multicast channel %d (%s)", channel, epStr.str());
  82. }
  83. }
  84. void openMulticastSocket()
  85. {
  86. if (!multicastSocket)
  87. {
  88. multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
  89. multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
  90. size32_t actualSize = multicastSocket->get_receive_buffer_size();
  91. if (actualSize < udpMulticastBufferSize)
  92. {
  93. DBGLOG("Roxie: multicast socket buffer size could not be set (requested=%d actual %d", udpMulticastBufferSize, actualSize);
  94. throwUnexpected();
  95. }
  96. if (traceLevel)
  97. DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", ccdMulticastPort, udpMulticastBufferSize, actualSize);
  98. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
  99. ForEach(*it)
  100. {
  101. unsigned channel = it->query().getPropInt("@channel", 0);
  102. assertex(channel);
  103. joinMulticastChannel(channel);
  104. }
  105. joinMulticastChannel(0); // all slaves also listen on channel 0
  106. }
  107. }
  108. void addEndpoint(unsigned channel, const IpAddress &slaveIp, unsigned port)
  109. {
  110. if (!slaveEndpoints)
  111. slaveEndpoints = new SocketEndpointArray[numChannels + 1];
  112. IpAddress multicastIp;
  113. if (roxieMulticastEnabled)
  114. getChannelIp(multicastIp, channel);
  115. else
  116. multicastIp = slaveIp;
  117. if (!isSlaveEndpoint(channel, multicastIp))
  118. {
  119. SocketEndpoint &ep = *new SocketEndpoint(ccdMulticastPort, multicastIp);
  120. slaveEndpoints[channel].append(ep);
  121. }
  122. if (channel)
  123. addEndpoint(0, slaveIp, port);
  124. }
  125. void closeMulticastSockets()
  126. {
  127. delete[] slaveEndpoints;
  128. slaveEndpoints = NULL;
  129. multicastSocket.clear();
  130. }
  131. size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
  132. {
  133. size32_t minwrote = 0;
  134. SocketEndpointArray &eps = slaveEndpoints[channel]; // if multicast is enabled, this will have a single multicast endpoint in it.
  135. assertex(eps.ordinality());
  136. ForEachItemIn(idx, eps)
  137. {
  138. size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), buf, size);
  139. if (!idx || wrote < minwrote)
  140. minwrote = wrote;
  141. }
  142. return minwrote;
  143. }
  144. // #define TEST_SLAVE_FAILURE
  145. //============================================================================================
  146. StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
  147. {
  148. const IpAddress &serverIP = getNodeAddress(serverIdx);
  149. ret.appendf("uid="RUIDF" activityId=", uid);
  150. switch(activityId & ~ROXIE_PRIORITY_MASK)
  151. {
  152. case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
  153. case ROXIE_PING: ret.append("ROXIE_PING"); break;
  154. case ROXIE_TRACEINFO: ret.append("ROXIE_TRACEINFO"); break;
  155. case ROXIE_DEBUGREQUEST: ret.append("ROXIE_DEBUGREQUEST"); break;
  156. case ROXIE_DEBUGCALLBACK: ret.append("ROXIE_DEBUGCALLBACK"); break;
  157. case ROXIE_FILECALLBACK: ret.append("ROXIE_FILECALLBACK"); break;
  158. case ROXIE_ALIVE: ret.append("ROXIE_ALIVE"); break;
  159. case ROXIE_KEYEDLIMIT_EXCEEDED: ret.append("ROXIE_KEYEDLIMIT_EXCEEDED"); break;
  160. case ROXIE_LIMIT_EXCEEDED: ret.append("ROXIE_LIMIT_EXCEEDED"); break;
  161. case ROXIE_EXCEPTION: ret.append("ROXIE_EXCEPTION"); break;
  162. default:
  163. ret.appendf("%u", (activityId & ~(ROXIE_ACTIVITY_FETCH | ROXIE_PRIORITY_MASK)));
  164. if (activityId & ROXIE_ACTIVITY_FETCH)
  165. ret.appendf(" (fetch part)");
  166. break;
  167. }
  168. ret.append(" pri=");
  169. switch(activityId & ROXIE_PRIORITY_MASK)
  170. {
  171. case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
  172. case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
  173. case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
  174. default: ret.append("???"); break;
  175. }
  176. ret.appendf(" queryHash=%"I64F"x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
  177. serverIP.getIpText(ret);
  178. if (retries)
  179. {
  180. if (retries==QUERY_ABORTED)
  181. ret.append(" retries=QUERY_ABORTED");
  182. else
  183. {
  184. if (retries & ROXIE_RETRIES_MASK)
  185. ret.appendf(" retries=%04x", retries);
  186. if (retries & ROXIE_FASTLANE)
  187. ret.appendf(" FASTLANE");
  188. if (retries & ROXIE_BROADCAST)
  189. ret.appendf(" BROADCAST");
  190. }
  191. }
  192. return ret;
  193. }
  194. class CRoxieQueryPacket : public CInterface, implements IRoxieQueryPacket
  195. {
  196. protected:
  197. RoxiePacketHeader *data;
  198. const byte *continuationData;
  199. unsigned continuationLength;
  200. const byte *smartStepInfoData;
  201. unsigned smartStepInfoLength;
  202. const byte *contextData;
  203. unsigned contextLength;
  204. const byte *traceInfo;
  205. unsigned traceLength;
  206. public:
  207. IMPLEMENT_IINTERFACE;
  208. CRoxieQueryPacket(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
  209. {
  210. assertex(lengthRemaining >= sizeof(RoxiePacketHeader));
  211. data->packetlength = lengthRemaining;
  212. const byte *finger = (const byte *) (data + 1);
  213. lengthRemaining -= sizeof(RoxiePacketHeader);
  214. if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK)
  215. {
  216. continuationData = NULL;
  217. continuationLength = 0;
  218. smartStepInfoData = NULL;
  219. smartStepInfoLength = 0;
  220. traceInfo = NULL;
  221. traceLength = 0;
  222. }
  223. else
  224. {
  225. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  226. {
  227. assertex(lengthRemaining >= sizeof(unsigned short));
  228. continuationLength = *(unsigned short *) finger;
  229. continuationData = finger + sizeof(unsigned short);
  230. finger = continuationData + continuationLength;
  231. lengthRemaining -= continuationLength + sizeof(unsigned short);
  232. }
  233. else
  234. {
  235. continuationData = NULL;
  236. continuationLength = 0;
  237. }
  238. if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
  239. {
  240. assertex(lengthRemaining >= sizeof(unsigned short));
  241. smartStepInfoLength = *(unsigned short *) finger;
  242. smartStepInfoData = finger + sizeof(unsigned short);
  243. finger = smartStepInfoData + smartStepInfoLength;
  244. lengthRemaining -= smartStepInfoLength + sizeof(unsigned short);
  245. }
  246. else
  247. {
  248. smartStepInfoData = NULL;
  249. smartStepInfoLength = 0;
  250. }
  251. assertex(lengthRemaining > 1);
  252. traceInfo = finger;
  253. lengthRemaining--;
  254. if (*finger++ & LOGGING_DEBUGGERACTIVE)
  255. {
  256. assertex(lengthRemaining >= sizeof(unsigned short));
  257. unsigned short debugLen = *(unsigned short *) finger;
  258. finger += debugLen + sizeof(unsigned short);
  259. lengthRemaining -= debugLen + sizeof(unsigned short);
  260. }
  261. loop
  262. {
  263. assertex(lengthRemaining>0);
  264. if (!*finger)
  265. {
  266. lengthRemaining--;
  267. finger++;
  268. break;
  269. }
  270. lengthRemaining--;
  271. finger++;
  272. }
  273. traceLength = finger - traceInfo;
  274. }
  275. assertex(lengthRemaining >= 0);
  276. contextData = finger;
  277. contextLength = lengthRemaining;
  278. }
  279. ~CRoxieQueryPacket()
  280. {
  281. free(data);
  282. }
  283. virtual RoxiePacketHeader &queryHeader() const
  284. {
  285. return *data;
  286. }
  287. virtual const void *queryContinuationData() const
  288. {
  289. return continuationData;
  290. }
  291. virtual unsigned getContinuationLength() const
  292. {
  293. return continuationLength;
  294. }
  295. virtual const byte *querySmartStepInfoData() const
  296. {
  297. return smartStepInfoData;
  298. }
  299. virtual unsigned getSmartStepInfoLength() const
  300. {
  301. return smartStepInfoLength;
  302. }
  303. virtual const byte *queryTraceInfo() const
  304. {
  305. return traceInfo;
  306. }
  307. virtual unsigned getTraceLength() const
  308. {
  309. return traceLength;
  310. }
  311. virtual const void *queryContextData() const
  312. {
  313. return contextData;
  314. }
  315. virtual unsigned getContextLength() const
  316. {
  317. return contextLength;
  318. }
  319. virtual IRoxieQueryPacket *clonePacket(unsigned channel) const
  320. {
  321. unsigned length = data->packetlength;
  322. RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
  323. memcpy(newdata, data, length);
  324. newdata->channel = channel;
  325. newdata->retries |= ROXIE_BROADCAST;
  326. return createRoxiePacket(newdata, length);
  327. }
  328. virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const
  329. {
  330. assertex((data->continueSequence & CONTINUE_SEQUENCE_SKIPTO) == 0); // Should not already be any skipto info in the source packet
  331. unsigned newDataSize = data->packetlength + sizeof(unsigned short) + skipDataLen;
  332. char *newdata = (char *) malloc(newDataSize);
  333. unsigned headSize = sizeof(RoxiePacketHeader);
  334. if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
  335. headSize += sizeof(unsigned short) + continuationLength;
  336. memcpy(newdata, data, headSize); // copy in leading part of old data
  337. ((RoxiePacketHeader *) newdata)->continueSequence |= CONTINUE_SEQUENCE_SKIPTO; // set flag indicating new data is present
  338. *(unsigned short *) (newdata + headSize) = skipDataLen; // add length field for new data
  339. memcpy(newdata + headSize + sizeof(unsigned short), skipData, skipDataLen); // copy in new data
  340. memcpy(newdata + headSize + sizeof(unsigned short) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
  341. return createRoxiePacket(newdata, newDataSize);
  342. }
  343. virtual unsigned hash() const
  344. {
  345. // This is used for Roxie server-side caching. The hash includes some of the header and all of the payload.
  346. unsigned hash = 0;
  347. if (continuationLength)
  348. hash = hashc((const unsigned char *) continuationData, continuationLength, hash);
  349. if (smartStepInfoLength)
  350. hash = hashc((const unsigned char *) smartStepInfoData, smartStepInfoLength, hash);
  351. // NOTE - don't hash the trace info!
  352. hash = hashc((const unsigned char *) contextData, contextLength, hash);
  353. hash = hashc((const unsigned char *) &data->channel, sizeof(data->channel), hash);
  354. hash = hashc((const unsigned char *) &data->overflowSequence, sizeof(data->overflowSequence), hash);
  355. hash = hashc((const unsigned char *) &data->continueSequence, sizeof(data->continueSequence), hash);
  356. // MORE - sequence fields should always be zero for anything we are caching I think... (?)
  357. // Note - no point hashing activityId (as cache is local to one activity) or serverIP (likewise)
  358. return hash;
  359. }
  360. virtual bool cacheMatch(const IRoxieQueryPacket *c) const
  361. {
  362. // note - this checks whether it's a repeat from Roxie server's point-of-view
  363. // So fields that are compared are the same as the ones that are hashed....
  364. RoxiePacketHeader &h = c->queryHeader();
  365. if (data->channel == h.channel && data->overflowSequence == h.overflowSequence && data->continueSequence == h.continueSequence)
  366. {
  367. if (continuationLength) // note - we already checked that sequences match
  368. {
  369. if (continuationLength != c->getContinuationLength())
  370. return false;
  371. if (memcmp(continuationData,c->queryContinuationData(),continuationLength)!=0)
  372. return false;
  373. }
  374. if (smartStepInfoLength)
  375. {
  376. if (smartStepInfoLength != c->getSmartStepInfoLength())
  377. return false;
  378. if (memcmp(smartStepInfoData,c->querySmartStepInfoData(),smartStepInfoLength)!=0)
  379. return false;
  380. }
  381. // NOTE - trace info NOT compared
  382. if (contextLength == c->getContextLength() && memcmp(contextData, c->queryContextData(), contextLength)==0)
  383. return true;
  384. }
  385. return false;
  386. }
  387. };
  388. extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
  389. {
  390. if ((unsigned short)_len != _len)
  391. {
  392. StringBuffer s;
  393. RoxiePacketHeader *header = (RoxiePacketHeader *) _data;
  394. header->toString(s);
  395. free(_data);
  396. throw MakeStringException(ROXIE_PACKET_ERROR, "Packet length %d exceeded maximum sending packet %s", _len, s.str());
  397. }
  398. return new CRoxieQueryPacket(_data, _len);
  399. }
  400. extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
  401. {
  402. unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
  403. return createRoxiePacket(m.detachOwn(), length);
  404. }
  405. //=================================================================================
  406. SlaveContextLogger::SlaveContextLogger()
  407. {
  408. GetHostIp(ip);
  409. set(NULL);
  410. }
  411. SlaveContextLogger::SlaveContextLogger(IRoxieQueryPacket *packet)
  412. {
  413. GetHostIp(ip);
  414. set(packet);
  415. }
  416. void SlaveContextLogger::set(IRoxieQueryPacket *packet)
  417. {
  418. anyOutput = false;
  419. intercept = false;
  420. debuggerActive = false;
  421. checkingHeap = false;
  422. aborted = false;
  423. stats.reset();
  424. start = msTick();
  425. if (packet)
  426. {
  427. CriticalBlock b(crit);
  428. RoxiePacketHeader &header = packet->queryHeader();
  429. const byte *traceInfo = packet->queryTraceInfo();
  430. unsigned traceLength = packet->getTraceLength();
  431. unsigned char loggingFlags = *traceInfo;
  432. if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL
  433. {
  434. traceInfo++;
  435. traceLength--;
  436. if (loggingFlags & LOGGING_INTERCEPTED)
  437. intercept = true;
  438. if (loggingFlags & LOGGING_TRACELEVELSET)
  439. {
  440. ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
  441. traceLength--;
  442. }
  443. if (loggingFlags & LOGGING_BLIND)
  444. blind = true;
  445. if (loggingFlags & LOGGING_CHECKINGHEAP)
  446. checkingHeap = true;
  447. if (loggingFlags & LOGGING_DEBUGGERACTIVE)
  448. {
  449. assertex(traceLength > sizeof(unsigned short));
  450. debuggerActive = true;
  451. unsigned short debugLen = *(unsigned short *) traceInfo;
  452. traceInfo += debugLen + sizeof(unsigned short);
  453. traceLength -= debugLen + sizeof(unsigned short);
  454. }
  455. // Passing the wuid via the logging context prefix is a bit of a hack...
  456. if (loggingFlags & LOGGING_WUID)
  457. {
  458. unsigned wuidLen = 0;
  459. while (wuidLen < traceLength)
  460. {
  461. if (traceInfo[wuidLen]=='@')
  462. break;
  463. wuidLen++;
  464. }
  465. wuid.set((const char *) traceInfo, wuidLen);
  466. }
  467. }
  468. channel = header.channel;
  469. StringBuffer s(traceLength, (const char *) traceInfo);
  470. s.append("|");
  471. ip.getIpText(s);
  472. s.append(':').append(channel);
  473. StringContextLogger::set(s.str());
  474. if (intercept || mergeSlaveStatistics)
  475. {
  476. RoxiePacketHeader newHeader(header, ROXIE_TRACEINFO);
  477. output.setown(ROQ->createOutputStream(newHeader, true, *this));
  478. }
  479. }
  480. else
  481. {
  482. StringContextLogger::set("");
  483. channel = 0;
  484. }
  485. }
  486. void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed) const
  487. {
  488. if (output && mergeSlaveStatistics)
  489. {
  490. MemoryBuffer buf;
  491. buf.append((char) LOG_CHILDCOUNT); // A special log entry for the stats
  492. buf.append(subGraphId);
  493. buf.append(actId);
  494. buf.append(idx);
  495. buf.append(processed);
  496. }
  497. }
  498. void SlaveContextLogger::putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const
  499. {
  500. if (output && mergeSlaveStatistics)
  501. {
  502. MemoryBuffer buf;
  503. buf.append((char) LOG_CHILDSTATS); // A special log entry for the stats
  504. buf.append(subGraphId);
  505. buf.append(actId);
  506. if (stats.serialize(buf))
  507. {
  508. unsigned len = buf.length();
  509. void *ret = output->getBuffer(len, true);
  510. memcpy(ret, buf.toByteArray(), len);
  511. output->putBuffer(ret, len, true);
  512. anyOutput = true;
  513. }
  514. }
  515. }
  516. void SlaveContextLogger::flush()
  517. {
  518. if (output)
  519. {
  520. CriticalBlock b(crit);
  521. if (mergeSlaveStatistics)
  522. {
  523. MemoryBuffer buf;
  524. buf.append((char) LOG_STATVALUES); // A special log entry for the stats
  525. if (stats.serialize(buf))
  526. {
  527. unsigned len = buf.length();
  528. void *ret = output->getBuffer(len, true);
  529. memcpy(ret, buf.toByteArray(), len);
  530. output->putBuffer(ret, len, true);
  531. anyOutput = true;
  532. }
  533. }
  534. ForEachItemIn(idx, log)
  535. {
  536. MemoryBuffer buf;
  537. LogItem &logItem = log.item(idx);
  538. logItem.serialize(buf);
  539. unsigned len = buf.length();
  540. void *ret = output->getBuffer(len, true);
  541. memcpy(ret, buf.toByteArray(), len);
  542. output->putBuffer(ret, len, true);
  543. anyOutput = true;
  544. }
  545. log.kill();
  546. if (anyOutput)
  547. output->flush(true);
  548. output.clear();
  549. }
  550. }
  551. //=================================================================================
  552. unsigned getIbytiDelay(unsigned channel, const RoxiePacketHeader &header)
  553. {
  554. // MORE - adjust delay according to whether it's a retry, whether it was a broadcast etc
  555. CriticalBlock b(ibytiCrit);
  556. return IBYTIDelays[channel];
  557. }
  558. void resetIbytiDelay(unsigned channel)
  559. {
  560. unsigned prevVal;
  561. {
  562. CriticalBlock b(ibytiCrit);
  563. prevVal = IBYTIDelays[channel];
  564. IBYTIDelays[channel] = initIbytiDelay;
  565. }
  566. if (traceLevel > 8 && prevVal != initIbytiDelay)
  567. DBGLOG("Reset IBYTI delay value for channel %u from %u to %u", channel, prevVal, initIbytiDelay);
  568. }
  569. void decIbytiDelay(unsigned channel, unsigned factor = 2)
  570. {
  571. unsigned prevVal, newVal;
  572. {
  573. CriticalBlock b(ibytiCrit);
  574. prevVal = IBYTIDelays[channel];
  575. IBYTIDelays[channel] /= factor;
  576. if (IBYTIDelays[channel] < minIbytiDelay)
  577. IBYTIDelays[channel] = minIbytiDelay;
  578. newVal = IBYTIDelays[channel];
  579. }
  580. if (traceLevel > 8 && prevVal != newVal)
  581. DBGLOG("Dec IBYTI delay value for channel %u from %u to %u (factor=%u)", channel, prevVal, newVal, factor);
  582. }
  583. //=================================================================================
  584. static SpinLock onDemandQueriesCrit;
  585. static MapXToMyClass<hash64_t, hash64_t, IQueryFactory> onDemandQueryCache;
  586. void sendUnloadMessage(hash64_t hash, const char *id, const IRoxieContextLogger &logctx)
  587. {
  588. unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen(id) + 1;
  589. void *packetData = malloc(packetSize);
  590. RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
  591. RemoteActivityId unloadId(ROXIE_UNLOAD, hash);
  592. header->init(unloadId, 0, 0, 0);
  593. char *finger = (char *) (header + 1);
  594. *finger++ = (char) LOGGING_FLAGSPRESENT;
  595. strcpy(finger, id);
  596. finger += strlen(id)+1;
  597. if (traceLevel > 1)
  598. DBGLOG("UNLOAD sent for query %s", id);
  599. Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
  600. ROQ->sendPacket(packet, logctx);
  601. }
  602. void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  603. {
  604. const RoxiePacketHeader &header = packet->queryHeader();
  605. unsigned channelNo = header.channel;
  606. logctx.CTXLOG("Unload received for channel %d", channelNo);
  607. hash64_t hashValue = header.queryHash;
  608. SpinBlock b(onDemandQueriesCrit);
  609. onDemandQueryCache.remove(hashValue+channelNo);
  610. }
  611. void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
  612. {
  613. SpinBlock b(onDemandQueriesCrit);
  614. onDemandQueryCache.setValue(hashValue+channelNo, query);
  615. }
  616. //=================================================================================
  617. struct PingRecord
  618. {
  619. unsigned tick;
  620. IpAddress senderIP;
  621. };
  622. void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  623. {
  624. const RoxiePacketHeader &header = packet->queryHeader();
  625. const IpAddress &serverIP = getNodeAddress(header.serverIdx);
  626. unsigned contextLength = packet->getContextLength();
  627. if (contextLength != sizeof(PingRecord))
  628. {
  629. StringBuffer s;
  630. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Unexpected data size %d (expected %d) in PING: %s", contextLength, (unsigned) sizeof(PingRecord), header.toString(s).str());
  631. }
  632. const PingRecord *data = (const PingRecord *) packet->queryContextData();
  633. if (!serverIP.ipequals(data->senderIP))
  634. {
  635. StringBuffer s;
  636. throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(s).str());
  637. }
  638. RoxiePacketHeader newHeader(header, ROXIE_PING);
  639. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  640. void *ret = output->getBuffer(contextLength, false);
  641. memcpy(ret, data, contextLength);
  642. output->putBuffer(ret, contextLength, false);
  643. output->flush(true);
  644. }
  645. //=================================================================================
  646. //
  647. // RoxieQueue - holds pending transactions on a roxie agent
  648. class RoxieQueue : public CInterface
  649. {
  650. QueueOf<IRoxieQueryPacket, true> waiting;
  651. Semaphore available;
  652. CriticalSection qcrit;
  653. unsigned headRegionSize;
  654. public:
  655. IMPLEMENT_IINTERFACE;
  656. RoxieQueue(unsigned _headRegionSize)
  657. {
  658. headRegionSize = _headRegionSize;
  659. }
  660. void enqueue(IRoxieQueryPacket *x)
  661. {
  662. {
  663. CriticalBlock qc(qcrit);
  664. #ifdef TIME_PACKETS
  665. header.tick = msTick();
  666. #endif
  667. waiting.enqueue(x);
  668. CriticalBlock b(counterCrit);
  669. queueLength++;
  670. if (queueLength>maxQueueLength)
  671. maxQueueLength = queueLength;
  672. }
  673. available.signal();
  674. }
  675. void enqueueUnique(IRoxieQueryPacket *x)
  676. {
  677. {
  678. CriticalBlock qc(qcrit);
  679. RoxiePacketHeader &header = x->queryHeader();
  680. #ifdef TIME_PACKETS
  681. header.tick = msTick();
  682. #endif
  683. unsigned len = waiting.ordinality();
  684. unsigned i;
  685. for (i = 0; i < len; i++)
  686. {
  687. IRoxieQueryPacket *queued = waiting.item(i);
  688. if (queued && queued->queryHeader().matchPacket(header))
  689. {
  690. bool primChannel = true;
  691. if (subChannels[header.channel] != 1) primChannel = false;
  692. if (traceLevel > 0)
  693. {
  694. StringBuffer xx;
  695. SlaveContextLogger l(x);
  696. l.CTXLOG("Ignored retry on %s channel for queued activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  697. }
  698. if (primChannel)
  699. atomic_inc(&retriesIgnoredPrm);
  700. else
  701. atomic_inc(&retriesIgnoredSec);
  702. x->Release();
  703. return;
  704. }
  705. }
  706. if (traceLevel > 10)
  707. {
  708. SlaveContextLogger l(x);
  709. StringBuffer xx;
  710. l.CTXLOG("enqueued %s", header.toString(xx).str());
  711. }
  712. waiting.enqueue(x);
  713. CriticalBlock b(counterCrit);
  714. queueLength++;
  715. if (queueLength>maxQueueLength)
  716. maxQueueLength = queueLength;
  717. }
  718. available.signal();
  719. }
  720. bool remove(RoxiePacketHeader &x)
  721. {
  722. CriticalBlock qc(qcrit);
  723. unsigned len = waiting.ordinality();
  724. unsigned i;
  725. unsigned scanLength = 0;
  726. for (i = 0; i < len; i++)
  727. {
  728. IRoxieQueryPacket *queued = waiting.item(i);
  729. if (queued)
  730. {
  731. scanLength++;
  732. if (queued->queryHeader().matchPacket(x))
  733. {
  734. #ifdef _DEBUG
  735. RoxiePacketHeader &header = queued->queryHeader();
  736. SlaveContextLogger l(queued);
  737. StringBuffer xx;
  738. l.CTXLOG("discarded %s", header.toString(xx).str());
  739. #endif
  740. // Already done in doIBYTI()...queue.remove() !!!!! atomic_inc(&ibytiPacketsWorked);
  741. waiting.set(i, NULL);
  742. queued->Release();
  743. CriticalBlock b(counterCrit);
  744. queueLength--;
  745. if (scanLength > maxScanLength)
  746. maxScanLength = scanLength;
  747. totScanLength += scanLength;
  748. totScans++;
  749. if (totScans)
  750. meanScanLength = totScanLength / totScans;
  751. return true;
  752. }
  753. }
  754. }
  755. return false;
  756. }
  757. void wait()
  758. {
  759. available.wait();
  760. }
  761. void signal(unsigned num)
  762. {
  763. available.signal(num);
  764. }
  765. IRoxieQueryPacket *dequeue()
  766. {
  767. CriticalBlock qc(qcrit);
  768. unsigned lim = waiting.ordinality();
  769. if (lim)
  770. {
  771. if (headRegionSize)
  772. {
  773. if (lim > headRegionSize)
  774. lim = headRegionSize;
  775. return waiting.dequeue(rand() % lim);
  776. }
  777. return waiting.dequeue();
  778. }
  779. else
  780. return NULL;
  781. }
  782. unsigned getHeadRegionSize() const
  783. {
  784. return headRegionSize;
  785. }
  786. unsigned setHeadRegionSize(unsigned newsize)
  787. {
  788. unsigned ret = headRegionSize;
  789. headRegionSize = newsize;
  790. return ret;
  791. }
  792. };
  793. class CRoxieWorker : public CInterface, implements IPooledThread
  794. {
  795. RoxieQueue *queue;
  796. CriticalSection actCrit;
  797. Semaphore ibytiSem;
  798. bool stopped;
  799. bool abortJob;
  800. bool busy;
  801. Owned<IRoxieSlaveActivity> activity;
  802. Owned<IRoxieQueryPacket> packet;
  803. SlaveContextLogger logctx;
  804. public:
  805. IMPLEMENT_IINTERFACE;
  806. CRoxieWorker()
  807. {
  808. queue = NULL;
  809. stopped = false;
  810. busy = false;
  811. abortJob = false;
  812. }
  813. void init(void *_r)
  814. {
  815. queue = (RoxieQueue *) _r;
  816. stopped = false;
  817. busy = false;
  818. abortJob = false;
  819. }
  820. bool canReuse()
  821. {
  822. return true;
  823. }
  824. bool stop()
  825. {
  826. stopped = true;
  827. return true;
  828. }
  829. inline void setActivity(IRoxieSlaveActivity *act)
  830. {
  831. CriticalBlock b(actCrit);
  832. activity.setown(act);
  833. }
  834. inline bool match(RoxiePacketHeader &h)
  835. {
  836. // There is a window between getting packet from queue and being able to match it.
  837. // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
  838. CriticalBlock b(actCrit);
  839. return packet && packet->queryHeader().matchPacket(h);
  840. }
  841. void abortChannel(unsigned channel)
  842. {
  843. CriticalBlock b(actCrit);
  844. if (packet && packet->queryHeader().channel==channel)
  845. {
  846. abortJob = true;
  847. if (doIbytiDelay)
  848. ibytiSem.signal();
  849. if (activity)
  850. activity->abort();
  851. }
  852. }
  853. bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
  854. {
  855. CriticalBlock b(actCrit);
  856. if (packet && packet->queryHeader().matchPacket(h))
  857. {
  858. queryFound = true;
  859. abortJob = true;
  860. if (doIbytiDelay)
  861. ibytiSem.signal();
  862. if (activity)
  863. {
  864. // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority slave
  865. // (more primary in the rank). The slaves with higher rank will hold the lower bits of the retries field in IBYTI packet).
  866. if (!checkRank || ((h.retries & ROXIE_RETRIES_MASK) < h.getSubChannelMask(h.channel)))
  867. {
  868. activity->abort();
  869. return true;
  870. }
  871. else
  872. {
  873. return false;
  874. }
  875. }
  876. if (busy)
  877. {
  878. preActivity = true;
  879. return true;
  880. }
  881. }
  882. return false;
  883. }
  884. void throwRemoteException(IException *E, IRoxieSlaveActivity *activity, IRoxieQueryPacket *packet, bool isUser)
  885. {
  886. try
  887. {
  888. if (activity && (logctx.queryTraceLevel() > 1))
  889. {
  890. StringBuffer act;
  891. activity->toString(act);
  892. logctx.CTXLOG("throwRemoteException, activity %s, isUser=%d", act.str(), (int) isUser);
  893. if (!isUser)
  894. EXCLOG(E, "throwRemoteException");
  895. }
  896. RoxiePacketHeader &header = packet->queryHeader();
  897. // I failed to do the query, but already sent out IBYTI - resend it so someone else can try
  898. if (!isUser)
  899. {
  900. StringBuffer s;
  901. s.append("Exception in slave for packet ");
  902. header.toString(s);
  903. logctx.logOperatorException(E, NULL, 0, "%s", s.str());
  904. header.setException();
  905. if (!header.allChannelsFailed())
  906. {
  907. if (logctx.queryTraceLevel() > 1)
  908. logctx.CTXLOG("resending packet from slave in case others want to try it");
  909. ROQ->sendPacket(packet, logctx);
  910. }
  911. }
  912. RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION);
  913. if (isUser)
  914. newHeader.retries = (unsigned short) -1;
  915. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  916. StringBuffer message("<Exception>");
  917. message.appendf("<Code>%d</Code><Message>", E->errorCode());
  918. StringBuffer err;
  919. E->errorMessage(err);
  920. encodeXML(err.str(), message);
  921. message.append("</Message></Exception>");
  922. unsigned len = message.length();
  923. void *ret = output->getBuffer(len+1, true);
  924. memcpy(ret, message.str(), len+1);
  925. output->putBuffer(ret, len+1, true);
  926. output->flush(true);
  927. E->Release();
  928. }
  929. catch (IException *EInE)
  930. {
  931. EXCLOG(EInE, "Exception during throwRemoteException");
  932. E->Release();
  933. EInE->Release();
  934. }
  935. catch (...)
  936. {
  937. logctx.CTXLOG("Unknown Exception during throwRemoteException");
  938. E->Release();
  939. }
  940. }
  941. void doActivity()
  942. {
  943. RoxiePacketHeader &header = packet->queryHeader();
  944. unsigned channel = header.channel;
  945. hash64_t queryHash = packet->queryHeader().queryHash;
  946. unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
  947. Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
  948. if (!queryFactory && logctx.queryWuid())
  949. {
  950. Owned <IRoxieDaliHelper> daliHelper = connectToDali();
  951. Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(logctx.queryWuid(), NULL);
  952. queryFactory.setown(createSlaveQueryFactoryFromWu(wu, channel));
  953. if (queryFactory)
  954. cacheOnDemandQuery(queryHash, channel, queryFactory);
  955. }
  956. if (!queryFactory)
  957. {
  958. StringBuffer hdr;
  959. IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie slave received request for unregistered query: %s", packet->queryHeader().toString(hdr).str());
  960. EXCLOG(E, "doActivity");
  961. throwRemoteException(E, activity, packet, false);
  962. return;
  963. }
  964. try
  965. {
  966. if (logctx.queryTraceLevel() > 8)
  967. {
  968. StringBuffer x;
  969. logctx.CTXLOG("IBYTI delay controls : doIbytiDelay=%s numslaves=%u subchnl=%u : %s",
  970. doIbytiDelay?"YES":"NO",
  971. numSlaves[channel], subChannels[channel],
  972. header.toString(x).str());
  973. }
  974. bool debugging = logctx.queryDebuggerActive();
  975. if (debugging)
  976. {
  977. if (subChannels[channel] != 1)
  978. abortJob = true; // when debugging, we always run on primary only...
  979. }
  980. else if (doIbytiDelay && (numSlaves[channel] > 1))
  981. {
  982. bool primChannel = true;
  983. if (subChannels[channel] != 1)
  984. primChannel = false;
  985. bool myTurnToDelayIBYTI = true; // all slaves will delay, except one
  986. unsigned hdrHashVal = header.priorityHash();
  987. if ((((hdrHashVal % numSlaves[channel]) + 1) == subChannels[channel]))
  988. myTurnToDelayIBYTI = false;
  989. if (myTurnToDelayIBYTI)
  990. {
  991. unsigned delay = getIbytiDelay(channel, header);
  992. if (logctx.queryTraceLevel() > 6)
  993. {
  994. StringBuffer x;
  995. logctx.CTXLOG("YES myTurnToDelayIBYTI channel=%s delay=%u hash=%u %s", primChannel?"primary":"secondary", delay, hdrHashVal, header.toString(x).str());
  996. }
  997. // MORE: this code puts the penalty on all slaves on this channel,
  998. // change it to have one for each slave on every channel.
  999. // NOT critical for the time being with 2 slaves per channel
  1000. // MORE: if we are dealing with a query that was on channel 0, we may want a longer delay
  1001. // (since the theory about duplicated work not mattering when cluster is idle does not hold up)
  1002. if (delay)
  1003. {
  1004. ibytiSem.wait(delay);
  1005. if (abortJob)
  1006. resetIbytiDelay(channel); // we know there is an active buddy on the channel...
  1007. else
  1008. decIbytiDelay(channel);
  1009. if (logctx.queryTraceLevel() > 8)
  1010. {
  1011. StringBuffer x;
  1012. logctx.CTXLOG("Buddy did%s send IBYTI, updated delay=%u : %s",
  1013. abortJob ? "" : " NOT", IBYTIDelays[channel], header.toString(x).str());
  1014. }
  1015. }
  1016. }
  1017. else {
  1018. #ifndef NO_IBYTI_DELAYS_COUNT
  1019. if (primChannel) atomic_inc(&ibytiNoDelaysPrm);
  1020. else atomic_inc(&ibytiNoDelaysSec);
  1021. #endif
  1022. if (logctx.queryTraceLevel() > 6)
  1023. {
  1024. StringBuffer x;
  1025. logctx.CTXLOG("NOT myTurnToDelayIBYTI channel=%s hash=%u %s", primChannel?"primary":"secondary", hdrHashVal, header.toString(x).str());
  1026. }
  1027. }
  1028. }
  1029. if (abortJob)
  1030. {
  1031. CriticalBlock b(actCrit);
  1032. busy = false; // Keep order - before setActivity below
  1033. if (logctx.queryTraceLevel() > 5)
  1034. {
  1035. StringBuffer x;
  1036. logctx.CTXLOG("Stop before processing - activity aborted %s", header.toString(x).str());
  1037. }
  1038. return;
  1039. }
  1040. if (!debugging)
  1041. ROQ->sendIbyti(header, logctx);
  1042. atomic_inc(&activitiesStarted);
  1043. Owned <ISlaveActivityFactory> factory = queryFactory->getSlaveActivityFactory(activityId);
  1044. assertex(factory);
  1045. setActivity(factory->createActivity(logctx, packet));
  1046. #ifdef TEST_SLAVE_FAILURE
  1047. bool skip = false;
  1048. if (testSlaveFailure)
  1049. {
  1050. // Meaning of each byte in testSlaveFailure
  1051. // bits 1 -> 4 : test cast type (4 bits)
  1052. // bits 5 -> 11 : test case Freq (7 bits)
  1053. // bit 12 : : 1 Dot NOT Send ROXIE_ALIVE message - 0: send it
  1054. // bits 13 -> 32 : test case parameter (20 bits), if any
  1055. unsigned testCaseType = (testSlaveFailure & 0x0000000F);
  1056. unsigned testCaseFreq = (testSlaveFailure & 0x000007F0) >> 4;
  1057. unsigned testCaseParm = (testSlaveFailure & 0xFFFFF000) >> 12;
  1058. if (testCaseFreq && (atomic_read(&activitiesStarted) % testCaseFreq == 0))
  1059. {
  1060. StringBuffer x;
  1061. logctx.CTXLOG("------------ TestSlaveFailure to do the following (testCase=%u freq=%u tot=%u parm=%u ROXIE_ALIVE is %s - val=0x%.8X) for %s",
  1062. testCaseType, testCaseFreq, (unsigned) atomic_read(&activitiesStarted), testCaseParm, (testSlaveFailure & 0x800) ? "OFF" : "ON",
  1063. testSlaveFailure, header.toString(x).str());
  1064. switch (testCaseType)
  1065. {
  1066. case 1:
  1067. if (testCaseParm == 0) testCaseParm = 10000;
  1068. logctx.CTXLOG("--------------- Sleeping for %u ms - testCase=%u", testCaseParm, testCaseType);
  1069. Sleep(testCaseParm);
  1070. break;
  1071. case 2:
  1072. logctx.CTXLOG("--------------- Skip processing - testCase=%u ------", testCaseType);
  1073. skip = true;
  1074. break;
  1075. case 3:
  1076. logctx.CTXLOG("--------------- Throwing Exception String number %u NOW - testCase=%u -----", ROXIE_FILE_ERROR, testCaseType);
  1077. throw MakeStringException(ROXIE_FILE_ERROR, "Simulate File Exception in slave NOW");
  1078. break;
  1079. case 4:
  1080. if (numSlaves[channel] == 1)
  1081. {
  1082. logctx.CTXLOG("--------------- Setting numSlaves[channel=%u] to 2 to force one way to act as two way for ibyti logic testing - testCase=%u ------", channel, testCaseType);
  1083. numSlaves[channel] = 2;
  1084. }
  1085. testSlaveFailure = 0;
  1086. break;
  1087. }
  1088. }
  1089. }
  1090. if (!skip)
  1091. {
  1092. #endif
  1093. Owned<IMessagePacker> output = activity->process();
  1094. if (logctx.queryTraceLevel() > 5)
  1095. {
  1096. StringBuffer x;
  1097. logctx.CTXLOG("done processing %s", header.toString(x).str());
  1098. }
  1099. if (output)
  1100. {
  1101. atomic_inc(&activitiesCompleted);
  1102. busy = false; // Keep order - before setActivity below
  1103. setActivity(NULL); // Ensures all stats are merged from child queries etc
  1104. logctx.flush();
  1105. output->flush(true);
  1106. }
  1107. #ifdef TEST_SLAVE_FAILURE
  1108. }
  1109. #endif
  1110. }
  1111. catch (IUserException *E)
  1112. {
  1113. throwRemoteException(E, activity, packet, true);
  1114. }
  1115. catch (IException *E)
  1116. {
  1117. if (E->errorCode()!=ROXIE_ABORT_ERROR)
  1118. throwRemoteException(E, activity, packet, false);
  1119. }
  1120. catch (...)
  1121. {
  1122. throwRemoteException(MakeStringException(ROXIE_MULTICAST_ERROR, "Unknown exception"), activity, packet, false);
  1123. }
  1124. busy = false; // Keep order - before setActivity below
  1125. setActivity(NULL);
  1126. }
  1127. void main()
  1128. {
  1129. while (!stopped)
  1130. {
  1131. try
  1132. {
  1133. loop
  1134. {
  1135. queue->wait();
  1136. if (stopped)
  1137. break;
  1138. {
  1139. CriticalBlock b(counterCrit);
  1140. slavesActive++;
  1141. if (slavesActive > maxSlavesActive)
  1142. maxSlavesActive = slavesActive;
  1143. }
  1144. abortJob = false;
  1145. busy = true;
  1146. if (doIbytiDelay)
  1147. ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
  1148. {
  1149. CriticalBlock b(queueCrit);
  1150. packet.setown(queue->dequeue());
  1151. }
  1152. if (packet)
  1153. {
  1154. {
  1155. CriticalBlock b(counterCrit);
  1156. queueLength--;
  1157. }
  1158. RoxiePacketHeader &header = packet->queryHeader();
  1159. logctx.set(packet);
  1160. #ifdef TIME_PACKETS
  1161. {
  1162. unsigned now = msTick();
  1163. unsigned packetWait = now-header.tick;
  1164. header.tick = now;
  1165. CriticalBlock b(counterCrit);
  1166. if (packetWait > packetWaitMax)
  1167. packetWaitMax = packetWait;
  1168. packetWaitElapsed += packetWait;
  1169. atomic_inc(&packetWaitCount);
  1170. }
  1171. #endif
  1172. if (logctx.queryTraceLevel() > 10)
  1173. {
  1174. StringBuffer x;
  1175. logctx.CTXLOG("dequeued %s", header.toString(x).str());
  1176. }
  1177. if (ROQ->checkSuspended(header, logctx))
  1178. {
  1179. StringBuffer s;
  1180. logctx.CTXLOG("Ignoring packet for suspended channel %d: %s", header.channel, header.toString(s).str());
  1181. }
  1182. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_UNLOAD)
  1183. {
  1184. doUnload(packet, logctx);
  1185. }
  1186. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_PING)
  1187. {
  1188. doPing(packet, logctx);
  1189. }
  1190. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_DEBUGREQUEST)
  1191. {
  1192. // MORE - we need to make sure only executed on primary, and that the proxyId (== pointer to DebugGraphManager) is still valid.
  1193. // It may be that there is not a lot of point using the pointer - may as well use an non-reused ID and look it up in a global hash table of active ones
  1194. doDebugRequest(packet, logctx);
  1195. }
  1196. else if (header.channel)
  1197. doActivity();
  1198. else
  1199. throwUnexpected(); // channel 0 requests translated earlier now
  1200. #ifdef TIME_PACKETS
  1201. {
  1202. unsigned now = msTick();
  1203. unsigned packetRun = now-header.tick;
  1204. CriticalBlock b(counterCrit);
  1205. if (packetRun > packetRunMax)
  1206. packetRunMax = packetRun;
  1207. packetRunElapsed += packetRun;
  1208. atomic_inc(&packetRunCount);
  1209. }
  1210. #endif
  1211. }
  1212. busy = false;
  1213. {
  1214. CriticalBlock b(actCrit);
  1215. packet.clear();
  1216. logctx.set(NULL);
  1217. }
  1218. {
  1219. CriticalBlock b(counterCrit);
  1220. slavesActive--;
  1221. }
  1222. }
  1223. }
  1224. catch(...)
  1225. {
  1226. CriticalBlock b(actCrit);
  1227. Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
  1228. EXCLOG(E);
  1229. if (packet)
  1230. throwRemoteException(E.getClear(), NULL, packet, false);
  1231. packet.clear();
  1232. }
  1233. }
  1234. }
  1235. };
  1236. //=================================================================================
  1237. class CallbackEntry : public CInterface, implements IPendingCallback
  1238. {
  1239. const RoxiePacketHeader &header;
  1240. StringAttr lfn;
  1241. InterruptableSemaphore ready;
  1242. MemoryBuffer data;
  1243. bool gotData;
  1244. public:
  1245. IMPLEMENT_IINTERFACE;
  1246. CallbackEntry(const RoxiePacketHeader &_header, const char *_lfn) : header(_header), lfn(_lfn)
  1247. {
  1248. gotData = false;
  1249. }
  1250. virtual bool wait(unsigned msecs)
  1251. {
  1252. return ready.wait(msecs);
  1253. }
  1254. virtual MemoryBuffer &queryData()
  1255. {
  1256. return data;
  1257. }
  1258. bool matches(RoxiePacketHeader &cand, const char *_lfn)
  1259. {
  1260. return (cand.matchPacket(header) && (!_lfn|| stricmp(_lfn, lfn)==0));
  1261. }
  1262. void doFileCallback(unsigned _len, const void *_data, bool aborted)
  1263. {
  1264. // MORE - make sure we call this for whole query abort as well as for callback abort
  1265. if (aborted)
  1266. ready.interrupt(MakeStringException(0, "Interrupted"));
  1267. else if (!gotData)
  1268. {
  1269. gotData = true;
  1270. data.append(_len, _data);
  1271. ready.signal();
  1272. }
  1273. }
  1274. };
  1275. class RoxieReceiverBase : public CInterface, implements IThreadFactory, implements IRoxieOutputQueueManager
  1276. {
  1277. protected:
  1278. #ifdef ROXIE_SLA_LOGIC
  1279. RoxieQueue slaQueue;
  1280. Owned <IThreadPool> slaWorkers;
  1281. #endif
  1282. RoxieQueue hiQueue;
  1283. Owned <IThreadPool> hiWorkers;
  1284. RoxieQueue loQueue;
  1285. Owned <IThreadPool> loWorkers;
  1286. unsigned numWorkers;
  1287. void abortChannel(unsigned channel, IThreadPool *workers)
  1288. {
  1289. Owned<IPooledThreadIterator> wi = workers->running();
  1290. ForEach(*wi)
  1291. {
  1292. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1293. w.abortChannel(channel);
  1294. }
  1295. }
  1296. public:
  1297. IMPLEMENT_IINTERFACE;
  1298. #ifdef ROXIE_SLA_LOGIC
  1299. RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), slaQueue(headRegionSize), hiQueue(headRegionSize), loQueue(headRegionSize)
  1300. #else
  1301. RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), hiQueue(headRegionSize), loQueue(headRegionSize)
  1302. #endif
  1303. {
  1304. loWorkers.setown(createThreadPool("RoxieLoWorkers", this, NULL, numWorkers));
  1305. hiWorkers.setown(createThreadPool("RoxieHiWorkers", this, NULL, numWorkers));
  1306. #ifdef ROXIE_SLA_LOGIC
  1307. slaWorkers.setown(createThreadPool("RoxieSLAWorkers", this, NULL, numWorkers));
  1308. #endif
  1309. CriticalBlock b(ccdChannelsCrit);
  1310. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
  1311. ForEach(*it)
  1312. {
  1313. unsigned channel = it->query().getPropInt("@channel", 0);
  1314. unsigned subChannel = it->query().getPropInt("@subChannel", 0);
  1315. assertex(channel <= numChannels);
  1316. assertex(subChannels[channel] == 0);
  1317. assertex(subChannel != 0);
  1318. subChannels[channel] = subChannel;
  1319. IBYTIDelays[channel] = initIbytiDelay;
  1320. channels[channelCount++] = channel;
  1321. }
  1322. }
  1323. virtual bool checkSuspended(const RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1324. {
  1325. bool suspended;
  1326. {
  1327. SpinBlock b(suspendCrit);
  1328. suspended = suspendedChannels[header.channel];
  1329. }
  1330. if (suspended)
  1331. {
  1332. try
  1333. {
  1334. RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION);
  1335. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1336. StringBuffer message;
  1337. message.appendf("<Exception><Code>%d</Code><Message>Channel %d is suspended</Message></Exception>", ROXIE_CHANNEL_SUSPENDED, header.channel);
  1338. unsigned len = message.length();
  1339. void *ret = output->getBuffer(len+1, true);
  1340. memcpy(ret, message.str(), len+1);
  1341. output->putBuffer(ret, len+1, true);
  1342. output->flush(true);
  1343. }
  1344. catch (IException *EInE)
  1345. {
  1346. EXCLOG(EInE, "Exception during checkSuspended");
  1347. EInE->Release();
  1348. }
  1349. catch (...)
  1350. {
  1351. logctx.CTXLOG("Unknown Exception during checkSuspended");
  1352. }
  1353. }
  1354. return suspended;
  1355. }
  1356. virtual bool suspendChannel(unsigned channel, bool suspend, const IRoxieContextLogger &logctx)
  1357. {
  1358. assertex(channel < MAX_CLUSTER_SIZE);
  1359. bool prev;
  1360. {
  1361. SpinBlock b(suspendCrit);
  1362. prev = suspendedChannels[channel];
  1363. suspendedChannels[channel] = suspend;
  1364. }
  1365. if (suspend && subChannels[channel] && !prev)
  1366. {
  1367. logctx.CTXLOG("ERROR: suspending channel %d - aborting active queries", channel);
  1368. #ifdef ROXIE_SLA_LOGIC
  1369. abortChannel(channel, slaWorkers);
  1370. #endif
  1371. abortChannel(channel, hiWorkers);
  1372. abortChannel(channel, loWorkers);
  1373. }
  1374. return prev;
  1375. }
  1376. virtual unsigned getHeadRegionSize() const
  1377. {
  1378. return loQueue.getHeadRegionSize();
  1379. }
  1380. virtual void setHeadRegionSize(unsigned newSize)
  1381. {
  1382. #ifdef ROXIE_SLA_LOGIC
  1383. slaQueue.setHeadRegionSize(newSize);
  1384. #endif
  1385. hiQueue.setHeadRegionSize(newSize);
  1386. loQueue.setHeadRegionSize(newSize);
  1387. }
  1388. virtual void start()
  1389. {
  1390. for (unsigned i = 0; i < numWorkers; i++)
  1391. {
  1392. // MORE - why would we have same number of each?
  1393. // MORE - All workers (hi or low) have same sys priority, same number of workers,
  1394. // and same queue size ... What can make a query marked high priority
  1395. // get prioity of resource over low one ?
  1396. // MORE: I think we may want to have one set of worker threads that gets jobs of 2 (or 3 with SLA)
  1397. // queues (sla, high, and low). These workers will give higher priority for jobs
  1398. // on higher priority queues but without starving low ones... something similar to what
  1399. // I implemented in UDP output queues.
  1400. // For the time being, I will keep as is, and just add SLA queue and workers.
  1401. loWorkers->start(&loQueue);
  1402. hiWorkers->start(&hiQueue);
  1403. #ifdef ROXIE_SLA_LOGIC
  1404. slaWorkers->start(&slaQueue);
  1405. #endif
  1406. }
  1407. }
  1408. virtual void stop()
  1409. {
  1410. loWorkers->stopAll(true);
  1411. loQueue.signal(loWorkers->runningCount()); // MORE - looks like a race here... interruptableSemaphore would be better
  1412. hiWorkers->stopAll(true);
  1413. hiQueue.signal(hiWorkers->runningCount());
  1414. #ifdef ROXIE_SLA_LOGIC
  1415. slaWorkers->stopAll(true);
  1416. slaQueue.signal(slaWorkers->runningCount());
  1417. #endif
  1418. }
  1419. virtual void join()
  1420. {
  1421. #ifdef ROXIE_SLA_LOGIC
  1422. slaWorkers->joinAll(true);
  1423. #endif
  1424. hiWorkers->joinAll(true);
  1425. loWorkers->joinAll(true);
  1426. loWorkers.clear(); // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
  1427. hiWorkers.clear();
  1428. #ifdef ROXIE_SLA_LOGIC
  1429. slaWorkers.clear();
  1430. #endif
  1431. }
  1432. virtual IPooledThread *createNew()
  1433. {
  1434. return new CRoxieWorker;
  1435. }
  1436. IArrayOf<CallbackEntry> callbacks;
  1437. CriticalSection callbacksCrit;
  1438. virtual IPendingCallback *notePendingCallback(const RoxiePacketHeader &header, const char *lfn)
  1439. {
  1440. CriticalBlock b(callbacksCrit);
  1441. CallbackEntry *callback = new CallbackEntry(header, lfn);
  1442. callbacks.append(*callback);
  1443. return callback;
  1444. }
  1445. virtual void removePendingCallback(IPendingCallback *goer)
  1446. {
  1447. if (goer)
  1448. {
  1449. CriticalBlock b(callbacksCrit);
  1450. callbacks.zap(static_cast<CallbackEntry &>(*goer));
  1451. }
  1452. }
  1453. protected:
  1454. void doFileCallback(IRoxieQueryPacket *packet)
  1455. {
  1456. // This is called on the main slave reader thread so needs to be as fast as possible to avoid lost packets
  1457. const char *lfn;
  1458. const char *data;
  1459. unsigned len;
  1460. RoxiePacketHeader &header = packet->queryHeader();
  1461. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK)
  1462. {
  1463. lfn = (const char *) packet->queryContextData();
  1464. unsigned namelen = strlen(lfn) + 1;
  1465. data = lfn + namelen;
  1466. len = packet->getContextLength() - namelen;
  1467. }
  1468. else
  1469. {
  1470. lfn = data = NULL; // used when query aborted
  1471. len = 0;
  1472. }
  1473. CriticalBlock b(callbacksCrit);
  1474. ForEachItemIn(idx, callbacks)
  1475. {
  1476. CallbackEntry &c = callbacks.item(idx);
  1477. if (c.matches(header, lfn))
  1478. {
  1479. if (traceLevel > 10)
  1480. DBGLOG("callback return matched a waiting query");
  1481. c.doFileCallback(len, data, header.retries==QUERY_ABORTED);
  1482. }
  1483. }
  1484. }
  1485. };
  1486. #ifdef _MSC_VER
  1487. #pragma warning ( push )
  1488. #pragma warning ( disable: 4355 )
  1489. #endif
  1490. class RoxieThrottledPacketSender : public Thread
  1491. {
  1492. TokenBucket &bucket;
  1493. InterruptableSemaphore queued;
  1494. Semaphore started;
  1495. unsigned maxPacketSize;
  1496. SafeQueueOf<IRoxieQueryPacket, false> queue;
  1497. class StoppedException: public CInterface, public IException
  1498. {
  1499. public:
  1500. IMPLEMENT_IINTERFACE;
  1501. int errorCode() const { return 0; }
  1502. StringBuffer & errorMessage(StringBuffer &str) const { return str.append("Stopped"); }
  1503. MessageAudience errorAudience() const { return MSGAUD_user; }
  1504. };
  1505. void enqueue(IRoxieQueryPacket *packet)
  1506. {
  1507. packet->Link();
  1508. queue.enqueue(packet);
  1509. queued.signal();
  1510. }
  1511. IRoxieQueryPacket *dequeue()
  1512. {
  1513. queued.wait();
  1514. return queue.dequeue();
  1515. }
  1516. public:
  1517. RoxieThrottledPacketSender(TokenBucket &_bucket, unsigned _maxPacketSize)
  1518. : Thread("RoxieThrottledPacketSender"), bucket(_bucket), maxPacketSize(_maxPacketSize)
  1519. {
  1520. start();
  1521. started.wait();
  1522. }
  1523. ~RoxieThrottledPacketSender()
  1524. {
  1525. stop();
  1526. join();
  1527. }
  1528. virtual int run()
  1529. {
  1530. started.signal();
  1531. loop
  1532. {
  1533. try
  1534. {
  1535. Owned<IRoxieQueryPacket> packet = dequeue();
  1536. RoxiePacketHeader &header = packet->queryHeader();
  1537. unsigned length = packet->queryHeader().packetlength;
  1538. {
  1539. MTIME_SECTION(queryActiveTimer(), "bucket_wait");
  1540. bucket.wait((length / 1024) + 1);
  1541. }
  1542. if (channelWrite(header.channel, &header, length) != length)
  1543. DBGLOG("multicast write wrote too little");
  1544. atomic_inc(&packetsSent);
  1545. }
  1546. catch (StoppedException *E)
  1547. {
  1548. E->Release();
  1549. break;
  1550. }
  1551. catch (IException *E)
  1552. {
  1553. EXCLOG(E);
  1554. E->Release();
  1555. }
  1556. catch (...)
  1557. {
  1558. }
  1559. }
  1560. return 0;
  1561. }
  1562. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  1563. {
  1564. RoxiePacketHeader &header = x->queryHeader();
  1565. unsigned length = x->queryHeader().packetlength;
  1566. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  1567. switch (header.retries & ROXIE_RETRIES_MASK)
  1568. {
  1569. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  1570. {
  1571. StringBuffer s;
  1572. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  1573. }
  1574. break;
  1575. default:
  1576. {
  1577. StringBuffer s;
  1578. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  1579. }
  1580. break;
  1581. case 0:
  1582. if (logctx.queryTraceLevel() > 8)
  1583. {
  1584. StringBuffer s;
  1585. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  1586. }
  1587. break;
  1588. }
  1589. if (length > maxPacketSize)
  1590. {
  1591. StringBuffer s;
  1592. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  1593. }
  1594. enqueue(x);
  1595. }
  1596. void stop()
  1597. {
  1598. // bucket.stop();
  1599. queued.interrupt(new StoppedException);
  1600. }
  1601. };
  1602. class RoxieSocketQueueManager : public RoxieReceiverBase
  1603. {
  1604. unsigned maxPacketSize;
  1605. bool running;
  1606. Linked<ISendManager> sendManager;
  1607. Linked<IReceiveManager> receiveManager;
  1608. Owned<TokenBucket> bucket;
  1609. class ReceiverThread : public Thread
  1610. {
  1611. RoxieSocketQueueManager &parent;
  1612. public:
  1613. ReceiverThread(RoxieSocketQueueManager &_parent) : parent(_parent), Thread("RoxieSocketQueueManager") {}
  1614. int run()
  1615. {
  1616. // Raise the priority so ibyti's get through in a timely fashion
  1617. #ifdef __linux__
  1618. setLinuxThreadPriority(3);
  1619. #else
  1620. adjustPriority(1);
  1621. #endif
  1622. return parent.run();
  1623. }
  1624. } readThread;
  1625. public:
  1626. RoxieSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), readThread(*this)
  1627. {
  1628. int udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
  1629. int udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
  1630. int udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
  1631. #ifdef _DEBUG
  1632. bool udpResendEnabled = topology->getPropBool("@udpResendEnabled", false);
  1633. #else
  1634. bool udpResendEnabled = false; // As long as it is known to be broken, we don't want it accidentally enabled in any release version
  1635. #endif
  1636. maxPacketSize = multicastSocket->get_max_send_size();
  1637. if ((maxPacketSize==0)||(maxPacketSize>65535))
  1638. maxPacketSize = 65535;
  1639. if (topology->getPropInt("@sendMaxRate", 0))
  1640. {
  1641. unsigned sendMaxRate = topology->getPropInt("@sendMaxRate");
  1642. unsigned sendMaxRatePeriod = topology->getPropInt("@sendMaxRatePeriod", 1);
  1643. bucket.setown(new TokenBucket(sendMaxRate, sendMaxRatePeriod, sendMaxRate));
  1644. throttledPacketSendManager.setown(new RoxieThrottledPacketSender(*bucket, maxPacketSize));
  1645. }
  1646. IpAddress snifferIp;
  1647. getChannelIp(snifferIp, snifferChannel);
  1648. if (udpMaxSlotsPerClient > udpQueueSize)
  1649. udpMaxSlotsPerClient = udpQueueSize;
  1650. unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
  1651. unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
  1652. unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
  1653. unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
  1654. receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient, myNodeIndex));
  1655. sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, udpResendEnabled ? udpMaxSlotsPerClient : 0, bucket, myNodeIndex));
  1656. running = false;
  1657. }
  1658. CriticalSection crit;
  1659. Owned<RoxieThrottledPacketSender> throttledPacketSendManager;
  1660. virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
  1661. {
  1662. if (throttledPacketSendManager)
  1663. throttledPacketSendManager->sendPacket(x, logctx);
  1664. else
  1665. {
  1666. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendPacket");
  1667. RoxiePacketHeader &header = x->queryHeader();
  1668. unsigned length = x->queryHeader().packetlength;
  1669. assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
  1670. StringBuffer s;
  1671. switch (header.retries & ROXIE_RETRIES_MASK)
  1672. {
  1673. case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
  1674. logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
  1675. break;
  1676. default:
  1677. logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
  1678. break;
  1679. case 0:
  1680. if (logctx.queryTraceLevel() > 8)
  1681. logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
  1682. break;
  1683. }
  1684. // MORE - crashes have been observed after exceptions here - mechanism not yet clear nor reproducible
  1685. if (length > maxPacketSize)
  1686. {
  1687. StringBuffer s;
  1688. throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
  1689. }
  1690. CriticalBlock c(crit); // is this needed or was it just protecting multicast array? prevent interleaving?
  1691. if (channelWrite(header.channel, &header, length) != length)
  1692. logctx.CTXLOG("multicast write wrote too little");
  1693. atomic_inc(&packetsSent);
  1694. }
  1695. }
  1696. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1697. {
  1698. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendIbyti");
  1699. RoxiePacketHeader ibytiHeader(header, header.activityId & ROXIE_PRIORITY_MASK);
  1700. if (logctx.queryTraceLevel() > 8)
  1701. {
  1702. StringBuffer s; logctx.CTXLOG("Sending IBYTI packet %s", ibytiHeader.toString(s).str());
  1703. }
  1704. CriticalBlock c(crit); // Not sure we really need this? Preventing interleave on writes? Should sock manage it?
  1705. if (channelWrite(header.channel, &ibytiHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
  1706. logctx.CTXLOG("sendIbyti wrote too little");
  1707. atomic_inc(&ibytiPacketsSent);
  1708. }
  1709. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  1710. {
  1711. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbort");
  1712. RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK);
  1713. abortHeader.retries = QUERY_ABORTED;
  1714. if (logctx.queryTraceLevel() > 8)
  1715. {
  1716. StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
  1717. }
  1718. CriticalBlock c(crit); // Not sure we really need this? Preventing interleave on writes? Should sock manage it?
  1719. if (channelWrite(header.channel, &abortHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
  1720. logctx.CTXLOG("sendAbort wrote too little");
  1721. atomic_inc(&abortsSent);
  1722. }
  1723. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx)
  1724. {
  1725. MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbortCallback");
  1726. RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK);
  1727. abortHeader.retries = QUERY_ABORTED;
  1728. MemoryBuffer data;
  1729. data.append(sizeof(abortHeader), &abortHeader).append(lfn);
  1730. if (logctx.queryTraceLevel() > 5)
  1731. {
  1732. StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
  1733. }
  1734. CriticalBlock c(crit); // Not sure we really need this? Preventing interleave on writes? Should sock manage it?
  1735. if (channelWrite(header.channel, data.toByteArray(), data.length()) != data.length())
  1736. logctx.CTXLOG("tr->write wrote too little");
  1737. atomic_inc(&abortsSent);
  1738. }
  1739. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
  1740. {
  1741. unsigned qnum = outOfBand ? 0 : ((header.retries & ROXIE_FASTLANE) || !fastLaneQueue) ? 1 : 2;
  1742. if (logctx.queryTraceLevel() > 8)
  1743. {
  1744. StringBuffer s; logctx.CTXLOG("Creating Output Stream for reply packet on Q=%d - %s", qnum, header.toString(s).str());
  1745. }
  1746. return sendManager->createMessagePacker(header.uid, header.getSequenceId(), &header, sizeof(RoxiePacketHeader), header.serverIdx, qnum);
  1747. }
  1748. virtual bool replyPending(RoxiePacketHeader &header)
  1749. {
  1750. return sendManager->dataQueued(header.uid, header.getSequenceId(), header.serverIdx);
  1751. }
  1752. virtual bool abortCompleted(RoxiePacketHeader &header)
  1753. {
  1754. return sendManager->abortData(header.uid, header.getSequenceId(), header.serverIdx);
  1755. }
  1756. bool abortRunning(RoxiePacketHeader &header, IThreadPool *workers, bool checkRank, bool &preActivity)
  1757. {
  1758. bool queryFound = false;
  1759. bool ret = false;
  1760. Owned<IPooledThreadIterator> wi = workers->running();
  1761. ForEach(*wi)
  1762. {
  1763. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1764. if (w.checkAbort(header, checkRank, queryFound, preActivity))
  1765. {
  1766. ret = true;
  1767. break;
  1768. }
  1769. else if (queryFound)
  1770. {
  1771. ret = false;
  1772. break;
  1773. }
  1774. }
  1775. if (!checkRank)
  1776. {
  1777. if (traceLevel > 8)
  1778. DBGLOG("discarding data for aborted query");
  1779. ROQ->abortCompleted(header);
  1780. }
  1781. return ret;
  1782. }
  1783. void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue, IThreadPool *workers)
  1784. {
  1785. assertex(!localSlave);
  1786. atomic_inc(&ibytiPacketsReceived);
  1787. bool preActivity = false;
  1788. if (traceLevel > 10)
  1789. {
  1790. IpAddress peer;
  1791. StringBuffer s, s1;
  1792. multicastSocket->getPeerAddress(peer).getIpText(s);
  1793. header.toString(s1);
  1794. DBGLOG("doIBYTI %s from %s", s1.str(), s.str());
  1795. DBGLOG("header.retries=%x header.getSubChannelMask(header.channel)=%x", header.retries, header.getSubChannelMask(header.channel));
  1796. }
  1797. if (header.retries == QUERY_ABORTED)
  1798. {
  1799. abortRunning(header, workers, false, preActivity);
  1800. queue.remove(header);
  1801. if (traceLevel > 10)
  1802. {
  1803. StringBuffer s;
  1804. DBGLOG("Abort activity %s", header.toString(s).str());
  1805. }
  1806. }
  1807. else
  1808. {
  1809. if ((header.retries & ROXIE_RETRIES_MASK) == header.getSubChannelMask(header.channel))
  1810. {
  1811. if (traceLevel > 10)
  1812. DBGLOG("doIBYTI packet was from self");
  1813. atomic_inc(&ibytiPacketsFromSelf);
  1814. }
  1815. else
  1816. {
  1817. resetIbytiDelay(header.channel);
  1818. bool foundInQ;
  1819. {
  1820. CriticalBlock b(queueCrit);
  1821. foundInQ = queue.remove(header);
  1822. }
  1823. if (foundInQ) {
  1824. if (traceLevel > 10)
  1825. {
  1826. StringBuffer s;
  1827. DBGLOG("Removed activity from Q : %s", header.toString(s).str());
  1828. }
  1829. atomic_inc(&ibytiPacketsWorked);
  1830. return;
  1831. }
  1832. if (abortRunning(header, workers, true, preActivity))
  1833. {
  1834. if (preActivity)
  1835. atomic_inc(&ibytiPacketsWorked); // MORE - may want to have a diff counter for this (not in queue but in IBYTI wait or before)
  1836. else
  1837. atomic_inc(&ibytiPacketsHalfWorked);
  1838. return;
  1839. }
  1840. if (traceLevel > 10)
  1841. DBGLOG("doIBYTI packet was too late");
  1842. atomic_inc(&ibytiPacketsTooLate); // meaning either I started and reserve the right to finish, or I finished already
  1843. }
  1844. }
  1845. }
  1846. void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue, IThreadPool *workers)
  1847. {
  1848. // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
  1849. // DO NOT put tracing on this thread except at very high tracelevels!
  1850. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  1851. {
  1852. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  1853. if (traceLevel > 10)
  1854. {
  1855. StringBuffer s;
  1856. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  1857. }
  1858. doFileCallback(packet);
  1859. }
  1860. else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
  1861. doIbyti(header, queue, workers); // MORE - check how fast this is!
  1862. else
  1863. {
  1864. Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
  1865. SlaveContextLogger logctx(packet);
  1866. unsigned retries = header.thisChannelRetries();
  1867. if (retries)
  1868. {
  1869. // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
  1870. assertex(header.channel); // should never see a retry on channel 0
  1871. if (retries >= SUBCHANNEL_MASK)
  1872. return; // someone sent a failure or something - ignore it
  1873. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  1874. if (!(testSlaveFailure & 0x800))
  1875. {
  1876. RoxiePacketHeader newHeader(header, ROXIE_ALIVE);
  1877. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1878. output->flush(true);
  1879. }
  1880. // If it's a retry, look it up against already running, or output stream, or input queue
  1881. // if found, send an IBYTI and discard retry request
  1882. bool primChannel = true;
  1883. if (subChannels[header.channel] != 1) primChannel = false;
  1884. if (primChannel)
  1885. atomic_inc(&retriesReceivedPrm);
  1886. else
  1887. atomic_inc(&retriesReceivedSec);
  1888. bool alreadyRunning = false;
  1889. Owned<IPooledThreadIterator> wi = workers->running();
  1890. ForEach(*wi)
  1891. {
  1892. CRoxieWorker &w = (CRoxieWorker &) wi->query();
  1893. if (w.match(header))
  1894. {
  1895. alreadyRunning = true;
  1896. if (primChannel)
  1897. atomic_inc(&retriesIgnoredPrm);
  1898. else
  1899. atomic_inc(&retriesIgnoredSec);
  1900. ROQ->sendIbyti(header, logctx);
  1901. if (logctx.queryTraceLevel() > 10)
  1902. {
  1903. StringBuffer xx; logctx.CTXLOG("Ignored retry on %s channel for running activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  1904. }
  1905. break;
  1906. }
  1907. }
  1908. if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
  1909. {
  1910. alreadyRunning = true;
  1911. if (primChannel)
  1912. atomic_inc(&retriesIgnoredPrm);
  1913. else
  1914. atomic_inc(&retriesIgnoredSec);
  1915. ROQ->sendIbyti(header, logctx);
  1916. if (logctx.queryTraceLevel() > 10)
  1917. {
  1918. StringBuffer xx; logctx.CTXLOG("Ignored retry on %s channel for completed activity %s", primChannel?"primary":"secondary", header.toString(xx).str());
  1919. }
  1920. }
  1921. if (!alreadyRunning)
  1922. {
  1923. if (logctx.queryTraceLevel() > 10)
  1924. {
  1925. StringBuffer xx; logctx.CTXLOG("Retry %d received on %s channel for %s", retries+1, primChannel?"primary":"secondary", header.toString(xx).str());
  1926. }
  1927. queue.enqueueUnique(packet.getClear());
  1928. }
  1929. }
  1930. else // first time (not a retry).
  1931. {
  1932. if (header.channel)
  1933. {
  1934. queue.enqueue(packet.getClear());
  1935. }
  1936. else
  1937. {
  1938. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  1939. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  1940. // Unfortunately this is bad news for dropping packets
  1941. for (unsigned i = 1; i < channelCount; i++)
  1942. queue.enqueue(packet->clonePacket(channels[i]));
  1943. header.channel = channels[0];
  1944. queue.enqueue(packet.getClear());
  1945. }
  1946. }
  1947. }
  1948. }
  1949. int run()
  1950. {
  1951. if (traceLevel)
  1952. DBGLOG("RoxieSocketQueueManager::run() starting: doIbytiDelay=%s minIbytiDelay=%u initIbytiDelay=%u",
  1953. doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay);
  1954. for (;;)
  1955. {
  1956. MemoryBuffer mb;
  1957. try
  1958. {
  1959. // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
  1960. // DO NOT put tracing on this thread except at very high tracelevels!
  1961. unsigned l;
  1962. multicastSocket->read(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, 5);
  1963. mb.setLength(l);
  1964. atomic_inc(&packetsReceived);
  1965. RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
  1966. if (l != header.packetlength)
  1967. DBGLOG("sock->read returned %d but packetlength was %d", l, header.packetlength);
  1968. if (traceLevel > 10)
  1969. {
  1970. StringBuffer s;
  1971. DBGLOG("Read from multicast: %s", header.toString(s).str());
  1972. }
  1973. #ifdef ROXIE_SLA_LOGIC
  1974. if (header.activityId & ROXIE_SLA_PRIORITY)
  1975. processMessage(mb, header, slaQueue, slaWorkers);
  1976. else
  1977. #endif
  1978. if (header.activityId & ROXIE_HIGH_PRIORITY)
  1979. processMessage(mb, header, hiQueue, hiWorkers);
  1980. else
  1981. processMessage(mb, header, loQueue, loWorkers);
  1982. }
  1983. catch (IException *E)
  1984. {
  1985. if (running)
  1986. {
  1987. // MORE: Maybe we should utilize IException::errorCode - not just text ??
  1988. if (E->errorCode()==JSOCKERR_timeout_expired)
  1989. E->Release();
  1990. else if (roxiemem::memPoolExhausted())
  1991. {
  1992. //MORE: I think this should probably be based on the error code instead.
  1993. EXCLOG(E, "Exception reading or processing multicast msg");
  1994. E->Release();
  1995. MilliSleep(1000); // Give a chance for mem free
  1996. }
  1997. else
  1998. {
  1999. EXCLOG(E, "Exception reading or processing multicast msg");
  2000. E->Release();
  2001. // MORE: Protect with try logic, in case udp_create throws exception ?
  2002. // What to do if create fails (ie exception is caught) ?
  2003. if (multicastSocket)
  2004. {
  2005. multicastSocket->close();
  2006. multicastSocket.clear();
  2007. openMulticastSocket();
  2008. }
  2009. }
  2010. }
  2011. else
  2012. {
  2013. E->Release();
  2014. break;
  2015. }
  2016. }
  2017. }
  2018. return 0;
  2019. }
  2020. void start()
  2021. {
  2022. RoxieReceiverBase::start();
  2023. running = true;
  2024. readThread.start();
  2025. }
  2026. void stop()
  2027. {
  2028. if (running)
  2029. {
  2030. running = false;
  2031. multicastSocket->close();
  2032. }
  2033. RoxieReceiverBase::stop();
  2034. }
  2035. void join()
  2036. {
  2037. readThread.join();
  2038. RoxieReceiverBase::join();
  2039. }
  2040. virtual IReceiveManager *queryReceiveManager()
  2041. {
  2042. return receiveManager;
  2043. }
  2044. };
  2045. #ifdef _MSC_VER
  2046. #pragma warning( pop )
  2047. #endif
  2048. //==================================================================================================
  2049. interface ILocalMessageCollator : extends IMessageCollator
  2050. {
  2051. virtual void enqueueMessage(void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0;
  2052. };
  2053. interface ILocalReceiveManager : extends IReceiveManager
  2054. {
  2055. virtual ILocalMessageCollator *lookupCollator(ruid_t id) = 0;
  2056. };
  2057. class LocalMessagePacker : public CDummyMessagePacker
  2058. {
  2059. MemoryBuffer meta;
  2060. MemoryBuffer header;
  2061. Linked<ILocalReceiveManager> rm;
  2062. ruid_t id;
  2063. public:
  2064. IMPLEMENT_IINTERFACE;
  2065. LocalMessagePacker(RoxiePacketHeader &_header, ILocalReceiveManager *_rm) : rm(_rm)
  2066. {
  2067. id = _header.uid;
  2068. header.append(sizeof(RoxiePacketHeader), &_header);
  2069. }
  2070. virtual void flush(bool last_message);
  2071. virtual void sendMetaInfo(const void *buf, unsigned len)
  2072. {
  2073. meta.append(len, buf);
  2074. }
  2075. };
  2076. class CLocalMessageUnpackCursor : public CInterface, implements IMessageUnpackCursor
  2077. {
  2078. void *data;
  2079. unsigned datalen;
  2080. unsigned pos;
  2081. Linked<IRowManager> rowManager;
  2082. public:
  2083. IMPLEMENT_IINTERFACE;
  2084. CLocalMessageUnpackCursor(IRowManager *_rowManager, void *_data, unsigned _datalen)
  2085. : rowManager(_rowManager)
  2086. {
  2087. datalen = _datalen;
  2088. data = _data;
  2089. pos = 0;
  2090. }
  2091. ~CLocalMessageUnpackCursor()
  2092. {
  2093. }
  2094. virtual bool atEOF() const
  2095. {
  2096. return datalen==pos;
  2097. }
  2098. virtual bool isSerialized() const
  2099. {
  2100. // NOTE: tempting to think that we could avoid serializing in localSlave case, but have to be careful about the lifespan of the rowManager...
  2101. return true;
  2102. }
  2103. virtual const void * getNext(int length)
  2104. {
  2105. if (pos==datalen)
  2106. return NULL;
  2107. assertex(pos + length <= datalen);
  2108. void * cur = ((char *) data) + pos;
  2109. pos += length;
  2110. void * ret = rowManager->allocate(length, 0);
  2111. memcpy(ret, cur, length);
  2112. //No need for finalize since only contains plain data.
  2113. return ret;
  2114. }
  2115. };
  2116. class CLocalMessageResult : public CInterface, implements IMessageResult
  2117. {
  2118. void *data;
  2119. void *meta;
  2120. void *header;
  2121. unsigned datalen, metalen, headerlen;
  2122. unsigned pos;
  2123. public:
  2124. IMPLEMENT_IINTERFACE;
  2125. CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen)
  2126. {
  2127. datalen = _datalen;
  2128. metalen = _metalen;
  2129. headerlen = _headerlen;
  2130. data = _data;
  2131. meta = _meta;
  2132. header = _header;
  2133. pos = 0;
  2134. }
  2135. ~CLocalMessageResult()
  2136. {
  2137. free(data);
  2138. free(meta);
  2139. free(header);
  2140. }
  2141. virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
  2142. {
  2143. return new CLocalMessageUnpackCursor(rowMgr, data, datalen);
  2144. }
  2145. virtual const void *getMessageHeader(unsigned &length) const
  2146. {
  2147. length = headerlen;
  2148. return header;
  2149. }
  2150. virtual const void *getMessageMetadata(unsigned &length) const
  2151. {
  2152. length = metalen;
  2153. return meta;
  2154. }
  2155. virtual void discard() const
  2156. {
  2157. }
  2158. };
  2159. class CLocalMessageCollator : public CInterface, implements ILocalMessageCollator
  2160. {
  2161. InterruptableSemaphore sem;
  2162. QueueOf<IMessageResult, false> pending;
  2163. CriticalSection crit;
  2164. Linked<IRowManager> rowManager;
  2165. Linked<ILocalReceiveManager> receiveManager;
  2166. ruid_t id;
  2167. unsigned totalBytesReceived;
  2168. public:
  2169. IMPLEMENT_IINTERFACE;
  2170. CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid);
  2171. ~CLocalMessageCollator();
  2172. virtual ruid_t queryRUID() const
  2173. {
  2174. return id;
  2175. }
  2176. virtual bool add_package(DataBuffer *dataBuff)
  2177. {
  2178. throwUnexpected(); // internal use in udp layer...
  2179. }
  2180. virtual IMessageResult* getNextResult(unsigned time_out, bool &anyActivity)
  2181. {
  2182. anyActivity = false;
  2183. if (!sem.wait(time_out))
  2184. return NULL;
  2185. anyActivity = true;
  2186. CriticalBlock c(crit);
  2187. return pending.dequeue();
  2188. }
  2189. virtual void interrupt(IException *E)
  2190. {
  2191. sem.interrupt(E);
  2192. }
  2193. virtual void enqueueMessage(void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen)
  2194. {
  2195. CriticalBlock c(crit);
  2196. pending.enqueue(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
  2197. sem.signal();
  2198. totalBytesReceived += datalen + metalen + headerlen;
  2199. }
  2200. virtual unsigned queryBytesReceived() const
  2201. {
  2202. return totalBytesReceived;
  2203. }
  2204. };
  2205. class RoxieLocalReceiveManager : public CInterface, implements ILocalReceiveManager
  2206. {
  2207. MapXToMyClass<ruid_t, ruid_t, ILocalMessageCollator> collators;
  2208. Owned<IRowManager> rowManager;
  2209. CriticalSection crit;
  2210. Owned<StringContextLogger> logctx;
  2211. Linked<IMessageCollator> defaultCollator;
  2212. public:
  2213. IMPLEMENT_IINTERFACE;
  2214. RoxieLocalReceiveManager() : logctx(new StringContextLogger("RoxieLocalReceiveManager"))
  2215. {
  2216. }
  2217. virtual IMessageCollator *createMessageCollator(IRowManager *manager, ruid_t ruid)
  2218. {
  2219. CriticalBlock b(crit);
  2220. if (!rowManager)
  2221. rowManager.setown(roxiemem::createRowManager(0, NULL, *logctx, NULL, false)); // MORE - should not really use default limits
  2222. IMessageCollator *collator = new CLocalMessageCollator(rowManager, ruid); // MORE - is this right - why two rowManagers and why pass this one (not the other) ?
  2223. collators.setValue(ruid, collator);
  2224. return collator;
  2225. }
  2226. virtual void detachCollator(const IMessageCollator *collator)
  2227. {
  2228. ruid_t id = collator->queryRUID();
  2229. CriticalBlock b(crit);
  2230. collators.setValue(id, NULL);
  2231. }
  2232. virtual void setDefaultCollator(IMessageCollator *collator)
  2233. {
  2234. CriticalBlock b(crit);
  2235. defaultCollator.set(collator);
  2236. }
  2237. virtual ILocalMessageCollator *lookupCollator(ruid_t id)
  2238. {
  2239. CriticalBlock b(crit);
  2240. IMessageCollator *ret = collators.getValue(id);
  2241. if (!ret)
  2242. ret = defaultCollator;
  2243. return QUERYINTERFACE(ret, ILocalMessageCollator);
  2244. }
  2245. };
  2246. void LocalMessagePacker::flush(bool last_message)
  2247. {
  2248. data.setLength(lastput);
  2249. if (last_message)
  2250. {
  2251. ILocalMessageCollator *collator = rm->lookupCollator(id);
  2252. if (collator)
  2253. {
  2254. unsigned datalen = data.length();
  2255. unsigned metalen = meta.length();
  2256. unsigned headerlen = header.length();
  2257. collator->enqueueMessage(data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen);
  2258. }
  2259. // otherwise Roxie server is no longer interested and we can simply discard
  2260. }
  2261. }
  2262. CLocalMessageCollator::CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid)
  2263. : rowManager(_rowManager), id(_ruid)
  2264. {
  2265. id = 0;
  2266. totalBytesReceived = 0;
  2267. }
  2268. CLocalMessageCollator::~CLocalMessageCollator()
  2269. {
  2270. IMessageResult *goer;
  2271. loop
  2272. {
  2273. goer = pending.dequeue();
  2274. if (!goer)
  2275. break;
  2276. goer->Release();
  2277. }
  2278. }
  2279. class RoxieLocalQueueManager : public RoxieReceiverBase
  2280. {
  2281. Linked<ISendManager> sendManager;
  2282. Linked<RoxieLocalReceiveManager> receiveManager;
  2283. public:
  2284. RoxieLocalQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieReceiverBase(_numWorkers)
  2285. {
  2286. receiveManager.setown(new RoxieLocalReceiveManager);
  2287. }
  2288. virtual bool suspendChannel(unsigned channel, bool suspend, const IRoxieContextLogger &logctx)
  2289. {
  2290. if (suspend)
  2291. UNIMPLEMENTED;
  2292. return false;
  2293. }
  2294. virtual void sendPacket(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
  2295. {
  2296. RoxiePacketHeader &header = packet->queryHeader();
  2297. unsigned retries = header.thisChannelRetries();
  2298. if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
  2299. {
  2300. if (traceLevel > 5)
  2301. {
  2302. StringBuffer s;
  2303. DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
  2304. }
  2305. doFileCallback(packet);
  2306. }
  2307. else if (retries < SUBCHANNEL_MASK)
  2308. {
  2309. if (retries)
  2310. {
  2311. // Send back an out-of-band immediately, to let Roxie server know that channel is still active
  2312. RoxiePacketHeader newHeader(header, ROXIE_ALIVE);
  2313. Owned<IMessagePacker> output = createOutputStream(newHeader, true, logctx);
  2314. output->flush(true);
  2315. }
  2316. RoxieQueue *targetQueue;
  2317. #ifdef ROXIE_SLA_LOGIC
  2318. if (header.activityId & ROXIE_SLA_PRIORITY)
  2319. targetQueue = &slaQueue;
  2320. else
  2321. #endif
  2322. if (header.activityId & ROXIE_HIGH_PRIORITY)
  2323. targetQueue = &hiQueue;
  2324. else
  2325. targetQueue = &loQueue;
  2326. if (header.channel)
  2327. {
  2328. targetQueue->enqueue(LINK(packet));
  2329. }
  2330. else
  2331. {
  2332. // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
  2333. // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
  2334. for (unsigned i = 0; i < channelCount; i++)
  2335. {
  2336. targetQueue->enqueue(packet->clonePacket(channels[i]));
  2337. }
  2338. }
  2339. }
  2340. }
  2341. virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  2342. {
  2343. // Don't do IBYTI's when local slave - no buddy to talk to anyway
  2344. }
  2345. virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
  2346. {
  2347. // MORE - should really have some code here? - no one to alert about the abort
  2348. //UNIMPLEMENTED;
  2349. }
  2350. virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx)
  2351. {
  2352. // MORE - should really have some code here
  2353. //UNIMPLEMENTED;
  2354. }
  2355. virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
  2356. {
  2357. return new LocalMessagePacker(header, receiveManager);
  2358. }
  2359. virtual IReceiveManager *queryReceiveManager()
  2360. {
  2361. return receiveManager;
  2362. }
  2363. virtual bool replyPending(RoxiePacketHeader &header)
  2364. {
  2365. // MORE - should really have some code here!
  2366. return false;
  2367. }
  2368. virtual bool abortCompleted(RoxiePacketHeader &header)
  2369. {
  2370. // MORE - should really have some code here!
  2371. return false;
  2372. }
  2373. };
  2374. IRoxieOutputQueueManager *ROQ;
  2375. extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers)
  2376. {
  2377. if (localSlave)
  2378. return new RoxieLocalQueueManager(snifferChannel, numWorkers);
  2379. else
  2380. return new RoxieSocketQueueManager(snifferChannel, numWorkers);
  2381. }
  2382. //================================================================================================================================
  2383. class PacketDiscarder : public Thread, implements IPacketDiscarder
  2384. {
  2385. bool aborted;
  2386. Owned<IRowManager> rowManager; // not completely sure I need one... maybe I do
  2387. Owned<IMessageCollator> mc;
  2388. public:
  2389. IMPLEMENT_IINTERFACE;
  2390. PacketDiscarder()
  2391. {
  2392. aborted = false;
  2393. };
  2394. ~PacketDiscarder()
  2395. {
  2396. if (mc)
  2397. ROQ->queryReceiveManager()->detachCollator(mc);
  2398. mc.clear();
  2399. }
  2400. virtual int run()
  2401. {
  2402. Owned<StringContextLogger> logctx = new StringContextLogger("PacketDiscarder");
  2403. rowManager.setown(roxiemem::createRowManager(1, NULL, *logctx, NULL));
  2404. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  2405. ROQ->queryReceiveManager()->setDefaultCollator(mc);
  2406. while (!aborted)
  2407. {
  2408. bool anyActivity = false;
  2409. Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
  2410. if (mr)
  2411. {
  2412. if (traceLevel > 4)
  2413. DBGLOG("Discarding unwanted message");
  2414. unsigned headerLen;
  2415. const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  2416. if (headerLen)
  2417. {
  2418. switch (header.activityId)
  2419. {
  2420. case ROXIE_FILECALLBACK:
  2421. {
  2422. Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
  2423. OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
  2424. if (len)
  2425. {
  2426. RecordLengthType *rowlen = (RecordLengthType *) len.get();
  2427. OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
  2428. const char *rowdata = (const char *) row.get();
  2429. // bool isOpt = * (bool *) rowdata;
  2430. // bool isLocal = * (bool *) (rowdata+1);
  2431. ROQ->sendAbortCallback(header, rowdata+2, *logctx);
  2432. }
  2433. else
  2434. DBGLOG("Unrecognized format in discarded file callback");
  2435. break;
  2436. }
  2437. // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
  2438. }
  2439. }
  2440. else
  2441. DBGLOG("Unwanted message had no header?!");
  2442. }
  2443. else if (!anyActivity)
  2444. {
  2445. // to avoid leaking partial unwanted packets, we clear out mc periodically...
  2446. ROQ->queryReceiveManager()->detachCollator(mc);
  2447. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
  2448. ROQ->queryReceiveManager()->setDefaultCollator(mc);
  2449. }
  2450. }
  2451. return 0;
  2452. }
  2453. virtual void start()
  2454. {
  2455. Thread::start();
  2456. }
  2457. virtual void stop()
  2458. {
  2459. if (mc)
  2460. mc->interrupt();
  2461. aborted = true;
  2462. join();
  2463. }
  2464. };
  2465. IPacketDiscarder *createPacketDiscarder()
  2466. {
  2467. IPacketDiscarder *packetDiscarder = new PacketDiscarder;
  2468. packetDiscarder->start();
  2469. return packetDiscarder;
  2470. }
  2471. //================================================================================================================================
  // There are several plausible ways to reply to a ping:
  //  - Reply as soon as it is received, or queue it like any other message?
  //  - Reply once per channel, or just once per slave?
  //  - Send on channel 0, or round-robin across the channels?
  // Knowing which channels are responding seems useful, so each unsuspended channel should probably reply.
  // The delay caused by the queuing system is also part of what we want to measure (though knowing the minimum possible delay would be nice too).
  // Interval between pings. PingTimer::run() waits pingInterval*1000 on its collator
  // (presumably milliseconds, making this a value in seconds — confirm).
  unsigned pingInterval(60);
  2479. class PingTimer : public Thread
  2480. {
  2481. bool aborted;
  2482. Owned<IRowManager> rowManager;
  2483. Owned<IMessageCollator> mc;
  2484. StringContextLogger logctx;
  2485. void sendPing(unsigned priorityMask)
  2486. {
  2487. unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen("PING") + 1 + sizeof(PingRecord);
  2488. void *packetData = malloc(packetSize);
  2489. RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
  2490. RemoteActivityId pingId(ROXIE_PING | priorityMask, 0);
  2491. header->init(pingId, 0, 0, 0);
  2492. char *finger = (char *) (header + 1);
  2493. *finger++ = (char) LOGGING_FLAGSPRESENT;
  2494. strcpy(finger, "PING");
  2495. finger += strlen("PING")+1;
  2496. if (traceLevel > 1)
  2497. DBGLOG("PING sent");
  2498. PingRecord data;
  2499. data.senderIP.ipset(getNodeAddress(myNodeIndex));
  2500. data.tick = usTick();
  2501. memcpy(finger, &data, sizeof(PingRecord));
  2502. Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
  2503. ROQ->sendPacket(packet, logctx);
  2504. }
  2505. public:
  2506. PingTimer() : logctx("PingTimer")
  2507. {
  2508. aborted = false;
  2509. };
  2510. ~PingTimer()
  2511. {
  2512. if (mc)
  2513. ROQ->queryReceiveManager()->detachCollator(mc);
  2514. mc.clear();
  2515. }
  2516. virtual int run()
  2517. {
  2518. rowManager.setown(roxiemem::createRowManager(1, NULL, queryDummyContextLogger(), NULL));
  2519. mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_PING));
  2520. unsigned pingsReceived = 0;
  2521. unsigned pingsElapsed = 0;
  2522. sendPing(ROXIE_HIGH_PRIORITY);
  2523. while (!aborted)
  2524. {
  2525. bool anyActivity = false;
  2526. Owned<IMessageResult> mr = mc->getNextResult(pingInterval*1000, anyActivity);
  2527. if (mr)
  2528. {
  2529. unsigned headerLen;
  2530. const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
  2531. Owned<IMessageUnpackCursor> mu = mr->getCursor(rowManager);
  2532. PingRecord *answer = (PingRecord *) mu->getNext(sizeof(PingRecord));
  2533. if (answer && mu->atEOF() && headerLen==sizeof(RoxiePacketHeader))
  2534. {
  2535. unsigned elapsed = usTick() - answer->tick;
  2536. pingsReceived++;
  2537. pingsElapsed += elapsed;
  2538. if (traceLevel > 10)
  2539. DBGLOG("PING reply channel=%d, time %d", header->channel, elapsed); // DBGLOG is slower than the pings so be careful!
  2540. }
  2541. else
  2542. DBGLOG("PING reply, garbled result");
  2543. ReleaseRoxieRow(answer);
  2544. }
  2545. else if (!anyActivity)
  2546. {
  2547. if (traceLevel)
  2548. DBGLOG("PING: %d replies received, average delay %d", pingsReceived, pingsReceived ? pingsElapsed / pingsReceived : 0);
  2549. pingsReceived = 0;
  2550. pingsElapsed = 0;
  2551. sendPing(ROXIE_HIGH_PRIORITY); // MORE - we could think about alternating the priority or sending pings on high and low at the same time...
  2552. }
  2553. }
  2554. return 0;
  2555. }
  2556. void stop()
  2557. {
  2558. if (mc)
  2559. mc->interrupt();
  2560. aborted = true;
  2561. }
  2562. static CriticalSection crit;
  2563. } *pingTimer;
  2564. CriticalSection PingTimer::crit;
  2565. extern void startPingTimer()
  2566. {
  2567. CriticalBlock b(PingTimer::crit);
  2568. if (!pingTimer)
  2569. {
  2570. pingTimer = new PingTimer();
  2571. pingTimer->start();
  2572. }
  2573. }
  2574. extern void stopPingTimer()
  2575. {
  2576. CriticalBlock b(PingTimer::crit);
  2577. if (pingTimer)
  2578. {
  2579. pingTimer->stop();
  2580. pingTimer->join();
  2581. delete pingTimer;
  2582. pingTimer = NULL;
  2583. }
  2584. }