backupnode2.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jiface.hpp"
  17. #include "jptree.hpp"
  18. #include "jmisc.hpp"
  19. #include "jregexp.hpp"
  20. #include "jset.hpp"
  21. #include "jflz.hpp"
  22. #include "mpbase.hpp"
  23. #include "mpcomm.hpp"
  24. #include "daclient.hpp"
  25. #include "dadfs.hpp"
  26. #include "dautils.hpp"
  27. #include "dasds.hpp"
  28. #include "rmtfile.hpp"
  29. #define LOGPFX "backupnode: "
  30. #define FLZCOMPRESS
  31. class CFileListWriter
  32. {
  33. public:
  34. bool abort;
  35. bool verbose;
  36. StringArray clustersin;
  37. StringArray clustersout;
  38. IGroup *group;
  39. IpAddress *iphash;
  40. unsigned *ipnum;
  41. unsigned iphashsz;
  42. unsigned numparts;
  43. unsigned numfiles;
  44. IArrayOf<IFileIOStream> *outStreams;
  45. void log(const char * format, ...) __attribute__((format(printf, 2, 3)))
  46. {
  47. va_list args;
  48. va_start(args, format);
  49. StringBuffer line;
  50. line.valist_appendf(format, args);
  51. va_end(args);
  52. PROGLOG(LOGPFX "%s",line.str());
  53. }
  54. void error(const char * format, ...) __attribute__((format(printf, 2, 3)))
  55. {
  56. va_list args;
  57. va_start(args, format);
  58. StringBuffer line;
  59. line.valist_appendf(format, args);
  60. va_end(args);
  61. ERRLOG(LOGPFX "%s",line.str());
  62. }
  63. void warn(const char * format, ...) __attribute__((format(printf, 2, 3)))
  64. {
  65. va_list args;
  66. va_start(args, format);
  67. StringBuffer line;
  68. line.valist_appendf(format, args);
  69. va_end(args);
  70. WARNLOG(LOGPFX "%s",line.str());
  71. }
  72. void addIpHash(const IpAddress &ip,unsigned n)
  73. {
  74. unsigned r;
  75. _cpyrev4(&r,&ip);
  76. unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
  77. while (!iphash[h].isNull())
  78. if (++h==iphashsz)
  79. h = 0;
  80. iphash[h] = ip;
  81. ipnum[h] = n;
  82. }
  83. unsigned checkIpHash(const IpAddress &ip)
  84. {
  85. unsigned r;
  86. _cpyrev4(&r,&ip);
  87. unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
  88. while (!iphash[h].isNull()) {
  89. if (iphash[h].ipequals(ip))
  90. return ipnum[h];
  91. if (++h==iphashsz)
  92. h = 0;
  93. }
  94. return NotFound;
  95. }
  96. CFileListWriter()
  97. {
  98. abort = false;
  99. verbose = false;
  100. iphash = NULL;
  101. ipnum = NULL;
  102. iphashsz = 0;
  103. numfiles = 0;
  104. numparts = 0;
  105. }
  106. ~CFileListWriter()
  107. {
  108. if (iphash)
  109. delete [] iphash;
  110. delete [] ipnum;
  111. }
  112. void write(IGroup *_group,IArrayOf<IFileIOStream> &_outStreams)
  113. {
  114. if (!_group||abort)
  115. return;
  116. group = _group;
  117. outStreams = &_outStreams;
  118. delete [] iphash;
  119. iphash = NULL;
  120. delete [] ipnum;
  121. iphashsz = group->ordinality()*2;
  122. iphash = new IpAddress[iphashsz];
  123. ipnum = new unsigned[iphashsz];
  124. bool grphasports = false;
  125. ForEachNodeInGroup(i,*group) {
  126. const SocketEndpoint &ep = group->queryNode(i).endpoint();
  127. if (ep.port!=0)
  128. grphasports = true;
  129. addIpHash(ep,i);
  130. }
  131. if (grphasports)
  132. ERRLOG(LOGPFX "Group has ports!");
  133. class cfilescan1 : public CSDSFileScanner
  134. {
  135. Owned<IRemoteConnection> conn;
  136. CFileListWriter &parent;
  137. bool &abort;
  138. bool checkFileOk(IPropertyTree &file,const char *filename)
  139. {
  140. if (abort)
  141. return false;
  142. StringArray groups;
  143. getFileGroups(&file,groups);
  144. if (groups.ordinality()==0) {
  145. parent.error("File has no group defined: %s",filename);
  146. return false;
  147. }
  148. ForEachItemIn(i,groups) {
  149. const char *cluster = groups.item(i);
  150. ForEachItemIn(j1,parent.clustersin) {
  151. if (strcmp(parent.clustersin.item(j1),cluster)==0)
  152. return true;
  153. }
  154. bool excluded = false;
  155. ForEachItemIn(j2,parent.clustersout) {
  156. if (strcmp(parent.clustersout.item(j2),cluster)==0) {
  157. excluded = true;
  158. break;
  159. }
  160. }
  161. if (excluded)
  162. continue;
  163. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  164. if (!group) {
  165. parent.error("cannot find cluster %s",cluster);
  166. parent.clustersout.append(cluster);
  167. continue;
  168. }
  169. ForEachNodeInGroup(i,*group) {
  170. unsigned nn = parent.checkIpHash(group->queryNode(i).endpoint());
  171. if (nn!=NotFound) {
  172. parent.clustersin.append(cluster);
  173. return true;
  174. }
  175. }
  176. }
  177. return false;
  178. }
  179. bool checkScopeOk(const char *scopename)
  180. {
  181. return !abort;
  182. }
  183. void processFile(IPropertyTree &file,StringBuffer &name)
  184. {
  185. if (abort)
  186. return;
  187. if (parent.verbose)
  188. parent.log("Process file %s",name.str());
  189. Owned<IFileDescriptor> fdesc;
  190. try {
  191. fdesc.setown(deserializeFileDescriptorTree(&file,&queryNamedGroupStore()));
  192. }
  193. catch (IException *e) {
  194. EXCLOG(e,LOGPFX "processFile");
  195. e->Release();
  196. }
  197. if (fdesc) {
  198. unsigned np = fdesc->numParts();
  199. if (np==0) {
  200. parent.error("File has no parts %s",name.str());
  201. return;
  202. }
  203. parent.numfiles++;
  204. StringBuffer fn;
  205. StringBuffer dir;
  206. bool incluster = true;
  207. StringBuffer ln;
  208. for (unsigned p=0;p<np;p++) {
  209. if (abort)
  210. return;
  211. unsigned matched = 0;
  212. unsigned nc = fdesc->numCopies(p);
  213. unsigned c;
  214. UnsignedArray map;
  215. unsigned nf = 0;
  216. for (c=0;c<nc;c++) {
  217. INode *node = fdesc->queryNode(p,c);
  218. unsigned nn = parent.checkIpHash(node->endpoint());
  219. map.append(nn);
  220. if (nn!=NotFound)
  221. nf++;
  222. }
  223. if (nf>1) { // 1 not much use
  224. parent.numparts++;
  225. ForEachItemIn(i,map) {
  226. unsigned from = map.item(i);
  227. if (from!=NotFound) {
  228. ForEachItemIn(j,map) {
  229. if (i!=j) {
  230. unsigned to = map.item(j);
  231. if (to!=NotFound) {
  232. // right lets go for it
  233. IFileIOStream &out = parent.outStreams->item(from);
  234. RemoteFilename rfn;
  235. fdesc->getFilename(p,i,rfn);
  236. rfn.getLocalPath(ln.clear());
  237. ln.append('|');
  238. fdesc->getFilename(p,j,rfn);
  239. rfn.getRemotePath(ln);
  240. ln.append('\n');
  241. out.write(ln.length(),ln.str());
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }
  248. }
  249. }
  250. else
  251. parent.error("cannot create file descriptor",name.str());
  252. }
  253. public:
  254. cfilescan1(CFileListWriter &_parent,bool &_abort)
  255. : parent(_parent), abort(_abort)
  256. {
  257. }
  258. ~cfilescan1()
  259. {
  260. }
  261. void scan()
  262. {
  263. if (abort)
  264. return;
  265. conn.setown(querySDS().connect("/Files", myProcessSession(), 0, 100000));
  266. if (!conn||abort)
  267. return;
  268. CSDSFileScanner::scan(conn);
  269. }
  270. } filescan(*this,abort);
  271. filescan.scan();
  272. log("File scan complete, %d files, %d parts",numfiles,numparts);
  273. }
  274. };
  275. class CFileStreamReader // this ought to be in jlib really
  276. {
  277. Linked<IFileIOStream> strm;
  278. MemoryAttr mba;
  279. size32_t maxlinesize;
  280. size32_t buffsize;
  281. char *buf;
  282. size32_t lbsize;
  283. char *p;
  284. bool eof;
  285. public:
  286. CFileStreamReader(IFileIOStream * _in,size32_t _maxlinesize=8192,size32_t _buffsize=0x10000)
  287. : strm(_in)
  288. {
  289. maxlinesize = _maxlinesize;
  290. buffsize = _buffsize;
  291. buf = (char *)mba.allocate(buffsize+maxlinesize+1);
  292. lbsize = 0;
  293. p=NULL;
  294. eof = false;
  295. }
  296. char* nextLine(size32_t &lnsize)
  297. {
  298. if (lbsize<maxlinesize) {
  299. if (!eof) {
  300. if (lbsize&&(p!=buf))
  301. memmove(buf,p,lbsize);
  302. p = buf;
  303. size32_t rd = strm->read(buffsize,buf+lbsize);
  304. if (rd==0) {
  305. eof = true;
  306. if (lbsize==0)
  307. return NULL;
  308. if (buf[lbsize-1]!='\n')
  309. buf[lbsize++] = '\n'; // terminate unfinished line
  310. }
  311. else
  312. lbsize += rd;
  313. }
  314. else if (lbsize==0)
  315. return NULL;
  316. }
  317. size32_t len = 0;
  318. char *ret = p;
  319. while ((len<maxlinesize)&&(p[len]!='\n'))
  320. len++;
  321. p[len] = 0;
  322. lnsize = len;
  323. len++;
  324. lbsize-=len;
  325. p+=len;
  326. return ret;
  327. }
  328. };
  329. bool outputPartsFiles(const char *daliserver,const char *cluster,const char *outdir, StringBuffer &errstr)
  330. {
  331. errstr.clear();
  332. bool dalistarted;
  333. if (daliserver&&*daliserver) {
  334. try {
  335. // connect to dali
  336. Owned<IGroup> serverGroup = createIGroup(daliserver,DALI_SERVER_PORT);
  337. initClientProcess(serverGroup, DCR_BackupGen, 0, NULL, NULL, 1000*60*5);
  338. dalistarted = true;
  339. CFileListWriter writer;
  340. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  341. if (group) {
  342. IArrayOf<IFileIOStream> outStreams;
  343. StringBuffer path;
  344. ForEachNodeInGroup(i,*group) {
  345. addPathSepChar(path.clear().append(outdir)).append(i+1).append(".DAT");
  346. Owned<IFile> outf = createIFile(path.str());
  347. Owned<IFileIO> outio = outf?outf->open(IFOcreate):NULL;
  348. #ifdef FLZCOMPRESS
  349. Owned<IFileIOStream> out = outio?createFastLZStreamWrite(outio):NULL;
  350. #else
  351. Owned<IFileIOStream> out = outio?createBufferedIOStream(outio):NULL;
  352. #endif
  353. if (!out) {
  354. errstr.appendf(LOGPFX "cannot create file %s",path.str());
  355. closedownClientProcess();
  356. return false;
  357. }
  358. outStreams.append(*out.getClear());
  359. }
  360. writer.write(group,outStreams);
  361. closedownClientProcess();
  362. return true;
  363. }
  364. else
  365. errstr.appendf(LOGPFX "cannot find cluster %s",cluster);
  366. }
  367. catch (IException *e) {
  368. errstr.append(LOGPFX "outPartsFile : ");
  369. e->errorMessage(errstr);
  370. e->Release();
  371. }
  372. }
  373. else
  374. errstr.append(LOGPFX "no dali server specified");
  375. if (dalistarted)
  376. closedownClientProcess();
  377. return errstr.length()==0;
  378. }
  379. void applyPartsFile(IFileIO *in,void (* sync)(const char *,const char *))
  380. {
  381. #ifdef FLZCOMPRESS
  382. Owned<IFileIOStream> strm = createFastLZStreamRead(in);
  383. #else
  384. Owned<IFileIOStream> strm = createBufferedIOStream(in);
  385. #endif
  386. CFileStreamReader reader(strm);
  387. loop {
  388. size32_t sz;
  389. char *line = reader.nextLine(sz);
  390. if (!line)
  391. break;
  392. char *split = strchr(line,'|');
  393. if (split) {
  394. *(split++) = 0;
  395. sync(line,split);
  396. }
  397. }
  398. }