ccdfile.cpp 106 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "ccd.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdquery.hpp"
  26. #include "ccdstate.hpp"
  27. #include "ccdsnmp.hpp"
  28. #include "rmtfile.hpp"
  29. #include "ccdqueue.ipp"
  30. #ifdef __linux__
  31. #include <sys/mman.h>
  32. #endif
  33. #if defined (__linux__)
  34. #include <sys/syscall.h>
  35. #include "ioprio.h"
  36. #endif
  37. #include "thorcommon.hpp"
  38. #include "eclhelper_dyn.hpp"
  39. #include "rtldynfield.hpp"
  // Number of CRoxieLazyFileIO objects currently holding a real (non-'failure') handle, indexed by their
  // 'remote' flag: [0] = local files, [1] = remote files. Compared against maxFilesOpen[] in _checkOpen().
  40. atomic_t numFilesOpen[2];
  // CRoxieLazyFileIO::read() gives up (ROXIE_FILE_ERROR) once this many read attempts have failed.
  41. #define MAX_READ_RETRIES 2
  // Debug-only fault injection switches; uncomment to enable. FAIL_20_OPEN makes _checkOpen() pretend that
  // every 5th open fails (presumably the "20" means 20%). FAIL_20_READ only declares a counter in this chunk.
  42. #ifdef _DEBUG
  43. //#define FAIL_20_READ
  44. //#define FAIL_20_OPEN
  45. #endif
  46. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  47. class DECL_EXCEPTION NotYetOpenException : implements IException, public CInterface
  48. {
  49. public:
  50. IMPLEMENT_IINTERFACE;
  51. virtual int errorCode() const { return 0; }
  52. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  53. virtual MessageAudience errorAudience() const { return MSGAUD_internal; }
  54. };
  55. class CFailingFileIO : implements IFileIO, public CInterface
  56. {
  57. #define THROWNOTOPEN throw new NotYetOpenException()
  58. public:
  59. IMPLEMENT_IINTERFACE;
  60. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  61. virtual offset_t size() { THROWNOTOPEN; }
  62. virtual void flush() { THROWNOTOPEN; }
  63. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  64. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  65. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  66. virtual void close() { }
  67. virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
  68. } failure;
  69. class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public CInterface
  70. {
  71. protected:
  72. IArrayOf<IFile> sources;
  73. Owned<IFile> logical;
  74. unsigned currentIdx;
  75. Owned<IFileIO> current;
  76. Owned<IMemoryMappedFile> mmapped;
  77. mutable CriticalSection crit;
  78. bool remote;
  79. offset_t fileSize;
  80. CDateTime fileDate;
  81. unsigned lastAccess;
  82. bool copying;
  83. bool isCompressed;
  84. const IRoxieFileCache *cached;
  85. CRuntimeStatisticCollection fileStats;
  86. #ifdef FAIL_20_READ
  87. unsigned readCount;
  88. #endif
  89. public:
  90. IMPLEMENT_IINTERFACE;
  91. CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed)
  92. : logical(_logical), fileSize(size), isCompressed(_isCompressed), fileStats(diskLocalStatistics)
  93. {
  94. fileDate.set(_date);
  95. currentIdx = 0;
  96. current.set(&failure);
  97. remote = false;
  98. #ifdef FAIL_20_READ
  99. readCount = 0;
  100. #endif
  101. lastAccess = msTick();
  102. copying = false;
  103. cached = NULL;
  104. }
  105. ~CRoxieLazyFileIO()
  106. {
  107. setFailure(); // ensures the open file count properly maintained
  108. }
  109. virtual void beforeDispose()
  110. {
  111. if (cached)
  112. cached->removeCache(this);
  113. }
  114. void setCache(const IRoxieFileCache *cache)
  115. {
  116. assertex(!cached);
  117. cached = cache;
  118. }
  119. void removeCache(const IRoxieFileCache *cache)
  120. {
  121. assertex(cached==cache);
  122. cached = NULL;
  123. }
  124. inline void setRemote(bool _remote) { remote = _remote; }
  125. virtual void setCopying(bool _copying)
  126. {
  127. CriticalBlock b(crit);
  128. if (remote && currentIdx)
  129. {
  130. // The current location is not our preferred location. Recheck whether we can now access our preferred location
  131. setFailure();
  132. currentIdx = 0;
  133. _checkOpen();
  134. }
  135. copying = _copying;
  136. }
  137. virtual bool isCopying() const
  138. {
  139. CriticalBlock b(crit);
  140. return copying;
  141. }
  142. virtual bool isOpen() const
  143. {
  144. CriticalBlock b(crit);
  145. return current.get() != &failure;
  146. }
  147. virtual unsigned getLastAccessed() const
  148. {
  149. CriticalBlock b(crit);
  150. return lastAccess;
  151. }
  152. virtual void close()
  153. {
  154. CriticalBlock b(crit);
  155. setFailure();
  156. }
  157. virtual bool isRemote()
  158. {
  159. CriticalBlock b(crit);
  160. return remote;
  161. }
  162. void setFailure()
  163. {
  164. try
  165. {
  166. if (current.get()==&failure)
  167. return;
  168. atomic_dec(&numFilesOpen[remote]);
  169. mergeStats(fileStats, current);
  170. current.set(&failure);
  171. }
  172. catch (IException *E)
  173. {
  174. if (traceLevel > 5)
  175. {
  176. StringBuffer s;
  177. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  178. }
  179. E->Release();
  180. }
  181. }
  182. void checkOpen()
  183. {
  184. CriticalBlock b(crit);
  185. _checkOpen();
  186. }
  187. IFileIO *getCheckOpen(unsigned &activeIdx)
  188. {
  189. CriticalBlock b(crit);
  190. _checkOpen();
  191. activeIdx = currentIdx;
  192. return LINK(current);
  193. }
  194. void _checkOpen()
  195. {
  196. if (current.get() == &failure)
  197. {
  198. StringBuffer filesTried;
  199. unsigned tries = 0;
  200. bool firstTime = true;
  201. RoxieFileStatus fileStatus = FileNotFound;
  202. for (;;)
  203. {
  204. if (currentIdx >= sources.length())
  205. currentIdx = 0;
  206. if (tries==sources.length())
  207. {
  208. if (firstTime) // if first time - reset and try again
  209. {
  210. firstTime = false;
  211. tries = 0;
  212. }
  213. else
  214. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  215. }
  216. const char *sourceName = sources.item(currentIdx).queryFilename();
  217. if (traceLevel > 10)
  218. DBGLOG("Trying to open %s", sourceName);
  219. try
  220. {
  221. #ifdef FAIL_20_OPEN
  222. openCount++;
  223. if ((openCount % 5) == 0)
  224. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  225. #endif
  226. IFile *f = &sources.item(currentIdx);
  227. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, isCompressed, false);
  228. if (fileStatus == FileIsValid)
  229. {
  230. if (isCompressed)
  231. current.setown(createCompressedFileReader(f));
  232. else
  233. current.setown(f->open(IFOread));
  234. if (current)
  235. {
  236. if (traceLevel > 5)
  237. DBGLOG("Opening %s", sourceName);
  238. disconnectRemoteIoOnExit(current);
  239. break;
  240. }
  241. // throwUnexpected(); - try another location if this one has the wrong version of the file
  242. }
  243. disconnectRemoteFile(f);
  244. }
  245. catch (IException *E)
  246. {
  247. E->Release();
  248. }
  249. currentIdx++;
  250. tries++;
  251. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  252. {
  253. filesTried.appendf(" %s", sourceName); // only need to build this list once
  254. switch (fileStatus)
  255. {
  256. case FileNotFound:
  257. filesTried.append(": FileNotFound");
  258. break;
  259. case FileSizeMismatch:
  260. filesTried.append(": FileSizeMismatch");
  261. break;
  262. case FileDateMismatch:
  263. filesTried.append(": FileDateMismatch");
  264. break;
  265. }
  266. }
  267. }
  268. lastAccess = msTick();
  269. atomic_inc(&numFilesOpen[remote]);
  270. if ((unsigned) atomic_read(&numFilesOpen[remote]) > maxFilesOpen[remote])
  271. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  272. }
  273. }
  274. virtual void addSource(IFile *newSource)
  275. {
  276. if (newSource)
  277. {
  278. if (traceLevel > 10)
  279. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  280. CriticalBlock b(crit);
  281. sources.append(*newSource);
  282. }
  283. }
  284. virtual size32_t read(offset_t pos, size32_t len, void * data)
  285. {
  286. unsigned activeIdx;
  287. Owned<IFileIO> active = getCheckOpen(activeIdx);
  288. unsigned tries = 0;
  289. for (;;)
  290. {
  291. try
  292. {
  293. size32_t ret = active->read(pos, len, data);
  294. lastAccess = msTick();
  295. return ret;
  296. }
  297. catch (NotYetOpenException *E)
  298. {
  299. E->Release();
  300. }
  301. catch (IException *E)
  302. {
  303. EXCLOG(MCoperatorError, E, "Read error");
  304. E->Release();
  305. DBGLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(activeIdx).queryFilename());
  306. {
  307. CriticalBlock b(crit);
  308. if (currentIdx == activeIdx)
  309. {
  310. currentIdx = activeIdx+1;
  311. setFailure();
  312. }
  313. }
  314. }
  315. active.setown(getCheckOpen(activeIdx));
  316. tries++;
  317. if (tries == MAX_READ_RETRIES)
  318. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(activeIdx).queryFilename(), tries);
  319. }
  320. }
  321. virtual void flush()
  322. {
  323. Linked<IFileIO> active;
  324. {
  325. CriticalBlock b(crit);
  326. active.set(current);
  327. }
  328. if (active.get() != &failure)
  329. active->flush();
  330. }
  331. virtual offset_t size()
  332. {
  333. unsigned activeIdx;
  334. Owned<IFileIO> active = getCheckOpen(activeIdx);
  335. lastAccess = msTick();
  336. return active->size();
  337. }
  338. virtual unsigned __int64 getStatistic(StatisticKind kind)
  339. {
  340. unsigned __int64 v = fileStats.getStatisticValue(kind);
  341. CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
  342. return v + current->getStatistic(kind);
  343. }
  344. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  345. virtual void setSize(offset_t size) { throwUnexpected(); }
  346. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  347. virtual const char *queryFilename() { return logical->queryFilename(); }
  348. virtual bool isAlive() const { return CInterface::isAlive(); }
  349. virtual IMemoryMappedFile *getMappedFile() override
  350. {
  351. CriticalBlock b(crit);
  352. if (mmapped)
  353. return mmapped.getLink();
  354. if (!remote)
  355. {
  356. mmapped.setown(logical->openMemoryMapped());
  357. return mmapped.getLink();
  358. }
  359. return nullptr;
  360. }
  361. virtual IFileIO *getFileIO() override
  362. {
  363. return LINK(this);
  364. }
  365. virtual bool createHardFileLink()
  366. {
  367. unsigned tries = 0;
  368. for (;;)
  369. {
  370. StringBuffer filesTried;
  371. if (currentIdx >= sources.length())
  372. currentIdx = 0;
  373. if (tries==sources.length())
  374. return false;
  375. const char *sourceName = sources.item(currentIdx).queryFilename();
  376. filesTried.appendf(" %s", sourceName);
  377. try
  378. {
  379. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, isCompressed) == FileIsValid)
  380. {
  381. StringBuffer source_drive;
  382. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  383. StringBuffer query_drive;
  384. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  385. // only try to create link if on the same drive
  386. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  387. {
  388. try
  389. {
  390. DBGLOG("Trying to create Hard Link for %s", sourceName);
  391. createHardLink(logical->queryFilename(), sourceName);
  392. current.setown(sources.item(currentIdx).open(IFOread));
  393. return true;
  394. }
  395. catch(IException *E)
  396. {
  397. StringBuffer err;
  398. DBGLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  399. E->Release();
  400. }
  401. }
  402. }
  403. }
  404. catch (IException *E)
  405. {
  406. E->Release();
  407. }
  408. currentIdx++;
  409. tries++;
  410. }
  411. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  412. return false; // if we get here - no hardlink
  413. }
  414. void copyComplete()
  415. {
  416. {
  417. CriticalBlock b(crit);
  418. setFailure(); // lazyOpen will then reopen it...
  419. currentIdx = 0;
  420. remote = false;
  421. copying = false;
  422. sources.kill();
  423. sources.add(*logical.getLink(), 0);
  424. if (!lazyOpen)
  425. _checkOpen();
  426. }
  427. }
  428. virtual IFile *querySource()
  429. {
  430. CriticalBlock b(crit);
  431. _checkOpen();
  432. return &sources.item(currentIdx);
  433. };
  434. virtual IFile *queryTarget() { return logical; }
  435. virtual offset_t getSize() { return fileSize; }
  436. virtual CDateTime *queryDateTime() { return &fileDate; }
  437. static int compareAccess(IInterface * const *L, IInterface * const *R)
  438. {
  439. ILazyFileIO *LL = (ILazyFileIO *) *L;
  440. ILazyFileIO *RR = (ILazyFileIO *) *R;
  441. return LL->getLastAccessed() - RR->getLastAccessed();
  442. }
  443. };
  444. //----------------------------------------------------------------------------------------------
  445. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  446. {
  447. if (!remoteFDesc)
  448. return NULL;
  449. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  450. if (!remotePDesc)
  451. return NULL;
  452. unsigned int crc, remoteCrc;
  453. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  454. return remotePDesc;
  455. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  456. return remotePDesc;
  457. return NULL;
  458. }
  459. static int getClusterPriority(const char *clusterName)
  460. {
  461. assertex(preferredClusters);
  462. int *priority = preferredClusters->getValue(clusterName);
  463. return priority ? *priority : 100;
  464. }
  // Add to 'locations' the remote paths of the copies of pdesc.
  // - fromCluster filter: if fromCluster is non-empty, keep only copies whose cluster name matches it
  //   (case-insensitively) when includeFromCluster is true, or only those that do NOT match when it is false.
  // - The copy whose local path equals localFileName (i.e. ourself) is skipped.
  // - At most 2 copies are taken from each run of consecutive copies in the same cluster.
  // New entries are placed after whatever 'locations' already held. Among themselves they are ordered by
  // 'priority': each is inserted before the first earlier new entry with a strictly lower priority, so
  // higher values come first and ties keep copy order. Priority is getClusterPriority(clusterName) when
  // preferredClusters is set, otherwise the copy number. Copies with a negative priority are not added.
  // NOTE(review): without preferredClusters, later copies (higher copy numbers) end up ahead of earlier
  // ones; confirm this is intended, and whether a lower configured cluster priority is meant to be preferred.
  465. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
  466. {
  467. IFileDescriptor &fdesc = pdesc->queryOwner();
  468. unsigned numCopies = pdesc->numCopies();
  469. unsigned lastClusterNo = (unsigned) -1;
  470. unsigned numThisCluster = 0;
  // Insertion positions below are relative to the entries this call adds
  471. unsigned initialSize = locations.length();
  // Carried over between copies of the same cluster (only recomputed when the cluster changes)
  472. int priority = 0;
  // Parallel to the entries added by this call; kept in the same order as those entries
  473. IntArray priorities;
  474. for (unsigned copy = 0; copy < numCopies; copy++)
  475. {
  476. unsigned clusterNo = pdesc->copyClusterNum(copy);
  477. StringBuffer clusterName;
  478. fdesc.getClusterGroupName(clusterNo, clusterName);
  479. if (fromCluster && *fromCluster)
  480. {
  481. bool matches = strieq(clusterName.str(), fromCluster);
  482. if (matches!=includeFromCluster)
  483. continue;
  484. }
  485. RemoteFilename r;
  486. pdesc->getFilename(copy,r);
  487. StringBuffer path;
  488. r.getRemotePath(path);
  489. if (localFileName && r.isLocal())
  490. {
  491. StringBuffer l;
  492. r.getLocalPath(l);
  493. if (streq(l, localFileName))
  494. continue; // don't add ourself
  495. }
  // Copies skipped above do not update lastClusterNo/numThisCluster
  496. if (clusterNo == lastClusterNo)
  497. {
  498. numThisCluster++;
  499. if (numThisCluster > 2) // Don't add more than 2 from one cluster
  500. continue;
  501. }
  502. else
  503. {
  504. numThisCluster = 1;
  505. lastClusterNo = clusterNo;
  506. if (preferredClusters)
  507. {
  508. priority = getClusterPriority(clusterName);
  509. }
  510. else
  511. priority = copy;
  512. }
  513. if (priority >= 0)
  514. {
  // Find the first already-added entry with a lower priority; idx stays in scope after the loop
  // (relies on ForEachItemIn declaring idx outside the for statement)
  515. ForEachItemIn(idx, priorities)
  516. {
  517. if (priorities.item(idx) < priority)
  518. break;
  519. }
  520. priorities.add(priority, idx);
  521. locations.add(path.str(), idx+initialSize);
  522. }
  523. }
  524. }
  525. //----------------------------------------------------------------------------------------------
  526. typedef StringArray *StringArrayPtr;
  527. class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
  528. {
  529. friend class CcdFileTest;
  // Files waiting for the background copy thread. runBackgroundCopy() pops entries under 'crit' and
  // checks isAlive() before linking them, so presumably entries are not linked here - confirm.
  530. mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
  // Wakes the background copy thread (waited on in runBackgroundCopy, signalled by wait(), interrupted by join())
  531. InterruptableSemaphore toCopy;
  // Wakes the handle closer thread (waited on with a 10-minute timeout in runHandleCloser)
  532. InterruptableSemaphore toClose;
  // Local file name -> lazy file, populated by openFile(). Mutable so the const removeCache() can update it.
  533. mutable CopyMapStringToMyClass<ILazyFileIO> files;
  // Guards 'todo' (and, presumably, 'files'); mutable for use in const members such as removeCache()
  534. mutable CriticalSection crit;
  // NOTE(review): not used in the visible part of the class
  535. CriticalSection cpcrit;
  // Worker threads have been started (set by start())
  536. bool started;
  // Set by join(): abort; exceptions are then propagated/suppressed rather than logged
  537. bool aborting;
  // Set by wait(): worker threads exit at their next wakeup
  538. bool closing;
  // Test mode: openFile() uses "test.buddy" as the only peer location and treats files as uncompressed
  539. bool testMode;
  // Indexed by bool (false/true, presumably the 'remote' flag); only initialised in the visible code
  540. bool closePending[2];
  // NOTE(review): not used in the visible part of the class
  541. StringAttrMapping fileErrorList;
  // Signalled by each worker thread as soon as it starts running; start() waits on both
  542. Semaphore bctStarted;
  543. Semaphore hctStarted;
  544. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true)
  545. {
  546. // Ensure that SockFile does not keep these sockets open (or we will run out)
  547. class AutoDisconnector
  548. {
  549. public:
  550. AutoDisconnector(IFile *_f, bool isEnabled) { f = isEnabled ? _f : NULL; };
  551. ~AutoDisconnector() { if (f) disconnectRemoteFile(f); }
  552. private:
  553. IFile *f;
  554. } autoDisconnector(f, autoDisconnect);
  555. offset_t fileSize = f->size();
  556. if (fileSize != (offset_t) -1)
  557. {
  558. // only check size if specified
  559. if ( (size != -1) && !isCompressed && fileSize != size) // MORE - should be able to do better on compressed you'da thunk
  560. return FileSizeMismatch;
  561. CDateTime mt;
  562. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  563. }
  564. else
  565. return FileNotFound;
  566. }
  // Create a lazily-opened CRoxieLazyFileIO for local file localLocation, register it in 'files' under that
  // name and return it (linked). Sources are chosen as follows:
  //  - local file matches size/date (fileUpToDate): it is the only source and the file is not remote;
  //  - local file exists but does not match: throw ROXIE_FILE_ERROR (unless ignoreOrphans is set);
  //  - otherwise add peer locations on this roxie cluster. Peers that are valid are added, and so are peers
  //    where the file is not found yet. If no peer was valid and copyResources/useRemoteResources/testMode
  //    is set, add the valid entries of remoteLocationInfo. If still nothing valid was found, throw
  //    ROXIE_FILE_ERROR if the local file exists, else ROXIE_FILE_OPEN_FAIL. The file is marked remote.
  // Exceptions while checking an individual peer/remote location are logged and that location skipped.
  // NOTE(review): lfn and partNo are not used in this body.
  // NOTE(review): 'files' is updated without taking crit here - presumably the caller holds it; confirm.
  567. ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
  568. IPartDescriptor *pdesc,
  569. const StringArray &remoteLocationInfo,
  570. offset_t size, const CDateTime &modified)
  571. {
  572. Owned<IFile> local = createIFile(localLocation);
  573. bool isCompressed = testMode ? false : pdesc->queryOwner().isCompressed();
  // The lazy file takes ownership of the extra link on 'local' as its logical (target) file
  574. Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed);
  575. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, isCompressed);
  576. if (fileStatus == FileIsValid)
  577. {
  578. ret->addSource(local.getLink());
  579. ret->setRemote(false);
  580. }
  581. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  582. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  583. else
  584. {
  // Set only when a location that is valid right now has been added
  585. bool addedOne = false;
  586. // put the peerRoxieLocations next in the list
  587. StringArray localLocations;
  588. if (testMode)
  589. localLocations.append("test.buddy");
  590. else
  591. appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true); // Adds all locations on the same cluster
  592. ForEachItemIn(roxie_idx, localLocations)
  593. {
  594. try
  595. {
  596. const char *remoteName = localLocations.item(roxie_idx);
  597. Owned<IFile> remote = createIFile(remoteName);
  598. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  599. if (status==FileIsValid)
  600. {
  601. if (miscDebugTraceLevel > 5)
  602. DBGLOG("adding peer location %s", remoteName);
  603. ret->addSource(remote.getClear());
  604. addedOne = true;
  605. }
  606. else if (status==FileNotFound)
  607. {
  608. // Even though it's not on the buddy (yet), add it to the locations since it may well be there
  609. // by the time we come to copy (and if it is, we want to copy from there)
  610. if (miscDebugTraceLevel > 5)
  611. DBGLOG("adding missing peer location %s", remoteName);
  612. ret->addSource(remote.getClear());
  613. // Don't set addedOne - we need to go to remote too
  614. }
  // Size/date mismatches on a peer are not added at all
  615. else if (miscDebugTraceLevel > 10)
  616. DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
  617. }
  618. catch (IException *E)
  619. {
  620. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  621. E->Release();
  622. }
  623. }
  624. if (!addedOne && (copyResources || useRemoteResources || testMode)) // If no peer locations available, go to remote
  625. {
  626. ForEachItemIn(idx, remoteLocationInfo)
  627. {
  628. try
  629. {
  630. const char *remoteName = remoteLocationInfo.item(idx);
  631. Owned<IFile> remote = createIFile(remoteName);
  632. if (traceLevel > 5)
  633. DBGLOG("checking remote location %s", remoteName);
  634. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  635. if (status==FileIsValid)
  636. {
  637. if (miscDebugTraceLevel > 5)
  638. DBGLOG("adding remote location %s", remoteName);
  639. ret->addSource(remote.getClear());
  640. addedOne = true;
  641. }
  642. else if (miscDebugTraceLevel > 10)
  643. DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
  644. }
  645. catch (IException *E)
  646. {
  647. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  648. E->Release();
  649. }
  650. }
  651. }
  // Not-yet-present peers alone are not enough: some location must be valid now
  652. if (!addedOne)
  653. {
  // Reached with ignoreOrphans set; a mismatching local file is still an error here
  654. if (local->exists()) // Implies local dali and local file out of sync
  655. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  656. else
  657. {
  658. if (traceLevel >= 2)
  659. {
  660. DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
  661. ForEachItemIn(local_idx, localLocations)
  662. {
  663. DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
  664. }
  665. DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
  666. ForEachItemIn(remote_idx, remoteLocationInfo)
  667. {
  668. DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
  669. }
  670. }
  671. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  672. }
  673. }
  674. ret->setRemote(true);
  675. }
  // Register with the cache; the lazy file calls removeCache() on us from beforeDispose()
  676. ret->setCache(this);
  677. files.setValue(localLocation, (ILazyFileIO *)ret);
  678. return ret.getClear();
  679. }
  680. void deleteTempFiles(const char *targetFilename)
  681. {
  682. try
  683. {
  684. StringBuffer destPath;
  685. StringBuffer prevTempFile;
  686. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  687. prevTempFile.append("*.$$$");
  688. Owned<IFile> dirf = createIFile(destPath.str());
  689. Owned<IDirectoryIterator> iter = dirf->directoryFiles(prevTempFile.str(),false,false);
  690. ForEach(*iter)
  691. {
  692. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  693. if (thisFile->isFile() == foundYes)
  694. thisFile->remove();
  695. }
  696. }
  697. catch(IException *E)
  698. {
  699. StringBuffer err;
  700. DBGLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  701. E->Release();
  702. }
  703. catch(...)
  704. {
  705. }
  706. }
  707. bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
  708. {
  709. bool fileCopied = false;
  710. IFile *sourceFile;
  711. try
  712. {
  713. f->setCopying(true);
  714. sourceFile = f->querySource();
  715. }
  716. catch (IException *E)
  717. {
  718. f->setCopying(false);
  719. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  720. throw;
  721. }
  722. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  723. deleteTempFiles(targetFilename);
  724. offset_t fileSize = sourceFile->size();
  725. if ( (fileSize + minFreeDiskSpace) > freeDiskSpace)
  726. {
  727. StringBuffer err;
  728. err.appendf("Insufficient disk space. File %s needs %" I64F "d bytes, but only %" I64F "d remains, and %" I64F "d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  729. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  730. EXCLOG(MCoperatorError, E);
  731. E->Release();
  732. f->setCopying(false);
  733. }
  734. else
  735. {
  736. IpSubNet subnet; // preferred set but not required
  737. IpAddress fromip; // returned
  738. Owned<IFile> destFile = createIFile(tempFile);
  739. bool hardLinkCreated = false;
  740. unsigned start = msTick();
  741. try
  742. {
  743. if (useHardLink)
  744. hardLinkCreated = f->createHardFileLink();
  745. if (hardLinkCreated)
  746. msg = "Hard Link";
  747. else
  748. {
  749. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  750. if (traceLevel > 5)
  751. {
  752. StringBuffer str;
  753. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  754. TimeSection timing(str.str());
  755. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  756. }
  757. else
  758. {
  759. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  760. }
  761. }
  762. f->setCopying(false);
  763. fileCopied = true;
  764. }
  765. catch(IException *E)
  766. {
  767. f->setCopying(false);
  768. EXCLOG(E, "Copy exception - remove templocal");
  769. destFile->remove();
  770. deleteTempFiles(targetFilename);
  771. throw;
  772. }
  773. catch(...)
  774. {
  775. f->setCopying(false);
  776. DBGLOG("%s exception - remove templocal", msg);
  777. destFile->remove();
  778. deleteTempFiles(targetFilename);
  779. throw;
  780. }
  781. if (!hardLinkCreated) // for hardlinks no rename needed
  782. {
  783. try
  784. {
  785. destFile->rename(targetFilename);
  786. }
  787. catch(IException *)
  788. {
  789. f->setCopying(false);
  790. deleteTempFiles(targetFilename);
  791. throw;
  792. }
  793. unsigned elapsed = msTick() - start;
  794. double sizeMB = ((double) fileSize) / (1024*1024);
  795. double MBperSec = elapsed ? (sizeMB / elapsed) * 1000 : 0;
  796. DBGLOG("%s to %s complete in %d ms (%.1f MB/sec)", msg, targetFilename, elapsed, MBperSec);
  797. }
  798. f->copyComplete();
  799. }
  800. deleteTempFiles(targetFilename);
  801. return fileCopied;
  802. }
  803. bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage, CFflags copyFlags=CFnone)
  804. {
  805. if (!f->isRemote())
  806. f->copyComplete();
  807. else
  808. {
  809. if (displayFirstFileMessage)
  810. DBGLOG("Received files to copy");
  811. const char *targetFilename = f->queryTarget()->queryFilename();
  812. StringBuffer tempFile(targetFilename);
  813. StringBuffer destPath;
  814. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  815. if (destPath.length())
  816. recursiveCreateDirectory(destPath.str());
  817. else
  818. destPath.append('.');
  819. if (!checkDirExists(destPath.str())) {
  820. ERRLOG("Dest directory %s does not exist", destPath.str());
  821. return false;
  822. }
  823. tempFile.append(".$$$");
  824. const char *msg = background ? "Background copy" : "Copy";
  825. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
  826. }
  827. return false; // if we get here there was no file copied
  828. }
  829. public:
  830. IMPLEMENT_IINTERFACE;
  831. CRoxieFileCache(bool _testMode = false) : bct(*this), hct(*this), testMode(_testMode)
  832. {
  833. aborting = false;
  834. closing = false;
  835. closePending[false] = false;
  836. closePending[true] = false;
  837. started = false;
  838. }
  839. ~CRoxieFileCache()
  840. {
  841. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  842. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  843. HashIterator h(files);
  844. ForEach(h)
  845. {
  846. ILazyFileIO *f = files.mapToValue(&h.query());
  847. f->removeCache(this);
  848. }
  849. }
  850. virtual void start()
  851. {
  852. if (!started)
  853. {
  854. bct.start();
  855. hct.start();
  856. bctStarted.wait();
  857. hctStarted.wait();
  858. started = true;
  859. }
  860. }
  861. class BackgroundCopyThread : public Thread
  862. {
  863. CRoxieFileCache &owner;
  864. public:
  865. BackgroundCopyThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheBackgroundCopyThread") {}
  866. virtual int run()
  867. {
  868. return owner.runBackgroundCopy();
  869. }
  870. } bct;
  871. class HandleCloserThread : public Thread
  872. {
  873. CRoxieFileCache &owner;
  874. public:
  875. HandleCloserThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheHandleCloserThread") {}
  876. virtual int run()
  877. {
  878. return owner.runHandleCloser();
  879. }
  880. } hct;
  // Body of the background copy thread. It signals bctStarted and, on Linux, optionally sets its own I/O
  // priority from backgroundCopyClass/backgroundCopyPrio. It then loops:
  //  - wait on toCopy;
  //  - under crit, pop one entry from 'todo' (skipping it if the object is no longer alive) and decrement
  //    numFilesToProcess either way;
  //  - copy it with doCopy().
  // The loop ends when 'closing' is seen after a wakeup, or when an exception escapes, e.g. a copy failure
  // while aborting. Always returns 0.
  // NOTE(review): the %p in the start/exit messages is the CRoxieFileCache ('this'), not the thread object.
  881. int runBackgroundCopy()
  882. {
  883. bctStarted.signal();
  884. #if defined(__linux__) && defined(SYS_ioprio_set)
  885. if (backgroundCopyClass)
  886. syscall(SYS_ioprio_set, IOPRIO_WHO_PROCESS, 0, IOPRIO_PRIO_VALUE(backgroundCopyClass, backgroundCopyPrio));
  887. #endif
  888. if (traceLevel)
  889. {
  890. #if defined(__linux__) && defined(SYS_ioprio_get)
  891. int ioprio = syscall(SYS_ioprio_get, IOPRIO_WHO_PROCESS, 0);
  892. int ioclass = IOPRIO_PRIO_CLASS(ioprio);
  893. ioprio = IOPRIO_PRIO_DATA(ioprio);
  894. DBGLOG("Background copy thread %p starting, io priority class %d, priority %d", this, ioclass, ioprio);
  895. #else
  896. DBGLOG("Background copy thread %p starting", this);
  897. #endif
  898. }
  899. try
  900. {
  // Copies done since the queue was last seen empty; 0 means the next copy logs "Received files to copy"
  901. int fileCopiedCount = 0;
  902. bool fileCopied = false;
  903. for (;;)
  904. {
  905. fileCopied = false;
  906. Linked<ILazyFileIO> next;
  // Woken by wait() at shutdown, interrupted by join(); presumably also signalled when a file is queued
  907. toCopy.wait();
  908. {
  909. CriticalBlock b(crit);
  910. if (closing)
  911. break;
  912. if (todo.ordinality())
  913. {
  914. ILazyFileIO *popped = &todo.popGet();
  // Only take a link if the object is not already being destroyed
  915. if (popped->isAlive())
  916. {
  917. next.set(popped);
  918. }
  919. atomic_dec(&numFilesToProcess); // must decrement counter for SNMP accuracy
  920. }
  921. }
  // The copy itself runs outside crit
  922. if (next)
  923. {
  924. try
  925. {
  926. fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false, CFflush_rdwr);
  927. CriticalBlock b(crit);
  928. if (fileCopied)
  929. fileCopiedCount++;
  930. }
  931. catch (IException *E)
  932. {
  // While aborting, let the exception end the thread (handled by the outer catch)
  933. if (aborting)
  934. throw;
  935. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  936. E->Release();
  937. }
  938. catch (...)
  939. {
  940. EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
  941. }
  942. }
  // Announce when the queue has drained after at least one successful copy
  943. CriticalBlock b(crit);
  944. if ( (todo.ordinality()== 0) && (fileCopiedCount)) // finished last copy
  945. {
  946. DBGLOG("No more data files to copy");
  947. fileCopiedCount = 0;
  948. }
  949. }
  950. }
  951. catch (IException *E)
  952. {
  953. if (!aborting)
  954. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  955. E->Release();
  956. }
  957. catch (...)
  958. {
  959. DBGLOG("Unknown exception in background copy thread");
  960. }
  961. if (traceLevel)
  962. DBGLOG("Background copy thread %p exiting", this);
  963. return 0;
  964. }
  965. int runHandleCloser()
  966. {
  967. hctStarted.signal();
  968. if (traceLevel)
  969. DBGLOG("HandleCloser thread %p starting", this);
  970. try
  971. {
  972. for (;;)
  973. {
  974. toClose.wait(10 * 60 * 1000); // check expired file handles every 10 minutes
  975. if (closing)
  976. break;
  977. doCloseExpired(true);
  978. doCloseExpired(false);
  979. }
  980. }
  981. catch (IException *E)
  982. {
  983. if (!aborting)
  984. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  985. E->Release();
  986. }
  987. catch (...)
  988. {
  989. DBGLOG("Unknown exception in handle closer thread");
  990. }
  991. if (traceLevel)
  992. DBGLOG("Handle closer thread %p exiting", this);
  993. return 0;
  994. }
  995. virtual void join(unsigned timeout=INFINITE)
  996. {
  997. aborting = true;
  998. if (started)
  999. {
  1000. toCopy.interrupt();
  1001. toClose.interrupt();
  1002. bct.join(timeout);
  1003. hct.join(timeout);
  1004. }
  1005. }
  1006. virtual void wait()
  1007. {
  1008. closing = true;
  1009. if (started)
  1010. {
  1011. toCopy.signal();
  1012. toClose.signal();
  1013. bct.join();
  1014. hct.join();
  1015. }
  1016. }
  1017. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  1018. {
  1019. return aborting ? CFPcancel : CFPcontinue;
  1020. }
  1021. virtual void removeCache(ILazyFileIO *file) const
  1022. {
  1023. CriticalBlock b(crit);
  1024. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1025. // So only remove from hash table if what we find there matches the item that is being deleted.
  1026. const char *filename = file->queryFilename();
  1027. ILazyFileIO *goer = files.getValue(filename);
  1028. if (goer == file)
  1029. files.remove(filename);
  1030. ForEachItemInRev(idx, todo)
  1031. {
  1032. if (file == &todo.item(idx))
  1033. {
  1034. todo.remove(idx);
  1035. atomic_dec(&numFilesToProcess); // must decrement counter for SNMP accuracy
  1036. }
  1037. }
  1038. }
// Returns a linked ILazyFileIO for part (pdesc->queryPartIndex()+1) of logical file 'lfn'.
// An existing cache entry for the same physical location is reused if it is alive and its size/date agree
// with the dali metadata in pdesc; otherwise the file is opened via openFile() and, optionally, a local
// copy is started.
//   lfn                  - logical file name (used as the physical location as-is for @local files)
//   fileType             - ROXIE_KEY or ROXIE_FILE; recorded in fileErrorList on failure
//   pdesc                - part descriptor supplying @size, @local and (when checkFileDate) @modified
//   numParts             - number of parts in the file (must be 1 for @local files)
//   replicationLevel     - used to build the physical part name, and to order queued background copies
//   deployedLocationInfo - passed through to openFile()
//   startFileCopy        - if set, and the opened file is remote and copyResources is enabled, copy it locally
// Throws ROXIE_MISMATCH if a different version of the file is already loaded. Any other exception is
// rethrown; ROXIE_FILE_OPEN_FAIL failures are first recorded in fileErrorList.
virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
    IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
    const StringArray &deployedLocationInfo, bool startFileCopy)
{
    IPropertyTree &partProps = pdesc->queryProperties();
    offset_t dfsSize = partProps.getPropInt64("@size", -1); // -1 means unknown: the size check below is skipped
    bool local = partProps.getPropBool("@local");
    CDateTime dfsDate; // stays null unless checkFileDate is set; a null date skips the date check below
    if (checkFileDate)
    {
        const char *dateStr = partProps.queryProp("@modified");
        dfsDate.setString(dateStr);
    }
    unsigned partNo = pdesc->queryPartIndex() + 1; // 1-based part number
    StringBuffer localLocation;
    if (local)
    {
        assertex(partNo==1 && numParts==1);
        localLocation.append(lfn); // any resolution done earlier
    }
    else
    {
        // MORE - not at all sure about this. Foreign files should stay foreign ?
        CDfsLogicalFileName dlfn;
        dlfn.set(lfn);
        if (dlfn.isForeign())
            dlfn.clearForeign();
        makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault);
    }
    Owned<ILazyFileIO> ret;
    try
    {
        // crit is held for the rest of the try block, including any synchronous doCopy() below
        CriticalBlock b(crit);
        Linked<ILazyFileIO> f = files.getValue(localLocation);
        if (f && f->isAlive())
        {
            if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
                (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
            {
                // The cached entry is a different version. Try to get the caches that may be keeping it
                // alive to let go of it, then re-check whether it is still registered.
                releaseSlaveDynamicFileCache(); // Slave dynamic file cache or...
                if (fileType == ROXIE_KEY) // ...jhtree cache can keep files active and thus prevent us from loading a new version
                    clearKeyStoreCacheEntry(f); // Will release iff that is the only link
                f.clear(); // Note - needs to be done before calling getValue() again, hence the need to make it separate from the f.set below
                f.set(files.getValue(localLocation));
                if (f) // May have been cleared above...
                {
                    // Old version is still in use: record the file as in error and report the mismatch
                    StringBuffer modifiedDt;
                    if (!dfsDate.isNull())
                        dfsDate.getString(modifiedDt);
                    StringBuffer fileDt;
                    f->queryDateTime()->getString(fileDt);
                    if (fileErrorList.find(lfn) == 0)
                    {
                        switch (fileType)
                        {
                            case ROXIE_KEY:
                                fileErrorList.setValue(lfn, "Key");
                                break;
                            case ROXIE_FILE:
                                fileErrorList.setValue(lfn, "File");
                                break;
                        }
                    }
                    throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %" I64F "d %" I64F "d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
                }
                // Otherwise the old version has gone - fall through and open the new one
            }
            else
                return f.getClear(); // cached entry matches - hand our link to the caller
        }
        ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
        if (startFileCopy)
        {
            if (ret->isRemote())
            {
                if (copyResources) // MORE - should always copy peer files
                {
                    // Single-part files and the last part of a multi-part key are copied synchronously, here
                    if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
                    {
                        ret->checkOpen();
                        doCopy(ret, false, false, CFflush_rdwr);
                        return ret.getLink();
                    }
                    // Copies are popped from end of the todo list
                    // By putting the replicates on the front we ensure they are done after the primaries
                    // and are therefore likely to result in local rather than remote copies.
                    if (replicationLevel)
                        todo.add(*ret, 0);
                    else
                        todo.append(*ret);
                    atomic_inc(&numFilesToProcess); // must increment counter for SNMP accuracy
                    toCopy.signal(); // wake the background copy thread
                }
            }
        }
        if (!lazyOpen)
            ret->checkOpen();
    }
    catch(IException *e)
    {
        // NOTE(review): crit is no longer held here (its CriticalBlock was scoped to the try block), although the
        // mismatch path above updates fileErrorList under crit - confirm fileErrorList does not need crit.
        if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
        {
            if (fileErrorList.find(lfn) == 0)
            {
                switch (fileType)
                {
                    case ROXIE_KEY:
                        fileErrorList.setValue(lfn, "Key");
                        break;
                    case ROXIE_FILE:
                        fileErrorList.setValue(lfn, "File");
                        break;
                }
            }
        }
        throw;
    }
    return ret.getLink();
}
  1157. virtual void closeExpired(bool remote)
  1158. {
  1159. // This schedules a close at the next available opportunity
  1160. CriticalBlock b(cpcrit); // paranoid...
  1161. if (!closePending[remote])
  1162. {
  1163. closePending[remote] = true;
  1164. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) atomic_read(&numFilesOpen[remote]));
  1165. toClose.signal();
  1166. }
  1167. }
// Closes idle handles for one kind of file (remote == true for remote files, false for local ones).
// Phase 1: every alive, open, non-copying file of that kind not accessed for more than maxFileAge[remote] ms
//          is closed.
// Phase 2: if more than maxFilesOpen[remote] such files are still open, they are sorted with
//          CRoxieLazyFileIO::compareAccess and all but the first minFilesOpen[remote] are closed.
//          NOTE(review): presumably compareAccess puts the most recently accessed first (the log says "LRU") - confirm.
// Clears the pending flag set by closeExpired() before starting.
void doCloseExpired(bool remote)
{
    {
        CriticalBlock b(cpcrit); // paranoid...
        closePending[remote] = false;
    }
    CriticalBlock b(crit);
    ICopyArrayOf<ILazyFileIO> goers; // files still open after phase 1 - the candidates for the LRU close (not linked)
    HashIterator h(files);
    ForEach(h)
    {
        ILazyFileIO *f = files.mapToValue(&h.query());
        if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
        {
            unsigned age = msTick() - f->getLastAccessed(); // unsigned subtraction keeps this right across a tick wrap
            if (age > maxFileAge[remote])
            {
                if (traceLevel > 5)
                {
                    // NOTE - querySource will cause the file to be opened if not already open
                    // That's OK here, since we know the file is open and remote.
                    // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
                    const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
                    DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
                }
                f->close();
            }
            else
                goers.append(*f);
        }
    }
    unsigned numFilesLeft = goers.ordinality();
    if (numFilesLeft > maxFilesOpen[remote])
    {
        goers.sort(CRoxieLazyFileIO::compareAccess);
        DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
        unsigned idx = minFilesOpen[remote]; // keep the first minFilesOpen entries open
        while (idx < numFilesLeft)
        {
            ILazyFileIO &f = goers.item(idx++);
            if (!f.isCopying()) // re-checked: a copy may have started since phase 1
            {
                if (traceLevel > 5)
                {
                    unsigned age = msTick() - f.getLastAccessed();
                    DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
                }
                f.close();
            }
        }
    }
}
  1220. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1221. {
  1222. Owned<IFile> dirf = createIFile(directory);
  1223. if (dirf->exists() && dirf->isDirectory())
  1224. {
  1225. try
  1226. {
  1227. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1228. ForEach(*iter)
  1229. {
  1230. const char *thisName = iter->query().queryFilename();
  1231. flushUnusedDirectories(origBaseDir, thisName, xml);
  1232. }
  1233. if (stricmp(origBaseDir, directory) != 0)
  1234. {
  1235. try
  1236. {
  1237. dirf->remove();
  1238. xml.appendf("<Directory>%s</Directory>\n", directory);
  1239. DBGLOG("Deleted directory %s", directory);
  1240. }
  1241. catch (IException *e)
  1242. {
  1243. // don't care if we can't delete the directory
  1244. e->Release();
  1245. }
  1246. catch(...)
  1247. {
  1248. // don't care if we can't delete the directory
  1249. }
  1250. }
  1251. }
  1252. catch (IException *e)
  1253. {
  1254. // don't care if we can't delete the directory
  1255. e->Release();
  1256. }
  1257. catch(...)
  1258. {
  1259. // don't care if we can't delete the directory
  1260. }
  1261. }
  1262. }
  1263. int numFilesToCopy()
  1264. {
  1265. CriticalBlock b(crit);
  1266. return todo.ordinality();
  1267. }
  1268. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  1269. static inline bool validFNameChar(char c)
  1270. {
  1271. static const char *invalids = "*\"/:<>?\\|";
  1272. return (c>=32 && c<127 && !strchr(invalids, c));
  1273. }
  1274. };
  1275. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  1276. {
  1277. StringArray remoteLocations;
  1278. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  1279. if (peerCluster)
  1280. {
  1281. if (*peerCluster!='-') // a remote cluster was specified explicitly
  1282. appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true); // Add only from specified cluster
  1283. }
  1284. else
  1285. appendRemoteLocations(pdesc, remoteLocations, NULL, roxieName, false); // Add from any cluster on same dali, other than mine
  1286. if (remotePDesc)
  1287. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false); // Then any remote on remote dali
  1288. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, replicationLevel[channel], remoteLocations, startCopy);
  1289. }
  1290. //====================================================================================================
  1291. class CFilePartMap : implements IFilePartMap, public CInterface
  1292. {
  1293. class FilePartMapElement
  1294. {
  1295. public:
  1296. offset_t base;
  1297. offset_t top;
  1298. inline int compare(offset_t offset)
  1299. {
  1300. if (offset < base)
  1301. return -1;
  1302. else if (offset >= top)
  1303. return 1;
  1304. else
  1305. return 0;
  1306. }
  1307. } *map;
  1308. static int compareParts(const void *l, const void *r)
  1309. {
  1310. offset_t lp = * (offset_t *) l;
  1311. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  1312. return thisPart->compare(lp);
  1313. }
  1314. unsigned numParts;
  1315. offset_t recordCount;
  1316. offset_t totalSize;
  1317. StringAttr fileName;
  1318. public:
  1319. IMPLEMENT_IINTERFACE;
  1320. CFilePartMap(IPropertyTree &resource)
  1321. {
  1322. fileName.set(resource.queryProp("@id"));
  1323. numParts = resource.getPropInt("@numparts");
  1324. recordCount = resource.getPropInt64("@recordCount");
  1325. totalSize = resource.getPropInt64("@size");
  1326. assertex(numParts);
  1327. map = new FilePartMapElement[numParts];
  1328. for (unsigned i = 0; i < numParts; i++)
  1329. {
  1330. StringBuffer partPath;
  1331. partPath.appendf("Part[@num='%d']", i+1);
  1332. IPropertyTree *part = resource.queryPropTree(partPath.str());
  1333. if (!part)
  1334. {
  1335. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  1336. part = resource.queryPropTree(partPath.str());
  1337. }
  1338. assertex(part);
  1339. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  1340. assertex(size != (unsigned __int64) -1);
  1341. map[i].base = i ? map[i-1].top : 0;
  1342. map[i].top = map[i].base + size;
  1343. }
  1344. if (totalSize == (offset_t)-1)
  1345. totalSize = map[numParts-1].top;
  1346. else if (totalSize != map[numParts-1].top)
  1347. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  1348. }
  1349. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  1350. : fileName(_fileName)
  1351. {
  1352. numParts = fdesc.numParts();
  1353. IPropertyTree &props = fdesc.queryProperties();
  1354. recordCount = props.getPropInt64("@recordCount", -1);
  1355. totalSize = props.getPropInt64("@size", -1);
  1356. assertex(numParts);
  1357. map = new FilePartMapElement[numParts];
  1358. for (unsigned i = 0; i < numParts; i++)
  1359. {
  1360. IPartDescriptor &part = *fdesc.queryPart(i);
  1361. IPropertyTree &partProps = part.queryProperties();
  1362. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  1363. map[i].base = i ? map[i-1].top : 0;
  1364. if (size==(unsigned __int64) -1)
  1365. {
  1366. if (i==numParts-1)
  1367. map[i].top = (unsigned __int64) -1;
  1368. else
  1369. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file sizes not known for file %s", fileName.get());
  1370. }
  1371. else
  1372. map[i].top = map[i].base + size;
  1373. }
  1374. if (totalSize == (offset_t)-1)
  1375. totalSize = map[numParts-1].top;
  1376. else if (totalSize != map[numParts-1].top)
  1377. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  1378. }
  1379. ~CFilePartMap()
  1380. {
  1381. delete [] map;
  1382. }
  1383. virtual bool IsShared() const { return CInterface::IsShared(); };
  1384. virtual unsigned mapOffset(offset_t pos) const
  1385. {
  1386. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  1387. if (!part)
  1388. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %" I64F "d in file %s out of range (max offset permitted is %" I64F "d)", pos, fileName.str(), totalSize);
  1389. return (part-map)+1;
  1390. }
  1391. virtual unsigned getNumParts() const
  1392. {
  1393. return numParts;
  1394. }
  1395. virtual offset_t getTotalSize() const
  1396. {
  1397. return totalSize;
  1398. }
  1399. virtual offset_t getRecordCount() const
  1400. {
  1401. return recordCount;
  1402. }
  1403. virtual offset_t getBase(unsigned part) const
  1404. {
  1405. if (part > numParts || part == 0)
  1406. {
  1407. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existant file part %d (valid are 1-%d)", part, numParts);
  1408. }
  1409. return map[part-1].base;
  1410. }
  1411. virtual offset_t getFileSize() const
  1412. {
  1413. return map[numParts-1].top;
  1414. }
  1415. };
  1416. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  1417. {
  1418. return new CFilePartMap(fileName, fdesc);
  1419. }
  1420. //====================================================================================================
  1421. class CFileIOArray : implements IFileIOArray, public CInterface
  1422. {
  1423. mutable CriticalSection crit;
  1424. mutable unsigned __int64 totalSize = (unsigned __int64) -1; // Calculated on demand, and cached
  1425. mutable StringAttr id; // Calculated on demand, and cached
  1426. IPointerArrayOf<IFileIO> files;
  1427. UnsignedArray subfiles;
  1428. StringArray filenames;
  1429. Int64Array bases;
  1430. int actualCrc = 0;
  1431. unsigned valid = 0;
  1432. bool multipleFormatsSeen = false;
  1433. void _getId() const
  1434. {
  1435. md5_state_t md5;
  1436. md5_byte_t digest[16];
  1437. md5_init(&md5);
  1438. ForEachItemIn(idx, files)
  1439. {
  1440. IFileIO *file = files.item(idx);
  1441. if (file)
  1442. {
  1443. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  1444. }
  1445. }
  1446. md5_finish(&md5, digest);
  1447. char digestStr[33];
  1448. for (int i = 0; i < 16; i++)
  1449. {
  1450. sprintf(&digestStr[i*2],"%02x", digest[i]);
  1451. }
  1452. id.set(digestStr, 32);
  1453. }
  1454. public:
  1455. IMPLEMENT_IINTERFACE;
  1456. virtual bool IsShared() const { return CInterface::IsShared(); };
  1457. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base) const override
  1458. {
  1459. if (!files.isItem(partNo))
  1460. {
  1461. DBGLOG("getFilePart requested invalid part %d", partNo);
  1462. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  1463. }
  1464. IFileIO *file = files.item(partNo);
  1465. if (!file)
  1466. {
  1467. base = 0;
  1468. return NULL;
  1469. }
  1470. base = bases.item(partNo);
  1471. return LINK(file);
  1472. }
  1473. virtual const char *queryLogicalFilename(unsigned partNo) const override
  1474. {
  1475. if (!filenames.isItem(partNo))
  1476. {
  1477. DBGLOG("queryLogicalFilename requested invalid part %d", partNo);
  1478. throw MakeStringException(ROXIE_FILE_ERROR, "queryLogicalFilename requested invalid part %d", partNo);
  1479. }
  1480. return filenames.item(partNo);
  1481. }
  1482. void addFile(IFileIO *f, offset_t base, unsigned subfile, const char *filename, int _actualCrc)
  1483. {
  1484. if (f)
  1485. valid++;
  1486. files.append(f);
  1487. bases.append(base);
  1488. if (_actualCrc)
  1489. {
  1490. if (actualCrc && actualCrc != _actualCrc)
  1491. multipleFormatsSeen = true;
  1492. else
  1493. actualCrc = _actualCrc;
  1494. }
  1495. // MORE - lots of duplication in subfiles and filenames arrays
  1496. subfiles.append(subfile);
  1497. filenames.append(filename ? filename : "");
  1498. }
  1499. virtual unsigned length() const override
  1500. {
  1501. return files.length();
  1502. }
  1503. virtual unsigned numValid() const override
  1504. {
  1505. return valid;
  1506. }
  1507. virtual int queryActualFormatCrc() const override
  1508. {
  1509. return actualCrc;
  1510. }
  1511. virtual bool allFormatsMatch() const override
  1512. {
  1513. return !multipleFormatsSeen;
  1514. }
  1515. virtual bool isValid(unsigned partNo) const override
  1516. {
  1517. if (!files.isItem(partNo))
  1518. return false;
  1519. IFileIO *file = files.item(partNo);
  1520. if (!file)
  1521. return false;
  1522. return true;
  1523. }
  1524. virtual unsigned __int64 size() const override
  1525. {
  1526. CriticalBlock b(crit);
  1527. if (totalSize == (unsigned __int64) -1)
  1528. {
  1529. totalSize = 0;
  1530. ForEachItemIn(idx, files)
  1531. {
  1532. IFileIO *file = files.item(idx);
  1533. if (file)
  1534. totalSize += file->size();
  1535. }
  1536. }
  1537. return totalSize;
  1538. }
  1539. virtual StringBuffer &getId(StringBuffer &ret) const override
  1540. {
  1541. CriticalBlock b(crit);
  1542. if (!id)
  1543. _getId();
  1544. return ret.append(id);
  1545. }
  1546. virtual unsigned getSubFile(unsigned partNo) const override
  1547. {
  1548. return subfiles.item(partNo);
  1549. }
  1550. };
  1551. class CTranslatorSet : implements CInterfaceOf<ITranslatorSet>
  1552. {
  1553. IConstPointerArrayOf<IDynamicTransform> transformers;
  1554. IConstPointerArrayOf<IKeyTranslator> keyTranslators;
  1555. IPointerArrayOf<IOutputMetaData> actualLayouts;
  1556. const RtlRecord &targetLayout;
  1557. int targetFormatCrc = 0;
  1558. bool anyTranslators = false;
  1559. public:
  1560. CTranslatorSet(const RtlRecord &_targetLayout, int _targetFormatCrc)
  1561. : targetLayout(_targetLayout), targetFormatCrc(_targetFormatCrc)
  1562. {}
  1563. void addTranslator(const IDynamicTransform *translator, const IKeyTranslator *keyTranslator, IOutputMetaData *actualLayout)
  1564. {
  1565. assertex(actualLayout);
  1566. if (translator || keyTranslator)
  1567. anyTranslators = true;
  1568. transformers.append(translator);
  1569. keyTranslators.append(keyTranslator);
  1570. actualLayouts.append(actualLayout);
  1571. }
  1572. virtual const RtlRecord &queryTargetFormat() const override
  1573. {
  1574. return targetLayout;
  1575. }
  1576. virtual int queryTargetFormatCrc() const override
  1577. {
  1578. return targetFormatCrc;
  1579. }
  1580. virtual const IDynamicTransform *queryTranslator(unsigned subFile) const override
  1581. {
  1582. // We need to have translated partnos to subfiles before calling this!
  1583. // Note: while the required projected format will be the same for all parts, the
  1584. // actual layout - and thus the required translation - may not be, for example if
  1585. // we have a superfile with mismatching formats.
  1586. if (anyTranslators && transformers.isItem(subFile))
  1587. return transformers.item(subFile);
  1588. return nullptr;
  1589. }
  1590. virtual const IKeyTranslator *queryKeyTranslator(unsigned subFile) const override
  1591. {
  1592. if (anyTranslators && keyTranslators.isItem(subFile))
  1593. return keyTranslators.item(subFile);
  1594. return nullptr;
  1595. }
  1596. virtual ISourceRowPrefetcher *getPrefetcher(unsigned subFile) const override
  1597. {
  1598. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  1599. assertex(actualLayout);
  1600. return actualLayout->createDiskPrefetcher();
  1601. }
  1602. virtual IOutputMetaData *queryActualLayout(unsigned subFile) const override
  1603. {
  1604. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  1605. assertex(actualLayout);
  1606. return actualLayout;
  1607. }
  1608. virtual bool isTranslating() const override
  1609. {
  1610. return anyTranslators;
  1611. }
  1612. };
  1613. template <class X> class PerChannelCacheOf
  1614. {
  1615. IPointerArrayOf<X> cache;
  1616. IntArray channels;
  1617. public:
  1618. // NOTE - typically only a couple of entries (but see PerFormatCacheOf below
  1619. void set(X *value, unsigned channel)
  1620. {
  1621. cache.append(value);
  1622. channels.append(channel);
  1623. }
  1624. X *get(unsigned channel) const
  1625. {
  1626. ForEachItemIn(idx, channels)
  1627. {
  1628. if (channels.item(idx)==channel)
  1629. return cache.item(idx);
  1630. }
  1631. return NULL;
  1632. }
  1633. };
// Inherits set()/get() from PerChannelCacheOf unchanged. NOTE(review): presumably intended to be keyed by
// format rather than by channel (per its name) - nothing in this section shows its users.
template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
{
    // Identical for now, but characteristics are different so implementations may diverge.
    // For example, this one may want to be a hash table, and there may be many more entries
};
// File-scope pointer to a CRoxieFileCache. NOTE(review): where it is created, released, and how it relates to
// queryFileCache() is not visible in this section - confirm before relying on it.
CRoxieFileCache * fileCache;
  1640. class CResolvedFile : implements IResolvedFileCreator, implements ISDSSubscription, public CInterface
  1641. {
  1642. protected:
  1643. IResolvedFileCache *cached;
  1644. StringAttr lfn;
  1645. StringAttr physicalName;
  1646. Owned<IDistributedFile> dFile; // NULL on copies serialized to slaves. Note that this implies we keep a lock on dali file for the lifetime of this object.
  1647. CDateTime fileTimeStamp;
  1648. offset_t fileSize;
  1649. unsigned fileCheckSum;
  1650. RoxieFileType fileType;
  1651. bool isSuper;
  1652. StringArray subNames;
  1653. IPointerArrayOf<IFileDescriptor> subFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1654. IPointerArrayOf<IFileDescriptor> remoteSubFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1655. IntArray formatCrcs;
  1656. IPointerArrayOf<IOutputMetaData> diskTypeInfo; // New info using RtlTypeInfo structures
  1657. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  1658. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  1659. Owned <IPropertyTree> properties;
  1660. Linked<IRoxieDaliHelper> daliHelper;
  1661. Owned<IDaliPackageWatcher> notifier;
  1662. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  1663. {
  1664. subNames.append(subName);
  1665. subFiles.append(fdesc);
  1666. remoteSubFiles.append(remoteFDesc);
  1667. IPropertyTree const & props = fdesc->queryProperties();
  1668. // NOTE - grouping is not included in the formatCRC, nor is the trailing byte that indicates grouping
  1669. // included in the rtlTypeInfo.
  1670. bool isGrouped = props.getPropBool("@grouped", false);
  1671. int formatCrc = props.getPropInt("@formatCrc", 0);
  1672. // If formatCrc and grouping are same as previous, reuse previous typeInfo
  1673. Owned<IOutputMetaData> actualFormat;
  1674. unsigned prevIdx = formatCrcs.length()-1;
  1675. if (formatCrcs.length() && formatCrc == formatCrcs.item(prevIdx) &&
  1676. diskTypeInfo.item(prevIdx) && isGrouped==diskTypeInfo.item(prevIdx)->isGrouped())
  1677. actualFormat.set(diskTypeInfo.item(prevIdx));
  1678. else
  1679. actualFormat.setown(getDaliLayoutInfo(props));
  1680. diskTypeInfo.append(actualFormat.getClear());
  1681. formatCrcs.append(formatCrc);
  1682. unsigned numParts = fdesc->numParts();
  1683. offset_t base = 0;
  1684. for (unsigned i = 0; i < numParts; i++)
  1685. {
  1686. IPartDescriptor *pdesc = fdesc->queryPart(i);
  1687. IPropertyTree &partProps = pdesc->queryProperties();
  1688. offset_t dfsSize = partProps.getPropInt64("@size");
  1689. partProps.setPropInt64("@offset", base);
  1690. base += dfsSize;
  1691. }
  1692. fileSize += base;
  1693. }
  1694. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1695. {
  1696. if (traceLevel > 2)
  1697. DBGLOG("Superfile %s change detected", lfn.get());
  1698. {
  1699. CriticalBlock b(lock);
  1700. if (cached)
  1701. {
  1702. cached->removeCache(this);
  1703. cached = NULL;
  1704. }
  1705. }
  1706. globalPackageSetManager->requestReload(false, false);
  1707. }
  1708. // We cache all the file maps/arrays etc here.
  1709. mutable CriticalSection lock;
  1710. mutable Owned<IFilePartMap> fileMap;
  1711. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  1712. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  1713. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  1714. public:
  1715. IMPLEMENT_IINTERFACE;
  1716. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* _daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
  1717. : daliHelper(_daliHelper), lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
  1718. {
  1719. cached = NULL;
  1720. fileSize = 0;
  1721. fileCheckSum = 0;
  1722. if (dFile)
  1723. {
  1724. if (traceLevel > 5)
  1725. DBGLOG("Roxie server adding information for file %s", lfn.get());
  1726. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  1727. bool csSet = dFile->getFileCheckSum(fileCheckSum);
  1728. assertex(tsSet); // per Nigel, is always set
  1729. IDistributedSuperFile *superFile = dFile->querySuperFile();
  1730. if (superFile)
  1731. {
  1732. isSuper = true;
  1733. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  1734. ForEach(*subs)
  1735. {
  1736. IDistributedFile &sub = subs->query();
  1737. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  1738. Owned<IFileDescriptor> remoteFDesc;
  1739. if (daliHelper)
  1740. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt));
  1741. subDFiles.append(OLINK(sub));
  1742. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1743. }
  1744. // We have to clone the properties since we don't want to keep the superfile locked
  1745. properties.setown(createPTreeFromIPT(&dFile->queryAttributes(), ipt_lowmem));
  1746. if (!isDynamic && !lockSuperFiles)
  1747. {
  1748. notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
  1749. dFile.clear(); // We don't lock superfiles, except dynamic ones
  1750. }
  1751. }
  1752. else // normal file, not superkey
  1753. {
  1754. isSuper = false;
  1755. properties.set(&dFile->queryAttributes());
  1756. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  1757. Owned<IFileDescriptor> remoteFDesc;
  1758. if (daliHelper)
  1759. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
  1760. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1761. }
  1762. }
  1763. }
  1764. virtual void beforeDispose()
  1765. {
  1766. if (notifier)
  1767. daliHelper->releaseSubscription(notifier);
  1768. notifier.clear();
  1769. if (cached)
  1770. {
  1771. cached->removeCache(this);
  1772. }
  1773. }
  1774. virtual unsigned numSubFiles() const
  1775. {
  1776. return subNames.length();
  1777. }
  1778. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  1779. {
  1780. if (subNames.isItem(num))
  1781. {
  1782. name.append(subNames.item(num));
  1783. return true;
  1784. }
  1785. else
  1786. {
  1787. return false;
  1788. }
  1789. }
  1790. virtual unsigned findSubName(const char *subname) const
  1791. {
  1792. ForEachItemIn(idx, subNames)
  1793. {
  1794. if (stricmp(subNames.item(idx), subname))
  1795. return idx;
  1796. }
  1797. return NotFound;
  1798. }
  1799. virtual unsigned getContents(StringArray &contents) const
  1800. {
  1801. ForEachItemIn(idx, subNames)
  1802. {
  1803. contents.append(subNames.item(idx));
  1804. }
  1805. return subNames.length();
  1806. }
  1807. virtual bool isSuperFile() const
  1808. {
  1809. return isSuper;
  1810. }
  1811. virtual bool isKey() const
  1812. {
  1813. return fileType==ROXIE_KEY;
  1814. }
  1815. virtual IFilePartMap *getFileMap() const
  1816. {
  1817. CriticalBlock b(lock);
  1818. if (!fileMap)
  1819. {
  1820. if (subFiles.length())
  1821. {
  1822. if (subFiles.length()!=1)
  1823. throw MakeStringException(0, "Roxie does not support FETCH or KEYED JOIN to superkey with multiple parts");
  1824. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  1825. }
  1826. }
  1827. return fileMap.getLink();
  1828. }
  1829. virtual unsigned getNumParts() const
  1830. {
  1831. CriticalBlock b(lock);
  1832. unsigned numParts = 0;
  1833. ForEachItemIn(idx, subFiles)
  1834. {
  1835. unsigned thisNumParts = subFiles.item(idx)->numParts();
  1836. if (thisNumParts > numParts)
  1837. numParts = thisNumParts;
  1838. }
  1839. return numParts;
  1840. }
  1841. bool serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  1842. {
  1843. // Find all the partno's that go to this channel
  1844. unsigned numParts = fdesc->numParts();
  1845. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  1846. numParts--; // don't want to send TLK
  1847. UnsignedArray partNos;
  1848. for (unsigned i = 1; i <= numParts; i++)
  1849. {
  1850. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1851. if (getBondedChannel(i)==channel || !isLocal)
  1852. {
  1853. partNos.append(i-1);
  1854. }
  1855. }
  1856. fdesc->serializeParts(mb, partNos);
  1857. return partNos.length();
  1858. }
  1859. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const override
  1860. {
  1861. if (traceLevel > 6)
  1862. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  1863. byte type = (byte) fileType;
  1864. mb.append(type);
  1865. fileTimeStamp.serialize(mb);
  1866. mb.append(fileCheckSum);
  1867. mb.append(fileSize);
  1868. unsigned numSubFiles = subFiles.length();
  1869. mb.append(numSubFiles);
  1870. ForEachItemIn(idx, subFiles)
  1871. {
  1872. mb.append(subNames.item(idx));
  1873. IFileDescriptor *fdesc = subFiles.item(idx);
  1874. bool anyparts = serializeFDesc(mb, fdesc, channel, isLocal);
  1875. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1876. if (remoteFDesc)
  1877. {
  1878. mb.append(true);
  1879. anyparts |= serializeFDesc(mb, remoteFDesc, channel, isLocal);
  1880. }
  1881. else
  1882. mb.append(false);
  1883. mb.append(formatCrcs.item(idx));
  1884. IOutputMetaData *diskType = diskTypeInfo.item(idx);
  1885. if (anyparts && diskType)
  1886. {
  1887. if (idx && formatCrcs.item(idx)==formatCrcs.item(idx-1))
  1888. mb.append((byte) 3); // indicating same format as previous
  1889. else
  1890. {
  1891. mb.append((byte) (diskType->isGrouped() ? 2 : 1));
  1892. verifyex(dumpTypeInfo(mb, diskType->queryTypeInfo())); // Must be serializable, as we deserialized it...
  1893. }
  1894. }
  1895. else
  1896. mb.append((byte) 0);
  1897. }
  1898. if (properties)
  1899. {
  1900. mb.append(true);
  1901. properties->serialize(mb);
  1902. }
  1903. else
  1904. mb.append(false);
  1905. }
  1906. virtual ITranslatorSet *getTranslators(int formatCrc, IOutputMetaData *projected, IOutputMetaData *expected, RecordTranslationMode mode, bool isIndex) const override
  1907. {
  1908. // NOTE - projected and expected and anything fetched from them such as type info may reside in dynamically loaded (and unloaded)
  1909. // query DLLs - this means it is not safe to include them in any sort of cache that might outlive the current query.
  1910. Owned<CTranslatorSet> result = new CTranslatorSet(expected->queryRecordAccessor(true), formatCrc);
  1911. Owned<const IDynamicTransform> translator; // Translates rows from actual to projected
  1912. Owned<const IKeyTranslator> keyedTranslator; // translate filter conditions from expected to actual
  1913. int prevFormatCrc = 0;
  1914. assertex(projected != nullptr);
  1915. ForEachItemIn(idx, subFiles)
  1916. {
  1917. IOutputMetaData *actual = expected;
  1918. if (formatCrc)
  1919. {
  1920. const char *subname = subNames.item(idx);
  1921. int thisFormatCrc = formatCrcs.item(idx);
  1922. if (mode == RecordTranslationMode::AlwaysDisk && diskTypeInfo.item(idx))
  1923. {
  1924. actual = diskTypeInfo.item(idx);
  1925. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), actual->queryRecordAccessor(true)));
  1926. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  1927. if (!translator->canTranslate())
  1928. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  1929. }
  1930. else if (mode == RecordTranslationMode::AlwaysECL)
  1931. {
  1932. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  1933. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  1934. if (!translator->canTranslate())
  1935. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  1936. }
  1937. else if (!thisFormatCrc || thisFormatCrc==formatCrc)
  1938. translator.clear();
  1939. else
  1940. {
  1941. actual = diskTypeInfo.item(idx);
  1942. if (thisFormatCrc != prevFormatCrc) // Check if same translation as last subfile
  1943. {
  1944. translator.clear();
  1945. keyedTranslator.clear();
  1946. if (actual)
  1947. {
  1948. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), actual->queryRecordAccessor(true)));
  1949. // translator->describe();
  1950. }
  1951. if (!translator || !translator->canTranslate())
  1952. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  1953. if (translator->needsTranslate())
  1954. {
  1955. if (mode == RecordTranslationMode::None)
  1956. throw MakeStringException(ROXIE_MISMATCH, "Translatable record layout mismatch detected for file %s, but translation disabled", subname);
  1957. if (isIndex && translator->keyedTranslated())
  1958. throw MakeStringException(ROXIE_MISMATCH, "Record layout mismatch detected in keyed fields for file %s", subname);
  1959. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  1960. }
  1961. }
  1962. }
  1963. prevFormatCrc = thisFormatCrc;
  1964. }
  1965. result->addTranslator(LINK(translator), LINK(keyedTranslator), LINK(actual));
  1966. }
  1967. return result.getClear();
  1968. }
  1969. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  1970. {
  1971. CriticalBlock b(lock);
  1972. IFileIOArray *ret = ioArrayMap.get(channel);
  1973. if (!ret)
  1974. {
  1975. ret = createIFileIOArray(isOpt, channel);
  1976. ioArrayMap.set(ret, channel);
  1977. }
  1978. return LINK(ret);
  1979. }
  1980. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  1981. {
  1982. Owned<CFileIOArray> f = new CFileIOArray;
  1983. f->addFile(nullptr, 0, 0, nullptr, 0);
  1984. ForEachItemIn(idx, subFiles)
  1985. {
  1986. IFileDescriptor *fdesc = subFiles.item(idx);
  1987. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1988. const char *subname = subNames.item(idx);
  1989. int thisFormatCrc = formatCrcs.item(idx);
  1990. if (fdesc)
  1991. {
  1992. unsigned numParts = fdesc->numParts();
  1993. for (unsigned i = 1; i <= numParts; i++)
  1994. {
  1995. if (!channel || getBondedChannel(i)==channel)
  1996. {
  1997. try
  1998. {
  1999. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  2000. assertex(pdesc);
  2001. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  2002. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  2003. IPropertyTree &partProps = pdesc->queryProperties();
  2004. f->addFile(file.getClear(), partProps.getPropInt64("@offset"), idx, subname, thisFormatCrc);
  2005. }
  2006. catch (IException *E)
  2007. {
  2008. StringBuffer err;
  2009. err.append("Could not load file ");
  2010. fdesc->getTraceName(err);
  2011. DBGLOG(E, err.str());
  2012. if (!isOpt)
  2013. throw;
  2014. E->Release();
  2015. f->addFile(nullptr, 0, idx, nullptr, 0);
  2016. }
  2017. }
  2018. else
  2019. f->addFile(nullptr, 0, idx, nullptr, 0);
  2020. }
  2021. }
  2022. }
  2023. return f.getClear();
  2024. }
  2025. virtual IKeyArray *getKeyArray(bool isOpt, unsigned channel) const override
  2026. {
  2027. unsigned maxParts = 0;
  2028. ForEachItemIn(subFile, subFiles)
  2029. {
  2030. IFileDescriptor *fdesc = subFiles.item(subFile);
  2031. if (fdesc)
  2032. {
  2033. unsigned numParts = fdesc->numParts();
  2034. if (numParts > 1)
  2035. numParts--; // Don't include TLK
  2036. if (numParts > maxParts)
  2037. maxParts = numParts;
  2038. }
  2039. }
  2040. CriticalBlock b(lock);
  2041. IKeyArray *ret = keyArrayMap.get(channel);
  2042. if (!ret)
  2043. {
  2044. ret = createKeyArray(isOpt, channel, maxParts);
  2045. keyArrayMap.set(ret, channel);
  2046. }
  2047. return LINK(ret);
  2048. }
  2049. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  2050. {
  2051. Owned<IKeyArray> ret = ::createKeyArray();
  2052. if (channel)
  2053. {
  2054. ret->addKey(NULL);
  2055. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  2056. {
  2057. if (channel == getBondedChannel(partNo))
  2058. {
  2059. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2060. ForEachItemIn(idx, subFiles)
  2061. {
  2062. IFileDescriptor *fdesc = subFiles.item(idx);
  2063. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2064. Owned <ILazyFileIO> part;
  2065. unsigned crc = 0;
  2066. if (fdesc) // NB there may be no parts for this channel
  2067. {
  2068. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  2069. if (pdesc)
  2070. {
  2071. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  2072. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  2073. pdesc->getCrc(crc);
  2074. }
  2075. }
  2076. if (part)
  2077. {
  2078. if (lazyOpen)
  2079. {
  2080. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2081. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), false, false));
  2082. }
  2083. else
  2084. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), false, false));
  2085. }
  2086. else
  2087. keyset->addIndex(NULL);
  2088. }
  2089. ret->addKey(keyset.getClear());
  2090. }
  2091. else
  2092. ret->addKey(NULL);
  2093. }
  2094. }
  2095. else
  2096. {
  2097. // Channel 0 means return the TLK
  2098. IArrayOf<IKeyIndexBase> subkeys;
  2099. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2100. ForEachItemIn(idx, subFiles)
  2101. {
  2102. IFileDescriptor *fdesc = subFiles.item(idx);
  2103. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2104. Owned<IKeyIndexBase> key;
  2105. if (fdesc)
  2106. {
  2107. unsigned numParts = fdesc->numParts();
  2108. assertex(numParts > 0);
  2109. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  2110. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  2111. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  2112. unsigned crc = 0;
  2113. pdesc->getCrc(crc);
  2114. StringBuffer pname;
  2115. pdesc->getPath(pname);
  2116. if (lazyOpen)
  2117. {
  2118. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2119. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), numParts>1, false));
  2120. }
  2121. else
  2122. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), numParts>1, false));
  2123. keyset->addIndex(LINK(key->queryPart(0)));
  2124. }
  2125. else
  2126. keyset->addIndex(NULL);
  2127. }
  2128. if (keyset->numParts())
  2129. ret->addKey(keyset.getClear());
  2130. else if (!isOpt)
  2131. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  2132. else if (traceLevel > 4)
  2133. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  2134. }
  2135. return ret.getClear();
  2136. }
  2137. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IOutputMetaData *preloadLayout, bool preload) const
  2138. {
  2139. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  2140. // Failures to resolve will not be cached, only successes.
  2141. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  2142. CriticalBlock b(lock);
  2143. IInMemoryIndexManager *ret = indexMap.get(channel);
  2144. if (!ret)
  2145. {
  2146. ret = createInMemoryIndexManager(preloadLayout->queryRecordAccessor(true), isOpt, lfn);
  2147. Owned<IFileIOArray> files = getIFileIOArray(isOpt, channel);
  2148. ret->load(files, preloadLayout, preload); // note - files (passed in) are also channel specific
  2149. indexMap.set(ret, channel);
  2150. }
  2151. return LINK(ret);
  2152. }
  2153. virtual const CDateTime &queryTimeStamp() const
  2154. {
  2155. return fileTimeStamp;
  2156. }
  2157. virtual unsigned queryCheckSum() const
  2158. {
  2159. return fileCheckSum;
  2160. }
  2161. virtual offset_t getFileSize() const
  2162. {
  2163. return fileSize;
  2164. }
  2165. virtual hash64_t addHash64(hash64_t hashValue) const
  2166. {
  2167. hashValue = fileTimeStamp.getHash(hashValue);
  2168. if (fileCheckSum)
  2169. hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
  2170. return hashValue;
  2171. }
  2172. virtual void addSubFile(const IResolvedFile *_sub)
  2173. {
  2174. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  2175. if (subFiles.length())
  2176. assertex(sub->fileType==fileType);
  2177. else
  2178. fileType = sub->fileType;
  2179. subRFiles.append((IResolvedFile &) *LINK(_sub));
  2180. ForEachItemIn(idx, sub->subFiles)
  2181. {
  2182. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  2183. }
  2184. }
  2185. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  2186. {
  2187. addFile(lfn, _sub, _remoteSub);
  2188. }
  2189. virtual void addSubFile(const char *localFileName)
  2190. {
  2191. Owned<IFile> file = createIFile(localFileName);
  2192. assertex(file->exists());
  2193. offset_t size = file->size();
  2194. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  2195. Owned<IPropertyTree> pp = createPTree("Part", ipt_lowmem);
  2196. pp->setPropInt64("@size",size);
  2197. pp->setPropBool("@local", true);
  2198. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  2199. addSubFile(fdesc.getClear(), NULL);
  2200. }
  2201. virtual void setCache(IResolvedFileCache *cache)
  2202. {
  2203. if (cached)
  2204. {
  2205. if (traceLevel > 9)
  2206. DBGLOG("setCache removing from prior cache %s", queryFileName());
  2207. if (cache==NULL)
  2208. cached->removeCache(this);
  2209. else
  2210. throwUnexpected();
  2211. }
  2212. cached = cache;
  2213. }
  2214. virtual bool isAlive() const
  2215. {
  2216. return CInterface::isAlive();
  2217. }
  2218. virtual const char *queryFileName() const
  2219. {
  2220. return lfn.get();
  2221. }
  2222. virtual const char *queryPhysicalName() const
  2223. {
  2224. return physicalName.get();
  2225. }
  2226. virtual const IPropertyTree *queryProperties() const
  2227. {
  2228. return properties;
  2229. }
  2230. virtual void remove()
  2231. {
  2232. subFiles.kill();
  2233. subDFiles.kill();
  2234. subRFiles.kill();
  2235. subNames.kill();
  2236. remoteSubFiles.kill();
  2237. properties.clear();
  2238. notifier.clear();
  2239. if (isSuper)
  2240. {
  2241. // Because we don't lock superfiles, we need to behave differently
  2242. UNIMPLEMENTED;
  2243. }
  2244. else if (dFile)
  2245. {
  2246. dFile->detach();
  2247. }
  2248. else if (!physicalName.isEmpty())
  2249. {
  2250. try
  2251. {
  2252. Owned<IFile> file = createIFile(physicalName.get());
  2253. file->remove();
  2254. }
  2255. catch (IException *e)
  2256. {
  2257. ERRLOG(-1, "Error removing file %s (%s)", lfn.get(), physicalName.get());
  2258. e->Release();
  2259. }
  2260. }
  2261. }
  2262. virtual bool exists() const
  2263. {
  2264. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  2265. // This will make more sense if/when we start to lock earlier.
  2266. if (dFile || isSuper)
  2267. return true; // MORE - may need some thought - especially the isSuper case
  2268. else if (!physicalName.isEmpty())
  2269. return checkFileExists(physicalName.get());
  2270. else
  2271. return false;
  2272. }
  2273. };
  2274. /*----------------------------------------------------------------------------------------------------------
  2275. MORE
  2276. - on remote() calls we can't pass the expected file date but we will pass it back with the file info.
  2277. ------------------------------------------------------------------------------------------------------------*/
  2278. class CSlaveDynamicFile : public CResolvedFile
  2279. {
  2280. public:
  2281. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  2282. bool isLocal;
  2283. unsigned channel;
  2284. unsigned serverIdx;
  2285. public:
  2286. CSlaveDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  2287. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
  2288. {
  2289. // call back to the server to get the info
  2290. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  2291. try
  2292. {
  2293. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK);
  2294. bool ok = false;
  2295. for (unsigned i = 0; i < callbackRetries; i++)
  2296. {
  2297. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  2298. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  2299. char *buf = (char *) output->getBuffer(len, true);
  2300. buf[0] = isOpt;
  2301. buf[1] = isLocal;
  2302. strcpy(buf+2, lfn.get());
  2303. output->putBuffer(buf, len, true);
  2304. output->flush(true);
  2305. output.clear();
  2306. if (callback->wait(callbackTimeout))
  2307. {
  2308. ok = true;
  2309. break;
  2310. }
  2311. else
  2312. {
  2313. DBGLOG("timed out waiting for server callback - retrying");
  2314. }
  2315. }
  2316. if (ok)
  2317. {
  2318. if (traceLevel > 6)
  2319. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  2320. MemoryBuffer &serverData = callback->queryData();
  2321. byte type;
  2322. serverData.read(type);
  2323. fileType = (RoxieFileType) type;
  2324. fileTimeStamp.deserialize(serverData);
  2325. serverData.read(fileCheckSum);
  2326. serverData.read(fileSize);
  2327. unsigned numSubFiles;
  2328. serverData.read(numSubFiles);
  2329. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  2330. {
  2331. StringBuffer subName;
  2332. serverData.read(subName);
  2333. subNames.append(subName.str());
  2334. deserializeFilePart(serverData, subFiles, fileNo, false);
  2335. bool remotePresent;
  2336. serverData.read(remotePresent);
  2337. if (remotePresent)
  2338. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  2339. else
  2340. remoteSubFiles.append(NULL);
  2341. unsigned formatCrc;
  2342. serverData.read(formatCrc);
  2343. formatCrcs.append(formatCrc);
  2344. byte diskTypeInfoPresent;
  2345. serverData.read(diskTypeInfoPresent);
  2346. switch (diskTypeInfoPresent)
  2347. {
  2348. case 0:
  2349. diskTypeInfo.append(NULL);
  2350. break;
  2351. case 1:
  2352. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, false, nullptr));
  2353. break;
  2354. case 2:
  2355. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, true, nullptr));
  2356. break;
  2357. case 3:
  2358. assertex(fileNo > 0);
  2359. diskTypeInfo.append(LINK(diskTypeInfo.item(fileNo-1)));
  2360. break;
  2361. default:
  2362. throwUnexpected();
  2363. }
  2364. }
  2365. bool propertiesPresent;
  2366. serverData.read(propertiesPresent);
  2367. if (propertiesPresent)
  2368. properties.setown(createPTree(serverData, ipt_lowmem));
  2369. }
  2370. else
  2371. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  2372. }
  2373. catch (...)
  2374. {
  2375. ROQ->removePendingCallback(callback);
  2376. throw;
  2377. }
  2378. ROQ->removePendingCallback(callback);
  2379. }
  2380. private:
  2381. void deserializeFilePart(MemoryBuffer &serverData, IPointerArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  2382. {
  2383. IArrayOf<IPartDescriptor> parts;
  2384. deserializePartFileDescriptors(serverData, parts);
  2385. if (parts.length())
  2386. {
  2387. files.append(LINK(&parts.item(0).queryOwner()));
  2388. }
  2389. else
  2390. {
  2391. if (traceLevel > 6)
  2392. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  2393. files.append(NULL);
  2394. }
  2395. }
  2396. };
  2397. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  2398. {
  2399. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
  2400. }
  2401. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
  2402. {
  2403. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  2404. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
  2405. }
  2406. class CSlaveDynamicFileCache : implements ISlaveDynamicFileCache, public CInterface
  2407. {
  2408. unsigned tableSize;
  2409. mutable CriticalSection crit;
  2410. CIArrayOf<CSlaveDynamicFile> files; // expect numbers to be small - probably not worth hashing
  2411. public:
  2412. IMPLEMENT_IINTERFACE;
  2413. CSlaveDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  2414. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal) override
  2415. {
  2416. if (logctx.queryTraceLevel() > 5)
  2417. {
  2418. StringBuffer s;
  2419. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  2420. }
  2421. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  2422. CriticalBlock b(crit);
  2423. if (!cacheDate.isNull())
  2424. {
  2425. unsigned idx = 0;
  2426. while (files.isItem(idx))
  2427. {
  2428. CSlaveDynamicFile &f = files.item(idx);
  2429. if (f.channel==header->channel && f.serverIdx==header->serverIdx && stricmp(f.queryFileName(), lfn)==0)
  2430. {
  2431. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  2432. {
  2433. if (f.isKey())
  2434. clearKeyStoreCacheEntry(f.queryFileName());
  2435. files.remove(idx);
  2436. idx--;
  2437. }
  2438. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  2439. {
  2440. files.swap(idx, 0);
  2441. return LINK(&f);
  2442. }
  2443. }
  2444. idx++;
  2445. }
  2446. }
  2447. Owned<CSlaveDynamicFile> ret;
  2448. {
  2449. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  2450. CriticalUnblock b1(crit);
  2451. ret.setown(new CSlaveDynamicFile(logctx, lfn, header, isOpt, isLocal));
  2452. }
  2453. while (files.length() > tableSize)
  2454. files.remove(files.length()-1);
  2455. files.add(*ret.getLink(), 0);
  2456. return ret.getClear();
  2457. }
  2458. virtual void releaseAll() override
  2459. {
  2460. CriticalBlock b(crit);
  2461. files.kill();
  2462. }
  2463. };
  2464. static CriticalSection slaveDynamicFileCacheCrit;
  2465. static Owned<ISlaveDynamicFileCache> slaveDynamicFileCache;
  2466. extern ISlaveDynamicFileCache *querySlaveDynamicFileCache()
  2467. {
  2468. if (!slaveDynamicFileCache)
  2469. {
  2470. CriticalBlock b(slaveDynamicFileCacheCrit);
  2471. if (!slaveDynamicFileCache)
  2472. slaveDynamicFileCache.setown(new CSlaveDynamicFileCache(20));
  2473. }
  2474. return slaveDynamicFileCache;
  2475. }
  2476. extern void releaseSlaveDynamicFileCache()
  2477. {
  2478. CriticalBlock b(slaveDynamicFileCacheCrit);
  2479. if (slaveDynamicFileCache)
  2480. slaveDynamicFileCache->releaseAll();
  2481. }
  2482. // Initialization/termination
  2483. MODULE_INIT(INIT_PRIORITY_STANDARD)
  2484. {
  2485. fileCache = new CRoxieFileCache;
  2486. return true;
  2487. }
  2488. MODULE_EXIT()
  2489. {
  2490. fileCache->join();
  2491. fileCache->Release();
  2492. }
  2493. extern IRoxieFileCache &queryFileCache()
  2494. {
  2495. return *fileCache;
  2496. }
  2497. class CRoxieWriteHandler : implements IRoxieWriteHandler, public CInterface
  2498. {
  2499. public:
  2500. IMPLEMENT_IINTERFACE;
  2501. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2502. : daliHelper(_daliHelper), dFile(_dFile)
  2503. {
  2504. ForEachItemIn(idx, _clusters)
  2505. {
  2506. addCluster(_clusters.item(idx));
  2507. }
  2508. if (dFile->queryDistributedFile())
  2509. {
  2510. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  2511. if (isTemporary)
  2512. {
  2513. UNIMPLEMENTED;
  2514. }
  2515. else
  2516. localFile.setown(dFile->getPartFile(0, 0));
  2517. }
  2518. else
  2519. {
  2520. isTemporary = false;
  2521. localFile.setown(dFile->getPartFile(0, 0));
  2522. }
  2523. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  2524. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  2525. }
  2526. virtual IFile *queryFile() const
  2527. {
  2528. return localFile;
  2529. }
  2530. void getClusters(StringArray &clusters) const
  2531. {
  2532. ForEachItemIn(idx, allClusters)
  2533. {
  2534. clusters.append(allClusters.item(idx));
  2535. }
  2536. }
  2537. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  2538. {
  2539. if (success)
  2540. {
  2541. copyPhysical();
  2542. if (daliHelper && daliHelper->connected())
  2543. publish(activity);
  2544. }
  2545. if (isTemporary || !success)
  2546. {
  2547. localFile->remove();
  2548. }
  2549. }
  2550. private:
  2551. bool isTemporary;
  2552. Linked<IRoxieDaliHelper> daliHelper;
  2553. Owned<ILocalOrDistributedFile> dFile;
  2554. Owned<IFile> localFile;
  2555. Owned<IGroup> localCluster;
  2556. StringAttr localClusterName;
  2557. IArrayOf<IGroup> remoteNodes;
  2558. StringArray allClusters;
  2559. void copyPhysical() const
  2560. {
  2561. if (remoteNodes.length())
  2562. {
  2563. RemoteFilename rfn, rdn;
  2564. dFile->getPartFilename(rfn, 0, 0);
  2565. StringBuffer physicalName, physicalDir, physicalBase;
  2566. rfn.getLocalPath(physicalName);
  2567. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2568. rdn.setLocalPath(physicalDir.str());
  2569. ForEachItemIn(idx, remoteNodes)
  2570. {
  2571. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2572. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2573. Owned<IFile> targetdir = createIFile(rdn);
  2574. Owned<IFile> target = createIFile(rfn);
  2575. targetdir->createDirectory();
  2576. copyFile(target, localFile);
  2577. }
  2578. }
  2579. }
  2580. void publish(const IRoxiePublishCallback *activity)
  2581. {
  2582. if (!dFile->isExternal())
  2583. {
  2584. Owned<IFileDescriptor> desc = createFileDescriptor();
  2585. desc->setNumParts(1);
  2586. RemoteFilename rfn;
  2587. dFile->getPartFilename(rfn, 0, 0);
  2588. StringBuffer physicalName, physicalDir, physicalBase;
  2589. rfn.getLocalPath(physicalName);
  2590. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2591. desc->setDefaultDir(physicalDir.str());
  2592. desc->setPartMask(physicalBase.str());
  2593. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  2594. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  2595. offset_t fileSize = localFile->size();
  2596. fileProps.setPropInt64("@size", fileSize);
  2597. partProps.setPropInt64("@size", fileSize);
  2598. CDateTime createTime, modifiedTime, accessedTime;
  2599. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  2600. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  2601. unsigned hour, min, sec, nanosec;
  2602. modifiedTime.getTime(hour, min, sec, nanosec);
  2603. modifiedTime.setTime(hour, min, sec, 0);
  2604. StringBuffer timestr;
  2605. modifiedTime.getString(timestr);
  2606. if(timestr.length())
  2607. partProps.setProp("@modified", timestr.str());
  2608. ClusterPartDiskMapSpec partmap;
  2609. if (localCluster)
  2610. {
  2611. desc->addCluster(localCluster, partmap);
  2612. desc->setClusterGroupName(0, localClusterName.get());
  2613. }
  2614. ForEachItemIn(idx, remoteNodes)
  2615. desc->addCluster(&remoteNodes.item(idx), partmap);
  2616. if (activity)
  2617. activity->setFileProperties(desc);
  2618. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  2619. publishFile->setAccessedTime(modifiedTime);
  2620. IUserDescriptor * userdesc = NULL;
  2621. if (activity)
  2622. userdesc = activity->queryUserDescriptor();
  2623. else
  2624. {
  2625. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  2626. if (daliHelper)
  2627. userdesc = daliHelper->queryUserDescriptor();//predeployed query mode
  2628. }
  2629. publishFile->attach(dFile->queryLogicalName(), userdesc);
  2630. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  2631. }
  2632. }
  2633. void addCluster(char const * cluster)
  2634. {
  2635. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  2636. if (!group)
  2637. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  2638. cluster, dFile->queryLogicalName());
  2639. if (group->isMember())
  2640. {
  2641. if (localCluster)
  2642. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2643. cluster, dFile->queryLogicalName());
  2644. localCluster.setown(group.getClear());
  2645. localClusterName.set(cluster);
  2646. }
  2647. else
  2648. {
  2649. ForEachItemIn(idx, remoteNodes)
  2650. {
  2651. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  2652. if (group->isMember(other))
  2653. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2654. cluster, dFile->queryLogicalName());
  2655. }
  2656. remoteNodes.append(*group.getClear());
  2657. }
  2658. allClusters.append(cluster);
  2659. }
  2660. };
  2661. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2662. {
  2663. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  2664. }
  2665. //================================================================================================================
  2666. #ifdef _USE_CPPUNIT
  2667. #include "unittests.hpp"
  2668. class CcdFileTest : public CppUnit::TestFixture
  2669. {
  2670. CPPUNIT_TEST_SUITE(CcdFileTest);
  2671. CPPUNIT_TEST(testCopy);
  2672. CPPUNIT_TEST_SUITE_END();
  2673. protected:
  2674. class DummyPartDescriptor : public CInterfaceOf<IPartDescriptor>
  2675. {
  2676. virtual unsigned queryPartIndex() { UNIMPLEMENTED; }
  2677. virtual unsigned numCopies() { UNIMPLEMENTED; }
  2678. virtual INode *getNode(unsigned copy=0) { UNIMPLEMENTED; }
  2679. virtual INode *queryNode(unsigned copy=0) { UNIMPLEMENTED; }
  2680. virtual IPropertyTree &queryProperties() { UNIMPLEMENTED; }
  2681. virtual IPropertyTree *getProperties() { UNIMPLEMENTED; }
  2682. virtual RemoteFilename &getFilename(unsigned copy, RemoteFilename &rfn) { UNIMPLEMENTED; }
  2683. virtual StringBuffer &getTail(StringBuffer &name) { UNIMPLEMENTED; }
  2684. virtual StringBuffer &getDirectory(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  2685. virtual StringBuffer &getPath(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  2686. virtual void serialize(MemoryBuffer &tgt) { UNIMPLEMENTED; }
  2687. virtual bool isMulti() { UNIMPLEMENTED; }
  2688. virtual RemoteMultiFilename &getMultiFilename(unsigned copy, RemoteMultiFilename &rfn) { UNIMPLEMENTED; }
  2689. virtual bool getCrc(unsigned &crc) { UNIMPLEMENTED; }
  2690. virtual IFileDescriptor &queryOwner() { UNIMPLEMENTED; }
  2691. virtual const char *queryOverrideName() { UNIMPLEMENTED; }
  2692. virtual unsigned copyClusterNum(unsigned copy,unsigned *replicate=NULL) { UNIMPLEMENTED; }
  2693. virtual IReplicatedFile *getReplicatedFile() { UNIMPLEMENTED; }
  2694. };
  2695. void testCopy()
  2696. {
  2697. remove("test.local");
  2698. remove("test.remote");
  2699. remove("test.buddy");
  2700. CRoxieFileCache cache(true);
  2701. StringArray remotes;
  2702. DummyPartDescriptor pdesc;
  2703. CDateTime dummy;
  2704. remotes.append("test.remote");
  2705. int f = open("test.remote", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  2706. CPPUNIT_ASSERT(f >= 0);
  2707. int val = 1;
  2708. int wrote = write(f, &val, sizeof(int));
  2709. CPPUNIT_ASSERT(wrote==sizeof(int));
  2710. close(f);
  2711. Owned<ILazyFileIO> io = cache.openFile("test.local", 0, "test.local", NULL, remotes, sizeof(int), dummy);
  2712. CPPUNIT_ASSERT(io != NULL);
  2713. // Reading it should read 1
  2714. val = 0;
  2715. ssize_t bytesRead = io->read(0, sizeof(int), &val);
  2716. CPPUNIT_ASSERT(bytesRead==4);
  2717. CPPUNIT_ASSERT(val==1);
  2718. // Now create the buddy
  2719. f = open("test.buddy", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  2720. val = 2;
  2721. ssize_t numwritten = write(f, &val, sizeof(int));
  2722. CPPUNIT_ASSERT(numwritten == sizeof(int));
  2723. close(f);
  2724. // Reading it should still read 1...
  2725. val = 0;
  2726. io->read(0, sizeof(int), &val);
  2727. CPPUNIT_ASSERT(val==1);
  2728. // Now copy it - should copy the buddy
  2729. cache.doCopy(io, false, false);
  2730. // Reading it should read 2...
  2731. val = 0;
  2732. io->read(0, sizeof(int), &val);
  2733. CPPUNIT_ASSERT(val==2);
  2734. // And the data in the file should be 2
  2735. f = open("test.local", _O_RDONLY);
  2736. val = 0;
  2737. ssize_t numread = read(f, &val, sizeof(int));
  2738. CPPUNIT_ASSERT(numread == sizeof(int));
  2739. close(f);
  2740. CPPUNIT_ASSERT(val==2);
  2741. io.clear();
  2742. remove("test.local");
  2743. remove("test.remote");
  2744. remove("test.buddy");
  2745. }
  2746. };
  2747. CPPUNIT_TEST_SUITE_REGISTRATION( CcdFileTest );
  2748. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CcdFileTest, "CcdFileTest" );
  2749. #endif