mpcomm.cpp 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  // Export decoration applied to the MP API implemented in this file. It is defined
  // before mpcomm.hpp is included, presumably so the header picks up this "export"
  // definition rather than an import default - confirm in mpcomm.hpp.
  // __declspec(dllexport) is MSVC syntax; presumably platform.h neutralises it on
  // other compilers - confirm.
  #define mp_decl __declspec(dllexport)
  /* TBD (outstanding work):
   *  - lost packet disposal
   *  - synchronous send
   *  - connection protocol (HRPC)
   *  - review all timeouts
   */
  20. #include "platform.h"
  21. #include "portlist.h"
  22. #include "jlib.hpp"
  23. #include <limits.h>
  24. #include "jsocket.hpp"
  25. #include "jmutex.hpp"
  26. #include "jutil.hpp"
  27. #include "jthread.hpp"
  28. #include "jqueue.tpp"
  29. #include "jsuperhash.hpp"
  30. #include "jmisc.hpp"
  31. #include "mpcomm.hpp"
  32. #include "mpbuff.hpp"
  33. #include "mputil.hpp"
  34. #include "mplog.hpp"
  #ifdef _MSC_VER
  #pragma warning (disable : 4355) // C4355: 'this' used in base member initializer list
  #endif

  // ---- tracing / behaviour switches (uncomment the first two for verbose tracing) ----
  //#define _TRACE
  //#define _FULLTRACE
  #define _TRACEORPHANS
  #define _TRACING
  #define REFUSE_STALE_CONNECTION

  // ---- wire protocol versions ----
  #define MP_PROTOCOL_VERSION          0x102
  #define MP_PROTOCOL_VERSIONV6        0x202   // header extended for IPv6

  // ---- timings, all in milliseconds ----
  #define CANCELTIMEOUT                1000                          // 1 second
  #define CONNECT_TIMEOUT              (5 * 60 * 1000)               // 5 minutes
  #define CONNECT_READ_TIMEOUT         (3 * 60 * 1000)               // 3 minutes
  #define CONNECT_TIMEOUT_INTERVAL     5000                          // 5 seconds
  #define CONFIRM_TIMEOUT              (CONNECT_READ_TIMEOUT / 2)    // 1.5 minutes
  #define CONFIRM_TIMEOUT_INTERVAL     5000                          // 5 seconds
  #define CONFIRM_TRACESLOW_THRESHOLD  1000                          // 1 second
  #define VERIFY_DELAY                 (60 * 1000)                   // 1 minute
  #define VERIFY_TIMEOUT               (60 * 1000)                   // 1 minute
  // File-scope synchronisation state. None of these is referenced in the portion of the
  // file reviewed here. By name, verifysect presumably serialises endpoint verification,
  // and childprocesssect presumably guards childprocesslist - confirm at the use sites.
  static CriticalSection verifysect;
  static CriticalSection childprocesssect;
  static UnsignedArray childprocesslist;
  57. // IPv6 TBD
  58. struct SocketEndpointV4
  59. {
  60. byte ip[4];
  61. unsigned short port;
  62. SocketEndpointV4() {};
  63. SocketEndpointV4(const SocketEndpoint &val) { set(val); }
  64. void set(const SocketEndpoint &val)
  65. {
  66. port = val.port;
  67. if (val.getNetAddress(sizeof(ip),&ip)!=sizeof(ip))
  68. IPV6_NOT_IMPLEMENTED();
  69. }
  70. void get(SocketEndpoint &val)
  71. {
  72. val.setNetAddress(sizeof(ip),&ip);
  73. val.port = port;
  74. }
  75. };
  76. class PacketHeader // standard packet header - no virtuals
  77. {
  78. public:
  79. static unsigned nextseq;
  80. static unsigned lasttick;
  81. void initseq()
  82. {
  83. sequence = msTick();
  84. lasttick = sequence;
  85. if (sequence-nextseq>USHRT_MAX)
  86. sequence = nextseq++;
  87. else
  88. nextseq = sequence+1;
  89. }
  90. PacketHeader(size32_t _size, SocketEndpoint &_sender, SocketEndpoint &_target, mptag_t _tag, mptag_t _replytag)
  91. {
  92. size = _size;
  93. tag = _tag;
  94. sender.set(_sender);
  95. target.set(_target);
  96. replytag = _replytag;
  97. flags = 0;
  98. version = MP_PROTOCOL_VERSION;
  99. initseq();
  100. }
  101. PacketHeader() {}
  102. size32_t size; // 0 total packet size
  103. mptag_t tag; // 4 packet tag (kind)
  104. unsigned short version; // 8 protocol version
  105. unsigned short flags; // 10 flags
  106. SocketEndpointV4 sender; // 12 who sent
  107. SocketEndpointV4 target; // 18 who destined for
  108. mptag_t replytag; // 24 used for reply
  109. unsigned sequence; // 28 packet type dependant
  110. // Total 32
  111. void setMessageFields(CMessageBuffer &mb)
  112. {
  113. SocketEndpoint ep;
  114. sender.get(ep);
  115. mb.init(ep,tag,replytag);
  116. }
  117. };
  118. class PacketHeaderV6 : public PacketHeader
  119. {
  120. unsigned senderex[4]; // 32
  121. unsigned targetex[4]; // 48
  122. // total 64
  123. void setMessageFields(CMessageBuffer &mb)
  124. {
  125. SocketEndpoint ep;
  126. ep.setNetAddress(sizeof(senderex),&senderex);
  127. ep.port = sender.port;
  128. mb.init(ep,tag,replytag);
  129. }
  130. };
  // Storage for PacketHeader's sequence-number statics (both start at zero).
  unsigned PacketHeader::nextseq=0;
  unsigned PacketHeader::lasttick=0;
  // By name: the minimum packet size, i.e. a bare PacketHeader with no body.
  #define MINIMUMPACKETSIZE sizeof(PacketHeader)
  // By name: the largest payload carried in one packet. Presumably larger messages are
  // split into parts described by MultiPacketHeader - confirm at the send site.
  #define MAXDATAPERPACKET 50000
  135. struct MultiPacketHeader
  136. {
  137. mptag_t tag;
  138. size32_t ofs;
  139. size32_t size;
  140. unsigned idx;
  141. unsigned numparts;
  142. size32_t total;
  143. StringBuffer &getDetails(StringBuffer &out) const
  144. {
  145. out.append("MultiPacketHeader: ");
  146. out.append("tag=").append((unsigned)tag);
  147. out.append(",ofs=").append(ofs);
  148. out.append(",size=").append(size);
  149. out.append(",idx=").append(idx);
  150. out.append(",numparts=").append(numparts);
  151. out.append(",total=").append(total);
  152. return out;
  153. }
  154. };
  155. //
  156. class CMPException: public CInterface, public IMP_Exception
  157. {
  158. public:
  159. IMPLEMENT_IINTERFACE;
  160. CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
  161. {
  162. }
  163. StringBuffer & errorMessage(StringBuffer &str) const
  164. {
  165. StringBuffer tmp;
  166. switch (error) {
  167. case MPERR_ok: str.append("OK"); break;
  168. case MPERR_connection_failed: str.appendf("MP connect failed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
  169. case MPERR_process_not_in_group: str.appendf("Current process not in Communicator group"); break;
  170. case MPERR_protocol_version_mismatch: str.appendf("Protocol version mismatch (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
  171. case MPERR_link_closed: str.appendf("MP link closed (%s)",endpoint.getUrlStr(tmp).toCharArray()); break;
  172. }
  173. return str;
  174. }
  175. int errorCode() const { return error; }
  176. MessageAudience errorAudience() const
  177. {
  178. return MSGAUD_user;
  179. }
  180. virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
  181. private:
  182. MessagePassingError error;
  183. SocketEndpoint endpoint;
  184. };
  185. class CBufferQueueNotify
  186. {
  187. public:
  188. virtual bool notify(CMessageBuffer *)=0;
  189. virtual bool notifyClosed(SocketEndpoint &closedep)=0; // called when connection closed
  190. };
  191. class CBufferQueueWaiting
  192. {
  193. public:
  194. enum QWenum { QWcontinue, QWdequeue, QWprobe };
  195. Semaphore sem;
  196. CBufferQueueNotify &waiting;
  197. bool probe;
  198. CBufferQueueWaiting(CBufferQueueNotify& _waiting,bool _probe) : waiting(_waiting) { probe = _probe; }
  199. QWenum notify(CMessageBuffer *b)
  200. {
  201. // check this for DLL unloaded TBD
  202. if (waiting.notify(b)) {
  203. sem.signal();
  204. return probe?QWprobe:QWdequeue;
  205. }
  206. return QWcontinue;
  207. }
  208. QWenum notifyClosed(SocketEndpoint &ep)
  209. {
  210. // check this for DLL unloaded TBD
  211. if (waiting.notifyClosed(ep)) {
  212. sem.signal();
  213. return QWdequeue;
  214. }
  215. return QWcontinue;
  216. }
  217. };
  218. MAKEPointerArray(CBufferQueueWaiting,CWaitingArray);
  219. class CBufferQueue
  220. {
  221. QueueOf<CMessageBuffer, false> received;
  222. CWaitingArray waiting;
  223. CriticalSection sect;
  224. public:
  225. CBufferQueue()
  226. {
  227. }
  228. void enqueue(CMessageBuffer *b)
  229. {
  230. CriticalBlock block(sect);
  231. unsigned iter=0;
  232. loop {
  233. ForEachItemIn(i,waiting) {
  234. CBufferQueueWaiting::QWenum r = waiting.item(i).notify(b);
  235. if (r!=CBufferQueueWaiting::QWcontinue) {
  236. waiting.remove(i);
  237. if (r==CBufferQueueWaiting::QWdequeue)
  238. return;
  239. //CBufferQueueWaiting::QWprobe
  240. break;
  241. }
  242. }
  243. if (b->getReplyTag() != TAG_CANCEL)
  244. break;
  245. if (iter++==10) {
  246. delete b;
  247. return;
  248. }
  249. CriticalUnblock unblock(sect);
  250. Sleep(CANCELTIMEOUT/10); // to avoid race conditions (cancel eventually times out)
  251. }
  252. received.enqueue(b);
  253. }
  254. bool wait(CBufferQueueNotify &nfy,bool probe,CTimeMon &tm)
  255. {
  256. CriticalBlock block(sect);
  257. bool probegot = false;
  258. ForEachQueueItemIn(i,received) {
  259. if (nfy.notify(received.item(i))) {
  260. if (probe) {
  261. probegot = true;
  262. }
  263. else {
  264. received.dequeue(i);
  265. return true;
  266. }
  267. }
  268. }
  269. if (probegot)
  270. return true;
  271. unsigned remaining;
  272. if (tm.timedout(&remaining))
  273. return false;
  274. CBufferQueueWaiting qwaiting(nfy,probe);
  275. waiting.append(qwaiting);
  276. sect.leave();
  277. bool ok = qwaiting.sem.wait(remaining);
  278. sect.enter();
  279. if (!ok) {
  280. ok = qwaiting.sem.wait(0);
  281. if (!ok)
  282. waiting.zap(qwaiting);
  283. }
  284. return ok;
  285. }
  286. unsigned flush(CBufferQueueNotify &nfy)
  287. {
  288. unsigned count = 0;
  289. CriticalBlock block(sect);
  290. ForEachQueueItemInRev(i,received) {
  291. if (nfy.notify(received.item(i))) {
  292. count++;
  293. delete received.dequeue(i);
  294. }
  295. }
  296. return count;
  297. }
  298. void notifyClosed(SocketEndpoint &ep)
  299. {
  300. CriticalBlock block(sect);
  301. ForEachItemInRev(i,waiting) {
  302. CBufferQueueWaiting::QWenum r = waiting.item(i).notifyClosed(ep);
  303. if (r!=CBufferQueueWaiting::QWcontinue) {
  304. waiting.remove(i);
  305. }
  306. }
  307. }
  308. StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
  309. {
  310. CriticalBlock block(sect);
  311. ForEachQueueItemIn(i,received) {
  312. received.item(i)->getDetails(buf).append('\n');
  313. }
  314. return buf;
  315. }
  316. };
  317. static UnsignedShortArray freetags;
  318. static unsigned nextfreetag=0;
  319. unsigned short generateDynamicTag()
  320. {
  321. if (freetags.ordinality())
  322. return freetags.pop();
  323. return nextfreetag++;
  324. }
  325. void releaseDynamicTag(unsigned short tag)
  326. {
  327. freetags.append(tag);
  328. }
  329. class CMPServer;
  330. class CMPChannel;
  331. class CMPConnectThread: public Thread
  332. {
  333. bool running;
  334. ISocket *listensock;
  335. CMPServer *parent;
  336. void checkSelfDestruct(void *p,size32_t sz);
  337. public:
  338. CMPConnectThread(CMPServer *_parent, unsigned port);
  339. ~CMPConnectThread()
  340. {
  341. ::Release(listensock);
  342. }
  343. int run();
  344. void start(unsigned short port);
  345. void stop()
  346. {
  347. if (running) {
  348. running = false;
  349. listensock->cancel_accept();
  350. if (!join(1000*60*5)) // should be pretty instant
  351. printf("CMPConnectThread::stop timed out\n");
  352. }
  353. }
  354. };
  class PingPacketHandler;
  class PingReplyPacketHandler;
  class MultiPacketHandler;
  class BroadcastPacketHandler;
  class ForwardPacketHandler;
  class UserPacketHandler;
  class CMPNotifyClosedThread;
  // The MP server. It holds the connect thread, the shared receive queue, the
  // closed-connection notifier thread and the per-tag system packet handlers.
  // It privately inherits a hash table of CMPChannel objects keyed by remote SocketEndpoint.
  // The instance is reached through the global MPserver pointer, which starts as NULL.
  // Declarations only: the member function definitions are not in this part of the file.
  static class CMPServer: private SuperHashTableOf<CMPChannel,SocketEndpoint>
  {
      unsigned short port;                        // MP port (see getPort/setPort)
      ISocketSelectHandler *selecthandler;
      CMPConnectThread *connectthread;
      CBufferQueue receiveq;                      // received messages and the threads waiting for them
      CMPNotifyClosedThread *notifyclosedthread;  // delivers connection-closed notifications
  public:
      static CriticalSection serversect;
      static int servernest;
      static bool serverpaused;
      bool checkclosed;
      // packet handlers
      PingPacketHandler *pingpackethandler; // TAG_SYS_PING
      PingReplyPacketHandler *pingreplypackethandler; // TAG_SYS_PING_REPLY
      ForwardPacketHandler *forwardpackethandler; // TAG_SYS_FORWARD
      MultiPacketHandler *multipackethandler; // TAG_SYS_MULTI
      BroadcastPacketHandler *broadcastpackethandler; // TAG_SYS_BCAST
      UserPacketHandler *userpackethandler; // default
      CMPServer(unsigned _port);
      ~CMPServer();
      void start();
      void stop();
      unsigned short getPort() { return port; }
      void setPort(unsigned short _port) { port = _port; }
      // Channel for remoteep (lookup/creation policy is defined elsewhere).
      CMPChannel &lookup(const SocketEndpoint &remoteep);
      ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
      CBufferQueue &getReceiveQ() { return receiveq; }
      bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
      void flush(mptag_t tag);
      unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
      void cancel(const SocketEndpoint *ep, mptag_t tag);
      bool nextChannel(CMPChannel *&c);
      void addConnectionMonitor(IConnectionMonitor *monitor);
      void removeConnectionMonitor(IConnectionMonitor *monitor);
      void notifyClosed(SocketEndpoint &ep);
      // Append a description of every message waiting in the receive queue to buf.
      StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
      {
          return receiveq.getReceiveQueueDetails(buf);
      }
      // Remove exactly this channel object from the table; a NULL pointer is ignored.
      void removeChannel(CMPChannel *c) { if (c) removeExact(c); }
  protected:
      // SuperHashTableOf hooks (element add/remove, hashing and matching by endpoint).
      void onAdd(void *);
      void onRemove(void *e);
      unsigned getHashFromElement(const void *e) const;
      unsigned getHashFromFindParam(const void *fp) const;
      const void * getFindParam(const void *p) const;
      bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const;
      IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
  } *MPserver=NULL;
  // Definitions of CMPServer's static members (declared in the class above).
  int CMPServer::servernest=0;
  bool CMPServer::serverpaused=false;
  CriticalSection CMPServer::serversect;
  415. byte RTsalt=0xff;
  416. mptag_t createReplyTag()
  417. {
  418. // these are short-lived so a simple increment will do (I think this is OK!)
  419. mptag_t ret;
  420. {
  421. static CriticalSection sect;
  422. CriticalBlock block(sect);
  423. static int rettag=(int)TAG_REPLY_BASE; // NB negative
  424. if (RTsalt==0xff) {
  425. RTsalt = (byte)(getRandom()%16);
  426. rettag = (int)TAG_REPLY_BASE-RTsalt;
  427. }
  428. if (rettag>(int)TAG_REPLY_BASE) { // wrapped
  429. rettag = (int)TAG_REPLY_BASE-RTsalt;
  430. }
  431. ret = (mptag_t)rettag;
  432. rettag -= 16;
  433. }
  434. if (MPserver)
  435. MPserver->flush(ret);
  436. return ret;
  437. }
  438. void checkTagOK(mptag_t tag)
  439. {
  440. if ((int)tag<=(int)TAG_REPLY_BASE) {
  441. int dif = (int)TAG_REPLY_BASE-(int)tag;
  442. if (dif%16!=RTsalt) {
  443. ERRLOG("**Invalid MP tag used");
  444. PrintStackReport();
  445. }
  446. }
  447. }
  448. //===========================================================================
  449. class CMPNotifyClosedThread: public Thread
  450. {
  451. IArrayOf<IConnectionMonitor> connectionmonitors;
  452. CriticalSection conmonsect;
  453. SimpleInterThreadQueueOf<INode, false> workq;
  454. bool stopping;
  455. CMPServer *parent;
  456. CriticalSection stopsect;
  457. public:
  458. CMPNotifyClosedThread(CMPServer *_parent)
  459. : Thread("CMPNotifyClosedThread")
  460. {
  461. parent = _parent;
  462. stopping = false;
  463. }
  464. ~CMPNotifyClosedThread()
  465. {
  466. IArrayOf<IConnectionMonitor> todelete;
  467. CriticalBlock block(conmonsect);
  468. while (connectionmonitors.ordinality())
  469. todelete.append(connectionmonitors.popGet());
  470. }
  471. void addConnectionMonitor(IConnectionMonitor *monitor)
  472. {
  473. if (monitor)
  474. connectionmonitors.append(*LINK(monitor));
  475. }
  476. void removeConnectionMonitor(IConnectionMonitor *monitor)
  477. {
  478. // called in critical section CMPServer::sect
  479. if (monitor) {
  480. CriticalBlock block(conmonsect);
  481. connectionmonitors.zap(*monitor);
  482. }
  483. }
  484. int run()
  485. {
  486. loop {
  487. try {
  488. Owned<INode> node = workq.dequeue();
  489. if (node->endpoint().isNull())
  490. break;
  491. SocketEndpoint ep = node->endpoint();
  492. parent->getReceiveQ().notifyClosed(ep);
  493. IArrayOf<IConnectionMonitor> toclose;
  494. {
  495. CriticalBlock block(conmonsect);
  496. ForEachItemIn(i1,connectionmonitors) {
  497. toclose.append(*LINK(&connectionmonitors.item(i1)));
  498. }
  499. }
  500. ForEachItemIn(i,toclose) {
  501. toclose.item(i).onClose(ep);
  502. }
  503. }
  504. catch (IException *e) {
  505. FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
  506. e->Release();
  507. }
  508. }
  509. return 0;
  510. }
  511. void stop()
  512. {
  513. {
  514. CriticalBlock block(stopsect);
  515. if (!stopping) {
  516. stopping = true;
  517. SocketEndpoint ep;
  518. workq.enqueue(createINode(ep));
  519. }
  520. }
  521. while (!join(1000*60*3))
  522. PROGLOG("CMPNotifyClosedThread join failed");
  523. }
  524. void notify(SocketEndpoint &ep)
  525. {
  526. CriticalBlock block(stopsect);
  527. if (!stopping&&!ep.isNull()) {
  528. if (workq.ordinality()>100)
  529. PROGLOG("MP: %d waiting to close",workq.ordinality());
  530. workq.enqueue(createINode(ep));
  531. }
  532. }
  533. };
  534. void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
  535. {
  536. dbgassertex(timeoutChkIntervalMs < timeoutMs);
  537. StringBuffer epStr;
  538. CCycleTimer readTmsTimer;
  539. unsigned intervalTimeoutMs = timeoutChkIntervalMs;
  540. loop
  541. {
  542. try
  543. {
  544. sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
  545. break;
  546. }
  547. catch (IJSOCK_Exception *e)
  548. {
  549. if (JSOCKERR_timeout_expired != e->errorCode())
  550. throw;
  551. unsigned elapsedMs = readTmsTimer.elapsedMs();
  552. if (elapsedMs >= timeoutMs)
  553. throw;
  554. unsigned remainingMs = timeoutMs-elapsedMs;
  555. if (remainingMs < timeoutChkIntervalMs)
  556. intervalTimeoutMs = remainingMs;
  557. if (0 == epStr.length())
  558. {
  559. SocketEndpoint ep;
  560. sock->getPeerEndpoint(ep);
  561. ep.getUrlStr(epStr);
  562. }
  563. WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
  564. }
  565. }
  566. if (readTmsTimer.elapsedMs() >= CONFIRM_TRACESLOW_THRESHOLD)
  567. {
  568. if (0 == epStr.length())
  569. {
  570. SocketEndpoint ep;
  571. sock->getPeerEndpoint(ep);
  572. ep.getUrlStr(epStr);
  573. }
  574. WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs());
  575. }
  576. }
  577. class CMPPacketReader;
  578. class CMPChannel: public CInterface
  579. {
  580. ISocket *channelsock;
  581. CMPServer *parent;
  582. Mutex sendmutex;
  583. Semaphore sendwaitingsig;
  584. unsigned sendwaiting; // number waiting on sendwaitingsem (for multi/single clashes to resolve)
  585. CriticalSection connectsect;
  586. CMPPacketReader *reader;
  587. bool master; // i.e. connected originally
  588. mptag_t multitag; // current multi send in progress
  589. bool closed;
  590. IArrayOf<ISocket> keptsockets;
  591. protected: friend class CMPServer;
  592. SocketEndpoint remoteep;
  593. SocketEndpoint localep; // who the other end thinks I am
  594. protected: friend class CMPPacketReader;
  595. unsigned lastxfer;
  596. #ifdef _FULLTRACE
  597. unsigned startxfer;
  598. unsigned numiter;
  599. #endif
  600. bool connect(CTimeMon &tm)
  601. {
  602. // must be called from connectsect
  603. // also in sendmutex
  604. ISocket *newsock=NULL;
  605. unsigned retrycount = 10;
  606. unsigned remaining;
  607. while (!channelsock) {
  608. try {
  609. StringBuffer str;
  610. #ifdef _TRACE
  611. LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).toCharArray());
  612. #endif
  613. if (((int)tm.timeout)<0)
  614. remaining = CONNECT_TIMEOUT;
  615. else if (tm.timedout(&remaining)) {
  616. #ifdef _FULLTRACE
  617. PROGLOG("MP: connect timed out 1");
  618. #endif
  619. return false;
  620. }
  621. if (remaining<10000)
  622. remaining = 10000; // 10s min granularity for MP
  623. newsock = ISocket::connect_timeout(remoteep,remaining);
  624. newsock->set_keep_alive(true);
  625. #ifdef _FULLTRACE
  626. LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect");
  627. #endif
  628. SocketEndpointV4 id[2];
  629. SocketEndpoint hostep;
  630. hostep.setLocalHost(parent->getPort());
  631. id[0].set(hostep);
  632. id[1].set(remoteep);
  633. newsock->write(&id[0],sizeof(id));
  634. #ifdef _FULLTRACE
  635. StringBuffer tmp1;
  636. id[0].getUrlStr(tmp1);
  637. tmp1.append(' ');
  638. id[1].getUrlStr(tmp1);
  639. LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
  640. #endif
  641. size32_t reply;
  642. size32_t rd;
  643. traceSlowReadTms("MP: connect to", newsock, &reply, sizeof(reply), sizeof(reply), rd, CONNECT_READ_TIMEOUT, CONNECT_TIMEOUT_INTERVAL);
  644. #ifdef _FULLTRACE
  645. LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
  646. #endif
  647. if (reply!=0) {
  648. assertex(reply==sizeof(id)); // how can this fail?
  649. if (attachSocket(newsock,remoteep,hostep,true, NULL)) {
  650. newsock->Release();
  651. #ifdef _TRACE
  652. LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.toCharArray());
  653. #endif
  654. lastxfer = msTick();
  655. closed = false;
  656. break;
  657. }
  658. }
  659. }
  660. catch (IException *e)
  661. {
  662. if (tm.timedout(&remaining)) {
  663. #ifdef _FULLTRACE
  664. EXCLOG(e,"MP: connect timed out 2");
  665. #endif
  666. e->Release();
  667. return false;
  668. }
  669. StringBuffer str;
  670. #ifdef _TRACE
  671. EXCLOG(e, "MP: Failed to connect");
  672. #endif
  673. e->Release();
  674. if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND)) { // don't bother retrying on async send
  675. IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
  676. throw e;
  677. }
  678. #ifdef _TRACE
  679. str.clear();
  680. LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).toCharArray(),retrycount+1);
  681. #endif
  682. }
  683. ::Release(newsock);
  684. newsock = NULL;
  685. {
  686. CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
  687. #ifdef _FULLTRACE
  688. PROGLOG("MP: before sleep");
  689. #endif
  690. Sleep(2000+getRandom()%3000);
  691. #ifdef _FULLTRACE
  692. PROGLOG("MP: after sleep");
  693. #endif
  694. }
  695. }
  696. return true;
  697. }
  public:
  // Signalled by the packet reader when a TAG_SYS_PING_REPLY arrives; sendPing() waits on it.
  Semaphore pingsem;
  CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep);
  ~CMPChannel();
  // Detaches the reader, closes any socket and reinitialises the connection state with a new reader.
  void reset();
  // Adopts an already connected socket as this channel's socket. Returns false if the socket is
  // refused because of a connection clash. If confirm is non-NULL it is written back on the socket.
  bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm);
  704. bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
  705. {
  706. Linked<ISocket> dest;
  707. {
  708. CriticalBlock block(connectsect);
  709. if (closed) {
  710. #ifdef _FULLTRACE
  711. LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
  712. PrintStackReport();
  713. #endif
  714. IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
  715. throw e;
  716. }
  717. if (!channelsock) {
  718. if (!connect(tm)) {
  719. #ifdef _FULLTRACE
  720. LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
  721. #endif
  722. return false;
  723. }
  724. }
  725. dest.set(channelsock);
  726. }
  727. try {
  728. #ifdef _FULLTRACE
  729. unsigned t1 = msTick();
  730. #endif
  731. if ((tm.timeout!=MP_ASYNC_SEND)&&(tm.timeout!=MP_WAIT_FOREVER)) {
  732. // if (tm.timeout!=MP_ASYNC_SEND) {
  733. unsigned remaining;
  734. if (tm.timedout(&remaining))
  735. return false;
  736. if (channelsock->wait_write(remaining)==0) {
  737. return false;
  738. }
  739. if (tm.timedout())
  740. return false;
  741. }
  742. // exception checking TBD
  743. #ifdef _FULLTRACE
  744. StringBuffer ep1;
  745. StringBuffer ep2;
  746. LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
  747. unsigned t2 = msTick();
  748. #endif
  749. unsigned n = 0;
  750. const void *bufs[3];
  751. size32_t sizes[3];
  752. if (hdrsize) {
  753. bufs[n] = hdr;
  754. sizes[n++] = hdrsize;
  755. }
  756. if (hdr2size) {
  757. bufs[n] = hdr2;
  758. sizes[n++] = hdr2size;
  759. }
  760. if (bodysize) {
  761. bufs[n] = body;
  762. sizes[n++] = bodysize;
  763. }
  764. if (!dest) {
  765. LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
  766. return false;
  767. }
  768. dest->write_multiple(n,bufs,sizes);
  769. lastxfer = msTick();
  770. #ifdef _FULLTRACE
  771. LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
  772. #endif
  773. }
  774. catch (IException *e) {
  775. FLLOG(MCoperatorWarning, unknownJob, e,"MP writepacket");
  776. closeSocket();
  777. throw;
  778. }
  779. return true;
  780. }
  781. bool writepacket(const void *hdr,size32_t hdrsize,const void *body,size32_t bodysize,CTimeMon &tm)
  782. {
  783. return writepacket(hdr,hdrsize,NULL,0,body,bodysize,tm);
  784. }
  785. bool writepacket(const void *hdr,size32_t hdrsize,CTimeMon &tm)
  786. {
  787. return writepacket(hdr,hdrsize,NULL,0,NULL,0,tm);
  788. }
  // Sends TAG_SYS_PING and waits (bounded by tm) for the reply to signal pingsem.
  bool sendPing(CTimeMon &tm);
  // Sends TAG_SYS_PING_REPLY within timeout ms; with identifyself the payload carries the
  // process command line on Windows builds.
  bool sendPingReply(unsigned timeout,bool identifyself);
  791. bool verifyConnection(CTimeMon &tm,bool allowconnect)
  792. {
  793. {
  794. CriticalBlock block(connectsect);
  795. if (!channelsock&&allowconnect)
  796. return connect(tm);
  797. if (closed||!channelsock)
  798. return false;
  799. if ((msTick()-lastxfer)<VERIFY_DELAY)
  800. return true;
  801. }
  802. StringBuffer ep;
  803. remoteep.getUrlStr(ep);
  804. loop {
  805. CTimeMon pingtm(1000*60);
  806. if (sendPing(pingtm))
  807. break;
  808. {
  809. CriticalBlock block(connectsect);
  810. if (closed||!channelsock)
  811. return false;
  812. }
  813. if (tm.timedout()) {
  814. LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
  815. closeSocket();
  816. return false;
  817. }
  818. LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
  819. unsigned remaining;
  820. if (!pingtm.timedout(&remaining)&&remaining)
  821. Sleep(remaining);
  822. }
  823. return true;
  824. }
  // Sends mb (not modified) with the given tags; payloads larger than MAXDATAPERPACKET are sent
  // as several TAG_SYS_MULTI packets.
  bool send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply);
  826. void closeSocket(bool keepsocket=false)
  827. {
  828. ISocket *s;
  829. bool socketfailed = false;
  830. {
  831. CriticalBlock block(connectsect);
  832. if (!channelsock)
  833. return;
  834. lastxfer = msTick();
  835. closed = true;
  836. if (parent)
  837. parent->checkclosed = true;
  838. s=channelsock;
  839. channelsock = NULL;
  840. if (!keepsocket) {
  841. try {
  842. s->shutdown();
  843. }
  844. catch (IException *) {
  845. socketfailed = true; // ignore if the socket has been closed
  846. }
  847. }
  848. parent->querySelectHandler().remove(s);
  849. }
  850. parent->notifyClosed(remoteep);
  851. if (socketfailed) {
  852. try {
  853. s->Release();
  854. }
  855. catch (IException *) {
  856. // ignore
  857. }
  858. }
  859. else if (keepsocket) {
  860. // hopefully shouldn't get too many of these! (this is a kludge to prevent closing off wrong socket)
  861. if (keptsockets.ordinality()>10)
  862. keptsockets.remove(0);
  863. keptsockets.append(*s);
  864. }
  865. else {
  866. try {
  867. s->close();
  868. }
  869. catch (IException *) {
  870. socketfailed = true; // ignore if the socket has been closed
  871. }
  872. s->Release();
  873. }
  874. }
  875. CMPServer &queryServer() { return *parent; }
  876. void monitorCheck();
  877. StringBuffer & queryEpStr(StringBuffer &s)
  878. {
  879. return remoteep.getUrlStr(s);
  880. }
  881. bool isClosed()
  882. {
  883. return closed;
  884. }
  885. bool isConnected()
  886. {
  887. return !closed&&(channelsock!=NULL);
  888. }
  889. };
  // Message Handlers (not done as interfaces, for speed reasons)
  891. class UserPacketHandler // default
  892. {
  893. CMPServer *server;
  894. public:
  895. UserPacketHandler(CMPServer *_server)
  896. {
  897. server = _server;
  898. }
  899. void handle(CMessageBuffer *msg) // takes ownership of message buffer
  900. {
  901. server->getReceiveQ().enqueue(msg);
  902. }
  903. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm)
  904. {
  905. #ifdef _FULLTRACE
  906. StringBuffer ep1;
  907. StringBuffer ep2;
  908. LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  909. #endif
  910. return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
  911. }
  912. };
  913. class PingPacketHandler // TAG_SYS_PING
  914. {
  915. public:
  916. void handle(CMPChannel *channel,bool identifyself)
  917. {
  918. channel->sendPingReply(CONFIRM_TIMEOUT,identifyself);
  919. }
  920. bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm)
  921. {
  922. return channel->writepacket(&hdr,sizeof(hdr),tm);
  923. }
  924. };
  925. class PingReplyPacketHandler // TAG_SYS_PING_REPLY
  926. {
  927. public:
  928. void handle(CMPChannel *channel)
  929. {
  930. channel->pingsem.signal();
  931. }
  932. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb,CTimeMon &tm)
  933. {
  934. return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
  935. }
  936. };
  937. class CMultiPacketReceiver: public CInterface
  938. { // assume each sender only sends one multi-message per channel
  939. public:
  940. SocketEndpoint sender;
  941. MultiPacketHeader info;
  942. CMessageBuffer *msg;
  943. byte * ptr;
  944. };
  945. class MultiPacketHandler // TAG_SYS_MULTI
  946. {
  947. CIArrayOf<CMultiPacketReceiver> inprogress; // should be ok as not many in progress hopefully (TBD orphans)
  948. CriticalSection sect;
  949. unsigned lastErrMs;
  950. void logError(unsigned code, MultiPacketHeader &mhdr, CMessageBuffer &msg, MultiPacketHeader *otherMhdr)
  951. {
  952. unsigned ms = msTick();
  953. if ((ms-lastErrMs) > 1000) // avoid logging too much
  954. {
  955. StringBuffer errorMsg("sender=");
  956. msg.getSender().getUrlStr(errorMsg).newline();
  957. msg.append("This header: ");
  958. mhdr.getDetails(errorMsg).newline();
  959. if (otherMhdr)
  960. {
  961. msg.append("Other header: ");
  962. otherMhdr->getDetails(errorMsg).newline();
  963. }
  964. msg.getDetails(errorMsg);
  965. LOG(MCerror, unknownJob, "MultiPacketHandler: protocol error (%d) %s", code, errorMsg.str());
  966. }
  967. lastErrMs = ms;
  968. }
  969. public:
  970. MultiPacketHandler() : lastErrMs(0)
  971. {
  972. }
  973. CMessageBuffer *handle(CMessageBuffer * msg)
  974. {
  975. if (!msg)
  976. return NULL;
  977. CriticalBlock block(sect);
  978. MultiPacketHeader mhdr;
  979. msg->read(sizeof(mhdr),&mhdr);
  980. CMultiPacketReceiver *recv=NULL;
  981. ForEachItemIn(i,inprogress) {
  982. CMultiPacketReceiver &mpr = inprogress.item(i);
  983. if ((mpr.info.tag==mhdr.tag)&&mpr.sender.equals(msg->getSender())) {
  984. recv = &mpr;
  985. break;
  986. }
  987. }
  988. if (mhdr.idx==0) {
  989. if ((mhdr.ofs!=0)||(recv!=NULL)) {
  990. logError(1, mhdr, *msg, recv?&recv->info:NULL);
  991. delete msg;
  992. return NULL;
  993. }
  994. recv = new CMultiPacketReceiver;
  995. recv->msg = new CMessageBuffer();
  996. recv->msg->init(msg->getSender(),mhdr.tag,msg->getReplyTag());
  997. recv->ptr = (byte *)recv->msg->reserveTruncate(mhdr.total);
  998. recv->sender = msg->getSender();
  999. recv->info = mhdr;
  1000. inprogress.append(*recv);
  1001. }
  1002. else {
  1003. if ((recv==NULL)||(mhdr.ofs==0)||
  1004. (recv->info.ofs+recv->info.size!=mhdr.ofs)||
  1005. (recv->info.idx+1!=mhdr.idx)||
  1006. (recv->info.total!=mhdr.total)||
  1007. (mhdr.ofs+mhdr.size>mhdr.total)) {
  1008. logError(2, mhdr, *msg, recv?&recv->info:NULL);
  1009. delete msg;
  1010. return NULL;
  1011. }
  1012. }
  1013. msg->read(mhdr.size,recv->ptr+mhdr.ofs);
  1014. delete msg;
  1015. msg = NULL;
  1016. recv->info = mhdr;
  1017. if (mhdr.idx+1==mhdr.numparts) {
  1018. if (mhdr.ofs+mhdr.size!=mhdr.total) {
  1019. logError(3, mhdr, *msg, NULL);
  1020. return NULL;
  1021. }
  1022. msg = recv->msg;
  1023. inprogress.remove(i);
  1024. }
  1025. return msg;
  1026. }
  1027. bool send(CMPChannel *channel,PacketHeader &hdr,MemoryBuffer &mb, CTimeMon &tm, Mutex &sendmutex)
  1028. {
  1029. // must not adjust mb
  1030. #ifdef _FULLTRACE
  1031. StringBuffer ep1;
  1032. StringBuffer ep2;
  1033. LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  1034. #endif
  1035. PacketHeader outhdr;
  1036. outhdr = hdr;
  1037. outhdr.tag = TAG_SYS_MULTI;
  1038. MultiPacketHeader mhdr;
  1039. mhdr.total = hdr.size-sizeof(hdr);
  1040. mhdr.numparts = (mhdr.total+MAXDATAPERPACKET-1)/MAXDATAPERPACKET;
  1041. mhdr.size = mhdr.total/mhdr.numparts;
  1042. mhdr.tag = hdr.tag;
  1043. mhdr.ofs = 0;
  1044. mhdr.idx = 0;
  1045. const byte *p = (const byte *)mb.toByteArray();
  1046. unsigned i=0;
  1047. loop {
  1048. if (i+1==mhdr.numparts)
  1049. mhdr.size = mhdr.total-mhdr.ofs;
  1050. #ifdef _FULLTRACE
  1051. LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
  1052. #endif
  1053. outhdr.initseq();
  1054. outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
  1055. if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
  1056. #ifdef _FULLTRACE
  1057. LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
  1058. #endif
  1059. return false;
  1060. }
  1061. i++;
  1062. if (i==mhdr.numparts)
  1063. break;
  1064. sendmutex.unlock(); // allow other messages to interleave
  1065. sendmutex.lock();
  1066. mhdr.idx++;
  1067. mhdr.ofs += mhdr.size;
  1068. p += mhdr.size;
  1069. }
  1070. return true;
  1071. }
  1072. };
  1073. class BroadcastPacketHandler // TAG_SYS_BCAST
  1074. {
  1075. public:
  1076. CMessageBuffer *handle(CMessageBuffer * msg)
  1077. {
  1078. delete msg;
  1079. return NULL;
  1080. }
  1081. };
  1082. class ForwardPacketHandler // TAG_SYS_FORWARD
  1083. {
  1084. public:
  1085. CMessageBuffer *handle(CMessageBuffer * msg)
  1086. {
  1087. delete msg;
  1088. return NULL;
  1089. }
  1090. };
  1091. // --------------------------------------------------------
  1092. class CMPPacketReader: public CInterface, public ISocketSelectNotify
  1093. {
  1094. CMessageBuffer *activemsg;
  1095. byte * activeptr;
  1096. size32_t remaining;
  1097. byte *dataptr;
  1098. CMPChannel *parent;
  1099. CriticalSection sect;
  1100. public:
  1101. IMPLEMENT_IINTERFACE;
  1102. CMPPacketReader(CMPChannel *_parent)
  1103. {
  1104. init(_parent);
  1105. }
  1106. void init(CMPChannel *_parent)
  1107. {
  1108. parent = _parent;
  1109. activemsg = NULL;
  1110. }
  1111. void shutdown()
  1112. {
  1113. CriticalBlock block(sect);
  1114. parent = NULL;
  1115. }
  1116. bool notifySelected(ISocket *sock,unsigned selected)
  1117. {
  1118. if (!parent)
  1119. return false;
  1120. try {
  1121. // try and mop up all data on socket
  1122. size32_t sizeavail = sock->avail_read();
  1123. if (sizeavail==0) {
  1124. // graceful close
  1125. Linked<CMPChannel> pc;
  1126. {
  1127. CriticalBlock block(sect);
  1128. if (parent) {
  1129. pc.set(parent); // don't want channel to disappear during call
  1130. parent = NULL;
  1131. }
  1132. }
  1133. if (pc)
  1134. pc->closeSocket();
  1135. return false;
  1136. }
  1137. do {
  1138. parent->lastxfer = msTick();
  1139. #ifdef _FULLTRACE
  1140. parent->numiter++;
  1141. #endif
  1142. if (!activemsg) { // no message in progress
  1143. PacketHeader hdr; // header for active message
  1144. #ifdef _FULLTRACE
  1145. parent->numiter = 1;
  1146. parent->startxfer = msTick();
  1147. #endif
  1148. // assumes packet header will arrive in one go
  1149. if (sizeavail<sizeof(hdr)) {
  1150. #ifdef _FULLTRACE
  1151. LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
  1152. #endif
  1153. size32_t szread;
  1154. sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
  1155. }
  1156. else
  1157. sock->read(&hdr,sizeof(hdr));
  1158. if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) {
  1159. // TBD IPV6 here
  1160. SocketEndpoint ep;
  1161. hdr.sender.get(ep);
  1162. IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
  1163. throw e;
  1164. }
  1165. if (sizeavail<=sizeof(hdr))
  1166. sizeavail = sock->avail_read();
  1167. else
  1168. sizeavail -= sizeof(hdr);
  1169. #ifdef _FULLTRACE
  1170. StringBuffer ep1;
  1171. StringBuffer ep2;
  1172. LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
  1173. #endif
  1174. remaining = hdr.size-sizeof(hdr);
  1175. activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
  1176. activeptr = (byte *)activemsg->reserveTruncate(remaining);
  1177. hdr.setMessageFields(*activemsg);
  1178. }
  1179. size32_t toread = sizeavail;
  1180. if (toread>remaining)
  1181. toread = remaining;
  1182. if (toread) {
  1183. sock->read(activeptr,toread);
  1184. remaining -= toread;
  1185. sizeavail -= toread;
  1186. activeptr += toread;
  1187. }
  1188. if (remaining==0) { // we have the packet so process
  1189. #ifdef _FULLTRACE
  1190. LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
  1191. #endif
  1192. do {
  1193. switch (activemsg->getTag()) {
  1194. case TAG_SYS_MULTI:
  1195. activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out
  1196. break;
  1197. case TAG_SYS_PING:
  1198. parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg);
  1199. delete activemsg;
  1200. activemsg = NULL;
  1201. break;
  1202. case TAG_SYS_PING_REPLY:
  1203. parent->queryServer().pingreplypackethandler->handle(parent);
  1204. delete activemsg;
  1205. activemsg = NULL;
  1206. break;
  1207. case TAG_SYS_BCAST:
  1208. activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg);
  1209. break;
  1210. case TAG_SYS_FORWARD:
  1211. activemsg = parent->queryServer().forwardpackethandler->handle(activemsg);
  1212. break;
  1213. default:
  1214. parent->queryServer().userpackethandler->handle(activemsg); // takes ownership
  1215. activemsg = NULL;
  1216. }
  1217. } while (activemsg);
  1218. }
  1219. if (!sizeavail)
  1220. sizeavail = sock->avail_read();
  1221. } while (sizeavail);
  1222. return false; // ok
  1223. }
  1224. catch (IException *e) {
  1225. if (e->errorCode()!=JSOCKERR_graceful_close)
  1226. FLLOG(MCoperatorWarning, unknownJob, e,"MP(Packet Reader)");
  1227. e->Release();
  1228. }
  1229. // error here, so close socket (ignore error as may be closed already)
  1230. try {
  1231. if(parent)
  1232. parent->closeSocket();
  1233. }
  1234. catch (IException *e) {
  1235. e->Release();
  1236. }
  1237. parent = NULL;
  1238. return false;
  1239. }
  1240. };
  1241. CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep)
  1242. {
  1243. channelsock = NULL;
  1244. parent = _parent;
  1245. remoteep = _remoteep;
  1246. localep.set(parent->getPort());
  1247. multitag = TAG_NULL;
  1248. reader = new CMPPacketReader(this);
  1249. closed = false;
  1250. master = false;
  1251. sendwaiting = 0;
  1252. }
  1253. void CMPChannel::reset()
  1254. {
  1255. reader->shutdown(); // clear as early as possible
  1256. closeSocket();
  1257. reader->Release();
  1258. channelsock = NULL;
  1259. multitag = TAG_NULL;
  1260. reader = new CMPPacketReader(this);
  1261. closed = false;
  1262. master = false;
  1263. sendwaiting = 0;
  1264. }
  // Destroys the channel: detaches the packet reader from this channel first, then closes any
  // attached socket, then drops the reader.
  CMPChannel::~CMPChannel()
  {
      reader->shutdown(); // clear as early as possible
      closeSocket();
      reader->Release();
  }
  /**
   * Makes newsock this channel's socket (called after an outgoing connect, ismaster=true, and
   * by the connect thread for accepted sockets, ismaster=false).
   *
   * If a socket is already attached this is a possible clash (both sides connected at once):
   * - if the remote port is 0 the new socket is refused;
   * - if ismaster differs from the existing socket's 'master' flag:
   *   - a master-side new socket is refused;
   *   - otherwise the existing connection is verified (up to 10s) and, if alive, the new
   *     socket is refused;
   * - in all other cases the existing socket is closed with keepsocket=true (parked rather than
   *   closed) and the new one is attached.
   *
   * If confirm is non-NULL it is written back on newsock (while connectsect is held) before
   * the socket is attached.
   *
   * On success newsock is LINKed (channelsock holds its own reference), so the caller still
   * owns, and must release, its own reference.
   * @return true if attached, false if refused
   */
  bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm) // links newsock if succeeds; caller keeps its reference
  {
#ifdef _FULLTRACE
      PROGLOG("MP: attachSocket on entry");
#endif
      CriticalBlock block(connectsect);
#ifdef _FULLTRACE
      PROGLOG("MP: attachSocket got connectsect");
#endif
      // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
      if (channelsock) {
          // NB: remoteep here is the parameter, which shadows the member of the same name
          if (remoteep.port==0)
              return false;
          StringBuffer ep1;
          StringBuffer ep2;
          _localep.getUrlStr(ep1);
          remoteep.getUrlStr(ep2);
          LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
          try {
              if (ismaster!=master) {
                  if (ismaster) {
                      LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
                      return false;
                  }
                  else {
                      Sleep(50); // give the other side some time to close
                      CTimeMon tm(10000);
                      // connectsect is still held here; verifyConnection(tm,false) never connects
                      if (verifyConnection(tm,false)) {
                          LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
                          return false;
                      }
                  }
              }
          }
          catch (IException *e) {
              FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(1)");
              e->Release();
          }
          // existing socket is treated as stale: park it (keepsocket=true) and take the new one
          try {
              LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
              CriticalUnblock unblock(connectsect);
              closeSocket(true);
#ifdef REFUSE_STALE_CONNECTION
              if (!ismaster)
                  return false;
#endif
              Sleep(100); // pause to allow close socket triggers to run
          }
          catch (IException *e) {
              FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
              e->Release();
          }
      }
      if (confirm)
          newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
      closed = false;
      reader->init(this);
      channelsock = LINK(newsock);
#ifdef _FULLTRACE
      PROGLOG("MP: attachSocket before select add");
#endif
      parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
#ifdef _FULLTRACE
      PROGLOG("MP: attachSocket after select add");
#endif
      localep = _localep;
      master = ismaster;
      return true;
  }
  1340. bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon &tm, bool reply)
  1341. {
  1342. // note must not adjust mb
  1343. assertex(tag!=TAG_NULL);
  1344. assertex(tm.timeout);
  1345. const byte *msg = (const byte *)mb.toByteArray();
  1346. size32_t msgsize = mb.length();
  1347. PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
  1348. if (closed||(reply&&!isConnected())) { // flag error if has been disconnected
  1349. #ifdef _FULLTRACE
  1350. LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
  1351. PrintStackReport();
  1352. #endif
  1353. IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
  1354. throw e;
  1355. }
  1356. bool ismulti = (msgsize>MAXDATAPERPACKET);
  1357. // pre-condition - ensure no clashes
  1358. loop {
  1359. sendmutex.lock();
  1360. if (ismulti) {
  1361. if (multitag==TAG_NULL) // don't want to interleave with other multi send
  1362. break;
  1363. }
  1364. else if (multitag!=tag) // don't want to interleave with another of same tag
  1365. break;
  1366. sendwaiting++;
  1367. sendmutex.unlock();
  1368. sendwaitingsig.wait();
  1369. }
  1370. struct Cpostcondition // can we start using eiffel
  1371. {
  1372. Mutex &sendmutex;
  1373. unsigned &sendwaiting;
  1374. Semaphore &sendwaitingsig;
  1375. mptag_t *multitag;
  1376. Cpostcondition(Mutex &_sendmutex,unsigned &_sendwaiting,Semaphore &_sendwaitingsig,mptag_t *_multitag)
  1377. : sendmutex(_sendmutex),sendwaiting(_sendwaiting),sendwaitingsig(_sendwaitingsig)
  1378. {
  1379. multitag = _multitag;
  1380. }
  1381. ~Cpostcondition()
  1382. {
  1383. if (multitag)
  1384. *multitag = TAG_NULL;
  1385. if (sendwaiting) {
  1386. sendwaitingsig.signal(sendwaiting);
  1387. sendwaiting = 0;
  1388. }
  1389. sendmutex.unlock();
  1390. }
  1391. } postcond(sendmutex,sendwaiting,sendwaitingsig,ismulti?&multitag:NULL);
  1392. if (ismulti)
  1393. return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);
  1394. return parent->userpackethandler->send(this,hdr,mb,tm);
  1395. }
  1396. bool CMPChannel::sendPing(CTimeMon &tm)
  1397. {
  1398. unsigned remaining;
  1399. tm.timedout(&remaining);
  1400. if (!sendmutex.lockWait(remaining))
  1401. return false;
  1402. SocketEndpoint myep(parent->getPort());
  1403. PacketHeader hdr(sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING,TAG_SYS_PING_REPLY);
  1404. bool ret = false;
  1405. try {
  1406. ret = parent->pingpackethandler->send(this,hdr,tm)&&!tm.timedout(&remaining);
  1407. }
  1408. catch (IException *e) {
  1409. FLLOG(MCoperatorWarning, unknownJob, e,"MP ping(1)");
  1410. e->Release();
  1411. }
  1412. sendmutex.unlock();
  1413. if (ret)
  1414. ret = pingsem.wait(remaining);
  1415. return ret;
  1416. }
  1417. bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
  1418. {
  1419. CTimeMon mon(timeout);
  1420. unsigned remaining;
  1421. mon.timedout(&remaining);
  1422. if (!sendmutex.lockWait(remaining))
  1423. return false;
  1424. SocketEndpoint myep(parent->getPort());
  1425. MemoryBuffer mb;
  1426. if (identifyself) {
  1427. #ifdef _WIN32
  1428. mb.append(GetCommandLine());
  1429. #endif
  1430. }
  1431. PacketHeader hdr(mb.length()+sizeof(PacketHeader),myep,remoteep,TAG_SYS_PING_REPLY,TAG_NULL);
  1432. bool ret;
  1433. try {
  1434. ret = parent->pingreplypackethandler->send(this,hdr,mb,mon);
  1435. }
  1436. catch (IException *e) {
  1437. FLLOG(MCoperatorWarning, unknownJob, e,"MP ping reply(1)");
  1438. e->Release();
  1439. ret = false;
  1440. }
  1441. sendmutex.unlock();
  1442. return ret;
  1443. }
  1444. // --------------------------------------------------------
  1445. CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
  1446. : Thread("MP Connection Thread")
  1447. {
  1448. parent = _parent;
  1449. if (!port)
  1450. {
  1451. // need to connect early to resolve clash
  1452. Owned<IPropertyTree> env = getHPCCEnvironment();
  1453. unsigned minPort, maxPort;
  1454. if (env)
  1455. {
  1456. minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
  1457. maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
  1458. }
  1459. else
  1460. {
  1461. minPort = MP_START_PORT;
  1462. maxPort = MP_END_PORT;
  1463. }
  1464. assertex(maxPort >= minPort);
  1465. Owned<IJSOCK_Exception> lastErr;
  1466. unsigned numPorts = maxPort - minPort + 1;
  1467. for (int retries = 0; retries < numPorts * 3; retries++)
  1468. {
  1469. port = minPort + getRandom() % numPorts;
  1470. try
  1471. {
  1472. listensock = ISocket::create(port, 16); // better not to have *too* many waiting
  1473. break;
  1474. }
  1475. catch (IJSOCK_Exception *e)
  1476. {
  1477. if (e->errorCode()!=JSOCKERR_port_in_use)
  1478. throw;
  1479. lastErr.setown(e);
  1480. }
  1481. }
  1482. if (!listensock)
  1483. throw lastErr.getClear();
  1484. }
  1485. else
  1486. listensock = NULL; // delay create till running
  1487. parent->setPort(port);
  1488. #ifdef _TRACE
  1489. LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
  1490. #endif
  1491. running = false;
  1492. }
  1493. void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
  1494. {
  1495. byte *b = (byte *)p;
  1496. while (sz--)
  1497. if (*(b++)!=0xff)
  1498. return;
  1499. // Panic!
  1500. PROGLOG("MP Self destruct invoked");
  1501. try {
  1502. if (listensock) {
  1503. listensock->close();
  1504. listensock->Release();
  1505. listensock=NULL;
  1506. }
  1507. }
  1508. catch (...)
  1509. {
  1510. PROGLOG("MP socket close failure");
  1511. }
  1512. // Kill registered child processes
  1513. PROGLOG("MP self destruct exit");
  1514. queryLogMsgManager()->flushQueue(10*1000);
  1515. #ifdef _WIN32
  1516. ForEachItemIn(i,childprocesslist)
  1517. TerminateProcess((HANDLE)childprocesslist.item(i), 1);
  1518. TerminateProcess(GetCurrentProcess(), 1);
  1519. #else
  1520. ForEachItemIn(i,childprocesslist)
  1521. ::kill((HANDLE)childprocesslist.item(i), SIGTERM);
  1522. ::kill(getpid(), SIGTERM);
  1523. #endif
  1524. _exit(1);
  1525. }
  1526. void CMPConnectThread::start(unsigned short port)
  1527. {
  1528. if (!listensock)
  1529. listensock = ISocket::create(port,16);
  1530. running = true;
  1531. Thread::start();
  1532. }
  1533. int CMPConnectThread::run()
  1534. {
  1535. #ifdef _TRACE
  1536. LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting");
  1537. #endif
  1538. while (running) {
  1539. ISocket *sock=NULL;
  1540. try {
  1541. sock=listensock->accept(true);
  1542. #ifdef _FULLTRACE
  1543. StringBuffer s;
  1544. SocketEndpoint ep1;
  1545. sock->getPeerEndpoint(ep1);
  1546. PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
  1547. #endif
  1548. }
  1549. catch (IException *e)
  1550. {
  1551. LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
  1552. throw; // error handling TBD
  1553. }
  1554. if (sock) {
  1555. try {
  1556. sock->set_keep_alive(true);
  1557. size32_t rd;
  1558. SocketEndpoint remoteep;
  1559. SocketEndpoint hostep;
  1560. SocketEndpointV4 id[2];
  1561. traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
  1562. if (rd != sizeof(id))
  1563. {
  1564. FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid number of connection bytes serialized");
  1565. sock->close();
  1566. continue;
  1567. }
  1568. id[0].get(remoteep);
  1569. id[1].get(hostep);
  1570. if (remoteep.isNull() || hostep.isNull())
  1571. {
  1572. // JCSMORE, I think remoteep really must/should match a IP of this local host
  1573. FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid remote and/or host ep serialized");
  1574. sock->close();
  1575. continue;
  1576. }
  1577. #ifdef _FULLTRACE
  1578. StringBuffer tmp1;
  1579. remoteep.getUrlStr(tmp1);
  1580. tmp1.append(' ');
  1581. hostep.getUrlStr(tmp1);
  1582. PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
  1583. #endif
  1584. checkSelfDestruct(&id[0],sizeof(id));
  1585. if (!parent->lookup(remoteep).attachSocket(sock,remoteep,hostep,false, &rd)) {
  1586. #ifdef _FULLTRACE
  1587. PROGLOG("MP Connect Thread: lookup failed");
  1588. #endif
  1589. }
  1590. else {
  1591. #ifdef _TRACE
  1592. StringBuffer str1;
  1593. StringBuffer str2;
  1594. LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",remoteep.getUrlStr(str1).toCharArray());
  1595. #endif
  1596. }
  1597. #ifdef _FULLTRACE
  1598. PROGLOG("MP: Connect Thread: after write");
  1599. #endif
  1600. }
  1601. catch (IException *e)
  1602. {
  1603. FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)");
  1604. sock->close();
  1605. e->Release();
  1606. }
  1607. try {
  1608. sock->Release();
  1609. }
  1610. catch (IException *e)
  1611. {
  1612. FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
  1613. e->Release();
  1614. }
  1615. }
  1616. else {
  1617. if (running)
  1618. LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
  1619. }
  1620. }
  1621. #ifdef _TRACE
  1622. LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
  1623. #endif
  1624. return 0;
  1625. }
  1626. // --------------------------------------------------------
  1627. class CMPChannelIterator
  1628. {
  1629. CMPServer &parent;
  1630. CMPChannel *cur;
  1631. public:
  1632. CMPChannelIterator(CMPServer &_parent)
  1633. : parent(_parent)
  1634. {
  1635. cur = NULL;
  1636. }
  1637. bool first()
  1638. {
  1639. cur = NULL;
  1640. return parent.nextChannel(cur);
  1641. }
  1642. bool next()
  1643. {
  1644. return cur&&parent.nextChannel(cur);
  1645. }
  1646. bool isValid()
  1647. {
  1648. return cur!=NULL;
  1649. }
  1650. CMPChannel &query()
  1651. {
  1652. return *cur;
  1653. }
  1654. };
  1655. //-----------------------------------------------------------------------------------
  1656. CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
  1657. {
  1658. // there is an assumption here that no removes will be done within this loop
  1659. CriticalBlock block(serversect);
  1660. SocketEndpoint ep = endpoint;
  1661. CMPChannel *e=find(ep);
  1662. // Check for freed channels
  1663. if (e&&e->isClosed()&&(msTick()-e->lastxfer>30*1000))
  1664. e->reset();
  1665. if (checkclosed) {
  1666. checkclosed = false;
  1667. CMPChannel *c = NULL;
  1668. loop {
  1669. c = (CMPChannel *)next(c);
  1670. if (!c) {
  1671. break;
  1672. }
  1673. if (c->isClosed()&&(msTick()-c->lastxfer>30*1000)) {
  1674. removeExact(c);
  1675. c = NULL;
  1676. }
  1677. }
  1678. e=find(ep);
  1679. }
  1680. if (!e) {
  1681. e = new CMPChannel(this,ep);
  1682. add(*e);
  1683. }
  1684. return *e;
  1685. }
  1686. CMPServer::CMPServer(unsigned _port)
  1687. {
  1688. port = 0; // connectthread tells me what port it actually connected on
  1689. checkclosed = false;
  1690. connectthread = new CMPConnectThread(this, _port);
  1691. selecthandler = createSocketSelectHandler();
  1692. pingpackethandler = new PingPacketHandler; // TAG_SYS_PING
  1693. pingreplypackethandler = new PingReplyPacketHandler; // TAG_SYS_PING_REPLY
  1694. forwardpackethandler = new ForwardPacketHandler; // TAG_SYS_FORWARD
  1695. multipackethandler = new MultiPacketHandler; // TAG_SYS_MULTI
  1696. broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST
  1697. userpackethandler = new UserPacketHandler(this); // default
  1698. notifyclosedthread = new CMPNotifyClosedThread(this);
  1699. notifyclosedthread->start();
  1700. initMyNode(port); // NB port set by connectthread constructor
  1701. selecthandler->start();
  1702. }
  1703. CMPServer::~CMPServer()
  1704. {
  1705. #ifdef _TRACEORPHANS
  1706. StringBuffer buf;
  1707. getReceiveQueueDetails(buf);
  1708. if (buf.length())
  1709. LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",buf.str());
  1710. #endif
  1711. releaseAll();
  1712. selecthandler->Release();
  1713. notifyclosedthread->stop();
  1714. notifyclosedthread->Release();
  1715. connectthread->Release();
  1716. delete pingpackethandler;
  1717. delete pingreplypackethandler;
  1718. delete forwardpackethandler;
  1719. delete multipackethandler;
  1720. delete broadcastpackethandler;
  1721. delete userpackethandler;
  1722. }
  1723. bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm)
  1724. {
  1725. checkTagOK(tag);
  1726. class Cnfy: public CBufferQueueNotify
  1727. {
  1728. public:
  1729. bool aborted;
  1730. CMessageBuffer *result;
  1731. const SocketEndpoint *ep;
  1732. SocketEndpoint closedEp; // used if receiving on RANK_ALL
  1733. mptag_t tag;
  1734. Cnfy(const SocketEndpoint *_ep,mptag_t _tag) { ep = _ep; tag = _tag; result = NULL; aborted=false; }
  1735. bool notify(CMessageBuffer *msg)
  1736. {
  1737. if ((tag==TAG_ALL)||(tag==msg->getTag())) {
  1738. SocketEndpoint senderep = msg->getSender();
  1739. if ((ep==NULL)||ep->equals(senderep)||senderep.isNull()) {
  1740. if (msg->getReplyTag()==TAG_CANCEL)
  1741. delete msg;
  1742. else
  1743. result = msg;
  1744. return true;
  1745. }
  1746. }
  1747. return false;
  1748. }
  1749. bool notifyClosed(SocketEndpoint &_closedEp) // called when connection closed
  1750. {
  1751. if (NULL == ep) { // ep is NULL if receiving on RANK_ALL
  1752. closedEp = _closedEp;
  1753. ep = &closedEp; // used for abort info
  1754. aborted = true;
  1755. return true;
  1756. }
  1757. else if (ep->equals(_closedEp)) {
  1758. aborted = true;
  1759. return true;
  1760. }
  1761. return false;
  1762. }
  1763. } nfy(ep,tag);
  1764. if (receiveq.wait(nfy,false,tm)&&nfy.result) {
  1765. mbuf.transferFrom(*nfy.result);
  1766. delete nfy.result;
  1767. return true;
  1768. }
  1769. if (nfy.aborted) {
  1770. #ifdef _FULLTRACE
  1771. LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
  1772. PrintStackReport();
  1773. #endif
  1774. IMP_Exception *e=new CMPException(MPERR_link_closed,*nfy.ep);
  1775. throw e;
  1776. }
  1777. return false;
  1778. }
  1779. void CMPServer::flush(mptag_t tag)
  1780. {
  1781. class Cnfy: public CBufferQueueNotify
  1782. {
  1783. public:
  1784. mptag_t tag;
  1785. Cnfy(mptag_t _tag) { tag = _tag; }
  1786. bool notify(CMessageBuffer *msg)
  1787. {
  1788. return (tag==TAG_ALL)||(tag==msg->getTag());
  1789. }
  1790. bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
  1791. {
  1792. return false;
  1793. }
  1794. } nfy(tag);
  1795. unsigned count = receiveq.flush(nfy);
  1796. if (count)
  1797. PROGLOG("CMPServer::flush(%d) discarded %u buffers",(int)tag,count);
  1798. }
  1799. void CMPServer::cancel(const SocketEndpoint *ep, mptag_t tag)
  1800. {
  1801. CMessageBuffer *cancelmsg = new CMessageBuffer(0);
  1802. SocketEndpoint send;
  1803. if (ep)
  1804. send = *ep;
  1805. cancelmsg->init(send,tag,TAG_CANCEL);
  1806. getReceiveQ().enqueue(cancelmsg);
  1807. }
  1808. unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,SocketEndpoint &sender)
  1809. {
  1810. class Cnfy: public CBufferQueueNotify
  1811. {
  1812. public:
  1813. bool aborted;
  1814. SocketEndpoint &sender;
  1815. const SocketEndpoint *ep;
  1816. mptag_t tag;
  1817. bool cancel;
  1818. unsigned count;
  1819. Cnfy(const SocketEndpoint *_ep,mptag_t _tag,SocketEndpoint &_sender) : sender(_sender)
  1820. {
  1821. ep = _ep;
  1822. tag = _tag;
  1823. cancel = false;
  1824. aborted = false;
  1825. count = 0;
  1826. }
  1827. bool notify(CMessageBuffer *msg)
  1828. {
  1829. if (((tag==TAG_ALL)||(tag==msg->getTag()))&&
  1830. ((ep==NULL)||ep->equals(msg->getSender()))) {
  1831. if (count==0) {
  1832. sender = msg->getSender();
  1833. cancel = (msg->getReplyTag()==TAG_CANCEL);
  1834. }
  1835. count++;
  1836. return true;
  1837. }
  1838. return false;
  1839. }
  1840. bool notifyClosed(SocketEndpoint &closedep) // called when connection closed
  1841. {
  1842. if (ep&&ep->equals(closedep)) {
  1843. aborted = true;
  1844. return true;
  1845. }
  1846. return false;
  1847. }
  1848. } nfy(ep,tag,sender);
  1849. if (receiveq.wait(nfy,true,tm)) {
  1850. return nfy.cancel?0:nfy.count;
  1851. }
  1852. if (nfy.aborted) {
  1853. #ifdef _FULLTRACE
  1854. LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
  1855. PrintStackReport();
  1856. #endif
  1857. IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
  1858. throw e;
  1859. }
  1860. return 0;
  1861. }
  1862. void CMPServer::start()
  1863. {
  1864. connectthread->start(getPort());
  1865. }
  1866. void CMPServer::stop()
  1867. {
  1868. selecthandler->stop(true);
  1869. connectthread->stop();
  1870. CMPChannel *c = NULL;
  1871. loop {
  1872. c = (CMPChannel *)next(c);
  1873. if (!c)
  1874. break;
  1875. c->closeSocket();
  1876. }
  1877. }
  1878. void CMPServer::addConnectionMonitor(IConnectionMonitor *monitor)
  1879. {
  1880. // called in critical section CMPServer::sect
  1881. notifyclosedthread->addConnectionMonitor(monitor);
  1882. }
  1883. void CMPServer::removeConnectionMonitor(IConnectionMonitor *monitor)
  1884. {
  1885. // called in critical section CMPServer::sect
  1886. notifyclosedthread->removeConnectionMonitor(monitor);
  1887. }
  1888. void CMPServer::onAdd(void *)
  1889. {
  1890. // not used
  1891. }
  1892. void CMPServer::onRemove(void *e)
  1893. {
  1894. CMPChannel &elem=*(CMPChannel *)e;
  1895. elem.Release();
  1896. }
  1897. unsigned CMPServer::getHashFromElement(const void *e) const
  1898. {
  1899. const CMPChannel &elem=*(const CMPChannel *)e;
  1900. return elem.remoteep.hash(0);
  1901. }
  1902. unsigned CMPServer::getHashFromFindParam(const void *fp) const
  1903. {
  1904. return ((const SocketEndpoint*)fp)->hash(0);
  1905. }
  1906. const void * CMPServer::getFindParam(const void *p) const
  1907. {
  1908. const CMPChannel &elem=*(const CMPChannel *)p;
  1909. return &elem.remoteep;
  1910. }
  1911. bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) const
  1912. {
  1913. return ((CMPChannel *)et)->remoteep.equals(*(SocketEndpoint *)fp);
  1914. }
  1915. bool CMPServer::nextChannel(CMPChannel *&cur)
  1916. {
  1917. CriticalBlock block(serversect);
  1918. cur = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
  1919. return cur!=NULL;
  1920. }
  1921. void CMPServer::notifyClosed(SocketEndpoint &ep)
  1922. {
  1923. #ifdef _TRACE
  1924. StringBuffer url;
  1925. LOG(MCdebugInfo(100), unknownJob, "CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
  1926. #endif
  1927. notifyclosedthread->notify(ep);
  1928. }
  1929. // --------------------------------------------------------
  1930. class CInterCommunicator: public CInterface, public IInterCommunicator
  1931. {
  1932. CMPServer *parent;
  1933. public:
  1934. IMPLEMENT_IINTERFACE;
  1935. bool send (CMessageBuffer &mbuf, INode *dst, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
  1936. {
  1937. if (!dst)
  1938. return false;
  1939. size32_t msgsize = mbuf.length();
  1940. if (dst->equals(queryMyNode())) {
  1941. CMessageBuffer *msg = new CMessageBuffer();
  1942. mptag_t reply = mbuf.getReplyTag();
  1943. msg->transferFrom(mbuf);
  1944. msg->init(dst->endpoint(),tag,reply);
  1945. parent->getReceiveQ().enqueue(msg);
  1946. mbuf.clear(); // for consistent semantics
  1947. return true;
  1948. }
  1949. CTimeMon tm(timeout);
  1950. CMPChannel &channel = parent->lookup(dst->endpoint());
  1951. unsigned remaining;
  1952. if (tm.timedout(&remaining))
  1953. return false;
  1954. if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
  1955. return false;
  1956. mbuf.clear(); // for consistent semantics
  1957. return true;
  1958. }
  1959. bool verifyConnection(INode *node, unsigned timeout)
  1960. {
  1961. CriticalBlock block(verifysect);
  1962. CTimeMon tm(timeout);
  1963. CMPChannel &channel = parent->lookup(node->endpoint());
  1964. unsigned remaining;
  1965. if (tm.timedout(&remaining))
  1966. return false;
  1967. return channel.verifyConnection(tm,true);
  1968. }
  1969. void verifyAll(StringBuffer &log)
  1970. {
  1971. CMPChannelIterator iter(*parent);
  1972. if (iter.first()) {
  1973. do {
  1974. CriticalBlock block(verifysect);
  1975. CTimeMon tm(5000);
  1976. CMPChannel &channel = iter.query();
  1977. if (!channel.isClosed()) {
  1978. channel.queryEpStr(log).append(' ');
  1979. if (channel.verifyConnection(tm,false))
  1980. log.append("OK\n");
  1981. else
  1982. log.append("FAILED\n");
  1983. }
  1984. }
  1985. while (iter.next());
  1986. }
  1987. }
  1988. bool verifyAll(IGroup *group,bool duplex, unsigned timeout)
  1989. {
  1990. CriticalBlock block(verifysect);
  1991. CTimeMon tm(timeout);
  1992. rank_t myrank = group->rank();
  1993. {
  1994. ForEachNodeInGroup(rank,*group) {
  1995. bool doverify;
  1996. if (duplex)
  1997. doverify = (myrank!=rank);
  1998. else if ((rank&1)==(myrank&1))
  1999. doverify = (myrank>rank);
  2000. else
  2001. doverify = (myrank<rank);
  2002. if (doverify) {
  2003. CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
  2004. unsigned remaining;
  2005. if (tm.timedout(&remaining)) {
  2006. return false;
  2007. }
  2008. if (!channel.verifyConnection(tm,true)) {
  2009. return false;
  2010. }
  2011. }
  2012. }
  2013. }
  2014. if (!duplex) {
  2015. ForEachNodeInGroup(rank,*group) {
  2016. bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
  2017. if (doverify) {
  2018. CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
  2019. while (!channel.verifyConnection(tm,false)) {
  2020. unsigned remaining;
  2021. if (tm.timedout(&remaining))
  2022. return false;
  2023. CriticalUnblock unblock(verifysect);
  2024. Sleep(100);
  2025. }
  2026. }
  2027. }
  2028. }
  2029. return true;
  2030. }
  2031. unsigned probe(INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=0)
  2032. {
  2033. if (sender)
  2034. *sender = NULL;
  2035. SocketEndpoint res;
  2036. CTimeMon tm(timeout);
  2037. unsigned ret = parent->probe(src?&src->endpoint():NULL,tag,tm,res);
  2038. if (ret!=0) {
  2039. if (sender)
  2040. *sender = createINode(res);
  2041. return ret;
  2042. }
  2043. return 0;
  2044. }
  2045. bool recv(CMessageBuffer &mbuf, INode *src, mptag_t tag, INode **sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
  2046. {
  2047. if (sender)
  2048. *sender = NULL;
  2049. CTimeMon tm(timeout);
  2050. loop
  2051. {
  2052. try
  2053. {
  2054. if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm))
  2055. {
  2056. if (sender)
  2057. *sender = createINode(mbuf.getSender());
  2058. return true;
  2059. }
  2060. return false;
  2061. }
  2062. catch (IMP_Exception *e)
  2063. {
  2064. if (MPERR_link_closed != e->errorCode())
  2065. throw;
  2066. const SocketEndpoint &ep = e->queryEndpoint();
  2067. if (src && (ep == src->endpoint()))
  2068. throw;
  2069. StringBuffer epStr;
  2070. ep.getUrlStr(epStr);
  2071. FLLOG(MCoperatorWarning, unknownJob, "CInterCommunicator: ignoring closed endpoint: %s", epStr.str());
  2072. e->Release();
  2073. // loop around and recv again
  2074. }
  2075. }
  2076. }
  2077. void flush(mptag_t tag)
  2078. {
  2079. parent->flush(tag);
  2080. }
  2081. bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag, unsigned timeout=MP_WAIT_FOREVER)
  2082. {
  2083. assertex(dst);
  2084. mptag_t replytag = createReplyTag();
  2085. CTimeMon tm(timeout);
  2086. mbuff.setReplyTag(replytag);
  2087. unsigned remaining;
  2088. if (tm.timedout(&remaining))
  2089. return false;
  2090. if (!send(mbuff,dst,dsttag,remaining)||tm.timedout(&remaining))
  2091. return false;
  2092. mbuff.clear();
  2093. return recv(mbuff,dst,replytag,NULL,remaining);
  2094. }
  2095. bool reply(CMessageBuffer &mbuff, unsigned timeout=MP_WAIT_FOREVER)
  2096. {
  2097. Owned<INode> dst(createINode(mbuff.getSender()));
  2098. return send(mbuff,dst,mbuff.getReplyTag(),timeout);
  2099. }
  2100. void cancel(INode *src, mptag_t tag)
  2101. {
  2102. parent->cancel(src?&src->endpoint():NULL,tag);
  2103. }
  2104. void disconnect(INode *node)
  2105. {
  2106. CriticalBlock block(verifysect);
  2107. CMPChannel &channel = parent->lookup(node->endpoint());
  2108. channel.closeSocket();
  2109. parent->removeChannel(&channel);
  2110. }
  2111. CInterCommunicator(CMPServer *_parent)
  2112. {
  2113. parent = _parent;
  2114. }
  2115. ~CInterCommunicator()
  2116. {
  2117. }
  2118. };
  2119. class CCommunicator: public CInterface, public ICommunicator
  2120. {
  2121. IGroup *group;
  2122. CMPServer *parent;
  2123. bool outer;
  2124. const SocketEndpoint &queryEndpoint(rank_t rank)
  2125. {
  2126. return group->queryNode(rank).endpoint();
  2127. }
  2128. CMPChannel &queryChannel(rank_t rank)
  2129. {
  2130. return parent->lookup(queryEndpoint(rank));
  2131. }
  2132. public:
  2133. IMPLEMENT_IINTERFACE;
  2134. bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
  2135. {
  2136. // send does not corrupt mbuf
  2137. if (dstrank==RANK_NULL)
  2138. return false;
  2139. rank_t myrank = group->rank();
  2140. if (dstrank==myrank) {
  2141. CMessageBuffer *msg = mbuf.clone();
  2142. // change sender
  2143. msg->init(queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
  2144. parent->getReceiveQ().enqueue(msg);
  2145. }
  2146. else {
  2147. CTimeMon tm(timeout);
  2148. rank_t endrank;
  2149. if (dstrank==RANK_ALL) {
  2150. send(mbuf,myrank,tag,timeout);
  2151. dstrank = RANK_ALL_OTHER;
  2152. }
  2153. if (dstrank==RANK_ALL_OTHER) {
  2154. dstrank = 0;
  2155. endrank = group->ordinality()-1;
  2156. }
  2157. else if (dstrank==RANK_RANDOM) {
  2158. if (group->ordinality()>1) {
  2159. do {
  2160. dstrank = getRandom()%group->ordinality();
  2161. } while (dstrank==myrank);
  2162. }
  2163. else {
  2164. assertex(myrank!=0);
  2165. dstrank = 0;
  2166. }
  2167. endrank = dstrank;
  2168. }
  2169. else
  2170. endrank = dstrank;
  2171. for (;dstrank<=endrank;dstrank++) {
  2172. if (dstrank!=myrank) {
  2173. CMPChannel &channel = queryChannel(dstrank);
  2174. unsigned remaining;
  2175. if (tm.timedout(&remaining))
  2176. return false;
  2177. if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
  2178. return false;
  2179. }
  2180. }
  2181. }
  2182. return true;
  2183. }
  2184. bool verifyConnection(rank_t rank, unsigned timeout)
  2185. {
  2186. CriticalBlock block(verifysect);
  2187. assertex(rank!=RANK_RANDOM);
  2188. assertex(rank!=RANK_ALL);
  2189. CTimeMon tm(timeout);
  2190. CMPChannel &channel = queryChannel(rank);
  2191. unsigned remaining;
  2192. if (tm.timedout(&remaining))
  2193. return false;
  2194. return channel.verifyConnection(tm,true);
  2195. }
  2196. bool verifyAll(bool duplex, unsigned timeout)
  2197. {
  2198. CriticalBlock block(verifysect);
  2199. CTimeMon tm(timeout);
  2200. rank_t myrank = group->rank();
  2201. {
  2202. ForEachNodeInGroup(rank,*group) {
  2203. bool doverify;
  2204. if (duplex)
  2205. doverify = (rank!=myrank);
  2206. else if ((rank&1)==(myrank&1))
  2207. doverify = (myrank>rank);
  2208. else
  2209. doverify = (myrank<rank);
  2210. if (doverify) {
  2211. CMPChannel &channel = queryChannel(rank);
  2212. unsigned remaining;
  2213. if (tm.timedout(&remaining)) {
  2214. return false;
  2215. }
  2216. if (!channel.verifyConnection(tm,true))
  2217. return false;
  2218. }
  2219. }
  2220. }
  2221. if (!duplex) {
  2222. ForEachNodeInGroup(rank,*group) {
  2223. bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
  2224. if (doverify) {
  2225. CMPChannel &channel = queryChannel(rank);
  2226. while (!channel.verifyConnection(tm,false)) {
  2227. unsigned remaining;
  2228. if (tm.timedout(&remaining))
  2229. return false;
  2230. CriticalUnblock unblock(verifysect);
  2231. Sleep(100);
  2232. }
  2233. }
  2234. }
  2235. }
  2236. return true;
  2237. }
  2238. unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)
  2239. {
  2240. assertex(srcrank!=RANK_NULL);
  2241. SocketEndpoint res;
  2242. CTimeMon tm(timeout);
  2243. unsigned ret = parent->probe((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm,res);
  2244. if (ret!=0) {
  2245. if (sender)
  2246. *sender = group->rank(res);
  2247. return ret;
  2248. }
  2249. if (sender)
  2250. *sender = RANK_NULL;
  2251. return 0;
  2252. }
  2253. bool recv(CMessageBuffer &mbuf, rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=MP_WAIT_FOREVER)
  2254. {
  2255. assertex(srcrank!=RANK_NULL);
  2256. const SocketEndpoint *srcep=NULL;
  2257. if (srcrank==RANK_ALL) {
  2258. if (!outer&&(group->ordinality()==1)) // minor optimization (useful in Dali)
  2259. srcep = &queryEndpoint(0);
  2260. }
  2261. else
  2262. srcep = &queryEndpoint(srcrank);
  2263. CTimeMon tm(timeout);
  2264. loop
  2265. {
  2266. try
  2267. {
  2268. if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
  2269. {
  2270. if (sender)
  2271. *sender = group->rank(mbuf.getSender());
  2272. return true;
  2273. }
  2274. if (sender)
  2275. *sender = RANK_NULL;
  2276. return false;
  2277. }
  2278. catch (IMP_Exception *e)
  2279. {
  2280. if (MPERR_link_closed != e->errorCode())
  2281. throw;
  2282. const SocketEndpoint &ep = e->queryEndpoint();
  2283. if (RANK_NULL != group->rank(ep))
  2284. throw;
  2285. StringBuffer epStr;
  2286. ep.getUrlStr(epStr);
  2287. FLLOG(MCoperatorWarning, unknownJob, "CCommunicator: ignoring closed endpoint from outside the communicator group: %s", epStr.str());
  2288. e->Release();
  2289. // loop around and recv again
  2290. }
  2291. }
  2292. }
  2293. void flush(mptag_t tag)
  2294. {
  2295. parent->flush(tag);
  2296. }
  2297. IGroup &queryGroup() { return *group; }
  2298. IGroup *getGroup() { return LINK(group); }
  2299. bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
  2300. {
  2301. assertex((sendrank!=RANK_NULL)&&(sendrank!=RANK_ALL));
  2302. if (sendrank==RANK_RANDOM) {
  2303. if (group->ordinality()>1) {
  2304. do {
  2305. sendrank = getRandom()%group->ordinality();
  2306. } while (sendrank==group->rank());
  2307. }
  2308. else {
  2309. assertex(group->rank()!=0);
  2310. sendrank = 0;
  2311. }
  2312. }
  2313. mptag_t replytag = createReplyTag();
  2314. CTimeMon tm(timeout);
  2315. mbuff.setReplyTag(replytag);
  2316. unsigned remaining;
  2317. if (tm.timedout(&remaining))
  2318. return false;
  2319. if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
  2320. return false;
  2321. mbuff.clear();
  2322. return recv(mbuff,sendrank,replytag,NULL,remaining);
  2323. }
  2324. bool reply(CMessageBuffer &mbuf, unsigned timeout=MP_WAIT_FOREVER)
  2325. {
  2326. mptag_t replytag = mbuf.getReplyTag();
  2327. rank_t dstrank = group->rank(mbuf.getSender());
  2328. if (dstrank!=RANK_NULL) {
  2329. if (send (mbuf, dstrank, replytag,timeout)) {
  2330. mbuf.setReplyTag(TAG_NULL);
  2331. return true;
  2332. }
  2333. return false;
  2334. }
  2335. CTimeMon tm(timeout);
  2336. CMPChannel &channel = parent->lookup(mbuf.getSender());
  2337. unsigned remaining;
  2338. if (tm.timedout(&remaining)) {
  2339. return false;
  2340. }
  2341. if (channel.send(mbuf,replytag,TAG_NULL,tm, true)) {
  2342. mbuf.setReplyTag(TAG_NULL);
  2343. return true;
  2344. }
  2345. return false;
  2346. }
  2347. void cancel(rank_t srcrank, mptag_t tag)
  2348. {
  2349. assertex(srcrank!=RANK_NULL);
  2350. parent->cancel((srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag);
  2351. }
  2352. void disconnect(INode *node)
  2353. {
  2354. CriticalBlock block(verifysect);
  2355. CMPChannel &channel = parent->lookup(node->endpoint());
  2356. channel.closeSocket();
  2357. parent->removeChannel(&channel);
  2358. }
  2359. CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
  2360. {
  2361. outer = _outer;
  2362. parent = _parent;
  2363. group = LINK(_group);
  2364. }
  2365. ~CCommunicator()
  2366. {
  2367. group->Release();
  2368. }
  2369. };
  2370. ICommunicator *createCommunicator(IGroup *group,bool outer)
  2371. {
  2372. assertex(MPserver!=NULL);
  2373. return new CCommunicator(MPserver,group,outer);
  2374. }
  2375. static IInterCommunicator *worldcomm=NULL;
  2376. IInterCommunicator &queryWorldCommunicator()
  2377. {
  2378. CriticalBlock block(CMPServer::serversect);
  2379. assertex(MPserver!=NULL);
  2380. if (!worldcomm)
  2381. worldcomm = new CInterCommunicator(MPserver);
  2382. return *worldcomm;
  2383. }
  2384. void startMPServer(unsigned port, bool paused)
  2385. {
  2386. assertex(sizeof(PacketHeader)==32);
  2387. CriticalBlock block(CMPServer::serversect);
  2388. if (CMPServer::servernest==0)
  2389. {
  2390. if (!CMPServer::serverpaused)
  2391. {
  2392. delete MPserver;
  2393. MPserver = new CMPServer(port);
  2394. }
  2395. if (paused)
  2396. {
  2397. CMPServer::serverpaused = true;
  2398. return;
  2399. }
  2400. queryLogMsgManager()->setPort(MPserver->getPort());
  2401. MPserver->start();
  2402. CMPServer::serverpaused = false;
  2403. }
  2404. CMPServer::servernest++;
  2405. }
  2406. void stopMPServer()
  2407. {
  2408. CriticalBlock block(CMPServer::serversect);
  2409. if (--CMPServer::servernest==0) {
  2410. stopLogMsgReceivers();
  2411. #ifdef _TRACE
  2412. LOG(MCdebugInfo(100), unknownJob, "Stopping MP Server");
  2413. #endif
  2414. CriticalUnblock unblock(CMPServer::serversect);
  2415. assertex(MPserver!=NULL);
  2416. MPserver->stop();
  2417. delete MPserver;
  2418. MPserver = NULL;
  2419. ::Release(worldcomm);
  2420. worldcomm = NULL;
  2421. initMyNode(0);
  2422. #ifdef _TRACE
  2423. LOG(MCdebugInfo(100), unknownJob, "Stopped MP Server");
  2424. #endif
  2425. }
  2426. }
  2427. extern mp_decl void addMPConnectionMonitor(IConnectionMonitor *monitor)
  2428. {
  2429. CriticalBlock block(CMPServer::serversect);
  2430. assertex(MPserver);
  2431. MPserver->addConnectionMonitor(monitor);
  2432. }
  2433. extern mp_decl void removeMPConnectionMonitor(IConnectionMonitor *monitor)
  2434. {
  2435. CriticalBlock block(CMPServer::serversect);
  2436. if (MPserver)
  2437. MPserver->removeConnectionMonitor(monitor);
  2438. }
  2439. StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
  2440. {
  2441. CriticalBlock block(CMPServer::serversect);
  2442. if (MPserver)
  2443. MPserver->getReceiveQueueDetails(buf);
  2444. return buf;
  2445. }
  2446. void registerSelfDestructChildProcess(HANDLE handle)
  2447. {
  2448. CriticalBlock block(childprocesssect);
  2449. if (handle!=(HANDLE)-1)
  2450. childprocesslist.append((unsigned)handle);
  2451. }
  2452. void unregisterSelfDestructChildProcess(HANDLE handle)
  2453. {
  2454. CriticalBlock block(childprocesssect);
  2455. if (handle!=(HANDLE)-1)
  2456. childprocesslist.zap((unsigned)handle);
  2457. }