ccdfile.cpp 90 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "ccd.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdquery.hpp"
  26. #include "ccdstate.hpp"
  27. #include "ccdsnmp.hpp"
  28. #include "rmtfile.hpp"
  29. #include "ccdqueue.ipp"
  30. #ifdef __linux__
  31. #include <sys/mman.h>
  32. #endif
// Number of currently open file handles. Indexed by a file's 'remote' flag
// (numFilesOpen[false] / numFilesOpen[true]) - see CLazyFileIO::_checkOpen and setFailure.
atomic_t numFilesOpen[2];
// Number of attempts CLazyFileIO::read makes before it gives up and throws.
#define MAX_READ_RETRIES 2
#ifdef _DEBUG
// Debug-only fault injection switches (both disabled).
// FAIL_20_OPEN makes every fifth open attempt in CLazyFileIO::_checkOpen throw.
// FAIL_20_READ adds a read counter to CLazyFileIO - presumably for an equivalent read-failure test; confirm.
//#define FAIL_20_READ
//#define FAIL_20_OPEN
#endif
  39. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  40. class NotYetOpenException : public CInterface, implements IException
  41. {
  42. public:
  43. IMPLEMENT_IINTERFACE;
  44. virtual int errorCode() const { return 0; }
  45. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  46. virtual MessageAudience errorAudience() const { return MSGAUD_internal; }
  47. };
  48. class CFailingFileIO : public CInterface, implements IFileIO
  49. {
  50. #define THROWNOTOPEN throw new NotYetOpenException()
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  54. virtual offset_t size() { THROWNOTOPEN; }
  55. virtual void flush() { THROWNOTOPEN; }
  56. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  57. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  58. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  59. virtual void close() { }
  60. } failure;
  61. class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelayedFile
  62. {
  63. protected:
  64. IArrayOf<IFile> sources;
  65. Owned<IFile> logical;
  66. unsigned currentIdx;
  67. Owned<IFileIO> current;
  68. Owned<IMemoryMappedFile> mmapped;
  69. mutable CriticalSection crit;
  70. bool remote;
  71. offset_t fileSize;
  72. CDateTime fileDate;
  73. unsigned crc;
  74. unsigned lastAccess;
  75. bool copying;
  76. bool isCompressed;
  77. const IRoxieFileCache *cached;
  78. #ifdef FAIL_20_READ
  79. unsigned readCount;
  80. #endif
  81. public:
  82. IMPLEMENT_IINTERFACE;
  83. CLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, unsigned _crc, bool _isCompressed)
  84. : logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed)
  85. {
  86. fileDate.set(_date);
  87. currentIdx = 0;
  88. current.set(&failure);
  89. remote = false;
  90. #ifdef FAIL_20_READ
  91. readCount = 0;
  92. #endif
  93. lastAccess = msTick();
  94. copying = false;
  95. cached = NULL;
  96. }
  97. ~CLazyFileIO()
  98. {
  99. setFailure(); // ensures the open file count properly maintained
  100. }
  101. virtual void beforeDispose()
  102. {
  103. if (cached)
  104. cached->removeCache(this);
  105. }
  106. void setCache(const IRoxieFileCache *cache)
  107. {
  108. assertex(!cached);
  109. cached = cache;
  110. }
  111. void removeCache(const IRoxieFileCache *cache)
  112. {
  113. assertex(cached==cache);
  114. cached = NULL;
  115. }
  116. inline void setRemote(bool _remote) { remote = _remote; }
  117. virtual void setCopying(bool _copying)
  118. {
  119. CriticalBlock b(crit);
  120. copying = _copying;
  121. }
  122. virtual bool isCopying() const
  123. {
  124. CriticalBlock b(crit);
  125. return copying;
  126. }
  127. virtual bool isOpen() const
  128. {
  129. CriticalBlock b(crit);
  130. return current.get() != &failure;
  131. }
  132. virtual unsigned getLastAccessed() const
  133. {
  134. CriticalBlock b(crit);
  135. return lastAccess;
  136. }
  137. virtual void close()
  138. {
  139. CriticalBlock b(crit);
  140. setFailure();
  141. }
  142. virtual bool isRemote()
  143. {
  144. CriticalBlock b(crit);
  145. return remote;
  146. }
  147. void setFailure()
  148. {
  149. try
  150. {
  151. if (current.get()!=&failure)
  152. atomic_dec(&numFilesOpen[remote]);
  153. current.set(&failure);
  154. }
  155. catch (IException *E)
  156. {
  157. if (traceLevel > 5)
  158. {
  159. StringBuffer s;
  160. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  161. }
  162. E->Release();
  163. }
  164. }
  165. void checkOpen()
  166. {
  167. CriticalBlock b(crit);
  168. _checkOpen();
  169. }
  170. void _checkOpen()
  171. {
  172. if (current.get() == &failure)
  173. {
  174. StringBuffer filesTried;
  175. unsigned tries = 0;
  176. bool firstTime = true; // first time try the "fast / cache" way - if that fails - try original away - if that still fails, error
  177. RoxieFileStatus fileStatus = FileNotFound;
  178. loop
  179. {
  180. if (currentIdx >= sources.length())
  181. currentIdx = 0;
  182. if (tries==sources.length())
  183. {
  184. if (firstTime) // if first time - reset and try again - non cache way
  185. {
  186. firstTime = false;
  187. tries = 0;
  188. }
  189. else
  190. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  191. }
  192. const char *sourceName = sources.item(currentIdx).queryFilename();
  193. if (traceLevel > 10)
  194. DBGLOG("Trying to open %s", sourceName);
  195. try
  196. {
  197. #ifdef FAIL_20_OPEN
  198. openCount++;
  199. if ((openCount % 5) == 0)
  200. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  201. #endif
  202. IFile *f = &sources.item(currentIdx);
  203. if (firstTime)
  204. cacheFileConnect(f, dafilesrvLookupTimeout); // set timeout to 10 seconds
  205. else
  206. {
  207. if (traceLevel > 10)
  208. DBGLOG("Looking for file using non-cached file open");
  209. }
  210. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, crc, isCompressed);
  211. if (fileStatus == FileIsValid)
  212. {
  213. if (isCompressed)
  214. current.setown(createCompressedFileReader(f));
  215. else
  216. current.setown(f->open(IFOread));
  217. if (current)
  218. {
  219. if (traceLevel > 5)
  220. DBGLOG("Opening %s", sourceName);
  221. disconnectRemoteIoOnExit(current);
  222. break;
  223. }
  224. // throwUnexpected(); - try another location if this one has the wrong version of the file
  225. }
  226. disconnectRemoteFile(f);
  227. }
  228. catch (IException *E)
  229. {
  230. E->Release();
  231. }
  232. currentIdx++;
  233. tries++;
  234. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  235. {
  236. filesTried.appendf(" %s", sourceName); // only need to build this list once
  237. switch (fileStatus)
  238. {
  239. case FileNotFound:
  240. filesTried.append(": FileNotFound");
  241. break;
  242. case FileSizeMismatch:
  243. filesTried.append(": FileSizeMismatch");
  244. break;
  245. case FileCRCMismatch:
  246. filesTried.append(": FileCRCMismatch");
  247. break;
  248. case FileDateMismatch:
  249. filesTried.append(": FileDateMismatch");
  250. break;
  251. }
  252. }
  253. }
  254. lastAccess = msTick();
  255. atomic_inc(&numFilesOpen[remote]);
  256. if ((unsigned) atomic_read(&numFilesOpen[remote]) > maxFilesOpen[remote])
  257. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  258. }
  259. }
  260. virtual void addSource(IFile *newSource)
  261. {
  262. if (newSource)
  263. {
  264. if (traceLevel > 10)
  265. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  266. CriticalBlock b(crit);
  267. sources.append(*newSource);
  268. }
  269. }
  270. virtual size32_t read(offset_t pos, size32_t len, void * data)
  271. {
  272. CriticalBlock b(crit);
  273. unsigned tries = 0;
  274. loop
  275. {
  276. try
  277. {
  278. size32_t ret = current->read(pos, len, data);
  279. lastAccess = msTick();
  280. return ret;
  281. }
  282. catch (NotYetOpenException *E)
  283. {
  284. E->Release();
  285. }
  286. catch (IException *E)
  287. {
  288. EXCLOG(MCoperatorError, E, "Read error");
  289. E->Release();
  290. DBGLOG("Failed to read length %d offset %"I64F"x file %s", len, pos, sources.item(currentIdx).queryFilename());
  291. currentIdx++;
  292. setFailure();
  293. }
  294. _checkOpen();
  295. tries++;
  296. if (tries == MAX_READ_RETRIES)
  297. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %"I64F"x file %s after %d attempts", len, pos, sources.item(currentIdx).queryFilename(), tries);
  298. }
  299. }
  300. virtual void flush()
  301. {
  302. CriticalBlock b(crit);
  303. if (current.get() != &failure)
  304. current->flush();
  305. }
  306. virtual offset_t size()
  307. {
  308. CriticalBlock b(crit);
  309. _checkOpen();
  310. lastAccess = msTick();
  311. return current->size();
  312. }
  313. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  314. virtual void setSize(offset_t size) { throwUnexpected(); }
  315. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  316. virtual const char *queryFilename() { return logical->queryFilename(); }
  317. virtual bool isAlive() const { return CInterface::isAlive(); }
  318. virtual int getLinkCount() const { return CInterface::getLinkCount(); }
  319. virtual IMemoryMappedFile *queryMappedFile()
  320. {
  321. CriticalBlock b(crit);
  322. if (mmapped)
  323. return mmapped;
  324. if (!remote)
  325. {
  326. mmapped.setown(logical->openMemoryMapped());
  327. return mmapped;
  328. }
  329. return NULL;
  330. }
  331. virtual IFileIO *queryFileIO()
  332. {
  333. return this;
  334. }
  335. virtual bool createHardFileLink()
  336. {
  337. unsigned tries = 0;
  338. loop
  339. {
  340. StringBuffer filesTried;
  341. if (currentIdx >= sources.length())
  342. currentIdx = 0;
  343. if (tries==sources.length())
  344. return false;
  345. const char *sourceName = sources.item(currentIdx).queryFilename();
  346. filesTried.appendf(" %s", sourceName);
  347. try
  348. {
  349. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, crc, isCompressed) == FileIsValid)
  350. {
  351. StringBuffer source_drive;
  352. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  353. StringBuffer query_drive;
  354. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  355. // only try to create link if on the same drive
  356. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  357. {
  358. try
  359. {
  360. DBGLOG("Trying to create Hard Link for %s", sourceName);
  361. createHardLink(logical->queryFilename(), sourceName);
  362. current.setown(sources.item(currentIdx).open(IFOread));
  363. return true;
  364. }
  365. catch(IException *E)
  366. {
  367. StringBuffer err;
  368. DBGLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  369. E->Release();
  370. }
  371. }
  372. }
  373. }
  374. catch (IException *E)
  375. {
  376. E->Release();
  377. }
  378. currentIdx++;
  379. tries++;
  380. }
  381. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  382. return false; // if we get here - no hardlink
  383. }
  384. void copyComplete()
  385. {
  386. {
  387. CriticalBlock b(crit);
  388. setFailure(); // lazyOpen will then reopen it...
  389. currentIdx = 0;
  390. remote = false;
  391. copying = false;
  392. sources.kill();
  393. sources.add(*logical.getLink(), 0);
  394. if (!lazyOpen)
  395. _checkOpen();
  396. }
  397. }
  398. virtual IFile *querySource()
  399. {
  400. CriticalBlock b(crit);
  401. _checkOpen();
  402. return &sources.item(currentIdx);
  403. };
  404. virtual IFile *queryTarget() { return logical; }
  405. virtual offset_t getSize() { return fileSize; }
  406. virtual CDateTime *queryDateTime() { return &fileDate; }
  407. static int compareAccess(IInterface **L, IInterface **R)
  408. {
  409. ILazyFileIO *LL = (ILazyFileIO *) *L;
  410. ILazyFileIO *RR = (ILazyFileIO *) *R;
  411. return LL->getLastAccessed() - RR->getLastAccessed();
  412. }
  413. };
  414. //----------------------------------------------------------------------------------------------
  415. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  416. {
  417. if (!remoteFDesc)
  418. return NULL;
  419. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  420. if (!remotePDesc)
  421. return NULL;
  422. unsigned int crc, remoteCrc;
  423. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  424. return remotePDesc;
  425. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  426. return remotePDesc;
  427. return NULL;
  428. }
  429. static int getClusterPriority(const char *clusterName)
  430. {
  431. assertex(preferredClusters);
  432. int *priority = preferredClusters->getValue(clusterName);
  433. return priority ? *priority : 100;
  434. }
  435. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
  436. {
  437. IFileDescriptor &fdesc = pdesc->queryOwner();
  438. unsigned numCopies = pdesc->numCopies();
  439. unsigned lastClusterNo = (unsigned) -1;
  440. unsigned numThisCluster = 0;
  441. int priority = 0;
  442. IntArray priorities;
  443. for (unsigned copy = 0; copy < numCopies; copy++)
  444. {
  445. unsigned clusterNo = pdesc->copyClusterNum(copy);
  446. StringBuffer clusterName;
  447. fdesc.getClusterGroupName(clusterNo, clusterName);
  448. if (fromCluster && *fromCluster)
  449. {
  450. bool matches = strieq(clusterName.str(), fromCluster);
  451. if (matches!=includeFromCluster)
  452. continue;
  453. }
  454. RemoteFilename r;
  455. pdesc->getFilename(copy,r);
  456. StringBuffer path;
  457. r.getRemotePath(path);
  458. if (localFileName && r.isLocal())
  459. {
  460. StringBuffer l;
  461. r.getLocalPath(l);
  462. if (streq(l, localFileName))
  463. continue; // don't add ourself
  464. }
  465. if (clusterNo == lastClusterNo)
  466. {
  467. numThisCluster++;
  468. if (numThisCluster > 2) // Don't add more than 2 from one cluster
  469. continue;
  470. }
  471. else
  472. {
  473. numThisCluster = 1;
  474. lastClusterNo = clusterNo;
  475. if (preferredClusters)
  476. {
  477. priority = getClusterPriority(clusterName);
  478. }
  479. else
  480. priority = copy;
  481. }
  482. if (priority >= 0)
  483. {
  484. ForEachItemIn(idx, priorities)
  485. {
  486. if (priorities.item(idx) < priority)
  487. break;
  488. }
  489. priorities.add(priority, idx);
  490. locations.add(path.str(), idx);
  491. }
  492. }
  493. }
  494. //----------------------------------------------------------------------------------------------
// Alias for a pointer to a StringArray.
typedef StringArray *StringArrayPtr;
  496. class CRoxieFileCache : public CInterface, implements ICopyFileProgress, implements IRoxieFileCache
  497. {
// Files waiting to be processed by the background copy thread (popped in runBackgroundCopy).
mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
// Waited on by the background copy thread; signalled by wait() and interrupted by join().
InterruptableSemaphore toCopy;
// Waited on (with a timeout) by the handle closer thread; signalled by wait() and interrupted by join().
InterruptableSemaphore toClose;
// Known files keyed by local filename (see openFile / removeCache).
mutable CopyMapStringToMyClass<ILazyFileIO> files;
// Taken around accesses to 'todo' and 'files'.
mutable CriticalSection crit;
// NOTE(review): not referenced in the visible part of this file - purpose to be confirmed.
CriticalSection cpcrit;
// Set by start() once both worker threads have reported that they are running.
bool started;
// Set by join(); makes onProgress() cancel copies and suppresses error logging as the threads exit.
bool aborting;
// Set by wait(); makes both worker threads leave their loops.
bool closing;
// Initialised to false for both indexes in the constructor.
// NOTE(review): usage is outside the visible code - presumably indexed by the 'remote' flag; confirm.
bool closePending[2];
// Looked up by logical filename in lookupFile when a cached file does not match the DFS information.
StringAttrMapping fileErrorList;
// Signalled by the background copy / handle closer threads as they start; start() waits on both.
Semaphore bctStarted;
Semaphore hctStarted;
  511. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, unsigned crc, bool isCompressed)
  512. {
  513. cacheFileConnect(f, dafilesrvLookupTimeout); // set timeout to 10 seconds
  514. if (f->exists())
  515. {
  516. // only check size if specified
  517. if ( (size != -1) && !isCompressed && f->size()!=size) // MORE - should be able to do better on compressed you'da thunk
  518. return FileSizeMismatch;
  519. if (crc > 0)
  520. {
  521. // if a crc is specified let's check it
  522. unsigned file_crc = f->getCRC();
  523. if (file_crc && crc != file_crc) // for remote files crc_file can fail, even if the file is valid
  524. {
  525. DBGLOG("FAILED CRC Check");
  526. return FileCRCMismatch;
  527. }
  528. }
  529. CDateTime mt;
  530. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  531. }
  532. else
  533. return FileNotFound;
  534. }
  535. ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
  536. IPartDescriptor *pdesc,
  537. const StringArray &remoteLocationInfo,
  538. offset_t size, const CDateTime &modified, unsigned crc)
  539. {
  540. Owned<IFile> local = createIFile(localLocation);
  541. bool isCompressed = pdesc->queryOwner().isCompressed();
  542. Owned<CLazyFileIO> ret = new CLazyFileIO(local.getLink(), size, modified, crc, isCompressed);
  543. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, crc, isCompressed);
  544. if (fileStatus == FileIsValid)
  545. {
  546. ret->addSource(local.getLink());
  547. ret->setRemote(false);
  548. }
  549. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  550. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  551. else
  552. {
  553. bool addedOne = false;
  554. // put the peerRoxieLocations next in the list
  555. StringArray localLocations;
  556. appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true); // Adds all locations on the same cluster
  557. ForEachItemIn(roxie_idx, localLocations)
  558. {
  559. try
  560. {
  561. const char *remoteName = localLocations.item(roxie_idx);
  562. Owned<IFile> remote = createIFile(remoteName);
  563. RoxieFileStatus status = fileUpToDate(remote, size, modified, crc, isCompressed);
  564. if (status==FileIsValid)
  565. {
  566. if (miscDebugTraceLevel > 5)
  567. DBGLOG("adding peer location %s", remoteName);
  568. ret->addSource(remote.getClear());
  569. addedOne = true;
  570. }
  571. else if (miscDebugTraceLevel > 10)
  572. DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
  573. }
  574. catch (IException *E)
  575. {
  576. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  577. E->Release();
  578. }
  579. }
  580. if (!addedOne && (copyResources || useRemoteResources)) // If no peer locations available, go to remote
  581. {
  582. ForEachItemIn(idx, remoteLocationInfo)
  583. {
  584. try
  585. {
  586. const char *remoteName = remoteLocationInfo.item(idx);
  587. Owned<IFile> remote = createIFile(remoteName);
  588. if (traceLevel > 5)
  589. DBGLOG("checking remote location %s", remoteName);
  590. RoxieFileStatus status = fileUpToDate(remote, size, modified, crc, isCompressed);
  591. if (status==FileIsValid)
  592. {
  593. if (miscDebugTraceLevel > 5)
  594. DBGLOG("adding remote location %s", remoteName);
  595. ret->addSource(remote.getClear());
  596. addedOne = true;
  597. }
  598. else if (miscDebugTraceLevel > 10)
  599. DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
  600. }
  601. catch (IException *E)
  602. {
  603. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  604. E->Release();
  605. }
  606. }
  607. }
  608. if (!addedOne)
  609. {
  610. if (local->exists()) // Implies local dali and local file out of sync
  611. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  612. else
  613. {
  614. if (traceLevel > 2)
  615. {
  616. DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
  617. ForEachItemIn(local_idx, localLocations)
  618. {
  619. DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
  620. }
  621. DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
  622. ForEachItemIn(remote_idx, remoteLocationInfo)
  623. {
  624. DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
  625. }
  626. }
  627. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  628. }
  629. }
  630. ret->setRemote(true);
  631. }
  632. ret->setCache(this);
  633. files.setValue(localLocation, (ILazyFileIO *)ret);
  634. return ret.getClear();
  635. }
  636. void deleteTempFiles(const char *targetFilename)
  637. {
  638. try
  639. {
  640. StringBuffer destPath;
  641. StringBuffer prevTempFile;
  642. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  643. if (useTreeCopy)
  644. prevTempFile.append("*.tmp");
  645. else
  646. prevTempFile.append("*.$$$");
  647. Owned<IFile> dirf = createIFile(destPath.str());
  648. Owned<IDirectoryIterator> iter = dirf->directoryFiles(prevTempFile.str(),false,false);
  649. ForEach(*iter)
  650. {
  651. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  652. if (thisFile->isFile() == foundYes)
  653. thisFile->remove();
  654. }
  655. }
  656. catch(IException *E)
  657. {
  658. StringBuffer err;
  659. DBGLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  660. E->Release();
  661. }
  662. catch(...)
  663. {
  664. }
  665. }
  666. bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
  667. {
  668. bool fileCopied = false;
  669. IFile *sourceFile;
  670. try
  671. {
  672. f->setCopying(true);
  673. sourceFile = f->querySource();
  674. }
  675. catch (IException *E)
  676. {
  677. f->setCopying(false);
  678. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  679. throw;
  680. }
  681. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  682. deleteTempFiles(targetFilename);
  683. if ( (sourceFile->size() + minFreeDiskSpace) > freeDiskSpace)
  684. {
  685. StringBuffer err;
  686. err.appendf("Insufficient disk space. File %s needs %"I64F"d bytes, but only %"I64F"d remains, and %"I64F"d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  687. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  688. EXCLOG(MCoperatorError, E);
  689. E->Release();
  690. }
  691. else
  692. {
  693. IpSubNet subnet; // preferred set but not required
  694. IpAddress fromip; // returned
  695. Owned<IFile> destFile = createIFile(useTreeCopy?targetFilename:tempFile);
  696. bool hardLinkCreated = false;
  697. try
  698. {
  699. if (useHardLink)
  700. hardLinkCreated = f->createHardFileLink();
  701. if (hardLinkCreated)
  702. msg = "Hard Link";
  703. else
  704. {
  705. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  706. if (traceLevel > 5)
  707. {
  708. StringBuffer str;
  709. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  710. TimeSection timing(str.str());
  711. if (useTreeCopy)
  712. sourceFile->treeCopyTo(destFile, subnet, fromip, true, copyFlags);
  713. else
  714. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  715. }
  716. else
  717. {
  718. if (useTreeCopy)
  719. sourceFile->treeCopyTo(destFile, subnet, fromip, true, copyFlags);
  720. else
  721. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  722. }
  723. }
  724. f->setCopying(false);
  725. fileCopied = true;
  726. }
  727. catch(IException *E)
  728. {
  729. f->setCopying(false);
  730. if (!useTreeCopy)
  731. { // done by tree copy
  732. EXCLOG(E, "Copy exception - remove templocal");
  733. destFile->remove();
  734. }
  735. deleteTempFiles(targetFilename);
  736. throw;
  737. }
  738. catch(...)
  739. {
  740. f->setCopying(false);
  741. if (!useTreeCopy)
  742. { // done by tree copy
  743. DBGLOG("%s exception - remove templocal", msg);
  744. destFile->remove();
  745. }
  746. deleteTempFiles(targetFilename);
  747. throw;
  748. }
  749. if (!hardLinkCreated && !useTreeCopy) // for hardlinks / treeCopy - no rename needed
  750. {
  751. try
  752. {
  753. destFile->rename(targetFilename);
  754. }
  755. catch(IException *)
  756. {
  757. f->setCopying(false);
  758. deleteTempFiles(targetFilename);
  759. throw;
  760. }
  761. DBGLOG("%s to %s complete", msg, targetFilename);
  762. }
  763. f->copyComplete();
  764. }
  765. deleteTempFiles(targetFilename);
  766. return fileCopied;
  767. }
  768. bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage, CFflags copyFlags=CFnone)
  769. {
  770. if (!f->isRemote())
  771. f->copyComplete();
  772. else
  773. {
  774. if (displayFirstFileMessage)
  775. DBGLOG("Received files to copy");
  776. const char *targetFilename = f->queryTarget()->queryFilename();
  777. StringBuffer tempFile(targetFilename);
  778. StringBuffer destPath;
  779. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  780. if (destPath.length())
  781. recursiveCreateDirectory(destPath.str());
  782. else
  783. destPath.append('.');
  784. if (!checkDirExists(destPath.str())) {
  785. ERRLOG("Dest directory %s does not exist", destPath.str());
  786. return false;
  787. }
  788. tempFile.append(".$$$");
  789. const char *msg = background ? "Background copy" : "Copy";
  790. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
  791. }
  792. return false; // if we get here there was no file copied
  793. }
  794. public:
  795. IMPLEMENT_IINTERFACE;
  796. CRoxieFileCache(bool testmode = false) : bct(*this), hct(*this)
  797. {
  798. aborting = false;
  799. closing = false;
  800. closePending[false] = false;
  801. closePending[true] = false;
  802. started = false;
  803. }
  804. ~CRoxieFileCache()
  805. {
  806. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  807. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  808. HashIterator h(files);
  809. ForEach(h)
  810. {
  811. ILazyFileIO *f = files.mapToValue(&h.query());
  812. f->removeCache(this);
  813. }
  814. }
  815. virtual void start()
  816. {
  817. if (!started)
  818. {
  819. bct.start();
  820. hct.start();
  821. bctStarted.wait();
  822. hctStarted.wait();
  823. started = true;
  824. }
  825. }
  826. class BackgroundCopyThread : public Thread
  827. {
  828. CRoxieFileCache &owner;
  829. public:
  830. BackgroundCopyThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheBackgroundCopyThread") {}
  831. virtual int run()
  832. {
  833. return owner.runBackgroundCopy();
  834. }
  835. } bct;
  836. class HandleCloserThread : public Thread
  837. {
  838. CRoxieFileCache &owner;
  839. public:
  840. HandleCloserThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheHandleCloserThread") {}
  841. virtual int run()
  842. {
  843. return owner.runHandleCloser();
  844. }
  845. } hct;
// Body of the background copy thread.
//
// Signals bctStarted, then repeatedly waits on toCopy, pops one file from
// 'todo' (under crit) and copies it with doCopy. The loop ends when 'closing'
// is set, or when an exception escapes - which the inner handlers only allow
// while 'aborting' is set (or if toCopy.wait() itself throws).
// Always returns 0.
int runBackgroundCopy()
{
    bctStarted.signal();
    if (traceLevel)
        DBGLOG("Background copy thread %p starting", this);
    try
    {
        // Number of files copied since "No more data files to copy" was last logged
        int fileCopiedCount = 0;
        bool fileCopied = false;
        loop
        {
            fileCopied = false;
            Linked<ILazyFileIO> next;
            toCopy.wait();
            {
                CriticalBlock b(crit);
                if (closing)
                    break;
                if (todo.ordinality())
                {
                    ILazyFileIO *popped = &todo.pop();
                    // Only take a link if the file is still alive; otherwise the entry is just dropped
                    if (popped->isAlive())
                    {
                        next.set(popped);
                    }
                    atomic_dec(&numFilesToProcess); // must decrement counter for SNMP accuracy
                }
            }
            // The copy itself runs outside crit
            if (next)
            {
                try
                {
                    // The "Received files to copy" message is requested only for the first file of a batch
                    fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false, CFflush_rdwr);
                    CriticalBlock b(crit);
                    if (fileCopied)
                        fileCopiedCount++;
                }
                catch (IException *E)
                {
                    // While aborting, let the exception end the thread (handled by the outer catch)
                    if (aborting)
                        throw;
                    EXCLOG(MCoperatorError, E, "Roxie background copy: ");
                    E->Release();
                }
                catch (...)
                {
                    EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
                }
            }
            CriticalBlock b(crit);
            if ( (todo.ordinality()== 0) && (fileCopiedCount)) // finished last copy
            {
                DBGLOG("No more data files to copy");
                fileCopiedCount = 0;
            }
        }
    }
    catch (IException *E)
    {
        // Exceptions are expected while aborting, so only log otherwise
        if (!aborting)
            EXCLOG(MCoperatorError, E, "Roxie background copy: ");
        E->Release();
    }
    catch (...)
    {
        DBGLOG("Unknown exception in background copy thread");
    }
    if (traceLevel)
        DBGLOG("Background copy thread %p exiting", this);
    return 0;
}
  917. int runHandleCloser()
  918. {
  919. hctStarted.signal();
  920. if (traceLevel)
  921. DBGLOG("HandleCloser thread %p starting", this);
  922. try
  923. {
  924. loop
  925. {
  926. toClose.wait(10 * 60 * 1000); // check expired file handles every 10 minutes
  927. if (closing)
  928. break;
  929. doCloseExpired(true);
  930. doCloseExpired(false);
  931. }
  932. }
  933. catch (IException *E)
  934. {
  935. if (!aborting)
  936. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  937. E->Release();
  938. }
  939. catch (...)
  940. {
  941. DBGLOG("Unknown exception in handle closer thread");
  942. }
  943. if (traceLevel)
  944. DBGLOG("Handle closer thread %p exiting", this);
  945. return 0;
  946. }
  947. virtual void join(unsigned timeout=INFINITE)
  948. {
  949. aborting = true;
  950. if (started)
  951. {
  952. toCopy.interrupt();
  953. toClose.interrupt();
  954. bct.join(timeout);
  955. hct.join(timeout);
  956. }
  957. }
  958. virtual void wait()
  959. {
  960. closing = true;
  961. if (started)
  962. {
  963. toCopy.signal();
  964. toClose.signal();
  965. bct.join();
  966. hct.join();
  967. }
  968. }
  969. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  970. {
  971. return aborting ? CFPcancel : CFPcontinue;
  972. }
  973. virtual void removeCache(ILazyFileIO *file) const
  974. {
  975. CriticalBlock b(crit);
  976. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  977. // So only remove from hash table if what we find there matches the item that is being deleted.
  978. const char *filename = file->queryFilename();
  979. ILazyFileIO *goer = files.getValue(filename);
  980. if (goer == file)
  981. files.remove(filename);
  982. ForEachItemInRev(idx, todo)
  983. {
  984. if (file == &todo.item(idx))
  985. {
  986. todo.remove(idx);
  987. atomic_dec(&numFilesToProcess); // must decrement counter for SNMP accuracy
  988. }
  989. }
  990. }
  991. virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
  992. IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
  993. const StringArray &deployedLocationInfo, bool startFileCopy)
  994. {
  995. IPropertyTree &partProps = pdesc->queryProperties();
  996. offset_t dfsSize = partProps.getPropInt64("@size", -1);
  997. bool local = partProps.getPropBool("@local");
  998. unsigned crc;
  999. if (!crcResources || !pdesc->getCrc(crc))
  1000. crc = 0;
  1001. CDateTime dfsDate;
  1002. if (checkFileDate)
  1003. {
  1004. const char *dateStr = partProps.queryProp("@modified");
  1005. dfsDate.setString(dateStr);
  1006. }
  1007. unsigned partNo = pdesc->queryPartIndex() + 1;
  1008. StringBuffer localLocation;
  1009. if (local)
  1010. {
  1011. assertex(partNo==1 && numParts==1);
  1012. localLocation.append(lfn); // any resolution done earlier
  1013. }
  1014. else
  1015. {
  1016. // MORE - not at all sure about this. Foreign files should stay foreign ?
  1017. CDfsLogicalFileName dlfn;
  1018. dlfn.set(lfn);
  1019. if (dlfn.isForeign())
  1020. dlfn.clearForeign();
  1021. makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault);
  1022. }
  1023. Owned<ILazyFileIO> ret;
  1024. try
  1025. {
  1026. CriticalBlock b(crit);
  1027. Linked<ILazyFileIO> f = files.getValue(localLocation);
  1028. if (f && f->isAlive())
  1029. {
  1030. if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
  1031. (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
  1032. {
  1033. StringBuffer modifiedDt;
  1034. if (!dfsDate.isNull())
  1035. dfsDate.getString(modifiedDt);
  1036. StringBuffer fileDt;
  1037. f->queryDateTime()->getString(fileDt);
  1038. if (fileErrorList.find(lfn) == 0)
  1039. {
  1040. switch (fileType)
  1041. {
  1042. case ROXIE_KEY:
  1043. fileErrorList.setValue(lfn, "Key");
  1044. break;
  1045. case ROXIE_FILE:
  1046. fileErrorList.setValue(lfn, "File");
  1047. break;
  1048. }
  1049. }
  1050. throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
  1051. }
  1052. else
  1053. return f.getClear();
  1054. }
  1055. ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate, crc));
  1056. if (startFileCopy)
  1057. {
  1058. if (ret->isRemote())
  1059. {
  1060. if (copyResources) // MORE - should always copy peer files
  1061. {
  1062. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1063. {
  1064. ret->checkOpen();
  1065. doCopy(ret, false, false, CFflush_rdwr);
  1066. return ret.getLink();
  1067. }
  1068. // Copies are popped from end of the todo list
  1069. // By putting the replicates on the front we ensure they are done after the primaries
  1070. // and are therefore likely to result in local rather than remote copies.
  1071. if (replicationLevel)
  1072. todo.add(*ret, 0);
  1073. else
  1074. todo.append(*ret);
  1075. atomic_inc(&numFilesToProcess); // must increment counter for SNMP accuracy
  1076. toCopy.signal();
  1077. }
  1078. }
  1079. }
  1080. if (!lazyOpen)
  1081. ret->checkOpen();
  1082. }
  1083. catch(IException *e)
  1084. {
  1085. if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
  1086. {
  1087. if (fileErrorList.find(lfn) == 0)
  1088. {
  1089. switch (fileType)
  1090. {
  1091. case ROXIE_KEY:
  1092. fileErrorList.setValue(lfn, "Key");
  1093. break;
  1094. case ROXIE_FILE:
  1095. fileErrorList.setValue(lfn, "File");
  1096. break;
  1097. }
  1098. }
  1099. }
  1100. throw;
  1101. }
  1102. return ret.getLink();
  1103. }
  1104. virtual void closeExpired(bool remote)
  1105. {
  1106. // This schedules a close at the next available opportunity
  1107. CriticalBlock b(cpcrit); // paranoid...
  1108. if (!closePending[remote])
  1109. {
  1110. closePending[remote] = true;
  1111. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) atomic_read(&numFilesOpen[remote]));
  1112. toClose.signal();
  1113. }
  1114. }
  1115. void doCloseExpired(bool remote)
  1116. {
  1117. {
  1118. CriticalBlock b(cpcrit); // paranoid...
  1119. closePending[remote] = false;
  1120. }
  1121. CriticalBlock b(crit);
  1122. ICopyArrayOf<ILazyFileIO> goers;
  1123. HashIterator h(files);
  1124. ForEach(h)
  1125. {
  1126. ILazyFileIO *f = files.mapToValue(&h.query());
  1127. if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
  1128. {
  1129. unsigned age = msTick() - f->getLastAccessed();
  1130. if (age > maxFileAge[remote])
  1131. {
  1132. if (traceLevel > 5)
  1133. {
  1134. // NOTE - querySource will cause the file to be opened if not already open
  1135. // That's OK here, since we know the file is open and remote.
  1136. // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
  1137. const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
  1138. DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
  1139. }
  1140. f->close();
  1141. }
  1142. else
  1143. goers.append(*f);
  1144. }
  1145. }
  1146. unsigned numFilesLeft = goers.ordinality();
  1147. if (numFilesLeft > maxFilesOpen[remote])
  1148. {
  1149. goers.sort(CLazyFileIO::compareAccess);
  1150. DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
  1151. unsigned idx = minFilesOpen[remote];
  1152. while (idx < numFilesLeft)
  1153. {
  1154. ILazyFileIO &f = goers.item(idx++);
  1155. if (!f.isCopying())
  1156. {
  1157. if (traceLevel > 5)
  1158. {
  1159. unsigned age = msTick() - f.getLastAccessed();
  1160. DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
  1161. }
  1162. f.close();
  1163. }
  1164. }
  1165. }
  1166. }
  1167. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1168. {
  1169. Owned<IFile> dirf = createIFile(directory);
  1170. if (dirf->exists() && dirf->isDirectory())
  1171. {
  1172. try
  1173. {
  1174. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1175. ForEach(*iter)
  1176. {
  1177. const char *thisName = iter->query().queryFilename();
  1178. flushUnusedDirectories(origBaseDir, thisName, xml);
  1179. }
  1180. if (stricmp(origBaseDir, directory) != 0)
  1181. {
  1182. try
  1183. {
  1184. dirf->remove();
  1185. xml.appendf("<Directory>%s</Directory>\n", directory);
  1186. DBGLOG("Deleted directory %s", directory);
  1187. }
  1188. catch (IException *e)
  1189. {
  1190. // don't care if we can't delete the directory
  1191. e->Release();
  1192. }
  1193. catch(...)
  1194. {
  1195. // don't care if we can't delete the directory
  1196. }
  1197. }
  1198. }
  1199. catch (IException *e)
  1200. {
  1201. // don't care if we can't delete the directory
  1202. e->Release();
  1203. }
  1204. catch(...)
  1205. {
  1206. // don't care if we can't delete the directory
  1207. }
  1208. }
  1209. }
  1210. int numFilesToCopy()
  1211. {
  1212. CriticalBlock b(crit);
  1213. return todo.ordinality();
  1214. }
  1215. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  1216. static inline bool validFNameChar(char c)
  1217. {
  1218. static const char *invalids = "*\"/:<>?\\|";
  1219. return (c>=32 && c<127 && !strchr(invalids, c));
  1220. }
  1221. };
  1222. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  1223. {
  1224. StringArray remoteLocations;
  1225. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  1226. if (peerCluster)
  1227. {
  1228. if (*peerCluster!='-') // a remote cluster was specified explicitly
  1229. appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true); // Add only from specified cluster
  1230. }
  1231. else
  1232. appendRemoteLocations(pdesc, remoteLocations, NULL, roxieName, false); // Add from any cluster on same dali, other than mine
  1233. if (remotePDesc)
  1234. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false); // Then any remote on remote dali
  1235. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, replicationLevel[channel], remoteLocations, startCopy);
  1236. }
  1237. //====================================================================================================
  1238. class CFilePartMap : public CInterface, implements IFilePartMap
  1239. {
  1240. class FilePartMapElement
  1241. {
  1242. public:
  1243. offset_t base;
  1244. offset_t top;
  1245. inline int compare(offset_t offset)
  1246. {
  1247. if (offset < base)
  1248. return -1;
  1249. else if (offset >= top)
  1250. return 1;
  1251. else
  1252. return 0;
  1253. }
  1254. } *map;
  1255. static int compareParts(const void *l, const void *r)
  1256. {
  1257. offset_t lp = * (offset_t *) l;
  1258. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  1259. return thisPart->compare(lp);
  1260. }
  1261. unsigned numParts;
  1262. offset_t recordCount;
  1263. offset_t totalSize;
  1264. StringAttr fileName;
  1265. public:
  1266. IMPLEMENT_IINTERFACE;
  1267. CFilePartMap(IPropertyTree &resource)
  1268. {
  1269. fileName.set(resource.queryProp("@id"));
  1270. numParts = resource.getPropInt("@numparts");
  1271. recordCount = resource.getPropInt64("@recordCount");
  1272. totalSize = resource.getPropInt64("@size");
  1273. assertex(numParts);
  1274. map = new FilePartMapElement[numParts];
  1275. for (unsigned i = 0; i < numParts; i++)
  1276. {
  1277. StringBuffer partPath;
  1278. partPath.appendf("Part[@num='%d']", i+1);
  1279. IPropertyTree *part = resource.queryPropTree(partPath.str());
  1280. if (!part)
  1281. {
  1282. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  1283. part = resource.queryPropTree(partPath.str());
  1284. }
  1285. assertex(part);
  1286. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  1287. assertex(size != (unsigned __int64) -1);
  1288. map[i].base = i ? map[i-1].top : 0;
  1289. map[i].top = map[i].base + size;
  1290. }
  1291. if (totalSize == (offset_t)-1)
  1292. totalSize = map[numParts-1].top;
  1293. else if (totalSize != map[numParts-1].top)
  1294. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%"I64F"d vs %"I64F"d", map[numParts-1].top, totalSize);
  1295. }
  1296. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  1297. : fileName(_fileName)
  1298. {
  1299. numParts = fdesc.numParts();
  1300. IPropertyTree &props = fdesc.queryProperties();
  1301. recordCount = props.getPropInt64("@recordCount", -1);
  1302. totalSize = props.getPropInt64("@size", -1);
  1303. assertex(numParts);
  1304. map = new FilePartMapElement[numParts];
  1305. for (unsigned i = 0; i < numParts; i++)
  1306. {
  1307. IPartDescriptor &part = *fdesc.queryPart(i);
  1308. IPropertyTree &partProps = part.queryProperties();
  1309. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  1310. assertex(size != (unsigned __int64) -1);
  1311. map[i].base = i ? map[i-1].top : 0;
  1312. map[i].top = map[i].base + size;
  1313. }
  1314. if (totalSize == (offset_t)-1)
  1315. totalSize = map[numParts-1].top;
  1316. else if (totalSize != map[numParts-1].top)
  1317. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%"I64F"d vs %"I64F"d", map[numParts-1].top, totalSize);
  1318. }
  1319. ~CFilePartMap()
  1320. {
  1321. delete [] map;
  1322. }
  1323. virtual bool IsShared() const { return CInterface::IsShared(); };
  1324. virtual unsigned mapOffset(offset_t pos) const
  1325. {
  1326. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  1327. if (!part)
  1328. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %"I64F"d in file %s out of range (max offset permitted is %"I64F"d)", pos, fileName.sget(), totalSize);
  1329. return (part-map)+1;
  1330. }
  1331. virtual unsigned getNumParts() const
  1332. {
  1333. return numParts;
  1334. }
  1335. virtual offset_t getTotalSize() const
  1336. {
  1337. return totalSize;
  1338. }
  1339. virtual offset_t getRecordCount() const
  1340. {
  1341. return recordCount;
  1342. }
  1343. virtual offset_t getBase(unsigned part) const
  1344. {
  1345. if (part > numParts || part == 0)
  1346. {
  1347. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existant file part %d (valid are 1-%d)", part, numParts);
  1348. }
  1349. return map[part-1].base;
  1350. }
  1351. virtual offset_t getFileSize() const
  1352. {
  1353. return map[numParts-1].top;
  1354. }
  1355. };
  1356. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  1357. {
  1358. return new CFilePartMap(fileName, fdesc);
  1359. }
  1360. //====================================================================================================
  1361. class CFileIOArray : public CInterface, implements IFileIOArray
  1362. {
  1363. unsigned __int64 totalSize;
  1364. mutable CriticalSection crit;
  1365. mutable StringAttr id;
  1366. void _getId() const
  1367. {
  1368. md5_state_t md5;
  1369. md5_byte_t digest[16];
  1370. md5_init(&md5);
  1371. ForEachItemIn(idx, files)
  1372. {
  1373. IFileIO *file = files.item(idx);
  1374. if (file)
  1375. {
  1376. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  1377. }
  1378. }
  1379. md5_finish(&md5, digest);
  1380. char digestStr[33];
  1381. for (int i = 0; i < 16; i++)
  1382. {
  1383. sprintf(&digestStr[i*2],"%02x", digest[i]);
  1384. }
  1385. id.set(digestStr, 32);
  1386. }
  1387. public:
  1388. IMPLEMENT_IINTERFACE;
  1389. CFileIOArray()
  1390. {
  1391. valid = 0;
  1392. totalSize = (unsigned __int64) -1;
  1393. }
  1394. virtual bool IsShared() const { return CInterface::IsShared(); };
  1395. PointerIArrayOf<IFileIO> files;
  1396. Int64Array bases;
  1397. unsigned valid;
  1398. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base)
  1399. {
  1400. if (!files.isItem(partNo))
  1401. {
  1402. DBGLOG("getFilePart requested invalid part %d", partNo);
  1403. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  1404. }
  1405. IFileIO *file = files.item(partNo);
  1406. if (!file)
  1407. {
  1408. // DBGLOG("getFilePart requested nonBonded part %d", partNo);
  1409. // throw MakeStringException(ROXIE_FILE_FAIL, "getFilePart requested nonBonded part %d", partNo);
  1410. base = 0;
  1411. return NULL;
  1412. }
  1413. base = bases.item(partNo);
  1414. return LINK(file);
  1415. }
  1416. void addFile(IFileIO *f, offset_t base)
  1417. {
  1418. if (f)
  1419. valid++;
  1420. files.append(f);
  1421. bases.append(base);
  1422. }
  1423. virtual unsigned length()
  1424. {
  1425. return files.length();
  1426. }
  1427. virtual unsigned numValid()
  1428. {
  1429. return valid;
  1430. }
  1431. virtual bool isValid(unsigned partNo)
  1432. {
  1433. if (!files.isItem(partNo))
  1434. return false;
  1435. IFileIO *file = files.item(partNo);
  1436. if (!file)
  1437. return false;
  1438. return true;
  1439. }
  1440. virtual unsigned __int64 size()
  1441. {
  1442. CriticalBlock b(crit);
  1443. if (totalSize == (unsigned __int64) -1)
  1444. {
  1445. totalSize = 0;
  1446. ForEachItemIn(idx, files)
  1447. {
  1448. IFileIO *file = files.item(idx);
  1449. if (file)
  1450. totalSize += file->size();
  1451. }
  1452. }
  1453. return totalSize;
  1454. }
  1455. virtual StringBuffer &getId(StringBuffer &ret) const
  1456. {
  1457. CriticalBlock b(crit);
  1458. if (!id)
  1459. _getId();
  1460. return ret.append(id);
  1461. }
  1462. };
  1463. template <class X> class PerChannelCacheOf
  1464. {
  1465. PointerIArrayOf<X> cache;
  1466. IntArray channels;
  1467. public:
  1468. void set(X *value, unsigned channel)
  1469. {
  1470. cache.append(value);
  1471. channels.append(channel);
  1472. }
  1473. X *get(unsigned channel) const
  1474. {
  1475. ForEachItemIn(idx, channels)
  1476. {
  1477. if (channels.item(idx)==channel)
  1478. return cache.item(idx);
  1479. }
  1480. return NULL;
  1481. }
  1482. };
  1483. CRoxieFileCache * fileCache;
  1484. class CResolvedFile : public CInterface, implements IResolvedFileCreator, implements ISDSSubscription
  1485. {
  1486. protected:
  1487. IResolvedFileCache *cached;
  1488. StringAttr lfn;
  1489. StringAttr physicalName;
  1490. Owned<IDistributedFile> dFile; // NULL on copies serialized to slaves. Note that this implies we keep a lock on dali file for the lifetime of this object.
  1491. CDateTime fileTimeStamp;
  1492. offset_t fileSize;
  1493. unsigned fileCheckSum;
  1494. RoxieFileType fileType;
  1495. bool isSuper;
  1496. StringArray subNames;
  1497. PointerIArrayOf<IFileDescriptor> subFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1498. PointerIArrayOf<IFileDescriptor> remoteSubFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1499. PointerIArrayOf<IDefRecordMeta> diskMeta;
  1500. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  1501. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  1502. Owned <IPropertyTree> properties;
  1503. Owned<IDaliPackageWatcher> notifier;
  1504. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  1505. {
  1506. subNames.append(subName);
  1507. subFiles.append(fdesc);
  1508. remoteSubFiles.append(remoteFDesc);
  1509. IPropertyTree const & props = fdesc->queryProperties();
  1510. if(props.hasProp("_record_layout"))
  1511. {
  1512. MemoryBuffer mb;
  1513. props.getPropBin("_record_layout", mb);
  1514. diskMeta.append(deserializeRecordMeta(mb, true));
  1515. }
  1516. else
  1517. diskMeta.append(NULL);
  1518. unsigned numParts = fdesc->numParts();
  1519. offset_t base = 0;
  1520. for (unsigned i = 0; i < numParts; i++)
  1521. {
  1522. IPartDescriptor *pdesc = fdesc->queryPart(i);
  1523. IPropertyTree &partProps = pdesc->queryProperties();
  1524. offset_t dfsSize = partProps.getPropInt64("@size");
  1525. partProps.setPropInt64("@offset", base);
  1526. base += dfsSize;
  1527. }
  1528. fileSize += base;
  1529. }
  1530. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1531. {
  1532. if (traceLevel > 2)
  1533. DBGLOG("Superfile %s change detected", lfn.get());
  1534. {
  1535. CriticalBlock b(lock);
  1536. if (cached)
  1537. {
  1538. cached->removeCache(this);
  1539. cached = NULL;
  1540. }
  1541. }
  1542. globalPackageSetManager->requestReload();
  1543. }
  1544. // We cache all the file maps/arrays etc here.
  1545. mutable CriticalSection lock;
  1546. mutable Owned<IFilePartMap> fileMap;
  1547. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  1548. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  1549. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  1550. public:
  1551. IMPLEMENT_IINTERFACE;
  1552. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
  1553. : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
  1554. {
  1555. cached = NULL;
  1556. fileSize = 0;
  1557. fileCheckSum = 0;
  1558. if (dFile)
  1559. {
  1560. if (traceLevel > 5)
  1561. DBGLOG("Roxie server adding information for file %s", lfn.get());
  1562. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  1563. bool csSet = dFile->getFileCheckSum(fileCheckSum);
  1564. assertex(tsSet); // per Nigel, is always set
  1565. IDistributedSuperFile *superFile = dFile->querySuperFile();
  1566. if (superFile)
  1567. {
  1568. isSuper = true;
  1569. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  1570. ForEach(*subs)
  1571. {
  1572. IDistributedFile &sub = subs->query();
  1573. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  1574. Owned<IFileDescriptor> remoteFDesc;
  1575. if (daliHelper)
  1576. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt));
  1577. subDFiles.append(OLINK(sub));
  1578. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1579. }
  1580. // We have to clone the properties since we don't want to keep the superfile locked
  1581. properties.setown(createPTreeFromIPT(&dFile->queryAttributes()));
  1582. if (!isDynamic)
  1583. {
  1584. notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
  1585. dFile.clear(); // We don't lock superfiles, except dynamic ones
  1586. }
  1587. }
  1588. else // normal file, not superkey
  1589. {
  1590. isSuper = false;
  1591. properties.set(&dFile->queryAttributes());
  1592. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  1593. Owned<IFileDescriptor> remoteFDesc;
  1594. if (daliHelper)
  1595. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
  1596. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1597. }
  1598. }
  1599. }
  1600. virtual void beforeDispose()
  1601. {
  1602. notifier.clear();
  1603. if (cached)
  1604. {
  1605. cached->removeCache(this);
  1606. }
  1607. }
  1608. virtual unsigned numSubFiles() const
  1609. {
  1610. return subNames.length();
  1611. }
  1612. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  1613. {
  1614. if (subNames.isItem(num))
  1615. {
  1616. name.append(subNames.item(num));
  1617. return true;
  1618. }
  1619. else
  1620. {
  1621. return false;
  1622. }
  1623. }
  1624. virtual unsigned findSubName(const char *subname) const
  1625. {
  1626. ForEachItemIn(idx, subNames)
  1627. {
  1628. if (stricmp(subNames.item(idx), subname))
  1629. return idx;
  1630. }
  1631. return NotFound;
  1632. }
  1633. virtual unsigned getContents(StringArray &contents) const
  1634. {
  1635. ForEachItemIn(idx, subNames)
  1636. {
  1637. contents.append(subNames.item(idx));
  1638. }
  1639. return subNames.length();
  1640. }
  1641. virtual bool isSuperFile() const
  1642. {
  1643. return isSuper;
  1644. }
  1645. inline bool isKey() const
  1646. {
  1647. return fileType==ROXIE_KEY;
  1648. }
  1649. virtual IFilePartMap *getFileMap() const
  1650. {
  1651. CriticalBlock b(lock);
  1652. if (!fileMap)
  1653. {
  1654. if (subFiles.length())
  1655. {
  1656. assertex(subFiles.length()==1);
  1657. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  1658. }
  1659. }
  1660. return fileMap.getLink();
  1661. }
  1662. virtual void serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  1663. {
  1664. // Find all the partno's that go to this channel
  1665. unsigned numParts = fdesc->numParts();
  1666. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  1667. numParts--; // don't want to send TLK
  1668. UnsignedArray partNos;
  1669. for (unsigned i = 1; i <= numParts; i++)
  1670. {
  1671. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1672. if (getBondedChannel(i)==channel || !isLocal)
  1673. {
  1674. partNos.append(i-1);
  1675. }
  1676. }
  1677. fdesc->serializeParts(mb, partNos);
  1678. }
  1679. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const
  1680. {
  1681. if (traceLevel > 6)
  1682. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  1683. byte type = (byte) fileType;
  1684. mb.append(type);
  1685. fileTimeStamp.serialize(mb);
  1686. mb.append(fileCheckSum);
  1687. mb.append(fileSize);
  1688. unsigned numSubFiles = subFiles.length();
  1689. mb.append(numSubFiles);
  1690. ForEachItemIn(idx, subFiles)
  1691. {
  1692. mb.append(subNames.item(idx));
  1693. IFileDescriptor *fdesc = subFiles.item(idx);
  1694. serializeFDesc(mb, fdesc, channel, isLocal);
  1695. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1696. if (remoteFDesc)
  1697. {
  1698. mb.append(true);
  1699. serializeFDesc(mb, remoteFDesc, channel, isLocal);
  1700. }
  1701. else
  1702. mb.append(false);
  1703. if (fileType == ROXIE_KEY) // for now we only support translation on index files
  1704. {
  1705. IDefRecordMeta *meta = diskMeta.item(idx);
  1706. if (meta)
  1707. {
  1708. mb.append(true);
  1709. serializeRecordMeta(mb, meta, true);
  1710. }
  1711. else
  1712. mb.append(false);
  1713. }
  1714. }
  1715. if (properties)
  1716. {
  1717. mb.append(true);
  1718. properties->serialize(mb);
  1719. }
  1720. else
  1721. mb.append(false);
  1722. }
  1723. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  1724. {
  1725. CriticalBlock b(lock);
  1726. IFileIOArray *ret = ioArrayMap.get(channel);
  1727. if (!ret)
  1728. {
  1729. ret = createIFileIOArray(isOpt, channel);
  1730. ioArrayMap.set(ret, channel);
  1731. }
  1732. return LINK(ret);
  1733. }
  1734. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  1735. {
  1736. Owned<CFileIOArray> f = new CFileIOArray();
  1737. f->addFile(NULL, 0);
  1738. if (subFiles.length())
  1739. {
  1740. IFileDescriptor *fdesc = subFiles.item(0);
  1741. IFileDescriptor *remoteFDesc = remoteSubFiles.item(0);
  1742. if (fdesc)
  1743. {
  1744. unsigned numParts = fdesc->numParts();
  1745. for (unsigned i = 1; i <= numParts; i++)
  1746. {
  1747. if (!channel || getBondedChannel(i)==channel)
  1748. {
  1749. try
  1750. {
  1751. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1752. assertex(pdesc);
  1753. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  1754. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  1755. IPropertyTree &partProps = pdesc->queryProperties();
  1756. f->addFile(file.getClear(), partProps.getPropInt64("@offset"));
  1757. }
  1758. catch (IException *E)
  1759. {
  1760. StringBuffer err;
  1761. err.append("Could not load file ");
  1762. fdesc->getTraceName(err);
  1763. DBGLOG(E, err.str());
  1764. if (!isOpt)
  1765. throw;
  1766. E->Release();
  1767. f->addFile(NULL, 0);
  1768. }
  1769. }
  1770. else
  1771. f->addFile(NULL, 0);
  1772. }
  1773. }
  1774. }
  1775. return f.getClear();
  1776. }
  1777. virtual IKeyArray *getKeyArray(IDefRecordMeta *activityMeta, TranslatorArray *translators, bool isOpt, unsigned channel, bool allowFieldTranslation) const
  1778. {
  1779. unsigned maxParts = 0;
  1780. ForEachItemIn(subFile, subFiles)
  1781. {
  1782. IFileDescriptor *fdesc = subFiles.item(subFile);
  1783. if (fdesc)
  1784. {
  1785. unsigned numParts = fdesc->numParts();
  1786. if (numParts > 1)
  1787. numParts--; // Don't include TLK
  1788. if (numParts > maxParts)
  1789. maxParts = numParts;
  1790. }
  1791. IDefRecordMeta *thisDiskMeta = diskMeta.item(subFile);
  1792. if (fdesc && thisDiskMeta && activityMeta && !thisDiskMeta->equals(activityMeta))
  1793. if (allowFieldTranslation)
  1794. translators->append(createRecordLayoutTranslator(lfn, thisDiskMeta, activityMeta));
  1795. else
  1796. {
  1797. DBGLOG("Key layout mismatch: %s", lfn.get());
  1798. StringBuffer q, d;
  1799. getRecordMetaAsString(q, activityMeta);
  1800. getRecordMetaAsString(d, thisDiskMeta);
  1801. DBGLOG("Activity: %s", q.str());
  1802. DBGLOG("Disk: %s", d.str());
  1803. throw MakeStringException(ROXIE_MISMATCH, "Key layout mismatch detected for index %s", lfn.get());
  1804. }
  1805. else
  1806. translators->append(NULL);
  1807. }
  1808. CriticalBlock b(lock);
  1809. IKeyArray *ret = keyArrayMap.get(channel);
  1810. if (!ret)
  1811. {
  1812. ret = createKeyArray(isOpt, channel, maxParts);
  1813. keyArrayMap.set(ret, channel);
  1814. }
  1815. return LINK(ret);
  1816. }
  1817. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  1818. {
  1819. Owned<IKeyArray> ret = ::createKeyArray();
  1820. if (channel)
  1821. {
  1822. ret->addKey(NULL);
  1823. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  1824. {
  1825. if (channel == getBondedChannel(partNo))
  1826. {
  1827. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  1828. ForEachItemIn(idx, subFiles)
  1829. {
  1830. IFileDescriptor *fdesc = subFiles.item(idx);
  1831. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1832. Owned <ILazyFileIO> part;
  1833. unsigned crc = 0;
  1834. if (fdesc) // NB there may be no parts for this channel
  1835. {
  1836. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  1837. if (pdesc)
  1838. {
  1839. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  1840. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  1841. pdesc->getCrc(crc);
  1842. }
  1843. }
  1844. if (part)
  1845. {
  1846. if (lazyOpen)
  1847. {
  1848. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  1849. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), false, false));
  1850. }
  1851. else
  1852. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), false, false));
  1853. }
  1854. else
  1855. keyset->addIndex(NULL);
  1856. }
  1857. ret->addKey(keyset.getClear());
  1858. }
  1859. else
  1860. ret->addKey(NULL);
  1861. }
  1862. }
  1863. else
  1864. {
  1865. // Channel 0 means return the TLK
  1866. IArrayOf<IKeyIndexBase> subkeys;
  1867. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  1868. ForEachItemIn(idx, subFiles)
  1869. {
  1870. IFileDescriptor *fdesc = subFiles.item(idx);
  1871. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1872. Owned<IKeyIndexBase> key;
  1873. if (fdesc)
  1874. {
  1875. unsigned numParts = fdesc->numParts();
  1876. assertex(numParts > 0);
  1877. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  1878. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  1879. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  1880. unsigned crc = 0;
  1881. pdesc->getCrc(crc);
  1882. StringBuffer pname;
  1883. pdesc->getPath(pname);
  1884. if (lazyOpen)
  1885. {
  1886. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  1887. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), numParts>1, false));
  1888. }
  1889. else
  1890. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), numParts>1, false));
  1891. keyset->addIndex(LINK(key->queryPart(0)));
  1892. }
  1893. else
  1894. keyset->addIndex(NULL);
  1895. }
  1896. if (keyset->numParts())
  1897. ret->addKey(keyset.getClear());
  1898. else if (!isOpt)
  1899. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  1900. else if (traceLevel > 4)
  1901. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  1902. }
  1903. return ret.getClear();
  1904. }
  1905. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IFileIOArray *files, IRecordSize *recs, bool preload, int numKeys) const
  1906. {
  1907. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  1908. // Failures to resolve will not be cached, only successes.
  1909. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  1910. CriticalBlock b(lock);
  1911. IInMemoryIndexManager *ret = indexMap.get(channel);
  1912. if (!ret)
  1913. {
  1914. ret = createInMemoryIndexManager(isOpt, lfn);
  1915. ret->load(files, recs, preload, numKeys); // note - files (passed in) are channel specific
  1916. indexMap.set(ret, channel);
  1917. }
  1918. return LINK(ret);
  1919. }
  1920. virtual const CDateTime &queryTimeStamp() const
  1921. {
  1922. return fileTimeStamp;
  1923. }
  1924. virtual unsigned queryCheckSum() const
  1925. {
  1926. return fileCheckSum;
  1927. }
  1928. virtual offset_t getFileSize() const
  1929. {
  1930. return fileSize;
  1931. }
  1932. virtual hash64_t addHash64(hash64_t hashValue) const
  1933. {
  1934. hashValue = rtlHash64Data(sizeof(fileTimeStamp), &fileTimeStamp, hashValue);
  1935. if (fileCheckSum)
  1936. hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
  1937. return hashValue;
  1938. }
  1939. virtual void addSubFile(const IResolvedFile *_sub)
  1940. {
  1941. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  1942. if (subFiles.length())
  1943. assertex(sub->fileType==fileType);
  1944. else
  1945. fileType = sub->fileType;
  1946. subRFiles.append((IResolvedFile &) *LINK(_sub));
  1947. ForEachItemIn(idx, sub->subFiles)
  1948. {
  1949. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  1950. }
  1951. }
  1952. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  1953. {
  1954. addFile(lfn, _sub, _remoteSub);
  1955. }
  1956. virtual void addSubFile(const char *localFileName)
  1957. {
  1958. Owned<IFile> file = createIFile(localFileName);
  1959. assertex(file->exists());
  1960. offset_t size = file->size();
  1961. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  1962. Owned<IPropertyTree> pp = createPTree("Part");
  1963. pp->setPropInt64("@size",size);
  1964. pp->setPropBool("@local", true);
  1965. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  1966. addSubFile(fdesc.getClear(), NULL);
  1967. }
  1968. virtual void setCache(IResolvedFileCache *cache)
  1969. {
  1970. if (cached)
  1971. {
  1972. if (traceLevel > 9)
  1973. DBGLOG("setCache removing from prior cache %s", queryFileName());
  1974. if (cache==NULL)
  1975. cached->removeCache(this);
  1976. else
  1977. throwUnexpected();
  1978. }
  1979. cached = cache;
  1980. }
  1981. virtual bool isAlive() const
  1982. {
  1983. return CInterface::isAlive();
  1984. }
  1985. virtual const char *queryFileName() const
  1986. {
  1987. return lfn.get();
  1988. }
  1989. virtual const char *queryPhysicalName() const
  1990. {
  1991. return physicalName.get();
  1992. }
  1993. virtual const IPropertyTree *queryProperties() const
  1994. {
  1995. return properties;
  1996. }
  1997. virtual void remove()
  1998. {
  1999. subFiles.kill();
  2000. subDFiles.kill();
  2001. subRFiles.kill();
  2002. subNames.kill();
  2003. remoteSubFiles.kill();
  2004. diskMeta.kill();
  2005. properties.clear();
  2006. notifier.clear();
  2007. if (isSuper)
  2008. {
  2009. // Because we don't lock superfiles, we need to behave differently
  2010. UNIMPLEMENTED;
  2011. }
  2012. else if (dFile)
  2013. {
  2014. dFile->detach();
  2015. }
  2016. else
  2017. {
  2018. try
  2019. {
  2020. Owned<IFile> file = createIFile(physicalName.get());
  2021. file->remove();
  2022. }
  2023. catch (IException *e)
  2024. {
  2025. ERRLOG(-1, "Error removing file %s",lfn.get());
  2026. e->Release();
  2027. }
  2028. }
  2029. }
  2030. virtual bool exists() const
  2031. {
  2032. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  2033. // This will make more sense if/when we start to lock earlier.
  2034. if (dFile || isSuper)
  2035. return true; // MORE - may need some thought - especially the isSuper case
  2036. else
  2037. return checkFileExists(lfn.get());
  2038. }
  2039. };
  2040. /*----------------------------------------------------------------------------------------------------------
  2041. MORE
  2042. - on remote() calls we can't pass the expected file date but we will pass it back with the file info.
  2043. ------------------------------------------------------------------------------------------------------------*/
  2044. class CSlaveDynamicFile : public CResolvedFile
  2045. {
  2046. public:
  2047. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  2048. bool isLocal;
  2049. unsigned channel;
  2050. unsigned serverIdx;
  2051. public:
  2052. CSlaveDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  2053. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
  2054. {
  2055. // call back to the server to get the info
  2056. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  2057. try
  2058. {
  2059. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK);
  2060. bool ok = false;
  2061. for (unsigned i = 0; i < callbackRetries; i++)
  2062. {
  2063. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  2064. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  2065. char *buf = (char *) output->getBuffer(len, true);
  2066. buf[0] = isOpt;
  2067. buf[1] = isLocal;
  2068. strcpy(buf+2, lfn.get());
  2069. output->putBuffer(buf, len, true);
  2070. output->flush(true);
  2071. output.clear();
  2072. if (callback->wait(callbackTimeout))
  2073. {
  2074. ok = true;
  2075. break;
  2076. }
  2077. else
  2078. {
  2079. DBGLOG("timed out waiting for server callback - retrying");
  2080. }
  2081. }
  2082. if (ok)
  2083. {
  2084. if (traceLevel > 6)
  2085. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  2086. MemoryBuffer &serverData = callback->queryData();
  2087. byte type;
  2088. serverData.read(type);
  2089. fileType = (RoxieFileType) type;
  2090. fileTimeStamp.deserialize(serverData);
  2091. serverData.read(fileCheckSum);
  2092. serverData.read(fileSize);
  2093. unsigned numSubFiles;
  2094. serverData.read(numSubFiles);
  2095. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  2096. {
  2097. StringBuffer subName;
  2098. serverData.read(subName);
  2099. subNames.append(subName.str());
  2100. deserializeFilePart(serverData, subFiles, fileNo, false);
  2101. bool remotePresent;
  2102. serverData.read(remotePresent);
  2103. if (remotePresent)
  2104. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  2105. else
  2106. remoteSubFiles.append(NULL);
  2107. if (fileType==ROXIE_KEY)
  2108. {
  2109. bool diskMetaPresent;
  2110. serverData.read(diskMetaPresent);
  2111. if (diskMetaPresent)
  2112. diskMeta.append(deserializeRecordMeta(serverData, true));
  2113. else
  2114. diskMeta.append(NULL);
  2115. }
  2116. }
  2117. bool propertiesPresent;
  2118. serverData.read(propertiesPresent);
  2119. if (propertiesPresent)
  2120. properties.setown(createPTree(serverData));
  2121. }
  2122. else
  2123. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  2124. }
  2125. catch (...)
  2126. {
  2127. ROQ->removePendingCallback(callback);
  2128. throw;
  2129. }
  2130. ROQ->removePendingCallback(callback);
  2131. }
  2132. private:
  2133. void deserializeFilePart(MemoryBuffer &serverData, PointerIArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  2134. {
  2135. IArrayOf<IPartDescriptor> parts;
  2136. deserializePartFileDescriptors(serverData, parts);
  2137. if (parts.length())
  2138. {
  2139. files.append(LINK(&parts.item(0).queryOwner()));
  2140. }
  2141. else
  2142. {
  2143. if (traceLevel > 6)
  2144. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  2145. files.append(NULL);
  2146. }
  2147. }
  2148. };
  2149. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  2150. {
  2151. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
  2152. }
  2153. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
  2154. {
  2155. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  2156. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
  2157. }
  2158. class CSlaveDynamicFileCache : public CInterface, implements ISlaveDynamicFileCache
  2159. {
  2160. mutable CriticalSection crit;
  2161. CIArrayOf<CSlaveDynamicFile> files; // expect numbers to be small - probably not worth hashing
  2162. unsigned tableSize;
  2163. public:
  2164. IMPLEMENT_IINTERFACE;
  2165. CSlaveDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  2166. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal)
  2167. {
  2168. if (logctx.queryTraceLevel() > 5)
  2169. {
  2170. StringBuffer s;
  2171. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  2172. }
  2173. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  2174. CriticalBlock b(crit);
  2175. if (!cacheDate.isNull())
  2176. {
  2177. unsigned idx = 0;
  2178. while (files.isItem(idx))
  2179. {
  2180. CSlaveDynamicFile &f = files.item(idx);
  2181. if (f.channel==header->channel && f.serverIdx==header->serverIdx && stricmp(f.queryFileName(), lfn)==0)
  2182. {
  2183. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  2184. {
  2185. if (f.isKey())
  2186. clearKeyStoreCacheEntry(f.queryFileName());
  2187. files.remove(idx);
  2188. idx--;
  2189. }
  2190. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  2191. {
  2192. files.swap(idx, 0);
  2193. return LINK(&f);
  2194. }
  2195. }
  2196. idx++;
  2197. }
  2198. }
  2199. Owned<CSlaveDynamicFile> ret;
  2200. {
  2201. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  2202. CriticalUnblock b1(crit);
  2203. ret.setown(new CSlaveDynamicFile(logctx, lfn, header, isOpt, isLocal));
  2204. }
  2205. while (files.length() > tableSize)
  2206. files.remove(files.length()-1);
  2207. files.add(*ret.getLink(), 0);
  2208. return ret.getClear();
  2209. }
  2210. };
  2211. static CriticalSection slaveDynamicFileCacheCrit;
  2212. static Owned<ISlaveDynamicFileCache> slaveDynamicFileCache;
  2213. extern ISlaveDynamicFileCache *querySlaveDynamicFileCache()
  2214. {
  2215. if (!slaveDynamicFileCache)
  2216. {
  2217. CriticalBlock b(slaveDynamicFileCacheCrit);
  2218. if (!slaveDynamicFileCache)
  2219. slaveDynamicFileCache.setown(new CSlaveDynamicFileCache(20));
  2220. }
  2221. return slaveDynamicFileCache;
  2222. }
  2223. extern void releaseSlaveDynamicFileCache()
  2224. {
  2225. CriticalBlock b(slaveDynamicFileCacheCrit);
  2226. slaveDynamicFileCache.clear();
  2227. }
  2228. // Initialization/termination
  2229. MODULE_INIT(INIT_PRIORITY_STANDARD)
  2230. {
  2231. fileCache = new CRoxieFileCache;
  2232. return true;
  2233. }
  2234. MODULE_EXIT()
  2235. {
  2236. fileCache->join();
  2237. fileCache->Release();
  2238. }
  2239. extern IRoxieFileCache &queryFileCache()
  2240. {
  2241. return *fileCache;
  2242. }
  2243. class CRoxieWriteHandler : public CInterface, implements IRoxieWriteHandler
  2244. {
  2245. public:
  2246. IMPLEMENT_IINTERFACE;
  2247. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2248. : daliHelper(_daliHelper), dFile(_dFile)
  2249. {
  2250. ForEachItemIn(idx, _clusters)
  2251. {
  2252. addCluster(_clusters.item(idx));
  2253. }
  2254. if (dFile->queryDistributedFile())
  2255. {
  2256. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  2257. if (isTemporary)
  2258. {
  2259. UNIMPLEMENTED;
  2260. }
  2261. else
  2262. localFile.setown(dFile->getPartFile(0, 0));
  2263. }
  2264. else
  2265. {
  2266. isTemporary = false;
  2267. localFile.setown(dFile->getPartFile(0, 0));
  2268. }
  2269. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  2270. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  2271. }
  2272. virtual IFile *queryFile() const
  2273. {
  2274. return localFile;
  2275. }
  2276. void getClusters(StringArray &clusters) const
  2277. {
  2278. ForEachItemIn(idx, allClusters)
  2279. {
  2280. clusters.append(allClusters.item(idx));
  2281. }
  2282. }
  2283. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  2284. {
  2285. if (success)
  2286. {
  2287. copyPhysical();
  2288. if (daliHelper && daliHelper->connected())
  2289. publish(activity);
  2290. }
  2291. if (isTemporary || !success)
  2292. {
  2293. localFile->remove();
  2294. }
  2295. }
  2296. private:
  2297. bool isTemporary;
  2298. Linked<IRoxieDaliHelper> daliHelper;
  2299. Owned<ILocalOrDistributedFile> dFile;
  2300. Owned<IFile> localFile;
  2301. Owned<IGroup> localCluster;
  2302. StringAttr localClusterName;
  2303. IArrayOf<IGroup> remoteNodes;
  2304. StringArray allClusters;
  2305. void copyPhysical() const
  2306. {
  2307. if (remoteNodes.length())
  2308. {
  2309. RemoteFilename rfn, rdn;
  2310. dFile->getPartFilename(rfn, 0, 0);
  2311. StringBuffer physicalName, physicalDir, physicalBase;
  2312. rfn.getLocalPath(physicalName);
  2313. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2314. rdn.setLocalPath(physicalDir.str());
  2315. ForEachItemIn(idx, remoteNodes)
  2316. {
  2317. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2318. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2319. Owned<IFile> targetdir = createIFile(rdn);
  2320. Owned<IFile> target = createIFile(rfn);
  2321. targetdir->createDirectory();
  2322. copyFile(target, localFile);
  2323. }
  2324. }
  2325. }
  2326. void publish(const IRoxiePublishCallback *activity)
  2327. {
  2328. if (!dFile->isExternal())
  2329. {
  2330. Owned<IFileDescriptor> desc = createFileDescriptor();
  2331. desc->setNumParts(1);
  2332. RemoteFilename rfn;
  2333. dFile->getPartFilename(rfn, 0, 0);
  2334. StringBuffer physicalName, physicalDir, physicalBase;
  2335. rfn.getLocalPath(physicalName);
  2336. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2337. desc->setDefaultDir(physicalDir.str());
  2338. desc->setPartMask(physicalBase.str());
  2339. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  2340. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  2341. offset_t fileSize = localFile->size();
  2342. fileProps.setPropInt64("@size", fileSize);
  2343. partProps.setPropInt64("@size", fileSize);
  2344. CDateTime createTime, modifiedTime, accessedTime;
  2345. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  2346. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  2347. unsigned hour, min, sec, nanosec;
  2348. modifiedTime.getTime(hour, min, sec, nanosec);
  2349. modifiedTime.setTime(hour, min, sec, 0);
  2350. StringBuffer timestr;
  2351. modifiedTime.getString(timestr);
  2352. if(timestr.length())
  2353. partProps.setProp("@modified", timestr.str());
  2354. ClusterPartDiskMapSpec partmap;
  2355. if (localCluster)
  2356. {
  2357. desc->addCluster(localCluster, partmap);
  2358. desc->setClusterGroupName(0, localClusterName.get());
  2359. }
  2360. ForEachItemIn(idx, remoteNodes)
  2361. desc->addCluster(&remoteNodes.item(idx), partmap);
  2362. if (activity)
  2363. activity->setFileProperties(desc);
  2364. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  2365. publishFile->setAccessedTime(modifiedTime);
  2366. publishFile->attach(dFile->queryLogicalName(), activity ? activity->queryUserDescriptor() : UNKNOWN_USER);
  2367. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  2368. }
  2369. }
  2370. void addCluster(char const * cluster)
  2371. {
  2372. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  2373. if (!group)
  2374. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  2375. cluster, dFile->queryLogicalName());
  2376. if (group->isMember())
  2377. {
  2378. if (localCluster)
  2379. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2380. cluster, dFile->queryLogicalName());
  2381. localCluster.setown(group.getClear());
  2382. localClusterName.set(cluster);
  2383. }
  2384. else
  2385. {
  2386. ForEachItemIn(idx, remoteNodes)
  2387. {
  2388. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  2389. if (group->isMember(other))
  2390. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2391. cluster, dFile->queryLogicalName());
  2392. }
  2393. remoteNodes.append(*group.getClear());
  2394. }
  2395. allClusters.append(cluster);
  2396. }
  2397. };
  2398. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2399. {
  2400. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  2401. }