rmtssh.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "portlist.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jlog.hpp"
  19. #include "jmutex.hpp"
  20. #include "jfile.hpp"
  21. #include "jencrypt.hpp"
  22. #include "rmtssh.hpp"
  23. #ifndef _WIN32
  24. #include <wordexp.h>
  25. #endif
  26. static CBuildVersion _bv("$HeadURL: https://svn.br.seisint.com/ecl/trunk/common/remote/rmtssh.cpp $ $Id: rmtssh.cpp 64028 2011-04-14 14:28:10Z nhicks $");
  27. //----------------------------------------------------------------------------
  28. //#define PLINK_USE_CMD
  29. class CFRunSSH: public CInterface, implements IFRunSSH
  30. {
  31. unsigned numthreads;
  32. unsigned connecttimeout;
  33. unsigned attempts;
  34. StringAttr cmd;
  35. StringAttr identityfile;
  36. StringAttr user;
  37. StringAttr password; // encrypted
  38. StringAttr workdir;
  39. StringAttr slavesfile;
  40. StringArray slaves;
  41. StringAttr treeroot;
  42. StringArray replytext;
  43. UnsignedArray reply;
  44. UnsignedArray done;
  45. bool background;
  46. bool strict;
  47. bool verbose;
  48. bool dryrun;
  49. bool useplink;
  50. int replicationoffset;
  51. CriticalSection sect;
  52. StringBuffer expandCmd(StringBuffer &cmdbuf, unsigned nodenum, unsigned treefrom)
  53. {
  54. const char *cp=cmd.get();
  55. if (!cp)
  56. return cmdbuf;
  57. for (; *cp; cp++) {
  58. if ((*cp=='%') && cp[1]) {
  59. cp++;
  60. switch (*cp) {
  61. case 'n': // Node number
  62. cmdbuf.append(nodenum+1);
  63. break;
  64. case 'a': // Node address
  65. cmdbuf.append(slaves.item(nodenum));
  66. break;
  67. case 'l': // Node list
  68. cmdbuf.append(slavesfile);
  69. break;
  70. case '%':
  71. cmdbuf.append('%');
  72. break;
  73. case 'x': // Next Node
  74. cmdbuf.append(slaves.item((nodenum+replicationoffset)%slaves.ordinality()));
  75. break;
  76. case 't': // Tree Node
  77. if (treefrom)
  78. cmdbuf.append(slaves.item(treefrom-1));
  79. else
  80. cmdbuf.append(treeroot);
  81. break;
  82. case 's': { // ssh params
  83. bool usepssh = !password.isEmpty();
  84. cmdbuf.appendf("%s -o LogLevel=ERROR -o StrictHostKeyChecking=%s -o BatchMode=yes ",usepssh?"pssh":"ssh",strict?"yes":"no");
  85. if (!identityfile.isEmpty())
  86. cmdbuf.appendf("-i %s ",identityfile.get());
  87. if (background)
  88. cmdbuf.append("-f ");
  89. if (connecttimeout)
  90. cmdbuf.appendf("-o ConnectTimeout=%d ",connecttimeout);
  91. if (attempts)
  92. cmdbuf.appendf("-o ConnectionAttempts=%d ",attempts);
  93. if (!user.isEmpty())
  94. cmdbuf.appendf("-l %s ",user.get());
  95. }
  96. break;
  97. default: // treat as literal (?)
  98. cmdbuf.append('%').append(*cp);
  99. break;
  100. }
  101. }
  102. else
  103. cmdbuf.append(*cp);
  104. }
  105. return cmdbuf;
  106. }
  107. void loadSlaves()
  108. {
  109. FILE *slavesFile = fopen(slavesfile.get(), "rt");
  110. if( !slavesFile) {
  111. const char * s = slavesfile.get();
  112. while (*s&&(isdigit(*s)||(*s=='.')||(*s==',')||(*s==':')||(*s=='-')||(*s=='*')))
  113. s++;
  114. if (!*s) {
  115. SocketEndpointArray sa;
  116. sa.fromText(slavesfile.get(),0);
  117. if (sa.ordinality()) {
  118. StringBuffer ns;
  119. ForEachItemIn(i,sa) {
  120. sa.item(i).getIpText(ns.clear());
  121. slaves.append(ns.str());
  122. }
  123. return;
  124. }
  125. }
  126. throw MakeStringException(-1, "Failed to open slaves file %s", slavesfile.get());
  127. }
  128. char inbuf[1000];
  129. StringAttr slave;
  130. while (fgets( inbuf, sizeof(inbuf), slavesFile)) {
  131. char *hash = strchr(inbuf, '#');
  132. if (hash)
  133. *hash = 0;
  134. char *finger = inbuf;
  135. loop {
  136. while (isspace(*finger))
  137. finger++;
  138. char *start = finger;
  139. while (*finger && !isspace(*finger))
  140. finger++;
  141. if (finger > start) {
  142. slave.set(start, finger - start);
  143. slaves.append(slave);
  144. }
  145. else
  146. break;
  147. }
  148. }
  149. fclose(slavesFile);
  150. }
  151. public:
  152. IMPLEMENT_IINTERFACE;
  153. CFRunSSH()
  154. {
  155. numthreads = 5;
  156. connecttimeout = 0; // no timeout
  157. attempts = 3;
  158. background = false;
  159. strict = false;
  160. verbose = false;
  161. dryrun = false;
  162. replicationoffset = 0;
  163. }
  164. void init(int argc,char * argv[])
  165. {
  166. numthreads = 10;
  167. connecttimeout = 0; // no timeout
  168. attempts = 3;
  169. background = false;
  170. strict = false;
  171. verbose = false;
  172. dryrun = false;
  173. useplink = false;
  174. for (int i=1; i<argc; i++) {
  175. const char *arg = argv[i];
  176. if (arg[0]=='-') {
  177. arg++;
  178. const char *parm = (arg[1]==':')?(arg+2):(arg+1);
  179. switch (toupper(*arg)) {
  180. case 'N':
  181. numthreads = *parm?atoi(parm):numthreads;
  182. break;
  183. case 'T':
  184. if (toupper(arg[1])=='R') {
  185. parm = (arg[2]==':')?(arg+3):(arg+2);
  186. treeroot.set(parm);
  187. break;
  188. }
  189. connecttimeout=*parm?atoi(parm):connecttimeout;
  190. break;
  191. case 'A':
  192. attempts=*parm?atoi(parm):attempts;
  193. break;
  194. case 'I':
  195. identityfile.set(parm);
  196. break;
  197. case 'U':
  198. user.set(parm);
  199. break;
  200. case 'D':
  201. if (*parm)
  202. workdir.set(parm);
  203. else
  204. dryrun = true;
  205. break;
  206. case 'S':
  207. strict = true;
  208. break;
  209. case 'B':
  210. background = true;
  211. break;
  212. case 'V':
  213. verbose = true;
  214. break;
  215. case 'O':
  216. replicationoffset = atoi(parm);
  217. break;
  218. case 'P':
  219. #ifdef _WIN32
  220. if (toupper(arg[1])=='L') {
  221. useplink = true;
  222. break;
  223. }
  224. #endif
  225. parm = (arg[2]==':')?(arg+3):(arg+2);
  226. if (!*parm)
  227. break;
  228. if (toupper(arg[1])=='W') {
  229. StringBuffer buf;
  230. encrypt(buf,parm);
  231. password.set(buf.str());
  232. break;
  233. }
  234. else if (toupper(arg[1])=='E') {
  235. password.set(parm);
  236. break;
  237. }
  238. // continue
  239. default:
  240. throw MakeStringException(-1,"Unknown option %s",argv[i]);
  241. }
  242. }
  243. else {
  244. if (slavesfile.isEmpty()) {
  245. slavesfile.set(argv[i]);
  246. loadSlaves();
  247. }
  248. else if (cmd.isEmpty())
  249. cmd.set(argv[i]);
  250. else
  251. throw MakeStringException(-1,"Unknown parameter %s",argv[i]);
  252. }
  253. }
  254. if (dryrun||(numthreads<=0))
  255. numthreads=1;
  256. if (!identityfile.isEmpty()&&!checkFileExists(identityfile.get()))
  257. throw MakeStringException(-1,"Cannot find identity file: %s",identityfile.get());
  258. if (!password.isEmpty()&&!identityfile.isEmpty()) {
  259. WARNLOG("SSH identity file specified, ignoring password");
  260. password.clear();
  261. }
  262. }
  263. void init(
  264. const char *cmdline,
  265. const char *identfilename,
  266. const char *username,
  267. const char *passwordenc,
  268. unsigned timeout,
  269. unsigned retries)
  270. {
  271. strict = false;
  272. verbose = false;
  273. numthreads = 1;
  274. connecttimeout=timeout;
  275. attempts=retries;
  276. #ifdef _WIN32
  277. identityfile.set(identfilename);
  278. #else
  279. if (identfilename&&*identfilename) {
  280. wordexp_t exp_result; // expand ~ etc
  281. wordexp(identfilename, &exp_result, 0);
  282. identityfile.set(exp_result.we_wordv[0]);
  283. wordfree(&exp_result);
  284. }
  285. else
  286. identityfile.clear();
  287. #endif
  288. user.set(username);
  289. password.set(passwordenc);
  290. cmd.set(cmdline);
  291. }
  292. unsigned log2(unsigned n)
  293. {
  294. assertex(n);
  295. unsigned ret=0;
  296. while (n>1) {
  297. ret++;
  298. n /= 2;
  299. }
  300. return ret;
  301. }
  302. unsigned pow2(unsigned n)
  303. {
  304. unsigned ret=1;
  305. while (n--)
  306. ret *= 2;
  307. return ret;
  308. }
  309. unsigned treesrc(unsigned n)
  310. {
  311. return n-pow2(log2(n));
  312. }
  313. void exec(unsigned i,unsigned treefrom)
  314. {
  315. {
  316. CriticalBlock block1(sect);
  317. if (!dryrun) {
  318. if (slaves.ordinality()>1)
  319. PROGLOG("%d: starting %s (%d of %d finished)",i,slaves.item(i),done.ordinality(),slaves.ordinality());
  320. }
  321. }
  322. int retcode=-1;
  323. StringBuffer outbuf;
  324. try {
  325. bool usepssh = false;
  326. StringBuffer cmdline;
  327. if (!password.isEmpty()) {
  328. #ifdef _WIN32
  329. if (useplink) {
  330. cmdline.append("plink -ssh -batch ");
  331. if (!user.isEmpty())
  332. cmdline.append(" -l ").append(user);
  333. StringBuffer tmp;
  334. decrypt(tmp,password);
  335. cmdline.append(" -pw ").append(tmp);
  336. cmdline.append(' ').append(slaves.item(i)).append(' ');
  337. #ifdef PLINK_USE_CMD
  338. // bit of a kludge
  339. cmdline.append("cmd /c \"");
  340. const char *dir = cmd.get();
  341. const char *s = dir;
  342. const char *e = NULL;
  343. while (*s>' ') {
  344. if (*s=='\\')
  345. e = s;
  346. s++;
  347. }
  348. #endif
  349. expandCmd(cmdline,i,treefrom);
  350. #ifdef PLINK_USE_CMD
  351. cmdline.append('"');
  352. #endif
  353. }
  354. else {
  355. // windows use psexec
  356. cmdline.append("psexec \\\\").append(slaves.item(i));
  357. if (!user.isEmpty())
  358. cmdline.append(" -u ").append(user);
  359. StringBuffer tmp;
  360. decrypt(tmp,password);
  361. cmdline.append(" -p ").append(tmp);
  362. if (background)
  363. cmdline.append("-d ");
  364. cmdline.append(' ');
  365. expandCmd(cmdline,i,treefrom);
  366. }
  367. #else
  368. // linux use pssh
  369. usepssh = true;
  370. #endif
  371. }
  372. if (cmdline.length()==0) {
  373. // ssh
  374. cmdline.appendf("%s -n -o LogLevel=ERROR -o StrictHostKeyChecking=%s ",usepssh?"pssh":"ssh",strict?"yes":"no");
  375. if (!usepssh)
  376. cmdline.append("-o BatchMode=yes ");
  377. if (!identityfile.isEmpty())
  378. cmdline.appendf("-i %s ",identityfile.get());
  379. if (background)
  380. cmdline.append("-f ");
  381. if (connecttimeout)
  382. cmdline.appendf("-o ConnectTimeout=%d ",connecttimeout);
  383. if (attempts)
  384. cmdline.appendf("-o ConnectionAttempts=%d ",attempts);
  385. if (usepssh) {
  386. StringBuffer tmp;
  387. decrypt(tmp,password);
  388. cmdline.appendf("-o password=%s ",tmp.str());
  389. }
  390. if (!user.isEmpty())
  391. cmdline.appendf("%s@",user.get());
  392. cmdline.appendf("%s \"",slaves.item(i));
  393. expandCmd(cmdline,i,treefrom);
  394. cmdline.append('"');
  395. }
  396. if (dryrun)
  397. printf("%s\n",cmdline.str());
  398. else {
  399. Owned<IPipeProcess> pipe = createPipeProcess();
  400. if (pipe->run((verbose&&!usepssh)?"FRUNSSH":NULL,cmdline.str(),workdir,
  401. useplink, // for some reason plink needs input handle
  402. true,true)) {
  403. byte buf[4096];
  404. loop {
  405. size32_t read = pipe->read(sizeof(buf),buf);
  406. if (!read)
  407. break;
  408. outbuf.append(read,(const char *)buf);
  409. }
  410. retcode = pipe->wait();
  411. bool firsterr=true;
  412. loop {
  413. size32_t read = pipe->readError(sizeof(buf),buf);
  414. if (!read)
  415. break;
  416. if (firsterr) {
  417. firsterr = false;
  418. if (outbuf.length())
  419. outbuf.append('\n');
  420. outbuf.append("ERR: ");
  421. }
  422. outbuf.append(read,(const char *)buf);
  423. }
  424. }
  425. }
  426. }
  427. catch (IException *e) {
  428. e->errorMessage(outbuf);
  429. retcode = -2;
  430. }
  431. CriticalBlock block(sect);
  432. done.append(i);
  433. replytext.append(outbuf.str());
  434. reply.append((unsigned)retcode);
  435. }
  436. void exec()
  437. {
  438. if (!treeroot.isEmpty()) {
  439. // remove from slaves
  440. ForEachItemInRev(i,slaves)
  441. if (strcmp(slaves.item(i),treeroot)==0)
  442. slaves.remove(i);
  443. }
  444. if (slaves.ordinality()==0)
  445. return;
  446. class cRun: public CAsyncFor
  447. {
  448. bool treemode;
  449. CFRunSSH &parent;
  450. Semaphore *treesem;
  451. public:
  452. cRun(CFRunSSH &_parent)
  453. : parent(_parent)
  454. {
  455. treemode = !parent.treeroot.isEmpty();
  456. if (treemode) {
  457. treesem = new Semaphore[parent.slaves.ordinality()+1]; // don't actually use all
  458. treesem[0].signal();
  459. }
  460. else
  461. treesem = NULL;
  462. }
  463. ~cRun()
  464. {
  465. delete [] treesem;
  466. }
  467. void Do(unsigned i)
  468. {
  469. if (treemode) {
  470. unsigned from = parent.treesrc(i+1);
  471. treesem[from].wait();
  472. parent.exec(i,from);
  473. treesem[from].signal();
  474. treesem[i+1].signal();
  475. }
  476. else
  477. parent.exec(i,0);
  478. }
  479. } afor(*this);
  480. afor.For(slaves.ordinality(),(numthreads>slaves.ordinality())?slaves.ordinality():numthreads,!treeroot.isEmpty(),treeroot.isEmpty());
  481. if (dryrun)
  482. return;
  483. if (slaves.ordinality()>1) {
  484. PROGLOG("Results: (%d of %d finished)",done.ordinality(),slaves.ordinality());
  485. for (unsigned i=0;i<done.ordinality();i++) {
  486. unsigned n = done.item(i);
  487. StringBuffer res(replytext.item(n));
  488. while (res.length()&&(res.charAt(res.length()-1)<=' '))
  489. res.setLength(res.length()-1);
  490. if (res.length()==0)
  491. PROGLOG("%d: %s(%d): [OK]",n+1,slaves.item(n),reply.item(n));
  492. else if (strchr(res.str(),'\n')==NULL)
  493. PROGLOG("%d: %s(%d): %s",n+1,slaves.item(n),reply.item(n),res.str());
  494. else
  495. PROGLOG("%d: %s(%d):\n---------------------------\n%s\n===========================",n+1,slaves.item(n),reply.item(n),res.str());
  496. }
  497. }
  498. else {
  499. StringBuffer res(replytext.item(0));
  500. while (res.length()&&(res.charAt(res.length()-1)<=' '))
  501. res.setLength(res.length()-1);
  502. PROGLOG("%s result(%d):\n%s",useplink?"plink":"ssh",reply.item(0),res.str());
  503. }
  504. }
  505. void exec(
  506. const IpAddress &ip,
  507. const char *workdirname,
  508. bool _background)
  509. {
  510. background = _background;
  511. strict = false;
  512. verbose = false;
  513. StringBuffer ips;
  514. ip.getIpText(ips);
  515. slaves.kill();
  516. slaves.append(ips.str());
  517. numthreads = 1;
  518. workdir.set(workdirname);
  519. exec();
  520. }
  521. };
  522. IFRunSSH *createFRunSSH()
  523. {
  524. return new CFRunSSH;
  525. }