ccdfile.cpp 137 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "udptopo.hpp"
  24. #include "ccd.hpp"
  25. #include "ccdfile.hpp"
  26. #include "ccdquery.hpp"
  27. #include "ccdstate.hpp"
  28. #include "ccdsnmp.hpp"
  29. #include "rmtfile.hpp"
  30. #include "ccdqueue.ipp"
  31. #include "ccdcache.hpp"
  32. #if defined(__linux__) || defined(__APPLE__)
  33. #include <sys/mman.h>
  34. #endif
  35. #if defined (__linux__)
  36. #include <sys/syscall.h>
  37. #include "ioprio.h"
  38. #endif
  39. #include "thorcommon.hpp"
  40. #include "eclhelper_dyn.hpp"
  41. #include "rtldynfield.hpp"
  // Number of files currently held open by lazy file objects. The array is indexed by the
  // file's "remote" flag, so [0] counts local files and [1] counts remote files.
  std::atomic<unsigned> numFilesOpen[2];
  // read() throws once this many consecutive attempts (including reopening elsewhere) have failed
  #define MAX_READ_RETRIES 2
#ifdef _DEBUG
  // Fault-injection switches for debug builds. FAIL_20_OPEN makes every fifth open attempt
  // in _checkOpen() throw; FAIL_20_READ only adds a read counter in the code visible here.
  //#define FAIL_20_READ
  //#define FAIL_20_OPEN
#endif
  48. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  49. class DECL_EXCEPTION NotYetOpenException : implements IException, public CInterface
  50. {
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. virtual int errorCode() const { return 0; }
  54. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  55. virtual MessageAudience errorAudience() const { return MSGAUD_programmer; }
  56. };
  57. class CFailingFileIO : implements IFileIO, public CInterface
  58. {
  59. #define THROWNOTOPEN throw new NotYetOpenException()
  60. public:
  61. IMPLEMENT_IINTERFACE;
  62. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  63. virtual offset_t size() { THROWNOTOPEN; }
  64. virtual void flush() { THROWNOTOPEN; }
  65. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  66. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  67. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  68. virtual void close() { }
  69. virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
  70. } failure;
  71. class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public CInterface
  72. {
  73. protected:
  74. IArrayOf<IFile> sources;
  75. Owned<IFile> logical;
  76. Owned<IFileIO> current;
  77. Owned<IMemoryMappedFile> mmapped;
  78. mutable CriticalSection crit;
  79. offset_t fileSize;
  80. unsigned currentIdx;
  81. unsigned lastAccess;
  82. CDateTime fileDate;
  83. bool copying = false;
  84. bool isCompressed = false;
  85. bool remote = false;
  86. IRoxieFileCache *cached = nullptr;
  87. unsigned fileIdx = 0;
  88. unsigned crc = 0;
  89. CRuntimeStatisticCollection fileStats;
  90. #ifdef FAIL_20_READ
  91. unsigned readCount;
  92. #endif
  93. public:
  94. IMPLEMENT_IINTERFACE;
  95. CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed, unsigned _crc)
  96. : logical(_logical), fileSize(size), isCompressed(_isCompressed), crc(_crc), fileStats(diskLocalStatistics)
  97. {
  98. fileDate.set(_date);
  99. currentIdx = 0;
  100. current.set(&failure);
  101. #ifdef FAIL_20_READ
  102. readCount = 0;
  103. #endif
  104. lastAccess = msTick();
  105. }
  106. ~CRoxieLazyFileIO()
  107. {
  108. setFailure(); // ensures the open file count properly maintained
  109. }
  110. virtual void beforeDispose()
  111. {
  112. if (cached)
  113. cached->removeCache(this);
  114. }
  115. virtual unsigned getFileIdx() const override
  116. {
  117. return fileIdx;
  118. }
  119. virtual unsigned getCrc() const override
  120. {
  121. return crc;
  122. }
  123. void setCache(IRoxieFileCache *cache, unsigned _fileIdx)
  124. {
  125. assertex(!cached);
  126. cached = cache;
  127. fileIdx = _fileIdx;
  128. }
  129. void removeCache(const IRoxieFileCache *cache)
  130. {
  131. assertex(cached==cache);
  132. cached = NULL;
  133. }
  134. inline void setRemote(bool _remote) { remote = _remote; }
  135. virtual void setCopying(bool _copying)
  136. {
  137. CriticalBlock b(crit);
  138. if (remote && currentIdx)
  139. {
  140. // The current location is not our preferred location. Recheck whether we can now access our preferred location
  141. setFailure();
  142. currentIdx = 0;
  143. _checkOpen();
  144. }
  145. copying = _copying;
  146. }
  147. virtual void dump() const
  148. {
  149. CriticalBlock b(crit);
  150. DBGLOG("LazyFileIO object %s has %d sources:", queryFilename(), sources.ordinality());
  151. ForEachItemIn(idx, sources)
  152. {
  153. DBGLOG("%c %s", idx==currentIdx ? '*' : ' ', sources.item(idx).queryFilename());
  154. }
  155. }
  156. virtual bool isCopying() const
  157. {
  158. CriticalBlock b(crit);
  159. return copying;
  160. }
  161. virtual bool isOpen() const
  162. {
  163. CriticalBlock b(crit);
  164. return current.get() != &failure;
  165. }
  166. virtual unsigned getLastAccessed() const
  167. {
  168. CriticalBlock b(crit);
  169. return lastAccess;
  170. }
  171. virtual void close()
  172. {
  173. CriticalBlock b(crit);
  174. setFailure();
  175. }
  176. virtual bool isRemote()
  177. {
  178. CriticalBlock b(crit);
  179. return remote;
  180. }
  181. void setFailure()
  182. {
  183. try
  184. {
  185. if (current.get()==&failure)
  186. return;
  187. numFilesOpen[remote]--;
  188. mergeStats(fileStats, current);
  189. current.set(&failure);
  190. }
  191. catch (IException *E)
  192. {
  193. if (traceLevel > 5)
  194. {
  195. StringBuffer s;
  196. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  197. }
  198. E->Release();
  199. }
  200. }
  201. void checkOpen()
  202. {
  203. CriticalBlock b(crit);
  204. _checkOpen();
  205. }
  206. IFileIO *getCheckOpen(unsigned &activeIdx)
  207. {
  208. CriticalBlock b(crit);
  209. _checkOpen();
  210. activeIdx = currentIdx;
  211. return LINK(current);
  212. }
  213. void _checkOpen()
  214. {
  215. if (current.get() == &failure)
  216. {
  217. StringBuffer filesTried;
  218. unsigned tries = 0;
  219. bool firstTime = true;
  220. RoxieFileStatus fileStatus = FileNotFound;
  221. for (;;)
  222. {
  223. if (currentIdx >= sources.length())
  224. currentIdx = 0;
  225. if (tries==sources.length())
  226. {
  227. if (firstTime) // if first time - reset and try again
  228. {
  229. firstTime = false;
  230. tries = 0;
  231. }
  232. else
  233. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  234. }
  235. const char *sourceName = sources.item(currentIdx).queryFilename();
  236. if (traceLevel > 10)
  237. DBGLOG("Trying to open %s", sourceName);
  238. try
  239. {
  240. #ifdef FAIL_20_OPEN
  241. openCount++;
  242. if ((openCount % 5) == 0)
  243. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  244. #endif
  245. IFile *f = &sources.item(currentIdx);
  246. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, isCompressed, false);
  247. if (fileStatus == FileIsValid)
  248. {
  249. if (isCompressed)
  250. current.setown(createCompressedFileReader(f));
  251. else
  252. current.setown(f->open(IFOread));
  253. if (current)
  254. {
  255. if (traceLevel > 5)
  256. DBGLOG("Opening %s", sourceName);
  257. disconnectRemoteIoOnExit(current);
  258. break;
  259. }
  260. // throwUnexpected(); - try another location if this one has the wrong version of the file
  261. }
  262. disconnectRemoteFile(f);
  263. }
  264. catch (IException *E)
  265. {
  266. E->Release();
  267. }
  268. currentIdx++;
  269. tries++;
  270. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  271. {
  272. filesTried.appendf(" %s", sourceName); // only need to build this list once
  273. switch (fileStatus)
  274. {
  275. case FileNotFound:
  276. filesTried.append(": FileNotFound");
  277. break;
  278. case FileSizeMismatch:
  279. filesTried.append(": FileSizeMismatch");
  280. break;
  281. case FileDateMismatch:
  282. filesTried.append(": FileDateMismatch");
  283. break;
  284. }
  285. }
  286. }
  287. lastAccess = msTick();
  288. if (++numFilesOpen[remote] > maxFilesOpen[remote])
  289. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  290. }
  291. }
  292. virtual void addSource(IFile *newSource)
  293. {
  294. if (newSource)
  295. {
  296. if (traceLevel > 10)
  297. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  298. CriticalBlock b(crit);
  299. sources.append(*newSource);
  300. }
  301. }
  302. virtual size32_t read(offset_t pos, size32_t len, void * data)
  303. {
  304. unsigned activeIdx;
  305. Owned<IFileIO> active = getCheckOpen(activeIdx);
  306. unsigned tries = 0;
  307. for (;;)
  308. {
  309. try
  310. {
  311. size32_t ret = active->read(pos, len, data);
  312. lastAccess = msTick();
  313. if (cached && !remote)
  314. cached->noteRead(fileIdx, pos, ret);
  315. return ret;
  316. }
  317. catch (NotYetOpenException *E)
  318. {
  319. E->Release();
  320. }
  321. catch (IException *E)
  322. {
  323. EXCLOG(MCoperatorError, E, "Read error");
  324. E->Release();
  325. OERRLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(activeIdx).queryFilename());
  326. {
  327. CriticalBlock b(crit);
  328. if (currentIdx == activeIdx)
  329. {
  330. currentIdx = activeIdx+1;
  331. setFailure();
  332. }
  333. }
  334. }
  335. active.setown(getCheckOpen(activeIdx));
  336. tries++;
  337. if (tries == MAX_READ_RETRIES)
  338. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(activeIdx).queryFilename(), tries);
  339. }
  340. }
  341. virtual void flush()
  342. {
  343. Linked<IFileIO> active;
  344. {
  345. CriticalBlock b(crit);
  346. active.set(current);
  347. }
  348. if (active.get() != &failure)
  349. active->flush();
  350. }
  351. virtual offset_t size()
  352. {
  353. unsigned activeIdx;
  354. Owned<IFileIO> active = getCheckOpen(activeIdx);
  355. lastAccess = msTick();
  356. return active->size();
  357. }
  358. virtual unsigned __int64 getStatistic(StatisticKind kind)
  359. {
  360. unsigned __int64 v = fileStats.getStatisticValue(kind);
  361. CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
  362. return v + current->getStatistic(kind);
  363. }
  364. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  365. virtual void setSize(offset_t size) { throwUnexpected(); }
  366. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  367. virtual const char *queryFilename() const { return logical->queryFilename(); }
  368. virtual bool isAliveAndLink() const { return CInterface::isAliveAndLink(); }
  369. virtual IMemoryMappedFile *getMappedFile() override
  370. {
  371. CriticalBlock b(crit);
  372. if (mmapped)
  373. return mmapped.getLink();
  374. if (!remote)
  375. {
  376. mmapped.setown(logical->openMemoryMapped());
  377. return mmapped.getLink();
  378. }
  379. return nullptr;
  380. }
  381. virtual IFileIO *getFileIO() override
  382. {
  383. return LINK(this);
  384. }
  385. virtual bool createHardFileLink()
  386. {
  387. unsigned tries = 0;
  388. for (;;)
  389. {
  390. StringBuffer filesTried;
  391. if (currentIdx >= sources.length())
  392. currentIdx = 0;
  393. if (tries==sources.length())
  394. return false;
  395. const char *sourceName = sources.item(currentIdx).queryFilename();
  396. filesTried.appendf(" %s", sourceName);
  397. try
  398. {
  399. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, isCompressed) == FileIsValid)
  400. {
  401. StringBuffer source_drive;
  402. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  403. StringBuffer query_drive;
  404. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  405. // only try to create link if on the same drive
  406. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  407. {
  408. try
  409. {
  410. DBGLOG("Trying to create Hard Link for %s", sourceName);
  411. createHardLink(logical->queryFilename(), sourceName);
  412. current.setown(sources.item(currentIdx).open(IFOread));
  413. return true;
  414. }
  415. catch(IException *E)
  416. {
  417. StringBuffer err;
  418. OERRLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  419. E->Release();
  420. }
  421. }
  422. }
  423. }
  424. catch (IException *E)
  425. {
  426. E->Release();
  427. }
  428. currentIdx++;
  429. tries++;
  430. }
  431. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  432. return false; // if we get here - no hardlink
  433. }
  434. void copyComplete()
  435. {
  436. CriticalBlock b(crit);
  437. setFailure(); // lazyOpen will then reopen it...
  438. currentIdx = 0;
  439. remote = false;
  440. copying = false;
  441. sources.kill();
  442. sources.add(*logical.getLink(), 0);
  443. if (!lazyOpen)
  444. _checkOpen();
  445. }
  446. bool checkCopyComplete()
  447. {
  448. CriticalBlock b(crit);
  449. if (logical->exists()) // MORE - do we need to check data/size etc? do we have the info to do so?
  450. {
  451. copyComplete();
  452. return true;
  453. }
  454. return false;
  455. }
  456. virtual IFile *querySource()
  457. {
  458. CriticalBlock b(crit);
  459. _checkOpen();
  460. return &sources.item(currentIdx);
  461. };
  462. virtual IFile *queryTarget() { return logical; }
  463. virtual offset_t getSize() { return fileSize; }
  464. virtual CDateTime *queryDateTime() { return &fileDate; }
  465. static int compareAccess(IInterface * const *L, IInterface * const *R)
  466. {
  467. ILazyFileIO *LL = (ILazyFileIO *) *L;
  468. ILazyFileIO *RR = (ILazyFileIO *) *R;
  469. return LL->getLastAccessed() - RR->getLastAccessed();
  470. }
  471. };
  472. //----------------------------------------------------------------------------------------------
  473. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  474. {
  475. if (!remoteFDesc)
  476. return NULL;
  477. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  478. if (!remotePDesc)
  479. return NULL;
  480. unsigned int crc, remoteCrc;
  481. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  482. return remotePDesc;
  483. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  484. return remotePDesc;
  485. return NULL;
  486. }
  487. static int getClusterPriority(const char *clusterName)
  488. {
  489. assertex(preferredClusters);
  490. int *priority = preferredClusters->getValue(clusterName);
  491. return priority ? *priority : 100;
  492. }
  493. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
  494. {
  495. if (traceRemoteFiles)
  496. DBGLOG("appendRemoteLocations lfn=%s fromCluster=%s, includeFromCluster=%s", nullText(localFileName), nullText(fromCluster), boolToStr(includeFromCluster));
  497. IFileDescriptor &fdesc = pdesc->queryOwner();
  498. unsigned numCopies = pdesc->numCopies();
  499. unsigned lastClusterNo = (unsigned) -1;
  500. unsigned numThisCluster = 0;
  501. unsigned initialSize = locations.length();
  502. int priority = 0;
  503. IntArray priorities;
  504. for (unsigned copy = 0; copy < numCopies; copy++)
  505. {
  506. unsigned clusterNo = pdesc->copyClusterNum(copy);
  507. StringBuffer clusterName;
  508. fdesc.getClusterGroupName(clusterNo, clusterName);
  509. if (traceRemoteFiles)
  510. DBGLOG("appendRemoteLocations found entry in cluster %s", clusterName.str());
  511. if (fromCluster && *fromCluster)
  512. {
  513. bool matches = strieq(clusterName.str(), fromCluster);
  514. if (matches!=includeFromCluster)
  515. continue;
  516. }
  517. RemoteFilename r;
  518. pdesc->getFilename(copy,r);
  519. StringBuffer path;
  520. r.getRemotePath(path);
  521. if (localFileName && r.isLocal())
  522. {
  523. StringBuffer l;
  524. r.getLocalPath(l);
  525. if (streq(l, localFileName))
  526. continue; // don't add ourself
  527. }
  528. if (clusterNo == lastClusterNo)
  529. {
  530. numThisCluster++;
  531. if (numThisCluster > 2) // Don't add more than 2 from one cluster
  532. continue;
  533. }
  534. else
  535. {
  536. numThisCluster = 1;
  537. lastClusterNo = clusterNo;
  538. if (preferredClusters)
  539. {
  540. priority = getClusterPriority(clusterName);
  541. }
  542. else
  543. priority = copy;
  544. }
  545. if (priority >= 0)
  546. {
  547. ForEachItemIn(idx, priorities)
  548. {
  549. if (priorities.item(idx) < priority)
  550. break;
  551. }
  552. priorities.add(priority, idx);
  553. locations.add(path.str(), idx+initialSize);
  554. if (traceRemoteFiles)
  555. DBGLOG("appendRemoteLocations adding location %s at position %u", path.str(), idx+initialSize);
  556. }
  557. }
  558. }
  559. //----------------------------------------------------------------------------------------------
  // Convenience alias for a pointer to a StringArray
  typedef StringArray *StringArrayPtr;
  561. // A circular buffer recording recent disk read operations that can be used to "prewarm" the cache
  562. class CacheReportingBuffer : public CInterfaceOf<ICacheInfoRecorder>
  563. {
  564. // A circular buffer recording recent file activity. Note that noteRead() and clear() may be called from multiple threads
  565. // (other functions are assumed single-threaded) and that locking is kept to a minimum, even if it means information may be slightly inaccurate.
  566. CacheInfoEntry *recentReads = nullptr;
  567. std::atomic<unsigned> recentReadHead = {0};
  568. unsigned recentReadSize;
  569. public:
  570. CacheReportingBuffer(offset_t trackSize)
  571. {
  572. recentReadSize = trackSize >> CacheInfoEntry::pageBits;
  573. if (traceLevel)
  574. DBGLOG("Creating CacheReportingBuffer with %d elements", recentReadSize);
  575. if (!recentReadSize)
  576. throw makeStringExceptionV(ROXIE_FILE_ERROR, "cacheTrackSize(%u) is the size in bytes it cannot be < %u", (unsigned)trackSize, 1U << CacheInfoEntry::pageBits);
  577. recentReads = new CacheInfoEntry[recentReadSize];
  578. recentReadHead = 0;
  579. }
  580. CacheReportingBuffer(const CacheReportingBuffer &from)
  581. {
  582. // NOTE - from may be updated concurrently - we do not want to lock it
  583. // There are therefore races in here, but they do not matter (may result in very recent data being regarded as very old or vice versa).
  584. recentReadSize = from.recentReadSize;
  585. recentReadHead = from.recentReadHead.load(std::memory_order_relaxed);
  586. recentReads = new CacheInfoEntry[recentReadSize];
  587. memcpy(recentReads, from.recentReads, recentReadSize * sizeof(CacheInfoEntry));
  588. }
  589. ~CacheReportingBuffer()
  590. {
  591. delete [] recentReads;
  592. }
  593. void clear()
  594. {
  595. recentReadHead = 0;
  596. }
  597. void noteRead(unsigned fileIdx, offset_t pos, unsigned len, CacheInfoEntry::PageType pageType)
  598. {
  599. if (recentReads && len)
  600. {
  601. CacheInfoEntry start(fileIdx, pos, pageType);
  602. CacheInfoEntry end(fileIdx, pos+len-1, pageType);
  603. for(;start <= end; ++start)
  604. {
  605. recentReads[recentReadHead++ % recentReadSize] = start;
  606. }
  607. }
  608. }
  609. void sortAndDedup()
  610. {
  611. // NOTE: single-threaded
  612. unsigned sortSize;
  613. if (recentReadHead > recentReadSize)
  614. sortSize = recentReadSize;
  615. else
  616. sortSize = recentReadHead;
  617. std::sort(recentReads, recentReads + sortSize);
  618. CacheInfoEntry lastPos(-1,-1,CacheInfoEntry::PageTypeDisk);
  619. unsigned dest = 0;
  620. for (unsigned idx = 0; idx < sortSize; idx++)
  621. {
  622. CacheInfoEntry pos = recentReads[idx];
  623. if (pos.b.file != lastPos.b.file || pos.b.page != lastPos.b.page) // Ignore inNodeCache bit when deduping
  624. {
  625. recentReads[dest++] = pos;
  626. lastPos = pos;
  627. }
  628. }
  629. recentReadHead = dest;
  630. }
  631. void report(StringBuffer &ret, unsigned channel, const StringArray &cacheIndexes, const UnsignedShortArray &cacheIndexChannels)
  632. {
  633. // NOTE: single-threaded
  634. assertex(recentReadHead <= recentReadSize); // Should have sorted and deduped before calling this
  635. unsigned lastFileIdx = (unsigned) -1;
  636. offset_t lastPage = (offset_t) -1;
  637. offset_t startRange = 0;
  638. CacheInfoEntry::PageType lastPageType = CacheInfoEntry::PageTypeDisk;
  639. bool includeFile = false;
  640. for (unsigned idx = 0; idx < recentReadHead; idx++)
  641. {
  642. CacheInfoEntry pos = recentReads[idx];
  643. if (pos.b.file != lastFileIdx)
  644. {
  645. if (includeFile)
  646. appendRange(ret, startRange, lastPage, lastPageType).newline();
  647. lastFileIdx = pos.b.file;
  648. if (channel==(unsigned) -1 || cacheIndexChannels.item(lastFileIdx)==channel)
  649. {
  650. ret.appendf("%u|%s|", cacheIndexChannels.item(lastFileIdx), cacheIndexes.item(lastFileIdx));
  651. includeFile = true;
  652. }
  653. else
  654. includeFile = false;
  655. startRange = pos.b.page;
  656. }
  657. else if ((pos.b.page == lastPage || pos.b.page == lastPage+1) && pos.b.type == lastPageType)
  658. {
  659. // Still in current range
  660. }
  661. else
  662. {
  663. if (includeFile)
  664. appendRange(ret, startRange, lastPage, lastPageType);
  665. startRange = pos.b.page;
  666. }
  667. lastPage = pos.b.page;
  668. lastPageType = (CacheInfoEntry::PageType)pos.b.type;
  669. }
  670. if (includeFile)
  671. appendRange(ret, startRange, lastPage, lastPageType).newline();
  672. }
  673. virtual void noteWarm(unsigned fileIdx, offset_t pos, unsigned len, NodeType type) override
  674. {
  675. //For convenience the values for PageType match the NodeX enumeration.
  676. CacheInfoEntry::PageType pageType = (type <= NodeBlob) ? (CacheInfoEntry::PageType)type : CacheInfoEntry::PageTypeDisk;
  677. noteRead(fileIdx, pos, len, pageType);
  678. }
  679. private:
  680. static StringBuffer &appendRange(StringBuffer &ret, offset_t start, offset_t end, CacheInfoEntry::PageType pageType)
  681. {
  682. ret.append(' ');
  683. if (pageType != CacheInfoEntry::PageTypeDisk)
  684. ret.append('*').append("RLB"[pageType]);
  685. if (start==end)
  686. ret.appendf("%" I64F "x", start);
  687. else
  688. ret.appendf("%" I64F "x-%" I64F "x", start, end);
  689. return ret;
  690. }
  691. };
  692. class IndexCacheWarmer : implements ICacheWarmer
  693. {
  694. IRoxieFileCache *cache = nullptr;
  695. Owned<ILazyFileIO> localFile;
  696. Owned<IKeyIndex> keyIndex;
  697. bool keyFailed = false;
  698. unsigned fileIdx = (unsigned) -1;
  699. unsigned filesProcessed = 0;
  700. unsigned pagesPreloaded = 0;
  701. public:
  702. IndexCacheWarmer(IRoxieFileCache *_cache) : cache(_cache) {}
  703. virtual void startFile(const char *filename) override
  704. {
  705. // "filename" is the filename that roxie would use if it copied the file locally. This may not
  706. // match the name of the actual file - e.g. if the file is local but in a different location.
  707. localFile.setown(cache->lookupLocalFile(filename));
  708. if (localFile)
  709. {
  710. fileIdx = localFile->getFileIdx();
  711. }
  712. keyFailed = false;
  713. filesProcessed++;
  714. }
  715. virtual bool warmBlock(const char *filename, NodeType nodeType, offset_t startOffset, offset_t endOffset) override
  716. {
  717. if (nodeType != NodeNone && !keyFailed && localFile && !keyIndex)
  718. {
  719. //Pass false for isTLK - it will be initialised from the index header
  720. keyIndex.setown(createKeyIndex(filename, localFile->getCrc(), *localFile.get(), fileIdx, false));
  721. if (!keyIndex)
  722. keyFailed = true;
  723. }
  724. if (nodeType != NodeNone && keyIndex)
  725. {
  726. // Round startOffset up to nearest multiple of index node size
  727. unsigned nodeSize = keyIndex->getNodeSize();
  728. startOffset = ((startOffset+nodeSize-1)/nodeSize)*nodeSize;
  729. do
  730. {
  731. if (traceLevel > 8)
  732. DBGLOG("prewarming index page %u %s %" I64F "x-%" I64F "x", (int) nodeType, filename, startOffset, endOffset);
  733. bool loaded = keyIndex->prewarmPage(startOffset, nodeType);
  734. if (!loaded)
  735. break;
  736. pagesPreloaded++;
  737. startOffset += nodeSize;
  738. }
  739. while (startOffset < endOffset);
  740. }
  741. else if (fileIdx != (unsigned) -1)
  742. cache->noteRead(fileIdx, startOffset, (endOffset-1) - startOffset); // Ensure pages we prewarm are recorded in our cache tracker
  743. return true;
  744. }
  745. virtual void endFile() override
  746. {
  747. localFile.clear();
  748. keyIndex.clear();
  749. }
  750. virtual void report() override
  751. {
  752. if (traceLevel)
  753. DBGLOG("Processed %u files and preloaded %u index nodes", filesProcessed, pagesPreloaded);
  754. }
  755. };
  756. class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
  757. {
  758. friend class CcdFileTest;
  759. mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
  760. #ifdef _CONTAINERIZED
  761. mutable ICopyArrayOf<ILazyFileIO> buddyCopying;
  762. mutable bool buddyChecking = false;
  763. #endif
  764. bool reportedFilesToCopy = false;
  765. InterruptableSemaphore toCopy;
  766. InterruptableSemaphore toClose;
  767. InterruptableSemaphore cidtSleep;
  768. mutable CopyMapStringToMyClass<ILazyFileIO> files;
  769. mutable CriticalSection crit;
  770. CriticalSection cpcrit;
  771. bool started;
  772. bool aborting;
  773. std::atomic<bool> closing;
  774. bool closePending[2];
  775. StringAttrMapping fileErrorList;
  776. bool cidtActive = false;
  777. Semaphore cidtStarted;
  778. Semaphore bctStarted;
  779. Semaphore hctStarted;
  780. // Read-tracking code for pre-warming OS caches
  781. StringArray cacheIndexes;
  782. UnsignedShortArray cacheIndexChannels;
  783. CacheReportingBuffer *activeCacheReportingBuffer = nullptr;
  784. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true)
  785. {
  786. // Ensure that SockFile does not keep these sockets open (or we will run out)
  787. class AutoDisconnector
  788. {
  789. public:
  790. AutoDisconnector(IFile *_f, bool isEnabled) { f = isEnabled ? _f : NULL; };
  791. ~AutoDisconnector() { if (f) disconnectRemoteFile(f); }
  792. private:
  793. IFile *f;
  794. } autoDisconnector(f, autoDisconnect);
  795. offset_t fileSize = f->size();
  796. if (fileSize != (offset_t) -1)
  797. {
  798. // only check size if specified
  799. if ( (size != (offset_t) -1) && !isCompressed && fileSize != size) // MORE - should be able to do better on compressed you'da thunk
  800. return FileSizeMismatch;
  801. // A temporary fix - files stored on azure don't have an accurate time stamp, so treat them as up to date.
  802. if (isUrl(f->queryFilename()))
  803. return FileIsValid;
  804. CDateTime mt;
  805. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  806. }
  807. else
  808. return FileNotFound;
  809. }
  810. int runCacheInfoDump()
  811. {
  812. cidtStarted.signal();
  813. if (traceLevel)
  814. DBGLOG("Cache info dump thread %p starting", this);
  815. try
  816. {
  817. for (;;)
  818. {
  819. cidtSleep.wait(cacheReportPeriodSeconds * 1000);
  820. if (closing)
  821. break;
  822. if (traceLevel>8)
  823. DBGLOG("Cache info dump");
  824. // Note - cache info is stored in the DLLSERVER persistent area - which we should perhaps consider renaming
  825. const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
  826. assertex(dllserver_root != nullptr);
  827. Owned<const ITopologyServer> topology = getTopology();
  828. Owned<CacheReportingBuffer> tempCacheReportingBuffer = new CacheReportingBuffer(*activeCacheReportingBuffer);
  829. getNodeCacheInfo(*tempCacheReportingBuffer);
  830. tempCacheReportingBuffer->sortAndDedup();
  831. StringBuffer ret;
  832. tempCacheReportingBuffer->report(ret, 0, cacheIndexes, cacheIndexChannels);
  833. if (ret.length())
  834. {
  835. // NOTE - this location is shared with other nodes - who may also be writing
  836. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), 0);
  837. atomicWriteFile(cacheFileName, ret);
  838. if (traceLevel > 8)
  839. DBGLOG("Channel 0 cache info:\n%s", ret.str());
  840. }
  841. for (unsigned channel : topology->queryChannels())
  842. {
  843. tempCacheReportingBuffer->report(ret.clear(), channel, cacheIndexes, cacheIndexChannels);
  844. if (ret.length())
  845. {
  846. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), channel);
  847. atomicWriteFile(cacheFileName, ret);
  848. if (traceLevel > 8)
  849. DBGLOG("Channel %u cache info:\n%s", channel, ret.str());
  850. }
  851. }
  852. // We could at this point put deduped back into active
  853. }
  854. }
  855. catch (IException *E)
  856. {
  857. // Any exceptions terminate the thread - probably a better option than flooding the log
  858. if (!aborting)
  859. EXCLOG(MCoperatorError, E, "Cache info dumper: ");
  860. E->Release();
  861. }
  862. catch (...)
  863. {
  864. IERRLOG("Unknown exception in cache info dump thread");
  865. }
  866. if (traceLevel)
  867. DBGLOG("Cache info dump thread %p exiting", this);
  868. return 0;
  869. }
  870. unsigned trackCache(const char *filename, unsigned channel)
  871. {
  872. // NOTE - called from openFile, with crit already held
  873. if (!activeCacheReportingBuffer)
  874. return (unsigned) -1;
  875. cacheIndexes.append(filename);
  876. cacheIndexChannels.append(channel);
  877. return cacheIndexes.length()-1;
  878. }
  879. virtual void noteRead(unsigned fileIdx, offset_t pos, unsigned len) override
  880. {
  881. if (activeCacheReportingBuffer)
  882. activeCacheReportingBuffer->noteRead(fileIdx, pos, len, CacheInfoEntry::PageTypeDisk);
  883. }
  884. ILazyFileIO *openFile(const char *lfn, unsigned partNo, unsigned channel, const char *localLocation,
  885. IPartDescriptor *pdesc,
  886. const StringArray &remoteLocationInfo,
  887. offset_t size, const CDateTime &modified)
  888. {
  889. Owned<IFile> local = createIFile(localLocation);
  890. if (traceRemoteFiles)
  891. DBGLOG("openFile adding file %s (localLocation %s)", lfn, localLocation);
  892. bool isCompressed = selfTestMode ? false : pdesc->queryOwner().isCompressed();
  893. unsigned crc = 0;
  894. if (!selfTestMode)
  895. pdesc->getCrc(crc);
  896. Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed, crc);
  897. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, isCompressed);
  898. if (fileStatus == FileIsValid)
  899. {
  900. ret->addSource(local.getLink());
  901. ret->setRemote(false);
  902. }
  903. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  904. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  905. else
  906. {
  907. bool addedOne = false;
  908. #ifndef _CONTAINERIZED
  909. // put the peerRoxieLocations next in the list
  910. StringArray localLocations;
  911. if (selfTestMode)
  912. localLocations.append("test.buddy");
  913. else
  914. appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true); // Adds all locations on the same cluster
  915. ForEachItemIn(roxie_idx, localLocations)
  916. {
  917. try
  918. {
  919. const char *remoteName = localLocations.item(roxie_idx);
  920. Owned<IFile> remote = createIFile(remoteName);
  921. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  922. if (status==FileIsValid)
  923. {
  924. if (miscDebugTraceLevel > 5)
  925. DBGLOG("adding peer location %s", remoteName);
  926. ret->addSource(remote.getClear());
  927. addedOne = true;
  928. }
  929. else if (status==FileNotFound)
  930. {
  931. // Even though it's not on the buddy (yet), add it to the locations since it may well be there
  932. // by the time we come to copy (and if it is, we want to copy from there)
  933. if (miscDebugTraceLevel > 5)
  934. DBGLOG("adding missing peer location %s", remoteName);
  935. ret->addSource(remote.getClear());
  936. // Don't set addedOne - we need to go to remote too
  937. }
  938. else if (miscDebugTraceLevel > 10)
  939. DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
  940. }
  941. catch (IException *E)
  942. {
  943. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  944. E->Release();
  945. }
  946. ret->setRemote(true);
  947. }
  948. #endif
  949. if (!addedOne && (copyResources || useRemoteResources || selfTestMode)) // If no peer locations available, go to remote
  950. {
  951. ForEachItemIn(idx, remoteLocationInfo)
  952. {
  953. try
  954. {
  955. const char *remoteName = remoteLocationInfo.item(idx);
  956. Owned<IFile> remote = createIFile(remoteName);
  957. if (traceLevel > 5)
  958. DBGLOG("checking remote location %s", remoteName);
  959. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  960. if (status==FileIsValid)
  961. {
  962. if (miscDebugTraceLevel > 5)
  963. DBGLOG("adding remote location %s", remoteName);
  964. RemoteFilename rfn;
  965. rfn.setRemotePath(remoteName);
  966. #ifndef _CONTAINERIZED
  967. // MORE - may want to change this to mark some other locations as "local enough"
  968. if (!rfn.isLocal()) // MORE - may still want to copy files even if they are on a posix-accessible path, for local caching? Probably really want to know if hooked or not...
  969. #endif
  970. ret->setRemote(true);
  971. ret->addSource(remote.getClear());
  972. addedOne = true;
  973. }
  974. else if (miscDebugTraceLevel > 10)
  975. DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
  976. }
  977. catch (IException *E)
  978. {
  979. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  980. E->Release();
  981. }
  982. }
  983. }
  984. if (!addedOne)
  985. {
  986. if (local->exists()) // Implies local dali and local file out of sync
  987. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  988. else
  989. {
  990. if (traceLevel >= 2)
  991. {
  992. #ifndef _CONTAINERIZED
  993. DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
  994. ForEachItemIn(local_idx, localLocations)
  995. {
  996. DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
  997. }
  998. DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
  999. #else
  1000. DBGLOG("Failed to open file at any of the following %d remote locations:", remoteLocationInfo.length());
  1001. #endif
  1002. ForEachItemIn(remote_idx, remoteLocationInfo)
  1003. {
  1004. DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
  1005. }
  1006. }
  1007. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  1008. }
  1009. }
  1010. }
  1011. ret->setCache(this, trackCache(local->queryFilename(), channel));
  1012. files.setValue(local->queryFilename(), (ILazyFileIO *)ret);
  1013. return ret.getClear();
  1014. }
  1015. static void deleteTempFiles(const char *targetFilename)
  1016. {
  1017. try
  1018. {
  1019. StringBuffer destPath;
  1020. StringBuffer prevTempFile;
  1021. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  1022. prevTempFile.append("*.$$$");
  1023. Owned<IDirectoryIterator> iter = createDirectoryIterator(destPath, prevTempFile, false, false);
  1024. ForEach(*iter)
  1025. {
  1026. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  1027. if (thisFile->isFile() == fileBool::foundYes)
  1028. thisFile->remove();
  1029. }
  1030. }
  1031. catch(IException *E)
  1032. {
  1033. StringBuffer err;
  1034. OERRLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  1035. E->Release();
  1036. }
  1037. catch(...)
  1038. {
  1039. }
  1040. }
  1041. static bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
  1042. {
  1043. bool fileCopied = false;
  1044. IFile *sourceFile;
  1045. try
  1046. {
  1047. f->setCopying(true);
  1048. sourceFile = f->querySource();
  1049. }
  1050. catch (IException *E)
  1051. {
  1052. f->setCopying(false);
  1053. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  1054. throw;
  1055. }
  1056. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  1057. deleteTempFiles(targetFilename);
  1058. offset_t fileSize = sourceFile->size();
  1059. if ( (fileSize + minFreeDiskSpace) > freeDiskSpace)
  1060. {
  1061. StringBuffer err;
  1062. err.appendf("Insufficient disk space. File %s needs %" I64F "d bytes, but only %" I64F "d remains, and %" I64F "d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  1063. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  1064. EXCLOG(MCoperatorError, E);
  1065. E->Release();
  1066. f->setCopying(false);
  1067. }
  1068. else
  1069. {
  1070. Owned<IFile> destFile = createIFile(tempFile);
  1071. bool hardLinkCreated = false;
  1072. unsigned start = msTick();
  1073. #ifdef _DEBUG
  1074. if (topology && topology->getPropBool("@simulateSlowCopies")) // topology is null when running unit tests
  1075. {
  1076. DBGLOG("Simulating a slow copy");
  1077. Sleep(10*1000);
  1078. }
  1079. #endif
  1080. try
  1081. {
  1082. if (useHardLink)
  1083. hardLinkCreated = f->createHardFileLink();
  1084. if (hardLinkCreated)
  1085. msg = "Hard Link";
  1086. else
  1087. {
  1088. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  1089. if (traceLevel > 5)
  1090. {
  1091. StringBuffer str;
  1092. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  1093. TimeSection timing(str.str());
  1094. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  1095. }
  1096. else
  1097. {
  1098. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  1099. }
  1100. }
  1101. f->setCopying(false);
  1102. fileCopied = true;
  1103. }
  1104. catch(IException *E)
  1105. {
  1106. f->setCopying(false);
  1107. EXCLOG(E, "Copy exception - remove templocal");
  1108. destFile->remove();
  1109. deleteTempFiles(targetFilename);
  1110. throw;
  1111. }
  1112. catch(...)
  1113. {
  1114. f->setCopying(false);
  1115. IERRLOG("%s exception - remove templocal", msg);
  1116. destFile->remove();
  1117. deleteTempFiles(targetFilename);
  1118. throw;
  1119. }
  1120. if (!hardLinkCreated) // for hardlinks no rename needed
  1121. {
  1122. try
  1123. {
  1124. destFile->rename(targetFilename);
  1125. }
  1126. catch(IException *)
  1127. {
  1128. f->setCopying(false);
  1129. deleteTempFiles(targetFilename);
  1130. throw;
  1131. }
  1132. unsigned elapsed = msTick() - start;
  1133. double sizeMB = ((double) fileSize) / (1024*1024);
  1134. double MBperSec = elapsed ? (sizeMB / elapsed) * 1000 : 0;
  1135. DBGLOG("%s to %s complete in %d ms (%.1f MB/sec)", msg, targetFilename, elapsed, MBperSec);
  1136. }
  1137. f->copyComplete();
  1138. }
  1139. deleteTempFiles(targetFilename);
  1140. return fileCopied;
  1141. }
  1142. static bool doCopy(ILazyFileIO *f, bool background, CFflags copyFlags=CFnone)
  1143. {
  1144. if (!f->isRemote())
  1145. f->copyComplete();
  1146. else
  1147. {
  1148. const char *targetFilename = f->queryTarget()->queryFilename();
  1149. StringBuffer tempFile(targetFilename);
  1150. StringBuffer destPath;
  1151. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  1152. if (destPath.length())
  1153. recursiveCreateDirectory(destPath.str());
  1154. else
  1155. destPath.append('.');
  1156. if (!checkDirExists(destPath.str())) {
  1157. OERRLOG("Dest directory %s does not exist", destPath.str());
  1158. return false;
  1159. }
  1160. tempFile.append(".$$$");
  1161. const char *msg = background ? "Background copy" : "Copy";
  1162. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
  1163. }
  1164. return false; // if we get here there was no file copied
  1165. }
  1166. public:
  1167. IMPLEMENT_IINTERFACE;
  1168. CRoxieFileCache() :
  1169. cidt(*this),
  1170. bct(*this), hct(*this)
  1171. {
  1172. aborting = false;
  1173. closing = false;
  1174. closePending[false] = false;
  1175. closePending[true] = false;
  1176. started = false;
  1177. if (!selfTestMode && !allFilesDynamic)
  1178. {
  1179. Owned<IPropertyTree> compConfig = getComponentConfig();
  1180. offset_t cacheTrackSize = compConfig->getPropInt64("@cacheTrackSize", (offset_t) -1);
  1181. if (cacheTrackSize == (offset_t) -1)
  1182. {
  1183. const char *memLimit = compConfig->queryProp("resources/limits/@memory");
  1184. if (!memLimit)
  1185. memLimit = compConfig->queryProp("resources/requests/@memory");
  1186. if (memLimit)
  1187. {
  1188. try
  1189. {
  1190. cacheTrackSize = friendlyStringToSize(memLimit);
  1191. }
  1192. catch (IException *E)
  1193. {
  1194. EXCLOG(E);
  1195. E->Release();
  1196. cacheTrackSize = 0;
  1197. }
  1198. }
  1199. else
  1200. cacheTrackSize = 0x10000 * (1<<CacheInfoEntry::pageBits);
  1201. }
  1202. if (cacheTrackSize)
  1203. activeCacheReportingBuffer = new CacheReportingBuffer(cacheTrackSize);
  1204. }
  1205. }
  1206. ~CRoxieFileCache()
  1207. {
  1208. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  1209. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  1210. HashIterator h(files);
  1211. ForEach(h)
  1212. {
  1213. ILazyFileIO *f = files.mapToValue(&h.query());
  1214. f->removeCache(this);
  1215. }
  1216. delete activeCacheReportingBuffer;
  1217. }
  1218. virtual void start()
  1219. {
  1220. if (!started)
  1221. {
  1222. bct.start();
  1223. hct.start();
  1224. bctStarted.wait();
  1225. hctStarted.wait();
  1226. }
  1227. started = true;
  1228. }
  1229. virtual void startCacheReporter() override
  1230. {
  1231. #ifndef _CONTAINERIZED
  1232. if (!getenv("HPCC_DLLSERVER_PATH"))
  1233. return;
  1234. #endif
  1235. if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1236. {
  1237. cidt.start();
  1238. cidtStarted.wait();
  1239. cidtActive = true;
  1240. }
  1241. }
  1242. class CacheInfoDumpThread : public Thread
  1243. {
  1244. CRoxieFileCache &owner;
  1245. public:
  1246. CacheInfoDumpThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-CacheInfoDumpThread"), owner(_owner) {}
  1247. virtual int run()
  1248. {
  1249. return owner.runCacheInfoDump();
  1250. }
  1251. } cidt;
  1252. class BackgroundCopyThread : public Thread
  1253. {
  1254. CRoxieFileCache &owner;
  1255. public:
  1256. BackgroundCopyThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-BackgroundCopyThread"), owner(_owner) {}
  1257. virtual int run()
  1258. {
  1259. return owner.runBackgroundCopy();
  1260. }
  1261. } bct;
  1262. class HandleCloserThread : public Thread
  1263. {
  1264. CRoxieFileCache &owner;
  1265. public:
  1266. HandleCloserThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-HandleCloserThread"), owner(_owner) {}
  1267. virtual int run()
  1268. {
  1269. return owner.runHandleCloser();
  1270. }
  1271. } hct;
  1272. int runBackgroundCopy()
  1273. {
  1274. bctStarted.signal();
  1275. #if defined(__linux__) && defined(SYS_ioprio_set)
  1276. if (backgroundCopyClass)
  1277. syscall(SYS_ioprio_set, IOPRIO_WHO_PROCESS, 0, IOPRIO_PRIO_VALUE(backgroundCopyClass, backgroundCopyPrio));
  1278. #endif
  1279. if (traceLevel)
  1280. {
  1281. #if defined(__linux__) && defined(SYS_ioprio_get)
  1282. int ioprio = syscall(SYS_ioprio_get, IOPRIO_WHO_PROCESS, 0);
  1283. int ioclass = IOPRIO_PRIO_CLASS(ioprio);
  1284. ioprio = IOPRIO_PRIO_DATA(ioprio);
  1285. DBGLOG("Background copy thread %p starting, io priority class %d, priority %d", this, ioclass, ioprio);
  1286. #else
  1287. DBGLOG("Background copy thread %p starting", this);
  1288. #endif
  1289. }
  1290. try
  1291. {
  1292. for (;;)
  1293. {
  1294. Linked<ILazyFileIO> next;
  1295. toCopy.wait();
  1296. {
  1297. CriticalBlock b(crit);
  1298. if (closing)
  1299. break;
  1300. if (todo.ordinality())
  1301. {
  1302. ILazyFileIO *popped = &todo.popGet();
  1303. if (popped->isAliveAndLink())
  1304. {
  1305. next.setown(popped);
  1306. }
  1307. numFilesToProcess--; // must decrement counter for SNMP accuracy
  1308. }
  1309. }
  1310. if (next)
  1311. {
  1312. try
  1313. {
  1314. doCopy(next, true, CFflush_rdwr);
  1315. }
  1316. catch (IException *E)
  1317. {
  1318. if (aborting)
  1319. throw;
  1320. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  1321. E->Release();
  1322. }
  1323. catch (...)
  1324. {
  1325. EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
  1326. }
  1327. }
  1328. CriticalBlock b(crit);
  1329. if (todo.ordinality()==0 && reportedFilesToCopy)
  1330. {
  1331. #ifdef _CONTAINERIZED
  1332. DBGLOG("No more data files for this node to copy");
  1333. if (!buddyCopying.length() && !buddyChecking)
  1334. #endif
  1335. {
  1336. DBGLOG("No more data files to copy");
  1337. reportedFilesToCopy = false;
  1338. }
  1339. }
  1340. }
  1341. }
  1342. catch (IException *E)
  1343. {
  1344. if (!aborting)
  1345. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  1346. E->Release();
  1347. }
  1348. catch (...)
  1349. {
  1350. IERRLOG("Unknown exception in background copy thread");
  1351. }
  1352. if (traceLevel)
  1353. DBGLOG("Background copy thread %p exiting", this);
  1354. return 0;
  1355. }
  1356. int runHandleCloser()
  1357. {
  1358. hctStarted.signal();
  1359. if (traceLevel)
  1360. DBGLOG("HandleCloser thread %p starting", this);
  1361. try
  1362. {
  1363. unsigned lastCloseCheck = msTick();
  1364. for (;;)
  1365. {
  1366. #ifdef _CONTAINERIZED
  1367. unsigned checkPeriod = topology->getPropInt("@copyCheckPeriod", 60);
  1368. #else
  1369. unsigned checkPeriod = 10*60; // check expired file handles every 10 minutes, buddyCopying a little more often
  1370. #endif
  1371. toClose.wait(checkPeriod * 1000);
  1372. if (closing)
  1373. break;
  1374. #ifdef _CONTAINERIZED
  1375. // Periodically recheck the list to see what is now local, and remove them from the buddyCopying list
  1376. ICopyArrayOf<ILazyFileIO> checkBuddies;
  1377. {
  1378. CriticalBlock b(crit);
  1379. if (buddyCopying.length())
  1380. {
  1381. buddyCopying.swapWith(checkBuddies);
  1382. buddyChecking = true;
  1383. }
  1384. }
  1385. if (checkBuddies.length())
  1386. {
  1387. ForEachItemIn(idx, checkBuddies)
  1388. {
  1389. ILazyFileIO &check = checkBuddies.item(idx);
  1390. if (traceRemoteFiles)
  1391. DBGLOG("Checking whether someone has copied file %s for me", check.queryFilename());
  1392. if (check.isRemote())
  1393. {
  1394. if (traceRemoteFiles)
  1395. check.dump();
  1396. if (!check.checkCopyComplete()) // Recheck whether there is a local file we can open
  1397. {
  1398. CriticalBlock b1(crit);
  1399. buddyCopying.append(check);
  1400. }
  1401. }
  1402. }
  1403. CriticalBlock b2(crit);
  1404. buddyChecking = false;
  1405. if (buddyCopying.length()==0)
  1406. {
  1407. DBGLOG("No more data files being copied by other nodes");
  1408. if (todo.ordinality()==0 && reportedFilesToCopy)
  1409. {
  1410. DBGLOG("No more data files to copy");
  1411. reportedFilesToCopy = false;
  1412. }
  1413. }
  1414. }
  1415. #endif
  1416. unsigned elapsed = msTick()-lastCloseCheck;
  1417. if (elapsed >= 10*60*1000)
  1418. {
  1419. doCloseExpired(true);
  1420. doCloseExpired(false);
  1421. lastCloseCheck = msTick();
  1422. }
  1423. }
  1424. }
  1425. catch (IException *E)
  1426. {
  1427. if (!aborting)
  1428. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  1429. E->Release();
  1430. }
  1431. catch (...)
  1432. {
  1433. IERRLOG("Unknown exception in handle closer thread");
  1434. }
  1435. if (traceLevel)
  1436. DBGLOG("Handle closer thread %p exiting", this);
  1437. return 0;
  1438. }
  1439. virtual void join(unsigned timeout=INFINITE)
  1440. {
  1441. aborting = true;
  1442. if (started)
  1443. {
  1444. toCopy.interrupt();
  1445. toClose.interrupt();
  1446. bct.join(timeout);
  1447. hct.join(timeout);
  1448. }
  1449. #ifdef _CONTAINERIZED
  1450. if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1451. {
  1452. cidtSleep.interrupt();
  1453. cidt.join(timeout);
  1454. }
  1455. #endif
  1456. }
  1457. virtual void wait()
  1458. {
  1459. closing = true;
  1460. if (started)
  1461. {
  1462. toCopy.signal();
  1463. toClose.signal();
  1464. bct.join();
  1465. hct.join();
  1466. }
  1467. #ifdef _CONTAINERIZED
  1468. if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
  1469. {
  1470. cidtSleep.signal();
  1471. cidt.join();
  1472. }
  1473. #endif
  1474. }
  1475. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  1476. {
  1477. return aborting ? CFPcancel : CFPcontinue;
  1478. }
  1479. virtual void removeCache(ILazyFileIO *file) const
  1480. {
  1481. CriticalBlock b(crit);
  1482. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1483. // So only remove from hash table if what we find there matches the item that is being deleted.
  1484. const char *filename = file->queryFilename();
  1485. ILazyFileIO *goer = files.getValue(filename);
  1486. if (goer == file)
  1487. files.remove(filename);
  1488. ForEachItemInRev(idx, todo)
  1489. {
  1490. if (file == &todo.item(idx))
  1491. {
  1492. todo.remove(idx);
  1493. numFilesToProcess--; // must decrement counter for SNMP accuracy
  1494. }
  1495. }
  1496. }
  1497. virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
  1498. IPartDescriptor *pdesc, unsigned numParts, unsigned channel,
  1499. const StringArray &deployedLocationInfo, bool startFileCopy)
  1500. {
  1501. unsigned replicationLevel = getReplicationLevel(channel);
  1502. IPropertyTree &partProps = pdesc->queryProperties();
  1503. offset_t dfsSize = partProps.getPropInt64("@size", -1);
  1504. bool local = partProps.getPropBool("@local");
  1505. CDateTime dfsDate;
  1506. if (checkFileDate)
  1507. {
  1508. const char *dateStr = partProps.queryProp("@modified");
  1509. dfsDate.setString(dateStr);
  1510. }
  1511. unsigned partNo = pdesc->queryPartIndex() + 1;
  1512. StringBuffer localLocation;
  1513. if (local)
  1514. {
  1515. assertex(partNo==1 && numParts==1);
  1516. localLocation.append(lfn); // any resolution done earlier
  1517. }
  1518. else
  1519. {
  1520. // MORE - not at all sure about this. Foreign files should stay foreign ?
  1521. CDfsLogicalFileName dlfn;
  1522. dlfn.set(lfn);
  1523. if (dlfn.isForeign())
  1524. dlfn.clearForeign();
  1525. #ifdef _CONTAINERIZED
  1526. const char *defaultDir = defaultPlaneDirPrefix;
  1527. #else
  1528. const char *defaultDir = nullptr;
  1529. #endif
  1530. makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault, defaultDir);
  1531. }
  1532. Owned<ILazyFileIO> ret;
  1533. try
  1534. {
  1535. CLeavableCriticalBlock b(crit);
  1536. ILazyFileIO * match = files.getValue(localLocation);
  1537. if (match && match->isAliveAndLink())
  1538. {
  1539. Owned<ILazyFileIO> f = match;
  1540. if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
  1541. (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
  1542. {
  1543. releaseAgentDynamicFileCache(); // Agent dynamic file cache or...
  1544. if (fileType == ROXIE_KEY) // ...jhtree cache can keep files active and thus prevent us from loading a new version
  1545. clearKeyStoreCacheEntry(f); // Will release iff that is the only link
  1546. f.clear(); // Note - needs to be done before calling getValue() again, hence the need to make it separate from the f.set below
  1547. f.set(files.getValue(localLocation));
  1548. if (f) // May have been cleared above...
  1549. {
  1550. StringBuffer modifiedDt;
  1551. if (!dfsDate.isNull())
  1552. dfsDate.getString(modifiedDt);
  1553. StringBuffer fileDt;
  1554. f->queryDateTime()->getString(fileDt);
  1555. if (fileErrorList.find(lfn) == 0)
  1556. {
  1557. switch (fileType)
  1558. {
  1559. case ROXIE_KEY:
  1560. fileErrorList.setValue(lfn, "Key");
  1561. break;
  1562. case ROXIE_FILE:
  1563. fileErrorList.setValue(lfn, "File");
  1564. break;
  1565. }
  1566. }
  1567. throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %" I64F "d %" I64F "d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
  1568. }
  1569. }
  1570. else
  1571. return f.getClear();
  1572. }
  1573. ret.setown(openFile(lfn, partNo, channel, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
  1574. if (startFileCopy)
  1575. {
  1576. if (ret->isRemote())
  1577. {
  1578. if (copyResources) // MORE - should always copy peer files
  1579. {
  1580. #ifdef _CONTAINERIZED
  1581. // In containerized mode, Roxie file copies are restricted to have only one node do the copying (first node on a channel,
  1582. // random node for single-part). But any node that has
  1583. // (a) files being read remotely and
  1584. // (b) no files to copy and
  1585. // (c) a small delay will go through all remote files and check if they are now available locally
  1586. // There is an assumption that a "pull" roxie does not have replicas that we don't know about
  1587. // - more than one "pull" roxie copying to the same plane at the same time
  1588. // - replicas=1 should be set on the "pull" roxie (we may be able to relax that using info from toposerver)
  1589. // - can't use localAgent mode on a "pull" roxie
  1590. bool iShouldCopy = (replicationLevel==0);
  1591. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1592. {
  1593. // We distribute the responsibility for copying the TLK/single-part files
  1594. unsigned whoShouldCopy = (rtlHash32VStr(lfn, HASH32_INIT) % numChannels) + 1;
  1595. if (whoShouldCopy != myChannel)
  1596. iShouldCopy = false;
  1597. }
  1598. if (!reportedFilesToCopy)
  1599. DBGLOG("Received files to copy");
  1600. reportedFilesToCopy = true;
  1601. if (iShouldCopy)
  1602. {
  1603. todo.append(*ret);
  1604. numFilesToProcess++; // must increment counter for SNMP accuracy
  1605. toCopy.signal();
  1606. }
  1607. else
  1608. {
  1609. if (traceRemoteFiles)
  1610. DBGLOG("Add file %s to buddyCopying list", ret->queryFilename());
  1611. buddyCopying.append(*ret); // We expect someone else to copy it for us
  1612. }
  1613. #else
  1614. // Single-part files and top-level keys are copied immediately rather than being read remotely while background copying
  1615. // This is to avoid huge contention on the source dafilesrv if the Roxie is live.
  1616. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1617. {
  1618. b.leave();
  1619. ret->checkOpen();
  1620. doCopy(ret, false, CFflush_rdwr);
  1621. return ret.getLink();
  1622. }
  1623. // Copies are popped from end of the todo list
  1624. // By putting the replicates on the front we ensure they are done after the primaries
  1625. // and are therefore likely to result in local rather than remote copies.
  1626. if (!reportedFilesToCopy)
  1627. DBGLOG("Received files to copy");
  1628. reportedFilesToCopy = true;
  1629. if (replicationLevel)
  1630. todo.add(*ret, 0);
  1631. else
  1632. todo.append(*ret);
  1633. numFilesToProcess++; // must increment counter for SNMP accuracy
  1634. toCopy.signal();
  1635. #endif
  1636. }
  1637. }
  1638. }
  1639. if (!lazyOpen)
  1640. ret->checkOpen();
  1641. }
  1642. catch(IException *e)
  1643. {
  1644. if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
  1645. {
  1646. if (fileErrorList.find(lfn) == 0)
  1647. {
  1648. switch (fileType)
  1649. {
  1650. case ROXIE_KEY:
  1651. fileErrorList.setValue(lfn, "Key");
  1652. break;
  1653. case ROXIE_FILE:
  1654. fileErrorList.setValue(lfn, "File");
  1655. break;
  1656. }
  1657. }
  1658. }
  1659. throw;
  1660. }
  1661. return ret.getLink();
  1662. }
  1663. virtual ILazyFileIO *lookupLocalFile(const char *filename)
  1664. {
  1665. try
  1666. {
  1667. CriticalBlock b(crit);
  1668. ILazyFileIO * match = files.getValue(filename);
  1669. if (match && match->isAliveAndLink())
  1670. return match;
  1671. }
  1672. catch(IException *e)
  1673. {
  1674. e->Release();
  1675. }
  1676. return nullptr;
  1677. }
  1678. virtual void closeExpired(bool remote)
  1679. {
  1680. // This schedules a close at the next available opportunity
  1681. CriticalBlock b(cpcrit); // paranoid...
  1682. if (!closePending[remote])
  1683. {
  1684. closePending[remote] = true;
  1685. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) numFilesOpen[remote]);
  1686. toClose.signal();
  1687. }
  1688. }
  // Parse a run of hexadecimal digits (upper or lower case) starting at _t.
  // On return _t points at the first character that is not a hex digit, and the accumulated value is
  // returned (0 if there were no digits). There is no overflow check.
  static unsigned __int64 readPage(const char * &_t)
  {
      unsigned __int64 value = 0;
      const char *cursor = _t;
      for (;; ++cursor)
      {
          const char ch = *cursor;
          unsigned digit;
          if (ch >= '0' && ch <= '9')
              digit = ch - '0';
          else if (ch >= 'a' && ch <= 'f')
              digit = ch - 'a' + 10;
          else if (ch >= 'A' && ch <= 'F')
              digit = ch - 'A' + 10;
          else
              break;
          value = (value << 4) | digit;
      }
      _t = cursor;
      return value;
  }
  1709. virtual void loadSavedOsCacheInfo() override
  1710. {
  1711. if (!topology->getPropBool("@warmOsCache", true))
  1712. return;
  1713. Owned<const ITopologyServer> topology = getTopology();
  1714. for (unsigned channel : topology->queryChannels())
  1715. doLoadSavedOsCacheInfo(channel);
  1716. doLoadSavedOsCacheInfo(0); // MORE - maybe only if I am also a server?
  1717. }
  1718. void doLoadSavedOsCacheInfo(unsigned channel)
  1719. {
  1720. const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
  1721. #ifdef _CONTAINERIZED
  1722. assertex(dllserver_root != nullptr);
  1723. #else
  1724. //Default behaviour is to not load or saving anything on bare metal
  1725. if (!dllserver_root)
  1726. return;
  1727. #endif
  1728. unsigned cacheWarmTraceLevel = topology->getPropInt("@cacheWarmTraceLevel", traceLevel);
  1729. VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), channel);
  1730. StringBuffer cacheInfo;
  1731. try
  1732. {
  1733. if (checkFileExists(cacheFileName))
  1734. {
  1735. #ifndef _WIN32
  1736. StringBuffer output;
  1737. VStringBuffer command("ccdcache %s -t %u", cacheFileName.str(), cacheWarmTraceLevel);
  1738. unsigned retcode = runExternalCommand(nullptr, output, output, command, nullptr);
  1739. if (output.length())
  1740. {
  1741. StringArray outputLines;
  1742. outputLines.appendList(output, "\n");
  1743. ForEachItemIn(idx, outputLines)
  1744. {
  1745. const char *line = outputLines.item(idx);
  1746. if (line && *line)
  1747. DBGLOG("ccdcache: %s", line);
  1748. }
  1749. }
  1750. if (retcode)
  1751. DBGLOG("ccdcache failed with exit code %u", retcode);
  1752. #endif
  1753. cacheInfo.loadFile(cacheFileName, false);
  1754. if (traceLevel)
  1755. DBGLOG("Loading cache information from %s for channel %d", cacheFileName.str(), channel);
  1756. warmOsCache(cacheInfo);
  1757. }
  1758. }
  1759. catch(IException *E)
  1760. {
  1761. EXCLOG(E);
  1762. E->Release();
  1763. }
  1764. }
  1765. virtual void warmOsCache(const char *cacheInfo) override
  1766. {
  1767. if (!cacheInfo)
  1768. return;
  1769. IndexCacheWarmer warmer(this);
  1770. if (!::warmOsCache(cacheInfo, &warmer))
  1771. DBGLOG("WARNING: Unrecognized cacheInfo format");
  1772. warmer.report();
  1773. }
  1774. virtual void clearOsCache() override
  1775. {
  1776. if (activeCacheReportingBuffer)
  1777. activeCacheReportingBuffer->clear();
  1778. }
  1779. virtual void reportOsCache(StringBuffer &ret, unsigned channel) const override
  1780. {
  1781. if (activeCacheReportingBuffer)
  1782. {
  1783. Owned<CacheReportingBuffer> temp = new CacheReportingBuffer(*activeCacheReportingBuffer);
  1784. getNodeCacheInfo(*temp);
  1785. temp->sortAndDedup();
  1786. temp->report(ret, channel, cacheIndexes, cacheIndexChannels);
  1787. // We could at this point put deduped back into active
  1788. }
  1789. }
  1790. void doCloseExpired(bool remote)
  1791. {
  1792. {
  1793. CriticalBlock b(cpcrit); // paranoid...
  1794. closePending[remote] = false;
  1795. }
  1796. IArrayOf<ILazyFileIO> goers;
  1797. {
  1798. CriticalBlock b(crit);
  1799. HashIterator h(files);
  1800. ForEach(h)
  1801. {
  1802. ILazyFileIO * match = files.mapToValue(&h.query());
  1803. if (match->isAliveAndLink())
  1804. {
  1805. Owned<ILazyFileIO> f = match;
  1806. if (f->isOpen() && f->isRemote()==remote && !f->isCopying())
  1807. {
  1808. unsigned age = msTick() - f->getLastAccessed();
  1809. if (age > maxFileAge[remote])
  1810. {
  1811. if (traceLevel > 5)
  1812. {
  1813. // NOTE - querySource will cause the file to be opened if not already open
  1814. // That's OK here, since we know the file is open and remote.
  1815. // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
  1816. const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
  1817. DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
  1818. }
  1819. f->close();
  1820. }
  1821. else
  1822. goers.append(*f.getClear());
  1823. }
  1824. }
  1825. }
  1826. }
  1827. unsigned numFilesLeft = goers.ordinality();
  1828. if (numFilesLeft > maxFilesOpen[remote])
  1829. {
  1830. goers.sort(CRoxieLazyFileIO::compareAccess);
  1831. DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
  1832. unsigned idx = minFilesOpen[remote];
  1833. while (idx < numFilesLeft)
  1834. {
  1835. ILazyFileIO &f = goers.item(idx++);
  1836. if (!f.isCopying())
  1837. {
  1838. if (traceLevel > 5)
  1839. {
  1840. unsigned age = msTick() - f.getLastAccessed();
  1841. DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
  1842. }
  1843. f.close();
  1844. }
  1845. }
  1846. }
  1847. }
  1848. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1849. {
  1850. Owned<IFile> dirf = createIFile(directory);
  1851. if (dirf->exists() && dirf->isDirectory()==fileBool::foundYes)
  1852. {
  1853. try
  1854. {
  1855. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1856. ForEach(*iter)
  1857. {
  1858. const char *thisName = iter->query().queryFilename();
  1859. flushUnusedDirectories(origBaseDir, thisName, xml);
  1860. }
  1861. if (stricmp(origBaseDir, directory) != 0)
  1862. {
  1863. try
  1864. {
  1865. dirf->remove();
  1866. xml.appendf("<Directory>%s</Directory>\n", directory);
  1867. DBGLOG("Deleted directory %s", directory);
  1868. }
  1869. catch (IException *e)
  1870. {
  1871. // don't care if we can't delete the directory
  1872. e->Release();
  1873. }
  1874. catch(...)
  1875. {
  1876. // don't care if we can't delete the directory
  1877. }
  1878. }
  1879. }
  1880. catch (IException *e)
  1881. {
  1882. // don't care if we can't delete the directory
  1883. e->Release();
  1884. }
  1885. catch(...)
  1886. {
  1887. // don't care if we can't delete the directory
  1888. }
  1889. }
  1890. }
  1891. int numFilesToCopy()
  1892. {
  1893. CriticalBlock b(crit);
  1894. return todo.ordinality();
  1895. }
  1896. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  // Returns true if c may appear in a file name: it must be in the range [32, 127)
  // and must not be one of the reserved characters * " / : < > ? \ |
  static inline bool validFNameChar(char c)
  {
      if (c < 32 || c >= 127)
          return false;
      static const char *invalids = "*\"/:<>?\\|";
      return strchr(invalids, c) == nullptr;
  }
  1902. };
  1903. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  1904. {
  1905. #ifdef _CONTAINERIZED
  1906. const char *myCluster = defaultPlane.str();
  1907. #else
  1908. const char *myCluster = roxieName.str();
  1909. #endif
  1910. StringArray remoteLocations;
  1911. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  1912. if (peerCluster)
  1913. {
  1914. if (*peerCluster!='-') // a remote cluster was specified explicitly
  1915. appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true); // Add only from specified cluster
  1916. }
  1917. else
  1918. appendRemoteLocations(pdesc, remoteLocations, NULL, myCluster, false); // Add from any cluster on same dali, other than mine
  1919. if (remotePDesc)
  1920. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false); // Then any remote on remote dali
  1921. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, channel, remoteLocations, startCopy);
  1922. }
  1923. //====================================================================================================
  1924. class CFilePartMap : implements IFilePartMap, public CInterface
  1925. {
  1926. class FilePartMapElement
  1927. {
  1928. public:
  1929. offset_t base;
  1930. offset_t top;
  1931. inline int compare(offset_t offset)
  1932. {
  1933. if (offset < base)
  1934. return -1;
  1935. else if (offset >= top)
  1936. return 1;
  1937. else
  1938. return 0;
  1939. }
  1940. } *map;
  1941. static int compareParts(const void *l, const void *r)
  1942. {
  1943. offset_t lp = * (offset_t *) l;
  1944. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  1945. return thisPart->compare(lp);
  1946. }
  1947. unsigned numParts;
  1948. offset_t recordCount;
  1949. offset_t totalSize;
  1950. StringAttr fileName;
  1951. public:
  1952. IMPLEMENT_IINTERFACE;
  1953. CFilePartMap(IPropertyTree &resource)
  1954. {
  1955. fileName.set(resource.queryProp("@id"));
  1956. numParts = resource.getPropInt("@numparts");
  1957. recordCount = resource.getPropInt64("@recordCount");
  1958. totalSize = resource.getPropInt64("@size");
  1959. assertex(numParts);
  1960. map = new FilePartMapElement[numParts];
  1961. for (unsigned i = 0; i < numParts; i++)
  1962. {
  1963. StringBuffer partPath;
  1964. partPath.appendf("Part[@num='%d']", i+1);
  1965. IPropertyTree *part = resource.queryPropTree(partPath.str());
  1966. if (!part)
  1967. {
  1968. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  1969. part = resource.queryPropTree(partPath.str());
  1970. }
  1971. assertex(part);
  1972. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  1973. assertex(size != (unsigned __int64) -1);
  1974. map[i].base = i ? map[i-1].top : 0;
  1975. map[i].top = map[i].base + size;
  1976. }
  1977. if (totalSize == (offset_t)-1)
  1978. totalSize = map[numParts-1].top;
  1979. else if (totalSize != map[numParts-1].top)
  1980. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  1981. }
  1982. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  1983. : fileName(_fileName)
  1984. {
  1985. numParts = fdesc.numParts();
  1986. IPropertyTree &props = fdesc.queryProperties();
  1987. recordCount = props.getPropInt64("@recordCount", -1);
  1988. totalSize = props.getPropInt64("@size", -1);
  1989. assertex(numParts);
  1990. map = new FilePartMapElement[numParts];
  1991. for (unsigned i = 0; i < numParts; i++)
  1992. {
  1993. IPartDescriptor &part = *fdesc.queryPart(i);
  1994. IPropertyTree &partProps = part.queryProperties();
  1995. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  1996. map[i].base = i ? map[i-1].top : 0;
  1997. if (size==(unsigned __int64) -1)
  1998. {
  1999. if (i==numParts-1)
  2000. map[i].top = (unsigned __int64) -1;
  2001. else
  2002. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file sizes not known for file %s", fileName.get());
  2003. }
  2004. else
  2005. map[i].top = map[i].base + size;
  2006. }
  2007. if (totalSize == (offset_t)-1)
  2008. totalSize = map[numParts-1].top;
  2009. else if (totalSize != map[numParts-1].top)
  2010. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  2011. }
  2012. ~CFilePartMap()
  2013. {
  2014. delete [] map;
  2015. }
  2016. virtual bool IsShared() const { return CInterface::IsShared(); };
  2017. virtual unsigned mapOffset(offset_t pos) const
  2018. {
  2019. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  2020. if (!part)
  2021. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %" I64F "d in file %s out of range (max offset permitted is %" I64F "d)", pos, fileName.str(), totalSize);
  2022. return (part-map)+1;
  2023. }
  2024. virtual unsigned getNumParts() const
  2025. {
  2026. return numParts;
  2027. }
  2028. virtual offset_t getTotalSize() const
  2029. {
  2030. return totalSize;
  2031. }
  2032. virtual offset_t getRecordCount() const
  2033. {
  2034. return recordCount;
  2035. }
  2036. virtual offset_t getBase(unsigned part) const
  2037. {
  2038. if (part > numParts || part == 0)
  2039. {
  2040. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existent file part %d (valid are 1-%d)", part, numParts);
  2041. }
  2042. return map[part-1].base;
  2043. }
  2044. virtual offset_t getFileSize() const
  2045. {
  2046. return map[numParts-1].top;
  2047. }
  2048. };
  2049. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  2050. {
  2051. return new CFilePartMap(fileName, fdesc);
  2052. }
  2053. //====================================================================================================
  2054. class CFileIOArray : implements IFileIOArray, public CInterface
  2055. {
  2056. mutable CriticalSection crit;
  2057. mutable unsigned __int64 totalSize = (unsigned __int64) -1; // Calculated on demand, and cached
  2058. mutable StringAttr id; // Calculated on demand, and cached
  2059. IPointerArrayOf<IFileIO> files;
  2060. UnsignedArray subfiles;
  2061. StringArray filenames;
  2062. Int64Array bases;
  2063. int actualCrc = 0;
  2064. unsigned valid = 0;
  2065. bool multipleFormatsSeen = false;
  2066. void _getId() const
  2067. {
  2068. md5_state_t md5;
  2069. md5_byte_t digest[16];
  2070. md5_init(&md5);
  2071. ForEachItemIn(idx, files)
  2072. {
  2073. IFileIO *file = files.item(idx);
  2074. if (file)
  2075. {
  2076. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  2077. }
  2078. }
  2079. md5_finish(&md5, digest);
  2080. char digestStr[33];
  2081. for (int i = 0; i < 16; i++)
  2082. {
  2083. sprintf(&digestStr[i*2],"%02x", digest[i]);
  2084. }
  2085. id.set(digestStr, 32);
  2086. }
  2087. public:
  2088. IMPLEMENT_IINTERFACE;
  2089. virtual bool IsShared() const { return CInterface::IsShared(); };
  2090. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base) const override
  2091. {
  2092. if (!files.isItem(partNo))
  2093. {
  2094. DBGLOG("getFilePart requested invalid part %d", partNo);
  2095. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  2096. }
  2097. IFileIO *file = files.item(partNo);
  2098. if (!file)
  2099. {
  2100. base = 0;
  2101. return NULL;
  2102. }
  2103. base = bases.item(partNo);
  2104. return LINK(file);
  2105. }
  2106. virtual const char *queryLogicalFilename(unsigned partNo) const override
  2107. {
  2108. if (!filenames.isItem(partNo))
  2109. {
  2110. DBGLOG("queryLogicalFilename requested invalid part %d", partNo);
  2111. throw MakeStringException(ROXIE_FILE_ERROR, "queryLogicalFilename requested invalid part %d", partNo);
  2112. }
  2113. return filenames.item(partNo);
  2114. }
  2115. void addFile(IFileIO *f, offset_t base, unsigned subfile, const char *filename, int _actualCrc)
  2116. {
  2117. if (f)
  2118. valid++;
  2119. files.append(f);
  2120. bases.append(base);
  2121. if (_actualCrc)
  2122. {
  2123. if (actualCrc && actualCrc != _actualCrc)
  2124. multipleFormatsSeen = true;
  2125. else
  2126. actualCrc = _actualCrc;
  2127. }
  2128. // MORE - lots of duplication in subfiles and filenames arrays
  2129. subfiles.append(subfile);
  2130. filenames.append(filename ? filename : "");
  2131. }
  2132. virtual unsigned length() const override
  2133. {
  2134. return files.length();
  2135. }
  2136. virtual unsigned numValid() const override
  2137. {
  2138. return valid;
  2139. }
  2140. virtual int queryActualFormatCrc() const override
  2141. {
  2142. return actualCrc;
  2143. }
  2144. virtual bool allFormatsMatch() const override
  2145. {
  2146. return !multipleFormatsSeen;
  2147. }
  2148. virtual bool isValid(unsigned partNo) const override
  2149. {
  2150. if (!files.isItem(partNo))
  2151. return false;
  2152. IFileIO *file = files.item(partNo);
  2153. if (!file)
  2154. return false;
  2155. return true;
  2156. }
  2157. virtual unsigned __int64 size() const override
  2158. {
  2159. CriticalBlock b(crit);
  2160. if (totalSize == (unsigned __int64) -1)
  2161. {
  2162. totalSize = 0;
  2163. ForEachItemIn(idx, files)
  2164. {
  2165. IFileIO *file = files.item(idx);
  2166. if (file)
  2167. totalSize += file->size();
  2168. }
  2169. }
  2170. return totalSize;
  2171. }
  2172. virtual StringBuffer &getId(StringBuffer &ret) const override
  2173. {
  2174. CriticalBlock b(crit);
  2175. if (!id)
  2176. _getId();
  2177. return ret.append(id);
  2178. }
  2179. virtual unsigned getSubFile(unsigned partNo) const override
  2180. {
  2181. return subfiles.item(partNo);
  2182. }
  2183. };
  2184. class CTranslatorSet : implements CInterfaceOf<ITranslatorSet>
  2185. {
  2186. IConstPointerArrayOf<IDynamicTransform> transformers;
  2187. IConstPointerArrayOf<IKeyTranslator> keyTranslators;
  2188. IPointerArrayOf<IOutputMetaData> actualLayouts;
  2189. const RtlRecord &targetLayout;
  2190. int targetFormatCrc = 0;
  2191. bool anyTranslators = false;
  2192. bool anyKeyedTranslators = false;
  2193. bool translatorsMatch = true;
  2194. public:
  2195. CTranslatorSet(const RtlRecord &_targetLayout, int _targetFormatCrc)
  2196. : targetLayout(_targetLayout), targetFormatCrc(_targetFormatCrc)
  2197. {}
  2198. void addTranslator(const IDynamicTransform *translator, const IKeyTranslator *keyTranslator, IOutputMetaData *actualLayout)
  2199. {
  2200. assertex(actualLayout);
  2201. if (translator || keyTranslator)
  2202. anyTranslators = true;
  2203. if (translator && translator->keyedTranslated())
  2204. anyKeyedTranslators = true;
  2205. if (transformers.ordinality() && (translator != transformers.item(0)))
  2206. translatorsMatch = false;
  2207. transformers.append(translator);
  2208. keyTranslators.append(keyTranslator);
  2209. actualLayouts.append(actualLayout);
  2210. }
  2211. virtual const RtlRecord &queryTargetFormat() const override
  2212. {
  2213. return targetLayout;
  2214. }
  2215. virtual int queryTargetFormatCrc() const override
  2216. {
  2217. return targetFormatCrc;
  2218. }
  2219. virtual const IDynamicTransform *queryTranslator(unsigned subFile) const override
  2220. {
  2221. // We need to have translated partnos to subfiles before calling this!
  2222. // Note: while the required projected format will be the same for all parts, the
  2223. // actual layout - and thus the required translation - may not be, for example if
  2224. // we have a superfile with mismatching formats.
  2225. if (anyTranslators && transformers.isItem(subFile))
  2226. return transformers.item(subFile);
  2227. return nullptr;
  2228. }
  2229. virtual const IKeyTranslator *queryKeyTranslator(unsigned subFile) const override
  2230. {
  2231. if (anyTranslators && keyTranslators.isItem(subFile))
  2232. return keyTranslators.item(subFile);
  2233. return nullptr;
  2234. }
  2235. virtual ISourceRowPrefetcher *getPrefetcher(unsigned subFile) const override
  2236. {
  2237. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  2238. assertex(actualLayout);
  2239. return actualLayout->createDiskPrefetcher();
  2240. }
  2241. virtual IOutputMetaData *queryActualLayout(unsigned subFile) const override
  2242. {
  2243. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  2244. assertex(actualLayout);
  2245. return actualLayout;
  2246. }
  2247. virtual bool isTranslating() const override
  2248. {
  2249. return anyTranslators;
  2250. }
  2251. virtual bool isTranslatingKeyed() const override
  2252. {
  2253. return anyKeyedTranslators;
  2254. }
  2255. virtual bool hasConsistentTranslation() const override
  2256. {
  2257. return translatorsMatch;
  2258. }
  2259. };
  2260. template <class X> class PerChannelCacheOf
  2261. {
  2262. IPointerArrayOf<X> cache;
  2263. UnsignedArray channels;
  2264. public:
  2265. // NOTE - typically only a couple of entries (but see PerFormatCacheOf below
  2266. void set(X *value, unsigned channel)
  2267. {
  2268. cache.append(value);
  2269. channels.append(channel);
  2270. }
  2271. X *get(unsigned channel) const
  2272. {
  2273. ForEachItemIn(idx, channels)
  2274. {
  2275. if (channels.item(idx)==channel)
  2276. return cache.item(idx);
  2277. }
  2278. return NULL;
  2279. }
  2280. };
  2281. template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
  2282. {
  2283. // Identical for now, but characteristics are different so implementations may diverge.
  2284. // For example, this one may want to be a hash table, and there may be many more entries
  2285. };
  2286. class CResolvedFile : implements IResolvedFileCreator, implements ISafeSDSSubscription, public CInterface
  2287. {
  2288. protected:
  2289. IResolvedFileCache *cached;
  2290. StringAttr lfn;
  2291. StringAttr physicalName;
  2292. Owned<IDistributedFile> dFile; // NULL on copies serialized to agents. Note that this implies we keep a lock on dali file for the lifetime of this object.
  2293. CDateTime fileTimeStamp;
  2294. offset_t fileSize;
  2295. unsigned fileCheckSum;
  2296. RoxieFileType fileType;
  2297. bool isSuper;
  2298. StringArray subNames;
  2299. IPointerArrayOf<IFileDescriptor> subFiles; // note - on agents, the file descriptors may have incomplete info. On originating server is always complete
  2300. IPointerArrayOf<IFileDescriptor> remoteSubFiles; // note - on agents, the file descriptors may have incomplete info. On originating server is always complete
  2301. IntArray formatCrcs;
  2302. IPointerArrayOf<IOutputMetaData> diskTypeInfo; // New info using RtlTypeInfo structures
  2303. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  2304. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  2305. Owned <IPropertyTree> properties;
  2306. Linked<IRoxieDaliHelper> daliHelper;
  2307. Owned<IDaliPackageWatcher> notifier;
  2308. virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
  2309. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  2310. {
  2311. subNames.append(subName);
  2312. subFiles.append(fdesc);
  2313. remoteSubFiles.append(remoteFDesc);
  2314. IPropertyTree const & props = fdesc->queryProperties();
  2315. // NOTE - grouping is not included in the formatCRC, nor is the trailing byte that indicates grouping
  2316. // included in the rtlTypeInfo.
  2317. const char *kind = props.queryProp("@kind");
  2318. if (kind)
  2319. {
  2320. RoxieFileType thisFileType = streq(kind, "key") ? ROXIE_KEY : ROXIE_FILE;
  2321. if (subFiles.length()==1)
  2322. fileType = thisFileType;
  2323. else
  2324. assertex(thisFileType==fileType);
  2325. }
  2326. bool isGrouped = props.getPropBool("@grouped", false);
  2327. int formatCrc = props.getPropInt("@formatCrc", 0);
  2328. // If formatCrc and grouping are same as previous, reuse previous typeInfo
  2329. Owned<IOutputMetaData> actualFormat;
  2330. unsigned prevIdx = formatCrcs.length()-1;
  2331. if (formatCrcs.length() && formatCrc == formatCrcs.item(prevIdx) &&
  2332. diskTypeInfo.item(prevIdx) && isGrouped==diskTypeInfo.item(prevIdx)->isGrouped())
  2333. actualFormat.set(diskTypeInfo.item(prevIdx));
  2334. else
  2335. actualFormat.setown(getDaliLayoutInfo(props));
  2336. diskTypeInfo.append(actualFormat.getClear());
  2337. formatCrcs.append(formatCrc);
  2338. unsigned numParts = fdesc->numParts();
  2339. offset_t base = 0;
  2340. for (unsigned i = 0; i < numParts; i++)
  2341. {
  2342. IPartDescriptor *pdesc = fdesc->queryPart(i);
  2343. IPropertyTree &partProps = pdesc->queryProperties();
  2344. offset_t dfsSize = partProps.getPropInt64("@size");
  2345. partProps.setPropInt64("@offset", base);
  2346. base += dfsSize;
  2347. }
  2348. fileSize += base;
  2349. }
  2350. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  2351. {
  2352. if (traceLevel > 2)
  2353. DBGLOG("Superfile %s change detected", lfn.get());
  2354. {
  2355. CriticalBlock b(lock);
  2356. if (cached)
  2357. {
  2358. cached->removeCache(this);
  2359. cached = NULL;
  2360. }
  2361. }
  2362. globalPackageSetManager->requestReload(false, false);
  2363. }
  2364. // We cache all the file maps/arrays etc here.
  2365. mutable CriticalSection lock;
  2366. mutable Owned<IFilePartMap> fileMap;
  2367. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  2368. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  2369. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  2370. public:
  2371. IMPLEMENT_IINTERFACE;
  2372. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* _daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
  2373. : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile), daliHelper(_daliHelper)
  2374. {
  2375. cached = NULL;
  2376. fileSize = 0;
  2377. fileCheckSum = 0;
  2378. if (dFile)
  2379. {
  2380. if (traceLevel > 5)
  2381. DBGLOG("Roxie server adding information for file %s", lfn.get());
  2382. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  2383. dFile->getFileCheckSum(fileCheckSum);
  2384. assertex(tsSet); // per Nigel, is always set
  2385. IDistributedSuperFile *superFile = dFile->querySuperFile();
  2386. if (superFile)
  2387. {
  2388. isSuper = true;
  2389. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  2390. ForEach(*subs)
  2391. {
  2392. IDistributedFile &sub = subs->query();
  2393. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  2394. Owned<IFileDescriptor> remoteFDesc;
  2395. if (daliHelper)
  2396. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt, defaultPrivilegedUser));
  2397. subDFiles.append(OLINK(sub));
  2398. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  2399. }
  2400. // We have to clone the properties since we don't want to keep the superfile locked
  2401. properties.setown(createPTreeFromIPT(&dFile->queryAttributes(), ipt_lowmem));
  2402. if (!isDynamic && !lockSuperFiles)
  2403. {
  2404. notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
  2405. dFile.clear(); // We don't lock superfiles, except dynamic ones
  2406. }
  2407. }
  2408. else // normal file, not superkey
  2409. {
  2410. isSuper = false;
  2411. properties.set(&dFile->queryAttributes());
  2412. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  2413. Owned<IFileDescriptor> remoteFDesc;
  2414. if (daliHelper)
  2415. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt, defaultPrivilegedUser));
  2416. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  2417. }
  2418. }
  2419. }
  2420. virtual void beforeDispose()
  2421. {
  2422. if (notifier)
  2423. daliHelper->releaseSubscription(notifier);
  2424. notifier.clear();
  2425. if (cached)
  2426. {
  2427. cached->removeCache(this);
  2428. }
  2429. }
  2430. virtual unsigned numSubFiles() const
  2431. {
  2432. return subNames.length();
  2433. }
  2434. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  2435. {
  2436. if (subNames.isItem(num))
  2437. {
  2438. name.append(subNames.item(num));
  2439. return true;
  2440. }
  2441. else
  2442. {
  2443. return false;
  2444. }
  2445. }
  2446. virtual unsigned findSubName(const char *subname) const
  2447. {
  2448. ForEachItemIn(idx, subNames)
  2449. {
  2450. if (strieq(subNames.item(idx), subname))
  2451. return idx;
  2452. }
  2453. return NotFound;
  2454. }
  2455. virtual unsigned getContents(StringArray &contents) const
  2456. {
  2457. ForEachItemIn(idx, subNames)
  2458. {
  2459. contents.append(subNames.item(idx));
  2460. }
  2461. return subNames.length();
  2462. }
  2463. virtual bool isSuperFile() const
  2464. {
  2465. return isSuper;
  2466. }
  2467. virtual bool isKey() const
  2468. {
  2469. return fileType==ROXIE_KEY;
  2470. }
  2471. virtual IFilePartMap *getFileMap() const
  2472. {
  2473. CriticalBlock b(lock);
  2474. if (!fileMap)
  2475. {
  2476. if (subFiles.length())
  2477. {
  2478. if (subFiles.length()!=1)
  2479. throw MakeStringException(0, "Roxie does not support FETCH or KEYED JOIN to superkey with multiple parts");
  2480. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  2481. }
  2482. }
  2483. return fileMap.getLink();
  2484. }
  2485. virtual unsigned getNumParts() const
  2486. {
  2487. CriticalBlock b(lock);
  2488. unsigned numParts = 0;
  2489. ForEachItemIn(idx, subFiles)
  2490. {
  2491. unsigned thisNumParts = subFiles.item(idx)->numParts();
  2492. if (thisNumParts > numParts)
  2493. numParts = thisNumParts;
  2494. }
  2495. return numParts;
  2496. }
  2497. bool serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  2498. {
  2499. // Find all the partno's that go to this channel
  2500. unsigned numParts = fdesc->numParts();
  2501. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  2502. numParts--; // don't want to send TLK
  2503. UnsignedArray partNos;
  2504. for (unsigned i = 1; i <= numParts; i++)
  2505. {
  2506. if (getBondedChannel(i)==channel || !isLocal)
  2507. {
  2508. partNos.append(i-1);
  2509. }
  2510. }
  2511. fdesc->serializeParts(mb, partNos);
  2512. return partNos.length();
  2513. }
  2514. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const override
  2515. {
  2516. if (traceLevel > 6)
  2517. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  2518. byte type = (byte) fileType;
  2519. mb.append(type);
  2520. fileTimeStamp.serialize(mb);
  2521. mb.append(fileCheckSum);
  2522. mb.append(fileSize);
  2523. mb.append(isSuper);
  2524. unsigned numSubFiles = subFiles.length();
  2525. mb.append(numSubFiles);
  2526. ForEachItemIn(idx, subFiles)
  2527. {
  2528. mb.append(subNames.item(idx));
  2529. IFileDescriptor *fdesc = subFiles.item(idx);
  2530. bool anyparts = serializeFDesc(mb, fdesc, channel, isLocal);
  2531. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2532. if (remoteFDesc)
  2533. {
  2534. mb.append(true);
  2535. anyparts |= serializeFDesc(mb, remoteFDesc, channel, isLocal);
  2536. }
  2537. else
  2538. mb.append(false);
  2539. mb.append(formatCrcs.item(idx));
  2540. IOutputMetaData *diskType = diskTypeInfo.item(idx);
  2541. if (anyparts && diskType)
  2542. {
  2543. if (idx && formatCrcs.item(idx)==formatCrcs.item(idx-1))
  2544. mb.append((byte) 3); // indicating same format as previous
  2545. else
  2546. {
  2547. mb.append((byte) (diskType->isGrouped() ? 2 : 1));
  2548. verifyex(dumpTypeInfo(mb, diskType->queryTypeInfo())); // Must be serializable, as we deserialized it...
  2549. }
  2550. }
  2551. else
  2552. mb.append((byte) 0);
  2553. }
  2554. if (properties)
  2555. {
  2556. mb.append(true);
  2557. properties->serialize(mb);
  2558. }
  2559. else
  2560. mb.append(false);
  2561. }
  2562. static FileFormatMode getMode(IFileDescriptor *fileDesc)
  2563. {
  2564. if (isFileKey(fileDesc))
  2565. return FileFormatMode::index;
  2566. else
  2567. {
  2568. const char *kind = fileDesc->queryKind();
  2569. if (kind)
  2570. {
  2571. if (streq("csv", kind))
  2572. return FileFormatMode::csv;
  2573. else if (streq("xml", kind))
  2574. return FileFormatMode::xml;
  2575. else if (streq("json", kind))
  2576. return FileFormatMode::xml; // MORE - is that right?
  2577. }
  2578. return FileFormatMode::flat;
  2579. }
  2580. }
  2581. virtual ITranslatorSet *getTranslators(int projectedFormatCrc, IOutputMetaData *projected, int expectedFormatCrc, IOutputMetaData *expected, RecordTranslationMode mode, FileFormatMode fileMode, const char *queryName) const override
  2582. {
  2583. // NOTE - projected and expected and anything fetched from them such as type info may reside in dynamically loaded (and unloaded)
  2584. // query DLLs - this means it is not safe to include them in any sort of cache that might outlive the current query.
  2585. Owned<CTranslatorSet> result = new CTranslatorSet(expected->queryRecordAccessor(true), projectedFormatCrc);
  2586. Owned<const IDynamicTransform> translator; // Translates rows from actual to projected
  2587. Owned<const IKeyTranslator> keyedTranslator; // translate filter conditions from expected to actual
  2588. int prevFormatCrc = 0;
  2589. assertex(projected != nullptr);
  2590. ForEachItemIn(idx, subFiles)
  2591. {
  2592. IFileDescriptor *subFile = subFiles.item(idx);
  2593. IOutputMetaData *actual = expected;
  2594. if (subFile)
  2595. {
  2596. FileFormatMode actualMode = getMode(subFile);
  2597. const char *subname = subNames.item(idx);
  2598. if (fileMode!=actualMode)
  2599. {
  2600. if (traceLevel>0)
  2601. DBGLOG("In query %s: Not translating %s as file type does not match", queryName, subname);
  2602. }
  2603. else if (projectedFormatCrc != 0) // projectedFormatCrc is currently 0 for csv/xml which should not create translators.
  2604. {
  2605. int thisFormatCrc = 0;
  2606. bool actualUnknown = true;
  2607. if (mode == RecordTranslationMode::AlwaysECL)
  2608. {
  2609. if (formatCrcs.item(idx) && expectedFormatCrc && (formatCrcs.item(idx) != expectedFormatCrc))
  2610. DBGLOG("Overriding stored record layout reading file %s", subname);
  2611. thisFormatCrc = expectedFormatCrc;
  2612. }
  2613. else
  2614. {
  2615. thisFormatCrc = formatCrcs.item(idx);
  2616. if (diskTypeInfo.item(idx))
  2617. {
  2618. actual = diskTypeInfo.item(idx);
  2619. actualUnknown = false;
  2620. }
  2621. else if (thisFormatCrc == expectedFormatCrc) // Type descriptors that cannot be serialized can still be read from code
  2622. {
  2623. actual = expected;
  2624. actualUnknown = false;
  2625. }
  2626. }
  2627. assertex(actual);
  2628. if ((thisFormatCrc != prevFormatCrc) || (idx == 0)) // Check if same translation as last subfile
  2629. {
  2630. translator.clear();
  2631. keyedTranslator.clear();
  2632. //Check if the file requires translation, but translation is disabled
  2633. if (thisFormatCrc && expectedFormatCrc && (thisFormatCrc != expectedFormatCrc) && (mode == RecordTranslationMode::None))
  2634. throwTranslationError(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true), subname);
  2635. if (thisFormatCrc == expectedFormatCrc && projectedFormatCrc == expectedFormatCrc && (actualUnknown || alwaysTrustFormatCrcs))
  2636. {
  2637. if (traceLevel > 5)
  2638. DBGLOG("In query %s: Assume no translation required for file %s, crc's match", queryName, subname);
  2639. }
  2640. else if (actualUnknown && mode != RecordTranslationMode::AlwaysECL)
  2641. {
  2642. if (thisFormatCrc)
  2643. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s (disk format not serialized)", subname);
  2644. else if (traceLevel > 5)
  2645. DBGLOG("In query %s: Assume no translation required for %s, disk format unknown", queryName, subname);
  2646. }
  2647. else
  2648. {
  2649. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), actual->queryRecordAccessor(true)));
  2650. if (traceLevel>0 && traceTranslations)
  2651. {
  2652. DBGLOG("In query %s: Record layout translator created for %s", queryName, subname);
  2653. translator->describe();
  2654. }
  2655. if (!translator || !translator->canTranslate())
  2656. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  2657. else if (translator->needsTranslate())
  2658. {
  2659. if (fileMode==FileFormatMode::index && translator->keyedTranslated())
  2660. throw MakeStringException(ROXIE_MISMATCH, "Record layout mismatch detected in keyed fields for file %s", subname);
  2661. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  2662. }
  2663. else
  2664. translator.clear();
  2665. }
  2666. }
  2667. prevFormatCrc = thisFormatCrc;
  2668. }
  2669. }
  2670. else if (traceLevel > 5)
  2671. DBGLOG("In query %s: Assume no translation required, subfile is null", queryName);
  2672. result->addTranslator(LINK(translator), LINK(keyedTranslator), LINK(actual));
  2673. }
  2674. return result.getClear();
  2675. }
  2676. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  2677. {
  2678. CriticalBlock b(lock);
  2679. IFileIOArray *ret = ioArrayMap.get(channel);
  2680. if (!ret)
  2681. {
  2682. ret = createIFileIOArray(isOpt, channel);
  2683. ioArrayMap.set(ret, channel);
  2684. }
  2685. return LINK(ret);
  2686. }
  2687. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  2688. {
  2689. Owned<CFileIOArray> f = new CFileIOArray;
  2690. f->addFile(nullptr, 0, 0, nullptr, 0);
  2691. ForEachItemIn(idx, subFiles)
  2692. {
  2693. IFileDescriptor *fdesc = subFiles.item(idx);
  2694. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2695. const char *subname = subNames.item(idx);
  2696. int thisFormatCrc = formatCrcs.item(idx);
  2697. if (fdesc)
  2698. {
  2699. unsigned numParts = fdesc->numParts();
  2700. for (unsigned i = 1; i <= numParts; i++)
  2701. {
  2702. if (!channel || getBondedChannel(i)==channel)
  2703. {
  2704. try
  2705. {
  2706. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  2707. assertex(pdesc);
  2708. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  2709. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  2710. IPropertyTree &partProps = pdesc->queryProperties();
  2711. f->addFile(file.getClear(), partProps.getPropInt64("@offset"), idx, subname, thisFormatCrc);
  2712. }
  2713. catch (IException *E)
  2714. {
  2715. StringBuffer err;
  2716. err.append("Could not load file ");
  2717. fdesc->getTraceName(err);
  2718. DBGLOG(E, err.str());
  2719. if (!isOpt)
  2720. throw;
  2721. E->Release();
  2722. f->addFile(nullptr, 0, idx, nullptr, 0);
  2723. }
  2724. }
  2725. else
  2726. f->addFile(nullptr, 0, idx, nullptr, 0);
  2727. }
  2728. }
  2729. }
  2730. return f.getClear();
  2731. }
  2732. virtual IKeyArray *getKeyArray(bool isOpt, unsigned channel) const override
  2733. {
  2734. unsigned maxParts = 0;
  2735. ForEachItemIn(subFile, subFiles)
  2736. {
  2737. IFileDescriptor *fdesc = subFiles.item(subFile);
  2738. if (fdesc)
  2739. {
  2740. unsigned numParts = fdesc->numParts();
  2741. if (numParts > 1)
  2742. numParts--; // Don't include TLK
  2743. if (numParts > maxParts)
  2744. maxParts = numParts;
  2745. }
  2746. }
  2747. CriticalBlock b(lock);
  2748. IKeyArray *ret = keyArrayMap.get(channel);
  2749. if (!ret)
  2750. {
  2751. ret = createKeyArray(isOpt, channel, maxParts);
  2752. keyArrayMap.set(ret, channel);
  2753. }
  2754. return LINK(ret);
  2755. }
  2756. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  2757. {
  2758. Owned<IKeyArray> ret = ::createKeyArray();
  2759. if (channel)
  2760. {
  2761. ret->addKey(NULL);
  2762. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  2763. {
  2764. if (channel == getBondedChannel(partNo))
  2765. {
  2766. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2767. ForEachItemIn(idx, subFiles)
  2768. {
  2769. IFileDescriptor *fdesc = subFiles.item(idx);
  2770. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2771. Owned <ILazyFileIO> part;
  2772. unsigned crc = 0;
  2773. if (fdesc) // NB there may be no parts for this channel
  2774. {
  2775. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  2776. if (pdesc)
  2777. {
  2778. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  2779. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  2780. pdesc->getCrc(crc);
  2781. }
  2782. }
  2783. if (part)
  2784. {
  2785. if (lazyOpen)
  2786. {
  2787. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2788. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), part->getFileIdx(), false));
  2789. }
  2790. else
  2791. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), part->getFileIdx(), false));
  2792. }
  2793. else
  2794. keyset->addIndex(NULL);
  2795. }
  2796. ret->addKey(keyset.getClear());
  2797. }
  2798. else
  2799. ret->addKey(NULL);
  2800. }
  2801. }
  2802. else
  2803. {
  2804. // Channel 0 means return the TLK
  2805. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2806. ForEachItemIn(idx, subFiles)
  2807. {
  2808. IFileDescriptor *fdesc = subFiles.item(idx);
  2809. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2810. Owned<IKeyIndexBase> key;
  2811. if (fdesc)
  2812. {
  2813. unsigned numParts = fdesc->numParts();
  2814. assertex(numParts > 0);
  2815. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  2816. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  2817. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  2818. unsigned crc = 0;
  2819. pdesc->getCrc(crc);
  2820. StringBuffer pname;
  2821. pdesc->getPath(pname);
  2822. if (lazyOpen)
  2823. {
  2824. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2825. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), keyFile->getFileIdx(), numParts>1));
  2826. }
  2827. else
  2828. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), keyFile->getFileIdx(), numParts>1));
  2829. keyset->addIndex(LINK(key->queryPart(0)));
  2830. }
  2831. else
  2832. keyset->addIndex(NULL);
  2833. }
  2834. if (keyset->numParts())
  2835. ret->addKey(keyset.getClear());
  2836. else if (!isOpt)
  2837. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  2838. else if (traceLevel > 4)
  2839. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  2840. }
  2841. return ret.getClear();
  2842. }
  2843. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IOutputMetaData *preloadLayout, bool preload) const
  2844. {
  2845. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  2846. // Failures to resolve will not be cached, only successes.
  2847. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  2848. CriticalBlock b(lock);
  2849. IInMemoryIndexManager *ret = indexMap.get(channel);
  2850. if (!ret)
  2851. {
  2852. ret = createInMemoryIndexManager(preloadLayout->queryRecordAccessor(true), isOpt, lfn);
  2853. Owned<IFileIOArray> files = getIFileIOArray(isOpt, channel);
  2854. ret->load(files, preloadLayout, preload); // note - files (passed in) are also channel specific
  2855. indexMap.set(ret, channel);
  2856. }
  2857. return LINK(ret);
  2858. }
  2859. virtual const CDateTime &queryTimeStamp() const
  2860. {
  2861. return fileTimeStamp;
  2862. }
  2863. virtual unsigned queryCheckSum() const
  2864. {
  2865. return fileCheckSum;
  2866. }
  2867. virtual offset_t getFileSize() const
  2868. {
  2869. return fileSize;
  2870. }
  2871. virtual hash64_t addHash64(hash64_t hashValue) const
  2872. {
  2873. hashValue = fileTimeStamp.getHash(hashValue);
  2874. if (fileCheckSum)
  2875. hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
  2876. return hashValue;
  2877. }
  2878. virtual void addSubFile(const IResolvedFile *_sub)
  2879. {
  2880. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  2881. if (subFiles.length())
  2882. assertex(sub->fileType==fileType);
  2883. else
  2884. fileType = sub->fileType;
  2885. subRFiles.append((IResolvedFile &) *LINK(_sub));
  2886. ForEachItemIn(idx, sub->subFiles)
  2887. {
  2888. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  2889. }
  2890. }
  2891. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  2892. {
  2893. addFile(lfn, _sub, _remoteSub);
  2894. }
  2895. virtual void addSubFile(const char *localFileName)
  2896. {
  2897. Owned<IFile> file = createIFile(localFileName);
  2898. assertex(file->exists());
  2899. offset_t size = file->size();
  2900. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  2901. if (isIndexFile(file))
  2902. fdesc->queryProperties().setProp("@kind", "key");
  2903. Owned<IPropertyTree> pp = createPTree("Part", ipt_lowmem);
  2904. pp->setPropInt64("@size",size);
  2905. pp->setPropBool("@local", true);
  2906. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  2907. addSubFile(fdesc.getClear(), NULL);
  2908. }
  2909. virtual void setCache(IResolvedFileCache *cache)
  2910. {
  2911. if (cached)
  2912. {
  2913. if (traceLevel > 9)
  2914. DBGLOG("setCache removing from prior cache %s", queryFileName());
  2915. if (cache==NULL)
  2916. cached->removeCache(this);
  2917. else
  2918. throwUnexpected();
  2919. }
  2920. cached = cache;
  2921. }
  2922. virtual bool isAliveAndLink() const
  2923. {
  2924. return CInterface::isAliveAndLink();
  2925. }
  2926. virtual const char *queryFileName() const
  2927. {
  2928. return lfn.get();
  2929. }
  2930. virtual const char *queryPhysicalName() const
  2931. {
  2932. return physicalName.get();
  2933. }
  2934. virtual const IPropertyTree *queryProperties() const
  2935. {
  2936. return properties;
  2937. }
  2938. virtual void remove()
  2939. {
  2940. subFiles.kill();
  2941. subDFiles.kill();
  2942. subRFiles.kill();
  2943. subNames.kill();
  2944. remoteSubFiles.kill();
  2945. properties.clear();
  2946. notifier.clear();
  2947. if (isSuper)
  2948. {
  2949. // Because we don't lock superfiles, we need to behave differently
  2950. UNIMPLEMENTED;
  2951. }
  2952. else if (dFile)
  2953. {
  2954. dFile->detach();
  2955. }
  2956. else if (!physicalName.isEmpty())
  2957. {
  2958. try
  2959. {
  2960. Owned<IFile> file = createIFile(physicalName.get());
  2961. file->remove();
  2962. }
  2963. catch (IException *e)
  2964. {
  2965. OERRLOG(-1, "Error removing file %s (%s)", lfn.get(), physicalName.get());
  2966. e->Release();
  2967. }
  2968. }
  2969. }
  2970. virtual bool exists() const
  2971. {
  2972. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  2973. // This will make more sense if/when we start to lock earlier.
  2974. if (dFile || isSuper)
  2975. return true; // MORE - may need some thought - especially the isSuper case
  2976. else if (!physicalName.isEmpty())
  2977. return checkFileExists(physicalName.get());
  2978. else
  2979. return false;
  2980. }
  2981. virtual bool isRestrictedAccess() const override
  2982. {
  2983. return (dFile && dFile->isRestrictedAccess());
  2984. }
  2985. };
  2986. /*----------------------------------------------------------------------------------------------------------
  2987. MORE
  2988. - on remote() calls we can't pass the expected file date but we will pass it back with the file info.
  2989. ------------------------------------------------------------------------------------------------------------*/
  2990. class CAgentDynamicFile : public CResolvedFile
  2991. {
  2992. public:
  2993. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  2994. bool isLocal;
  2995. unsigned channel;
  2996. ServerIdentifier serverId;
  2997. public:
  2998. CAgentDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  2999. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), isOpt(_isOpt), isLocal(_isLocal), channel(header->channel), serverId(header->serverId)
  3000. {
  3001. // call back to the server to get the info
  3002. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  3003. try
  3004. {
  3005. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK, 0); // subchannel not relevant
  3006. bool ok = false;
  3007. for (unsigned i = 0; i < callbackRetries; i++)
  3008. {
  3009. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  3010. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  3011. char *buf = (char *) output->getBuffer(len, true);
  3012. buf[0] = isOpt;
  3013. buf[1] = isLocal;
  3014. strcpy(buf+2, lfn.get());
  3015. output->putBuffer(buf, len, true);
  3016. output->flush();
  3017. output.clear();
  3018. if (callback->wait(callbackTimeout))
  3019. {
  3020. ok = true;
  3021. break;
  3022. }
  3023. else
  3024. {
  3025. DBGLOG("timed out waiting for server callback - retrying");
  3026. }
  3027. }
  3028. if (ok)
  3029. {
  3030. if (traceLevel > 6)
  3031. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  3032. MemoryBuffer &serverData = callback->queryData();
  3033. byte type;
  3034. serverData.read(type);
  3035. fileType = (RoxieFileType) type;
  3036. fileTimeStamp.deserialize(serverData);
  3037. serverData.read(fileCheckSum);
  3038. serverData.read(fileSize);
  3039. serverData.read(isSuper);
  3040. unsigned numSubFiles;
  3041. serverData.read(numSubFiles);
  3042. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  3043. {
  3044. StringBuffer subName;
  3045. serverData.read(subName);
  3046. subNames.append(subName.str());
  3047. deserializeFilePart(serverData, subFiles, fileNo, false);
  3048. bool remotePresent;
  3049. serverData.read(remotePresent);
  3050. if (remotePresent)
  3051. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  3052. else
  3053. remoteSubFiles.append(NULL);
  3054. unsigned formatCrc;
  3055. serverData.read(formatCrc);
  3056. formatCrcs.append(formatCrc);
  3057. byte diskTypeInfoPresent;
  3058. serverData.read(diskTypeInfoPresent);
  3059. switch (diskTypeInfoPresent)
  3060. {
  3061. case 0:
  3062. diskTypeInfo.append(NULL);
  3063. break;
  3064. case 1:
  3065. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, false));
  3066. break;
  3067. case 2:
  3068. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, true));
  3069. break;
  3070. case 3:
  3071. assertex(fileNo > 0);
  3072. diskTypeInfo.append(LINK(diskTypeInfo.item(fileNo-1)));
  3073. break;
  3074. default:
  3075. throwUnexpected();
  3076. }
  3077. }
  3078. bool propertiesPresent;
  3079. serverData.read(propertiesPresent);
  3080. if (propertiesPresent)
  3081. properties.setown(createPTree(serverData, ipt_lowmem));
  3082. }
  3083. else
  3084. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  3085. }
  3086. catch (...)
  3087. {
  3088. ROQ->removePendingCallback(callback);
  3089. throw;
  3090. }
  3091. ROQ->removePendingCallback(callback);
  3092. }
  3093. private:
  3094. void deserializeFilePart(MemoryBuffer &serverData, IPointerArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  3095. {
  3096. IArrayOf<IPartDescriptor> parts;
  3097. deserializePartFileDescriptors(serverData, parts);
  3098. if (parts.length())
  3099. {
  3100. files.append(LINK(&parts.item(0).queryOwner()));
  3101. }
  3102. else
  3103. {
  3104. if (traceLevel > 6)
  3105. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  3106. files.append(NULL);
  3107. }
  3108. }
  3109. };
  3110. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  3111. {
  3112. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
  3113. }
  3114. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
  3115. {
  3116. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  3117. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
  3118. }
  3119. class CAgentDynamicFileCache : implements IAgentDynamicFileCache, public CInterface
  3120. {
  3121. unsigned tableSize;
  3122. mutable CriticalSection crit;
  3123. CIArrayOf<CAgentDynamicFile> files; // expect numbers to be small - probably not worth hashing
  3124. public:
  3125. IMPLEMENT_IINTERFACE;
  3126. CAgentDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  3127. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal) override
  3128. {
  3129. if (logctx.queryTraceLevel() > 5)
  3130. {
  3131. StringBuffer s;
  3132. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  3133. }
  3134. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  3135. CriticalBlock b(crit);
  3136. if (!cacheDate.isNull())
  3137. {
  3138. unsigned idx = 0;
  3139. while (files.isItem(idx))
  3140. {
  3141. CAgentDynamicFile &f = files.item(idx);
  3142. if (f.channel==header->channel && f.serverId==header->serverId && stricmp(f.queryFileName(), lfn)==0)
  3143. {
  3144. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  3145. {
  3146. if (f.isKey())
  3147. clearKeyStoreCacheEntry(f.queryFileName());
  3148. files.remove(idx);
  3149. idx--;
  3150. }
  3151. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  3152. {
  3153. files.swap(idx, 0);
  3154. return LINK(&f);
  3155. }
  3156. }
  3157. idx++;
  3158. }
  3159. }
  3160. Owned<CAgentDynamicFile> ret;
  3161. {
  3162. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  3163. CriticalUnblock b1(crit);
  3164. ret.setown(new CAgentDynamicFile(logctx, lfn, header, isOpt, isLocal));
  3165. }
  3166. if (!ret->isSuperFile())
  3167. {
  3168. // Cache results for improved performance - we DON'T cache superfiles as they are liable to change during the course of a query.
  3169. // Note that even caching non-superfiles is also potentially going to give stale results, if the cache persists beyond the current
  3170. // query.
  3171. while (files.length() > tableSize)
  3172. files.remove(files.length()-1);
  3173. files.add(*ret.getLink(), 0);
  3174. }
  3175. return ret.getClear();
  3176. }
  3177. virtual void releaseAll() override
  3178. {
  3179. CriticalBlock b(crit);
  3180. files.kill();
  3181. }
  3182. };
  3183. static CriticalSection agentDynamicFileCacheCrit;
  3184. static Owned<IAgentDynamicFileCache> agentDynamicFileCache;
  3185. extern IAgentDynamicFileCache *queryAgentDynamicFileCache()
  3186. {
  3187. if (!agentDynamicFileCache)
  3188. {
  3189. CriticalBlock b(agentDynamicFileCacheCrit);
  3190. if (!agentDynamicFileCache)
  3191. agentDynamicFileCache.setown(new CAgentDynamicFileCache(20));
  3192. }
  3193. return agentDynamicFileCache;
  3194. }
  3195. extern void releaseAgentDynamicFileCache()
  3196. {
  3197. CriticalBlock b(agentDynamicFileCacheCrit);
  3198. if (agentDynamicFileCache)
  3199. agentDynamicFileCache->releaseAll();
  3200. }
  3201. static Singleton<CRoxieFileCache> fileCache;
  3202. // Initialization/termination
  3203. MODULE_INIT(INIT_PRIORITY_STANDARD)
  3204. {
  3205. return true;
  3206. }
  3207. MODULE_EXIT()
  3208. {
  3209. auto cache = fileCache.queryExisting();
  3210. if (cache)
  3211. {
  3212. cache->join();
  3213. cache->Release();
  3214. }
  3215. }
  3216. extern IRoxieFileCache &queryFileCache()
  3217. {
  3218. return *fileCache.query([] { return new CRoxieFileCache; });
  3219. }
  3220. class CRoxieWriteHandler : implements IRoxieWriteHandler, public CInterface
  3221. {
  3222. public:
  3223. IMPLEMENT_IINTERFACE;
  3224. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  3225. : daliHelper(_daliHelper), dFile(_dFile)
  3226. {
  3227. ForEachItemIn(idx, _clusters)
  3228. {
  3229. addCluster(_clusters.item(idx));
  3230. }
  3231. if (dFile->queryDistributedFile())
  3232. {
  3233. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  3234. if (isTemporary)
  3235. {
  3236. UNIMPLEMENTED;
  3237. }
  3238. else
  3239. localFile.setown(dFile->getPartFile(0, 0));
  3240. }
  3241. else
  3242. {
  3243. isTemporary = false;
  3244. localFile.setown(dFile->getPartFile(0, 0));
  3245. }
  3246. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  3247. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  3248. }
  3249. virtual IFile *queryFile() const
  3250. {
  3251. return localFile;
  3252. }
  3253. void getClusters(StringArray &clusters) const
  3254. {
  3255. ForEachItemIn(idx, allClusters)
  3256. {
  3257. clusters.append(allClusters.item(idx));
  3258. }
  3259. }
  3260. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  3261. {
  3262. if (success)
  3263. {
  3264. copyPhysical();
  3265. if (daliHelper && daliHelper->connected())
  3266. publish(activity);
  3267. }
  3268. if (isTemporary || !success)
  3269. {
  3270. localFile->remove();
  3271. }
  3272. }
  3273. private:
  3274. bool isTemporary;
  3275. Linked<IRoxieDaliHelper> daliHelper;
  3276. Owned<ILocalOrDistributedFile> dFile;
  3277. Owned<IFile> localFile;
  3278. Owned<IGroup> localCluster;
  3279. StringAttr localClusterName;
  3280. IArrayOf<IGroup> remoteNodes;
  3281. StringArray allClusters;
  3282. void copyPhysical() const
  3283. {
  3284. RemoteFilename rfn, rdn;
  3285. dFile->getPartFilename(rfn, 0, 0);
  3286. StringBuffer physicalName, physicalDir, physicalBase;
  3287. rfn.getLocalPath(physicalName);
  3288. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  3289. rdn.setLocalPath(physicalDir.str());
  3290. if (remoteNodes.length())
  3291. {
  3292. ForEachItemIn(idx, remoteNodes)
  3293. {
  3294. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  3295. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  3296. Owned<IFile> targetdir = createIFile(rdn);
  3297. Owned<IFile> target = createIFile(rfn);
  3298. targetdir->createDirectory();
  3299. copyFile(target, localFile);
  3300. }
  3301. }
  3302. }
  3303. void publish(const IRoxiePublishCallback *activity)
  3304. {
  3305. if (!dFile->isExternal())
  3306. {
  3307. Owned<IFileDescriptor> desc = createFileDescriptor();
  3308. desc->setNumParts(1);
  3309. RemoteFilename rfn;
  3310. dFile->getPartFilename(rfn, 0, 0);
  3311. StringBuffer physicalName, physicalDir, physicalBase;
  3312. rfn.getLocalPath(physicalName);
  3313. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  3314. desc->setDefaultDir(physicalDir.str());
  3315. desc->setPartMask(physicalBase.str());
  3316. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  3317. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  3318. offset_t fileSize = localFile->size();
  3319. fileProps.setPropInt64("@size", fileSize);
  3320. partProps.setPropInt64("@size", fileSize);
  3321. CDateTime createTime, modifiedTime, accessedTime;
  3322. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  3323. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  3324. unsigned hour, min, sec, nanosec;
  3325. modifiedTime.getTime(hour, min, sec, nanosec);
  3326. modifiedTime.setTime(hour, min, sec, 0);
  3327. StringBuffer timestr;
  3328. modifiedTime.getString(timestr);
  3329. if(timestr.length())
  3330. partProps.setProp("@modified", timestr.str());
  3331. ClusterPartDiskMapSpec partmap;
  3332. if (localCluster)
  3333. {
  3334. desc->addCluster(localCluster, partmap);
  3335. desc->setClusterGroupName(0, localClusterName.get());
  3336. }
  3337. ForEachItemIn(idx, remoteNodes)
  3338. desc->addCluster(&remoteNodes.item(idx), partmap);
  3339. if (activity)
  3340. activity->setFileProperties(desc);
  3341. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  3342. publishFile->setAccessedTime(modifiedTime);
  3343. IUserDescriptor * userdesc = NULL;
  3344. if (activity)
  3345. userdesc = activity->queryUserDescriptor();
  3346. else
  3347. {
  3348. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  3349. if (daliHelper)
  3350. userdesc = daliHelper->queryUserDescriptor();//predeployed query mode
  3351. }
  3352. publishFile->attach(dFile->queryLogicalName(), userdesc);
  3353. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  3354. }
  3355. }
  3356. void addCluster(char const * cluster)
  3357. {
  3358. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  3359. if (!group)
  3360. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  3361. cluster, dFile->queryLogicalName());
  3362. #ifdef _CONTAINERIZED // NB: really is-off-nodestorage
  3363. localCluster.setown(group.getClear());
  3364. localClusterName.set(cluster);
  3365. #else
  3366. rank_t r = group->rank();
  3367. if (RANK_NULL != r)
  3368. {
  3369. if (localCluster)
  3370. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  3371. cluster, dFile->queryLogicalName());
  3372. SocketEndpointArray eps;
  3373. SocketEndpoint me(0, myNode.getIpAddress());
  3374. eps.append(me);
  3375. localCluster.setown(createIGroup(eps));
  3376. StringBuffer clusterName(cluster);
  3377. if (group->ordinality()>1)
  3378. clusterName.appendf("[%u]", r+1);
  3379. localClusterName.set(clusterName);
  3380. }
  3381. else
  3382. {
  3383. ForEachItemIn(idx, remoteNodes)
  3384. {
  3385. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  3386. if (group->isMember(other))
  3387. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  3388. cluster, dFile->queryLogicalName());
  3389. }
  3390. remoteNodes.append(*group.getClear());
  3391. }
  3392. #endif
  3393. allClusters.append(cluster);
  3394. }
  3395. };
  3396. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  3397. {
  3398. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  3399. }
  3400. //================================================================================================================
  3401. #ifdef _USE_CPPUNIT
  3402. #include "unittests.hpp"
  3403. class CcdFileTest : public CppUnit::TestFixture
  3404. {
  3405. CPPUNIT_TEST_SUITE(CcdFileTest);
  3406. CPPUNIT_TEST(testCopy);
  3407. CPPUNIT_TEST_SUITE_END();
  3408. protected:
  3409. class DummyPartDescriptor : public CInterfaceOf<IPartDescriptor>
  3410. {
  3411. virtual unsigned queryPartIndex() { UNIMPLEMENTED; }
  3412. virtual unsigned numCopies() { UNIMPLEMENTED; }
  3413. virtual INode *getNode(unsigned copy=0) { UNIMPLEMENTED; }
  3414. virtual INode *queryNode(unsigned copy=0) { UNIMPLEMENTED; }
  3415. virtual IPropertyTree &queryProperties() { UNIMPLEMENTED; }
  3416. virtual IPropertyTree *getProperties() { UNIMPLEMENTED; }
  3417. virtual RemoteFilename &getFilename(unsigned copy, RemoteFilename &rfn) { UNIMPLEMENTED; }
  3418. virtual StringBuffer &getTail(StringBuffer &name) { UNIMPLEMENTED; }
  3419. virtual StringBuffer &getDirectory(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  3420. virtual StringBuffer &getPath(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  3421. virtual void serialize(MemoryBuffer &tgt) { UNIMPLEMENTED; }
  3422. virtual bool isMulti() { UNIMPLEMENTED; }
  3423. virtual RemoteMultiFilename &getMultiFilename(unsigned copy, RemoteMultiFilename &rfn) { UNIMPLEMENTED; }
  3424. virtual bool getCrc(unsigned &crc) { UNIMPLEMENTED; }
  3425. virtual IFileDescriptor &queryOwner() { UNIMPLEMENTED; }
  3426. virtual const char *queryOverrideName() { UNIMPLEMENTED; }
  3427. virtual unsigned copyClusterNum(unsigned copy,unsigned *replicate=NULL) { UNIMPLEMENTED; }
  3428. virtual IReplicatedFile *getReplicatedFile() { UNIMPLEMENTED; }
  3429. };
  3430. void testCopy()
  3431. {
  3432. selfTestMode = true;
  3433. remove("test.local");
  3434. remove("test.remote");
  3435. remove("test.buddy");
  3436. StringArray remotes;
  3437. DummyPartDescriptor pdesc;
  3438. CDateTime dummy;
  3439. remotes.append("test.remote");
  3440. int f = open("test.remote", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  3441. CPPUNIT_ASSERT(f >= 0);
  3442. int val = 1;
  3443. int wrote = write(f, &val, sizeof(int));
  3444. CPPUNIT_ASSERT(wrote==sizeof(int));
  3445. close(f);
  3446. CRoxieFileCache &cache = static_cast<CRoxieFileCache &>(queryFileCache());
  3447. Owned<ILazyFileIO> io = cache.openFile("test.local", 0, 0, "test.local", NULL, remotes, sizeof(int), dummy);
  3448. CPPUNIT_ASSERT(io != NULL);
  3449. // Reading it should read 1
  3450. val = 0;
  3451. ssize_t bytesRead = io->read(0, sizeof(int), &val);
  3452. CPPUNIT_ASSERT(bytesRead==4);
  3453. CPPUNIT_ASSERT(val==1);
  3454. // Now create the buddy
  3455. f = open("test.buddy", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  3456. val = 2;
  3457. ssize_t numwritten = write(f, &val, sizeof(int));
  3458. CPPUNIT_ASSERT(numwritten == sizeof(int));
  3459. close(f);
  3460. // Reading it should still read 1...
  3461. val = 0;
  3462. io->read(0, sizeof(int), &val);
  3463. CPPUNIT_ASSERT(val==1);
  3464. // Now copy it - should copy the buddy
  3465. cache.doCopy(io, false);
  3466. // Reading it should read 2...
  3467. val = 0;
  3468. io->read(0, sizeof(int), &val);
  3469. CPPUNIT_ASSERT(val==2);
  3470. // And the data in the file should be 2
  3471. f = open("test.local", _O_RDONLY);
  3472. val = 0;
  3473. ssize_t numread = read(f, &val, sizeof(int));
  3474. CPPUNIT_ASSERT(numread == sizeof(int));
  3475. close(f);
  3476. CPPUNIT_ASSERT(val==2);
  3477. io.clear();
  3478. remove("test.local");
  3479. remove("test.remote");
  3480. remove("test.buddy");
  3481. }
  3482. };
  3483. CPPUNIT_TEST_SUITE_REGISTRATION( CcdFileTest );
  3484. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CcdFileTest, "CcdFileTest" );
  3485. #endif