12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <platform.h>
- #include <jlib.hpp>
- #include <jio.hpp>
- #include <jqueue.tpp>
- #include <jsocket.hpp>
- #include <jlog.hpp>
- #include "jisem.hpp"
- #include "udplib.hpp"
- #include "udptopo.hpp"
- #include "ccd.hpp"
- #include "ccddebug.hpp"
- #include "ccdquery.hpp"
- #include "ccdstate.hpp"
- #include "ccdqueue.ipp"
- #include "ccdsnmp.hpp"
- #ifdef _USE_CPPUNIT
- #include <cppunit/extensions/HelperMacros.h>
- #endif
- using roxiemem::OwnedRoxieRow;
- using roxiemem::OwnedConstRoxieRow;
- using roxiemem::IRowManager;
- using roxiemem::DataBuffer;
- //============================================================================================
- RoxiePacketHeader::RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
- {
- packetlength = sizeof(RoxiePacketHeader);
- #ifdef TIME_PACKETS
- tick = 0;
- #endif
- init(_remoteId, _uid, _channel, _overflowSequence);
- }
- RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _activityId, unsigned subChannel) : serverId(source.serverId)
- {
- // Used to create the header to send a callback to originating server or an IBYTI to a buddy
- activityId = _activityId;
- uid = source.uid;
- queryHash = source.queryHash;
- channel = source.channel;
- overflowSequence = source.overflowSequence;
- continueSequence = source.continueSequence;
- if (_activityId >= ROXIE_ACTIVITY_SPECIAL_FIRST && _activityId <= ROXIE_ACTIVITY_SPECIAL_LAST)
- overflowSequence |= OUTOFBAND_SEQUENCE; // Need to make sure it is not treated as dup of actual reply in the udp layer
- retries = getSubChannelMask(subChannel) | (source.retries & ~ROXIE_RETRIES_MASK);
- #ifdef TIME_PACKETS
- tick = source.tick;
- #endif
- packetlength = sizeof(RoxiePacketHeader);
- }
- unsigned RoxiePacketHeader::getSubChannelMask(unsigned subChannel)
- {
- return SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
- }
- unsigned RoxiePacketHeader::priorityHash() const
- {
- // Used to determine which slave to act as primary and which as secondary for a given packet (thus spreading the load)
- // It's important that we do NOT include channel (since that would result in different values for the different slaves responding to a broadcast)
- // We also don't include continueSequence since we'd prefer continuations to go the same way as original
- unsigned hash = serverId.hash();
- hash = hashc((const unsigned char *) &uid, sizeof(uid), hash);
- hash += overflowSequence; // MORE - is this better than hashing?
- if (traceLevel > 9)
- {
- StringBuffer s;
- DBGLOG("Calculating hash: %s hash was %d", toString(s).str(), hash);
- }
- return hash;
- }
- bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
- {
- // used when matching up a kill packet against a pending one...
- // DO NOT compare activityId - they are not supposed to match, since 0 in activityid identifies ibyti!
- return
- oh.uid==uid &&
- (oh.overflowSequence & ~OUTOFBAND_SEQUENCE) == (overflowSequence & ~OUTOFBAND_SEQUENCE) &&
- oh.continueSequence == continueSequence &&
- oh.serverId==serverId &&
- oh.channel==channel;
- }
- void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence)
- {
- retries = 0;
- activityId = _remoteId.activityId;
- queryHash = _remoteId.queryHash;
- uid = _uid;
- serverId = myNode;
- channel = _channel;
- overflowSequence = _overflowSequence;
- continueSequence = 0;
- }
- StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
- {
- const IpAddress &serverIP = serverId.getNodeAddress();
- ret.appendf("uid=" RUIDF " activityId=", uid);
- switch(activityId & ~ROXIE_PRIORITY_MASK)
- {
- case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
- case ROXIE_PING: ret.append("ROXIE_PING"); break;
- case ROXIE_TRACEINFO: ret.append("ROXIE_TRACEINFO"); break;
- case ROXIE_DEBUGREQUEST: ret.append("ROXIE_DEBUGREQUEST"); break;
- case ROXIE_DEBUGCALLBACK: ret.append("ROXIE_DEBUGCALLBACK"); break;
- case ROXIE_FILECALLBACK: ret.append("ROXIE_FILECALLBACK"); break;
- case ROXIE_ALIVE: ret.append("ROXIE_ALIVE"); break;
- case ROXIE_KEYEDLIMIT_EXCEEDED: ret.append("ROXIE_KEYEDLIMIT_EXCEEDED"); break;
- case ROXIE_LIMIT_EXCEEDED: ret.append("ROXIE_LIMIT_EXCEEDED"); break;
- case ROXIE_EXCEPTION: ret.append("ROXIE_EXCEPTION"); break;
- default:
- ret.appendf("%u", (activityId & ~(ROXIE_ACTIVITY_FETCH | ROXIE_PRIORITY_MASK)));
- if (activityId & ROXIE_ACTIVITY_FETCH)
- ret.appendf(" (fetch part)");
- break;
- }
- ret.append(" pri=");
- switch(activityId & ROXIE_PRIORITY_MASK)
- {
- case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
- case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
- case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
- default: ret.append("???"); break;
- }
- ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
- serverIP.getIpText(ret);
- if (retries)
- {
- if (retries==QUERY_ABORTED)
- ret.append(" retries=QUERY_ABORTED");
- else
- {
- if (retries & ROXIE_RETRIES_MASK)
- ret.appendf(" retries=%04x", retries);
- if (retries & ROXIE_FASTLANE)
- ret.appendf(" FASTLANE");
- if (retries & ROXIE_BROADCAST)
- ret.appendf(" BROADCAST");
- }
- }
- return ret;
- }
- bool RoxiePacketHeader::allChannelsFailed()
- {
- unsigned mask = (1 << (getNumSlaves(channel) * SUBCHANNEL_BITS)) - 1;
- return (retries & mask) == mask;
- }
- bool RoxiePacketHeader::retry()
- {
- bool worthRetrying = false;
- unsigned mask = SUBCHANNEL_MASK;
- unsigned numSlaves = getNumSlaves(channel);
- for (unsigned subChannel = 0; subChannel < numSlaves; subChannel++)
- {
- unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
- if (subRetries != SUBCHANNEL_MASK)
- subRetries++;
- if (subRetries != SUBCHANNEL_MASK)
- worthRetrying = true;
- retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
- mask <<= SUBCHANNEL_BITS;
- }
- return worthRetrying;
- }
- void RoxiePacketHeader::setException(unsigned subChannel)
- {
- retries |= SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
- }
- unsigned RoxiePacketHeader::thisChannelRetries(unsigned subChannel)
- {
- unsigned shift = SUBCHANNEL_BITS * subChannel;
- unsigned mask = SUBCHANNEL_MASK << shift;
- return (retries & mask) >> shift;
- }
- //============================================================================================
- unsigned getReplicationLevel(unsigned channel)
- {
- if (!channel)
- return 0;
- Owned<const ITopologyServer> topology = getTopology();
- return topology->queryChannelInfo(channel).replicationLevel();
- }
- //============================================================================================
- // This function maps a slave number to the multicast ip used to talk to it.
- IpAddress multicastBase("239.1.1.1"); // TBD IPv6 (need IPv6 multicast addresses?
- IpAddress multicastLast("239.1.5.254");
- const IpAddress &getChannelIp(IpAddress &ip, unsigned _channel)
- {
- // need to be careful to avoid the .0's and the .255's (not sure why...)
- ip = multicastBase;
- if (!ip.ipincrement(_channel,1,254,1,0xffff)
- ||(ip.ipcompare(multicastLast)>0))
- throw MakeStringException(ROXIE_MULTICAST_ERROR, "Out-of-range multicast channel %d", _channel);
- return ip;
- }
- static Owned<ISocket> multicastSocket;
- void joinMulticastChannel(unsigned channel)
- {
- IpAddress multicastIp;
- getChannelIp(multicastIp, channel);
- SocketEndpoint ep(ccdMulticastPort, multicastIp);
- StringBuffer epStr;
- ep.getUrlStr(epStr);
- if (!multicastSocket->join_multicast_group(ep))
- throw MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to join multicast channel %d (%s)", channel, epStr.str());
- if (traceLevel)
- DBGLOG("Joined multicast channel %d (%s)", channel, epStr.str());
- }
- static SocketEndpointArray multicastEndpoints; // indexed by channel
- void setMulticastEndpoints(unsigned numChannels)
- {
- for (unsigned channel = 0; channel <= numChannels; channel++) // NOTE - channel 0 is special, and numChannels does not include it
- {
- IpAddress multicastIp;
- getChannelIp(multicastIp, channel);
- multicastEndpoints.append(SocketEndpoint(ccdMulticastPort, multicastIp));
- }
- }
- void openMulticastSocket()
- {
- if (!multicastSocket)
- {
- multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
- if (multicastTTL)
- {
- multicastSocket->set_ttl(multicastTTL);
- DBGLOG("Roxie: multicastTTL: %u", multicastTTL);
- }
- else
- DBGLOG("Roxie: multicastTTL not set");
- multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
- size32_t actualSize = multicastSocket->get_receive_buffer_size();
- if (actualSize < udpMulticastBufferSize)
- {
- DBGLOG("Roxie: multicast socket buffer size could not be set (requested=%d actual %d", udpMulticastBufferSize, actualSize);
- throwUnexpected();
- }
- if (traceLevel)
- DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", ccdMulticastPort, udpMulticastBufferSize, actualSize);
- if (roxieMulticastEnabled && !localSlave)
- {
- Owned<const ITopologyServer> topology = getTopology();
- for (unsigned channel : topology->queryChannels())
- {
- assertex(channel);
- joinMulticastChannel(channel);
- }
- joinMulticastChannel(0); // all slaves also listen on channel 0
- }
- }
- }
- void closeMulticastSockets()
- {
- multicastSocket.clear();
- }
- size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
- {
- size32_t minwrote = 0;
- if (roxieMulticastEnabled)
- {
- return multicastSocket->udp_write_to(multicastEndpoints.item(channel), buf, size);
- }
- else
- {
- Owned<const ITopologyServer> topo = getTopology();
- const SocketEndpointArray &eps = topo->querySlaves(channel);
- if (!eps.ordinality())
- throw makeStringExceptionV(0, "No slaves available for channel %d", channel);
- ForEachItemIn(idx, eps)
- {
- size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), buf, size);
- if (!idx || wrote < minwrote)
- minwrote = wrote;
- }
- }
- return minwrote;
- }
- //============================================================================================
- class CRoxieQueryPacket : implements IRoxieQueryPacket, public CInterface
- {
- protected:
- RoxiePacketHeader *data;
- const byte *continuationData;
- unsigned continuationLength;
- const byte *smartStepInfoData;
- unsigned smartStepInfoLength;
- const byte *contextData;
- unsigned contextLength;
- const byte *traceInfo;
- unsigned traceLength;
-
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieQueryPacket(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
- {
- assertex(lengthRemaining >= (int) sizeof(RoxiePacketHeader));
- data->packetlength = lengthRemaining;
- const byte *finger = (const byte *) (data + 1);
- lengthRemaining -= sizeof(RoxiePacketHeader);
- if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
- {
- continuationData = NULL;
- continuationLength = 0;
- smartStepInfoData = NULL;
- smartStepInfoLength = 0;
- traceInfo = NULL;
- traceLength = 0;
- }
- else
- {
- if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
- {
- assertex(lengthRemaining >= (int) sizeof(unsigned short));
- continuationLength = *(unsigned short *) finger;
- continuationData = finger + sizeof(unsigned short);
- finger = continuationData + continuationLength;
- lengthRemaining -= continuationLength + sizeof(unsigned short);
- }
- else
- {
- continuationData = NULL;
- continuationLength = 0;
- }
- if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
- {
- assertex(lengthRemaining >= (int) sizeof(unsigned short));
- smartStepInfoLength = *(unsigned short *) finger;
- smartStepInfoData = finger + sizeof(unsigned short);
- finger = smartStepInfoData + smartStepInfoLength;
- lengthRemaining -= smartStepInfoLength + sizeof(unsigned short);
- }
- else
- {
- smartStepInfoData = NULL;
- smartStepInfoLength = 0;
- }
- assertex(lengthRemaining > 1);
- traceInfo = finger;
- lengthRemaining--;
- if (*finger++ & LOGGING_DEBUGGERACTIVE)
- {
- assertex(lengthRemaining >= (int) sizeof(unsigned short));
- unsigned short debugLen = *(unsigned short *) finger;
- finger += debugLen + sizeof(unsigned short);
- lengthRemaining -= debugLen + sizeof(unsigned short);
- }
- for (;;)
- {
- assertex(lengthRemaining>0);
- if (!*finger)
- {
- lengthRemaining--;
- finger++;
- break;
- }
- lengthRemaining--;
- finger++;
- }
- traceLength = finger - traceInfo;
- }
- assertex(lengthRemaining >= 0);
- contextData = finger;
- contextLength = lengthRemaining;
- }
- ~CRoxieQueryPacket()
- {
- free(data);
- }
- virtual RoxiePacketHeader &queryHeader() const
- {
- return *data;
- }
- virtual const void *queryContinuationData() const
- {
- return continuationData;
- }
- virtual unsigned getContinuationLength() const
- {
- return continuationLength;
- }
- virtual const byte *querySmartStepInfoData() const
- {
- return smartStepInfoData;
- }
- virtual unsigned getSmartStepInfoLength() const
- {
- return smartStepInfoLength;
- }
- virtual const byte *queryTraceInfo() const
- {
- return traceInfo;
- }
- virtual unsigned getTraceLength() const
- {
- return traceLength;
- }
- virtual const void *queryContextData() const
- {
- return contextData;
- }
- virtual unsigned getContextLength() const
- {
- return contextLength;
- }
- virtual IRoxieQueryPacket *clonePacket(unsigned channel) const
- {
- unsigned length = data->packetlength;
- RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
- memcpy(newdata, data, length);
- newdata->channel = channel;
- newdata->retries |= ROXIE_BROADCAST;
- return createRoxiePacket(newdata, length);
- }
- virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const
- {
- assertex((data->continueSequence & CONTINUE_SEQUENCE_SKIPTO) == 0); // Should not already be any skipto info in the source packet
- unsigned newDataSize = data->packetlength + sizeof(unsigned short) + skipDataLen;
- char *newdata = (char *) malloc(newDataSize);
- unsigned headSize = sizeof(RoxiePacketHeader);
- if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
- headSize += sizeof(unsigned short) + continuationLength;
- memcpy(newdata, data, headSize); // copy in leading part of old data
- ((RoxiePacketHeader *) newdata)->continueSequence |= CONTINUE_SEQUENCE_SKIPTO; // set flag indicating new data is present
- *(unsigned short *) (newdata + headSize) = skipDataLen; // add length field for new data
- memcpy(newdata + headSize + sizeof(unsigned short), skipData, skipDataLen); // copy in new data
- memcpy(newdata + headSize + sizeof(unsigned short) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
- return createRoxiePacket(newdata, newDataSize);
- }
- virtual unsigned hash() const
- {
- // This is used for Roxie server-side caching. The hash includes some of the header and all of the payload.
- unsigned hash = 0;
- if (continuationLength)
- hash = hashc((const unsigned char *) continuationData, continuationLength, hash);
- if (smartStepInfoLength)
- hash = hashc((const unsigned char *) smartStepInfoData, smartStepInfoLength, hash);
- // NOTE - don't hash the trace info!
- hash = hashc((const unsigned char *) contextData, contextLength, hash);
- hash = hashc((const unsigned char *) &data->channel, sizeof(data->channel), hash);
- hash = hashc((const unsigned char *) &data->overflowSequence, sizeof(data->overflowSequence), hash);
- hash = hashc((const unsigned char *) &data->continueSequence, sizeof(data->continueSequence), hash);
- // MORE - sequence fields should always be zero for anything we are caching I think... (?)
- // Note - no point hashing activityId (as cache is local to one activity) or serverIP (likewise)
- return hash;
- }
- virtual bool cacheMatch(const IRoxieQueryPacket *c) const
- {
- // note - this checks whether it's a repeat from Roxie server's point-of-view
- // So fields that are compared are the same as the ones that are hashed....
- RoxiePacketHeader &h = c->queryHeader();
- if (data->channel == h.channel && data->overflowSequence == h.overflowSequence && data->continueSequence == h.continueSequence)
- {
- if (continuationLength) // note - we already checked that sequences match
- {
- if (continuationLength != c->getContinuationLength())
- return false;
- if (memcmp(continuationData,c->queryContinuationData(),continuationLength)!=0)
- return false;
- }
- if (smartStepInfoLength)
- {
- if (smartStepInfoLength != c->getSmartStepInfoLength())
- return false;
- if (memcmp(smartStepInfoData,c->querySmartStepInfoData(),smartStepInfoLength)!=0)
- return false;
- }
- // NOTE - trace info NOT compared
- if (contextLength == c->getContextLength() && memcmp(contextData, c->queryContextData(), contextLength)==0)
- return true;
- }
- return false;
- }
- };
- extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
- {
- if ((unsigned short)_len != _len && !localSlave)
- {
- StringBuffer s;
- RoxiePacketHeader *header = (RoxiePacketHeader *) _data;
- header->toString(s);
- free(_data);
- throw MakeStringException(ROXIE_PACKET_ERROR, "Packet length %d exceeded maximum sending packet %s", _len, s.str());
- }
- return new CRoxieQueryPacket(_data, _len);
- }
- extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
- {
- unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
- return createRoxiePacket(m.detachOwn(), length);
- }
- //=================================================================================
- SlaveContextLogger::SlaveContextLogger()
- {
- GetHostIp(ip);
- set(NULL);
- }
- SlaveContextLogger::SlaveContextLogger(IRoxieQueryPacket *packet)
- {
- GetHostIp(ip);
- set(packet);
- }
- void SlaveContextLogger::set(IRoxieQueryPacket *packet)
- {
- anyOutput = false;
- intercept = false;
- debuggerActive = false;
- checkingHeap = false;
- aborted = false;
- stats.reset();
- start = msTick();
- if (packet)
- {
- CriticalBlock b(crit);
- RoxiePacketHeader &header = packet->queryHeader();
- const byte *traceInfo = packet->queryTraceInfo();
- unsigned traceLength = packet->getTraceLength();
- unsigned char loggingFlags = *traceInfo;
- if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL
- {
- traceInfo++;
- traceLength--;
- if (loggingFlags & LOGGING_INTERCEPTED)
- intercept = true;
- if (loggingFlags & LOGGING_TRACELEVELSET)
- {
- ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
- traceLength--;
- }
- if (loggingFlags & LOGGING_BLIND)
- blind = true;
- if (loggingFlags & LOGGING_CHECKINGHEAP)
- checkingHeap = true;
- if (loggingFlags & LOGGING_DEBUGGERACTIVE)
- {
- assertex(traceLength > sizeof(unsigned short));
- debuggerActive = true;
- unsigned short debugLen = *(unsigned short *) traceInfo;
- traceInfo += debugLen + sizeof(unsigned short);
- traceLength -= debugLen + sizeof(unsigned short);
- }
- // Passing the wuid via the logging context prefix is a bit of a hack...
- if (loggingFlags & LOGGING_WUID)
- {
- unsigned wuidLen = 0;
- while (wuidLen < traceLength)
- {
- if (traceInfo[wuidLen]=='@')
- break;
- wuidLen++;
- }
- wuid.set((const char *) traceInfo, wuidLen);
- }
- }
- channel = header.channel;
- StringBuffer s(traceLength, (const char *) traceInfo);
- s.append("|");
- ip.getIpText(s);
- s.append(':').append(channel);
- StringContextLogger::set(s.str());
- if (intercept || mergeSlaveStatistics)
- {
- RoxiePacketHeader newHeader(header, ROXIE_TRACEINFO, 0); // subchannel not relevant
- output.setown(ROQ->createOutputStream(newHeader, true, *this));
- }
- }
- else
- {
- StringContextLogger::set("");
- channel = 0;
- }
- }
- void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const
- {
- if (output && mergeSlaveStatistics)
- {
- MemoryBuffer buf;
- buf.append((char) LOG_CHILDCOUNT); // A special log entry for the stats
- buf.append(subGraphId);
- buf.append(actId);
- buf.append(idx);
- buf.append(processed);
- buf.append(strands);
- }
- }
- void SlaveContextLogger::putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const
- {
- if (output && mergeSlaveStatistics)
- {
- MemoryBuffer buf;
- buf.append((char) LOG_CHILDSTATS); // A special log entry for the stats
- buf.append(subGraphId);
- buf.append(actId);
- if (stats.serialize(buf))
- {
- unsigned len = buf.length();
- void *ret = output->getBuffer(len, true);
- memcpy(ret, buf.toByteArray(), len);
- output->putBuffer(ret, len, true);
- anyOutput = true;
- }
- }
- }
- // Writes everything accumulated by this logger to the output stream and then drops the stream:
- // first (when mergeSlaveStatistics is set) a LOG_STATVALUES entry holding the serialized stats,
- // then every buffered log item. The stream is only flushed if something was written to it.
- void SlaveContextLogger::flush()
- {
-     if (!output)
-         return;
-     CriticalBlock b(crit);
-     // Copies one serialized entry into the output stream and records that there is output pending
-     auto emit = [this](MemoryBuffer &entry)
-     {
-         unsigned entryLen = entry.length();
-         void *target = output->getBuffer(entryLen, true);
-         memcpy(target, entry.toByteArray(), entryLen);
-         output->putBuffer(target, entryLen, true);
-         anyOutput = true;
-     };
-     if (mergeSlaveStatistics)
-     {
-         MemoryBuffer statsEntry;
-         statsEntry.append((char) LOG_STATVALUES); // A special log entry for the stats
-         if (stats.serialize(statsEntry))
-             emit(statsEntry);
-     }
-     ForEachItemIn(idx, log)
-     {
-         MemoryBuffer itemEntry;
-         log.item(idx).serialize(itemEntry);
-         emit(itemEntry);
-     }
-     log.kill();
-     if (anyOutput)
-         output->flush();
-     output.clear();
- }
- //=================================================================================
- static SpinLock onDemandQueriesCrit; // held (via SpinBlock) around every access to onDemandQueryCache below
- static MapXToMyClass<hash64_t, hash64_t, IQueryFactory> onDemandQueryCache; // query factories keyed by the query hash combined with the channel number (see doUnload / cacheOnDemandQuery)
- // Builds a ROXIE_UNLOAD packet for the query identified by hash/id and sends it via ROQ.
- // Packet layout: RoxiePacketHeader, one logging-flags byte, then the null-terminated id.
- void sendUnloadMessage(hash64_t hash, const char *id, const IRoxieContextLogger &logctx)
- {
-     size_t idBytes = strlen(id) + 1; // include the terminator
-     unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + idBytes;
-     void *packetData = malloc(packetSize);
-     RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
-     RemoteActivityId unloadId(ROXIE_UNLOAD, hash);
-     header->init(unloadId, 0, 0, 0);
-     char *payload = (char *) (header + 1);
-     payload[0] = (char) LOGGING_FLAGSPRESENT;
-     memcpy(payload + 1, id, idBytes);
-     if (traceLevel > 1)
-         DBGLOG("UNLOAD sent for query %s", id);
-     Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
-     ROQ->sendPacket(packet, logctx);
- }
- // Handles a received unload request: removes the on-demand query factory cached for the
- // packet's query hash on the packet's channel.
- void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
- {
-     const RoxiePacketHeader &hdr = packet->queryHeader();
-     unsigned channelNo = hdr.channel;
-     logctx.CTXLOG("Unload received for channel %d", channelNo);
-     // Cache key = query hash combined with the channel number, matching cacheOnDemandQuery()
-     hash64_t cacheKey = hdr.queryHash;
-     cacheKey = rtlHash64Data(sizeof(channelNo), &channelNo, cacheKey);
-     SpinBlock guard(onDemandQueriesCrit);
-     onDemandQueryCache.remove(cacheKey);
- }
- // Stores a query factory in the on-demand cache under the query hash combined with the
- // channel number (the same key that doUnload() removes).
- void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
- {
-     hash64_t cacheKey = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
-     SpinBlock guard(onDemandQueriesCrit);
-     onDemandQueryCache.setValue(cacheKey, query);
- }
- //=================================================================================
- struct PingRecord // Context data of a ping packet; doPing rejects any packet whose context length differs from sizeof(PingRecord)
- {
- unsigned tick; // not read in this part of the file - NOTE(review): presumably the sender's timestamp, confirm against the sender
- IpAddress senderIP; // doPing requires this to equal the node address of the header's serverId
- };
- // Answers a ping: validates that the context data is exactly one PingRecord whose senderIP
- // matches the server recorded in the header, then echoes the same bytes back as a ROXIE_PING reply.
- // Throws ROXIE_UNKNOWN_SERVER if either check fails.
- void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
- {
-     const RoxiePacketHeader &header = packet->queryHeader();
-     const IpAddress &serverIP = header.serverId.getNodeAddress();
-     unsigned contextLength = packet->getContextLength();
-     if (contextLength != sizeof(PingRecord))
-     {
-         StringBuffer hdrText;
-         throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Unexpected data size %d (expected %d) in PING: %s", contextLength, (unsigned) sizeof(PingRecord), header.toString(hdrText).str());
-     }
-     const PingRecord *ping = (const PingRecord *) packet->queryContextData();
-     if (!serverIP.ipequals(ping->senderIP))
-     {
-         StringBuffer hdrText;
-         throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(hdrText).str());
-     }
-     RoxiePacketHeader replyHeader(header, ROXIE_PING, 0); // subchannel not relevant
-     Owned<IMessagePacker> reply = ROQ->createOutputStream(replyHeader, true, logctx);
-     void *target = reply->getBuffer(contextLength, false);
-     memcpy(target, ping, contextLength);
-     reply->putBuffer(target, contextLength, false);
-     reply->flush();
- }
- //=================================================================================
- //
- // RoxieQueue - holds pending transactions on a roxie agent
- // Queue of packets awaiting processing, together with the pool of worker threads that drain it.
- // The packet list 'waiting' is only touched while holding qcrit. 'available' is signalled once for
- // each packet queued (and by stopAll) so that workers blocked in wait() wake up and call dequeue().
- class RoxieQueue : public CInterface, implements IThreadFactory
- {
-     Owned <IThreadPool> workers;
-     QueueOf<IRoxieQueryPacket, true> waiting;
-     Semaphore available;
-     CriticalSection qcrit;
-     unsigned headRegionSize;          // when non-zero, dequeue() picks at random among this many leading entries
-     unsigned numWorkers;              // upper limit on the number of worker threads started
-     RelaxedAtomic<unsigned> started;  // number of worker threads started so far
-     std::atomic<unsigned> idle;       // number of workers currently blocked in wait()
-     // Updates the queue length statistics and starts one more worker if none is idle and the limit allows
-     void noteQueued()
-     {
-         maxQueueLength.store_max(++queueLength);
-         // NOTE - there is a small race condition here - if idle is 1 but two enqueue's happen
-         // close enough together that the signal has not yet caused idle to come back down to zero, then the
-         // desired new thread may not be created. It's unlikely, and it's benign in that the query is still
-         // processed and the thread will be created next time the HWM is reached.
-         if (started < numWorkers && idle==0)
-         {
-             workers->start(this);
-             started++;
-         }
-     }
- public:
-     IMPLEMENT_IINTERFACE;
-     RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
-     {
-         headRegionSize = _headRegionSize;
-         numWorkers = _numWorkers;
-         workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
-         started = 0;
-         idle = 0;
-     }
-     virtual IPooledThread *createNew();
-     void abortChannel(unsigned channel);
-     // Starts every worker up front when prestartSlaveThreads is set; otherwise workers start on demand
-     void start()
-     {
-         if (prestartSlaveThreads)
-         {
-             while (started < numWorkers)
-             {
-                 workers->start(this);
-                 started++;
-             }
-         }
-     }
-     IPooledThreadIterator *running()
-     {
-         return workers->running();
-     }
-     // Asks all workers to stop, then signals once per running worker so that any blocked in wait() wake up
-     void stopAll()
-     {
-         workers->stopAll(true);
-         signal(workers->runningCount());
-     }
-     void join()
-     {
-         workers->joinAll(true);
-         workers.clear(); // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
-     }
-     // Appends a packet to the queue and wakes one worker
-     void enqueue(IRoxieQueryPacket *x)
-     {
-         {
- #ifdef TIME_PACKETS
-             x->queryHeader().tick = msTick();
- #endif
-             CriticalBlock qc(qcrit);
-             waiting.enqueue(x);
-             noteQueued();
-         }
-         available.signal();
-     }
-     // Appends a packet unless a packet with a matching header is already queued, in which case the
-     // new packet is counted as an ignored retry and released.
-     void enqueueUnique(IRoxieQueryPacket *x, unsigned subChannel)
-     {
-         RoxiePacketHeader &header = x->queryHeader();
- #ifdef TIME_PACKETS
-         header.tick = msTick();
- #endif
-         // As soon as the packet is in the queue a worker may dequeue, process and release it, so if
-         // it is going to be traced afterwards we must hold our own link to keep it alive until then.
-         Linked<IRoxieQueryPacket> traced;
-         if (traceLevel > 10)
-             traced.set(x);
-         bool found = false;
-         {
-             CriticalBlock qc(qcrit);
-             unsigned len = waiting.ordinality();
-             unsigned i;
-             for (i = 0; i < len; i++)
-             {
-                 IRoxieQueryPacket *queued = waiting.item(i);
-                 if (queued && queued->queryHeader().matchPacket(header))
-                 {
-                     found = true;
-                     break;
-                 }
-             }
-             if (!found)
-                 waiting.enqueue(x);
-         }
-         if (found)
-         {
-             if (traceLevel > 0)
-             {
-                 StringBuffer xx;
-                 SlaveContextLogger l(x);
-                 l.CTXLOG("Ignored retry on subchannel %u for queued activity %s", subChannel, header.toString(xx).str());
-             }
-             if (!subChannel)
-                 retriesIgnoredPrm++;
-             else
-                 retriesIgnoredSec++;
-             x->Release();
-         }
-         else
-         {
-             available.signal();
-             noteQueued();
-             if (traced)
-             {
-                 // Only touch the packet through the link taken above - x itself may already have been released
-                 SlaveContextLogger l(traced.get());
-                 StringBuffer xx;
-                 l.CTXLOG("enqueued %s", traced->queryHeader().toString(xx).str());
-             }
-         }
-     }
-     // Discards the first queued packet whose header matches x. The slot is set to NULL rather than
-     // removed, so dequeue() may later hand back a NULL entry. Returns true if a packet was discarded.
-     bool remove(RoxiePacketHeader &x)
-     {
-         unsigned scanLength = 0;
-         IRoxieQueryPacket *found = nullptr;
-         {
-             CriticalBlock qc(qcrit);
-             unsigned len = waiting.ordinality();
-             unsigned i;
-             for (i = 0; i < len; i++)
-             {
-                 IRoxieQueryPacket *queued = waiting.item(i);
-                 if (queued)
-                 {
-                     scanLength++;
-                     if (queued->queryHeader().matchPacket(x))
-                     {
-                         waiting.set(i, NULL);
-                         found = queued;
-                         break;
-                     }
-                 }
-             }
-         }
-         if (found)
-         {
- #ifdef _DEBUG
-             RoxiePacketHeader &header = found->queryHeader();
-             SlaveContextLogger l(found);
-             StringBuffer xx;
-             l.CTXLOG("discarded %s", header.toString(xx).str());
- #endif
-             found->Release();
-             queueLength--;
-             if (scanLength > maxScanLength)
-                 maxScanLength = scanLength;
-             totScanLength += scanLength;
-             totScans++;
-             return true;
-         }
-         else
-             return false;
-     }
-     // Blocks the calling worker until signalled; the worker counts as idle while blocked
-     void wait()
-     {
-         idle++;
-         available.wait();
-         idle--;
-     }
-     void signal(unsigned num)
-     {
-         available.signal(num);
-     }
-     // Takes the next packet off the queue, or returns NULL if the queue is empty. With a non-zero
-     // headRegionSize the packet is chosen at random from the first headRegionSize entries.
-     IRoxieQueryPacket *dequeue()
-     {
-         CriticalBlock qc(qcrit);
-         unsigned lim = waiting.ordinality();
-         if (lim)
-         {
-             if (headRegionSize)
-             {
-                 if (lim > headRegionSize)
-                     lim = headRegionSize;
-                 return waiting.dequeue(fastRand() % lim);
-             }
-             return waiting.dequeue();
-         }
-         else
-             return NULL;
-     }
-     unsigned getHeadRegionSize() const
-     {
-         return headRegionSize;
-     }
-     // Sets a new head region size and returns the previous one
-     unsigned setHeadRegionSize(unsigned newsize)
-     {
-         unsigned ret = headRegionSize;
-         headRegionSize = newsize;
-         return ret;
-     }
- };
- // Pooled worker thread that repeatedly waits on a RoxieQueue, dequeues a packet and processes it.
- // 'packet' and 'activity' are examined by other threads (match/abortChannel/checkAbort), so they are
- // changed under actCrit. 'busy' is true from the moment the worker wakes until processing finishes;
- // 'abortJob' is set by abortChannel/checkAbort to ask the worker not to start (or to stop) the job.
- class CRoxieWorker : public CInterface, implements IPooledThread
- {
-     RoxieQueue *queue;
-     CriticalSection actCrit;
-     Semaphore ibytiSem;
-     bool stopped;
-     bool abortJob;
-     bool busy;
-     Owned<IRoxieSlaveActivity> activity;
-     Owned<IRoxieQueryPacket> packet;
-     Owned<const ITopologyServer> topology;
-     SlaveContextLogger logctx;
- public:
-     IMPLEMENT_IINTERFACE;
-     CRoxieWorker()
-     {
-         queue = NULL;
-         stopped = false;
-         busy = false;
-         abortJob = false;
-     }
-     // _r is the RoxieQueue that this worker will service
-     virtual void init(void *_r) override
-     {
-         queue = (RoxieQueue *) _r;
-         stopped = false;
-         busy = false;
-         abortJob = false;
-     }
-     virtual bool canReuse() const override
-     {
-         return true;
-     }
-     virtual bool stop() override
-     {
-         stopped = true;
-         return true;
-     }
-     inline void setActivity(IRoxieSlaveActivity *act)
-     {
-         CriticalBlock b(actCrit);
-         activity.setown(act);
-     }
-     inline bool match(RoxiePacketHeader &h)
-     {
-         // There is a window between getting packet from queue and being able to match it.
-         // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
-         CriticalBlock b(actCrit);
-         return packet && packet->queryHeader().matchPacket(h);
-     }
-     // Aborts whatever this worker is doing if its current packet belongs to the given channel
-     void abortChannel(unsigned channel)
-     {
-         CriticalBlock b(actCrit);
-         if (packet && packet->queryHeader().channel==channel)
-         {
-             abortJob = true;
-             if (doIbytiDelay)
-                 ibytiSem.signal();
-             if (activity)
-                 activity->abort();
-         }
-     }
-     // Called when an IBYTI or abort arrives for header h. If this worker holds the matching packet,
-     // queryFound is set and the job is flagged for abort. Returns true if the job was stopped: either
-     // the running activity was aborted, or the worker had not yet created its activity (preActivity set).
-     // Returns false if the packet does not match, or checkRank is set and the other slave does not have priority.
-     bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
-     {
-         CriticalBlock b(actCrit);
-         if (packet && packet->queryHeader().matchPacket(h))
-         {
-             queryFound = true;
-             abortJob = true;
-             if (doIbytiDelay)
-                 ibytiSem.signal();
-             if (activity)
-             {
-                 // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority slave
-                 // (more primary in the rank). The slaves with higher rank will hold the lower bits of the retries field in IBYTI packet).
-                 if (!checkRank || topology->queryChannelInfo(h.channel).otherSlaveHasPriority(h.priorityHash(), h.getRespondingSubChannel()))
-                 {
-                     activity->abort();
-                     return true;
-                 }
-                 else
-                 {
-                     return false;
-                 }
-             }
-             if (busy)
-             {
-                 preActivity = true;
-                 return true;
-             }
-         }
-         return false;
-     }
-     // Reports exception E to the server as a ROXIE_EXCEPTION reply. For non-user exceptions the failure is
-     // also recorded in the packet header and the packet resent so that another slave can try it.
-     // Takes ownership of E: it is released on every path.
-     void throwRemoteException(IException *E, IRoxieSlaveActivity *activity, IRoxieQueryPacket *packet, bool isUser)
-     {
-         try
-         {
-             if (activity && (logctx.queryTraceLevel() > 1))
-             {
-                 StringBuffer act;
-                 activity->toString(act);
-                 logctx.CTXLOG("throwRemoteException, activity %s, isUser=%d", act.str(), (int) isUser);
-                 if (!isUser)
-                     EXCLOG(E, "throwRemoteException");
-             }
-             RoxiePacketHeader &header = packet->queryHeader();
-             unsigned mySubChannel = topology->queryChannelInfo(header.channel).subChannel();
-             // I failed to do the query, but already sent out IBYTI - resend it so someone else can try
-             if (!isUser)
-             {
-                 StringBuffer s;
-                 s.append("Exception in slave for packet ");
-                 header.toString(s);
-                 logctx.logOperatorException(E, NULL, 0, "%s", s.str());
-                 header.setException(mySubChannel);
-                 if (!header.allChannelsFailed() && !localSlave)
-                 {
-                     if (logctx.queryTraceLevel() > 1)
-                         logctx.CTXLOG("resending packet from slave in case others want to try it");
-                     ROQ->sendPacket(packet, logctx);
-                 }
-             }
-             RoxiePacketHeader newHeader(header, ROXIE_EXCEPTION, mySubChannel);
-             if (isUser)
-                 newHeader.retries = (unsigned short) -1;
-             Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
-             StringBuffer message("<Exception>");
-             message.appendf("<Code>%d</Code><Message>", E->errorCode());
-             StringBuffer err;
-             E->errorMessage(err);
-             encodeXML(err.str(), message);
-             message.append("</Message></Exception>");
-             unsigned len = message.length();
-             void *ret = output->getBuffer(len+1, true);
-             memcpy(ret, message.str(), len+1);
-             output->putBuffer(ret, len+1, true);
-             output->flush();
-             E->Release();
-         }
-         catch (IException *EInE)
-         {
-             EXCLOG(EInE, "Exception during throwRemoteException");
-             E->Release();
-             EInE->Release();
-         }
-         catch (...)
-         {
-             logctx.CTXLOG("Unknown Exception during throwRemoteException");
-             E->Release();
-         }
-     }
-     // Processes the current packet as an activity request: locates the query factory (loading it from
-     // the workunit on demand if needed), optionally delays to give the primary slave a chance to
-     // answer first, sends IBYTI, then runs the activity and flushes its output.
-     void doActivity()
-     {
-         RoxiePacketHeader &header = packet->queryHeader();
-         unsigned channel = header.channel;
-         hash64_t queryHash = packet->queryHeader().queryHash;
-         unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
-         Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
-         unsigned numSlaves = topology->querySlaves(channel).ordinality();
-         unsigned mySubChannel = topology->queryChannelInfo(channel).subChannel();
-         if (!queryFactory && logctx.queryWuid())
-         {
-             Owned <IRoxieDaliHelper> daliHelper = connectToDali();
-             Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(logctx.queryWuid(), NULL);
-             queryFactory.setown(createSlaveQueryFactoryFromWu(wu, channel));
-             if (queryFactory)
-                 cacheOnDemandQuery(queryHash, channel, queryFactory);
-         }
-         if (!queryFactory)
-         {
-             StringBuffer hdr;
-             IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie slave received request for unregistered query: %s", packet->queryHeader().toString(hdr).str());
-             EXCLOG(E, "doActivity");
-             throwRemoteException(E, activity, packet, false);
-             return;
-         }
-         try
-         {
-             if (logctx.queryTraceLevel() > 8)
-             {
-                 StringBuffer x;
-                 logctx.CTXLOG("IBYTI delay controls : doIbytiDelay=%s numslaves=%u subchnl=%u : %s",
-                     doIbytiDelay?"YES":"NO",
-                     numSlaves, topology->queryChannelInfo(channel).subChannel(),
-                     header.toString(x).str());
-             }
-             bool debugging = logctx.queryDebuggerActive();
-             if (debugging)
-             {
-                 if (mySubChannel)
-                     abortJob = true; // when debugging, we always run on primary only...
-             }
-             else if (doIbytiDelay && (numSlaves > 1))
-             {
-                 unsigned hdrHashVal = header.priorityHash();
-                 unsigned primarySubChannel = (hdrHashVal % numSlaves);
-                 if (primarySubChannel != mySubChannel)
-                 {
-                     unsigned delay = topology->queryChannelInfo(channel).getIbytiDelay(primarySubChannel);
-                     if (logctx.queryTraceLevel() > 6)
-                     {
-                         StringBuffer x;
-                         logctx.CTXLOG("YES myTurnToDelayIBYTI subchannel=%u delay=%u hash=%u %s", mySubChannel, delay, hdrHashVal, header.toString(x).str());
-                     }
-                     // MORE: if we are dealing with a query that was on channel 0, we may want a longer delay
-                     // (since the theory about duplicated work not mattering when cluster is idle does not hold up)
-                     if (delay)
-                     {
-                         ibytiSem.wait(delay);
-                         if (!abortJob)
-                             topology->queryChannelInfo(channel).noteChannelsSick(primarySubChannel);
-                         if (logctx.queryTraceLevel() > 8)
-                         {
-                             StringBuffer x;
-                             logctx.CTXLOG("Buddy did%s send IBYTI, updated delay : %s",
-                                 abortJob ? "" : " NOT", header.toString(x).str());
-                         }
-                     }
-                 }
-                 else
-                 {
- #ifndef NO_IBYTI_DELAYS_COUNT
-                     if (!mySubChannel)
-                         ibytiNoDelaysPrm++;
-                     else
-                         ibytiNoDelaysSec++;
- #endif
-                     if (logctx.queryTraceLevel() > 6)
-                     {
-                         StringBuffer x;
-                         logctx.CTXLOG("NOT myTurnToDelayIBYTI subchannel=%u hash=%u %s", mySubChannel, hdrHashVal, header.toString(x).str());
-                     }
-                 }
-             }
-             if (abortJob)
-             {
-                 CriticalBlock b(actCrit);
-                 busy = false; // Keep order - before setActivity below
-                 if (logctx.queryTraceLevel() > 5)
-                 {
-                     StringBuffer x;
-                     logctx.CTXLOG("Stop before processing - activity aborted %s", header.toString(x).str());
-                 }
-                 return;
-             }
-             if (!debugging)
-                 ROQ->sendIbyti(header, logctx, mySubChannel);
-             activitiesStarted++;
-             Owned <ISlaveActivityFactory> factory = queryFactory->getSlaveActivityFactory(activityId);
-             assertex(factory);
-             setActivity(factory->createActivity(logctx, packet));
-             Owned<IMessagePacker> output = activity->process();
-             if (logctx.queryTraceLevel() > 5)
-             {
-                 StringBuffer x;
-                 logctx.CTXLOG("done processing %s", header.toString(x).str());
-             }
-             if (output)
-             {
-                 activitiesCompleted++;
-                 busy = false; // Keep order - before setActivity below
-                 setActivity(NULL); // Ensures all stats are merged from child queries etc
-                 logctx.flush();
-                 output->flush();
-             }
-         }
-         catch (IUserException *E)
-         {
-             throwRemoteException(E, activity, packet, true);
-         }
-         catch (IException *E)
-         {
-             if (E->errorCode()!=ROXIE_ABORT_ERROR)
-                 throwRemoteException(E, activity, packet, false);
-             else
-                 E->Release(); // an abort is not reported to the server, but the exception must still be released
-         }
-         catch (...)
-         {
-             throwRemoteException(MakeStringException(ROXIE_MULTICAST_ERROR, "Unknown exception"), activity, packet, false);
-         }
-         busy = false; // Keep order - before setActivity below
-         setActivity(NULL);
-     }
-     // Worker loop: wait for a packet, dispatch it according to its activity id, then clear per-packet
-     // state. An exception escaping the inner loop is reported to the server and the loop resumes
-     // unless the worker has been stopped.
-     virtual void threadmain() override
-     {
-         while (!stopped)
-         {
-             try
-             {
-                 for (;;)
-                 {
-                     queue->wait();
-                     if (stopped)
-                         break;
-                     slavesActive++;
-                     maxSlavesActive.store_max(slavesActive);
-                     abortJob = false;
-                     busy = true;
-                     if (doIbytiDelay)
-                         ibytiSem.reinit(0U); // Make sure sem is in the non-signaled state
-                     packet.setown(queue->dequeue());
-                     if (packet)
-                     {
-                         queueLength--;
-                         RoxiePacketHeader &header = packet->queryHeader();
-                         logctx.set(packet);
- #ifdef TIME_PACKETS
-                         {
-                             unsigned now = msTick();
-                             unsigned packetWait = now-header.tick;
-                             header.tick = now;
-                             packetWaitMax.store_max(packetWait);
-                             packetWaitElapsed += packetWait;
-                             packetWaitCount++;
-                         }
- #endif
-                         topology.setown(getTopology());
-                         if (logctx.queryTraceLevel() > 10)
-                         {
-                             StringBuffer x;
-                             logctx.CTXLOG("dequeued %s", header.toString(x).str());
-                         }
-                         if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_UNLOAD)
-                         {
-                             doUnload(packet, logctx);
-                         }
-                         else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_PING)
-                         {
-                             doPing(packet, logctx);
-                         }
-                         else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == ROXIE_DEBUGREQUEST)
-                         {
-                             // MORE - we need to make sure only executed on primary, and that the proxyId (== pointer to DebugGraphManager) is still valid.
-                             // It may be that there is not a lot of point using the pointer - may as well use an non-reused ID and look it up in a global hash table of active ones
-                             doDebugRequest(packet, logctx);
-                         }
-                         else if (header.channel)
-                             doActivity();
-                         else
-                             throwUnexpected(); // channel 0 requests translated earlier now
- #ifdef TIME_PACKETS
-                         {
-                             unsigned now = msTick();
-                             unsigned packetRun = now-header.tick;
-                             packetRunMax.store_max(packetRun);
-                             packetRunElapsed += packetRun;
-                             packetRunCount++;
-                         }
- #endif
-                     }
-                     busy = false;
-                     {
-                         CriticalBlock b(actCrit);
-                         packet.clear();
-                         topology.clear();
-                         logctx.set(NULL);
-                     }
-                     slavesActive--;
-                 }
-             }
-             catch(IException *E)
-             {
-                 CriticalBlock b(actCrit);
-                 EXCLOG(E);
-                 if (packet)
-                 {
-                     throwRemoteException(E, NULL, packet, false);
-                     packet.clear();
-                 }
-                 else
-                     E->Release();
-                 topology.clear();
-             }
-             catch(...)
-             {
-                 CriticalBlock b(actCrit);
-                 Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
-                 EXCLOG(E);
-                 if (packet)
-                 {
-                     throwRemoteException(E.getClear(), NULL, packet, false);
-                     packet.clear();
-                 }
-                 topology.clear();
-             }
-         }
-     }
- };
- // Thread factory callback: every pooled thread for a RoxieQueue is a CRoxieWorker
- IPooledThread *RoxieQueue::createNew()
- {
-     IPooledThread *worker = new CRoxieWorker;
-     return worker;
- }
- // Asks every running worker to abort its current job if that job belongs to the given channel
- void RoxieQueue::abortChannel(unsigned channel)
- {
-     Owned<IPooledThreadIterator> active = workers->running();
-     ForEach(*active)
-     {
-         static_cast<CRoxieWorker &>(active->query()).abortChannel(channel);
-     }
- }
- //=================================================================================
- // A callback that a query is waiting on. The waiter blocks in wait() until doFileCallback() either
- // delivers the reply data (only the first delivery is kept) or interrupts the wait because of an abort.
- // Note that only a reference to the request header is held, not a copy.
- class CallbackEntry : implements IPendingCallback, public CInterface
- {
-     const RoxiePacketHeader &header;
-     StringAttr lfn;
-     InterruptableSemaphore ready;
-     MemoryBuffer data;
-     bool gotData;
- public:
-     IMPLEMENT_IINTERFACE;
-     CallbackEntry(const RoxiePacketHeader &_header, const char *_lfn)
-         : header(_header), lfn(_lfn), gotData(false)
-     {
-     }
-     virtual bool wait(unsigned msecs)
-     {
-         return ready.wait(msecs);
-     }
-     virtual MemoryBuffer &queryData()
-     {
-         return data;
-     }
-     // True if cand matches the stored header and, when a filename is supplied, it equals ours ignoring case
-     bool matches(RoxiePacketHeader &cand, const char *_lfn)
-     {
-         if (!cand.matchPacket(header))
-             return false;
-         if (!_lfn)
-             return true;
-         return stricmp(_lfn, lfn)==0;
-     }
-     void doFileCallback(unsigned _len, const void *_data, bool aborted)
-     {
-         // MORE - make sure we call this for whole query abort as well as for callback abort
-         if (aborted)
-         {
-             ready.interrupt(MakeStringException(0, "Interrupted"));
-             return;
-         }
-         if (gotData)
-             return;
-         gotData = true;
-         data.append(_len, _data);
-         ready.signal();
-     }
- };
- // Common base for receivers: owns the worker queues (hi, lo and, with ROXIE_SLA_LOGIC, sla) and the
- // list of callbacks that queries are currently waiting on.
- class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
- {
- protected:
- #ifdef ROXIE_SLA_LOGIC
-     RoxieQueue slaQueue;
- #endif
-     RoxieQueue hiQueue;
-     RoxieQueue loQueue;
-     unsigned numWorkers;
- public:
-     IMPLEMENT_IINTERFACE;
- #ifdef ROXIE_SLA_LOGIC
-     RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
- #else
-     RoxieReceiverBase(unsigned _numWorkers) : hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
- #endif
-     {
-     }
-     // All queues are given the same head region size, so reporting the low priority queue's value is sufficient
-     virtual unsigned getHeadRegionSize() const
-     {
-         return loQueue.getHeadRegionSize();
-     }
-     virtual void setHeadRegionSize(unsigned newSize)
-     {
- #ifdef ROXIE_SLA_LOGIC
-         slaQueue.setHeadRegionSize(newSize);
- #endif
-         hiQueue.setHeadRegionSize(newSize);
-         loQueue.setHeadRegionSize(newSize);
-     }
-     virtual void start()
-     {
-         loQueue.start();
-         hiQueue.start();
- #ifdef ROXIE_SLA_LOGIC
-         slaQueue.start();
- #endif
-     }
-     virtual void stop()
-     {
-         loQueue.stopAll();
-         hiQueue.stopAll();
- #ifdef ROXIE_SLA_LOGIC
-         slaQueue.stopAll();
- #endif
-     }
-     virtual void join()
-     {
-         loQueue.join();
-         hiQueue.join();
- #ifdef ROXIE_SLA_LOGIC
-         slaQueue.join();
- #endif
-     }
-     IArrayOf<CallbackEntry> callbacks;
-     CriticalSection callbacksCrit;   // protects callbacks
-     // Registers a callback that a query is about to wait on. The entry is owned by the callbacks
-     // list until removePendingCallback() is called for it.
-     virtual IPendingCallback *notePendingCallback(const RoxiePacketHeader &header, const char *lfn)
-     {
-         CriticalBlock b(callbacksCrit);
-         CallbackEntry *callback = new CallbackEntry(header, lfn);
-         callbacks.append(*callback);
-         return callback;
-     }
-     virtual void removePendingCallback(IPendingCallback *goer)
-     {
-         if (goer)
-         {
-             CriticalBlock b(callbacksCrit);
-             callbacks.zap(static_cast<CallbackEntry &>(*goer));
-         }
-     }
- protected:
-     // Delivers a callback reply (or an abort) to every pending callback that matches the packet.
-     // For ROXIE_FILECALLBACK / ROXIE_DEBUGCALLBACK packets the context data is a null-terminated name
-     // followed by the reply data; for anything else the packet is treated as a query abort with no data.
-     void doFileCallback(IRoxieQueryPacket *packet)
-     {
-         // This is called on the main slave reader thread so needs to be as fast as possible to avoid lost packets
-         const char *lfn;
-         const char *data;
-         unsigned len;
-         RoxiePacketHeader &header = packet->queryHeader();
-         if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK)
-         {
-             lfn = (const char *) packet->queryContextData();
-             unsigned contextLength = packet->getContextLength();
-             // The payload came off the network, so do not trust it to be terminated: look for the
-             // terminator only within the context data. An unbounded strlen could read past the end of
-             // the packet, and a name longer than the context data would make len wrap around.
-             const void *terminator = contextLength ? memchr(lfn, 0, contextLength) : NULL;
-             if (!terminator)
-             {
-                 if (traceLevel > 0)
-                     DBGLOG("Ignoring malformed callback packet - name is not null terminated");
-                 return;
-             }
-             unsigned namelen = (unsigned) ((const char *) terminator - lfn) + 1;
-             data = lfn + namelen;
-             len = contextLength - namelen;
-         }
-         else
-         {
-             lfn = data = NULL; // used when query aborted
-             len = 0;
-         }
-         CriticalBlock b(callbacksCrit);
-         ForEachItemIn(idx, callbacks)
-         {
-             CallbackEntry &c = callbacks.item(idx);
-             if (c.matches(header, lfn))
-             {
-                 if (traceLevel > 10)
-                     DBGLOG("callback return matched a waiting query");
-                 c.doFileCallback(len, data, header.retries==QUERY_ABORTED);
-             }
-         }
-     }
- };
- #ifdef _MSC_VER
- #pragma warning ( push )
- #pragma warning ( disable: 4355 )
- #endif
- // Thread that sends packets handed to sendPacket() one at a time, waiting on a TokenBucket before
- // each write so that the send rate is throttled. The thread is started by the constructor and is
- // stopped and joined by the destructor.
- class RoxieThrottledPacketSender : public Thread
- {
-     TokenBucket &bucket;
-     InterruptableSemaphore queued;   // signalled once per queued packet; interrupted by stop()
-     Semaphore started;               // signalled by run() so the constructor knows the thread is running
-     unsigned maxPacketSize;
-     SafeQueueOf<IRoxieQueryPacket, false> queue;
-     // Exception passed to queued.interrupt() by stop() to wake the sender thread and make it exit
-     class DECL_EXCEPTION StoppedException: public IException, public CInterface
-     {
-     public:
-         IMPLEMENT_IINTERFACE;
-         int errorCode() const { return 0; }
-         StringBuffer & errorMessage(StringBuffer &str) const { return str.append("Stopped"); }
-         MessageAudience errorAudience() const { return MSGAUD_user; }
-     };
-     // Queues a packet, taking a link that run() releases once the packet has been sent
-     void enqueue(IRoxieQueryPacket *packet)
-     {
-         packet->Link();
-         queue.enqueue(packet);
-         queued.signal();
-     }
-     IRoxieQueryPacket *dequeue()
-     {
-         queued.wait();
-         return queue.dequeue();
-     }
- public:
-     RoxieThrottledPacketSender(TokenBucket &_bucket, unsigned _maxPacketSize)
-         : Thread("RoxieThrottledPacketSender"), bucket(_bucket), maxPacketSize(_maxPacketSize)
-     {
-         start();
-         started.wait();
-     }
-     ~RoxieThrottledPacketSender()
-     {
-         stop();
-         join();
-     }
-     // Sender loop: take the next packet, wait for (length / 1024) + 1 tokens, then write it to its
-     // channel. Exits when stop() has been called; any other exception is logged and the loop continues.
-     virtual int run()
-     {
-         started.signal();
-         for (;;)
-         {
-             try
-             {
-                 Owned<IRoxieQueryPacket> packet = dequeue();
-                 RoxiePacketHeader &header = packet->queryHeader();
-                 unsigned length = packet->queryHeader().packetlength;
-                 {
-                     MTIME_SECTION(queryActiveTimer(), "bucket_wait");
-                     bucket.wait((length / 1024) + 1);
-                 }
-                 if (channelWrite(header.channel, &header, length) != length)
-                     DBGLOG("multicast write wrote too little");
-                 packetsSent++;
-             }
-             catch (IException *E)
-             {
-                 // The exception given to interrupt() may be rethrown typed as a plain IException *, in
-                 // which case a handler for StoppedException * would never be selected and this thread
-                 // would never exit. Recognise the stop request by its dynamic type instead.
-                 bool stopRequested = dynamic_cast<StoppedException *>(E) != nullptr;
-                 if (!stopRequested)
-                     EXCLOG(E);
-                 E->Release();
-                 if (stopRequested)
-                     break;
-             }
-             catch (...)
-             {
-                 // Keep the sender alive, but do not hide the fact that something went wrong
-                 DBGLOG("Unknown exception in RoxieThrottledPacketSender");
-             }
-         }
-         return 0;
-     }
-     // Traces the packet according to its retry state and queues it for the sender thread.
-     // Throws ROXIE_PACKET_ERROR if the packet is longer than maxPacketSize.
-     virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
-     {
-         RoxiePacketHeader &header = x->queryHeader();
-         unsigned length = x->queryHeader().packetlength;
-         assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
-         switch (header.retries & ROXIE_RETRIES_MASK)
-         {
-         case (QUERY_ABORTED & ROXIE_RETRIES_MASK):
-             {
-                 StringBuffer s;
-                 logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(s).str());
-             }
-             break;
-         default:
-             {
-                 StringBuffer s;
-                 logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(s).str());
-             }
-             break;
-         case 0:
-             if (logctx.queryTraceLevel() > 8)
-             {
-                 StringBuffer s;
-                 logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(s).str());
-             }
-             break;
-         }
-         if (length > maxPacketSize)
-         {
-             StringBuffer s;
-             throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
-         }
-         enqueue(x);
-     }
-     // Wakes the sender thread with a StoppedException so that run() returns
-     void stop()
-     {
-         // bucket.stop();
-         queued.interrupt(new StoppedException);
-     }
- };
- class RoxieSocketQueueManager : public RoxieReceiverBase
- {
- protected:
- Linked<ISendManager> sendManager;
- Linked<IReceiveManager> receiveManager;
- Owned<RoxieThrottledPacketSender> throttledPacketSendManager;
- Owned<TokenBucket> bucket;
- unsigned maxPacketSize = 0;
- std::atomic<bool> running = { false };
- // Thread that runs the owning manager's run() at raised priority
- class ReceiverThread : public Thread
- {
-     RoxieSocketQueueManager &parent;
- public:
-     ReceiverThread(RoxieSocketQueueManager &_parent)
-         : Thread("RoxieSocketQueueManager"), parent(_parent)
-     {
-     }
-     int run()
-     {
-         // Raise the priority so ibyti's get through in a timely fashion
- #if defined( __linux__) || defined(__APPLE__)
-         setLinuxThreadPriority(3);
- #else
-         adjustPriority(1);
- #endif
-         int result = parent.run();
-         return result;
-     }
- } readThread;
- public:
- // Takes the maximum packet size from the multicast socket, using 65535 when the socket reports
- // zero or anything larger than 65535
- RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), readThread(*this)
- {
-     unsigned socketLimit = multicastSocket->get_max_send_size();
-     if (socketLimit == 0 || socketLimit > 65535)
-         socketLimit = 65535;
-     maxPacketSize = socketLimit;
- }
- // Sends a packet on its channel, via the throttled sender when one is configured, otherwise directly.
- // The packet is traced as aborting, resending or (at trace level > 8) sending, according to the
- // retry bits in its header. Throws ROXIE_PACKET_ERROR if the packet is longer than maxPacketSize.
- virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
- {
-     if (throttledPacketSendManager)
-     {
-         throttledPacketSendManager->sendPacket(x, logctx);
-         return;
-     }
-     MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendPacket");
-     RoxiePacketHeader &header = x->queryHeader();
-     unsigned length = x->queryHeader().packetlength;
-     assertex (header.activityId & ~ROXIE_PRIORITY_MASK);
-     StringBuffer hdrText;
-     auto retryBits = header.retries & ROXIE_RETRIES_MASK;
-     if (retryBits == (QUERY_ABORTED & ROXIE_RETRIES_MASK))
-         logctx.CTXLOG("Aborting packet size=%d: %s", length, header.toString(hdrText).str());
-     else if (retryBits == 0)
-     {
-         if (logctx.queryTraceLevel() > 8)
-             logctx.CTXLOG("Sending packet size=%d: %s", length, header.toString(hdrText).str());
-     }
-     else
-         logctx.CTXLOG("Resending packet size=%d: %s", length, header.toString(hdrText).str());
-     if (length > maxPacketSize)
-     {
-         StringBuffer tooLong;
-         throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(tooLong).str());
-     }
-     if (channelWrite(header.channel, &header, length) != length)
-         logctx.CTXLOG("multicast write wrote too little");
-     packetsSent++;
- }
- // Sends an IBYTI for the given request: a header-only packet derived from the request header,
- // keeping only the priority bits of the activity id and carrying the given subchannel
- virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) override
- {
-     MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendIbyti");
-     RoxiePacketHeader ibytiHeader(header, header.activityId & ROXIE_PRIORITY_MASK, subChannel);
-     if (logctx.queryTraceLevel() > 8)
-     {
-         StringBuffer hdrText;
-         logctx.CTXLOG("Sending IBYTI packet %s", ibytiHeader.toString(hdrText).str());
-     }
-     bool complete = (channelWrite(header.channel, &ibytiHeader, sizeof(RoxiePacketHeader)) == sizeof(RoxiePacketHeader));
-     if (!complete)
-         logctx.CTXLOG("sendIbyti wrote too little");
-     ibytiPacketsSent++;
- }
- // Sends an abort for the given request: a header-only packet derived from the request header,
- // keeping only the priority bits of the activity id, with retries set to QUERY_ABORTED
- virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx) override
- {
-     MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbort");
-     RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK, 0); // subChannel irrelevant - we are about to overwrite retries anyway
-     abortHeader.retries = QUERY_ABORTED;
-     if (logctx.queryTraceLevel() > 8)
-     {
-         StringBuffer hdrText;
-         logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(hdrText).str());
-     }
-     bool complete = (channelWrite(header.channel, &abortHeader, sizeof(RoxiePacketHeader)) == sizeof(RoxiePacketHeader));
-     if (!complete)
-         logctx.CTXLOG("sendAbort wrote too little");
-     abortsSent++;
- }
- // Sends an abort for a pending file callback: a ROXIE_FILECALLBACK header with retries set to
- // QUERY_ABORTED, followed by the filename
- virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx) override
- {
-     MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbortCallback");
-     RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK, 0); // subChannel irrelevant - we are about to overwrite retries anyway
-     abortHeader.retries = QUERY_ABORTED;
-     MemoryBuffer message;
-     message.append(sizeof(abortHeader), &abortHeader);
-     message.append(lfn);
-     if (logctx.queryTraceLevel() > 5)
-     {
-         StringBuffer hdrText;
-         logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(hdrText).str(), lfn);
-     }
-     if (channelWrite(header.channel, message.toByteArray(), message.length()) != message.length())
-         logctx.CTXLOG("tr->write wrote too little");
-     abortsSent++;
- }
- // Creates a message packer for a reply to the given header. The reply queue number is 0 for
- // out-of-band replies, 1 when the header has ROXIE_FASTLANE set or fastLaneQueue is off, else 2.
- virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx)
- {
-     unsigned qnum;
-     if (outOfBand)
-         qnum = 0;
-     else if ((header.retries & ROXIE_FASTLANE) || !fastLaneQueue)
-         qnum = 1;
-     else
-         qnum = 2;
-     if (logctx.queryTraceLevel() > 8)
-     {
-         StringBuffer hdrText;
-         logctx.CTXLOG("Creating Output Stream for reply packet on Q=%d - %s", qnum, header.toString(hdrText).str());
-     }
-     return sendManager->createMessagePacker(header.uid, header.getSequenceId(), &header, sizeof(RoxiePacketHeader), header.serverId, qnum);
- }
- // Returns whatever the send manager's dataQueued() reports for this request's uid, sequence id and server
- virtual bool replyPending(RoxiePacketHeader &header)
- {
-     bool queued = sendManager->dataQueued(header.uid, header.getSequenceId(), header.serverId);
-     return queued;
- }
- // Passes the request's uid, sequence id and server to the send manager's abortData() and returns its result
- virtual bool abortCompleted(RoxiePacketHeader &header)
- {
-     bool aborted = sendManager->abortData(header.uid, header.getSequenceId(), header.serverId);
-     return aborted;
- }
- // Looks through the running workers of the queue for one processing the given packet and asks it
- // to abort. The search ends at the first worker that either stops its job or reports that it holds
- // the packet. Returns true if a worker stopped its job. When checkRank is false (a query abort
- // rather than an IBYTI), abortCompleted() is also called for the request.
- bool abortRunning(RoxiePacketHeader &header, RoxieQueue &queue, bool checkRank, bool &preActivity)
- {
-     bool queryFound = false;
-     bool stopped = false;
-     Owned<IPooledThreadIterator> active = queue.running();
-     ForEach(*active)
-     {
-         CRoxieWorker &worker = static_cast<CRoxieWorker &>(active->query());
-         stopped = worker.checkAbort(header, checkRank, queryFound, preActivity);
-         if (stopped || queryFound)
-             break;
-     }
-     if (!checkRank)
-     {
-         if (traceLevel > 8)
-             DBGLOG("discarding data for aborted query");
-         ROQ->abortCompleted(header);
-     }
-     return stopped;
- }
- // Handles a header-only control packet. With retries == QUERY_ABORTED it is a query abort: any
- // running or queued work for the request is abandoned. Otherwise it is an IBYTI from a slave on the
- // same channel: unless it came from ourselves, the sender is noted as healthy and the matching
- // request is removed from the queue or, failing that, aborted if a worker is running it.
- void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue, const ITopologyServer* topology)
- {
-     assertex(!localSlave);
-     bool preActivity = false;
-     const ChannelInfo &channelInfo = topology->queryChannelInfo(header.channel);
-     unsigned mySubChannel = channelInfo.subChannel();
-     if (traceLevel > 10)
-     {
-         IpAddress peer;
-         StringBuffer peerText, hdrText;
-         multicastSocket->getPeerAddress(peer).getIpText(peerText);
-         header.toString(hdrText);
-         DBGLOG("doIBYTI %s from %s", hdrText.str(), peerText.str());
-         DBGLOG("header.retries=%x header.getSubChannelMask(header.channel)=%x", header.retries, header.getSubChannelMask(mySubChannel));
-     }
-     if (header.retries == QUERY_ABORTED)
-     {
-         abortRunning(header, queue, false, preActivity);
-         queue.remove(header);
-         if (traceLevel > 10)
-         {
-             StringBuffer hdrText;
-             DBGLOG("Abort activity %s", header.toString(hdrText).str());
-         }
-         return;
-     }
-     ibytiPacketsReceived++;
-     unsigned subChannel = header.getRespondingSubChannel();
-     if (subChannel == mySubChannel)
-     {
-         if (traceLevel > 10)
-             DBGLOG("doIBYTI packet was from self");
-         ibytiPacketsFromSelf++;
-         return;
-     }
-     channelInfo.noteChannelHealthy(subChannel);
-     if (queue.remove(header))
-     {
-         if (traceLevel > 10)
-         {
-             StringBuffer hdrText;
-             DBGLOG("Removed activity from Q : %s", header.toString(hdrText).str());
-         }
-         ibytiPacketsWorked++;
-         return;
-     }
-     if (abortRunning(header, queue, true, preActivity))
-     {
-         if (preActivity)
-             ibytiPacketsWorked++;
-         else
-             ibytiPacketsHalfWorked++;
-         return;
-     }
-     if (traceLevel > 10)
-         DBGLOG("doIBYTI packet was too late");
-     ibytiPacketsTooLate++; // meaning either I started and reserve the right to finish, or I finished already
- }
- void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue)
- {
- // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
- // DO NOT put tracing on this thread except at very high tracelevels!
- Owned<const ITopologyServer> topology = getTopology();
- if (!header.channel)
- {
- // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
- // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
- // Unfortunately this is bad news for dropping packets
- const std::vector<unsigned> channels = topology->queryChannels();
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
- for (unsigned i = 1; i < channels.size(); i++)
- queue.enqueue(packet->clonePacket(channels[i]));
- header.channel = channels[0];
- queue.enqueue(packet.getClear());
- return;
- }
- unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
- if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
- {
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
- if (traceLevel > 10)
- {
- StringBuffer s;
- DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
- }
- doFileCallback(packet);
- }
- else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
- doIbyti(header, queue, topology); // MORE - check how fast this is!
- else
- {
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
- SlaveContextLogger logctx(packet);
- unsigned retries = header.thisChannelRetries(mySubchannel);
- if (retries)
- {
- // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
- assertex(header.channel); // should never see a retry on channel 0
- if (retries >= SUBCHANNEL_MASK)
- return; // someone sent a failure or something - ignore it
- // Send back an out-of-band immediately, to let Roxie server know that channel is still active
- if (!(testSlaveFailure & 0x800))
- {
- RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
- Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
- output->flush();
- }
- // If it's a retry, look it up against already running, or output stream, or input queue
- // if found, send an IBYTI and discard retry request
-
- if (!mySubchannel)
- retriesReceivedPrm++;
- else
- retriesReceivedSec++;
- bool alreadyRunning = false;
- Owned<IPooledThreadIterator> wi = queue.running();
- ForEach(*wi)
- {
- CRoxieWorker &w = (CRoxieWorker &) wi->query();
- if (w.match(header))
- {
- alreadyRunning = true;
- if (!mySubchannel)
- retriesIgnoredPrm++;
- else
- retriesIgnoredSec++;
- ROQ->sendIbyti(header, logctx, mySubchannel);
- if (logctx.queryTraceLevel() > 10)
- {
- StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
- }
- break;
- }
- }
- if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
- {
- alreadyRunning = true;
- if (!mySubchannel)
- retriesIgnoredPrm++;
- else
- retriesIgnoredSec++;
- ROQ->sendIbyti(header, logctx, mySubchannel);
- if (logctx.queryTraceLevel() > 10)
- {
- StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
- }
- }
- if (!alreadyRunning)
- {
- if (logctx.queryTraceLevel() > 10)
- {
- StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
- }
- queue.enqueueUnique(packet.getClear(), mySubchannel);
- }
- }
- else // first time (not a retry).
- queue.enqueue(packet.getClear());
- }
- }
- int run()
- {
- if (traceLevel)
- DBGLOG("RoxieSocketQueueManager::run() starting: doIbytiDelay=%s minIbytiDelay=%u initIbytiDelay=%u",
- doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay);
- for (;;)
- {
- MemoryBuffer mb;
- try
- {
- // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
- // DO NOT put tracing on this thread except at very high tracelevels!
- unsigned l;
- multicastSocket->read(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, 5);
- mb.setLength(l);
- packetsReceived++;
- RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
- if (l != header.packetlength)
- DBGLOG("sock->read returned %d but packetlength was %d", l, header.packetlength);
- if (traceLevel > 10)
- {
- StringBuffer s;
- DBGLOG("Read from multicast: %s", header.toString(s).str());
- }
- #ifdef ROXIE_SLA_LOGIC
- if (header.activityId & ROXIE_SLA_PRIORITY)
- processMessage(mb, header, slaQueue);
- else
- #endif
- if (header.activityId & ROXIE_HIGH_PRIORITY)
- processMessage(mb, header, hiQueue);
- else
- processMessage(mb, header, loQueue);
- }
- catch (IException *E)
- {
- if (running)
- {
- // MORE: Maybe we should utilize IException::errorCode - not just text ??
- if (E->errorCode()==JSOCKERR_timeout_expired)
- E->Release();
- else if (roxiemem::memPoolExhausted())
- {
- //MORE: I think this should probably be based on the error code instead.
- EXCLOG(E, "Exception reading or processing multicast msg");
- E->Release();
- MilliSleep(1000); // Give a chance for mem free
- }
- else
- {
- EXCLOG(E, "Exception reading or processing multicast msg");
- E->Release();
- // MORE: Protect with try logic, in case udp_create throws exception ?
- // What to do if create fails (ie exception is caught) ?
- if (multicastSocket)
- {
- multicastSocket->close();
- multicastSocket.clear();
- openMulticastSocket();
- }
- }
-
- }
- else
- {
- E->Release();
- break;
- }
- }
- }
- return 0;
- }
- void start()
- {
- RoxieReceiverBase::start();
- running = true;
- readThread.start();
- }
- void stop()
- {
- if (running)
- {
- running = false;
- multicastSocket->close();
- }
- RoxieReceiverBase::stop();
- }
- void join()
- {
- readThread.join();
- RoxieReceiverBase::join();
- }
- virtual IReceiveManager *queryReceiveManager()
- {
- return receiveManager;
- }
- };
- class RoxieUdpSocketQueueManager : public RoxieSocketQueueManager
- {
- public:
- RoxieUdpSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieSocketQueueManager(_numWorkers)
- {
- int udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
- int udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
- int udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
- if (topology->getPropInt("@sendMaxRate", 0))
- {
- unsigned sendMaxRate = topology->getPropInt("@sendMaxRate");
- unsigned sendMaxRatePeriod = topology->getPropInt("@sendMaxRatePeriod", 1);
- bucket.setown(new TokenBucket(sendMaxRate, sendMaxRatePeriod, sendMaxRate));
- throttledPacketSendManager.setown(new RoxieThrottledPacketSender(*bucket, maxPacketSize));
- }
- IpAddress snifferIp;
- getChannelIp(snifferIp, snifferChannel);
- if (udpMaxSlotsPerClient > udpQueueSize)
- udpMaxSlotsPerClient = udpQueueSize;
- unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
- unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
- unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
- unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
- receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient));
- sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket));
- }
- };
- class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager
- {
- public:
- RoxieAeronSocketQueueManager(unsigned _numWorkers) : RoxieSocketQueueManager(_numWorkers)
- {
- unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
- SocketEndpoint ep(dataPort, myNode.getNodeAddress());
- receiveManager.setown(createAeronReceiveManager(ep));
- assertex(!myNode.getNodeAddress().isNull());
- sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getNodeAddress()));
- }
- };
- #ifdef _MSC_VER
- #pragma warning( pop )
- #endif
- //==================================================================================================
- interface ILocalMessageCollator : extends IMessageCollator
- {
- virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0;
- };
- interface ILocalReceiveManager : extends IReceiveManager
- {
- virtual ILocalMessageCollator *lookupCollator(ruid_t id) = 0;
- };
- class LocalMessagePacker : public CDummyMessagePacker
- {
- MemoryBuffer meta;
- MemoryBuffer header;
- Linked<ILocalReceiveManager> rm;
- ruid_t id;
- bool outOfBand;
- public:
- IMPLEMENT_IINTERFACE;
- LocalMessagePacker(RoxiePacketHeader &_header, bool _outOfBand, ILocalReceiveManager *_rm) : rm(_rm), outOfBand(_outOfBand)
- {
- id = _header.uid;
- header.append(sizeof(RoxiePacketHeader), &_header);
- }
- virtual void flush() override;
- virtual void sendMetaInfo(const void *buf, unsigned len) override
- {
- meta.append(len, buf);
- }
- };
- class CLocalMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface
- {
- void *data;
- unsigned datalen;
- unsigned pos;
- Linked<IRowManager> rowManager;
- public:
- IMPLEMENT_IINTERFACE;
- CLocalMessageUnpackCursor(IRowManager *_rowManager, void *_data, unsigned _datalen)
- : rowManager(_rowManager)
- {
- datalen = _datalen;
- data = _data;
- pos = 0;
- }
- ~CLocalMessageUnpackCursor()
- {
- }
- virtual bool atEOF() const
- {
- return datalen==pos;
- }
- virtual bool isSerialized() const
- {
- // NOTE: tempting to think that we could avoid serializing in localSlave case, but have to be careful about the lifespan of the rowManager...
- return true;
- }
- virtual const void * getNext(int length)
- {
- if (pos==datalen)
- return NULL;
- assertex(pos + length <= datalen);
- void * cur = ((char *) data) + pos;
- pos += length;
- void * ret = rowManager->allocate(length, 0);
- memcpy(ret, cur, length);
- //No need for finalize since only contains plain data.
- return ret;
- }
- };
- class CLocalMessageResult : implements IMessageResult, public CInterface
- {
- void *data;
- void *meta;
- void *header;
- unsigned datalen, metalen, headerlen;
- unsigned pos;
- public:
- IMPLEMENT_IINTERFACE;
- CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen)
- {
- datalen = _datalen;
- metalen = _metalen;
- headerlen = _headerlen;
- data = _data;
- meta = _meta;
- header = _header;
- pos = 0;
- }
- ~CLocalMessageResult()
- {
- free(data);
- free(meta);
- free(header);
- }
- virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
- {
- return new CLocalMessageUnpackCursor(rowMgr, data, datalen);
- }
- virtual const void *getMessageHeader(unsigned &length) const
- {
- length = headerlen;
- return header;
- }
- virtual const void *getMessageMetadata(unsigned &length) const
- {
- length = metalen;
- return meta;
- }
- virtual void discard() const
- {
- }
- };
- class CLocalMessageCollator : implements ILocalMessageCollator, public CInterface
- {
- InterruptableSemaphore sem;
- QueueOf<IMessageResult, false> pending;
- CriticalSection crit;
- Linked<IRowManager> rowManager; // Linked to ensure it lives longer than me
- Linked<ILocalReceiveManager> receiveManager;
- ruid_t id;
- unsigned totalBytesReceived;
- public:
- IMPLEMENT_IINTERFACE;
- CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid);
- ~CLocalMessageCollator();
- virtual ruid_t queryRUID() const
- {
- return id;
- }
- virtual IMessageResult* getNextResult(unsigned time_out, bool &anyActivity)
- {
- anyActivity = false;
- if (!sem.wait(time_out))
- return NULL;
- anyActivity = true;
- CriticalBlock c(crit);
- return pending.dequeue();
- }
- virtual void interrupt(IException *E)
- {
- sem.interrupt(E);
- }
- virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen)
- {
- CriticalBlock c(crit);
- if (outOfBand)
- pending.enqueueHead(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
- else
- pending.enqueue(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen));
- sem.signal();
- totalBytesReceived += datalen + metalen + headerlen;
- }
- virtual unsigned queryBytesReceived() const
- {
- return totalBytesReceived;
- }
- };
- class RoxieLocalReceiveManager : implements ILocalReceiveManager, public CInterface
- {
- MapXToMyClass<ruid_t, ruid_t, ILocalMessageCollator> collators;
- CriticalSection crit;
- Owned<StringContextLogger> logctx;
- public:
- IMPLEMENT_IINTERFACE;
- RoxieLocalReceiveManager() : logctx(new StringContextLogger("RoxieLocalReceiveManager"))
- {
- }
- virtual IMessageCollator *createMessageCollator(IRowManager *manager, ruid_t ruid)
- {
- ILocalMessageCollator *collator = new CLocalMessageCollator(manager, ruid);
- CriticalBlock b(crit);
- collators.setValue(ruid, collator);
- return collator;
- }
- virtual void detachCollator(const IMessageCollator *collator)
- {
- ruid_t id = collator->queryRUID();
- CriticalBlock b(crit);
- collators.setValue(id, NULL);
- }
- virtual ILocalMessageCollator *lookupCollator(ruid_t id)
- {
- CriticalBlock b(crit);
- ILocalMessageCollator *ret = collators.getValue(id);
- if (!ret)
- ret = collators.getValue(RUID_DISCARD);
- return LINK(ret);
- }
- };
- void LocalMessagePacker::flush()
- {
- // MORE - I think this means we don't send anything until whole message available in localSlave mode, which
- // may not be optimal.
- data.setLength(lastput);
- Owned<ILocalMessageCollator> collator = rm->lookupCollator(id);
- if (collator)
- {
- unsigned datalen = data.length();
- unsigned metalen = meta.length();
- unsigned headerlen = header.length();
- collator->enqueueMessage(outOfBand, data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen);
- }
- // otherwise Roxie server is no longer interested and we can simply discard
- }
- CLocalMessageCollator::CLocalMessageCollator(IRowManager *_rowManager, ruid_t _ruid)
- : rowManager(_rowManager), id(_ruid)
- {
- totalBytesReceived = 0;
- }
- CLocalMessageCollator::~CLocalMessageCollator()
- {
- IMessageResult *goer;
- for (;;)
- {
- goer = pending.dequeue();
- if (!goer)
- break;
- goer->Release();
- }
- }
- class RoxieLocalQueueManager : public RoxieReceiverBase
- {
- Linked<RoxieLocalReceiveManager> receiveManager;
- public:
- RoxieLocalQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers)
- {
- receiveManager.setown(new RoxieLocalReceiveManager);
- }
-
- virtual void sendPacket(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx) override
- {
- RoxiePacketHeader &header = packet->queryHeader();
- unsigned retries = header.thisChannelRetries(0);
- if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
- {
- if (traceLevel > 5)
- {
- StringBuffer s;
- DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
- }
- doFileCallback(packet);
- }
- else if (retries < SUBCHANNEL_MASK)
- {
- if (retries)
- {
- // Send back an out-of-band immediately, to let Roxie server know that channel is still active
- RoxiePacketHeader newHeader(header, ROXIE_ALIVE, 0);
- Owned<IMessagePacker> output = createOutputStream(newHeader, true, logctx);
- output->flush();
- return; // No point sending the retry in localSlave mode
- }
- RoxieQueue *targetQueue;
- #ifdef ROXIE_SLA_LOGIC
- if (header.activityId & ROXIE_SLA_PRIORITY)
- targetQueue = &slaQueue;
- else
- #endif
- if (header.activityId & ROXIE_HIGH_PRIORITY)
- targetQueue = &hiQueue;
- else
- targetQueue = &loQueue;
- if (header.channel)
- {
- targetQueue->enqueue(LINK(packet));
- }
- else
- {
- // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
- // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
- for (unsigned i = 0; i < numChannels; i++)
- {
- targetQueue->enqueue(packet->clonePacket(i+1));
- }
- }
- }
- }
- virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) override
- {
- // Don't do IBYTI's when local slave - no buddy to talk to anyway
- }
- virtual void sendAbort(RoxiePacketHeader &header, const IRoxieContextLogger &logctx) override
- {
- MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbort");
- RoxiePacketHeader abortHeader(header, header.activityId & ROXIE_PRIORITY_MASK, 0);
- abortHeader.retries = QUERY_ABORTED;
- if (logctx.queryTraceLevel() > 8)
- {
- StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
- }
- MemoryBuffer data;
- data.append(sizeof(abortHeader), &abortHeader);
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
- sendPacket(packet, logctx);
- abortsSent++;
- }
- virtual void sendAbortCallback(const RoxiePacketHeader &header, const char *lfn, const IRoxieContextLogger &logctx) override
- {
- MTIME_SECTION(queryActiveTimer(), "RoxieLocalQueueManager::sendAbortCallback");
- RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK, 0);
- abortHeader.retries = QUERY_ABORTED;
- MemoryBuffer data;
- data.append(sizeof(abortHeader), &abortHeader).append(lfn);
- if (logctx.queryTraceLevel() > 5)
- {
- StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
- }
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
- sendPacket(packet, logctx);
- abortsSent++;
- }
- virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx) override
- {
- return new LocalMessagePacker(header, outOfBand, receiveManager);
- }
- virtual IReceiveManager *queryReceiveManager() override
- {
- return receiveManager;
- }
- virtual bool replyPending(RoxiePacketHeader &header) override
- {
- // MORE - should really have some code here! But returning true is a reasonable approximation.
- return true;
- }
- virtual bool abortCompleted(RoxiePacketHeader &header) override
- {
- // MORE - should really have some code here!
- return false;
- }
- };
- IRoxieOutputQueueManager *ROQ;
- extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers)
- {
- if (localSlave)
- return new RoxieLocalQueueManager(numWorkers);
- else if (useAeron)
- return new RoxieAeronSocketQueueManager(numWorkers);
- else
- return new RoxieUdpSocketQueueManager(snifferChannel, numWorkers);
- }
- //================================================================================================================================
- class PacketDiscarder : public Thread, implements IPacketDiscarder
- {
- bool aborted;
- Owned<IRowManager> rowManager; // not completely sure I need one... maybe I do
- Owned<IMessageCollator> mc;
- public:
- IMPLEMENT_IINTERFACE;
- PacketDiscarder()
- {
- aborted = false;
- };
- ~PacketDiscarder()
- {
- if (mc)
- ROQ->queryReceiveManager()->detachCollator(mc);
- mc.clear();
- }
- virtual int run()
- {
- Owned<StringContextLogger> logctx = new StringContextLogger("PacketDiscarder");
- rowManager.setown(roxiemem::createRowManager(0, NULL, *logctx, NULL, false));
- mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
- while (!aborted)
- {
- bool anyActivity = false;
- Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
- if (mr)
- {
- if (traceLevel > 4)
- DBGLOG("Discarding unwanted message");
- unsigned headerLen;
- const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
- if (headerLen)
- {
- switch (header.activityId)
- {
- case ROXIE_FILECALLBACK:
- {
- Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
- OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
- if (len)
- {
- RecordLengthType *rowlen = (RecordLengthType *) len.get();
- OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
- const char *rowdata = (const char *) row.get();
- // bool isOpt = * (bool *) rowdata;
- // bool isLocal = * (bool *) (rowdata+1);
- ROQ->sendAbortCallback(header, rowdata+2, *logctx);
- }
- else
- DBGLOG("Unrecognized format in discarded file callback");
- break;
- }
- // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
- }
- }
- else
- DBGLOG("Unwanted message had no header?!");
- }
- else if (!anyActivity)
- {
- // to avoid leaking partial unwanted packets, we clear out mc periodically...
- ROQ->queryReceiveManager()->detachCollator(mc);
- mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
- }
- }
- return 0;
- }
- virtual void start()
- {
- Thread::start();
- }
- virtual void stop()
- {
- if (mc)
- mc->interrupt();
- aborted = true;
- join();
- }
- };
- IPacketDiscarder *createPacketDiscarder()
- {
- IPacketDiscarder *packetDiscarder = new PacketDiscarder;
- packetDiscarder->start();
- return packetDiscarder;
- }
- //================================================================================================================================
- // There are various possibly interesting ways to reply to a ping:
- // Reply as soon as received, or put it on the queue like other messages?
- // Reply for every channel, or just once for every slave?
- // Should I send on channel 0 or round-robin the channels?
- // My gut feeling is that knowing what channels are responding is useful so should reply on every unsuspended channel,
- // and that the delay caused by queuing system is an interesting part of what we want to measure (though nice to know minimum possible too)
- unsigned pingInterval = 60;
- class PingTimer : public Thread
- {
- bool aborted;
- Owned<IRowManager> rowManager;
- Owned<IMessageCollator> mc;
- StringContextLogger logctx;
- void sendPing(unsigned priorityMask)
- {
- try
- {
- unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen("PING") + 1 + sizeof(PingRecord);
- void *packetData = malloc(packetSize);
- RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
- RemoteActivityId pingId(ROXIE_PING | priorityMask, 0);
- header->init(pingId, 0, 0, 0);
- char *finger = (char *) (header + 1);
- *finger++ = (char) LOGGING_FLAGSPRESENT;
- strcpy(finger, "PING");
- finger += strlen("PING")+1;
- if (traceLevel > 1)
- DBGLOG("PING sent");
- PingRecord data;
- data.senderIP.ipset(myNode.getNodeAddress());
- data.tick = usTick();
- memcpy(finger, &data, sizeof(PingRecord));
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
- ROQ->sendPacket(packet, logctx);
- }
- catch (IException *E)
- {
- EXCLOG(E);
- E->Release();
- }
- }
- public:
- PingTimer() : logctx("PingTimer")
- {
- aborted = false;
- };
- ~PingTimer()
- {
- if (mc)
- ROQ->queryReceiveManager()->detachCollator(mc);
- mc.clear();
- }
- virtual int run()
- {
- rowManager.setown(roxiemem::createRowManager(1, NULL, queryDummyContextLogger(), NULL, false));
- mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_PING));
- unsigned pingsReceived = 0;
- unsigned pingsElapsed = 0;
- sendPing(ROXIE_HIGH_PRIORITY);
- while (!aborted)
- {
- bool anyActivity = false;
- Owned<IMessageResult> mr = mc->getNextResult(pingInterval*1000, anyActivity);
- if (mr)
- {
- unsigned headerLen;
- const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
- Owned<IMessageUnpackCursor> mu = mr->getCursor(rowManager);
- PingRecord *answer = (PingRecord *) mu->getNext(sizeof(PingRecord));
- if (answer && mu->atEOF() && headerLen==sizeof(RoxiePacketHeader))
- {
- unsigned elapsed = usTick() - answer->tick;
- pingsReceived++;
- pingsElapsed += elapsed;
- if (traceLevel > 10)
- DBGLOG("PING reply channel=%d, time %d", header->channel, elapsed); // DBGLOG is slower than the pings so be careful!
- }
- else
- DBGLOG("PING reply, garbled result");
- ReleaseRoxieRow(answer);
- }
- else if (!anyActivity)
- {
- if (!pingsReceived && roxieMulticastEnabled)
- DBGLOG("PING: NO replies received! Please check multicast settings, and that your network supports multicast.");
- else if (traceLevel)
- DBGLOG("PING: %d replies received, average delay %uus", pingsReceived, pingsReceived ? pingsElapsed / pingsReceived : 0);
- pingsReceived = 0;
- pingsElapsed = 0;
- sendPing(ROXIE_HIGH_PRIORITY); // MORE - we could think about alternating the priority or sending pings on high and low at the same time...
- }
- }
- return 0;
- }
- void stop()
- {
- if (mc)
- mc->interrupt();
- aborted = true;
- }
- static CriticalSection crit;
- } *pingTimer;
- CriticalSection PingTimer::crit;
- extern void startPingTimer()
- {
- CriticalBlock b(PingTimer::crit);
- if (!pingTimer)
- {
- pingTimer = new PingTimer();
- pingTimer->start();
- }
- }
- extern void stopPingTimer()
- {
- CriticalBlock b(PingTimer::crit);
- if (pingTimer)
- {
- pingTimer->stop();
- pingTimer->join();
- delete pingTimer;
- pingTimer = NULL;
- }
- }
|