12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #define mp_decl __declspec(dllexport)
- /* TBD
- lost packet disposal
- synchronous send
- connection protocol (HRPC);
- look at all timeouts
- */
- #include "platform.h"
- #include "jlib.hpp"
- #include <limits.h>
- #include "jsocket.hpp"
- #include "jmutex.hpp"
- #include "jutil.hpp"
- #include "jthread.hpp"
- #include "jqueue.tpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "mpcomm.hpp"
- #include "mpbuff.hpp"
- #include "mputil.hpp"
- #include "mplog.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- //#define _TRACE
- //#define _FULLTRACE
- #define _TRACEORPHANS
- #define REFUSE_STALE_CONNECTION
- #define MP_PROTOCOL_VERSION 0x102
- #define MP_PROTOCOL_VERSIONV6 0x202 // extended for IPV6
- extern void initMyNode(unsigned short port=0);
- #define CANCELTIMEOUT 1000 // 1 sec
- #define MP_BASE_PORT 7101 //..7999 (for Dali too) -- Thor reserves 7500+
- #define MP_PORT_RANGE 400 //
- #define CONNECT_TIMEOUT (5*60*1000) // 5 mins
- #define CONNECT_READ_TIMEOUT (3*60*1000) // 3 min
- #define CONFIRM_TIMEOUT (CONNECT_READ_TIMEOUT/2)
- #define VERIFY_DELAY (1*60*1000) // 1 Minute
- #define VERIFY_TIMEOUT (1*60*1000) // 1 Minute
- #define _TRACING
- static CriticalSection verifysect;
- static CriticalSection childprocesssect;
- static UnsignedArray childprocesslist;
- // IPv6 TBD
- struct SocketEndpointV4
- {
- byte ip[4];
- unsigned short port;
- SocketEndpointV4() {};
- SocketEndpointV4(const SocketEndpoint &val) { set(val); }
- void set(const SocketEndpoint &val)
- {
- port = val.port;
- if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
- IPV6_NOT_IMPLEMENTED();
- }
- void get(SocketEndpoint &val)
- {
- val.setNetAddress(sizeof(ip),&ip);
- val.port = port;
- }
- };
- class PacketHeader // standard packet header - no virtuals
- {
- public:
- static unsigned nextseq;
- static unsigned lasttick;
- void initseq()
- {
- sequence = msTick();
- lasttick = sequence;
- if (sequence-nextseq>USHRT_MAX)
- sequence = nextseq++;
- else
- nextseq = sequence+1;
- }
- PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
- {
- size = _size;
- tag = _tag;
- sender.set(_sender);
- target.set(_target);
- replytag = _replytag;
- flags = 0;
- version = MP_PROTOCOL_VERSION;
- initseq();
- }
- PacketHeader() {}
- size32_t size; // 0 total packet size
- mptag_t tag; // 4 packet tag (kind)
- unsigned short version; // 8 protocol version
- unsigned short flags; // 10 flags
- SocketEndpointV4 sender; // 12 who sent
- SocketEndpointV4 target; // 18 who destined for
- mptag_t replytag; // 24 used for reply
- unsigned sequence; // 28 packet type dependant
- // Total 32
- void setMessageFields(CMessageBuffer &mb)
- {
- SocketEndpoint ep;
- sender.get(ep);
- mb.init(ep,tag,replytag);
- }
- };
- class PacketHeaderV6 : public PacketHeader
- {
- unsigned senderex[4]; // 32
- unsigned targetex[4]; // 48
- // total 64
- void setMessageFields(CMessageBuffer &mb)
- {
- SocketEndpoint ep;
- ep.setNetAddress(sizeof(senderex),&senderex);
- ep.port = sender.port;
- mb.init(ep,tag,replytag);
- }
- };
-
- unsigned PacketHeader::nextseq=0;
- unsigned PacketHeader::lasttick=0;
- #define MINIMUMPACKETSIZE sizeof(PacketHeader)
- #define MAXDATAPERPACKET 50000
- struct MultiPacketHeader
- {
- mptag_t tag;
- size32_t ofs;
- size32_t size;
- unsigned idx;
- unsigned numparts;
- size32_t total; //
- };
- //
- class CMPException: public CInterface, public IMP_Exception
- {
- public:
- IMPLEMENT_IINTERFACE;
- CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
- {
- }
- CMPException(MessagePassingError err) : error(err)
- {
- }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- StringBuffer tmp;
- switch (error) {
- case MPERR_ok: str.append("OK"); break;
- case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
- case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- case MPERR_link_closed: str.appendf("MP link closed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
- }
- return str;
- }
- int errorCode() const { return error; }
- MessageAudience errorAudience() const
- {
- return MSGAUD_user;
- }
- private:
- MessagePassingError error;
- SocketEndpoint endpoint;
- };
- class CBufferQueueNotify
- {
- public:
- virtual bool notify(CMessageBuffer *)=0;
- virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed
- };
- class CBufferQueueWaiting
- {
- public:
- enum QWenum { QWcontinue, QWdequeue, QWprobe };
- Semaphore sem;
- CBufferQueueNotify &waiting;
- bool probe;
- CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
- QWenum notify(CMessageBuffer *b)
- {
- // check this for DLL unloaded TBD
- if (waiting.notify(b)) {
- sem.signal();
- return probe?QWprobe:QWdequeue;
- }
- return QWcontinue;
- }
- QWenum notifyClosed(SocketEndpoint &ep)
- {
- // check this for DLL unloaded TBD
- if (waiting.notifyClosed(ep)) {
- sem.signal();
- return QWdequeue;
- }
- return QWcontinue;
- }
- };
- MAKEPointerArray(CBufferQueueWaiting,CWaitingArray);
- class CBufferQueue
- {
- QueueOf<CMessageBuffer, false> received;
- CWaitingArray waiting;
- CriticalSection sect;
- public:
- CBufferQueue()
- {
- }
- void enqueue(CMessageBuffer *b)
- {
- CriticalBlock block(sect);
- unsigned iter=0;
- loop {
- ForEachItemIn(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i);
- if (r==CBufferQueueWaiting::QWdequeue)
- return;
- //CBufferQueueWaiting::QWprobe
- break;
- }
- }
- if (b->getReplyTag() != TAG_CANCEL)
- break;
- if (iter++==10) {
- delete b;
- return;
- }
- CriticalUnblock unblock(sect);
- Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
- }
- received.enqueue(b);
- }
- bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm)
- {
- CriticalBlock block(sect);
- bool probegot = false;
- ForEachQueueItemIn(i,received) {
- if (nfy.notify(received.item(i))) {
- if (probe) {
- probegot = true;
- }
- else {
- received.dequeue(i);
- return true;
- }
- }
- }
- if (probegot)
- return true;
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CBufferQueueWaiting qwaiting(nfy,probe);
- waiting.append(qwaiting);
- sect.leave();
- bool ok = qwaiting.sem.wait(remaining);
- sect.enter();
- if (!ok) {
- ok = qwaiting.sem.wait(0);
- if (!ok)
- waiting.zap(qwaiting);
- }
- return ok;
- }
- unsigned flush(CBufferQueueNotify &nfy)
- {
- bool count = 0;
- CriticalBlock block(sect);
- ForEachQueueItemInRev(i,received) {
- if (nfy.notify(received.item(i))) {
- count++;
- delete received.dequeue(i);
- }
- }
- return count;
- }
- void notifyClosed(SocketEndpoint &ep)
- {
- CriticalBlock block(sect);
- ForEachItemInRev(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i);
- }
- }
- }
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- CriticalBlock block(sect);
- ForEachQueueItemIn(i,received) {
- received.item(i)->getDetails(buf).append('\n');
- }
- return buf;
- }
- };
- static UnsignedShortArray freetags;
- static unsigned nextfreetag=0;
- unsigned short generateDynamicTag()
- {
- if (freetags.ordinality())
- return freetags.pop();
- return nextfreetag++;
- }
- void releaseDynamicTag(unsigned short tag)
- {
- freetags.append(tag);
- }
- class CMPServer;
- class CMPChannel;
- class CMPConnectThread: public Thread
- {
- bool running;
- ISocket *listensock;
- CMPServer *parent;
- void checkSelfDestruct(void *p,size32_t sz);
- public:
- CMPConnectThread(CMPServer *_parent,unsigned short port);
- ~CMPConnectThread()
- {
- ::Release(listensock);
- }
- int run();
- void start(unsigned short port);
- void stop()
- {
- if (running) {
- running = false;
- listensock->cancel_accept();
- if (!join(1000*60*5)) // should be pretty instant
- printf("CMPConnectThread::stop timed out\n");
- }
- }
- };
- class PingPacketHandler;
- class PingReplyPacketHandler;
- class MultiPacketHandler;
- class BroadcastPacketHandler;
- class ForwardPacketHandler;
- class UserPacketHandler;
- class CMPNotifyClosedThread;
- static class CMPServer: private SuperHashTableOf<CMPChannel,SocketEndpoint>
- {
- unsigned short port;
- ISocketSelectHandler *selecthandler;
- CMPConnectThread *connectthread;
- CBufferQueue receiveq;
- CMPNotifyClosedThread *notifyclosedthread;
- public:
- static CriticalSection serversect;
- static int servernest;
- static bool serverpaused;
- bool checkclosed;
- // packet handlers
- PingPacketHandler *pingpackethandler; // TAG_SYS_PING
- PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
- ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
- MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
- BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
- UserPacketHandler *userpackethandler; // default
- CMPServer(unsigned short port);
- ~CMPServer();
- void start();
- void stop();
- unsigned short getPort() { return port; }
- void setPort(unsigned short _port) { port = _port; }
- CMPChannel &lookup(const SocketEndpoint &remoteep);
- ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
- CBufferQueue &getReceiveQ() { return receiveq; }
- bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
- void flush(mptag_t tag);
- unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
- void cancel(const SocketEndpoint *ep, mptag_t tag);
- bool nextChannel(CMPChannel *&c);
- void addConnectionMonitor(IConnectionMonitor *monitor);
- void removeConnectionMonitor(IConnectionMonitor *monitor);
- void notifyClosed(SocketEndpoint &ep);
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- return receiveq.getReceiveQueueDetails(buf);
- }
- void removeChannel(CMPChannel *c) { if (c) removeExact(c); }
- protected:
- void onAdd(void *);
- void onRemove(void *e);
- unsigned getHashFromElement(const void *e) const;
- unsigned getHashFromFindParam(const void *fp) const;
- const void * getFindParam(const void *p) const;
- bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
- } *MPserver=NULL;
- int CMPServer::servernest=0;
- bool CMPServer::serverpaused=false;
- CriticalSection CMPServer::serversect;
- byte RTsalt=0xff;
- mptag_t createReplyTag()
- {
- // these are short-lived so a simple increment will do (I think this is OK!)
- mptag_t ret;
- {
- static CriticalSection sect;
- CriticalBlock block(sect);
- static int rettag=(int)TAG_REPLY_BASE; // NB negative
- if (RTsalt==0xff) {
- RTsalt = (byte)(getRandom()%16);
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- if (rettag>(int)TAG_REPLY_BASE) { // wrapped
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- ret = (mptag_t)rettag;
- rettag -= 16;
- }
- if (MPserver)
- MPserver->flush(ret);
- return ret;
- }
- void checkTagOK(mptag_t tag)
- {
- if ((int)tag<=(int)TAG_REPLY_BASE) {
- int dif = (int)TAG_REPLY_BASE-(int)tag;
- if (dif%16!=RTsalt) {
- ERRLOG("**Invalid MP tag used");
- PrintStackReport();
- }
- }
- }
- //===========================================================================
- class CMPNotifyClosedThread: public Thread
- {
- IArrayOf<IConnectionMonitor> connectionmonitors;
- CriticalSection conmonsect;
- SimpleInterThreadQueueOf<INode, false> workq;
- bool stopping;
- CMPServer *parent;
- CriticalSection stopsect;
- public:
- CMPNotifyClosedThread(CMPServer *_parent)
- : Thread("CMPNotifyClosedThread")
- {
- parent = _parent;
- stopping = false;
- }
- ~CMPNotifyClosedThread()
- {
- IArrayOf<IConnectionMonitor> todelete;
- CriticalBlock block(conmonsect);
- while (connectionmonitors.ordinality())
- todelete.append(connectionmonitors.popGet());
- }
- void addConnectionMonitor(IConnectionMonitor *monitor)
- {
- if (monitor)
- connectionmonitors.append(*LINK(monitor));
- }
- void removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- if (monitor) {
- CriticalBlock block(conmonsect);
- connectionmonitors.zap(*monitor);
- }
- }
- int run()
- {
- loop {
- try {
- Owned<INode> node = workq.dequeue();
- if (node->endpoint().isNull())
- break;
- SocketEndpoint ep = node->endpoint();
- parent->getReceiveQ().notifyClosed(ep);
- IArrayOf<IConnectionMonitor> toclose;
- {
- CriticalBlock block(conmonsect);
- ForEachItemIn(i1,connectionmonitors) {
- toclose.append(*LINK(&connectionmonitors.item(i1)));
- }
- }
- ForEachItemIn(i,toclose) {
- toclose.item(i).onClose(ep);
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- e->Release();
- }
- }
- return 0;
- }
- void stop()
- {
- {
- CriticalBlock block(stopsect);
- if (!stopping) {
- stopping = true;
- SocketEndpoint ep;
- workq.enqueue(createINode(ep));
- }
- }
- while (!join(1000*60*3))
- PROGLOG("CMPNotifyClosedThread join failed");
- }
- void notify(SocketEndpoint &ep)
- {
- CriticalBlock block(stopsect);
- if (!stopping&&!ep.isNull()) {
- if (workq.ordinality()>100)
- PROGLOG("MP: %d waiting to close",workq.ordinality());
- workq.enqueue(createINode(ep));
- }
- }
- };
- class CMPPacketReader;
- class CMPChannel: public CInterface
- {
- ISocket *channelsock;
- CMPServer *parent;
- Mutex sendmutex;
- Semaphore sendwaitingsig;
- unsigned sendwaiting; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
- CriticalSection connectsect;
- CMPPacketReader *reader;
- bool master; // i.e. connected originally
- mptag_t multitag; // current multi send in progress
- bool closed;
- IArrayOf<ISocket> keptsockets;
- protected: friend class CMPServer;
- SocketEndpoint remoteep;
- SocketEndpoint localep; // who the other end thinks I am
- protected: friend class CMPPacketReader;
- unsigned lastxfer;
- #ifdef _FULLTRACE
- unsigned startxfer;
- unsigned numiter;
- #endif
- bool connect(CTimeMon &tm)
- {
- // must be called from connectsect
- // also in sendmutex
- ISocket *newsock=NULL;
- unsigned retrycount = 10;
- unsigned remaining;
- while (!channelsock) {
- try {
- StringBuffer str;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).toCharArray());
- #endif
- if (((int)tm.timeout)<0)
- remaining = CONNECT_TIMEOUT;
- else if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- PROGLOG("MP: connect timed out 1");
- #endif
- return false;
- }
- if (remaining<10000)
- remaining = 10000; // 10s min granularity for MP
- newsock = ISocket::connect_timeout(remoteep,remaining);
- newsock->set_keep_alive(true);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect");
- #endif
- SocketEndpointV4 id[2];
- SocketEndpoint hostep;
- hostep.setLocalHost(parent->getPort());
- id[0].set(hostep);
- id[1].set(remoteep);
- newsock->write(&id[0],sizeof(id));
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- id[0].getUrlStr(tmp1);
- tmp1.append(' ');
- id[1].getUrlStr(tmp1);
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
- #endif
- size32_t reply;
- size32_t rd;
- newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_READ_TIMEOUT);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
- #endif
- if (reply!=0) {
- assertex(reply==sizeof(id)); // how can this fail?
- if (attachSocket(newsock,remoteep,hostep,true, NULL)) {
- newsock->Release();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.toCharArray());
- #endif
- lastxfer = msTick();
- closed = false;
- break;
- }
- }
- }
- catch (IException *e)
- {
- if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 2");
- #endif
- e->Release();
- return false;
- }
- StringBuffer str;
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- e->Release();
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
- IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
- throw e;
- }
- #ifdef _TRACE
- str.clear();
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).toCharArray(),retrycount+1);
- #endif
- }
- ::Release(newsock);
- newsock = NULL;
- {
- CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
- #ifdef _FULLTRACE
- PROGLOG("MP: before sleep");
- #endif
- Sleep(2000+getRandom()%3000);
- #ifdef _FULLTRACE
- PROGLOG("MP: after sleep");
- #endif
- }
- }
- return true;
- }
- public:
- Semaphore pingsem;
- CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
- ~CMPChannel();
- void reset();
- bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm);
- bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- Linked<ISocket> dest;
- {
- CriticalBlock block(connectsect);
- if (closed) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
- throw e;
- }
- if (!channelsock) {
- if (!connect(tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
- #endif
- return false;
- }
- }
- dest.set(channelsock);
- }
- try {
- #ifdef _FULLTRACE
- unsigned t1 = msTick();
- #endif
- if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
- // if (tm.timeout!=MP_ASYNC_SEND) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (channelsock->wait_write(remaining)==0) {
- return false;
- }
- if (tm.timedout())
- return false;
- }
- // exception checking TBD
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
- unsigned t2 = msTick();
- #endif
- unsigned n = 0;
- const void *bufs[3];
- size32_t sizes[3];
- if (hdrsize) {
- bufs[n] = hdr;
- sizes[n++] = hdrsize;
- }
- if (hdr2size) {
- bufs[n] = hdr2;
- sizes[n++] = hdr2size;
- }
- if (bodysize) {
- bufs[n] = body;
- sizes[n++] = bodysize;
- }
- if (!dest) {
- LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
- return false;
- }
- dest->write_multiple(n,bufs,sizes);
- lastxfer = msTick();
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
- #endif
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- closeSocket();
- throw;
- }
- return true;
- }
- bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
- }
- bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
- }
- bool sendPing(CTimeMon &tm);
- bool sendPingReply(unsigned timeout,bool identifyself);
- bool verifyConnection(CTimeMon &tm,bool allowconnect)
- {
- {
- CriticalBlock block(connectsect);
- if (!channelsock&&allowconnect)
- return connect(tm);
- if (closed||!channelsock)
- return false;
- if ((msTick()-lastxfer)<VERIFY_DELAY)
- return true;
- }
- StringBuffer ep;
- remoteep.getUrlStr(ep);
- loop {
- CTimeMon pingtm(1000*60);
- if (sendPing(pingtm))
- break;
- {
- CriticalBlock block(connectsect);
- if (closed||!channelsock)
- return false;
- }
- if (tm.timedout()) {
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
- closeSocket();
- return false;
- }
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
- unsigned remaining;
- if (!pingtm.timedout(&remaining)&&remaining)
- Sleep(remaining);
- }
- return true;
- }
- bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
- void closeSocket(bool keepsocket=false)
- {
- ISocket *s;
- bool socketfailed = false;
- {
- CriticalBlock block(connectsect);
- if (!channelsock)
- return;
- lastxfer = msTick();
- closed = true;
- if (parent)
- parent->checkclosed = true;
- s=channelsock;
- channelsock = NULL;
- if (!keepsocket) {
- try {
- s->shutdown();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- }
- parent->querySelectHandler().remove(s);
- }
- parent->notifyClosed(remoteep);
- if (socketfailed) {
- try {
- s->Release();
- }
- catch (IException *) {
- // ignore
- }
- }
- else if (keepsocket) {
- // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
- if (keptsockets.ordinality()>10)
- keptsockets.remove(0);
- keptsockets.append(*s);
- }
- else {
- try {
- s->close();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- s->Release();
- }
- }
- CMPServer &queryServer() { return *parent; }
- void monitorCheck();
- StringBuffer & queryEpStr(StringBuffer &s)
- {
- return remoteep.getUrlStr(s);
- }
- bool isClosed()
- {
- return closed;
- }
- bool isConnected()
- {
- return !closed&&(channelsock!=NULL);
- }
- };
- // Message Handlers (not done as interfaces for speed reasons
- class UserPacketHandler // default
- {
- CMPServer *server;
- public:
- UserPacketHandler(CMPServer *_server)
- {
- server = _server;
- }
- void handle(CMessageBuffer *msg) // takes ownership of message buffer
- {
- server->getReceiveQ().enqueue(msg);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
- {
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class PingPacketHandler // TAG_SYS_PING
- {
- public:
- void handle(CMPChannel *channel,bool identifyself)
- {
- channel->sendPingReply(CONFIRM_TIMEOUT,identifyself);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),tm);
- }
- };
- class PingReplyPacketHandler // TAG_SYS_PING_REPLY
- {
- public:
- void handle(CMPChannel *channel)
- {
- channel->pingsem.signal();
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class CMultiPacketReceiver: public CInterface
- { // assume each sender only sends one multi-message per channel
- public:
- SocketEndpoint sender;
- MultiPacketHeader info;
- CMessageBuffer *msg;
- byte * ptr;
- };
- class MultiPacketHandler // TAG_SYS_MULTI
- {
- CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
- CriticalSection sect;
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- if (!msg)
- return NULL;
- CriticalBlock block(sect);
- MultiPacketHeader mhdr;
- msg->read(sizeof(mhdr),&mhdr);
- CMultiPacketReceiver *recv=NULL;
- ForEachItemIn(i,inprogress) {
- CMultiPacketReceiver &mpr = inprogress.item(i);
- if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
- recv = &mpr;
- break;
- }
- }
- if (mhdr.idx==0) {
- if ((mhdr.ofs!=0)||(recv!=NULL)) {
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (1)");
- return NULL;
- }
- recv = new CMultiPacketReceiver;
- recv->msg = new CMessageBuffer();
- recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
- recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
- recv->sender = msg->getSender();
- recv->info = mhdr;
- inprogress.append(*recv);
- }
- else {
- if ((recv==NULL)||(mhdr.ofs==0)||
- (recv->info.ofs+recv->info.size!=mhdr.ofs)||
- (recv->info.idx+1!=mhdr.idx)||
- (recv->info.total!=mhdr.total)||
- (mhdr.ofs+mhdr.size>mhdr.total)) {
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (2)");
- delete msg;
- return NULL;
- }
- }
- msg->read(mhdr.size,recv->ptr+mhdr.ofs);
- delete msg;
- msg = NULL;
- recv->info = mhdr;
- if (mhdr.idx+1==mhdr.numparts) {
- if (mhdr.ofs+mhdr.size!=mhdr.total) {
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (3)");
- return NULL;
- }
- msg = recv->msg;
- inprogress.remove(i);
- }
- return msg;
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
- {
- // must not adjust mb
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- PacketHeader outhdr;
- outhdr = hdr;
- outhdr.tag = TAG_SYS_MULTI;
- MultiPacketHeader mhdr;
- mhdr.total = hdr.size-sizeof(hdr);
- mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
- mhdr.size = mhdr.total/mhdr.numparts;
- mhdr.tag = hdr.tag;
- mhdr.ofs = 0;
- mhdr.idx = 0;
- const byte *p = (const byte *)mb.toByteArray();
- unsigned i=0;
- loop {
- if (i+1==mhdr.numparts)
- mhdr.size = mhdr.total-mhdr.ofs;
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
- #endif
- outhdr.initseq();
- outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
- if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
- #endif
- return false;
- }
- i++;
- if (i==mhdr.numparts)
- break;
- sendmutex.unlock(); // allow other messages to interleave
- sendmutex.lock();
- mhdr.idx++;
- mhdr.ofs += mhdr.size;
- p += mhdr.size;
- }
- return true;
- }
- };
- class BroadcastPacketHandler // TAG_SYS_BCAST
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- class ForwardPacketHandler // TAG_SYS_FORWARD
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- // --------------------------------------------------------
- class CMPPacketReader: public CInterface, public ISocketSelectNotify
- {
- CMessageBuffer *activemsg;
- byte * activeptr;
- size32_t remaining;
- byte *dataptr;
- CMPChannel *parent;
- CriticalSection sect;
- public:
- IMPLEMENT_IINTERFACE;
- CMPPacketReader(CMPChannel *_parent)
- {
- init(_parent);
- }
- void init(CMPChannel *_parent)
- {
- parent = _parent;
- activemsg = NULL;
- }
- void shutdown()
- {
- CriticalBlock block(sect);
- parent = NULL;
- }
- bool notifySelected(ISocket *sock,unsigned selected)
- {
- if (!parent)
- return false;
- try {
- // try and mop up all data on socket
-
- size32_t sizeavail = sock->avail_read();
- if (sizeavail==0) {
- // graceful close
- Linked<CMPChannel> pc;
- {
- CriticalBlock block(sect);
- if (parent) {
- pc.set(parent); // don't want channel to disappear during call
- parent = NULL;
- }
- }
- if (pc)
- pc->closeSocket();
- return false;
- }
- do {
- parent->lastxfer = msTick();
- #ifdef _FULLTRACE
- parent->numiter++;
- #endif
- if (!activemsg) { // no message in progress
- PacketHeader hdr; // header for active message
- #ifdef _FULLTRACE
- parent->numiter = 1;
- parent->startxfer = msTick();
- #endif
- // assumes packet header will arrive in one go
- if (sizeavail<sizeof(hdr)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
- #endif
- size32_t szread;
- sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
- }
- else
- sock->read(&hdr,sizeof(hdr));
- if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) {
- // TBD IPV6 here
- SocketEndpoint ep;
- hdr.sender.get(ep);
- IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
- throw e;
- }
- if (sizeavail<=sizeof(hdr))
- sizeavail = sock->avail_read();
- else
- sizeavail -= sizeof(hdr);
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- remaining = hdr.size-sizeof(hdr);
- activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
- activeptr = (byte *)activemsg->reserveTruncate(remaining);
- hdr.setMessageFields(*activemsg);
- }
-
- size32_t toread = sizeavail;
- if (toread>remaining)
- toread = remaining;
- if (toread) {
- sock->read(activeptr,toread);
- remaining -= toread;
- sizeavail -= toread;
- activeptr += toread;
- }
- if (remaining==0) { // we have the packet so process
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
- #endif
- do {
- switch (activemsg->getTag()) {
- case TAG_SYS_MULTI:
- activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
- break;
- case TAG_SYS_PING:
- parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_PING_REPLY:
- parent->queryServer().pingreplypackethandler->handle(parent);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_BCAST:
- activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
- break;
- case TAG_SYS_FORWARD:
- activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
- break;
- default:
- parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
- activemsg = NULL;
- }
- } while (activemsg);
- }
- if (!sizeavail)
- sizeavail = sock->avail_read();
- } while (sizeavail);
- return false; // ok
- }
- catch (IException *e) {
- if (e->errorCode()!=JSOCKERR_graceful_close)
- FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
- e->Release();
- }
- // error here, so close socket (ignore error as may be closed already)
- try {
- if(parent)
- parent->closeSocket();
- }
- catch (IException *e) {
- e->Release();
- }
- parent = NULL;
- return false;
- }
- };
- CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep)
- {
- channelsock = NULL;
- parent = _parent;
- remoteep = _remoteep;
- localep.set(parent->getPort());
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- }
- void CMPChannel::reset()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release();
- channelsock = NULL;
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- }
- CMPChannel::~CMPChannel()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release();
- }
// Attach a newly connected socket to this channel. Returns true if attached;
// newsock is linked (ownership taken) only in that case.
//
// remoteep:  peer endpoint of the new socket (shadows the member remoteep; used
//            only for the port==0 check and the clash log message).
// _localep:  local endpoint to record for the channel.
// ismaster:  master flag of the new connection.
// confirm:   if non-NULL, written to newsock while connectsect is still held.
//
// If a socket is already attached, this is a possible clash (A connects to B at
// the same moment B connects to A):
//  - a remoteep with port 0 is refused;
//  - if ismaster differs from the existing connection's master flag, a master
//    newcomer is refused; otherwise, after a short pause, the existing link is
//    verified and kept (newcomer refused) if it still responds;
//  - in all remaining cases the existing socket is treated as stale and removed
//    via closeSocket(true) (parked, not closed) with connectsect temporarily
//    released, and the new socket is attached (unless REFUSE_STALE_CONNECTION
//    is defined and ismaster is false).
bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm) // takes ownership if succeeds
{
#ifdef _FULLTRACE
    PROGLOG("MP: attachSocket on entry");
#endif
    CriticalBlock block(connectsect);
#ifdef _FULLTRACE
    PROGLOG("MP: attachSocket got connectsect");
#endif
    // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
    if (channelsock) {
        if (remoteep.port==0)
            return false;
        StringBuffer ep1;
        StringBuffer ep2;
        _localep.getUrlStr(ep1);
        remoteep.getUrlStr(ep2);
        LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
        try {
            if (ismaster!=master) {
                if (ismaster) {
                    // newcomer is the master side: it gives way
                    LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
                    return false;
                }
                else {
                    Sleep(50); // give the other side some time to close
                    CTimeMon tm(10000);
                    if (verifyConnection(tm,false)) {
                        // existing link still works: keep it, refuse newcomer
                        LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
                        return false;
                    }
                }
            }
        }
        catch (IException *e) {
            FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
            e->Release();
        }
        try {
            LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
            CriticalUnblock unblock(connectsect); // closeSocket takes connectsect itself
            closeSocket(true);
#ifdef REFUSE_STALE_CONNECTION
            if (!ismaster)
                return false;
#endif
            Sleep(100); // pause to allow close socket triggers to run
        }
        catch (IException *e) {
            FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
            e->Release();
        }
    }
    if (confirm)
        newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
    closed = false;
    reader->init(this);
    channelsock = LINK(newsock);
#ifdef _FULLTRACE
    PROGLOG("MP: attachSocket before select add");
#endif
    parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
#ifdef _FULLTRACE
    PROGLOG("MP: attachSocket after select add");
#endif
    localep = _localep;
    master = ismaster;
    return true;
}
- bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
- {
- // note must not adjust mb
- assertex(tag!=TAG_NULL);
- assertex(tm.timeout);
- const byte *msg = (const byte *)mb.toByteArray();
- size32_t msgsize = mb.length();
- PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
- if (closed||(reply&&!isConnected())) { // flag error if has been disconnected
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
- throw e;
- }
- bool ismulti = (msgsize>MAXDATAPERPACKET);
- // pre-condition - ensure no clashes
- loop {
- sendmutex.lock();
- if (ismulti) {
- if (multitag==TAG_NULL) // don't want to interleave with other multi send
- break;
- }
- else if (multitag!=tag) // don't want to interleave with another of same tag
- break;
- sendwaiting++;
- sendmutex.unlock();
- sendwaitingsig.wait();
- }
- struct Cpostcondition // can we start using eiffel
- {
- Mutex &sendmutex;
- unsigned &sendwaiting;
- Semaphore &sendwaitingsig;
- mptag_t *multitag;
- Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
- : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
- {
- multitag = _multitag;
- }
- ~Cpostcondition()
- {
- if (multitag)
- *multitag = TAG_NULL;
- if (sendwaiting) {
- sendwaitingsig.signal(sendwaiting);
- sendwaiting = 0;
- }
- sendmutex.unlock();
- }
- } postcond(sendmutex,sendwaiting,sendwaitingsig,ismulti?&multitag:NULL);
- if (ismulti)
- return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
- return parent->userpackethandler->send(this,hdr,mb,tm);
- }
- bool CMPChannel::sendPing(CTimeMon &tm)
- {
- unsigned remaining;
- tm.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- PacketHeader hdr(sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
- bool ret = false;
- try {
- ret = parent->pingpackethandler->send(this,hdr,tm)&&!tm.timedout(&remaining);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
- e->Release();
- }
- sendmutex.unlock();
- if (ret)
- ret = pingsem.wait(remaining);
- return ret;
- }
- bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
- {
- CTimeMon mon(timeout);
- unsigned remaining;
- mon.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- MemoryBuffer mb;
- if (identifyself) {
- #ifdef _WIN32
- mb.append(GetCommandLine());
- #endif
- }
- PacketHeader hdr(mb.length()+sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
- bool ret;
- try {
- ret = parent->pingreplypackethandler->send(this,hdr,mb,mon);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
- e->Release();
- ret = false;
- }
- sendmutex.unlock();
- return ret;
- }
-
- // --------------------------------------------------------
- CMPConnectThread::CMPConnectThread(CMPServer *_parent,unsigned short port)
- : Thread("MP Connection Thread")
- {
- parent = _parent;
- if (port==0) { // need to connect early to resolve clash
- loop {
- port = MP_BASE_PORT+getRandom()%MP_PORT_RANGE;
- try {
- listensock = ISocket::create(port,16); // better not to have *too* many waiting
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (e->errorCode()!=JSOCKERR_port_in_use)
- throw;
- e->Release();
- }
- }
- }
- else
- listensock = NULL; // delay create till running
- parent->setPort(port);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d",port);
- #endif
- running = false;
- }
- void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
- {
- byte *b = (byte *)p;
- while (sz--)
- if (*(b++)!=0xff)
- return;
- // Panic!
- PROGLOG("MP Self destruct invoked");
- try {
- if (listensock) {
- listensock->close();
- listensock->Release();
- listensock=NULL;
- }
- }
- catch (...)
- {
- PROGLOG("MP socket close failure");
- }
- // Kill registered child processes
- PROGLOG("MP self destruct exit");
- queryLogMsgManager()->flushQueue(10*1000);
- #ifdef _WIN32
- ForEachItemIn(i,childprocesslist)
- TerminateProcess((HANDLE)childprocesslist.item(i), 1);
- TerminateProcess(GetCurrentProcess(), 1);
- #else
- ForEachItemIn(i,childprocesslist)
- ::kill((HANDLE)childprocesslist.item(i), SIGTERM);
- ::kill(getpid(), SIGTERM);
- #endif
- _exit(1);
- }
- void CMPConnectThread::start(unsigned short port)
- {
- if (!listensock)
- listensock = ISocket::create(port,16);
- running = true;
- Thread::start();
- }
- int CMPConnectThread::run()
- {
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting");
- #endif
- while (running) {
- ISocket *sock=NULL;
- try {
- sock=listensock->accept(true);
- #ifdef _FULLTRACE
- StringBuffer s;
- SocketEndpoint ep1;
- sock->getPeerEndpoint(ep1);
- PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
- #endif
- }
- catch (IException *e)
- {
- LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
- throw; // error handling TBD
- }
- if (sock) {
- try {
- sock->set_keep_alive(true);
- size32_t rd;
- SocketEndpoint remoteep;
- SocketEndpoint hostep;
- SocketEndpointV4 id[2];
- sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT);
- id[0].get(remoteep);
- id[1].get(hostep);
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- remoteep.getUrlStr(tmp1);
- tmp1.append(' ');
- hostep.getUrlStr(tmp1);
- PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
- #endif
- checkSelfDestruct(&id[0],sizeof(id));
- if (!parent->lookup(remoteep).attachSocket(sock,remoteep,hostep,false, &rd)) {
- #ifdef _FULLTRACE
- PROGLOG("MP Connect Thread: lookup failed");
- #endif
- }
- else {
- #ifdef _TRACE
- StringBuffer str1;
- StringBuffer str2;
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",remoteep.getUrlStr(str1).toCharArray());
- #endif
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: Connect Thread: after write");
- #endif
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
- sock->close();
- e->Release();
- }
- try {
- sock->Release();
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
- e->Release();
- }
- }
- else {
- if (running)
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
- }
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
- #endif
- return 0;
- }
- // --------------------------------------------------------
- class CMPChannelIterator
- {
- CMPServer &parent;
- CMPChannel *cur;
- public:
- CMPChannelIterator(CMPServer &_parent)
- : parent(_parent)
- {
- cur = NULL;
- }
- bool first()
- {
- cur = NULL;
- return parent.nextChannel(cur);
- }
- bool next()
- {
- return cur&&parent.nextChannel(cur);
- }
- bool isValid()
- {
- return cur!=NULL;
- }
- CMPChannel &query()
- {
- return *cur;
- }
- };
- //-----------------------------------------------------------------------------------
- CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
- {
- // there is an assumption here that no removes will be done within this loop
- CriticalBlock block(serversect);
- SocketEndpoint ep = endpoint;
- CMPChannel *e=find(ep);
- // Check for freed channels
- if (e&&e->isClosed()&&(msTick()-e->lastxfer>30*1000))
- e->reset();
- if (checkclosed) {
- checkclosed = false;
- CMPChannel *c = NULL;
- loop {
- c = (CMPChannel *)next(c);
- if (!c) {
- break;
- }
- if (c->isClosed()&&(msTick()-c->lastxfer>30*1000)) {
- removeExact(c);
- c = NULL;
- }
- }
- e=find(ep);
- }
- if (!e) {
- e = new CMPChannel(this,ep);
- add(*e);
- }
- return *e;
- }
- CMPServer::CMPServer(unsigned short _port)
- {
- checkclosed = false;
- connectthread = new CMPConnectThread(this,_port);
- selecthandler = createSocketSelectHandler();
- pingpackethandler = new PingPacketHandler; // TAG_SYS_PING
- pingreplypackethandler = new PingReplyPacketHandler; // TAG_SYS_PING_REPLY
- forwardpackethandler = new ForwardPacketHandler; // TAG_SYS_FORWARD
- multipackethandler = new MultiPacketHandler; // TAG_SYS_MULTI
- broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST
- userpackethandler = new UserPacketHandler(this); // default
- notifyclosedthread = new CMPNotifyClosedThread(this);
- notifyclosedthread->start();
- initMyNode(port); // NB port set by connectthread constructor
- selecthandler->start();
- }
// Tear down the server. Optionally (_TRACEORPHANS) log messages still queued.
// Then releaseAll() empties the channel table (presumably invoking onRemove,
// which releases each channel). The select handler, notify-closed thread and
// connect thread are released and the packet handlers freed.
// NOTE(review): connectthread is released without being stopped here;
// presumably stop() must have been called first — confirm with callers.
CMPServer::~CMPServer()
{
#ifdef _TRACEORPHANS
    StringBuffer buf;
    getReceiveQueueDetails(buf);
    if (buf.length())
        LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",buf.str());
#endif
    releaseAll();
    selecthandler->Release();
    notifyclosedthread->stop();
    notifyclosedthread->Release();
    connectthread->Release();

    delete pingpackethandler;
    delete pingreplypackethandler;
    delete forwardpackethandler;
    delete multipackethandler;
    delete broadcastpackethandler;
    delete userpackethandler;
}
- bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm)
- {
- checkTagOK(tag);
- class Cnfy: public CBufferQueueNotify
- {
- public:
- bool aborted;
- CMessageBuffer *result;
- const SocketEndpoint *ep;
- mptag_t tag;
- Cnfy(const SocketEndpoint *_ep,mptag_t _tag) { ep = _ep; tag = _tag; result = NULL; aborted=false; }
- bool notify(CMessageBuffer *msg)
- {
- if ((tag==TAG_ALL)||(tag==msg->getTag())) {
- SocketEndpoint senderep = msg->getSender();
- if ((ep==NULL)||ep->equals(senderep)||senderep.isNull()) {
- if (msg->getReplyTag()==TAG_CANCEL)
- delete msg;
- else
- result = msg;
- return true;
- }
- }
- return false;
- }
- bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
- {
- if (ep&&ep->equals(closedep)) {
- aborted = true;
- return true;
- }
- return false;
- }
- } nfy(ep,tag);
- if (receiveq.wait(nfy,false,tm)&&nfy.result) {
- mbuf.transferFrom(*nfy.result);
- delete nfy.result;
- return true;
- }
- if (nfy.aborted) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
- throw e;
- }
- return false;
- }
- void CMPServer::flush(mptag_t tag)
- {
- class Cnfy: public CBufferQueueNotify
- {
- public:
- mptag_t tag;
- Cnfy(mptag_t _tag) { tag = _tag; }
- bool notify(CMessageBuffer *msg)
- {
- return (tag==TAG_ALL)||(tag==msg->getTag());
- }
- bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
- {
- return false;
- }
- } nfy(tag);
- unsigned count = receiveq.flush(nfy);
- if (count)
- PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,count);
- }
- void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
- {
- CMessageBuffer *cancelmsg = new CMessageBuffer(0);
- SocketEndpoint send;
- if (ep)
- send = *ep;
- cancelmsg->init(send,tag,TAG_CANCEL);
- getReceiveQ().enqueue(cancelmsg);
- }
- unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender)
- {
- class Cnfy: public CBufferQueueNotify
- {
- public:
- bool aborted;
- SocketEndpoint &sender;
- const SocketEndpoint *ep;
- mptag_t tag;
- bool cancel;
- unsigned count;
- Cnfy(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender) : sender(_sender)
- {
- ep = _ep;
- tag = _tag;
- cancel = false;
- aborted = false;
- count = 0;
- }
- bool notify(CMessageBuffer *msg)
- {
- if (((tag==TAG_ALL)||(tag==msg->getTag()))&&
- ((ep==NULL)||ep->equals(msg->getSender()))) {
- if (count==0) {
- sender = msg->getSender();
- cancel = (msg->getReplyTag()==TAG_CANCEL);
- }
- count++;
- return true;
- }
- return false;
- }
- bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
- {
- if (ep&&ep->equals(closedep)) {
- aborted = true;
- return true;
- }
- return false;
- }
- } nfy(ep,tag,sender);
- if (receiveq.wait(nfy,true,tm)) {
- return nfy.cancel?0:nfy.count;
- }
- if (nfy.aborted) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
- PrintStackReport();
- #endif
- IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
- throw e;
- }
- return 0;
- }
- void CMPServer::start()
- {
- connectthread->start(getPort());
- }
- void CMPServer::stop()
- {
- selecthandler->stop(true);
- connectthread->stop();
- CMPChannel *c = NULL;
- loop {
- c = (CMPChannel *)next(c);
- if (!c)
- break;
- c->closeSocket();
- }
- }
- void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- notifyclosedthread->addConnectionMonitor(monitor);
- }
- void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- notifyclosedthread->removeConnectionMonitor(monitor);
- }
- void CMPServer::onAdd(void *)
- {
- // not used
- }
- void CMPServer::onRemove(void *e)
- {
- CMPChannel &elem=*(CMPChannel *)e;
- elem.Release();
- }
- unsigned CMPServer::getHashFromElement(const void *e) const
- {
- const CMPChannel &elem=*(const CMPChannel *)e;
- return elem.remoteep.hash(0);
- }
- unsigned CMPServer::getHashFromFindParam(const void *fp) const
- {
- return ((const SocketEndpoint*)fp)->hash(0);
- }
- const void * CMPServer::getFindParam(const void *p) const
- {
- const CMPChannel &elem=*(const CMPChannel *)p;
- return &elem.remoteep;
- }
- bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- return ((CMPChannel *)et)->remoteep.equals(*(SocketEndpoint *)fp);
- }
- bool CMPServer::nextChannel(CMPChannel *&cur)
- {
- CriticalBlock block(serversect);
- cur = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
- return cur!=NULL;
- }
- void CMPServer::notifyClosed(SocketEndpoint &ep)
- {
- #ifdef _TRACE
- StringBuffer url;
- LOG(MCdebugInfo(100), unknownJob, "CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
- #endif
- notifyclosedthread->notify(ep);
- }
- // --------------------------------------------------------
- class CInterCommunicator: public CInterface, public IInterCommunicator
- {
- CMPServer *parent;
- public:
- IMPLEMENT_IINTERFACE;
- bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
- {
- if (!dst)
- return false;
- size32_t msgsize = mbuf.length();
- if (dst->equals(queryMyNode())) {
- CMessageBuffer *msg = new CMessageBuffer();
- mptag_t reply = mbuf.getReplyTag();
- msg->transferFrom(mbuf);
- msg->init(dst->endpoint(),tag,reply);
- parent->getReceiveQ().enqueue(msg);
- mbuf.clear(); // for consistent semantics
- return true;
- }
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(dst->endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
- return false;
- mbuf.clear(); // for consistent semantics
- return true;
- }
- bool verifyConnection(INode *node, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(node->endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- return channel.verifyConnection(tm,true);
- }
- void verifyAll(StringBuffer &log)
- {
- CMPChannelIterator iter(*parent);
- if (iter.first()) {
- do {
- CriticalBlock block(verifysect);
- CTimeMon tm(5000);
- CMPChannel &channel = iter.query();
- if (!channel.isClosed()) {
- channel.queryEpStr(log).append(' ');
- if (channel.verifyConnection(tm,false))
- log.append("OK\n");
- else
- log.append("FAILED\n");
- }
- }
- while (iter.next());
- }
- }
- bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- rank_t myrank = group->rank();
- {
- ForEachNodeInGroup(rank,*group) {
- bool doverify;
- if (duplex)
- doverify = (myrank!=rank);
- else if ((rank&1)==(myrank&1))
- doverify = (myrank>rank);
- else
- doverify = (myrank<rank);
- if (doverify) {
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (!channel.verifyConnection(tm,true)) {
- return false;
- }
- }
- }
- }
- if (!duplex) {
- ForEachNodeInGroup(rank,*group) {
- bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
- if (doverify) {
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
- while (!channel.verifyConnection(tm,false)) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CriticalUnblock unblock(verifysect);
- Sleep(100);
- }
- }
- }
- }
- return true;
- }
- unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
- {
- if (sender)
- *sender = NULL;
- SocketEndpoint res;
- CTimeMon tm(timeout);
- unsigned ret = parent->probe(src?&src->endpoint():NULL,tag,tm,res);
- if (ret!=0) {
- if (sender)
- *sender = createINode(res);
- return ret;
- }
- return 0;
- }
-
- bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
- {
- if (sender)
- *sender = NULL;
- CTimeMon tm(timeout);
- if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm)) {
- if (sender)
- *sender = createINode(mbuf.getSender());
- return true;
- }
- return false;
- }
- void flush(mptag_t tag)
- {
- parent->flush(tag);
- }
- bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex(dst);
- mptag_t replytag = createReplyTag();
- CTimeMon tm(timeout);
- mbuff.setReplyTag(replytag);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!send(mbuff,dst,dsttag,remaining)||tm.timedout(&remaining))
- return false;
- mbuff.clear();
- return recv(mbuff,dst,replytag,NULL,remaining);
- }
- bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
- {
- Owned<INode> dst(createINode(mbuff.getSender()));
- return send(mbuff,dst,mbuff.getReplyTag(),timeout);
- }
- void cancel(INode *src, mptag_t tag)
- {
- parent->cancel(src?&src->endpoint():NULL,tag);
- }
- void disconnect(INode *node)
- {
- CriticalBlock block(verifysect);
- CMPChannel &channel = parent->lookup(node->endpoint());
- channel.closeSocket();
- parent->removeChannel(&channel);
- }
- CInterCommunicator(CMPServer *_parent)
- {
- parent = _parent;
- }
- ~CInterCommunicator()
- {
- }
- };
- class CCommunicator: public CInterface, public ICommunicator
- {
- IGroup *group;
- CMPServer *parent;
- bool outer;
- const SocketEndpoint &queryEndpoint(rank_t rank)
- {
- return group->queryNode(rank).endpoint();
- }
- CMPChannel &queryChannel(rank_t rank)
- {
- return parent->lookup(queryEndpoint(rank));
- }
- public:
- IMPLEMENT_IINTERFACE;
- bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
- {
- // send does not corrupt mbuf
- if (dstrank==RANK_NULL)
- return false;
- rank_t myrank = group->rank();
- if (dstrank==myrank) {
- CMessageBuffer *msg = mbuf.clone();
- // change sender
- msg->init(queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
- parent->getReceiveQ().enqueue(msg);
- }
- else {
- CTimeMon tm(timeout);
- rank_t endrank;
- if (dstrank==RANK_ALL) {
- send(mbuf,myrank,tag,timeout);
- dstrank = RANK_ALL_OTHER;
- }
- if (dstrank==RANK_ALL_OTHER) {
- dstrank = 0;
- endrank = group->ordinality()-1;
- }
- else if (dstrank==RANK_RANDOM) {
- if (group->ordinality()>1) {
- do {
- dstrank = getRandom()%group->ordinality();
- } while (dstrank==myrank);
- }
- else {
- assertex(myrank!=0);
- dstrank = 0;
- }
- endrank = dstrank;
- }
- else
- endrank = dstrank;
- for (;dstrank<=endrank;dstrank++) {
- if (dstrank!=myrank) {
- CMPChannel &channel = queryChannel(dstrank);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
- return false;
- }
- }
- }
- return true;
- }
- bool verifyConnection(rank_t rank, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- assertex(rank!=RANK_RANDOM);
- assertex(rank!=RANK_ALL);
- CTimeMon tm(timeout);
- CMPChannel &channel = queryChannel(rank);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- return channel.verifyConnection(tm,true);
- }
- bool verifyAll(bool duplex, unsigned timeout)
- {
- CriticalBlock block(verifysect);
- CTimeMon tm(timeout);
- rank_t myrank = group->rank();
- {
- ForEachNodeInGroup(rank,*group) {
- bool doverify;
- if (duplex)
- doverify = (rank!=myrank);
- else if ((rank&1)==(myrank&1))
- doverify = (myrank>rank);
- else
- doverify = (myrank<rank);
- if (doverify) {
- CMPChannel &channel = queryChannel(rank);
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (!channel.verifyConnection(tm,true))
- return false;
- }
- }
- }
- if (!duplex) {
- ForEachNodeInGroup(rank,*group) {
- bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
- if (doverify) {
- CMPChannel &channel = queryChannel(rank);
- while (!channel.verifyConnection(tm,false)) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CriticalUnblock unblock(verifysect);
- Sleep(100);
- }
- }
- }
- }
- return true;
- }
- unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
- {
- assertex(srcrank!=RANK_NULL);
- SocketEndpoint res;
- CTimeMon tm(timeout);
- unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
- if (ret!=0) {
- if (sender)
- *sender = group->rank(res);
- return ret;
- }
- if (sender)
- *sender = RANK_NULL;
- return 0;
- }
- bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex(srcrank!=RANK_NULL);
- const SocketEndpoint *srcep=NULL;
- if (srcrank==RANK_ALL) {
- if (!outer&&(group->ordinality()==1)) // minor optimization (useful in Dali)
- srcep = &queryEndpoint(0);
- }
- else
- srcep = &queryEndpoint(srcrank);
- CTimeMon tm(timeout);
- if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm)) {
- if (sender)
- *sender = group->rank(mbuf.getSender());
- return true;
- }
- if (sender)
- *sender = RANK_NULL;
- return false;
- }
-
- void flush(mptag_t tag)
- {
- parent->flush(tag);
- }
- IGroup &queryGroup() { return *group; }
- IGroup *getGroup() { return LINK(group); }
- bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
- {
- assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
- if (sendrank==RANK_RANDOM) {
- if (group->ordinality()>1) {
- do {
- sendrank = getRandom()%group->ordinality();
- } while (sendrank==group->rank());
- }
- else {
- assertex(group->rank()!=0);
- sendrank = 0;
- }
- }
- mptag_t replytag = createReplyTag();
- CTimeMon tm(timeout);
- mbuff.setReplyTag(replytag);
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
- return false;
- mbuff.clear();
- return recv(mbuff,sendrank,replytag,NULL,remaining);
- }
- bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
- {
- mptag_t replytag = mbuf.getReplyTag();
- rank_t dstrank = group->rank(mbuf.getSender());
- if (dstrank!=RANK_NULL) {
- if (send (mbuf, dstrank, replytag,timeout)) {
- mbuf.setReplyTag(TAG_NULL);
- return true;
- }
- return false;
- }
-
- CTimeMon tm(timeout);
- CMPChannel &channel = parent->lookup(mbuf.getSender());
- unsigned remaining;
- if (tm.timedout(&remaining)) {
- return false;
- }
- if (channel.send(mbuf,replytag,TAG_NULL,tm, true)) {
- mbuf.setReplyTag(TAG_NULL);
- return true;
- }
- return false;
- }
- void cancel(rank_t srcrank, mptag_t tag)
- {
- assertex(srcrank!=RANK_NULL);
- parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
- }
- void disconnect(INode *node)
- {
- CriticalBlock block(verifysect);
- CMPChannel &channel = parent->lookup(node->endpoint());
- channel.closeSocket();
- parent->removeChannel(&channel);
- }
- CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
- {
- outer = _outer;
- parent = _parent;
- group = LINK(_group);
- }
- ~CCommunicator()
- {
- group->Release();
- }
- };
- ICommunicator *createCommunicator(IGroup *group,bool outer)
- {
- assertex(MPserver!=NULL);
- return new CCommunicator(MPserver,group,outer);
- }
- static IInterCommunicator *worldcomm=NULL;
- IInterCommunicator &queryWorldCommunicator()
- {
- CriticalBlock block(CMPServer::serversect);
- assertex(MPserver!=NULL);
- if (!worldcomm)
- worldcomm = new CInterCommunicator(MPserver);
- return *worldcomm;
- }
- void startMPServer(unsigned short port,bool paused)
- {
- assertex(sizeof(PacketHeader)==32);
- CriticalBlock block(CMPServer::serversect);
- if (CMPServer::servernest==0) {
- if (!CMPServer::serverpaused) {
- delete MPserver;
- MPserver = new CMPServer(port);
- }
- if (paused) {
- CMPServer::serverpaused = true;
- return;
- }
- queryLogMsgManager()->setPort(MPserver->getPort());
- MPserver->start();
- CMPServer::serverpaused = false;
- }
- CMPServer::servernest++;
- }
- void stopMPServer()
- {
- CriticalBlock block(CMPServer::serversect);
- if (--CMPServer::servernest==0) {
- stopLogMsgReceivers();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "Stopping MP Server");
- #endif
- CriticalUnblock unblock(CMPServer::serversect);
- assertex(MPserver!=NULL);
- MPserver->stop();
- delete MPserver;
- MPserver = NULL;
- ::Release(worldcomm);
- worldcomm = NULL;
- initMyNode(0);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "Stopped MP Server");
- #endif
- }
- }
- extern mp_decl void addMPConnectionMonitor(IConnectionMonitor *monitor)
- {
- CriticalBlock block(CMPServer::serversect);
- assertex(MPserver);
- MPserver->addConnectionMonitor(monitor);
- }
- extern mp_decl void removeMPConnectionMonitor(IConnectionMonitor *monitor)
- {
- CriticalBlock block(CMPServer::serversect);
- if (MPserver)
- MPserver->removeConnectionMonitor(monitor);
- }
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
- CriticalBlock block(CMPServer::serversect);
- if (MPserver)
- MPserver->getReceiveQueueDetails(buf);
- return buf;
- }
- void registerSelfDestructChildProcess(HANDLE handle)
- {
- CriticalBlock block(childprocesssect);
- if (handle!=(HANDLE)-1)
- childprocesslist.append((unsigned)handle);
- }
- void unregisterSelfDestructChildProcess(HANDLE handle)
- {
- CriticalBlock block(childprocesssect);
- if (handle!=(HANDLE)-1)
- childprocesslist.zap((unsigned)handle);
- }
|