backupnode.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifdef _WIN32
  14. #define _WIN32_WINNT 0x0400
  15. #include <windows.h>
  16. #endif
  17. #include "platform.h"
  18. #include "thirdparty.h"
  19. #include "jlib.hpp"
  20. #include "jhtree.hpp"
  21. #include "jio.hpp"
  22. #include "jstring.hpp"
  23. #include "jfile.hpp"
  24. #include "jexcept.hpp"
  25. #include "jsocket.hpp"
  26. #include "jlog.hpp"
  27. #include "rmtfile.hpp"
  28. #define USE_JLOG
  29. extern bool outputPartsFiles(const char *daliserver,const char *cluster,const char *outdir,StringBuffer &errstr,bool verbose);
  30. extern void applyPartsFile(IFileIO *in,void (* applyfn)(const char *,const char *));
  31. static AtomRefTable *ignoreExt = NULL;
  32. MODULE_INIT(INIT_PRIORITY_STANDARD)
  33. {
  34. ignoreExt = new AtomRefTable(true);
  35. return true;
  36. }
  37. MODULE_EXIT()
  38. {
  39. ::Release(ignoreExt);
  40. }
  41. #define BUFSIZE 0x10000
  42. #define MINCOMPRESS_THRESHOLD 0x8000
  43. static StringAttr unixmirror("/mnt/mirror");
  44. static bool checkMode = false;
  45. static bool silent = false;
  46. static bool verbose = false;
  47. static bool compressAll = false;
  48. static bool noCheckSlaveCount = false;
  49. static bool inexactDateMatch = false;
  // Logging front-ends used throughout this file.
  //
  // With USE_JLOG defined they map straight onto the jlog macros. Otherwise they
  // are small printf-style helpers: println writes the formatted line to stdout,
  // printerr writes it to stderr prefixed with "ERROR: ". Both append a newline
  // and flush so output is visible immediately.
  #ifdef USE_JLOG // and why not?
  #define println PROGLOG
  #define printerr ERRLOG
  #else
  // GCC does not accept an attribute after the declarator of a function
  // *definition*, so the format-checking attribute is attached to separate
  // prototypes and the definitions follow without it.
  static void println(const char *format, ...) __attribute__((format(printf, 1, 2)));
  static void printerr(const char *format, ...) __attribute__((format(printf, 1, 2)));

  // Print a printf-style formatted line to stdout, followed by '\n', then flush.
  static void println(const char *format, ...)
  {
      va_list x;
      va_start(x, format);
      vfprintf(stdout, format, x);
      va_end(x);
      fprintf(stdout, "\n");
      fflush(stdout);
  }

  // Print "ERROR: " plus a printf-style formatted line to stderr, followed by '\n', then flush.
  static void printerr(const char *format, ...)
  {
      va_list x;
      va_start(x, format);
      fprintf(stderr, "ERROR: ");
      vfprintf(stderr, format, x);
      va_end(x);
      fprintf(stderr, "\n");
      fflush(stderr);
  }
  #endif
  74. static bool shouldCompressFile(const char *name)
  75. {
  76. if (compressAll)
  77. return true;
  78. OwnedIFile file = createIFile(name);
  79. bool iskey = false;
  80. unsigned __int64 filesize = file->size();
  81. if (filesize < MINCOMPRESS_THRESHOLD)
  82. {
  83. if (verbose)
  84. println("File %s is too small to compress", name);
  85. return false;
  86. }
  87. return !isCompressedIndex(name);
  88. }
  89. static bool CopySingleFile(IFile *srcfile,IFile *dstfile, bool compress, bool suppressnotfounderrs)
  90. {
  91. const char *source = srcfile->queryFilename();
  92. const char *target = dstfile->queryFilename();
  93. #ifdef _WIN32
  94. if (compress && shouldCompressFile(source))
  95. {
  96. if (!silent)
  97. println("Copy %s to %s with compress", source, target);
  98. if (!checkMode)
  99. {
  100. HANDLE hTarget=::CreateFile(target,GENERIC_READ|GENERIC_WRITE,0,NULL,CREATE_NEW,0,NULL);
  101. USHORT compression=COMPRESSION_FORMAT_DEFAULT;
  102. DWORD bytes;
  103. if(::DeviceIoControl(hTarget, FSCTL_SET_COMPRESSION, &compression, sizeof(compression), NULL, 0, &bytes, NULL))
  104. {
  105. HANDLE hSource=::CreateFile(source,GENERIC_READ,0,NULL,OPEN_EXISTING,0,NULL);
  106. void *buf = malloc(BUFSIZE);
  107. loop
  108. {
  109. DWORD read;
  110. if (!::ReadFile(hSource, buf, BUFSIZE, &read, NULL))
  111. throw makeOsExceptionV(GetLastError(), "Failed to read file %s", source);
  112. if (read)
  113. {
  114. DWORD wrote;
  115. if (!::WriteFile(hTarget, buf, read, &wrote, NULL))
  116. throw makeOsExceptionV(GetLastError(), "Failed to write file %s", target);
  117. assertex(wrote==read);
  118. }
  119. else
  120. break;
  121. }
  122. FILETIME c, a, w;
  123. ::GetFileTime(hSource, &c, &a, &w);
  124. ::SetFileTime(hTarget, &c, &a, &w);
  125. ::CloseHandle(hSource);
  126. ::CloseHandle(hTarget);
  127. return true;
  128. }
  129. DWORD err=::GetLastError();
  130. ::CloseHandle(hTarget);
  131. }
  132. return checkMode;
  133. }
  134. #endif
  135. if (!silent)
  136. println("Copy %s to %s", source, target);
  137. if(checkMode)
  138. return false;
  139. try {
  140. recursiveCreateDirectoryForFile(target); // maybe should only do if fails
  141. dstfile->remove();
  142. srcfile->copyTo(dstfile,0x100000,NULL,true);
  143. }
  144. catch (IException *e) {
  145. if (suppressnotfounderrs) {
  146. if (srcfile&&!srcfile->exists()) { // its gone!
  147. if (verbose)
  148. printerr("File %s no longer exists", source);
  149. e->Release();
  150. return true;
  151. }
  152. }
  153. StringBuffer msg("CopyFile(");
  154. msg.append(source).append(',').append(target).append("): ");
  155. e->errorMessage(msg);
  156. printerr("%s",msg.str());
  157. e->Release();
  158. return false;
  159. }
  160. return true;
  161. }
  162. void syncFile(const char *src, const char *dst)
  163. {
  164. // from must exist otherwise ignore
  165. Owned<IFile> srcfile = createIFile(src);
  166. bool isdir;
  167. CDateTime srcdt;
  168. offset_t srcsz;
  169. if (srcfile->getInfo(isdir,srcsz,srcdt)) { // ignore if not there
  170. if (isdir)
  171. printerr("src file %s is directory, ignoring copy", src);
  172. else {
  173. Owned<IFile> dstfile = createIFile(dst);
  174. CDateTime dstdt;
  175. offset_t dstsz;
  176. if (dstfile->getInfo(isdir,dstsz,dstdt)) { // check if there
  177. if (isdir) {
  178. printerr("dst file %s is directory, ignoring copy", dst);
  179. return;
  180. }
  181. if ((srcsz==dstsz)&&srcdt.equals(dstdt,!inexactDateMatch))
  182. return;
  183. }
  184. CopySingleFile(srcfile,dstfile, false, true);
  185. }
  186. }
  187. }
  // Print the command-line help text to stdout and terminate the process with exit code 2.
  static void usage()
  {
      // One printf over adjacent literals; the emitted text is unchanged.
      printf(
          "\nBACKUPNODE sourcepath targetpath [options]\n"
          " Copies and optionally compresses files from source to target\n\n"
          "BACKUPNODE -X <data-dir-path> -T slaveno numslaves myip backupip\n"
          " Thor node backup mode - syncs named paths with adjacent drive\n\n"
          "BACKUPNODE -W slavesfile dir\n"
          " Waits for .ERR files in the specified directory then concatenates into a log file\n\n"
          "BACKUPNODE -O daliip cluster outdir\n"
          " generates data files in outdir containing all files to be checked (*.DAT) \n\n"
          "Options:\n"
          " -A - compression options apply to all files (normally excludes small files and all keys)\n"
          " -B - use /mnt/mirror for replicate target\n"
          " -C - compress files on target (including existing files)\n"
          " -D - overwrite existing files if size/date mismatch\n"
          " -E - set compression state of existing files\n"
          " -F <file> - use option XML file\n"
          " -I <ext> - ignore files that have specified extention\n"
          " -M - ignore sub-second differences when comparing file dates\n"
          " -N - Include files even if slave count does not match filename\n"
          " -Q - quiet mode: only errors are reported\n"
          " -V - verbose mode\n"
          " -Y - report what would have been copied/compressed but do nothing\n"
          " -S - snmp enabled\n"
          " -X <dir> - read part lists (%%n.DAT) from and write %%n.ERR to specified dir\n");
      exit(2);
  }
  215. static bool different(IFile &target, IFile &source)
  216. {
  217. CDateTime tmt, smt;
  218. if (target.size() != source.size())
  219. return true;
  220. target.getTime(NULL, &tmt, NULL);
  221. if (inexactDateMatch)
  222. {
  223. unsigned hour, min, sec, nanosec;
  224. tmt.getTime(hour, min, sec, nanosec);
  225. tmt.setTime(hour, min, sec, 0);
  226. }
  227. source.getTime(NULL, &smt, NULL);
  228. if (inexactDateMatch)
  229. {
  230. unsigned hour, min, sec, nanosec;
  231. smt.getTime(hour, min, sec, nanosec);
  232. smt.setTime(hour, min, sec, 0);
  233. }
  234. return tmt.compare(smt) != 0;
  235. }
  236. static bool includeFile(IFile &file, unsigned numSlaves)
  237. {
  238. StringBuffer ext;
  239. splitFilename(file.queryFilename(), NULL, NULL, NULL, &ext);
  240. const char *_ext = ext.length()?ext.str()+1:"";
  241. if (ignoreExt->find(*_ext))
  242. return false;
  243. if (!numSlaves || noCheckSlaveCount)
  244. return true;
  245. const char *partcount = strstr(ext.str(), "_of_");
  246. if (partcount)
  247. {
  248. unsigned clusterSize = atoi(partcount+4);
  249. return clusterSize==numSlaves || clusterSize==numSlaves+1;
  250. }
  251. else
  252. return false;
  253. }
  254. static void CopyDirectory(const char *source, const char *target, unsigned numSlaves, bool compress, bool sourceIsMaster)
  255. {
  256. if (verbose)
  257. println("Copy directory %s to %s", source, target);
  258. bool first = true;
  259. Owned<IDirectoryIterator> dir = createDirectoryIterator(source, "*");
  260. ForEach (*dir)
  261. {
  262. IFile &sourceFile = dir->query();
  263. if (sourceFile.isFile())
  264. {
  265. if (includeFile(sourceFile, numSlaves))
  266. {
  267. StringBuffer targetname(target);
  268. targetname.append(PATHSEPCHAR);
  269. dir->getName(targetname);
  270. OwnedIFile destFile = createIFile(targetname.str());
  271. if ((destFile->size()==-1) || (sourceIsMaster && different(*destFile, sourceFile)))
  272. {
  273. if (first && !checkMode)
  274. {
  275. if (!recursiveCreateDirectory(target)) {
  276. throw MakeStringException(-1,"Cannot create directory %s",target);
  277. }
  278. first = false;
  279. }
  280. if (!CopySingleFile(&sourceFile, destFile, compress, true))
  281. printerr("File %s copy to %s failed", sourceFile.queryFilename(), destFile->queryFilename());
  282. }
  283. else if (verbose)
  284. {
  285. println("File %s already exists", destFile->queryFilename());
  286. }
  287. }
  288. else if (verbose)
  289. println("Skipping file %s (cluster size mismatch)", sourceFile.queryFilename());
  290. }
  291. else if (sourceFile.isDirectory())
  292. {
  293. StringBuffer newSource(source);
  294. StringBuffer newTarget(target);
  295. newSource.append(PATHSEPCHAR);
  296. newTarget.append(PATHSEPCHAR);
  297. dir->getName(newSource);
  298. dir->getName(newTarget);
  299. CopyDirectory(newSource.str(), newTarget.str(), numSlaves, compress, sourceIsMaster);
  300. }
  301. }
  302. if (verbose)
  303. println("Copied directory %s to %s", source, target);
  304. }
  // Recursively set the compression state of the files under target (Windows
  // only; a no-op on other platforms).
  //
  //   numSlaves - passed to includeFile() to filter by part count (0 = no filter)
  //   compress  - true to compress files accepted by shouldCompressFile(),
  //               false to decompress
  //
  // A file is touched only when its current FILE_ATTRIBUTE_COMPRESSED state
  // differs from the wanted one. Sizes before and after are reported unless
  // silent is set; in check mode nothing is changed.
  static void CompressDirectory(const char *target, unsigned numSlaves, bool compress)
  {
  #ifdef _WIN32
      if (verbose)
          println("%s directory %s", compress ? "Compress" : "Decompress", target);
      Owned<IDirectoryIterator> dir = createDirectoryIterator(target, "*");
      ForEach (*dir)
      {
          IFile &targetFile = dir->query();
          if (targetFile.isFile())
          {
              if (includeFile(targetFile, numSlaves))
              {
                  // Quick test to see if it's a key file.
                  bool compressThis = compress && shouldCompressFile(targetFile.queryFilename());
                  DWORD attr=::GetFileAttributes(targetFile.queryFilename());
                  if (attr==INVALID_FILE_ATTRIBUTES)
                      printerr("Could not read compression state of %s: error %x", targetFile.queryFilename(), ::GetLastError());
                  else
                  {
                      bool compressed = (attr & FILE_ATTRIBUTE_COMPRESSED) != 0;
                      if (compressed != compressThis)
                      {
                          // The whitespace around I64F is required: from C++11 on,
                          // "%"I64F"d" is lexed as a user-defined literal suffix.
                          if (!silent)
                          {
                              if (compressThis)
                                  println("Compress %s before %" I64F "d", targetFile.queryFilename(), targetFile.size());
                              else
                                  println("Decompress %s before %" I64F "d", targetFile.queryFilename(), targetFile.compressedSize());
                          }
                          if (!checkMode)
                              targetFile.setCompression(compressThis);
                          if (!silent)
                          {
                              if (compressThis)
                              {
                                  if (checkMode)
                                      println(""); // size after not known
                                  else
                                      println("after %" I64F "d", targetFile.compressedSize());
                              }
                              else
                                  println("after %" I64F "d", targetFile.size());
                          }
                      }
                  }
              }
          }
          else if (targetFile.isDirectory())
          {
              StringBuffer newTarget(target);
              newTarget.append(PATHSEPCHAR);
              dir->getName(newTarget);
              CompressDirectory(newTarget.str(), numSlaves, compress);
          }
      }
      if (verbose)
          println("%s directory %s", compress ? "Compressed" : "Decompressed", target);
  #endif
  }
  365. #define MAX_SLAVES 1000
  366. static StringAttr slaveIP[MAX_SLAVES+1];
  367. static unsigned numSlaves;
  368. static void loadSlaves(const char *slavesName)
  369. {
  370. FILE *slavesFile = fopen(slavesName, "rt");
  371. if( !slavesFile)
  372. {
  373. printerr("failed to open slaves file %s", slavesName);
  374. throw MakeStringException(MSGAUD_operator, 0, "failed to open slaves file %s", slavesName);
  375. }
  376. char inbuf[1000];
  377. numSlaves = 0;
  378. while (fgets( inbuf, sizeof(inbuf), slavesFile))
  379. {
  380. char *hash = strchr(inbuf, '#');
  381. if (hash)
  382. *hash = 0;
  383. char *finger = inbuf;
  384. loop
  385. {
  386. while (isspace(*finger))
  387. finger++;
  388. char *start = finger;
  389. while (*finger && !isspace(*finger))
  390. finger++;
  391. if (finger > start)
  392. slaveIP[numSlaves ++].set(start, finger - start);
  393. else
  394. break;
  395. if (numSlaves > MAX_SLAVES)
  396. {
  397. printerr("Too many slaves - invalid slaves file %s?", slavesName);
  398. throw MakeStringException(MSGAUD_operator, 0, "Too many slaves - invalid slaves file %s?", slavesName);
  399. }
  400. }
  401. }
  402. fclose(slavesFile);
  403. slaveIP[numSlaves].set(slaveIP[0].get());
  404. }
  405. static void waitSlaves(const char *dir,unsigned num,StringAttr *slaves)
  406. {
  407. unsigned start=msTick();
  408. unsigned last=0;
  409. bool *done = (bool *)calloc(num,sizeof(bool));
  410. unsigned ndone = 0;
  411. unsigned errors = 0;
  412. StringBuffer name;
  413. while (ndone<num) {
  414. unsigned startndone = ndone;
  415. for (unsigned i=0;i<num;i++) {
  416. if (!done[i]) {
  417. addPathSepChar(name.clear().append(dir)).append(i+1).append(".ERR");
  418. if (checkFileExists(name.str())) {
  419. done[i] = true;
  420. ndone++;
  421. for (unsigned attempt=0;attempt<10;attempt++) {
  422. try {
  423. Owned<IFile> file = createIFile(name.str());
  424. Owned<IFileIO> fio = file->open(IFOread);
  425. if (fio) {
  426. size32_t sz = (size32_t)fio->size();
  427. if (sz) {
  428. StringBuffer s;
  429. fio->read(0,sz,s.reserve(sz));
  430. println("%s: %s",slaves[i].get(),s.str());
  431. errors++;
  432. }
  433. else {
  434. try {
  435. fio.clear();
  436. file->remove();
  437. }
  438. catch (IException *e) {
  439. StringBuffer msg("waitSlaves.1: ");
  440. e->errorMessage(msg);
  441. println("%s",msg.str());
  442. e->Release();
  443. }
  444. println("%s: DONE",slaves[i].get());
  445. }
  446. break;
  447. }
  448. }
  449. catch (IException *e) {
  450. if (attempt==9) {
  451. StringBuffer msg("waitSlaves.2: ");
  452. e->errorMessage(msg);
  453. println("%s",msg.str());
  454. }
  455. e->Release();
  456. }
  457. Sleep(5000);
  458. }
  459. }
  460. }
  461. }
  462. if (startndone==ndone) {
  463. Sleep(5000);
  464. }
  465. unsigned t = (msTick()-start)/(5*1000*60);
  466. if (t!=last) {
  467. last = t;
  468. println("Running: %d minutes taken, %d slave%s complete of %d",t*5,ndone,(ndone==1)?"":"s",num);
  469. if (num-ndone<10) {
  470. StringBuffer waiting;
  471. for (unsigned j=0;j<num;j++) {
  472. if (!done[j]) {
  473. if (waiting.length())
  474. waiting.append(',');
  475. waiting.append(slaves[j]);
  476. }
  477. }
  478. println("Waiting for %s",waiting.str());
  479. }
  480. }
  481. }
  482. unsigned t2 = (msTick()-start)/1000;
  483. println("Completed in %dm %ds with %d error%s",t2/60,t2%60,errors,(errors==1)?"":"s");
  484. free(done);
  485. }
  486. int main(int argc, const char *argv[])
  487. {
  488. #ifndef _WIN32
  489. InitModuleObjects();
  490. #endif
  491. int retValue = 0;
  492. bool compress = false;
  493. bool compressExisting = false;
  494. bool overwriteDifferent = false;
  495. bool thorMode = false;
  496. bool waitMode = false;
  497. bool forceSlaveIP = false;
  498. bool snmpEnabled = false;
  499. bool useMirrorMount = false;
  500. bool outputMode = false;
  501. StringAttr errdatdir;
  502. StringArray args;
  503. unsigned slaveNum = 0;
  504. unsigned argNo = 1;
  505. while ((int)argNo<argc)
  506. {
  507. const char *arg = argv[argNo++];
  508. if (arg[0]=='-')
  509. {
  510. while (arg)
  511. {
  512. switch (toupper(arg[1]))
  513. {
  514. case 'A':
  515. compressAll = true;
  516. break;
  517. case 'B':
  518. useMirrorMount = true;
  519. break;
  520. case 'C':
  521. compress = true;
  522. println("NOTE - executing in check mode. No files will compressed or copied");
  523. break;
  524. case 'D':
  525. overwriteDifferent = true;
  526. break;
  527. case 'E':
  528. compressExisting = true;
  529. break;
  530. case 'F':
  531. forceSlaveIP = true;
  532. break;
  533. case 'I':
  534. {
  535. if ((int)argNo<argc)
  536. ignoreExt->queryCreate(argv[argNo++]);
  537. break;
  538. }
  539. case 'M':
  540. inexactDateMatch = true;
  541. break;
  542. case 'N':
  543. noCheckSlaveCount = true;
  544. break;
  545. case 'O':
  546. outputMode = true;
  547. break;
  548. case 'Q':
  549. if (verbose)
  550. println("Silent and verbose specified - silent will be ignored");
  551. else
  552. silent = true;
  553. break;
  554. case 'S':
  555. snmpEnabled = true;
  556. break;
  557. case 'T':
  558. thorMode = true;
  559. break;
  560. case 'V':
  561. if (silent)
  562. {
  563. println("Silent and verbose specified - silent will be ignored");
  564. silent = false;
  565. }
  566. verbose = true;
  567. break;
  568. case 'W':
  569. waitMode = true;
  570. break;
  571. case 'X':
  572. if ((int)argNo<argc)
  573. errdatdir.set(argv[argNo++]);
  574. break;
  575. case 'Y':
  576. checkMode = true;
  577. break;
  578. default:
  579. usage();
  580. break;
  581. }
  582. if (arg[2]=='/' || arg[2]=='-')
  583. arg += 2;
  584. else if (arg[2])
  585. usage();
  586. else
  587. arg = NULL;
  588. }
  589. }
  590. else
  591. args.append(arg);
  592. }
  593. if (args.ordinality()<2)
  594. usage();
  595. StringBuffer erroutstr;
  596. try
  597. {
  598. if (thorMode)
  599. {
  600. if (args.ordinality()<4 || 0 == errdatdir.length())
  601. usage();
  602. slaveNum = atoi(args.item(0));
  603. numSlaves = atoi(args.item(1));
  604. const char *myIp = args.item(2);
  605. const char *backupIp = args.item(3);
  606. setDaliServixSocketCaching(true);
  607. if (!slaveNum || slaveNum>numSlaves)
  608. {
  609. printerr("'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
  610. throw MakeStringException(-1, "'%s' is not a valid slave number (range is 1 to %d)", args.item(1), numSlaves);
  611. }
  612. if (!forceSlaveIP)
  613. {
  614. IpAddress myip;
  615. GetHostIp(myip);
  616. IpAddress myipfromSlaves(myIp);
  617. if (!myip.ipequals(myipfromSlaves))
  618. {
  619. StringBuffer ips1, ips2;
  620. myipfromSlaves.getIpText(ips1);
  621. myip.getIpText(ips2);
  622. printerr("IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
  623. throw MakeStringException(-1, "IP address %d in slaves file %s does not match this machine %s", slaveNum, ips1.str(), ips2.str());
  624. }
  625. }
  626. StringBuffer datafile(errdatdir);
  627. addPathSepChar(datafile).append(slaveNum).append(".DAT");
  628. Owned<IFile> file = createIFile(datafile.str());
  629. Owned<IFileIO> fio;
  630. // add a slight stagger
  631. Sleep(slaveNum*200);
  632. for (unsigned attempt=0;attempt<10;attempt++) {
  633. try {
  634. fio.setown(file->open(IFOread));
  635. if (fio)
  636. break;
  637. }
  638. catch (IException *e) {
  639. if (attempt==9) {
  640. StringBuffer msg;
  641. e->errorMessage(msg);
  642. printerr("%s",msg.str());
  643. }
  644. e->Release();
  645. }
  646. Sleep(5000);
  647. }
  648. if (fio)
  649. applyPartsFile(fio,syncFile);
  650. else {
  651. printerr("Could not read file %s",datafile.str());
  652. throw MakeStringException(-1, "Could not read file %s",datafile.str());
  653. }
  654. }
  655. else if (waitMode) {
  656. loadSlaves(args.item(0));
  657. #ifndef _WIN32
  658. struct sigaction act; // ignore break (from parent)
  659. sigset_t blockset;
  660. sigemptyset(&blockset);
  661. act.sa_mask = blockset;
  662. act.sa_handler = SIG_IGN;
  663. act.sa_flags = 0;
  664. sigaction(SIGINT, &act, NULL);
  665. #endif
  666. waitSlaves(args.item(1),numSlaves,slaveIP);
  667. }
  668. else if (outputMode) {
  669. if (args.ordinality()<3)
  670. usage();
  671. else {
  672. if (!silent)
  673. println("Creating part lists, please wait...");
  674. StringBuffer errstr;
  675. if (!outputPartsFiles(args.item(0),args.item(1),args.item(2),errstr,verbose))
  676. throw MakeStringExceptionDirect(-1, errstr.str());
  677. }
  678. }
  679. else
  680. {
  681. const char *source = args.item(0);
  682. const char *target = args.item(1);
  683. if (compressExisting)
  684. CompressDirectory(target, 0, compress);
  685. CopyDirectory(source, target, 0, compress, overwriteDifferent);
  686. }
  687. if (checkMode)
  688. println("NOTE - executing in check mode. No files were compressed or copied");
  689. if(!silent)
  690. println("backupnode finished");
  691. }
  692. catch(IException *E)
  693. {
  694. E->errorMessage(erroutstr);
  695. printerr("%s",erroutstr.str());
  696. E->Release();
  697. retValue = 2;
  698. }
  699. if (errdatdir.length()&&slaveNum) {
  700. StringBuffer errfilename(errdatdir);
  701. addPathSepChar(errfilename).append(slaveNum).append(".ERR");
  702. Owned<IFile> file = createIFile(errfilename.str());
  703. for (unsigned attempt=0;attempt<10;attempt++) {
  704. try {
  705. Owned<IFileIO> fio = file->open(IFOcreate);
  706. if (fio) {
  707. if (erroutstr.length()) {
  708. if (erroutstr.charAt(erroutstr.length()-1)!='\n')
  709. erroutstr.append('\n');
  710. fio->write(0,erroutstr.length(),erroutstr.str());
  711. }
  712. releaseAtoms();
  713. return retValue;
  714. }
  715. }
  716. catch (IException *e) {
  717. if (attempt==9) {
  718. StringBuffer msg;
  719. e->errorMessage(msg);
  720. printerr("%s",msg.str());
  721. }
  722. e->Release();
  723. }
  724. Sleep(5000);
  725. }
  726. printerr("Could not write to %s",errfilename.str());
  727. }
  728. releaseAtoms();
  729. return retValue;
  730. }