rmtssh.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "portlist.h"
  15. #include "jlib.hpp"
  16. #include "jio.hpp"
  17. #include "jlog.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jencrypt.hpp"
  21. #include "rmtssh.hpp"
  22. #ifndef _WIN32
  23. #include <wordexp.h>
  24. #endif
  25. //----------------------------------------------------------------------------
  26. //#define PLINK_USE_CMD
  27. class CFRunSSH: public CInterface, implements IFRunSSH
  28. {
  29. unsigned numthreads;
  30. unsigned connecttimeout;
  31. unsigned attempts;
  32. StringAttr cmd;
  33. StringAttr identityfile;
  34. StringAttr user;
  35. StringAttr password; // encrypted
  36. StringAttr workdir;
  37. StringAttr slavesfile;
  38. StringArray slaves;
  39. StringAttr treeroot;
  40. StringArray replytext;
  41. UnsignedArray reply;
  42. UnsignedArray done;
  43. bool background;
  44. bool strict;
  45. bool verbose;
  46. bool dryrun;
  47. bool useplink;
  48. int replicationoffset;
  49. CriticalSection sect;
  50. StringBuffer expandCmd(StringBuffer &cmdbuf, unsigned nodenum, unsigned treefrom)
  51. {
  52. const char *cp=cmd.get();
  53. if (!cp)
  54. return cmdbuf;
  55. for (; *cp; cp++) {
  56. if ((*cp=='%') && cp[1]) {
  57. cp++;
  58. switch (*cp) {
  59. case 'n': // Node number
  60. cmdbuf.append(nodenum+1);
  61. break;
  62. case 'a': // Node address
  63. cmdbuf.append(slaves.item(nodenum));
  64. break;
  65. case 'l': // Node list
  66. cmdbuf.append(slavesfile);
  67. break;
  68. case '%':
  69. cmdbuf.append('%');
  70. break;
  71. case 'x': // Next Node
  72. cmdbuf.append(slaves.item((nodenum+replicationoffset)%slaves.ordinality()));
  73. break;
  74. case 'c':
  75. cmdbuf.append(slaves.ordinality());
  76. break;
  77. case 't': // Tree Node
  78. if (treefrom)
  79. cmdbuf.append(slaves.item(treefrom-1));
  80. else
  81. cmdbuf.append(treeroot);
  82. break;
  83. case 's': { // ssh params
  84. bool usepssh = !password.isEmpty();
  85. cmdbuf.appendf("%s -o LogLevel=QUIET -o StrictHostKeyChecking=%s -o BatchMode=yes ",usepssh?"pssh":"ssh",strict?"yes":"no");
  86. if (!identityfile.isEmpty())
  87. cmdbuf.appendf("-i %s ",identityfile.get());
  88. if (background)
  89. cmdbuf.append("-f ");
  90. if (connecttimeout)
  91. cmdbuf.appendf("-o ConnectTimeout=%d ",connecttimeout);
  92. if (attempts)
  93. cmdbuf.appendf("-o ConnectionAttempts=%d ",attempts);
  94. if (!user.isEmpty())
  95. cmdbuf.appendf("-l %s ",user.get());
  96. }
  97. break;
  98. default: // treat as literal (?)
  99. cmdbuf.append('%').append(*cp);
  100. break;
  101. }
  102. }
  103. else
  104. cmdbuf.append(*cp);
  105. }
  106. return cmdbuf;
  107. }
  108. void loadSlaves()
  109. {
  110. FILE *slavesFile = fopen(slavesfile.get(), "rt");
  111. if( !slavesFile) {
  112. const char * s = slavesfile.get();
  113. while (*s&&(isdigit(*s)||(*s=='.')||(*s==',')||(*s==':')||(*s=='-')||(*s=='*')))
  114. s++;
  115. if (!*s) {
  116. SocketEndpointArray sa;
  117. sa.fromText(slavesfile.get(),0);
  118. if (sa.ordinality()) {
  119. StringBuffer ns;
  120. ForEachItemIn(i,sa) {
  121. sa.item(i).getIpText(ns.clear());
  122. slaves.append(ns.str());
  123. }
  124. return;
  125. }
  126. }
  127. throw MakeStringException(-1, "Failed to open slaves file %s", slavesfile.get());
  128. }
  129. char inbuf[1000];
  130. StringAttr slave;
  131. while (fgets( inbuf, sizeof(inbuf), slavesFile)) {
  132. char *hash = strchr(inbuf, '#');
  133. if (hash)
  134. *hash = 0;
  135. char *finger = inbuf;
  136. for (;;) {
  137. while (isspace(*finger))
  138. finger++;
  139. char *start = finger;
  140. while (*finger && !isspace(*finger))
  141. finger++;
  142. if (finger > start) {
  143. slave.set(start, finger - start);
  144. slaves.append(slave);
  145. }
  146. else
  147. break;
  148. }
  149. }
  150. fclose(slavesFile);
  151. }
  152. public:
  153. IMPLEMENT_IINTERFACE;
  154. CFRunSSH()
  155. {
  156. numthreads = 5;
  157. connecttimeout = 0; // no timeout
  158. attempts = 3;
  159. background = false;
  160. strict = false;
  161. verbose = false;
  162. dryrun = false;
  163. useplink = false;
  164. replicationoffset = 0;
  165. }
  166. void init(int argc,char * argv[])
  167. {
  168. numthreads = 10;
  169. connecttimeout = 0; // no timeout
  170. attempts = 3;
  171. background = false;
  172. strict = false;
  173. verbose = false;
  174. dryrun = false;
  175. useplink = false;
  176. for (int i=1; i<argc; i++) {
  177. const char *arg = argv[i];
  178. if (arg[0]=='-') {
  179. arg++;
  180. const char *parm = (arg[1]==':')?(arg+2):(arg+1);
  181. switch (toupper(*arg)) {
  182. case 'N':
  183. numthreads = *parm?atoi(parm):numthreads;
  184. break;
  185. case 'T':
  186. if (toupper(arg[1])=='R') {
  187. parm = (arg[2]==':')?(arg+3):(arg+2);
  188. treeroot.set(parm);
  189. break;
  190. }
  191. connecttimeout=*parm?atoi(parm):connecttimeout;
  192. break;
  193. case 'A':
  194. attempts=*parm?atoi(parm):attempts;
  195. break;
  196. case 'I':
  197. identityfile.set(parm);
  198. break;
  199. case 'U':
  200. user.set(parm);
  201. break;
  202. case 'D':
  203. if (*parm)
  204. workdir.set(parm);
  205. else
  206. dryrun = true;
  207. break;
  208. case 'S':
  209. strict = true;
  210. break;
  211. case 'B':
  212. background = true;
  213. break;
  214. case 'V':
  215. verbose = true;
  216. break;
  217. case 'O':
  218. replicationoffset = atoi(parm);
  219. break;
  220. case 'P':
  221. #ifdef _WIN32
  222. if (toupper(arg[1])=='L') {
  223. useplink = true;
  224. break;
  225. }
  226. #endif
  227. parm = (arg[2]==':')?(arg+3):(arg+2);
  228. if (!*parm)
  229. break;
  230. if (toupper(arg[1])=='W') {
  231. StringBuffer buf;
  232. encrypt(buf,parm);
  233. password.set(buf.str());
  234. break;
  235. }
  236. else if (toupper(arg[1])=='E') {
  237. password.set(parm);
  238. break;
  239. }
  240. // continue
  241. default:
  242. throw MakeStringException(-1,"Unknown option %s",argv[i]);
  243. }
  244. }
  245. else {
  246. if (slavesfile.isEmpty()) {
  247. slavesfile.set(argv[i]);
  248. loadSlaves();
  249. }
  250. else if (cmd.isEmpty())
  251. cmd.set(argv[i]);
  252. else
  253. throw MakeStringException(-1,"Unknown parameter %s",argv[i]);
  254. }
  255. }
  256. if (dryrun||(numthreads<=0))
  257. numthreads=1;
  258. if (!identityfile.isEmpty()&&!checkFileExists(identityfile.get()))
  259. throw MakeStringException(-1,"Cannot find identity file: %s",identityfile.get());
  260. if (!password.isEmpty()&&!identityfile.isEmpty()) {
  261. WARNLOG("SSH identity file specified, ignoring password");
  262. password.clear();
  263. }
  264. }
  265. void init(
  266. const char *cmdline,
  267. const char *identfilename,
  268. const char *username,
  269. const char *passwordenc,
  270. unsigned timeout,
  271. unsigned retries)
  272. {
  273. strict = false;
  274. verbose = false;
  275. numthreads = 1;
  276. connecttimeout=timeout;
  277. attempts=retries;
  278. #ifdef _WIN32
  279. identityfile.set(identfilename);
  280. #else
  281. if (identfilename&&*identfilename) {
  282. wordexp_t exp_result; // expand ~ etc
  283. wordexp(identfilename, &exp_result, 0);
  284. identityfile.set(exp_result.we_wordv[0]);
  285. wordfree(&exp_result);
  286. }
  287. else
  288. identityfile.clear();
  289. #endif
  290. user.set(username);
  291. password.set(passwordenc);
  292. cmd.set(cmdline);
  293. }
  294. unsigned log2(unsigned n)
  295. {
  296. assertex(n);
  297. unsigned ret=0;
  298. while (n>1) {
  299. ret++;
  300. n /= 2;
  301. }
  302. return ret;
  303. }
  // Returns 2 raised to the power n, computed by n successive doublings
  // in unsigned arithmetic.
  unsigned pow2(unsigned n)
  {
      unsigned result = 1;
      for (unsigned step = 0; step < n; ++step)
          result <<= 1;
      return result;
  }
  311. unsigned treesrc(unsigned n)
  312. {
  313. return n-pow2(log2(n));
  314. }
  315. void exec(unsigned i,unsigned treefrom)
  316. {
  317. int retcode=-1;
  318. StringBuffer outbuf;
  319. try {
  320. bool usepssh = false;
  321. StringBuffer cmdline;
  322. if (!password.isEmpty()) {
  323. #ifdef _WIN32
  324. if (useplink) {
  325. cmdline.append("plink -ssh -batch ");
  326. if (!user.isEmpty())
  327. cmdline.append(" -l ").append(user);
  328. StringBuffer tmp;
  329. decrypt(tmp,password);
  330. cmdline.append(" -pw ").append(tmp);
  331. cmdline.append(' ').append(slaves.item(i)).append(' ');
  332. #ifdef PLINK_USE_CMD
  333. // bit of a kludge
  334. cmdline.append("cmd /c \"");
  335. const char *dir = cmd.get();
  336. const char *s = dir;
  337. const char *e = NULL;
  338. while (*s>' ') {
  339. if (*s=='\\')
  340. e = s;
  341. s++;
  342. }
  343. #endif
  344. expandCmd(cmdline,i,treefrom);
  345. #ifdef PLINK_USE_CMD
  346. cmdline.append('"');
  347. #endif
  348. }
  349. else {
  350. // windows use psexec
  351. cmdline.append("psexec \\\\").append(slaves.item(i));
  352. if (!user.isEmpty())
  353. cmdline.append(" -u ").append(user);
  354. StringBuffer tmp;
  355. decrypt(tmp,password);
  356. cmdline.append(" -p ").append(tmp);
  357. if (background)
  358. cmdline.append("-d ");
  359. cmdline.append(' ');
  360. expandCmd(cmdline,i,treefrom);
  361. }
  362. #else
  363. // linux use pssh
  364. usepssh = true;
  365. #endif
  366. }
  367. if (cmdline.length()==0) {
  368. // ssh
  369. cmdline.appendf("%s -n -o LogLevel=QUIET -o StrictHostKeyChecking=%s ",usepssh?"pssh":"ssh",strict?"yes":"no -o UserKnownHostsFile=/dev/null");
  370. if (!usepssh)
  371. cmdline.append("-o BatchMode=yes ");
  372. if (!identityfile.isEmpty())
  373. cmdline.appendf("-i %s ",identityfile.get());
  374. if (background)
  375. cmdline.append("-f ");
  376. if (connecttimeout)
  377. cmdline.appendf("-o ConnectTimeout=%d ",connecttimeout);
  378. if (attempts)
  379. cmdline.appendf("-o ConnectionAttempts=%d ",attempts);
  380. if (usepssh) {
  381. StringBuffer tmp;
  382. decrypt(tmp,password);
  383. cmdline.appendf("-o password=%s ",tmp.str());
  384. }
  385. if (!user.isEmpty())
  386. cmdline.appendf("%s@",user.get());
  387. cmdline.appendf("%s \"",slaves.item(i));
  388. expandCmd(cmdline,i,treefrom);
  389. cmdline.append('"');
  390. }
  391. if (dryrun)
  392. printf("%s\n",cmdline.str());
  393. else {
  394. Owned<IPipeProcess> pipe = createPipeProcess();
  395. if (pipe->run((verbose&&!usepssh)?"FRUNSSH":NULL,cmdline.str(),workdir,
  396. useplink, // for some reason plink needs input handle
  397. true,true)) {
  398. byte buf[4096];
  399. for (;;) {
  400. size32_t read = pipe->read(sizeof(buf),buf);
  401. if (!read)
  402. break;
  403. outbuf.append(read,(const char *)buf);
  404. }
  405. retcode = pipe->wait();
  406. bool firsterr=true;
  407. for (;;) {
  408. size32_t read = pipe->readError(sizeof(buf),buf);
  409. if (!read)
  410. break;
  411. if (firsterr) {
  412. firsterr = false;
  413. if (outbuf.length())
  414. outbuf.append('\n');
  415. outbuf.append("STDERR: ");
  416. }
  417. outbuf.append(read,(const char *)buf);
  418. }
  419. }
  420. }
  421. }
  422. catch (IException *e) {
  423. e->errorMessage(outbuf);
  424. retcode = -2;
  425. }
  426. CriticalBlock block(sect);
  427. done.append(i);
  428. replytext.append(outbuf.str());
  429. reply.append((unsigned)retcode);
  430. }
  431. void exec()
  432. {
  433. if (!treeroot.isEmpty()) {
  434. // remove from slaves
  435. ForEachItemInRev(i,slaves)
  436. if (strcmp(slaves.item(i),treeroot)==0)
  437. slaves.remove(i);
  438. }
  439. if (slaves.ordinality()==0)
  440. return;
  441. class cRun: public CAsyncFor
  442. {
  443. bool treemode;
  444. CFRunSSH &parent;
  445. Semaphore *treesem;
  446. public:
  447. cRun(CFRunSSH &_parent)
  448. : parent(_parent)
  449. {
  450. treemode = !parent.treeroot.isEmpty();
  451. if (treemode) {
  452. treesem = new Semaphore[parent.slaves.ordinality()+1]; // don't actually use all
  453. treesem[0].signal();
  454. }
  455. else
  456. treesem = NULL;
  457. }
  458. ~cRun()
  459. {
  460. delete [] treesem;
  461. }
  462. void Do(unsigned i)
  463. {
  464. if (treemode) {
  465. unsigned from = parent.treesrc(i+1);
  466. treesem[from].wait();
  467. parent.exec(i,from);
  468. treesem[from].signal();
  469. treesem[i+1].signal();
  470. }
  471. else
  472. parent.exec(i,0);
  473. }
  474. } afor(*this);
  475. afor.For(slaves.ordinality(),(numthreads>slaves.ordinality())?slaves.ordinality():numthreads,!treeroot.isEmpty(),treeroot.isEmpty());
  476. if (dryrun)
  477. return;
  478. if (slaves.ordinality()>1) {
  479. int errCode = 0;
  480. Owned<IMultiException> multiException = MakeMultiException();
  481. for (unsigned i=0;i<done.ordinality();i++) {
  482. unsigned n = done.item(i);
  483. StringBuffer res(replytext.item(n));
  484. while (res.length()&&(res.charAt(res.length()-1)<=' '))
  485. res.setLength(res.length()-1);
  486. if (reply.item(n)) {
  487. errCode = reply.item(n);
  488. if (res.length())
  489. multiException->append(*MakeStringExceptionDirect(errCode,res.str()));
  490. else
  491. multiException->append(*MakeStringExceptionDirect(errCode,cmd.str()));
  492. }
  493. }
  494. if (errCode)
  495. throw multiException.getClear();
  496. }
  497. else {
  498. StringBuffer res(replytext.item(0));
  499. while (res.length()&&(res.charAt(res.length()-1)<=' '))
  500. res.setLength(res.length()-1);
  501. if (reply.item(0)) {
  502. if (res.length())
  503. throw MakeStringExceptionDirect(reply.item(0), res.str());
  504. else
  505. throw MakeStringExceptionDirect(reply.item(0), cmd.str());
  506. }
  507. }
  508. }
  509. void exec(
  510. const IpAddress &ip,
  511. const char *workdirname,
  512. bool _background)
  513. {
  514. background = _background;
  515. strict = false;
  516. verbose = false;
  517. StringBuffer ips;
  518. ip.getIpText(ips);
  519. slaves.kill();
  520. slaves.append(ips.str());
  521. numthreads = 1;
  522. workdir.set(workdirname);
  523. exec();
  524. }
  525. const StringArray &getReplyText() const
  526. {
  527. return replytext;
  528. }
  529. const UnsignedArray &getReply() const
  530. {
  531. return reply;
  532. }
  533. };
  534. IFRunSSH *createFRunSSH()
  535. {
  536. return new CFRunSSH;
  537. }