backupnode.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #ifdef _WIN32
  15. #define _WIN32_WINNT 0x0400
  16. #include <windows.h>
  17. #endif
  18. #include "platform.h"
  19. #include "thirdparty.h"
  20. #include "jlib.hpp"
  21. #include "jhtree.hpp"
  22. #include "jio.hpp"
  23. #include "jstring.hpp"
  24. #include "jfile.hpp"
  25. #include "jexcept.hpp"
  26. #include "jsocket.hpp"
  27. #include "jlog.hpp"
  28. #include "rmtfile.hpp"
  29. #define USE_JLOG
  30. extern bool outputPartsFiles(const char *daliserver,const char *cluster,const char *outdir,StringBuffer &errstr);
  31. extern void applyPartsFile(IFileIO *in,void (* applyfn)(const char *,const char *));
  32. static AtomRefTable *ignoreExt = NULL;
  33. MODULE_INIT(INIT_PRIORITY_STANDARD)
  34. {
  35. ignoreExt = new AtomRefTable(true);
  36. return true;
  37. }
  38. MODULE_EXIT()
  39. {
  40. ::Release(ignoreExt);
  41. }
  42. #define BUFSIZE 0x10000
  43. #define MINCOMPRESS_THRESHOLD 0x8000
  44. static StringAttr unixmirror("/mnt/mirror");
  45. static bool checkMode = false;
  46. static bool silent = false;
  47. static bool verbose = false;
  48. static bool compressAll = false;
  49. static bool noCheckSlaveCount = false;
  50. static bool inexactDateMatch = false;
  #ifdef USE_JLOG // and why not?
  #define println PROGLOG
  #define printerr ERRLOG
  #else
  // GCC does not accept an attribute after the declarator of a function
  // *definition*, so the printf-format checking is attached to prototypes
  // and the definitions below are left unadorned.
  static void println(const char *format, ...) __attribute__((format(printf, 1, 2)));
  static void printerr(const char *format, ...) __attribute__((format(printf, 1, 2)));

  // Writes a printf-style message followed by a newline to stdout and flushes.
  static void println(const char *format, ...)
  {
      va_list args;
      va_start(args, format);
      vfprintf(stdout, format, args);
      va_end(args);
      fprintf(stdout, "\n");
      fflush(stdout);
  }

  // Writes "ERROR: " plus a printf-style message and a newline to stderr and flushes.
  static void printerr(const char *format, ...)
  {
      va_list args;
      va_start(args, format);
      fprintf(stderr, "ERROR: ");
      vfprintf(stderr, format, args);
      va_end(args);
      fprintf(stderr, "\n");
      fflush(stderr);
  }
  #endif
  75. static bool shouldCompressFile(const char *name)
  76. {
  77. if (compressAll)
  78. return true;
  79. OwnedIFile file = createIFile(name);
  80. bool iskey = false;
  81. unsigned __int64 filesize = file->size();
  82. if (filesize < MINCOMPRESS_THRESHOLD)
  83. {
  84. if (verbose)
  85. println("File %s is too small to compress", name);
  86. return false;
  87. }
  88. return !isCompressedIndex(name);
  89. }
  90. static bool CopySingleFile(IFile *srcfile,IFile *dstfile, bool compress, bool suppressnotfounderrs)
  91. {
  92. const char *source = srcfile->queryFilename();
  93. const char *target = dstfile->queryFilename();
  94. #ifdef _WIN32
  95. if (compress && shouldCompressFile(source))
  96. {
  97. if (!silent)
  98. println("Copy %s to %s with compress", source, target);
  99. if (!checkMode)
  100. {
  101. HANDLE hTarget=::CreateFile(target,GENERIC_READ|GENERIC_WRITE,0,NULL,CREATE_NEW,0,NULL);
  102. USHORT compression=COMPRESSION_FORMAT_DEFAULT;
  103. DWORD bytes;
  104. if(::DeviceIoControl(hTarget, FSCTL_SET_COMPRESSION, &compression, sizeof(compression), NULL, 0, &bytes, NULL))
  105. {
  106. HANDLE hSource=::CreateFile(source,GENERIC_READ,0,NULL,OPEN_EXISTING,0,NULL);
  107. void *buf = malloc(BUFSIZE);
  108. loop
  109. {
  110. DWORD read;
  111. if (!::ReadFile(hSource, buf, BUFSIZE, &read, NULL))
  112. throw MakeOsException(GetLastError(), "Failed to read file %s", source);
  113. if (read)
  114. {
  115. DWORD wrote;
  116. if (!::WriteFile(hTarget, buf, read, &wrote, NULL))
  117. throw MakeOsException(GetLastError(), "Failed to write file %s", target);
  118. assertex(wrote==read);
  119. }
  120. else
  121. break;
  122. }
  123. FILETIME c, a, w;
  124. ::GetFileTime(hSource, &c, &a, &w);
  125. ::SetFileTime(hTarget, &c, &a, &w);
  126. ::CloseHandle(hSource);
  127. ::CloseHandle(hTarget);
  128. return true;
  129. }
  130. DWORD err=::GetLastError();
  131. ::CloseHandle(hTarget);
  132. }
  133. return checkMode;
  134. }
  135. #endif
  136. if (!silent)
  137. println("Copy %s to %s", source, target);
  138. if(checkMode)
  139. return false;
  140. try {
  141. recursiveCreateDirectoryForFile(target); // maybe should only do if fails
  142. dstfile->remove();
  143. srcfile->copyTo(dstfile,0x100000,NULL,true);
  144. }
  145. catch (IException *e) {
  146. if (suppressnotfounderrs) {
  147. if (srcfile&&!srcfile->exists()) { // its gone!
  148. if (verbose)
  149. printerr("File %s no longer exists", source);
  150. e->Release();
  151. return true;
  152. }
  153. }
  154. StringBuffer msg("CopyFile(");
  155. msg.append(source).append(',').append(target).append("): ");
  156. e->errorMessage(msg);
  157. printerr("%s",msg.str());
  158. e->Release();
  159. return false;
  160. }
  161. return true;
  162. }
  163. void syncFile(const char *src, const char *dst)
  164. {
  165. // from must exist otherwise ignore
  166. Owned<IFile> srcfile = createIFile(src);
  167. bool isdir;
  168. CDateTime srcdt;
  169. offset_t srcsz;
  170. if (srcfile->getInfo(isdir,srcsz,srcdt)) { // ignore if not there
  171. if (isdir)
  172. printerr("src file %s is directory, ignoring copy", src);
  173. else {
  174. Owned<IFile> dstfile = createIFile(dst);
  175. CDateTime dstdt;
  176. offset_t dstsz;
  177. if (dstfile->getInfo(isdir,dstsz,dstdt)) { // check if there
  178. if (isdir) {
  179. printerr("dst file %s is directory, ignoring copy", dst);
  180. return;
  181. }
  182. if ((srcsz==dstsz)&&srcdt.equals(dstdt,!inexactDateMatch))
  183. return;
  184. }
  185. CopySingleFile(srcfile,dstfile, false, true);
  186. }
  187. }
  188. }
  // Prints the command-line help to stdout and terminates the process with exit code 2.
  static void usage()
  {
      printf(
          "\nBACKUPNODE sourcepath targetpath [options]\n"
          " Copies and optionally compresses files from source to target\n\n"
          "BACKUPNODE -T slavesfile slaveno path1 path2 path3...\n"
          " Thor node backup mode - syncs named paths with adjacent d: drive\n\n"
          " if no paths specified use DAT files in directory specified by -X\n\n"
          "BACKUPNODE -W slavesfile dir\n"
          " Waits for .ERR files in the specified directory then concatenates into a log file\n\n"
          "BACKUPNODE -O daliip cluster outdir\n"
          " generates data files in outdir containing all files to be checked (*.DAT) \n\n"
          "Options:\n"
          " -A - compression options apply to all files (normally excludes small files and all keys)\n"
          " -B - use /mnt/mirror for replicate target\n"
          " -C - compress files on target (including existing files)\n"
          " -D - overwrite existing files if size/date mismatch\n"
          " -E - set compression state of existing files\n"
          " -F <file> - use option XML file\n"
          " -I <ext> - ignore files that have specified extention\n"
          " -M - ignore sub-second differences when comparing file dates\n"
          " -N - Include files even if slave count does not match filename\n"
          " -Q - quiet mode: only errors are reported\n"
          " -V - verbose mode\n"
          " -Y - report what would have been copied/compressed but do nothing\n"
          " -S - snmp enabled\n"
          " -X <dir> - read part lists (%%n.DAT) from and write %%n.ERR to specified dir\n");
      exit(2);
  }
  217. static bool different(IFile &target, IFile &source)
  218. {
  219. CDateTime tmt, smt;
  220. if (target.size() != source.size())
  221. return true;
  222. target.getTime(NULL, &tmt, NULL);
  223. if (inexactDateMatch)
  224. {
  225. unsigned hour, min, sec, nanosec;
  226. tmt.getTime(hour, min, sec, nanosec);
  227. tmt.setTime(hour, min, sec, 0);
  228. }
  229. source.getTime(NULL, &smt, NULL);
  230. if (inexactDateMatch)
  231. {
  232. unsigned hour, min, sec, nanosec;
  233. smt.getTime(hour, min, sec, nanosec);
  234. smt.setTime(hour, min, sec, 0);
  235. }
  236. return tmt.compare(smt) != 0;
  237. }
  238. static bool includeFile(IFile &file, unsigned numSlaves)
  239. {
  240. StringBuffer ext;
  241. splitFilename(file.queryFilename(), NULL, NULL, NULL, &ext);
  242. const char *_ext = ext.length()?ext.str()+1:"";
  243. if (ignoreExt->find(*_ext))
  244. return false;
  245. if (!numSlaves || noCheckSlaveCount)
  246. return true;
  247. const char *partcount = strstr(ext.str(), "_of_");
  248. if (partcount)
  249. {
  250. unsigned clusterSize = atoi(partcount+4);
  251. return clusterSize==numSlaves || clusterSize==numSlaves+1;
  252. }
  253. else
  254. return false;
  255. }
  256. static void CopyDirectory(const char *source, const char *target, unsigned numSlaves, bool compress, bool sourceIsMaster)
  257. {
  258. if (verbose)
  259. println("Copy directory %s to %s", source, target);
  260. bool first = true;
  261. Owned<IDirectoryIterator> dir = createDirectoryIterator(source, "*");
  262. ForEach (*dir)
  263. {
  264. IFile &sourceFile = dir->query();
  265. if (sourceFile.isFile())
  266. {
  267. if (includeFile(sourceFile, numSlaves))
  268. {
  269. StringBuffer targetname(target);
  270. targetname.append(PATHSEPCHAR);
  271. dir->getName(targetname);
  272. OwnedIFile destFile = createIFile(targetname.str());
  273. if ((destFile->size()==-1) || (sourceIsMaster && different(*destFile, sourceFile)))
  274. {
  275. if (first && !checkMode)
  276. {
  277. if (!recursiveCreateDirectory(target)) {
  278. throw MakeStringException(-1,"Cannot create directory %s",target);
  279. }
  280. first = false;
  281. }
  282. if (!CopySingleFile(&sourceFile, destFile, compress, true))
  283. printerr("File %s copy to %s failed", sourceFile.queryFilename(), destFile->queryFilename());
  284. }
  285. else if (verbose)
  286. {
  287. println("File %s already exists", destFile->queryFilename());
  288. }
  289. }
  290. else if (verbose)
  291. println("Skipping file %s (cluster size mismatch)", sourceFile.queryFilename());
  292. }
  293. else if (sourceFile.isDirectory())
  294. {
  295. StringBuffer newSource(source);
  296. StringBuffer newTarget(target);
  297. newSource.append(PATHSEPCHAR);
  298. newTarget.append(PATHSEPCHAR);
  299. dir->getName(newSource);
  300. dir->getName(newTarget);
  301. CopyDirectory(newSource.str(), newTarget.str(), numSlaves, compress, sourceIsMaster);
  302. }
  303. }
  304. if (verbose)
  305. println("Copied directory %s to %s", source, target);
  306. }
  /*
   * Windows only: walks 'target' recursively and switches the compression
   * state of every file accepted by includeFile() to the desired one.  The
   * desired state is "compressed" when 'compress' is set and
   * shouldCompressFile() agrees, otherwise "uncompressed".  In check mode the
   * change is only reported.  On other platforms this function does nothing.
   *
   * The size format strings keep a space between each literal and I64F:
   * C++11 and later would otherwise read the macro name as a literal suffix.
   */
  static void CompressDirectory(const char *target, unsigned numSlaves, bool compress)
  {
  #ifdef _WIN32
      if (verbose)
          println("%s directory %s", compress ? "Compress" : "Decompress", target);
      Owned<IDirectoryIterator> dir = createDirectoryIterator(target, "*");
      ForEach (*dir)
      {
          IFile &targetFile = dir->query();
          if (targetFile.isFile())
          {
              if (includeFile(targetFile, numSlaves))
              {
                  // Quick test to see if it's a key file.
                  bool compressThis = compress && shouldCompressFile(targetFile.queryFilename());
                  DWORD attr=::GetFileAttributes(targetFile.queryFilename());
                  if (attr==(DWORD)-1)
                      printerr("Could not read compression state of %s: error %x", targetFile.queryFilename(), ::GetLastError());
                  else
                  {
                      bool compressed = (attr & FILE_ATTRIBUTE_COMPRESSED) != 0;
                      if (compressed != compressThis)
                      {
                          if (!silent)
                          {
                              if (compressThis)
                                  println("Compress %s before %" I64F "d", targetFile.queryFilename(), targetFile.size());
                              else
                                  println("Decompress %s before %" I64F "d", targetFile.queryFilename(), targetFile.compressedSize());
                          }
                          if (!checkMode)
                              targetFile.setCompression(compressThis);
                          if (!silent)
                          {
                              if (compressThis)
                              {
                                  if (checkMode)
                                      println(""); // size after not known
                                  else
                                      println("after %" I64F "d", targetFile.compressedSize());
                              }
                              else
                                  println("after %" I64F "d", targetFile.size());
                          }
                      }
                  }
              }
          }
          else if (targetFile.isDirectory())
          {
              StringBuffer newTarget(target);
              newTarget.append(PATHSEPCHAR);
              dir->getName(newTarget);
              CompressDirectory(newTarget.str(), numSlaves, compress);
          }
      }
      if (verbose)
          println("%s directory %s", compress ? "Compressed" : "Decompressed", target);
  #endif
  }
  367. #define MAX_SLAVES 1000
  368. static StringAttr slaveIP[MAX_SLAVES+1];
  369. static unsigned numSlaves;
  370. static void loadSlaves(const char *slavesName)
  371. {
  372. FILE *slavesFile = fopen(slavesName, "rt");
  373. if( !slavesFile)
  374. {
  375. printerr("failed to open slaves file %s", slavesName);
  376. throw MakeStringException(MSGAUD_operator, 0, "failed to open slaves file %s", slavesName);
  377. }
  378. char inbuf[1000];
  379. numSlaves = 0;
  380. while (fgets( inbuf, sizeof(inbuf), slavesFile))
  381. {
  382. char *hash = strchr(inbuf, '#');
  383. if (hash)
  384. *hash = 0;
  385. char *finger = inbuf;
  386. loop
  387. {
  388. while (isspace(*finger))
  389. finger++;
  390. char *start = finger;
  391. while (*finger && !isspace(*finger))
  392. finger++;
  393. if (finger > start)
  394. slaveIP[numSlaves ++].set(start, finger - start);
  395. else
  396. break;
  397. if (numSlaves > MAX_SLAVES)
  398. {
  399. printerr("Too many slaves - invalid slaves file %s?", slavesName);
  400. throw MakeStringException(MSGAUD_operator, 0, "Too many slaves - invalid slaves file %s?", slavesName);
  401. }
  402. }
  403. }
  404. fclose(slavesFile);
  405. slaveIP[numSlaves].set(slaveIP[0].get());
  406. }
  407. static void waitSlaves(const char *dir,unsigned num,StringAttr *slaves)
  408. {
  409. unsigned start=msTick();
  410. unsigned last=0;
  411. bool *done = (bool *)calloc(num,sizeof(bool));
  412. unsigned ndone = 0;
  413. unsigned errors = 0;
  414. StringBuffer name;
  415. while (ndone<num) {
  416. unsigned startndone = ndone;
  417. for (unsigned i=0;i<num;i++) {
  418. if (!done[i]) {
  419. addPathSepChar(name.clear().append(dir)).append(i+1).append(".ERR");
  420. if (checkFileExists(name.str())) {
  421. done[i] = true;
  422. ndone++;
  423. for (unsigned attempt=0;attempt<10;attempt++) {
  424. try {
  425. Owned<IFile> file = createIFile(name.str());
  426. Owned<IFileIO> fio = file->open(IFOread);
  427. if (fio) {
  428. size32_t sz = (size32_t)fio->size();
  429. if (sz) {
  430. StringBuffer s;
  431. fio->read(0,sz,s.reserve(sz));
  432. println("%s: %s",slaves[i].get(),s.str());
  433. errors++;
  434. }
  435. else {
  436. try {
  437. fio.clear();
  438. file->remove();
  439. }
  440. catch (IException *e) {
  441. StringBuffer msg("waitSlaves.1: ");
  442. e->errorMessage(msg);
  443. println("%s",msg.str());
  444. e->Release();
  445. }
  446. println("%s: DONE",slaves[i].get());
  447. }
  448. break;
  449. }
  450. }
  451. catch (IException *e) {
  452. if (attempt==9) {
  453. StringBuffer msg("waitSlaves.2: ");
  454. e->errorMessage(msg);
  455. println("%s",msg.str());
  456. }
  457. e->Release();
  458. }
  459. Sleep(5000);
  460. }
  461. }
  462. }
  463. }
  464. if (startndone==ndone) {
  465. Sleep(5000);
  466. }
  467. unsigned t = (msTick()-start)/(5*1000*60);
  468. if (t!=last) {
  469. last = t;
  470. println("Running: %d minutes taken, %d slave%s complete of %d",t*5,ndone,(ndone==1)?"":"s",num);
  471. if (num-ndone<10) {
  472. StringBuffer waiting;
  473. for (unsigned j=0;j<num;j++) {
  474. if (!done[j]) {
  475. if (waiting.length())
  476. waiting.append(',');
  477. waiting.append(slaves[j]);
  478. }
  479. }
  480. println("Waiting for %s",waiting.str());
  481. }
  482. }
  483. }
  484. unsigned t2 = (msTick()-start)/1000;
  485. println("Completed in %dm %ds with %d error%s",t2/60,t2%60,errors,(errors==1)?"":"s");
  486. free(done);
  487. }
  488. int main(int argc, const char *argv[])
  489. {
  490. #ifndef _WIN32
  491. InitModuleObjects();
  492. #endif
  493. int retValue = 0;
  494. bool compress = false;
  495. bool compressExisting = false;
  496. bool overwriteDifferent = false;
  497. bool thorMode = false;
  498. bool waitMode = false;
  499. bool forceSlaveIP = false;
  500. bool snmpEnabled = false;
  501. bool useMirrorMount = false;
  502. bool outputMode = false;
  503. StringAttr errdatdir;
  504. StringArray args;
  505. unsigned slaveNum = 0;
  506. unsigned argNo = 1;
  507. while ((int)argNo<argc)
  508. {
  509. const char *arg = argv[argNo++];
  510. if (arg[0]=='-')
  511. {
  512. while (arg)
  513. {
  514. switch (toupper(arg[1]))
  515. {
  516. case 'A':
  517. compressAll = true;
  518. break;
  519. case 'B':
  520. useMirrorMount = true;
  521. break;
  522. case 'C':
  523. compress = true;
  524. println("NOTE - executing in check mode. No files will compressed or copied");
  525. break;
  526. case 'D':
  527. overwriteDifferent = true;
  528. break;
  529. case 'E':
  530. compressExisting = true;
  531. break;
  532. case 'F':
  533. forceSlaveIP = true;
  534. break;
  535. case 'I':
  536. {
  537. if ((int)argNo<argc)
  538. ignoreExt->queryCreate(argv[argNo++]);
  539. break;
  540. }
  541. case 'M':
  542. inexactDateMatch = true;
  543. break;
  544. case 'N':
  545. noCheckSlaveCount = true;
  546. break;
  547. case 'O':
  548. outputMode = true;
  549. break;
  550. case 'Q':
  551. if (verbose)
  552. println("Silent and verbose specified - silent will be ignored");
  553. else
  554. silent = true;
  555. break;
  556. case 'S':
  557. snmpEnabled = true;
  558. break;
  559. case 'T':
  560. thorMode = true;
  561. break;
  562. case 'V':
  563. if (silent)
  564. {
  565. println("Silent and verbose specified - silent will be ignored");
  566. silent = false;
  567. }
  568. verbose = true;
  569. break;
  570. case 'W':
  571. waitMode = true;
  572. break;
  573. case 'X':
  574. if ((int)argNo<argc)
  575. errdatdir.set(argv[argNo++]);
  576. break;
  577. case 'Y':
  578. checkMode = true;
  579. break;
  580. default:
  581. usage();
  582. break;
  583. }
  584. if (arg[2]=='/' || arg[2]=='-')
  585. arg += 2;
  586. else if (arg[2])
  587. usage();
  588. else
  589. arg = NULL;
  590. }
  591. }
  592. else
  593. args.append(arg);
  594. }
  595. if (args.ordinality()<2)
  596. usage();
  597. StringBuffer erroutstr;
  598. try
  599. {
  600. if (thorMode)
  601. {
  602. bool usedatfile=false;
  603. setDaliServixSocketCaching(true);
  604. slaveNum = (args.ordinality()<2)?0:atoi(args.item(1));
  605. if (args.ordinality()<3) {
  606. if ((errdatdir.length()==0)||!slaveNum)
  607. {
  608. printerr("-T option specified but no paths and no data dir/slave number specified");
  609. throw MakeStringException(MSGAUD_operator, 0, "-T option specified but no paths and no data dir/slave number specified");
  610. }
  611. usedatfile = true;
  612. }
  613. loadSlaves(args.item(0));
  614. if (!slaveNum || slaveNum>numSlaves)
  615. {
  616. printerr("'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
  617. throw MakeStringException(-1, "'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
  618. }
  619. if (!forceSlaveIP)
  620. {
  621. IpAddress myip;
  622. GetHostIp(myip);
  623. IpAddress myipfromSlaves(slaveIP[slaveNum-1]);
  624. if (!myip.ipequals(myipfromSlaves))
  625. {
  626. StringBuffer ips1, ips2;
  627. myipfromSlaves.getIpText(ips1);
  628. myip.getIpText(ips2);
  629. printerr("IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
  630. throw MakeStringException(-1, "IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
  631. }
  632. }
  633. if (usedatfile) {
  634. StringBuffer datafile(errdatdir);
  635. addPathSepChar(datafile).append(slaveNum).append(".DAT");
  636. Owned<IFile> file = createIFile(datafile.str());
  637. Owned<IFileIO> fio;
  638. // add a slight stagger
  639. Sleep(slaveNum*200);
  640. for (unsigned attempt=0;attempt<10;attempt++) {
  641. try {
  642. fio.setown(file->open(IFOread));
  643. if (fio)
  644. break;
  645. }
  646. catch (IException *e) {
  647. if (attempt==9) {
  648. StringBuffer msg;
  649. e->errorMessage(msg);
  650. printerr("%s",msg.str());
  651. }
  652. e->Release();
  653. }
  654. Sleep(5000);
  655. }
  656. if (fio)
  657. applyPartsFile(fio,syncFile);
  658. else {
  659. printerr("Could not read file %s",datafile.str());
  660. throw MakeStringException(-1, "Could not read file %s",datafile.str());
  661. }
  662. }
  663. else {
  664. aindex_t numArgs = args.ordinality();
  665. for (aindex_t idx = 2; idx<numArgs; ++idx)
  666. {
  667. const char *arg = args.item(idx);
  668. StringBuffer backupDirectory;
  669. StringBuffer localDirectory;
  670. #ifdef _WIN32
  671. backupDirectory.append("\\\\").append(slaveIP[slaveNum]).append("\\d$\\").append(arg);
  672. localDirectory.append("\\\\").append(slaveIP[slaveNum-1]).append("\\c$\\").append(arg);
  673. #else
  674. if (useMirrorMount)
  675. backupDirectory.append(unixmirror.get()).append("/").append(arg);
  676. else
  677. backupDirectory.append("//").append(slaveIP[slaveNum]).append("/d$/").append(arg);
  678. localDirectory.append("/c$/").append(arg);
  679. #endif
  680. if (compressExisting)
  681. CompressDirectory(localDirectory.str(), numSlaves, compress);
  682. CopyDirectory(backupDirectory.str(), localDirectory.str(), numSlaves, compress, false);
  683. if (compressExisting)
  684. CompressDirectory(backupDirectory.str(), numSlaves, compress);
  685. CopyDirectory(localDirectory.str(), backupDirectory.str(), numSlaves, compress, true);
  686. }
  687. }
  688. }
  689. else if (waitMode) {
  690. loadSlaves(args.item(0));
  691. #ifndef _WIN32
  692. struct sigaction act; // ignore break (from parent)
  693. sigset_t blockset;
  694. sigemptyset(&blockset);
  695. act.sa_mask = blockset;
  696. act.sa_handler = SIG_IGN;
  697. sigaction(SIGINT, &act, NULL);
  698. #endif
  699. waitSlaves(args.item(1),numSlaves,slaveIP);
  700. }
  701. else if (outputMode) {
  702. if (args.ordinality()<3)
  703. usage();
  704. else {
  705. if (!silent)
  706. println("Creating part lists, please wait...");
  707. StringBuffer errstr;
  708. if (!outputPartsFiles(args.item(0),args.item(1),args.item(2),errstr))
  709. throw MakeStringException(-1, "%s", errstr.str());
  710. }
  711. }
  712. else
  713. {
  714. const char *source = args.item(0);
  715. const char *target = args.item(1);
  716. if (compressExisting)
  717. CompressDirectory(target, 0, compress);
  718. CopyDirectory(source, target, 0, compress, overwriteDifferent);
  719. }
  720. if (checkMode)
  721. println("NOTE - executing in check mode. No files were compressed or copied");
  722. if(!silent)
  723. println("backupnode finished");
  724. }
  725. catch(IException *E)
  726. {
  727. E->errorMessage(erroutstr);
  728. printerr("%s",erroutstr.str());
  729. E->Release();
  730. retValue = 2;
  731. }
  732. if (errdatdir.length()&&slaveNum) {
  733. StringBuffer errfilename(errdatdir);
  734. addPathSepChar(errfilename).append(slaveNum).append(".ERR");
  735. Owned<IFile> file = createIFile(errfilename.str());
  736. for (unsigned attempt=0;attempt<10;attempt++) {
  737. try {
  738. Owned<IFileIO> fio = file->open(IFOcreate);
  739. if (fio) {
  740. if (erroutstr.length()) {
  741. if (erroutstr.charAt(erroutstr.length()-1)!='\n')
  742. erroutstr.append('\n');
  743. fio->write(0,erroutstr.length(),erroutstr.str());
  744. }
  745. releaseAtoms();
  746. return retValue;
  747. }
  748. }
  749. catch (IException *e) {
  750. if (attempt==9) {
  751. StringBuffer msg;
  752. e->errorMessage(msg);
  753. printerr("%s",msg.str());
  754. }
  755. e->Release();
  756. }
  757. Sleep(5000);
  758. }
  759. printerr("Could not write to %s",errfilename.str());
  760. }
  761. releaseAtoms();
  762. return retValue;
  763. }