rmtspawn.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jliball.hpp"
  15. #include "platform.h"
  16. #include "portlist.h"
  17. #include "remoteerr.hpp"
  18. #include "rmtspawn.hpp"
  19. #include "rmtssh.hpp"
  20. #include "rmtpass.hpp"
// Log category used by spawnRemoteChild() for its connect/retry tracing.
// Built from MCdebugInfo with the argument 1000 (presumably a detail level - confirm in jlog).
LogMsgCategory MCdetailDebugInfo(MCdebugInfo(1000));
  22. /*
  23. How the remote spawning works:
  24. i) the master starts a slave program using hoagent/ssh, passing a) who the master is and b) what reply tag to use
  25. ii) the slave starts up, and starts listening on a socket based on the reply tag.
  26. iii) the master connects to the socket, and is returned the ip/mpsocket that the slave is listening on.
  27. iv) The master connects to the slave on that mp channel.
  28. Complications:
  29. a) slave could fail to start
  30. b) slave/master could die at any point.
  31. c) more than one slave can be being started on the same socket/reply tag.
  32. Timeouts:
  33. master->slave socket connect 300 seconds + buffer read + delay * 20 attempts (assuming bad connect throws an exception)
  34. slave for master 5 minutes normally, max 5 mins * 20 * 20 attempts in weird circumstances
  35. read buffer with no timeout - could it get stuck here?
  36. Q's
  37. What if always connect to an orphaned slave?
  38. MORE: This could be improved. Really there should be one thing connecting to the socket, that shares all the
  39. attempted connections. That would solve the problem of connecting for the wrong slave. However, since
  40. it is only a problem for running all the slaves on the same machine it's probably not worth worrying about.
  41. */
  42. static unsigned nextReplyTag;
  43. static StringAttr SSHidentfilename;
  44. static StringAttr SSHusername;
  45. static StringAttr SSHpasswordenc;
  46. static unsigned SSHtimeout;
  47. static unsigned SSHretries;
  48. static StringAttr SSHexeprefix;
  49. void setRemoteSpawnSSH(
  50. const char *identfilename,
  51. const char *username, // if NULL then disable SSH
  52. const char *passwordenc,
  53. unsigned timeout,
  54. unsigned retries,
  55. const char *exeprefix)
  56. {
  57. SSHidentfilename.set(identfilename);
  58. SSHusername.set(username);
  59. SSHpasswordenc.set(passwordenc);
  60. SSHtimeout = timeout;
  61. SSHretries = retries;
  62. SSHexeprefix.set(exeprefix);
  63. }
  64. void getRemoteSpawnSSH(
  65. StringAttr &identfilename,
  66. StringAttr &username, // if isEmpty then disable SSH
  67. StringAttr &passwordenc,
  68. unsigned &timeout,
  69. unsigned &retries,
  70. StringAttr &exeprefix)
  71. {
  72. identfilename.set(SSHidentfilename);
  73. username.set(SSHusername);
  74. passwordenc.set(SSHpasswordenc);
  75. timeout = SSHtimeout;
  76. retries = SSHretries;
  77. exeprefix.set(SSHexeprefix);
  78. }
  79. ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & childEP, unsigned version, const char *logdir, IAbortRequestCallback * abort, const char *extra)
  80. {
  81. SocketEndpoint myEP;
  82. myEP.setLocalHost(0);
  83. unsigned replyTag = ++nextReplyTag;
  84. unsigned port = SLAVE_CONNECT_PORT + ((unsigned)kind * NUM_SLAVE_CONNECT_PORT) + getRandom() % NUM_SLAVE_CONNECT_PORT;
  85. StringBuffer args;
  86. myEP.getUrlStr(args);
  87. args.append(' ').append(replyTag).append(' ').append((unsigned)kind).append(" ").append(port);
  88. if (extra)
  89. args.append(' ').append(extra);
  90. else
  91. args.append(" _");
  92. if (logdir)
  93. args.append(' ').append(logdir);
  94. StringBuffer cmd;
  95. if (SSHexeprefix.isEmpty())
  96. cmd.append(exe);
  97. else {
  98. const char * tail = splitDirTail(exe,cmd);
  99. size32_t l = strlen(tail);
  100. addPathSepChar(cmd).append(SSHexeprefix);
  101. if ((l>4)&&(memcmp(tail+l-4,".exe",4)==0)) // bit odd but want .bat if prefix on windows
  102. cmd.append(l-4,tail).append(".bat");
  103. else
  104. cmd.append(tail);
  105. }
  106. cmd.append(' ').append(args);
  107. if (SSHusername.isEmpty())
  108. {
  109. #if defined(_WIN32)
  110. //Run the program directly if it is being run on the local machine - so ssh doesn't need to be running...
  111. //Change once we have solved the problems with ssh etc. on windows?
  112. if (childEP.isLocal())
  113. {
  114. DWORD runcode;
  115. if (!invoke_program(cmd.str(), runcode, false))
  116. return NULL;
  117. }
  118. else
  119. #endif
  120. throw MakeStringException(-1,"SSH user not specified");
  121. }
  122. else {
  123. Owned<IFRunSSH> runssh = createFRunSSH();
  124. runssh->init(cmd.str(),SSHidentfilename,SSHusername,SSHpasswordenc,SSHtimeout,SSHretries);
  125. runssh->exec(childEP,NULL,true); // need workdir? TBD
  126. }
  127. //Have to now try and connect to the child and get back the port it is listening on
  128. unsigned attempts = 20;
  129. SocketEndpoint connectEP(childEP);
  130. connectEP.port = port;
  131. LOG(MCdetailDebugInfo, unknownJob, "Start connect to correct slave (%3d)", replyTag);
  132. IException * error = NULL;
  133. ISocket * result = NULL;
  134. while (!result && attempts)
  135. {
  136. if (abort && abort->abortRequested())
  137. break;
  138. try
  139. {
  140. StringBuffer tmp;
  141. connectEP.getUrlStr(tmp);
  142. LOG(MCdetailDebugInfo, unknownJob, "Try to connect to slave %s",tmp.str());
  143. Owned<ISocket> socket = ISocket::connect_wait(connectEP,MASTER_CONNECT_SLAVE_TIMEOUT);
  144. if (socket)
  145. {
  146. try
  147. {
  148. MemoryBuffer buffer;
  149. buffer.setEndian(__BIG_ENDIAN);
  150. buffer.append(version);
  151. myEP.ipserialize(buffer);
  152. buffer.append((unsigned)kind);
  153. buffer.append(replyTag);
  154. writeBuffer(socket, buffer);
  155. bool connected;
  156. unsigned slaveTag;
  157. readBuffer(socket, buffer.clear(), 100*1000);
  158. buffer.read(connected);
  159. buffer.read(slaveTag);
  160. SocketEndpoint childEP;
  161. childEP.deserialize(buffer);
  162. if (connected)
  163. {
  164. assertex(slaveTag == replyTag);
  165. LOG(MCdetailDebugInfo, unknownJob, "Connected to correct slave (%3d)", replyTag);
  166. result = socket.getClear();
  167. break;
  168. }
  169. unsigned slaveVersion = 5;
  170. unsigned slaveKind = kind;
  171. if (buffer.getPos() < buffer.length())
  172. buffer.read(slaveVersion);
  173. if (buffer.getPos() < buffer.length())
  174. buffer.read(slaveKind);
  175. if ((slaveVersion != version) && (slaveKind == kind))
  176. {
  177. error = MakeStringException(RFSERR_VersionMismatch, RFSERR_VersionMismatch_Text, version, slaveVersion);
  178. break;
  179. }
  180. if (slaveKind != kind)
  181. LOG(MCdetailDebugInfo, unknownJob, "Connected to wrong kind of slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  182. else
  183. LOG(MCdetailDebugInfo, unknownJob, "Failed to connect to correct slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  184. //Wrong slave listening, need to leave time for the other, don't count as an attempt
  185. MilliSleep(rand() % 5000 + 5000);
  186. }
  187. catch (IException * e)
  188. {
  189. StringBuffer s;
  190. s.appendf("Retry after exception talking to slave (%d): ",replyTag);
  191. e->errorMessage(s);
  192. LOG(MCdetailDebugInfo, unknownJob, "%s", s.str());
  193. e->Release();
  194. //Probably another element just connected, and the listening socket has just been killed.
  195. //So try again. Wait just long enough to give another thread a chance.
  196. MilliSleep(10);
  197. }
  198. }
  199. }
  200. catch (IException * e)
  201. {
  202. StringBuffer s;
  203. LOG(MCdetailDebugInfo, unknownJob, e, s.appendf("Failed to connect to slave (%d) (try again): ", replyTag).str());
  204. e->Release();
  205. // No socket listening or contention - try again fairly soon
  206. MilliSleep(rand()%400+100);
  207. attempts--;
  208. }
  209. }
  210. if (error)
  211. throw error;
  212. if (!result)
  213. ERRLOG("Failed to connect to slave (%d)", replyTag);
  214. return result;
  215. }
  216. //---------------------------------------------------------------------------
  217. CRemoteParentInfo::CRemoteParentInfo()
  218. {
  219. }
  220. bool CRemoteParentInfo::processCommandLine(int argc, char * argv[], StringBuffer &logdir)
  221. {
  222. if (argc <= 4)
  223. return false;
  224. parent.set(argv[1]);
  225. replyTag = atoi(argv[2]);
  226. kind = (SpawnKind)atoi(argv[3]);
  227. port = atoi(argv[4]);
  228. // 5 is extra (only used in logging)
  229. if (argc>6)
  230. logdir.clear().append(argv[6]);
  231. return true;
  232. }
  233. void CRemoteParentInfo::log()
  234. {
  235. StringBuffer temp;
  236. LOG(MCdebugProgress, unknownJob, "Starting remote slave. Master=%s reply=%d port=%d", parent.getUrlStr(temp).str(), replyTag, port);
  237. }
  238. bool CRemoteParentInfo::sendReply(unsigned version)
  239. {
  240. unsigned listenAttempts = 20;
  241. MemoryBuffer buffer;
  242. buffer.setEndian(__BIG_ENDIAN);
  243. while (listenAttempts--)
  244. {
  245. try
  246. {
  247. LOG(MCdebugInfo(1000), unknownJob, "Ready to listen. reply=%d port=%d", replyTag, port);
  248. Owned<ISocket> listen = ISocket::create(port, 1);
  249. if (listen)
  250. {
  251. unsigned receiveAttempts = 10;
  252. unsigned connectVersion;
  253. unsigned connectTag;
  254. unsigned connectKind;
  255. StringBuffer masterIPtext;
  256. while (receiveAttempts--)
  257. {
  258. try
  259. {
  260. LOG(MCdebugInfo(1000), unknownJob, "Ready to accept connection. reply=%d", replyTag);
  261. if (!listen->wait_read(SLAVE_LISTEN_FOR_MASTER_TIMEOUT))
  262. {
  263. LOG(MCdebugInfo(1000), unknownJob, "Gave up waiting for a connection. reply=%d", replyTag);
  264. return false;
  265. }
  266. Owned<ISocket> connect = listen->accept();
  267. readBuffer(connect, buffer.clear());
  268. buffer.read(connectVersion);
  269. bool same = false;
  270. unsigned replyVersion = version;
  271. IpAddress masterIP;
  272. masterIP.ipdeserialize(buffer);
  273. buffer.read(connectKind);
  274. if (version == connectVersion)
  275. {
  276. buffer.read(connectTag);
  277. masterIP.getIpText(masterIPtext.clear());
  278. LOG(MCdebugInfo(1000), unknownJob, "Process incoming connection. reply=%d got(%d,%s)", replyTag,connectTag,masterIPtext.str());
  279. same = (kind == connectKind) && masterIP.ipequals(parent) && (connectTag == replyTag);
  280. }
  281. else
  282. {
  283. //If connected to a different kind of slave, fake the version number
  284. //so it doesn't think there is a version mismatch
  285. //can remove when all .exes have new code.
  286. if (connectKind != kind)
  287. {
  288. LOG(MCdebugInfo(1000), unknownJob, "Connection for wrong slave kind (%d vs %d)- ignore", connectKind, kind);
  289. replyVersion = connectVersion;
  290. }
  291. }
  292. buffer.clear().append(same).append(replyTag);
  293. SocketEndpoint ep(1U);
  294. ep.serialize(buffer);
  295. buffer.append(version);
  296. buffer.append(kind);
  297. writeBuffer(connect, buffer);
  298. if (same)
  299. {
  300. socket.setown(connect.getClear());
  301. LOG(MCdebugInfo(1000), unknownJob, "Connection matched - continue....");
  302. return true;
  303. }
  304. if ((connectKind == kind) && (version != connectVersion))
  305. {
  306. LOG(MCdebugInfo, unknownJob, "Version mismatch - terminating slave process expected %d got %d", version, connectVersion);
  307. return false;
  308. }
  309. }
  310. catch (IException * e)
  311. {
  312. EXCLOG(e, "Error reading information from master: ");
  313. e->Release();
  314. }
  315. MilliSleep(50);
  316. }
  317. }
  318. }
  319. catch (IException * e)
  320. {
  321. EXCLOG(e, "Failed to create master listener: ");
  322. e->Release();
  323. }
  324. MilliSleep(rand() % 3000 + 2000);
  325. }
  326. return false;
  327. }
  328. //---------------------------------------------------------------------------
  329. CRemoteSlave::CRemoteSlave(const char * _name, unsigned _tag, unsigned _version, bool _stayAlive)
  330. {
  331. slaveName.set(_name);
  332. tag = _tag;
  333. stayAlive = _stayAlive;
  334. version = _version;
  335. }
  336. void CRemoteSlave::run(int argc, char * argv[])
  337. {
  338. StringBuffer logFile;
  339. CRemoteParentInfo info;
  340. bool paramsok = info.processCommandLine(argc, argv, logFile);
  341. if (logFile.length()==0) { // not expected!
  342. #ifdef _WIN32
  343. //logFile.append("c:\\"); // don't write to root on windows!
  344. #else
  345. if (checkDirExists("/c$"))
  346. logFile.append("/c$/");
  347. #endif
  348. }
  349. if (logFile.length())
  350. addPathSepChar(logFile);
  351. logFile.append(slaveName);
  352. addFileTimestamp(logFile, true);
  353. logFile.append(".log");
  354. attachStandardFileLogMsgMonitor(logFile.str(), 0, MSGFIELD_STANDARD, MSGAUD_all, MSGCLS_all, TopDetail, false, true, true);
  355. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler()); // no point logging output to screen if run remote!
  356. LOG(MCdebugProgress, unknownJob, "Starting %s %s %s %s %s %s %s",slaveName.get(),(argc>1)?argv[1]:"",(argc>2)?argv[2]:"",(argc>3)?argv[3]:"",(argc>4)?argv[4]:"",(argc>5)?argv[5]:"",(argc>6)?argv[6]:"");
  357. if (paramsok)
  358. {
  359. info.log();
  360. EnableSEHtoExceptionMapping();
  361. CachedPasswordProvider passwordProvider;
  362. setPasswordProvider(&passwordProvider);
  363. try
  364. {
  365. if (info.sendReply(version))
  366. {
  367. ISocket * masterSocket = info.queryMasterSocket();
  368. unsigned timeOut = RMTTIME_RESPONSE_MASTER;
  369. do
  370. {
  371. MemoryBuffer msg;
  372. MemoryBuffer results;
  373. results.setEndian(__BIG_ENDIAN);
  374. bool ok = false;
  375. Linked<IException> error;
  376. try
  377. {
  378. if (!catchReadBuffer(masterSocket, msg, timeOut))
  379. throwError(RFSERR_MasterSeemsToHaveDied);
  380. msg.setEndian(__BIG_ENDIAN);
  381. byte action;
  382. msg.read(action);
  383. passwordProvider.clear();
  384. passwordProvider.deserialize(msg);
  385. ok = processCommand(action, masterSocket, msg, results);
  386. }
  387. catch (IException * e)
  388. {
  389. PrintExceptionLog(e, slaveName.get());
  390. error.setown(e);
  391. }
  392. catch (RELEASE_CATCH_ALL)
  393. {
  394. LOG(MCwarning, unknownJob, "Server seems to have crashed - close done gracefully");
  395. error.setown(MakeStringException(999, "Server seems to have crashed - close done gracefully"));
  396. }
  397. msg.setEndian(__BIG_ENDIAN);
  398. msg.clear().append(true).append(ok);
  399. serializeException(error, msg);
  400. msg.append(results);
  401. catchWriteBuffer(masterSocket, msg);
  402. LOG(MCdebugProgress, unknownJob, "Results sent from slave %d", info.replyTag);
  403. //Acknowledgement before closing down...
  404. msg.clear();
  405. if (catchReadBuffer(masterSocket, msg, RMTTIME_RESPONSE_MASTER))
  406. {
  407. msg.read(ok);
  408. assertex(ok);
  409. }
  410. if (error)
  411. break;
  412. timeOut = 24*60*60*1000;
  413. } while (stayAlive);
  414. LOG(MCdebugProgress, unknownJob, "Terminate acknowledgement received from master for slave %d", info.replyTag);
  415. }
  416. }
  417. catch (IException * e)
  418. {
  419. PrintExceptionLog(e, slaveName.get());
  420. e->Release();
  421. }
  422. setPasswordProvider(NULL);
  423. }
  424. LOG(MCdebugProgress, unknownJob, "Stopping %s", slaveName.get());
  425. }
  426. #if 0
  427. void checkForRemoteAbort(ICommunicator * communicator, mptag_t tag)
  428. {
  429. if (communicator->probe(1, tag, NULL, 0)!=0)
  430. {
  431. CMessageBuffer msg;
  432. if (!communicator->recv(msg, 1, tag, NULL))
  433. throwError(RFSERR_TimeoutWaitMaster);
  434. bool aborting;
  435. msg.setEndian(__BIG_ENDIAN);
  436. msg.read(aborting);
  437. if (aborting)
  438. throwAbortException();
  439. }
  440. }
  441. void sendRemoteAbort(INode * node, mptag_t tag)
  442. {
  443. CMessageBuffer msg;
  444. msg.clear().append(true);
  445. queryWorldCommunicator().send(msg, node, tag, MP_ASYNC_SEND);
  446. }
  447. #endif
  448. void checkForRemoteAbort(ISocket * socket)
  449. {
  450. if (socket->wait_read(0))
  451. {
  452. MemoryBuffer msg;
  453. if (!catchReadBuffer(socket, msg))
  454. throwError(RFSERR_TimeoutWaitMaster);
  455. bool aborting;
  456. msg.setEndian(__BIG_ENDIAN);
  457. msg.read(aborting);
  458. if (aborting)
  459. throwAbortException();
  460. }
  461. }
  462. bool sendRemoteAbort(ISocket * socket)
  463. {
  464. LOG(MCdebugInfo, unknownJob, "Send abort to remote slave (%d)", isAborting());
  465. MemoryBuffer msg;
  466. msg.append(true);
  467. return catchWriteBuffer(socket, msg);
  468. }
#if 0
// Compiled out.  Communicator-based sketch of sending a command to a slave and
// waiting for its completion message.
// NOTE(review): this does not build as written - it uses names that are not
// declared in this file (url, error, sprayer, SCcontinue, the FTTIME_* and
// DFTERR_* constants), returns SCcontinue from a bool function, and has no
// return on the final path.
bool sendSlaveCommand(INode * remote, CMessageBuffer & msg, unsigned tag)
{
    // Send the command; failure to send is reported as a connect timeout
    if (!queryWorldCommunicator().send(msg, remote, tag, FTTIME_CONNECT_SLAVE))
        throwError1(DFTERR_TimeoutWaitConnect, url.str());
    bool done;
    loop
    {
        msg.clear();
        if (!queryWorldCommunicator().recv(msg, remote, tag, NULL, FTTIME_PROGRESS))
            throwError1(DFTERR_TimeoutWaitSlave, url.str());
        msg.setEndian(__BIG_ENDIAN);
        msg.read(done);
        if (!done)
            return SCcontinue;
    }
    // Completion message: ok flag followed by a serialized exception
    bool ok;
    msg.read(ok);
    error.setown(deserializeException(msg));
    if (error)
        sprayer.setHadError();
    // Acknowledge the result back to the slave
    msg.clear().append(true);
    queryWorldCommunicator().send(msg, remote, tag);
}
#endif
  493. #endif