dafscontrol.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jio.hpp"
  16. #include "jmisc.hpp"
  17. #include "jsuperhash.hpp"
  18. #include "mpbase.hpp"
  19. #include "mpcomm.hpp"
  20. #include "daclient.hpp"
  21. #include "dadfs.hpp"
  22. #include "dafdesc.hpp"
  23. #include "dasds.hpp"
  24. #include "rmtfile.hpp"
  25. #include "rmtclient.hpp"
  26. #include "dalienv.hpp"
  27. //#define TRACE
  28. #define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite
  29. void usage()
  30. {
  31. printf("DAFSCONTROL usage:\n");
  32. printf(" dafscontrol [<dali-ip>] STOP <ip-or-cluster>\n");
  33. printf(" dafscontrol [<dali-ip>] CHECK <ip-or-cluster>\n");
  34. printf(" dafscontrol [<dali-ip>] VER <ip-or-cluster>\n");
  35. printf(" dafscontrol [<dali-ip>] CHECKVER <ip-or-cluster>\n");
  36. printf(" dafscontrol [<dali-ip>] STOPVER <ip-or-cluster>\n");
  37. printf(" dafscontrol [<dali-ip>] CHECKVERMAJOR <ip-or-cluster>\n");
  38. printf(" dafscontrol [<dali-ip>] TRACE <ip> <num>\n");
  39. printf(" dafscontrol [<dali-ip>] CHKDSK <ip> <num>\n");
  40. printf(" dafscontrol [<dali-ip>] INFO <ip-or-clsuter> [level]\n");
  41. printf(" dafscontrol [<dali-ip>] THROTTLE <ip-or-cluster> <class> <limit> <ms-delay> <cpu-limit> <queue-limit>\n");
  42. printf(" dafscontrol MYVER\n");
  43. exit(1);
  44. }
  45. const char * queryDaFileSrvExecutable(const IpAddress &ip, StringBuffer &ret)
  46. {
  47. StringBuffer dir; // not currently used
  48. return querySlaveExecutable("DaFileSrvProcess", "dafilesrv", NULL, ip, ret, dir);
  49. }
  50. //extern REMOTE_API unsigned getDaliServixVersion(IpAddress &ip,StringBuffer &ver);
  51. enum ApplyMode
  52. {
  53. AMcheckver, // check version failures to result and resultstr version
  54. AMcheckvermajor, // only check major version
  55. AMcheck, //
  56. AMstop, // unconditional stop - stopped to results and resultstr version
  57. AMstopver, // stop if not current version
  58. AMver, // return versions of each
  59. AMmax
  60. };
  61. bool getCluster(const char *clustername,SocketEndpointArray &eps)
  62. {
  63. Owned<IGroup> grp = queryNamedGroupStore().lookup(clustername);
  64. if (grp.get()==NULL)
  65. return false;
  66. unsigned n = grp->ordinality();
  67. unsigned p = getDaliServixPort();
  68. for (unsigned i=0;i<n;i++) {
  69. SocketEndpoint ep(p,grp->queryNode(i).endpoint());
  70. eps.appendUniq(ep);
  71. }
  72. return eps.ordinality()!=0;
  73. }
  74. unsigned applyNodes(const char *grpip, ApplyMode mode, unsigned ver, bool isdali, bool quiet)
  75. {
  76. SocketEndpointArray eps;
  77. if (isdali&&(stricmp(grpip,"all")==0)) {
  78. Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
  79. if (!conn)
  80. return 0;
  81. IPropertyTree* root = conn->queryRoot();
  82. Owned<IPropertyTreeIterator> clusters= root->getElements("ThorCluster");
  83. unsigned ret = 0;
  84. if (clusters->first()) {
  85. do {
  86. IPropertyTree &cluster = clusters->query();
  87. ret += applyNodes(cluster.queryProp("@name"),mode,ver,true,quiet);
  88. } while (clusters->next());
  89. }
  90. return ret;
  91. }
  92. SocketEndpointArray result;
  93. StringAttrArray resultstr;
  94. if (!isdali||!getCluster(grpip,eps)) {
  95. SocketEndpoint ep(grpip);
  96. if (ep.isNull()) {
  97. UERRLOG("%s is not a group name or ip",grpip);
  98. return 0;
  99. }
  100. if (ep.port==0)
  101. ep.port = getDaliServixPort();
  102. eps.append(ep);
  103. }
  104. IPointerArrayOf<ISocket> sockets;
  105. unsigned to=10*1000;
  106. unsigned n=eps.ordinality(); // use approx log scale (timeout is long but only for failure situation)
  107. while (n>1) {
  108. n/=2;
  109. to+=10*1000;
  110. }
  111. if (!quiet&&(n>1))
  112. PROGLOG("Scanning %s...",grpip);
  113. multiConnect(eps,sockets,to);
  114. CriticalSection sect;
  115. class casyncfor: public CAsyncFor
  116. {
  117. SocketEndpointArray &eps;
  118. IPointerArrayOf<ISocket> &sockets;
  119. ApplyMode mode;
  120. unsigned ver;
  121. SocketEndpointArray &result;
  122. StringAttrArray &resultstr;
  123. CriticalSection &sect;
  124. public:
  125. casyncfor(ApplyMode _mode, unsigned _ver,SocketEndpointArray &_eps,IPointerArrayOf<ISocket> &_sockets,SocketEndpointArray &_result, StringAttrArray &_resultstr,CriticalSection &_sect)
  126. : eps(_eps), sockets(_sockets), result(_result), resultstr(_resultstr), sect(_sect)
  127. {
  128. mode = _mode;
  129. ver = _ver;
  130. }
  131. void Do(unsigned i)
  132. {
  133. ISocket *sock = sockets.item(i);
  134. StringBuffer epstr;
  135. SocketEndpoint ep = eps.item(i);
  136. ep.getUrlStr(epstr);
  137. // PROGLOG("T.1 %s %x",epstr.str(),(unsigned)sock);
  138. StringBuffer verstr;
  139. unsigned rver=0;
  140. if (sock) {
  141. rver = getRemoteVersion(sock, verstr);
  142. switch (mode) {
  143. case AMcheck:
  144. if (rver!=0)
  145. return;
  146. case AMver: {
  147. CriticalBlock block(sect);
  148. result.append(ep);
  149. StringBuffer ln;
  150. ln.append(rver).append(",\"").append(verstr).append('"');
  151. resultstr.append(* new StringAttrItem(ln.str()));
  152. }
  153. return;
  154. case AMstopver:
  155. case AMcheckver:
  156. case AMcheckvermajor: {
  157. // compares versions up to the '-'
  158. const char *rv = verstr.str();
  159. const char *v = DAFILESRV_VERSIONSTRING;
  160. if (mode!=AMcheckvermajor) {
  161. while (*v&&(*v!='-')&&(*v==*rv)) {
  162. v++;
  163. rv++;
  164. }
  165. }
  166. if ((*rv==*v)&&(rver==ver))
  167. return;
  168. while (*rv&&(*rv!='-'))
  169. rv++;
  170. verstr.setLength(rv-verstr.str());
  171. if ((mode==AMcheckver)||(mode==AMcheckvermajor))
  172. break;
  173. }
  174. // fall through
  175. case AMstop:
  176. {
  177. unsigned err = stopRemoteServer(sock);
  178. if (err!=0) {
  179. UERRLOG("Could not stop server on %s, %d returned",epstr.str(),err);
  180. if (mode!=AMstopver)
  181. return; // even though failed to stop - still return code
  182. }
  183. else
  184. Sleep(1000); // let stop
  185. }
  186. break;
  187. default:
  188. return;
  189. }
  190. }
  191. CriticalBlock block(sect);
  192. result.append(ep);
  193. if ((mode!=AMver)&&(mode!=AMcheckver)&&(mode!=AMcheckvermajor)&&(mode!=AMstopver))
  194. resultstr.append(* new StringAttrItem(""));
  195. else
  196. resultstr.append(* new StringAttrItem(verstr.str()));
  197. }
  198. } afor(mode,ver,eps,sockets,result,resultstr,sect);
  199. afor.For(eps.ordinality(), 10, false, true);
  200. if (result.ordinality()==0)
  201. return 0;
  202. switch (mode) {
  203. case AMstopver:
  204. case AMcheckver:
  205. case AMcheckvermajor:
  206. if (!quiet) {
  207. StringBuffer epstr;
  208. ForEachItemIn(i,result) {
  209. result.item(i).getUrlStr(epstr.clear());
  210. StringAttrItem &attr = resultstr.item(i);
  211. if (attr.text.length()==0)
  212. UERRLOG("%s: %s not running DAFILESRV",grpip,epstr.str());
  213. else
  214. UERRLOG("%s: %s %s running DAFILESRV version %s",grpip,(mode==AMstopver)?"was":"is",epstr.str(),attr.text.get());
  215. }
  216. unsigned numok = eps.ordinality()-result.ordinality();
  217. if (mode==AMcheckvermajor)
  218. PROGLOG("%s: %d node%s running version %.1f of DAFILESRV",grpip,numok,(numok!=1)?"s":"",((double)DAFILESRV_VERSION)/10.0);
  219. else {
  220. StringBuffer vs;
  221. const char *v = DAFILESRV_VERSIONSTRING;
  222. while (*v&&(*v!='-'))
  223. vs.append(*(v++));
  224. PROGLOG("%s: %d node%s running version %s of DAFILESRV",grpip,numok,(numok!=1)?"s":"",vs.str());
  225. }
  226. }
  227. break;
  228. case AMver: {
  229. StringBuffer epstr;
  230. unsigned failed=0;
  231. ForEachItemIn(i,result) {
  232. result.item(i).getUrlStr(epstr.clear());
  233. StringAttrItem &attr = resultstr.item(i);
  234. if (attr.text.length()!=0)
  235. PROGLOG("%s,%s,%s",grpip,epstr.str(),attr.text.get());
  236. else
  237. failed++;
  238. }
  239. if (failed&&!quiet)
  240. PROGLOG("%s: %d node%s not running DAFILESRV",grpip,failed,(failed!=1)?"s":"");
  241. }
  242. break;
  243. case AMcheck:
  244. if (!quiet) {
  245. StringBuffer epstr;
  246. ForEachItemIn(i,result) {
  247. result.item(i).getUrlStr(epstr.clear());
  248. UERRLOG("%s: %s not running DAFILESRV",grpip,epstr.str());
  249. }
  250. unsigned numok = eps.ordinality()-result.ordinality();
  251. PROGLOG("%s: %d node%s running DAFILESRV",grpip,numok,(numok!=1)?"s":"");
  252. }
  253. break;
  254. case AMstop: {
  255. if (!quiet)
  256. PROGLOG("%s: %d stopped",grpip, result.ordinality());
  257. }
  258. break;
  259. }
  260. return result.ordinality();
  261. }
  262. struct ReleaseAtomBlock { ~ReleaseAtomBlock() { releaseAtoms(); } };
  263. int main(int argc, char* argv[])
  264. {
  265. ReleaseAtomBlock rABlock;
  266. InitModuleObjects();
  267. if (argc<2) {
  268. usage();
  269. return 0;
  270. }
  271. EnableSEHtoExceptionMapping();
  272. attachStandardFileLogMsgMonitor("dafscontrol.log", NULL, MSGFIELD_STANDARD, MSGAUD_all, MSGCLS_all, TopDetail, false);
  273. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_prefix);
  274. int ret = 0;
  275. try {
  276. unsigned ai=1;
  277. unsigned ac=argc;
  278. bool isdali=false;
  279. bool quiet = false;
  280. if ((ac>2)&&(stricmp(argv[ac-1],"quiet")==0)) {
  281. quiet = true;
  282. ac--;
  283. }
  284. for (;;) {
  285. if (ai>=ac) {
  286. usage();
  287. break;
  288. }
  289. if (stricmp(argv[ai],"myver")==0) {
  290. const char *v = DAFILESRV_VERSIONSTRING;
  291. StringBuffer vs;
  292. if (memicmp(v,"DS V",4)==0)
  293. v += 4;
  294. while (*v&&(*v!=' ')) {
  295. vs.append(*v);
  296. v++;
  297. }
  298. printf("%s\n",vs.str());
  299. break;
  300. }
  301. if (stricmp(argv[ai],"stop")==0) {
  302. if (ai+1>=ac)
  303. usage();
  304. else
  305. applyNodes(argv[ai+1], AMstop, DAFILESRV_VERSION,isdali,quiet);
  306. break;
  307. }
  308. if (stricmp(argv[ai],"stopver")==0) {
  309. if (ai+1>=ac)
  310. usage();
  311. else
  312. applyNodes(argv[ai+1], AMstopver, DAFILESRV_VERSION,isdali,quiet);
  313. break;
  314. }
  315. if (stricmp(argv[ai],"check")==0) {
  316. if (ai+1>=ac)
  317. usage();
  318. else
  319. if (applyNodes(argv[ai+1], AMcheck, DAFILESRV_VERSION,isdali,quiet)>0)
  320. ret = 1;
  321. break;
  322. }
  323. if (stricmp(argv[ai],"checkver")==0) {
  324. if (ai+1>=ac)
  325. usage();
  326. else
  327. if (applyNodes(argv[ai+1], AMcheckver, DAFILESRV_VERSION,isdali,quiet)>0)
  328. ret = 1;
  329. break;
  330. }
  331. if (stricmp(argv[ai],"checkvermajor")==0) {
  332. if (ai+1>=ac)
  333. usage();
  334. else
  335. if (applyNodes(argv[ai+1], AMcheckvermajor, DAFILESRV_VERSION,isdali,quiet)>0)
  336. ret = 1;
  337. break;
  338. }
  339. if (stricmp(argv[ai],"ver")==0) {
  340. if (ai+1>=ac)
  341. usage();
  342. else
  343. applyNodes(argv[ai+1], AMver, DAFILESRV_VERSION,isdali,quiet);
  344. break;
  345. }
  346. if (stricmp(argv[ai],"trace")==0) {
  347. if (ai+2>=ac)
  348. usage();
  349. else {
  350. SocketEndpointArray eps;
  351. if (!isdali||!getCluster(argv[ai+1],eps)) {
  352. SocketEndpoint ep(argv[ai+1]);
  353. int ret = setDafileSvrTraceFlags(ep,(byte)atoi(argv[ai+2]));
  354. if (ret!=0)
  355. UERRLOG("setDafileSvrTraceFlags returned %d",ret);
  356. }
  357. else {
  358. ForEachItemIn(ni,eps) {
  359. SocketEndpoint ep = eps.item(ni);
  360. int ret = setDafileSvrTraceFlags(ep,(byte)atoi(argv[ai+2]));
  361. if (ret!=0)
  362. UERRLOG("setDafileSvrTraceFlags returned %d",ret);
  363. StringBuffer s("done ");
  364. ep.getUrlStr(s);
  365. PROGLOG("%s",s.str());
  366. }
  367. }
  368. }
  369. break;
  370. }
  371. if (stricmp(argv[ai], "info")==0) {
  372. if (ai+1>=ac)
  373. usage();
  374. else {
  375. SocketEndpointArray eps;
  376. StringBuffer errMsg;
  377. unsigned level=1;
  378. if (ac-(ai+1)>1)
  379. level = atoi(argv[ai+2]);
  380. PROGLOG("Info level = %u", level);
  381. if (!isdali||!getCluster(argv[ai+1],eps)) {
  382. SocketEndpoint ep(argv[ai+1]);
  383. StringBuffer epStr;
  384. ep.getUrlStr(epStr);
  385. VStringBuffer result("Info for %s", epStr.str());
  386. int ret = getDafileSvrInfo(ep, level, result);
  387. if (ret!=0)
  388. UERRLOG("getDafileSvrInfo for %s returned %d", epStr.str(), ret);
  389. else
  390. PROGLOG("%s", result.str());
  391. }
  392. else {
  393. ForEachItemIn(ni,eps) {
  394. SocketEndpoint ep = eps.item(ni);
  395. StringBuffer epStr;
  396. ep.getUrlStr(epStr);
  397. VStringBuffer result("Info for %s: ", epStr.str());
  398. int ret = getDafileSvrInfo(ep, level, result);
  399. if (ret!=0)
  400. UERRLOG("getDafileSvrInfo for %s returned %d", epStr.str(), ret);
  401. else
  402. PROGLOG("%s", result.str());
  403. }
  404. }
  405. }
  406. break;
  407. }
  408. if (stricmp(argv[ai],"throttle")==0) {
  409. if (ai+6>=ac)
  410. usage();
  411. else {
  412. SocketEndpointArray eps;
  413. StringBuffer errMsg;
  414. if (!isdali||!getCluster(argv[ai+1],eps)) {
  415. SocketEndpoint ep(argv[ai+1]);
  416. int ret = setDafileSvrThrottleLimit(ep, (ThrottleClass)atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]), atoi(argv[ai+5]), atoi(argv[ai+6]), &errMsg);
  417. if (ret!=0)
  418. UERRLOG("setDafileSvrThrottleLimit returned %d, error = %s", ret, errMsg.str());
  419. }
  420. else {
  421. ForEachItemIn(ni,eps) {
  422. SocketEndpoint ep = eps.item(ni);
  423. int ret = setDafileSvrThrottleLimit(ep, (ThrottleClass)atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]), atoi(argv[ai+5]), atoi(argv[ai+6]), &errMsg.clear());
  424. if (ret!=0)
  425. UERRLOG("setDafileSvrThrottleLimit returned %d, error = %s", ret, errMsg.str());
  426. StringBuffer s("done ");
  427. ep.getUrlStr(s);
  428. PROGLOG("%s",s.str());
  429. }
  430. }
  431. }
  432. break;
  433. }
  434. SocketEndpoint ep;
  435. SocketEndpointArray epa;
  436. ep.set(argv[ai],DALI_SERVER_PORT);
  437. epa.append(ep);
  438. if (ep.isNull()) {
  439. usage();
  440. break;
  441. }
  442. Owned<IGroup> daligroup = createIGroup(epa);
  443. initClientProcess(daligroup, DCR_DaFsControl);
  444. isdali = true;
  445. ai++;
  446. }
  447. closeEnvironment();
  448. if (isdali)
  449. closedownClientProcess();
  450. }
  451. catch (IException *e) {
  452. EXCLOG(e, "DAFSCONTROL");
  453. e->Release();
  454. }
  455. return ret;
  456. }