12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define mp_decl __declspec(dllexport)
- /* TBD
- lost packet disposal
- synchronous send
- connection protocol (HRPC);
- look at all timeouts
- */
- #include "platform.h"
- #include "portlist.h"
- #include "jlib.hpp"
- #include <limits.h>
- #include "jsocket.hpp"
- #include "jmutex.hpp"
- #include "jutil.hpp"
- #include "jthread.hpp"
- #include "jqueue.tpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "mpcomm.hpp"
- #include "mpbuff.hpp"
- #include "mputil.hpp"
- #include "mplog.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- //#define _TRACE
- //#define _FULLTRACE
- #if 1 // #ifdef _FULLTRACE
- #define _TRACELINKCLOSED
- #endif
- #define _TRACEMPSERVERNOTIFYCLOSED
- #define _TRACEORPHANS
- #define REFUSE_STALE_CONNECTION
- #define MP_PROTOCOL_VERSION 0x102
- #define MP_PROTOCOL_VERSIONV6 0x202 // extended for IPV6
- // These should really be configurable
- #define CANCELTIMEOUT 1000 // 1 sec
- #define CONNECT_TIMEOUT (5*60*1000) // 5 mins
- #define CONNECT_READ_TIMEOUT (10*1000) // 10 seconds. NB: used by connect readtms loop (see loopCnt)
- #define CONNECT_TIMEOUT_INTERVAL 1000 // 1 sec
- #define CONNECT_RETRYCOUNT 180 // Overall max connect time is = CONNECT_RETRYCOUNT * CONNECT_READ_TIMEOUT
- #define CONNECT_TIMEOUT_MINSLEEP 2000 // random range: CONNECT_TIMEOUT_MINSLEEP to CONNECT_TIMEOUT_MAXSLEEP milliseconds
- #define CONNECT_TIMEOUT_MAXSLEEP 5000
- #define CONFIRM_TIMEOUT (90*1000) // 1.5 mins
- #define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
- #define TRACESLOW_THRESHOLD 1000 // 1 sec
- #define VERIFY_DELAY (1*60*1000) // 1 Minute
- #define VERIFY_TIMEOUT (1*60*1000) // 1 Minute
- #define DIGIT1 U64C(0x10000000000) // (256ULL*256ULL*256ULL*65536ULL)
- #define DIGIT2 U64C(0x100000000) // (256ULL*256ULL*65536ULL)
- #define DIGIT3 U64C(0x1000000) // (256ULL*65536ULL)
- #define DIGIT4 U64C(0x10000) // (65536ULL)
- #define _TRACING
- static CriticalSection verifysect;
- static CriticalSection childprocesssect;
- static UnsignedArray childprocesslist;
- // IPv6 TBD
- struct SocketEndpointV4
- {
- byte ip[4];
- unsigned short port;
- SocketEndpointV4() {};
- SocketEndpointV4(const SocketEndpoint &val) { set(val); }
- void set(const SocketEndpoint &val)
- {
- port = val.port;
- if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
- IPV6_NOT_IMPLEMENTED();
- }
- void get(SocketEndpoint &val)
- {
- val.setNetAddress(sizeof(ip),&ip);
- val.port = port;
- }
- StringBuffer & getUrlStr(StringBuffer &val)
- {
- SocketEndpoint s;
- this->get(s);
- return s.getUrlStr(val);
- }
- };
- class PacketHeader // standard packet header - no virtuals
- {
- public:
- static unsigned nextseq;
- static unsigned lasttick;
- void initseq()
- {
- sequence = msTick();
- lasttick = sequence;
- if (sequence-nextseq>USHRT_MAX)
- sequence = nextseq++;
- else
- nextseq = sequence+1;
- }
- PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
- {
- size = _size;
- tag = _tag;
- sender.set(_sender);
- target.set(_target);
- replytag = _replytag;
- flags = 0;
- version = MP_PROTOCOL_VERSION;
- initseq();
- }
- PacketHeader() {}
- size32_t size; // 0 total packet size
- mptag_t tag; // 4 packet tag (kind)
- unsigned short version; // 8 protocol version
- unsigned short flags; // 10 flags
- SocketEndpointV4 sender; // 12 who sent
- SocketEndpointV4 target; // 18 who destined for
- mptag_t replytag; // 24 used for reply
- unsigned sequence; // 28 packet type dependant
- // Total 32
- void setMessageFields(CMessageBuffer &mb)
- {
- SocketEndpoint ep;
- sender.get(ep);
- mb.init(ep,tag,replytag);
- }
- };
#if 0
// Disabled sketch of an IPv6-capable header: the standard header followed by
// extended sender/target address words.
class PacketHeaderV6 : public PacketHeader
{
    unsigned senderex[4];       // 32
    unsigned targetex[4];       // 48
                                // total 64

    // Initialise a message buffer using the extended sender address and the
    // port held in the base header.
    void setMessageFields(CMessageBuffer &mb)
    {
        SocketEndpoint from;
        from.setNetAddress(sizeof(senderex),&senderex);
        from.port = sender.port;
        mb.init(from,tag,replytag);
    }
};
#endif
- unsigned PacketHeader::nextseq=0;
- unsigned PacketHeader::lasttick=0;
- #define MINIMUMPACKETSIZE sizeof(PacketHeader)
- #define MAXDATAPERPACKET 50000
- struct MultiPacketHeader
- {
- mptag_t tag;
- size32_t ofs;
- size32_t size;
- unsigned idx;
- unsigned numparts;
- size32_t total;
- StringBuffer &getDetails(StringBuffer &out) const
- {
- out.append("MultiPacketHeader: ");
- out.append("tag=").append((unsigned)tag);
- out.append(",ofs=").append(ofs);
- out.append(",size=").append(size);
- out.append(",idx=").append(idx);
- out.append(",numparts=").append(numparts);
- out.append(",total=").append(total);
- return out;
- }
- };
- //
- class CMPException: public IMP_Exception, public CInterface
- {
- public:
- IMPLEMENT_IINTERFACE;
- CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
- {
- }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- StringBuffer tmp;
- switch (error) {
- case MPERR_ok: str.append("OK"); break;
- case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).str()); break;
- case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
- case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).str()); break;
- case MPERR_link_closed: str.appendf("MP link closed (%s)",endpoint.getUrlStr(tmp).str()); break;
- }
- return str;
- }
- int errorCode() const { return error; }
- MessageAudience errorAudience() const
- {
- return MSGAUD_user;
- }
- virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
- private:
- MessagePassingError error;
- SocketEndpoint endpoint;
- };
- class CBufferQueueNotify
- {
- public:
- virtual bool notify(CMessageBuffer *)=0;
- virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed
- };
- class CBufferQueueWaiting
- {
- public:
- enum QWenum { QWcontinue, QWdequeue, QWprobe };
- Semaphore sem;
- CBufferQueueNotify &waiting;
- bool probe;
- CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
- QWenum notify(CMessageBuffer *b)
- {
- // check this for DLL unloaded TBD
- if (waiting.notify(b)) {
- sem.signal();
- return probe?QWprobe:QWdequeue;
- }
- return QWcontinue;
- }
- QWenum notifyClosed(SocketEndpoint &ep)
- {
- // check this for DLL unloaded TBD
- if (waiting.notifyClosed(ep)) {
- sem.signal();
- return QWdequeue;
- }
- return QWcontinue;
- }
- };
- typedef CopyReferenceArrayOf<CBufferQueueWaiting> CWaitingArray;
- class CBufferQueue
- {
- QueueOf<CMessageBuffer, false> received;
- CWaitingArray waiting;
- CriticalSection sect;
- public:
- CBufferQueue()
- {
- }
- void enqueue(CMessageBuffer *b)
- {
- CriticalBlock block(sect);
- unsigned iter=0;
- loop {
- ForEachItemIn(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i);
- if (r==CBufferQueueWaiting::QWdequeue)
- return;
- //CBufferQueueWaiting::QWprobe
- break;
- }
- }
- if (b->getReplyTag() != TAG_CANCEL)
- break;
- if (iter++==10) {
- delete b;
- return;
- }
- CriticalUnblock unblock(sect);
- Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
- }
- received.enqueue(b);
- }
- bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm)
- {
- CriticalBlock block(sect);
- bool probegot = false;
- ForEachQueueItemIn(i,received) {
- if (nfy.notify(received.item(i))) {
- if (probe) {
- probegot = true;
- }
- else {
- received.dequeue(i);
- return true;
- }
- }
- }
- if (probegot)
- return true;
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CBufferQueueWaiting qwaiting(nfy,probe);
- waiting.append(qwaiting);
- sect.leave();
- bool ok = qwaiting.sem.wait(remaining);
- sect.enter();
- if (!ok) {
- ok = qwaiting.sem.wait(0);
- if (!ok)
- waiting.zap(qwaiting);
- }
- return ok;
- }
- unsigned flush(CBufferQueueNotify &nfy)
- {
- unsigned count = 0;
- CriticalBlock block(sect);
- ForEachQueueItemInRev(i,received) {
- if (nfy.notify(received.item(i))) {
- count++;
- delete received.dequeue(i);
- }
- }
- return count;
- }
- void notifyClosed(SocketEndpoint &ep)
- {
- CriticalBlock block(sect);
- ForEachItemInRev(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i);
- }
- }
- }
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- CriticalBlock block(sect);
- ForEachQueueItemIn(i,received) {
- received.item(i)->getDetails(buf).append('\n');
- }
- return buf;
- }
- };
- static UnsignedShortArray freetags;
- static unsigned nextfreetag=0;
- unsigned short generateDynamicTag()
- {
- if (freetags.ordinality())
- return freetags.popGet();
- return nextfreetag++;
- }
- void releaseDynamicTag(unsigned short tag)
- {
- freetags.append(tag);
- }
// Read an integer kernel parameter from a one-line text file (Linux only).
//
// path   file to read, e.g. an entry under /proc/sys
// value  receives the parsed number; left untouched on failure
//
// Returns true only if the file could be opened, a line was read and that
// line starts with a decimal integer that fits in an int. On platforms other
// than Linux it always returns false.
bool check_kernparam(const char *path, int *value)
{
#ifdef __linux__
    if (!path || !value)
        return false;
    FILE *f = fopen(path,"r");
    if (!f)
        return false;
    char res[32];
    const char *line = fgets(res, sizeof(res), f);
    fclose(f);
    if (!line)
        return false;
    // strtol (unlike atoi) reports "no digits" and saturates on overflow
    // instead of having undefined behaviour.
    char *end = nullptr;
    const long parsed = strtol(line, &end, 10);
    if (end == line)
        return false;   // not a number
    if ((parsed < INT_MIN) || (parsed > INT_MAX))
        return false;   // does not fit in the caller's int
    *value = (int)parsed;
    return true;
#else
    (void)path;
    (void)value;
    return false;
#endif
}
- bool check_somaxconn(int *val)
- {
- return check_kernparam("/proc/sys/net/core/somaxconn", val);
- }
- class CMPServer;
- class CMPChannel;
- class CMPConnectThread: public Thread
- {
- bool running;
- ISocket *listensock;
- CMPServer *parent;
- int mpSoMaxConn;
- unsigned mpTraceLevel;
- void checkSelfDestruct(void *p,size32_t sz);
- public:
- CMPConnectThread(CMPServer *_parent, unsigned port);
- ~CMPConnectThread()
- {
- ::Release(listensock);
- }
- int run();
- void startPort(unsigned short port);
- void stop()
- {
- if (running) {
- running = false;
- listensock->cancel_accept();
- if (!join(1000*60*5)) // should be pretty instant
- printf("CMPConnectThread::stop timed out\n");
- }
- }
- };
- class PingPacketHandler;
- class PingReplyPacketHandler;
- class MultiPacketHandler;
- class BroadcastPacketHandler;
- class ForwardPacketHandler;
- class UserPacketHandler;
- class CMPNotifyClosedThread;
- typedef SuperHashTableOf<CMPChannel,SocketEndpoint> CMPChannelHT;
- class CMPServer: private CMPChannelHT, implements IMPServer
- {
- byte RTsalt;
- ISocketSelectHandler *selecthandler;
- CMPConnectThread *connectthread;
- CBufferQueue receiveq;
- CMPNotifyClosedThread *notifyclosedthread;
- CriticalSection sect;
- protected:
- unsigned short port;
- public:
- bool checkclosed;
- bool tryReopenChannel = false;
- // packet handlers
- PingPacketHandler *pingpackethandler; // TAG_SYS_PING
- PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
- ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
- MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
- BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
- UserPacketHandler *userpackethandler; // default
- IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
- CMPServer(unsigned _port);
- ~CMPServer();
- void start();
- virtual void stop();
- unsigned short getPort() { return port; }
- void setPort(unsigned short _port) { port = _port; }
- CMPChannel *lookup(const SocketEndpoint &remoteep);
- ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
- CBufferQueue &getReceiveQ() { return receiveq; }
- void checkTagOK(mptag_t tag);
- bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
- void flush(mptag_t tag);
- unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
- void cancel(const SocketEndpoint *ep, mptag_t tag);
- bool nextChannel(CMPChannel *&c);
- void addConnectionMonitor(IConnectionMonitor *monitor);
- void removeConnectionMonitor(IConnectionMonitor *monitor);
- void notifyClosed(SocketEndpoint &ep, bool trace);
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- return receiveq.getReceiveQueueDetails(buf);
- }
- void removeChannel(CMPChannel *c) { if (c) removeExact(c); }
- protected:
- void onAdd(void *);
- void onRemove(void *e);
- unsigned getHashFromElement(const void *e) const;
- unsigned getHashFromFindParam(const void *fp) const;
- const void * getFindParam(const void *p) const;
- bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
- CriticalSection replyTagSect;
- int rettag;
- INode *myNode;
- public:
- virtual mptag_t createReplyTag()
- {
- // these are short-lived so a simple increment will do (I think this is OK!)
- mptag_t ret;
- {
- CriticalBlock block(replyTagSect);
- if (RTsalt==0xff) {
- RTsalt = (byte)(getRandom()%16);
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- if (rettag>(int)TAG_REPLY_BASE) { // wrapped
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- ret = (mptag_t)rettag;
- rettag -= 16;
- }
- flush(ret);
- return ret;
- }
- virtual ICommunicator *createCommunicator(IGroup *group, bool outer);
- virtual INode *queryMyNode()
- {
- return myNode;
- }
- virtual void setOpt(MPServerOpts opt, const char *value)
- {
- switch (opt)
- {
- case mpsopt_channelreopen:
- {
- bool tf = (nullptr != value) ? strToBool(value) : false;
- PROGLOG("Setting ChannelReopen = %s", tf ? "true" : "false");
- tryReopenChannel = tf;
- break;
- }
- default:
- // ignore
- break;
- }
- }
- };
- //===========================================================================
- class CMPNotifyClosedThread: public Thread
- {
- IArrayOf<IConnectionMonitor> connectionmonitors;
- CriticalSection conmonsect;
- SimpleInterThreadQueueOf<INode, false> workq;
- bool stopping;
- CMPServer *parent;
- CriticalSection stopsect;
- public:
- CMPNotifyClosedThread(CMPServer *_parent)
- : Thread("CMPNotifyClosedThread")
- {
- parent = _parent;
- stopping = false;
- }
- ~CMPNotifyClosedThread()
- {
- IArrayOf<IConnectionMonitor> todelete;
- CriticalBlock block(conmonsect);
- while (connectionmonitors.ordinality())
- todelete.append(connectionmonitors.popGet());
- }
- void addConnectionMonitor(IConnectionMonitor *monitor)
- {
- if (monitor)
- connectionmonitors.append(*LINK(monitor));
- }
- void removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- if (monitor) {
- CriticalBlock block(conmonsect);
- connectionmonitors.zap(*monitor);
- }
- }
- int run()
- {
- loop {
- try {
- Owned<INode> node = workq.dequeue();
- if (node->endpoint().isNull())
- break;
- SocketEndpoint ep = node->endpoint();
- parent->getReceiveQ().notifyClosed(ep);
- IArrayOf<IConnectionMonitor> toclose;
- {
- CriticalBlock block(conmonsect);
- ForEachItemIn(i1,connectionmonitors) {
- toclose.append(*LINK(&connectionmonitors.item(i1)));
- }
- }
- ForEachItemIn(i,toclose) {
- toclose.item(i).onClose(ep);
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- e->Release();
- }
- }
- return 0;
- }
- void stop()
- {
- {
- CriticalBlock block(stopsect);
- if (!stopping) {
- stopping = true;
- SocketEndpoint ep;
- workq.enqueue(createINode(ep));
- }
- }
- while (!join(1000*60*3))
- PROGLOG("CMPNotifyClosedThread join failed");
- }
- void notify(SocketEndpoint &ep)
- {
- CriticalBlock block(stopsect);
- if (!stopping&&!ep.isNull()) {
- if (workq.ordinality()>100)
- PROGLOG("MP: %d waiting to close",workq.ordinality());
- workq.enqueue(createINode(ep));
- }
- }
- };
- void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
- {
- dbgassertex(timeoutChkIntervalMs < timeoutMs);
- StringBuffer epStr;
- CCycleTimer readTmsTimer;
- unsigned intervalTimeoutMs = timeoutChkIntervalMs;
- loop
- {
- try
- {
- sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (JSOCKERR_graceful_close == e->errorCode())
- return;
- else if (JSOCKERR_timeout_expired != e->errorCode())
- throw;
- unsigned elapsedMs = readTmsTimer.elapsedMs();
- if (elapsedMs >= timeoutMs)
- throw;
- unsigned remainingMs = timeoutMs-elapsedMs;
- if (remainingMs < timeoutChkIntervalMs)
- intervalTimeoutMs = remainingMs;
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
- }
- }
- if (readTmsTimer.elapsedMs() >= TRACESLOW_THRESHOLD)
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs());
- }
- }
- class CMPPacketReader;
- class CMPChannel: public CInterface
- {
- ISocket *channelsock;
- CMPServer *parent;
- Mutex sendmutex;
- Semaphore sendwaitingsig;
- unsigned sendwaiting; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
- CriticalSection connectsect;
- CMPPacketReader *reader;
- bool master; // i.e. connected originally
- mptag_t multitag; // current multi send in progress
- bool closed;
- IArrayOf<ISocket> keptsockets;
- CriticalSection attachsect;
- unsigned __int64 attachaddrval;
- SocketEndpoint attachep;
- atomic_t attachchk;
- protected: friend class CMPServer;
- SocketEndpoint remoteep;
- SocketEndpoint localep; // who the other end thinks I am
- protected: friend class CMPPacketReader;
- unsigned lastxfer;
- #ifdef _FULLTRACE
- unsigned startxfer;
- unsigned numiter;
- #endif
- bool checkReconnect(CTimeMon &tm)
- {
- if (!parent->tryReopenChannel)
- return false;
- ::Release(channelsock);
- channelsock = nullptr;
- if (connect(tm))
- return true;
- WARNLOG("Failed to reconnect");
- return false;
- }
- bool connect(CTimeMon &tm)
- {
- // must be called from connectsect
- // also in sendmutex
- ISocket *newsock=NULL;
- unsigned retrycount = CONNECT_RETRYCOUNT;
- unsigned remaining;
- while (!channelsock) {
- try {
- StringBuffer str;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).str());
- #endif
- if (((int)tm.timeout)<0)
- remaining = CONNECT_TIMEOUT;
- else if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- PROGLOG("MP: connect timed out 1");
- #endif
- return false;
- }
- if (remaining<10000)
- remaining = 10000; // 10s min granularity for MP
- newsock = ISocket::connect_timeout(remoteep,remaining);
- newsock->set_keep_alive(true);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
- #endif
- SocketEndpointV4 id[2];
- SocketEndpoint hostep;
- hostep.setLocalHost(parent->getPort());
- id[0].set(hostep);
- id[1].set(remoteep);
- unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
- #ifdef _TRACE
- PROGLOG("MP: connect addrval = %" I64F "u", addrval);
- #endif
- newsock->write(&id[0],sizeof(id));
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- id[0].getUrlStr(tmp1);
- tmp1.append(' ');
- id[1].getUrlStr(tmp1);
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
- #endif
- size32_t reply = 0;
- size32_t rd = 0;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write, waiting for read");
- #endif
- // Wait for connection reply but also check for A<->B deadlock (where both processes are here
- // waiting for other side to send confirm) and decide who stops waiting based on address.
- // To be compatible with older versions of mplib which will not do this,
- // loop with short wait time and release CS to allow other side to proceed
- StringBuffer epStr;
- unsigned startMs = msTick();
- unsigned loopCnt = CONNECT_READ_TIMEOUT / CONNECT_TIMEOUT_INTERVAL + 1;
- #ifdef _TRACE
- PROGLOG("MP: loopCnt start = %u", loopCnt);
- #endif
- while (loopCnt-- > 0)
- {
- {
- CriticalBlock block(attachsect);
- #ifdef _TRACE
- PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", atomic_read(&attachchk), loopCnt);
- #endif
- if (atomic_read(&attachchk) > 0)
- {
- if (remoteep.equals(attachep))
- {
- #ifdef _TRACE
- PROGLOG("MP: deadlock situation [] attachaddrval = %" I64F "u addrval = %" I64F "u", attachaddrval, addrval);
- #endif
- if (attachaddrval < addrval)
- break;
- }
- }
- }
- rd = 0;
- try
- {
- newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_TIMEOUT_INTERVAL);
- }
- catch (IException *e)
- {
- #ifdef _TRACE
- PROGLOG("MP: loop exception code = %d, loopCnt = %u", e->errorCode(), loopCnt);
- #endif
- if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
- ((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
- {
- if (tm.timedout(&remaining))
- {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 3");
- #endif
- e->Release();
- newsock->Release();
- return false;
- }
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
- { // don't bother retrying on async send
- e->Release();
- throw new CMPException(MPERR_connection_failed,remoteep);
- }
- // if other side closes, connect again
- if (e->errorCode() == JSOCKERR_graceful_close)
- {
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
- e->Release();
- break;
- }
- e->Release();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
- #endif
- }
- else
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- newsock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("MP: connect to: %s, stalled for %d ms so far", epStr.str(), msTick()-startMs);
- e->Release();
- }
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: rd = %d", rd);
- #endif
- if (rd != 0)
- break;
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(id)=%lu", rd, reply, sizeof(id));
- #endif
- if (reply!=0)
- {
- unsigned elapsedMs = msTick() - startMs;
- if (elapsedMs >= TRACESLOW_THRESHOLD)
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- newsock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
- }
- assertex(reply==sizeof(id)); // how can this fail?
- if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
- {
- newsock->Release();
- newsock = NULL;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.str());
- #endif
- lastxfer = msTick();
- closed = false;
- break;
- }
- }
- }
- catch (IException *e)
- {
- if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 2");
- #endif
- e->Release();
- return false;
- }
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- e->Release();
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
- IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
- throw e;
- }
- #ifdef _TRACE
- StringBuffer str;
- str.clear();
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
- #endif
- }
- ::Release(newsock);
- newsock = NULL;
- {
- CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
- #ifdef _FULLTRACE
- PROGLOG("MP: before sleep");
- #endif
- // check often if channelsock was created from accept thread
- Sleep(50);
- unsigned totalt = CONNECT_TIMEOUT_MINSLEEP + getRandom() % (CONNECT_TIMEOUT_MAXSLEEP-CONNECT_TIMEOUT_MINSLEEP);
- unsigned startt = msTick();
- unsigned deltat = 0;
- while (deltat < totalt)
- {
- {
- CriticalBlock block(connectsect);
- if (channelsock)
- break;
- }
- deltat = msTick() - startt;
- Sleep(50);
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: after sleep");
- #endif
- }
- }
- return true;
- }
- public:
- Semaphore pingsem;
- CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
- ~CMPChannel();
- void reset();
- bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval=0);
- bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- Linked<ISocket> dest;
- {
- CriticalBlock block(connectsect);
- if (closed) {
- #ifdef _TRACELINKCLOSED
- LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
- PrintStackReport();
- #endif
- if (!checkReconnect(tm))
- throw new CMPException(MPERR_link_closed,remoteep);
- }
- if (!channelsock) {
- if (!connect(tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
- #endif
- return false;
- }
- }
- dest.set(channelsock);
- }
- try {
- #ifdef _FULLTRACE
- unsigned t1 = msTick();
- #endif
- if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
- // if (tm.timeout!=MP_ASYNC_SEND) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (channelsock->wait_write(remaining)==0) {
- return false;
- }
- if (tm.timedout())
- return false;
- }
- // exception checking TBD
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
- unsigned t2 = msTick();
- #endif
- unsigned n = 0;
- const void *bufs[3];
- size32_t sizes[3];
- if (hdrsize) {
- bufs[n] = hdr;
- sizes[n++] = hdrsize;
- }
- if (hdr2size) {
- bufs[n] = hdr2;
- sizes[n++] = hdr2size;
- }
- if (bodysize) {
- bufs[n] = body;
- sizes[n++] = bodysize;
- }
- if (!dest) {
- LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
- return false;
- }
- dest->write_multiple(n,bufs,sizes);
- lastxfer = msTick();
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
- #endif
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- closeSocket(false, true);
- throw;
- }
- return true;
- }
- bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
- }
- bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
- }
- bool sendPing(CTimeMon &tm);
- bool sendPingReply(unsigned timeout,bool identifyself);
- bool verifyConnection(CTimeMon &tm,bool allowconnect)
- {
- {
- CriticalBlock block(connectsect);
- if (!channelsock&&allowconnect)
- return connect(tm);
- if (closed||!channelsock)
- return false;
- if ((msTick()-lastxfer)<VERIFY_DELAY)
- return true;
- }
- StringBuffer ep;
- remoteep.getUrlStr(ep);
- loop {
- CTimeMon pingtm(1000*60);
- if (sendPing(pingtm))
- break;
- {
- CriticalBlock block(connectsect);
- if (closed||!channelsock)
- return false;
- }
- if (tm.timedout()) {
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
- closeSocket();
- return false;
- }
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
- unsigned remaining;
- if (!pingtm.timedout(&remaining)&&remaining)
- Sleep(remaining);
- }
- return true;
- }
- bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
- void closeSocket(bool keepsocket=false, bool trace=false)
- {
- ISocket *s;
- bool socketfailed = false;
- {
- CriticalBlock block(connectsect);
- if (!channelsock)
- return;
- lastxfer = msTick();
- closed = true;
- if (parent)
- parent->checkclosed = true;
- s=channelsock;
- channelsock = NULL;
- {
- CriticalBlock block(attachsect);
- attachaddrval = 0;
- attachep.set(NULL);
- atomic_set(&attachchk, 0);
- }
- if (!keepsocket) {
- try {
- s->shutdown();
- }
- catch (IException *e) {
- socketfailed = true; // ignore if the socket has been closed
- WARNLOG("closeSocket() : Ignoring shutdown error");
- e->Release();
- }
- }
- parent->querySelectHandler().remove(s);
- }
- parent->notifyClosed(remoteep, trace);
- if (socketfailed) {
- try {
- s->Release();
- }
- catch (IException *) {
- // ignore
- }
- }
- else if (keepsocket) {
- // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
- if (keptsockets.ordinality()>10)
- keptsockets.remove(0);
- keptsockets.append(*s);
- }
- else {
- try {
- s->close();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- s->Release();
- }
- }
- CMPServer &queryServer() { return *parent; }
- void monitorCheck();
- StringBuffer & queryEpStr(StringBuffer &s)
- {
- return remoteep.getUrlStr(s);
- }
- bool isClosed()
- {
- return closed;
- }
- bool isConnected()
- {
- return !closed&&(channelsock!=NULL);
- }
- };
- // Message Handlers (not done as interfaces, for speed reasons)
- class UserPacketHandler // default
- {
- CMPServer *server;
- public:
- UserPacketHandler(CMPServer *_server)
- {
- server = _server;
- }
- void handle(CMessageBuffer *msg) // takes ownership of message buffer
- {
- server->getReceiveQ().enqueue(msg);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
- {
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class PingPacketHandler // TAG_SYS_PING
- {
- public:
- void handle(CMPChannel *channel,bool identifyself)
- {
- channel->sendPingReply(CONFIRM_TIMEOUT,identifyself);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),tm);
- }
- };
- class PingReplyPacketHandler // TAG_SYS_PING_REPLY
- {
- public:
- void handle(CMPChannel *channel)
- {
- channel->pingsem.signal();
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class CMultiPacketReceiver: public CInterface
- { // assume each sender only sends one multi-message per channel
- public:
- SocketEndpoint sender;
- MultiPacketHeader info;
- CMessageBuffer *msg;
- byte * ptr;
- };
- class MultiPacketHandler // TAG_SYS_MULTI
- {
- CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
- CriticalSection sect;
- unsigned lastErrMs;
- void logError(unsigned code, MultiPacketHeader &mhdr, CMessageBuffer &msg, MultiPacketHeader *otherMhdr)
- {
- unsigned ms = msTick();
- if ((ms-lastErrMs) > 1000) // avoid logging too much
- {
- StringBuffer errorMsg("sender=");
- msg.getSender().getUrlStr(errorMsg).newline();
- errorMsg.append("This header: ");
- mhdr.getDetails(errorMsg).newline();
- if (otherMhdr)
- {
- errorMsg.append("Other header: ");
- otherMhdr->getDetails(errorMsg).newline();
- }
- msg.getDetails(errorMsg);
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (%d) %s", code, errorMsg.str());
- }
- lastErrMs = ms;
- }
- public:
- MultiPacketHandler() : lastErrMs(0)
- {
- }
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- if (!msg)
- return NULL;
- CriticalBlock block(sect);
- MultiPacketHeader mhdr;
- msg->read(sizeof(mhdr),&mhdr);
- CMultiPacketReceiver *recv=NULL;
- ForEachItemIn(i,inprogress) {
- CMultiPacketReceiver &mpr = inprogress.item(i);
- if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
- recv = &mpr;
- break;
- }
- }
- if (mhdr.idx==0) {
- if ((mhdr.ofs!=0)||(recv!=NULL)) {
- logError(1, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- recv = new CMultiPacketReceiver;
- recv->msg = new CMessageBuffer();
- recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
- recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
- recv->sender = msg->getSender();
- recv->info = mhdr;
- inprogress.append(*recv);
- }
- else {
- if ((recv==NULL)||(mhdr.ofs==0)||
- (recv->info.ofs+recv->info.size!=mhdr.ofs)||
- (recv->info.idx+1!=mhdr.idx)||
- (recv->info.total!=mhdr.total)||
- (mhdr.ofs+mhdr.size>mhdr.total)) {
- logError(2, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- }
- msg->read(mhdr.size,recv->ptr+mhdr.ofs);
- delete msg;
- msg = NULL;
- recv->info = mhdr;
- if (mhdr.idx+1==mhdr.numparts) {
- if (mhdr.ofs+mhdr.size!=mhdr.total) {
- logError(3, mhdr, *msg, NULL);
- return NULL;
- }
- msg = recv->msg;
- inprogress.remove(i);
- }
- return msg;
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
- {
- // must not adjust mb
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- PacketHeader outhdr;
- outhdr = hdr;
- outhdr.tag = TAG_SYS_MULTI;
- MultiPacketHeader mhdr;
- mhdr.total = hdr.size-sizeof(hdr);
- mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
- mhdr.size = mhdr.total/mhdr.numparts;
- mhdr.tag = hdr.tag;
- mhdr.ofs = 0;
- mhdr.idx = 0;
- const byte *p = (const byte *)mb.toByteArray();
- unsigned i=0;
- loop {
- if (i+1==mhdr.numparts)
- mhdr.size = mhdr.total-mhdr.ofs;
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
- #endif
- outhdr.initseq();
- outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
- if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
- #endif
- return false;
- }
- i++;
- if (i==mhdr.numparts)
- break;
- sendmutex.unlock(); // allow other messages to interleave
- sendmutex.lock();
- mhdr.idx++;
- mhdr.ofs += mhdr.size;
- p += mhdr.size;
- }
- return true;
- }
- };
- class BroadcastPacketHandler // TAG_SYS_BCAST
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- class ForwardPacketHandler // TAG_SYS_FORWARD
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- // --------------------------------------------------------
// Select-handler callback for one channel: reads packet headers and bodies from the channel
// socket as data becomes available and dispatches each completed message by its tag.
- class CMPPacketReader: public ISocketSelectNotify, public CInterface
- {
// Message currently being assembled, or NULL when no message is in progress.
- CMessageBuffer *activemsg;
// Position inside activemsg where the next bytes read are stored.
- byte * activeptr;
// Number of body bytes still to be read for activemsg.
- size32_t remaining;
// Channel this reader serves; set to NULL by shutdown() and after a close/error.
- CMPChannel *parent;
// Guards parent in shutdown() and in the graceful-close path of notifySelected().
- CriticalSection sect;
- public:
- IMPLEMENT_IINTERFACE;
- CMPPacketReader(CMPChannel *_parent)
- {
- init(_parent);
- }
// (Re)binds the reader to a channel and clears the in-progress message pointer.
// The previous activemsg, if any, is not deleted here.
- void init(CMPChannel *_parent)
- {
- parent = _parent;
- activemsg = NULL;
- }
// Detaches the reader from its channel so later notifications are ignored.
- void shutdown()
- {
- CriticalBlock block(sect);
- parent = NULL;
- }
// Called when the socket is selected. Reads as much as is available, assembling and
// dispatching messages. Every path returns false. On a graceful close (nothing available to
// read) or on an exception the channel socket is closed and parent is cleared.
// NOTE(review): parent is tested and used below without holding sect, while shutdown() can
// clear it - confirm the callers guarantee this cannot overlap.
- bool notifySelected(ISocket *sock,unsigned selected)
- {
- if (!parent)
- return false;
- try {
- // try and mop up all data on socket
-
- size32_t sizeavail = sock->avail_read();
- if (sizeavail==0) {
- // graceful close
- Linked<CMPChannel> pc;
- {
- CriticalBlock block(sect);
- if (parent) {
- pc.set(parent); // don't want channel to disappear during call
- parent = NULL;
- }
- }
- if (pc)
- pc->closeSocket(false, true);
- return false;
- }
- do {
- parent->lastxfer = msTick();
- #ifdef _FULLTRACE
- parent->numiter++;
- #endif
- if (!activemsg) { // no message in progress
- PacketHeader hdr; // header for active message
- #ifdef _FULLTRACE
- parent->numiter = 1;
- parent->startxfer = msTick();
- #endif
- // assumes packet header will arrive in one go
- if (sizeavail<sizeof(hdr)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
- #endif
- size32_t szread;
- sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
- }
- else
- sock->read(&hdr,sizeof(hdr));
// Only the upper part of the version (version/0x100) has to match.
- if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) {
- // TBD IPV6 here
- SocketEndpoint ep;
- hdr.sender.get(ep);
- IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
- throw e;
- }
- if (sizeavail<=sizeof(hdr))
- sizeavail = sock->avail_read();
- else
- sizeavail -= sizeof(hdr);
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
// hdr.size includes the header itself, so the body length is the difference.
// NOTE(review): hdr.size is not checked against sizeof(hdr) before the subtraction.
- remaining = hdr.size-sizeof(hdr);
- activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
- activeptr = (byte *)activemsg->reserveTruncate(remaining);
- hdr.setMessageFields(*activemsg);
- }
-
// Read whichever is smaller: what is available or what the message still needs.
- size32_t toread = sizeavail;
- if (toread>remaining)
- toread = remaining;
- if (toread) {
- sock->read(activeptr,toread);
- remaining -= toread;
- sizeavail -= toread;
- activeptr += toread;
- }
- if (remaining==0) { // we have the packet so process
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
- #endif
// Dispatch by tag. A handler may hand back a message (e.g. a reassembled multi-part
// message), in which case the loop dispatches that one as well.
- do {
- switch (activemsg->getTag()) {
- case TAG_SYS_MULTI:
- activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
- break;
- case TAG_SYS_PING:
- parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_PING_REPLY:
- parent->queryServer().pingreplypackethandler->handle(parent);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_BCAST:
- activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
- break;
- case TAG_SYS_FORWARD:
- activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
- break;
- default:
- parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
- activemsg = NULL;
- }
- } while (activemsg);
- }
- if (!sizeavail)
- sizeavail = sock->avail_read();
- } while (sizeavail);
- return false; // ok
- }
- catch (IException *e) {
- if (e->errorCode()!=JSOCKERR_graceful_close)
- FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
- e->Release();
- }
- // error here, so close socket (ignore error as may be closed already)
- try {
- if(parent)
- parent->closeSocket(false, true);
- }
- catch (IException *e) {
- e->Release();
- }
- parent = NULL;
- return false;
- }
- };
- CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep)
- {
- channelsock = NULL;
- parent = _parent;
- remoteep = _remoteep;
- localep.set(parent->getPort());
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- attachaddrval = 0;
- attachep.set(NULL);
- atomic_set(&attachchk, 0);
- }
- void CMPChannel::reset()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket(false, true);
- reader->Release();
- channelsock = NULL;
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- attachaddrval = 0;
- attachep.set(NULL);
- atomic_set(&attachchk, 0);
- }
- CMPChannel::~CMPChannel()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release();
- }
// Installs newsock as the channel's socket, resolving the case where a socket is already
// attached (both sides connected to each other at the same time, or the old one is stale).
//   newsock    - candidate socket; linked by the channel only when true is returned
//   _remoteep  - endpoint of the peer; recorded in attachep while the attach is in progress
//   _localep   - stored as the channel's local endpoint on success
//   ismaster   - true when called from the connecting side, false from the accept thread
//   confirm    - if non-NULL, this value is written to newsock as the connection confirmation
//   addrval    - address-derived value recorded in attachaddrval for deadlock arbitration
// Returns true if newsock was attached, false if the attach was refused.
- bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval) // takes ownership if succeeds
- {
// Decrements attachchk on every exit path; the matching increment happens just below.
- struct attachdTor
- {
- atomic_t &attchk;
- attachdTor(atomic_t &_attchk) : attchk(_attchk) { }
- ~attachdTor() { atomic_dec(&attchk); }
- } attachChk (attachchk);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket on entry, ismaster = %d, confirm = %p, channelsock = %p, addrval = %" I64F "u", ismaster, confirm, channelsock, addrval);
- #endif
// Publish who is attaching (under attachsect only) before waiting for connectsect.
- {
- CriticalBlock block(attachsect);
- attachaddrval = addrval;
- attachep = _remoteep;
- atomic_inc(&attachchk);
- }
- CriticalBlock block(connectsect);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket got connectsect, channelsock = %p", channelsock);
- #endif
- // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
- if (channelsock) {
- if (_remoteep.port==0)
- return false;
- StringBuffer ep1;
- StringBuffer ep2;
- _localep.getUrlStr(ep1);
- _remoteep.getUrlStr(ep2);
- LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
- try {
// Roles differ between the existing socket and the new one: the master side backs off
// immediately; the other side keeps the existing socket if it still verifies.
- if (ismaster!=master) {
- if (ismaster) {
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
- return false;
- }
- else {
- Sleep(50); // give the other side some time to close
- CTimeMon tm(10000);
- if (verifyConnection(tm,false)) {
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
- return false;
- }
- }
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
- e->Release();
- }
// The existing socket is treated as stale: drop it (keeping the socket object alive) with
// connectsect temporarily released.
- try {
- LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
- CriticalUnblock unblock(connectsect);
- closeSocket(true, true);
- #ifdef REFUSE_STALE_CONNECTION
- if (!ismaster)
- return false;
- #endif
- Sleep(100); // pause to allow close socket triggers to run
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
- e->Release();
- }
- }
- if (confirm)
- newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
- closed = false;
- reader->init(this);
// The channel takes its own reference; the caller keeps (and must release) its own.
- channelsock = LINK(newsock);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket before select add");
- #endif
- parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket after select add");
- #endif
- localep = _localep;
- master = ismaster;
- return true;
- }
// Sends the contents of mb to the remote endpoint under the given tag.
// Messages longer than MAXDATAPERPACKET go through the multi-packet handler, everything else
// through the user packet handler; the handler's result is returned.
// Throws CMPException(MPERR_link_closed) if the channel is closed (or, for a reply, not
// connected) and checkReconnect fails. tag must not be TAG_NULL and tm.timeout must be
// non-zero (both asserted).
- bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
- {
- // note must not adjust mb
- assertex(tag!=TAG_NULL);
- assertex(tm.timeout);
- size32_t msgsize = mb.length();
- PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
- if (closed||(reply&&!isConnected())) { // flag error if has been disconnected
- #ifdef _TRACELINKCLOSED
- LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
- PrintStackReport();
- #endif
- if (!checkReconnect(tm))
- throw new CMPException(MPERR_link_closed,remoteep);
- }
- bool ismulti = (msgsize>MAXDATAPERPACKET);
- // pre-condition - ensure no clashes
// Acquire sendmutex; if this send must not proceed yet, register as a waiter, release the
// mutex and block on sendwaitingsig until a finishing sender signals, then retry.
// NOTE(review): nothing in this function sets multitag to tag, so with the code visible
// here multitag stays TAG_NULL - confirm whether it is assigned elsewhere.
- loop {
- sendmutex.lock();
- if (ismulti) {
- if (multitag==TAG_NULL) // don't want to interleave with other multi send
- break;
- }
- else if (multitag!=tag) // don't want to interleave with another of same tag
- break;
- sendwaiting++;
- sendmutex.unlock();
- sendwaitingsig.wait();
- }
// Scope guard run on every exit (return or exception): clears multitag for a multi send,
// wakes all registered waiters and releases sendmutex.
- struct Cpostcondition // can we start using eiffel
- {
- Mutex &sendmutex;
- unsigned &sendwaiting;
- Semaphore &sendwaitingsig;
- mptag_t *multitag;
- Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
- : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
- {
- multitag = _multitag;
- }
- ~Cpostcondition()
- {
- if (multitag)
- *multitag = TAG_NULL;
- if (sendwaiting) {
- sendwaitingsig.signal(sendwaiting);
- sendwaiting = 0;
- }
- sendmutex.unlock();
- }
- } postcond(sendmutex,sendwaiting,sendwaitingsig,ismulti?&multitag:NULL);
- if (ismulti)
- return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
- return parent->userpackethandler->send(this,hdr,mb,tm);
- }
- bool CMPChannel::sendPing(CTimeMon &tm)
- {
- unsigned remaining;
- tm.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- PacketHeader hdr(sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
- bool ret = false;
- try {
- ret = parent->pingpackethandler->send(this,hdr,tm)&&!tm.timedout(&remaining);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
- e->Release();
- }
- sendmutex.unlock();
- if (ret)
- ret = pingsem.wait(remaining);
- return ret;
- }
- bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
- {
- CTimeMon mon(timeout);
- unsigned remaining;
- mon.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- MemoryBuffer mb;
- if (identifyself) {
- #ifdef _WIN32
- mb.append(GetCommandLine());
- #endif
- }
- PacketHeader hdr(mb.length()+sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
- bool ret;
- try {
- ret = parent->pingreplypackethandler->send(this,hdr,mb,mon);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
- e->Release();
- ret = false;
- }
- sendmutex.unlock();
- return ret;
- }
-
- // --------------------------------------------------------
- CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
- : Thread("MP Connection Thread")
- {
- parent = _parent;
- mpSoMaxConn = 0;
- mpTraceLevel = 0;
- Owned<IPropertyTree> env = getHPCCEnvironment();
- if (env)
- {
- mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
- if (!mpSoMaxConn)
- mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
- mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
- }
- if (mpSoMaxConn)
- {
- int kernSoMaxConn = 0;
- bool soMaxCheck = check_somaxconn(&kernSoMaxConn);
- if (soMaxCheck && (mpSoMaxConn > kernSoMaxConn))
- WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
- }
- if (!mpSoMaxConn)
- mpSoMaxConn = DEFAULT_LISTEN_QUEUE_SIZE;
- if (!port)
- {
- // need to connect early to resolve clash
- unsigned minPort, maxPort;
- if (env)
- {
- minPort = env->getPropInt("EnvSettings/mpStart", 0);
- if (!minPort)
- minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
- maxPort = env->getPropInt("EnvSettings/mpEnd", 0);
- if (!maxPort)
- maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
- }
- else
- {
- minPort = MP_START_PORT;
- maxPort = MP_END_PORT;
- }
- assertex(maxPort >= minPort);
- Owned<IJSOCK_Exception> lastErr;
- unsigned numPorts = maxPort - minPort + 1;
- for (unsigned retries = 0; retries < numPorts * 3; retries++)
- {
- port = minPort + getRandom() % numPorts;
- try
- {
- listensock = ISocket::create(port, mpSoMaxConn);
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (e->errorCode()!=JSOCKERR_port_in_use)
- throw;
- lastErr.setown(e);
- }
- }
- if (!listensock)
- throw lastErr.getClear();
- }
- else
- listensock = NULL; // delay create till running
- parent->setPort(port);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
- #endif
- running = false;
- }
- void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
- {
- byte *b = (byte *)p;
- while (sz--)
- if (*(b++)!=0xff)
- return;
- // Panic!
- PROGLOG("MP Self destruct invoked");
- try {
- if (listensock) {
- listensock->close();
- listensock->Release();
- listensock=NULL;
- }
- }
- catch (...)
- {
- PROGLOG("MP socket close failure");
- }
- // Kill registered child processes
- PROGLOG("MP self destruct exit");
- queryLogMsgManager()->flushQueue(10*1000);
- #ifdef _WIN32
- ForEachItemIn(i,childprocesslist)
- TerminateProcess((HANDLE)childprocesslist.item(i), 1);
- TerminateProcess(GetCurrentProcess(), 1);
- #else
- ForEachItemIn(i,childprocesslist)
- ::kill((HANDLE)childprocesslist.item(i), SIGTERM);
- ::kill(getpid(), SIGTERM);
- #endif
- _exit(1);
- }
- void CMPConnectThread::startPort(unsigned short port)
- {
- if (!listensock)
- listensock = ISocket::create(port, mpSoMaxConn);
- running = true;
- Thread::start();
- }
// Thread body: accepts incoming connections while `running` is set. For each accepted socket
// it reads the connecting side's pair of endpoints, validates them, looks up (or creates) the
// channel for the remote endpoint and asks it to attach the socket. Returns 0 when the loop
// ends. An IException from accept() is logged and rethrown out of the thread.
- int CMPConnectThread::run()
- {
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
- #endif
- while (running) {
- ISocket *sock=NULL;
- try {
- sock=listensock->accept(true);
- #ifdef _FULLTRACE
- StringBuffer s;
- SocketEndpoint ep1;
- if (sock) {
- sock->getPeerEndpoint(ep1);
- PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
- }
- #endif
- }
- catch (IException *e)
- {
- LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
- throw; // error handling TBD
- }
- if (sock) {
- try {
- sock->set_keep_alive(true);
- size32_t rd;
- SocketEndpoint _remoteep;
- SocketEndpoint hostep;
// id[0] is read into _remoteep and id[1] into hostep below (the connecting side writes its
// own endpoint first, then the endpoint it is connecting to).
- SocketEndpointV4 id[2];
- traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
- if (0 == rd)
- {
- if (mpTraceLevel > 1)
- {
- // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint())
- StringBuffer errMsg("MP Connect Thread: connect with no msg received, assumed port monitor check");
- PROGLOG("%s", errMsg.str());
- }
- sock->close();
- sock->Release();
- continue;
- }
- else if (rd != sizeof(id))
- {
- // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");
- ep.getUrlStr(errMsg);
- FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
- sock->close();
- sock->Release();
- continue;
- }
- id[0].get(_remoteep);
- id[1].get(hostep);
// Same address-derived value the connecting side computes from its own endpoint; passed to
// attachSocket for deadlock arbitration.
- unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
- #ifdef _TRACE
- PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval);
- #endif
- if (_remoteep.isNull() || hostep.isNull())
- {
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- StringBuffer errMsg;
- SocketEndpointV4 zeroTest[2];
- memset(zeroTest, 0x0, sizeof(zeroTest));
// An all-zero message is treated as a port monitor probe (only traced); anything else with
// a null endpoint is reported as invalid.
- if (memcmp(id, zeroTest, sizeof(id)))
- {
- // JCSMORE, I think _remoteep really must/should match a IP of this local host
- errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from ");
- ep.getUrlStr(errMsg);
- FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
- }
- else if (mpTraceLevel > 1)
- {
- // all zeros msg received
- errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
- ep.getUrlStr(errMsg);
- PROGLOG("%s", errMsg.str());
- }
- sock->close();
- sock->Release();
- continue;
- }
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- _remoteep.getUrlStr(tmp1);
- tmp1.append(' ');
- hostep.getUrlStr(tmp1);
- PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
- #endif
// A message consisting solely of 0xff bytes terminates the process (see checkSelfDestruct).
- checkSelfDestruct(&id[0],sizeof(id));
- Owned<CMPChannel> channel = parent->lookup(_remoteep);
// rd (== sizeof(id) here) is passed as the confirmation value written back to the peer.
- if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
- #ifdef _FULLTRACE
- PROGLOG("MP Connect Thread: lookup failed");
- #endif
- }
- else {
- #ifdef _TRACE
- StringBuffer str1;
- StringBuffer str2;
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).str());
- #endif
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: Connect Thread: after write");
- #endif
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
- sock->close();
- e->Release();
- }
// Drop this thread's reference; a successful attachSocket holds its own link to the socket.
- try {
- sock->Release();
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
- e->Release();
- }
- }
- else {
- if (running)
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
- }
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
- #endif
- return 0;
- }
- // --------------------------------------------------------
- // Simple cursor over the channels held by a CMPServer.
- // All stepping is delegated to CMPServer::nextChannel(); the cursor only
- // remembers the current channel pointer and takes no link on it.
- class CMPChannelIterator
- {
-     CMPServer &parent;
-     CMPChannel *cur;
- public:
-     CMPChannelIterator(CMPServer &_parent)
-         : parent(_parent), cur(NULL)
-     {
-     }
-     // Reset the cursor and step once from the NULL position.
-     // Returns false when nextChannel() yields no channel.
-     bool first()
-     {
-         cur = NULL;
-         return parent.nextChannel(cur);
-     }
-     // Step to the following channel; false if the cursor is not positioned
-     // or nextChannel() yields no further channel.
-     bool next()
-     {
-         if (!cur)
-             return false;
-         return parent.nextChannel(cur);
-     }
-     bool isValid()
-     {
-         return NULL != cur;
-     }
-     // Dereferences the cursor unchecked - only call while isValid().
-     CMPChannel &query()
-     {
-         return *cur;
-     }
- };
- //-----------------------------------------------------------------------------------
- // Returns a linked channel for `endpoint`, creating and registering a new
- // one when none is found. Runs entirely under `sect`.
- // A found channel that is closed and has had no transfer for more than 30s
- // is reset. If `checkclosed` is set, the flag is cleared and every channel in
- // that same state is removed before the lookup is repeated.
- CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
- {
-     CriticalBlock block(sect);
-     const int staleMs = 30*1000;
-     SocketEndpoint ep = endpoint;
-     CMPChannel *channel = find(ep);
-     // Check for freed channels
-     if (channel && channel->isClosed() && (msTick()-channel->lastxfer>staleMs))
-         channel->reset();
-     if (checkclosed) {
-         checkclosed = false;
-         // After every removal the scan restarts from the beginning, so the
-         // walk never continues from a removed element.
-         CMPChannel *scan = NULL;
-         while ((scan = next(scan)) != NULL) {
-             if (scan->isClosed() && (msTick()-scan->lastxfer>staleMs)) {
-                 removeExact(scan);
-                 scan = NULL;
-             }
-         }
-         channel = find(ep);
-     }
-     if (NULL == channel) {
-         channel = new CMPChannel(this,ep);
-         add(*channel);
-     }
-     return LINK(channel);
- }
- // Builds the server: creates the connect thread, select handler, the
- // per-tag packet handlers and the notify-closed thread, then starts the
- // notify-closed thread and the select handler. The connect thread is only
- // created here; CMPServer::start() is what calls startPort() on it.
- CMPServer::CMPServer(unsigned _port)
- {
-     RTsalt = 0xff;
-     checkclosed = false;
-     port = 0;                                                    // connectthread tells me what port it actually connected on
-     connectthread = new CMPConnectThread(this, _port);
-     selecthandler = createSocketSelectHandler();
-
-     // one handler per system tag, plus the default (user) handler
-     pingpackethandler = new PingPacketHandler;                   // TAG_SYS_PING
-     pingreplypackethandler = new PingReplyPacketHandler;         // TAG_SYS_PING_REPLY
-     forwardpackethandler = new ForwardPacketHandler;             // TAG_SYS_FORWARD
-     multipackethandler = new MultiPacketHandler;                 // TAG_SYS_MULTI
-     broadcastpackethandler = new BroadcastPacketHandler;         // TAG_SYS_BCAST
-     userpackethandler = new UserPacketHandler(this);             // default
-
-     notifyclosedthread = new CMPNotifyClosedThread(this);
-     notifyclosedthread->start();
-     selecthandler->start();
-
-     rettag = (int)TAG_REPLY_BASE;                                // NB negative
-     SocketEndpoint localEp(port);                                // NB port set by connectthread constructor
-     myNode = createINode(localEp);
- }
- // Tears the server down: releases all channels, then the select handler,
- // stops and releases the notify-closed thread, releases the connect thread,
- // deletes the packet handlers and finally releases the local node.
- // With _TRACEORPHANS defined, any receive-queue details are logged first.
- CMPServer::~CMPServer()
- {
- #ifdef _TRACEORPHANS
-     StringBuffer orphans;
-     getReceiveQueueDetails(orphans);
-     if (0 != orphans.length())
-         LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",orphans.str());
- #endif
-     _releaseAll();
-
-     selecthandler->Release();
-     notifyclosedthread->stop();
-     notifyclosedthread->Release();
-     connectthread->Release();
-
-     delete pingpackethandler;
-     delete pingreplypackethandler;
-     delete forwardpackethandler;
-     delete multipackethandler;
-     delete broadcastpackethandler;
-     delete userpackethandler;
-
-     ::Release(myNode);
- }
- // Sanity check for tags at or below TAG_REPLY_BASE: the distance from
- // TAG_REPLY_BASE, modulo 16, must equal RTsalt. A mismatch is only logged
- // (with a stack report); nothing is thrown.
- void CMPServer::checkTagOK(mptag_t tag)
- {
-     if ((int)tag>(int)TAG_REPLY_BASE)
-         return; // above the reply-tag range, nothing to check
-     const int dif = (int)TAG_REPLY_BASE-(int)tag;
-     if (dif%16==RTsalt)
-         return;
-     ERRLOG("**Invalid MP tag used");
-     PrintStackReport();
- }
- // Waits on the receive queue for a message matching `tag` (or any tag when
- // TAG_ALL) from `ep` (or any sender when ep is NULL) until `tm` expires.
- // Returns true and moves the message into `mbuf` on success; returns false
- // otherwise, including when the matching message was a TAG_CANCEL marker.
- // Throws a CMPException(MPERR_link_closed) when the wait was ended by a
- // closed-connection notification.
- bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm)
- {
-     checkTagOK(tag);
-     class Cnfy: public CBufferQueueNotify
-     {
-     public:
-         bool aborted;
-         CMessageBuffer *result;
-         const SocketEndpoint *ep;
-         SocketEndpoint closedEp; // used if receiving on RANK_ALL
-         mptag_t tag;
-         Cnfy(const SocketEndpoint *_ep,mptag_t _tag)
-             : aborted(false), result(NULL), ep(_ep), tag(_tag)
-         {
-         }
-         // Claims `msg` if tag and sender match; a message with a null
-         // sender is accepted whatever `ep` is.
-         bool notify(CMessageBuffer *msg)
-         {
-             if ((tag!=TAG_ALL)&&(tag!=msg->getTag()))
-                 return false;
-             SocketEndpoint senderep = msg->getSender();
-             if ((ep!=NULL)&&!ep->equals(senderep)&&!senderep.isNull())
-                 return false;
-             if (msg->getReplyTag()==TAG_CANCEL)
-                 delete msg; // cancel marker: consumed, no result handed back
-             else
-                 result = msg;
-             return true;
-         }
-         bool notifyClosed(SocketEndpoint &_closedEp) // called when connection closed
-         {
-             if (ep) {
-                 if (!ep->equals(_closedEp))
-                     return false;
-             }
-             else {
-                 // ep is NULL if receiving on RANK_ALL; keep a copy for abort info
-                 closedEp = _closedEp;
-                 ep = &closedEp;
-             }
-             aborted = true;
-             return true;
-         }
-     } nfy(ep,tag);
-     const bool signalled = receiveq.wait(nfy,false,tm);
-     if (signalled&&nfy.result) {
-         mbuf.transferFrom(*nfy.result);
-         delete nfy.result;
-         return true;
-     }
-     if (!nfy.aborted)
-         return false;
- #ifdef _TRACELINKCLOSED
-     LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
-     PrintStackReport();
- #endif
-     IMP_Exception *closed = new CMPException(MPERR_link_closed,*nfy.ep);
-     throw closed;
- }
- // Flushes queued messages whose tag equals `tag` (every message when `tag`
- // is TAG_ALL) via receiveq.flush() and logs the count if it is non-zero.
- void CMPServer::flush(mptag_t tag)
- {
-     class Cnfy: public CBufferQueueNotify
-     {
-     public:
-         mptag_t tag;
-         Cnfy(mptag_t _tag) : tag(_tag) { }
-         bool notify(CMessageBuffer *msg)
-         {
-             if (tag==TAG_ALL)
-                 return true;
-             return tag==msg->getTag();
-         }
-         bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
-         {
-             return false; // closures are of no interest while flushing
-         }
-     } nfy(tag);
-     const unsigned discarded = receiveq.flush(nfy);
-     if (discarded!=0)
-         PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,discarded);
- }
- // Queues an empty message tagged `tag` whose reply tag is TAG_CANCEL.
- // Its sender is `*ep`, or a default-constructed endpoint when ep is NULL.
- void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
- {
-     SocketEndpoint sender;
-     if (NULL != ep)
-         sender = *ep;
-     CMessageBuffer *marker = new CMessageBuffer(0);
-     marker->init(sender,tag,TAG_CANCEL);
-     getReceiveQ().enqueue(marker);
- }
- // Counts queued messages matching `tag` (any when TAG_ALL) and `ep` (any
- // when NULL), waiting until `tm` expires. `sender` receives the sender of
- // the first match. Returns the match count, or 0 if nothing matched or the
- // first match was a TAG_CANCEL marker.
- // Throws CMPException(MPERR_link_closed) if the connection to `ep` closed
- // while waiting; closures are ignored when ep is NULL.
- unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender)
- {
-     class Cnfy: public CBufferQueueNotify
-     {
-     public:
-         bool aborted;
-         SocketEndpoint &sender;
-         const SocketEndpoint *ep;
-         mptag_t tag;
-         bool cancel;
-         unsigned count;
-         Cnfy(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender)
-             : aborted(false), sender(_sender), ep(_ep), tag(_tag), cancel(false), count(0)
-         {
-         }
-         bool notify(CMessageBuffer *msg)
-         {
-             if ((tag!=TAG_ALL)&&(tag!=msg->getTag()))
-                 return false;
-             if ((ep!=NULL)&&!ep->equals(msg->getSender()))
-                 return false;
-             if (count==0) {
-                 // only the first match decides sender and cancel state
-                 sender = msg->getSender();
-                 cancel = (msg->getReplyTag()==TAG_CANCEL);
-             }
-             count++;
-             return true;
-         }
-         bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
-         {
-             if (!ep||!ep->equals(closedep))
-                 return false;
-             aborted = true;
-             return true;
-         }
-     } nfy(ep,tag,sender);
-     if (receiveq.wait(nfy,true,tm)) {
-         if (nfy.cancel)
-             return 0;
-         return nfy.count;
-     }
-     if (!nfy.aborted)
-         return 0;
- #ifdef _TRACELINKCLOSED
-     LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
-     PrintStackReport();
- #endif
-     // aborted is only ever set when ep is non-NULL, so *ep is safe here
-     IMP_Exception *closed = new CMPException(MPERR_link_closed,*ep);
-     throw closed;
- }
- // Hands the server's current port to the connect thread via startPort().
- void CMPServer::start()
- {
-     const unsigned listenPort = getPort();
-     connectthread->startPort(listenPort);
- }
- // Stops the select handler and the connect thread, then closes the socket
- // of every channel in the table. Channels are not removed here.
- void CMPServer::stop()
- {
-     selecthandler->stop(true);
-     connectthread->stop();
-     CMPChannel *channel = NULL;
-     while ((channel = (CMPChannel *)next(channel)) != NULL)
-         channel->closeSocket();
- }
- // Forwards `monitor` to the notify-closed thread's addConnectionMonitor().
- void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
- {
-     // called in critical section CMPServer::sect
-     notifyclosedthread->addConnectionMonitor(monitor);
- }
- // Forwards `monitor` to the notify-closed thread's removeConnectionMonitor().
- void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
- {
-     // called in critical section CMPServer::sect
-     notifyclosedthread->removeConnectionMonitor(monitor);
- }
- // Hash-table hook; intentionally does nothing.
- void CMPServer::onAdd(void *)
- {
-     // not used
- }
- // Hash-table hook: releases the channel being removed from the table.
- void CMPServer::onRemove(void *e)
- {
-     CMPChannel *channel = (CMPChannel *)e;
-     channel->Release();
- }
- // Hash of a stored channel: the hash of its remote endpoint (seed 0).
- unsigned CMPServer::getHashFromElement(const void *e) const
- {
-     const CMPChannel *channel = (const CMPChannel *)e;
-     return channel->remoteep.hash(0);
- }
- // Hash of a lookup key; the key is a SocketEndpoint, hashed with seed 0.
- unsigned CMPServer::getHashFromFindParam(const void *fp) const
- {
-     const SocketEndpoint *key = (const SocketEndpoint*)fp;
-     return key->hash(0);
- }
- // Lookup key of a stored channel: the address of its remote endpoint.
- const void * CMPServer::getFindParam(const void *p) const
- {
-     const CMPChannel *channel = (const CMPChannel *)p;
-     return &channel->remoteep;
- }
- // True when the stored channel's remote endpoint equals the lookup key.
- // The third (unnamed) argument is ignored.
- bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
- {
-     CMPChannel *channel = (CMPChannel *)et;
-     SocketEndpoint *key = (SocketEndpoint *)fp;
-     return channel->remoteep.equals(*key);
- }
- // Advances `cur` to the channel after it in the hash table, under `sect`.
- // Returns false (with cur set to NULL) when the table yields no channel.
- // The returned channel is not linked.
- bool CMPServer::nextChannel(CMPChannel *&cur)
- {
-     CriticalBlock block(sect);
-     CMPChannel *following = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
-     cur = following;
-     return NULL != following;
- }
- // Passes the closed endpoint on to the notify-closed thread.
- // `trace` is only consulted when _TRACEMPSERVERNOTIFYCLOSED is defined, in
- // which case the endpoint and a stack report are logged first.
- void CMPServer::notifyClosed(SocketEndpoint &ep, bool trace)
- {
- #ifdef _TRACEMPSERVERNOTIFYCLOSED
-     if (trace)
-     {
-         StringBuffer epText;
-         ep.getUrlStr(epText);
-         LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",epText.str());
-         PrintStackReport();
-     }
- #endif
-     notifyclosedthread->notify(ep);
- }
- // --------------------------------------------------------
- // Node-addressed communicator layered on a CMPServer: peers are named by
- // INode rather than by rank within a group. The verify and disconnect
- // operations serialise on the file-level `verifysect`.
- class CInterCommunicator: public IInterCommunicator, public CInterface
- {
-     CMPServer *parent;
- public:
-     IMPLEMENT_IINTERFACE;
-     // Sends mbuf to `dst`. A message addressed to this node is moved straight
-     // onto the local receive queue. Returns false for a NULL dst, on timeout
-     // or when the channel send fails; on success mbuf is cleared.
-     bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         if (NULL == dst)
-             return false;
-         if (dst->equals(queryMyNode())) {
-             // loop-back: no channel involved
-             const mptag_t replyTag = mbuf.getReplyTag();
-             CMessageBuffer *local = new CMessageBuffer();
-             local->transferFrom(mbuf);
-             local->init(dst->endpoint(),tag,replyTag);
-             parent->getReceiveQ().enqueue(local);
-         }
-         else {
-             CTimeMon tm(timeout);
-             Owned<CMPChannel> channel = parent->lookup(dst->endpoint());
-             unsigned remaining;
-             if (tm.timedout(&remaining))
-                 return false;
-             if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
-                 return false;
-         }
-         mbuf.clear(); // for consistent semantics
-         return true;
-     }
-     // Verifies the connection to `node` within `timeout`.
-     bool verifyConnection(INode *node, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> channel = parent->lookup(node->endpoint());
-         unsigned remaining;
-         return !tm.timedout(&remaining) && channel->verifyConnection(tm,true);
-     }
-     // Verifies every channel that is not closed (5s allowance each) and
-     // appends "<endpoint> OK" or "<endpoint> FAILED" lines to `log`.
-     void verifyAll(StringBuffer &log)
-     {
-         CMPChannelIterator iter(*parent);
-         for (bool more = iter.first(); more; more = iter.next()) {
-             CriticalBlock block(verifysect);
-             CTimeMon tm(5000);
-             CMPChannel &channel = iter.query();
-             if (channel.isClosed())
-                 continue;
-             channel.queryEpStr(log).append(' ');
-             const bool ok = channel.verifyConnection(tm,false);
-             log.append(ok ? "OK\n" : "FAILED\n");
-         }
-     }
-     // Verifies connections to the members of `group` within `timeout`.
-     // With duplex every other node is verified in a single pass. Otherwise
-     // rank parity decides which peers are verified in the first pass, and a
-     // second pass polls the complementary set every 100ms.
-     bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon tm(timeout);
-         rank_t myrank = group->rank(parent->queryMyNode());
-         {
-             ForEachNodeInGroup(rank,*group) {
-                 const bool sameParity = ((rank&1)==(myrank&1));
-                 const bool doverify = duplex ? (myrank!=rank)
-                                              : (sameParity ? (myrank>rank) : (myrank<rank));
-                 if (doverify) {
-                     Owned<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
-                     unsigned remaining;
-                     if (tm.timedout(&remaining))
-                         return false;
-                     if (!channel->verifyConnection(tm,true))
-                         return false;
-                 }
-             }
-         }
-         if (!duplex) {
-             ForEachNodeInGroup(rank,*group) {
-                 const bool sameParity = ((rank&1)==(myrank&1));
-                 const bool doverify = sameParity ? (myrank<rank) : (myrank>rank);
-                 if (doverify) {
-                     Owned<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
-                     while (!channel->verifyConnection(tm,false)) {
-                         unsigned remaining;
-                         if (tm.timedout(&remaining))
-                             return false;
-                         // drop verifysect while sleeping between polls
-                         CriticalUnblock unblock(verifysect);
-                         Sleep(100);
-                     }
-                 }
-             }
-         }
-         return true;
-     }
-     // Counts pending messages from `src` (any node when NULL) with `tag`.
-     // On a non-zero count *sender (if supplied) receives a new INode for the
-     // first matching sender; otherwise it is set to NULL.
-     unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
-     {
-         if (sender)
-             *sender = NULL;
-         const SocketEndpoint *srcEp = src ? &src->endpoint() : NULL;
-         SocketEndpoint found;
-         CTimeMon tm(timeout);
-         const unsigned matches = parent->probe(srcEp,tag,tm,found);
-         if (matches && sender)
-             *sender = createINode(found);
-         return matches;
-     }
-
-     // Receives a message from `src` (any node when NULL) with `tag`.
-     // A link-closed exception is swallowed and the receive retried, unless
-     // the closed endpoint is the specific `src` being waited on.
-     bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         if (sender)
-             *sender = NULL;
-         const SocketEndpoint *srcEp = src ? &src->endpoint() : NULL;
-         CTimeMon tm(timeout);
-         for (;;)
-         {
-             try
-             {
-                 if (!parent->recv(mbuf,srcEp,tag,tm))
-                     return false;
-                 if (sender)
-                     *sender = createINode(mbuf.getSender());
-                 return true;
-             }
-             catch (IMP_Exception *e)
-             {
-                 if (MPERR_link_closed != e->errorCode())
-                     throw;
-                 const SocketEndpoint &ep = e->queryEndpoint();
-                 if (src && (ep == src->endpoint()))
-                     throw;
-                 // ignoring closed endpoint
-                 e->Release();
-                 // loop around and recv again
-             }
-         }
-     }
-     void flush(mptag_t tag)
-     {
-         parent->flush(tag);
-     }
-     // Sends mbuff to `dst` with a fresh reply tag, then waits for the reply
-     // on that tag; one timeout budget covers both halves.
-     bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex(dst);
-         const mptag_t replytag = parent->createReplyTag();
-         CTimeMon tm(timeout);
-         mbuff.setReplyTag(replytag);
-         unsigned remaining;
-         if (tm.timedout(&remaining))
-             return false;
-         if (!send(mbuff,dst,dsttag,remaining))
-             return false;
-         if (tm.timedout(&remaining))
-             return false;
-         mbuff.clear();
-         return recv(mbuff,dst,replytag,NULL,remaining);
-     }
-     // Sends mbuff back to its sender using the reply tag it carries.
-     bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         Owned<INode> dst(createINode(mbuff.getSender()));
-         return send(mbuff,dst,mbuff.getReplyTag(),timeout);
-     }
-     void cancel(INode *src, mptag_t tag)
-     {
-         const SocketEndpoint *srcEp = src ? &src->endpoint() : NULL;
-         parent->cancel(srcEp,tag);
-     }
-     // Closes the socket of the channel for `node` and removes the channel
-     // from the server.
-     void disconnect(INode *node)
-     {
-         CriticalBlock block(verifysect);
-         Owned<CMPChannel> channel = parent->lookup(node->endpoint());
-         channel->closeSocket();
-         parent->removeChannel(channel);
-     }
-     CInterCommunicator(CMPServer *_parent)
-         : parent(_parent)
-     {
-     }
-     ~CInterCommunicator()
-     {
-     }
- };
- // Rank-addressed communicator over a fixed IGroup, layered on a CMPServer.
- // Peers are named by their rank in the group; the special ranks RANK_ALL,
- // RANK_ALL_OTHER, RANK_RANDOM and RANK_NULL are handled where noted.
- // The verify and disconnect operations serialise on the file-level
- // `verifysect`.
- class CCommunicator: public ICommunicator, public CInterface
- {
-     IGroup *group;      // linked in the constructor, released in the destructor
-     CMPServer *parent;
-     bool outer;
-     rank_t myrank;      // rank of parent's own node within `group`
-     const SocketEndpoint &queryEndpoint(rank_t rank)
-     {
-         return group->queryNode(rank).endpoint();
-     }
-     // Returns a linked channel for `rank` (see CMPServer::lookup).
-     CMPChannel *getChannel(rank_t rank)
-     {
-         return parent->lookup(queryEndpoint(rank));
-     }
- public:
-     IMPLEMENT_IINTERFACE;
-     // Sends mbuf to `dstrank`. A send to our own rank enqueues a clone on
-     // the local receive queue. RANK_ALL sends to ourselves and then to every
-     // other rank; RANK_ALL_OTHER skips ourselves; RANK_RANDOM picks another
-     // rank at random. Returns false for RANK_NULL, on timeout or when a
-     // channel send fails (remaining ranks are then not attempted).
-     bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
-     {
-         // send does not corrupt mbuf
-         if (dstrank==RANK_NULL)
-             return false;
-         if (dstrank==myrank) {
-             CMessageBuffer *msg = mbuf.clone();
-             // change sender
-             msg->init(parent->queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
-             parent->getReceiveQ().enqueue(msg);
-         }
-         else {
-             CTimeMon tm(timeout);
-             rank_t endrank;
-             if (dstrank==RANK_ALL) {
-                 send(mbuf,myrank,tag,timeout);
-                 dstrank = RANK_ALL_OTHER;
-             }
-             if (dstrank==RANK_ALL_OTHER) {
-                 dstrank = 0;
-                 endrank = group->ordinality()-1;
-             }
-             else if (dstrank==RANK_RANDOM) {
-                 if (group->ordinality()>1) {
-                     do {
-                         dstrank = getRandom()%group->ordinality();
-                     } while (dstrank==myrank);
-                 }
-                 else {
-                     assertex(myrank!=0);
-                     dstrank = 0;
-                 }
-                 endrank = dstrank;
-             }
-             else
-                 endrank = dstrank;
-             for (;dstrank<=endrank;dstrank++) {
-                 if (dstrank!=myrank) {
-                     Owned<CMPChannel> channel = getChannel(dstrank);
-                     unsigned remaining;
-                     if (tm.timedout(&remaining))
-                         return false;
-                     if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
-                         return false;
-                 }
-             }
-         }
-         return true;
-     }
-     // Synchronises all ranks of the group. Gives up (after logging) as soon
-     // as either half of a step - the send or the receive - fails.
-     void barrier(void)
-     {
- #ifdef _TRACE
-         PrintLog("MP: barrier enter");
- #endif
-         /*
-          * Use the dissemination algorithm described in:
-          * Debra Hensgen, Raphael Finkel, and Udi Manber, "Two Algorithms for Barrier Synchronization,"
-          * International Journal of Parallel Programming, 17(1):1-17, 1988.
-          * It uses ceiling(lgp) steps. In step k, 0 <= k <= (ceiling(lgp)-1),
-          * process i sends to process (i + 2^k) % p and receives from process (i - 2^k + p) % p.
-          */
-         int numranks = group->ordinality();
-         CMessageBuffer mb;
-         rank_t r;
-         int mask = 0x1;
-         while (mask < numranks)
-         {
-             int dst = (myrank + mask) % numranks;
-             int src = (myrank - mask + numranks) % numranks;
- #ifdef _TRACE
-             PrintLog("MP: barrier: send to %d, recv from %d", dst, src);
- #endif
-             // NOTE: MPI method MUST use sendrecv so as to not send/recv deadlock ...
-             mb.clear();
-             mb.append("MPTAG_BARRIER");
-             bool oks = send(mb,dst,MPTAG_BARRIER,120000);
-             mb.clear();
-             bool okr = recv(mb,src,MPTAG_BARRIER,&r);
-             // A step only counts when BOTH halves succeeded; previously a
-             // failure was reported only when both had failed.
-             if (!oks || !okr)
-             {
-                 PrintLog("MP: barrier: Error sending or recving");
-                 break;
-             }
-             mask <<= 1;
-         }
- #ifdef _TRACE
-         PrintLog("MP: barrier leave");
- #endif
-     }
-     // Verifies the connection to a single concrete rank within `timeout`.
-     bool verifyConnection(rank_t rank, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         assertex(rank!=RANK_RANDOM);
-         assertex(rank!=RANK_ALL);
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> channel = getChannel(rank);
-         unsigned remaining;
-         if (tm.timedout(&remaining))
-             return false;
-         return channel->verifyConnection(tm,true);
-     }
-     // Verifies connections to the group within `timeout`. With duplex every
-     // other rank is verified in one pass. Otherwise rank parity decides
-     // which peers are verified in the first pass, and a second pass polls
-     // the complementary set every 100ms.
-     bool verifyAll(bool duplex, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon tm(timeout);
-         {
-             ForEachNodeInGroup(rank,*group) {
-                 bool doverify;
-                 if (duplex)
-                     doverify = (rank!=myrank);
-                 else if ((rank&1)==(myrank&1))
-                     doverify = (myrank>rank);
-                 else
-                     doverify = (myrank<rank);
-                 if (doverify) {
-                     Owned<CMPChannel> channel = getChannel(rank);
-                     unsigned remaining;
-                     if (tm.timedout(&remaining)) {
-                         return false;
-                     }
-                     if (!channel->verifyConnection(tm,true))
-                         return false;
-                 }
-             }
-         }
-         if (!duplex) {
-             ForEachNodeInGroup(rank,*group) {
-                 bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
-                 if (doverify) {
-                     Owned<CMPChannel> channel = getChannel(rank);
-                     while (!channel->verifyConnection(tm,false)) {
-                         unsigned remaining;
-                         if (tm.timedout(&remaining))
-                             return false;
-                         // drop verifysect while sleeping between polls
-                         CriticalUnblock unblock(verifysect);
-                         Sleep(100);
-                     }
-                 }
-             }
-         }
-         return true;
-     }
-     // Counts pending messages from `srcrank` (any sender for RANK_ALL) with
-     // `tag`. *sender (if supplied) receives the group rank of the first
-     // matching sender, or RANK_NULL when nothing matched.
-     unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
-     {
-         assertex(srcrank!=RANK_NULL);
-         SocketEndpoint res;
-         CTimeMon tm(timeout);
-         unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
-         if (ret!=0) {
-             if (sender)
-                 *sender = group->rank(res);
-             return ret;
-         }
-         if (sender)
-             *sender = RANK_NULL;
-         return 0;
-     }
-     // Receives a message from `srcrank` (any sender for RANK_ALL) with
-     // `tag`. *sender (if supplied) receives the sender's group rank, or
-     // RANK_NULL when nothing was received. A link-closed exception for an
-     // endpoint outside the group is swallowed and the receive retried; one
-     // for a group member is rethrown.
-     bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex(srcrank!=RANK_NULL);
-         // Endpoint filter handed to the server: NULL means "any sender".
-         // Computed once instead of on every retry. An earlier, never-used
-         // local narrowed RANK_ALL to rank 0 for single-node non-outer
-         // groups; that narrowing was never in effect and has been dropped.
-         const SocketEndpoint *srcep = (srcrank==RANK_ALL) ? NULL : &queryEndpoint(srcrank);
-         CTimeMon tm(timeout);
-         loop
-         {
-             try
-             {
-                 if (parent->recv(mbuf,srcep,tag,tm))
-                 {
-                     if (sender)
-                         *sender = group->rank(mbuf.getSender());
-                     return true;
-                 }
-                 if (sender)
-                     *sender = RANK_NULL;
-                 return false;
-             }
-             catch (IMP_Exception *e)
-             {
-                 if (MPERR_link_closed != e->errorCode())
-                     throw;
-                 const SocketEndpoint &ep = e->queryEndpoint();
-                 if (RANK_NULL != group->rank(ep))
-                     throw;
-                 // ignoring closed endpoint from outside the communicator group
-                 e->Release();
-                 // loop around and recv again
-             }
-         }
-     }
-
-     void flush(mptag_t tag)
-     {
-         parent->flush(tag);
-     }
-     IGroup &queryGroup() { return *group; }         // not linked
-     IGroup *getGroup() { return LINK(group); }      // linked; caller releases
-     // Sends mbuff to `sendrank` with a fresh reply tag, then waits for the
-     // reply on that tag; one timeout budget covers both halves.
-     // RANK_RANDOM is resolved to a concrete rank other than our own.
-     bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
-         if (sendrank==RANK_RANDOM) {
-             if (group->ordinality()>1) {
-                 do {
-                     sendrank = getRandom()%group->ordinality();
-                 } while (sendrank==myrank);
-             }
-             else {
-                 assertex(myrank!=0);
-                 sendrank = 0;
-             }
-         }
-         mptag_t replytag = parent->createReplyTag();
-         CTimeMon tm(timeout);
-         mbuff.setReplyTag(replytag);
-         unsigned remaining;
-         if (tm.timedout(&remaining))
-             return false;
-         if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
-             return false;
-         mbuff.clear();
-         return recv(mbuff,sendrank,replytag,NULL,remaining);
-     }
-     // Sends mbuf back to its sender on the reply tag it carries. A sender
-     // that is not a member of the group is reached directly through its
-     // endpoint. On success the buffer's reply tag is reset to TAG_NULL.
-     bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         mptag_t replytag = mbuf.getReplyTag();
-         rank_t dstrank = group->rank(mbuf.getSender());
-         if (dstrank!=RANK_NULL) {
-             if (send (mbuf, dstrank, replytag,timeout)) {
-                 mbuf.setReplyTag(TAG_NULL);
-                 return true;
-             }
-             return false;
-         }
-
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> channel = parent->lookup(mbuf.getSender());
-         unsigned remaining;
-         if (tm.timedout(&remaining)) {
-             return false;
-         }
-         if (channel->send(mbuf,replytag,TAG_NULL,tm, true)) {
-             mbuf.setReplyTag(TAG_NULL);
-             return true;
-         }
-         return false;
-     }
-     void cancel(rank_t srcrank, mptag_t tag)
-     {
-         assertex(srcrank!=RANK_NULL);
-         parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
-     }
-     // Closes the socket of the channel for `node` and removes the channel
-     // from the server.
-     void disconnect(INode *node)
-     {
-         CriticalBlock block(verifysect);
-         Owned<CMPChannel> channel = parent->lookup(node->endpoint());
-         channel->closeSocket();
-         parent->removeChannel(channel);
-     }
-     CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
-     {
-         outer = _outer;
-         parent = _parent;
-         group = LINK(_group);
-         myrank = group->rank(parent->queryMyNode());
-     }
-     ~CCommunicator()
-     {
-         group->Release();
-     }
- };
- // Additional CMPServer methods
- // Creates a new rank-addressed communicator for `group` bound to this server.
- ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
- {
-     ICommunicator *comm = new CCommunicator(this,group,outer);
-     return comm;
- }
- ///////////////////////////////////
- // Creates and starts an independent MP server on `port` and returns it.
- // Asserts that PacketHeader is exactly 32 bytes.
- IMPServer *startNewMPServer(unsigned port)
- {
-     assertex(sizeof(PacketHeader)==32);
-     CMPServer *server = new CMPServer(port);
-     server->start();
-     return server;
- }
- // The process-wide MP server: a CMPServer plus a start/stop nesting count,
- // a paused flag and a lazily created world communicator.
- // `sect` is the static lock the global entry points in this file take
- // around their use of the instance.
- class CGlobalMPServer : public CMPServer
- {
-     int nestLevel;
-     bool paused;
-     IInterCommunicator *worldcomm;
- public:
-     static CriticalSection sect;
-     CGlobalMPServer(unsigned _port) : CMPServer(_port)
-     {
-         worldcomm = NULL;
-         nestLevel = 0;
-         paused = false; // was left uninitialised, so isPaused() could read garbage before the first setPaused()
-     }
-     ~CGlobalMPServer()
-     {
-         ::Release(worldcomm);
-     }
-     // Returns the world communicator, creating it on first use. The lazy
-     // creation is not itself synchronised here.
-     IInterCommunicator &queryWorldCommunicator()
-     {
-         if (!worldcomm)
-             worldcomm = new CInterCommunicator(this);
-         return *worldcomm;
-     }
-     unsigned incNest() { return ++nestLevel; }
-     unsigned decNest() { return --nestLevel; }
-     unsigned queryNest() { return nestLevel; }
-     bool isPaused() const { return paused; }
-     void setPaused(bool onOff) { paused = onOff; }
- };
- // Storage for the lock guarding the global server, and the global server
- // pointer itself; the pointer is cleared again at module initialisation.
- CriticalSection CGlobalMPServer::sect;
- static CGlobalMPServer *globalMPServer;
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
-     globalMPServer = NULL;
-     return true;
- }
- // Module exit: releases the global server pointer via ::Release().
- MODULE_EXIT()
- {
-     ::Release(globalMPServer);
- }
- // Creates the global MP server on first call, then starts it if the nesting
- // count is zero. When `paused` is set and the count is zero, the server is
- // only flagged as paused: it is not started and the count is not raised.
- // In every other case the nesting count is incremented.
- void startMPServer(unsigned port, bool paused)
- {
-     assertex(sizeof(PacketHeader)==32);
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (!globalMPServer)
-     {
-         globalMPServer = new CGlobalMPServer(port);
-         initMyNode(globalMPServer->getPort());
-     }
-     const bool notYetStarted = (0 == globalMPServer->queryNest());
-     if (notYetStarted && paused)
-     {
-         globalMPServer->setPaused(true);
-         return;
-     }
-     if (notYetStarted)
-     {
-         queryLogMsgManager()->setPort(globalMPServer->getPort());
-         globalMPServer->start();
-         globalMPServer->setPaused(false);
-     }
-     globalMPServer->incNest();
- }
- // Decrements the nesting count of the global MP server. When it reaches
- // zero the log message receivers are stopped and the global pointer is
- // detached under the lock; the server is then stopped and released outside
- // the lock, and finally initMyNode(0) is called under the lock again.
- void stopMPServer()
- {
-     CGlobalMPServer *detached = NULL;
-     {
-         CriticalBlock block(CGlobalMPServer::sect);
-         if (!globalMPServer)
-             return;
-         if (0 != globalMPServer->decNest())
-             return; // still in use by an outer start
-         stopLogMsgReceivers();
- #ifdef _TRACE
-         LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
- #endif
-         detached = globalMPServer;
-         globalMPServer = NULL;
-     }
-     detached->stop();
-     detached->Release();
- #ifdef _TRACE
-     LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
- #endif
-     CriticalBlock block(CGlobalMPServer::sect);
-     initMyNode(0);
- }
- // True while a global MP server object exists (checked under the lock).
- bool hasMPServerStarted()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     return NULL != globalMPServer;
- }
- // Returns the global server's world communicator; asserts that the global
- // server exists. The lock is held for the duration of the call.
- IInterCommunicator &queryWorldCommunicator()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     IInterCommunicator &world = globalMPServer->queryWorldCommunicator();
-     return world;
- }
- // Obtains a new reply tag from the global server; asserts that it exists.
- mptag_t createReplyTag()
- {
-     assertex(globalMPServer);
-     const mptag_t replyTag = globalMPServer->createReplyTag();
-     return replyTag;
- }
- // Creates a communicator for `group` on the global server; asserts that
- // the global server exists.
- ICommunicator *createCommunicator(IGroup *group, bool outer)
- {
-     assertex(globalMPServer);
-     ICommunicator *comm = globalMPServer->createCommunicator(group, outer);
-     return comm;
- }
- // Asks the global server (if there is one) to write its receive-queue
- // details into `buf`; returns `buf` either way.
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (NULL == globalMPServer)
-         return buf;
-     globalMPServer->getReceiveQueueDetails(buf);
-     return buf;
- }
- // Registers `monitor` with the global server, under the global lock.
- // Asserts that the global server exists.
- void addMPConnectionMonitor(IConnectionMonitor *monitor)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     CGlobalMPServer *server = globalMPServer;
-     server->addConnectionMonitor(monitor);
- }
- // Unregisters `monitor` from the global server, under the global lock.
- // Silently does nothing when there is no global server.
- void removeMPConnectionMonitor(IConnectionMonitor *monitor)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (NULL == globalMPServer)
-         return;
-     globalMPServer->removeConnectionMonitor(monitor);
- }
- // Returns a linked reference to the global server; asserts that it exists.
- IMPServer *getMPServer()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     CGlobalMPServer *server = globalMPServer;
-     return LINK(server);
- }
- // Appends `handle` (cast to unsigned) to the child-process list, under
- // childprocesssect. A handle equal to (HANDLE)-1 is ignored.
- void registerSelfDestructChildProcess(HANDLE handle)
- {
-     CriticalBlock block(childprocesssect);
-     if ((HANDLE)-1 == handle)
-         return;
-     childprocesslist.append((unsigned)handle);
- }
- // Removes `handle` (cast to unsigned) from the child-process list via
- // zap(), under childprocesssect. A handle equal to (HANDLE)-1 is ignored.
- void unregisterSelfDestructChildProcess(HANDLE handle)
- {
-     CriticalBlock block(childprocesssect);
-     if ((HANDLE)-1 == handle)
-         return;
-     childprocesslist.zap((unsigned)handle);
- }
|