daliadmin.cpp 124 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "portlist.h"
  15. #include "jlib.hpp"
  16. #include "jmisc.hpp"
  17. #include "jptree.hpp"
  18. #include "jarray.hpp"
  19. #include "jencrypt.hpp"
  20. #include "jregexp.hpp"
  21. #include "jptree.hpp"
  22. #include "jlzw.hpp"
  23. #include "jexcept.hpp"
  24. #include "jset.hpp"
  25. #include "jprop.hpp"
  26. #include "mpbase.hpp"
  27. #include "mpcomm.hpp"
  28. #include "daclient.hpp"
  29. #include "dadiags.hpp"
  30. #include "danqs.hpp"
  31. #include "dadfs.hpp"
  32. #include "dasds.hpp"
  33. #include "dautils.hpp"
  34. #include "daaudit.hpp"
  35. #include "daft.hpp"
  36. #include "rmtfile.hpp"
  37. #include "workunit.hpp"
  38. #include "dllserver.hpp"
  39. #include "seclib.hpp"
  40. #ifdef _WIN32
  41. #include <conio.h>
  42. #else
  43. #define _getch getchar
  44. #define _putch putchar
  45. #endif
// Default Dali connect timeout, expressed in seconds.
#define DEFAULT_DALICONNECT_TIMEOUT 5 // seconds
// Timeout in milliseconds handed to the SDS connect calls made by the commands in this file.
static unsigned daliConnectTimeoutMs = 5000;
// NOTE(review): not referenced in the visible part of the file; presumably suppresses prompts — confirm against the option parsing.
static bool noninteractive=false;
  49. void usage(const char *exe)
  50. {
  51. printf("Usage:\n");
  52. printf(" %s [<daliserver-ip>] <command> { <option> }\n", exe);
  53. printf("\n");
  54. printf("Data store commands:\n");
  55. printf(" export <branchxpath> <destfile>\n");
  56. printf(" import <branchxpath> <srcfile>\n");
  57. printf(" importadd <branchxpath> <srcfile>\n");
  58. printf(" delete <branchxpath>\n");
  59. printf(" set <xpath> <value> -- set single value\n");
  60. printf(" get <xpath> -- get single value\n");
  61. printf(" bget <xpath> <dest-file> -- binary property\n");
  62. printf(" xget <xpath> -- (multi-value tail can have commas)\n");
  63. printf(" wget <xpath> -- (gets all matching xpath)\n");
  64. printf(" add <xpath> [<value>] -- adds new xpath node with optional value\n");
  65. printf(" delv <xpath> -- deletes value\n");
  66. printf(" count <xpath> -- counts xpath matches\n");
  67. printf("\n");
  68. printf("Logical File meta information commands:\n");
  69. printf(" dfsfile <logicalname> -- get meta information for file\n");
  70. printf(" dfspart <logicalname> <part> -- get meta information for part num\n");
  71. printf(" dfscheck -- verify dfs file information is valid\n");
  72. printf(" dfscsv <logicalnamemask> -- get csv info. for files matching mask\n");
  73. printf(" dfsgroup <logicalgroupname> [filename] -- get IPs for logical group (aka cluster). Written to optional filename if provided\n");
  74. printf(" clusternodes <clustername> [filename] -- get IPs for cluster group. Written to optional filename if provided\n");
  75. printf(" dfsls [<logicalname>] [options]-- get list of files within a scope (options=lrs)\n");
  76. printf(" dfsmap <logicalname> -- get part files (primary and replicates)\n");
  77. printf(" dfsexists <logicalname> -- sets return value to 0 if file exists\n");
  78. printf(" dfsparents <logicalname> -- list superfiles containing file\n");
  79. printf(" dfsunlink <logicalname> -- unlinks file from all super parents\n");
  80. printf(" dfsverify <logicalname> -- verifies parts exist, returns 0 if ok\n");
  81. printf(" setprotect <logicalname> <id> -- overwrite protects logical file\n");
  82. printf(" unprotect <logicalname> <id> -- unprotect (if id=* then clear all)\n");
  83. printf(" listprotect <logicalnamemask> <id-mask> -- list protected files\n");
  84. printf(" checksuperfile <superfilename> [fix=true|false] -- check superfile links consistent and optionally fix\n");
  85. printf(" checksubfile <subfilename> -- check subfile links to parent consistent\n");
  86. printf(" listexpires <logicalnamemask> -- lists logical files with expiry value\n");
  87. printf(" listrelationships <primary> <secondary>\n");
  88. printf(" dfsperm <logicalname> -- returns LDAP permission for file\n");
  89. printf(" dfscompratio <logicalname> -- returns compression ratio of file\n");
  90. printf(" dfsscopes <mask> -- lists logical scopes (mask = * for all)\n");
  91. printf(" cleanscopes -- remove empty scopes\n");
  92. printf(" dfsreplication <clustermask> <logicalnamemask> <redundancy-count> -- set redundancy for files matching mask, on specified clusters only\n");
  93. printf(" holdlock <logicalfile> <read|write> -- hold a lock to the logical-file until a key is pressed");
  94. printf("\n");
  95. printf("Workunit commands:\n");
  96. printf(" listworkunits [<prop>=<val> [<lower> [<upper>]]] -- list workunits that match prop=val in workunit name range lower to upper\n");
  97. printf(" listmatches <connection xpath> [<match xpath>=<val> [<property xpath>]]\n");
  98. printf(" workunittimings <WUID>\n");
  99. printf("\n");
  100. printf("Other dali server and misc commands:\n");
  101. printf(" serverlist <mask> -- list server IPs (mask optional)\n");
  102. printf(" clusterlist <mask> -- list clusters (mask optional)\n");
  103. printf(" auditlog <fromdate> <todate> <match>\n");
  104. printf(" coalesce -- force transaction coalesce\n");
  105. printf(" mpping <server-ip> -- time MP connect\n");
  106. printf(" daliping [ <num> ] -- time dali server connect\n");
  107. printf(" getxref <destxmlfile> -- get all XREF information\n");
  108. printf(" dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
  109. printf(" unlock <xpath or logicalfile> <[path|file]> -- unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
  110. printf(" validatestore [fix=<true|false>]\n"
  111. " [verbose=<true|false>]\n"
  112. " [deletefiles=<true|false>]-- perform some checks on dali meta data an optionally fix or remove redundant info \n");
  113. printf(" stats <workunit> [<creator-type> <creator> <scope-type> <scope> <kind>|category'['value']',...]\n"
  114. " -- dump the statistics for a workunit\n");
  115. printf(" workunit <workunit> [true] -- dump workunit xml, if 2nd parameter equals true, will also include progress data\n");
  116. printf(" wuidcompress <wildcard> <type> -- scan workunits that match <wildcard> and compress resources of <type>\n");
  117. printf(" wuiddecompress <wildcard> <type> -- scan workunits that match <wildcard> and decompress resources of <type>\n");
  118. printf(" xmlsize <filename> [<percentage>] -- analyse size usage in xml file, display individual items above 'percentage' \n");
  119. printf(" migratefiles <src-group> <target-group> [<filemask>] [dryrun] [createmaps] [listonly] [verbose]\n");
  120. printf("\n");
  121. printf("Common options\n");
  122. printf(" server=<dali-server-ip> -- server ip\n");
  123. printf(" -- can be 1st param if numeric ip (or '.')\n");
  124. printf(" user=<username> -- for file operations\n");
  125. printf(" password=<password> -- for file operations\n");
  126. printf(" logfile=<filename> -- filename blank for no log\n");
  127. printf(" rawlog=0|1 -- if raw omits timestamps etc\n");
  128. printf(" timeout=<seconds> -- set dali connect timeout\n");
  129. }
// NOTE(review): not referenced in the visible part of this file; presumably an SDS lock timeout in milliseconds — confirm.
#define SDS_LOCK_TIMEOUT 60000
// Emit one line of text through PROGLOG. The text is passed as a "%s" argument,
// so any '%' characters it contains are not interpreted as format directives.
static void outln(const char *ln)
{
    PROGLOG("%s",ln);
}
// Formatted-output alias used by the commands in this file; expands to PROGLOG.
#define OUTLOG PROGLOG
  136. static const char *remLeading(const char *s)
  137. {
  138. if (*s=='/')
  139. s++;
  140. return s;
  141. }
  142. static bool isWild(const char *path)
  143. {
  144. if (strchr(path,'?')||strchr(path,'*'))
  145. return true;
  146. return false;
  147. }
  148. static const char *splitpath(const char *path,StringBuffer &head,StringBuffer &tmp)
  149. {
  150. if (path[0]!='/')
  151. path = tmp.append('/').append(path).str();
  152. return splitXPath(path, head);
  153. }
  154. // NB: there's strtoll under Linux
  155. static unsigned __int64 hextoll(const char *str, bool &error)
  156. {
  157. unsigned len = strlen(str);
  158. if (!len)
  159. {
  160. error = true;
  161. return 0;
  162. }
  163. unsigned __int64 factor = 1;
  164. unsigned __int64 rolling = 0;
  165. char *ptr = (char *)str+len-1;
  166. for (;;) {
  167. char c = *ptr;
  168. unsigned v;
  169. if (isdigit(c))
  170. v = c-'0';
  171. else if (c>='A' && c<='F')
  172. v = 10+(c-'A');
  173. else if (c>='a' && c<='f')
  174. v = 10+(c-'a');
  175. else {
  176. error = true;
  177. return 0;
  178. }
  179. rolling += v * factor;
  180. factor <<= 4;
  181. if (ptr == str)
  182. break;
  183. --ptr;
  184. }
  185. error = false;
  186. return rolling;
  187. }
  188. static IRemoteConnection *connectXPathOrFile(const char *path,bool safe,StringBuffer &xpath)
  189. {
  190. CDfsLogicalFileName lfn;
  191. StringBuffer lfnpath;
  192. if ((strstr(path,"::")!=NULL)&&!strchr(path,'/')) {
  193. lfn.set(path);
  194. lfn.makeFullnameQuery(lfnpath,DXB_File);
  195. path = lfnpath.str();
  196. }
  197. else if (strchr(path+((*path=='/')?1:0),'/')==NULL)
  198. safe = true; // all root trees safe
  199. Owned<IRemoteConnection> conn = querySDS().connect(remLeading(path),myProcessSession(),safe?0:RTM_LOCK_READ, daliConnectTimeoutMs);
  200. if (!conn&&lfnpath.length()) {
  201. lfn.makeFullnameQuery(lfnpath.clear(),DXB_SuperFile);
  202. path = lfnpath.str();
  203. conn.setown(querySDS().connect(remLeading(path),myProcessSession(),safe?0:RTM_LOCK_READ, daliConnectTimeoutMs));
  204. }
  205. if (conn.get())
  206. xpath.append(path);
  207. return conn.getClear();
  208. }
  209. //=============================================================================
  210. static void _export_(const char *path,const char *dst,bool safe=false)
  211. {
  212. StringBuffer xpath;
  213. Owned<IRemoteConnection> conn = connectXPathOrFile(path,safe,xpath);
  214. if (!conn) {
  215. ERRLOG("Could not connect to %s",path);
  216. return;
  217. }
  218. Owned<IPropertyTree> root = conn->getRoot();
  219. Owned<IFile> f = createIFile(dst);
  220. Owned<IFileIO> io = f->open(IFOcreate);
  221. Owned<IFileIOStream> fstream = createBufferedIOStream(io);
  222. toXML(root, *fstream); // formatted (default)
  223. OUTLOG("Branch %s saved in '%s'",xpath.str(),dst);
  224. conn->close();
  225. }
  226. //==========================================================================================================
  227. static void import(const char *path,const char *src,bool add)
  228. {
  229. Owned<IFile> iFile = createIFile(src);
  230. Owned<IFileIO> iFileIO = iFile->open(IFOread);
  231. if (!iFileIO)
  232. {
  233. ERRLOG("Could not open to %s",src);
  234. return;
  235. }
  236. size32_t sz = (size32_t)iFile->size();
  237. StringBuffer xml;
  238. iFileIO->read(0, sz, xml.reserve(sz));
  239. Owned<IPropertyTree> branch = createPTreeFromXMLString(xml.str());
  240. StringBuffer head;
  241. StringBuffer tmp;
  242. const char *tail=splitpath(path,head,tmp);
  243. if (!tail)
  244. return;
  245. if (!add) {
  246. Owned<IRemoteConnection> bconn = querySDS().connect(remLeading(path),myProcessSession(),RTM_LOCK_READ|RTM_SUB, daliConnectTimeoutMs);
  247. if (bconn) {
  248. Owned<IPropertyTree> broot = bconn->getRoot();
  249. StringBuffer bakname;
  250. Owned<IFileIO> io = createUniqueFile(NULL, tail, "bak", bakname);
  251. OUTLOG("Saving backup of %s to %s",path,bakname.str());
  252. Owned<IFileIOStream> fstream = createBufferedIOStream(io);
  253. toXML(broot, *fstream); // formatted (default)
  254. }
  255. }
  256. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),0, daliConnectTimeoutMs);
  257. if (!conn) {
  258. ERRLOG("Could not connect to %s",path);
  259. return;
  260. }
  261. StringAttr newtail; // must be declared outside the following if
  262. Owned<IPropertyTree> root = conn->getRoot();
  263. if (!add) {
  264. Owned<IPropertyTree> child = root->getPropTree(tail);
  265. root->removeTree(child);
  266. //If replacing a qualified branch then remove the qualifiers before calling addProp
  267. const char * qualifier = strchr(tail, '[');
  268. if (qualifier)
  269. {
  270. newtail.set(tail, qualifier-tail);
  271. tail = newtail;
  272. }
  273. }
  274. Owned<IPropertyTree> oldEnvironment;
  275. if (streq(path,"Environment"))
  276. oldEnvironment.setown(createPTreeFromIPT(conn->queryRoot()));
  277. root->addPropTree(tail,LINK(branch));
  278. conn->commit();
  279. OUTLOG("Branch %s loaded from '%s'",path,src);
  280. conn->close();
  281. if (*path=='/')
  282. path++;
  283. if (strcmp(path,"Environment")==0) {
  284. OUTLOG("Refreshing cluster groups from Environment");
  285. StringBuffer response;
  286. initClusterGroups(false, response, oldEnvironment);
  287. if (response.length())
  288. PROGLOG("updating Environment via import path=%s : %s", path, response.str());
  289. }
  290. }
  291. //=============================================================================
  292. static void _delete_(const char *path,bool backup)
  293. {
  294. StringBuffer head;
  295. StringBuffer tmp;
  296. const char *tail=splitpath(path,head,tmp);
  297. if (!tail)
  298. return;
  299. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_WRITE, daliConnectTimeoutMs);
  300. if (!conn) {
  301. ERRLOG("Could not connect to %s",path);
  302. return;
  303. }
  304. Owned<IPropertyTree> root = conn->getRoot();
  305. Owned<IPropertyTree> child = root->getPropTree(tail);
  306. if (!child) {
  307. ERRLOG("Couldn't find %s/%s",head.str(),tail);
  308. return;
  309. }
  310. if (backup) {
  311. StringBuffer bakname;
  312. Owned<IFileIO> io = createUniqueFile(NULL,"daliadmin", "bak", bakname);
  313. OUTLOG("Saving backup of %s/%s to %s",head.str(),tail,bakname.str());
  314. Owned<IFileIOStream> fstream = createBufferedIOStream(io);
  315. toXML(child, *fstream); // formatted (default)
  316. }
  317. root->removeTree(child);
  318. child.clear();
  319. root.clear();
  320. conn->commit();
  321. conn->close();
  322. }
  323. //=============================================================================
  324. static void set(const char *path,const char *val)
  325. {
  326. StringBuffer head;
  327. StringBuffer tmp;
  328. const char *tail=splitpath(path,head,tmp);
  329. if (!tail)
  330. return;
  331. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_WRITE, daliConnectTimeoutMs);
  332. if (!conn) {
  333. ERRLOG("Could not connect to %s",path);
  334. return;
  335. }
  336. Owned<IPropertyTree> root = conn->getRoot();
  337. StringBuffer oldv;
  338. StringBuffer newv;
  339. root->getProp(tail,oldv);
  340. root->setProp(tail,val);
  341. conn->commit();
  342. root->getProp(tail,newv);
  343. OUTLOG("Changed %s from '%s' to '%s'",path,oldv.str(),newv.str());
  344. conn->close();
  345. }
  346. //=============================================================================
  347. static void get(const char *path)
  348. {
  349. StringBuffer head;
  350. StringBuffer tmp;
  351. const char *tail=splitpath(path,head,tmp);
  352. if (!tail)
  353. return;
  354. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  355. if (!conn) {
  356. ERRLOG("Could not connect to %s",path);
  357. return;
  358. }
  359. Owned<IPropertyTree> root = conn->getRoot();
  360. StringBuffer val;
  361. root->getProp(tail,val);
  362. OUTLOG("Value of %s is: '%s'",path,val.str());
  363. conn->close();
  364. }
  365. //=============================================================================
  366. static void bget(const char *path,const char *outfn)
  367. {
  368. StringBuffer head;
  369. StringBuffer tmp;
  370. const char *tail=splitpath(path,head,tmp);
  371. if (!tail)
  372. return;
  373. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  374. if (!conn) {
  375. ERRLOG("Could not connect to %s",path);
  376. return;
  377. }
  378. Owned<IPropertyTree> root = conn->getRoot();
  379. MemoryBuffer val;
  380. root->getPropBin(tail,val);
  381. Owned<IFile> f = createIFile(outfn);
  382. Owned<IFileIO> io = f->open(IFOcreate);
  383. io->write(0,val.length(),val.toByteArray());
  384. conn->close();
  385. }
  386. //=============================================================================
  387. static void xget(const char *path)
  388. {
  389. if (!path||!*path)
  390. return;
  391. Owned<IRemoteConnection> conn = querySDS().connect("/",myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  392. if (!conn) {
  393. ERRLOG("Could not connect to /");
  394. return;
  395. }
  396. Owned<IPropertyTree> root = conn->getRoot();
  397. StringBuffer head;
  398. StringBuffer tmp;
  399. const char *props=splitpath(path,head,tmp);
  400. const char *s = head.str();
  401. if (*s=='/')
  402. s++;
  403. Owned<IPropertyTreeIterator> it = root->getElements(s);
  404. if (it->first()) {
  405. unsigned idx = 0;
  406. do {
  407. idx++;
  408. StringBuffer res;
  409. res.append(idx).append(',');
  410. s = props;
  411. for (;;) {
  412. const char *e = strchr(s,',');
  413. if (e&&e[1]) {
  414. StringBuffer prop(e-s,s);
  415. it->query().getProp(prop.str(),res);
  416. s = e+1;
  417. res.append(',');
  418. }
  419. else {
  420. it->query().getProp(s,res);
  421. break;
  422. }
  423. }
  424. outln(res.str());
  425. } while (it->next());
  426. }
  427. conn->close();
  428. }
  429. //=============================================================================
  430. static void wget(const char *path)
  431. {
  432. StringBuffer head;
  433. StringBuffer tmp;
  434. const char *tail=splitpath(path,head,tmp);
  435. if (!tail)
  436. return;
  437. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  438. if (!conn) {
  439. ERRLOG("Could not connect to %s",path);
  440. return;
  441. }
  442. Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(tail);
  443. unsigned n = 0;
  444. ForEach(*iter) {
  445. n++;
  446. const char *s = iter->query().queryName();
  447. OUTLOG("%d,%s",n,s);
  448. }
  449. conn->close();
  450. }
  451. //=============================================================================
  452. static void add(const char *path, const char *val)
  453. {
  454. if (!path || !*path)
  455. throw makeStringException(0, "Invalid xpath (empty)");
  456. if ('/' == path[strlen(path)-1])
  457. throw makeStringException(0, "Invalid xpath (no trailing xpath node provided)");
  458. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_ADD, daliConnectTimeoutMs);
  459. if (!conn)
  460. {
  461. ERRLOG("Could not connect to %s", path);
  462. return;
  463. }
  464. VStringBuffer msg("Added %s", path);
  465. if (val)
  466. {
  467. conn->queryRoot()->setProp(NULL, val);
  468. msg.appendf(" (with value = '%s')", val);
  469. }
  470. OUTLOG("%s", msg.str());
  471. }
  472. //=============================================================================
  473. static void delv(const char *path)
  474. {
  475. StringBuffer head;
  476. StringBuffer tmp;
  477. const char *tail=splitpath(path,head,tmp);
  478. if (!tail)
  479. return;
  480. Owned<IRemoteConnection> conn = querySDS().connect(head.str(),myProcessSession(),RTM_LOCK_WRITE, daliConnectTimeoutMs);
  481. if (!conn) {
  482. ERRLOG("Could not connect to %s",path);
  483. return;
  484. }
  485. Owned<IPropertyTree> root = conn->getRoot();
  486. StringBuffer val;
  487. root->getProp(tail,val);
  488. root->removeProp(tail);
  489. OUTLOG("Value of %s was: '%s'",path,val.str());
  490. conn->close();
  491. }
  492. //=============================================================================
  493. static void count(const char *path)
  494. {
  495. unsigned result = querySDS().queryCount(path);
  496. OUTLOG("Count of %s is: %d", path, result);
  497. }
  498. //=============================================================================
  499. static void dfsfile(const char *lname,IUserDescriptor *userDesc, UnsignedArray *partslist=NULL)
  500. {
  501. StringBuffer str;
  502. CDfsLogicalFileName lfn;
  503. lfn.set(lname);
  504. if (!lfn.isExternal()) {
  505. Owned<IPropertyTree> tree = queryDistributedFileDirectory().getFileTree(lname,userDesc,NULL,daliConnectTimeoutMs,true); //,userDesc);
  506. if (partslist)
  507. filterParts(tree,*partslist);
  508. if (!tree) {
  509. ERRLOG("%s not found",lname);
  510. return;
  511. }
  512. toXML(tree, str);
  513. outln(str.str());
  514. }
  515. else {
  516. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lname,userDesc);
  517. if (file) {
  518. Owned<IFileDescriptor> fdesc = file->getFileDescriptor();
  519. Owned<IPropertyTree> t = createPTree("File");
  520. fdesc->serializeTree(*t);
  521. filterParts(t,*partslist);
  522. toXML(t, str.clear());
  523. outln(str.str());
  524. }
  525. }
  526. }
  527. //=============================================================================
  528. static void dfspart(const char *lname,IUserDescriptor *userDesc, unsigned partnum)
  529. {
  530. UnsignedArray partslist;
  531. partslist.append(partnum);
  532. dfsfile(lname,userDesc,&partslist);
  533. }
  534. //=============================================================================
  535. void dfscsv(const char *dali,IUserDescriptor *udesc)
  536. {
  537. const char *fields[] = {
  538. "name","group","directory","partmask","modified","job","owner","workunit","numparts","size","recordCount","recordSize","compressedSize",NULL
  539. };
  540. Owned<INode> foreigndali;
  541. if (dali&&*dali&&(*dali!='*')) {
  542. SocketEndpoint ep(dali,DALI_SERVER_PORT);
  543. foreigndali.setown(createINode(ep));
  544. }
  545. unsigned start = msTick();
  546. IDFAttributesIterator *iter = queryDistributedFileDirectory().getDFAttributesIterator("*",udesc,true,false,foreigndali);
  547. StringBuffer ln;
  548. unsigned i;
  549. for (i=0;fields[i];i++) {
  550. if (i>0)
  551. ln.append(',');
  552. ln.append('"').append(fields[i]).append('"');
  553. }
  554. outln(ln.str());
  555. if (iter) {
  556. StringBuffer aname;
  557. StringBuffer vals;
  558. ForEach(*iter) {
  559. IPropertyTree &attr=iter->query();
  560. ln.clear();
  561. for (i=0;fields[i];i++) {
  562. aname.clear().append('@').append(fields[i]);
  563. const char *val = attr.queryProp(aname.str());
  564. if (i>0)
  565. ln.append(',');
  566. if (val)
  567. while (*val) {
  568. if (*val!=',')
  569. ln.append(*val);
  570. val++;
  571. }
  572. }
  573. outln(ln.str());
  574. }
  575. }
  576. }
  577. //=============================================================================
  578. static void writeGroup(IGroup *group, const char *name, const char *outputFilename)
  579. {
  580. Owned<IFileIOStream> io;
  581. if (outputFilename)
  582. {
  583. OwnedIFile iFile = createIFile(outputFilename);
  584. OwnedIFileIO iFileIO = iFile->open(IFOcreate);
  585. io.setown(createIOStream(iFileIO));
  586. }
  587. StringBuffer eps;
  588. for (unsigned i=0;i<group->ordinality();i++)
  589. {
  590. group->queryNode(i).endpoint().getUrlStr(eps.clear());
  591. if (io)
  592. {
  593. eps.newline();
  594. io->write(eps.length(), eps.str());
  595. }
  596. else
  597. OUTLOG("%s",eps.str());
  598. }
  599. }
  600. unsigned dfsCheck(StringBuffer & path, IPropertyTree * tree)
  601. {
  602. const char * name = tree->queryProp("@name");
  603. //MORE: What other consistency checks can be added here?
  604. if (tree->hasProp("Attr[2]"))
  605. {
  606. printf("%s%s - duplicate Attr tag\n", path.str(), name ? name : "");
  607. return 1;
  608. }
  609. unsigned issues = 0;
  610. unsigned prevLength = path.length();
  611. if (name)
  612. path.append(name).append("::");
  613. Owned<IPropertyTreeIterator> elems = tree->getElements("*");
  614. ForEach(*elems)
  615. {
  616. issues += dfsCheck(path, &elems->query());
  617. }
  618. path.setLength(prevLength);
  619. return issues;
  620. }
  621. void dfsCheck()
  622. {
  623. StringBuffer xpath;
  624. Owned<IRemoteConnection> conn = querySDS().connect("Files",myProcessSession(),0, daliConnectTimeoutMs);
  625. if (!conn)
  626. {
  627. ERRLOG("Could not connect to %s","/Files");
  628. return;
  629. }
  630. StringBuffer path;
  631. dfsCheck(path, conn->queryRoot());
  632. }
  633. static void dfsGroup(const char *name, const char *outputFilename)
  634. {
  635. Owned<IGroup> group = queryNamedGroupStore().lookup(name);
  636. if (!group)
  637. {
  638. ERRLOG("cannot find group %s",name);
  639. return;
  640. }
  641. writeGroup(group, name, outputFilename);
  642. }
  643. static int clusterGroup(const char *name, const char *outputFilename)
  644. {
  645. StringBuffer errStr;
  646. try
  647. {
  648. Owned<IGroup> group = getClusterNodeGroup(name, "ThorCluster");
  649. if (group)
  650. {
  651. writeGroup(group, name, outputFilename);
  652. return 0; // success
  653. }
  654. errStr.appendf("cannot find group %s", name);
  655. }
  656. catch (IException *e)
  657. {
  658. e->errorMessage(errStr);
  659. e->Release();
  660. }
  661. ERRLOG("%s", errStr.str());
  662. return 1;
  663. }
  664. static IPropertyTree * selectLevel(IPropertyTree * root, const char * name)
  665. {
  666. StringBuffer xpath;
  667. xpath.append("*[@name='").append(name).append("']");
  668. Owned<IPropertyTree> match = root->getPropTree(xpath);
  669. if (match)
  670. return match.getClear();
  671. ERRLOG("Path %s not found", name);
  672. return nullptr;
  673. }
  674. static IPropertyTree * selectPath(IPropertyTree * root, const char * path)
  675. {
  676. if (!path || !*path) // use / to refer to the root directory
  677. return LINK(root);
  678. const char * split = strstr(path, "::");
  679. if (split)
  680. {
  681. //Can use :: to refer to the root directory
  682. if (split == path)
  683. return selectPath(root, split + 2);
  684. StringAttr name(path, split - path);
  685. Owned<IPropertyTree> match = selectLevel(root, name);
  686. if (match)
  687. return selectPath(match, split + 2);
  688. return nullptr;
  689. }
  690. return selectLevel(root, path);
  691. }
  692. static void displayDirectory(IPropertyTree * directory, const char * options, unsigned depth)
  693. {
  694. Owned<IPropertyTreeIterator> elems = directory->getElements("*");
  695. ForEach(*elems)
  696. {
  697. IPropertyTree & cur = elems->query();
  698. const char * tag = cur.queryName();
  699. const char * name = cur.queryProp("@name");
  700. const char * modified = cur.queryProp("@modified");
  701. if (name && tag)
  702. {
  703. if (strieq(tag, "Scope"))
  704. {
  705. OUTLOG("%*sD %s", depth, "", name);
  706. if (options && strchr(options, 'r'))
  707. displayDirectory(&cur, options, depth+1);
  708. }
  709. else if (strieq(tag, "File"))
  710. {
  711. const char * group = cur.queryProp("@group");
  712. const char * size = cur.queryProp("Attr[1]/@size");
  713. if (options && strchr(options, 'l'))
  714. OUTLOG("%*s %-30s %12s %s %s", depth, "", name, size ? size : "", group ? group : "?", modified ? modified : "");
  715. else
  716. OUTLOG("%*s %s", depth, "", name);
  717. }
  718. else if (strieq(tag, "SuperFile"))
  719. {
  720. if (options && strchr(options, 'l'))
  721. OUTLOG("%*sS %s %s (%d)", depth, "", name, modified ? modified : "", cur.getPropInt("@numsubfiles"));
  722. else
  723. OUTLOG("%*sS %s", depth, "", name);
  724. if (options && strchr(options, 's'))
  725. {
  726. Owned<IPropertyTreeIterator> subs = cur.getElements("SubFile");
  727. ForEach(*subs)
  728. {
  729. OUTLOG("%*s->%s", depth, "", subs->query().queryProp("@name"));
  730. }
  731. }
  732. }
  733. else
  734. OUTLOG("? %s %s", name, tag);
  735. }
  736. }
  737. }
  738. static void dfsLs(const char *name, const char *options, bool safe = false)
  739. {
  740. StringBuffer xpath;
  741. Owned<IRemoteConnection> conn = querySDS().connect("Files",myProcessSession(),0, daliConnectTimeoutMs);
  742. if (!conn)
  743. {
  744. ERRLOG("Could not connect to %s","/Files");
  745. return;
  746. }
  747. {
  748. Owned<IPropertyTree> directory = selectPath(conn->queryRoot(), name);
  749. if (directory)
  750. displayDirectory(directory, options, 0);
  751. }
  752. }
  753. //=============================================================================
  754. static void dfsmap(const char *lname, IUserDescriptor *user)
  755. {
  756. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lname,user);
  757. if (!file) {
  758. ERRLOG("File %s not found",lname);
  759. return;
  760. }
  761. Owned<IDistributedFilePartIterator> pi = file->getIterator();
  762. unsigned pn=1;
  763. StringBuffer ln;
  764. ForEach(*pi) {
  765. ln.clear().appendf("%d: ",pn);
  766. Owned<IDistributedFilePart> part = &pi->get();
  767. for (unsigned int i=0; i<part->numCopies(); i++) {
  768. RemoteFilename rfn;
  769. part->getFilename(rfn,i);
  770. if (i)
  771. ln.append(", ");
  772. rfn.getRemotePath(ln);
  773. }
  774. outln(ln.str());
  775. pn++;
  776. }
  777. }
  778. //=============================================================================
  779. static int dfsexists(const char *lname,IUserDescriptor *user)
  780. {
  781. return queryDistributedFileDirectory().exists(lname,user)?0:1;
  782. }
  783. //=============================================================================
  784. static void dfsparents(const char *lname, IUserDescriptor *user)
  785. {
  786. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lname,user,false,false,true);
  787. if (file) {
  788. Owned<IDistributedSuperFileIterator> iter = file->getOwningSuperFiles();
  789. ForEach(*iter)
  790. OUTLOG("%s,%s",iter->query().queryLogicalName(),lname);
  791. }
  792. }
  793. //=============================================================================
  794. static void dfsunlink(const char *lname, IUserDescriptor *user)
  795. {
  796. for (;;)
  797. {
  798. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lname,user,false,false,true);
  799. if (!file)
  800. {
  801. ERRLOG("File '%s' not found", lname);
  802. break;
  803. }
  804. Owned<IDistributedSuperFileIterator> iter = file->getOwningSuperFiles();
  805. if (!iter->first())
  806. break;
  807. file.clear();
  808. Owned<IDistributedSuperFile> sf = &iter->get();
  809. iter.clear();
  810. if (sf->removeSubFile(lname,false))
  811. OUTLOG("removed %s from %s",lname,sf->queryLogicalName());
  812. else
  813. ERRLOG("FAILED to remove %s from %s",lname,sf->queryLogicalName());
  814. }
  815. }
  816. //=============================================================================
  817. class CIpItem: public CInterface
  818. {
  819. public:
  820. bool ok;
  821. IpAddress ip;
  822. };
  823. class CIpTable: public SuperHashTableOf<CIpItem,IpAddress>
  824. {
  825. public:
  826. ~CIpTable()
  827. {
  828. _releaseAll();
  829. }
  830. void onAdd(void *)
  831. {
  832. // not used
  833. }
  834. void onRemove(void *e)
  835. {
  836. CIpItem &elem=*(CIpItem *)e;
  837. elem.Release();
  838. }
  839. unsigned getHashFromElement(const void *e) const
  840. {
  841. const CIpItem &elem=*(const CIpItem *)e;
  842. return elem.ip.iphash();
  843. }
  844. unsigned getHashFromFindParam(const void *fp) const
  845. {
  846. return ((const IpAddress *)fp)->iphash();
  847. }
  848. const void * getFindParam(const void *p) const
  849. {
  850. const CIpItem &elem=*(const CIpItem *)p;
  851. return &elem.ip;
  852. }
  853. bool matchesFindParam(const void * et, const void *fp, unsigned fphash) const
  854. {
  855. return ((CIpItem *)et)->ip.ipequals(*(IpAddress *)fp);
  856. }
  857. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CIpItem,IpAddress);
  858. bool verifyDaliFileServer(IpAddress &ip)
  859. {
  860. CIpItem *item=find(ip);
  861. if (!item) {
  862. item = new CIpItem;
  863. item->ip.ipset(ip);
  864. item->ok = testDaliServixPresent(ip);
  865. add(*item);
  866. }
  867. return item->ok;
  868. }
  869. };
  870. class CFileCrcItem: public CInterface
  871. {
  872. public:
  873. RemoteFilename filename;
  874. unsigned requiredcrc;
  875. unsigned crc;
  876. unsigned partno;
  877. unsigned copy;
  878. bool ok;
  879. byte flags;
  880. CDateTime dt;
  881. };
  882. #define FLAG_ROW_COMPRESSED 1
  883. #define FLAG_NO_CRC 2
  884. class CFileList: public CIArrayOf<CFileCrcItem>
  885. {
  886. public:
  887. void add(RemoteFilename &filename,unsigned partno,unsigned copy,unsigned crc,byte flags)
  888. {
  889. CFileCrcItem *item = new CFileCrcItem();
  890. item->filename.set(filename);
  891. item->partno = partno;
  892. item->copy = copy;
  893. item->crc = crc;
  894. item->requiredcrc = crc;
  895. item->flags = flags;
  896. append(*item);
  897. }
  898. };
  899. static int dfsverify(const char *name,CDateTime *cutoff, IUserDescriptor *user)
  900. {
  901. static CIpTable dafilesrvips;
  902. Owned<IDistributedFile> file=queryDistributedFileDirectory().lookup(name,user);
  903. if (!file) {
  904. ERRLOG("VERIFY: cannot find %s",name);
  905. return 1;
  906. }
  907. CDateTime filetime;
  908. if (file->getModificationTime(filetime)) {
  909. if (cutoff&&(filetime.compare(*cutoff)<=0))
  910. return 0;
  911. }
  912. IPropertyTree &fileprops = file->queryAttributes();
  913. bool blocked;
  914. bool rowcompressed = file->isCompressed(&blocked)&&!blocked;
  915. CFileList list;
  916. unsigned width = file->numParts();
  917. unsigned short port = getDaliServixPort();
  918. try {
  919. for (unsigned i=0;i<width;i++) {
  920. Owned<IDistributedFilePart> part = file->getPart(i);
  921. for (unsigned copy = 0; copy < part->numCopies(); copy++) {
  922. unsigned reqcrc;
  923. bool noreq = !part->getCrc(reqcrc);
  924. // if (reqcrc==(unsigned)-1)
  925. // continue;
  926. SocketEndpoint ep(part->queryNode()->endpoint());
  927. if (!dafilesrvips.verifyDaliFileServer(ep)) {
  928. StringBuffer ips;
  929. ep.getIpText(ips);
  930. ERRLOG("VERIFY: file %s, cannot run DAFILESRV on %s",name,ips.str());
  931. return 4;
  932. }
  933. RemoteFilename rfn;
  934. part->getFilename(rfn,copy);
  935. rfn.setPort(port);
  936. list.add(rfn,i,copy,reqcrc,rowcompressed?FLAG_ROW_COMPRESSED:(noreq?FLAG_NO_CRC:0));
  937. }
  938. }
  939. }
  940. catch (IException *e)
  941. {
  942. StringBuffer s;
  943. s.appendf("VERIFY: file %s",name);
  944. EXCLOG(e, s.str());
  945. e->Release();
  946. return 2;
  947. }
  948. if (list.ordinality()==0)
  949. return 0;
  950. OUTLOG("VERIFY: start file %s",name);
  951. file.clear();
  952. CriticalSection crit;
  953. class casyncfor: public CAsyncFor
  954. {
  955. CFileList &list;
  956. CriticalSection &crit;
  957. public:
  958. bool ok;
  959. casyncfor(CFileList &_list, CriticalSection &_crit)
  960. : list(_list), crit(_crit)
  961. {
  962. ok = true;
  963. }
  964. void Do(unsigned i)
  965. {
  966. CriticalBlock block(crit);
  967. CFileCrcItem &item = list.item(i);
  968. RemoteFilename &rfn = item.filename;
  969. Owned<IFile> partfile;
  970. StringBuffer eps;
  971. try
  972. {
  973. partfile.setown(createIFile(rfn));
  974. // OUTLOG("VERIFY: part %s on %s",partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
  975. if (partfile) {
  976. CriticalUnblock unblock(crit);
  977. item.crc = partfile->getCRC();
  978. partfile->getTime(NULL,&item.dt,NULL);
  979. if ((item.crc==0)&&!partfile->exists()) {
  980. ERRLOG("VERIFY: does not exist part %s on %s",partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
  981. ok = false;
  982. }
  983. }
  984. else
  985. ok = false;
  986. }
  987. catch (IException *e)
  988. {
  989. StringBuffer s;
  990. s.appendf("VERIFY: part %s on %s",partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
  991. EXCLOG(e, s.str());
  992. e->Release();
  993. ok = false;
  994. }
  995. }
  996. } afor(list,crit);
  997. afor.For(list.ordinality(),400,false,true);
  998. StringBuffer outs;
  999. ForEachItemIn(j,list) {
  1000. CFileCrcItem &item = list.item(j);
  1001. item.filename.setPort(0);
  1002. if (item.crc!=item.requiredcrc) {
  1003. StringBuffer rfs;
  1004. ERRLOG("VERIFY: FAILED %s (%x,%x) file %s",name,item.crc,item.requiredcrc,item.filename.getRemotePath(rfs).str());
  1005. afor.ok = false;
  1006. }
  1007. }
  1008. if (afor.ok) {
  1009. OUTLOG("VERIFY: OK file %s",name);
  1010. return 0;
  1011. }
  1012. return 3;
  1013. }
  1014. //=============================================================================
  1015. static void setprotect(const char *filename, const char *callerid, IUserDescriptor *user)
  1016. {
  1017. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(filename,user);
  1018. file->setProtect(callerid,true);
  1019. }
  1020. //=============================================================================
  1021. static void unprotect(const char *filename, const char *callerid, IUserDescriptor *user)
  1022. {
  1023. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(filename,user);
  1024. file->setProtect((strcmp(callerid,"*")==0)?NULL:callerid,false);
  1025. }
  1026. //=============================================================================
  1027. static void listprotect(const char *filename, const char *callerid)
  1028. {
  1029. Owned<IDFProtectedIterator> piter = queryDistributedFileDirectory().lookupProtectedFiles((strcmp(callerid,"*")==0)?NULL:callerid);
  1030. ForEach(*piter) {
  1031. if (WildMatch(piter->queryFilename(),filename))
  1032. OUTLOG("%s,%s,%s,%u",piter->isSuper()?"SuperFile":"File",piter->queryFilename(),piter->queryOwner(),piter->getCount());
  1033. }
  1034. }
  1035. //=============================================================================
  // Set once the user answers '*' ("yes to all"); later prompts are skipped.
  static bool allyes = false;

  // Read a Y / N / * answer from the console (case-insensitive), echoing the
  // accepted key. Any other key is ignored and the read is retried.
  // Returns true for 'Y' or '*' ('*' also latches `allyes`), false for 'N'.
  // End of input is treated as 'N' so a closed or redirected stdin cannot make
  // the prompt spin forever.
  static bool getResponse()
  {
      if (allyes)
          return true;
      int ch;
      do
      {
          ch = _getch();
          if (ch == EOF)
              return false;
          ch = toupper(ch);
      } while (ch != 'Y' && ch != 'N' && ch != '*');
      _putch(ch);
      _putch('\n');
      if (ch=='*') {
          allyes = true;
          return true;
      }
      return ch=='Y';
  }
  1054. static bool doFix()
  1055. {
  1056. if (allyes)
  1057. return true;
  1058. printf("Fix? (Y/N/*):");
  1059. return getResponse();
  1060. }
  1061. static void checksuperfile(const char *lfn,bool fix=false)
  1062. {
  1063. if (strcmp(lfn,"*")==0) {
  1064. class csuperfilescan: public CSDSFileScanner
  1065. {
  1066. virtual bool checkScopeOk(const char *scopename)
  1067. {
  1068. OUTLOG("Processing scope %s",scopename);
  1069. return true;
  1070. }
  1071. void processSuperFile(IPropertyTree &superfile,StringBuffer &name)
  1072. {
  1073. try {
  1074. checksuperfile(name.str(),fix);
  1075. }
  1076. catch (IException *e) {
  1077. EXCLOG(e,"processSuperFiles");
  1078. e->Release();
  1079. }
  1080. }
  1081. public:
  1082. bool fix;
  1083. } superfilescan;
  1084. superfilescan.fix = fix;
  1085. Owned<IRemoteConnection> conn = querySDS().connect("/Files", myProcessSession(), 0, 100000);
  1086. superfilescan.scan(conn,false,true);
  1087. return;
  1088. }
  1089. bool fixed = false;
  1090. CDfsLogicalFileName lname;
  1091. lname.set(lfn);
  1092. StringBuffer query;
  1093. lname.makeFullnameQuery(query, DXB_SuperFile, true);
  1094. Owned<IRemoteConnection> conn = querySDS().connect(query.str(),myProcessSession(),fix?RTM_LOCK_WRITE:0, daliConnectTimeoutMs);
  1095. if (!conn) {
  1096. ERRLOG("Could not connect to %s",lfn);
  1097. ERRLOG("Superfile %s FAILED",lname.get());
  1098. return;
  1099. }
  1100. Owned<IPropertyTree> root = conn->getRoot();
  1101. unsigned n=root->getPropInt("@numsubfiles");
  1102. StringBuffer path;
  1103. StringBuffer subname;
  1104. unsigned subnum = 0;
  1105. unsigned i;
  1106. for (i=0;i<n;i++) {
  1107. for (;;) {
  1108. IPropertyTree *sub2 = root->queryPropTree(path.clear().appendf("SubFile[@num=\"%d\"][2]",i+1).str());
  1109. if (!sub2)
  1110. break;
  1111. StringBuffer s;
  1112. s.appendf("SuperFile %s: corrupt, subfile file part %d is duplicated",lname.get(),i+1);
  1113. ERRLOG("%s",s.str());
  1114. if (!fix||!doFix()) {
  1115. ERRLOG("Superfile %s FAILED",lname.get());
  1116. return;
  1117. }
  1118. root->removeProp(path.str());
  1119. }
  1120. IPropertyTree *sub = root->queryPropTree(path.clear().appendf("SubFile[@num=\"%d\"]",i+1).str());
  1121. if (!sub) {
  1122. StringBuffer s;
  1123. s.appendf("SuperFile %s: corrupt, subfile file part %d cannot be found",lname.get(),i+1);
  1124. ERRLOG("%s",s.str());
  1125. if (!fix||!doFix()) {
  1126. ERRLOG("Superfile %s FAILED",lname.get());
  1127. return;
  1128. }
  1129. fixed = true;
  1130. break;
  1131. }
  1132. sub->getProp("@name",subname.clear());
  1133. CDfsLogicalFileName sublname;
  1134. sublname.set(subname.str());
  1135. if (!sublname.isExternal()&&!sublname.isForeign()) {
  1136. StringBuffer subquery;
  1137. sublname.makeFullnameQuery(subquery, DXB_File, true);
  1138. Owned<IRemoteConnection> subconn = querySDS().connect(subquery.str(),myProcessSession(),fix?RTM_LOCK_WRITE:0, daliConnectTimeoutMs);
  1139. if (!subconn) {
  1140. sublname.makeFullnameQuery(subquery.clear(), DXB_SuperFile, true);
  1141. subconn.setown(querySDS().connect(subquery.str(),myProcessSession(),0, daliConnectTimeoutMs));
  1142. }
  1143. if (!subconn) {
  1144. ERRLOG("SuperFile %s is missing sub-file file %s",lname.get(),subname.str());
  1145. if (!fix||!doFix()) {
  1146. ERRLOG("Superfile %s FAILED",lname.get());
  1147. return;
  1148. }
  1149. root->removeTree(sub);
  1150. for (unsigned j=i+1;j<n; j++) {
  1151. sub = root->queryPropTree(path.clear().appendf("SubFile[@num=\"%d\"]",j+1).str());
  1152. if (sub)
  1153. sub->setPropInt("@num",j);
  1154. }
  1155. i--;
  1156. n--;
  1157. fixed = true;
  1158. continue;
  1159. }
  1160. subnum++;
  1161. Owned<IPropertyTree> subroot = subconn->getRoot();
  1162. Owned<IPropertyTreeIterator> iter = subroot->getElements("SuperOwner");
  1163. StringBuffer pname;
  1164. bool parentok=false;
  1165. ForEach(*iter) {
  1166. iter->query().getProp("@name",pname.clear());
  1167. if (strcmp(pname.str(),lname.get())==0)
  1168. parentok = true;
  1169. else {
  1170. CDfsLogicalFileName sdlname;
  1171. sdlname.set(pname.str());
  1172. StringBuffer sdquery;
  1173. sdlname.makeFullnameQuery(sdquery, DXB_SuperFile, true);
  1174. Owned<IRemoteConnection> sdconn = querySDS().connect(sdquery.str(),myProcessSession(),0, daliConnectTimeoutMs);
  1175. if (!conn) {
  1176. WARNLOG("SubFile %s has missing owner superfile %s",sublname.get(),sdlname.get());
  1177. }
  1178. // make sure superfile exists
  1179. }
  1180. }
  1181. if (!parentok) {
  1182. WARNLOG("SubFile %s is missing link to Superfile %s",sublname.get(),lname.get());
  1183. ForEach(*iter) {
  1184. iter->query().getProp("@name",pname.clear());
  1185. OUTLOG("Candidate %s",pname.str());
  1186. }
  1187. if (fix&&doFix()) {
  1188. Owned<IPropertyTree> t = createPTree("SuperOwner");
  1189. t->setProp("@name",lname.get());
  1190. subroot->addPropTree("SuperOwner",t.getClear());
  1191. }
  1192. }
  1193. }
  1194. else
  1195. subnum++;
  1196. }
  1197. if (fixed)
  1198. root->setPropInt("@numsubfiles",subnum);
  1199. i = 0;
  1200. byte fixstate = 0;
  1201. for (;;) {
  1202. bool err = false;
  1203. IPropertyTree *sub = root->queryPropTree(path.clear().appendf("SubFile[%d]",i+1).str());
  1204. if (sub) {
  1205. unsigned pn = sub->getPropInt("@num");
  1206. if (pn>subnum) {
  1207. ERRLOG("SuperFile %s: corrupt, subfile file part %d spurious",lname.get(),pn);
  1208. if (fixstate==0)
  1209. {
  1210. if (fix&&doFix())
  1211. fixstate = 1;
  1212. else
  1213. fixstate = 2;
  1214. }
  1215. if (fixstate==1) {
  1216. root->removeTree(sub);
  1217. fixed = true;
  1218. i--;
  1219. }
  1220. }
  1221. }
  1222. else
  1223. break;
  1224. i++;
  1225. }
  1226. if (n==0) {
  1227. IPropertyTree *sub = root->queryPropTree("Attr");
  1228. if (!isEmptyPTree(sub)&&!sub->queryProp("description")) {
  1229. if (fix) {
  1230. if (!fixed)
  1231. ERRLOG("FIX Empty Superfile %s contains non-empty Attr",lname.get());
  1232. root->removeTree(sub);
  1233. }
  1234. else if (sub->getPropInt64("@recordCount")||sub->getPropInt64("@size"))
  1235. ERRLOG("FAIL Empty Superfile %s contains non-empty Attr sz=%" I64F "d rc=%" I64F "d",lname.get(),sub->getPropInt64("@recordCount"),sub->getPropInt64("@size"));
  1236. }
  1237. }
  1238. if (fixed)
  1239. OUTLOG("Superfile %s FIXED - from %d to %d subfiles",lname.get(),n,subnum);
  1240. else
  1241. OUTLOG("Superfile %s OK - contains %d subfiles",lname.get(),n);
  1242. }
  1243. //=============================================================================
  1244. static void checksubfile(const char *lfn)
  1245. {
  1246. if (strcmp(lfn,"*")==0) {
  1247. class csubfilescan: public CSDSFileScanner
  1248. {
  1249. virtual bool checkFileOk(IPropertyTree &file,const char *filename)
  1250. {
  1251. return (file.hasProp("SuperOwner[1]"));
  1252. }
  1253. virtual bool checkSuperFileOk(IPropertyTree &file,const char *filename)
  1254. {
  1255. return (file.hasProp("SuperOwner[1]"));
  1256. }
  1257. virtual bool checkScopeOk(const char *scopename)
  1258. {
  1259. OUTLOG("Processing scope %s",scopename);
  1260. return true;
  1261. }
  1262. void processFile(IPropertyTree &root,StringBuffer &name)
  1263. {
  1264. try {
  1265. checksubfile(name.str());
  1266. }
  1267. catch (IException *e) {
  1268. EXCLOG(e,"processSuperFiles");
  1269. e->Release();
  1270. }
  1271. }
  1272. void processSuperFile(IPropertyTree &root,StringBuffer &name)
  1273. {
  1274. try {
  1275. checksubfile(name.str());
  1276. }
  1277. catch (IException *e) {
  1278. EXCLOG(e,"processSuperFiles");
  1279. e->Release();
  1280. }
  1281. }
  1282. public:
  1283. } subfilescan;
  1284. Owned<IRemoteConnection> conn = querySDS().connect("/Files", myProcessSession(), 0, 100000);
  1285. subfilescan.scan(conn,true,true);
  1286. return;
  1287. }
  1288. CDfsLogicalFileName lname;
  1289. lname.set(lfn);
  1290. StringBuffer query;
  1291. lname.makeFullnameQuery(query, DXB_File, true);
  1292. Owned<IRemoteConnection> conn = querySDS().connect(query.str(),myProcessSession(),0, daliConnectTimeoutMs);
  1293. if (!conn) {
  1294. lname.makeFullnameQuery(query.clear(), DXB_SuperFile, true);
  1295. conn.setown(querySDS().connect(query.str(),myProcessSession(),0, daliConnectTimeoutMs));
  1296. }
  1297. if (!conn) {
  1298. ERRLOG("Could not connect to %s",lfn);
  1299. ERRLOG("Subfile %s FAILED",lname.get());
  1300. return;
  1301. }
  1302. Owned<IPropertyTree> root = conn->getRoot();
  1303. Owned<IPropertyTreeIterator> iter = root->getElements("SuperOwner");
  1304. StringBuffer pname;
  1305. bool ok=true;
  1306. ForEach(*iter) {
  1307. iter->query().getProp("@name",pname.clear());
  1308. CDfsLogicalFileName sdlname;
  1309. sdlname.set(pname.str());
  1310. StringBuffer sdquery;
  1311. sdlname.makeFullnameQuery(sdquery, DXB_SuperFile, true);
  1312. Owned<IRemoteConnection> sdconn = querySDS().connect(sdquery.str(),myProcessSession(),0, daliConnectTimeoutMs);
  1313. if (!conn) {
  1314. ERRLOG("SubFile %s has missing owner superfile %s",lname.get(),sdlname.get());
  1315. ok = false;
  1316. }
  1317. else {
  1318. StringBuffer path;
  1319. IPropertyTree *sub = sdconn->queryRoot()->queryPropTree(path.clear().appendf("SubFile[@name=\"%s\"]",lname.get()).str());
  1320. if (!sub) {
  1321. ERRLOG("Superfile %s is not linked to %s",sdlname.get(),lname.get());
  1322. ok = false;
  1323. }
  1324. }
  1325. }
  1326. if (ok)
  1327. OUTLOG("SubFile %s OK",lname.get());
  1328. }
  1329. //=============================================================================
  1330. static void listexpires(const char * lfnmask, IUserDescriptor *user)
  1331. {
  1332. IDFAttributesIterator *iter = queryDistributedFileDirectory().getDFAttributesIterator(lfnmask,user,true,false);
  1333. ForEach(*iter) {
  1334. IPropertyTree &attr=iter->query();
  1335. if (attr.hasProp("@expireDays"))
  1336. {
  1337. unsigned expireDays = attr.getPropInt("@expireDays");
  1338. const char *name = attr.queryProp("@name");
  1339. const char *lastAccessed = attr.queryProp("@accessed");
  1340. if (lastAccessed && name&&*name) // NB: all files that have expireDays should have lastAccessed also
  1341. {
  1342. StringBuffer days;
  1343. if (0 == expireDays)
  1344. days.append("the sasha default number of days");
  1345. else
  1346. {
  1347. days.append(expireDays);
  1348. if (expireDays>1)
  1349. days.append(" days");
  1350. else
  1351. days.append(" day");
  1352. }
  1353. OUTLOG("%s, last accessed = %s, set to expire %s after last accessed", name, lastAccessed, days.str());
  1354. }
  1355. }
  1356. }
  1357. }
  1358. //=============================================================================
  1359. static void listrelationships(const char *primary,const char *secondary)
  1360. {
  1361. Owned<IFileRelationshipIterator> iter = queryDistributedFileDirectory().lookupFileRelationships(primary,secondary,NULL,NULL,S_LINK_RELATIONSHIP_KIND,NULL,NULL,NULL);
  1362. ForEach(*iter) {
  1363. OUTLOG("%s,%s,%s,%s,%s,%s,%s,%s",
  1364. iter->query().queryKind(),
  1365. iter->query().queryPrimaryFilename(),
  1366. iter->query().querySecondaryFilename(),
  1367. iter->query().queryPrimaryFields(),
  1368. iter->query().querySecondaryFields(),
  1369. iter->query().queryCardinality(),
  1370. iter->query().isPayload()?"payload":"",
  1371. iter->query().queryDescription());
  1372. }
  1373. }
  1374. //=============================================================================
  1375. int dfsperm(const char *obj,IUserDescriptor *user)
  1376. {
  1377. SecAccessFlags perm = SecAccess_None;
  1378. if (strchr(obj,'\\')||strchr(obj,'/')) {
  1379. Owned<IFileDescriptor> fd = createFileDescriptor();
  1380. RemoteFilename rfn;
  1381. rfn.setRemotePath(obj);
  1382. fd->setPart(0, rfn);
  1383. perm = queryDistributedFileDirectory().getFDescPermissions(fd,user,0);
  1384. }
  1385. else {
  1386. perm = queryDistributedFileDirectory().getFilePermissions(obj,user,0);
  1387. }
  1388. OUTLOG("perm %s = %d",obj,perm);
  1389. return perm;
  1390. }
  1391. //=============================================================================
  1392. static offset_t getCompressedSize(IDistributedFile *file)
  1393. { // this should be parallel! TBD
  1394. if (!file)
  1395. return (offset_t)-1;
  1396. offset_t ret = (offset_t)file->queryAttributes().getPropInt64("@compressedSize",-1);
  1397. if (ret==(offset_t)-1) {
  1398. try {
  1399. ret = 0;
  1400. Owned<IDistributedFilePartIterator> piter = file->getIterator();
  1401. ForEach(*piter) {
  1402. IDistributedFilePart &part = piter->query();
  1403. offset_t sz = (offset_t)-1;
  1404. for (unsigned c=0;c<part.numCopies();c++) {
  1405. RemoteFilename rfn;
  1406. part.getFilename(rfn,c);
  1407. try {
  1408. Owned<IFile> file = createIFile(rfn);
  1409. sz = file->size();
  1410. }
  1411. catch (IException *e) {
  1412. StringBuffer tmp("getCompressedSize(1): ");
  1413. rfn.getPath(tmp);
  1414. EXCLOG(e,tmp.str());
  1415. sz = (offset_t)-1;
  1416. e->Release();
  1417. }
  1418. if (sz!=(offset_t)-1)
  1419. break;
  1420. }
  1421. if (sz==(offset_t)-1) {
  1422. ret = (offset_t)-1;
  1423. break;
  1424. }
  1425. ret += sz;
  1426. }
  1427. }
  1428. catch (IException *e) {
  1429. EXCLOG(e,"getCompressedSize");
  1430. ret = (offset_t)-1;
  1431. e->Release();
  1432. }
  1433. }
  1434. return ret;
  1435. }
  1436. static void dfscompratio (const char *lname, IUserDescriptor *user)
  1437. {
  1438. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lname,user);
  1439. StringBuffer out;
  1440. out.appendf("File %s ",lname);
  1441. if (file) {
  1442. bool compressed = file->isCompressed();
  1443. if (!compressed)
  1444. out.append("not ");
  1445. out.append("compressed, ");
  1446. offset_t size = file->getFileSize(true,false);
  1447. if (size==(offset_t)-1)
  1448. out.appendf("size not known");
  1449. else if (compressed) {
  1450. out.appendf("expanded size %" I64F "d, ",size);
  1451. offset_t csize = getCompressedSize(file);
  1452. if (csize==(offset_t)-1)
  1453. out.append("compressed size unknown");
  1454. else {
  1455. out.appendf("compressed size %" I64F "d",csize);
  1456. if (csize)
  1457. out.appendf(", Ratio %.2f:1 (%%%d)",(float)size/csize,(unsigned)(csize*100/size));
  1458. }
  1459. }
  1460. else
  1461. out.appendf("not compressed, size %" I64F "d",size);
  1462. }
  1463. else
  1464. out.appendf("File %s not found",lname);
  1465. outln(out.str());
  1466. }
  1467. //=============================================================================
  1468. static bool onlyNamePtree(IPropertyTree *t)
  1469. {
  1470. if (!t)
  1471. return true;
  1472. if (t->numUniq())
  1473. return false;
  1474. Owned<IAttributeIterator> ai = t->getAttributes();
  1475. if (ai->first()) {
  1476. if (strcmp(ai->queryName(),"@name")!=0)
  1477. return false;
  1478. if (ai->next())
  1479. return false;
  1480. }
  1481. const char *s = t->queryProp(NULL);
  1482. if (s&&*s)
  1483. return false;
  1484. return true;
  1485. }
  1486. static bool countScopeChildren(IPropertyTree *t,unsigned &files, unsigned &sfiles, unsigned &scopes, unsigned &other)
  1487. {
  1488. scopes = 0;
  1489. files = 0;
  1490. sfiles = 0;
  1491. other = 0;
  1492. if (!t)
  1493. return false;
  1494. Owned<IPropertyTreeIterator> it = t->getElements("*");
  1495. ForEach(*it) {
  1496. IPropertyTree *st = &it->query();
  1497. const char *s = st?st->queryName():NULL;
  1498. if (!s)
  1499. other++;
  1500. else if (stricmp(s,queryDfsXmlBranchName(DXB_File))==0)
  1501. files++;
  1502. else if (stricmp(s,queryDfsXmlBranchName(DXB_SuperFile))==0)
  1503. sfiles++;
  1504. else if (stricmp(s,queryDfsXmlBranchName(DXB_Scope))==0)
  1505. scopes++;
  1506. else
  1507. other++;
  1508. }
  1509. return (other!=0)||(files!=0)||(sfiles!=0)||(scopes!=0)||(!onlyNamePtree(t));
  1510. }
  1511. static void dfsscopes(const char *name, IUserDescriptor *user)
  1512. {
  1513. bool wild = isWild(name);
  1514. Owned<IDFScopeIterator> iter = queryDistributedFileDirectory().getScopeIterator(user,wild?NULL:name,true,true);
  1515. StringBuffer ln;
  1516. ForEach(*iter) {
  1517. CDfsLogicalFileName dlfn;
  1518. StringBuffer scope;
  1519. if (!wild&&name&&*name&&(strcmp(name,".")!=0))
  1520. scope.append(name).append("::");
  1521. scope.append(iter->query());
  1522. if (wild&&!WildMatch(scope.str(),name))
  1523. continue;
  1524. dlfn.set(scope.str(),"x");
  1525. StringBuffer s;
  1526. dlfn.makeScopeQuery(s,true);
  1527. ln.clear().append("SCOPE '").append(iter->query()).append('\'');
  1528. Owned<IRemoteConnection> conn = querySDS().connect(s.str(),myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  1529. if (!conn)
  1530. ERRLOG("%s - Could not connect using %s",ln.str(),s.str());
  1531. else {
  1532. unsigned files;
  1533. unsigned sfiles;
  1534. unsigned scopes;
  1535. unsigned other;
  1536. if (countScopeChildren(conn->queryRoot(),files,sfiles,scopes,other)) {
  1537. ln.appendf(" Files=%d SuperFiles=%d Scopes=%d",files,sfiles,scopes);
  1538. if (other)
  1539. ln.appendf(" others=%d",other);
  1540. OUTLOG("%s",ln.str());
  1541. }
  1542. else
  1543. OUTLOG("%s EMPTY",ln.str());
  1544. }
  1545. }
  1546. }
  1547. //=============================================================================
  1548. static bool recursiveCheckEmptyScope(IPropertyTree &ct)
  1549. {
  1550. Owned<IPropertyTreeIterator> iter = ct.getElements("*");
  1551. ForEach(*iter) {
  1552. IPropertyTree &item = iter->query();
  1553. const char *n = item.queryName();
  1554. if (!n||(strcmp(n,queryDfsXmlBranchName(DXB_Scope))!=0))
  1555. return false;
  1556. if (!recursiveCheckEmptyScope(item))
  1557. return false;
  1558. }
  1559. return true;
  1560. }
  1561. static void cleanscopes(IUserDescriptor *user)
  1562. {
  1563. Owned<IDFScopeIterator> iter = queryDistributedFileDirectory().getScopeIterator(user, NULL,true,true);
  1564. CDfsLogicalFileName dlfn;
  1565. StringBuffer s;
  1566. StringArray toremove;
  1567. ForEach(*iter) {
  1568. CDfsLogicalFileName dlfn;
  1569. StringBuffer scope;
  1570. scope.append(iter->query());
  1571. dlfn.set(scope.str(),"x");
  1572. dlfn.makeScopeQuery(s.clear(),true);
  1573. Owned<IRemoteConnection> conn = querySDS().connect(s.str(),myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  1574. if (!conn)
  1575. DBGLOG("Could not connect to '%s' using %s",iter->query(),s.str());
  1576. else {
  1577. if (recursiveCheckEmptyScope(*conn->queryRoot())) {
  1578. toremove.append(iter->query());
  1579. PROGLOG("EMPTY %s, %s",iter->query(),s.str());
  1580. }
  1581. }
  1582. }
  1583. iter.clear();
  1584. ForEachItemIn(i,toremove) {
  1585. PROGLOG("REMOVE %s",toremove.item(i));
  1586. try {
  1587. queryDistributedFileDirectory().removeEmptyScope(toremove.item(i));
  1588. }
  1589. catch (IException *e) {
  1590. EXCLOG(e,"checkScopes");
  1591. e->Release();
  1592. }
  1593. }
  1594. }
  1595. //=============================================================================
  1596. static void listworkunits(const char *test, const char *min, const char *max)
  1597. {
  1598. Owned<IRemoteConnection> conn = querySDS().connect("/", myProcessSession(), 0, daliConnectTimeoutMs);
  1599. Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements("WorkUnits/*");
  1600. ForEach(*iter)
  1601. {
  1602. IPropertyTree &e=iter->query();
  1603. if (test&&*test) {
  1604. const char *tval = strchr(test,'=');
  1605. if (!tval)
  1606. {
  1607. ERRLOG("missing '=' in %s",test);
  1608. return;
  1609. }
  1610. StringBuffer prop;
  1611. if (*test!='@')
  1612. prop.append('@');
  1613. prop.append(tval-test,test);
  1614. tval++;
  1615. const char *val = e.queryProp(prop.str());
  1616. if (!val||(strcmp(val,tval)!=0))
  1617. continue;
  1618. if (min &&(strcmp(e.queryName(),min)<0))
  1619. continue;
  1620. if (max &&(strcmp(e.queryName(),max)>0))
  1621. continue;
  1622. }
  1623. outln(e.queryName());
  1624. }
  1625. }
  1626. //=============================================================================
  1627. static void listmatches(const char *path, const char *match, const char *pval)
  1628. {
  1629. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), 0, daliConnectTimeoutMs);
  1630. if (!conn)
  1631. {
  1632. PROGLOG("Failed to connect to %s", path);
  1633. return;
  1634. }
  1635. StringBuffer output("Listing matches for path=");
  1636. output.append(path);
  1637. if (match)
  1638. {
  1639. output.append(", match=").append(match);
  1640. if (pval)
  1641. output.append(", property value = ").append(pval);
  1642. }
  1643. Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(match?match:"*", iptiter_remote);
  1644. ForEach(*iter)
  1645. {
  1646. IPropertyTree &e=iter->query();
  1647. output.clear().append(e.queryName());
  1648. const char *val = e.queryProp(pval?pval:NULL);
  1649. if (val)
  1650. output.append(" = ").append(val);
  1651. outln(output.str());
  1652. }
  1653. }
  1654. //=============================================================================
  1655. static void dfsreplication(const char *clusterMask, const char *lfnMask, unsigned redundancy, bool dryRun)
  1656. {
  1657. StringBuffer findXPath("//File");
  1658. if (clusterMask && !streq("*", clusterMask))
  1659. findXPath.appendf("[Cluster/@name=\"%s\"]", clusterMask);
  1660. if (lfnMask && !streq("*", lfnMask))
  1661. findXPath.appendf("[@name=\"%s\"]", lfnMask);
  1662. const char *basePath = "/Files";
  1663. const char *propToSet = "@redundancy";
  1664. StringBuffer value;
  1665. value.append(redundancy);
  1666. StringBuffer clusterFilter("Cluster");
  1667. if (clusterMask && !streq("*", clusterMask))
  1668. clusterFilter.appendf("[@name=\"%s\"]", clusterMask);
  1669. Owned<IRemoteConnection> conn = querySDS().connect(basePath, myProcessSession(), 0, daliConnectTimeoutMs);
  1670. Owned<IPropertyTreeIterator> iter = conn->getElements(findXPath);
  1671. ForEach(*iter)
  1672. {
  1673. IPropertyTree &file = iter->query();
  1674. Owned<IPropertyTreeIterator> clusterIter = file.getElements(clusterFilter);
  1675. ForEach(*clusterIter)
  1676. {
  1677. IPropertyTree &cluster = clusterIter->query();
  1678. const char *oldValue = cluster.queryProp(propToSet);
  1679. if (!oldValue || !streq(value, oldValue))
  1680. {
  1681. const char *fileName = file.queryProp("OrigName");
  1682. const char *clusterName = cluster.queryProp("@name");
  1683. VStringBuffer msg("File=%s on cluster=%s - %s %s to %s", fileName, clusterName, dryRun?"Would set":"Setting", propToSet, value.str());
  1684. if (oldValue)
  1685. msg.appendf(" [old value = %s]", oldValue);
  1686. PROGLOG("%s", msg.str());
  1687. if (!dryRun)
  1688. cluster.setProp(propToSet, value);
  1689. }
  1690. }
  1691. }
  1692. }
  1693. static void holdlock(const char *logicalFile, const char *mode, IUserDescriptor *userDesc)
  1694. {
  1695. bool write;
  1696. if (strieq(mode, "read"))
  1697. write = false;
  1698. else if (strieq(mode, "write"))
  1699. write = true;
  1700. else
  1701. throw MakeStringException(0,"Invalid mode: %s", mode);
  1702. PROGLOG("Looking up file: %s, mode=%s", logicalFile, mode);
  1703. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(logicalFile, userDesc, write, false, false, NULL, 5000);
  1704. if (!file)
  1705. {
  1706. ERRLOG("File not found: %s", logicalFile);
  1707. return;
  1708. }
  1709. OwnedPtr<DistributedFilePropertyLock> writeLock;
  1710. if (write)
  1711. writeLock.setown(new DistributedFilePropertyLock(file));
  1712. PROGLOG("File: %s, locked, mode=%s - press a key to release", logicalFile, mode);
  1713. getchar();
  1714. }
  // Skip forward in `s` to the first decimal digit and parse the run of
  // digits found there into `num` (0 when the string holds no digits).
  // Returns a pointer to the first character after the parsed digits, or to
  // the terminating NUL when no digit was found.
  static const char *getNum(const char *s,unsigned &num)
  {
      // isdigit() requires a value representable as unsigned char; a plain
      // char may be negative for bytes >= 0x80, hence the casts.
      while (*s&&!isdigit((unsigned char)*s))
          s++;
      num = 0;
      while (isdigit((unsigned char)*s)) {
          num = num*10+*s-'0';
          s++;
      }
      return s;
  }
  1726. static void displayGraphTiming(const char * name, unsigned time)
  1727. {
  1728. unsigned gn;
  1729. const char *s = getNum(name,gn);
  1730. unsigned sn;
  1731. s = getNum(s,sn);
  1732. if (gn&&sn) {
  1733. const char *gs = strchr(name,'(');
  1734. unsigned gid = 0;
  1735. if (gs)
  1736. getNum(gs+1,gid);
  1737. OUTLOG("\"%s\",%d,%d,%d,%d,%d",name,gn,sn,gid,time,(time/60000));
  1738. }
  1739. }
  1740. static void workunittimings(const char *wuid)
  1741. {
  1742. StringBuffer path;
  1743. path.append("/WorkUnits/").append(wuid);
  1744. Owned<IRemoteConnection> conn = querySDS().connect(path, myProcessSession(), 0, daliConnectTimeoutMs);
  1745. if (!conn) {
  1746. ERRLOG("WU %s not found",wuid);
  1747. return;
  1748. }
  1749. IPropertyTree *wu = conn->queryRoot();
  1750. StringBuffer name;
  1751. outln("Name,graph,sub,gid,time ms,time min");
  1752. if (wu->hasProp("Statistics"))
  1753. {
  1754. Owned<IPropertyTreeIterator> iter = wu->getElements("Statistics/Statistic");
  1755. ForEach(*iter)
  1756. {
  1757. if (iter->query().getProp("@desc",name.clear()))
  1758. {
  1759. if ((name.length()>11)&&(memcmp("Graph graph",name.str(),11)==0))
  1760. {
  1761. unsigned time = (unsigned)(iter->query().getPropInt64("@value") / 1000000);
  1762. displayGraphTiming(name.str(), time);
  1763. }
  1764. }
  1765. }
  1766. }
  1767. else
  1768. {
  1769. Owned<IPropertyTreeIterator> iter = wu->getElements("Timings/Timing");
  1770. ForEach(*iter)
  1771. {
  1772. if (iter->query().getProp("@name",name.clear()))
  1773. {
  1774. if ((name.length()>11)&&(memcmp("Graph graph",name.str(),11)==0))
  1775. {
  1776. unsigned time = iter->query().getPropInt("@duration");
  1777. displayGraphTiming(name.str(), time);
  1778. }
  1779. }
  1780. }
  1781. }
  1782. }
  1783. //=============================================================================
  1784. static void serverlist(const char *mask)
  1785. {
  1786. Owned<IRemoteConnection> conn = querySDS().connect( "/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  1787. if (!conn)
  1788. throw MakeStringException(0,"Failed to connect to Environment/Software");
  1789. IPropertyTree* root = conn->queryRoot();
  1790. Owned<IPropertyTreeIterator> services= root->getElements("*");
  1791. ForEach(*services) {
  1792. IPropertyTree& t = services->query();
  1793. const char *name = t.queryName();
  1794. if (name) {
  1795. if (!mask||!*mask||WildMatch(name,mask)) {
  1796. Owned<IPropertyTreeIterator> insts = t.getElements("Instance");
  1797. ForEach(*insts) {
  1798. StringBuffer ips;
  1799. insts->query().getProp("@netAddress",ips);
  1800. StringBuffer dir;
  1801. insts->query().getProp("@directory",dir);
  1802. OUTLOG("%s,%s,%s",name,ips.str(),dir.str());
  1803. }
  1804. }
  1805. }
  1806. }
  1807. }
  1808. //=============================================================================
  1809. static void clusterlist(const char *mask)
  1810. {
  1811. Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  1812. if (!conn)
  1813. throw MakeStringException(0,"Failed to connect to Environment/Software");
  1814. IPropertyTree* root = conn->queryRoot();
  1815. Owned<IPropertyTreeIterator> clusters;
  1816. clusters.setown(root->getElements("ThorCluster"));
  1817. ForEach(*clusters) {
  1818. }
  1819. clusters.setown(root->getElements("RoxieCluster"));
  1820. ForEach(*clusters) {
  1821. }
  1822. clusters.setown(root->getElements("EclAgentProcess"));
  1823. ForEach(*clusters) {
  1824. }
  1825. }
  1826. static unsigned clustersToGroups(IPropertyTree *envroot,const StringArray &cmplst,StringArray &cnames,StringArray &groups,bool *done)
  1827. {
  1828. if (!envroot)
  1829. return 0;
  1830. for (int roxie=0;roxie<2;roxie++) {
  1831. Owned<IPropertyTreeIterator> clusters= envroot->getElements(roxie?"RoxieCluster":"ThorCluster");
  1832. unsigned ret = 0;
  1833. ForEach(*clusters) {
  1834. IPropertyTree &cluster = clusters->query();
  1835. const char *name = cluster.queryProp("@name");
  1836. if (name&&*name) {
  1837. ForEachItemIn(i,cmplst) {
  1838. const char *s = cmplst.item(i);
  1839. assertex(s);
  1840. if ((strcmp(s,"*")==0)||WildMatch(name,s,true)) {
  1841. const char *group = cluster.queryProp("@nodeGroup");
  1842. if (!group||!*group)
  1843. group = name;
  1844. bool found = false;
  1845. ForEachItemIn(j,groups)
  1846. if (strcmp(groups.item(j),group)==0)
  1847. found = true;
  1848. if (!found) {
  1849. cnames.append(name);
  1850. groups.append(group);
  1851. if (done)
  1852. done[i] =true;
  1853. break;
  1854. }
  1855. }
  1856. }
  1857. }
  1858. }
  1859. }
  1860. return groups.ordinality();
  1861. }
  1862. static void clusterlist()
  1863. {
  1864. Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, daliConnectTimeoutMs);
  1865. if (!conn) {
  1866. ERRLOG("Could not connect to /Environment/Software");
  1867. return;
  1868. }
  1869. StringArray list;
  1870. list.append("*");
  1871. StringArray groups;
  1872. StringArray cnames;
  1873. bool *done = (bool *)calloc(list.ordinality(),sizeof(bool));
  1874. clustersToGroups(conn->queryRoot(),list,cnames,groups,done);
  1875. free(done);
  1876. ForEachItemIn(i,cnames)
  1877. OUTLOG("%s,%s",cnames.item(i),groups.item(i));
  1878. }
  1879. //=============================================================================
  1880. static void auditlog(const char *froms, const char *tos, const char *matchs)
  1881. {
  1882. CDateTime from;
  1883. try {
  1884. from.setDateString(froms);
  1885. }
  1886. catch (IException *) {
  1887. ERRLOG("%s: invalid date (format YYYY-MM-DD)",froms);
  1888. throw;
  1889. }
  1890. CDateTime to;
  1891. try {
  1892. to.setDateString(tos);
  1893. }
  1894. catch (IException *) {
  1895. ERRLOG("%s: invalid date (format YYYY-MM-DD)",tos);
  1896. throw;
  1897. }
  1898. StringAttrArray res;
  1899. queryAuditLogs(from,to,matchs,res);
  1900. ForEachItemIn(i,res)
  1901. outln(res.item(i).text.get());
  1902. }
  1903. //=============================================================================
  1904. static void coalesce()
  1905. {
  1906. const char *daliDataPath = NULL;
  1907. const char *remoteBackupLocation = NULL;
  1908. Owned<IStoreHelper> iStoreHelper = createStoreHelper(NULL, daliDataPath, remoteBackupLocation, SH_External|SH_RecoverFromIncErrors);
  1909. unsigned baseEdition = iStoreHelper->queryCurrentEdition();
  1910. StringBuffer storeFilename(daliDataPath);
  1911. iStoreHelper->getCurrentStoreFilename(storeFilename);
  1912. OUTLOG("Loading store: %s", storeFilename.str());
  1913. Owned<IPropertyTree> root = createPTreeFromXMLFile(storeFilename.str());
  1914. OUTLOG("Loaded: %s", storeFilename.str());
  1915. if (baseEdition != iStoreHelper->queryCurrentEdition())
  1916. OUTLOG("Store was changed by another process prior to coalesce. Exiting.");
  1917. else
  1918. {
  1919. if (!iStoreHelper->loadDeltas(root))
  1920. OUTLOG("Nothing to coalesce");
  1921. else
  1922. iStoreHelper->saveStore(root, &baseEdition);
  1923. }
  1924. }
  1925. //=============================================================================
  1926. static void mpping(const char *eps)
  1927. {
  1928. SocketEndpoint ep(eps);
  1929. Owned<INode> node = createINode(ep);
  1930. Owned<IGroup> grp = createIGroup(1,&ep);
  1931. Owned<ICommunicator> comm = createCommunicator(grp,true);
  1932. unsigned start = msTick();
  1933. if (!comm->verifyConnection(0,60*1000))
  1934. ERRLOG("MPping %s failed",eps);
  1935. else
  1936. OUTLOG("MPping %s succeeded in %d",eps,msTick()-start);
  1937. }
  1938. //=============================================================================
  1939. static void daliping(const char *dalis,unsigned connecttime,unsigned n)
  1940. {
  1941. OUTLOG("Dali(%s) connect time: %d ms",dalis,connecttime);
  1942. if (!n)
  1943. return;
  1944. StringBuffer qname("TESTINGQ_");
  1945. SocketEndpoint ep;
  1946. ep.setLocalHost(0);
  1947. ep.getUrlStr(qname);
  1948. Owned<INamedQueueConnection> qconn;
  1949. qconn.setown(createNamedQueueConnection(0));
  1950. Owned<IQueueChannel> channel;
  1951. channel.setown(qconn->open(qname.str()));
  1952. MemoryBuffer mb;
  1953. while (channel->probe()) {
  1954. mb.clear();
  1955. channel->get(mb);
  1956. }
  1957. unsigned max = 0;
  1958. unsigned tot = 0;
  1959. for (unsigned i=0;i<=n;i++) {
  1960. mb.clear().append("Hello").append(i);
  1961. ep.serialize(mb);
  1962. unsigned start = msTick();
  1963. channel->put(mb);
  1964. channel->get(mb);
  1965. if (i) { // ignore first
  1966. unsigned t = msTick()-start;
  1967. if (t>max)
  1968. max = t;
  1969. tot += t;
  1970. OUTLOG("Dali(%s) ping %d ms",dalis,t);
  1971. if (i+1<n)
  1972. Sleep(1000);
  1973. }
  1974. }
  1975. OUTLOG("Dali(%s) ping avg = %d max = %d ms",dalis,tot/n,max);
  1976. }
  1977. //=============================================================================
  1978. static void convertBinBranch(IPropertyTree &cluster,const char *branch)
  1979. {
  1980. StringBuffer query(branch);
  1981. query.append("/data");
  1982. IPropertyTree *t;
  1983. MemoryBuffer buf;
  1984. cluster.getPropBin(query.str(),buf);
  1985. if (buf.length()) {
  1986. StringBuffer xml;
  1987. xml.append(buf.length(),buf.toByteArray());
  1988. t = createPTreeFromXMLString(xml.str());
  1989. cluster.removeProp(query.str());
  1990. cluster.addPropTree(query.str(),t);
  1991. }
  1992. }
  1993. static void getxref(const char *dst)
  1994. {
  1995. Owned<IRemoteConnection> conn = querySDS().connect("DFU/XREF",myProcessSession(),RTM_LOCK_READ, daliConnectTimeoutMs);
  1996. Owned<IPropertyTree> root = createPTreeFromIPT(conn->getRoot());
  1997. Owned<IPropertyTreeIterator> iter = root->getElements("Cluster");
  1998. ForEach(*iter) {
  1999. IPropertyTree &cluster = iter->query();
  2000. convertBinBranch(cluster,"Directories");
  2001. convertBinBranch(cluster,"Lost");
  2002. convertBinBranch(cluster,"Found");
  2003. convertBinBranch(cluster,"Orphans");
  2004. convertBinBranch(cluster,"Messages");
  2005. }
  2006. Owned<IFile> f = createIFile(dst);
  2007. Owned<IFileIO> io = f->open(IFOcreate);
  2008. Owned<IFileIOStream> fstream = createBufferedIOStream(io);
  2009. toXML(root, *fstream); // formatted (default)
  2010. OUTLOG("DFU/XREF saved in '%s'",dst);
  2011. conn->close();
  2012. }
  2013. struct CTreeItem : public CInterface
  2014. {
  2015. String *tail;
  2016. CTreeItem *parent;
  2017. unsigned index;
  2018. offset_t startOffset;
  2019. offset_t endOffset;
  2020. offset_t adjust;
  2021. bool supressidx;
  2022. CTreeItem(CTreeItem *_parent, String *_tail, unsigned _index, offset_t _startOffset)
  2023. {
  2024. parent = LINK(_parent);
  2025. startOffset = _startOffset;
  2026. endOffset = 0;
  2027. adjust = 0;
  2028. index = _index;
  2029. supressidx = true;
  2030. tail = _tail;
  2031. }
  2032. ~CTreeItem()
  2033. {
  2034. if (parent)
  2035. parent->Release();
  2036. ::Release(tail);
  2037. }
  2038. void getXPath(StringBuffer &xpath)
  2039. {
  2040. if (parent)
  2041. parent->getXPath(xpath);
  2042. xpath.append('/').append(tail->str());
  2043. if ((index!=0)||tail->IsShared())
  2044. xpath.append('[').append(index+1).append(']');
  2045. }
  2046. offset_t size() { return endOffset?(endOffset-startOffset):0; }
  2047. offset_t adjustedSize(bool &adjusted) { adjusted = (adjust!=0); return size()-adjust; }
  2048. };
  2049. class CXMLSizesParser : public CInterface
  2050. {
  2051. Owned<IPullPTreeReader> xmlReader;
  2052. PTreeReaderOptions xmlOptions;
  2053. double pc;
  2054. class CParse : implements IPTreeNotifyEvent, public CInterface
  2055. {
  2056. CIArrayOf<CTreeItem> stack;
  2057. String * levtail;
  2058. CIArrayOf<CTreeItem> arr;
  2059. unsigned limit;
  2060. __int64 totalSize;
  2061. static int _sortF(CInterface * const *_left, CInterface * const *_right)
  2062. {
  2063. CTreeItem **left = (CTreeItem **)_left;
  2064. CTreeItem **right = (CTreeItem **)_right;
  2065. offset_t leftSize = (*left)->size();
  2066. offset_t rightSize = (*right)->size();
  2067. if (rightSize > leftSize)
  2068. return +1;
  2069. else if (rightSize < leftSize)
  2070. return -1;
  2071. else
  2072. return 0;
  2073. }
  2074. public:
  2075. IMPLEMENT_IINTERFACE;
  2076. CParse(unsigned __int64 _totalSize, double limitpc) : totalSize(_totalSize)
  2077. {
  2078. levtail = NULL;
  2079. limit = (unsigned)((double)totalSize*limitpc/100.0);
  2080. }
  2081. void reset()
  2082. {
  2083. stack.kill();
  2084. }
  2085. // IPTreeNotifyEvent
  2086. virtual void beginNode(const char *tag, offset_t startOffset)
  2087. {
  2088. String *tail = levtail;
  2089. if (levtail&&(0 == strcmp(tag, levtail->str())))
  2090. tail->Link();
  2091. else
  2092. tail = new String(tag);
  2093. levtail = NULL; // opening new child
  2094. CTreeItem *parent = stack.empty()?NULL:&stack.tos();
  2095. CTreeItem *item = new CTreeItem(parent, tail, tail->getLinkCount(), startOffset);
  2096. stack.append(*item);
  2097. }
  2098. virtual void newAttribute(const char *tag, const char *value)
  2099. {
  2100. }
  2101. virtual void beginNodeContent(const char *tag)
  2102. {
  2103. }
  2104. virtual void endNode(const char *tag, unsigned length, const void *value, bool binary, offset_t endOffset)
  2105. {
  2106. CTreeItem *tos = &stack.tos();
  2107. assertex(tos);
  2108. tos->endOffset = endOffset;
  2109. bool adjusted;
  2110. offset_t sz = tos->adjustedSize(adjusted);
  2111. if (sz>=limit)
  2112. {
  2113. CTreeItem *parent = tos->parent;
  2114. while (parent) {
  2115. parent->adjust += sz;
  2116. parent = parent->parent;
  2117. }
  2118. tos->Link();
  2119. arr.append(*tos);
  2120. levtail = tos->tail;
  2121. }
  2122. else
  2123. levtail = NULL;
  2124. stack.pop();
  2125. }
  2126. void printFullResults()
  2127. {
  2128. arr.sort(_sortF);
  2129. ForEachItemIn(m, arr)
  2130. {
  2131. CTreeItem &match = arr.item(m);
  2132. StringBuffer xpath;
  2133. match.getXPath(xpath);
  2134. printf("xpath=%s, size=%" I64F "d\n", xpath.str(), match.size());
  2135. }
  2136. }
  2137. void printResultTree()
  2138. {
  2139. if (!totalSize)
  2140. return;
  2141. StringBuffer res;
  2142. ForEachItemIn(i, arr) {
  2143. CTreeItem &item = arr.item(i);
  2144. bool adjusted;
  2145. offset_t sz = item.adjustedSize(adjusted);
  2146. if (sz>=limit) {
  2147. res.clear();
  2148. item.getXPath(res);
  2149. if (adjusted)
  2150. res.append(" (rest)");
  2151. res.padTo(40);
  2152. res.appendf(" %10" I64F "d(%5.2f%%)",sz,((float)sz*100.0)/(float)totalSize);
  2153. printf("%s\n",res.str());
  2154. }
  2155. }
  2156. }
  2157. } *parser;
  2158. public:
  2159. CXMLSizesParser(const char *fName, PTreeReaderOptions _xmlOptions=ptr_none, double _pc=1.0) : xmlOptions(_xmlOptions), pc(_pc) { go(fName); }
  2160. ~CXMLSizesParser() { ::Release(parser); }
  2161. void go(const char *fName)
  2162. {
  2163. OwnedIFile ifile = createIFile(fName);
  2164. OwnedIFileIO ifileio = ifile->open(IFOread);
  2165. if (!ifileio)
  2166. throw MakeStringException(0, "Failed to open: %s", ifile->queryFilename());
  2167. parser = new CParse(ifileio->size(), pc);
  2168. Owned<IIOStream> stream = createIOStream(ifileio);
  2169. xmlReader.setown(createPullXMLStreamReader(*stream, *parser, xmlOptions));
  2170. }
  2171. void printResultTree()
  2172. {
  2173. parser->printResultTree();
  2174. }
  2175. virtual bool next()
  2176. {
  2177. return xmlReader->next();
  2178. }
  2179. virtual void reset()
  2180. {
  2181. parser->reset();
  2182. xmlReader->reset();
  2183. }
  2184. };
  2185. static void xmlSize(const char *filename, double pc)
  2186. {
  2187. try
  2188. {
  2189. OwnedIFile iFile = createIFile(filename);
  2190. if (!iFile->exists())
  2191. OUTLOG("File '%s' not found", filename);
  2192. else
  2193. {
  2194. Owned<CXMLSizesParser> parser = new CXMLSizesParser((filename&&*filename)?filename:"dalisds.xml", ptr_none, pc);
  2195. while (parser->next())
  2196. ;
  2197. parser->printResultTree();
  2198. }
  2199. }
  2200. catch (IException *e)
  2201. {
  2202. pexception("xmlSize", e);
  2203. e->Release();
  2204. }
  2205. }
  2206. //=============================================================================
  2207. static bool begins(const char *&ln,const char *pat)
  2208. {
  2209. size32_t sz = strlen(pat);
  2210. if (memicmp(ln,pat,sz)==0) {
  2211. ln += sz;
  2212. return true;
  2213. }
  2214. return false;
  2215. }
  2216. static void dalilocks(const char *ipPattern, bool files)
  2217. {
  2218. Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(ipPattern, files ? "/Files/*" : NULL);
  2219. bool headers = true;
  2220. CDfsLogicalFileName dlfn;
  2221. for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
  2222. {
  2223. ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
  2224. if (files)
  2225. {
  2226. if (!dlfn.setFromXPath(lockInfo.queryXPath()))
  2227. continue;
  2228. }
  2229. if (0 == lockInfo.queryConnections())
  2230. continue;
  2231. StringBuffer lockFormat;
  2232. lockInfo.toString(lockFormat, 1, headers, files ? dlfn.get() : NULL);
  2233. headers = false;
  2234. PROGLOG("%s", lockFormat.str());
  2235. }
  2236. if (headers) // if still true, no locks matched
  2237. {
  2238. printf("No lock(s) found\n");
  2239. return;
  2240. }
  2241. }
  2242. //=============================================================================
  2243. static void unlock(const char *pattern, bool files)
  2244. {
  2245. Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(NULL, files ? "/Files/*" : pattern);
  2246. for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
  2247. {
  2248. ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
  2249. bool match = false;
  2250. if (files)
  2251. {
  2252. CDfsLogicalFileName dlfn;
  2253. dlfn.setAllowWild(true);
  2254. if (dlfn.setFromXPath(lockInfo.queryXPath()))
  2255. match = WildMatch(dlfn.get(), pattern);
  2256. }
  2257. else
  2258. match = WildMatch(lockInfo.queryXPath(), pattern);
  2259. if (match)
  2260. {
  2261. for (unsigned c=0; c<lockInfo.queryConnections(); c++)
  2262. {
  2263. ConnectionId connectionId = lockInfo.queryLockData(c).connectionId;
  2264. bool disconnect = false; // TBD?
  2265. MemoryBuffer mb;
  2266. mb.append("unlock").append(connectionId).append(disconnect);
  2267. getDaliDiagnosticValue(mb);
  2268. bool success;
  2269. mb.read(success);
  2270. if (!success)
  2271. PROGLOG("Lock %" I64F "x not found",connectionId);
  2272. else
  2273. {
  2274. StringBuffer connectionInfo;
  2275. mb.read(connectionInfo);
  2276. PROGLOG("Lock %" I64F "x successfully removed: %s", connectionId, connectionInfo.str());
  2277. }
  2278. }
  2279. }
  2280. }
  2281. }
  2282. static void dumpWorkunit(const char *wuid, bool includeProgress)
  2283. {
  2284. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2285. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
  2286. exportWorkUnitToXMLFile(workunit, "stdout:", 0, true, includeProgress, true, false);
  2287. }
  2288. static void dumpProgress(const char *wuid, const char * graph)
  2289. {
  2290. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2291. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
  2292. if (!workunit)
  2293. return;
  2294. Owned<IConstWUGraphProgress> progress = workunit->getGraphProgress(graph);
  2295. if (!progress)
  2296. return;
  2297. Owned<IPropertyTree> tree = progress->getProgressTree();
  2298. saveXML("stdout:", tree);
  2299. }
  2300. static const char * checkDash(const char * s)
  2301. {
  2302. //Supplying * on the command line is a pain because it needs quoting. Allow - instead.
  2303. if (streq(s, ".") || streq(s, "-"))
  2304. return "*";
  2305. return s;
  2306. }
  2307. static void dumpStats(IConstWorkUnit * workunit, const StatisticsFilter & filter, bool csv)
  2308. {
  2309. Owned<IConstWUStatisticIterator> stats = &workunit->getStatistics(&filter);
  2310. if (!csv)
  2311. printf("<Statistics wuid=\"%s\">\n", workunit->queryWuid());
  2312. ForEach(*stats)
  2313. {
  2314. IConstWUStatistic & cur = stats->query();
  2315. StringBuffer xml;
  2316. SCMStringBuffer curCreator;
  2317. SCMStringBuffer curScope;
  2318. SCMStringBuffer curDescription;
  2319. SCMStringBuffer curFormattedValue;
  2320. StatisticCreatorType curCreatorType = cur.getCreatorType();
  2321. StatisticScopeType curScopeType = cur.getScopeType();
  2322. StatisticMeasure curMeasure = cur.getMeasure();
  2323. StatisticKind curKind = cur.getKind();
  2324. unsigned __int64 value = cur.getValue();
  2325. unsigned __int64 count = cur.getCount();
  2326. unsigned __int64 max = cur.getMax();
  2327. unsigned __int64 ts = cur.getTimestamp();
  2328. cur.getCreator(curCreator);
  2329. cur.getScope(curScope);
  2330. cur.getDescription(curDescription, false);
  2331. cur.getFormattedValue(curFormattedValue);
  2332. if (csv)
  2333. {
  2334. xml.append(workunit->queryWuid());
  2335. xml.append(",");
  2336. if (curCreatorType != SCTnone)
  2337. xml.append(queryCreatorTypeName(curCreatorType));
  2338. xml.append(",");
  2339. if (curCreator.length())
  2340. xml.append(curCreator.str());
  2341. xml.append(",");
  2342. if (curScopeType != SSTnone)
  2343. xml.append(queryScopeTypeName(curScopeType));
  2344. xml.append(",");
  2345. if (curScope.length())
  2346. xml.append(curScope.str());
  2347. xml.append(",");
  2348. if (curMeasure != SMeasureNone)
  2349. xml.append(queryMeasureName(curMeasure));
  2350. xml.append(",");
  2351. if (curKind != StKindNone)
  2352. xml.append(queryStatisticName(curKind));
  2353. xml.append(",");
  2354. xml.append(value);
  2355. xml.append(",");
  2356. xml.append(curFormattedValue);
  2357. xml.append(",");
  2358. if (count != 1)
  2359. xml.append(count);
  2360. xml.append(",");
  2361. if (max)
  2362. xml.append(max);
  2363. xml.append(",");
  2364. if (ts)
  2365. formatStatistic(xml, ts, SMeasureTimestampUs);
  2366. xml.append(",");
  2367. if (curDescription.length())
  2368. xml.append('"').append(curDescription.str()).append('"');
  2369. printf("%s\n", xml.str());
  2370. }
  2371. else
  2372. {
  2373. if (curCreatorType != SCTnone)
  2374. xml.append("<ctype>").append(queryCreatorTypeName(curCreatorType)).append("</ctype>");
  2375. if (curCreator.length())
  2376. xml.append("<creator>").append(curCreator.str()).append("</creator>");
  2377. if (curScopeType != SSTnone)
  2378. xml.append("<stype>").append(queryScopeTypeName(curScopeType)).append("</stype>");
  2379. if (curScope.length())
  2380. xml.append("<scope>").append(curScope.str()).append("</scope>");
  2381. if (curMeasure != SMeasureNone)
  2382. xml.append("<unit>").append(queryMeasureName(curMeasure)).append("</unit>");
  2383. if (curKind != StKindNone)
  2384. xml.append("<kind>").append(queryStatisticName(curKind)).append("</kind>");
  2385. xml.append("<rawvalue>").append(value).append("</rawvalue>");
  2386. xml.append("<value>").append(curFormattedValue).append("</value>");
  2387. if (count != 1)
  2388. xml.append("<count>").append(count).append("</count>");
  2389. if (max)
  2390. xml.append("<max>").append(value).append("</max>");
  2391. if (ts)
  2392. {
  2393. xml.append("<ts>");
  2394. formatStatistic(xml, ts, SMeasureTimestampUs);
  2395. xml.append("</ts>");
  2396. }
  2397. if (curDescription.length())
  2398. xml.append("<desc>").append(curDescription.str()).append("</desc>");
  2399. printf("<stat>%s</stat>\n", xml.str());
  2400. }
  2401. }
  2402. if (!csv)
  2403. printf("</Statistics>\n");
  2404. }
  2405. static void dumpStats(const char *wuid, const char * creatorTypeText, const char * creator, const char * scopeTypeText, const char * scope, const char * kindText, const char * userFilter, bool csv)
  2406. {
  2407. StatisticsFilter filter(checkDash(creatorTypeText), checkDash(creator), checkDash(scopeTypeText), checkDash(scope), NULL, checkDash(kindText));
  2408. if (userFilter)
  2409. filter.setFilter(userFilter);
  2410. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2411. const char * star = strchr(wuid, '*');
  2412. if (star)
  2413. {
  2414. WUSortField filters[2];
  2415. MemoryBuffer filterbuf;
  2416. filters[0] = WUSFwildwuid;
  2417. filterbuf.append(wuid);
  2418. filters[1] = WUSFterm;
  2419. Owned<IConstWorkUnitIterator> iter = factory->getWorkUnitsSorted((WUSortField) (WUSFwuid), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL);
  2420. ForEach(*iter)
  2421. {
  2422. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(iter->query().queryWuid());
  2423. if (workunit)
  2424. dumpStats(workunit, filter, csv);
  2425. }
  2426. }
  2427. else
  2428. {
  2429. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
  2430. if (!workunit)
  2431. return;
  2432. dumpStats(workunit, filter, csv);
  2433. }
  2434. }
  2435. static void wuidCompress(const char *match, const char *type, bool compress)
  2436. {
  2437. if (0 != stricmp("graph", type))
  2438. {
  2439. WARNLOG("Currently, only type=='graph' supported.");
  2440. return;
  2441. }
  2442. Owned<IRemoteConnection> conn = querySDS().connect("/WorkUnits", myProcessSession(), 0, daliConnectTimeoutMs);
  2443. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2444. Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(match?match:"*", iptiter_remote);
  2445. ForEach(*iter)
  2446. {
  2447. const char *wuid = iter->query().queryName();
  2448. IConstWorkUnit &wu = *factory->openWorkUnit(wuid);
  2449. StringArray graphNames;
  2450. Owned<IConstWUGraphIterator> graphIter = &wu.getGraphs(GraphTypeAny);
  2451. ForEach(*graphIter)
  2452. {
  2453. SCMStringBuffer graphName;
  2454. IConstWUGraph &graph = graphIter->query();
  2455. Owned<IPropertyTree> xgmml = graph.getXGMMLTreeRaw();
  2456. if (compress != xgmml->hasProp("graphBin"))
  2457. {
  2458. graph.getName(graphName);
  2459. graphNames.append(graphName.s.str());
  2460. }
  2461. }
  2462. }
  2463. }
  2464. static void validateStore(bool fix, bool deleteFiles, bool verbose)
  2465. {
  2466. /*
  2467. * Place holder for client-side dali store verification/validation. Currently performs:
  2468. * 1) validates GeneratedDll entries correspond to current workunits (see HPCC-9146)
  2469. */
  2470. CTimeMon totalTime, ts;
  2471. PROGLOG("Gathering list of workunits");
  2472. Owned<IRemoteConnection> conn = querySDS().connect("/WorkUnits", myProcessSession(), RTM_LOCK_READ, 10000);
  2473. if (!conn)
  2474. throw MakeStringException(0, "Failed to connect to /WorkUnits");
  2475. AtomRefTable wuids;
  2476. Owned<IPropertyTreeIterator> wuidIter = conn->queryRoot()->getElements("*");
  2477. ForEach(*wuidIter)
  2478. {
  2479. IPropertyTree &wuid = wuidIter->query();
  2480. wuids.queryCreate(wuid.queryName());
  2481. }
  2482. PROGLOG("%d workunits gathered. Took %d ms", wuids.count(), ts.elapsed());
  2483. ts.reset(0);
  2484. StringArray uidsToDelete;
  2485. UnsignedArray indexToDelete;
  2486. PROGLOG("Gathering associated files");
  2487. conn.setown(querySDS().connect("/GeneratedDlls", myProcessSession(), fix?RTM_LOCK_WRITE:RTM_LOCK_READ, 10000));
  2488. if (!conn)
  2489. {
  2490. PROGLOG("No generated DLLs associated with any workunit.\nExit. Took %d ms", ts.elapsed());
  2491. return;
  2492. }
  2493. IPropertyTree *root = conn->queryRoot()->queryBranch(NULL); // force all to download
  2494. Owned<IPropertyTreeIterator> gdIter = root->getElements("*");
  2495. RegExpr RE("^.*{W2[0-9][0-9][0-9][0-9][0-9][0-9][0-9]-[0-9][0-9][0-9][0-9][0-9][0-9]{-[0-9]+}?}{[^0-9].*|}$");
  2496. unsigned index=1;
  2497. ForEach(*gdIter)
  2498. {
  2499. IPropertyTree &gd = gdIter->query();
  2500. const char *name = gd.queryProp("@name");
  2501. if (name && *name)
  2502. {
  2503. if (RE.find(name))
  2504. {
  2505. StringBuffer wuid;
  2506. RE.substitute(wuid,"#1");
  2507. const char *w = wuid.str();
  2508. bool found = NULL != wuids.find(*w);
  2509. const char *uid = gd.queryProp("@uid");
  2510. if (!found)
  2511. {
  2512. uidsToDelete.append(uid);
  2513. indexToDelete.append(index);
  2514. }
  2515. }
  2516. }
  2517. ++index;
  2518. }
  2519. PROGLOG("%d out of %d workunit files not associated with any workunit. Took %d ms", indexToDelete.ordinality(), index, ts.elapsed());
  2520. ts.reset(0);
  2521. IArrayOf<IDllEntry> removedEntries;
  2522. unsigned numDeleted = 0;
  2523. ForEachItemInRev(d, indexToDelete)
  2524. {
  2525. const char *uid = uidsToDelete.item(d);
  2526. unsigned index = indexToDelete.item(d);
  2527. StringBuffer path("GeneratedDll[");
  2528. path.append(index).append("]");
  2529. IPropertyTree *gd = root->queryPropTree(path.str());
  2530. if (NULL == gd)
  2531. throwUnexpected();
  2532. const char *uidQuery = gd->queryProp("@uid");
  2533. if (0 != strcmp(uid, uidQuery))
  2534. throw MakeStringException(0, "Expecting uid=%s @ GeneratedDll[%d], but found uid=%s", uid, index, uidQuery);
  2535. if (verbose)
  2536. PROGLOG("Removing: %s, uid=%s", path.str(), uid);
  2537. if (fix)
  2538. {
  2539. Owned<IDllEntry> entry = queryDllServer().createEntry(root, gd);
  2540. entry->remove(false, false); // NB: This will remove child 'gd' element from root (GeneratedDlls)
  2541. if (deleteFiles) // delay until after meta info removed and /GeneratedDlls unlocked
  2542. removedEntries.append(*entry.getClear());
  2543. }
  2544. ++numDeleted;
  2545. }
  2546. if (fix)
  2547. {
  2548. conn->commit();
  2549. PROGLOG("Removed %d unassociated file entries. Took %d ms", numDeleted, ts.elapsed());
  2550. ts.reset(0);
  2551. if (deleteFiles)
  2552. {
  2553. PROGLOG("Deleting physical files..");
  2554. ForEachItemIn(r, removedEntries)
  2555. {
  2556. IDllEntry &entry = removedEntries.item(r);
  2557. PROGLOG("Removing files for: %s", entry.queryName());
  2558. entry.remove(true, false);
  2559. }
  2560. PROGLOG("Removed physical files. Took %d ms", ts.elapsed());
  2561. }
  2562. }
  2563. else
  2564. PROGLOG("%d unassociated file entries to remove - use 'fix=true'", numDeleted);
  2565. PROGLOG("Done time = %d secs", totalTime.elapsed()/1000);
  2566. }
  2567. //=============================================================================
  2568. static void migrateFiles(const char *srcGroup, const char *tgtGroup, const char *filemask, const char *_options)
  2569. {
  2570. if (strieq(srcGroup, tgtGroup))
  2571. throw makeStringExceptionV(0, "source and target cluster groups cannot be the same! cluster = %s", srcGroup);
  2572. enum class mg_options : unsigned { nop, createmaps=1, listonly=2, dryrun=4, verbose=8};
  2573. StringArray options;
  2574. options.appendList(_options, ",");
  2575. mg_options opts = mg_options::nop;
  2576. ForEachItemIn(o, options)
  2577. {
  2578. const char *opt = options.item(o);
  2579. if (strieq("CREATEMAPS", opt))
  2580. opts = (mg_options)((unsigned)opts | (unsigned)mg_options::createmaps);
  2581. else if (strieq("LISTONLY", opt))
  2582. opts = (mg_options)((unsigned)opts | (unsigned)mg_options::listonly);
  2583. else if (strieq("DRYRUN", opt))
  2584. opts = (mg_options)((unsigned)opts | (unsigned)mg_options::dryrun);
  2585. else if (strieq("VERBOSE", opt))
  2586. opts = (mg_options)((unsigned)opts | (unsigned)mg_options::verbose);
  2587. else
  2588. WARNLOG("Unknown option: %s", opt);
  2589. }
  2590. /*
  2591. * CMatchScanner scans logical files, looking for files that are in the source group
  2592. * and matching against the logical file names against filemask.
  2593. * Then (depending on options) manipulates the meta data to point to new target group
  2594. * and outputs a file per node of the source group, with a list of all matching
  2595. * physical files in the format: srcIP,dstIP,physical file
  2596. */
  2597. class CMatchScanner : public CSDSFileScanner
  2598. {
  2599. StringAttr srcGroup, tgtGroup;
  2600. mg_options options;
  2601. StringBuffer tgtClusterGroupText;
  2602. Owned<IGroup> srcClusterGroup, tgtClusterGroup;
  2603. IPointerArrayOf<IFileIOStream> fileLists;
  2604. unsigned matchingFiles = 0;
  2605. Linked<IRemoteConnection> conn;
  2606. StringAttr filemask;
  2607. bool wild = false;
  2608. unsigned srcClusterSize = 0;
  2609. unsigned tgtClusterSize = 0;
  2610. bool mgOpt(mg_options o)
  2611. {
  2612. return ((unsigned)o & (unsigned)options);
  2613. }
  2614. IFileIOStream *getFileIOStream(unsigned p)
  2615. {
  2616. while (fileLists.ordinality()<=p)
  2617. fileLists.append(nullptr);
  2618. Linked<IFileIOStream> stream = fileLists.item(p);
  2619. if (nullptr == stream)
  2620. {
  2621. VStringBuffer filePartList("fileparts%u_%s_%u.lst", GetCurrentProcessId(), srcGroup.get(), p);
  2622. Owned<IFile> iFile = createIFile(filePartList);
  2623. Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
  2624. if (!iFileIO)
  2625. throw makeStringExceptionV(0, "Failed to open: %s", filePartList.str());
  2626. stream.setown(createBufferedIOStream(iFileIO));
  2627. fileLists.replace(stream.getLink(), p);
  2628. }
  2629. return stream.getClear();
  2630. }
  2631. unsigned find(IGroup *group, const IpAddress &ip) const
  2632. {
  2633. unsigned c = group->ordinality();
  2634. for (unsigned i=0; i<c; i++)
  2635. {
  2636. const IpAddress &nodeIP = group->queryNode(i).endpoint();
  2637. if (ip.ipequals(nodeIP))
  2638. return i;
  2639. }
  2640. return NotFound;
  2641. }
  2642. public:
  2643. CMatchScanner(const char *_srcGroup, const char *_tgtGroup, mg_options _options) : srcGroup(_srcGroup), tgtGroup(_tgtGroup), options(_options)
  2644. {
  2645. srcClusterGroup.setown(queryNamedGroupStore().lookup(srcGroup));
  2646. if (!srcClusterGroup)
  2647. throw makeStringExceptionV(0, "Could not find source cluster group: %s", _srcGroup);
  2648. tgtClusterGroup.setown(queryNamedGroupStore().lookup(tgtGroup));
  2649. if (!tgtClusterGroup)
  2650. throw makeStringExceptionV(0, "Could not find target cluster group: %s", _tgtGroup);
  2651. srcClusterSize = srcClusterGroup->ordinality();
  2652. tgtClusterSize = tgtClusterGroup->ordinality();
  2653. if (tgtClusterSize>srcClusterSize)
  2654. throw makeStringExceptionV(0, "Unsupported - target cluster is wider than source (target size=%u, source size=%u", tgtClusterSize, srcClusterSize);
  2655. if (0 != (srcClusterSize%tgtClusterSize))
  2656. throw makeStringExceptionV(0, "Unsupported - target cluster must be a factor of source cluster size (target size=%u, source size=%u", tgtClusterSize, srcClusterSize);
  2657. tgtClusterGroup->getText(tgtClusterGroupText);
  2658. }
  2659. virtual bool checkFileOk(IPropertyTree &file, const char *filename) override
  2660. {
  2661. const char *group = file.queryProp("@group");
  2662. if (!group)
  2663. {
  2664. if (mgOpt(mg_options::verbose))
  2665. PROGLOG("No group defined - filename=%s, mask=%s, srcGroup=%s", filename, filemask.get(), srcGroup.get());
  2666. return false;
  2667. }
  2668. else if (nullptr == strstr(file.queryProp("@group"), srcGroup)) // crude match, could be rejected in processFile
  2669. {
  2670. if (mgOpt(mg_options::verbose))
  2671. PROGLOG("GROUP-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
  2672. return false;
  2673. }
  2674. else if (wild)
  2675. {
  2676. if (WildMatch(filename, filemask, false))
  2677. {
  2678. if (mgOpt(mg_options::verbose))
  2679. PROGLOG("WILD-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
  2680. return true;
  2681. }
  2682. }
  2683. else if (strieq(filename, filemask))
  2684. return true;
  2685. if (mgOpt(mg_options::verbose))
  2686. PROGLOG("EXACT-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
  2687. return false;
  2688. }
  2689. virtual bool checkScopeOk(const char *scopename) override
  2690. {
  2691. if (mgOpt(mg_options::verbose))
  2692. PROGLOG("Processing scope %s", scopename);
  2693. return true;
  2694. }
  2695. virtual void processFile(IPropertyTree &root, StringBuffer &name) override
  2696. {
  2697. try
  2698. {
  2699. bool doCommit = false;
  2700. StringBuffer _tgtClusterGroupText = tgtClusterGroupText;
  2701. Owned<IFileDescriptor> fileDesc = deserializeFileDescriptorTree(&root, &queryNamedGroupStore());
  2702. unsigned numClusters = fileDesc->numClusters();
  2703. for (unsigned clusterNum=0; clusterNum<numClusters; clusterNum++)
  2704. {
  2705. StringBuffer srcFileGroup;
  2706. fileDesc->getClusterGroupName(clusterNum, srcFileGroup);
  2707. StringBuffer srcFileGroupName, srcFileGroupRange;
  2708. if (!decodeChildGroupName(srcFileGroup, srcFileGroupName, srcFileGroupRange))
  2709. srcFileGroupName.append(srcFileGroup);
  2710. if (streq(srcFileGroupName, srcGroup))
  2711. {
  2712. IGroup *srcFileClusterGroup = fileDesc->queryClusterGroup(clusterNum);
  2713. unsigned srcFileClusterGroupWidth = srcFileClusterGroup->ordinality();
  2714. StringBuffer _tgtGroup(tgtGroup);
  2715. unsigned groupOffset = NotFound;
  2716. if (srcFileGroupRange.length())
  2717. {
  2718. SocketEndpointArray epas;
  2719. UnsignedArray dstPositions;
  2720. Owned<INodeIterator> nodeIter = srcFileClusterGroup->getIterator();
  2721. ForEach(*nodeIter)
  2722. {
  2723. const IpAddress &ip = nodeIter->query().endpoint();
  2724. unsigned srcRelPos = find(srcClusterGroup, ip);
  2725. if (NotFound == groupOffset)
  2726. groupOffset = srcRelPos;
  2727. unsigned dstRelPos = srcRelPos % tgtClusterSize;
  2728. dstPositions.append(dstRelPos);
  2729. }
  2730. StringBuffer rangeText;
  2731. encodeChildGroupRange(dstPositions, rangeText);
  2732. _tgtGroup.append(rangeText);
  2733. }
  2734. else
  2735. groupOffset = 0;
  2736. unsigned numParts = fileDesc->numParts();
  2737. PROGLOG("Processing file %s (width=%u), cluster group=%s (%u of %u), new group = %s", name.str(), numParts, srcFileGroup.str(), clusterNum+1, numClusters, _tgtGroup.str());
  2738. if (!mgOpt(mg_options::listonly))
  2739. {
  2740. if (!mgOpt(mg_options::dryrun))
  2741. {
  2742. doCommit = true;
  2743. VStringBuffer clusterXPath("Cluster[%u]", clusterNum+1);
  2744. IPropertyTree *cluster = root.queryPropTree(clusterXPath);
  2745. root.setProp("@group", _tgtGroup);
  2746. if (cluster)
  2747. cluster->setProp("@name", _tgtGroup);
  2748. else
  2749. WARNLOG("No Cluster found for file: %s", name.str());
  2750. }
  2751. if (mgOpt(mg_options::createmaps))
  2752. {
  2753. for (unsigned partNum=0; partNum<numParts; partNum++)
  2754. {
  2755. unsigned r = partNum % srcFileClusterGroupWidth;
  2756. const SocketEndpoint &srcEp = srcFileClusterGroup->queryNode(r).endpoint();
  2757. unsigned relPos = find(srcClusterGroup, srcEp);
  2758. unsigned dstPos = (partNum+groupOffset) % tgtClusterSize;
  2759. const SocketEndpoint &tgtEp = tgtClusterGroup->queryNode(dstPos).endpoint();
  2760. // output srcIP, dstIP, path/file-part-name >> script<N>.lst
  2761. Owned<IFileIOStream> iFileIOStream = getFileIOStream(relPos+1);
  2762. StringBuffer outputLine;
  2763. srcEp.getIpText(outputLine);
  2764. outputLine.append(",");
  2765. tgtEp.getIpText(outputLine);
  2766. outputLine.append(",");
  2767. IPartDescriptor *part = fileDesc->queryPart(partNum);
  2768. StringBuffer filePath;
  2769. part->getPath(filePath);
  2770. outputLine.append(filePath);
  2771. outputLine.newline();
  2772. iFileIOStream->write(outputLine.length(), outputLine.str());
  2773. }
  2774. }
  2775. }
  2776. }
  2777. }
  2778. ++matchingFiles;
  2779. if (doCommit)
  2780. conn->commit(); // NB: the scanner rolls back any changes, mainly to reduce cost/exposure to previously lazy fetched scope branches
  2781. }
  2782. catch (IException *e)
  2783. {
  2784. VStringBuffer errorMsg("Failed to process file : %s", name.str());
  2785. EXCLOG(e, errorMsg.str());
  2786. e->Release();
  2787. }
  2788. }
  2789. unsigned scan(IRemoteConnection *_conn, const char *_filemask, bool includefiles=true, bool includesuper=false)
  2790. {
  2791. filemask.set(_filemask);
  2792. conn.set(_conn);
  2793. wild = containsWildcard(_filemask);
  2794. CSDSFileScanner::scan(_conn, includefiles, includesuper);
  2795. return matchingFiles;
  2796. }
  2797. } scanner(srcGroup, tgtGroup, opts);
  2798. IUserDescriptor *user = nullptr;
  2799. Owned<IRemoteConnection> conn = querySDS().connect("/Files", myProcessSession(), 0, 100000);
  2800. bool success=false;
  2801. unsigned matchingFiles=0;
  2802. try
  2803. {
  2804. matchingFiles = scanner.scan(conn, filemask, true, false);
  2805. success=true;
  2806. }
  2807. catch (IException *e)
  2808. {
  2809. EXCLOG(e, nullptr);
  2810. e->Release();
  2811. }
  2812. if (!success)
  2813. {
  2814. WARNLOG("Failed to make changes");
  2815. conn->rollback();
  2816. }
  2817. else if ((unsigned)opts & (unsigned)mg_options::dryrun)
  2818. {
  2819. conn->rollback();
  2820. WARNLOG("Dry-run, no changes committed. %u files matched", matchingFiles);
  2821. }
  2822. else
  2823. PROGLOG("Committed changes: %u files changed", matchingFiles);
  2824. }
  2825. //=============================================================================
  2826. void testThorRunningWUs()
  2827. {
  2828. Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
  2829. if (conn.get())
  2830. {
  2831. Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server"));
  2832. ForEach(*it) {
  2833. StringBuffer instance;
  2834. if(it->query().hasProp("@queue"))
  2835. {
  2836. const char* queue=it->query().queryProp("@queue");
  2837. if(strstr(queue,".thor")) {
  2838. Owned<IPropertyTreeIterator> wuids(it->query().getElements("WorkUnit"));
  2839. ForEach(*wuids) {
  2840. IPropertyTree &wu = wuids->query();
  2841. const char* wuid=wu.queryProp(NULL);
  2842. if (wuid&&*wuid) {
  2843. const char *prioclass = wu.queryProp("@priorityClass");
  2844. bool high = false;
  2845. if (prioclass&&(stricmp(prioclass,"high")==0))
  2846. high = true;
  2847. OUTLOG("%s running on queue %s",wuid,queue);
  2848. }
  2849. }
  2850. }
  2851. }
  2852. }
  2853. }
  2854. }
  2855. #define CHECKPARAMS(mn,mx) { if ((np<mn)||(np>mx)) throw MakeStringException(-1,"%s: incorrect number of parameters",cmd); }
  2856. int main(int argc, char* argv[])
  2857. {
  2858. int ret = 0;
  2859. InitModuleObjects();
  2860. EnableSEHtoExceptionMapping();
  2861. setDaliServixSocketCaching(true);
  2862. if (argc<2) {
  2863. usage(argv[0]);
  2864. return -1;
  2865. }
  2866. Owned<IProperties> props = createProperties("daliadmin.ini");
  2867. StringArray params;
  2868. SocketEndpoint ep;
  2869. StringBuffer tmps;
  2870. for (int i=1;i<argc;i++) {
  2871. const char *param = argv[i];
  2872. if ((memcmp(param,"server=",7)==0)||
  2873. (memcmp(param,"logfile=",8)==0)||
  2874. (memcmp(param,"rawlog=",7)==0)||
  2875. (memcmp(param,"user=",5)==0)||
  2876. (memcmp(param,"password=",9)==0) ||
  2877. (memcmp(param,"fix=",4)==0) ||
  2878. (memcmp(param,"verbose=",8)==0) ||
  2879. (memcmp(param,"deletefiles=",12)==0) ||
  2880. (memcmp(param,"timeout=",8)==0))
  2881. props->loadProp(param);
  2882. else if ((i==1)&&(isdigit(*param)||(*param=='.'))&&ep.set(((*param=='.')&&param[1])?(param+1):param,DALI_SERVER_PORT))
  2883. props->setProp("server",ep.getUrlStr(tmps.clear()).str());
  2884. else {
  2885. if ((0==stricmp(param,"help")) || (0 ==stricmp(param,"-help")) || (0 ==stricmp(param,"--help"))) {
  2886. usage(argv[0]);
  2887. return -1;
  2888. }
  2889. params.append(param);
  2890. }
  2891. }
  2892. if (!params.ordinality()) {
  2893. usage(argv[0]);
  2894. return -1;
  2895. }
  2896. try {
  2897. StringBuffer logname;
  2898. StringBuffer aliasname;
  2899. bool rawlog = props->getPropBool("rawlog");
  2900. Owned<ILogMsgHandler> fileMsgHandler;
  2901. if (props->getProp("logfile",logname)) {
  2902. if (logname.length()) {
  2903. fileMsgHandler.setown(getFileLogMsgHandler(logname.str(), NULL, rawlog?MSGFIELD_prefix:MSGFIELD_STANDARD, false, false, true));
  2904. queryLogMsgManager()->addMonitorOwn(fileMsgHandler.getClear(), getCategoryLogMsgFilter(MSGAUD_all, MSGCLS_all, TopDetail));
  2905. }
  2906. }
  2907. // set stdout
  2908. attachStandardHandleLogMsgMonitor(stdout,0,MSGAUD_all,MSGCLS_all&~(MSGCLS_disaster|MSGCLS_error|MSGCLS_warning));
  2909. Owned<ILogMsgFilter> filter = getCategoryLogMsgFilter(MSGAUD_user, MSGCLS_error|MSGCLS_warning);
  2910. queryLogMsgManager()->changeMonitorFilter(queryStderrLogMsgHandler(), filter);
  2911. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_prefix);
  2912. }
  2913. catch (IException *e) {
  2914. pexception("daliadmin",e);
  2915. e->Release();
  2916. ret = 255;
  2917. }
  2918. unsigned daliconnectelapsed;
  2919. StringBuffer daliserv;
  2920. if (!ret) {
  2921. const char *cmd = params.item(0);
  2922. unsigned np = params.ordinality()-1;
  2923. if (!props->getProp("server",daliserv.clear()))
  2924. {
  2925. // external commands
  2926. if (stricmp(cmd,"xmlsize")==0)
  2927. {
  2928. CHECKPARAMS(1,2);
  2929. xmlSize(params.item(1), np>1?atof(params.item(2)):1.0);
  2930. }
  2931. else
  2932. ERRLOG("Unknown command %s",cmd);
  2933. return 0;
  2934. }
  2935. else
  2936. {
  2937. try {
  2938. SocketEndpoint ep(daliserv.str(),DALI_SERVER_PORT);
  2939. SocketEndpointArray epa;
  2940. epa.append(ep);
  2941. Owned<IGroup> group = createIGroup(epa);
  2942. unsigned start = msTick();
  2943. initClientProcess(group, DCR_Util);
  2944. daliconnectelapsed = msTick()-start;
  2945. }
  2946. catch (IException *e) {
  2947. EXCLOG(e,"daliadmin initClientProcess");
  2948. e->Release();
  2949. ret = 254;
  2950. }
  2951. if (!ret) {
  2952. try {
  2953. Owned<IUserDescriptor> userDesc;
  2954. if (props->getProp("user",tmps.clear())) {
  2955. userDesc.setown(createUserDescriptor());
  2956. StringBuffer ps;
  2957. props->getProp("password",ps);
  2958. userDesc->set(tmps.str(),ps.str());
  2959. queryDistributedFileDirectory().setDefaultUser(userDesc);
  2960. }
  2961. daliConnectTimeoutMs = 1000 * props->getPropInt("timeout", DEFAULT_DALICONNECT_TIMEOUT);
  2962. if (stricmp(cmd,"export")==0) {
  2963. CHECKPARAMS(2,2);
  2964. _export_(params.item(1),params.item(2));
  2965. }
  2966. else if (stricmp(cmd,"import")==0) {
  2967. CHECKPARAMS(2,2);
  2968. import(params.item(1),params.item(2),false);
  2969. }
  2970. else if (stricmp(cmd,"importadd")==0) {
  2971. CHECKPARAMS(2,2);
  2972. import(params.item(1),params.item(2),true);
  2973. }
  2974. else if (stricmp(cmd,"delete")==0) {
  2975. CHECKPARAMS(1,1);
  2976. _delete_(params.item(1),true);
  2977. }
  2978. else if (stricmp(cmd,"set")==0) {
  2979. CHECKPARAMS(2,2);
  2980. set(params.item(1),params.item(2));
  2981. }
  2982. else if (stricmp(cmd,"get")==0) {
  2983. CHECKPARAMS(1,1);
  2984. get(params.item(1));
  2985. }
  2986. else if (stricmp(cmd,"bget")==0) {
  2987. CHECKPARAMS(2,2);
  2988. bget(params.item(1),params.item(2));
  2989. }
  2990. else if (stricmp(cmd,"wget")==0) {
  2991. CHECKPARAMS(1,1);
  2992. wget(params.item(1));
  2993. }
  2994. else if (stricmp(cmd,"xget")==0) {
  2995. CHECKPARAMS(1,1);
  2996. wget(params.item(1));
  2997. }
  2998. else if (stricmp(cmd,"add")==0) {
  2999. CHECKPARAMS(1,2);
  3000. add(params.item(1), (np>1) ? params.item(2) : nullptr);
  3001. }
  3002. else if (stricmp(cmd,"delv")==0) {
  3003. CHECKPARAMS(1,1);
  3004. delv(params.item(1));
  3005. }
  3006. else if (stricmp(cmd,"count")==0) {
  3007. CHECKPARAMS(1,1);
  3008. count(params.item(1));
  3009. }
  3010. else if (stricmp(cmd,"dfsfile")==0) {
  3011. CHECKPARAMS(1,1);
  3012. dfsfile(params.item(1),userDesc);
  3013. }
  3014. else if (stricmp(cmd,"dfspart")==0) {
  3015. CHECKPARAMS(2,2);
  3016. dfspart(params.item(1),userDesc,atoi(params.item(2)));
  3017. }
  3018. else if (stricmp(cmd,"dfscheck")==0) {
  3019. CHECKPARAMS(0,0);
  3020. dfsCheck();
  3021. }
  3022. else if (stricmp(cmd,"dfscsv")==0) {
  3023. CHECKPARAMS(1,1);
  3024. dfscsv(params.item(1),userDesc);
  3025. }
  3026. else if (stricmp(cmd,"dfsgroup")==0) {
  3027. CHECKPARAMS(1,2);
  3028. dfsGroup(params.item(1),(np>1)?params.item(2):NULL);
  3029. }
  3030. else if (stricmp(cmd,"clusternodes")==0) {
  3031. CHECKPARAMS(1,2);
  3032. ret = clusterGroup(params.item(1),(np>1)?params.item(2):NULL);
  3033. }
  3034. else if (stricmp(cmd,"dfsls")==0) {
  3035. CHECKPARAMS(0,2);
  3036. dfsLs((np>0)?params.item(1):NULL,(np>1)?params.item(2):NULL);
  3037. }
  3038. else if (stricmp(cmd,"dfsmap")==0) {
  3039. CHECKPARAMS(1,1);
  3040. dfsmap(params.item(1), userDesc);
  3041. }
  3042. else if (stricmp(cmd,"dfsexist")==0) {
  3043. CHECKPARAMS(1,1);
  3044. ret = dfsexists(params.item(1),userDesc);
  3045. }
  3046. else if (stricmp(cmd,"dfsparents")==0) {
  3047. CHECKPARAMS(1,1);
  3048. dfsparents(params.item(1),userDesc);
  3049. }
  3050. else if (stricmp(cmd,"dfsunlink")==0) {
  3051. CHECKPARAMS(1,1);
  3052. dfsunlink(params.item(1),userDesc);
  3053. }
  3054. else if (stricmp(cmd,"dfsverify")==0) {
  3055. CHECKPARAMS(1,1);
  3056. ret = dfsverify(params.item(1),NULL,userDesc);
  3057. }
  3058. else if (stricmp(cmd,"setprotect")==0) {
  3059. CHECKPARAMS(2,2);
  3060. setprotect(params.item(1),params.item(2),userDesc);
  3061. }
  3062. else if (stricmp(cmd,"unprotect")==0) {
  3063. CHECKPARAMS(2,2);
  3064. unprotect(params.item(1),params.item(2),userDesc);
  3065. }
  3066. else if (stricmp(cmd,"listprotect")==0) {
  3067. CHECKPARAMS(0,2);
  3068. listprotect((np>1)?params.item(1):"*",(np>2)?params.item(2):"*");
  3069. }
  3070. else if (stricmp(cmd,"checksuperfile")==0) {
  3071. CHECKPARAMS(1,1);
  3072. bool fix = props->getPropBool("fix");
  3073. checksuperfile(params.item(1),fix);
  3074. }
  3075. else if (stricmp(cmd,"checksubfile")==0) {
  3076. CHECKPARAMS(1,1);
  3077. checksubfile(params.item(1));
  3078. }
  3079. else if (stricmp(cmd,"listexpires")==0) {
  3080. CHECKPARAMS(0,1);
  3081. listexpires((np>1)?params.item(1):"*",userDesc);
  3082. }
  3083. else if (stricmp(cmd,"listrelationships")==0) {
  3084. CHECKPARAMS(2,2);
  3085. listrelationships(params.item(1),params.item(2));
  3086. }
  3087. else if (stricmp(cmd,"dfsperm")==0) {
  3088. if (!userDesc.get())
  3089. throw MakeStringException(-1,"dfsperm requires username to be set (user=)");
  3090. CHECKPARAMS(1,1);
  3091. ret = dfsperm(params.item(1),userDesc);
  3092. }
  3093. else if (stricmp(cmd,"dfscompratio")==0) {
  3094. CHECKPARAMS(1,1);
  3095. dfscompratio(params.item(1),userDesc);
  3096. }
  3097. else if (stricmp(cmd,"dfsscopes")==0) {
  3098. CHECKPARAMS(0,1);
  3099. dfsscopes((np>1)?params.item(1):"*",userDesc);
  3100. }
  3101. else if (stricmp(cmd,"cleanscopes")==0) {
  3102. CHECKPARAMS(0,0);
  3103. cleanscopes(userDesc);
  3104. }
  3105. else if (stricmp(cmd,"listworkunits")==0) {
  3106. CHECKPARAMS(0,3);
  3107. listworkunits((np>0)?params.item(1):NULL,(np>1)?params.item(2):NULL,(np>2)?params.item(3):NULL);
  3108. }
  3109. else if (stricmp(cmd,"listmatches")==0) {
  3110. CHECKPARAMS(0,3);
  3111. listmatches((np>0)?params.item(1):NULL,(np>1)?params.item(2):NULL,(np>2)?params.item(3):NULL);
  3112. }
  3113. else if (stricmp(cmd,"workunittimings")==0) {
  3114. CHECKPARAMS(1,1);
  3115. workunittimings(params.item(1));
  3116. }
  3117. else if (stricmp(cmd,"serverlist")==0) {
  3118. CHECKPARAMS(1,1);
  3119. serverlist(params.item(1));
  3120. }
  3121. else if (stricmp(cmd,"clusterlist")==0) {
  3122. CHECKPARAMS(1,1);
  3123. clusterlist(params.item(1));
  3124. }
  3125. else if (stricmp(cmd,"auditlog")==0) {
  3126. CHECKPARAMS(2,3);
  3127. auditlog(params.item(1),params.item(2),(np>2)?params.item(3):NULL);
  3128. }
  3129. else if (stricmp(cmd,"coalesce")==0) {
  3130. CHECKPARAMS(0,0);
  3131. coalesce();
  3132. }
  3133. else if (stricmp(cmd,"mpping")==0) {
  3134. CHECKPARAMS(1,1);
  3135. mpping(params.item(1));
  3136. }
  3137. else if (stricmp(cmd,"daliping")==0) {
  3138. CHECKPARAMS(0,1);
  3139. daliping(daliserv.str(),daliconnectelapsed,(np>0)?atoi(params.item(1)):1);
  3140. }
  3141. else if (stricmp(cmd,"getxref")==0) {
  3142. CHECKPARAMS(1,1);
  3143. getxref(params.item(1));
  3144. }
  3145. else if (stricmp(cmd,"dalilocks")==0) {
  3146. CHECKPARAMS(0,2);
  3147. bool filesonly = false;
  3148. if (np&&(stricmp(params.item(np),"files")==0)) {
  3149. filesonly = true;
  3150. np--;
  3151. }
  3152. dalilocks(np>0?params.item(1):NULL,filesonly);
  3153. }
  3154. else if (stricmp(cmd,"unlock")==0) {
  3155. CHECKPARAMS(2,2);
  3156. const char *fileOrPath = params.item(2);
  3157. if (0 == stricmp("file", fileOrPath))
  3158. unlock(params.item(1), true);
  3159. else if (0 == stricmp("path", fileOrPath))
  3160. unlock(params.item(1), false);
  3161. else
  3162. throw MakeStringException(0, "unknown type [ %s ], must be 'file' or 'path'", fileOrPath);
  3163. }
  3164. else if (stricmp(cmd,"validateStore")==0) {
  3165. CHECKPARAMS(0,2);
  3166. bool fix = props->getPropBool("fix");
  3167. bool verbose = props->getPropBool("verbose");
  3168. bool deleteFiles = props->getPropBool("deletefiles");
  3169. validateStore(fix, deleteFiles, verbose);
  3170. }
  3171. else if (stricmp(cmd, "workunit") == 0) {
  3172. CHECKPARAMS(1,2);
  3173. bool includeProgress=false;
  3174. if (np>1)
  3175. includeProgress = strToBool(params.item(2));
  3176. dumpWorkunit(params.item(1), includeProgress);
  3177. }
  3178. else if (stricmp(cmd,"wuidCompress")==0) {
  3179. CHECKPARAMS(2,2);
  3180. wuidCompress(params.item(1), params.item(2), true);
  3181. }
  3182. else if (stricmp(cmd,"wuidDecompress")==0) {
  3183. CHECKPARAMS(2,2);
  3184. wuidCompress(params.item(1), params.item(2), false);
  3185. }
  3186. else if (stricmp(cmd,"dfsreplication")==0) {
  3187. CHECKPARAMS(3,4);
  3188. bool dryRun = np>3 && strieq("dryrun", params.item(4));
  3189. dfsreplication(params.item(1), params.item(2), atoi(params.item(3)), dryRun);
  3190. }
  3191. else if (stricmp(cmd,"holdlock")==0) {
  3192. CHECKPARAMS(2,2);
  3193. holdlock(params.item(1), params.item(2), userDesc);
  3194. }
  3195. else if (stricmp(cmd, "progress") == 0) {
  3196. CHECKPARAMS(2,2);
  3197. dumpProgress(params.item(1), params.item(2));
  3198. }
  3199. else if (stricmp(cmd, "stats") == 0) {
  3200. CHECKPARAMS(1, 7);
  3201. if ((params.ordinality() >= 3) && (strchr(params.item(2), '[')))
  3202. {
  3203. bool csv = params.isItem(3) && strieq(params.item(3), "csv");
  3204. dumpStats(params.item(1), "-", "-", "-", "-", "-", params.item(2), csv);
  3205. }
  3206. else
  3207. {
  3208. while (params.ordinality() < 7)
  3209. params.append("*");
  3210. bool csv = params.isItem(7) && strieq(params.item(7), "csv");
  3211. dumpStats(params.item(1), params.item(2), params.item(3), params.item(4), params.item(5), params.item(6), nullptr, csv);
  3212. }
  3213. }
  3214. else if (stricmp(cmd, "migratefiles") == 0)
  3215. {
  3216. CHECKPARAMS(2, 7);
  3217. const char *srcGroup = params.item(1);
  3218. const char *dstGroup = params.item(2);
  3219. const char *filemask = "*";
  3220. StringBuffer options;
  3221. if (params.isItem(3))
  3222. {
  3223. filemask = params.item(3);
  3224. unsigned arg=4;
  3225. StringArray optArray;
  3226. while (arg<params.ordinality())
  3227. optArray.append(params.item(arg++));
  3228. optArray.getString(options, ",");
  3229. }
  3230. migrateFiles(srcGroup, dstGroup, filemask, options);
  3231. }
  3232. else
  3233. ERRLOG("Unknown command %s",cmd);
  3234. }
  3235. catch (IException *e) {
  3236. EXCLOG(e,"daliadmin");
  3237. e->Release();
  3238. }
  3239. closedownClientProcess();
  3240. }
  3241. }
  3242. }
  3243. setDaliServixSocketCaching(false);
  3244. setNodeCaching(false);
  3245. releaseAtoms();
  3246. fflush(stdout);
  3247. fflush(stderr);
  3248. return ret;
  3249. }