123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #include "platform.h"
- #include "portlist.h"
- #include "jlib.hpp"
- #include "jio.hpp"
- #include "jlog.hpp"
- #include "jmutex.hpp"
- #include "jfile.hpp"
- #include "jencrypt.hpp"
- #include "rmtssh.hpp"
- #ifndef _WIN32
- #include <wordexp.h>
- #endif
- static CBuildVersion _bv("$HeadURL: https://svn.br.seisint.com/ecl/trunk/common/remote/rmtssh.cpp $ $Id: rmtssh.cpp 64028 2011-04-14 14:28:10Z nhicks $");
- //----------------------------------------------------------------------------
- //#define PLINK_USE_CMD
- class CFRunSSH: public CInterface, implements IFRunSSH
- {
- unsigned numthreads;
- unsigned connecttimeout;
- unsigned attempts;
- StringAttr cmd;
- StringAttr identityfile;
- StringAttr user;
- StringAttr password; // encrypted
- StringAttr workdir;
- StringAttr slavesfile;
- StringArray slaves;
- StringAttr treeroot;
- StringArray replytext;
- UnsignedArray reply;
- UnsignedArray done;
- bool background;
- bool strict;
- bool verbose;
- bool dryrun;
- bool useplink;
- int replicationoffset;
- CriticalSection sect;
- StringBuffer expandCmd(StringBuffer &cmdbuf, unsigned nodenum, unsigned treefrom)
- {
- const char *cp=cmd.get();
- if (!cp)
- return cmdbuf;
- for (; *cp; cp++) {
- if ((*cp=='%') && cp[1]) {
- cp++;
- switch (*cp) {
- case 'n': // Node number
- cmdbuf.append(nodenum+1);
- break;
- case 'a': // Node address
- cmdbuf.append(slaves.item(nodenum));
- break;
- case 'l': // Node list
- cmdbuf.append(slavesfile);
- break;
- case '%':
- cmdbuf.append('%');
- break;
- case 'x': // Next Node
- cmdbuf.append(slaves.item((nodenum+replicationoffset)%slaves.ordinality()));
- break;
- case 't': // Tree Node
- if (treefrom)
- cmdbuf.append(slaves.item(treefrom-1));
- else
- cmdbuf.append(treeroot);
- break;
- case 's': { // ssh params
- bool usepssh = !password.isEmpty();
- cmdbuf.appendf("%s -o LogLevel=ERROR -o StrictHostKeyChecking=%s -o BatchMode=yes ",usepssh?"pssh":"ssh",strict?"yes":"no");
- if (!identityfile.isEmpty())
- cmdbuf.appendf("-i %s ",identityfile.get());
- if (background)
- cmdbuf.append("-f ");
- if (connecttimeout)
- cmdbuf.appendf("-o ConnectTimeout=%d ",connecttimeout);
- if (attempts)
- cmdbuf.appendf("-o ConnectionAttempts=%d ",attempts);
- if (!user.isEmpty())
- cmdbuf.appendf("-l %s ",user.get());
- }
- break;
- default: // treat as literal (?)
- cmdbuf.append('%').append(*cp);
- break;
- }
- }
- else
- cmdbuf.append(*cp);
- }
- return cmdbuf;
- }
- void loadSlaves()
- {
- FILE *slavesFile = fopen(slavesfile.get(), "rt");
- if( !slavesFile) {
- const char * s = slavesfile.get();
- while (*s&&(isdigit(*s)||(*s=='.')||(*s==',')||(*s==':')||(*s=='-')||(*s=='*')))
- s++;
- if (!*s) {
- SocketEndpointArray sa;
- sa.fromText(slavesfile.get(),0);
- if (sa.ordinality()) {
- StringBuffer ns;
- ForEachItemIn(i,sa) {
- sa.item(i).getIpText(ns.clear());
- slaves.append(ns.str());
- }
- return;
- }
- }
- throw MakeStringException(-1, "Failed to open slaves file %s", slavesfile.get());
- }
- char inbuf[1000];
- StringAttr slave;
- while (fgets( inbuf, sizeof(inbuf), slavesFile)) {
- char *hash = strchr(inbuf, '#');
- if (hash)
- *hash = 0;
- char *finger = inbuf;
- loop {
- while (isspace(*finger))
- finger++;
- char *start = finger;
- while (*finger && !isspace(*finger))
- finger++;
- if (finger > start) {
- slave.set(start, finger - start);
- slaves.append(slave);
- }
- else
- break;
- }
- }
- fclose(slavesFile);
- }
- public:
- IMPLEMENT_IINTERFACE;
- CFRunSSH()
- {
- numthreads = 5;
- connecttimeout = 0; // no timeout
- attempts = 3;
- background = false;
- strict = false;
- verbose = false;
- dryrun = false;
- replicationoffset = 0;
- }
- void init(int argc,char * argv[])
- {
- numthreads = 10;
- connecttimeout = 0; // no timeout
- attempts = 3;
- background = false;
- strict = false;
- verbose = false;
- dryrun = false;
- useplink = false;
- for (int i=1; i<argc; i++) {
- const char *arg = argv[i];
- if (arg[0]=='-') {
- arg++;
- const char *parm = (arg[1]==':')?(arg+2):(arg+1);
- switch (toupper(*arg)) {
- case 'N':
- numthreads = *parm?atoi(parm):numthreads;
- break;
- case 'T':
- if (toupper(arg[1])=='R') {
- parm = (arg[2]==':')?(arg+3):(arg+2);
- treeroot.set(parm);
- break;
- }
- connecttimeout=*parm?atoi(parm):connecttimeout;
- break;
- case 'A':
- attempts=*parm?atoi(parm):attempts;
- break;
- case 'I':
- identityfile.set(parm);
- break;
- case 'U':
- user.set(parm);
- break;
- case 'D':
- if (*parm)
- workdir.set(parm);
- else
- dryrun = true;
- break;
- case 'S':
- strict = true;
- break;
- case 'B':
- background = true;
- break;
- case 'V':
- verbose = true;
- break;
- case 'O':
- replicationoffset = atoi(parm);
- break;
- case 'P':
- #ifdef _WIN32
- if (toupper(arg[1])=='L') {
- useplink = true;
- break;
- }
- #endif
- parm = (arg[2]==':')?(arg+3):(arg+2);
- if (!*parm)
- break;
- if (toupper(arg[1])=='W') {
- StringBuffer buf;
- encrypt(buf,parm);
- password.set(buf.str());
- break;
- }
- else if (toupper(arg[1])=='E') {
- password.set(parm);
- break;
- }
- // continue
- default:
- throw MakeStringException(-1,"Unknown option %s",argv[i]);
- }
- }
- else {
- if (slavesfile.isEmpty()) {
- slavesfile.set(argv[i]);
- loadSlaves();
- }
- else if (cmd.isEmpty())
- cmd.set(argv[i]);
- else
- throw MakeStringException(-1,"Unknown parameter %s",argv[i]);
- }
- }
- if (dryrun||(numthreads<=0))
- numthreads=1;
- if (!identityfile.isEmpty()&&!checkFileExists(identityfile.get()))
- throw MakeStringException(-1,"Cannot find identity file: %s",identityfile.get());
- if (!password.isEmpty()&&!identityfile.isEmpty()) {
- WARNLOG("SSH identity file specified, ignoring password");
- password.clear();
- }
- }
- void init(
- const char *cmdline,
- const char *identfilename,
- const char *username,
- const char *passwordenc,
- unsigned timeout,
- unsigned retries)
- {
- strict = false;
- verbose = false;
- numthreads = 1;
- connecttimeout=timeout;
- attempts=retries;
- #ifdef _WIN32
- identityfile.set(identfilename);
- #else
- if (identfilename&&*identfilename) {
- wordexp_t exp_result; // expand ~ etc
- wordexp(identfilename, &exp_result, 0);
- identityfile.set(exp_result.we_wordv[0]);
- wordfree(&exp_result);
- }
- else
- identityfile.clear();
- #endif
- user.set(username);
- password.set(passwordenc);
- cmd.set(cmdline);
- }
- unsigned log2(unsigned n)
- {
- assertex(n);
- unsigned ret=0;
- while (n>1) {
- ret++;
- n /= 2;
- }
- return ret;
- }
- unsigned pow2(unsigned n)
- {
- unsigned ret=1;
- while (n--)
- ret *= 2;
- return ret;
- }
- unsigned treesrc(unsigned n)
- {
- return n-pow2(log2(n));
- }
-
- void exec(unsigned i,unsigned treefrom)
- {
- {
- CriticalBlock block1(sect);
- if (!dryrun) {
- if (slaves.ordinality()>1)
- PROGLOG("%d: starting %s (%d of %d finished)",i,slaves.item(i),done.ordinality(),slaves.ordinality());
- }
- }
- int retcode=-1;
- StringBuffer outbuf;
- try {
- bool usepssh = false;
- StringBuffer cmdline;
- if (!password.isEmpty()) {
- #ifdef _WIN32
- if (useplink) {
- cmdline.append("plink -ssh -batch ");
- if (!user.isEmpty())
- cmdline.append(" -l ").append(user);
- StringBuffer tmp;
- decrypt(tmp,password);
- cmdline.append(" -pw ").append(tmp);
- cmdline.append(' ').append(slaves.item(i)).append(' ');
- #ifdef PLINK_USE_CMD
- // bit of a kludge
- cmdline.append("cmd /c \"");
- const char *dir = cmd.get();
- const char *s = dir;
- const char *e = NULL;
- while (*s>' ') {
- if (*s=='\\')
- e = s;
- s++;
- }
- #endif
- expandCmd(cmdline,i,treefrom);
- #ifdef PLINK_USE_CMD
- cmdline.append('"');
- #endif
- }
- else {
- // windows use psexec
- cmdline.append("psexec \\\\").append(slaves.item(i));
- if (!user.isEmpty())
- cmdline.append(" -u ").append(user);
- StringBuffer tmp;
- decrypt(tmp,password);
- cmdline.append(" -p ").append(tmp);
- if (background)
- cmdline.append("-d ");
- cmdline.append(' ');
- expandCmd(cmdline,i,treefrom);
- }
- #else
- // linux use pssh
- usepssh = true;
- #endif
- }
- if (cmdline.length()==0) {
- // ssh
- cmdline.appendf("%s -n -o LogLevel=ERROR -o StrictHostKeyChecking=%s ",usepssh?"pssh":"ssh",strict?"yes":"no");
- if (!usepssh)
- cmdline.append("-o BatchMode=yes ");
- if (!identityfile.isEmpty())
- cmdline.appendf("-i %s ",identityfile.get());
- if (background)
- cmdline.append("-f ");
- if (connecttimeout)
- cmdline.appendf("-o ConnectTimeout=%d ",connecttimeout);
- if (attempts)
- cmdline.appendf("-o ConnectionAttempts=%d ",attempts);
- if (usepssh) {
- StringBuffer tmp;
- decrypt(tmp,password);
- cmdline.appendf("-o password=%s ",tmp.str());
- }
- if (!user.isEmpty())
- cmdline.appendf("%s@",user.get());
- cmdline.appendf("%s \"",slaves.item(i));
- expandCmd(cmdline,i,treefrom);
- cmdline.append('"');
- }
- if (dryrun)
- printf("%s\n",cmdline.str());
- else {
- Owned<IPipeProcess> pipe = createPipeProcess();
- if (pipe->run((verbose&&!usepssh)?"FRUNSSH":NULL,cmdline.str(),workdir,
- useplink, // for some reason plink needs input handle
- true,true)) {
- byte buf[4096];
- loop {
- size32_t read = pipe->read(sizeof(buf),buf);
- if (!read)
- break;
- outbuf.append(read,(const char *)buf);
- }
- retcode = pipe->wait();
- bool firsterr=true;
- loop {
- size32_t read = pipe->readError(sizeof(buf),buf);
- if (!read)
- break;
- if (firsterr) {
- firsterr = false;
- if (outbuf.length())
- outbuf.append('\n');
- outbuf.append("ERR: ");
- }
- outbuf.append(read,(const char *)buf);
- }
- }
- }
- }
- catch (IException *e) {
- e->errorMessage(outbuf);
- retcode = -2;
- }
- CriticalBlock block(sect);
- done.append(i);
- replytext.append(outbuf.str());
- reply.append((unsigned)retcode);
- }
- void exec()
- {
- if (!treeroot.isEmpty()) {
- // remove from slaves
- ForEachItemInRev(i,slaves)
- if (strcmp(slaves.item(i),treeroot)==0)
- slaves.remove(i);
- }
- if (slaves.ordinality()==0)
- return;
- class cRun: public CAsyncFor
- {
- bool treemode;
- CFRunSSH &parent;
- Semaphore *treesem;
- public:
- cRun(CFRunSSH &_parent)
- : parent(_parent)
- {
- treemode = !parent.treeroot.isEmpty();
- if (treemode) {
- treesem = new Semaphore[parent.slaves.ordinality()+1]; // don't actually use all
- treesem[0].signal();
- }
- else
- treesem = NULL;
- }
- ~cRun()
- {
- delete [] treesem;
- }
- void Do(unsigned i)
- {
- if (treemode) {
- unsigned from = parent.treesrc(i+1);
- treesem[from].wait();
- parent.exec(i,from);
- treesem[from].signal();
- treesem[i+1].signal();
- }
- else
- parent.exec(i,0);
- }
- } afor(*this);
- afor.For(slaves.ordinality(),(numthreads>slaves.ordinality())?slaves.ordinality():numthreads,!treeroot.isEmpty(),treeroot.isEmpty());
- if (dryrun)
- return;
- if (slaves.ordinality()>1) {
- PROGLOG("Results: (%d of %d finished)",done.ordinality(),slaves.ordinality());
- for (unsigned i=0;i<done.ordinality();i++) {
- unsigned n = done.item(i);
- StringBuffer res(replytext.item(n));
- while (res.length()&&(res.charAt(res.length()-1)<=' '))
- res.setLength(res.length()-1);
- if (res.length()==0)
- PROGLOG("%d: %s(%d): [OK]",n+1,slaves.item(n),reply.item(n));
- else if (strchr(res.str(),'\n')==NULL)
- PROGLOG("%d: %s(%d): %s",n+1,slaves.item(n),reply.item(n),res.str());
- else
- PROGLOG("%d: %s(%d):\n---------------------------\n%s\n===========================",n+1,slaves.item(n),reply.item(n),res.str());
- }
- }
- else {
- StringBuffer res(replytext.item(0));
- while (res.length()&&(res.charAt(res.length()-1)<=' '))
- res.setLength(res.length()-1);
- PROGLOG("%s result(%d):\n%s",useplink?"plink":"ssh",reply.item(0),res.str());
- }
- }
- void exec(
- const IpAddress &ip,
- const char *workdirname,
- bool _background)
- {
- background = _background;
- strict = false;
- verbose = false;
- StringBuffer ips;
- ip.getIpText(ips);
- slaves.kill();
- slaves.append(ips.str());
- numthreads = 1;
- workdir.set(workdirname);
- exec();
- }
- };
- IFRunSSH *createFRunSSH()
- {
- return new CFRunSSH;
- }
|