12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
// Export the symbols declared with mp_decl from this module (Windows DLL export syntax).
#define mp_decl __declspec(dllexport)
/* TBD:
   - disposal of lost packets
   - synchronous send
   - connection protocol (HRPC)
   - review all timeouts
*/
- #include "platform.h"
- #include "portlist.h"
- #include "jlib.hpp"
- #include <limits.h>
- #include "jsocket.hpp"
- #include "jmutex.hpp"
- #include "jutil.hpp"
- #include "jthread.hpp"
- #include "jqueue.tpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "mpcomm.hpp"
- #include "mpbuff.hpp"
- #include "mputil.hpp"
- #include "mplog.hpp"
#ifdef _MSC_VER
#pragma warning (disable : 4355)   // C4355: 'this' used in base member initializer list
#endif
// Tracing switches: enable _TRACE / _FULLTRACE for progressively more verbose MP logging.
//#define _TRACE
//#define _FULLTRACE
// Feature switches; NOTE(review): not referenced in this part of the file - presumably used further down.
#define _TRACEORPHANS
#define REFUSE_STALE_CONNECTION
#define MP_PROTOCOL_VERSION 0x102                 // stored in PacketHeader::version
#define MP_PROTOCOL_VERSIONV6 0x202 // extended for IPV6
#define CANCELTIMEOUT 1000 // 1 sec; CBufferQueue::enqueue retries TAG_CANCEL replies in CANCELTIMEOUT/10 steps
#define CONNECT_TIMEOUT (5*60*1000) // 5 mins; used by CMPChannel::connect when the caller's timeout is negative
#define CONNECT_READ_TIMEOUT (3*60*1000) // 3 min; wait for the handshake reply after connecting
#define CONFIRM_TIMEOUT (CONNECT_READ_TIMEOUT/2)  // timeout passed to sendPingReply() when answering a ping
#define VERIFY_DELAY (1*60*1000) // 1 Minute; verifyConnection() skips the ping if data moved more recently than this
#define VERIFY_TIMEOUT (1*60*1000) // 1 Minute
#define _TRACING
// NOTE(review): the statics below are not used in this part of the file.
static CriticalSection verifysect;
static CriticalSection childprocesssect;
static UnsignedArray childprocesslist;
// IPv6 TBD
// Compact IPv4 endpoint (4 address bytes followed by a port) that is embedded in PacketHeader
// and written raw during the connection handshake. Its layout is therefore part of the wire
// format: do not reorder or resize the fields.
struct SocketEndpointV4
{
    byte ip[4];           // IPv4 address bytes, as returned by SocketEndpoint::getNetAddress
    unsigned short port;
    SocketEndpointV4() {};    // leaves ip/port uninitialised
    SocketEndpointV4(const SocketEndpoint &val) { set(val); }
    // Copy address and port from val. Calls IPV6_NOT_IMPLEMENTED() if the address is not 4 bytes long.
    void set(const SocketEndpoint &val)
    {
        port = val.port;
        if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
            IPV6_NOT_IMPLEMENTED();
    }
    // Store this address and port into val.
    void get(SocketEndpoint &val)
    {
        val.setNetAddress(sizeof(ip),&ip);
        val.port = port;
    }
};
// Header that precedes every MP packet on the wire (32 bytes according to the offset comments).
// It has no virtual functions, so it can be written to the socket as raw bytes
// (writepacket(&hdr,sizeof(hdr),...)). Do not reorder or resize fields.
class PacketHeader // standard packet header - no virtuals
{
public:
    static unsigned nextseq;    // next sequence number to use when the current tick cannot be used
    static unsigned lasttick;   // msTick() sampled by the most recent initseq()
    // Assign this header's sequence number. The current msTick() is used if it is at most
    // USHRT_MAX ahead of nextseq (unsigned difference), and nextseq then becomes tick+1.
    // Otherwise (tick not ahead of nextseq, or too far ahead) nextseq++ is used.
    // NOTE(review): nextseq/lasttick are shared statics updated here without any lock.
    void initseq()
    {
        sequence = msTick();
        lasttick = sequence;
        if (sequence-nextseq>USHRT_MAX)
            sequence = nextseq++;
        else
            nextseq = sequence+1;
    }
    // Build a header for a packet of _size bytes (total packet size) from _sender to _target,
    // with flags cleared, version MP_PROTOCOL_VERSION and a fresh sequence number.
    PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
    {
        size = _size;
        tag = _tag;
        sender.set(_sender);
        target.set(_target);
        replytag = _replytag;
        flags = 0;
        version = MP_PROTOCOL_VERSION;
        initseq();
    }
    PacketHeader() {}   // leaves all fields uninitialised (presumably filled by reading from a socket)
    size32_t size; // 0 total packet size
    mptag_t tag; // 4 packet tag (kind)
    unsigned short version; // 8 protocol version
    unsigned short flags; // 10 flags
    SocketEndpointV4 sender; // 12 who sent
    SocketEndpointV4 target; // 18 who destined for
    mptag_t replytag; // 24 used for reply
    unsigned sequence; // 28 packet type dependant
    // Total 32
    // Initialise mb with this header's sender endpoint, tag and reply tag.
    void setMessageFields(CMessageBuffer &mb)
    {
        SocketEndpoint ep;
        sender.get(ep);
        mb.init(ep,tag,replytag);
    }
};
// PacketHeader extended with 16-byte sender/target addresses (64 bytes in total according to the
// offset comments). Presumably the header layout for MP_PROTOCOL_VERSIONV6; TODO confirm.
class PacketHeaderV6 : public PacketHeader
{
    unsigned senderex[4]; // 32
    unsigned targetex[4]; // 48
    // total 64
    // Initialise mb using the full sender address from senderex and the port from the base
    // header's sender. This hides (does not override) PacketHeader::setMessageFields, because the
    // base class has no virtuals.
    // NOTE(review): every member here is private (class default access), so this cannot be
    // called from outside the class as written.
    void setMessageFields(CMessageBuffer &mb)
    {
        SocketEndpoint ep;
        ep.setNetAddress(sizeof(senderex),&senderex);
        ep.port = sender.port;
        mb.init(ep,tag,replytag);
    }
};
-
// Storage for PacketHeader's shared sequence state (see PacketHeader::initseq).
unsigned PacketHeader::nextseq=0;
unsigned PacketHeader::lasttick=0;
// Size of a bare header; presumably the minimum size of a valid packet.
#define MINIMUMPACKETSIZE sizeof(PacketHeader)
// Presumably the largest payload sent in one packet before a message is split into
// TAG_SYS_MULTI parts (see MultiPacketHeader); TODO confirm against the send path.
#define MAXDATAPERPACKET 50000
- struct MultiPacketHeader
- {
- mptag_t tag;
- size32_t ofs;
- size32_t size;
- unsigned idx;
- unsigned numparts;
- size32_t total;
- StringBuffer &getDetails(StringBuffer &out) const
- {
- out.append("MultiPacketHeader: ");
- out.append("tag=").append((unsigned)tag);
- out.append(",ofs=").append(ofs);
- out.append(",size=").append(size);
- out.append(",idx=").append(idx);
- out.append(",numparts=").append(numparts);
- out.append(",total=").append(total);
- return out;
- }
- };
- //
- class CMPException: public CInterface, public IMP_Exception
- {
- public:
- IMPLEMENT_IINTERFACE;
- CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
- {
- }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- StringBuffer tmp;
- switch (error) {
- case MPERR_ok: str.append("OK"); break;
- case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
- case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- case MPERR_link_closed: str.appendf("MP link closed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- }
- return str;
- }
- int errorCode() const { return error; }
- MessageAudience errorAudience() const
- {
- return MSGAUD_user;
- }
- virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
- private:
- MessagePassingError error;
- SocketEndpoint endpoint;
- };
// Callback interface that CBufferQueue uses to offer messages and close events to a receiver.
class CBufferQueueNotify
{
public:
    // Return true to accept the offered message buffer.
    virtual bool notify(CMessageBuffer *)=0;
    virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed; return true to stop waiting
    // NOTE(review): there is no virtual destructor, so implementations must not be deleted through this type.
};
- class CBufferQueueWaiting
- {
- public:
- enum QWenum { QWcontinue, QWdequeue, QWprobe };
- Semaphore sem;
- CBufferQueueNotify &waiting;
- bool probe;
- CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
- QWenum notify(CMessageBuffer *b)
- {
- // check this for DLL unloaded TBD
- if (waiting.notify(b)) {
- sem.signal();
- return probe?QWprobe:QWdequeue;
- }
- return QWcontinue;
- }
- QWenum notifyClosed(SocketEndpoint &ep)
- {
- // check this for DLL unloaded TBD
- if (waiting.notifyClosed(ep)) {
- sem.signal();
- return QWdequeue;
- }
- return QWcontinue;
- }
- };
- MAKEPointerArray(CBufferQueueWaiting,CWaitingArray);
// Queue of received messages plus the set of receivers blocked in wait().
// Incoming messages are offered to the blocked receivers first; unclaimed messages are queued.
// All state is guarded by sect.
class CBufferQueue
{
    QueueOf<CMessageBuffer, false> received;   // messages not yet claimed by a receiver
    CWaitingArray waiting;                     // receivers blocked in wait(); entries live on their wait() stacks
    CriticalSection sect;
public:
    CBufferQueue()
    {
    }
    // Deliver b; ownership of b passes to this queue (or to the receiver that takes it).
    // b is offered to the blocked receivers in order, and the first receiver that accepts it is
    // removed from the waiting list. If that receiver dequeued it, we are done. If it was only
    // probed, or nobody took it, b is appended to the received queue.
    // Messages whose reply tag is TAG_CANCEL are never queued. They are re-offered up to 10 more
    // times, sleeping CANCELTIMEOUT/10 ms between attempts with sect released, and are deleted if
    // still unclaimed.
    void enqueue(CMessageBuffer *b)
    {
        CriticalBlock block(sect);
        unsigned iter=0;
        loop {
            ForEachItemIn(i,waiting) {
                CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
                if (r!=CBufferQueueWaiting::QWcontinue) {
                    waiting.remove(i);
                    if (r==CBufferQueueWaiting::QWdequeue)
                        return;
                    //CBufferQueueWaiting::QWprobe
                    break;
                }
            }
            if (b->getReplyTag() != TAG_CANCEL)
                break;
            if (iter++==10) {
                delete b;
                return;
            }
            CriticalUnblock unblock(sect);
            Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
        }
        received.enqueue(b);
    }
    // Wait until a message accepted by nfy is available, or until tm expires.
    // Queued messages are checked first. With probe=false the first accepted message is removed
    // from the queue (the dequeued pointer is not used here; presumably nfy.notify() kept it).
    // With probe=true accepted messages are left queued.
    // If nothing matches, register as a waiter and block on the semaphore with sect released.
    // Returns true if a message was accepted (or a close event ended the wait), false on timeout.
    bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm)
    {
        CriticalBlock block(sect);
        bool probegot = false;
        ForEachQueueItemIn(i,received) {
            if (nfy.notify(received.item(i))) {
                if (probe) {
                    probegot = true;
                }
                else {
                    received.dequeue(i);
                    return true;
                }
            }
        }
        if (probegot)
            return true;
        unsigned remaining;
        if (tm.timedout(&remaining))
            return false;
        CBufferQueueWaiting qwaiting(nfy,probe);
        waiting.append(qwaiting);
        sect.leave();
        bool ok = qwaiting.sem.wait(remaining);
        sect.enter();   // re-acquire so the CriticalBlock's release stays balanced
        if (!ok) {
            // A signal may have arrived between the timeout and re-locking; if so, enqueue()/notifyClosed()
            // has already removed us from waiting. Otherwise remove ourselves.
            ok = qwaiting.sem.wait(0);
            if (!ok)
                waiting.zap(qwaiting);
        }
        return ok;
    }
    // Delete every queued message accepted by nfy and return how many were deleted.
    // Iterates in reverse so dequeuing does not disturb indexes not yet visited.
    unsigned flush(CBufferQueueNotify &nfy)
    {
        unsigned count = 0;
        CriticalBlock block(sect);
        ForEachQueueItemInRev(i,received) {
            if (nfy.notify(received.item(i))) {
                count++;
                delete received.dequeue(i);
            }
        }
        return count;
    }
    // Tell every blocked receiver that ep has closed; remove (and wake) the ones that stop waiting.
    void notifyClosed(SocketEndpoint &ep)
    {
        CriticalBlock block(sect);
        ForEachItemInRev(i,waiting) {
            CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
            if (r!=CBufferQueueWaiting::QWcontinue) {
                waiting.remove(i);
            }
        }
    }
    // Append a description of each queued message, one per line, to buf.
    StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
    {
        CriticalBlock block(sect);
        ForEachQueueItemIn(i,received) {
            received.item(i)->getDetails(buf).append('\n');
        }
        return buf;
    }
};
- static UnsignedShortArray freetags;
- static unsigned nextfreetag=0;
- unsigned short generateDynamicTag()
- {
- if (freetags.ordinality())
- return freetags.pop();
- return nextfreetag++;
- }
- void releaseDynamicTag(unsigned short tag)
- {
- freetags.append(tag);
- }
- class CMPServer;
- class CMPChannel;
- class CMPConnectThread: public Thread
- {
- bool running;
- ISocket *listensock;
- CMPServer *parent;
- void checkSelfDestruct(void *p,size32_t sz);
- public:
- CMPConnectThread(CMPServer *_parent, unsigned port);
- ~CMPConnectThread()
- {
- ::Release(listensock);
- }
- int run();
- void start(unsigned short port);
- void stop()
- {
- if (running) {
- running = false;
- listensock->cancel_accept();
- if (!join(1000*60*5)) // should be pretty instant
- printf("CMPConnectThread::stop timed out\n");
- }
- }
- };
class PingPacketHandler;
class PingReplyPacketHandler;
class MultiPacketHandler;
class BroadcastPacketHandler;
class ForwardPacketHandler;
class UserPacketHandler;
class CMPNotifyClosedThread;
// The process-wide MP server: a hash table of CMPChannel keyed by remote SocketEndpoint (private
// base), the shared receive queue, and the per-tag packet handlers.
// The single instance is reached through the global MPserver pointer declared below, which is
// initially NULL. Most member functions are defined outside this class.
static class CMPServer: private SuperHashTableOf<CMPChannel,SocketEndpoint>
{
    unsigned short port;                       // local MP port (see getPort/setPort)
    ISocketSelectHandler *selecthandler;
    CMPConnectThread *connectthread;
    CBufferQueue receiveq;                     // messages for user receivers (see recv/probe/flush)
    CMPNotifyClosedThread *notifyclosedthread;
public:
    static CriticalSection serversect;
    static int servernest;
    static bool serverpaused;
    bool checkclosed;                          // set by CMPChannel::closeSocket() when a channel closes
    // packet handlers
    PingPacketHandler *pingpackethandler; // TAG_SYS_PING
    PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
    ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
    MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
    BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
    UserPacketHandler *userpackethandler; // default
    CMPServer(unsigned _port);
    ~CMPServer();
    void start();
    void stop();
    unsigned short getPort() { return port; }
    void setPort(unsigned short _port) { port = _port; }
    CMPChannel &lookup(const SocketEndpoint &remoteep);
    ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
    CBufferQueue &getReceiveQ() { return receiveq; }
    bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
    void flush(mptag_t tag);
    unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
    void cancel(const SocketEndpoint *ep, mptag_t tag);
    bool nextChannel(CMPChannel *&c);
    void addConnectionMonitor(IConnectionMonitor *monitor);
    void removeConnectionMonitor(IConnectionMonitor *monitor);
    void notifyClosed(SocketEndpoint &ep);
    // Append a description of every message waiting in the receive queue to buf.
    StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
    {
        return receiveq.getReceiveQueueDetails(buf);
    }
    // Remove channel c (which must be this exact object) from the table; NULL is ignored.
    void removeChannel(CMPChannel *c) { if (c) removeExact(c); }
protected:
    // SuperHashTableOf hooks (defined elsewhere).
    void onAdd(void *);
    void onRemove(void *e);
    unsigned getHashFromElement(const void *e) const;
    unsigned getHashFromFindParam(const void *fp) const;
    const void * getFindParam(const void *p) const;
    bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
    IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
} *MPserver=NULL;
int CMPServer::servernest=0;
bool CMPServer::serverpaused=false;
CriticalSection CMPServer::serversect;
- byte RTsalt=0xff;
- mptag_t createReplyTag()
- {
- // these are short-lived so a simple increment will do (I think this is OK!)
- mptag_t ret;
- {
- static CriticalSection sect;
- CriticalBlock block(sect);
- static int rettag=(int)TAG_REPLY_BASE; // NB negative
- if (RTsalt==0xff) {
- RTsalt = (byte)(getRandom()%16);
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- if (rettag>(int)TAG_REPLY_BASE) { // wrapped
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- ret = (mptag_t)rettag;
- rettag -= 16;
- }
- if (MPserver)
- MPserver->flush(ret);
- return ret;
- }
- void checkTagOK(mptag_t tag)
- {
- if ((int)tag<=(int)TAG_REPLY_BASE) {
- int dif = (int)TAG_REPLY_BASE-(int)tag;
- if (dif%16!=RTsalt) {
- ERRLOG("**Invalid MP tag used");
- PrintStackReport();
- }
- }
- }
- //===========================================================================
- class CMPNotifyClosedThread: public Thread
- {
- IArrayOf<IConnectionMonitor> connectionmonitors;
- CriticalSection conmonsect;
- SimpleInterThreadQueueOf<INode, false> workq;
- bool stopping;
- CMPServer *parent;
- CriticalSection stopsect;
- public:
- CMPNotifyClosedThread(CMPServer *_parent)
- : Thread("CMPNotifyClosedThread")
- {
- parent = _parent;
- stopping = false;
- }
- ~CMPNotifyClosedThread()
- {
- IArrayOf<IConnectionMonitor> todelete;
- CriticalBlock block(conmonsect);
- while (connectionmonitors.ordinality())
- todelete.append(connectionmonitors.popGet());
- }
- void addConnectionMonitor(IConnectionMonitor *monitor)
- {
- if (monitor)
- connectionmonitors.append(*LINK(monitor));
- }
- void removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- if (monitor) {
- CriticalBlock block(conmonsect);
- connectionmonitors.zap(*monitor);
- }
- }
- int run()
- {
- loop {
- try {
- Owned<INode> node = workq.dequeue();
- if (node->endpoint().isNull())
- break;
- SocketEndpoint ep = node->endpoint();
- parent->getReceiveQ().notifyClosed(ep);
- IArrayOf<IConnectionMonitor> toclose;
- {
- CriticalBlock block(conmonsect);
- ForEachItemIn(i1,connectionmonitors) {
- toclose.append(*LINK(&connectionmonitors.item(i1)));
- }
- }
- ForEachItemIn(i,toclose) {
- toclose.item(i).onClose(ep);
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- e->Release();
- }
- }
- return 0;
- }
- void stop()
- {
- {
- CriticalBlock block(stopsect);
- if (!stopping) {
- stopping = true;
- SocketEndpoint ep;
- workq.enqueue(createINode(ep));
- }
- }
- while (!join(1000*60*3))
- PROGLOG("CMPNotifyClosedThread join failed");
- }
- void notify(SocketEndpoint &ep)
- {
- CriticalBlock block(stopsect);
- if (!stopping&&!ep.isNull()) {
- if (workq.ordinality()>100)
- PROGLOG("MP: %d waiting to close",workq.ordinality());
- workq.enqueue(createINode(ep));
- }
- }
- };
- class CMPPacketReader;
- class CMPChannel: public CInterface
- {
- ISocket *channelsock;
- CMPServer *parent;
- Mutex sendmutex;
- Semaphore sendwaitingsig;
- unsigned sendwaiting; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
- CriticalSection connectsect;
- CMPPacketReader *reader;
- bool master; // i.e. connected originally
- mptag_t multitag; // current multi send in progress
- bool closed;
- IArrayOf<ISocket> keptsockets;
- protected: friend class CMPServer;
- SocketEndpoint remoteep;
- SocketEndpoint localep; // who the other end thinks I am
- protected: friend class CMPPacketReader;
- unsigned lastxfer;
- #ifdef _FULLTRACE
- unsigned startxfer;
- unsigned numiter;
- #endif
- bool connect(CTimeMon &tm)
- {
- // must be called from connectsect
- // also in sendmutex
- ISocket *newsock=NULL;
- unsigned retrycount = 10;
- unsigned remaining;
- while (!channelsock) {
- try {
- StringBuffer str;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).toCharArray());
- #endif
- if (((int)tm.timeout)<0)
- remaining = CONNECT_TIMEOUT;
- else if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- PROGLOG("MP: connect timed out 1");
- #endif
- return false;
- }
- if (remaining<10000)
- remaining = 10000; // 10s min granularity for MP
- newsock = ISocket::connect_timeout(remoteep,remaining);
- newsock->set_keep_alive(true);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect");
- #endif
- SocketEndpointV4 id[2];
- SocketEndpoint hostep;
- hostep.setLocalHost(parent->getPort());
- id[0].set(hostep);
- id[1].set(remoteep);
- newsock->write(&id[0],sizeof(id));
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- id[0].getUrlStr(tmp1);
- tmp1.append(' ');
- id[1].getUrlStr(tmp1);
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
- #endif
- size32_t reply;
- size32_t rd;
- newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_READ_TIMEOUT);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
- #endif
- if (reply!=0) {
- assertex(reply==sizeof(id)); // how can this fail?
- if (attachSocket(newsock,remoteep,hostep,true, NULL)) {
- newsock->Release();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.toCharArray());
- #endif
- lastxfer = msTick();
- closed = false;
- break;
- }
- }
- }
- catch (IException *e)
- {
- if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 2");
- #endif
- e->Release();
- return false;
- }
- StringBuffer str;
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- e->Release();
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
- IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
- throw e;
- }
- #ifdef _TRACE
- str.clear();
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).toCharArray(),retrycount+1);
- #endif
- }
- ::Release(newsock);
- newsock = NULL;
- {
- CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
- #ifdef _FULLTRACE
- PROGLOG("MP: before sleep");
- #endif
- Sleep(2000+getRandom()%3000);
- #ifdef _FULLTRACE
- PROGLOG("MP: after sleep");
- #endif
- }
- }
- return true;
- }
- public:
- Semaphore pingsem;
- CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
- ~CMPChannel();
- void reset();
- bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm);
- bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- Linked<ISocket> dest;
- {
- CriticalBlock block(connectsect);
- if (closed) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
- throw e;
- }
- if (!channelsock) {
- if (!connect(tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
- #endif
- return false;
- }
- }
- dest.set(channelsock);
- }
- try {
- #ifdef _FULLTRACE
- unsigned t1 = msTick();
- #endif
- if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
- // if (tm.timeout!=MP_ASYNC_SEND) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (channelsock->wait_write(remaining)==0) {
- return false;
- }
- if (tm.timedout())
- return false;
- }
- // exception checking TBD
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
- unsigned t2 = msTick();
- #endif
- unsigned n = 0;
- const void *bufs[3];
- size32_t sizes[3];
- if (hdrsize) {
- bufs[n] = hdr;
- sizes[n++] = hdrsize;
- }
- if (hdr2size) {
- bufs[n] = hdr2;
- sizes[n++] = hdr2size;
- }
- if (bodysize) {
- bufs[n] = body;
- sizes[n++] = bodysize;
- }
- if (!dest) {
- LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
- return false;
- }
- dest->write_multiple(n,bufs,sizes);
- lastxfer = msTick();
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
- #endif
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- closeSocket();
- throw;
- }
- return true;
- }
- bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
- }
- bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
- }
- bool sendPing(CTimeMon &tm);
- bool sendPingReply(unsigned timeout,bool identifyself);
- bool verifyConnection(CTimeMon &tm,bool allowconnect)
- {
- {
- CriticalBlock block(connectsect);
- if (!channelsock&&allowconnect)
- return connect(tm);
- if (closed||!channelsock)
- return false;
- if ((msTick()-lastxfer)<VERIFY_DELAY)
- return true;
- }
- StringBuffer ep;
- remoteep.getUrlStr(ep);
- loop {
- CTimeMon pingtm(1000*60);
- if (sendPing(pingtm))
- break;
- {
- CriticalBlock block(connectsect);
- if (closed||!channelsock)
- return false;
- }
- if (tm.timedout()) {
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
- closeSocket();
- return false;
- }
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
- unsigned remaining;
- if (!pingtm.timedout(&remaining)&&remaining)
- Sleep(remaining);
- }
- return true;
- }
- bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
- void closeSocket(bool keepsocket=false)
- {
- ISocket *s;
- bool socketfailed = false;
- {
- CriticalBlock block(connectsect);
- if (!channelsock)
- return;
- lastxfer = msTick();
- closed = true;
- if (parent)
- parent->checkclosed = true;
- s=channelsock;
- channelsock = NULL;
- if (!keepsocket) {
- try {
- s->shutdown();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- }
- parent->querySelectHandler().remove(s);
- }
- parent->notifyClosed(remoteep);
- if (socketfailed) {
- try {
- s->Release();
- }
- catch (IException *) {
- // ignore
- }
- }
- else if (keepsocket) {
- // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
- if (keptsockets.ordinality()>10)
- keptsockets.remove(0);
- keptsockets.append(*s);
- }
- else {
- try {
- s->close();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- s->Release();
- }
- }
- CMPServer &queryServer() { return *parent; }
- void monitorCheck();
- StringBuffer & queryEpStr(StringBuffer &s)
- {
- return remoteep.getUrlStr(s);
- }
- bool isClosed()
- {
- return closed;
- }
- // True when the channel is not closed and currently has a socket attached.
- bool isConnected()
- {
- if (closed)
- return false;
- return channelsock!=NULL;
- }
- };
- // Message Handlers (not done as interfaces, for speed reasons)
- // Default handler: queues ordinary user messages on the server receive queue
- // and writes outgoing ones as a single packet.
- class UserPacketHandler // default
- {
- CMPServer *server;
- public:
- UserPacketHandler(CMPServer *_server)
- : server(_server)
- {
- }
- // Queue an incoming message on the server receive queue; takes ownership of msg.
- void handle(CMessageBuffer *msg) // takes ownership of message buffer
- {
- CMPServer &owner = *server;
- owner.getReceiveQ().enqueue(msg);
- }
- // Write hdr followed by all of mb as one packet; returns writepacket's result.
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
- {
- #ifdef _FULLTRACE
- StringBuffer targetstr;
- StringBuffer senderstr;
- LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(targetstr).str(),hdr.sender.getUrlStr(senderstr).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- bool written = channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- return written;
- }
- };
- // Handles TAG_SYS_PING: replies to incoming pings and sends header-only pings.
- class PingPacketHandler // TAG_SYS_PING
- {
- public:
- // Answer a received ping with a ping reply, bounded by CONFIRM_TIMEOUT.
- void handle(CMPChannel *channel,bool identifyself)
- {
- CMPChannel &chan = *channel;
- chan.sendPingReply(CONFIRM_TIMEOUT,identifyself);
- }
- // A ping carries no payload: only the packet header is written.
- bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
- {
- bool written = channel->writepacket(&hdr,sizeof(hdr),tm);
- return written;
- }
- };
- // Handles TAG_SYS_PING_REPLY: wakes the pinging thread and sends replies.
- class PingReplyPacketHandler // TAG_SYS_PING_REPLY
- {
- public:
- // A reply arrived: release whoever is blocked in sendPing() on pingsem.
- void handle(CMPChannel *channel)
- {
- CMPChannel &chan = *channel;
- chan.pingsem.signal();
- }
- // Write the reply header followed by its (possibly empty) payload.
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
- {
- bool written = channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- return written;
- }
- };
- // Reassembly state for one in-progress TAG_SYS_MULTI message.
- class CMultiPacketReceiver: public CInterface
- { // assume each sender only sends one multi-message per channel
- public:
- SocketEndpoint sender; // originator of the parts
- MultiPacketHeader info; // header of the most recently accepted part
- CMessageBuffer *msg; // buffer being assembled; not deleted by this object
- byte * ptr; // start of the reserved payload area inside msg
- // Initialize the raw pointers so a receiver is never observed with garbage values.
- CMultiPacketReceiver() : msg(NULL), ptr(NULL)
- {
- }
- };
- // Reassembles incoming TAG_SYS_MULTI parts into complete messages (handle) and
- // splits oversized outgoing messages into parts (send).
- class MultiPacketHandler // TAG_SYS_MULTI
- {
- CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
- CriticalSection sect;
- unsigned lastErrMs;
- // Log a reassembly protocol error, at most once per second.
- // code identifies the failed check; otherMhdr is the in-progress header (may be NULL).
- void logError(unsigned code, MultiPacketHeader &mhdr, CMessageBuffer &msg, MultiPacketHeader *otherMhdr)
- {
- unsigned ms = msTick();
- if ((ms-lastErrMs) > 1000) // avoid logging too much
- {
- StringBuffer errorMsg("sender=");
- msg.getSender().getUrlStr(errorMsg).newline();
- errorMsg.append("This header: "); // labels belong in the log text, not in the message buffer
- mhdr.getDetails(errorMsg).newline();
- if (otherMhdr)
- {
- errorMsg.append("Other header: ");
- otherMhdr->getDetails(errorMsg).newline();
- }
- msg.getDetails(errorMsg);
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (%d) %s", code, errorMsg.str());
- lastErrMs = ms; // throttle relative to the last error actually logged
- }
- }
- public:
- MultiPacketHandler() : lastErrMs(0)
- {
- }
- // Accept one TAG_SYS_MULTI part (takes ownership of msg). Returns the fully
- // reassembled message when the last part arrives, otherwise NULL.
- // Malformed or out-of-sequence parts are logged and discarded.
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- if (!msg)
- return NULL;
- CriticalBlock block(sect);
- MultiPacketHeader mhdr;
- msg->read(sizeof(mhdr),&mhdr);
- CMultiPacketReceiver *recv=NULL;
- unsigned recvidx = 0; // index of recv within inprogress (the loop variable is scoped to the loop)
- ForEachItemIn(i,inprogress) {
- CMultiPacketReceiver &mpr = inprogress.item(i);
- if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
- recv = &mpr;
- recvidx = i;
- break;
- }
- }
- if (mhdr.idx==0) {
- // first part: must start at 0, fit inside the total and not collide with an in-progress message
- if ((mhdr.ofs!=0)||(recv!=NULL)||(mhdr.size>mhdr.total)) {
- logError(1, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- recv = new CMultiPacketReceiver;
- recv->msg = new CMessageBuffer();
- recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
- recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
- recv->sender = msg->getSender();
- recv->info = mhdr;
- recvidx = inprogress.ordinality();
- inprogress.append(*recv);
- }
- else {
- // later part: must directly follow the previously accepted part
- if ((recv==NULL)||(mhdr.ofs==0)||
- (recv->info.ofs+recv->info.size!=mhdr.ofs)||
- (recv->info.idx+1!=mhdr.idx)||
- (recv->info.total!=mhdr.total)||
- (mhdr.ofs+mhdr.size>mhdr.total)) {
- logError(2, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- }
- msg->read(mhdr.size,recv->ptr+mhdr.ofs);
- recv->info = mhdr;
- bool lastpart = (mhdr.idx+1==mhdr.numparts);
- if (lastpart&&(mhdr.ofs+mhdr.size!=mhdr.total)) {
- // log while the part is still alive, then drop the corrupt reassembly so that
- // later messages with this tag/sender are not rejected as clashes
- logError(3, mhdr, *msg, NULL);
- delete msg;
- delete recv->msg;
- inprogress.remove(recvidx);
- return NULL;
- }
- delete msg;
- if (!lastpart)
- return NULL;
- CMessageBuffer *complete = recv->msg;
- inprogress.remove(recvidx); // drops recv; the assembled buffer is handed to the caller
- return complete;
- }
- // Send mb as a sequence of TAG_SYS_MULTI parts of at most ~MAXDATAPERPACKET bytes.
- // The caller holds sendmutex; it is briefly released between parts so other
- // messages can interleave. Returns false if any part fails to write.
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
- {
- // must not adjust mb
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- PacketHeader outhdr;
- outhdr = hdr;
- outhdr.tag = TAG_SYS_MULTI;
- MultiPacketHeader mhdr;
- mhdr.total = hdr.size-sizeof(hdr);
- mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
- mhdr.size = mhdr.total/mhdr.numparts;
- mhdr.tag = hdr.tag;
- mhdr.ofs = 0;
- mhdr.idx = 0;
- const byte *p = (const byte *)mb.toByteArray();
- unsigned i=0;
- loop {
- if (i+1==mhdr.numparts) // last part carries whatever is left
- mhdr.size = mhdr.total-mhdr.ofs;
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
- #endif
- outhdr.initseq();
- outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
- if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
- #endif
- return false;
- }
- i++;
- if (i==mhdr.numparts)
- break;
- sendmutex.unlock(); // allow other messages to interleave
- sendmutex.lock();
- mhdr.idx++;
- mhdr.ofs += mhdr.size;
- p += mhdr.size;
- }
- return true;
- }
- };
- // TAG_SYS_BCAST packets are currently just discarded.
- class BroadcastPacketHandler // TAG_SYS_BCAST
- {
- public:
- // Takes ownership of msg, frees it and hands nothing back to the reader.
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- CMessageBuffer *discard = msg;
- delete discard;
- return NULL; // nothing further to dispatch
- }
- };
- // TAG_SYS_FORWARD packets are currently just discarded.
- class ForwardPacketHandler // TAG_SYS_FORWARD
- {
- public:
- // Takes ownership of msg, frees it and hands nothing back to the reader.
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- CMessageBuffer *discard = msg;
- delete discard;
- return NULL; // nothing further to dispatch
- }
- };
- // --------------------------------------------------------
- class CMPPacketReader: public CInterface, public ISocketSelectNotify // select-handler callback: reads packets from a channel's socket and dispatches them by tag
- {
- CMessageBuffer *activemsg; // message currently being filled (NULL between packets)
- byte * activeptr; // next write position inside activemsg
- size32_t remaining; // payload bytes still expected for activemsg
- byte *dataptr; // NOTE(review): not referenced in this class
- CMPChannel *parent; // owning channel; cleared by shutdown() and when the socket is closed
- CriticalSection sect; // guards the clearing of parent in shutdown() and on graceful close
- public:
- IMPLEMENT_IINTERFACE;
- CMPPacketReader(CMPChannel *_parent)
- {
- init(_parent);
- }
- void init(CMPChannel *_parent) // (re)bind to a channel; NOTE(review): a partially read activemsg is not freed here (or by any destructor) and would leak
- {
- parent = _parent;
- activemsg = NULL;
- }
- void shutdown() // detach from the channel so later select notifications are ignored
- {
- CriticalBlock block(sect);
- parent = NULL;
- }
- bool notifySelected(ISocket *sock,unsigned selected) // called by the select handler when sock is readable; always returns false
- {
- if (!parent) // already detached
- return false;
- try {
- // try and mop up all data on socket
- 
- size32_t sizeavail = sock->avail_read();
- if (sizeavail==0) {
- // graceful close
- Linked<CMPChannel> pc;
- {
- CriticalBlock block(sect);
- if (parent) {
- pc.set(parent); // don't want channel to disappear during call
- parent = NULL;
- }
- }
- if (pc)
- pc->closeSocket();
- return false;
- }
- do { // process packets until no more data is available
- parent->lastxfer = msTick(); // record activity (read by verifyConnection and CMPServer::lookup)
- #ifdef _FULLTRACE
- parent->numiter++;
- #endif
- if (!activemsg) { // no message in progress
- PacketHeader hdr; // header for active message
- #ifdef _FULLTRACE
- parent->numiter = 1;
- parent->startxfer = msTick();
- #endif
- // assumes packet header will arrive in one go
- if (sizeavail<sizeof(hdr)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
- #endif
- size32_t szread;
- sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
- }
- else
- sock->read(&hdr,sizeof(hdr));
- if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) { // versions must agree once the low byte is dropped
- // TBD IPV6 here
- SocketEndpoint ep;
- hdr.sender.get(ep);
- IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
- throw e;
- }
- if (sizeavail<=sizeof(hdr)) // header used up the known bytes: re-query the socket
- sizeavail = sock->avail_read();
- else
- sizeavail -= sizeof(hdr);
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- remaining = hdr.size-sizeof(hdr); // hdr.size counts the header itself
- activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
- activeptr = (byte *)activemsg->reserveTruncate(remaining);
- hdr.setMessageFields(*activemsg);
- }
- 
- size32_t toread = sizeavail; // never read past the end of the current packet
- if (toread>remaining)
- toread = remaining;
- if (toread) {
- sock->read(activeptr,toread);
- remaining -= toread;
- sizeavail -= toread;
- activeptr += toread;
- }
- if (remaining==0) { // we have the packet so process
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
- #endif
- do { // a handler may hand back another message to dispatch (e.g. a completed TAG_SYS_MULTI message)
- switch (activemsg->getTag()) {
- case TAG_SYS_MULTI:
- activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
- break;
- case TAG_SYS_PING:
- parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_PING_REPLY:
- parent->queryServer().pingreplypackethandler->handle(parent);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_BCAST:
- activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
- break;
- case TAG_SYS_FORWARD:
- activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
- break;
- default:
- parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
- activemsg = NULL;
- }
- } while (activemsg);
- }
- if (!sizeavail)
- sizeavail = sock->avail_read();
- } while (sizeavail);
- return false; // ok
- }
- catch (IException *e) { // log anything except a graceful close, then close the socket below
- if (e->errorCode()!=JSOCKERR_graceful_close)
- FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
- e->Release();
- }
- // error here, so close socket (ignore error as may be closed already)
- try {
- if(parent)
- parent->closeSocket();
- }
- catch (IException *e) {
- e->Release();
- }
- parent = NULL; // NOTE(review): cleared without holding sect, unlike shutdown()
- return false;
- }
- };
- // Create an unconnected channel from this server to _remoteep.
- CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep)
- {
- channelsock = NULL;
- parent = _parent;
- remoteep = _remoteep;
- localep.set(parent->getPort());
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- lastxfer = msTick(); // verifyConnection() and CMPServer::lookup() compute msTick()-lastxfer, so it must be set
- }
- void CMPChannel::reset() // return a closed channel to its freshly constructed state (called from CMPServer::lookup)
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release(); // the old reader is replaced below
- channelsock = NULL;
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- }
- CMPChannel::~CMPChannel() // detach the reader first, then close the socket and drop the reader
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release();
- }
- bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm) // LINKs newsock on success (caller keeps and releases its own reference); returns false if the existing connection is kept instead
- {
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket on entry");
- #endif
- CriticalBlock block(connectsect);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket got connectsect");
- #endif
- // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
- if (channelsock) { // a socket is already attached: decide which connection survives
- if (remoteep.port==0) // NB: the parameter remoteep shadows the member of the same name
- return false;
- StringBuffer ep1;
- StringBuffer ep2;
- _localep.getUrlStr(ep1);
- remoteep.getUrlStr(ep2);
- LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
- try {
- if (ismaster!=master) {
- if (ismaster) {
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
- return false;
- }
- else {
- Sleep(50); // give the other side some time to close
- CTimeMon tm(10000);
- if (verifyConnection(tm,false)) { // existing connection still answers pings: keep it
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
- return false;
- }
- }
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
- e->Release();
- }
- try {
- LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
- CriticalUnblock unblock(connectsect); // connectsect is released while closing the stale socket and pausing
- closeSocket(true);
- #ifdef REFUSE_STALE_CONNECTION
- if (!ismaster)
- return false;
- #endif
- Sleep(100); // pause to allow close socket triggers to run
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
- e->Release();
- }
- }
- if (confirm)
- newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
- closed = false;
- reader->init(this);
- channelsock = LINK(newsock);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket before select add");
- #endif
- parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader); // reader receives notifySelected() callbacks for this socket
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket after select add");
- #endif
- localep = _localep;
- master = ismaster;
- return true;
- }
- // Send mb to the remote endpoint with tag/replytag within tm. Messages larger than
- // MAXDATAPERPACKET go out as TAG_SYS_MULTI parts. Only one multi-part send may be in
- // progress per channel, and single-packet sends of the same tag wait for it to finish.
- // Throws MPERR_link_closed if the channel is closed (or, for a reply, not connected).
- bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
- {
- // note must not adjust mb
- assertex(tag!=TAG_NULL);
- assertex(tm.timeout);
- size32_t msgsize = mb.length();
- PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
- if (closed||(reply&&!isConnected())) { // flag error if has been disconnected
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
- throw e;
- }
- bool ismulti = (msgsize>MAXDATAPERPACKET);
- // pre-condition - ensure no clashes
- loop {
- sendmutex.lock();
- if (ismulti) {
- if (multitag==TAG_NULL) { // don't want to interleave with other multi send
- multitag = tag; // claim the multi-send slot; postcond below resets it to TAG_NULL
- break;
- }
- }
- else if (multitag!=tag) // don't want to interleave with another of same tag
- break;
- sendwaiting++;
- sendmutex.unlock();
- sendwaitingsig.wait();
- }
- // On scope exit: release the multi-send slot (if claimed), wake all waiters, unlock.
- struct Cpostcondition // can we start using eiffel
- {
- Mutex &sendmutex;
- unsigned &sendwaiting;
- Semaphore &sendwaitingsig;
- mptag_t *multitag;
- Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
- : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
- {
- multitag = _multitag;
- }
- ~Cpostcondition()
- {
- if (multitag)
- *multitag = TAG_NULL;
- if (sendwaiting) {
- sendwaitingsig.signal(sendwaiting);
- sendwaiting = 0;
- }
- sendmutex.unlock();
- }
- } postcond(sendmutex,sendwaiting,sendwaitingsig,ismulti?&multitag:NULL);
- if (ismulti)
- return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
- return parent->userpackethandler->send(this,hdr,mb,tm);
- }
- // Send a header-only TAG_SYS_PING and wait, within tm, for the matching reply
- // (signalled on pingsem). Returns false on lock timeout, send failure or no reply.
- bool CMPChannel::sendPing(CTimeMon &tm)
- {
- unsigned timeleft;
- tm.timedout(&timeleft);
- if (!sendmutex.lockWait(timeleft))
- return false;
- bool sent = false;
- {
- SocketEndpoint self(parent->getPort());
- PacketHeader pinghdr(sizeof(PacketHeader),self,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
- try {
- if (parent->pingpackethandler->send(this,pinghdr,tm))
- sent = !tm.timedout(&timeleft); // refreshes timeleft for the wait below
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
- e->Release();
- }
- }
- sendmutex.unlock();
- if (!sent)
- return false;
- return pingsem.wait(timeleft);
- }
- // Reply to a ping within timeout ms. With identifyself on Windows, the reply
- // payload carries the process command line. Returns false on lock timeout or error.
- bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
- {
- CTimeMon replytm(timeout);
- unsigned timeleft;
- replytm.timedout(&timeleft);
- if (!sendmutex.lockWait(timeleft))
- return false;
- SocketEndpoint self(parent->getPort());
- MemoryBuffer payload;
- #ifdef _WIN32
- if (identifyself)
- payload.append(GetCommandLine());
- #endif
- PacketHeader replyhdr(payload.length()+sizeof(PacketHeader),self,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
- bool ok = false;
- try {
- ok = parent->pingreplypackethandler->send(this,replyhdr,payload,replytm);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
- e->Release();
- }
- sendmutex.unlock();
- return ok;
- }
-
- // --------------------------------------------------------
- // Record the owning server. When port is 0, bind the listening socket now on a random
- // free port in [mpStart,mpEnd] (from the environment, else MP_START_PORT..MP_END_PORT);
- // otherwise defer creation to start(). The chosen port is passed to the server via setPort().
- CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
- : Thread("MP Connection Thread")
- {
- parent = _parent;
- listensock = NULL; // must be NULL before the bind loop: '!listensock' below detects failure
- if (!port)
- {
- // need to connect early to resolve clash
- Owned<IPropertyTree> env = getHPCCEnvironment();
- unsigned minPort, maxPort;
- if (env)
- {
- minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
- maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
- }
- else
- {
- minPort = MP_START_PORT;
- maxPort = MP_END_PORT;
- }
- assertex(maxPort >= minPort);
- Owned<IJSOCK_Exception> lastErr;
- unsigned numPorts = maxPort - minPort + 1;
- for (unsigned retries = 0; retries < numPorts * 3; retries++)
- {
- port = minPort + getRandom() % numPorts;
- try
- {
- listensock = ISocket::create(port, 16); // better not to have *too* many waiting
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (e->errorCode()!=JSOCKERR_port_in_use)
- throw;
- lastErr.setown(e);
- }
- }
- if (!listensock)
- throw lastErr.getClear();
- }
- // else: listensock stays NULL (delay create till running)
- parent->setPort(port);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
- #endif
- running = false;
- }
- // A connection handshake made up entirely of 0xff bytes asks this process to
- // terminate: close the listener, flush logs, kill registered children and exit.
- void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
- {
- const byte *cur = (const byte *)p;
- const byte *end = cur+sz;
- for (; cur!=end; cur++) {
- if (*cur!=0xff)
- return; // ordinary handshake
- }
- // Panic!
- PROGLOG("MP Self destruct invoked");
- try {
- if (listensock!=NULL) {
- listensock->close();
- listensock->Release();
- listensock=NULL;
- }
- }
- catch (...)
- {
- PROGLOG("MP socket close failure");
- }
- // Kill registered child processes
- PROGLOG("MP self destruct exit");
- queryLogMsgManager()->flushQueue(10*1000);
- unsigned numchildren = childprocesslist.ordinality();
- #ifdef _WIN32
- for (unsigned idx=0; idx<numchildren; idx++)
- TerminateProcess((HANDLE)childprocesslist.item(idx), 1);
- TerminateProcess(GetCurrentProcess(), 1);
- #else
- for (unsigned idx=0; idx<numchildren; idx++)
- ::kill((HANDLE)childprocesslist.item(idx), SIGTERM);
- ::kill(getpid(), SIGTERM);
- #endif
- _exit(1);
- }
- // Create the listening socket if the constructor deferred it, then start accepting.
- void CMPConnectThread::start(unsigned short port)
- {
- if (listensock==NULL)
- listensock = ISocket::create(port,16);
- running = true;
- Thread::start();
- }
- // Accept loop: for each incoming connection read the two-endpoint handshake, check it
- // for the self-destruct pattern and attach the socket to the channel for the announced
- // remote endpoint. Every accepted socket object is released at the end of its iteration.
- int CMPConnectThread::run()
- {
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting");
- #endif
- while (running) {
- ISocket *sock=NULL;
- try {
- sock=listensock->accept(true);
- #ifdef _FULLTRACE
- StringBuffer s;
- SocketEndpoint ep1;
- sock->getPeerEndpoint(ep1);
- PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
- #endif
- }
- catch (IException *e)
- {
- LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
- throw; // error handling TBD
- }
- if (sock) {
- try {
- sock->set_keep_alive(true);
- size32_t rd;
- SocketEndpoint remoteep;
- SocketEndpoint hostep;
- SocketEndpointV4 id[2];
- sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT);
- // Rejected handshakes fall through (no 'continue') so they reach the Release() below.
- if (rd != sizeof(id))
- {
- FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid number of connection bytes serialized");
- sock->close();
- }
- else
- {
- id[0].get(remoteep);
- id[1].get(hostep);
- if (remoteep.isNull() || hostep.isNull())
- {
- // JCSMORE, I think remoteep really must/should match a IP of this local host
- FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid remote and/or host ep serialized");
- sock->close();
- }
- else
- {
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- remoteep.getUrlStr(tmp1);
- tmp1.append(' ');
- hostep.getUrlStr(tmp1);
- PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
- #endif
- checkSelfDestruct(&id[0],sizeof(id));
- // rd (== sizeof(id)) is written back to the peer as the connect confirmation
- if (!parent->lookup(remoteep).attachSocket(sock,remoteep,hostep,false, &rd)) {
- #ifdef _FULLTRACE
- PROGLOG("MP Connect Thread: lookup failed");
- #endif
- }
- else {
- #ifdef _TRACE
- StringBuffer str1;
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",remoteep.getUrlStr(str1).toCharArray());
- #endif
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: Connect Thread: after write");
- #endif
- }
- }
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
- e->Release();
- // close may itself throw on a broken socket; do not let that escape the thread
- try {
- sock->close();
- }
- catch (IException *e2) {
- e2->Release();
- }
- }
- try {
- sock->Release();
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
- e->Release();
- }
- }
- else {
- if (running)
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
- }
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
- #endif
- return 0;
- }
- // --------------------------------------------------------
- // Walks the server's channels via CMPServer::nextChannel (which locks per step).
- class CMPChannelIterator
- {
- CMPServer &parent;
- CMPChannel *cur;
- public:
- CMPChannelIterator(CMPServer &_parent)
- : parent(_parent), cur(NULL)
- {
- }
- // Restart from the beginning; false if there are no channels.
- bool first()
- {
- cur = NULL;
- return parent.nextChannel(cur);
- }
- // Advance; false once the end is reached (or if not positioned).
- bool next()
- {
- if (cur==NULL)
- return false;
- return parent.nextChannel(cur);
- }
- bool isValid()
- {
- return NULL!=cur;
- }
- CMPChannel &query()
- {
- return *cur;
- }
- };
- //-----------------------------------------------------------------------------------
- // Find (or create) the channel for endpoint under serversect. A channel closed for
- // more than 30s is reset for reuse; when checkclosed is set, other stale closed
- // channels are removed from the table first.
- CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
- {
- CriticalBlock block(serversect);
- SocketEndpoint ep = endpoint; // CMPChannel's constructor takes a non-const endpoint
- CMPChannel *chan = find(ep);
- // Check for freed channels
- if (chan && chan->isClosed() && (msTick()-chan->lastxfer>30*1000))
- chan->reset();
- if (checkclosed) {
- checkclosed = false;
- CMPChannel *cur = NULL;
- while ((cur = (CMPChannel *)next(cur)) != NULL) {
- if (cur->isClosed() && (msTick()-cur->lastxfer>30*1000)) {
- removeExact(cur);
- cur = NULL; // table changed: restart the scan from the beginning
- }
- }
- chan = find(ep);
- }
- if (chan==NULL) {
- chan = new CMPChannel(this,ep);
- add(*chan);
- }
- return *chan;
- }
- CMPServer::CMPServer(unsigned _port) // _port==0 lets the connect thread pick a port from the configured range
- {
- port = 0; // connectthread tells me what port it actually connected on
- checkclosed = false; // set by CMPChannel::closeSocket, consumed by lookup()
- connectthread = new CMPConnectThread(this, _port); // binds now if _port==0, and calls setPort()
- selecthandler = createSocketSelectHandler();
- pingpackethandler = new PingPacketHandler; // TAG_SYS_PING
- pingreplypackethandler = new PingReplyPacketHandler; // TAG_SYS_PING_REPLY
- forwardpackethandler = new ForwardPacketHandler; // TAG_SYS_FORWARD
- multipackethandler = new MultiPacketHandler; // TAG_SYS_MULTI
- broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST
- userpackethandler = new UserPacketHandler(this); // default
- notifyclosedthread = new CMPNotifyClosedThread(this);
- notifyclosedthread->start();
- initMyNode(port); // NB port set by connectthread constructor
- selecthandler->start(); // NOTE(review): the connect thread itself is only started by CMPServer::start()
- }
- CMPServer::~CMPServer() // releases channels, helper threads and packet handlers; select/connect activity is halted by stop(), not here
- {
- #ifdef _TRACEORPHANS
- StringBuffer buf;
- getReceiveQueueDetails(buf);
- if (buf.length())
- LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",buf.str());
- #endif
- releaseAll(); // presumably removes every channel, releasing each via onRemove()
- selecthandler->Release();
- notifyclosedthread->stop();
- notifyclosedthread->Release();
- connectthread->Release();
- 
- delete pingpackethandler;
- delete pingreplypackethandler;
- delete forwardpackethandler;
- delete multipackethandler;
- delete broadcastpackethandler;
- delete userpackethandler;
- }
- bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm) // wait within tm for a message matching ep (NULL = any) and tag; false on timeout or cancel; throws MPERR_link_closed if the awaited connection closes
- {
- checkTagOK(tag);
- class Cnfy: public CBufferQueueNotify
- {
- public:
- bool aborted; // set when a relevant connection closes while waiting
- CMessageBuffer *result; // matched message (NULL for a consumed cancel marker)
- const SocketEndpoint *ep;
- SocketEndpoint closedEp; // used if receiving on RANK_ALL
- mptag_t tag;
- Cnfy(const SocketEndpoint *_ep,mptag_t _tag) { ep = _ep; tag = _tag; result = NULL; aborted=false; }
- bool notify(CMessageBuffer *msg)
- {
- if ((tag==TAG_ALL)||(tag==msg->getTag())) {
- SocketEndpoint senderep = msg->getSender();
- if ((ep==NULL)||ep->equals(senderep)||senderep.isNull()) { // a null sender matches any ep (e.g. cancel() called without an ep)
- if (msg->getReplyTag()==TAG_CANCEL) // cancel marker: consume it and leave result NULL
- delete msg;
- else
- result = msg;
- return true;
- }
- }
- return false;
- }
- bool notifyClosed(SocketEndpoint &_closedEp) // called when connection closed
- {
- if (NULL == ep) { // ep is NULL if receiving on RANK_ALL
- closedEp = _closedEp;
- ep = &closedEp; // used for abort info
- aborted = true;
- return true;
- }
- else if (ep->equals(_closedEp)) {
- aborted = true;
- return true;
- }
- return false;
- }
- } nfy(ep,tag);
- if (receiveq.wait(nfy,false,tm)&&nfy.result) {
- mbuf.transferFrom(*nfy.result);
- delete nfy.result;
- return true;
- }
- if (nfy.aborted) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,*nfy.ep);
- throw e;
- }
- return false;
- }
- // Discard every queued message whose tag matches (TAG_ALL matches all), logging the count.
- void CMPServer::flush(mptag_t tag)
- {
- class CFlushMatcher: public CBufferQueueNotify
- {
- mptag_t flushtag;
- public:
- CFlushMatcher(mptag_t _tag) : flushtag(_tag)
- {
- }
- bool notify(CMessageBuffer *msg)
- {
- if (flushtag==TAG_ALL)
- return true;
- return flushtag==msg->getTag();
- }
- bool notifyClosed(SocketEndpoint &) // called when connection closed
- {
- return false; // connection closes do not affect a flush
- }
- } matcher(tag);
- unsigned discarded = receiveq.flush(matcher);
- if (discarded!=0)
- PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,discarded);
- }
- // Queue an empty TAG_CANCEL marker (sender ep, or a null endpoint when ep is NULL)
- // so a pending recv/probe on tag completes without a message.
- void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
- {
- SocketEndpoint target;
- if (ep!=NULL)
- target = *ep;
- CMessageBuffer *marker = new CMessageBuffer(0);
- marker->init(target,tag,TAG_CANCEL);
- getReceiveQ().enqueue(marker);
- }
- unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender) // returns the match count reported via receiveq.wait(...,true,...) (presumably a non-consuming peek), setting sender from the first match; 0 on timeout or when the first match is a TAG_CANCEL marker
- {
- class Cnfy: public CBufferQueueNotify
- {
- public:
- bool aborted; // set when the probed endpoint's connection closes
- SocketEndpoint &sender;
- const SocketEndpoint *ep;
- mptag_t tag;
- bool cancel; // first match was a TAG_CANCEL marker
- unsigned count; // number of matches notified
- Cnfy(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender) : sender(_sender)
- {
- ep = _ep;
- tag = _tag;
- cancel = false;
- aborted = false;
- count = 0;
- }
- bool notify(CMessageBuffer *msg)
- {
- if (((tag==TAG_ALL)||(tag==msg->getTag()))&& // NOTE(review): unlike recv(), a null-sender cancel marker does not match a specific ep
- ((ep==NULL)||ep->equals(msg->getSender()))) {
- if (count==0) {
- sender = msg->getSender();
- cancel = (msg->getReplyTag()==TAG_CANCEL);
- }
- count++;
- return true;
- }
- return false;
- }
- bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
- {
- if (ep&&ep->equals(closedep)) { // unlike recv(), a close is ignored when probing all endpoints (ep==NULL)
- aborted = true;
- return true;
- }
- return false;
- }
- } nfy(ep,tag,sender);
- if (receiveq.wait(nfy,true,tm)) {
- return nfy.cancel?0:nfy.count;
- }
- if (nfy.aborted) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
- throw e;
- }
- return 0;
- }
- // Start accepting connections on the port recorded via setPort() during construction.
- void CMPServer::start()
- {
- unsigned short listenport = getPort();
- connectthread->start(listenport);
- }
- // Stop select handling and the connect thread, then close every channel's socket.
- void CMPServer::stop()
- {
- selecthandler->stop(true);
- connectthread->stop();
- CMPChannel *chan = NULL;
- while ((chan = (CMPChannel *)next(chan)) != NULL)
- chan->closeSocket();
- }
- // Register monitor with the notify-closed thread.
- // Expected to be called with CMPServer::sect held.
- void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
- {
- notifyclosedthread->addConnectionMonitor(monitor);
- }
- // Unregister monitor from the notify-closed thread.
- // Expected to be called with CMPServer::sect held.
- void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- notifyclosedthread->removeConnectionMonitor(monitor);
- }
- // Hash-table add hook: nothing to do (not used).
- void CMPServer::onAdd(void *)
- {
- }
- // Hash-table remove hook: drop the table's reference to the channel.
- void CMPServer::onRemove(void *e)
- {
- CMPChannel *chan = static_cast<CMPChannel *>(e);
- chan->Release();
- }
- // Channels are keyed by their remote endpoint.
- unsigned CMPServer::getHashFromElement(const void *e) const
- {
- const CMPChannel *chan = static_cast<const CMPChannel *>(e);
- return chan->remoteep.hash(0);
- }
- // The find parameter is a SocketEndpoint, hashed the same way as a channel's remoteep.
- unsigned CMPServer::getHashFromFindParam(const void *fp) const
- {
- const SocketEndpoint *key = static_cast<const SocketEndpoint *>(fp);
- return key->hash(0);
- }
- // Expose a channel's key (its remote endpoint) to the hash table.
- const void * CMPServer::getFindParam(const void *p) const
- {
- const CMPChannel *chan = static_cast<const CMPChannel *>(p);
- return &chan->remoteep;
- }
- // A channel matches when its remote endpoint equals the looked-up endpoint.
- bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- CMPChannel *chan = (CMPChannel *)et;
- SocketEndpoint &key = *(SocketEndpoint *)fp;
- return chan->remoteep.equals(key);
- }
- // Advance cur to the next channel (first if cur is NULL) under serversect;
- // false once the end of the table is reached.
- bool CMPServer::nextChannel(CMPChannel *&cur)
- {
- CriticalBlock block(serversect);
- void *following = SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
- cur = (CMPChannel *)following;
- return NULL!=cur;
- }
- // Forward a connection-closed event for ep to the notify-closed thread.
- void CMPServer::notifyClosed(SocketEndpoint &ep)
- {
- #ifdef _TRACE
- StringBuffer epstr;
- ep.getUrlStr(epstr);
- LOG(MCdebugInfo(100), unknownJob, "CMPServer::notifyClosed %s",epstr.str());
- #endif
- notifyclosedthread->notify(ep);
- }
- // --------------------------------------------------------
- // IInterCommunicator over a CMPServer: peers are addressed by INode rather than group rank.
- class CInterCommunicator: public CInterface, public IInterCommunicator
- {
- CMPServer *parent;
- public:
- IMPLEMENT_IINTERFACE;
- // Send mbuf to dst with tag; a send to this node is queued locally. mbuf is cleared on
- // success. Returns false for a NULL dst, on timeout or if the channel send fails
- // (the channel may also throw MPERR_link_closed).
- bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
- {
- if (!dst)
- return false;
- if (dst->equals(queryMyNode())) {
- CMessageBuffer *msg = new CMessageBuffer();
- mptag_t reply = mbuf.getReplyTag();
- msg->transferFrom(mbuf);
- msg->init(dst->endpoint(),tag,reply);
- parent->getReceiveQ().enqueue(msg);
- mbuf.clear(); // for consistent semantics
- return true;
- }
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(dst->endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
- return false;
- mbuf.clear(); // for consistent semantics
- return true;
- }
- // Ping node (connecting if needed) and wait for the reply within timeout.
- bool verifyConnection(INode *node, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(node->endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- return channel.verifyConnection(tm,true);
- }
- // Append "<endpoint> OK|FAILED" for every open channel to log (5s per channel).
- void verifyAll(StringBuffer &log)
- {
- CMPChannelIterator iter(*parent);
- if (iter.first()) {
- do {
- CriticalBlock block(verifysect);
- CTimeMon tm(5000);
- CMPChannel &channel = iter.query();
- if (!channel.isClosed()) {
- channel.queryEpStr(log).append(' ');
- if (channel.verifyConnection(tm,false))
- log.append("OK\n");
- else
- log.append("FAILED\n");
- }
- }
- while (iter.next());
- }
- }
- // Verify connections to the other group members within timeout. With duplex every
- // node verifies every other; otherwise each pair is split by rank parity so only one
- // side connects, and the second pass waits for the peers that connect to us.
- bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- rank_t myrank = group->rank();
- {
- ForEachNodeInGroup(rank,*group) {
- bool doverify;
- if (duplex)
- doverify = (myrank!=rank);
- else if ((rank&1)==(myrank&1))
- doverify = (myrank>rank);
- else
- doverify = (myrank<rank);
- if (doverify) {
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (!channel.verifyConnection(tm,true)) {
- return false;
- }
- }
- }
- }
- if (!duplex) {
- ForEachNodeInGroup(rank,*group) {
- bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
- if (doverify) {
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
- while (!channel.verifyConnection(tm,false)) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CriticalUnblock unblock(verifysect);
- Sleep(100);
- }
- }
- }
- }
- return true;
- }
- // Count messages from src (NULL = any) with tag; optionally return the first sender.
- unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
- {
- if (sender)
- *sender = NULL;
- SocketEndpoint res;
- CTimeMon tm(timeout);
- unsigned ret = parent->probe(src?&src->endpoint():NULL,tag,tm,res);
- if (ret!=0) {
- if (sender)
- *sender = createINode(res);
- return ret;
- }
- return 0;
- }
- 
- // Receive a message from src (NULL = any). When receiving from any source, a closed
- // connection to some other endpoint is logged and the receive is retried.
- bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
- {
- if (sender)
- *sender = NULL;
- CTimeMon tm(timeout);
- loop
- {
- try
- {
- if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm))
- {
- if (sender)
- *sender = createINode(mbuf.getSender());
- return true;
- }
- return false;
- }
- catch (IMP_Exception *e)
- {
- if (MPERR_link_closed != e->errorCode())
- throw;
- const SocketEndpoint &ep = e->queryEndpoint();
- if (src && (ep == src->endpoint()))
- throw;
- StringBuffer epStr;
- ep.getUrlStr(epStr);
- FLLOG(MCoperatorWarning, unknownJob, "CInterCommunicator: ignoring closed endpoint: %s", epStr.str());
- e->Release();
- // loop around and recv again
- }
- }
- }
- void flush(mptag_t tag)
- {
- parent->flush(tag);
- }
- // Send mbuff to dst on dsttag with a fresh reply tag, then wait for the reply in mbuff.
- bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex(dst);
- mptag_t replytag = createReplyTag();
- CTimeMon tm(timeout);
- mbuff.setReplyTag(replytag);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!send(mbuff,dst,dsttag,remaining)||tm.timedout(&remaining))
- return false;
- mbuff.clear();
- return recv(mbuff,dst,replytag,NULL,remaining);
- }
- // Send mbuff back to its sender using its reply tag.
- bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
- {
- Owned<INode> dst(createINode(mbuff.getSender()));
- return send(mbuff,dst,mbuff.getReplyTag(),timeout);
- }
- void cancel(INode *src, mptag_t tag)
- {
- parent->cancel(src?&src->endpoint():NULL,tag);
- }
- // Close the connection to node and remove its channel from the server.
- void disconnect(INode *node)
- {
- CriticalBlock block(verifysect);
- CMPChannel &channel = parent->lookup(node->endpoint());
- channel.closeSocket();
- parent->removeChannel(&channel);
- }
- CInterCommunicator(CMPServer *_parent)
- {
- parent = _parent;
- }
- ~CInterCommunicator()
- {
- }
- };
- class CCommunicator: public CInterface, public ICommunicator
- {
- IGroup *group;
- CMPServer *parent;
- bool outer;
- const SocketEndpoint &queryEndpoint(rank_t rank)
- {
- return group->queryNode(rank).endpoint();
- }
- CMPChannel &queryChannel(rank_t rank)
- {
- return parent->lookup(queryEndpoint(rank));
- }
- public:
- IMPLEMENT_IINTERFACE;
- bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
- {
- // send does not corrupt mbuf
- if (dstrank==RANK_NULL)
- return false;
- rank_t myrank = group->rank();
- if (dstrank==myrank) {
- CMessageBuffer *msg = mbuf.clone();
- // change sender
- msg->init(queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
- parent->getReceiveQ().enqueue(msg);
- }
- else {
- CTimeMon tm(timeout);
- rank_t endrank;
- if (dstrank==RANK_ALL) {
- send(mbuf,myrank,tag,timeout);
- dstrank = RANK_ALL_OTHER;
- }
- if (dstrank==RANK_ALL_OTHER) {
- dstrank = 0;
- endrank = group->ordinality()-1;
- }
- else if (dstrank==RANK_RANDOM) {
- if (group->ordinality()>1) {
- do {
- dstrank = getRandom()%group->ordinality();
- } while (dstrank==myrank);
- }
- else {
- assertex(myrank!=0);
- dstrank = 0;
- }
- endrank = dstrank;
- }
- else
- endrank = dstrank;
- for (;dstrank<=endrank;dstrank++) {
- if (dstrank!=myrank) {
- CMPChannel &channel = queryChannel(dstrank);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
- return false;
- }
- }
- }
- return true;
- }
- bool verifyConnection(rank_t rank, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- assertex(rank!=RANK_RANDOM);
- assertex(rank!=RANK_ALL);
- CTimeMon tm(timeout);
- CMPChannel &channel = queryChannel(rank);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- return channel.verifyConnection(tm,true);
- }
- bool verifyAll(bool duplex, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- rank_t myrank = group->rank();
- {
- ForEachNodeInGroup(rank,*group) {
- bool doverify;
- if (duplex)
- doverify = (rank!=myrank);
- else if ((rank&1)==(myrank&1))
- doverify = (myrank>rank);
- else
- doverify = (myrank<rank);
- if (doverify) {
- CMPChannel &channel = queryChannel(rank);
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (!channel.verifyConnection(tm,true))
- return false;
- }
- }
- }
- if (!duplex) {
- ForEachNodeInGroup(rank,*group) {
- bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
- if (doverify) {
- CMPChannel &channel = queryChannel(rank);
- while (!channel.verifyConnection(tm,false)) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CriticalUnblock unblock(verifysect);
- Sleep(100);
- }
- }
- }
- }
- return true;
- }
- unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
- {
- assertex(srcrank!=RANK_NULL);
- SocketEndpoint res;
- CTimeMon tm(timeout);
- unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
- if (ret!=0) {
- if (sender)
- *sender = group->rank(res);
- return ret;
- }
- if (sender)
- *sender = RANK_NULL;
- return 0;
- }
- bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex(srcrank!=RANK_NULL);
- const SocketEndpoint *srcep=NULL;
- if (srcrank==RANK_ALL) {
- if (!outer&&(group->ordinality()==1)) // minor optimization (useful in Dali)
- srcep = &queryEndpoint(0);
- }
- else
- srcep = &queryEndpoint(srcrank);
- CTimeMon tm(timeout);
- loop
- {
- try
- {
- if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
- {
- if (sender)
- *sender = group->rank(mbuf.getSender());
- return true;
- }
- if (sender)
- *sender = RANK_NULL;
- return false;
- }
- catch (IMP_Exception *e)
- {
- if (MPERR_link_closed != e->errorCode())
- throw;
- const SocketEndpoint &ep = e->queryEndpoint();
- if (RANK_NULL != group->rank(ep))
- throw;
- StringBuffer epStr;
- ep.getUrlStr(epStr);
- FLLOG(MCoperatorWarning, unknownJob, "CCommunicator: ignoring closed endpoint from outside the communicator group: %s", epStr.str());
- e->Release();
- // loop around and recv again
- }
- }
- }
-
- void flush(mptag_t tag)
- {
- parent->flush(tag);
- }
- IGroup &queryGroup() { return *group; }
- IGroup *getGroup() { return LINK(group); }
- bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
- if (sendrank==RANK_RANDOM) {
- if (group->ordinality()>1) {
- do {
- sendrank = getRandom()%group->ordinality();
- } while (sendrank==group->rank());
- }
- else {
- assertex(group->rank()!=0);
- sendrank = 0;
- }
- }
- mptag_t replytag = createReplyTag();
- CTimeMon tm(timeout);
- mbuff.setReplyTag(replytag);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
- return false;
- mbuff.clear();
- return recv(mbuff,sendrank,replytag,NULL,remaining);
- }
- bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
- {
- mptag_t replytag = mbuf.getReplyTag();
- rank_t dstrank = group->rank(mbuf.getSender());
- if (dstrank!=RANK_NULL) {
- if (send (mbuf, dstrank, replytag,timeout)) {
- mbuf.setReplyTag(TAG_NULL);
- return true;
- }
- return false;
- }
-
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(mbuf.getSender());
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (channel.send(mbuf,replytag,TAG_NULL,tm, true)) {
- mbuf.setReplyTag(TAG_NULL);
- return true;
- }
- return false;
- }
- void cancel(rank_t srcrank, mptag_t tag)
- {
- assertex(srcrank!=RANK_NULL);
- parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
- }
- void disconnect(INode *node)
- {
- CriticalBlock block(verifysect);
- CMPChannel &channel = parent->lookup(node->endpoint());
- channel.closeSocket();
- parent->removeChannel(&channel);
- }
- CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
- {
- outer = _outer;
- parent = _parent;
- group = LINK(_group);
- }
- ~CCommunicator()
- {
- group->Release();
- }
- };
- ICommunicator *createCommunicator(IGroup *group,bool outer)
- {
- assertex(MPserver!=NULL);
- return new CCommunicator(MPserver,group,outer);
- }
- static IInterCommunicator *worldcomm=NULL;
- IInterCommunicator &queryWorldCommunicator()
- {
- CriticalBlock block(CMPServer::serversect);
- assertex(MPserver!=NULL);
- if (!worldcomm)
- worldcomm = new CInterCommunicator(MPserver);
- return *worldcomm;
- }
- void startMPServer(unsigned port, bool paused)
- {
- assertex(sizeof(PacketHeader)==32);
- CriticalBlock block(CMPServer::serversect);
- if (CMPServer::servernest==0)
- {
- if (!CMPServer::serverpaused)
- {
- delete MPserver;
- MPserver = new CMPServer(port);
- }
- if (paused)
- {
- CMPServer::serverpaused = true;
- return;
- }
- queryLogMsgManager()->setPort(MPserver->getPort());
- MPserver->start();
- CMPServer::serverpaused = false;
- }
- CMPServer::servernest++;
- }
- void stopMPServer()
- {
- CriticalBlock block(CMPServer::serversect);
- if (--CMPServer::servernest==0) {
- stopLogMsgReceivers();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "Stopping MP Server");
- #endif
- CriticalUnblock unblock(CMPServer::serversect);
- assertex(MPserver!=NULL);
- MPserver->stop();
- delete MPserver;
- MPserver = NULL;
- ::Release(worldcomm);
- worldcomm = NULL;
- initMyNode(0);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "Stopped MP Server");
- #endif
- }
- }
- extern mp_decl void addMPConnectionMonitor(IConnectionMonitor *monitor)
- {
- CriticalBlock block(CMPServer::serversect);
- assertex(MPserver);
- MPserver->addConnectionMonitor(monitor);
- }
- extern mp_decl void removeMPConnectionMonitor(IConnectionMonitor *monitor)
- {
- CriticalBlock block(CMPServer::serversect);
- if (MPserver)
- MPserver->removeConnectionMonitor(monitor);
- }
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- CriticalBlock block(CMPServer::serversect);
- if (MPserver)
- MPserver->getReceiveQueueDetails(buf);
- return buf;
- }
- void registerSelfDestructChildProcess(HANDLE handle)
- {
- CriticalBlock block(childprocesssect);
- if (handle!=(HANDLE)-1)
- childprocesslist.append((unsigned)handle);
- }
- void unregisterSelfDestructChildProcess(HANDLE handle)
- {
- CriticalBlock block(childprocesssect);
- if (handle!=(HANDLE)-1)
- childprocesslist.zap((unsigned)handle);
- }
|