rmtspawn.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include "portlist.h"
  16. #include "remoteerr.hpp"
  17. #include "rmtspawn.hpp"
  18. #include "rmtssh.hpp"
  19. #include "rmtpass.hpp"
  20. LogMsgCategory MCdetailDebugInfo(MCdebugInfo(1000));
  21. /*
  22. How the remote spawning works:
  23. i) the master starts a slave program using hoagent/ssh, passing a) who the master is and b) what reply tag to use
  24. ii) the slave starts up, and starts listening on a socket based on the reply tag.
  25. iii) the master connects to the socket, and is returned the ip/mpsocket that the slave is listening on.
  26. iv) The master connects to the slave on that mp channel.
  27. Complications:
  28. a) slave could fail to start
  29. b) slave/master could die at any point.
  30. c) more than one slave can be being started on the same socket/reply tag.
  31. Timeouts:
  32. master->slave socket connect 300 seconds + buffer read + delay * 20 attempts (assuming bad connect throws an exception)
  33. slave for master 5 minutes normally, max 5 mins * 20 * 20 attempts in weird cicumstances
  34. read buffer with no timeout - could it get stuck here?
  35. Q's
  36. What if always connect to an orphaned slave?
  37. MORE: This could be improved. Really there should be one thing connecting to the socket, that shares all the
  38. attempted connections. That would solve the problem of connecting for the wrong slave. However, since
  39. it is only a problem for running all the slaves on the same machine its probably not worth worrying about.
  40. */
  41. static unsigned nextReplyTag;
  42. static StringAttr SSHidentfilename;
  43. static StringAttr SSHusername;
  44. static StringAttr SSHpasswordenc;
  45. static unsigned SSHtimeout;
  46. static unsigned SSHretries;
  47. static StringAttr SSHexeprefix;
  48. void setRemoteSpawnSSH(
  49. const char *identfilename,
  50. const char *username, // if NULL then disable SSH
  51. const char *passwordenc,
  52. unsigned timeout,
  53. unsigned retries,
  54. const char *exeprefix)
  55. {
  56. SSHidentfilename.set(identfilename);
  57. SSHusername.set(username);
  58. SSHpasswordenc.set(passwordenc);
  59. SSHtimeout = timeout;
  60. SSHretries = retries;
  61. SSHexeprefix.set(exeprefix);
  62. }
  63. void getRemoteSpawnSSH(
  64. StringAttr &identfilename,
  65. StringAttr &username, // if isEmpty then disable SSH
  66. StringAttr &passwordenc,
  67. unsigned &timeout,
  68. unsigned &retries,
  69. StringAttr &exeprefix)
  70. {
  71. identfilename.set(SSHidentfilename);
  72. username.set(SSHusername);
  73. passwordenc.set(SSHpasswordenc);
  74. timeout = SSHtimeout;
  75. retries = SSHretries;
  76. exeprefix.set(SSHexeprefix);
  77. }
  78. ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & childEP, unsigned version, const char *logdir, IAbortRequestCallback * abort, const char *extra)
  79. {
  80. SocketEndpoint myEP;
  81. myEP.setLocalHost(0);
  82. unsigned replyTag = ++nextReplyTag;
  83. unsigned port = SLAVE_CONNECT_PORT + ((unsigned)kind * NUM_SLAVE_CONNECT_PORT) + getRandom() % NUM_SLAVE_CONNECT_PORT;
  84. StringBuffer args;
  85. myEP.getUrlStr(args);
  86. args.append(' ').append(replyTag).append(' ').append((unsigned)kind).append(" ").append(port);
  87. if (extra)
  88. args.append(' ').append(extra);
  89. else
  90. args.append(" _");
  91. if (logdir)
  92. args.append(' ').append(logdir);
  93. StringBuffer cmd;
  94. if (SSHexeprefix.isEmpty())
  95. cmd.append(exe);
  96. else {
  97. const char * tail = splitDirTail(exe,cmd);
  98. size32_t l = strlen(tail);
  99. addPathSepChar(cmd).append(SSHexeprefix);
  100. if ((l>4)&&(memcmp(tail+l-4,".exe",4)==0)) // bit odd but want .bat if prefix on windows
  101. cmd.append(l-4,tail).append(".bat");
  102. else
  103. cmd.append(tail);
  104. }
  105. cmd.append(' ').append(args);
  106. if (SSHusername.isEmpty())
  107. {
  108. #if defined(_WIN32)
  109. //Run the program directly if it is being run on the local machine - so ssh doesn't need to be running...
  110. //Change once we have solved the problems with ssh etc. on windows?
  111. if (childEP.isLocal())
  112. {
  113. DWORD runcode;
  114. if (!invoke_program(cmd.str(), runcode, false))
  115. return NULL;
  116. }
  117. else
  118. #endif
  119. throw MakeStringException(-1,"SSH user not specified");
  120. }
  121. else {
  122. Owned<IFRunSSH> runssh = createFRunSSH();
  123. runssh->init(cmd.str(),SSHidentfilename,SSHusername,SSHpasswordenc,SSHtimeout,SSHretries);
  124. runssh->exec(childEP,NULL,true); // need workdir? TBD
  125. }
  126. //Have to now try and connect to the child and get back the port it is listening on
  127. unsigned attempts = 20;
  128. SocketEndpoint connectEP(childEP);
  129. connectEP.port = port;
  130. LOG(MCdetailDebugInfo, unknownJob, "Start connect to correct slave (%3d)", replyTag);
  131. IException * error = NULL;
  132. ISocket * result = NULL;
  133. while (!result && attempts)
  134. {
  135. if (abort && abort->abortRequested())
  136. break;
  137. try
  138. {
  139. StringBuffer tmp;
  140. connectEP.getUrlStr(tmp);
  141. LOG(MCdetailDebugInfo, unknownJob, "Try to connect to slave %s",tmp.str());
  142. Owned<ISocket> socket = ISocket::connect_wait(connectEP,MASTER_CONNECT_SLAVE_TIMEOUT);
  143. if (socket)
  144. {
  145. try
  146. {
  147. MemoryBuffer buffer;
  148. buffer.setEndian(__BIG_ENDIAN);
  149. buffer.append(version);
  150. myEP.ipserialize(buffer);
  151. buffer.append((unsigned)kind);
  152. buffer.append(replyTag);
  153. writeBuffer(socket, buffer);
  154. bool connected;
  155. unsigned slaveTag;
  156. readBuffer(socket, buffer.clear(), 100*1000);
  157. buffer.read(connected);
  158. buffer.read(slaveTag);
  159. SocketEndpoint childEP;
  160. childEP.deserialize(buffer);
  161. if (connected)
  162. {
  163. assertex(slaveTag == replyTag);
  164. LOG(MCdetailDebugInfo, unknownJob, "Connected to correct slave (%3d)", replyTag);
  165. result = socket.getClear();
  166. break;
  167. }
  168. unsigned slaveVersion = 5;
  169. unsigned slaveKind = kind;
  170. if (buffer.getPos() < buffer.length())
  171. buffer.read(slaveVersion);
  172. if (buffer.getPos() < buffer.length())
  173. buffer.read(slaveKind);
  174. if ((slaveVersion != version) && (slaveKind == kind))
  175. {
  176. error = MakeStringException(RFSERR_VersionMismatch, RFSERR_VersionMismatch_Text, version, slaveVersion);
  177. break;
  178. }
  179. if (slaveKind != kind)
  180. LOG(MCdetailDebugInfo, unknownJob, "Connected to wrong kind of slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  181. else
  182. LOG(MCdetailDebugInfo, unknownJob, "Failed to connect to correct slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  183. //Wrong slave listening, need to leave time for the other, don't count as an attempt
  184. MilliSleep(rand() % 5000 + 5000);
  185. }
  186. catch (IException * e)
  187. {
  188. StringBuffer s;
  189. s.appendf("Retry after exception talking to slave (%d): ",replyTag);
  190. e->errorMessage(s);
  191. LOG(MCdetailDebugInfo, unknownJob, "%s", s.str());
  192. e->Release();
  193. //Probably another element just connected, and the listening socket has just been killed.
  194. //So try again. Wait just long enough to give another thread a chance.
  195. MilliSleep(10);
  196. }
  197. }
  198. }
  199. catch (IException * e)
  200. {
  201. StringBuffer s;
  202. LOG(MCdetailDebugInfo, unknownJob, e, s.appendf("Failed to connect to slave (%d) (try again): ", replyTag).str());
  203. e->Release();
  204. // No socket listening or contention - try again fairly soon
  205. MilliSleep(rand()%400+100);
  206. attempts--;
  207. }
  208. }
  209. if (error)
  210. throw error;
  211. if (!result)
  212. ERRLOG("Failed to connect to slave (%d)", replyTag);
  213. return result;
  214. }
  215. //---------------------------------------------------------------------------
  216. CRemoteParentInfo::CRemoteParentInfo()
  217. {
  218. replyTag = 0;
  219. kind = SPAWNlast;
  220. port = 0;
  221. }
  222. bool CRemoteParentInfo::processCommandLine(int argc, char * argv[], StringBuffer &logdir)
  223. {
  224. if (argc <= 4)
  225. return false;
  226. parent.set(argv[1]);
  227. replyTag = atoi(argv[2]);
  228. kind = (SpawnKind)atoi(argv[3]);
  229. port = atoi(argv[4]);
  230. // 5 is extra (only used in logging)
  231. if (argc>6)
  232. logdir.clear().append(argv[6]);
  233. return true;
  234. }
  235. void CRemoteParentInfo::log()
  236. {
  237. StringBuffer temp;
  238. LOG(MCdebugProgress, unknownJob, "Starting remote slave. Master=%s reply=%d port=%d", parent.getUrlStr(temp).str(), replyTag, port);
  239. }
  240. bool CRemoteParentInfo::sendReply(unsigned version)
  241. {
  242. unsigned listenAttempts = 20;
  243. MemoryBuffer buffer;
  244. buffer.setEndian(__BIG_ENDIAN);
  245. while (listenAttempts--)
  246. {
  247. try
  248. {
  249. LOG(MCdebugInfo(1000), unknownJob, "Ready to listen. reply=%d port=%d", replyTag, port);
  250. Owned<ISocket> listen = ISocket::create(port, 1);
  251. if (listen)
  252. {
  253. unsigned receiveAttempts = 10;
  254. unsigned connectVersion;
  255. unsigned connectTag;
  256. unsigned connectKind;
  257. StringBuffer masterIPtext;
  258. while (receiveAttempts--)
  259. {
  260. try
  261. {
  262. LOG(MCdebugInfo(1000), unknownJob, "Ready to accept connection. reply=%d", replyTag);
  263. if (!listen->wait_read(SLAVE_LISTEN_FOR_MASTER_TIMEOUT))
  264. {
  265. LOG(MCdebugInfo(1000), unknownJob, "Gave up waiting for a connection. reply=%d", replyTag);
  266. return false;
  267. }
  268. Owned<ISocket> connect = listen->accept();
  269. readBuffer(connect, buffer.clear());
  270. buffer.read(connectVersion);
  271. bool same = false;
  272. IpAddress masterIP;
  273. masterIP.ipdeserialize(buffer);
  274. buffer.read(connectKind);
  275. if (version == connectVersion)
  276. {
  277. buffer.read(connectTag);
  278. masterIP.getIpText(masterIPtext.clear());
  279. LOG(MCdebugInfo(1000), unknownJob, "Process incoming connection. reply=%d got(%d,%s)", replyTag,connectTag,masterIPtext.str());
  280. same = (kind == connectKind) && masterIP.ipequals(parent) && (connectTag == replyTag);
  281. }
  282. else
  283. {
  284. //If connected to a different kind of slave, fake the version number
  285. //so it doesn't think there is a version mismatch
  286. //can remove when all .exes have new code.
  287. if (connectKind != kind)
  288. {
  289. LOG(MCdebugInfo(1000), unknownJob, "Connection for wrong slave kind (%u vs %u)- ignore", connectKind, kind);
  290. }
  291. }
  292. buffer.clear().append(same).append(replyTag);
  293. SocketEndpoint ep(1U);
  294. ep.serialize(buffer);
  295. buffer.append(version);
  296. buffer.append(kind);
  297. writeBuffer(connect, buffer);
  298. if (same)
  299. {
  300. socket.setown(connect.getClear());
  301. LOG(MCdebugInfo(1000), unknownJob, "Connection matched - continue....");
  302. return true;
  303. }
  304. if ((connectKind == kind) && (version != connectVersion))
  305. {
  306. LOG(MCdebugInfo, unknownJob, "Version mismatch - terminating slave process expected %d got %d", version, connectVersion);
  307. return false;
  308. }
  309. }
  310. catch (IException * e)
  311. {
  312. EXCLOG(e, "Error reading information from master: ");
  313. e->Release();
  314. }
  315. MilliSleep(50);
  316. }
  317. }
  318. }
  319. catch (IException * e)
  320. {
  321. EXCLOG(e, "Failed to create master listener: ");
  322. e->Release();
  323. }
  324. MilliSleep(rand() % 3000 + 2000);
  325. }
  326. return false;
  327. }
  328. //---------------------------------------------------------------------------
  329. CRemoteSlave::CRemoteSlave(const char * _name, unsigned _tag, unsigned _version, bool _stayAlive)
  330. {
  331. slaveName.set(_name);
  332. tag = _tag;
  333. stayAlive = _stayAlive;
  334. version = _version;
  335. }
  336. void CRemoteSlave::run(int argc, char * argv[])
  337. {
  338. StringBuffer logFile;
  339. CRemoteParentInfo info;
  340. bool paramsok = info.processCommandLine(argc, argv, logFile);
  341. if (logFile.length()==0) { // not expected!
  342. #ifdef _WIN32
  343. //logFile.append("c:\\"); // don't write to root on windows!
  344. #else
  345. if (checkDirExists("/c$"))
  346. logFile.append("/c$/");
  347. #endif
  348. }
  349. if (logFile.length())
  350. addPathSepChar(logFile);
  351. logFile.append(slaveName);
  352. addFileTimestamp(logFile, true);
  353. logFile.append(".log");
  354. attachStandardFileLogMsgMonitor(logFile.str(), 0, MSGFIELD_STANDARD, MSGAUD_all, MSGCLS_all, TopDetail, false, true, true);
  355. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler()); // no point logging output to screen if run remote!
  356. LOG(MCdebugProgress, unknownJob, "Starting %s %s %s %s %s %s %s",slaveName.get(),(argc>1)?argv[1]:"",(argc>2)?argv[2]:"",(argc>3)?argv[3]:"",(argc>4)?argv[4]:"",(argc>5)?argv[5]:"",(argc>6)?argv[6]:"");
  357. if (paramsok)
  358. {
  359. info.log();
  360. EnableSEHtoExceptionMapping();
  361. CachedPasswordProvider passwordProvider;
  362. setPasswordProvider(&passwordProvider);
  363. try
  364. {
  365. if (info.sendReply(version))
  366. {
  367. ISocket * masterSocket = info.queryMasterSocket();
  368. unsigned timeOut = RMTTIME_RESPONSE_MASTER;
  369. do
  370. {
  371. MemoryBuffer msg;
  372. MemoryBuffer results;
  373. results.setEndian(__BIG_ENDIAN);
  374. bool ok = false;
  375. Linked<IException> error;
  376. try
  377. {
  378. if (!catchReadBuffer(masterSocket, msg, timeOut))
  379. throwError(RFSERR_MasterSeemsToHaveDied);
  380. msg.setEndian(__BIG_ENDIAN);
  381. byte action;
  382. msg.read(action);
  383. passwordProvider.clear();
  384. passwordProvider.deserialize(msg);
  385. ok = processCommand(action, masterSocket, msg, results);
  386. }
  387. catch (IException * e)
  388. {
  389. PrintExceptionLog(e, slaveName.get());
  390. error.setown(e);
  391. }
  392. catch (RELEASE_CATCH_ALL)
  393. {
  394. LOG(MCwarning, unknownJob, "Server seems to have crashed - close done gracefully");
  395. error.setown(MakeStringException(999, "Server seems to have crashed - close done gracefully"));
  396. }
  397. msg.setEndian(__BIG_ENDIAN);
  398. msg.clear().append(true).append(ok);
  399. serializeException(error, msg);
  400. msg.append(results);
  401. catchWriteBuffer(masterSocket, msg);
  402. LOG(MCdebugProgress, unknownJob, "Results sent from slave %d", info.replyTag);
  403. //Acknowledgement before closing down...
  404. msg.clear();
  405. if (catchReadBuffer(masterSocket, msg, RMTTIME_RESPONSE_MASTER))
  406. {
  407. LOG(MCdebugProgress, unknownJob, "Terminate acknowledgement received from master for slave %d", info.replyTag);
  408. msg.read(ok);
  409. assertex(ok);
  410. }
  411. else
  412. LOG(MCdebugProgress, unknownJob, "No terminate acknowledgement received from master for slave %d", info.replyTag);
  413. if (error)
  414. break;
  415. timeOut = 24*60*60*1000;
  416. } while (stayAlive);
  417. LOG(MCdebugProgress, unknownJob, "Terminate acknowledgement received from master for slave %d", info.replyTag);
  418. }
  419. }
  420. catch (IException * e)
  421. {
  422. PrintExceptionLog(e, slaveName.get());
  423. e->Release();
  424. }
  425. setPasswordProvider(NULL);
  426. }
  427. LOG(MCdebugProgress, unknownJob, "Stopping %s", slaveName.get());
  428. }
  // (Obsolete ICommunicator/INode-based variants of checkForRemoteAbort() and sendRemoteAbort(),
  // disabled with #if 0, were removed; the ISocket-based versions are the ones compiled.)
  451. void checkForRemoteAbort(ISocket * socket)
  452. {
  453. if (socket->wait_read(0))
  454. {
  455. MemoryBuffer msg;
  456. if (!catchReadBuffer(socket, msg))
  457. throwError(RFSERR_TimeoutWaitMaster);
  458. bool aborting;
  459. msg.setEndian(__BIG_ENDIAN);
  460. msg.read(aborting);
  461. if (aborting)
  462. throwAbortException();
  463. }
  464. }
  465. bool sendRemoteAbort(ISocket * socket)
  466. {
  467. LOG(MCdebugInfo, unknownJob, "Send abort to remote slave (%d)", isAborting());
  468. MemoryBuffer msg;
  469. msg.append(true);
  470. return catchWriteBuffer(socket, msg);
  471. }
  // (A disabled #if 0 draft of sendSlaveCommand() was removed from here: it referenced undefined
  // names and returned SCcontinue from a bool function, so it could not have compiled.)