mpcomm.cpp 107 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
// Defined before any project header is included below; presumably this makes the
// mp_decl linkage macro used by the MP headers resolve to the export form while
// building this library - TODO confirm against mpcomm.hpp.
#define mp_decl DECL_EXPORT
/* TBD (outstanding work items)
lost packet disposal
synchronous send
connection protocol (HRPC);
look at all timeouts
*/
  20. #include <future>
  21. #include <vector>
  22. #include "platform.h"
  23. #include "portlist.h"
  24. #include "jlib.hpp"
  25. #include <limits.h>
  26. #include "jsocket.hpp"
  27. #include "jmutex.hpp"
  28. #include "jutil.hpp"
  29. #include "jthread.hpp"
  30. #include "jqueue.tpp"
  31. #include "jsuperhash.hpp"
  32. #include "jmisc.hpp"
  33. #include "jsecrets.hpp"
  34. #include "mpcomm.hpp"
  35. #include "mpbuff.hpp"
  36. #include "mputil.hpp"
  37. #include "mplog.hpp"
  38. #include "securesocket.hpp"
#ifdef _MSC_VER
// C4355: 'this' used in base member initializer list.
#pragma warning (disable : 4355)
#endif

// Tracing/diagnostic switches (commented-out ones are disabled).
//#define _TRACE
//#define _FULLTRACE
//#define _TRACEMPSERVERNOTIFYCLOSED
#define _TRACEORPHANS
#define REFUSE_STALE_CONNECTION

// Protocol version stamped into PacketHeader::version.
#define MP_PROTOCOL_VERSION 0x102
#define MP_PROTOCOL_VERSIONV6 0x202 // extended for IPV6

// These should really be configurable
// CANCELTIMEOUT is also the approximate total time CBufferQueue::enqueue keeps
// re-offering a TAG_CANCEL buffer (10 sleeps of CANCELTIMEOUT/10).
#define CANCELTIMEOUT 1000 // 1 sec
#define CONNECT_TIMEOUT (5*60*1000) // 5 mins
#define CONNECT_READ_TIMEOUT (10*1000) // 10 seconds. NB: used by connect readtms loop (see loopCnt)
#define CONNECT_TIMEOUT_INTERVAL 1000 // 1 sec
#define CONNECT_RETRYCOUNT 180 // Overall max connect time is = CONNECT_RETRYCOUNT * CONNECT_READ_TIMEOUT
#define CONNECT_TIMEOUT_MINSLEEP 2000 // random range: CONNECT_TIMEOUT_MINSLEEP to CONNECT_TIMEOUT_MAXSLEEP milliseconds
#define CONNECT_TIMEOUT_MAXSLEEP 5000
#define CONFIRM_TIMEOUT (90*1000) // 1.5 mins
#define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
#define TRACESLOW_THRESHOLD 1000 // 1 sec
#define VERIFY_DELAY (1*60*1000) // 1 Minute
#define VERIFY_TIMEOUT (1*60*1000) // 1 Minute

// Powers of two: 2^40, 2^32, 2^24 and 2^16 respectively.
#define DIGIT1 U64C(0x10000000000) // (256ULL*256ULL*256ULL*65536ULL)
#define DIGIT2 U64C(0x100000000) // (256ULL*256ULL*65536ULL)
#define DIGIT3 U64C(0x1000000) // (256ULL*65536ULL)
#define DIGIT4 U64C(0x10000) // (65536ULL)

#define _TRACING
// File-local child-process bookkeeping. These are used outside this view; presumably
// childprocesssect guards childprocesslist - TODO confirm at the use sites.
static CriticalSection childprocesssect;
#ifdef _WIN32
static Unsigned64Array childprocesslist;   // 64-bit entries on Windows
#else
static UnsignedArray childprocesslist;     // 32-bit entries elsewhere
#endif
  73. // IPv6 TBD
  74. struct SocketEndpointV4
  75. {
  76. byte ip[4];
  77. unsigned short port;
  78. SocketEndpointV4() {};
  79. SocketEndpointV4(const SocketEndpoint &val) { set(val); }
  80. void set(const SocketEndpoint &val)
  81. {
  82. port = val.port;
  83. if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
  84. IPV6_NOT_IMPLEMENTED();
  85. }
  86. void get(SocketEndpoint &val)
  87. {
  88. val.setNetAddress(sizeof(ip),&ip);
  89. val.port = port;
  90. }
  91. StringBuffer & getUrlStr(StringBuffer &val)
  92. {
  93. SocketEndpoint s;
  94. this->get(s);
  95. return s.getUrlStr(val);
  96. }
  97. };
// Fixed-size header at the start of every MP packet (byte offsets noted per field, 32 total).
// Kept as plain data with no virtual functions - presumably so it can be sent/received
// as raw bytes; confirm before changing field order or types.
class PacketHeader // standard packet header - no virtuals
{
public:
    // Process-wide sequence state shared by all headers.
    // NOTE(review): no locking is visible here - confirm callers serialise initseq().
    static unsigned nextseq;
    static unsigned lasttick;

    // Assigns 'sequence' from msTick() and records the tick in lasttick.
    // If the tick is at most USHRT_MAX ahead of nextseq it is used directly and nextseq
    // becomes tick+1. Otherwise - tick behind nextseq (the unsigned difference wraps to a
    // large value) or more than USHRT_MAX ahead - nextseq is used and post-incremented.
    void initseq()
    {
        sequence = msTick();
        lasttick = sequence;
        if (sequence-nextseq>USHRT_MAX)
            sequence = nextseq++;
        else
            nextseq = sequence+1;
    }

    // Fills every field from the arguments, clears flags, stamps MP_PROTOCOL_VERSION
    // and assigns a sequence number via initseq().
    PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
    {
        size = _size;
        tag = _tag;
        sender.set(_sender);
        target.set(_target);
        replytag = _replytag;
        flags = 0;
        version = MP_PROTOCOL_VERSION;
        initseq();
    }

    // Leaves all fields uninitialised (presumably for headers about to be read into).
    PacketHeader() {}

    size32_t size; // 0 total packet size
    mptag_t tag; // 4 packet tag (kind)
    unsigned short version; // 8 protocol version
    unsigned short flags; // 10 flags
    SocketEndpointV4 sender; // 12 who sent
    SocketEndpointV4 target; // 18 who destined for
    mptag_t replytag; // 24 used for reply
    unsigned sequence; // 28 packet type dependant
    // Total 32

    // Passes this header's sender endpoint, tag and reply tag to mb.init().
    void setMessageFields(CMessageBuffer &mb)
    {
        SocketEndpoint ep;
        sender.get(ep);
        mb.init(ep,tag,replytag);
    }
};
// Disabled (#if 0) sketch of an extended header carrying 16-byte sender/target
// addresses after the 32-byte PacketHeader; presumably intended to go with
// MP_PROTOCOL_VERSIONV6 ("extended for IPV6") - not compiled.
#if 0
class PacketHeaderV6 : public PacketHeader
{
    unsigned senderex[4]; // 32
    unsigned targetex[4]; // 48
    // total 64
    // Like PacketHeader::setMessageFields, but takes the sender address from senderex.
    void setMessageFields(CMessageBuffer &mb)
    {
        SocketEndpoint ep;
        ep.setNetAddress(sizeof(senderex),&senderex);
        ep.port = sender.port;
        mb.init(ep,tag,replytag);
    }
};
#endif
// Definitions of the shared sequence state read and written by PacketHeader::initseq().
unsigned PacketHeader::nextseq=0;
unsigned PacketHeader::lasttick=0;
// Size of a bare PacketHeader with no payload.
#define MINIMUMPACKETSIZE sizeof(PacketHeader)
// Payload limit per packet (presumably larger messages are split using MultiPacketHeader - confirm).
#define MAXDATAPERPACKET 50000
  159. struct MultiPacketHeader
  160. {
  161. mptag_t tag;
  162. size32_t ofs;
  163. size32_t size;
  164. unsigned idx;
  165. unsigned numparts;
  166. size32_t total;
  167. StringBuffer &getDetails(StringBuffer &out) const
  168. {
  169. out.append("MultiPacketHeader: ");
  170. out.append("tag=").append((unsigned)tag);
  171. out.append(",ofs=").append(ofs);
  172. out.append(",size=").append(size);
  173. out.append(",idx=").append(idx);
  174. out.append(",numparts=").append(numparts);
  175. out.append(",total=").append(total);
  176. return out;
  177. }
  178. };
  179. //
  180. class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
  181. {
  182. public:
  183. IMPLEMENT_IINTERFACE;
  184. CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
  185. {
  186. }
  187. StringBuffer & errorMessage(StringBuffer &str) const
  188. {
  189. StringBuffer tmp;
  190. switch (error) {
  191. case MPERR_ok: str.append("OK"); break;
  192. case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).str()); break;
  193. case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
  194. case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).str()); break;
  195. // process crashes (segv, etc.) often cause this exception which is logged and can be misleading
  196. // change it from "MP link closed" to something more helpful
  197. case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getUrlStr(tmp).str()); break;
  198. }
  199. return str;
  200. }
  201. int errorCode() const { return error; }
  202. MessageAudience errorAudience() const
  203. {
  204. return MSGAUD_user;
  205. }
  206. virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
  207. private:
  208. MessagePassingError error;
  209. SocketEndpoint endpoint;
  210. };
  211. class CBufferQueueNotify
  212. {
  213. public:
  214. virtual bool notify(CMessageBuffer *)=0;
  215. virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed
  216. };
  217. class CBufferQueueWaiting
  218. {
  219. public:
  220. enum QWenum { QWcontinue, QWdequeue, QWprobe };
  221. Semaphore sem;
  222. CBufferQueueNotify &waiting;
  223. bool probe;
  224. CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
  225. QWenum notify(CMessageBuffer *b)
  226. {
  227. // check this for DLL unloaded TBD
  228. if (waiting.notify(b)) {
  229. sem.signal();
  230. return probe?QWprobe:QWdequeue;
  231. }
  232. return QWcontinue;
  233. }
  234. QWenum notifyClosed(SocketEndpoint &ep)
  235. {
  236. // check this for DLL unloaded TBD
  237. if (waiting.notifyClosed(ep)) {
  238. sem.signal();
  239. return QWdequeue;
  240. }
  241. return QWcontinue;
  242. }
  243. };
// Registered waiters; entries refer to CBufferQueueWaiting objects that live on the
// stack of CBufferQueue::wait() for the duration of the wait.
typedef CopyReferenceArrayOf<CBufferQueueWaiting> CWaitingArray;

// Messages received but not yet claimed, plus the threads currently blocked waiting for
// a matching message. All members are accessed under 'sect'.
class CBufferQueue
{
    QueueOf<CMessageBuffer, false> received;   // unclaimed buffers
    CWaitingArray waiting;                      // waiters blocked in wait()
    CriticalSection sect;
public:
    CBufferQueue()
    {
    }

    // Delivers an incoming buffer. Each registered waiter is offered b in turn; one that
    // accepts it is unregistered. On QWdequeue the buffer is handed over and NOT stored
    // here (presumably the waiter's notify() took ownership - confirm). On QWprobe scanning
    // stops. A buffer whose reply tag is TAG_CANCEL is re-offered, releasing the lock and
    // sleeping CANCELTIMEOUT/10 between attempts; after 10 sleeps it is deleted. Any other
    // buffer not consumed by a waiter is appended to 'received'.
    void enqueue(CMessageBuffer *b)
    {
        CriticalBlock block(sect);
        unsigned iter=0;
        for (;;) {
            ForEachItemIn(i,waiting) {
                CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
                if (r!=CBufferQueueWaiting::QWcontinue) {
                    waiting.remove(i);
                    if (r==CBufferQueueWaiting::QWdequeue)
                        return;
                    //CBufferQueueWaiting::QWprobe
                    break;
                }
            }
            if (b->getReplyTag() != TAG_CANCEL)
                break;
            if (iter++==10) {
                delete b;
                return;
            }
            CriticalUnblock unblock(sect);
            Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
        }
        received.enqueue(b);
    }

    // Looks for a buffer accepted by nfy, blocking until one is delivered or tm expires.
    // probe==false: the first accepted queued buffer is removed from 'received' and true
    //               is returned (it is not deleted here - presumably nfy took it; confirm).
    // probe==true:  queued buffers stay in place; true if any were accepted.
    // If nothing matches, a stack-allocated waiter is registered and the thread sleeps on
    // its semaphore with 'sect' released. Returns true if enqueue()/notifyClosed()
    // signalled it, false on timeout.
    bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm)
    {
        CriticalBlock block(sect);
        bool probegot = false;
        ForEachQueueItemIn(i,received) {
            if (nfy.notify(received.item(i))) {
                if (probe) {
                    probegot = true;
                }
                else {
                    received.dequeue(i);
                    return true;
                }
            }
        }
        if (probegot)
            return true;
        unsigned remaining;
        if (tm.timedout(&remaining))
            return false;
        CBufferQueueWaiting qwaiting(nfy,probe);
        waiting.append(qwaiting);
        sect.leave();
        bool ok = qwaiting.sem.wait(remaining);
        sect.enter();
        if (!ok) {
            // A signal may have arrived between the timeout and re-acquiring sect; in that
            // case the signaller has already removed qwaiting from 'waiting'.
            ok = qwaiting.sem.wait(0);
            if (!ok)
                waiting.zap(qwaiting);
        }
        return ok;
    }

    // Removes and deletes every queued buffer accepted by nfy; returns how many were removed.
    // Iterates in reverse so dequeue(i) does not disturb the unvisited indices.
    unsigned flush(CBufferQueueNotify &nfy)
    {
        unsigned count = 0;
        CriticalBlock block(sect);
        ForEachQueueItemInRev(i,received) {
            if (nfy.notify(received.item(i))) {
                count++;
                delete received.dequeue(i);
            }
        }
        return count;
    }

    // Tells every registered waiter that 'ep' closed; waiters that accept are unregistered
    // (reverse iteration keeps remove(i) safe).
    void notifyClosed(SocketEndpoint &ep)
    {
        CriticalBlock block(sect);
        ForEachItemInRev(i,waiting) {
            CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
            if (r!=CBufferQueueWaiting::QWcontinue) {
                waiting.remove(i);
            }
        }
    }

    // Appends one line of details per queued (unclaimed) buffer to buf.
    StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
    {
        CriticalBlock block(sect);
        ForEachQueueItemIn(i,received) {
            received.item(i)->getDetails(buf).append('\n');
        }
        return buf;
    }
};
  343. static UnsignedShortArray freetags;
  344. static unsigned nextfreetag=0;
  345. unsigned short generateDynamicTag()
  346. {
  347. if (freetags.ordinality())
  348. return freetags.popGet();
  349. return nextfreetag++;
  350. }
  351. void releaseDynamicTag(unsigned short tag)
  352. {
  353. freetags.append(tag);
  354. }
  355. bool check_kernparam(const char *path, int *value)
  356. {
  357. #ifdef __linux__
  358. FILE *f = fopen(path,"r");
  359. char res[32];
  360. char *r = 0;
  361. if (f) {
  362. r = fgets(res, sizeof(res), f);
  363. fclose(f);
  364. if (r) {
  365. *value = atoi(r);
  366. return true;
  367. }
  368. }
  369. #endif
  370. return false;
  371. }
  372. bool check_somaxconn(int *val)
  373. {
  374. return check_kernparam("/proc/sys/net/core/somaxconn", val);
  375. }
class CMPServer;
class CMPChannel;

// Thread owned by a CMPServer that works with the listening socket 'listensock'
// (presumably accepting incoming MP connections - run(), startPort() and the
// constructor are defined elsewhere; confirm there).
class CMPConnectThread: public Thread
{
    bool running;
    bool listen;
    ISocket *listensock;   // released in the destructor
    CMPServer *parent;
    int mpSoMaxConn;       // presumably the listen backlog (cf. check_somaxconn) - confirm
    Owned<IAllowListHandler> allowListCallback;
    void checkSelfDestruct(void *p,size32_t sz);
    Owned<ISecureSocketContext> secureContextServer;
public:
    CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen);
    ~CMPConnectThread()
    {
        ::Release(listensock);
    }
    int run();
    void startPort(unsigned short port);
    // If running: clears 'running', cancels the pending accept on listensock, then waits
    // up to 5 minutes for the thread to finish, printing a message if it does not.
    void stop()
    {
        if (running) {
            running = false;
            listensock->cancel_accept();
            if (!join(1000*60*5)) // should be pretty instant
                printf("CMPConnectThread::stop timed out\n");
        }
    }
    // Stores (links) the allow-list callback, replacing any previous one.
    void installAllowListCallback(IAllowListHandler *_allowListCallback)
    {
        allowListCallback.set(_allowListCallback);
    }
    // Returns the current allow-list callback without adding a reference.
    IAllowListHandler *queryAllowListCallback() const
    {
        return allowListCallback;
    }
};
  414. class PingPacketHandler;
  415. class PingReplyPacketHandler;
  416. class MultiPacketHandler;
  417. class BroadcastPacketHandler;
  418. class ForwardPacketHandler;
  419. class UserPacketHandler;
  420. class CMPNotifyClosedThread;
  421. typedef SuperHashTableOf<CMPChannel,SocketEndpoint> CMPChannelHT;
  422. class CMPServer: private CMPChannelHT, implements IMPServer
  423. {
  424. byte RTsalt;
  425. ISocketSelectHandler *selecthandler;
  426. CMPConnectThread *connectthread;
  427. CBufferQueue receiveq;
  428. CMPNotifyClosedThread *notifyclosedthread;
  429. CriticalSection sect;
  430. protected:
  431. unsigned __int64 role;
  432. unsigned short port;
  433. public:
  434. bool checkclosed;
  435. bool tryReopenChannel = false;
  436. bool useTLS = false;
  437. unsigned mpTraceLevel = 0;
  438. // packet handlers
  439. PingPacketHandler *pingpackethandler; // TAG_SYS_PING
  440. PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
  441. ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
  442. MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
  443. BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
  444. UserPacketHandler *userpackethandler; // default
  445. IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
  446. CMPServer(unsigned __int64 _role, unsigned _port, bool _listen);
  447. ~CMPServer();
  448. void start();
  449. virtual void stop();
  450. unsigned short getPort() const { return port; }
  451. unsigned __int64 getRole() const { return role; }
  452. void setPort(unsigned short _port) { port = _port; }
  453. CMPChannel *lookup(const SocketEndpoint &remoteep);
  454. ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
  455. CBufferQueue &getReceiveQ() { return receiveq; }
  456. void checkTagOK(mptag_t tag);
  457. bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
  458. void flush(mptag_t tag);
  459. unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
  460. void cancel(const SocketEndpoint *ep, mptag_t tag);
  461. bool nextChannel(CMPChannel *&c);
  462. void addConnectionMonitor(IConnectionMonitor *monitor);
  463. void removeConnectionMonitor(IConnectionMonitor *monitor);
  464. void notifyClosed(SocketEndpoint &ep, bool trace);
  465. StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
  466. {
  467. return receiveq.getReceiveQueueDetails(buf);
  468. }
  469. void removeChannel(CMPChannel *c) { if (c) removeExact(c); }
  470. protected:
  471. void onAdd(void *);
  472. void onRemove(void *e);
  473. unsigned getHashFromElement(const void *e) const;
  474. unsigned getHashFromFindParam(const void *fp) const;
  475. const void * getFindParam(const void *p) const;
  476. bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
  477. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
  478. CriticalSection replyTagSect;
  479. int rettag;
  480. INode *myNode;
  481. public:
  482. virtual mptag_t createReplyTag()
  483. {
  484. // these are short-lived so a simple increment will do (I think this is OK!)
  485. mptag_t ret;
  486. {
  487. CriticalBlock block(replyTagSect);
  488. if (RTsalt==0xff) {
  489. RTsalt = (byte)(getRandom()%16);
  490. rettag = (int)TAG_REPLY_BASE-RTsalt;
  491. }
  492. if (rettag>(int)TAG_REPLY_BASE) { // wrapped
  493. rettag = (int)TAG_REPLY_BASE-RTsalt;
  494. }
  495. ret = (mptag_t)rettag;
  496. rettag -= 16;
  497. }
  498. flush(ret);
  499. return ret;
  500. }
  501. virtual ICommunicator *createCommunicator(IGroup *group, bool outer);
  502. virtual INode *queryMyNode()
  503. {
  504. return myNode;
  505. }
  506. virtual void setOpt(MPServerOpts opt, const char *value)
  507. {
  508. switch (opt)
  509. {
  510. case mpsopt_channelreopen:
  511. {
  512. bool tf = (nullptr != value) ? strToBool(value) : false;
  513. PROGLOG("Setting ChannelReopen = %s", tf ? "true" : "false");
  514. tryReopenChannel = tf;
  515. break;
  516. }
  517. default:
  518. // ignore
  519. break;
  520. }
  521. }
  522. virtual void installAllowListCallback(IAllowListHandler *allowListCallback) override
  523. {
  524. connectthread->installAllowListCallback(allowListCallback);
  525. }
  526. virtual IAllowListHandler *queryAllowListCallback() const override
  527. {
  528. return connectthread->queryAllowListCallback();
  529. }
  530. };
  531. //===========================================================================
  532. class CMPNotifyClosedThread: public Thread
  533. {
  534. IArrayOf<IConnectionMonitor> connectionmonitors;
  535. CriticalSection conmonsect;
  536. SimpleInterThreadQueueOf<INode, false> workq;
  537. bool stopping;
  538. CMPServer *parent;
  539. CriticalSection stopsect;
  540. public:
  541. CMPNotifyClosedThread(CMPServer *_parent)
  542. : Thread("CMPNotifyClosedThread")
  543. {
  544. parent = _parent;
  545. stopping = false;
  546. }
  547. ~CMPNotifyClosedThread()
  548. {
  549. IArrayOf<IConnectionMonitor> todelete;
  550. CriticalBlock block(conmonsect);
  551. while (connectionmonitors.ordinality())
  552. todelete.append(connectionmonitors.popGet());
  553. }
  554. void addConnectionMonitor(IConnectionMonitor *monitor)
  555. {
  556. if (monitor)
  557. connectionmonitors.append(*LINK(monitor));
  558. }
  559. void removeConnectionMonitor(IConnectionMonitor *monitor)
  560. {
  561. // called in critical section CMPServer::sect
  562. if (monitor) {
  563. CriticalBlock block(conmonsect);
  564. connectionmonitors.zap(*monitor);
  565. }
  566. }
  567. int run()
  568. {
  569. for (;;) {
  570. try {
  571. Owned<INode> node = workq.dequeue();
  572. if (node->endpoint().isNull())
  573. break;
  574. SocketEndpoint ep = node->endpoint();
  575. parent->getReceiveQ().notifyClosed(ep);
  576. IArrayOf<IConnectionMonitor> toclose;
  577. {
  578. CriticalBlock block(conmonsect);
  579. ForEachItemIn(i1,connectionmonitors) {
  580. toclose.append(*LINK(&connectionmonitors.item(i1)));
  581. }
  582. }
  583. ForEachItemIn(i,toclose) {
  584. toclose.item(i).onClose(ep);
  585. }
  586. }
  587. catch (IException *e) {
  588. FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
  589. e->Release();
  590. }
  591. }
  592. return 0;
  593. }
  594. void stop()
  595. {
  596. {
  597. CriticalBlock block(stopsect);
  598. if (!stopping) {
  599. stopping = true;
  600. SocketEndpoint ep;
  601. workq.enqueue(createINode(ep));
  602. }
  603. }
  604. while (!join(1000*60*3))
  605. PROGLOG("CMPNotifyClosedThread join failed");
  606. }
  607. void notify(SocketEndpoint &ep)
  608. {
  609. CriticalBlock block(stopsect);
  610. if (!stopping&&!ep.isNull()) {
  611. if (workq.ordinality()>100)
  612. PROGLOG("MP: %d waiting to close",workq.ordinality());
  613. workq.enqueue(createINode(ep));
  614. }
  615. }
  616. };
  617. void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
  618. {
  619. dbgassertex(timeoutChkIntervalMs < timeoutMs);
  620. StringBuffer epStr;
  621. CCycleTimer readTmsTimer;
  622. unsigned intervalTimeoutMs = timeoutChkIntervalMs;
  623. for (;;)
  624. {
  625. try
  626. {
  627. sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
  628. break;
  629. }
  630. catch (IJSOCK_Exception *e)
  631. {
  632. if (JSOCKERR_graceful_close == e->errorCode())
  633. return;
  634. else if (JSOCKERR_timeout_expired != e->errorCode())
  635. throw;
  636. unsigned elapsedMs = readTmsTimer.elapsedMs();
  637. if (elapsedMs >= timeoutMs)
  638. throw;
  639. unsigned remainingMs = timeoutMs-elapsedMs;
  640. if (remainingMs < timeoutChkIntervalMs)
  641. intervalTimeoutMs = remainingMs;
  642. if (0 == epStr.length())
  643. {
  644. SocketEndpoint ep;
  645. sock->getPeerEndpoint(ep);
  646. ep.getUrlStr(epStr);
  647. }
  648. WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
  649. }
  650. }
  651. if (readTmsTimer.elapsedMs() >= TRACESLOW_THRESHOLD)
  652. {
  653. if (0 == epStr.length())
  654. {
  655. SocketEndpoint ep;
  656. sock->getPeerEndpoint(ep);
  657. ep.getUrlStr(epStr);
  658. }
  659. WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs());
  660. }
  661. }
  662. /* Legacy header sent id[2] only.
  663. * To remain backward compatible (when new MP clients are connecting to old Dali),
  664. * we send a regular empty PacketHeader as well that has the 'role' embedded within it,
  665. * in unused fields. TAG_SYS_BCAST is used as the message tag, because it is an
  666. * unused feature that all Dali's simply receive and delete.
  667. */
  668. struct ConnectHdr
  669. {
  670. ConnectHdr(const SocketEndpoint &hostEp, const SocketEndpoint &remoteEp, unsigned __int64 role)
  671. {
  672. id[0].set(hostEp);
  673. id[1].set(remoteEp);
  674. hdr.size = sizeof(PacketHeader);
  675. hdr.tag = TAG_SYS_BCAST;
  676. hdr.flags = 0;
  677. hdr.version = MP_PROTOCOL_VERSION;
  678. setRole(role);
  679. }
  680. ConnectHdr()
  681. {
  682. }
  683. SocketEndpointV4 id[2];
  684. PacketHeader hdr;
  685. inline void setRole(unsigned __int64 role)
  686. {
  687. hdr.replytag = (mptag_t) (role >> 32);
  688. hdr.sequence = (unsigned) (role & 0xffffffff);
  689. }
  690. inline unsigned __int64 getRole() const
  691. {
  692. return (((unsigned __int64)hdr.replytag)<<32) | ((unsigned __int64)hdr.sequence);
  693. }
  694. };
  695. class CMPPacketReader;
  696. class CMPChannel: public CInterface
  697. {
  698. ISocket *channelsock = nullptr;
  699. CMPServer *parent;
  700. Mutex sendmutex;
  701. Semaphore sendwaitingsig;
  702. unsigned sendwaiting = 0; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
  703. CriticalSection connectsect;
  704. CMPPacketReader *reader;
  705. bool master = false; // i.e. connected originally
  706. mptag_t multitag = TAG_NULL; // current multi send in progress
  707. bool closed = false;
  708. IArrayOf<ISocket> keptsockets;
  709. CriticalSection attachsect;
  710. unsigned __int64 attachaddrval = 0;
  711. SocketEndpoint attachep, attachPeerEp;
  712. std::atomic<unsigned> attachchk;
  713. protected: friend class CMPServer;
  714. SocketEndpoint remoteep;
  715. SocketEndpoint localep; // who the other end thinks I am
  716. protected: friend class CMPPacketReader;
  717. unsigned lastxfer;
  718. #ifdef _FULLTRACE
  719. unsigned startxfer;
  720. unsigned numiter;
  721. #endif
  722. Owned<ISecureSocketContext> secureContextClient;
  723. bool checkReconnect(CTimeMon &tm)
  724. {
  725. if (!parent->tryReopenChannel)
  726. return false;
  727. ::Release(channelsock);
  728. channelsock = nullptr;
  729. if (connect(tm))
  730. return true;
  731. WARNLOG("Failed to reconnect");
  732. return false;
  733. }
  734. bool connect(CTimeMon &tm)
  735. {
  736. // must be called from connectsect
  737. // also in sendmutex
  738. Owned<ISocket> newsock;
  739. unsigned retrycount = CONNECT_RETRYCOUNT;
  740. unsigned remaining;
  741. Owned<IException> exitException;
  742. while (!channelsock)
  743. {
  744. try
  745. {
  746. StringBuffer str;
  747. #ifdef _TRACE
  748. LOG(MCdebugInfo, unknownJob, "MP: connecting to %s role: %" I64F "u", remoteep.getUrlStr(str).str(), parent->getRole());
  749. #endif
  750. if (((int)tm.timeout)<0)
  751. remaining = CONNECT_TIMEOUT;
  752. else if (tm.timedout(&remaining))
  753. {
  754. #ifdef _FULLTRACE
  755. PROGLOG("MP: connect timed out 1");
  756. #endif
  757. return false;
  758. }
  759. if (remaining<10000)
  760. remaining = 10000; // 10s min granularity for MP
  761. newsock.setown(ISocket::connect_timeout(remoteep,remaining));
  762. #if defined(_USE_OPENSSL)
  763. if (parent->useTLS)
  764. {
  765. Owned<ISecureSocket> ssock = secureContextClient->createSecureSocket(newsock.getClear());
  766. int tlsTraceLevel = SSLogMin;
  767. if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
  768. tlsTraceLevel = SSLogMax;
  769. int status = ssock->secure_connect(tlsTraceLevel);
  770. if (status < 0)
  771. {
  772. ssock->close();
  773. exitException.setown(new CMPException(MPERR_connection_failed, remoteep));
  774. throw exitException.getLink();
  775. }
  776. newsock.setown(ssock.getClear());
  777. }
  778. #endif // OPENSSL
  779. newsock->set_keep_alive(true);
  780. #ifdef _FULLTRACE
  781. LOG(MCdebugInfo, unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
  782. #endif
  783. SocketEndpoint hostep;
  784. hostep.setLocalHost(parent->getPort());
  785. ConnectHdr connectHdr(hostep, remoteep, parent->getRole());
  786. unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port;
  787. #ifdef _TRACE
  788. PROGLOG("MP: connect addrval = %" I64F "u", addrval);
  789. #endif
  790. newsock->write(&connectHdr,sizeof(connectHdr));
  791. #ifdef _FULLTRACE
  792. StringBuffer tmp1;
  793. connectHdr.id[0].getUrlStr(tmp1);
  794. tmp1.append(' ');
  795. connectHdr.id[1].getUrlStr(tmp1);
  796. LOG(MCdebugInfo, unknownJob, "MP: connect after socket write %s",tmp1.str());
  797. #endif
  798. size32_t rd = 0;
  799. #ifdef _TRACE
  800. LOG(MCdebugInfo, unknownJob, "MP: connect after socket write, waiting for read");
  801. #endif
  802. // Wait for connection reply but also check for A<->B deadlock (where both processes are here
  803. // waiting for other side to send confirm) and decide who stops waiting based on address.
  804. // To be compatible with older versions of mplib which will not do this,
  805. // loop with short wait time and release CS to allow other side to proceed
  806. StringBuffer epStr;
  807. unsigned startMs = msTick();
  808. unsigned loopCnt = CONNECT_READ_TIMEOUT / CONNECT_TIMEOUT_INTERVAL + 1;
  809. #ifdef _TRACE
  810. PROGLOG("MP: loopCnt start = %u", loopCnt);
  811. #endif
  812. while (loopCnt-- > 0)
  813. {
  814. {
  815. CriticalBlock block(attachsect);
  816. #ifdef _TRACE
  817. PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", attachchk.load(), loopCnt);
  818. #endif
  819. if (attachchk > 0)
  820. {
  821. if (remoteep.equals(attachep))
  822. {
  823. #ifdef _TRACE
  824. PROGLOG("MP: deadlock situation [] attachaddrval = %" I64F "u addrval = %" I64F "u", attachaddrval, addrval);
  825. #endif
  826. if (attachaddrval < addrval)
  827. break;
  828. }
  829. }
  830. }
  831. rd = 0;
  832. MemoryBuffer replyMb;
  833. void *replyMem = replyMb.ensureCapacity(0x1000); // 4K - max size to allow for serialized exception
  834. try
  835. {
  836. newsock->readtms(replyMem, sizeof(rd), replyMb.capacity(), rd, CONNECT_TIMEOUT_INTERVAL);
  837. }
  838. catch (IException *e)
  839. {
  840. #ifdef _TRACE
  841. PROGLOG("MP: loop exception code = %d, loopCnt = %u", e->errorCode(), loopCnt);
  842. #endif
  843. if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
  844. ((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
  845. {
  846. if (tm.timedout(&remaining))
  847. {
  848. #ifdef _FULLTRACE
  849. EXCLOG(e,"MP: connect timed out 3");
  850. #endif
  851. e->Release();
  852. return false;
  853. }
  854. #ifdef _TRACE
  855. EXCLOG(e, "MP: Failed to connect");
  856. #endif
  857. if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
  858. { // don't bother retrying on async send
  859. e->Release();
  860. throw new CMPException(MPERR_connection_failed,remoteep);
  861. }
  862. // if other side closes, connect again
  863. if (e->errorCode() == JSOCKERR_graceful_close)
  864. {
  865. LOG(MCdebugInfo, unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
  866. e->Release();
  867. break;
  868. }
  869. e->Release();
  870. #ifdef _TRACE
  871. LOG(MCdebugInfo, unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
  872. #endif
  873. }
  874. else
  875. {
  876. if (0 == epStr.length())
  877. {
  878. SocketEndpoint ep;
  879. newsock->getPeerEndpoint(ep);
  880. ep.getUrlStr(epStr);
  881. }
  882. WARNLOG("MP: connect to: %s, stalled for %d ms so far", epStr.str(), msTick()-startMs);
  883. e->Release();
  884. }
  885. }
  886. #ifdef _FULLTRACE
  887. PROGLOG("MP: rd = %d", rd);
  888. #endif
  889. /* NB: legacy clients that don't handle the exception deserialization here
  890. * will see reply as success, so no clean error,
  891. * but will fail shortly afterwards since server connection is closed
  892. */
  893. if (rd > sizeof(rd)) // legacy clients will only ever send a reply of 0 or 4, if greater, then new client is replying with an exception
  894. {
  895. MemoryBuffer mb;
  896. mb.setBuffer(rd, replyMem, false);
  897. size32_t len;
  898. mb.read(len); // exception length
  899. if (len)
  900. {
  901. exitException.setown(deserializeException(mb));
  902. throw exitException.getLink();
  903. }
  904. break;
  905. }
  906. else if (rd != 0)
  907. {
  908. assertex(rd == sizeof(rd));
  909. break;
  910. }
  911. }
  912. #ifdef _TRACE
  913. LOG(MCdebugInfo, unknownJob, "MP: connect after socket read rd=%u, sizeof(connectHdr)=%lu", rd, sizeof(connectHdr));
  914. #endif
  915. if (rd)
  916. {
  917. unsigned elapsedMs = msTick() - startMs;
  918. if (elapsedMs >= TRACESLOW_THRESHOLD)
  919. {
  920. if (0 == epStr.length())
  921. {
  922. SocketEndpoint ep;
  923. newsock->getPeerEndpoint(ep);
  924. ep.getUrlStr(epStr);
  925. }
  926. WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
  927. }
  928. if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
  929. {
  930. #ifdef _TRACE
  931. LOG(MCdebugInfo, unknownJob, "MP: connected to %s",str.str());
  932. #endif
  933. lastxfer = msTick();
  934. closed = false;
  935. break;
  936. }
  937. }
  938. }
  939. catch (IException *e)
  940. {
  941. if (exitException)
  942. throw;
  943. if (tm.timedout(&remaining)) {
  944. #ifdef _FULLTRACE
  945. EXCLOG(e,"MP: connect timed out 2");
  946. #endif
  947. e->Release();
  948. return false;
  949. }
  950. #ifdef _TRACE
  951. EXCLOG(e, "MP: Failed to connect");
  952. #endif
  953. e->Release();
  954. if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
  955. IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
  956. throw e;
  957. }
  958. #ifdef _TRACE
  959. StringBuffer str;
  960. str.clear();
  961. LOG(MCdebugInfo, unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
  962. #endif
  963. }
  964. newsock.clear();
  965. {
  966. CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
  967. #ifdef _FULLTRACE
  968. PROGLOG("MP: before sleep");
  969. #endif
  970. // check often if channelsock was created from accept thread
  971. Sleep(50);
  972. unsigned totalt = CONNECT_TIMEOUT_MINSLEEP + getRandom() % (CONNECT_TIMEOUT_MAXSLEEP-CONNECT_TIMEOUT_MINSLEEP);
  973. unsigned startt = msTick();
  974. unsigned deltat = 0;
  975. while (deltat < totalt)
  976. {
  977. {
  978. CriticalBlock block(connectsect);
  979. if (channelsock)
  980. break;
  981. }
  982. deltat = msTick() - startt;
  983. Sleep(50);
  984. }
  985. #ifdef _FULLTRACE
  986. PROGLOG("MP: after sleep");
  987. #endif
  988. }
  989. }
  990. return true;
  991. }
  992. public:
  993. Semaphore pingsem;
  994. CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
  995. ~CMPChannel();
  996. void reset();
  997. bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval=0);
  998. bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
  999. {
  1000. Linked<ISocket> dest;
  1001. {
  1002. CriticalBlock block(connectsect);
  1003. if (closed) {
  1004. #ifdef _TRACELINKCLOSED
  1005. LOG(MCdebugInfo, unknownJob, "WritePacket closed on entry");
  1006. PrintStackReport();
  1007. #endif
  1008. if (!checkReconnect(tm))
  1009. throw new CMPException(MPERR_link_closed,remoteep);
  1010. }
  1011. if (!channelsock) {
  1012. if (!connect(tm)) {
  1013. #ifdef _FULLTRACE
  1014. LOG(MCdebugInfo, unknownJob, "WritePacket connect failed");
  1015. #endif
  1016. return false;
  1017. }
  1018. }
  1019. dest.set(channelsock);
  1020. }
  1021. try {
  1022. #ifdef _FULLTRACE
  1023. unsigned t1 = msTick();
  1024. #endif
  1025. if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
  1026. // if (tm.timeout!=MP_ASYNC_SEND) {
  1027. unsigned remaining;
  1028. if (tm.timedout(&remaining))
  1029. return false;
  1030. if (channelsock->wait_write(remaining)==0) {
  1031. return false;
  1032. }
  1033. if (tm.timedout())
  1034. return false;
  1035. }
  1036. // exception checking TBD
  1037. #ifdef _FULLTRACE
  1038. StringBuffer ep1;
  1039. StringBuffer ep2;
  1040. LOG(MCdebugInfo, unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
  1041. unsigned t2 = msTick();
  1042. #endif
  1043. unsigned n = 0;
  1044. const void *bufs[3];
  1045. size32_t sizes[3];
  1046. if (hdrsize) {
  1047. bufs[n] = hdr;
  1048. sizes[n++] = hdrsize;
  1049. }
  1050. if (hdr2size) {
  1051. bufs[n] = hdr2;
  1052. sizes[n++] = hdr2size;
  1053. }
  1054. if (bodysize) {
  1055. bufs[n] = body;
  1056. sizes[n++] = bodysize;
  1057. }
  1058. if (!dest) {
  1059. LOG(MCdebugInfo, unknownJob, "MP Warning: WritePacket unexpected NULL socket");
  1060. return false;
  1061. }
  1062. dest->write_multiple(n,bufs,sizes);
  1063. lastxfer = msTick();
  1064. #ifdef _FULLTRACE
  1065. LOG(MCdebugInfo, unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
  1066. #endif
  1067. }
  1068. catch (IException *e) {
  1069. FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
  1070. closeSocket(false, true);
  1071. throw;
  1072. }
  1073. return true;
  1074. }
  1075. bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
  1076. {
  1077. return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
  1078. }
  1079. bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
  1080. {
  1081. return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
  1082. }
  1083. bool sendPing(CTimeMon &tm);
  1084. bool sendPingReply(unsigned timeout,bool identifyself);
  1085. bool verifyConnection(CTimeMon &tm,bool allowconnect)
  1086. {
  1087. {
  1088. CriticalBlock block(connectsect);
  1089. if (!channelsock&&allowconnect)
  1090. return connect(tm);
  1091. if (closed||!channelsock)
  1092. return false;
  1093. if ((msTick()-lastxfer)<VERIFY_DELAY)
  1094. return true;
  1095. }
  1096. StringBuffer ep;
  1097. remoteep.getUrlStr(ep);
  1098. for (;;) {
  1099. CTimeMon pingtm(1000*60);
  1100. if (sendPing(pingtm))
  1101. break;
  1102. {
  1103. CriticalBlock block(connectsect);
  1104. if (closed||!channelsock)
  1105. return false;
  1106. }
  1107. if (tm.timedout()) {
  1108. LOG(MCdebugInfo, unknownJob, "MP: verify, ping failed to %s",ep.str());
  1109. closeSocket();
  1110. return false;
  1111. }
  1112. LOG(MCdebugInfo, unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
  1113. unsigned remaining;
  1114. if (!pingtm.timedout(&remaining)&&remaining)
  1115. Sleep(remaining);
  1116. }
  1117. return true;
  1118. }
  1119. bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
  1120. void closeSocket(bool keepsocket=false, bool trace=false)
  1121. {
  1122. ISocket *s;
  1123. bool socketfailed = false;
  1124. {
  1125. CriticalBlock block(connectsect);
  1126. if (!channelsock)
  1127. return;
  1128. lastxfer = msTick();
  1129. closed = true;
  1130. if (parent)
  1131. parent->checkclosed = true;
  1132. s=channelsock;
  1133. channelsock = nullptr;
  1134. {
  1135. CriticalBlock block(attachsect);
  1136. attachaddrval = 0;
  1137. attachep.set(nullptr);
  1138. attachPeerEp.set(nullptr);
  1139. attachchk = 0;
  1140. }
  1141. if (!keepsocket) {
  1142. try {
  1143. s->shutdown();
  1144. }
  1145. catch (IException *e) {
  1146. socketfailed = true; // ignore if the socket has been closed
  1147. WARNLOG("closeSocket() : Ignoring shutdown error");
  1148. e->Release();
  1149. }
  1150. }
  1151. parent->querySelectHandler().remove(s);
  1152. }
  1153. parent->notifyClosed(remoteep, trace);
  1154. if (socketfailed) {
  1155. try {
  1156. s->Release();
  1157. }
  1158. catch (IException *) {
  1159. // ignore
  1160. }
  1161. }
  1162. else if (keepsocket) {
  1163. // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
  1164. if (keptsockets.ordinality()>10)
  1165. keptsockets.remove(0);
  1166. keptsockets.append(*s);
  1167. }
  1168. else {
  1169. try {
  1170. s->close();
  1171. }
  1172. catch (IException *) {
  1173. socketfailed = true; // ignore if the socket has been closed
  1174. }
  1175. s->Release();
  1176. }
  1177. }
  1178. CMPServer &queryServer() { return *parent; }
  1179. void monitorCheck();
  1180. StringBuffer & queryEpStr(StringBuffer &s)
  1181. {
  1182. return remoteep.getUrlStr(s);
  1183. }
  1184. bool isClosed()
  1185. {
  1186. return closed;
  1187. }
  1188. bool isConnected()
  1189. {
  1190. return !closed&&(channelsock!=NULL);
  1191. }
  1192. const SocketEndpoint &queryPeerEp() const { return attachPeerEp; }
  1193. };
  1194. // Message Handlers (not done as interfaces for speed reasons
  1195. class UserPacketHandler // default
  1196. {
  1197. CMPServer *server;
  1198. public:
  1199. UserPacketHandler(CMPServer *_server)
  1200. {
  1201. server = _server;
  1202. }
  1203. void handle(CMessageBuffer *msg) // takes ownership of message buffer
  1204. {
  1205. server->getReceiveQ().enqueue(msg);
  1206. }
  1207. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
  1208. {
  1209. #ifdef _FULLTRACE
  1210. StringBuffer ep1;
  1211. StringBuffer ep2;
  1212. LOG(MCdebugInfo, unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  1213. #endif
  1214. return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
  1215. }
  1216. };
  1217. class PingPacketHandler // TAG_SYS_PING
  1218. {
  1219. public:
  1220. void handle(CMPChannel *channel,bool identifyself)
  1221. {
  1222. channel->sendPingReply(CONFIRM_TIMEOUT,identifyself);
  1223. }
  1224. bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
  1225. {
  1226. return channel->writepacket(&hdr,sizeof(hdr),tm);
  1227. }
  1228. };
  1229. class PingReplyPacketHandler // TAG_SYS_PING_REPLY
  1230. {
  1231. public:
  1232. void handle(CMPChannel *channel)
  1233. {
  1234. channel->pingsem.signal();
  1235. }
  1236. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
  1237. {
  1238. return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
  1239. }
  1240. };
  1241. class CMultiPacketReceiver: public CInterface
  1242. { // assume each sender only sends one multi-message per channel
  1243. public:
  1244. SocketEndpoint sender;
  1245. MultiPacketHeader info;
  1246. CMessageBuffer *msg;
  1247. byte * ptr;
  1248. };
  1249. class MultiPacketHandler // TAG_SYS_MULTI
  1250. {
  1251. CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
  1252. CriticalSection sect;
  1253. unsigned lastErrMs;
  1254. void logError(unsigned code, MultiPacketHeader &mhdr, CMessageBuffer &msg, MultiPacketHeader *otherMhdr)
  1255. {
  1256. unsigned ms = msTick();
  1257. if ((ms-lastErrMs) > 1000) // avoid logging too much
  1258. {
  1259. StringBuffer errorMsg("sender=");
  1260. msg.getSender().getUrlStr(errorMsg).newline();
  1261. errorMsg.append("This header: ");
  1262. mhdr.getDetails(errorMsg).newline();
  1263. if (otherMhdr)
  1264. {
  1265. errorMsg.append("Other header: ");
  1266. otherMhdr->getDetails(errorMsg).newline();
  1267. }
  1268. msg.getDetails(errorMsg);
  1269. LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (%d) %s", code, errorMsg.str());
  1270. }
  1271. lastErrMs = ms;
  1272. }
  1273. public:
  1274. MultiPacketHandler() : lastErrMs(0)
  1275. {
  1276. }
  1277. CMessageBuffer *handle(CMessageBuffer * msg)
  1278. {
  1279. if (!msg)
  1280. return NULL;
  1281. CriticalBlock block(sect);
  1282. MultiPacketHeader mhdr;
  1283. msg->read(sizeof(mhdr),&mhdr);
  1284. CMultiPacketReceiver *recv=NULL;
  1285. ForEachItemIn(i,inprogress) {
  1286. CMultiPacketReceiver &mpr = inprogress.item(i);
  1287. if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
  1288. recv = &mpr;
  1289. break;
  1290. }
  1291. }
  1292. if (mhdr.idx==0) {
  1293. if ((mhdr.ofs!=0)||(recv!=NULL)) {
  1294. logError(1, mhdr, *msg, recv?&recv->info:NULL);
  1295. delete msg;
  1296. return NULL;
  1297. }
  1298. recv = new CMultiPacketReceiver;
  1299. recv->msg = new CMessageBuffer();
  1300. recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
  1301. recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
  1302. recv->sender = msg->getSender();
  1303. recv->info = mhdr;
  1304. inprogress.append(*recv);
  1305. }
  1306. else {
  1307. if ((recv==NULL)||(mhdr.ofs==0)||
  1308. (recv->info.ofs+recv->info.size!=mhdr.ofs)||
  1309. (recv->info.idx+1!=mhdr.idx)||
  1310. (recv->info.total!=mhdr.total)||
  1311. (mhdr.ofs+mhdr.size>mhdr.total)) {
  1312. logError(2, mhdr, *msg, recv?&recv->info:NULL);
  1313. delete msg;
  1314. return NULL;
  1315. }
  1316. }
  1317. msg->read(mhdr.size,recv->ptr+mhdr.ofs);
  1318. delete msg;
  1319. msg = NULL;
  1320. recv->info = mhdr;
  1321. if (mhdr.idx+1==mhdr.numparts) {
  1322. if (mhdr.ofs+mhdr.size!=mhdr.total) {
  1323. logError(3, mhdr, *msg, NULL);
  1324. return NULL;
  1325. }
  1326. msg = recv->msg;
  1327. inprogress.remove(i);
  1328. }
  1329. return msg;
  1330. }
  1331. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
  1332. {
  1333. // must not adjust mb
  1334. #ifdef _FULLTRACE
  1335. StringBuffer ep1;
  1336. StringBuffer ep2;
  1337. LOG(MCdebugInfo, unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  1338. #endif
  1339. PacketHeader outhdr;
  1340. outhdr = hdr;
  1341. outhdr.tag = TAG_SYS_MULTI;
  1342. MultiPacketHeader mhdr;
  1343. mhdr.total = hdr.size-sizeof(hdr);
  1344. mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
  1345. mhdr.size = mhdr.total/mhdr.numparts;
  1346. mhdr.tag = hdr.tag;
  1347. mhdr.ofs = 0;
  1348. mhdr.idx = 0;
  1349. const byte *p = (const byte *)mb.toByteArray();
  1350. unsigned i=0;
  1351. for (;;) {
  1352. if (i+1==mhdr.numparts)
  1353. mhdr.size = mhdr.total-mhdr.ofs;
  1354. #ifdef _FULLTRACE
  1355. LOG(MCdebugInfo, unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
  1356. #endif
  1357. outhdr.initseq();
  1358. outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
  1359. if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
  1360. #ifdef _FULLTRACE
  1361. LOG(MCdebugInfo, unknownJob, "MP: multi-send failed");
  1362. #endif
  1363. return false;
  1364. }
  1365. i++;
  1366. if (i==mhdr.numparts)
  1367. break;
  1368. sendmutex.unlock(); // allow other messages to interleave
  1369. sendmutex.lock();
  1370. mhdr.idx++;
  1371. mhdr.ofs += mhdr.size;
  1372. p += mhdr.size;
  1373. }
  1374. return true;
  1375. }
  1376. };
  1377. class BroadcastPacketHandler // TAG_SYS_BCAST
  1378. {
  1379. public:
  1380. CMessageBuffer *handle(CMessageBuffer * msg)
  1381. {
  1382. delete msg;
  1383. return NULL;
  1384. }
  1385. };
  1386. class ForwardPacketHandler // TAG_SYS_FORWARD
  1387. {
  1388. public:
  1389. CMessageBuffer *handle(CMessageBuffer * msg)
  1390. {
  1391. delete msg;
  1392. return NULL;
  1393. }
  1394. };
  1395. // --------------------------------------------------------
  1396. class CMPPacketReader: public ISocketSelectNotify, public CInterface
  1397. {
  1398. CMessageBuffer *activemsg;
  1399. byte * activeptr;
  1400. size32_t remaining;
  1401. CMPChannel *parent;
  1402. CriticalSection sect;
  1403. public:
  1404. IMPLEMENT_IINTERFACE;
  1405. CMPPacketReader(CMPChannel *_parent)
  1406. {
  1407. init(_parent);
  1408. }
  1409. void init(CMPChannel *_parent)
  1410. {
  1411. parent = _parent;
  1412. activemsg = NULL;
  1413. }
  1414. void shutdown()
  1415. {
  1416. CriticalBlock block(sect);
  1417. parent = NULL;
  1418. }
  1419. bool notifySelected(ISocket *sock,unsigned selected)
  1420. {
  1421. if (!parent)
  1422. return false;
  1423. try {
  1424. // try and mop up all data on socket
  1425. // TLS TODO: avail_read() may not return accurate amount of pending bytes
  1426. size32_t sizeavail = sock->avail_read();
  1427. if (sizeavail==0) {
  1428. // graceful close
  1429. Linked<CMPChannel> pc;
  1430. {
  1431. CriticalBlock block(sect);
  1432. if (parent) {
  1433. pc.set(parent); // don't want channel to disappear during call
  1434. parent = NULL;
  1435. }
  1436. }
  1437. if (pc)
  1438. {
  1439. #ifdef _TRACELINKCLOSED
  1440. LOG(MCdebugInfo, unknownJob, "CMPPacketReader::notifySelected() about to close socket, mode = 0x%x", selected);
  1441. #endif
  1442. pc->closeSocket(false, true);
  1443. }
  1444. return false;
  1445. }
  1446. do {
  1447. parent->lastxfer = msTick();
  1448. #ifdef _FULLTRACE
  1449. parent->numiter++;
  1450. #endif
  1451. if (!activemsg) { // no message in progress
  1452. PacketHeader hdr; // header for active message
  1453. #ifdef _FULLTRACE
  1454. parent->numiter = 1;
  1455. parent->startxfer = msTick();
  1456. #endif
  1457. // assumes packet header will arrive in one go
  1458. if (sizeavail<sizeof(hdr)) {
  1459. #ifdef _FULLTRACE
  1460. LOG(MCdebugInfo, unknownJob, "Selected stalled on header %u %lu",sizeavail,sizeavail-sizeof(hdr));
  1461. #endif
  1462. size32_t szread;
  1463. sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
  1464. }
  1465. else
  1466. sock->read(&hdr,sizeof(hdr));
  1467. if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) {
  1468. // TBD IPV6 here
  1469. SocketEndpoint ep;
  1470. hdr.sender.get(ep);
  1471. IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
  1472. throw e;
  1473. }
  1474. if (sizeavail<=sizeof(hdr))
  1475. sizeavail = sock->avail_read();
  1476. else
  1477. sizeavail -= sizeof(hdr);
  1478. #ifdef _FULLTRACE
  1479. StringBuffer ep1;
  1480. StringBuffer ep2;
  1481. LOG(MCdebugInfo, unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  1482. #endif
  1483. remaining = hdr.size-sizeof(hdr);
  1484. activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
  1485. activeptr = (byte *)activemsg->reserveTruncate(remaining);
  1486. hdr.setMessageFields(*activemsg);
  1487. }
  1488. size32_t toread = sizeavail;
  1489. if (toread>remaining)
  1490. toread = remaining;
  1491. if (toread) {
  1492. sock->read(activeptr,toread);
  1493. remaining -= toread;
  1494. sizeavail -= toread;
  1495. activeptr += toread;
  1496. }
  1497. if (remaining==0) { // we have the packet so process
  1498. #ifdef _FULLTRACE
  1499. LOG(MCdebugInfo, unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
  1500. #endif
  1501. do {
  1502. switch (activemsg->getTag()) {
  1503. case TAG_SYS_MULTI:
  1504. activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
  1505. break;
  1506. case TAG_SYS_PING:
  1507. parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
  1508. delete activemsg;
  1509. activemsg = NULL;
  1510. break;
  1511. case TAG_SYS_PING_REPLY:
  1512. parent->queryServer().pingreplypackethandler->handle(parent);
  1513. delete activemsg;
  1514. activemsg = NULL;
  1515. break;
  1516. case TAG_SYS_BCAST:
  1517. activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
  1518. break;
  1519. case TAG_SYS_FORWARD:
  1520. activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
  1521. break;
  1522. default:
  1523. parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
  1524. activemsg = NULL;
  1525. }
  1526. } while (activemsg);
  1527. }
  1528. if (!sizeavail)
  1529. sizeavail = sock->avail_read();
  1530. } while (sizeavail);
  1531. return false; // ok
  1532. }
  1533. catch (IException *e) {
  1534. if (e->errorCode()!=JSOCKERR_graceful_close)
  1535. FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
  1536. e->Release();
  1537. }
  1538. // error here, so close socket (ignore error as may be closed already)
  1539. try {
  1540. if(parent)
  1541. parent->closeSocket(false, true);
  1542. }
  1543. catch (IException *e) {
  1544. e->Release();
  1545. }
  1546. parent = NULL;
  1547. return false;
  1548. }
  1549. };
  1550. CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep) : parent(_parent), remoteep(_remoteep)
  1551. {
  1552. localep.set(parent->getPort());
  1553. reader = new CMPPacketReader(this);
  1554. attachep.set(nullptr);
  1555. attachchk = 0;
  1556. lastxfer = msTick();
  1557. #if defined(_USE_OPENSSL)
  1558. if (parent->useTLS)
  1559. secureContextClient.setown(createSecureSocketContextSecret("local", ClientSocket));
  1560. #endif
  1561. }
  // Return this channel to its freshly-constructed state so the object can be
  // reused for a new connection to the same remote endpoint.
  // The statement order matters: the old reader is shut down before the socket is
  // closed, and is only released once the socket is gone; a new reader is then
  // created before the state flags are cleared.
  void CMPChannel::reset()
  {
      reader->shutdown(); // clear as early as possible
      closeSocket(false, true);
      reader->Release();
      channelsock = nullptr;
      multitag = TAG_NULL;            // no multi-packet send in progress
      reader = new CMPPacketReader(this);
      closed = false;
      master = false;
      sendwaiting = 0;                // no senders blocked on sendwaitingsig
      // forget any previously recorded attach attempt
      attachaddrval = 0;
      attachep.set(nullptr);
      attachPeerEp.set(nullptr);
      attachchk = 0;
      lastxfer = msTick();            // restart the idle clock
  }
  // Tear down the channel: stop the reader first, then close the socket, and only
  // then drop the reference to the reader (same ordering as CMPChannel::reset).
  CMPChannel::~CMPChannel()
  {
      reader->shutdown(); // clear as early as possible
      closeSocket();
      reader->Release();
  }
  1585. bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned __int64 addrval) // takes ownership if succeeds
  1586. {
  1587. struct attachdTor
  1588. {
  1589. std::atomic<unsigned> &attchk;
  1590. attachdTor(std::atomic<unsigned> &_attchk) : attchk(_attchk) { }
  1591. ~attachdTor() { --attchk; }
  1592. } attachChk (attachchk);
  1593. #ifdef _FULLTRACE
  1594. PROGLOG("MP: attachSocket on entry, ismaster = %d, confirm = %p, channelsock = %p, addrval = %" I64F "u", ismaster, confirm, channelsock, addrval);
  1595. #endif
  1596. {
  1597. CriticalBlock block(attachsect);
  1598. attachaddrval = addrval;
  1599. attachep = _remoteep;
  1600. if (newsock)
  1601. newsock->getPeerEndpoint(attachPeerEp);
  1602. ++attachchk;
  1603. }
  1604. CriticalBlock block(connectsect);
  1605. #ifdef _FULLTRACE
  1606. PROGLOG("MP: attachSocket got connectsect, channelsock = %p", channelsock);
  1607. #endif
  1608. // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
  1609. if (channelsock) {
  1610. if (_remoteep.port==0)
  1611. return false;
  1612. StringBuffer ep1;
  1613. StringBuffer ep2;
  1614. _localep.getUrlStr(ep1);
  1615. _remoteep.getUrlStr(ep2);
  1616. LOG(MCdebugInfo, unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
  1617. try {
  1618. if (ismaster!=master) {
  1619. if (ismaster) {
  1620. LOG(MCdebugInfo, unknownJob, "MP: resolving socket attach clash (master)");
  1621. return false;
  1622. }
  1623. else {
  1624. Sleep(50); // give the other side some time to close
  1625. CTimeMon tm(10000);
  1626. if (verifyConnection(tm,false)) {
  1627. LOG(MCdebugInfo, unknownJob, "MP: resolving socket attach clash (verified)");
  1628. return false;
  1629. }
  1630. }
  1631. }
  1632. }
  1633. catch (IException *e) {
  1634. FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
  1635. e->Release();
  1636. }
  1637. try {
  1638. LOG(MCdebugInfo, unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
  1639. CriticalUnblock unblock(connectsect);
  1640. closeSocket(true, true);
  1641. #ifdef REFUSE_STALE_CONNECTION
  1642. if (!ismaster)
  1643. return false;
  1644. #endif
  1645. Sleep(100); // pause to allow close socket triggers to run
  1646. }
  1647. catch (IException *e) {
  1648. FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
  1649. e->Release();
  1650. }
  1651. }
  1652. if (confirm)
  1653. newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
  1654. closed = false;
  1655. reader->init(this);
  1656. channelsock = LINK(newsock);
  1657. #ifdef _FULLTRACE
  1658. PROGLOG("MP: attachSocket before select add");
  1659. #endif
  1660. parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
  1661. #ifdef _FULLTRACE
  1662. PROGLOG("MP: attachSocket after select add");
  1663. #endif
  1664. localep = _localep;
  1665. master = ismaster;
  1666. return true;
  1667. }
  // Send mb to the remote end of this channel under the given tag.
  //
  // mb       - payload; not modified (the packet handlers read from it).
  // tag      - message tag (must not be TAG_NULL).
  // replytag - tag the receiver should reply on.
  // tm       - timeout monitor (must have a timeout).
  // reply    - true when sending a reply; then a disconnected channel is also treated
  //            as closed.
  // Throws CMPException(MPERR_link_closed) if the channel is closed and checkReconnect
  // fails. Returns the packet handler's result.
  //
  // Payloads larger than MAXDATAPERPACKET go through the multi-packet handler. At most
  // one multi-packet send runs at a time (tracked via multitag). Single-packet sends
  // with the same tag as the running multi-packet send wait until it completes.
  bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
  {
      // note must not adjust mb
      assertex(tag!=TAG_NULL);
      assertex(tm.timeout);
      size32_t msgsize = mb.length();
      PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
      if (closed||(reply&&!isConnected())) // flag error if has been disconnected
      {
#ifdef _TRACELINKCLOSED
          LOG(MCdebugInfo, unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
          PrintStackReport();
#endif
          if (!checkReconnect(tm))
              throw new CMPException(MPERR_link_closed,remoteep);
      }
      bool ismulti = (msgsize>MAXDATAPERPACKET);
      // pre-condition - ensure no clashes
      // Leaves the loop holding sendmutex; otherwise registers as a waiter, drops the
      // mutex and blocks on sendwaitingsig before retrying.
      for (;;)
      {
          sendmutex.lock();
          if (ismulti)
          {
              if (multitag==TAG_NULL) // don't want to interleave with other multi send
              {
                  multitag = tag;
                  break;
              }
          }
          else if (multitag!=tag) // don't want to interleave with another of same tag
              break;
          /* NB: block clashing multi packet sends until current one is done,
           * but note that the multipackethandler-send() temporarily releases the sendmutex,
           * between packets, to allow other tags to interleave
           */
          sendwaiting++;
          sendmutex.unlock();
          sendwaitingsig.wait();
      }
      // Scope guard run when this function returns or throws: clears multitag (multi
      // sends only), wakes every waiting sender, and releases sendmutex.
      struct Cpostcondition // can we start using eiffel
      {
          Mutex &sendmutex;
          unsigned &sendwaiting;
          Semaphore &sendwaitingsig;
          mptag_t *multitag;
          Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
              : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
          {
              multitag = _multitag;
          }
          ~Cpostcondition()
          {
              if (multitag)
                  *multitag = TAG_NULL;
              if (sendwaiting)
              {
                  sendwaitingsig.signal(sendwaiting);
                  sendwaiting = 0;
              }
              sendmutex.unlock();
          }
      } postcond(sendmutex, sendwaiting, sendwaitingsig, (ismulti && (multitag != TAG_NULL)) ? &multitag : nullptr);
      if (ismulti)
          return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
      return parent->userpackethandler->send(this,hdr,mb,tm);
  }
  1734. bool CMPChannel::sendPing(CTimeMon &tm)
  1735. {
  1736. unsigned remaining;
  1737. tm.timedout(&remaining);
  1738. if (!sendmutex.lockWait(remaining))
  1739. return false;
  1740. SocketEndpoint myep(parent->getPort());
  1741. PacketHeader hdr(sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
  1742. bool ret = false;
  1743. try {
  1744. ret = parent->pingpackethandler->send(this,hdr,tm)&&!tm.timedout(&remaining);
  1745. }
  1746. catch (IException *e) {
  1747. FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
  1748. e->Release();
  1749. }
  1750. sendmutex.unlock();
  1751. if (ret)
  1752. ret = pingsem.wait(remaining);
  1753. return ret;
  1754. }
  1755. bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
  1756. {
  1757. CTimeMon mon(timeout);
  1758. unsigned remaining;
  1759. mon.timedout(&remaining);
  1760. if (!sendmutex.lockWait(remaining))
  1761. return false;
  1762. SocketEndpoint myep(parent->getPort());
  1763. MemoryBuffer mb;
  1764. if (identifyself) {
  1765. #ifdef _WIN32
  1766. mb.append(GetCommandLine());
  1767. #endif
  1768. }
  1769. PacketHeader hdr(mb.length()+sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
  1770. bool ret;
  1771. try {
  1772. ret = parent->pingreplypackethandler->send(this,hdr,mb,mon);
  1773. }
  1774. catch (IException *e) {
  1775. FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
  1776. e->Release();
  1777. ret = false;
  1778. }
  1779. sendmutex.unlock();
  1780. return ret;
  1781. }
  1782. // --------------------------------------------------------
  // Set up the MP connect (accept) thread for `_parent`.
  //
  // port    - MP port to use; 0 means pick a random free port in [minPort,maxPort]
  //           (environment mpStart/mpEnd or MP_START_PORT/MP_END_PORT) and bind it
  //           immediately. A non-zero port is bound later, in startPort().
  // _listen - whether connections will actually be accepted. When listening, the
  //           listen backlog comes from mpSoMaxConn (default DEFAULT_LISTEN_QUEUE_SIZE).
  // Also sets parent->mpTraceLevel (from EnvSettings/mpTraceLevel, or logging/@detail
  // when containerized), tells the parent which port is in use (setPort), and creates
  // the server TLS context when parent->useTLS.
  CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen)
      : Thread("MP Connection Thread")
  {
      parent = _parent;
      listen = _listen;
      mpSoMaxConn = 0;
#ifndef _CONTAINERIZED
      Owned<IPropertyTree> env = getHPCCEnvironment();
      if (env)
      {
          if (listen)
          {
              // accept either EnvSettings/mpSoMaxConn or EnvSettings/ports/mpSoMaxConn
              mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
              if (!mpSoMaxConn)
                  mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
          }
          // map numeric trace level 0/1/2/3+ onto log message thresholds
          unsigned mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
          switch (mpTraceLevel)
          {
              case 0:
                  parent->mpTraceLevel = InfoMsgThreshold;
                  break;
              case 1:
                  parent->mpTraceLevel = DebugMsgThreshold;
                  break;
              case 2:
                  parent->mpTraceLevel = ExtraneousMsgThreshold;
                  break;
              default:
                  parent->mpTraceLevel = MPVerboseMsgThreshold;
                  break;
          }
      }
#else
      parent->mpTraceLevel = getComponentConfigSP()->getPropInt("logging/@detail", InfoMsgThreshold);
#endif
      if (mpSoMaxConn)
      {
          // only a warning: the kernel limit silently caps the requested backlog
          int kernSoMaxConn = 0;
          bool soMaxCheck = check_somaxconn(&kernSoMaxConn);
          if (soMaxCheck && (mpSoMaxConn > kernSoMaxConn))
              WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
      }
      if (!mpSoMaxConn && listen)
          mpSoMaxConn = DEFAULT_LISTEN_QUEUE_SIZE;
      if (!port)
      {
          // need to connect early to resolve clash
          unsigned minPort, maxPort;
          minPort = MP_START_PORT;
          maxPort = MP_END_PORT;
#ifndef _CONTAINERIZED
          if (env)
          {
              minPort = env->getPropInt("EnvSettings/mpStart", 0);
              if (!minPort)
                  minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
              maxPort = env->getPropInt("EnvSettings/mpEnd", 0);
              if (!maxPort)
                  maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
          }
#endif
          assertex(maxPort >= minPort);
          Owned<IJSOCK_Exception> lastErr;
          // mck - if not listening then could ignore port range and
          // let OS select an unused port ...
          // Try random ports in the range, up to 3 attempts per port on average;
          // only "port in use" is retried, any other socket error propagates.
          unsigned numPorts = maxPort - minPort + 1;
          for (unsigned retries = 0; retries < numPorts * 3; retries++)
          {
              port = minPort + getRandom() % numPorts;
              try
              {
                  listensock = ISocket::create(port, mpSoMaxConn);
                  break;
              }
              catch (IJSOCK_Exception *e)
              {
                  if (e->errorCode()!=JSOCKERR_port_in_use)
                      throw;
                  lastErr.setown(e);
              }
          }
          if (!listensock)
              throw lastErr.getClear();
      }
      else
          listensock = NULL; // delay create till running
      parent->setPort(port);
#ifdef _TRACE
      LOG(MCdebugInfo, unknownJob, "MP Connect Thread Init Port = %d", port);
#endif
      running = false;
#if defined(_USE_OPENSSL)
      if (parent->useTLS)
          secureContextServer.setown(createSecureSocketContextSecretSrv("local"));
#endif
  }
  1880. void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
  1881. {
  1882. byte *b = (byte *)p;
  1883. while (sz--)
  1884. if (*(b++)!=0xff)
  1885. return;
  1886. // Panic!
  1887. PROGLOG("MP Self destruct invoked");
  1888. try {
  1889. if (listensock) {
  1890. listensock->close();
  1891. listensock->Release();
  1892. listensock=NULL;
  1893. }
  1894. }
  1895. catch (...)
  1896. {
  1897. PROGLOG("MP socket close failure");
  1898. }
  1899. // Kill registered child processes
  1900. PROGLOG("MP self destruct exit");
  1901. queryLogMsgManager()->flushQueue(10*1000);
  1902. #ifdef _WIN32
  1903. ForEachItemIn(i,childprocesslist)
  1904. TerminateProcess((HANDLE)childprocesslist.item(i), 1);
  1905. TerminateProcess(GetCurrentProcess(), 1);
  1906. #else
  1907. ForEachItemIn(i,childprocesslist)
  1908. ::kill((HANDLE)childprocesslist.item(i), SIGTERM);
  1909. ::kill(getpid(), SIGTERM);
  1910. #endif
  1911. _exit(1);
  1912. }
  1913. void CMPConnectThread::startPort(unsigned short port)
  1914. {
  1915. if (!listensock)
  1916. listensock = ISocket::create(port, mpSoMaxConn);
  1917. if (!listen)
  1918. return;
  1919. running = true;
  1920. Thread::start();
  1921. }
  1922. int CMPConnectThread::run()
  1923. {
  1924. #ifdef _TRACE
  1925. LOG(MCdebugInfo, unknownJob, "MP: Connect Thread Starting - accept loop");
  1926. #endif
  1927. while (running)
  1928. {
  1929. Owned<ISocket> sock;
  1930. SocketEndpoint peerEp;
  1931. try
  1932. {
  1933. sock.setown(listensock->accept(true, &peerEp));
  1934. }
  1935. catch (IException *e)
  1936. {
  1937. LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
  1938. throw; // error handling TBD
  1939. }
  1940. if (sock)
  1941. {
  1942. try
  1943. {
  1944. #if defined(_USE_OPENSSL)
  1945. if (parent->useTLS)
  1946. {
  1947. Owned<ISecureSocket> ssock = secureContextServer->createSecureSocket(sock.getClear());
  1948. int tlsTraceLevel = SSLogMin;
  1949. if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
  1950. tlsTraceLevel = SSLogMax;
  1951. int status = ssock->secure_accept(tlsTraceLevel);
  1952. if (status < 0)
  1953. {
  1954. ssock->close();
  1955. PROGLOG("MP Connect Thread: failed to accept secure connection");
  1956. continue;
  1957. }
  1958. sock.setown(ssock.getClear());
  1959. }
  1960. #endif // OPENSSL
  1961. #ifdef _FULLTRACE
  1962. StringBuffer s;
  1963. SocketEndpoint ep1;
  1964. sock->getPeerEndpoint(ep1);
  1965. PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
  1966. #endif
  1967. sock->set_keep_alive(true);
  1968. size32_t rd;
  1969. SocketEndpoint _remoteep;
  1970. SocketEndpoint hostep;
  1971. ConnectHdr connectHdr;
  1972. bool legacyClient = false;
  1973. // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new
  1974. traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
  1975. if (0 == rd)
  1976. {
  1977. if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
  1978. {
  1979. // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint())
  1980. PROGLOG("MP Connect Thread: connect with no msg received, assumed port monitor check");
  1981. }
  1982. sock->close();
  1983. continue;
  1984. }
  1985. else
  1986. {
  1987. if (rd == sizeof(connectHdr.id)) // legacy client
  1988. {
  1989. legacyClient = true;
  1990. connectHdr.setRole(0); // unknown
  1991. }
  1992. else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr))
  1993. {
  1994. // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception
  1995. StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");
  1996. peerEp.getUrlStr(errMsg);
  1997. FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
  1998. sock->close();
  1999. continue;
  2000. }
  2001. }
  2002. if (allowListCallback)
  2003. {
  2004. StringBuffer ipStr;
  2005. peerEp.getIpText(ipStr);
  2006. StringBuffer responseText; // filled if denied
  2007. if (!allowListCallback->isAllowListed(ipStr, connectHdr.getRole(), &responseText))
  2008. {
  2009. Owned<IException> e = makeStringException(-1, responseText);
  2010. OWARNLOG(e, nullptr);
  2011. if (legacyClient)
  2012. {
  2013. /* NB: legacy client can't handle exception response
  2014. * Acknowledge legacy connection, then close socket
  2015. * The effect will be the client sees an MPERR_link_closed
  2016. */
  2017. size32_t reply = sizeof(connectHdr.id);
  2018. sock->write(&reply, sizeof(reply));
  2019. }
  2020. else
  2021. {
  2022. MemoryBuffer mb;
  2023. DelayedSizeMarker marker(mb);
  2024. serializeException(e, mb);
  2025. marker.write();
  2026. sock->write(mb.toByteArray(), mb.length());
  2027. }
  2028. sock->close();
  2029. continue;
  2030. }
  2031. }
  2032. connectHdr.id[0].get(_remoteep);
  2033. connectHdr.id[1].get(hostep);
  2034. unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port;
  2035. #ifdef _TRACE
  2036. PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval);
  2037. #endif
  2038. if (_remoteep.isNull() || hostep.isNull())
  2039. {
  2040. StringBuffer errMsg;
  2041. SocketEndpointV4 zeroTest[2];
  2042. memset(zeroTest, 0x0, sizeof(zeroTest));
  2043. if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id)))
  2044. {
  2045. // JCSMORE, I think _remoteep really must/should match a IP of this local host
  2046. errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from ");
  2047. peerEp.getUrlStr(errMsg);
  2048. FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
  2049. }
  2050. else if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
  2051. {
  2052. // all zeros msg received
  2053. errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
  2054. peerEp.getUrlStr(errMsg);
  2055. PROGLOG("%s", errMsg.str());
  2056. }
  2057. sock->close();
  2058. continue;
  2059. }
  2060. #ifdef _FULLTRACE
  2061. StringBuffer tmp1;
  2062. _remoteep.getUrlStr(tmp1);
  2063. tmp1.append(' ');
  2064. hostep.getUrlStr(tmp1);
  2065. PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
  2066. #endif
  2067. checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id));
  2068. Owned<CMPChannel> channel = parent->lookup(_remoteep);
  2069. if (!channel->attachSocket(sock.getClear(),_remoteep,hostep,false,&rd,addrval))
  2070. {
  2071. #ifdef _FULLTRACE
  2072. PROGLOG("MP Connect Thread: lookup failed");
  2073. #endif
  2074. }
  2075. else
  2076. {
  2077. #ifdef _TRACE
  2078. StringBuffer str1;
  2079. StringBuffer str2;
  2080. LOG(MCdebugInfo, unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).str());
  2081. #endif
  2082. }
  2083. #ifdef _FULLTRACE
  2084. PROGLOG("MP: Connect Thread: after write");
  2085. #endif
  2086. }
  2087. catch (IException *e)
  2088. {
  2089. FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
  2090. sock->close();
  2091. e->Release();
  2092. }
  2093. }
  2094. else
  2095. {
  2096. if (running)
  2097. LOG(MCdebugInfo, unknownJob, "MP Connect Thread accept returned NULL");
  2098. }
  2099. }
  2100. #ifdef _TRACE
  2101. LOG(MCdebugInfo, unknownJob, "MP Connect Thread Stopping");
  2102. #endif
  2103. return 0;
  2104. }
  2105. // --------------------------------------------------------
  2106. class CMPChannelIterator
  2107. {
  2108. CMPServer &parent;
  2109. CMPChannel *cur;
  2110. public:
  2111. CMPChannelIterator(CMPServer &_parent)
  2112. : parent(_parent)
  2113. {
  2114. cur = NULL;
  2115. }
  2116. bool first()
  2117. {
  2118. cur = NULL;
  2119. return parent.nextChannel(cur);
  2120. }
  2121. bool next()
  2122. {
  2123. return cur&&parent.nextChannel(cur);
  2124. }
  2125. bool isValid()
  2126. {
  2127. return cur!=NULL;
  2128. }
  2129. CMPChannel &query()
  2130. {
  2131. return *cur;
  2132. }
  2133. };
  2134. //-----------------------------------------------------------------------------------
  2135. CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
  2136. {
  2137. // there is an assumption here that no removes will be done within this loop
  2138. CriticalBlock block(sect);
  2139. SocketEndpoint ep = endpoint;
  2140. CMPChannel *e=find(ep);
  2141. // Check for freed channels
  2142. if (e&&e->isClosed()&&(msTick()-e->lastxfer>30*1000))
  2143. e->reset();
  2144. if (checkclosed) {
  2145. checkclosed = false;
  2146. CMPChannel *c = NULL;
  2147. for (;;) {
  2148. c = next(c);
  2149. if (!c) {
  2150. break;
  2151. }
  2152. if (c->isClosed()&&(msTick()-c->lastxfer>30*1000)) {
  2153. removeExact(c);
  2154. c = NULL;
  2155. }
  2156. }
  2157. e=find(ep);
  2158. }
  2159. if (!e) {
  2160. e = new CMPChannel(this,ep);
  2161. add(*e);
  2162. }
  2163. return LINK(e);
  2164. }
  // Construct the MP server.
  // _role   - role value stored for this server.
  // _port   - MP port to use, or 0 to let the connect thread pick a free one.
  // _listen - whether incoming connections are accepted.
  // Order matters: the connect thread constructor calls setPort(), so `port` is only
  // valid after connectthread is created, and myNode is built from it at the end.
  CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
  {
      RTsalt=0xff;
      role = _role;
      port = 0; // connectthread tells me what port it actually connected on
      checkclosed = false;
      useTLS = queryMtls();
      // If !_listen, CMPConnectThread binds a port but does not actually start
      // running, it is used as a unique IP:port required in MP INode/IGroup internals
      // for MP clients that do not need to accept connections.
      connectthread = new CMPConnectThread(this, _port, _listen);
      selecthandler = createSocketSelectHandler();
      pingpackethandler = new PingPacketHandler; // TAG_SYS_PING
      pingreplypackethandler = new PingReplyPacketHandler; // TAG_SYS_PING_REPLY
      forwardpackethandler = new ForwardPacketHandler; // TAG_SYS_FORWARD
      multipackethandler = new MultiPacketHandler; // TAG_SYS_MULTI
      broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST
      userpackethandler = new UserPacketHandler(this); // default
      notifyclosedthread = new CMPNotifyClosedThread(this);
      notifyclosedthread->start();
      selecthandler->start();
      rettag = (int)TAG_REPLY_BASE; // NB negative
      SocketEndpoint ep(port); // NB port set by connectthread constructor
      myNode = createINode(ep);
  }
  // Destroy the MP server. The table entries are released first (_releaseAll),
  // then the select handler and notify-closed thread are stopped and released,
  // and finally the connect thread, packet handlers and local node are freed.
  // NOTE(review): the connect thread is only Released here; stopping it appears to be
  // the job of CMPServer::stop() - confirm callers always stop() before destruction.
  CMPServer::~CMPServer()
  {
#ifdef _TRACEORPHANS
      StringBuffer buf;
      getReceiveQueueDetails(buf);
      if (buf.length())
          LOG(MCdebugInfo, unknownJob, "MP: Orphan check\n%s",buf.str());
#endif
      _releaseAll();
      selecthandler->stop(true);
      selecthandler->Release();
      notifyclosedthread->stop();
      notifyclosedthread->Release();
      connectthread->Release();
      delete pingpackethandler;
      delete pingreplypackethandler;
      delete forwardpackethandler;
      delete multipackethandler;
      delete broadcastpackethandler;
      delete userpackethandler;
      ::Release(myNode);
  }
  2212. void CMPServer::checkTagOK(mptag_t tag)
  2213. {
  2214. if ((int)tag<=(int)TAG_REPLY_BASE) {
  2215. int dif = (int)TAG_REPLY_BASE-(int)tag;
  2216. if (dif%16!=RTsalt) {
  2217. ERRLOG("**Invalid MP tag used");
  2218. PrintStackReport();
  2219. }
  2220. }
  2221. }
  // Wait (up to tm) for a queued message matching `tag` (TAG_ALL = any) from `ep`
  // (NULL = any sender) and move it into mbuf.
  // Returns true when a message was received. Returns false on timeout, or when a
  // matching cancel message (reply tag TAG_CANCEL) is consumed instead.
  // Throws CMPException(MPERR_link_closed) if the awaited connection closes; with
  // ep == NULL, the close of any connection aborts the wait.
  bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm)
  {
      checkTagOK(tag);
      // Queue callback that selects the message to dequeue / reacts to closed links.
      class Cnfy: public CBufferQueueNotify
      {
      public:
          bool aborted;
          CMessageBuffer *result;
          const SocketEndpoint *ep;
          SocketEndpoint closedEp; // used if receiving on RANK_ALL
          mptag_t tag;
          Cnfy(const SocketEndpoint *_ep,mptag_t _tag) { ep = _ep; tag = _tag; result = NULL; aborted=false; }
          bool notify(CMessageBuffer *msg)
          {
              if ((tag==TAG_ALL)||(tag==msg->getTag())) {
                  const SocketEndpoint &senderep = msg->getSender();
                  // a message with a null sender (e.g. cancel() with no endpoint) matches any ep
                  if ((ep==NULL)||ep->equals(senderep)||senderep.isNull()) {
                      if (msg->getReplyTag()==TAG_CANCEL)
                          delete msg;     // cancellation: consume it, leave result NULL
                      else
                          result = msg;
                      return true;
                  }
              }
              return false;
          }
          bool notifyClosed(SocketEndpoint &_closedEp) // called when connection closed
          {
              if (NULL == ep) { // ep is NULL if receiving on RANK_ALL
                  closedEp = _closedEp;
                  ep = &closedEp; // used for abort info
                  aborted = true;
                  return true;
              }
              else if (ep->equals(_closedEp)) {
                  aborted = true;
                  return true;
              }
              return false;
          }
      } nfy(ep,tag);
      if (receiveq.wait(nfy,false,tm)&&nfy.result) {
          mbuf.transferFrom(*nfy.result);
          delete nfy.result;
          return true;
      }
      if (nfy.aborted) {
#ifdef _TRACELINKCLOSED
          LOG(MCdebugInfo, unknownJob, "CMPserver::recv closed on notify");
          PrintStackReport();
#endif
          IMP_Exception *e=new CMPException(MPERR_link_closed,*nfy.ep);
          throw e;
      }
      return false;
  }
  2278. void CMPServer::flush(mptag_t tag)
  2279. {
  2280. class Cnfy: public CBufferQueueNotify
  2281. {
  2282. public:
  2283. mptag_t tag;
  2284. Cnfy(mptag_t _tag) { tag = _tag; }
  2285. bool notify(CMessageBuffer *msg)
  2286. {
  2287. return (tag==TAG_ALL)||(tag==msg->getTag());
  2288. }
  2289. bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
  2290. {
  2291. return false;
  2292. }
  2293. } nfy(tag);
  2294. unsigned count = receiveq.flush(nfy);
  2295. if (count)
  2296. PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,count);
  2297. }
  2298. void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
  2299. {
  2300. CMessageBuffer *cancelmsg = new CMessageBuffer(0);
  2301. SocketEndpoint send;
  2302. if (ep)
  2303. send = *ep;
  2304. cancelmsg->init(send,tag,TAG_CANCEL);
  2305. getReceiveQ().enqueue(cancelmsg);
  2306. }
  // Wait (up to tm) for a queued message matching `tag` (TAG_ALL = any) from `ep`
  // (NULL = any sender), without removing it (wait is called with its second
  // argument true). On a match, `sender` receives the first match's sender.
  // Returns the number of matches the notifier counted, 0 on timeout, or 0 if the
  // first match is a cancel message (reply tag TAG_CANCEL).
  // Throws CMPException(MPERR_link_closed) if the connection to a specific ep closes.
  unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender)
  {
      class Cnfy: public CBufferQueueNotify
      {
      public:
          bool aborted;
          SocketEndpoint &sender;
          const SocketEndpoint *ep;
          mptag_t tag;
          bool cancel;
          unsigned count;
          Cnfy(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender) : sender(_sender)
          {
              ep = _ep;
              tag = _tag;
              cancel = false;
              aborted = false;
              count = 0;
          }
          bool notify(CMessageBuffer *msg)
          {
              if (((tag==TAG_ALL)||(tag==msg->getTag()))&&
                  ((ep==NULL)||ep->equals(msg->getSender()))) {
                  if (count==0) {
                      // remember the first match only
                      sender = msg->getSender();
                      cancel = (msg->getReplyTag()==TAG_CANCEL);
                  }
                  count++;
                  return true;
              }
              return false;
          }
          bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
          {
              // unlike recv(), a probe on any sender (ep == NULL) is not aborted by closes
              if (ep&&ep->equals(closedep)) {
                  aborted = true;
                  return true;
              }
              return false;
          }
      } nfy(ep,tag,sender);
      if (receiveq.wait(nfy,true,tm)) {
          return nfy.cancel?0:nfy.count;
      }
      if (nfy.aborted) {
#ifdef _TRACELINKCLOSED
          LOG(MCdebugInfo, unknownJob, "CMPserver::probe closed on notify");
          PrintStackReport();
#endif
          IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
          throw e;
      }
      return 0;
  }
  2361. void CMPServer::start()
  2362. {
  2363. connectthread->startPort(getPort());
  2364. }
  2365. void CMPServer::stop()
  2366. {
  2367. selecthandler->stop(true);
  2368. connectthread->stop();
  2369. CMPChannel *c = NULL;
  2370. for (;;) {
  2371. c = (CMPChannel *)next(c);
  2372. if (!c)
  2373. break;
  2374. c->closeSocket();
  2375. }
  2376. }
  2377. void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
  2378. {
  2379. // called in critical section CMPServer::sect
  2380. notifyclosedthread->addConnectionMonitor(monitor);
  2381. }
  2382. void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
  2383. {
  2384. // called in critical section CMPServer::sect
  2385. notifyclosedthread->removeConnectionMonitor(monitor);
  2386. }
  2387. void CMPServer::onAdd(void *)
  2388. {
  2389. // not used
  2390. }
  2391. void CMPServer::onRemove(void *e)
  2392. {
  2393. CMPChannel &elem=*(CMPChannel *)e;
  2394. elem.Release();
  2395. }
  2396. unsigned CMPServer::getHashFromElement(const void *e) const
  2397. {
  2398. const CMPChannel &elem=*(const CMPChannel *)e;
  2399. return elem.remoteep.hash(0);
  2400. }
  2401. unsigned CMPServer::getHashFromFindParam(const void *fp) const
  2402. {
  2403. return ((const SocketEndpoint*)fp)->hash(0);
  2404. }
  2405. const void * CMPServer::getFindParam(const void *p) const
  2406. {
  2407. const CMPChannel &elem=*(const CMPChannel *)p;
  2408. return &elem.remoteep;
  2409. }
  2410. bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
  2411. {
  2412. return ((CMPChannel *)et)->remoteep.equals(*(SocketEndpoint *)fp);
  2413. }
  2414. bool CMPServer::nextChannel(CMPChannel *&cur)
  2415. {
  2416. CriticalBlock block(sect);
  2417. cur = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
  2418. return cur!=NULL;
  2419. }
  2420. void CMPServer::notifyClosed(SocketEndpoint &ep, bool trace)
  2421. {
  2422. #ifdef _TRACEMPSERVERNOTIFYCLOSED
  2423. if (trace)
  2424. {
  2425. StringBuffer url;
  2426. LOG(MCdebugInfo, unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
  2427. PrintStackReport();
  2428. }
  2429. #endif
  2430. notifyclosedthread->notify(ep);
  2431. }
  2432. // --------------------------------------------------------
  2433. class CInterCommunicator: public IInterCommunicator, public CInterface
  2434. {
  2435. CMPServer *parent;
  2436. CriticalSection verifysect;
  2437. public:
  2438. IMPLEMENT_IINTERFACE;
  2439. bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
  2440. {
  2441. if (!dst)
  2442. return false;
  2443. if (dst->equals(queryMyNode())) {
  2444. CMessageBuffer *msg = new CMessageBuffer();
  2445. mptag_t reply = mbuf.getReplyTag();
  2446. msg->transferFrom(mbuf);
  2447. msg->init(dst->endpoint(),tag,reply);
  2448. parent->getReceiveQ().enqueue(msg);
  2449. mbuf.clear(); // for consistent semantics
  2450. return true;
  2451. }
  2452. CTimeMon tm(timeout);
  2453. Owned<CMPChannel> channel = parent->lookup(dst->endpoint());
  2454. unsigned remaining;
  2455. if (tm.timedout(&remaining))
  2456. return false;
  2457. if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
  2458. return false;
  2459. mbuf.clear(); // for consistent semantics
  2460. return true;
  2461. }
  2462. bool verifyConnection(INode *node, unsigned timeout)
  2463. {
  2464. CriticalBlock block(verifysect);
  2465. CTimeMon tm(timeout);
  2466. Owned<CMPChannel> channel = parent->lookup(node->endpoint());
  2467. unsigned remaining;
  2468. if (tm.timedout(&remaining))
  2469. return false;
  2470. return channel->verifyConnection(tm,true);
  2471. }
  2472. void verifyAll(StringBuffer &log)
  2473. {
  2474. CMPChannelIterator iter(*parent);
  2475. if (iter.first()) {
  2476. do {
  2477. CriticalBlock block(verifysect);
  2478. CTimeMon tm(5000);
  2479. CMPChannel &channel = iter.query();
  2480. if (!channel.isClosed()) {
  2481. channel.queryEpStr(log).append(' ');
  2482. if (channel.verifyConnection(tm,false))
  2483. log.append("OK\n");
  2484. else
  2485. log.append("FAILED\n");
  2486. }
  2487. }
  2488. while (iter.next());
  2489. }
  2490. }
  // Verify connections between this node and the other members of `group` within
  // `timeout` ms. Returns false on timeout or on a failed active verify.
  // duplex  - actively verify every other member.
  // !duplex - each pair is verified once: this node actively verifies a lower rank of
  //           the same parity or a higher rank of the other parity. For the
  //           complementary set it then polls (passive verify, every 100ms with
  //           verifysect released during the sleep) until the other side has connected.
  bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
  {
      CriticalBlock block(verifysect);
      CTimeMon tm(timeout);
      rank_t myrank = group->rank(parent->queryMyNode());
      {
          ForEachNodeInGroup(rank,*group) {
              bool doverify;
              if (duplex)
                  doverify = (myrank!=rank);
              else if ((rank&1)==(myrank&1))
                  doverify = (myrank>rank);
              else
                  doverify = (myrank<rank);
              if (doverify) {
                  Owned<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
                  unsigned remaining;
                  if (tm.timedout(&remaining)) {
                      return false;
                  }
                  if (!channel->verifyConnection(tm,true)) {
                      return false;
                  }
              }
          }
      }
      if (!duplex) {
          // wait for the peers that are responsible for verifying towards us
          ForEachNodeInGroup(rank,*group) {
              bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
              if (doverify) {
                  Owned<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
                  while (!channel->verifyConnection(tm,false)) {
                      unsigned remaining;
                      if (tm.timedout(&remaining))
                          return false;
                      CriticalUnblock unblock(verifysect);
                      Sleep(100);
                  }
              }
          }
      }
      return true;
  }
  2534. unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
  2535. {
  2536. if (sender)
  2537. *sender = NULL;
  2538. SocketEndpoint res;
  2539. CTimeMon tm(timeout);
  2540. unsigned ret = parent->probe(src?&src->endpoint():NULL,tag,tm,res);
  2541. if (ret!=0) {
  2542. if (sender)
  2543. *sender = createINode(res);
  2544. return ret;
  2545. }
  2546. return 0;
  2547. }
  2548. bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
  2549. {
  2550. if (sender)
  2551. *sender = NULL;
  2552. CTimeMon tm(timeout);
  2553. for (;;)
  2554. {
  2555. try
  2556. {
  2557. if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm))
  2558. {
  2559. if (sender)
  2560. *sender = createINode(mbuf.getSender());
  2561. return true;
  2562. }
  2563. return false;
  2564. }
  2565. catch (IMP_Exception *e)
  2566. {
  2567. if (MPERR_link_closed != e->errorCode())
  2568. throw;
  2569. const SocketEndpoint &ep = e->queryEndpoint();
  2570. if (src && (ep == src->endpoint()))
  2571. throw;
  2572. // ignoring closed endpoint
  2573. e->Release();
  2574. // loop around and recv again
  2575. }
  2576. }
  2577. }
  2578. void flush(mptag_t tag)
  2579. {
  2580. parent->flush(tag);
  2581. }
  2582. bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
  2583. {
  2584. assertex(dst);
  2585. mptag_t replytag = parent->createReplyTag();
  2586. CTimeMon tm(timeout);
  2587. mbuff.setReplyTag(replytag);
  2588. unsigned remaining;
  2589. if (tm.timedout(&remaining))
  2590. return false;
  2591. if (!send(mbuff,dst,dsttag,remaining)||tm.timedout(&remaining))
  2592. return false;
  2593. mbuff.clear();
  2594. return recv(mbuff,dst,replytag,NULL,remaining);
  2595. }
  2596. bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
  2597. {
  2598. Owned<INode> dst(createINode(mbuff.getSender()));
  2599. return send(mbuff,dst,mbuff.getReplyTag(),timeout);
  2600. }
  2601. void cancel(INode *src, mptag_t tag)
  2602. {
  2603. parent->cancel(src?&src->endpoint():NULL,tag);
  2604. }
  2605. void disconnect(INode *node)
  2606. {
  2607. CriticalBlock block(verifysect);
  2608. Owned<CMPChannel> channel = parent->lookup(node->endpoint());
  2609. channel->closeSocket();
  2610. parent->removeChannel(channel);
  2611. }
  2612. CInterCommunicator(CMPServer *_parent)
  2613. {
  2614. parent = _parent;
  2615. }
  2616. ~CInterCommunicator()
  2617. {
  2618. }
  2619. };
  2620. class CCommunicator: public ICommunicator, public CInterface
  2621. {
  2622. IGroup *group;
  2623. CMPServer *parent;
  2624. bool outer;
  2625. rank_t myrank;
  2626. CriticalSection verifysect;
  2627. const SocketEndpoint &queryEndpoint(rank_t rank)
  2628. {
  2629. return group->queryNode(rank).endpoint();
  2630. }
  2631. CMPChannel *getChannel(rank_t rank)
  2632. {
  2633. return parent->lookup(queryEndpoint(rank));
  2634. }
  2635. public:
  2636. IMPLEMENT_IINTERFACE;
  2637. bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
  2638. {
  2639. // send does not corrupt mbuf
  2640. if (dstrank==RANK_NULL)
  2641. return false;
  2642. if (dstrank==myrank) {
  2643. CMessageBuffer *msg = mbuf.clone();
  2644. // change sender
  2645. msg->init(parent->queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
  2646. parent->getReceiveQ().enqueue(msg);
  2647. }
  2648. else {
  2649. CTimeMon tm(timeout);
  2650. rank_t endrank;
  2651. if (dstrank==RANK_ALL) {
  2652. send(mbuf,myrank,tag,timeout);
  2653. dstrank = RANK_ALL_OTHER;
  2654. }
  2655. if (dstrank==RANK_ALL_OTHER) {
  2656. dstrank = 0;
  2657. endrank = group->ordinality()-1;
  2658. }
  2659. else if (dstrank==RANK_RANDOM) {
  2660. if (group->ordinality()>1) {
  2661. do {
  2662. dstrank = getRandom()%group->ordinality();
  2663. } while (dstrank==myrank);
  2664. }
  2665. else {
  2666. assertex(myrank!=0);
  2667. dstrank = 0;
  2668. }
  2669. endrank = dstrank;
  2670. }
  2671. else
  2672. endrank = dstrank;
  2673. for (;dstrank<=endrank;dstrank++) {
  2674. if (dstrank!=myrank) {
  2675. Owned<CMPChannel> channel = getChannel(dstrank);
  2676. unsigned remaining;
  2677. if (tm.timedout(&remaining))
  2678. return false;
  2679. if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
  2680. return false;
  2681. }
  2682. }
  2683. }
  2684. return true;
  2685. }
  2686. void barrier(void)
  2687. {
  2688. #ifdef _TRACE
  2689. DBGLOG("MP: barrier enter");
  2690. #endif
  2691. /*
  2692. * Use the dissemination algorithm described in:
  2693. * Debra Hensgen, Raphael Finkel, and Udi Manbet, "Two Algorithms for Barrier Synchronization,"
  2694. * International Journal of Parallel Programming, 17(1):1-17, 1988.
  2695. * It uses ceiling(lgp) steps. In step k, 0 <= k <= (ceiling(lgp)-1),
  2696. * process i sends to process (i + 2^k) % p and receives from process (i - 2^k + p) % p.
  2697. */
  2698. int numranks = group->ordinality();
  2699. CMessageBuffer mb;
  2700. rank_t r;
  2701. int mask = 0x1;
  2702. while (mask < numranks)
  2703. {
  2704. int dst = (myrank + mask) % numranks;
  2705. int src = (myrank - mask + numranks) % numranks;
  2706. #ifdef _TRACE
  2707. DBGLOG("MP: barrier: send to %d, recv from %d", dst, src);
  2708. #endif
  2709. // NOTE: MPI method MUST use sendrecv so as to not send/recv deadlock ...
  2710. mb.clear();
  2711. mb.append("MPTAG_BARRIER");
  2712. bool oks = send(mb,dst,MPTAG_BARRIER,120000);
  2713. mb.clear();
  2714. bool okr = recv(mb,src,MPTAG_BARRIER,&r);
  2715. if (!oks && !okr)
  2716. {
  2717. DBGLOG("MP: barrier: Error sending or recving");
  2718. break;
  2719. }
  2720. mask <<= 1;
  2721. }
  2722. #ifdef _TRACE
  2723. DBGLOG("MP: barrier leave");
  2724. #endif
  2725. }
  2726. bool verifyConnection(rank_t rank, unsigned timeout)
  2727. {
  2728. CriticalBlock block(verifysect);
  2729. assertex(rank!=RANK_RANDOM);
  2730. assertex(rank!=RANK_ALL);
  2731. CTimeMon tm(timeout);
  2732. Owned<CMPChannel> channel = getChannel(rank);
  2733. unsigned remaining;
  2734. if (tm.timedout(&remaining))
  2735. return false;
  2736. return channel->verifyConnection(tm,true);
  2737. }
  2738. bool verifyAll(bool duplex, unsigned totalTimeout, unsigned perConnectionTimeout)
  2739. {
  2740. CriticalBlock block(verifysect);
  2741. CTimeMon totalTM(totalTimeout);
  2742. Semaphore sem;
  2743. sem.signal(getAffinityCpus());
  2744. std::atomic<bool> abort{false};
  2745. auto verifyConnWithConnect = [&](unsigned rank, unsigned timeout)
  2746. {
  2747. CTimeMon tm(timeout);
  2748. Owned<CMPChannel> channel = getChannel(rank);
  2749. return channel->verifyConnection(tm, true);
  2750. };
  2751. auto verifyConnWithoutConnect = [&](unsigned rank, unsigned timeout)
  2752. {
  2753. CTimeMon tm(timeout);
  2754. while (true)
  2755. {
  2756. Owned<CMPChannel> channel = getChannel(rank);
  2757. if (channel->verifyConnection(tm, false))
  2758. return true;
  2759. if (abort || tm.timedout())
  2760. return false;
  2761. Sleep(100);
  2762. }
  2763. };
  2764. auto threadedVerifyConnectFunc = [&](rank_t rank, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
  2765. {
  2766. // NB: running because took (via wait()) a semaphore slot, restore it at end of scope
  2767. struct RestoreSlot
  2768. {
  2769. Semaphore &sem;
  2770. RestoreSlot(Semaphore &_sem) : sem(_sem) { }
  2771. ~RestoreSlot() { sem.signal(); }
  2772. } restoreSlot(sem);
  2773. unsigned timeoutMs;
  2774. if (totalTM.timedout(&timeoutMs) || abort)
  2775. return false;
  2776. if (perConnectionTimeout && (perConnectionTimeout < timeoutMs))
  2777. timeoutMs = perConnectionTimeout;
  2778. if (!connectFunc(rank, timeoutMs))
  2779. {
  2780. abort = true; // ensure verifyFunc knows before release slot, to prevent other thread being launched
  2781. return false;
  2782. }
  2783. return true;
  2784. };
  2785. auto verifyFunc = [&](std::function<bool (unsigned rank)> isRankToVerifyFunc, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
  2786. {
  2787. std::vector<std::future<bool>> results;
  2788. for (rank_t rank=0; rank<group->ordinality(); rank++)
  2789. {
  2790. if (isRankToVerifyFunc(rank))
  2791. {
  2792. // check timeout before and after sem.wait
  2793. // NB: sem.wait if successful, takes a slot which is restored by the thread when it is done
  2794. unsigned remaining;
  2795. if (totalTM.timedout(&remaining) || !sem.wait(remaining) || totalTM.timedout(&remaining))
  2796. {
  2797. abort = true;
  2798. break;
  2799. }
  2800. else if (abort)
  2801. break;
  2802. results.push_back(std::async(std::launch::async, threadedVerifyConnectFunc, rank, connectFunc));
  2803. }
  2804. }
  2805. bool res = true;
  2806. for (auto &f: results)
  2807. {
  2808. if (!f.get())
  2809. res = false;
  2810. }
  2811. return res && !abort;
  2812. };
  2813. if (duplex)
  2814. return verifyFunc([this](rank_t rank) { return rank != myrank; }, verifyConnWithConnect);
  2815. else
  2816. {
  2817. if (!verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank > rank) : (myrank < rank); }, verifyConnWithConnect))
  2818. return false;
  2819. return verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank < rank) : (myrank > rank); }, verifyConnWithoutConnect);
  2820. }
  2821. }
  2822. unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
  2823. {
  2824. assertex(srcrank!=RANK_NULL);
  2825. SocketEndpoint res;
  2826. CTimeMon tm(timeout);
  2827. unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
  2828. if (ret!=0) {
  2829. if (sender)
  2830. *sender = group->rank(res);
  2831. return ret;
  2832. }
  2833. if (sender)
  2834. *sender = RANK_NULL;
  2835. return 0;
  2836. }
  2837. bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
  2838. {
  2839. assertex(srcrank!=RANK_NULL);
  2840. const SocketEndpoint *srcep=NULL;
  2841. if (srcrank==RANK_ALL) {
  2842. if (!outer&&(group->ordinality()==1)) // minor optimization (useful in Dali)
  2843. srcep = &queryEndpoint(0);
  2844. }
  2845. else
  2846. srcep = &queryEndpoint(srcrank);
  2847. CTimeMon tm(timeout);
  2848. for (;;)
  2849. {
  2850. try
  2851. {
  2852. if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
  2853. {
  2854. if (sender)
  2855. *sender = group->rank(mbuf.getSender());
  2856. return true;
  2857. }
  2858. if (sender)
  2859. *sender = RANK_NULL;
  2860. return false;
  2861. }
  2862. catch (IMP_Exception *e)
  2863. {
  2864. if (MPERR_link_closed != e->errorCode())
  2865. throw;
  2866. const SocketEndpoint &ep = e->queryEndpoint();
  2867. if (RANK_NULL != group->rank(ep))
  2868. throw;
  2869. // ignoring closed endpoint from outside the communicator group
  2870. e->Release();
  2871. // loop around and recv again
  2872. }
  2873. }
  2874. }
  2875. void flush(mptag_t tag)
  2876. {
  2877. parent->flush(tag);
  2878. }
  2879. IGroup &queryGroup() { return *group; }
  2880. IGroup *getGroup() { return LINK(group); }
  2881. bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
  2882. {
  2883. assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
  2884. if (sendrank==RANK_RANDOM) {
  2885. if (group->ordinality()>1) {
  2886. do {
  2887. sendrank = getRandom()%group->ordinality();
  2888. } while (sendrank==myrank);
  2889. }
  2890. else {
  2891. assertex(myrank!=0);
  2892. sendrank = 0;
  2893. }
  2894. }
  2895. mptag_t replytag = parent->createReplyTag();
  2896. CTimeMon tm(timeout);
  2897. mbuff.setReplyTag(replytag);
  2898. unsigned remaining;
  2899. if (tm.timedout(&remaining))
  2900. return false;
  2901. if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
  2902. return false;
  2903. mbuff.clear();
  2904. return recv(mbuff,sendrank,replytag,NULL,remaining);
  2905. }
  2906. bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
  2907. {
  2908. mptag_t replytag = mbuf.getReplyTag();
  2909. rank_t dstrank = group->rank(mbuf.getSender());
  2910. if (dstrank!=RANK_NULL) {
  2911. if (send (mbuf, dstrank, replytag,timeout)) {
  2912. mbuf.setReplyTag(TAG_NULL);
  2913. return true;
  2914. }
  2915. return false;
  2916. }
  2917. CTimeMon tm(timeout);
  2918. Owned<CMPChannel> channel = parent->lookup(mbuf.getSender());
  2919. unsigned remaining;
  2920. if (tm.timedout(&remaining)) {
  2921. return false;
  2922. }
  2923. if (channel->send(mbuf,replytag,TAG_NULL,tm, true)) {
  2924. mbuf.setReplyTag(TAG_NULL);
  2925. return true;
  2926. }
  2927. return false;
  2928. }
  2929. void cancel(rank_t srcrank, mptag_t tag)
  2930. {
  2931. assertex(srcrank!=RANK_NULL);
  2932. parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
  2933. }
  2934. void disconnect(INode *node)
  2935. {
  2936. CriticalBlock block(verifysect);
  2937. Owned<CMPChannel> channel = parent->lookup(node->endpoint());
  2938. channel->closeSocket();
  2939. parent->removeChannel(channel);
  2940. }
  2941. virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) const override
  2942. {
  2943. Owned<CMPChannel> channel = parent->lookup(sender);
  2944. assertex(channel);
  2945. return channel->queryPeerEp();
  2946. }
  2947. CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
  2948. {
  2949. outer = _outer;
  2950. parent = _parent;
  2951. group = LINK(_group);
  2952. myrank = group->rank(parent->queryMyNode());
  2953. }
  2954. ~CCommunicator()
  2955. {
  2956. group->Release();
  2957. }
  2958. };
  2959. // Additional CMPServer methods
  2960. ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
  2961. {
  2962. return new CCommunicator(this,group,outer);
  2963. }
  2964. ///////////////////////////////////
  2965. IMPServer *startNewMPServer(unsigned port, bool listen)
  2966. {
  2967. assertex(sizeof(PacketHeader)==32);
  2968. CMPServer *mpServer = new CMPServer(0, port, listen);
  2969. mpServer->start();
  2970. return mpServer;
  2971. }
  2972. class CGlobalMPServer : public CMPServer
  2973. {
  2974. int nestLevel;
  2975. bool paused;
  2976. IInterCommunicator *worldcomm;
  2977. public:
  2978. static CriticalSection sect;
  2979. CGlobalMPServer(unsigned __int64 _role, unsigned _port, bool _listen) : CMPServer(_role, _port, _listen)
  2980. {
  2981. worldcomm = NULL;
  2982. nestLevel = 0;
  2983. }
  2984. ~CGlobalMPServer()
  2985. {
  2986. ::Release(worldcomm);
  2987. worldcomm = nullptr;
  2988. }
  2989. IInterCommunicator &queryWorldCommunicator()
  2990. {
  2991. if (!worldcomm)
  2992. worldcomm = new CInterCommunicator(this);
  2993. return *worldcomm;
  2994. }
  2995. unsigned incNest() { return ++nestLevel; }
  2996. unsigned decNest() { return --nestLevel; }
  2997. unsigned queryNest() { return nestLevel; }
  2998. bool isPaused() const { return paused; }
  2999. void setPaused(bool onOff) { paused = onOff; }
  3000. };
  3001. CriticalSection CGlobalMPServer::sect;
  3002. static CGlobalMPServer *globalMPServer;
  3003. MODULE_INIT(INIT_PRIORITY_STANDARD)
  3004. {
  3005. globalMPServer = NULL;
  3006. return true;
  3007. }
  3008. MODULE_EXIT()
  3009. {
  3010. CGlobalMPServer * savedMPServer = globalMPServer;
  3011. globalMPServer = nullptr;
  3012. ::Release(savedMPServer);
  3013. }
  3014. void startMPServer(unsigned __int64 role, unsigned port, bool paused, bool listen)
  3015. {
  3016. assertex(sizeof(PacketHeader)==32);
  3017. CriticalBlock block(CGlobalMPServer::sect);
  3018. if (NULL == globalMPServer)
  3019. {
  3020. globalMPServer = new CGlobalMPServer(role, port, listen);
  3021. initMyNode(globalMPServer->getPort());
  3022. }
  3023. if (0 == globalMPServer->queryNest())
  3024. {
  3025. if (paused)
  3026. {
  3027. globalMPServer->setPaused(paused);
  3028. return;
  3029. }
  3030. queryLogMsgManager()->setPort(globalMPServer->getPort());
  3031. globalMPServer->start();
  3032. globalMPServer->setPaused(false);
  3033. }
  3034. globalMPServer->incNest();
  3035. }
  3036. void startMPServer(unsigned port, bool paused, bool listen)
  3037. {
  3038. startMPServer(0, port, paused, listen);
  3039. }
  3040. void stopMPServer()
  3041. {
  3042. CGlobalMPServer *_globalMPServer = NULL;
  3043. {
  3044. CriticalBlock block(CGlobalMPServer::sect);
  3045. if (NULL == globalMPServer)
  3046. return;
  3047. if (0 == globalMPServer->decNest())
  3048. {
  3049. stopLogMsgReceivers();
  3050. #ifdef _TRACE
  3051. LOG(MCdebugInfo, unknownJob, "MP: Stopping MP Server");
  3052. #endif
  3053. _globalMPServer = globalMPServer;
  3054. globalMPServer = NULL;
  3055. }
  3056. }
  3057. if (NULL == _globalMPServer)
  3058. return;
  3059. _globalMPServer->stop();
  3060. _globalMPServer->Release();
  3061. #ifdef _TRACE
  3062. LOG(MCdebugInfo, unknownJob, "MP: Stopped MP Server");
  3063. #endif
  3064. CriticalBlock block(CGlobalMPServer::sect);
  3065. initMyNode(0);
  3066. }
  3067. bool hasMPServerStarted()
  3068. {
  3069. CriticalBlock block(CGlobalMPServer::sect);
  3070. return globalMPServer != NULL;
  3071. }
  3072. IInterCommunicator &queryWorldCommunicator()
  3073. {
  3074. CriticalBlock block(CGlobalMPServer::sect);
  3075. assertex(globalMPServer);
  3076. return globalMPServer->queryWorldCommunicator();
  3077. }
  3078. mptag_t createReplyTag()
  3079. {
  3080. assertex(globalMPServer);
  3081. return globalMPServer->createReplyTag();
  3082. }
  3083. ICommunicator *createCommunicator(IGroup *group, bool outer)
  3084. {
  3085. assertex(globalMPServer);
  3086. return globalMPServer->createCommunicator(group, outer);
  3087. }
  3088. StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
  3089. {
  3090. CriticalBlock block(CGlobalMPServer::sect);
  3091. if (globalMPServer)
  3092. globalMPServer->getReceiveQueueDetails(buf);
  3093. return buf;
  3094. }
  3095. void addMPConnectionMonitor(IConnectionMonitor *monitor)
  3096. {
  3097. CriticalBlock block(CGlobalMPServer::sect);
  3098. assertex(globalMPServer);
  3099. globalMPServer->addConnectionMonitor(monitor);
  3100. }
  3101. void removeMPConnectionMonitor(IConnectionMonitor *monitor)
  3102. {
  3103. CriticalBlock block(CGlobalMPServer::sect);
  3104. if (globalMPServer)
  3105. globalMPServer->removeConnectionMonitor(monitor);
  3106. }
  3107. IMPServer *getMPServer()
  3108. {
  3109. CriticalBlock block(CGlobalMPServer::sect);
  3110. assertex(globalMPServer);
  3111. return LINK(globalMPServer);
  3112. }
  3113. void registerSelfDestructChildProcess(HANDLE handle)
  3114. {
  3115. CriticalBlock block(childprocesssect);
  3116. if (handle!=(HANDLE)-1)
  3117. childprocesslist.append((unsigned)handle);
  3118. }
  3119. void unregisterSelfDestructChildProcess(HANDLE handle)
  3120. {
  3121. CriticalBlock block(childprocesssect);
  3122. if (handle!=(HANDLE)-1)
  3123. childprocesslist.zap((unsigned)handle);
  3124. }