// daftdir.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "rmtspawn.hpp"
  15. #include "fterror.hpp"
  16. #include "daft.hpp"
  17. #include "dadfs.hpp"
  18. #include "daftcfg.hpp"
  19. #include "daftdir.ipp"
  20. #include "daftmc.hpp"
  21. #include "dalienv.hpp"
  22. #ifdef __linux__
  23. #include <glob.h>
  24. #endif
  #ifdef _WIN32
  // Drive prepended to a directory specification that does not name one.
  #define DEFAULT_DRIVE "c:" // What about solaris machines.
  #else
  #define DEFAULT_DRIVE ""
  #endif
  // Path used when a directory specification has no path component.
  #define DEFAULT_ROOT_PATH PATHSEPSTR // Should this be the thor output directory
  // Wildcard used when a whole directory is listed and when searching for sub-directories.
  #define WILDCARD_ALL "*"
  // Option attribute names are #defines so a misspelling is caught by the compiler.
  #define ANcrc "@crc"
  #define ANtime "@time"
  #define ANrecurse "@recurse"
  36. //---------------------------------------------------------------------------
  37. #ifdef _WIN32
  38. void FILETIMEtoIDateTime(CDateTime * target, const FILETIME & ft)
  39. {
  40. if (target)
  41. {
  42. SYSTEMTIME systime;
  43. FileTimeToSystemTime(&ft, &systime);
  44. target->setDate(systime.wYear, systime.wMonth, systime.wDay);
  45. target->setTime(systime.wHour, systime.wMinute, systime.wSecond, systime.wMilliseconds*1000000);
  46. }
  47. }
  48. void setTimestamp(IPropertyTree * entry, const char * attr, const FILETIME & ft)
  49. {
  50. CDateTime time;
  51. StringBuffer timeText;
  52. FILETIMEtoIDateTime(&time, ft);
  53. time.getString(timeText.clear());
  54. entry->setProp(attr, timeText.str());
  55. }
  56. #endif
  57. //---------------------------------------------------------------------------
  58. DirectoryBuilder::DirectoryBuilder(ISocket * _masterSocket, IPropertyTree * options)
  59. {
  60. masterSocket = _masterSocket;
  61. calcCRC = false;
  62. recurse = false;
  63. addTimes = true;
  64. if (options)
  65. {
  66. addTimes = options->getPropBool(ANtime, addTimes);
  67. calcCRC = options->getPropBool(ANcrc);
  68. recurse = options->getPropBool(ANrecurse);
  69. }
  70. includeEmptyDirectory = false;
  71. }
  72. void DirectoryBuilder::rootDirectory(const char * directory, INode * node, IPropertyTree * result)
  73. {
  74. OwnedIFile dir = createIFile(directory);
  75. StringBuffer path;
  76. const char * tag = "directory";
  77. if (dir->isDirectory() == foundYes)
  78. {
  79. implicitWildcard = true;
  80. includeEmptyDirectory = true;
  81. path.append(directory);
  82. wildcard.set(WILDCARD_ALL);
  83. }
  84. else
  85. {
  86. implicitWildcard = false;
  87. includeEmptyDirectory = false;
  88. StringBuffer wild;
  89. StringBuffer drive;
  90. splitFilename(directory, &drive, &path, &wild, &wild);
  91. wildcard.set(wild.str());
  92. if (!drive.length())
  93. drive.append(DEFAULT_DRIVE);
  94. if (!path.length())
  95. path.append(DEFAULT_ROOT_PATH);
  96. path.insert(0, drive.str());
  97. }
  98. IPropertyTree * dirTree = result->addPropTree(tag, createPTree(ipt_caseInsensitive));
  99. dirTree->setProp("@name", path.str());
  100. if (addTimes)
  101. {
  102. #ifdef _WIN32
  103. WIN32_FILE_ATTRIBUTE_DATA info;
  104. if (GetFileAttributesEx(path.str(), GetFileExInfoStandard, &info))
  105. {
  106. setTimestamp(dirTree, "@created", info.ftCreationTime);
  107. setTimestamp(dirTree, "@modified", info.ftLastWriteTime);
  108. setTimestamp(dirTree, "@accessed", info.ftLastAccessTime);
  109. }
  110. #else
  111. OwnedIFile file = createIFile(path.str());
  112. if(file->exists())
  113. {
  114. CDateTime ctime, mtime, atime;
  115. file->getTime(&ctime, &mtime, &atime);
  116. StringBuffer ctimestr, mtimestr, atimestr;
  117. ctime.getString(ctimestr);
  118. mtime.getString(mtimestr);
  119. atime.getString(atimestr);
  120. dirTree->setProp("@created", ctimestr.str());
  121. dirTree->setProp("@modified", mtimestr.str());
  122. dirTree->setProp("@accessed", atimestr.str());
  123. }
  124. #endif
  125. }
  126. walkDirectory("", dirTree);
  127. }
  128. bool DirectoryBuilder::walkDirectory(const char * path, IPropertyTree * directory)
  129. {
  130. StringBuffer fullname, search;
  131. directory->getProp("@name", fullname.append(path));
  132. if (fullname.length() && fullname.charAt(fullname.length()-1) != PATHSEPCHAR)
  133. fullname.append(PATHSEPCHAR);
  134. search.append(fullname).append(wildcard);
  135. IArray pending;
  136. bool empty = true;
  137. checkForRemoteAbort(masterSocket);
  138. #ifdef _WIN32
  139. WIN32_FIND_DATA info;
  140. HANDLE handle = FindFirstFile(search.str(), &info);
  141. if (handle != INVALID_HANDLE_VALUE)
  142. {
  143. do
  144. {
  145. if (strcmp(info.cFileName, ".") == 0 || strcmp(info.cFileName, "..") == 0)
  146. continue;
  147. const char * tag = (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) ? "directory" : "file";
  148. IPropertyTree * entry = NULL;
  149. if (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)
  150. {
  151. if (implicitWildcard && !recurse)
  152. entry = directory->addPropTree(tag, createPTree(ipt_caseInsensitive));
  153. }
  154. else
  155. {
  156. entry = directory->addPropTree(tag, createPTree(ipt_caseInsensitive));
  157. entry->setPropInt64("@size", ((unsigned __int64)info.nFileSizeHigh) << 32 | info.nFileSizeLow);
  158. if (calcCRC)
  159. {
  160. StringBuffer filename;
  161. filename.append(fullname).append(info.cFileName);
  162. try
  163. {
  164. OwnedIFile file = createIFile(filename.str());
  165. OwnedIFileIO io = file->open(IFOread);
  166. if (io)
  167. {
  168. OwnedIFileIOStream stream = createIOStream(io);
  169. CrcIOStream crcstream(stream, ~0);
  170. char buffer[32768];
  171. while (crcstream.read(sizeof(buffer), buffer))
  172. { }
  173. entry->setPropInt("@crc", ~crcstream.getCRC());
  174. }
  175. }
  176. catch (IException * e)
  177. {
  178. FLLOG(MCexception(e)(1000), unknownJob, e, "Trying to calculate CRC");
  179. e->Release();
  180. }
  181. }
  182. }
  183. if (entry)
  184. {
  185. entry->setProp("@name", info.cFileName);
  186. if (addTimes)
  187. {
  188. setTimestamp(entry, "@created", info.ftCreationTime);
  189. setTimestamp(entry, "@modified", info.ftLastWriteTime);
  190. setTimestamp(entry, "@accessed", info.ftLastAccessTime);
  191. }
  192. empty = false;
  193. }
  194. } while (FindNextFile(handle, &info));
  195. FindClose(handle);
  196. }
  197. if (recurse)
  198. {
  199. search.clear().append(fullname).append(WILDCARD_ALL);
  200. HANDLE handle = FindFirstFile(search.str(), &info);
  201. if (handle != INVALID_HANDLE_VALUE)
  202. {
  203. StringBuffer prev;
  204. do
  205. {
  206. if (strcmp(info.cFileName, ".") == 0 || strcmp(info.cFileName, "..") == 0)
  207. continue;
  208. if (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)
  209. {
  210. prev.clear().append("directory[@name=\"").append(info.cFileName).append("\"]");
  211. if (directory->hasProp(prev.str()))
  212. continue;
  213. IPropertyTree * entry = directory->addPropTree("directory", createPTree(ipt_caseInsensitive));
  214. entry->setProp("@name", info.cFileName);
  215. pending.append(*LINK(entry));
  216. if (addTimes)
  217. {
  218. setTimestamp(entry, "@created", info.ftCreationTime);
  219. setTimestamp(entry, "@modified", info.ftLastWriteTime);
  220. setTimestamp(entry, "@accessed", info.ftLastAccessTime);
  221. }
  222. }
  223. } while (FindNextFile(handle, &info));
  224. FindClose(handle);
  225. }
  226. }
  227. ForEachItemIn(idx, pending)
  228. {
  229. IPropertyTree & cur = (IPropertyTree &)pending.item(idx);
  230. if (walkDirectory(fullname.str(), &cur))
  231. empty = false;
  232. else if (!includeEmptyDirectory)
  233. directory->removeTree(&cur);
  234. }
  235. #elif defined(__linux__)
  236. glob_t glbuf;
  237. int glret = glob(search.str(), 0, NULL, &glbuf);
  238. if(glret != 0)
  239. {
  240. switch (glret)
  241. {
  242. case GLOB_NOSPACE:
  243. IERRLOG("glob error for %s : running out of memory space", search.str());
  244. break;
  245. case GLOB_ABORTED:
  246. IERRLOG("glob error for %s : read error", search.str());
  247. break;
  248. case GLOB_NOMATCH:
  249. IERRLOG("no match found for %s", search.str());
  250. break;
  251. default:
  252. IERRLOG("glob error for %s : %s", search.str(), strerror(errno));
  253. }
  254. }
  255. else
  256. {
  257. for(int fno = 0; fno < glbuf.gl_pathc; fno++)
  258. {
  259. const char* curfname = glbuf.gl_pathv[fno];
  260. if (strcmp(curfname, ".") == 0 || strcmp(curfname, "..") == 0)
  261. continue;
  262. OwnedIFile file = createIFile(curfname);
  263. const char * tag = (file->isDirectory()==foundYes) ? "directory" : "file";
  264. IPropertyTree * entry = NULL;
  265. if (file->isDirectory()==foundYes)
  266. {
  267. if (implicitWildcard && !recurse)
  268. entry = directory->addPropTree(tag, createPTree(ipt_caseInsensitive));
  269. else if(recurse)
  270. {
  271. StringBuffer prev;
  272. prev.append("directory[@name=\"").append(curfname).append("\"]");
  273. if (directory->hasProp(prev.str()))
  274. continue;
  275. entry = directory->addPropTree("directory", createPTree(ipt_caseInsensitive));
  276. pending.append(*LINK(entry));
  277. }
  278. }
  279. else
  280. {
  281. entry = directory->addPropTree(tag, createPTree(ipt_caseInsensitive));
  282. entry->setPropInt64("@size", file->size());
  283. if (calcCRC)
  284. {
  285. try
  286. {
  287. OwnedIFileIO io = file->open(IFOread);
  288. if (io)
  289. {
  290. OwnedIFileIOStream stream = createIOStream(io);
  291. CrcIOStream crcstream(stream, ~0);
  292. char buffer[32768];
  293. while (crcstream.read(sizeof(buffer), buffer))
  294. { }
  295. entry->setPropInt("@crc", ~crcstream.getCRC());
  296. }
  297. }
  298. catch (IException * e)
  299. {
  300. FLLOG(MCexception(e)(1000), unknownJob, e, "Trying to calculate CRC");
  301. e->Release();
  302. }
  303. }
  304. }
  305. if (entry)
  306. {
  307. entry->setProp("@name", curfname);
  308. if (addTimes)
  309. {
  310. CDateTime ctime, mtime, atime;
  311. file->getTime(&ctime, &mtime, &atime);
  312. StringBuffer ctimestr, mtimestr, atimestr;
  313. ctime.getString(ctimestr);
  314. mtime.getString(mtimestr);
  315. atime.getString(atimestr);
  316. entry->setProp("@created", ctimestr.str());
  317. entry->setProp("@modified", mtimestr.str());
  318. entry->setProp("@accessed", atimestr.str());
  319. }
  320. empty = false;
  321. }
  322. }
  323. }
  324. ForEachItemIn(idx, pending)
  325. {
  326. IPropertyTree & cur = (IPropertyTree &)pending.item(idx);
  327. if (walkDirectory("", &cur))
  328. empty = false;
  329. else if (!includeEmptyDirectory)
  330. directory->removeTree(&cur);
  331. }
  332. #endif
  333. return !empty;
  334. }
  335. //---------------------------------------------------------------------------
  336. bool processDirCommand(ISocket * masterSocket, MemoryBuffer & cmd, MemoryBuffer & result)
  337. {
  338. StringAttr directory; cmd.read(directory);
  339. Owned<IPropertyTree> options = createPTree(cmd);
  340. Owned<INode> node = deserializeINode(cmd);
  341. DirectoryBuilder builder(masterSocket, options);
  342. StringBuffer url;
  343. Owned<IPropertyTree> dirTree = createPTree("machine", ipt_caseInsensitive);
  344. node->endpoint().getIpText(url.clear());
  345. dirTree->setProp("@ip", url.str());
  346. StringAttr nextDir;
  347. const char * cur = directory;
  348. for (;;)
  349. {
  350. const char * sep = strchr(cur, ';');
  351. if (sep)
  352. nextDir.set(cur, sep-cur);
  353. else
  354. nextDir.set(cur);
  355. LOG(MCdebugProgress, unknownJob, "Process Directory Command: %s", nextDir.get());
  356. builder.rootDirectory(nextDir, node, dirTree);
  357. if (!sep)
  358. break;
  359. cur = sep+1;
  360. }
  361. result.clear();
  362. dirTree->serialize(result);
  363. return true;
  364. }
  365. //---------------------------------------------------------------------------
  366. DirectoryThread::DirectoryThread(IRunningSlaveObserver & _observer, const char * _directory, INode * _node, IPropertyTree * _options) : Thread("directoryThread"), observer(_observer)
  367. {
  368. directory = _directory;
  369. node.set(_node);
  370. options.set(_options);
  371. sem = NULL;
  372. ok = false;
  373. job = unknownJob;
  374. }
  375. void DirectoryThread::go(Semaphore & _sem)
  376. {
  377. sem = &_sem;
  378. #ifdef RUN_SLAVES_ON_THREADS
  379. start();
  380. #else
  381. commandAndSignal();
  382. #endif
  383. }
  384. bool DirectoryThread::performCommand()
  385. {
  386. bool ok = false;
  387. StringBuffer url;
  388. node->endpoint().getUrlStr(url);
  389. if (!canSpawnChildProcess(node->endpoint()))
  390. throwError(DFTERR_NoSolarisDir);
  391. LOG(MCdebugProgressDetail, job, "Starting to generate part %s [%p]", url.str(), this);
  392. StringBuffer tmp;
  393. Owned<ISocket> socket = spawnRemoteChild(SPAWNdfu, queryFtSlaveExecutable(node->endpoint(), tmp), node->endpoint(), DAFT_VERSION, queryFtSlaveLogDir(), NULL, NULL);
  394. if (socket)
  395. {
  396. observer.addSlave(socket);
  397. MemoryBuffer msg;
  398. msg.setEndian(__BIG_ENDIAN);
  399. //Send message and wait for response...
  400. //MORE: they should probably all be sent on different threads....
  401. msg.append((byte)FTactiondirectory);
  402. msg.append(directory);
  403. options->serialize(msg);
  404. node->serialize(msg);
  405. if (!catchWriteBuffer(socket, msg))
  406. throwError1(RFSERR_TimeoutWaitConnect, url.str());
  407. bool done;
  408. for (;;)
  409. {
  410. msg.clear();
  411. if (!catchReadBuffer(socket, msg, FTTIME_DIRECTORY))
  412. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  413. msg.setEndian(__BIG_ENDIAN);
  414. msg.read(done);
  415. if (done)
  416. break;
  417. assertex(!"Progress not supported yet...");
  418. if (isAborting())
  419. {
  420. msg.clear().append(isAborting());
  421. if (!catchWriteBuffer(socket, msg))
  422. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  423. }
  424. }
  425. msg.read(ok);
  426. error.setown(deserializeException(msg));
  427. if (ok)
  428. resultTree.setown(createPTree(msg));
  429. msg.clear().append(true);
  430. catchWriteBuffer(socket, msg); // if it fails then can't do anything about it...
  431. observer.removeSlave(socket);
  432. }
  433. else
  434. {
  435. throwError1(DFTERR_FailedStartSlave, url.str());
  436. }
  437. LOG(MCdebugProgressDetail, job, "Completed generating part %s [%p]", url.str(), this);
  438. return ok;
  439. }
  440. bool DirectoryThread::commandAndSignal()
  441. {
  442. ok = false;
  443. try
  444. {
  445. ok = performCommand();
  446. }
  447. catch (IException * e)
  448. {
  449. PrintExceptionLog(e, "Gathering directory");
  450. error.setown(e);
  451. }
  452. sem->signal();
  453. return ok;
  454. }
  455. int DirectoryThread::run()
  456. {
  457. commandAndSignal();
  458. return 0;
  459. }
  460. //---------------------------------------------------------------------------
  #if 0
  // Disabled: builds the directory tree for every machine in-process rather
  // than through remote slaves.  Note the builder is constructed with a single
  // argument, which does not match the (socket, options) constructor above.
  void doDirectoryCommand(const char * directory, IGroup * machines, IPropertyTree * options, IPropertyTree * result)
  {
      DirectoryBuilder builder(options);
      StringBuffer ipText;
      const unsigned numMachines = machines->ordinality();
      for (unsigned i = 0; i < numMachines; i++)
      {
          INode & node = machines->queryNode(i);
          node.endpoint().getIpText(ipText.clear());

          IPropertyTree * machine = createPTree("machine", ipt_caseInsensitive);
          machine->setProp("@ip", ipText.str());
          result->addPropTree("machine", machine);

          builder.rootDirectory(directory, &node, machine);
      }
  }
  #endif
  478. class BroadcastAbortHandler : public CInterface, implements IAbortHandler, implements IRunningSlaveObserver
  479. {
  480. public:
  481. IMPLEMENT_IINTERFACE
  482. void addSlave(ISocket * node);
  483. void removeSlave(ISocket * node);
  484. void abort();
  485. virtual bool onAbort();
  486. protected:
  487. IArrayOf<ISocket> sockets;
  488. CriticalSection crit;
  489. };
  490. void BroadcastAbortHandler::abort()
  491. {
  492. CriticalBlock proceduce(crit);
  493. //MORE: Implement mode efficiently;
  494. ForEachItemIn(i, sockets)
  495. {
  496. MemoryBuffer msg;
  497. msg.append(true);
  498. catchWriteBuffer(&sockets.item(i), msg); // async?
  499. }
  500. }
  501. bool BroadcastAbortHandler::onAbort()
  502. {
  503. if (isAborting())
  504. abort();
  505. return false;
  506. }
  507. void BroadcastAbortHandler::addSlave(ISocket * socket)
  508. {
  509. CriticalBlock procedure(crit);
  510. sockets.append(*LINK(socket));
  511. }
  512. void BroadcastAbortHandler::removeSlave(ISocket * socket)
  513. {
  514. CriticalBlock procedure(crit);
  515. sockets.zap(*socket);
  516. }
  517. void doDirectory(const char * directory, IGroup * machines, IPropertyTree * options, IPropertyTree * result)
  518. {
  519. LocalAbortHandler localHandler(daftAbortHandler);
  520. BroadcastAbortHandler broadcaster;
  521. LocalIAbortHandler localHandler2(broadcaster);
  522. StringBuffer url;
  523. CIArrayOf<DirectoryThread> threads;
  524. unsigned max = machines->ordinality();
  525. unsigned idx;
  526. for (idx=0; idx < max; idx++)
  527. {
  528. INode & node = machines->queryNode(idx);
  529. DirectoryThread & cur = * new DirectoryThread(broadcaster, directory, &node, options);
  530. threads.append(cur);
  531. }
  532. Semaphore sem;
  533. for (idx=0; idx < max; idx++)
  534. threads.item(idx).go(sem);
  535. for (idx=0; idx < max; idx++)
  536. sem.wait();
  537. for (idx=0; idx < max; idx++)
  538. {
  539. DirectoryThread & cur = threads.item(idx);
  540. if (cur.error)
  541. throw cur.error.getLink();
  542. result->addPropTree("machine", threads.item(idx).getTree());
  543. }
  544. }
  545. //---------------------------------------------------------------------------
  546. DirectoryCopier::DirectoryCopier(ISocket * _masterSocket, MemoryBuffer & in)
  547. {
  548. masterSocket = _masterSocket;
  549. source.setown(createPTree(in));
  550. target.deserialize(in);
  551. options.setown(createPTree(in));
  552. initOptions();
  553. }
  554. DirectoryCopier::DirectoryCopier(ISocket * _masterSocket, IPropertyTree * _source, RemoteFilename & _target, IPropertyTree * _options)
  555. {
  556. masterSocket = _masterSocket;
  557. source.set(_source);
  558. target.set(_target);
  559. options.set(_options);
  560. initOptions();
  561. }
  562. void DirectoryCopier::initOptions()
  563. {
  564. onlyCopyMissing = options->getPropBool("@copyMissing", false);
  565. onlyCopyExisting = options->getPropBool("@copyExisting", false);
  566. preserveTimes = options->getPropBool("@preserveTimes", false);
  567. preserveIfNewer = options->getPropBool("@preserveIfNewer", false);
  568. verbose = options->getPropBool("@verbose", false);
  569. }
  570. void DirectoryCopier::copy()
  571. {
  572. IPropertyTree * machine = source->queryPropTree("machine");
  573. IPropertyTree * rootDirectory = machine->queryPropTree("directory");
  574. RemoteFilename sourceName;
  575. StringBuffer sourcePath;
  576. StringBuffer targetPath;
  577. SocketEndpoint ip(machine->queryProp("@ip"));
  578. sourceName.setPath(ip, rootDirectory->queryProp("@name"));
  579. sourceName.getRemotePath(sourcePath);
  580. target.getLocalPath(targetPath);
  581. recursiveCopy(rootDirectory, sourcePath.str(), targetPath.str());
  582. }
  583. void DirectoryCopier::recursiveCopy(IPropertyTree * level, const char * sourcePath, const char * targetPath)
  584. {
  585. if (masterSocket)
  586. checkForRemoteAbort(masterSocket);
  587. Owned<IFile> dir = createIFile(targetPath);
  588. dir->createDirectory();
  589. StringBuffer source, target;
  590. Owned<IPropertyTreeIterator> iter = level->getElements("file");
  591. ForEach(*iter)
  592. {
  593. const char * filename = iter->query().queryProp("@name");
  594. source.clear().append(sourcePath).append(PATHSEPCHAR).append(filename);
  595. target.clear().append(targetPath).append(PATHSEPCHAR).append(filename);
  596. bool doCopy = true;
  597. OwnedIFile sourceFile = createIFile(source.str());
  598. OwnedIFile targetFile = createIFile(target.str());
  599. if (onlyCopyExisting || onlyCopyMissing)
  600. {
  601. fileBool exists = targetFile->isFile();
  602. if (onlyCopyExisting && (exists != foundYes))
  603. doCopy = false;
  604. if (onlyCopyMissing && (exists != notFound))
  605. doCopy = false;
  606. }
  607. if (doCopy && preserveIfNewer)
  608. {
  609. if (targetFile->isFile() == foundYes)
  610. {
  611. CDateTime modifiedSource, modifiedTarget;
  612. sourceFile->getTime(NULL, &modifiedSource, NULL);
  613. targetFile->getTime(NULL, &modifiedTarget, NULL);
  614. if (modifiedSource.compare(modifiedTarget) <= 0)
  615. doCopy = false;
  616. }
  617. }
  618. if (doCopy)
  619. {
  620. if (verbose)
  621. {
  622. MemoryBuffer msg;
  623. msg.setEndian(__BIG_ENDIAN);
  624. msg.append(false);
  625. msg.append(source.str());
  626. writeBuffer(masterSocket, msg);
  627. }
  628. copyFile(targetFile, sourceFile);
  629. if (preserveTimes)
  630. {
  631. CDateTime created, modified, accessed;
  632. sourceFile->getTime(&created, &modified, &accessed);
  633. targetFile->setTime(&created, &modified, &accessed);
  634. }
  635. }
  636. }
  637. iter.setown(level->getElements("directory"));
  638. ForEach(*iter)
  639. {
  640. IPropertyTree * directory = &iter->query();
  641. const char * filename = directory->queryProp("@name");
  642. source.clear().append(sourcePath).append(PATHSEPCHAR).append(filename);
  643. target.clear().append(targetPath).append(PATHSEPCHAR).append(filename);
  644. recursiveCopy(directory, source.str(), target.str());
  645. }
  646. }
  647. //---------------------------------------------------------------------------
  648. bool processPhysicalCopyCommand(ISocket * masterSocket, MemoryBuffer & cmd, MemoryBuffer & result)
  649. {
  650. LOG(MCdebugProgress, unknownJob, "Process Physical Copy Command");
  651. DirectoryCopier copier(masterSocket, cmd);
  652. copier.copy();
  653. result.clear();
  654. return true;
  655. }
  656. //---------------------------------------------------------------------------
  657. void doPhysicalCopy(IPropertyTree * source, const char * target, IPropertyTree * _options, IDaftCopyProgress * progress)
  658. {
  659. LocalAbortHandler localHandler(daftAbortHandler);
  660. BroadcastAbortHandler broadcaster;
  661. LocalIAbortHandler localHandler2(broadcaster);
  662. Linked<IPropertyTree> options = _options;
  663. if (!options)
  664. options.setown(createPTree("options", ipt_caseInsensitive));
  665. if (progress)
  666. options->setPropBool("@verbose", true);
  667. #if 0
  668. //Enable for debugging locally
  669. RemoteFilename xtargetName;
  670. xtargetName.setRemotePath(target);
  671. DirectoryCopier copier(NULL, source, xtargetName, options);
  672. copier.copy();
  673. return;
  674. #endif
  675. SocketEndpoint sourceMachine(source->queryProp("machine/@ip"));
  676. RemoteFilename targetName;
  677. Owned<IException> error;
  678. targetName.setRemotePath(target);
  679. const IpAddress & targetIP = targetName.queryIP();
  680. if (!canSpawnChildProcess(targetIP))
  681. throwError(DFTERR_NoSolarisCopy);
  682. bool ok = false;
  683. StringBuffer url;
  684. targetIP.getIpText(url);
  685. LOG(MCdebugProgressDetail, unknownJob, "Starting to generate part %s", url.str());
  686. StringBuffer tmp;
  687. Owned<ISocket> socket = spawnRemoteChild(SPAWNdfu, queryFtSlaveExecutable(targetIP, tmp), targetName.queryEndpoint(), DAFT_VERSION, queryFtSlaveLogDir(), NULL);
  688. if (socket)
  689. {
  690. broadcaster.addSlave(socket);
  691. MemoryBuffer msg;
  692. msg.setEndian(__BIG_ENDIAN);
  693. //Send message and wait for response...
  694. //MORE: they should probably all be sent on different threads....
  695. msg.append((byte)FTactionpcopy);
  696. source->serialize(msg);
  697. targetName.serialize(msg);
  698. options->serialize(msg);
  699. if (!catchWriteBuffer(socket, msg))
  700. throwError1(RFSERR_TimeoutWaitConnect, url.str());
  701. bool done;
  702. for (;;)
  703. {
  704. msg.clear();
  705. if (!catchReadBuffer(socket, msg, FTTIME_DIRECTORY))
  706. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  707. msg.setEndian(__BIG_ENDIAN);
  708. msg.read(done);
  709. if (done)
  710. break;
  711. StringAttr displayText;
  712. msg.read(displayText);
  713. if (progress)
  714. progress->onProgress(displayText);
  715. else
  716. LOG(MCoperatorProgress, unknownJob, "Copy file %s", displayText.get());
  717. if (isAborting())
  718. {
  719. msg.clear().append(isAborting());
  720. if (!catchWriteBuffer(socket, msg))
  721. throwError1(RFSERR_TimeoutWaitSlave, url.str());
  722. }
  723. }
  724. msg.read(ok);
  725. error.setown(deserializeException(msg));
  726. msg.clear().append(true);
  727. catchWriteBuffer(socket, msg);
  728. broadcaster.removeSlave(socket);
  729. }
  730. else
  731. {
  732. throwError1(DFTERR_FailedStartSlave, url.str());
  733. }
  734. LOG(MCdebugProgressDetail, unknownJob, "Completed generating part %s", url.str());
  735. if (error)
  736. throw error.getClear();
  737. }