12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define mp_decl DECL_EXPORT
- /* TBD
- lost packet disposal
- synchronous send
- connection protocol (HRPC);
- look at all timeouts
- */
- #include <future>
- #include <vector>
- #include "platform.h"
- #include "portlist.h"
- #include "jlib.hpp"
- #include <limits.h>
- #include "jsocket.hpp"
- #include "jmutex.hpp"
- #include "jutil.hpp"
- #include "jthread.hpp"
- #include "jqueue.tpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "mpcomm.hpp"
- #include "mpbuff.hpp"
- #include "mputil.hpp"
- #include "mplog.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- //#define _TRACE
- //#define _FULLTRACE
- //#define _TRACEMPSERVERNOTIFYCLOSED
- #define _TRACEORPHANS // compile-time trace switch; its use is outside this part of the file
- #define REFUSE_STALE_CONNECTION // compile-time behaviour switch; its use is outside this part of the file
- #define MP_PROTOCOL_VERSION 0x102 // value stored in PacketHeader::version by the PacketHeader and ConnectHdr constructors
- #define MP_PROTOCOL_VERSIONV6 0x202 // extended for IPV6
- // These should really be configurable
- #define CANCELTIMEOUT 1000 // 1 sec; CBufferQueue::enqueue sleeps in CANCELTIMEOUT/10 slices while retrying a TAG_CANCEL message
- #define CONNECT_TIMEOUT (5*60*1000) // 5 mins; used by CMPChannel::connect when the caller's CTimeMon timeout is negative
- #define CONNECT_READ_TIMEOUT (10*1000) // 10 seconds. NB: used by connect readtms loop (see loopCnt)
- #define CONNECT_TIMEOUT_INTERVAL 1000 // 1 sec
- #define CONNECT_RETRYCOUNT 180 // Overall max connect time = CONNECT_RETRYCOUNT * CONNECT_READ_TIMEOUT (180 * 10s = 30 mins)
- #define CONNECT_TIMEOUT_MINSLEEP 2000 // random range: CONNECT_TIMEOUT_MINSLEEP to CONNECT_TIMEOUT_MAXSLEEP milliseconds
- #define CONNECT_TIMEOUT_MAXSLEEP 5000 // upper bound (ms) of the random sleep range above
- #define CONFIRM_TIMEOUT (90*1000) // 1.5 mins
- #define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
- #define TRACESLOW_THRESHOLD 1000 // 1 sec; traceSlowReadTms logs any read that takes at least this long
- #define VERIFY_DELAY (1*60*1000) // 1 Minute
- #define VERIFY_TIMEOUT (1*60*1000) // 1 Minute
- #define DIGIT1 U64C(0x10000000000) // (256ULL*256ULL*256ULL*65536ULL) - weight of ip[0] when CMPChannel::connect packs ip+port into addrval
- #define DIGIT2 U64C(0x100000000) // (256ULL*256ULL*65536ULL) - weight of ip[1]
- #define DIGIT3 U64C(0x1000000) // (256ULL*65536ULL) - weight of ip[2]
- #define DIGIT4 U64C(0x10000) // (65536ULL) - weight of ip[3]; the port occupies the low 16 bits
- #define _TRACING // compile-time trace switch; its use is outside this part of the file
- static CriticalSection childprocesssect; // NOTE(review): presumably guards childprocesslist - confirm at the use sites
- static UnsignedArray childprocesslist; // NOTE(review): not referenced in this part of the file; purpose to be confirmed
- // IPv6 TBD
- struct SocketEndpointV4
- {
- byte ip[4];
- unsigned short port;
- SocketEndpointV4() {};
- SocketEndpointV4(const SocketEndpoint &val) { set(val); }
- void set(const SocketEndpoint &val)
- {
- port = val.port;
- if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
- IPV6_NOT_IMPLEMENTED();
- }
- void get(SocketEndpoint &val)
- {
- val.setNetAddress(sizeof(ip),&ip);
- val.port = port;
- }
- StringBuffer & getUrlStr(StringBuffer &val)
- {
- SocketEndpoint s;
- this->get(s);
- return s.getUrlStr(val);
- }
- };
- class PacketHeader // standard packet header - no virtuals
- {
- public:
- static unsigned nextseq;
- static unsigned lasttick;
- void initseq()
- {
- sequence = msTick();
- lasttick = sequence;
- if (sequence-nextseq>USHRT_MAX)
- sequence = nextseq++;
- else
- nextseq = sequence+1;
- }
- PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
- {
- size = _size;
- tag = _tag;
- sender.set(_sender);
- target.set(_target);
- replytag = _replytag;
- flags = 0;
- version = MP_PROTOCOL_VERSION;
- initseq();
- }
- PacketHeader() {}
- size32_t size; // 0 total packet size
- mptag_t tag; // 4 packet tag (kind)
- unsigned short version; // 8 protocol version
- unsigned short flags; // 10 flags
- SocketEndpointV4 sender; // 12 who sent
- SocketEndpointV4 target; // 18 who destined for
- mptag_t replytag; // 24 used for reply
- unsigned sequence; // 28 packet type dependant
- // Total 32
- void setMessageFields(CMessageBuffer &mb)
- {
- SocketEndpoint ep;
- sender.get(ep);
- mb.init(ep,tag,replytag);
- }
- };
- #if 0 // disabled: this IPv6-extended header is never compiled
- class PacketHeaderV6 : public PacketHeader
- {
- unsigned senderex[4]; // 32
- unsigned targetex[4]; // 48
- // total 64
- void setMessageFields(CMessageBuffer &mb) // as PacketHeader::setMessageFields, but takes the address from senderex
- {
- SocketEndpoint ep;
- ep.setNetAddress(sizeof(senderex),&senderex);
- ep.port = sender.port; // the port still comes from the V4 sender field
- mb.init(ep,tag,replytag);
- }
- };
- #endif
- unsigned PacketHeader::nextseq=0; // fallback/next sequence number maintained by PacketHeader::initseq()
- unsigned PacketHeader::lasttick=0; // msTick() value recorded by the most recent PacketHeader::initseq()
- #define MINIMUMPACKETSIZE sizeof(PacketHeader) // size of a header with no payload
- #define MAXDATAPERPACKET 50000 // NOTE(review): presumably the per-part payload limit for multi-part sends - confirm at the use site
- struct MultiPacketHeader
- {
- mptag_t tag;
- size32_t ofs;
- size32_t size;
- unsigned idx;
- unsigned numparts;
- size32_t total;
- StringBuffer &getDetails(StringBuffer &out) const
- {
- out.append("MultiPacketHeader: ");
- out.append("tag=").append((unsigned)tag);
- out.append(",ofs=").append(ofs);
- out.append(",size=").append(size);
- out.append(",idx=").append(idx);
- out.append(",numparts=").append(numparts);
- out.append(",total=").append(total);
- return out;
- }
- };
- //
- class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
- {
- public:
- IMPLEMENT_IINTERFACE;
- CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
- {
- }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- StringBuffer tmp;
- switch (error) {
- case MPERR_ok: str.append("OK"); break;
- case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).str()); break;
- case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
- case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).str()); break;
- // process crashes (segv, etc.) often cause this exception which is logged and can be misleading
- // change it from "MP link closed" to something more helpful
- case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getUrlStr(tmp).str()); break;
- }
- return str;
- }
- int errorCode() const { return error; }
- MessageAudience errorAudience() const
- {
- return MSGAUD_user;
- }
- virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
- private:
- MessagePassingError error;
- SocketEndpoint endpoint;
- };
- class CBufferQueueNotify
- {
- public:
- virtual bool notify(CMessageBuffer *)=0;
- virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed
- };
- class CBufferQueueWaiting
- {
- public:
- enum QWenum { QWcontinue, QWdequeue, QWprobe };
- Semaphore sem;
- CBufferQueueNotify &waiting;
- bool probe;
- CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
- QWenum notify(CMessageBuffer *b)
- {
- // check this for DLL unloaded TBD
- if (waiting.notify(b)) {
- sem.signal();
- return probe?QWprobe:QWdequeue;
- }
- return QWcontinue;
- }
- QWenum notifyClosed(SocketEndpoint &ep)
- {
- // check this for DLL unloaded TBD
- if (waiting.notifyClosed(ep)) {
- sem.signal();
- return QWdequeue;
- }
- return QWcontinue;
- }
- };
- typedef CopyReferenceArrayOf<CBufferQueueWaiting> CWaitingArray; // array of references to waiters that live on their callers' stacks (see CBufferQueue::wait)
- class CBufferQueue // Queue of received messages plus the list of receivers currently blocked waiting for one
- {
- QueueOf<CMessageBuffer, false> received; // messages nobody has claimed yet
- CWaitingArray waiting; // receivers currently blocked in wait()
- CriticalSection sect; // guards both 'received' and 'waiting'
- public:
- CBufferQueue()
- {
- }
- void enqueue(CMessageBuffer *b) // Hands b to a waiting receiver if one accepts it, otherwise appends it to 'received'
- {
- CriticalBlock block(sect);
- unsigned iter=0; // number of completed passes, only relevant for TAG_CANCEL messages
- for (;;) {
- ForEachItemIn(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i); // this waiter has been signalled, so it leaves the list
- if (r==CBufferQueueWaiting::QWdequeue)
- return; // message consumed by the waiter - nothing to queue
- //CBufferQueueWaiting::QWprobe
- break; // a probe only peeked: stop scanning, the message is still handled below
- }
- }
- if (b->getReplyTag() != TAG_CANCEL)
- break; // ordinary message: go and queue it
- if (iter++==10) { // a TAG_CANCEL message still unclaimed after the 11th pass is discarded
- delete b;
- return;
- }
- CriticalUnblock unblock(sect); // NOTE(review): presumably releases 'sect' for the duration of the sleep - confirm against jmutex.hpp
- Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
- }
- received.enqueue(b);
- }
- bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm) // Returns true once nfy has accepted a message (or a close notification), false if tm expires first
- {
- CriticalBlock block(sect);
- bool probegot = false;
- ForEachQueueItemIn(i,received) { // first try the messages that are already queued
- if (nfy.notify(received.item(i))) {
- if (probe) {
- probegot = true; // probing: leave the message queued and keep scanning
- }
- else {
- received.dequeue(i);
- return true;
- }
- }
- }
- if (probegot)
- return true;
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- CBufferQueueWaiting qwaiting(nfy,probe); // lives on this stack frame; 'waiting' only holds a reference to it
- waiting.append(qwaiting);
- sect.leave(); // manually drop the lock held by 'block' while sleeping on the semaphore
- bool ok = qwaiting.sem.wait(remaining);
- sect.enter(); // re-take it so that 'block' releases a lock that is actually held
- if (!ok) {
- ok = qwaiting.sem.wait(0); // a signal may have arrived between the timeout and re-taking the lock
- if (!ok)
- waiting.zap(qwaiting); // genuinely timed out: deregister before qwaiting goes out of scope
- }
- return ok;
- }
- unsigned flush(CBufferQueueNotify &nfy) // Deletes every queued message that nfy accepts; returns how many were deleted
- {
- unsigned count = 0;
- CriticalBlock block(sect);
- ForEachQueueItemInRev(i,received) { // reverse order, as entries are removed while iterating
- if (nfy.notify(received.item(i))) {
- count++;
- delete received.dequeue(i);
- }
- }
- return count;
- }
- void notifyClosed(SocketEndpoint &ep) // Tells every waiter that the connection to ep has closed; waiters that accept are removed from the list
- {
- CriticalBlock block(sect);
- ForEachItemInRev(i,waiting) {
- CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
- if (r!=CBufferQueueWaiting::QWcontinue) {
- waiting.remove(i);
- }
- }
- }
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf) // Appends one line per queued message (its getDetails() text) to buf
- {
- CriticalBlock block(sect);
- ForEachQueueItemIn(i,received) {
- received.item(i)->getDetails(buf).append('\n');
- }
- return buf;
- }
- };
- static UnsignedShortArray freetags;
- static unsigned nextfreetag=0;
- unsigned short generateDynamicTag()
- {
- if (freetags.ordinality())
- return freetags.popGet();
- return nextfreetag++;
- }
- void releaseDynamicTag(unsigned short tag)
- {
- freetags.append(tag);
- }
// Reads an integer kernel parameter from a one-line text file (e.g. a file
// under /proc/sys).
//
//   path   file to read; a null pointer is rejected
//   value  receives the parsed number on success and is left untouched on
//          failure; a null pointer is rejected
//
// Returns true only when the first line of the file starts with a decimal
// integer that fits in an int. Always returns false on platforms other than
// Linux.
bool check_kernparam(const char *path, int *value)
{
#ifdef __linux__
    if (!path || !value)
        return false;
    FILE *f = fopen(path,"r");
    if (!f)
        return false;
    char res[32];
    const char *line = fgets(res, sizeof(res), f);
    fclose(f);
    if (!line)
        return false;
    // strtol (unlike atoi) reports "no digits at all" through the end pointer,
    // so an empty or non-numeric file is not mistaken for the value 0.
    char *end = nullptr;
    const long parsed = strtol(line, &end, 10);
    if (end == line)
        return false;
    // Reject anything that does not fit the caller's int.
    if ((parsed < INT_MIN) || (parsed > INT_MAX))
        return false;
    *value = (int)parsed;
    return true;
#else
    return false;
#endif
}
- bool check_somaxconn(int *val)
- {
- return check_kernparam("/proc/sys/net/core/somaxconn", val);
- }
- class CMPServer;
- class CMPChannel;
- class CMPConnectThread: public Thread
- {
- bool running;
- ISocket *listensock;
- CMPServer *parent;
- int mpSoMaxConn;
- unsigned mpTraceLevel;
- Owned<IWhiteListHandler> whiteListCallback;
- void checkSelfDestruct(void *p,size32_t sz);
- public:
- CMPConnectThread(CMPServer *_parent, unsigned port);
- ~CMPConnectThread()
- {
- ::Release(listensock);
- }
- int run();
- void startPort(unsigned short port);
- void stop()
- {
- if (running) {
- running = false;
- listensock->cancel_accept();
- if (!join(1000*60*5)) // should be pretty instant
- printf("CMPConnectThread::stop timed out\n");
- }
- }
- void installWhiteListCallback(IWhiteListHandler *_whiteListCallback)
- {
- whiteListCallback.set(_whiteListCallback);
- }
- IWhiteListHandler *queryWhiteListCallback() const
- {
- return whiteListCallback;
- }
- };
- class PingPacketHandler;
- class PingReplyPacketHandler;
- class MultiPacketHandler;
- class BroadcastPacketHandler;
- class ForwardPacketHandler;
- class UserPacketHandler;
- class CMPNotifyClosedThread;
- typedef SuperHashTableOf<CMPChannel,SocketEndpoint> CMPChannelHT; // hash table of CMPChannel looked up by SocketEndpoint
- class CMPServer: private CMPChannelHT, implements IMPServer // MP server object: is itself the channel table and owns the shared receive queue and the helper threads declared below
- {
- byte RTsalt; // offset subtracted from TAG_REPLY_BASE; createReplyTag() picks a random value 0..15 when it finds 0xff here
- ISocketSelectHandler *selecthandler;
- CMPConnectThread *connectthread;
- CBufferQueue receiveq; // messages received but not yet claimed, plus the blocked receivers
- CMPNotifyClosedThread *notifyclosedthread;
- CriticalSection sect;
- protected:
- unsigned __int64 role;
- unsigned short port;
- public:
- bool checkclosed;
- bool tryReopenChannel = false; // set through setOpt(mpsopt_channelreopen); read by CMPChannel::checkReconnect
- // packet handlers
- PingPacketHandler *pingpackethandler; // TAG_SYS_PING
- PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
- ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
- MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
- BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
- UserPacketHandler *userpackethandler; // default
- IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
- CMPServer(unsigned __int64 _role, unsigned _port);
- ~CMPServer();
- void start();
- virtual void stop();
- unsigned short getPort() const { return port; }
- unsigned __int64 getRole() const { return role; }
- void setPort(unsigned short _port) { port = _port; }
- CMPChannel *lookup(const SocketEndpoint &remoteep);
- ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
- CBufferQueue &getReceiveQ() { return receiveq; }
- void checkTagOK(mptag_t tag);
- bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
- void flush(mptag_t tag);
- unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
- void cancel(const SocketEndpoint *ep, mptag_t tag);
- bool nextChannel(CMPChannel *&c);
- void addConnectionMonitor(IConnectionMonitor *monitor);
- void removeConnectionMonitor(IConnectionMonitor *monitor);
- void notifyClosed(SocketEndpoint &ep, bool trace);
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf) // appends a dump of the unclaimed receive queue to buf
- {
- return receiveq.getReceiveQueueDetails(buf);
- }
- void removeChannel(CMPChannel *c) { if (c) removeExact(c); } // null-safe removal from the channel table
- protected:
- void onAdd(void *);
- void onRemove(void *e);
- unsigned getHashFromElement(const void *e) const;
- unsigned getHashFromFindParam(const void *fp) const;
- const void * getFindParam(const void *p) const;
- bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
- CriticalSection replyTagSect; // guards RTsalt and rettag in createReplyTag()
- int rettag; // next reply tag value to hand out
- INode *myNode;
- public:
- virtual mptag_t createReplyTag() // Returns the next reply tag and flushes any messages already queued under it
- {
- // these are short-lived so a simple increment will do (I think this is OK!)
- mptag_t ret;
- {
- CriticalBlock block(replyTagSect);
- if (RTsalt==0xff) { // first use: choose the salt and the starting tag
- RTsalt = (byte)(getRandom()%16);
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- if (rettag>(int)TAG_REPLY_BASE) { // wrapped
- rettag = (int)TAG_REPLY_BASE-RTsalt;
- }
- ret = (mptag_t)rettag;
- rettag -= 16; // step by 16 so that rettag keeps the same offset (RTsalt) from TAG_REPLY_BASE modulo 16
- }
- flush(ret); // done outside replyTagSect
- return ret;
- }
- virtual ICommunicator *createCommunicator(IGroup *group, bool outer);
- virtual INode *queryMyNode()
- {
- return myNode;
- }
- virtual void setOpt(MPServerOpts opt, const char *value) // Applies a server option; options other than mpsopt_channelreopen are ignored
- {
- switch (opt)
- {
- case mpsopt_channelreopen:
- {
- bool tf = (nullptr != value) ? strToBool(value) : false; // a null value counts as false
- PROGLOG("Setting ChannelReopen = %s", tf ? "true" : "false");
- tryReopenChannel = tf;
- break;
- }
- default:
- // ignore
- break;
- }
- }
- virtual void installWhiteListCallback(IWhiteListHandler *whiteListCallback) override // forwarded to the connect thread
- {
- connectthread->installWhiteListCallback(whiteListCallback);
- }
- virtual IWhiteListHandler *queryWhiteListCallback() const override // forwarded to the connect thread
- {
- return connectthread->queryWhiteListCallback();
- }
- };
- //===========================================================================
- class CMPNotifyClosedThread: public Thread
- {
- IArrayOf<IConnectionMonitor> connectionmonitors;
- CriticalSection conmonsect;
- SimpleInterThreadQueueOf<INode, false> workq;
- bool stopping;
- CMPServer *parent;
- CriticalSection stopsect;
- public:
- CMPNotifyClosedThread(CMPServer *_parent)
- : Thread("CMPNotifyClosedThread")
- {
- parent = _parent;
- stopping = false;
- }
- ~CMPNotifyClosedThread()
- {
- IArrayOf<IConnectionMonitor> todelete;
- CriticalBlock block(conmonsect);
- while (connectionmonitors.ordinality())
- todelete.append(connectionmonitors.popGet());
- }
- void addConnectionMonitor(IConnectionMonitor *monitor)
- {
- if (monitor)
- connectionmonitors.append(*LINK(monitor));
- }
- void removeConnectionMonitor(IConnectionMonitor *monitor)
- {
- // called in critical section CMPServer::sect
- if (monitor) {
- CriticalBlock block(conmonsect);
- connectionmonitors.zap(*monitor);
- }
- }
- int run()
- {
- for (;;) {
- try {
- Owned<INode> node = workq.dequeue();
- if (node->endpoint().isNull())
- break;
- SocketEndpoint ep = node->endpoint();
- parent->getReceiveQ().notifyClosed(ep);
- IArrayOf<IConnectionMonitor> toclose;
- {
- CriticalBlock block(conmonsect);
- ForEachItemIn(i1,connectionmonitors) {
- toclose.append(*LINK(&connectionmonitors.item(i1)));
- }
- }
- ForEachItemIn(i,toclose) {
- toclose.item(i).onClose(ep);
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- e->Release();
- }
- }
- return 0;
- }
- void stop()
- {
- {
- CriticalBlock block(stopsect);
- if (!stopping) {
- stopping = true;
- SocketEndpoint ep;
- workq.enqueue(createINode(ep));
- }
- }
- while (!join(1000*60*3))
- PROGLOG("CMPNotifyClosedThread join failed");
- }
- void notify(SocketEndpoint &ep)
- {
- CriticalBlock block(stopsect);
- if (!stopping&&!ep.isNull()) {
- if (workq.ordinality()>100)
- PROGLOG("MP: %d waiting to close",workq.ordinality());
- workq.enqueue(createINode(ep));
- }
- }
- };
- void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
- {
- dbgassertex(timeoutChkIntervalMs < timeoutMs);
- StringBuffer epStr;
- CCycleTimer readTmsTimer;
- unsigned intervalTimeoutMs = timeoutChkIntervalMs;
- for (;;)
- {
- try
- {
- sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (JSOCKERR_graceful_close == e->errorCode())
- return;
- else if (JSOCKERR_timeout_expired != e->errorCode())
- throw;
- unsigned elapsedMs = readTmsTimer.elapsedMs();
- if (elapsedMs >= timeoutMs)
- throw;
- unsigned remainingMs = timeoutMs-elapsedMs;
- if (remainingMs < timeoutChkIntervalMs)
- intervalTimeoutMs = remainingMs;
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
- }
- }
- if (readTmsTimer.elapsedMs() >= TRACESLOW_THRESHOLD)
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- sock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs());
- }
- }
- /* Legacy header sent id[2] only.
- * To remain backward compatible (when new MP clients are connecting to old Dali),
- * we send a regular empty PacketHeader as well that has the 'role' embedded within it,
- * in unused fields. TAG_SYS_BCAST is used as the message tag, because it is an
- * unused feature that all Dalis simply receive and delete.
- */
- struct ConnectHdr
- {
- ConnectHdr(const SocketEndpoint &hostEp, const SocketEndpoint &remoteEp, unsigned __int64 role)
- {
- id[0].set(hostEp);
- id[1].set(remoteEp);
- hdr.size = sizeof(PacketHeader);
- hdr.tag = TAG_SYS_BCAST;
- hdr.flags = 0;
- hdr.version = MP_PROTOCOL_VERSION;
- setRole(role);
- }
- ConnectHdr()
- {
- }
- SocketEndpointV4 id[2];
- PacketHeader hdr;
- inline void setRole(unsigned __int64 role)
- {
- hdr.replytag = (mptag_t) (role >> 32);
- hdr.sequence = (unsigned) (role & 0xffffffff);
- }
- inline unsigned __int64 getRole() const
- {
- return (((unsigned __int64)hdr.replytag)<<32) | ((unsigned __int64)hdr.sequence);
- }
- };
- class CMPPacketReader;
- class CMPChannel: public CInterface
- {
- ISocket *channelsock = nullptr;
- CMPServer *parent;
- Mutex sendmutex;
- Semaphore sendwaitingsig;
- unsigned sendwaiting = 0; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
- CriticalSection connectsect;
- CMPPacketReader *reader;
- bool master = false; // i.e. connected originally
- mptag_t multitag = TAG_NULL; // current multi send in progress
- bool closed = false;
- IArrayOf<ISocket> keptsockets;
- CriticalSection attachsect;
- unsigned __int64 attachaddrval = 0;
- SocketEndpoint attachep, attachPeerEp;
- atomic_t attachchk;
- protected: friend class CMPServer;
- SocketEndpoint remoteep;
- SocketEndpoint localep; // who the other end thinks I am
- protected: friend class CMPPacketReader;
- unsigned lastxfer;
- #ifdef _FULLTRACE
- unsigned startxfer;
- unsigned numiter;
- #endif
- bool checkReconnect(CTimeMon &tm)
- {
- if (!parent->tryReopenChannel)
- return false;
- ::Release(channelsock);
- channelsock = nullptr;
- if (connect(tm))
- return true;
- WARNLOG("Failed to reconnect");
- return false;
- }
- bool connect(CTimeMon &tm)
- {
- // must be called from connectsect
- // also in sendmutex
- Owned<ISocket> newsock;
- unsigned retrycount = CONNECT_RETRYCOUNT;
- unsigned remaining;
- Owned<IException> exitException;
- while (!channelsock)
- {
- try
- {
- StringBuffer str;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).str());
- #endif
- if (((int)tm.timeout)<0)
- remaining = CONNECT_TIMEOUT;
- else if (tm.timedout(&remaining))
- {
- #ifdef _FULLTRACE
- PROGLOG("MP: connect timed out 1");
- #endif
- return false;
- }
- if (remaining<10000)
- remaining = 10000; // 10s min granularity for MP
- newsock.setown(ISocket::connect_timeout(remoteep,remaining));
- newsock->set_keep_alive(true);
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
- #endif
- SocketEndpoint hostep;
- hostep.setLocalHost(parent->getPort());
- ConnectHdr connectHdr(hostep, remoteep, parent->getRole());
- unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port;
- #ifdef _TRACE
- PROGLOG("MP: connect addrval = %" I64F "u", addrval);
- #endif
- newsock->write(&connectHdr,sizeof(connectHdr));
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- connectHdr.id[0].getUrlStr(tmp1);
- tmp1.append(' ');
- connectHdr.id[1].getUrlStr(tmp1);
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
- #endif
- size32_t rd = 0;
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write, waiting for read");
- #endif
- // Wait for connection reply but also check for A<->B deadlock (where both processes are here
- // waiting for other side to send confirm) and decide who stops waiting based on address.
- // To be compatible with older versions of mplib which will not do this,
- // loop with short wait time and release CS to allow other side to proceed
- StringBuffer epStr;
- unsigned startMs = msTick();
- unsigned loopCnt = CONNECT_READ_TIMEOUT / CONNECT_TIMEOUT_INTERVAL + 1;
- #ifdef _TRACE
- PROGLOG("MP: loopCnt start = %u", loopCnt);
- #endif
- while (loopCnt-- > 0)
- {
- {
- CriticalBlock block(attachsect);
- #ifdef _TRACE
- PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", atomic_read(&attachchk), loopCnt);
- #endif
- if (atomic_read(&attachchk) > 0)
- {
- if (remoteep.equals(attachep))
- {
- #ifdef _TRACE
- PROGLOG("MP: deadlock situation [] attachaddrval = %" I64F "u addrval = %" I64F "u", attachaddrval, addrval);
- #endif
- if (attachaddrval < addrval)
- break;
- }
- }
- }
- rd = 0;
- MemoryBuffer replyMb;
- void *replyMem = replyMb.ensureCapacity(0x1000); // 4K - max size to allow for serialized exception
- try
- {
- newsock->readtms(replyMem, sizeof(rd), replyMb.capacity(), rd, CONNECT_TIMEOUT_INTERVAL);
- }
- catch (IException *e)
- {
- #ifdef _TRACE
- PROGLOG("MP: loop exception code = %d, loopCnt = %u", e->errorCode(), loopCnt);
- #endif
- if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
- ((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
- {
- if (tm.timedout(&remaining))
- {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 3");
- #endif
- e->Release();
- return false;
- }
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
- { // don't bother retrying on async send
- e->Release();
- throw new CMPException(MPERR_connection_failed,remoteep);
- }
- // if other side closes, connect again
- if (e->errorCode() == JSOCKERR_graceful_close)
- {
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
- e->Release();
- break;
- }
- e->Release();
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
- #endif
- }
- else
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- newsock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("MP: connect to: %s, stalled for %d ms so far", epStr.str(), msTick()-startMs);
- e->Release();
- }
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: rd = %d", rd);
- #endif
- /* NB: legacy clients that don't handle the exception deserialization here
- * will see reply as success, so no clean error,
- * but will fail shortly afterwards since server connection is closed
- */
- if (rd > sizeof(rd)) // legacy clients will only ever send a reply of 0 or 4, if greater, then new client is replying with an exception
- {
- MemoryBuffer mb;
- mb.setBuffer(rd, replyMem, false);
- size32_t len;
- mb.read(len); // exception length
- if (len)
- {
- exitException.setown(deserializeException(mb));
- throw exitException.getLink();
- }
- break;
- }
- else if (rd != 0)
- {
- assertex(rd == sizeof(rd));
- break;
- }
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(connectHdr)=%lu", rd, reply, sizeof(connectHdr));
- #endif
- if (rd)
- {
- unsigned elapsedMs = msTick() - startMs;
- if (elapsedMs >= TRACESLOW_THRESHOLD)
- {
- if (0 == epStr.length())
- {
- SocketEndpoint ep;
- newsock->getPeerEndpoint(ep);
- ep.getUrlStr(epStr);
- }
- WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
- }
- if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
- {
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.str());
- #endif
- lastxfer = msTick();
- closed = false;
- break;
- }
- }
- }
- catch (IException *e)
- {
- if (exitException)
- throw;
- if (tm.timedout(&remaining)) {
- #ifdef _FULLTRACE
- EXCLOG(e,"MP: connect timed out 2");
- #endif
- e->Release();
- return false;
- }
- #ifdef _TRACE
- EXCLOG(e, "MP: Failed to connect");
- #endif
- e->Release();
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
- IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
- throw e;
- }
- #ifdef _TRACE
- StringBuffer str;
- str.clear();
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
- #endif
- }
- newsock.clear();
- {
- CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
- #ifdef _FULLTRACE
- PROGLOG("MP: before sleep");
- #endif
- // check often if channelsock was created from accept thread
- Sleep(50);
- unsigned totalt = CONNECT_TIMEOUT_MINSLEEP + getRandom() % (CONNECT_TIMEOUT_MAXSLEEP-CONNECT_TIMEOUT_MINSLEEP);
- unsigned startt = msTick();
- unsigned deltat = 0;
- while (deltat < totalt)
- {
- {
- CriticalBlock block(connectsect);
- if (channelsock)
- break;
- }
- deltat = msTick() - startt;
- Sleep(50);
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: after sleep");
- #endif
- }
- }
- return true;
- }
- public:
- Semaphore pingsem;
- CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
- ~CMPChannel();
- void reset();
- bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval=0);
- bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- Linked<ISocket> dest;
- {
- CriticalBlock block(connectsect);
- if (closed) {
- #ifdef _TRACELINKCLOSED
- LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
- PrintStackReport();
- #endif
- if (!checkReconnect(tm))
- throw new CMPException(MPERR_link_closed,remoteep);
- }
- if (!channelsock) {
- if (!connect(tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
- #endif
- return false;
- }
- }
- dest.set(channelsock);
- }
- try {
- #ifdef _FULLTRACE
- unsigned t1 = msTick();
- #endif
- if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
- // if (tm.timeout!=MP_ASYNC_SEND) {
- unsigned remaining;
- if (tm.timedout(&remaining))
- return false;
- if (channelsock->wait_write(remaining)==0) {
- return false;
- }
- if (tm.timedout())
- return false;
- }
- // exception checking TBD
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
- unsigned t2 = msTick();
- #endif
- unsigned n = 0;
- const void *bufs[3];
- size32_t sizes[3];
- if (hdrsize) {
- bufs[n] = hdr;
- sizes[n++] = hdrsize;
- }
- if (hdr2size) {
- bufs[n] = hdr2;
- sizes[n++] = hdr2size;
- }
- if (bodysize) {
- bufs[n] = body;
- sizes[n++] = bodysize;
- }
- if (!dest) {
- LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
- return false;
- }
- dest->write_multiple(n,bufs,sizes);
- lastxfer = msTick();
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
- #endif
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
- closeSocket(false, true);
- throw;
- }
- return true;
- }
- bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
- }
- bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
- {
- return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
- }
- bool sendPing(CTimeMon &tm);
- bool sendPingReply(unsigned timeout,bool identifyself);
- bool verifyConnection(CTimeMon &tm,bool allowconnect)
- {
- {
- CriticalBlock block(connectsect);
- if (!channelsock&&allowconnect)
- return connect(tm);
- if (closed||!channelsock)
- return false;
- if ((msTick()-lastxfer)<VERIFY_DELAY)
- return true;
- }
- StringBuffer ep;
- remoteep.getUrlStr(ep);
- for (;;) {
- CTimeMon pingtm(1000*60);
- if (sendPing(pingtm))
- break;
- {
- CriticalBlock block(connectsect);
- if (closed||!channelsock)
- return false;
- }
- if (tm.timedout()) {
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
- closeSocket();
- return false;
- }
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
- unsigned remaining;
- if (!pingtm.timedout(&remaining)&&remaining)
- Sleep(remaining);
- }
- return true;
- }
- bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
- void closeSocket(bool keepsocket=false, bool trace=false)
- {
- ISocket *s;
- bool socketfailed = false;
- {
- CriticalBlock block(connectsect);
- if (!channelsock)
- return;
- lastxfer = msTick();
- closed = true;
- if (parent)
- parent->checkclosed = true;
- s=channelsock;
- channelsock = nullptr;
- {
- CriticalBlock block(attachsect);
- attachaddrval = 0;
- attachep.set(nullptr);
- attachPeerEp.set(nullptr);
- atomic_set(&attachchk, 0);
- }
- if (!keepsocket) {
- try {
- s->shutdown();
- }
- catch (IException *e) {
- socketfailed = true; // ignore if the socket has been closed
- WARNLOG("closeSocket() : Ignoring shutdown error");
- e->Release();
- }
- }
- parent->querySelectHandler().remove(s);
- }
- parent->notifyClosed(remoteep, trace);
- if (socketfailed) {
- try {
- s->Release();
- }
- catch (IException *) {
- // ignore
- }
- }
- else if (keepsocket) {
- // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
- if (keptsockets.ordinality()>10)
- keptsockets.remove(0);
- keptsockets.append(*s);
- }
- else {
- try {
- s->close();
- }
- catch (IException *) {
- socketfailed = true; // ignore if the socket has been closed
- }
- s->Release();
- }
- }
- CMPServer &queryServer() { return *parent; }
- void monitorCheck();
- StringBuffer & queryEpStr(StringBuffer &s)
- {
- return remoteep.getUrlStr(s);
- }
- bool isClosed()
- {
- return closed;
- }
- bool isConnected()
- {
- return !closed&&(channelsock!=NULL);
- }
- const SocketEndpoint &queryPeerEp() const { return attachPeerEp; }
- };
- // Message Handlers (not done as interfaces for speed reasons
- class UserPacketHandler // default
- {
- CMPServer *server;
- public:
- UserPacketHandler(CMPServer *_server)
- {
- server = _server;
- }
- void handle(CMessageBuffer *msg) // takes ownership of message buffer
- {
- server->getReceiveQ().enqueue(msg);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
- {
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class PingPacketHandler // TAG_SYS_PING
- {
- public:
- void handle(CMPChannel *channel,bool identifyself)
- {
- channel->sendPingReply(CONFIRM_TIMEOUT,identifyself);
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),tm);
- }
- };
- class PingReplyPacketHandler // TAG_SYS_PING_REPLY
- {
- public:
- void handle(CMPChannel *channel)
- {
- channel->pingsem.signal();
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
- {
- return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
- }
- };
- class CMultiPacketReceiver: public CInterface
- { // assume each sender only sends one multi-message per channel
- public:
- SocketEndpoint sender;
- MultiPacketHeader info;
- CMessageBuffer *msg;
- byte * ptr;
- };
- class MultiPacketHandler // TAG_SYS_MULTI
- {
- CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
- CriticalSection sect;
- unsigned lastErrMs;
- void logError(unsigned code, MultiPacketHeader &mhdr, CMessageBuffer &msg, MultiPacketHeader *otherMhdr)
- {
- unsigned ms = msTick();
- if ((ms-lastErrMs) > 1000) // avoid logging too much
- {
- StringBuffer errorMsg("sender=");
- msg.getSender().getUrlStr(errorMsg).newline();
- errorMsg.append("This header: ");
- mhdr.getDetails(errorMsg).newline();
- if (otherMhdr)
- {
- errorMsg.append("Other header: ");
- otherMhdr->getDetails(errorMsg).newline();
- }
- msg.getDetails(errorMsg);
- LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (%d) %s", code, errorMsg.str());
- }
- lastErrMs = ms;
- }
- public:
- MultiPacketHandler() : lastErrMs(0)
- {
- }
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- if (!msg)
- return NULL;
- CriticalBlock block(sect);
- MultiPacketHeader mhdr;
- msg->read(sizeof(mhdr),&mhdr);
- CMultiPacketReceiver *recv=NULL;
- ForEachItemIn(i,inprogress) {
- CMultiPacketReceiver &mpr = inprogress.item(i);
- if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
- recv = &mpr;
- break;
- }
- }
- if (mhdr.idx==0) {
- if ((mhdr.ofs!=0)||(recv!=NULL)) {
- logError(1, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- recv = new CMultiPacketReceiver;
- recv->msg = new CMessageBuffer();
- recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
- recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
- recv->sender = msg->getSender();
- recv->info = mhdr;
- inprogress.append(*recv);
- }
- else {
- if ((recv==NULL)||(mhdr.ofs==0)||
- (recv->info.ofs+recv->info.size!=mhdr.ofs)||
- (recv->info.idx+1!=mhdr.idx)||
- (recv->info.total!=mhdr.total)||
- (mhdr.ofs+mhdr.size>mhdr.total)) {
- logError(2, mhdr, *msg, recv?&recv->info:NULL);
- delete msg;
- return NULL;
- }
- }
- msg->read(mhdr.size,recv->ptr+mhdr.ofs);
- delete msg;
- msg = NULL;
- recv->info = mhdr;
- if (mhdr.idx+1==mhdr.numparts) {
- if (mhdr.ofs+mhdr.size!=mhdr.total) {
- logError(3, mhdr, *msg, NULL);
- return NULL;
- }
- msg = recv->msg;
- inprogress.remove(i);
- }
- return msg;
- }
- bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
- {
- // must not adjust mb
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- PacketHeader outhdr;
- outhdr = hdr;
- outhdr.tag = TAG_SYS_MULTI;
- MultiPacketHeader mhdr;
- mhdr.total = hdr.size-sizeof(hdr);
- mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
- mhdr.size = mhdr.total/mhdr.numparts;
- mhdr.tag = hdr.tag;
- mhdr.ofs = 0;
- mhdr.idx = 0;
- const byte *p = (const byte *)mb.toByteArray();
- unsigned i=0;
- for (;;) {
- if (i+1==mhdr.numparts)
- mhdr.size = mhdr.total-mhdr.ofs;
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
- #endif
- outhdr.initseq();
- outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
- if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
- #endif
- return false;
- }
- i++;
- if (i==mhdr.numparts)
- break;
- sendmutex.unlock(); // allow other messages to interleave
- sendmutex.lock();
- mhdr.idx++;
- mhdr.ofs += mhdr.size;
- p += mhdr.size;
- }
- return true;
- }
- };
- class BroadcastPacketHandler // TAG_SYS_BCAST
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- class ForwardPacketHandler // TAG_SYS_FORWARD
- {
- public:
- CMessageBuffer *handle(CMessageBuffer * msg)
- {
- delete msg;
- return NULL;
- }
- };
- // --------------------------------------------------------
- class CMPPacketReader: public ISocketSelectNotify, public CInterface
- {
- CMessageBuffer *activemsg;
- byte * activeptr;
- size32_t remaining;
- CMPChannel *parent;
- CriticalSection sect;
- public:
- IMPLEMENT_IINTERFACE;
- CMPPacketReader(CMPChannel *_parent)
- {
- init(_parent);
- }
- void init(CMPChannel *_parent)
- {
- parent = _parent;
- activemsg = NULL;
- }
- void shutdown()
- {
- CriticalBlock block(sect);
- parent = NULL;
- }
- bool notifySelected(ISocket *sock,unsigned selected)
- {
- if (!parent)
- return false;
- try {
- // try and mop up all data on socket
-
- size32_t sizeavail = sock->avail_read();
- if (sizeavail==0) {
- // graceful close
- Linked<CMPChannel> pc;
- {
- CriticalBlock block(sect);
- if (parent) {
- pc.set(parent); // don't want channel to disappear during call
- parent = NULL;
- }
- }
- if (pc)
- {
- #ifdef _TRACELINKCLOSED
- LOG(MCdebugInfo(100), unknownJob, "CMPPacketReader::notifySelected() about to close socket, mode = 0x%x", selected);
- #endif
- pc->closeSocket(false, true);
- }
- return false;
- }
- do {
- parent->lastxfer = msTick();
- #ifdef _FULLTRACE
- parent->numiter++;
- #endif
- if (!activemsg) { // no message in progress
- PacketHeader hdr; // header for active message
- #ifdef _FULLTRACE
- parent->numiter = 1;
- parent->startxfer = msTick();
- #endif
- // assumes packet header will arrive in one go
- if (sizeavail<sizeof(hdr)) {
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
- #endif
- size32_t szread;
- sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
- }
- else
- sock->read(&hdr,sizeof(hdr));
- if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) {
- // TBD IPV6 here
- SocketEndpoint ep;
- hdr.sender.get(ep);
- IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
- throw e;
- }
- if (sizeavail<=sizeof(hdr))
- sizeavail = sock->avail_read();
- else
- sizeavail -= sizeof(hdr);
- #ifdef _FULLTRACE
- StringBuffer ep1;
- StringBuffer ep2;
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
- #endif
- remaining = hdr.size-sizeof(hdr);
- activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
- activeptr = (byte *)activemsg->reserveTruncate(remaining);
- hdr.setMessageFields(*activemsg);
- }
-
- size32_t toread = sizeavail;
- if (toread>remaining)
- toread = remaining;
- if (toread) {
- sock->read(activeptr,toread);
- remaining -= toread;
- sizeavail -= toread;
- activeptr += toread;
- }
- if (remaining==0) { // we have the packet so process
- #ifdef _FULLTRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
- #endif
- do {
- switch (activemsg->getTag()) {
- case TAG_SYS_MULTI:
- activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
- break;
- case TAG_SYS_PING:
- parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_PING_REPLY:
- parent->queryServer().pingreplypackethandler->handle(parent);
- delete activemsg;
- activemsg = NULL;
- break;
- case TAG_SYS_BCAST:
- activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
- break;
- case TAG_SYS_FORWARD:
- activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
- break;
- default:
- parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
- activemsg = NULL;
- }
- } while (activemsg);
- }
- if (!sizeavail)
- sizeavail = sock->avail_read();
- } while (sizeavail);
- return false; // ok
- }
- catch (IException *e) {
- if (e->errorCode()!=JSOCKERR_graceful_close)
- FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
- e->Release();
- }
- // error here, so close socket (ignore error as may be closed already)
- try {
- if(parent)
- parent->closeSocket(false, true);
- }
- catch (IException *e) {
- e->Release();
- }
- parent = NULL;
- return false;
- }
- };
- CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep) : parent(_parent), remoteep(_remoteep)
- {
- localep.set(parent->getPort());
- reader = new CMPPacketReader(this);
- attachep.set(nullptr);
- atomic_set(&attachchk, 0);
- lastxfer = msTick();
- }
- void CMPChannel::reset()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket(false, true);
- reader->Release();
- channelsock = nullptr;
- multitag = TAG_NULL;
- reader = new CMPPacketReader(this);
- closed = false;
- master = false;
- sendwaiting = 0;
- attachaddrval = 0;
- attachep.set(nullptr);
- attachPeerEp.set(nullptr);
- atomic_set(&attachchk, 0);
- lastxfer = msTick();
- }
- CMPChannel::~CMPChannel()
- {
- reader->shutdown(); // clear as early as possible
- closeSocket();
- reader->Release();
- }
- bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval) // takes ownership if succeeds
- {
- struct attachdTor
- {
- atomic_t &attchk;
- attachdTor(atomic_t &_attchk) : attchk(_attchk) { }
- ~attachdTor() { atomic_dec(&attchk); }
- } attachChk (attachchk);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket on entry, ismaster = %d, confirm = %p, channelsock = %p, addrval = %" I64F "u", ismaster, confirm, channelsock, addrval);
- #endif
- {
- CriticalBlock block(attachsect);
- attachaddrval = addrval;
- attachep = _remoteep;
- if (newsock)
- newsock->getPeerEndpoint(attachPeerEp);
- atomic_inc(&attachchk);
- }
- CriticalBlock block(connectsect);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket got connectsect, channelsock = %p", channelsock);
- #endif
- // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
- if (channelsock) {
- if (_remoteep.port==0)
- return false;
- StringBuffer ep1;
- StringBuffer ep2;
- _localep.getUrlStr(ep1);
- _remoteep.getUrlStr(ep2);
- LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
- try {
- if (ismaster!=master) {
- if (ismaster) {
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
- return false;
- }
- else {
- Sleep(50); // give the other side some time to close
- CTimeMon tm(10000);
- if (verifyConnection(tm,false)) {
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
- return false;
- }
- }
- }
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
- e->Release();
- }
- try {
- LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
- CriticalUnblock unblock(connectsect);
- closeSocket(true, true);
- #ifdef REFUSE_STALE_CONNECTION
- if (!ismaster)
- return false;
- #endif
- Sleep(100); // pause to allow close socket triggers to run
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
- e->Release();
- }
- }
- if (confirm)
- newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
- closed = false;
- reader->init(this);
- channelsock = LINK(newsock);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket before select add");
- #endif
- parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
- #ifdef _FULLTRACE
- PROGLOG("MP: attachSocket after select add");
- #endif
- localep = _localep;
- master = ismaster;
- return true;
- }
- bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
- {
- // note must not adjust mb
- assertex(tag!=TAG_NULL);
- assertex(tm.timeout);
- size32_t msgsize = mb.length();
- PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
- if (closed||(reply&&!isConnected())) // flag error if has been disconnected
- {
- #ifdef _TRACELINKCLOSED
- LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
- PrintStackReport();
- #endif
- if (!checkReconnect(tm))
- throw new CMPException(MPERR_link_closed,remoteep);
- }
- bool ismulti = (msgsize>MAXDATAPERPACKET);
- // pre-condition - ensure no clashes
- for (;;)
- {
- sendmutex.lock();
- if (ismulti)
- {
- if (multitag==TAG_NULL) // don't want to interleave with other multi send
- {
- multitag = tag;
- break;
- }
- }
- else if (multitag!=tag) // don't want to interleave with another of same tag
- break;
- /* NB: block clashing multi packet sends until current one is done,
- * but note that the multipackethandler-send() temporarily releases the sendmutex,
- * between packets, to allow other tags to interleave
- */
- sendwaiting++;
- sendmutex.unlock();
- sendwaitingsig.wait();
- }
- struct Cpostcondition // can we start using eiffel
- {
- Mutex &sendmutex;
- unsigned &sendwaiting;
- Semaphore &sendwaitingsig;
- mptag_t *multitag;
- Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
- : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
- {
- multitag = _multitag;
- }
- ~Cpostcondition()
- {
- if (multitag)
- *multitag = TAG_NULL;
- if (sendwaiting)
- {
- sendwaitingsig.signal(sendwaiting);
- sendwaiting = 0;
- }
- sendmutex.unlock();
- }
- } postcond(sendmutex, sendwaiting, sendwaitingsig, (ismulti && (multitag != TAG_NULL)) ? &multitag : nullptr);
- if (ismulti)
- return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
- return parent->userpackethandler->send(this,hdr,mb,tm);
- }
- bool CMPChannel::sendPing(CTimeMon &tm)
- {
- unsigned remaining;
- tm.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- PacketHeader hdr(sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
- bool ret = false;
- try {
- ret = parent->pingpackethandler->send(this,hdr,tm)&&!tm.timedout(&remaining);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
- e->Release();
- }
- sendmutex.unlock();
- if (ret)
- ret = pingsem.wait(remaining);
- return ret;
- }
- bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
- {
- CTimeMon mon(timeout);
- unsigned remaining;
- mon.timedout(&remaining);
- if (!sendmutex.lockWait(remaining))
- return false;
- SocketEndpoint myep(parent->getPort());
- MemoryBuffer mb;
- if (identifyself) {
- #ifdef _WIN32
- mb.append(GetCommandLine());
- #endif
- }
- PacketHeader hdr(mb.length()+sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
- bool ret;
- try {
- ret = parent->pingreplypackethandler->send(this,hdr,mb,mon);
- }
- catch (IException *e) {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
- e->Release();
- ret = false;
- }
- sendmutex.unlock();
- return ret;
- }
-
- // --------------------------------------------------------
- CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
- : Thread("MP Connection Thread")
- {
- parent = _parent;
- mpSoMaxConn = 0;
- mpTraceLevel = 0;
- Owned<IPropertyTree> env = getHPCCEnvironment();
- if (env)
- {
- mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
- if (!mpSoMaxConn)
- mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
- mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
- }
- if (mpSoMaxConn)
- {
- int kernSoMaxConn = 0;
- bool soMaxCheck = check_somaxconn(&kernSoMaxConn);
- if (soMaxCheck && (mpSoMaxConn > kernSoMaxConn))
- WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
- }
- if (!mpSoMaxConn)
- mpSoMaxConn = DEFAULT_LISTEN_QUEUE_SIZE;
- if (!port)
- {
- // need to connect early to resolve clash
- unsigned minPort, maxPort;
- if (env)
- {
- minPort = env->getPropInt("EnvSettings/mpStart", 0);
- if (!minPort)
- minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
- maxPort = env->getPropInt("EnvSettings/mpEnd", 0);
- if (!maxPort)
- maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
- }
- else
- {
- minPort = MP_START_PORT;
- maxPort = MP_END_PORT;
- }
- assertex(maxPort >= minPort);
- Owned<IJSOCK_Exception> lastErr;
- unsigned numPorts = maxPort - minPort + 1;
- for (unsigned retries = 0; retries < numPorts * 3; retries++)
- {
- port = minPort + getRandom() % numPorts;
- try
- {
- listensock = ISocket::create(port, mpSoMaxConn);
- break;
- }
- catch (IJSOCK_Exception *e)
- {
- if (e->errorCode()!=JSOCKERR_port_in_use)
- throw;
- lastErr.setown(e);
- }
- }
- if (!listensock)
- throw lastErr.getClear();
- }
- else
- listensock = NULL; // delay create till running
- parent->setPort(port);
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
- #endif
- running = false;
- }
- void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
- {
-     // Only a buffer made up entirely of 0xff bytes triggers self destruct;
-     // the first byte that differs means this is an ordinary buffer and we return.
-     const byte *cursor = (const byte *)p;
-     for (size32_t left = sz; left != 0; left--)
-     {
-         if (0xff != *cursor++)
-             return;
-     }
-     // Panic!
-     PROGLOG("MP Self destruct invoked");
-     try
-     {
-         if (NULL != listensock)
-         {
-             listensock->close();
-             listensock->Release();
-             listensock = NULL;
-         }
-     }
-     catch (...)
-     {
-         PROGLOG("MP socket close failure");
-     }
-     // Kill registered child processes, then this process itself
-     PROGLOG("MP self destruct exit");
-     queryLogMsgManager()->flushQueue(10*1000);
- #ifdef _WIN32
-     ForEachItemIn(childIdx,childprocesslist)
-         TerminateProcess((HANDLE)childprocesslist.item(childIdx), 1);
-     TerminateProcess(GetCurrentProcess(), 1);
- #else
-     ForEachItemIn(childIdx,childprocesslist)
-         ::kill((HANDLE)childprocesslist.item(childIdx), SIGTERM);
-     ::kill(getpid(), SIGTERM);
- #endif
-     _exit(1);
- }
- void CMPConnectThread::startPort(unsigned short port)
- {
-     // The listening socket is created here only if it does not exist yet
-     // (the constructor may already have created it while picking a free port).
-     if (NULL == listensock)
-         listensock = ISocket::create(port, mpSoMaxConn);
-     // Flag must be set before the thread starts, since run() loops on it.
-     running = true;
-     Thread::start();
- }
- int CMPConnectThread::run() // Accept loop: while 'running', accept a socket, read and validate its connect header, then hand the socket to the channel for the remote endpoint. Returns 0 when the loop ends.
- {
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
- #endif
- while (running)
- {
- ISocket *sock=NULL;
- SocketEndpoint peerEp;
- try
- {
- sock=listensock->accept(true, &peerEp); // peerEp receives the address of the connecting peer
- #ifdef _FULLTRACE
- StringBuffer s;
- SocketEndpoint ep1;
- if (sock)
- {
- sock->getPeerEndpoint(ep1);
- PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
- }
- #endif
- }
- catch (IException *e)
- {
- LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
- throw; // error handling TBD - the exception is rethrown out of run(), which ends the accept loop
- }
- if (sock)
- {
- try
- {
- sock->set_keep_alive(true);
- size32_t rd; // number of header bytes actually read
- SocketEndpoint _remoteep; // filled from connectHdr.id[0]
- SocketEndpoint hostep; // filled from connectHdr.id[1]
- ConnectHdr connectHdr;
- bool legacyClient = false;
- // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new (read asks for between sizeof(id) and sizeof(connectHdr) bytes)
- traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
- if (0 == rd) // nothing received: drop the socket and go back to accepting
- {
- if (mpTraceLevel > 1)
- {
- // cannot get peer address as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint())
- StringBuffer errMsg("MP Connect Thread: connect with no msg received, assumed port monitor check");
- PROGLOG("%s", errMsg.str());
- }
- sock->close();
- sock->Release();
- continue;
- }
- else
- {
- if (rd == sizeof(connectHdr.id)) // legacy client
- {
- legacyClient = true;
- connectHdr.setRole(0); // unknown
- }
- else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr)) // defensive size check; the connection is rejected
- {
- // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception
- StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");
- peerEp.getUrlStr(errMsg);
- FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
- sock->close();
- sock->Release();
- continue;
- }
- }
- if (whiteListCallback) // optional admission check on the peer IP and the role from the header
- {
- StringBuffer ipStr;
- peerEp.getIpText(ipStr);
- StringBuffer responseText; // filled if denied
- if (!whiteListCallback->isWhiteListed(ipStr, connectHdr.getRole(), &responseText))
- {
- Owned<IException> e = makeStringException(-1, responseText);
- OWARNLOG(e, nullptr);
- if (legacyClient)
- {
- /* NB: legacy client can't handle exception response
- * Acknowledge legacy connection, then close socket
- * The effect will be the client sees an MPERR_link_closed
- */
- size32_t reply = sizeof(connectHdr.id);
- sock->write(&reply, sizeof(reply));
- }
- else
- {
- MemoryBuffer mb; // reply is the serialized exception preceded by a size marker
- DelayedSizeMarker marker(mb);
- serializeException(e, mb);
- marker.write();
- sock->write(mb.toByteArray(), mb.length());
- }
- sock->close();
- sock->Release();
- continue;
- }
- }
- connectHdr.id[0].get(_remoteep);
- connectHdr.id[1].get(hostep);
- unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port; // numeric value built from the remote ip bytes and port; passed to attachSocket below
- #ifdef _TRACE
- PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval);
- #endif
- if (_remoteep.isNull() || hostep.isNull()) // reject headers with a null endpoint; only the log message differs between all-zero and partly-set headers
- {
- StringBuffer errMsg;
- SocketEndpointV4 zeroTest[2];
- memset(zeroTest, 0x0, sizeof(zeroTest));
- if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id)))
- {
- // JCSMORE, I think _remoteep really must/should match a IP of this local host
- errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from ");
- peerEp.getUrlStr(errMsg);
- FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
- }
- else if (mpTraceLevel > 1)
- {
- // all zeros msg received
- errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
- peerEp.getUrlStr(errMsg);
- PROGLOG("%s", errMsg.str());
- }
- sock->close();
- sock->Release();
- continue;
- }
- #ifdef _FULLTRACE
- StringBuffer tmp1;
- _remoteep.getUrlStr(tmp1);
- tmp1.append(' ');
- hostep.getUrlStr(tmp1);
- PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
- #endif
- checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id)); // does not return if the id bytes are all 0xff (process is terminated)
- Owned<CMPChannel> channel = parent->lookup(_remoteep); // finds or creates the channel for the remote endpoint
- if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval))
- {
- #ifdef _FULLTRACE
- PROGLOG("MP Connect Thread: lookup failed");
- #endif
- }
- else
- {
- #ifdef _TRACE
- StringBuffer str1;
- StringBuffer str2;
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).str());
- #endif
- }
- #ifdef _FULLTRACE
- PROGLOG("MP: Connect Thread: after write");
- #endif
- }
- catch (IException *e) // any failure while handshaking: log, close the socket and carry on accepting
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
- sock->close();
- e->Release();
- }
- try
- {
- sock->Release(); // drops this thread's reference; NOTE(review): presumably the channel holds its own link after a successful attachSocket - confirm
- }
- catch (IException *e)
- {
- FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
- e->Release();
- }
- }
- else
- {
- if (running) // a NULL socket is only reported while the thread is still meant to be running
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
- }
- }
- #ifdef _TRACE
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
- #endif
- return 0;
- }
- // --------------------------------------------------------
- // Cursor over the channels held by a CMPServer; each step is delegated to CMPServer::nextChannel.
- class CMPChannelIterator
- {
-     CMPServer &parent;
-     CMPChannel *cur;
- public:
-     CMPChannelIterator(CMPServer &_parent)
-         : parent(_parent), cur(NULL)
-     {
-     }
-     // Restart from the beginning; returns true if there is a first channel.
-     bool first()
-     {
-         cur = NULL;
-         return parent.nextChannel(cur);
-     }
-     // Advance; returns false once the iteration is (or already was) exhausted.
-     bool next()
-     {
-         if (!cur)
-             return false;
-         return parent.nextChannel(cur);
-     }
-     bool isValid()
-     {
-         return NULL != cur;
-     }
-     // Only meaningful while isValid() is true.
-     CMPChannel &query()
-     {
-         return *cur;
-     }
- };
- //-----------------------------------------------------------------------------------
- // Returns a linked channel for 'endpoint', creating and registering one if none exists.
- // Closed channels idle for more than 30 seconds are reset (the one looked up) or,
- // when 'checkclosed' is set, removed from the table.
- CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
- {
-     // there is an assumption here that no removes will be done within this loop
-     CriticalBlock block(sect);
-     SocketEndpoint key = endpoint;
-     CMPChannel *found = find(key);
-     // Check for freed channels
-     if (found)
-     {
-         if (found->isClosed()&&(msTick()-found->lastxfer>30*1000))
-             found->reset();
-     }
-     if (checkclosed)
-     {
-         checkclosed = false;
-         CMPChannel *scan = NULL;
-         scan = next(scan);
-         while (scan)
-         {
-             if (scan->isClosed()&&(msTick()-scan->lastxfer>30*1000))
-             {
-                 // removal disturbs the walk, so start again from the beginning
-                 removeExact(scan);
-                 scan = NULL;
-             }
-             scan = next(scan);
-         }
-         // the channel found earlier may just have been removed
-         found = find(key);
-     }
-     if (NULL == found)
-     {
-         found = new CMPChannel(this,key);
-         add(*found);
-     }
-     return LINK(found);
- }
- CMPServer::CMPServer(unsigned __int64 _role, unsigned _port) // Builds the server: creates the connect thread, select handler, packet handlers and notify-closed thread; starts the notify-closed thread and select handler (the connect thread is started later by start())
- {
- RTsalt=0xff; // initial salt value; checkTagOK() compares reply tags against it
- role = _role;
- port = 0; // connectthread tells me what port it actually connected on
- checkclosed = false;
- connectthread = new CMPConnectThread(this, _port); // its constructor calls setPort() on this server, so 'port' is valid afterwards
- selecthandler = createSocketSelectHandler();
- pingpackethandler = new PingPacketHandler; // TAG_SYS_PING
- pingreplypackethandler = new PingReplyPacketHandler; // TAG_SYS_PING_REPLY
- forwardpackethandler = new ForwardPacketHandler; // TAG_SYS_FORWARD
- multipackethandler = new MultiPacketHandler; // TAG_SYS_MULTI
- broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST
- userpackethandler = new UserPacketHandler(this); // default
- notifyclosedthread = new CMPNotifyClosedThread(this);
- notifyclosedthread->start();
- selecthandler->start();
- rettag = (int)TAG_REPLY_BASE; // NB negative
- SocketEndpoint ep(port); // NB port set by connectthread constructor
- myNode = createINode(ep); // node built from that port; released in the destructor
- }
- CMPServer::~CMPServer() // Tears down everything the constructor created: channels, select handler, notify-closed thread, connect thread, packet handlers and myNode
- {
- #ifdef _TRACEORPHANS
- StringBuffer buf;
- getReceiveQueueDetails(buf);
- if (buf.length())
- LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",buf.str());
- #endif
- _releaseAll(); // NOTE(review): presumably empties the channel table, releasing each channel via onRemove - confirm
- selecthandler->stop(true);
- selecthandler->Release();
- notifyclosedthread->stop();
- notifyclosedthread->Release();
- connectthread->Release();
-
- delete pingpackethandler; // packet handlers were allocated with new in the constructor
- delete pingreplypackethandler;
- delete forwardpackethandler;
- delete multipackethandler;
- delete broadcastpackethandler;
- delete userpackethandler;
- ::Release(myNode);
- }
- // Sanity check for tags in the reply-tag range (at or below TAG_REPLY_BASE):
- // logs an error and a stack report when the tag's offset modulo 16 does not match RTsalt.
- void CMPServer::checkTagOK(mptag_t tag)
- {
-     if ((int)tag>(int)TAG_REPLY_BASE)
-         return; // not a reply tag, nothing to verify
-     int dif = (int)TAG_REPLY_BASE-(int)tag;
-     if (dif%16==RTsalt)
-         return;
-     ERRLOG("**Invalid MP tag used");
-     PrintStackReport();
- }
- // Waits on the receive queue for a message matching 'tag' (or any tag if TAG_ALL) and
- // 'ep' (or any sender if NULL). On success the message is transferred into 'mbuf' and
- // true is returned. Returns false if the wait ends without a message (including a
- // matching cancel). Throws MPERR_link_closed if the wait was aborted by a closed link.
- bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm)
- {
-     checkTagOK(tag);
-     class CRecvNotify: public CBufferQueueNotify
-     {
-     public:
-         bool aborted;
-         CMessageBuffer *result;
-         const SocketEndpoint *ep;
-         SocketEndpoint closedEp; // used if receiving on RANK_ALL
-         mptag_t tag;
-         CRecvNotify(const SocketEndpoint *_ep,mptag_t _tag)
-             : aborted(false), result(NULL), ep(_ep), tag(_tag)
-         {
-         }
-         bool notify(CMessageBuffer *msg)
-         {
-             if ((tag!=TAG_ALL)&&(tag!=msg->getTag()))
-                 return false;
-             const SocketEndpoint &from = msg->getSender();
-             // a null sender also matches
-             if ((ep!=NULL)&&!ep->equals(from)&&!from.isNull())
-                 return false;
-             if (TAG_CANCEL==msg->getReplyTag())
-                 delete msg; // cancel marker: consume it, leaving result unset
-             else
-                 result = msg;
-             return true;
-         }
-         bool notifyClosed(SocketEndpoint &_closedEp) // called when connection closed
-         {
-             if (ep)
-             {
-                 if (!ep->equals(_closedEp))
-                     return false;
-             }
-             else
-             {
-                 // ep is NULL if receiving on RANK_ALL; remember which endpoint closed for the abort info
-                 closedEp = _closedEp;
-                 ep = &closedEp;
-             }
-             aborted = true;
-             return true;
-         }
-     } waiter(ep,tag);
-     bool woken = receiveq.wait(waiter,false,tm);
-     if (woken&&waiter.result)
-     {
-         mbuf.transferFrom(*waiter.result);
-         delete waiter.result;
-         return true;
-     }
-     if (!waiter.aborted)
-         return false;
- #ifdef _TRACELINKCLOSED
-     LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
-     PrintStackReport();
- #endif
-     IMP_Exception *linkClosed=new CMPException(MPERR_link_closed,*waiter.ep);
-     throw linkClosed;
- }
- // Discards every queued message whose tag matches 'tag' (all of them for TAG_ALL)
- // and logs how many were dropped, if any.
- void CMPServer::flush(mptag_t tag)
- {
-     class CFlushNotify: public CBufferQueueNotify
-     {
-     public:
-         mptag_t tag;
-         CFlushNotify(mptag_t _tag) : tag(_tag) { }
-         bool notify(CMessageBuffer *msg)
-         {
-             if (tag==TAG_ALL)
-                 return true;
-             return tag==msg->getTag();
-         }
-         bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
-         {
-             return false;
-         }
-     } matcher(tag);
-     unsigned discarded = receiveq.flush(matcher);
-     if (0 != discarded)
-         PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,discarded);
- }
- // Queues an empty message marked with reply tag TAG_CANCEL for 'tag'. Its sender is
- // '*ep', or a default-constructed endpoint when 'ep' is NULL.
- void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
- {
-     SocketEndpoint from;
-     if (NULL != ep)
-         from = *ep;
-     CMessageBuffer *marker = new CMessageBuffer(0);
-     marker->init(from,tag,TAG_CANCEL);
-     getReceiveQ().enqueue(marker);
- }
- // Counts queued messages matching 'tag'/'ep' without removing them. 'sender' receives
- // the sender of the first match. Returns 0 when nothing matched or the first match was
- // a cancel marker. Throws MPERR_link_closed if the wait was aborted because 'ep' closed.
- unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender)
- {
-     class CProbeNotify: public CBufferQueueNotify
-     {
-     public:
-         bool aborted;
-         SocketEndpoint &sender;
-         const SocketEndpoint *ep;
-         mptag_t tag;
-         bool cancel;
-         unsigned count;
-         CProbeNotify(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender)
-             : aborted(false), sender(_sender), ep(_ep), tag(_tag), cancel(false), count(0)
-         {
-         }
-         bool notify(CMessageBuffer *msg)
-         {
-             if ((tag!=TAG_ALL)&&(tag!=msg->getTag()))
-                 return false;
-             if ((ep!=NULL)&&!ep->equals(msg->getSender()))
-                 return false;
-             if (0==count)
-             {
-                 // details are taken from the first matching message only
-                 sender = msg->getSender();
-                 cancel = (msg->getReplyTag()==TAG_CANCEL);
-             }
-             count++;
-             return true;
-         }
-         bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
-         {
-             if (!ep||!ep->equals(closedep))
-                 return false;
-             aborted = true;
-             return true;
-         }
-     } prober(ep,tag,sender);
-     if (receiveq.wait(prober,true,tm))
-     {
-         if (prober.cancel)
-             return 0;
-         return prober.count;
-     }
-     if (!prober.aborted)
-         return 0;
- #ifdef _TRACELINKCLOSED
-     LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
-     PrintStackReport();
- #endif
-     // aborted is only ever set when ep is non-NULL, so the dereference is safe
-     IMP_Exception *linkClosed=new CMPException(MPERR_link_closed,*ep);
-     throw linkClosed;
- }
- // Starts accepting connections on this server's port.
- void CMPServer::start()
- {
-     const unsigned short listenPort = getPort();
-     connectthread->startPort(listenPort);
- }
- // Stops the select handler and the connect thread, then closes the socket of every channel.
- void CMPServer::stop()
- {
-     selecthandler->stop(true);
-     connectthread->stop();
-     CMPChannel *chan = NULL;
-     while (NULL != (chan = (CMPChannel *)next(chan)))
-         chan->closeSocket();
- }
- // Registers 'monitor' with the notify-closed thread.
- // NB: callers hold a critical section around this call.
- void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
- {
-     // called in critical section CMPServer::sect
-     CMPNotifyClosedThread &notifier = *notifyclosedthread;
-     notifier.addConnectionMonitor(monitor);
- }
- // Unregisters 'monitor' from the notify-closed thread.
- // NB: callers hold a critical section around this call.
- void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
- {
-     // called in critical section CMPServer::sect
-     CMPNotifyClosedThread &notifier = *notifyclosedthread;
-     notifier.removeConnectionMonitor(monitor);
- }
- // Hash table hook invoked on insertion; intentionally does nothing.
- void CMPServer::onAdd(void *)
- {
-     // not used
- }
- // Hash table hook invoked on removal: drops the table's reference to the channel.
- void CMPServer::onRemove(void *e)
- {
-     CMPChannel *removed = (CMPChannel *)e;
-     removed->Release();
- }
- // Hash of a stored channel: the hash of its remote endpoint.
- unsigned CMPServer::getHashFromElement(const void *e) const
- {
-     const CMPChannel *chan = (const CMPChannel *)e;
-     return chan->remoteep.hash(0);
- }
- // Hash of a lookup key; the key is a SocketEndpoint.
- unsigned CMPServer::getHashFromFindParam(const void *fp) const
- {
-     const SocketEndpoint *key = (const SocketEndpoint*)fp;
-     return key->hash(0);
- }
- // Returns the lookup key of a stored channel: the address of its remote endpoint.
- const void * CMPServer::getFindParam(const void *p) const
- {
-     const CMPChannel *chan = (const CMPChannel *)p;
-     return &chan->remoteep;
- }
- // True when the stored channel's remote endpoint equals the lookup key.
- bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
- {
-     CMPChannel *chan = (CMPChannel *)et;
-     SocketEndpoint *key = (SocketEndpoint *)fp;
-     return chan->remoteep.equals(*key);
- }
- // Advances 'cur' to the channel following it in the table (the first one when 'cur'
- // is NULL), under the server lock. Returns false, with 'cur' NULL, at the end.
- bool CMPServer::nextChannel(CMPChannel *&cur)
- {
-     CriticalBlock block(sect);
-     CMPChannel *following = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
-     cur = following;
-     return NULL != following;
- }
- // Passes a closed-endpoint notification on to the notify-closed thread.
- // 'trace' only has an effect in builds with _TRACEMPSERVERNOTIFYCLOSED defined.
- void CMPServer::notifyClosed(SocketEndpoint &ep, bool trace)
- {
- #ifdef _TRACEMPSERVERNOTIFYCLOSED
-     if (trace)
-     {
-         StringBuffer epText;
-         LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(epText).str());
-         PrintStackReport();
-     }
- #endif
-     CMPNotifyClosedThread &notifier = *notifyclosedthread;
-     notifier.notify(ep);
- }
- // --------------------------------------------------------
- // Node-addressed communicator: peers are identified by INode rather than by rank in a group.
- class CInterCommunicator: public IInterCommunicator, public CInterface
- {
-     CMPServer *parent;
-     CriticalSection verifysect;
- public:
-     IMPLEMENT_IINTERFACE;
-     CInterCommunicator(CMPServer *_parent)
-     {
-         parent = _parent;
-     }
-     ~CInterCommunicator()
-     {
-     }
-     // Sends mbuf to 'dst'. A send to this node is queued locally instead of going
-     // through a channel. On success mbuf is cleared. Returns false for a NULL
-     // destination, on timeout, or if the channel send fails.
-     bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         if (NULL == dst)
-             return false;
-         if (dst->equals(queryMyNode()))
-         {
-             CMessageBuffer *loopback = new CMessageBuffer();
-             mptag_t replyTag = mbuf.getReplyTag();
-             loopback->transferFrom(mbuf);
-             loopback->init(dst->endpoint(),tag,replyTag);
-             parent->getReceiveQ().enqueue(loopback);
-             mbuf.clear(); // for consistent semantics
-             return true;
-         }
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> chan = parent->lookup(dst->endpoint());
-         unsigned msLeft;
-         if (tm.timedout(&msLeft))
-             return false;
-         if (chan->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
-         {
-             mbuf.clear(); // for consistent semantics
-             return true;
-         }
-         return false;
-     }
-     // Verifies the connection to 'node' within 'timeout'.
-     bool verifyConnection(INode *node, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> chan = parent->lookup(node->endpoint());
-         unsigned msLeft;
-         if (tm.timedout(&msLeft))
-             return false;
-         return chan->verifyConnection(tm,true);
-     }
-     // Verifies every channel that is not closed, appending "<endpoint> OK" or
-     // "<endpoint> FAILED" lines to 'log'. Each channel gets its own 5 second timer.
-     void verifyAll(StringBuffer &log)
-     {
-         CMPChannelIterator iter(*parent);
-         for (bool more = iter.first(); more; more = iter.next())
-         {
-             // lock is taken per channel and released before advancing the iterator
-             CriticalBlock block(verifysect);
-             CTimeMon tm(5000);
-             CMPChannel &chan = iter.query();
-             if (chan.isClosed())
-                 continue;
-             chan.queryEpStr(log).append(' ');
-             if (chan.verifyConnection(tm,false))
-                 log.append("OK\n");
-             else
-                 log.append("FAILED\n");
-         }
-     }
-     // Verifies connections to the members of 'group' within 'timeout'.
-     // duplex: verify every other node. Otherwise a first pass verifies the half of the
-     // peers selected by rank parity/ordering, and a second pass polls the remaining
-     // half (verifyConnection called with false) until they verify or time runs out.
-     bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon tm(timeout);
-         rank_t myrank = group->rank(parent->queryMyNode());
-         {
-             ForEachNodeInGroup(rank,*group)
-             {
-                 bool wanted;
-                 if (duplex)
-                     wanted = (myrank!=rank);
-                 else
-                     wanted = ((rank&1)==(myrank&1)) ? (myrank>rank) : (myrank<rank);
-                 if (!wanted)
-                     continue;
-                 Owned<CMPChannel> chan = parent->lookup(group->queryNode(rank).endpoint());
-                 unsigned msLeft;
-                 if (tm.timedout(&msLeft))
-                     return false;
-                 if (!chan->verifyConnection(tm,true))
-                     return false;
-             }
-         }
-         if (!duplex)
-         {
-             ForEachNodeInGroup(rank,*group)
-             {
-                 bool wanted = ((rank&1)==(myrank&1)) ? (myrank<rank) : (myrank>rank);
-                 if (!wanted)
-                     continue;
-                 Owned<CMPChannel> chan = parent->lookup(group->queryNode(rank).endpoint());
-                 while (!chan->verifyConnection(tm,false))
-                 {
-                     unsigned msLeft;
-                     if (tm.timedout(&msLeft))
-                         return false;
-                     // release the lock while sleeping between polls
-                     CriticalUnblock unblock(verifysect);
-                     Sleep(100);
-                 }
-             }
-         }
-         return true;
-     }
-     // Returns the number of queued messages matching 'tag' from 'src' (any node if NULL).
-     // '*sender', when supplied, is set to a new node for the first match, or NULL if none.
-     unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
-     {
-         if (sender)
-             *sender = NULL;
-         SocketEndpoint from;
-         CTimeMon tm(timeout);
-         unsigned matches = parent->probe(src?&src->endpoint():NULL,tag,tm,from);
-         if ((0 != matches) && sender)
-             *sender = createINode(from);
-         return matches;
-     }
-
-     // Receives a message matching 'tag' from 'src' (any node if NULL). Link-closed
-     // errors are swallowed and the receive retried, unless 'src' is given and it is
-     // the endpoint that closed. Other exceptions are rethrown.
-     bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         if (sender)
-             *sender = NULL;
-         CTimeMon tm(timeout);
-         for (;;)
-         {
-             try
-             {
-                 bool got = parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm);
-                 if (got && sender)
-                     *sender = createINode(mbuf.getSender());
-                 return got;
-             }
-             catch (IMP_Exception *e)
-             {
-                 if (MPERR_link_closed != e->errorCode())
-                     throw;
-                 const SocketEndpoint &closed = e->queryEndpoint();
-                 if (src && (closed == src->endpoint()))
-                     throw;
-                 // ignoring closed endpoint
-                 e->Release();
-                 // loop around and recv again
-             }
-         }
-     }
-     void flush(mptag_t tag)
-     {
-         parent->flush(tag);
-     }
-     // Sends mbuff to 'dst' with a fresh reply tag, then waits for the reply into mbuff.
-     // The timeout covers the send and the receive together.
-     bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex(dst);
-         mptag_t replytag = parent->createReplyTag();
-         CTimeMon tm(timeout);
-         mbuff.setReplyTag(replytag);
-         unsigned msLeft;
-         if (tm.timedout(&msLeft))
-             return false;
-         if (!send(mbuff,dst,dsttag,msLeft))
-             return false;
-         if (tm.timedout(&msLeft))
-             return false;
-         mbuff.clear();
-         return recv(mbuff,dst,replytag,NULL,msLeft);
-     }
-     // Sends mbuff back to its sender using the reply tag it carries.
-     bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         Owned<INode> origin(createINode(mbuff.getSender()));
-         return send(mbuff,origin,mbuff.getReplyTag(),timeout);
-     }
-     void cancel(INode *src, mptag_t tag)
-     {
-         const SocketEndpoint *from = src?&src->endpoint():NULL;
-         parent->cancel(from,tag);
-     }
-     // Closes the socket of the channel to 'node' and removes the channel from the server.
-     void disconnect(INode *node)
-     {
-         CriticalBlock block(verifysect);
-         Owned<CMPChannel> chan = parent->lookup(node->endpoint());
-         chan->closeSocket();
-         parent->removeChannel(chan);
-     }
- };
- // Rank-addressed communicator over an IGroup: peers are identified by their rank in
- // 'group', and ranks are translated to endpoints/channels through the owning CMPServer.
- class CCommunicator: public ICommunicator, public CInterface
- {
-     IGroup *group;
-     CMPServer *parent;
-     bool outer;
-     rank_t myrank;
-     CriticalSection verifysect;
-     const SocketEndpoint &queryEndpoint(rank_t rank)
-     {
-         return group->queryNode(rank).endpoint();
-     }
-     // Returns a linked channel for 'rank' (caller owns the link).
-     CMPChannel *getChannel(rank_t rank)
-     {
-         return parent->lookup(queryEndpoint(rank));
-     }
-     // Resolves RANK_RANDOM: picks a random rank in the group other than our own.
-     // With a single-member group the only candidate is rank 0, which must not be us.
-     rank_t pickRandomPeer()
-     {
-         if (group->ordinality()>1) {
-             rank_t picked;
-             do {
-                 picked = getRandom()%group->ordinality();
-             } while (picked==myrank);
-             return picked;
-         }
-         assertex(myrank!=0);
-         return 0;
-     }
- public:
-     IMPLEMENT_IINTERFACE;
-     // Sends mbuf to 'dstrank', which may be a real rank or one of RANK_ALL,
-     // RANK_ALL_OTHER, RANK_RANDOM. A send to our own rank queues a clone locally.
-     // Returns false for RANK_NULL, on timeout, or when a channel send fails.
-     bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
-     {
-         // send does not corrupt mbuf
-         if (dstrank==RANK_NULL)
-             return false;
-         if (dstrank==myrank) {
-             CMessageBuffer *msg = mbuf.clone();
-             // change sender
-             msg->init(parent->queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
-             parent->getReceiveQ().enqueue(msg);
-         }
-         else {
-             CTimeMon tm(timeout);
-             rank_t endrank;
-             if (dstrank==RANK_ALL) {
-                 // deliver to ourselves first, then treat as "all others"
-                 send(mbuf,myrank,tag,timeout);
-                 dstrank = RANK_ALL_OTHER;
-             }
-             if (dstrank==RANK_ALL_OTHER) {
-                 dstrank = 0;
-                 endrank = group->ordinality()-1;
-             }
-             else if (dstrank==RANK_RANDOM) {
-                 dstrank = pickRandomPeer();
-                 endrank = dstrank;
-             }
-             else
-                 endrank = dstrank;
-             for (;dstrank<=endrank;dstrank++) {
-                 if (dstrank!=myrank) {
-                     Owned<CMPChannel> channel = getChannel(dstrank);
-                     unsigned remaining;
-                     if (tm.timedout(&remaining))
-                         return false;
-                     if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
-                         return false;
-                 }
-             }
-         }
-         return true;
-     }
-     // Synchronises all ranks of the group using pairwise send/recv rounds.
-     void barrier(void)
-     {
- #ifdef _TRACE
-         DBGLOG("MP: barrier enter");
- #endif
-         /*
-          * Use the dissemination algorithm described in:
-          * Debra Hensgen, Raphael Finkel, and Udi Manbet, "Two Algorithms for Barrier Synchronization,"
-          * International Journal of Parallel Programming, 17(1):1-17, 1988.
-          * It uses ceiling(lgp) steps. In step k, 0 <= k <= (ceiling(lgp)-1),
-          * process i sends to process (i + 2^k) % p and receives from process (i - 2^k + p) % p.
-          */
-         int numranks = group->ordinality();
-         CMessageBuffer mb;
-         rank_t r;
-         int mask = 0x1;
-         while (mask < numranks)
-         {
-             int dst = (myrank + mask) % numranks;
-             int src = (myrank - mask + numranks) % numranks;
- #ifdef _TRACE
-             DBGLOG("MP: barrier: send to %d, recv from %d", dst, src);
- #endif
-             // NOTE: MPI method MUST use sendrecv so as to not send/recv deadlock ...
-             mb.clear();
-             mb.append("MPTAG_BARRIER");
-             bool oks = send(mb,dst,MPTAG_BARRIER,120000);
-             mb.clear();
-             bool okr = recv(mb,src,MPTAG_BARRIER,&r);
-             // NOTE(review): this only bails out when BOTH the send and the recv failed,
-             // although the message says "sending or recving" - confirm whether '||' was intended.
-             if (!oks && !okr)
-             {
-                 DBGLOG("MP: barrier: Error sending or recving");
-                 break;
-             }
-             mask <<= 1;
-         }
- #ifdef _TRACE
-         DBGLOG("MP: barrier leave");
- #endif
-     }
-     // Verifies the connection to a single, concrete rank within 'timeout'.
-     bool verifyConnection(rank_t rank, unsigned timeout)
-     {
-         CriticalBlock block(verifysect);
-         assertex(rank!=RANK_RANDOM);
-         assertex(rank!=RANK_ALL);
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> channel = getChannel(rank);
-         unsigned remaining;
-         if (tm.timedout(&remaining))
-             return false;
-         return channel->verifyConnection(tm,true);
-     }
-     // Verifies connections to the group members in parallel.
-     //  duplex               - verify every other rank; otherwise a first pass verifies
-     //                         half of the peers (chosen by rank parity/ordering) and a second
-     //                         pass polls the other half without connecting.
-     //  totalTimeout         - budget for the whole operation.
-     //  perConnectionTimeout - if non-zero, caps the time allowed per connection.
-     // Returns true only if every selected rank verified and nothing aborted.
-     bool verifyAll(bool duplex, unsigned totalTimeout, unsigned perConnectionTimeout)
-     {
-         CriticalBlock block(verifysect);
-         CTimeMon totalTM(totalTimeout);
-         // the semaphore bounds the number of concurrently running verifications
-         Semaphore sem;
-         sem.signal(getAffinityCpus());
-         std::atomic<bool> abort{false};
-
-         auto verifyConnWithConnect = [&](unsigned rank, unsigned timeout)
-         {
-             CTimeMon tm(timeout);
-             Owned<CMPChannel> channel = getChannel(rank);
-             return channel->verifyConnection(tm, true);
-         };
-         auto verifyConnWithoutConnect = [&](unsigned rank, unsigned timeout)
-         {
-             CTimeMon tm(timeout);
-             while (true)
-             {
-                 Owned<CMPChannel> channel = getChannel(rank);
-                 if (channel->verifyConnection(tm, false))
-                     return true;
-                 if (abort || tm.timedout())
-                     return false;
-                 Sleep(100);
-             }
-         };
-         auto threadedVerifyConnectFunc = [&](rank_t rank, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
-         {
-             // NB: running because took (via wait()) a semaphore slot, restore it at end of scope
-             struct RestoreSlot
-             {
-                 Semaphore &sem;
-                 RestoreSlot(Semaphore &_sem) : sem(_sem) { }
-                 ~RestoreSlot() { sem.signal(); }
-             } restoreSlot(sem);
-             unsigned timeoutMs;
-             if (totalTM.timedout(&timeoutMs) || abort)
-                 return false;
-             if (perConnectionTimeout && (perConnectionTimeout < timeoutMs))
-                 timeoutMs = perConnectionTimeout;
-             if (!connectFunc(rank, timeoutMs))
-             {
-                 abort = true; // ensure verifyFunc knows before release slot, to prevent other thread being launched
-                 return false;
-             }
-             return true;
-         };
-         auto verifyFunc = [&](std::function<bool (unsigned rank)> isRankToVerifyFunc, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
-         {
-             std::vector<std::future<bool>> results;
-             for (rank_t rank=0; rank<group->ordinality(); rank++)
-             {
-                 if (isRankToVerifyFunc(rank))
-                 {
-                     // check timeout before and after sem.wait
-                     // NB: sem.wait if successful, takes a slot which is restored by the thread when it is done
-                     unsigned remaining;
-                     if (totalTM.timedout(&remaining) || !sem.wait(remaining) || totalTM.timedout(&remaining))
-                     {
-                         abort = true;
-                         break;
-                     }
-                     else if (abort)
-                         break;
-                     results.push_back(std::async(std::launch::async, threadedVerifyConnectFunc, rank, connectFunc));
-                 }
-             }
-             // always collect every launched task before returning; they capture locals by reference
-             bool res = true;
-             for (auto &f: results)
-             {
-                 if (!f.get())
-                     res = false;
-             }
-             return res && !abort;
-         };
-         if (duplex)
-             return verifyFunc([this](rank_t rank) { return rank != myrank; }, verifyConnWithConnect);
-         else
-         {
-             if (!verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank > rank) : (myrank < rank); }, verifyConnWithConnect))
-                 return false;
-             return verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank < rank) : (myrank > rank); }, verifyConnWithoutConnect);
-         }
-     }
-     // Returns the number of queued messages matching 'tag' from 'srcrank' (any sender
-     // for RANK_ALL). '*sender', when supplied, receives the rank of the first match,
-     // or RANK_NULL if there was none.
-     unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
-     {
-         assertex(srcrank!=RANK_NULL);
-         SocketEndpoint res;
-         CTimeMon tm(timeout);
-         unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
-         if (ret!=0) {
-             if (sender)
-                 *sender = group->rank(res);
-             return ret;
-         }
-         if (sender)
-             *sender = RANK_NULL;
-         return 0;
-     }
-     // Receives a message matching 'tag' from 'srcrank' (any sender for RANK_ALL).
-     // '*sender', when supplied, receives the sender's rank, or RANK_NULL on failure.
-     // A link-closed error for an endpoint that is not a member of the group is ignored
-     // and the receive retried; all other exceptions are rethrown.
-     bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex(srcrank!=RANK_NULL);
-         CTimeMon tm(timeout);
-         for (;;)
-         {
-             try
-             {
-                 // RANK_ALL is passed down as a NULL endpoint, i.e. "any sender"
-                 if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
-                 {
-                     if (sender)
-                         *sender = group->rank(mbuf.getSender());
-                     return true;
-                 }
-                 if (sender)
-                     *sender = RANK_NULL;
-                 return false;
-             }
-             catch (IMP_Exception *e)
-             {
-                 if (MPERR_link_closed != e->errorCode())
-                     throw;
-                 const SocketEndpoint &ep = e->queryEndpoint();
-                 if (RANK_NULL != group->rank(ep))
-                     throw;
-                 // ignoring closed endpoint from outside the communicator group
-                 e->Release();
-                 // loop around and recv again
-             }
-         }
-     }
-
-     void flush(mptag_t tag)
-     {
-         parent->flush(tag);
-     }
-     IGroup &queryGroup() { return *group; }
-     IGroup *getGroup() { return LINK(group); }
-     // Sends mbuff to 'sendrank' (a concrete rank or RANK_RANDOM) with a fresh reply tag,
-     // then waits for the reply into mbuff. The timeout covers both the send and the receive.
-     bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
-         if (sendrank==RANK_RANDOM)
-             sendrank = pickRandomPeer();
-         mptag_t replytag = parent->createReplyTag();
-         CTimeMon tm(timeout);
-         mbuff.setReplyTag(replytag);
-         unsigned remaining;
-         if (tm.timedout(&remaining))
-             return false;
-         if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
-             return false;
-         mbuff.clear();
-         return recv(mbuff,sendrank,replytag,NULL,remaining);
-     }
-     // Sends mbuf back to its sender using the reply tag it carries. Senders that are
-     // not members of the group are reached directly through their channel.
-     // On success the buffer's reply tag is reset to TAG_NULL.
-     bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
-     {
-         mptag_t replytag = mbuf.getReplyTag();
-         rank_t dstrank = group->rank(mbuf.getSender());
-         if (dstrank!=RANK_NULL) {
-             if (send (mbuf, dstrank, replytag,timeout)) {
-                 mbuf.setReplyTag(TAG_NULL);
-                 return true;
-             }
-             return false;
-         }
-
-         CTimeMon tm(timeout);
-         Owned<CMPChannel> channel = parent->lookup(mbuf.getSender());
-         unsigned remaining;
-         if (tm.timedout(&remaining)) {
-             return false;
-         }
-         if (channel->send(mbuf,replytag,TAG_NULL,tm, true)) {
-             mbuf.setReplyTag(TAG_NULL);
-             return true;
-         }
-         return false;
-     }
-     void cancel(rank_t srcrank, mptag_t tag)
-     {
-         assertex(srcrank!=RANK_NULL);
-         parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
-     }
-     // Closes the socket of the channel to 'node' and removes the channel from the server.
-     void disconnect(INode *node)
-     {
-         CriticalBlock block(verifysect);
-         Owned<CMPChannel> channel = parent->lookup(node->endpoint());
-         channel->closeSocket();
-         parent->removeChannel(channel);
-     }
-     // NOTE(review): the returned reference points into the channel, whose local link is
-     // released on return - presumably the server's table keeps the channel alive; confirm.
-     virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) const override
-     {
-         Owned<CMPChannel> channel = parent->lookup(sender);
-         assertex(channel);
-         return channel->queryPeerEp();
-     }
-     // Takes its own link on '_group' (released in the destructor) and caches our rank in it.
-     CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
-     {
-         outer = _outer;
-         parent = _parent;
-         group = LINK(_group);
-         myrank = group->rank(parent->queryMyNode());
-     }
-     ~CCommunicator()
-     {
-         group->Release();
-     }
- };
- // Additional CMPServer methods
- // Creates a new rank-based communicator over 'group' bound to this server; the caller owns the result.
- ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
- {
-     CCommunicator *comm = new CCommunicator(this,group,outer);
-     return comm;
- }
- ///////////////////////////////////
- // Creates a new, independent MP server with role 0 on 'port', starts it and returns it.
- // The caller owns the returned server.
- IMPServer *startNewMPServer(unsigned port)
- {
-     // sizeof() is a compile-time constant, so check the header size at compile time
-     // rather than on every call.
-     static_assert(sizeof(PacketHeader)==32, "PacketHeader must be exactly 32 bytes");
-     CMPServer *mpServer = new CMPServer(0, port);
-     mpServer->start();
-     return mpServer;
- }
- // The process-wide MP server: a CMPServer plus a start/stop nesting count, a paused
- // flag and a lazily created world communicator.
- class CGlobalMPServer : public CMPServer
- {
-     int nestLevel;
-     bool paused;
-     IInterCommunicator *worldcomm;
- public:
-     static CriticalSection sect; // guards the global server pointer and its nesting state
-     CGlobalMPServer(unsigned __int64 _role, unsigned _port) : CMPServer(_role, _port)
-     {
-         worldcomm = NULL;
-         nestLevel = 0;
-         // must be initialised: isPaused() may be called before setPaused()
-         paused = false;
-     }
-     ~CGlobalMPServer()
-     {
-         ::Release(worldcomm);
-     }
-     // Returns the world communicator, creating it on first use.
-     IInterCommunicator &queryWorldCommunicator()
-     {
-         if (!worldcomm)
-             worldcomm = new CInterCommunicator(this);
-         return *worldcomm;
-     }
-     unsigned incNest() { return ++nestLevel; }
-     unsigned decNest() { return --nestLevel; }
-     unsigned queryNest() { return nestLevel; }
-     bool isPaused() const { return paused; }
-     void setPaused(bool onOff) { paused = onOff; }
- };
- CriticalSection CGlobalMPServer::sect; // definition of the static lock taken by the global start/stop/query functions
- static CGlobalMPServer *globalMPServer; // the process-wide server; NULL while no server exists
- // Module start-up: no global MP server exists yet.
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
-     globalMPServer = nullptr;
-     return true;
- }
- // Module shut-down: drop the reference to the global MP server, if one is still set.
- MODULE_EXIT()
- {
-     CGlobalMPServer *leftover = globalMPServer;
-     ::Release(leftover);
- }
- // Creates the global MP server if needed and, on the outermost call, starts it.
- // When 'paused' is true on the outermost call the server is only marked paused:
- // it is not started and the nesting count is not incremented.
- // Every other call increments the nesting count, to be balanced by stopMPServer().
- void startMPServer(unsigned __int64 role, unsigned port, bool paused)
- {
-     // sizeof() is a compile-time constant, so check the header size at compile time
-     // rather than on every call.
-     static_assert(sizeof(PacketHeader)==32, "PacketHeader must be exactly 32 bytes");
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (NULL == globalMPServer)
-     {
-         globalMPServer = new CGlobalMPServer(role, port);
-         initMyNode(globalMPServer->getPort());
-     }
-     if (0 == globalMPServer->queryNest())
-     {
-         if (paused)
-         {
-             globalMPServer->setPaused(paused);
-             return;
-         }
-         queryLogMsgManager()->setPort(globalMPServer->getPort());
-         globalMPServer->start();
-         globalMPServer->setPaused(false);
-     }
-     globalMPServer->incNest();
- }
- // Convenience overload: starts the global MP server with role 0.
- void startMPServer(unsigned port, bool paused)
- {
-     const unsigned __int64 noRole = 0;
-     startMPServer(noRole, port, paused);
- }
- // Decrements the nesting count of the global MP server; when it reaches zero the
- // server is detached from the global pointer, stopped and released, and the local
- // node is re-initialised with port 0. Does nothing if no global server exists.
- void stopMPServer()
- {
-     CGlobalMPServer *detached = NULL;
-     {
-         CriticalBlock block(CGlobalMPServer::sect);
-         if (!globalMPServer)
-             return;
-         if (0 != globalMPServer->decNest())
-             return; // still in use by an outer start
-         stopLogMsgReceivers();
- #ifdef _TRACE
-         LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
- #endif
-         detached = globalMPServer;
-         globalMPServer = NULL;
-     }
-     // stop and release outside the lock
-     detached->stop();
-     detached->Release();
- #ifdef _TRACE
-     LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
- #endif
-     CriticalBlock block(CGlobalMPServer::sect);
-     initMyNode(0);
- }
- // True while a global MP server object exists.
- bool hasMPServerStarted()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     return NULL != globalMPServer;
- }
- // Returns the world communicator of the global MP server, which must exist.
- IInterCommunicator &queryWorldCommunicator()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     IInterCommunicator &world = globalMPServer->queryWorldCommunicator();
-     return world;
- }
- // Allocates a reply tag from the global MP server, which must exist.
- mptag_t createReplyTag()
- {
-     assertex(globalMPServer);
-     mptag_t replyTag = globalMPServer->createReplyTag();
-     return replyTag;
- }
- // Creates a communicator over 'group' on the global MP server, which must exist.
- ICommunicator *createCommunicator(IGroup *group, bool outer)
- {
-     assertex(globalMPServer);
-     ICommunicator *comm = globalMPServer->createCommunicator(group, outer);
-     return comm;
- }
- // Appends the global server's receive queue details to 'buf'; 'buf' is left
- // untouched when no global server exists. Returns 'buf'.
- StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (NULL == globalMPServer)
-         return buf;
-     globalMPServer->getReceiveQueueDetails(buf);
-     return buf;
- }
- // Registers 'monitor' with the global MP server, which must exist.
- void addMPConnectionMonitor(IConnectionMonitor *monitor)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     CGlobalMPServer &server = *globalMPServer;
-     server.addConnectionMonitor(monitor);
- }
- // Unregisters 'monitor' from the global MP server; a no-op when no server exists.
- void removeMPConnectionMonitor(IConnectionMonitor *monitor)
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     if (NULL == globalMPServer)
-         return;
-     globalMPServer->removeConnectionMonitor(monitor);
- }
- // Returns a linked reference to the global MP server, which must exist; the caller must release it.
- IMPServer *getMPServer()
- {
-     CriticalBlock block(CGlobalMPServer::sect);
-     assertex(globalMPServer);
-     IMPServer *linked = LINK(globalMPServer);
-     return linked;
- }
- // Adds 'handle' to the list of child processes killed on self destruct.
- // The invalid handle value (HANDLE)-1 is ignored.
- void registerSelfDestructChildProcess(HANDLE handle)
- {
-     CriticalBlock block(childprocesssect);
-     if ((HANDLE)-1 == handle)
-         return;
-     childprocesslist.append((unsigned)handle);
- }
- // Removes 'handle' from the list of child processes killed on self destruct.
- // The invalid handle value (HANDLE)-1 is ignored.
- void unregisterSelfDestructChildProcess(HANDLE handle)
- {
-     CriticalBlock block(childprocesssect);
-     if ((HANDLE)-1 == handle)
-         return;
-     childprocesslist.zap((unsigned)handle);
- }
|