backupnode2.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jiface.hpp"
  16. #include "jptree.hpp"
  17. #include "jmisc.hpp"
  18. #include "jregexp.hpp"
  19. #include "jset.hpp"
  20. #include "jflz.hpp"
  21. #include "mpbase.hpp"
  22. #include "mpcomm.hpp"
  23. #include "daclient.hpp"
  24. #include "dadfs.hpp"
  25. #include "dautils.hpp"
  26. #include "dasds.hpp"
  27. #include "rmtfile.hpp"
  28. #define LOGPFX "backupnode: "
  29. #define FLZCOMPRESS
  30. class CFileListWriter
  31. {
  32. public:
  33. bool abort;
  34. bool verbose;
  35. StringArray clustersin;
  36. StringArray clustersout;
  37. IGroup *group;
  38. IpAddress *iphash;
  39. unsigned *ipnum;
  40. unsigned iphashsz;
  41. unsigned numparts;
  42. unsigned numfiles;
  43. IArrayOf<IFileIOStream> *outStreams;
  44. void log(const char * format, ...) __attribute__((format(printf, 2, 3)))
  45. {
  46. va_list args;
  47. va_start(args, format);
  48. StringBuffer line;
  49. line.valist_appendf(format, args);
  50. va_end(args);
  51. PROGLOG(LOGPFX "%s",line.str());
  52. }
  53. void error(const char * format, ...) __attribute__((format(printf, 2, 3)))
  54. {
  55. va_list args;
  56. va_start(args, format);
  57. StringBuffer line;
  58. line.valist_appendf(format, args);
  59. va_end(args);
  60. ERRLOG(LOGPFX "%s",line.str());
  61. }
  62. void warn(const char * format, ...) __attribute__((format(printf, 2, 3)))
  63. {
  64. va_list args;
  65. va_start(args, format);
  66. StringBuffer line;
  67. line.valist_appendf(format, args);
  68. va_end(args);
  69. WARNLOG(LOGPFX "%s",line.str());
  70. }
  71. void addIpHash(const IpAddress &ip,unsigned n)
  72. {
  73. unsigned r;
  74. _cpyrev4(&r,&ip);
  75. unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
  76. while (!iphash[h].isNull())
  77. if (++h==iphashsz)
  78. h = 0;
  79. iphash[h] = ip;
  80. ipnum[h] = n;
  81. }
  82. unsigned checkIpHash(const IpAddress &ip)
  83. {
  84. unsigned r;
  85. _cpyrev4(&r,&ip);
  86. unsigned h = hashc((const byte *)&r,sizeof(r),0)%iphashsz;
  87. while (!iphash[h].isNull()) {
  88. if (iphash[h].ipequals(ip))
  89. return ipnum[h];
  90. if (++h==iphashsz)
  91. h = 0;
  92. }
  93. return NotFound;
  94. }
  95. CFileListWriter()
  96. {
  97. abort = false;
  98. verbose = false;
  99. iphash = NULL;
  100. ipnum = NULL;
  101. iphashsz = 0;
  102. numfiles = 0;
  103. numparts = 0;
  104. }
  105. ~CFileListWriter()
  106. {
  107. if (iphash)
  108. delete [] iphash;
  109. delete [] ipnum;
  110. }
  111. void write(IGroup *_group,IArrayOf<IFileIOStream> &_outStreams)
  112. {
  113. if (!_group||abort)
  114. return;
  115. group = _group;
  116. outStreams = &_outStreams;
  117. delete [] iphash;
  118. iphash = NULL;
  119. delete [] ipnum;
  120. iphashsz = group->ordinality()*2;
  121. iphash = new IpAddress[iphashsz];
  122. ipnum = new unsigned[iphashsz];
  123. bool grphasports = false;
  124. ForEachNodeInGroup(i,*group) {
  125. const SocketEndpoint &ep = group->queryNode(i).endpoint();
  126. if (ep.port!=0)
  127. grphasports = true;
  128. addIpHash(ep,i);
  129. }
  130. if (grphasports)
  131. ERRLOG(LOGPFX "Group has ports!");
  132. class cfilescan1 : public CSDSFileScanner
  133. {
  134. Owned<IRemoteConnection> conn;
  135. CFileListWriter &parent;
  136. bool &abort;
  137. bool checkFileOk(IPropertyTree &file,const char *filename)
  138. {
  139. if (abort)
  140. return false;
  141. StringArray groups;
  142. getFileGroups(&file,groups);
  143. if (groups.ordinality()==0) {
  144. parent.error("File has no group defined: %s",filename);
  145. return false;
  146. }
  147. ForEachItemIn(i,groups) {
  148. const char *cluster = groups.item(i);
  149. ForEachItemIn(j1,parent.clustersin) {
  150. if (strcmp(parent.clustersin.item(j1),cluster)==0)
  151. return true;
  152. }
  153. bool excluded = false;
  154. ForEachItemIn(j2,parent.clustersout) {
  155. if (strcmp(parent.clustersout.item(j2),cluster)==0) {
  156. excluded = true;
  157. break;
  158. }
  159. }
  160. if (excluded)
  161. continue;
  162. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  163. if (!group) {
  164. parent.error("cannot find cluster %s",cluster);
  165. parent.clustersout.append(cluster);
  166. continue;
  167. }
  168. ForEachNodeInGroup(i,*group) {
  169. unsigned nn = parent.checkIpHash(group->queryNode(i).endpoint());
  170. if (nn!=NotFound) {
  171. parent.clustersin.append(cluster);
  172. return true;
  173. }
  174. }
  175. }
  176. return false;
  177. }
  178. bool checkScopeOk(const char *scopename)
  179. {
  180. return !abort;
  181. }
  182. void processFile(IPropertyTree &file,StringBuffer &name)
  183. {
  184. if (abort)
  185. return;
  186. if (parent.verbose)
  187. parent.log("Process file %s",name.str());
  188. Owned<IFileDescriptor> fdesc;
  189. try {
  190. fdesc.setown(deserializeFileDescriptorTree(&file,&queryNamedGroupStore()));
  191. }
  192. catch (IException *e) {
  193. EXCLOG(e,LOGPFX "processFile");
  194. e->Release();
  195. }
  196. if (fdesc) {
  197. unsigned np = fdesc->numParts();
  198. if (np==0) {
  199. parent.error("File has no parts %s",name.str());
  200. return;
  201. }
  202. parent.numfiles++;
  203. StringBuffer fn;
  204. StringBuffer dir;
  205. bool incluster = true;
  206. StringBuffer ln;
  207. for (unsigned p=0;p<np;p++) {
  208. if (abort)
  209. return;
  210. unsigned matched = 0;
  211. unsigned nc = fdesc->numCopies(p);
  212. unsigned c;
  213. UnsignedArray map;
  214. unsigned nf = 0;
  215. for (c=0;c<nc;c++) {
  216. INode *node = fdesc->queryNode(p,c);
  217. unsigned nn = parent.checkIpHash(node->endpoint());
  218. map.append(nn);
  219. if (nn!=NotFound)
  220. nf++;
  221. }
  222. if (nf>1) { // 1 not much use
  223. parent.numparts++;
  224. ForEachItemIn(i,map) {
  225. unsigned from = map.item(i);
  226. if (from!=NotFound) {
  227. ForEachItemIn(j,map) {
  228. if (i!=j) {
  229. unsigned to = map.item(j);
  230. if (to!=NotFound) {
  231. // right lets go for it
  232. IFileIOStream &out = parent.outStreams->item(from);
  233. RemoteFilename rfn;
  234. fdesc->getFilename(p,i,rfn);
  235. rfn.getLocalPath(ln.clear());
  236. ln.append('|');
  237. fdesc->getFilename(p,j,rfn);
  238. rfn.getRemotePath(ln);
  239. ln.append('\n');
  240. out.write(ln.length(),ln.str());
  241. }
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }
  248. }
  249. else
  250. parent.error("cannot create file descriptor %s",name.str());
  251. }
  252. public:
  253. cfilescan1(CFileListWriter &_parent,bool &_abort)
  254. : parent(_parent), abort(_abort)
  255. {
  256. }
  257. ~cfilescan1()
  258. {
  259. }
  260. void scan()
  261. {
  262. if (abort)
  263. return;
  264. conn.setown(querySDS().connect("/Files", myProcessSession(), 0, 100000));
  265. if (!conn||abort)
  266. return;
  267. CSDSFileScanner::scan(conn);
  268. }
  269. } filescan(*this,abort);
  270. filescan.scan();
  271. log("File scan complete, %d files, %d parts",numfiles,numparts);
  272. }
  273. };
  274. class CFileStreamReader // this ought to be in jlib really
  275. {
  276. Linked<IFileIOStream> strm;
  277. MemoryAttr mba;
  278. size32_t maxlinesize;
  279. size32_t buffsize;
  280. char *buf;
  281. size32_t lbsize;
  282. char *p;
  283. bool eof;
  284. public:
  285. CFileStreamReader(IFileIOStream * _in,size32_t _maxlinesize=8192,size32_t _buffsize=0x10000)
  286. : strm(_in)
  287. {
  288. maxlinesize = _maxlinesize;
  289. buffsize = _buffsize;
  290. buf = (char *)mba.allocate(buffsize+maxlinesize+1);
  291. lbsize = 0;
  292. p=NULL;
  293. eof = false;
  294. }
  295. char* nextLine(size32_t &lnsize)
  296. {
  297. if (lbsize<maxlinesize) {
  298. if (!eof) {
  299. if (lbsize&&(p!=buf))
  300. memmove(buf,p,lbsize);
  301. p = buf;
  302. size32_t rd = strm->read(buffsize,buf+lbsize);
  303. if (rd==0) {
  304. eof = true;
  305. if (lbsize==0)
  306. return NULL;
  307. if (buf[lbsize-1]!='\n')
  308. buf[lbsize++] = '\n'; // terminate unfinished line
  309. }
  310. else
  311. lbsize += rd;
  312. }
  313. else if (lbsize==0)
  314. return NULL;
  315. }
  316. size32_t len = 0;
  317. char *ret = p;
  318. while ((len<maxlinesize)&&(p[len]!='\n'))
  319. len++;
  320. p[len] = 0;
  321. lnsize = len;
  322. len++;
  323. lbsize-=len;
  324. p+=len;
  325. return ret;
  326. }
  327. };
  328. bool outputPartsFiles(const char *daliserver,const char *cluster,const char *outdir, StringBuffer &errstr, bool verbose)
  329. {
  330. errstr.clear();
  331. bool dalistarted = false;
  332. if (daliserver&&*daliserver) {
  333. try {
  334. // connect to dali
  335. Owned<IGroup> serverGroup = createIGroup(daliserver,DALI_SERVER_PORT);
  336. initClientProcess(serverGroup, DCR_BackupGen, 0, NULL, NULL, 1000*60*5);
  337. dalistarted = true;
  338. CFileListWriter writer;
  339. writer.verbose = verbose;
  340. StringBuffer groupDir;
  341. GroupType groupType;
  342. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster, groupDir, groupType);
  343. if (group) {
  344. if (groupType != grp_thor)
  345. {
  346. errstr.appendf(LOGPFX "expected cluster %s to be a thor cluster",cluster);
  347. closedownClientProcess();
  348. return false;
  349. }
  350. const char * overrideBaseDirectory = NULL;
  351. const char * overrideReplicateDirectory = NULL;
  352. StringBuffer datadir;
  353. StringBuffer repdir;
  354. if (getConfigurationDirectory(NULL,"data","thor",cluster,datadir))
  355. overrideBaseDirectory = datadir.str();
  356. if (getConfigurationDirectory(NULL,"mirror","thor",cluster,repdir))
  357. overrideReplicateDirectory = repdir.str();
  358. if (overrideBaseDirectory&&*overrideBaseDirectory)
  359. setBaseDirectory(overrideBaseDirectory, false);
  360. if (overrideReplicateDirectory&&*overrideBaseDirectory)
  361. setBaseDirectory(overrideReplicateDirectory, true);
  362. IArrayOf<IFileIOStream> outStreams;
  363. StringBuffer path;
  364. ForEachNodeInGroup(i,*group) {
  365. addPathSepChar(path.clear().append(outdir)).append(i+1).append(".DAT");
  366. Owned<IFile> outf = createIFile(path.str());
  367. Owned<IFileIO> outio = outf?outf->open(IFOcreate):NULL;
  368. #ifdef FLZCOMPRESS
  369. Owned<IFileIOStream> out = outio?createFastLZStreamWrite(outio):NULL;
  370. #else
  371. Owned<IFileIOStream> out = outio?createBufferedIOStream(outio):NULL;
  372. #endif
  373. if (!out) {
  374. errstr.appendf(LOGPFX "cannot create file %s",path.str());
  375. closedownClientProcess();
  376. return false;
  377. }
  378. outStreams.append(*out.getClear());
  379. }
  380. writer.write(group,outStreams);
  381. closedownClientProcess();
  382. return true;
  383. }
  384. else
  385. errstr.appendf(LOGPFX "cannot find cluster %s",cluster);
  386. }
  387. catch (IException *e) {
  388. errstr.append(LOGPFX "outPartsFile : ");
  389. e->errorMessage(errstr);
  390. e->Release();
  391. }
  392. }
  393. else
  394. errstr.append(LOGPFX "no dali server specified");
  395. if (dalistarted)
  396. closedownClientProcess();
  397. return errstr.length()==0;
  398. }
  399. void applyPartsFile(IFileIO *in,void (* sync)(const char *,const char *))
  400. {
  401. #ifdef FLZCOMPRESS
  402. Owned<IFileIOStream> strm = createFastLZStreamRead(in);
  403. #else
  404. Owned<IFileIOStream> strm = createBufferedIOStream(in);
  405. #endif
  406. CFileStreamReader reader(strm);
  407. for (;;) {
  408. size32_t sz;
  409. char *line = reader.nextLine(sz);
  410. if (!line)
  411. break;
  412. char *split = strchr(line,'|');
  413. if (split) {
  414. *(split++) = 0;
  415. sync(line,split);
  416. }
  417. }
  418. }