ccdfile.cpp 86 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "ccd.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdquery.hpp"
  26. #include "ccdstate.hpp"
  27. #include "ccdsnmp.hpp"
  28. #include "rmtfile.hpp"
  29. #include "ccdqueue.ipp"
  30. #ifdef __linux__
  31. #include <sys/mman.h>
  32. #endif
  33. atomic_t numFilesOpen[2];
  34. #define MAX_READ_RETRIES 2
  35. #ifdef _DEBUG
  36. //#define FAIL_20_READ
  37. //#define FAIL_20_OPEN
  38. #endif
  39. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  40. class NotYetOpenException : public CInterface, implements IException
  41. {
  42. public:
  43. IMPLEMENT_IINTERFACE;
  44. virtual int errorCode() const { return 0; }
  45. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  46. virtual MessageAudience errorAudience() const { return MSGAUD_internal; }
  47. };
  48. class CFailingFileIO : public CInterface, implements IFileIO
  49. {
  50. #define THROWNOTOPEN throw new NotYetOpenException()
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  54. virtual offset_t size() { THROWNOTOPEN; }
  55. virtual void flush() { THROWNOTOPEN; }
  56. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  57. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  58. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  59. virtual void close() { }
  60. } failure;
  61. class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelayedFile
  62. {
  63. protected:
  64. IArrayOf<IFile> sources;
  65. Owned<IFile> logical;
  66. unsigned currentIdx;
  67. Owned<IFileIO> current;
  68. Owned<IMemoryMappedFile> mmapped;
  69. mutable CriticalSection crit;
  70. bool remote;
  71. offset_t fileSize;
  72. CDateTime fileDate;
  73. unsigned crc;
  74. unsigned lastAccess;
  75. bool copying;
  76. bool isCompressed;
  77. const IRoxieFileCache *cached;
  78. #ifdef FAIL_20_READ
  79. unsigned readCount;
  80. #endif
  81. public:
  82. IMPLEMENT_IINTERFACE;
  83. CLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, unsigned _crc, bool _isCompressed)
  84. : logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed)
  85. {
  86. fileDate.set(_date);
  87. currentIdx = 0;
  88. current.set(&failure);
  89. remote = false;
  90. #ifdef FAIL_20_READ
  91. readCount = 0;
  92. #endif
  93. lastAccess = msTick();
  94. copying = false;
  95. cached = NULL;
  96. }
  97. ~CLazyFileIO()
  98. {
  99. setFailure(); // ensures the open file count properly maintained
  100. }
  101. virtual void beforeDispose()
  102. {
  103. if (cached)
  104. cached->removeCache(this);
  105. }
  106. void setCache(const IRoxieFileCache *cache)
  107. {
  108. assertex(!cached);
  109. cached = cache;
  110. }
  111. void removeCache(const IRoxieFileCache *cache)
  112. {
  113. assertex(cached==cache);
  114. cached = NULL;
  115. }
  116. inline void setRemote(bool _remote) { remote = _remote; }
  117. virtual void setCopying(bool _copying)
  118. {
  119. CriticalBlock b(crit);
  120. copying = _copying;
  121. }
  122. virtual bool isCopying() const
  123. {
  124. CriticalBlock b(crit);
  125. return copying;
  126. }
  127. virtual bool isOpen() const
  128. {
  129. CriticalBlock b(crit);
  130. return current.get() != &failure;
  131. }
  132. virtual unsigned getLastAccessed() const
  133. {
  134. CriticalBlock b(crit);
  135. return lastAccess;
  136. }
  137. virtual void close()
  138. {
  139. CriticalBlock b(crit);
  140. setFailure();
  141. }
  142. virtual bool isRemote()
  143. {
  144. CriticalBlock b(crit);
  145. return remote;
  146. }
  147. void setFailure()
  148. {
  149. try
  150. {
  151. if (current.get()!=&failure)
  152. atomic_dec(&numFilesOpen[remote]);
  153. current.set(&failure);
  154. }
  155. catch (IException *E)
  156. {
  157. if (traceLevel > 5)
  158. {
  159. StringBuffer s;
  160. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  161. }
  162. E->Release();
  163. }
  164. }
  165. void checkOpen()
  166. {
  167. CriticalBlock b(crit);
  168. _checkOpen();
  169. }
  170. void _checkOpen()
  171. {
  172. if (current.get() == &failure)
  173. {
  174. StringBuffer filesTried;
  175. unsigned tries = 0;
  176. bool firstTime = true; // first time try the "fast / cache" way - if that fails - try original away - if that still fails, error
  177. RoxieFileStatus fileStatus = FileNotFound;
  178. loop
  179. {
  180. if (currentIdx >= sources.length())
  181. currentIdx = 0;
  182. if (tries==sources.length())
  183. {
  184. if (firstTime) // if first time - reset and try again - non cache way
  185. {
  186. firstTime = false;
  187. tries = 0;
  188. }
  189. else
  190. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  191. }
  192. const char *sourceName = sources.item(currentIdx).queryFilename();
  193. if (traceLevel > 10)
  194. DBGLOG("Trying to open %s", sourceName);
  195. try
  196. {
  197. #ifdef FAIL_20_OPEN
  198. openCount++;
  199. if ((openCount % 5) == 0)
  200. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  201. #endif
  202. IFile *f = &sources.item(currentIdx);
  203. if (firstTime)
  204. cacheFileConnect(f, dafilesrvLookupTimeout); // set timeout to 10 seconds
  205. else
  206. {
  207. if (traceLevel > 10)
  208. DBGLOG("Looking for file using non-cached file open");
  209. }
  210. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, crc, isCompressed);
  211. if (fileStatus == FileIsValid)
  212. {
  213. if (isCompressed)
  214. current.setown(createCompressedFileReader(f));
  215. else
  216. current.setown(f->open(IFOread));
  217. if (current)
  218. {
  219. if (traceLevel > 5)
  220. DBGLOG("Opening %s", sourceName);
  221. disconnectRemoteIoOnExit(current);
  222. break;
  223. }
  224. // throwUnexpected(); - try another location if this one has the wrong version of the file
  225. }
  226. disconnectRemoteFile(f);
  227. }
  228. catch (IException *E)
  229. {
  230. E->Release();
  231. }
  232. currentIdx++;
  233. tries++;
  234. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  235. {
  236. filesTried.appendf(" %s", sourceName); // only need to build this list once
  237. switch (fileStatus)
  238. {
  239. case FileNotFound:
  240. filesTried.append(": FileNotFound");
  241. break;
  242. case FileSizeMismatch:
  243. filesTried.append(": FileSizeMismatch");
  244. break;
  245. case FileCRCMismatch:
  246. filesTried.append(": FileCRCMismatch");
  247. break;
  248. case FileDateMismatch:
  249. filesTried.append(": FileDateMismatch");
  250. break;
  251. }
  252. }
  253. }
  254. lastAccess = msTick();
  255. atomic_inc(&numFilesOpen[remote]);
  256. if ((unsigned) atomic_read(&numFilesOpen[remote]) > maxFilesOpen[remote])
  257. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  258. }
  259. }
  260. virtual void addSource(IFile *newSource)
  261. {
  262. if (newSource)
  263. {
  264. if (traceLevel > 10)
  265. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  266. CriticalBlock b(crit);
  267. sources.append(*newSource);
  268. }
  269. }
  270. virtual size32_t read(offset_t pos, size32_t len, void * data)
  271. {
  272. CriticalBlock b(crit);
  273. unsigned tries = 0;
  274. loop
  275. {
  276. try
  277. {
  278. size32_t ret = current->read(pos, len, data);
  279. lastAccess = msTick();
  280. return ret;
  281. }
  282. catch (NotYetOpenException *E)
  283. {
  284. E->Release();
  285. }
  286. catch (IException *E)
  287. {
  288. EXCLOG(MCoperatorError, E, "Read error");
  289. E->Release();
  290. DBGLOG("Failed to read length %d offset %"I64F"x file %s", len, pos, sources.item(currentIdx).queryFilename());
  291. currentIdx++;
  292. setFailure();
  293. }
  294. _checkOpen();
  295. tries++;
  296. if (tries == MAX_READ_RETRIES)
  297. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %"I64F"x file %s after %d attempts", len, pos, sources.item(currentIdx).queryFilename(), tries);
  298. }
  299. }
  300. virtual void flush()
  301. {
  302. CriticalBlock b(crit);
  303. if (current.get() != &failure)
  304. current->flush();
  305. }
  306. virtual offset_t size()
  307. {
  308. CriticalBlock b(crit);
  309. _checkOpen();
  310. lastAccess = msTick();
  311. return current->size();
  312. }
  313. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  314. virtual void setSize(offset_t size) { throwUnexpected(); }
  315. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  316. virtual const char *queryFilename() { return logical->queryFilename(); }
  317. virtual bool isAlive() const { return CInterface::isAlive(); }
  318. virtual int getLinkCount() const { return CInterface::getLinkCount(); }
  319. virtual IMemoryMappedFile *queryMappedFile()
  320. {
  321. CriticalBlock b(crit);
  322. if (mmapped)
  323. return mmapped;
  324. if (!remote)
  325. {
  326. mmapped.setown(logical->openMemoryMapped());
  327. return mmapped;
  328. }
  329. return NULL;
  330. }
  331. virtual IFileIO *queryFileIO()
  332. {
  333. return this;
  334. }
  335. virtual bool createHardFileLink()
  336. {
  337. unsigned tries = 0;
  338. loop
  339. {
  340. StringBuffer filesTried;
  341. if (currentIdx >= sources.length())
  342. currentIdx = 0;
  343. if (tries==sources.length())
  344. return false;
  345. const char *sourceName = sources.item(currentIdx).queryFilename();
  346. filesTried.appendf(" %s", sourceName);
  347. try
  348. {
  349. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, crc, isCompressed) == FileIsValid)
  350. {
  351. StringBuffer source_drive;
  352. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  353. StringBuffer query_drive;
  354. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  355. // only try to create link if on the same drive
  356. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  357. {
  358. try
  359. {
  360. DBGLOG("Trying to create Hard Link for %s", sourceName);
  361. createHardLink(logical->queryFilename(), sourceName);
  362. current.setown(sources.item(currentIdx).open(IFOread));
  363. return true;
  364. }
  365. catch(IException *E)
  366. {
  367. StringBuffer err;
  368. DBGLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  369. E->Release();
  370. }
  371. }
  372. }
  373. }
  374. catch (IException *E)
  375. {
  376. E->Release();
  377. }
  378. currentIdx++;
  379. tries++;
  380. }
  381. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  382. return false; // if we get here - no hardlink
  383. }
  384. void copyComplete()
  385. {
  386. {
  387. CriticalBlock b(crit);
  388. setFailure(); // lazyOpen will then reopen it...
  389. currentIdx = 0;
  390. remote = false;
  391. copying = false;
  392. sources.kill();
  393. sources.add(*logical.getLink(), 0);
  394. if (!lazyOpen)
  395. _checkOpen();
  396. }
  397. }
  398. virtual IFile *querySource()
  399. {
  400. CriticalBlock b(crit);
  401. _checkOpen();
  402. return &sources.item(currentIdx);
  403. };
  404. virtual IFile *queryTarget() { return logical; }
  405. virtual offset_t getSize() { return fileSize; }
  406. virtual CDateTime *queryDateTime() { return &fileDate; }
  407. static int compareAccess(IInterface **L, IInterface **R)
  408. {
  409. ILazyFileIO *LL = (ILazyFileIO *) *L;
  410. ILazyFileIO *RR = (ILazyFileIO *) *R;
  411. return LL->getLastAccessed() - RR->getLastAccessed();
  412. }
  413. };
  414. //----------------------------------------------------------------------------------------------
  415. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  416. {
  417. if (!remoteFDesc)
  418. return NULL;
  419. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  420. if (!remotePDesc)
  421. return NULL;
  422. unsigned int crc, remoteCrc;
  423. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  424. return remotePDesc;
  425. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  426. return remotePDesc;
  427. return NULL;
  428. }
  429. static bool checkClusterCount(UnsignedArray &counts, unsigned clusterNo, unsigned max)
  430. {
  431. while (!counts.isItem(clusterNo))
  432. counts.append(0);
  433. unsigned count = counts.item(clusterNo);
  434. if (count>=max)
  435. return false;
  436. counts.replace(++count, clusterNo);
  437. return true;
  438. }
  439. static bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const char *name)
  440. {
  441. StringBuffer s;
  442. return strieq(name, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
  443. }
  444. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster)
  445. {
  446. UnsignedArray clusterCounts;
  447. unsigned numCopies = pdesc->numCopies();
  448. for (unsigned copy = 0; copy < numCopies; copy++)
  449. {
  450. unsigned clusterNo = pdesc->copyClusterNum(copy);
  451. if (fromCluster && *fromCluster && !isCopyFromCluster(pdesc, clusterNo, fromCluster))
  452. continue;
  453. RemoteFilename r;
  454. pdesc->getFilename(copy,r);
  455. StringBuffer path;
  456. r.getRemotePath(path);
  457. if (localFileName && r.isLocal())
  458. {
  459. StringBuffer l;
  460. r.getLocalPath(l);
  461. if (streq(l, localFileName))
  462. continue; // don't add ourself
  463. }
  464. if (!checkClusterCount(clusterCounts, clusterNo, 2)) // Don't add more than 2 from one cluster
  465. continue;
  466. locations.append(path.str());
  467. }
  468. }
  469. static void appendPeerLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName)
  470. {
  471. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  472. if (peerCluster)
  473. {
  474. if (*peerCluster=='-') //a remote cluster was specified explicitly
  475. return;
  476. if (streq(peerCluster, roxieName))
  477. peerCluster=NULL;
  478. }
  479. appendRemoteLocations(pdesc, locations, localFileName, peerCluster);
  480. }
  481. //----------------------------------------------------------------------------------------------
  // Alias for a pointer to a StringArray.
  typedef StringArray *StringArrayPtr;
  483. class CRoxieFileCache : public CInterface, implements ICopyFileProgress, implements IRoxieFileCache
  484. {
  485. mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
  486. InterruptableSemaphore toCopy;
  487. InterruptableSemaphore toClose;
  488. mutable CopyMapStringToMyClass<ILazyFileIO> files;
  489. mutable CriticalSection crit;
  490. CriticalSection cpcrit;
  491. bool started;
  492. bool aborting;
  493. bool closing;
  494. bool closePending[2];
  495. bool needToDeleteFile;
  496. StringBuffer currentTodoFile;
  497. StringAttrMapping fileErrorList;
  498. Semaphore bctStarted;
  499. Semaphore hctStarted;
  500. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, unsigned crc, bool isCompressed)
  501. {
  502. if (f->exists())
  503. {
  504. // only check size if specified
  505. if ( (size != -1) && !isCompressed && f->size()!=size) // MORE - should be able to do better on compressed you'da thunk
  506. return FileSizeMismatch;
  507. if (crc > 0)
  508. {
  509. // if a crc is specified let's check it
  510. unsigned file_crc = f->getCRC();
  511. if (file_crc && crc != file_crc) // for remote files crc_file can fail, even if the file is valid
  512. {
  513. DBGLOG("FAILED CRC Check");
  514. return FileCRCMismatch;
  515. }
  516. }
  517. CDateTime mt;
  518. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  519. }
  520. else
  521. return FileNotFound;
  522. }
  523. ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
  524. IPartDescriptor *pdesc,
  525. const StringArray &remoteLocationInfo,
  526. offset_t size, const CDateTime &modified, unsigned crc)
  527. {
  528. Owned<IFile> local = createIFile(localLocation);
  529. bool isCompressed = pdesc->queryOwner().isCompressed();
  530. Owned<CLazyFileIO> ret = new CLazyFileIO(local.getLink(), size, modified, crc, isCompressed);
  531. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, crc, isCompressed);
  532. if (fileStatus == FileIsValid)
  533. {
  534. ret->addSource(local.getLink());
  535. ret->setRemote(false);
  536. }
  537. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  538. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  539. else
  540. {
  541. bool addedOne = false;
  542. // put the peerRoxieLocations next in the list
  543. StringArray localLocations;
  544. appendPeerLocations(pdesc, localLocations, localLocation);
  545. ForEachItemIn(roxie_idx, localLocations)
  546. {
  547. try
  548. {
  549. const char *remoteName = localLocations.item(roxie_idx);
  550. Owned<IFile> remote = createIFile(remoteName);
  551. if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
  552. {
  553. if (miscDebugTraceLevel > 10)
  554. DBGLOG("adding peer roxie location %s", remoteName);
  555. ret->addSource(remote.getClear());
  556. addedOne = true;
  557. }
  558. }
  559. catch (IException *E)
  560. {
  561. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  562. E->Release();
  563. }
  564. }
  565. if (!addedOne && (copyResources || useRemoteResources)) // If no peer locations available, go to remote
  566. {
  567. ForEachItemIn(idx, remoteLocationInfo)
  568. {
  569. try
  570. {
  571. const char *remoteName = remoteLocationInfo.item(idx);
  572. Owned<IFile> remote = createIFile(remoteName);
  573. if (traceLevel > 5)
  574. DBGLOG("checking remote location %s", remoteName);
  575. if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
  576. {
  577. if (miscDebugTraceLevel > 10)
  578. DBGLOG("adding remote location %s", remoteName);
  579. ret->addSource(remote.getClear());
  580. addedOne = true;
  581. }
  582. }
  583. catch (IException *E)
  584. {
  585. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  586. E->Release();
  587. }
  588. }
  589. }
  590. if (!addedOne)
  591. {
  592. if (local->exists()) // Implies local dali and local file out of sync
  593. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  594. else
  595. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  596. }
  597. ret->setRemote(true);
  598. }
  599. ret->setCache(this);
  600. files.setValue(localLocation, (ILazyFileIO *)ret);
  601. return ret.getClear();
  602. }
  603. void deleteTempFiles(const char *targetFilename)
  604. {
  605. try
  606. {
  607. StringBuffer destPath;
  608. StringBuffer prevTempFile;
  609. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  610. if (useTreeCopy)
  611. prevTempFile.append("*.tmp");
  612. else
  613. prevTempFile.append("*.$$$");
  614. Owned<IFile> dirf = createIFile(destPath.str());
  615. Owned<IDirectoryIterator> iter = dirf->directoryFiles(prevTempFile.str(),false,false);
  616. ForEach(*iter)
  617. {
  618. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  619. if (thisFile->isFile() == foundYes)
  620. thisFile->remove();
  621. }
  622. }
  623. catch(IException *E)
  624. {
  625. StringBuffer err;
  626. DBGLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  627. E->Release();
  628. }
  629. catch(...)
  630. {
  631. }
  632. }
  633. bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg)
  634. {
  635. bool fileCopied = false;
  636. IFile *sourceFile;
  637. try
  638. {
  639. f->setCopying(true);
  640. sourceFile = f->querySource();
  641. }
  642. catch (IException *E)
  643. {
  644. f->setCopying(false);
  645. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  646. throw;
  647. }
  648. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  649. deleteTempFiles(targetFilename);
  650. if ( (sourceFile->size() + minFreeDiskSpace) > freeDiskSpace)
  651. {
  652. StringBuffer err;
  653. err.appendf("Insufficient disk space. File %s needs %"I64F"d bytes, but only %"I64F"d remains, and %"I64F"d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  654. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  655. EXCLOG(MCoperatorError, E);
  656. E->Release();
  657. }
  658. else
  659. {
  660. IpSubNet subnet; // preferred set but not required
  661. IpAddress fromip; // returned
  662. Owned<IFile> destFile = createIFile(useTreeCopy?targetFilename:tempFile);
  663. bool hardLinkCreated = false;
  664. try
  665. {
  666. if (useHardLink)
  667. hardLinkCreated = f->createHardFileLink();
  668. if (hardLinkCreated)
  669. msg = "Hard Link";
  670. else
  671. {
  672. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  673. if (traceLevel > 5)
  674. {
  675. StringBuffer str;
  676. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  677. MTimeSection timing(NULL, str.str());
  678. if (useTreeCopy)
  679. sourceFile->treeCopyTo(destFile, subnet, fromip, true);
  680. else
  681. sourceFile->copyTo(destFile);
  682. }
  683. else
  684. {
  685. if (useTreeCopy)
  686. sourceFile->treeCopyTo(destFile, subnet, fromip, true);
  687. else
  688. sourceFile->copyTo(destFile);
  689. }
  690. }
  691. f->setCopying(false);
  692. fileCopied = true;
  693. }
  694. catch(IException *E)
  695. {
  696. f->setCopying(false);
  697. if (!useTreeCopy)
  698. { // done by tree copy
  699. EXCLOG(E, "Copy exception - remove templocal");
  700. destFile->remove();
  701. }
  702. deleteTempFiles(targetFilename);
  703. throw;
  704. }
  705. catch(...)
  706. {
  707. f->setCopying(false);
  708. if (!useTreeCopy)
  709. { // done by tree copy
  710. DBGLOG("%s exception - remove templocal", msg);
  711. destFile->remove();
  712. }
  713. deleteTempFiles(targetFilename);
  714. throw;
  715. }
  716. if (needToDeleteFile)
  717. {
  718. DBGLOG("%s of data file %s stopped since query has been deleted", msg, targetFilename);
  719. destFile->remove();
  720. }
  721. else
  722. {
  723. if (!hardLinkCreated && !useTreeCopy) // for hardlinks / treeCopy - no rename needed
  724. {
  725. try
  726. {
  727. destFile->rename(targetFilename);
  728. }
  729. catch(IException *)
  730. {
  731. f->setCopying(false);
  732. deleteTempFiles(targetFilename);
  733. throw;
  734. }
  735. DBGLOG("%s to %s complete", msg, targetFilename);
  736. }
  737. f->copyComplete();
  738. }
  739. }
  740. deleteTempFiles(targetFilename);
  741. return fileCopied;
  742. }
  743. bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage)
  744. {
  745. if (!f->isRemote())
  746. f->copyComplete();
  747. else
  748. {
  749. if (displayFirstFileMessage)
  750. DBGLOG("Received files to copy");
  751. const char *targetFilename = f->queryTarget()->queryFilename();
  752. StringBuffer tempFile(targetFilename);
  753. StringBuffer destPath;
  754. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  755. if (destPath.length())
  756. recursiveCreateDirectory(destPath.str());
  757. else
  758. destPath.append('.');
  759. if (!checkDirExists(destPath.str())) {
  760. ERRLOG("Dest directory %s does not exist", destPath.str());
  761. return false;
  762. }
  763. tempFile.append(".$$$");
  764. const char *msg = background ? "Background copy" : "Copy";
  765. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg);
  766. }
  767. return false; // if we get here there was no file copied
  768. }
  769. public:
  770. IMPLEMENT_IINTERFACE;
  771. CRoxieFileCache(bool testmode = false) : bct(*this), hct(*this)
  772. {
  773. aborting = false;
  774. closing = false;
  775. closePending[false] = false;
  776. closePending[true] = false;
  777. needToDeleteFile = false;
  778. started = false;
  779. }
  780. ~CRoxieFileCache()
  781. {
  782. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  783. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  784. HashIterator h(files);
  785. ForEach(h)
  786. {
  787. ILazyFileIO *f = files.mapToValue(&h.query());
  788. f->removeCache(this);
  789. }
  790. }
  791. virtual void start()
  792. {
  793. if (!started)
  794. {
  795. bct.start();
  796. hct.start();
  797. bctStarted.wait();
  798. hctStarted.wait();
  799. started = true;
  800. }
  801. }
  802. class BackgroundCopyThread : public Thread
  803. {
  804. CRoxieFileCache &owner;
  805. public:
  806. BackgroundCopyThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheBackgroundCopyThread") {}
  807. virtual int run()
  808. {
  809. return owner.runBackgroundCopy();
  810. }
  811. } bct;
  812. class HandleCloserThread : public Thread
  813. {
  814. CRoxieFileCache &owner;
  815. public:
  816. HandleCloserThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheHandleCloserThread") {}
  817. virtual int run()
  818. {
  819. return owner.runHandleCloser();
  820. }
  821. } hct;
  822. int runBackgroundCopy()
  823. {
  824. bctStarted.signal();
  825. if (traceLevel)
  826. DBGLOG("Background copy thread %p starting", this);
  827. try
  828. {
  829. int fileCopiedCount = 0;
  830. bool fileCopied = false;
  831. loop
  832. {
  833. {
  834. CriticalBlock b(crit);
  835. needToDeleteFile = false;
  836. currentTodoFile.clear();
  837. }
  838. fileCopied = false;
  839. Linked<ILazyFileIO> next;
  840. toCopy.wait();
  841. {
  842. CriticalBlock b(crit);
  843. if (closing)
  844. break;
  845. if (todo.ordinality())
  846. {
  847. ILazyFileIO *popped = &todo.pop();
  848. if (popped->isAlive())
  849. {
  850. next.set(popped);
  851. if (next)
  852. currentTodoFile.append(next->queryFilename());
  853. }
  854. atomic_dec(&numFilesToProcess); // must decrement counter for SNMP accuracy
  855. }
  856. }
  857. if (next)
  858. {
  859. try
  860. {
  861. fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false);
  862. CriticalBlock b(crit);
  863. if (fileCopied)
  864. fileCopiedCount++;
  865. }
  866. catch (IException *E)
  867. {
  868. if (aborting)
  869. throw;
  870. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  871. E->Release();
  872. }
  873. catch (...)
  874. {
  875. EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
  876. }
  877. }
  878. CriticalBlock b(crit);
  879. if ( (todo.ordinality()== 0) && (fileCopiedCount)) // finished last copy
  880. {
  881. DBGLOG("No more data files to copy");
  882. fileCopiedCount = 0;
  883. }
  884. }
  885. }
  886. catch (IException *E)
  887. {
  888. if (!aborting)
  889. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  890. E->Release();
  891. }
  892. catch (...)
  893. {
  894. DBGLOG("Unknown exception in background copy thread");
  895. }
  896. if (traceLevel)
  897. DBGLOG("Background copy thread %p exiting", this);
  898. return 0;
  899. }
  900. int runHandleCloser()
  901. {
  902. hctStarted.signal();
  903. if (traceLevel)
  904. DBGLOG("HandleCloser thread %p starting", this);
  905. try
  906. {
  907. loop
  908. {
  909. toClose.wait(10 * 60 * 1000); // check expired file handles every 10 minutes
  910. if (closing)
  911. break;
  912. doCloseExpired(true);
  913. doCloseExpired(false);
  914. }
  915. }
  916. catch (IException *E)
  917. {
  918. if (!aborting)
  919. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  920. E->Release();
  921. }
  922. catch (...)
  923. {
  924. DBGLOG("Unknown exception in handle closer thread");
  925. }
  926. if (traceLevel)
  927. DBGLOG("Handle closer thread %p exiting", this);
  928. return 0;
  929. }
  930. virtual void join(unsigned timeout=INFINITE)
  931. {
  932. aborting = true;
  933. if (started)
  934. {
  935. toCopy.interrupt();
  936. toClose.interrupt();
  937. bct.join(timeout);
  938. hct.join(timeout);
  939. }
  940. }
  941. virtual void wait()
  942. {
  943. closing = true;
  944. if (started)
  945. {
  946. toCopy.signal();
  947. toClose.signal();
  948. bct.join();
  949. hct.join();
  950. }
  951. }
  952. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  953. {
  954. return aborting ? CFPcancel : CFPcontinue;
  955. }
  956. virtual void removeCache(ILazyFileIO *file) const
  957. {
  958. CriticalBlock b(crit);
  959. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  960. // So only remove from hash table if what we find there matches the item that is being deleted.
  961. const char *filename = file->queryFilename();
  962. ILazyFileIO *goer = files.getValue(filename);
  963. if (goer == file)
  964. files.remove(filename);
  965. ForEachItemInRev(idx, todo)
  966. {
  967. if (file == &todo.item(idx))
  968. {
  969. todo.remove(idx);
  970. }
  971. }
  972. }
  973. virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
  974. IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
  975. const StringArray &deployedLocationInfo, bool startFileCopy)
  976. {
  977. IPropertyTree &partProps = pdesc->queryProperties();
  978. offset_t dfsSize = partProps.getPropInt64("@size", -1);
  979. bool local = partProps.getPropBool("@local");
  980. unsigned crc;
  981. if (!crcResources || !pdesc->getCrc(crc))
  982. crc = 0;
  983. CDateTime dfsDate;
  984. if (checkFileDate)
  985. {
  986. const char *dateStr = partProps.queryProp("@modified");
  987. dfsDate.setString(dateStr);
  988. }
  989. unsigned partNo = pdesc->queryPartIndex() + 1;
  990. StringBuffer localLocation;
  991. if (local)
  992. {
  993. assertex(partNo==1 && numParts==1);
  994. localLocation.append(lfn); // any resolution done earlier
  995. }
  996. else
  997. {
  998. // MORE - not at all sure about this. Foreign files should stay foreign ?
  999. CDfsLogicalFileName dlfn;
  1000. dlfn.set(lfn);
  1001. if (dlfn.isForeign())
  1002. dlfn.clearForeign();
  1003. makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault);
  1004. }
  1005. Owned<ILazyFileIO> ret;
  1006. try
  1007. {
  1008. CriticalBlock b(crit);
  1009. Linked<ILazyFileIO> f = files.getValue(localLocation);
  1010. if (f && f->isAlive())
  1011. {
  1012. if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
  1013. (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
  1014. {
  1015. StringBuffer modifiedDt;
  1016. if (!dfsDate.isNull())
  1017. dfsDate.getString(modifiedDt);
  1018. StringBuffer fileDt;
  1019. f->queryDateTime()->getString(fileDt);
  1020. if (fileErrorList.find(lfn) == 0)
  1021. {
  1022. switch (fileType)
  1023. {
  1024. case ROXIE_KEY:
  1025. fileErrorList.setValue(lfn, "Key");
  1026. break;
  1027. case ROXIE_FILE:
  1028. fileErrorList.setValue(lfn, "File");
  1029. break;
  1030. }
  1031. }
  1032. throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
  1033. }
  1034. else
  1035. return f.getClear();
  1036. }
  1037. ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate, crc));
  1038. if (startFileCopy)
  1039. {
  1040. if (ret->isRemote())
  1041. {
  1042. if (copyResources) // MORE - should always copy peer files
  1043. {
  1044. needToDeleteFile = false;
  1045. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1046. {
  1047. ret->checkOpen();
  1048. doCopy(ret, false, false);
  1049. return ret.getLink();
  1050. }
  1051. todo.append(*ret);
  1052. atomic_inc(&numFilesToProcess); // must increment counter for SNMP accuracy
  1053. toCopy.signal();
  1054. }
  1055. }
  1056. }
  1057. if (!lazyOpen)
  1058. ret->checkOpen();
  1059. }
  1060. catch(IException *e)
  1061. {
  1062. if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
  1063. {
  1064. if (fileErrorList.find(lfn) == 0)
  1065. {
  1066. switch (fileType)
  1067. {
  1068. case ROXIE_KEY:
  1069. fileErrorList.setValue(lfn, "Key");
  1070. break;
  1071. case ROXIE_FILE:
  1072. fileErrorList.setValue(lfn, "File");
  1073. break;
  1074. }
  1075. }
  1076. }
  1077. throw;
  1078. }
  1079. return ret.getLink();
  1080. }
  1081. virtual void closeExpired(bool remote)
  1082. {
  1083. // This schedules a close at the next available opportunity
  1084. CriticalBlock b(cpcrit); // paranoid...
  1085. if (!closePending[remote])
  1086. {
  1087. closePending[remote] = true;
  1088. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) atomic_read(&numFilesOpen[remote]));
  1089. toClose.signal();
  1090. }
  1091. }
  1092. void doCloseExpired(bool remote)
  1093. {
  1094. {
  1095. CriticalBlock b(cpcrit); // paranoid...
  1096. closePending[remote] = false;
  1097. }
  1098. CriticalBlock b(crit);
  1099. ICopyArrayOf<ILazyFileIO> goers;
  1100. HashIterator h(files);
  1101. ForEach(h)
  1102. {
  1103. ILazyFileIO *f = files.mapToValue(&h.query());
  1104. if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
  1105. {
  1106. unsigned age = msTick() - f->getLastAccessed();
  1107. if (age > maxFileAge[remote])
  1108. {
  1109. if (traceLevel > 5)
  1110. {
  1111. // NOTE - querySource will cause the file to be opened if not already open
  1112. // That's OK here, since we know the file is open and remote.
  1113. // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
  1114. const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
  1115. DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
  1116. }
  1117. f->close();
  1118. }
  1119. else
  1120. goers.append(*f);
  1121. }
  1122. }
  1123. unsigned numFilesLeft = goers.ordinality();
  1124. if (numFilesLeft > maxFilesOpen[remote])
  1125. {
  1126. goers.sort(CLazyFileIO::compareAccess);
  1127. DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
  1128. unsigned idx = minFilesOpen[remote];
  1129. while (idx < numFilesLeft)
  1130. {
  1131. ILazyFileIO &f = goers.item(idx++);
  1132. if (!f.isCopying())
  1133. {
  1134. if (traceLevel > 5)
  1135. {
  1136. unsigned age = msTick() - f.getLastAccessed();
  1137. DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
  1138. }
  1139. f.close();
  1140. }
  1141. }
  1142. }
  1143. }
  1144. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1145. {
  1146. Owned<IFile> dirf = createIFile(directory);
  1147. if (dirf->exists() && dirf->isDirectory())
  1148. {
  1149. try
  1150. {
  1151. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1152. ForEach(*iter)
  1153. {
  1154. const char *thisName = iter->query().queryFilename();
  1155. flushUnusedDirectories(origBaseDir, thisName, xml);
  1156. }
  1157. if (stricmp(origBaseDir, directory) != 0)
  1158. {
  1159. try
  1160. {
  1161. dirf->remove();
  1162. xml.appendf("<Directory>%s</Directory>\n", directory);
  1163. DBGLOG("Deleted directory %s", directory);
  1164. }
  1165. catch (IException *e)
  1166. {
  1167. // don't care if we can't delete the directory
  1168. e->Release();
  1169. }
  1170. catch(...)
  1171. {
  1172. // don't care if we can't delete the directory
  1173. }
  1174. }
  1175. }
  1176. catch (IException *e)
  1177. {
  1178. // don't care if we can't delete the directory
  1179. e->Release();
  1180. }
  1181. catch(...)
  1182. {
  1183. // don't care if we can't delete the directory
  1184. }
  1185. }
  1186. }
  1187. int numFilesToCopy()
  1188. {
  1189. CriticalBlock b(crit);
  1190. return todo.ordinality();
  1191. }
  1192. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  // True if c is a printable 7-bit ASCII character (32..126) that is not one of * " / : < > ? \ |
  static inline bool validFNameChar(char c)
  {
      if (c < 32 || c >= 127)
          return false; // control characters, DEL and anything outside 7-bit ASCII
      static const char *forbidden = "*\"/:<>?\\|";
      return strchr(forbidden, c) == NULL;
  }
  1198. };
  1199. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  1200. {
  1201. StringArray remoteLocations;
  1202. if (remotePDesc)
  1203. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL);
  1204. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, replicationLevel[channel], remoteLocations, startCopy);
  1205. }
  1206. //====================================================================================================
  1207. class CFilePartMap : public CInterface, implements IFilePartMap
  1208. {
  1209. class FilePartMapElement
  1210. {
  1211. public:
  1212. offset_t base;
  1213. offset_t top;
  1214. inline int compare(offset_t offset)
  1215. {
  1216. if (offset < base)
  1217. return -1;
  1218. else if (offset >= top)
  1219. return 1;
  1220. else
  1221. return 0;
  1222. }
  1223. } *map;
  1224. static int compareParts(const void *l, const void *r)
  1225. {
  1226. offset_t lp = * (offset_t *) l;
  1227. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  1228. return thisPart->compare(lp);
  1229. }
  1230. unsigned numParts;
  1231. offset_t recordCount;
  1232. offset_t totalSize;
  1233. StringAttr fileName;
  1234. public:
  1235. IMPLEMENT_IINTERFACE;
  1236. CFilePartMap(IPropertyTree &resource)
  1237. {
  1238. fileName.set(resource.queryProp("@id"));
  1239. numParts = resource.getPropInt("@numparts");
  1240. recordCount = resource.getPropInt64("@recordCount");
  1241. totalSize = resource.getPropInt64("@size");
  1242. assertex(numParts);
  1243. map = new FilePartMapElement[numParts];
  1244. for (unsigned i = 0; i < numParts; i++)
  1245. {
  1246. StringBuffer partPath;
  1247. partPath.appendf("Part[@num='%d']", i+1);
  1248. IPropertyTree *part = resource.queryPropTree(partPath.str());
  1249. if (!part)
  1250. {
  1251. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  1252. part = resource.queryPropTree(partPath.str());
  1253. }
  1254. assertex(part);
  1255. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  1256. assertex(size != (unsigned __int64) -1);
  1257. map[i].base = i ? map[i-1].top : 0;
  1258. map[i].top = map[i].base + size;
  1259. }
  1260. if (totalSize == (offset_t)-1)
  1261. totalSize = map[numParts-1].top;
  1262. else if (totalSize != map[numParts-1].top)
  1263. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%"I64F"d vs %"I64F"d", map[numParts-1].top, totalSize);
  1264. }
  1265. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  1266. : fileName(_fileName)
  1267. {
  1268. numParts = fdesc.numParts();
  1269. IPropertyTree &props = fdesc.queryProperties();
  1270. recordCount = props.getPropInt64("@recordCount", -1);
  1271. totalSize = props.getPropInt64("@size", -1);
  1272. assertex(numParts);
  1273. map = new FilePartMapElement[numParts];
  1274. for (unsigned i = 0; i < numParts; i++)
  1275. {
  1276. IPartDescriptor &part = *fdesc.queryPart(i);
  1277. IPropertyTree &partProps = part.queryProperties();
  1278. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  1279. assertex(size != (unsigned __int64) -1);
  1280. map[i].base = i ? map[i-1].top : 0;
  1281. map[i].top = map[i].base + size;
  1282. }
  1283. if (totalSize == (offset_t)-1)
  1284. totalSize = map[numParts-1].top;
  1285. else if (totalSize != map[numParts-1].top)
  1286. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%"I64F"d vs %"I64F"d", map[numParts-1].top, totalSize);
  1287. }
  1288. ~CFilePartMap()
  1289. {
  1290. delete [] map;
  1291. }
  1292. virtual bool IsShared() const { return CInterface::IsShared(); };
  1293. virtual unsigned mapOffset(offset_t pos) const
  1294. {
  1295. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  1296. if (!part)
  1297. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %"I64F"d in file %s out of range (max offset permitted is %"I64F"d)", pos, fileName.sget(), totalSize);
  1298. return (part-map)+1;
  1299. }
  1300. virtual unsigned getNumParts() const
  1301. {
  1302. return numParts;
  1303. }
  1304. virtual offset_t getTotalSize() const
  1305. {
  1306. return totalSize;
  1307. }
  1308. virtual offset_t getRecordCount() const
  1309. {
  1310. return recordCount;
  1311. }
  1312. virtual offset_t getBase(unsigned part) const
  1313. {
  1314. if (part > numParts || part == 0)
  1315. {
  1316. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existant file part %d (valid are 1-%d)", part, numParts);
  1317. }
  1318. return map[part-1].base;
  1319. }
  1320. virtual offset_t getFileSize() const
  1321. {
  1322. return map[numParts-1].top;
  1323. }
  1324. };
  1325. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  1326. {
  1327. return new CFilePartMap(fileName, fdesc);
  1328. }
  1329. //====================================================================================================
  1330. class CFileIOArray : public CInterface, implements IFileIOArray
  1331. {
  1332. unsigned __int64 totalSize;
  1333. mutable CriticalSection crit;
  1334. mutable StringAttr id;
  1335. void _getId() const
  1336. {
  1337. md5_state_t md5;
  1338. md5_byte_t digest[16];
  1339. md5_init(&md5);
  1340. ForEachItemIn(idx, files)
  1341. {
  1342. IFileIO *file = files.item(idx);
  1343. if (file)
  1344. {
  1345. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  1346. }
  1347. }
  1348. md5_finish(&md5, digest);
  1349. char digestStr[33];
  1350. for (int i = 0; i < 16; i++)
  1351. {
  1352. sprintf(&digestStr[i*2],"%02x", digest[i]);
  1353. }
  1354. id.set(digestStr, 32);
  1355. }
  1356. public:
  1357. IMPLEMENT_IINTERFACE;
  1358. CFileIOArray()
  1359. {
  1360. valid = 0;
  1361. totalSize = (unsigned __int64) -1;
  1362. }
  1363. virtual bool IsShared() const { return CInterface::IsShared(); };
  1364. PointerIArrayOf<IFileIO> files;
  1365. Int64Array bases;
  1366. unsigned valid;
  1367. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base)
  1368. {
  1369. if (!files.isItem(partNo))
  1370. {
  1371. DBGLOG("getFilePart requested invalid part %d", partNo);
  1372. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  1373. }
  1374. IFileIO *file = files.item(partNo);
  1375. if (!file)
  1376. {
  1377. // DBGLOG("getFilePart requested nonBonded part %d", partNo);
  1378. // throw MakeStringException(ROXIE_FILE_FAIL, "getFilePart requested nonBonded part %d", partNo);
  1379. base = 0;
  1380. return NULL;
  1381. }
  1382. base = bases.item(partNo);
  1383. return LINK(file);
  1384. }
  1385. void addFile(IFileIO *f, offset_t base)
  1386. {
  1387. if (f)
  1388. valid++;
  1389. files.append(f);
  1390. bases.append(base);
  1391. }
  1392. virtual unsigned length()
  1393. {
  1394. return files.length();
  1395. }
  1396. virtual unsigned numValid()
  1397. {
  1398. return valid;
  1399. }
  1400. virtual bool isValid(unsigned partNo)
  1401. {
  1402. if (!files.isItem(partNo))
  1403. return false;
  1404. IFileIO *file = files.item(partNo);
  1405. if (!file)
  1406. return false;
  1407. return true;
  1408. }
  1409. virtual unsigned __int64 size()
  1410. {
  1411. CriticalBlock b(crit);
  1412. if (totalSize == (unsigned __int64) -1)
  1413. {
  1414. totalSize = 0;
  1415. ForEachItemIn(idx, files)
  1416. {
  1417. IFileIO *file = files.item(idx);
  1418. if (file)
  1419. totalSize += file->size();
  1420. }
  1421. }
  1422. return totalSize;
  1423. }
  1424. virtual StringBuffer &getId(StringBuffer &ret) const
  1425. {
  1426. CriticalBlock b(crit);
  1427. if (!id)
  1428. _getId();
  1429. return ret.append(id);
  1430. }
  1431. };
  1432. template <class X> class PerChannelCacheOf
  1433. {
  1434. PointerIArrayOf<X> cache;
  1435. IntArray channels;
  1436. public:
  1437. void set(X *value, unsigned channel)
  1438. {
  1439. cache.append(value);
  1440. channels.append(channel);
  1441. }
  1442. X *get(unsigned channel) const
  1443. {
  1444. ForEachItemIn(idx, channels)
  1445. {
  1446. if (channels.item(idx)==channel)
  1447. return cache.item(idx);
  1448. }
  1449. return NULL;
  1450. }
  1451. };
  1452. CRoxieFileCache * fileCache;
  1453. class CResolvedFile : public CInterface, implements IResolvedFileCreator
  1454. {
  1455. protected:
  1456. IResolvedFileCache *cached;
  1457. StringAttr lfn;
  1458. StringAttr physicalName;
  1459. Owned<IDistributedFile> dFile; // NULL on copies serialized to slaves. Note that this implies we keep a lock on dali file for the lifetime of this object.
  1460. CDateTime fileTimeStamp;
  1461. offset_t fileSize;
  1462. unsigned fileCheckSum;
  1463. RoxieFileType fileType;
  1464. bool isSuper;
  1465. StringArray subNames;
  1466. PointerIArrayOf<IFileDescriptor> subFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1467. PointerIArrayOf<IFileDescriptor> remoteSubFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1468. PointerIArrayOf<IDefRecordMeta> diskMeta;
  1469. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  1470. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  1471. Owned <IPropertyTree> properties;
  1472. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  1473. {
  1474. subNames.append(subName);
  1475. subFiles.append(fdesc);
  1476. remoteSubFiles.append(remoteFDesc);
  1477. IPropertyTree const & props = fdesc->queryProperties();
  1478. if(props.hasProp("_record_layout"))
  1479. {
  1480. MemoryBuffer mb;
  1481. props.getPropBin("_record_layout", mb);
  1482. diskMeta.append(deserializeRecordMeta(mb, true));
  1483. }
  1484. else
  1485. diskMeta.append(NULL);
  1486. unsigned numParts = fdesc->numParts();
  1487. offset_t base = 0;
  1488. for (unsigned i = 0; i < numParts; i++)
  1489. {
  1490. IPartDescriptor *pdesc = fdesc->queryPart(i);
  1491. IPropertyTree &partProps = pdesc->queryProperties();
  1492. offset_t dfsSize = partProps.getPropInt64("@size");
  1493. partProps.setPropInt64("@offset", base);
  1494. base += dfsSize;
  1495. }
  1496. fileSize += base;
  1497. }
  1498. // We cache all the file maps/arrays etc here.
  1499. mutable CriticalSection lock;
  1500. mutable Owned<IFilePartMap> fileMap;
  1501. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  1502. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  1503. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  1504. public:
  1505. IMPLEMENT_IINTERFACE;
  1506. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* daliHelper, bool cacheIt, bool writeAccess, bool _isSuperFile)
  1507. : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
  1508. {
  1509. cached = NULL;
  1510. fileSize = 0;
  1511. fileCheckSum = 0;
  1512. if (dFile)
  1513. {
  1514. if (traceLevel > 5)
  1515. DBGLOG("Roxie server adding information for file %s", lfn.get());
  1516. IDistributedSuperFile *superFile = dFile->querySuperFile();
  1517. if (superFile)
  1518. {
  1519. isSuper = true;
  1520. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  1521. ForEach(*subs)
  1522. {
  1523. IDistributedFile &sub = subs->query();
  1524. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  1525. Owned<IFileDescriptor> remoteFDesc;
  1526. if (daliHelper)
  1527. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt));
  1528. subDFiles.append(OLINK(sub));
  1529. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1530. }
  1531. }
  1532. else // normal file, not superkey
  1533. {
  1534. isSuper = false;
  1535. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  1536. Owned<IFileDescriptor> remoteFDesc;
  1537. if (daliHelper)
  1538. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
  1539. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1540. }
  1541. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  1542. bool csSet = dFile->getFileCheckSum(fileCheckSum);
  1543. assertex(tsSet); // per Nigel, is always set
  1544. properties.set(&dFile->queryAttributes());
  1545. }
  1546. }
  1547. virtual void beforeDispose()
  1548. {
  1549. if (cached)
  1550. {
  1551. cached->removeCache(this);
  1552. }
  1553. }
  1554. virtual unsigned numSubFiles() const
  1555. {
  1556. return subNames.length();
  1557. }
  1558. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  1559. {
  1560. if (subNames.isItem(num))
  1561. {
  1562. name.append(subNames.item(num));
  1563. return true;
  1564. }
  1565. else
  1566. {
  1567. return false;
  1568. }
  1569. }
  1570. virtual unsigned findSubName(const char *subname) const
  1571. {
  1572. ForEachItemIn(idx, subNames)
  1573. {
  1574. if (stricmp(subNames.item(idx), subname))
  1575. return idx;
  1576. }
  1577. return NotFound;
  1578. }
  1579. virtual unsigned getContents(StringArray &contents) const
  1580. {
  1581. ForEachItemIn(idx, subNames)
  1582. {
  1583. contents.append(subNames.item(idx));
  1584. }
  1585. return subNames.length();
  1586. }
  1587. virtual bool isSuperFile() const
  1588. {
  1589. return isSuper;
  1590. }
  1591. inline bool isKey() const
  1592. {
  1593. return fileType==ROXIE_KEY;
  1594. }
  1595. virtual IFilePartMap *getFileMap() const
  1596. {
  1597. CriticalBlock b(lock);
  1598. if (!fileMap)
  1599. {
  1600. if (subFiles.length())
  1601. {
  1602. assertex(subFiles.length()==1);
  1603. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  1604. }
  1605. }
  1606. return fileMap.getLink();
  1607. }
  1608. virtual void serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  1609. {
  1610. // Find all the partno's that go to this channel
  1611. unsigned numParts = fdesc->numParts();
  1612. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  1613. numParts--; // don't want to send TLK
  1614. UnsignedArray partNos;
  1615. for (unsigned i = 1; i <= numParts; i++)
  1616. {
  1617. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1618. if (getBondedChannel(i)==channel || !isLocal)
  1619. {
  1620. partNos.append(i-1);
  1621. }
  1622. }
  1623. fdesc->serializeParts(mb, partNos);
  1624. }
  1625. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const
  1626. {
  1627. if (traceLevel > 6)
  1628. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  1629. byte type = (byte) fileType;
  1630. mb.append(type);
  1631. fileTimeStamp.serialize(mb);
  1632. mb.append(fileCheckSum);
  1633. mb.append(fileSize);
  1634. unsigned numSubFiles = subFiles.length();
  1635. mb.append(numSubFiles);
  1636. ForEachItemIn(idx, subFiles)
  1637. {
  1638. mb.append(subNames.item(idx));
  1639. IFileDescriptor *fdesc = subFiles.item(idx);
  1640. serializeFDesc(mb, fdesc, channel, isLocal);
  1641. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1642. if (remoteFDesc)
  1643. {
  1644. mb.append(true);
  1645. serializeFDesc(mb, remoteFDesc, channel, isLocal);
  1646. }
  1647. else
  1648. mb.append(false);
  1649. if (fileType == ROXIE_KEY) // for now we only support translation on index files
  1650. {
  1651. IDefRecordMeta *meta = diskMeta.item(idx);
  1652. if (meta)
  1653. {
  1654. mb.append(true);
  1655. serializeRecordMeta(mb, meta, true);
  1656. }
  1657. else
  1658. mb.append(false);
  1659. }
  1660. }
  1661. if (properties)
  1662. {
  1663. mb.append(true);
  1664. properties->serialize(mb);
  1665. }
  1666. else
  1667. mb.append(false);
  1668. }
  1669. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  1670. {
  1671. CriticalBlock b(lock);
  1672. IFileIOArray *ret = ioArrayMap.get(channel);
  1673. if (!ret)
  1674. {
  1675. ret = createIFileIOArray(isOpt, channel);
  1676. ioArrayMap.set(ret, channel);
  1677. }
  1678. return LINK(ret);
  1679. }
  1680. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  1681. {
  1682. Owned<CFileIOArray> f = new CFileIOArray();
  1683. f->addFile(NULL, 0);
  1684. if (subFiles.length())
  1685. {
  1686. IFileDescriptor *fdesc = subFiles.item(0);
  1687. IFileDescriptor *remoteFDesc = remoteSubFiles.item(0);
  1688. if (fdesc)
  1689. {
  1690. unsigned numParts = fdesc->numParts();
  1691. for (unsigned i = 1; i <= numParts; i++)
  1692. {
  1693. if (!channel || getBondedChannel(i)==channel)
  1694. {
  1695. try
  1696. {
  1697. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1698. assertex(pdesc);
  1699. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  1700. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  1701. IPropertyTree &partProps = pdesc->queryProperties();
  1702. f->addFile(file.getClear(), partProps.getPropInt64("@offset"));
  1703. }
  1704. catch (IException *E)
  1705. {
  1706. StringBuffer err;
  1707. err.append("Could not load file ");
  1708. fdesc->getTraceName(err);
  1709. DBGLOG(E, err.str());
  1710. if (!isOpt)
  1711. throw;
  1712. E->Release();
  1713. f->addFile(NULL, 0);
  1714. }
  1715. }
  1716. else
  1717. f->addFile(NULL, 0);
  1718. }
  1719. }
  1720. }
  1721. return f.getClear();
  1722. }
  1723. virtual IKeyArray *getKeyArray(IDefRecordMeta *activityMeta, TranslatorArray *translators, bool isOpt, unsigned channel, bool allowFieldTranslation) const
  1724. {
  1725. unsigned maxParts = 0;
  1726. ForEachItemIn(subFile, subFiles)
  1727. {
  1728. IFileDescriptor *fdesc = subFiles.item(subFile);
  1729. if (fdesc)
  1730. {
  1731. unsigned numParts = fdesc->numParts();
  1732. if (numParts > 1)
  1733. numParts--; // Don't include TLK
  1734. if (numParts > maxParts)
  1735. maxParts = numParts;
  1736. }
  1737. IDefRecordMeta *thisDiskMeta = diskMeta.item(subFile);
  1738. if (fdesc && thisDiskMeta && activityMeta && !thisDiskMeta->equals(activityMeta))
  1739. if (allowFieldTranslation)
  1740. translators->append(createRecordLayoutTranslator(lfn, thisDiskMeta, activityMeta));
  1741. else
  1742. {
  1743. DBGLOG("Key layout mismatch: %s", lfn.get());
  1744. StringBuffer q, d;
  1745. getRecordMetaAsString(q, activityMeta);
  1746. getRecordMetaAsString(d, thisDiskMeta);
  1747. DBGLOG("Activity: %s", q.str());
  1748. DBGLOG("Disk: %s", d.str());
  1749. throw MakeStringException(ROXIE_MISMATCH, "Key layout mismatch detected for index %s", lfn.get());
  1750. }
  1751. else
  1752. translators->append(NULL);
  1753. }
  1754. CriticalBlock b(lock);
  1755. IKeyArray *ret = keyArrayMap.get(channel);
  1756. if (!ret)
  1757. {
  1758. ret = createKeyArray(isOpt, channel, maxParts);
  1759. keyArrayMap.set(ret, channel);
  1760. }
  1761. return LINK(ret);
  1762. }
  1763. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  1764. {
  1765. Owned<IKeyArray> ret = ::createKeyArray();
  1766. if (channel)
  1767. {
  1768. ret->addKey(NULL);
  1769. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  1770. {
  1771. if (channel == getBondedChannel(partNo))
  1772. {
  1773. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  1774. ForEachItemIn(idx, subFiles)
  1775. {
  1776. IFileDescriptor *fdesc = subFiles.item(idx);
  1777. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1778. Owned <ILazyFileIO> part;
  1779. unsigned crc = 0;
  1780. if (fdesc) // NB there may be no parts for this channel
  1781. {
  1782. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  1783. if (pdesc)
  1784. {
  1785. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  1786. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  1787. pdesc->getCrc(crc);
  1788. }
  1789. }
  1790. if (part)
  1791. {
  1792. if (lazyOpen)
  1793. {
  1794. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  1795. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), false, false));
  1796. }
  1797. else
  1798. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), false, false));
  1799. }
  1800. else
  1801. keyset->addIndex(NULL);
  1802. }
  1803. ret->addKey(keyset.getClear());
  1804. }
  1805. else
  1806. ret->addKey(NULL);
  1807. }
  1808. }
  1809. else
  1810. {
  1811. // Channel 0 means return the TLK
  1812. IArrayOf<IKeyIndexBase> subkeys;
  1813. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  1814. ForEachItemIn(idx, subFiles)
  1815. {
  1816. IFileDescriptor *fdesc = subFiles.item(idx);
  1817. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1818. Owned<IKeyIndexBase> key;
  1819. if (fdesc)
  1820. {
  1821. unsigned numParts = fdesc->numParts();
  1822. assertex(numParts > 0);
  1823. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  1824. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  1825. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  1826. unsigned crc = 0;
  1827. pdesc->getCrc(crc);
  1828. StringBuffer pname;
  1829. pdesc->getPath(pname);
  1830. if (lazyOpen)
  1831. {
  1832. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  1833. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), numParts>1, false));
  1834. }
  1835. else
  1836. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), numParts>1, false));
  1837. keyset->addIndex(LINK(key->queryPart(0)));
  1838. }
  1839. else
  1840. keyset->addIndex(NULL);
  1841. }
  1842. if (keyset->numParts())
  1843. ret->addKey(keyset.getClear());
  1844. else if (!isOpt)
  1845. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  1846. else if (traceLevel > 4)
  1847. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  1848. }
  1849. return ret.getClear();
  1850. }
  1851. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IFileIOArray *files, IRecordSize *recs, bool preload, int numKeys) const
  1852. {
  1853. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  1854. // Failures to resolve will not be cached, only successes.
  1855. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  1856. CriticalBlock b(lock);
  1857. IInMemoryIndexManager *ret = indexMap.get(channel);
  1858. if (!ret)
  1859. {
  1860. ret = createInMemoryIndexManager(isOpt, lfn);
  1861. ret->load(files, recs, preload, numKeys); // note - files (passed in) are channel specific
  1862. indexMap.set(ret, channel);
  1863. }
  1864. return LINK(ret);
  1865. }
  1866. virtual const CDateTime &queryTimeStamp() const
  1867. {
  1868. return fileTimeStamp;
  1869. }
  1870. virtual unsigned queryCheckSum() const
  1871. {
  1872. return fileCheckSum;
  1873. }
  1874. virtual offset_t getFileSize() const
  1875. {
  1876. return fileSize;
  1877. }
  1878. virtual void addSubFile(const IResolvedFile *_sub)
  1879. {
  1880. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  1881. if (subFiles.length())
  1882. assertex(sub->fileType==fileType);
  1883. else
  1884. fileType = sub->fileType;
  1885. subRFiles.append((IResolvedFile &) *LINK(_sub));
  1886. ForEachItemIn(idx, sub->subFiles)
  1887. {
  1888. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  1889. }
  1890. }
  1891. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  1892. {
  1893. addFile(lfn, _sub, _remoteSub);
  1894. }
  1895. virtual void addSubFile(const char *localFileName)
  1896. {
  1897. Owned<IFile> file = createIFile(localFileName);
  1898. assertex(file->exists());
  1899. offset_t size = file->size();
  1900. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1901. Owned<IPropertyTree> pp = createPTree("Part");
  1902. pp->setPropInt64("@size",size);
  1903. pp->setPropBool("@local", true);
  1904. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  1905. addSubFile(fdesc.getClear(), NULL);
  1906. }
  1907. virtual void setCache(IResolvedFileCache *cache)
  1908. {
  1909. if (cached)
  1910. {
  1911. if (traceLevel > 9)
  1912. DBGLOG("setCache removing from prior cache %s", queryFileName());
  1913. if (cache==NULL)
  1914. cached->removeCache(this);
  1915. else
  1916. throwUnexpected();
  1917. }
  1918. cached = cache;
  1919. }
  1920. virtual bool isAlive() const
  1921. {
  1922. return CInterface::isAlive();
  1923. }
  1924. virtual const char *queryFileName() const
  1925. {
  1926. return lfn.get();
  1927. }
  1928. virtual const char *queryPhysicalName() const
  1929. {
  1930. return physicalName.get();
  1931. }
  1932. virtual const IPropertyTree *queryProperties() const
  1933. {
  1934. return properties;
  1935. }
  1936. virtual void remove()
  1937. {
  1938. subFiles.kill();
  1939. properties.clear();
  1940. if (dFile)
  1941. {
  1942. dFile->detach();
  1943. }
  1944. else
  1945. {
  1946. try
  1947. {
  1948. Owned<IFile> file = createIFile(physicalName.get());
  1949. file->remove();
  1950. }
  1951. catch (IException *e)
  1952. {
  1953. ERRLOG(-1, "Error removing file %s",lfn.get());
  1954. e->Release();
  1955. }
  1956. }
  1957. }
  1958. virtual bool exists() const
  1959. {
  1960. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  1961. // This will make more sense if/when we start to lock earlier.
  1962. if (dFile)
  1963. return true; // MORE - may need some thought
  1964. else
  1965. return checkFileExists(lfn.get());
  1966. }
  1967. };
  1968. /*----------------------------------------------------------------------------------------------------------
  1969. MORE
  1970. - on remote() calls we can't pass the expected file date but we will pass it back with the file info.
  1971. ------------------------------------------------------------------------------------------------------------*/
  1972. class CSlaveDynamicFile : public CResolvedFile
  1973. {
  1974. public:
  1975. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  1976. bool isLocal;
  1977. unsigned channel;
  1978. unsigned serverIdx;
  1979. public:
  1980. CSlaveDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  1981. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
  1982. {
  1983. // call back to the server to get the info
  1984. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  1985. try
  1986. {
  1987. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK);
  1988. bool ok = false;
  1989. for (unsigned i = 0; i < callbackRetries; i++)
  1990. {
  1991. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  1992. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  1993. char *buf = (char *) output->getBuffer(len, true);
  1994. buf[0] = isOpt;
  1995. buf[1] = isLocal;
  1996. strcpy(buf+2, lfn.get());
  1997. output->putBuffer(buf, len, true);
  1998. output->flush(true);
  1999. output.clear();
  2000. if (callback->wait(callbackTimeout))
  2001. {
  2002. ok = true;
  2003. break;
  2004. }
  2005. else
  2006. {
  2007. DBGLOG("timed out waiting for server callback - retrying");
  2008. }
  2009. }
  2010. if (ok)
  2011. {
  2012. if (traceLevel > 6)
  2013. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  2014. MemoryBuffer &serverData = callback->queryData();
  2015. byte type;
  2016. serverData.read(type);
  2017. fileType = (RoxieFileType) type;
  2018. fileTimeStamp.deserialize(serverData);
  2019. serverData.read(fileCheckSum);
  2020. serverData.read(fileSize);
  2021. unsigned numSubFiles;
  2022. serverData.read(numSubFiles);
  2023. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  2024. {
  2025. StringBuffer subName;
  2026. serverData.read(subName);
  2027. subNames.append(subName.str());
  2028. deserializeFilePart(serverData, subFiles, fileNo, false);
  2029. bool remotePresent;
  2030. serverData.read(remotePresent);
  2031. if (remotePresent)
  2032. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  2033. else
  2034. remoteSubFiles.append(NULL);
  2035. if (fileType==ROXIE_KEY)
  2036. {
  2037. bool diskMetaPresent;
  2038. serverData.read(diskMetaPresent);
  2039. if (diskMetaPresent)
  2040. diskMeta.append(deserializeRecordMeta(serverData, true));
  2041. else
  2042. diskMeta.append(NULL);
  2043. }
  2044. }
  2045. bool propertiesPresent;
  2046. serverData.read(propertiesPresent);
  2047. if (propertiesPresent)
  2048. properties.setown(createPTree(serverData));
  2049. }
  2050. else
  2051. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  2052. }
  2053. catch (...)
  2054. {
  2055. ROQ->removePendingCallback(callback);
  2056. throw;
  2057. }
  2058. ROQ->removePendingCallback(callback);
  2059. }
  2060. private:
  2061. void deserializeFilePart(MemoryBuffer &serverData, PointerIArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  2062. {
  2063. IArrayOf<IPartDescriptor> parts;
  2064. deserializePartFileDescriptors(serverData, parts);
  2065. if (parts.length())
  2066. {
  2067. files.append(LINK(&parts.item(0).queryOwner()));
  2068. }
  2069. else
  2070. {
  2071. if (traceLevel > 6)
  2072. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  2073. files.append(NULL);
  2074. }
  2075. }
  2076. };
  2077. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  2078. {
  2079. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, false, false, isSuperFile);
  2080. }
  2081. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool cacheIt, bool writeAccess)
  2082. {
  2083. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  2084. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, cacheIt, writeAccess, false);
  2085. }
  2086. class CSlaveDynamicFileCache : public CInterface, implements ISlaveDynamicFileCache
  2087. {
  2088. mutable CriticalSection crit;
  2089. CIArrayOf<CSlaveDynamicFile> files; // expect numbers to be small - probably not worth hashing
  2090. unsigned tableSize;
  2091. public:
  2092. IMPLEMENT_IINTERFACE;
  2093. CSlaveDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  2094. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal)
  2095. {
  2096. if (logctx.queryTraceLevel() > 5)
  2097. {
  2098. StringBuffer s;
  2099. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  2100. }
  2101. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  2102. CriticalBlock b(crit);
  2103. if (!cacheDate.isNull())
  2104. {
  2105. unsigned idx = 0;
  2106. while (files.isItem(idx))
  2107. {
  2108. CSlaveDynamicFile &f = files.item(idx);
  2109. if (f.channel==header->channel && f.serverIdx==header->serverIdx && stricmp(f.queryFileName(), lfn)==0)
  2110. {
  2111. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  2112. {
  2113. if (f.isKey())
  2114. clearKeyStoreCacheEntry(f.queryFileName());
  2115. files.remove(idx);
  2116. idx--;
  2117. }
  2118. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  2119. {
  2120. files.swap(idx, 0);
  2121. return LINK(&f);
  2122. }
  2123. }
  2124. idx++;
  2125. }
  2126. }
  2127. Owned<CSlaveDynamicFile> ret;
  2128. {
  2129. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  2130. CriticalUnblock b1(crit);
  2131. ret.setown(new CSlaveDynamicFile(logctx, lfn, header, isOpt, isLocal));
  2132. }
  2133. while (files.length() > tableSize)
  2134. files.remove(files.length()-1);
  2135. files.add(*ret.getLink(), 0);
  2136. return ret.getClear();
  2137. }
  2138. };
  2139. static CriticalSection slaveDynamicFileCacheCrit;
  2140. static Owned<ISlaveDynamicFileCache> slaveDynamicFileCache;
  2141. extern ISlaveDynamicFileCache *querySlaveDynamicFileCache()
  2142. {
  2143. if (!slaveDynamicFileCache)
  2144. {
  2145. CriticalBlock b(slaveDynamicFileCacheCrit);
  2146. if (!slaveDynamicFileCache)
  2147. slaveDynamicFileCache.setown(new CSlaveDynamicFileCache(20));
  2148. }
  2149. return slaveDynamicFileCache;
  2150. }
  2151. extern void releaseSlaveDynamicFileCache()
  2152. {
  2153. CriticalBlock b(slaveDynamicFileCacheCrit);
  2154. slaveDynamicFileCache.clear();
  2155. }
  2156. // Initialization/termination
  2157. MODULE_INIT(INIT_PRIORITY_STANDARD)
  2158. {
  2159. fileCache = new CRoxieFileCache;
  2160. return true;
  2161. }
  2162. MODULE_EXIT()
  2163. {
  2164. fileCache->join();
  2165. fileCache->Release();
  2166. }
  2167. extern IRoxieFileCache &queryFileCache()
  2168. {
  2169. return *fileCache;
  2170. }
  2171. class CRoxieWriteHandler : public CInterface, implements IRoxieWriteHandler
  2172. {
  2173. public:
  2174. IMPLEMENT_IINTERFACE;
  2175. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2176. : daliHelper(_daliHelper), dFile(_dFile)
  2177. {
  2178. ForEachItemIn(idx, _clusters)
  2179. {
  2180. addCluster(_clusters.item(idx));
  2181. }
  2182. if (dFile->queryDistributedFile())
  2183. {
  2184. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  2185. if (isTemporary)
  2186. {
  2187. UNIMPLEMENTED;
  2188. }
  2189. else
  2190. localFile.setown(dFile->getPartFile(0, 0));
  2191. }
  2192. else
  2193. {
  2194. isTemporary = false;
  2195. localFile.setown(dFile->getPartFile(0, 0));
  2196. }
  2197. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  2198. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  2199. }
  2200. virtual IFile *queryFile() const
  2201. {
  2202. return localFile;
  2203. }
  2204. void getClusters(StringArray &clusters) const
  2205. {
  2206. ForEachItemIn(idx, allClusters)
  2207. {
  2208. clusters.append(allClusters.item(idx));
  2209. }
  2210. }
  2211. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  2212. {
  2213. if (success)
  2214. {
  2215. copyPhysical();
  2216. if (daliHelper && daliHelper->connected())
  2217. publish(activity);
  2218. }
  2219. if (isTemporary || !success)
  2220. {
  2221. localFile->remove();
  2222. }
  2223. }
  2224. private:
  2225. bool isTemporary;
  2226. Linked<IRoxieDaliHelper> daliHelper;
  2227. Owned<ILocalOrDistributedFile> dFile;
  2228. Owned<IFile> localFile;
  2229. Owned<IGroup> localCluster;
  2230. StringAttr localClusterName;
  2231. IArrayOf<IGroup> remoteNodes;
  2232. StringArray allClusters;
  2233. void copyPhysical() const
  2234. {
  2235. if (remoteNodes.length())
  2236. {
  2237. RemoteFilename rfn, rdn;
  2238. dFile->getPartFilename(rfn, 0, 0);
  2239. StringBuffer physicalName, physicalDir, physicalBase;
  2240. rfn.getLocalPath(physicalName);
  2241. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2242. rdn.setLocalPath(physicalDir.str());
  2243. ForEachItemIn(idx, remoteNodes)
  2244. {
  2245. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2246. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2247. Owned<IFile> targetdir = createIFile(rdn);
  2248. Owned<IFile> target = createIFile(rfn);
  2249. targetdir->createDirectory();
  2250. copyFile(target, localFile);
  2251. }
  2252. }
  2253. }
  2254. void publish(const IRoxiePublishCallback *activity)
  2255. {
  2256. if (!dFile->isExternal())
  2257. {
  2258. Owned<IFileDescriptor> desc = createFileDescriptor();
  2259. desc->setNumParts(1);
  2260. RemoteFilename rfn;
  2261. dFile->getPartFilename(rfn, 0, 0);
  2262. StringBuffer physicalName, physicalDir, physicalBase;
  2263. rfn.getLocalPath(physicalName);
  2264. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2265. desc->setDefaultDir(physicalDir.str());
  2266. desc->setPartMask(physicalBase.str());
  2267. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  2268. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  2269. offset_t fileSize = localFile->size();
  2270. fileProps.setPropInt64("@size", fileSize);
  2271. partProps.setPropInt64("@size", fileSize);
  2272. CDateTime createTime, modifiedTime, accessedTime;
  2273. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  2274. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  2275. unsigned hour, min, sec, nanosec;
  2276. modifiedTime.getTime(hour, min, sec, nanosec);
  2277. modifiedTime.setTime(hour, min, sec, 0);
  2278. StringBuffer timestr;
  2279. modifiedTime.getString(timestr);
  2280. if(timestr.length())
  2281. partProps.setProp("@modified", timestr.str());
  2282. ClusterPartDiskMapSpec partmap;
  2283. if (localCluster)
  2284. {
  2285. desc->addCluster(localCluster, partmap);
  2286. desc->setClusterGroupName(0, localClusterName.get());
  2287. }
  2288. ForEachItemIn(idx, remoteNodes)
  2289. desc->addCluster(&remoteNodes.item(idx), partmap);
  2290. if (activity)
  2291. activity->setFileProperties(desc);
  2292. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  2293. publishFile->setAccessedTime(modifiedTime);
  2294. publishFile->attach(dFile->queryLogicalName(), activity ? activity->queryUserDescriptor() : UNKNOWN_USER);
  2295. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  2296. }
  2297. }
  2298. void addCluster(char const * cluster)
  2299. {
  2300. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  2301. if (!group)
  2302. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  2303. cluster, dFile->queryLogicalName());
  2304. if (group->isMember())
  2305. {
  2306. if (localCluster)
  2307. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2308. cluster, dFile->queryLogicalName());
  2309. localCluster.setown(group.getClear());
  2310. localClusterName.set(cluster);
  2311. }
  2312. else
  2313. {
  2314. ForEachItemIn(idx, remoteNodes)
  2315. {
  2316. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  2317. if (group->isMember(other))
  2318. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2319. cluster, dFile->queryLogicalName());
  2320. }
  2321. remoteNodes.append(*group.getClear());
  2322. }
  2323. allClusters.append(cluster);
  2324. }
  2325. };
  2326. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2327. {
  2328. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  2329. }