rmtspawn.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include "portlist.h"
  16. /* NB: error numbers/text are currently in remoteerr.hpp
  17. * Which mainly contains error codes used by dafilesrv and clients
  18. * This is not really related, but for legacy reasons, the error code
  19. * numbers remain the same, and remain within the same header file.
  20. */
  21. #include "remoteerr.hpp"
  22. #include "rmtspawn.hpp"
  23. #include "rmtssh.hpp"
  24. constexpr LogMsgCategory MCdetailDebugInfo(MCdebugInfo(1000));
  25. /*
  26. How the remote spawning works:
  27. i) the master starts a slave program using hoagent/ssh, passing a) who the master is and b) what reply tag to use
  28. ii) the slave starts up, and starts listening on a socket based on the reply tag.
  29. iii) the master connects to the socket, and is returned the ip/mpsocket that the slave is listening on.
  30. iv) The master connects to the slave on that mp channel.
  31. Complications:
  32. a) slave could fail to start
  33. b) slave/master could die at any point.
  34. c) more than one slave can be being started on the same socket/reply tag.
  35. Timeouts:
  36. master->slave socket connect 300 seconds + buffer read + delay * 20 attempts (assuming bad connect throws an exception)
  37. slave for master 5 minutes normally, max 5 mins * 20 * 20 attempts in weird circumstances
  38. read buffer with no timeout - could it get stuck here?
  39. Q's
  40. What if always connect to an orphaned slave?
  41. MORE: This could be improved. Really there should be one thing connecting to the socket, that shares all the
  42. attempted connections. That would solve the problem of connecting for the wrong slave. However, since
  43. it is only a problem for running all the slaves on the same machine it's probably not worth worrying about.
  44. */
  45. static unsigned nextReplyTag;
  46. static StringAttr SSHidentfilename;
  47. static StringAttr SSHusername;
  48. static StringAttr SSHpasswordenc;
  49. static unsigned SSHtimeout;
  50. static unsigned SSHretries;
  51. static StringAttr SSHexeprefix;
  52. void setRemoteSpawnSSH(
  53. const char *identfilename,
  54. const char *username, // if NULL then disable SSH
  55. const char *passwordenc,
  56. unsigned timeout,
  57. unsigned retries,
  58. const char *exeprefix)
  59. {
  60. SSHidentfilename.set(identfilename);
  61. SSHusername.set(username);
  62. SSHpasswordenc.set(passwordenc);
  63. SSHtimeout = timeout;
  64. SSHretries = retries;
  65. SSHexeprefix.set(exeprefix);
  66. }
  67. void getRemoteSpawnSSH(
  68. StringAttr &identfilename,
  69. StringAttr &username, // if isEmpty then disable SSH
  70. StringAttr &passwordenc,
  71. unsigned &timeout,
  72. unsigned &retries,
  73. StringAttr &exeprefix)
  74. {
  75. identfilename.set(SSHidentfilename);
  76. username.set(SSHusername);
  77. passwordenc.set(SSHpasswordenc);
  78. timeout = SSHtimeout;
  79. retries = SSHretries;
  80. exeprefix.set(SSHexeprefix);
  81. }
  82. ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & childEP, unsigned version, const char *logdir, IAbortRequestCallback * abort, const char *extra)
  83. {
  84. SocketEndpoint myEP;
  85. myEP.setLocalHost(0);
  86. unsigned replyTag = ++nextReplyTag;
  87. unsigned port = SLAVE_CONNECT_PORT + ((unsigned)kind * NUM_SLAVE_CONNECT_PORT) + getRandom() % NUM_SLAVE_CONNECT_PORT;
  88. StringBuffer args;
  89. myEP.getUrlStr(args);
  90. args.append(' ').append(replyTag).append(' ').append((unsigned)kind).append(" ").append(port);
  91. if (extra)
  92. args.append(' ').append(extra);
  93. else
  94. args.append(" _");
  95. if (logdir)
  96. args.append(' ').append(logdir);
  97. StringBuffer cmd;
  98. if (SSHexeprefix.isEmpty())
  99. cmd.append(exe);
  100. else {
  101. const char * tail = splitDirTail(exe,cmd);
  102. size32_t l = strlen(tail);
  103. addPathSepChar(cmd).append(SSHexeprefix);
  104. if ((l>4)&&(memcmp(tail+l-4,".exe",4)==0)) // bit odd but want .bat if prefix on windows
  105. cmd.append(l-4,tail).append(".bat");
  106. else
  107. cmd.append(tail);
  108. }
  109. cmd.append(' ').append(args);
  110. if (abort && abort->abortRequested())
  111. {
  112. LOG(MCdetailDebugInfo, unknownJob, "Action aborted before connecting to slave (%3d)", replyTag);
  113. return NULL;
  114. }
  115. if (SSHusername.isEmpty())
  116. {
  117. #if defined(_WIN32)
  118. //Run the program directly if it is being run on the local machine - so ssh doesn't need to be running...
  119. //Change once we have solved the problems with ssh etc. on windows?
  120. if (childEP.isLocal())
  121. {
  122. DWORD runcode;
  123. if (!invoke_program(cmd.str(), runcode, false))
  124. return NULL;
  125. }
  126. else
  127. #endif
  128. throw MakeStringException(-1,"SSH user not specified");
  129. }
  130. else {
  131. Owned<IFRunSSH> runssh = createFRunSSH();
  132. runssh->init(cmd.str(),SSHidentfilename,SSHusername,SSHpasswordenc,SSHtimeout,SSHretries);
  133. runssh->exec(childEP,NULL,true); // need workdir? TBD
  134. }
  135. //Have to now try and connect to the child and get back the port it is listening on
  136. unsigned attempts = 20;
  137. SocketEndpoint connectEP(childEP);
  138. connectEP.port = port;
  139. LOG(MCdetailDebugInfo, unknownJob, "Start connect to correct slave (%3d)", replyTag);
  140. IException * error = NULL;
  141. ISocket * result = NULL;
  142. while (!result && attempts)
  143. {
  144. try
  145. {
  146. StringBuffer tmp;
  147. connectEP.getUrlStr(tmp);
  148. LOG(MCdetailDebugInfo, unknownJob, "Try to connect to slave %s",tmp.str());
  149. Owned<ISocket> socket = ISocket::connect_wait(connectEP,MASTER_CONNECT_SLAVE_TIMEOUT);
  150. if (socket)
  151. {
  152. try
  153. {
  154. MemoryBuffer buffer;
  155. buffer.setEndian(__BIG_ENDIAN);
  156. buffer.append(version);
  157. myEP.ipserialize(buffer);
  158. buffer.append((unsigned)kind);
  159. buffer.append(replyTag);
  160. writeBuffer(socket, buffer);
  161. bool connected;
  162. unsigned slaveTag;
  163. readBuffer(socket, buffer.clear(), 100*1000);
  164. buffer.read(connected);
  165. buffer.read(slaveTag);
  166. SocketEndpoint childEP;
  167. childEP.deserialize(buffer);
  168. if (connected)
  169. {
  170. assertex(slaveTag == replyTag);
  171. LOG(MCdetailDebugInfo, unknownJob, "Connected to correct slave (%3d)", replyTag);
  172. result = socket.getClear();
  173. break;
  174. }
  175. unsigned slaveVersion = 5;
  176. unsigned slaveKind = kind;
  177. if (buffer.getPos() < buffer.length())
  178. buffer.read(slaveVersion);
  179. if (buffer.getPos() < buffer.length())
  180. buffer.read(slaveKind);
  181. if ((slaveVersion != version) && (slaveKind == kind))
  182. {
  183. error = MakeStringException(RFSERR_VersionMismatch, RFSERR_VersionMismatch_Text, version, slaveVersion);
  184. break;
  185. }
  186. if (slaveKind != kind)
  187. LOG(MCdetailDebugInfo, unknownJob, "Connected to wrong kind of slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  188. else
  189. LOG(MCdetailDebugInfo, unknownJob, "Failed to connect to correct slave (%d,%d/%d) - try again later",connected,replyTag,slaveTag);
  190. //Wrong slave listening, need to leave time for the other, don't count as an attempt
  191. MilliSleep(fastRand() % 5000 + 5000);
  192. }
  193. catch (IException * e)
  194. {
  195. StringBuffer s;
  196. s.appendf("Retry after exception talking to slave (%d): ",replyTag);
  197. e->errorMessage(s);
  198. LOG(MCdetailDebugInfo, unknownJob, "%s", s.str());
  199. e->Release();
  200. //Probably another element just connected, and the listening socket has just been killed.
  201. //So try again. Wait just long enough to give another thread a chance.
  202. MilliSleep(10);
  203. }
  204. }
  205. }
  206. catch (IException * e)
  207. {
  208. StringBuffer s;
  209. LOG(MCdetailDebugInfo, unknownJob, e, s.appendf("Failed to connect to slave (%d) (try again): ", replyTag).str());
  210. e->Release();
  211. // No socket listening or contention - try again fairly soon
  212. MilliSleep(fastRand()%400+100);
  213. attempts--;
  214. }
  215. if (abort && abort->abortRequested())
  216. break;
  217. }
  218. if (error)
  219. throw error;
  220. if (!result)
  221. ERRLOG("Failed to connect to slave (%d)", replyTag);
  222. return result;
  223. }
  224. //---------------------------------------------------------------------------
  225. CRemoteParentInfo::CRemoteParentInfo()
  226. {
  227. replyTag = 0;
  228. kind = SPAWNlast;
  229. port = 0;
  230. }
  231. bool CRemoteParentInfo::processCommandLine(int argc, char * argv[], StringBuffer &logdir)
  232. {
  233. if (argc <= 4)
  234. return false;
  235. parent.set(argv[1]);
  236. replyTag = atoi(argv[2]);
  237. kind = (SpawnKind)atoi(argv[3]);
  238. port = atoi(argv[4]);
  239. // 5 is extra (only used in logging)
  240. if (argc>6)
  241. logdir.clear().append(argv[6]);
  242. return true;
  243. }
  244. void CRemoteParentInfo::log()
  245. {
  246. StringBuffer temp;
  247. LOG(MCdebugProgress, unknownJob, "Starting remote slave. Master=%s reply=%d port=%d", parent.getUrlStr(temp).str(), replyTag, port);
  248. }
  249. bool CRemoteParentInfo::sendReply(unsigned version)
  250. {
  251. unsigned listenAttempts = 20;
  252. MemoryBuffer buffer;
  253. buffer.setEndian(__BIG_ENDIAN);
  254. while (listenAttempts--)
  255. {
  256. try
  257. {
  258. LOG(MCdetailDebugInfo, unknownJob, "Ready to listen. reply=%d port=%d", replyTag, port);
  259. Owned<ISocket> listen = ISocket::create(port, 1);
  260. if (listen)
  261. {
  262. unsigned receiveAttempts = 10;
  263. unsigned connectVersion;
  264. unsigned connectTag;
  265. unsigned connectKind;
  266. StringBuffer masterIPtext;
  267. while (receiveAttempts--)
  268. {
  269. try
  270. {
  271. LOG(MCdetailDebugInfo, unknownJob, "Ready to accept connection. reply=%d", replyTag);
  272. if (!listen->wait_read(SLAVE_LISTEN_FOR_MASTER_TIMEOUT))
  273. {
  274. LOG(MCdetailDebugInfo, unknownJob, "Gave up waiting for a connection. reply=%d", replyTag);
  275. return false;
  276. }
  277. Owned<ISocket> connect = listen->accept();
  278. readBuffer(connect, buffer.clear());
  279. buffer.read(connectVersion);
  280. bool same = false;
  281. IpAddress masterIP;
  282. masterIP.ipdeserialize(buffer);
  283. buffer.read(connectKind);
  284. if (version == connectVersion)
  285. {
  286. buffer.read(connectTag);
  287. masterIP.getIpText(masterIPtext.clear());
  288. LOG(MCdetailDebugInfo, unknownJob, "Process incoming connection. reply=%d got(%d,%s)", replyTag,connectTag,masterIPtext.str());
  289. same = (kind == connectKind) && masterIP.ipequals(parent) && (connectTag == replyTag);
  290. }
  291. else
  292. {
  293. //If connected to a different kind of slave, fake the version number
  294. //so it doesn't think there is a version mismatch
  295. //can remove when all .exes have new code.
  296. if (connectKind != kind)
  297. {
  298. LOG(MCdetailDebugInfo, unknownJob, "Connection for wrong slave kind (%u vs %u)- ignore", connectKind, kind);
  299. }
  300. }
  301. buffer.clear().append(same).append(replyTag);
  302. SocketEndpoint ep(1U);
  303. ep.serialize(buffer);
  304. buffer.append(version);
  305. buffer.append(kind);
  306. writeBuffer(connect, buffer);
  307. if (same)
  308. {
  309. socket.setown(connect.getClear());
  310. LOG(MCdetailDebugInfo, unknownJob, "Connection matched - continue....");
  311. return true;
  312. }
  313. if ((connectKind == kind) && (version != connectVersion))
  314. {
  315. LOG(MCdebugInfo, unknownJob, "Version mismatch - terminating slave process expected %d got %d", version, connectVersion);
  316. return false;
  317. }
  318. }
  319. catch (IException * e)
  320. {
  321. EXCLOG(e, "Error reading information from master: ");
  322. e->Release();
  323. }
  324. MilliSleep(50);
  325. }
  326. }
  327. }
  328. catch (IException * e)
  329. {
  330. EXCLOG(e, "Failed to create master listener: ");
  331. e->Release();
  332. }
  333. MilliSleep(fastRand() % 3000 + 2000);
  334. }
  335. return false;
  336. }
  337. //---------------------------------------------------------------------------
  338. CRemoteSlave::CRemoteSlave(const char * _name, unsigned _tag, unsigned _version, bool _stayAlive)
  339. {
  340. slaveName.set(_name);
  341. tag = _tag;
  342. stayAlive = _stayAlive;
  343. version = _version;
  344. }
  345. void CRemoteSlave::run(int argc, char * argv[])
  346. {
  347. StringBuffer logFile;
  348. CRemoteParentInfo info;
  349. bool paramsok = info.processCommandLine(argc, argv, logFile);
  350. if (logFile.length()==0) { // not expected! Caller queries logfile location via getConfigurationDirectory
  351. #ifdef _WIN32
  352. logFile.append("c:\\HPCCSystems\\logs\\ftslave");
  353. #else
  354. logFile.append("/var/log/HPCCSystems/ftslave");
  355. #endif
  356. }
  357. if (logFile.length())
  358. addPathSepChar(logFile);
  359. logFile.append(slaveName);
  360. addFileTimestamp(logFile, true);
  361. logFile.append(".log");
  362. attachStandardFileLogMsgMonitor(logFile.str(), 0, MSGFIELD_STANDARD, MSGAUD_all, MSGCLS_all, TopDetail, false, true, true);
  363. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler()); // no point logging output to screen if run remote!
  364. LOG(MCdebugProgress, unknownJob, "Starting %s %s %s %s %s %s %s",slaveName.get(),(argc>1)?argv[1]:"",(argc>2)?argv[2]:"",(argc>3)?argv[3]:"",(argc>4)?argv[4]:"",(argc>5)?argv[5]:"",(argc>6)?argv[6]:"");
  365. if (paramsok)
  366. {
  367. info.log();
  368. EnableSEHtoExceptionMapping();
  369. try
  370. {
  371. if (info.sendReply(version))
  372. {
  373. ISocket * masterSocket = info.queryMasterSocket();
  374. unsigned timeOut = RMTTIME_RESPONSE_MASTER;
  375. do
  376. {
  377. MemoryBuffer msg;
  378. MemoryBuffer results;
  379. results.setEndian(__BIG_ENDIAN);
  380. bool ok = false;
  381. Linked<IException> error;
  382. try
  383. {
  384. if (!catchReadBuffer(masterSocket, msg, timeOut))
  385. throwError(RFSERR_MasterSeemsToHaveDied);
  386. msg.setEndian(__BIG_ENDIAN);
  387. byte action;
  388. msg.read(action);
  389. // <= 7.6.xx clients also send:
  390. // num passwords followed by those pwds
  391. // >= 7.8.0 clients until now do not send password data
  392. // so next unsigned could be either num passwords or netaddr
  393. // if unsigned value <= 10 assumme it is num passwords
  394. // as IP address will surely have higher bits set and be > 10
  395. unsigned numPasswds = 0;
  396. size32_t origPos = msg.getPos();
  397. msg.read(numPasswds);
  398. if (numPasswds <= 10)
  399. {
  400. for (int i=0; i<numPasswds; i++)
  401. {
  402. IpAddress tip;
  403. tip.ipdeserialize(msg);
  404. StringAttr password, username;
  405. msg.read(password).read(username);
  406. }
  407. }
  408. else
  409. msg.reset(origPos);
  410. ok = processCommand(action, masterSocket, msg, results);
  411. }
  412. catch (IException * e)
  413. {
  414. PrintExceptionLog(e, slaveName.get());
  415. error.setown(e);
  416. }
  417. catch (RELEASE_CATCH_ALL)
  418. {
  419. LOG(MCwarning, unknownJob, "Server seems to have crashed - close done gracefully");
  420. error.setown(MakeStringException(999, "Server seems to have crashed - close done gracefully"));
  421. }
  422. msg.setEndian(__BIG_ENDIAN);
  423. msg.clear().append(true).append(ok);
  424. serializeException(error, msg);
  425. msg.append(results);
  426. catchWriteBuffer(masterSocket, msg);
  427. LOG(MCdebugProgress, unknownJob, "Results sent from slave %d", info.replyTag);
  428. //Acknowledgement before closing down...
  429. msg.clear();
  430. if (catchReadBuffer(masterSocket, msg, RMTTIME_RESPONSE_MASTER))
  431. {
  432. LOG(MCdebugProgress, unknownJob, "Terminate acknowledgement received from master for slave %d", info.replyTag);
  433. msg.read(ok);
  434. assertex(ok);
  435. }
  436. else
  437. LOG(MCdebugProgress, unknownJob, "No terminate acknowledgement received from master for slave %d", info.replyTag);
  438. if (error)
  439. break;
  440. timeOut = 24*60*60*1000;
  441. } while (stayAlive);
  442. LOG(MCdebugProgress, unknownJob, "Terminate acknowledgement received from master for slave %d", info.replyTag);
  443. }
  444. }
  445. catch (IException * e)
  446. {
  447. PrintExceptionLog(e, slaveName.get());
  448. e->Release();
  449. }
  450. }
  451. LOG(MCdebugProgress, unknownJob, "Stopping %s", slaveName.get());
  452. }
  // Disabled (compiled out) communicator-based variants of checkForRemoteAbort() and
  // sendRemoteAbort(); the socket-based versions elsewhere in this file are the
  // ones that are built.
  #if 0
  void checkForRemoteAbort(ICommunicator * communicator, mptag_t tag)
  {
      if (communicator->probe(1, tag, NULL, 0)!=0)
      {
          CMessageBuffer msg;
          if (!communicator->recv(msg, 1, tag, NULL))
              throwError(RFSERR_TimeoutWaitMaster);

          bool aborting;
          msg.setEndian(__BIG_ENDIAN);
          msg.read(aborting);
          if (aborting)
              throwAbortException();
      }
  }

  void sendRemoteAbort(INode * node, mptag_t tag)
  {
      CMessageBuffer msg;
      msg.clear().append(true);
      queryWorldCommunicator().send(msg, node, tag, MP_ASYNC_SEND);
  }
  #endif
  475. void checkForRemoteAbort(ISocket * socket)
  476. {
  477. if (socket->wait_read(0))
  478. {
  479. MemoryBuffer msg;
  480. if (!catchReadBuffer(socket, msg))
  481. throwError(RFSERR_TimeoutWaitMaster);
  482. bool aborting;
  483. msg.setEndian(__BIG_ENDIAN);
  484. msg.read(aborting);
  485. if (aborting)
  486. throwAbortException();
  487. }
  488. }
  489. bool sendRemoteAbort(ISocket * socket)
  490. {
  491. LOG(MCdebugInfo, unknownJob, "Send abort to remote slave (%d)", isAborting());
  492. MemoryBuffer msg;
  493. msg.append(true);
  494. return catchWriteBuffer(socket, msg);
  495. }
  // Disabled (compiled out) communicator-based command exchange.
  // NOTE(review): this refers to names that are not declared in the visible source
  // (url, error, sprayer, SCcontinue), so it would not build if re-enabled as is.
  #if 0
  bool sendSlaveCommand(INode * remote, CMessageBuffer & msg, unsigned tag)
  {
      if (!queryWorldCommunicator().send(msg, remote, tag, FTTIME_CONNECT_SLAVE))
          throwError1(DFTERR_TimeoutWaitConnect, url.str());

      bool done;
      for (;;)
      {
          msg.clear();
          if (!queryWorldCommunicator().recv(msg, remote, tag, NULL, FTTIME_PROGRESS))
              throwError1(DFTERR_TimeoutWaitSlave, url.str());

          msg.setEndian(__BIG_ENDIAN);
          msg.read(done);
          if (!done)
              return SCcontinue;
      }

      bool ok;
      msg.read(ok);
      error.setown(deserializeException(msg));
      if (error)
          sprayer.setHadError();

      msg.clear().append(true);
      queryWorldCommunicator().send(msg, remote, tag);
  }
  #endif