frunagent.cpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <stdio.h>
  15. #include <stdlib.h>
  16. #include <string.h>
  17. #include <stddef.h>
  18. #include <stdlib.h>
  19. #include <assert.h>
  20. #include <jencrypt.hpp>
  21. #include "jlib.hpp"
  22. #include "jsem.hpp"
  23. #include "jthread.hpp"
  24. #include "hrpcsock.hpp"
  25. #include "hrpcutil.hpp"
  26. #include "homisc.hpp"
  27. #include "hagent.hpp"
  28. #include "portlist.h"
  29. #ifdef _WIN32
  30. #include <Winsock2.h>
  31. #endif
  32. #include "jsocket.hpp"
  33. #include "hodisp_base.hpp"
  34. #ifdef _WIN32
  35. const char* statcmd = "cmd /c dir c:\\hoagent.txt";
  36. #else
  37. const char* statcmd = "pwd";
  38. #endif
  39. int num_threads=25;
  40. char * unscr(char * s)
  41. {
  42. int j=0;
  43. while (s[j])
  44. {
  45. if (s[j]==0x55)
  46. throw_error ("Invalid (upper case U) in username/password");
  47. s[j++] ^=0x55;
  48. }
  49. return s;
  50. }
  51. static int _nn = 1;
  52. class cmd_dispatch : public c_dispatch
  53. {
  54. public :
  55. int argc;
  56. int node_number;
  57. char ** argv;
  58. char * name;
  59. char *next;
  60. virtual void action();
  61. cmd_dispatch(int _argc,char * _argv[],const char * _name) { argc=_argc; argv=_argv; name=strdup(_name);id=name;node_number=_nn++; next = NULL;};
  62. cmd_dispatch(int _argc,char * _argv[],const char * _name, const char* _next)
  63. {
  64. argc=_argc;
  65. argv=_argv;
  66. name=strdup(_name);
  67. id=name;
  68. node_number=_nn++;
  69. next = strdup(_next);
  70. };
  71. ~cmd_dispatch()
  72. {
  73. free(name);
  74. if(next)
  75. free(next);
  76. };
  77. };
  78. m_dispatch *disp=0;
  79. class split_node : public split_nodes_base_ex
  80. {
  81. public:
  82. int argc;
  83. char ** argv;
  84. virtual void add_machine(const char * s);
  85. virtual void add_machine_ex(const char * s, const char* nxt);
  86. split_node(int _argc,char * _argv[] ){ argc=_argc; argv=_argv; };
  87. };
  88. int calltimeout=0;
  89. bool encrypted = false;
  90. unsigned replicationoffset = 1;
  91. void cmd_dispatch::action()
  92. {
  93. // printf("%s\n",name);
  94. char x[100];
  95. sprintf(x,"elapsed time : %s",name);
  96. // elapsed_time_trace t(x);
  97. IHRPCtransport * transport = MakeTcpTransportFromUrl(name, HOAGENT_PORT);
  98. hoagent agent;
  99. agent.UseTransport(transport);
  100. transport->SetConnectTimeout(calltimeout?(calltimeout*1000):(num_threads==1?600:0));
  101. transport->SetTimeout(calltimeout?calltimeout:3);
  102. StringBuffer result;
  103. result.append(name).appendf("(%d) ",node_number);
  104. if (stricmp(argv[2], "alive") == 0)
  105. result.append(agent.alive(atoi(argv[3])));
  106. else if (stricmp(argv[2], "start") == 0)
  107. {
  108. StringBuffer cmdbuf;
  109. for (char *cp=argv[3]; *cp; cp++)
  110. {
  111. if (*cp == '%' && *(cp+1))
  112. {
  113. cp++;
  114. switch (*cp)
  115. {
  116. case 'n': // Node number
  117. cmdbuf.append(node_number);
  118. break;
  119. case 'a': // Node address
  120. cmdbuf.append(name);
  121. break;
  122. case 'l': // Node list
  123. cmdbuf.append(argv[1]);
  124. break;
  125. case 'x': // Next Node
  126. if(next != NULL)
  127. cmdbuf.append(next);
  128. break;
  129. default: // treat as literal (?)
  130. cmdbuf.append('%').append(*cp);
  131. break;
  132. }
  133. }
  134. else
  135. cmdbuf.append(*cp);
  136. }
  137. result.append(agent.start_process(cmdbuf.str()));
  138. }
  139. else if (stricmp(argv[2], "stop") == 0)
  140. result.append(agent.stop_process(atoi(argv[3])));
  141. else if (stricmp(argv[2], "account") == 0)
  142. {
  143. transport->SetTimeout(calltimeout?calltimeout:15);
  144. int cd=25;
  145. bool success=false;
  146. while (cd&&!success) {
  147. char * u=unscr(strdup(argv[3]));
  148. StringBuffer pw;
  149. if (encrypted)
  150. decrypt(pw, argv[4]);
  151. else
  152. pw.append(argv[4]);
  153. char *p = unscr(pw.detach());
  154. if (cd>1)
  155. {
  156. try
  157. {
  158. agent.account(u, p, argv[5]);
  159. if (agent.start_process(statcmd) )
  160. success=true;
  161. }
  162. catch (...)
  163. {
  164. }
  165. if (!success)
  166. {
  167. srand(GetCurrentThreadId()+clock());
  168. MilliSleep((rand() * 3000) / RAND_MAX);
  169. }
  170. }
  171. else
  172. {
  173. agent.account(u, p, argv[5]);
  174. if (agent.start_process(statcmd) )
  175. success=true;
  176. }
  177. cd--;
  178. }
  179. if (!success) result.append(" failed"); else result.appendf(" ok (retries=%i)",24-cd);
  180. }
  181. else if (stricmp(argv[2], "dir") == 0)
  182. {
  183. transport->SetTimeout(15);
  184. agent.set_dir(argv[3]);
  185. }
  186. if (result.length()) {
  187. printf("%s\n", result.toCharArray());
  188. }
  189. transport->Release();
  190. }
  191. void split_node::add_machine(const char *n)
  192. {
  193. disp->dispatch(new cmd_dispatch(argc,argv,n));
  194. }
  195. void split_node::add_machine_ex(const char *n, const char *nxt)
  196. {
  197. disp->dispatch(new cmd_dispatch(argc,argv,n,nxt));
  198. }
  199. void setoptions(int argc,char * argv[] )
  200. {
  201. calltimeout=0;
  202. for (int i=1; i<argc; i++)
  203. {
  204. if (argv[i]==stristr(argv[i],"/n"))
  205. {
  206. int c=atoi(&argv[i][2]);
  207. printf("%i threads\n",c);
  208. num_threads=c;
  209. }
  210. else if (argv[i]==stristr(argv[i],"/t"))
  211. {
  212. calltimeout=atoi(&argv[i][2]);
  213. }
  214. else if (argv[i]==stristr(argv[i],"/e"))
  215. {
  216. encrypted=true;
  217. }
  218. else if (argv[i]==stristr(argv[i],"/o"))
  219. {
  220. replicationoffset = atoi(&argv[i][2]);
  221. }
  222. }
  223. if (num_threads<0) num_threads=1;
  224. if (num_threads>200) num_threads=200;
  225. if (!disp) disp=new m_dispatch(num_threads);
  226. }
  227. int main( int argc, char *argv[] )
  228. {
  229. int res=0;
  230. if (argc < 3)
  231. {
  232. printf("frunagent <nodelist> start \"command\" [options] \n"
  233. " stop <commandid> [options]\n"
  234. " account <user> <password> <dir> [option]\n"
  235. " dir <dir> [option]\n"
  236. " alive <integer> [option]\n\n"
  237. "where <nodelist> is of the form h009100:h009119,h007010:h007020\n"
  238. "or 192.168.6.100:192.168.6.119,192.168.7.10:192.168.7.20\n"
  239. "or @filename where filename contains a file in the above format\n"
  240. "options: /n<number_of_thread> /t<call_time_out> /encrypt /o<replication_offset>\n"
  241. );
  242. return 255;
  243. }
  244. InitModuleObjects();
  245. StringBuffer tracepath;
  246. tracepath.append(".").append(PATHSEPCHAR).append("frunagent.txt");
  247. settrace(tracepath.str(),false);
  248. ECHO_TO_CONSOLE=true;
  249. try
  250. {
  251. setoptions(argc,argv);
  252. split_node x(argc,argv);
  253. if (argv[1][0]=='@')
  254. {
  255. StringBuffer b;
  256. b.loadFile(argv[1]+1);
  257. char *finger = (char *) b.str();
  258. while (*finger)
  259. {
  260. if (*finger == '\n')
  261. *finger++ = ';';
  262. else if (*finger == '#')
  263. {
  264. while (*finger && *finger != '\n')
  265. *finger++ = ' ';
  266. }
  267. else
  268. finger++;
  269. }
  270. x.split_nodes_ex(b.str(),replicationoffset);
  271. }
  272. else
  273. x.split_nodes_ex(argv[1],replicationoffset);
  274. disp->all_done_ex(false);
  275. }
  276. catch(IException *e)
  277. {
  278. pexception("",e);
  279. e->Release();
  280. res=255;
  281. }
  282. catch (...)
  283. {
  284. traceft("Caught unknown exception");
  285. }
  286. #ifdef _TRACING
  287. traceflush();
  288. #endif
  289. if (disp) delete disp;
  290. return res;
  291. }