ccdfile.cpp 108 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jmd5.hpp"
  16. #include "jfile.hpp"
  17. #include "jdebug.hpp"
  18. #include "jhtree.hpp"
  19. #include "jisem.hpp"
  20. #include "jqueue.tpp"
  21. #include "dautils.hpp"
  22. #include "keydiff.hpp"
  23. #include "ccd.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdquery.hpp"
  26. #include "ccdstate.hpp"
  27. #include "ccdsnmp.hpp"
  28. #include "rmtfile.hpp"
  29. #include "ccdqueue.ipp"
  30. #ifdef __linux__
  31. #include <sys/mman.h>
  32. #endif
  33. #if defined (__linux__)
  34. #include <sys/syscall.h>
  35. #include "ioprio.h"
  36. #endif
  37. #include "thorcommon.hpp"
  38. #include "eclhelper_dyn.hpp"
  39. #include "rtldynfield.hpp"
  // Count of file handles currently held open. CRoxieLazyFileIO indexes this with its
  // 'remote' flag, so slot [false] counts local files and slot [true] counts remote ones.
  atomic_t numFilesOpen[2];
  // CRoxieLazyFileIO::read() gives up and throws once this many attempts have failed.
  #define MAX_READ_RETRIES 2
  // Debug-only fault injection switches; both are disabled by default.
  #ifdef _DEBUG
  //#define FAIL_20_READ
  //#define FAIL_20_OPEN
  #endif
  46. // We point unopened files at a FailingIO object, which avoids having to test for NULL on every access
  47. class DECL_EXCEPTION NotYetOpenException : implements IException, public CInterface
  48. {
  49. public:
  50. IMPLEMENT_IINTERFACE;
  51. virtual int errorCode() const { return 0; }
  52. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("not yet open"); }
  53. virtual MessageAudience errorAudience() const { return MSGAUD_programmer; }
  54. };
  55. class CFailingFileIO : implements IFileIO, public CInterface
  56. {
  57. #define THROWNOTOPEN throw new NotYetOpenException()
  58. public:
  59. IMPLEMENT_IINTERFACE;
  60. virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
  61. virtual offset_t size() { THROWNOTOPEN; }
  62. virtual void flush() { THROWNOTOPEN; }
  63. virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
  64. virtual void setSize(offset_t size) { UNIMPLEMENTED; }
  65. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
  66. virtual void close() { }
  67. virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
  68. } failure;
  69. class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public CInterface
  70. {
  71. protected:
  72. IArrayOf<IFile> sources;
  73. Owned<IFile> logical;
  74. unsigned currentIdx;
  75. Owned<IFileIO> current;
  76. Owned<IMemoryMappedFile> mmapped;
  77. mutable CriticalSection crit;
  78. bool remote;
  79. offset_t fileSize;
  80. CDateTime fileDate;
  81. unsigned lastAccess;
  82. bool copying;
  83. bool isCompressed;
  84. const IRoxieFileCache *cached;
  85. CRuntimeStatisticCollection fileStats;
  86. #ifdef FAIL_20_READ
  87. unsigned readCount;
  88. #endif
  89. public:
  90. IMPLEMENT_IINTERFACE;
  91. CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed)
  92. : logical(_logical), fileSize(size), isCompressed(_isCompressed), fileStats(diskLocalStatistics)
  93. {
  94. fileDate.set(_date);
  95. currentIdx = 0;
  96. current.set(&failure);
  97. remote = false;
  98. #ifdef FAIL_20_READ
  99. readCount = 0;
  100. #endif
  101. lastAccess = msTick();
  102. copying = false;
  103. cached = NULL;
  104. }
  105. ~CRoxieLazyFileIO()
  106. {
  107. setFailure(); // ensures the open file count properly maintained
  108. }
  109. virtual void beforeDispose()
  110. {
  111. if (cached)
  112. cached->removeCache(this);
  113. }
  114. void setCache(const IRoxieFileCache *cache)
  115. {
  116. assertex(!cached);
  117. cached = cache;
  118. }
  119. void removeCache(const IRoxieFileCache *cache)
  120. {
  121. assertex(cached==cache);
  122. cached = NULL;
  123. }
  124. inline void setRemote(bool _remote) { remote = _remote; }
  125. virtual void setCopying(bool _copying)
  126. {
  127. CriticalBlock b(crit);
  128. if (remote && currentIdx)
  129. {
  130. // The current location is not our preferred location. Recheck whether we can now access our preferred location
  131. setFailure();
  132. currentIdx = 0;
  133. _checkOpen();
  134. }
  135. copying = _copying;
  136. }
  137. virtual bool isCopying() const
  138. {
  139. CriticalBlock b(crit);
  140. return copying;
  141. }
  142. virtual bool isOpen() const
  143. {
  144. CriticalBlock b(crit);
  145. return current.get() != &failure;
  146. }
  147. virtual unsigned getLastAccessed() const
  148. {
  149. CriticalBlock b(crit);
  150. return lastAccess;
  151. }
  152. virtual void close()
  153. {
  154. CriticalBlock b(crit);
  155. setFailure();
  156. }
  157. virtual bool isRemote()
  158. {
  159. CriticalBlock b(crit);
  160. return remote;
  161. }
  162. void setFailure()
  163. {
  164. try
  165. {
  166. if (current.get()==&failure)
  167. return;
  168. atomic_dec(&numFilesOpen[remote]);
  169. mergeStats(fileStats, current);
  170. current.set(&failure);
  171. }
  172. catch (IException *E)
  173. {
  174. if (traceLevel > 5)
  175. {
  176. StringBuffer s;
  177. DBGLOG("setFailure ignoring exception %s from IFileIO close", E->errorMessage(s).str());
  178. }
  179. E->Release();
  180. }
  181. }
  182. void checkOpen()
  183. {
  184. CriticalBlock b(crit);
  185. _checkOpen();
  186. }
  187. IFileIO *getCheckOpen(unsigned &activeIdx)
  188. {
  189. CriticalBlock b(crit);
  190. _checkOpen();
  191. activeIdx = currentIdx;
  192. return LINK(current);
  193. }
  194. void _checkOpen()
  195. {
  196. if (current.get() == &failure)
  197. {
  198. StringBuffer filesTried;
  199. unsigned tries = 0;
  200. bool firstTime = true;
  201. RoxieFileStatus fileStatus = FileNotFound;
  202. for (;;)
  203. {
  204. if (currentIdx >= sources.length())
  205. currentIdx = 0;
  206. if (tries==sources.length())
  207. {
  208. if (firstTime) // if first time - reset and try again
  209. {
  210. firstTime = false;
  211. tries = 0;
  212. }
  213. else
  214. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Failed to open file %s at any of the following remote locations %s", logical->queryFilename(), filesTried.str()); // operations doesn't want a trap
  215. }
  216. const char *sourceName = sources.item(currentIdx).queryFilename();
  217. if (traceLevel > 10)
  218. DBGLOG("Trying to open %s", sourceName);
  219. try
  220. {
  221. #ifdef FAIL_20_OPEN
  222. openCount++;
  223. if ((openCount % 5) == 0)
  224. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
  225. #endif
  226. IFile *f = &sources.item(currentIdx);
  227. fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, isCompressed, false);
  228. if (fileStatus == FileIsValid)
  229. {
  230. if (isCompressed)
  231. current.setown(createCompressedFileReader(f));
  232. else
  233. current.setown(f->open(IFOread));
  234. if (current)
  235. {
  236. if (traceLevel > 5)
  237. DBGLOG("Opening %s", sourceName);
  238. disconnectRemoteIoOnExit(current);
  239. break;
  240. }
  241. // throwUnexpected(); - try another location if this one has the wrong version of the file
  242. }
  243. disconnectRemoteFile(f);
  244. }
  245. catch (IException *E)
  246. {
  247. E->Release();
  248. }
  249. currentIdx++;
  250. tries++;
  251. if (!firstTime) // log error on last attempt for each file name - it will have the "best" error condition
  252. {
  253. filesTried.appendf(" %s", sourceName); // only need to build this list once
  254. switch (fileStatus)
  255. {
  256. case FileNotFound:
  257. filesTried.append(": FileNotFound");
  258. break;
  259. case FileSizeMismatch:
  260. filesTried.append(": FileSizeMismatch");
  261. break;
  262. case FileDateMismatch:
  263. filesTried.append(": FileDateMismatch");
  264. break;
  265. }
  266. }
  267. }
  268. lastAccess = msTick();
  269. atomic_inc(&numFilesOpen[remote]);
  270. if ((unsigned) atomic_read(&numFilesOpen[remote]) > maxFilesOpen[remote])
  271. queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
  272. }
  273. }
  274. virtual void addSource(IFile *newSource)
  275. {
  276. if (newSource)
  277. {
  278. if (traceLevel > 10)
  279. DBGLOG("Adding information for location %s for %s", newSource->queryFilename(), logical->queryFilename());
  280. CriticalBlock b(crit);
  281. sources.append(*newSource);
  282. }
  283. }
  284. virtual size32_t read(offset_t pos, size32_t len, void * data)
  285. {
  286. unsigned activeIdx;
  287. Owned<IFileIO> active = getCheckOpen(activeIdx);
  288. unsigned tries = 0;
  289. for (;;)
  290. {
  291. try
  292. {
  293. size32_t ret = active->read(pos, len, data);
  294. lastAccess = msTick();
  295. return ret;
  296. }
  297. catch (NotYetOpenException *E)
  298. {
  299. E->Release();
  300. }
  301. catch (IException *E)
  302. {
  303. EXCLOG(MCoperatorError, E, "Read error");
  304. E->Release();
  305. DBGLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(activeIdx).queryFilename());
  306. {
  307. CriticalBlock b(crit);
  308. if (currentIdx == activeIdx)
  309. {
  310. currentIdx = activeIdx+1;
  311. setFailure();
  312. }
  313. }
  314. }
  315. active.setown(getCheckOpen(activeIdx));
  316. tries++;
  317. if (tries == MAX_READ_RETRIES)
  318. throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(activeIdx).queryFilename(), tries);
  319. }
  320. }
  321. virtual void flush()
  322. {
  323. Linked<IFileIO> active;
  324. {
  325. CriticalBlock b(crit);
  326. active.set(current);
  327. }
  328. if (active.get() != &failure)
  329. active->flush();
  330. }
  331. virtual offset_t size()
  332. {
  333. unsigned activeIdx;
  334. Owned<IFileIO> active = getCheckOpen(activeIdx);
  335. lastAccess = msTick();
  336. return active->size();
  337. }
  338. virtual unsigned __int64 getStatistic(StatisticKind kind)
  339. {
  340. unsigned __int64 v = fileStats.getStatisticValue(kind);
  341. CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
  342. return v + current->getStatistic(kind);
  343. }
  344. virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
  345. virtual void setSize(offset_t size) { throwUnexpected(); }
  346. virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
  347. virtual const char *queryFilename() { return logical->queryFilename(); }
  348. virtual bool isAlive() const { return CInterface::isAlive(); }
  349. virtual IMemoryMappedFile *getMappedFile() override
  350. {
  351. CriticalBlock b(crit);
  352. if (mmapped)
  353. return mmapped.getLink();
  354. if (!remote)
  355. {
  356. mmapped.setown(logical->openMemoryMapped());
  357. return mmapped.getLink();
  358. }
  359. return nullptr;
  360. }
  361. virtual IFileIO *getFileIO() override
  362. {
  363. return LINK(this);
  364. }
  365. virtual bool createHardFileLink()
  366. {
  367. unsigned tries = 0;
  368. for (;;)
  369. {
  370. StringBuffer filesTried;
  371. if (currentIdx >= sources.length())
  372. currentIdx = 0;
  373. if (tries==sources.length())
  374. return false;
  375. const char *sourceName = sources.item(currentIdx).queryFilename();
  376. filesTried.appendf(" %s", sourceName);
  377. try
  378. {
  379. if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, isCompressed) == FileIsValid)
  380. {
  381. StringBuffer source_drive;
  382. splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
  383. StringBuffer query_drive;
  384. splitFilename(logical->queryFilename(), &query_drive, NULL, NULL, NULL);
  385. // only try to create link if on the same drive
  386. if ( (stricmp(query_drive.str(), source_drive.str()) == 0))
  387. {
  388. try
  389. {
  390. DBGLOG("Trying to create Hard Link for %s", sourceName);
  391. createHardLink(logical->queryFilename(), sourceName);
  392. current.setown(sources.item(currentIdx).open(IFOread));
  393. return true;
  394. }
  395. catch(IException *E)
  396. {
  397. StringBuffer err;
  398. DBGLOG("HARD LINK ERROR %s", E->errorMessage(err).str());
  399. E->Release();
  400. }
  401. }
  402. }
  403. }
  404. catch (IException *E)
  405. {
  406. E->Release();
  407. }
  408. currentIdx++;
  409. tries++;
  410. }
  411. DBGLOG("Could not create any hard links for %s", logical->queryFilename());
  412. return false; // if we get here - no hardlink
  413. }
  414. void copyComplete()
  415. {
  416. {
  417. CriticalBlock b(crit);
  418. setFailure(); // lazyOpen will then reopen it...
  419. currentIdx = 0;
  420. remote = false;
  421. copying = false;
  422. sources.kill();
  423. sources.add(*logical.getLink(), 0);
  424. if (!lazyOpen)
  425. _checkOpen();
  426. }
  427. }
  428. virtual IFile *querySource()
  429. {
  430. CriticalBlock b(crit);
  431. _checkOpen();
  432. return &sources.item(currentIdx);
  433. };
  434. virtual IFile *queryTarget() { return logical; }
  435. virtual offset_t getSize() { return fileSize; }
  436. virtual CDateTime *queryDateTime() { return &fileDate; }
  437. static int compareAccess(IInterface * const *L, IInterface * const *R)
  438. {
  439. ILazyFileIO *LL = (ILazyFileIO *) *L;
  440. ILazyFileIO *RR = (ILazyFileIO *) *R;
  441. return LL->getLastAccessed() - RR->getLastAccessed();
  442. }
  443. };
  444. //----------------------------------------------------------------------------------------------
  445. static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
  446. {
  447. if (!remoteFDesc)
  448. return NULL;
  449. IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
  450. if (!remotePDesc)
  451. return NULL;
  452. unsigned int crc, remoteCrc;
  453. if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
  454. return remotePDesc;
  455. if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
  456. return remotePDesc;
  457. return NULL;
  458. }
  459. static int getClusterPriority(const char *clusterName)
  460. {
  461. assertex(preferredClusters);
  462. int *priority = preferredClusters->getValue(clusterName);
  463. return priority ? *priority : 100;
  464. }
  465. static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
  466. {
  467. IFileDescriptor &fdesc = pdesc->queryOwner();
  468. unsigned numCopies = pdesc->numCopies();
  469. unsigned lastClusterNo = (unsigned) -1;
  470. unsigned numThisCluster = 0;
  471. unsigned initialSize = locations.length();
  472. int priority = 0;
  473. IntArray priorities;
  474. for (unsigned copy = 0; copy < numCopies; copy++)
  475. {
  476. unsigned clusterNo = pdesc->copyClusterNum(copy);
  477. StringBuffer clusterName;
  478. fdesc.getClusterGroupName(clusterNo, clusterName);
  479. if (fromCluster && *fromCluster)
  480. {
  481. bool matches = strieq(clusterName.str(), fromCluster);
  482. if (matches!=includeFromCluster)
  483. continue;
  484. }
  485. RemoteFilename r;
  486. pdesc->getFilename(copy,r);
  487. StringBuffer path;
  488. r.getRemotePath(path);
  489. if (localFileName && r.isLocal())
  490. {
  491. StringBuffer l;
  492. r.getLocalPath(l);
  493. if (streq(l, localFileName))
  494. continue; // don't add ourself
  495. }
  496. if (clusterNo == lastClusterNo)
  497. {
  498. numThisCluster++;
  499. if (numThisCluster > 2) // Don't add more than 2 from one cluster
  500. continue;
  501. }
  502. else
  503. {
  504. numThisCluster = 1;
  505. lastClusterNo = clusterNo;
  506. if (preferredClusters)
  507. {
  508. priority = getClusterPriority(clusterName);
  509. }
  510. else
  511. priority = copy;
  512. }
  513. if (priority >= 0)
  514. {
  515. ForEachItemIn(idx, priorities)
  516. {
  517. if (priorities.item(idx) < priority)
  518. break;
  519. }
  520. priorities.add(priority, idx);
  521. locations.add(path.str(), idx+initialSize);
  522. }
  523. }
  524. }
  525. //----------------------------------------------------------------------------------------------
  526. typedef StringArray *StringArrayPtr;
  527. class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
  528. {
  529. friend class CcdFileTest;
  // Files waiting for the background copy thread; runBackgroundCopy() pops entries from it under 'crit'.
  mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
  // Waited on by the background copy thread; signalled by wait() and interrupted by join().
  InterruptableSemaphore toCopy;
  // Waited on (with a timeout) by the handle closer thread; signalled by wait() and interrupted by join().
  InterruptableSemaphore toClose;
  // Known files keyed by filename; openFile() adds entries keyed by the local file's name.
  mutable CopyMapStringToMyClass<ILazyFileIO> files;
  // Taken around access to 'todo' and 'files' in the methods visible here.
  mutable CriticalSection crit;
  // NOTE(review): not referenced in the part of the file reviewed here - purpose to be confirmed.
  CriticalSection cpcrit;
  // Set once start() has launched both worker threads.
  bool started;
  // Set by join(); makes onProgress() cancel copies and suppresses error logging in the worker threads.
  bool aborting;
  // Set by wait(); tells both worker threads to leave their loops.
  bool closing;
  // When set, openFile() does not consult pdesc and uses a fixed "test.buddy" peer location.
  bool testMode;
  // Both entries start as false; presumably indexed by the local/remote flag - TODO confirm against closeExpired().
  bool closePending[2];
  // NOTE(review): not referenced in the part of the file reviewed here - purpose to be confirmed.
  StringAttrMapping fileErrorList;
  // Signalled by the background copy thread as it starts; start() waits on it.
  Semaphore bctStarted;
  // Signalled by the handle closer thread as it starts; start() waits on it.
  Semaphore hctStarted;
  544. RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true)
  545. {
  546. // Ensure that SockFile does not keep these sockets open (or we will run out)
  547. class AutoDisconnector
  548. {
  549. public:
  550. AutoDisconnector(IFile *_f, bool isEnabled) { f = isEnabled ? _f : NULL; };
  551. ~AutoDisconnector() { if (f) disconnectRemoteFile(f); }
  552. private:
  553. IFile *f;
  554. } autoDisconnector(f, autoDisconnect);
  555. offset_t fileSize = f->size();
  556. if (fileSize != (offset_t) -1)
  557. {
  558. // only check size if specified
  559. if ( (size != -1) && !isCompressed && fileSize != size) // MORE - should be able to do better on compressed you'da thunk
  560. return FileSizeMismatch;
  561. CDateTime mt;
  562. return (modified.isNull() || (f->getTime(NULL, &mt, NULL) && mt.equals(modified, false))) ? FileIsValid : FileDateMismatch;
  563. }
  564. else
  565. return FileNotFound;
  566. }
  567. ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
  568. IPartDescriptor *pdesc,
  569. const StringArray &remoteLocationInfo,
  570. offset_t size, const CDateTime &modified)
  571. {
  572. Owned<IFile> local = createIFile(localLocation);
  573. bool isCompressed = testMode ? false : pdesc->queryOwner().isCompressed();
  574. Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed);
  575. RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, isCompressed);
  576. if (fileStatus == FileIsValid)
  577. {
  578. ret->addSource(local.getLink());
  579. ret->setRemote(false);
  580. }
  581. else if (local->exists() && !ignoreOrphans) // Implies local dali and local file out of sync
  582. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  583. else
  584. {
  585. bool addedOne = false;
  586. // put the peerRoxieLocations next in the list
  587. StringArray localLocations;
  588. if (testMode)
  589. localLocations.append("test.buddy");
  590. else
  591. appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true); // Adds all locations on the same cluster
  592. ForEachItemIn(roxie_idx, localLocations)
  593. {
  594. try
  595. {
  596. const char *remoteName = localLocations.item(roxie_idx);
  597. Owned<IFile> remote = createIFile(remoteName);
  598. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  599. if (status==FileIsValid)
  600. {
  601. if (miscDebugTraceLevel > 5)
  602. DBGLOG("adding peer location %s", remoteName);
  603. ret->addSource(remote.getClear());
  604. addedOne = true;
  605. }
  606. else if (status==FileNotFound)
  607. {
  608. // Even though it's not on the buddy (yet), add it to the locations since it may well be there
  609. // by the time we come to copy (and if it is, we want to copy from there)
  610. if (miscDebugTraceLevel > 5)
  611. DBGLOG("adding missing peer location %s", remoteName);
  612. ret->addSource(remote.getClear());
  613. // Don't set addedOne - we need to go to remote too
  614. }
  615. else if (miscDebugTraceLevel > 10)
  616. DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
  617. }
  618. catch (IException *E)
  619. {
  620. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  621. E->Release();
  622. }
  623. }
  624. if (!addedOne && (copyResources || useRemoteResources || testMode)) // If no peer locations available, go to remote
  625. {
  626. ForEachItemIn(idx, remoteLocationInfo)
  627. {
  628. try
  629. {
  630. const char *remoteName = remoteLocationInfo.item(idx);
  631. Owned<IFile> remote = createIFile(remoteName);
  632. if (traceLevel > 5)
  633. DBGLOG("checking remote location %s", remoteName);
  634. RoxieFileStatus status = fileUpToDate(remote, size, modified, isCompressed);
  635. if (status==FileIsValid)
  636. {
  637. if (miscDebugTraceLevel > 5)
  638. DBGLOG("adding remote location %s", remoteName);
  639. ret->addSource(remote.getClear());
  640. addedOne = true;
  641. }
  642. else if (miscDebugTraceLevel > 10)
  643. DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
  644. }
  645. catch (IException *E)
  646. {
  647. EXCLOG(MCoperatorError, E, "While creating remote file reference");
  648. E->Release();
  649. }
  650. }
  651. }
  652. if (!addedOne)
  653. {
  654. if (local->exists()) // Implies local dali and local file out of sync
  655. throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
  656. else
  657. {
  658. if (traceLevel >= 2)
  659. {
  660. DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
  661. ForEachItemIn(local_idx, localLocations)
  662. {
  663. DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
  664. }
  665. DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
  666. ForEachItemIn(remote_idx, remoteLocationInfo)
  667. {
  668. DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
  669. }
  670. }
  671. throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
  672. }
  673. }
  674. ret->setRemote(true);
  675. }
  676. ret->setCache(this);
  677. files.setValue(local->queryFilename(), (ILazyFileIO *)ret);
  678. return ret.getClear();
  679. }
  680. void deleteTempFiles(const char *targetFilename)
  681. {
  682. try
  683. {
  684. StringBuffer destPath;
  685. StringBuffer prevTempFile;
  686. splitFilename(targetFilename, &destPath, &destPath, &prevTempFile, &prevTempFile);
  687. prevTempFile.append("*.$$$");
  688. Owned<IDirectoryIterator> iter = createDirectoryIterator(destPath, prevTempFile, false, false);
  689. ForEach(*iter)
  690. {
  691. OwnedIFile thisFile = createIFile(iter->query().queryFilename());
  692. if (thisFile->isFile() == foundYes)
  693. thisFile->remove();
  694. }
  695. }
  696. catch(IException *E)
  697. {
  698. StringBuffer err;
  699. DBGLOG("Could not remove tmp file %s", E->errorMessage(err).str());
  700. E->Release();
  701. }
  702. catch(...)
  703. {
  704. }
  705. }
  706. bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
  707. {
  708. bool fileCopied = false;
  709. IFile *sourceFile;
  710. try
  711. {
  712. f->setCopying(true);
  713. sourceFile = f->querySource();
  714. }
  715. catch (IException *E)
  716. {
  717. f->setCopying(false);
  718. EXCLOG(MCoperatorError, E, "While trying to start copying file");
  719. throw;
  720. }
  721. unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
  722. deleteTempFiles(targetFilename);
  723. offset_t fileSize = sourceFile->size();
  724. if ( (fileSize + minFreeDiskSpace) > freeDiskSpace)
  725. {
  726. StringBuffer err;
  727. err.appendf("Insufficient disk space. File %s needs %" I64F "d bytes, but only %" I64F "d remains, and %" I64F "d is needed as a reserve", targetFilename, sourceFile->size(), freeDiskSpace, minFreeDiskSpace);
  728. IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
  729. EXCLOG(MCoperatorError, E);
  730. E->Release();
  731. f->setCopying(false);
  732. }
  733. else
  734. {
  735. IpSubNet subnet; // preferred set but not required
  736. IpAddress fromip; // returned
  737. Owned<IFile> destFile = createIFile(tempFile);
  738. bool hardLinkCreated = false;
  739. unsigned start = msTick();
  740. try
  741. {
  742. if (useHardLink)
  743. hardLinkCreated = f->createHardFileLink();
  744. if (hardLinkCreated)
  745. msg = "Hard Link";
  746. else
  747. {
  748. DBGLOG("%sing %s to %s", msg, sourceFile->queryFilename(), targetFilename);
  749. if (traceLevel > 5)
  750. {
  751. StringBuffer str;
  752. str.appendf("doCopyFile %s", sourceFile->queryFilename());
  753. TimeSection timing(str.str());
  754. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  755. }
  756. else
  757. {
  758. sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
  759. }
  760. }
  761. f->setCopying(false);
  762. fileCopied = true;
  763. }
  764. catch(IException *E)
  765. {
  766. f->setCopying(false);
  767. EXCLOG(E, "Copy exception - remove templocal");
  768. destFile->remove();
  769. deleteTempFiles(targetFilename);
  770. throw;
  771. }
  772. catch(...)
  773. {
  774. f->setCopying(false);
  775. DBGLOG("%s exception - remove templocal", msg);
  776. destFile->remove();
  777. deleteTempFiles(targetFilename);
  778. throw;
  779. }
  780. if (!hardLinkCreated) // for hardlinks no rename needed
  781. {
  782. try
  783. {
  784. destFile->rename(targetFilename);
  785. }
  786. catch(IException *)
  787. {
  788. f->setCopying(false);
  789. deleteTempFiles(targetFilename);
  790. throw;
  791. }
  792. unsigned elapsed = msTick() - start;
  793. double sizeMB = ((double) fileSize) / (1024*1024);
  794. double MBperSec = elapsed ? (sizeMB / elapsed) * 1000 : 0;
  795. DBGLOG("%s to %s complete in %d ms (%.1f MB/sec)", msg, targetFilename, elapsed, MBperSec);
  796. }
  797. f->copyComplete();
  798. }
  799. deleteTempFiles(targetFilename);
  800. return fileCopied;
  801. }
  802. bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage, CFflags copyFlags=CFnone)
  803. {
  804. if (!f->isRemote())
  805. f->copyComplete();
  806. else
  807. {
  808. if (displayFirstFileMessage)
  809. DBGLOG("Received files to copy");
  810. const char *targetFilename = f->queryTarget()->queryFilename();
  811. StringBuffer tempFile(targetFilename);
  812. StringBuffer destPath;
  813. splitFilename(tempFile.str(), &destPath, &destPath, NULL, NULL);
  814. if (destPath.length())
  815. recursiveCreateDirectory(destPath.str());
  816. else
  817. destPath.append('.');
  818. if (!checkDirExists(destPath.str())) {
  819. ERRLOG("Dest directory %s does not exist", destPath.str());
  820. return false;
  821. }
  822. tempFile.append(".$$$");
  823. const char *msg = background ? "Background copy" : "Copy";
  824. return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
  825. }
  826. return false; // if we get here there was no file copied
  827. }
  828. public:
  829. IMPLEMENT_IINTERFACE;
  830. CRoxieFileCache(bool _testMode = false) : bct(*this), hct(*this), testMode(_testMode)
  831. {
  832. aborting = false;
  833. closing = false;
  834. closePending[false] = false;
  835. closePending[true] = false;
  836. started = false;
  837. }
  838. ~CRoxieFileCache()
  839. {
  840. // NOTE - I assume that by the time I am being destroyed, system is single threaded.
  841. // Removing any possible race between destroying of the cache and destroying of the files in it would be complex otherwise
  842. HashIterator h(files);
  843. ForEach(h)
  844. {
  845. ILazyFileIO *f = files.mapToValue(&h.query());
  846. f->removeCache(this);
  847. }
  848. }
  849. virtual void start()
  850. {
  851. if (!started)
  852. {
  853. bct.start();
  854. hct.start();
  855. bctStarted.wait();
  856. hctStarted.wait();
  857. started = true;
  858. }
  859. }
  860. class BackgroundCopyThread : public Thread
  861. {
  862. CRoxieFileCache &owner;
  863. public:
  864. BackgroundCopyThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheBackgroundCopyThread") {}
  865. virtual int run()
  866. {
  867. return owner.runBackgroundCopy();
  868. }
  869. } bct;
  870. class HandleCloserThread : public Thread
  871. {
  872. CRoxieFileCache &owner;
  873. public:
  874. HandleCloserThread(CRoxieFileCache &_owner) : owner(_owner), Thread("CRoxieFileCacheHandleCloserThread") {}
  875. virtual int run()
  876. {
  877. return owner.runHandleCloser();
  878. }
  879. } hct;
  880. int runBackgroundCopy()
  881. {
  882. bctStarted.signal();
  883. #if defined(__linux__) && defined(SYS_ioprio_set)
  884. if (backgroundCopyClass)
  885. syscall(SYS_ioprio_set, IOPRIO_WHO_PROCESS, 0, IOPRIO_PRIO_VALUE(backgroundCopyClass, backgroundCopyPrio));
  886. #endif
  887. if (traceLevel)
  888. {
  889. #if defined(__linux__) && defined(SYS_ioprio_get)
  890. int ioprio = syscall(SYS_ioprio_get, IOPRIO_WHO_PROCESS, 0);
  891. int ioclass = IOPRIO_PRIO_CLASS(ioprio);
  892. ioprio = IOPRIO_PRIO_DATA(ioprio);
  893. DBGLOG("Background copy thread %p starting, io priority class %d, priority %d", this, ioclass, ioprio);
  894. #else
  895. DBGLOG("Background copy thread %p starting", this);
  896. #endif
  897. }
  898. try
  899. {
  900. int fileCopiedCount = 0;
  901. bool fileCopied = false;
  902. for (;;)
  903. {
  904. fileCopied = false;
  905. Linked<ILazyFileIO> next;
  906. toCopy.wait();
  907. {
  908. CriticalBlock b(crit);
  909. if (closing)
  910. break;
  911. if (todo.ordinality())
  912. {
  913. ILazyFileIO *popped = &todo.popGet();
  914. if (popped->isAlive())
  915. {
  916. next.set(popped);
  917. }
  918. numFilesToProcess--; // must decrement counter for SNMP accuracy
  919. }
  920. }
  921. if (next)
  922. {
  923. try
  924. {
  925. fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false, CFflush_rdwr);
  926. CriticalBlock b(crit);
  927. if (fileCopied)
  928. fileCopiedCount++;
  929. }
  930. catch (IException *E)
  931. {
  932. if (aborting)
  933. throw;
  934. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  935. E->Release();
  936. }
  937. catch (...)
  938. {
  939. EXCLOG(MCoperatorError, "Unknown exception in Roxie background copy");
  940. }
  941. }
  942. CriticalBlock b(crit);
  943. if ( (todo.ordinality()== 0) && (fileCopiedCount)) // finished last copy
  944. {
  945. DBGLOG("No more data files to copy");
  946. fileCopiedCount = 0;
  947. }
  948. }
  949. }
  950. catch (IException *E)
  951. {
  952. if (!aborting)
  953. EXCLOG(MCoperatorError, E, "Roxie background copy: ");
  954. E->Release();
  955. }
  956. catch (...)
  957. {
  958. DBGLOG("Unknown exception in background copy thread");
  959. }
  960. if (traceLevel)
  961. DBGLOG("Background copy thread %p exiting", this);
  962. return 0;
  963. }
  964. int runHandleCloser()
  965. {
  966. hctStarted.signal();
  967. if (traceLevel)
  968. DBGLOG("HandleCloser thread %p starting", this);
  969. try
  970. {
  971. for (;;)
  972. {
  973. toClose.wait(10 * 60 * 1000); // check expired file handles every 10 minutes
  974. if (closing)
  975. break;
  976. doCloseExpired(true);
  977. doCloseExpired(false);
  978. }
  979. }
  980. catch (IException *E)
  981. {
  982. if (!aborting)
  983. EXCLOG(MCoperatorError, E, "Roxie handle closer: ");
  984. E->Release();
  985. }
  986. catch (...)
  987. {
  988. DBGLOG("Unknown exception in handle closer thread");
  989. }
  990. if (traceLevel)
  991. DBGLOG("Handle closer thread %p exiting", this);
  992. return 0;
  993. }
  994. virtual void join(unsigned timeout=INFINITE)
  995. {
  996. aborting = true;
  997. if (started)
  998. {
  999. toCopy.interrupt();
  1000. toClose.interrupt();
  1001. bct.join(timeout);
  1002. hct.join(timeout);
  1003. }
  1004. }
  1005. virtual void wait()
  1006. {
  1007. closing = true;
  1008. if (started)
  1009. {
  1010. toCopy.signal();
  1011. toClose.signal();
  1012. bct.join();
  1013. hct.join();
  1014. }
  1015. }
  1016. virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  1017. {
  1018. return aborting ? CFPcancel : CFPcontinue;
  1019. }
  1020. virtual void removeCache(ILazyFileIO *file) const
  1021. {
  1022. CriticalBlock b(crit);
  1023. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1024. // So only remove from hash table if what we find there matches the item that is being deleted.
  1025. const char *filename = file->queryFilename();
  1026. ILazyFileIO *goer = files.getValue(filename);
  1027. if (goer == file)
  1028. files.remove(filename);
  1029. ForEachItemInRev(idx, todo)
  1030. {
  1031. if (file == &todo.item(idx))
  1032. {
  1033. todo.remove(idx);
  1034. numFilesToProcess--; // must decrement counter for SNMP accuracy
  1035. }
  1036. }
  1037. }
  1038. virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
  1039. IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
  1040. const StringArray &deployedLocationInfo, bool startFileCopy)
  1041. {
  1042. IPropertyTree &partProps = pdesc->queryProperties();
  1043. offset_t dfsSize = partProps.getPropInt64("@size", -1);
  1044. bool local = partProps.getPropBool("@local");
  1045. CDateTime dfsDate;
  1046. if (checkFileDate)
  1047. {
  1048. const char *dateStr = partProps.queryProp("@modified");
  1049. dfsDate.setString(dateStr);
  1050. }
  1051. unsigned partNo = pdesc->queryPartIndex() + 1;
  1052. StringBuffer localLocation;
  1053. if (local)
  1054. {
  1055. assertex(partNo==1 && numParts==1);
  1056. localLocation.append(lfn); // any resolution done earlier
  1057. }
  1058. else
  1059. {
  1060. // MORE - not at all sure about this. Foreign files should stay foreign ?
  1061. CDfsLogicalFileName dlfn;
  1062. dlfn.set(lfn);
  1063. if (dlfn.isForeign())
  1064. dlfn.clearForeign();
  1065. makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault);
  1066. }
  1067. Owned<ILazyFileIO> ret;
  1068. try
  1069. {
  1070. CriticalBlock b(crit);
  1071. Linked<ILazyFileIO> f = files.getValue(localLocation);
  1072. if (f && f->isAlive())
  1073. {
  1074. if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
  1075. (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
  1076. {
  1077. releaseSlaveDynamicFileCache(); // Slave dynamic file cache or...
  1078. if (fileType == ROXIE_KEY) // ...jhtree cache can keep files active and thus prevent us from loading a new version
  1079. clearKeyStoreCacheEntry(f); // Will release iff that is the only link
  1080. f.clear(); // Note - needs to be done before calling getValue() again, hence the need to make it separate from the f.set below
  1081. f.set(files.getValue(localLocation));
  1082. if (f) // May have been cleared above...
  1083. {
  1084. StringBuffer modifiedDt;
  1085. if (!dfsDate.isNull())
  1086. dfsDate.getString(modifiedDt);
  1087. StringBuffer fileDt;
  1088. f->queryDateTime()->getString(fileDt);
  1089. if (fileErrorList.find(lfn) == 0)
  1090. {
  1091. switch (fileType)
  1092. {
  1093. case ROXIE_KEY:
  1094. fileErrorList.setValue(lfn, "Key");
  1095. break;
  1096. case ROXIE_FILE:
  1097. fileErrorList.setValue(lfn, "File");
  1098. break;
  1099. }
  1100. }
  1101. throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %" I64F "d %" I64F "d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
  1102. }
  1103. }
  1104. else
  1105. return f.getClear();
  1106. }
  1107. ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
  1108. if (startFileCopy)
  1109. {
  1110. if (ret->isRemote())
  1111. {
  1112. if (copyResources) // MORE - should always copy peer files
  1113. {
  1114. if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
  1115. {
  1116. ret->checkOpen();
  1117. doCopy(ret, false, false, CFflush_rdwr);
  1118. return ret.getLink();
  1119. }
  1120. // Copies are popped from end of the todo list
  1121. // By putting the replicates on the front we ensure they are done after the primaries
  1122. // and are therefore likely to result in local rather than remote copies.
  1123. if (replicationLevel)
  1124. todo.add(*ret, 0);
  1125. else
  1126. todo.append(*ret);
  1127. numFilesToProcess++; // must increment counter for SNMP accuracy
  1128. toCopy.signal();
  1129. }
  1130. }
  1131. }
  1132. if (!lazyOpen)
  1133. ret->checkOpen();
  1134. }
  1135. catch(IException *e)
  1136. {
  1137. if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
  1138. {
  1139. if (fileErrorList.find(lfn) == 0)
  1140. {
  1141. switch (fileType)
  1142. {
  1143. case ROXIE_KEY:
  1144. fileErrorList.setValue(lfn, "Key");
  1145. break;
  1146. case ROXIE_FILE:
  1147. fileErrorList.setValue(lfn, "File");
  1148. break;
  1149. }
  1150. }
  1151. }
  1152. throw;
  1153. }
  1154. return ret.getLink();
  1155. }
  1156. virtual void closeExpired(bool remote)
  1157. {
  1158. // This schedules a close at the next available opportunity
  1159. CriticalBlock b(cpcrit); // paranoid...
  1160. if (!closePending[remote])
  1161. {
  1162. closePending[remote] = true;
  1163. DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) atomic_read(&numFilesOpen[remote]));
  1164. toClose.signal();
  1165. }
  1166. }
  1167. void doCloseExpired(bool remote)
  1168. {
  1169. {
  1170. CriticalBlock b(cpcrit); // paranoid...
  1171. closePending[remote] = false;
  1172. }
  1173. CriticalBlock b(crit);
  1174. ICopyArrayOf<ILazyFileIO> goers;
  1175. HashIterator h(files);
  1176. ForEach(h)
  1177. {
  1178. ILazyFileIO *f = files.mapToValue(&h.query());
  1179. if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
  1180. {
  1181. unsigned age = msTick() - f->getLastAccessed();
  1182. if (age > maxFileAge[remote])
  1183. {
  1184. if (traceLevel > 5)
  1185. {
  1186. // NOTE - querySource will cause the file to be opened if not already open
  1187. // That's OK here, since we know the file is open and remote.
  1188. // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
  1189. const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
  1190. DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local", fname, age);
  1191. }
  1192. f->close();
  1193. }
  1194. else
  1195. goers.append(*f);
  1196. }
  1197. }
  1198. unsigned numFilesLeft = goers.ordinality();
  1199. if (numFilesLeft > maxFilesOpen[remote])
  1200. {
  1201. goers.sort(CRoxieLazyFileIO::compareAccess);
  1202. DBGLOG("Closing LRU %s files, %d files are open", remote ? "remote" : "local", numFilesLeft);
  1203. unsigned idx = minFilesOpen[remote];
  1204. while (idx < numFilesLeft)
  1205. {
  1206. ILazyFileIO &f = goers.item(idx++);
  1207. if (!f.isCopying())
  1208. {
  1209. if (traceLevel > 5)
  1210. {
  1211. unsigned age = msTick() - f.getLastAccessed();
  1212. DBGLOG("Closing %s (last accessed %u ms ago)", f.queryFilename(), age);
  1213. }
  1214. f.close();
  1215. }
  1216. }
  1217. }
  1218. }
  1219. virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &xml)
  1220. {
  1221. Owned<IFile> dirf = createIFile(directory);
  1222. if (dirf->exists() && dirf->isDirectory())
  1223. {
  1224. try
  1225. {
  1226. Owned<IDirectoryIterator> iter = dirf->directoryFiles(NULL,false,true);
  1227. ForEach(*iter)
  1228. {
  1229. const char *thisName = iter->query().queryFilename();
  1230. flushUnusedDirectories(origBaseDir, thisName, xml);
  1231. }
  1232. if (stricmp(origBaseDir, directory) != 0)
  1233. {
  1234. try
  1235. {
  1236. dirf->remove();
  1237. xml.appendf("<Directory>%s</Directory>\n", directory);
  1238. DBGLOG("Deleted directory %s", directory);
  1239. }
  1240. catch (IException *e)
  1241. {
  1242. // don't care if we can't delete the directory
  1243. e->Release();
  1244. }
  1245. catch(...)
  1246. {
  1247. // don't care if we can't delete the directory
  1248. }
  1249. }
  1250. }
  1251. catch (IException *e)
  1252. {
  1253. // don't care if we can't delete the directory
  1254. e->Release();
  1255. }
  1256. catch(...)
  1257. {
  1258. // don't care if we can't delete the directory
  1259. }
  1260. }
  1261. }
  1262. int numFilesToCopy()
  1263. {
  1264. CriticalBlock b(crit);
  1265. return todo.ordinality();
  1266. }
  1267. virtual StringAttrMapping *queryFileErrorList() { return &fileErrorList; } // returns list of files that could not be open
  // Returns true if c is in the range 32..126 and is not one of the characters * " / : < > ? \ |
  static inline bool validFNameChar(char c)
  {
      if (c < 32 || c >= 127)
          return false;
      switch (c)
      {
      case '*':
      case '"':
      case '/':
      case ':':
      case '<':
      case '>':
      case '?':
      case '\\':
      case '|':
          return false;
      default:
          return true;
      }
  }
  1273. };
  1274. ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
  1275. {
  1276. StringArray remoteLocations;
  1277. const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
  1278. if (peerCluster)
  1279. {
  1280. if (*peerCluster!='-') // a remote cluster was specified explicitly
  1281. appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true); // Add only from specified cluster
  1282. }
  1283. else
  1284. appendRemoteLocations(pdesc, remoteLocations, NULL, roxieName, false); // Add from any cluster on same dali, other than mine
  1285. if (remotePDesc)
  1286. appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false); // Then any remote on remote dali
  1287. return queryFileCache().lookupFile(id, fileType, pdesc, numParts, getReplicationLevel(channel), remoteLocations, startCopy);
  1288. }
  1289. //====================================================================================================
  1290. class CFilePartMap : implements IFilePartMap, public CInterface
  1291. {
  1292. class FilePartMapElement
  1293. {
  1294. public:
  1295. offset_t base;
  1296. offset_t top;
  1297. inline int compare(offset_t offset)
  1298. {
  1299. if (offset < base)
  1300. return -1;
  1301. else if (offset >= top)
  1302. return 1;
  1303. else
  1304. return 0;
  1305. }
  1306. } *map;
  1307. static int compareParts(const void *l, const void *r)
  1308. {
  1309. offset_t lp = * (offset_t *) l;
  1310. FilePartMapElement *thisPart = (FilePartMapElement *) r;
  1311. return thisPart->compare(lp);
  1312. }
  1313. unsigned numParts;
  1314. offset_t recordCount;
  1315. offset_t totalSize;
  1316. StringAttr fileName;
  1317. public:
  1318. IMPLEMENT_IINTERFACE;
  1319. CFilePartMap(IPropertyTree &resource)
  1320. {
  1321. fileName.set(resource.queryProp("@id"));
  1322. numParts = resource.getPropInt("@numparts");
  1323. recordCount = resource.getPropInt64("@recordCount");
  1324. totalSize = resource.getPropInt64("@size");
  1325. assertex(numParts);
  1326. map = new FilePartMapElement[numParts];
  1327. for (unsigned i = 0; i < numParts; i++)
  1328. {
  1329. StringBuffer partPath;
  1330. partPath.appendf("Part[@num='%d']", i+1);
  1331. IPropertyTree *part = resource.queryPropTree(partPath.str());
  1332. if (!part)
  1333. {
  1334. partPath.clear().appendf("Part_%d", i+1); // legacy format support
  1335. part = resource.queryPropTree(partPath.str());
  1336. }
  1337. assertex(part);
  1338. offset_t size = part->getPropInt64("@size", (unsigned __int64) -1);
  1339. assertex(size != (unsigned __int64) -1);
  1340. map[i].base = i ? map[i-1].top : 0;
  1341. map[i].top = map[i].base + size;
  1342. }
  1343. if (totalSize == (offset_t)-1)
  1344. totalSize = map[numParts-1].top;
  1345. else if (totalSize != map[numParts-1].top)
  1346. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  1347. }
  1348. CFilePartMap(const char *_fileName, IFileDescriptor &fdesc)
  1349. : fileName(_fileName)
  1350. {
  1351. numParts = fdesc.numParts();
  1352. IPropertyTree &props = fdesc.queryProperties();
  1353. recordCount = props.getPropInt64("@recordCount", -1);
  1354. totalSize = props.getPropInt64("@size", -1);
  1355. assertex(numParts);
  1356. map = new FilePartMapElement[numParts];
  1357. for (unsigned i = 0; i < numParts; i++)
  1358. {
  1359. IPartDescriptor &part = *fdesc.queryPart(i);
  1360. IPropertyTree &partProps = part.queryProperties();
  1361. offset_t size = partProps.getPropInt64("@size", (unsigned __int64) -1);
  1362. map[i].base = i ? map[i-1].top : 0;
  1363. if (size==(unsigned __int64) -1)
  1364. {
  1365. if (i==numParts-1)
  1366. map[i].top = (unsigned __int64) -1;
  1367. else
  1368. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file sizes not known for file %s", fileName.get());
  1369. }
  1370. else
  1371. map[i].top = map[i].base + size;
  1372. }
  1373. if (totalSize == (offset_t)-1)
  1374. totalSize = map[numParts-1].top;
  1375. else if (totalSize != map[numParts-1].top)
  1376. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file part sizes do not add up to expected total size (%" I64F "d vs %" I64F "d", map[numParts-1].top, totalSize);
  1377. }
  1378. ~CFilePartMap()
  1379. {
  1380. delete [] map;
  1381. }
  1382. virtual bool IsShared() const { return CInterface::IsShared(); };
  1383. virtual unsigned mapOffset(offset_t pos) const
  1384. {
  1385. FilePartMapElement *part = (FilePartMapElement *) bsearch(&pos, map, numParts, sizeof(map[0]), compareParts);
  1386. if (!part)
  1387. throw MakeStringException(ROXIE_DATA_ERROR, "CFilePartMap: file position %" I64F "d in file %s out of range (max offset permitted is %" I64F "d)", pos, fileName.str(), totalSize);
  1388. return (part-map)+1;
  1389. }
  1390. virtual unsigned getNumParts() const
  1391. {
  1392. return numParts;
  1393. }
  1394. virtual offset_t getTotalSize() const
  1395. {
  1396. return totalSize;
  1397. }
  1398. virtual offset_t getRecordCount() const
  1399. {
  1400. return recordCount;
  1401. }
  1402. virtual offset_t getBase(unsigned part) const
  1403. {
  1404. if (part > numParts || part == 0)
  1405. {
  1406. throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existant file part %d (valid are 1-%d)", part, numParts);
  1407. }
  1408. return map[part-1].base;
  1409. }
  1410. virtual offset_t getFileSize() const
  1411. {
  1412. return map[numParts-1].top;
  1413. }
  1414. };
  1415. extern IFilePartMap *createFilePartMap(const char *fileName, IFileDescriptor &fdesc)
  1416. {
  1417. return new CFilePartMap(fileName, fdesc);
  1418. }
  1419. //====================================================================================================
  1420. class CFileIOArray : implements IFileIOArray, public CInterface
  1421. {
  1422. mutable CriticalSection crit;
  1423. mutable unsigned __int64 totalSize = (unsigned __int64) -1; // Calculated on demand, and cached
  1424. mutable StringAttr id; // Calculated on demand, and cached
  1425. IPointerArrayOf<IFileIO> files;
  1426. UnsignedArray subfiles;
  1427. StringArray filenames;
  1428. Int64Array bases;
  1429. int actualCrc = 0;
  1430. unsigned valid = 0;
  1431. bool multipleFormatsSeen = false;
  1432. void _getId() const
  1433. {
  1434. md5_state_t md5;
  1435. md5_byte_t digest[16];
  1436. md5_init(&md5);
  1437. ForEachItemIn(idx, files)
  1438. {
  1439. IFileIO *file = files.item(idx);
  1440. if (file)
  1441. {
  1442. md5_append(&md5, (const md5_byte_t *) &file, sizeof(file));
  1443. }
  1444. }
  1445. md5_finish(&md5, digest);
  1446. char digestStr[33];
  1447. for (int i = 0; i < 16; i++)
  1448. {
  1449. sprintf(&digestStr[i*2],"%02x", digest[i]);
  1450. }
  1451. id.set(digestStr, 32);
  1452. }
  1453. public:
  1454. IMPLEMENT_IINTERFACE;
  1455. virtual bool IsShared() const { return CInterface::IsShared(); };
  1456. virtual IFileIO *getFilePart(unsigned partNo, offset_t &base) const override
  1457. {
  1458. if (!files.isItem(partNo))
  1459. {
  1460. DBGLOG("getFilePart requested invalid part %d", partNo);
  1461. throw MakeStringException(ROXIE_FILE_ERROR, "getFilePart requested invalid part %d", partNo);
  1462. }
  1463. IFileIO *file = files.item(partNo);
  1464. if (!file)
  1465. {
  1466. base = 0;
  1467. return NULL;
  1468. }
  1469. base = bases.item(partNo);
  1470. return LINK(file);
  1471. }
  1472. virtual const char *queryLogicalFilename(unsigned partNo) const override
  1473. {
  1474. if (!filenames.isItem(partNo))
  1475. {
  1476. DBGLOG("queryLogicalFilename requested invalid part %d", partNo);
  1477. throw MakeStringException(ROXIE_FILE_ERROR, "queryLogicalFilename requested invalid part %d", partNo);
  1478. }
  1479. return filenames.item(partNo);
  1480. }
  1481. void addFile(IFileIO *f, offset_t base, unsigned subfile, const char *filename, int _actualCrc)
  1482. {
  1483. if (f)
  1484. valid++;
  1485. files.append(f);
  1486. bases.append(base);
  1487. if (_actualCrc)
  1488. {
  1489. if (actualCrc && actualCrc != _actualCrc)
  1490. multipleFormatsSeen = true;
  1491. else
  1492. actualCrc = _actualCrc;
  1493. }
  1494. // MORE - lots of duplication in subfiles and filenames arrays
  1495. subfiles.append(subfile);
  1496. filenames.append(filename ? filename : "");
  1497. }
  1498. virtual unsigned length() const override
  1499. {
  1500. return files.length();
  1501. }
  1502. virtual unsigned numValid() const override
  1503. {
  1504. return valid;
  1505. }
  1506. virtual int queryActualFormatCrc() const override
  1507. {
  1508. return actualCrc;
  1509. }
  1510. virtual bool allFormatsMatch() const override
  1511. {
  1512. return !multipleFormatsSeen;
  1513. }
  1514. virtual bool isValid(unsigned partNo) const override
  1515. {
  1516. if (!files.isItem(partNo))
  1517. return false;
  1518. IFileIO *file = files.item(partNo);
  1519. if (!file)
  1520. return false;
  1521. return true;
  1522. }
  1523. virtual unsigned __int64 size() const override
  1524. {
  1525. CriticalBlock b(crit);
  1526. if (totalSize == (unsigned __int64) -1)
  1527. {
  1528. totalSize = 0;
  1529. ForEachItemIn(idx, files)
  1530. {
  1531. IFileIO *file = files.item(idx);
  1532. if (file)
  1533. totalSize += file->size();
  1534. }
  1535. }
  1536. return totalSize;
  1537. }
  1538. virtual StringBuffer &getId(StringBuffer &ret) const override
  1539. {
  1540. CriticalBlock b(crit);
  1541. if (!id)
  1542. _getId();
  1543. return ret.append(id);
  1544. }
  1545. virtual unsigned getSubFile(unsigned partNo) const override
  1546. {
  1547. return subfiles.item(partNo);
  1548. }
  1549. };
  1550. class CTranslatorSet : implements CInterfaceOf<ITranslatorSet>
  1551. {
  1552. IConstPointerArrayOf<IDynamicTransform> transformers;
  1553. IConstPointerArrayOf<IKeyTranslator> keyTranslators;
  1554. IPointerArrayOf<IOutputMetaData> actualLayouts;
  1555. const RtlRecord &targetLayout;
  1556. int targetFormatCrc = 0;
  1557. bool anyTranslators = false;
  1558. bool anyKeyedTranslators = false;
  1559. bool translatorsMatch = true;
  1560. public:
  1561. CTranslatorSet(const RtlRecord &_targetLayout, int _targetFormatCrc)
  1562. : targetLayout(_targetLayout), targetFormatCrc(_targetFormatCrc)
  1563. {}
  1564. void addTranslator(const IDynamicTransform *translator, const IKeyTranslator *keyTranslator, IOutputMetaData *actualLayout)
  1565. {
  1566. assertex(actualLayout);
  1567. if (translator || keyTranslator)
  1568. anyTranslators = true;
  1569. if (translator && translator->keyedTranslated())
  1570. anyKeyedTranslators = true;
  1571. if (transformers.ordinality() && (translator != transformers.item(0)))
  1572. translatorsMatch = false;
  1573. transformers.append(translator);
  1574. keyTranslators.append(keyTranslator);
  1575. actualLayouts.append(actualLayout);
  1576. }
  1577. virtual const RtlRecord &queryTargetFormat() const override
  1578. {
  1579. return targetLayout;
  1580. }
  1581. virtual int queryTargetFormatCrc() const override
  1582. {
  1583. return targetFormatCrc;
  1584. }
  1585. virtual const IDynamicTransform *queryTranslator(unsigned subFile) const override
  1586. {
  1587. // We need to have translated partnos to subfiles before calling this!
  1588. // Note: while the required projected format will be the same for all parts, the
  1589. // actual layout - and thus the required translation - may not be, for example if
  1590. // we have a superfile with mismatching formats.
  1591. if (anyTranslators && transformers.isItem(subFile))
  1592. return transformers.item(subFile);
  1593. return nullptr;
  1594. }
  1595. virtual const IKeyTranslator *queryKeyTranslator(unsigned subFile) const override
  1596. {
  1597. if (anyTranslators && keyTranslators.isItem(subFile))
  1598. return keyTranslators.item(subFile);
  1599. return nullptr;
  1600. }
  1601. virtual ISourceRowPrefetcher *getPrefetcher(unsigned subFile) const override
  1602. {
  1603. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  1604. assertex(actualLayout);
  1605. return actualLayout->createDiskPrefetcher();
  1606. }
  1607. virtual IOutputMetaData *queryActualLayout(unsigned subFile) const override
  1608. {
  1609. IOutputMetaData *actualLayout = actualLayouts.item(subFile);
  1610. assertex(actualLayout);
  1611. return actualLayout;
  1612. }
  1613. virtual bool isTranslating() const override
  1614. {
  1615. return anyTranslators;
  1616. }
  1617. virtual bool isTranslatingKeyed() const override
  1618. {
  1619. return anyKeyedTranslators;
  1620. }
  1621. virtual bool hasConsistentTranslation() const override
  1622. {
  1623. return translatorsMatch;
  1624. }
  1625. };
  1626. template <class X> class PerChannelCacheOf
  1627. {
  1628. IPointerArrayOf<X> cache;
  1629. IntArray channels;
  1630. public:
  1631. // NOTE - typically only a couple of entries (but see PerFormatCacheOf below
  1632. void set(X *value, unsigned channel)
  1633. {
  1634. cache.append(value);
  1635. channels.append(channel);
  1636. }
  1637. X *get(unsigned channel) const
  1638. {
  1639. ForEachItemIn(idx, channels)
  1640. {
  1641. if (channels.item(idx)==channel)
  1642. return cache.item(idx);
  1643. }
  1644. return NULL;
  1645. }
  1646. };
  1647. template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
  1648. {
  1649. // Identical for now, but characteristics are different so implementations may diverge.
  1650. // For example, this one may want to be a hash table, and there may be many more entries
  1651. };
  1652. CRoxieFileCache * fileCache;
  1653. class CResolvedFile : implements IResolvedFileCreator, implements ISDSSubscription, public CInterface
  1654. {
  1655. protected:
  1656. IResolvedFileCache *cached;
  1657. StringAttr lfn;
  1658. StringAttr physicalName;
  1659. Owned<IDistributedFile> dFile; // NULL on copies serialized to slaves. Note that this implies we keep a lock on dali file for the lifetime of this object.
  1660. CDateTime fileTimeStamp;
  1661. offset_t fileSize;
  1662. unsigned fileCheckSum;
  1663. RoxieFileType fileType;
  1664. bool isSuper;
  1665. StringArray subNames;
  1666. IPointerArrayOf<IFileDescriptor> subFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1667. IPointerArrayOf<IFileDescriptor> remoteSubFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
  1668. IntArray formatCrcs;
  1669. IPointerArrayOf<IOutputMetaData> diskTypeInfo; // New info using RtlTypeInfo structures
  1670. IArrayOf<IDistributedFile> subDFiles; // To make sure subfiles get locked too
  1671. IArrayOf<IResolvedFile> subRFiles; // To make sure subfiles get locked too
  1672. Owned <IPropertyTree> properties;
  1673. Linked<IRoxieDaliHelper> daliHelper;
  1674. Owned<IDaliPackageWatcher> notifier;
  1675. void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
  1676. {
  1677. subNames.append(subName);
  1678. subFiles.append(fdesc);
  1679. remoteSubFiles.append(remoteFDesc);
  1680. IPropertyTree const & props = fdesc->queryProperties();
  1681. // NOTE - grouping is not included in the formatCRC, nor is the trailing byte that indicates grouping
  1682. // included in the rtlTypeInfo.
  1683. const char *kind = props.queryProp("@kind");
  1684. if (kind)
  1685. {
  1686. RoxieFileType thisFileType = streq(kind, "key") ? ROXIE_KEY : ROXIE_FILE;
  1687. if (subFiles.length()==1)
  1688. fileType = thisFileType;
  1689. else
  1690. assertex(thisFileType==fileType);
  1691. }
  1692. bool isGrouped = props.getPropBool("@grouped", false);
  1693. int formatCrc = props.getPropInt("@formatCrc", 0);
  1694. // If formatCrc and grouping are same as previous, reuse previous typeInfo
  1695. Owned<IOutputMetaData> actualFormat;
  1696. unsigned prevIdx = formatCrcs.length()-1;
  1697. if (formatCrcs.length() && formatCrc == formatCrcs.item(prevIdx) &&
  1698. diskTypeInfo.item(prevIdx) && isGrouped==diskTypeInfo.item(prevIdx)->isGrouped())
  1699. actualFormat.set(diskTypeInfo.item(prevIdx));
  1700. else
  1701. actualFormat.setown(getDaliLayoutInfo(props));
  1702. diskTypeInfo.append(actualFormat.getClear());
  1703. formatCrcs.append(formatCrc);
  1704. unsigned numParts = fdesc->numParts();
  1705. offset_t base = 0;
  1706. for (unsigned i = 0; i < numParts; i++)
  1707. {
  1708. IPartDescriptor *pdesc = fdesc->queryPart(i);
  1709. IPropertyTree &partProps = pdesc->queryProperties();
  1710. offset_t dfsSize = partProps.getPropInt64("@size");
  1711. partProps.setPropInt64("@offset", base);
  1712. base += dfsSize;
  1713. }
  1714. fileSize += base;
  1715. }
  1716. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1717. {
  1718. if (traceLevel > 2)
  1719. DBGLOG("Superfile %s change detected", lfn.get());
  1720. {
  1721. CriticalBlock b(lock);
  1722. if (cached)
  1723. {
  1724. cached->removeCache(this);
  1725. cached = NULL;
  1726. }
  1727. }
  1728. globalPackageSetManager->requestReload(false, false);
  1729. }
  1730. // We cache all the file maps/arrays etc here.
  1731. mutable CriticalSection lock;
  1732. mutable Owned<IFilePartMap> fileMap;
  1733. mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
  1734. mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
  1735. mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
  1736. public:
  1737. IMPLEMENT_IINTERFACE;
  1738. CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* _daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
  1739. : daliHelper(_daliHelper), lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
  1740. {
  1741. cached = NULL;
  1742. fileSize = 0;
  1743. fileCheckSum = 0;
  1744. if (dFile)
  1745. {
  1746. if (traceLevel > 5)
  1747. DBGLOG("Roxie server adding information for file %s", lfn.get());
  1748. bool tsSet = dFile->getModificationTime(fileTimeStamp);
  1749. bool csSet = dFile->getFileCheckSum(fileCheckSum);
  1750. assertex(tsSet); // per Nigel, is always set
  1751. IDistributedSuperFile *superFile = dFile->querySuperFile();
  1752. if (superFile)
  1753. {
  1754. isSuper = true;
  1755. Owned<IDistributedFileIterator> subs = superFile->getSubFileIterator(true);
  1756. ForEach(*subs)
  1757. {
  1758. IDistributedFile &sub = subs->query();
  1759. Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
  1760. Owned<IFileDescriptor> remoteFDesc;
  1761. if (daliHelper)
  1762. remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt));
  1763. subDFiles.append(OLINK(sub));
  1764. addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1765. }
  1766. // We have to clone the properties since we don't want to keep the superfile locked
  1767. properties.setown(createPTreeFromIPT(&dFile->queryAttributes(), ipt_lowmem));
  1768. if (!isDynamic && !lockSuperFiles)
  1769. {
  1770. notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
  1771. dFile.clear(); // We don't lock superfiles, except dynamic ones
  1772. }
  1773. }
  1774. else // normal file, not superkey
  1775. {
  1776. isSuper = false;
  1777. properties.set(&dFile->queryAttributes());
  1778. Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
  1779. Owned<IFileDescriptor> remoteFDesc;
  1780. if (daliHelper)
  1781. remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
  1782. addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
  1783. }
  1784. }
  1785. }
  1786. virtual void beforeDispose()
  1787. {
  1788. if (notifier)
  1789. daliHelper->releaseSubscription(notifier);
  1790. notifier.clear();
  1791. if (cached)
  1792. {
  1793. cached->removeCache(this);
  1794. }
  1795. }
  1796. virtual unsigned numSubFiles() const
  1797. {
  1798. return subNames.length();
  1799. }
  1800. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  1801. {
  1802. if (subNames.isItem(num))
  1803. {
  1804. name.append(subNames.item(num));
  1805. return true;
  1806. }
  1807. else
  1808. {
  1809. return false;
  1810. }
  1811. }
  1812. virtual unsigned findSubName(const char *subname) const
  1813. {
  1814. ForEachItemIn(idx, subNames)
  1815. {
  1816. if (stricmp(subNames.item(idx), subname))
  1817. return idx;
  1818. }
  1819. return NotFound;
  1820. }
  1821. virtual unsigned getContents(StringArray &contents) const
  1822. {
  1823. ForEachItemIn(idx, subNames)
  1824. {
  1825. contents.append(subNames.item(idx));
  1826. }
  1827. return subNames.length();
  1828. }
  1829. virtual bool isSuperFile() const
  1830. {
  1831. return isSuper;
  1832. }
  1833. virtual bool isKey() const
  1834. {
  1835. return fileType==ROXIE_KEY;
  1836. }
  1837. virtual IFilePartMap *getFileMap() const
  1838. {
  1839. CriticalBlock b(lock);
  1840. if (!fileMap)
  1841. {
  1842. if (subFiles.length())
  1843. {
  1844. if (subFiles.length()!=1)
  1845. throw MakeStringException(0, "Roxie does not support FETCH or KEYED JOIN to superkey with multiple parts");
  1846. fileMap.setown(createFilePartMap(lfn, *subFiles.item(0)));
  1847. }
  1848. }
  1849. return fileMap.getLink();
  1850. }
  1851. virtual unsigned getNumParts() const
  1852. {
  1853. CriticalBlock b(lock);
  1854. unsigned numParts = 0;
  1855. ForEachItemIn(idx, subFiles)
  1856. {
  1857. unsigned thisNumParts = subFiles.item(idx)->numParts();
  1858. if (thisNumParts > numParts)
  1859. numParts = thisNumParts;
  1860. }
  1861. return numParts;
  1862. }
  1863. bool serializeFDesc(MemoryBuffer &mb, IFileDescriptor *fdesc, unsigned channel, bool isLocal) const
  1864. {
  1865. // Find all the partno's that go to this channel
  1866. unsigned numParts = fdesc->numParts();
  1867. if (numParts > 1 && fileType==ROXIE_KEY && isLocal)
  1868. numParts--; // don't want to send TLK
  1869. UnsignedArray partNos;
  1870. for (unsigned i = 1; i <= numParts; i++)
  1871. {
  1872. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  1873. if (getBondedChannel(i)==channel || !isLocal)
  1874. {
  1875. partNos.append(i-1);
  1876. }
  1877. }
  1878. fdesc->serializeParts(mb, partNos);
  1879. return partNos.length();
  1880. }
  1881. virtual void serializePartial(MemoryBuffer &mb, unsigned channel, bool isLocal) const override
  1882. {
  1883. if (traceLevel > 6)
  1884. DBGLOG("Serializing file information for dynamic file %s, channel %d, local %d", lfn.get(), channel, isLocal);
  1885. byte type = (byte) fileType;
  1886. mb.append(type);
  1887. fileTimeStamp.serialize(mb);
  1888. mb.append(fileCheckSum);
  1889. mb.append(fileSize);
  1890. mb.append(isSuper);
  1891. unsigned numSubFiles = subFiles.length();
  1892. mb.append(numSubFiles);
  1893. ForEachItemIn(idx, subFiles)
  1894. {
  1895. mb.append(subNames.item(idx));
  1896. IFileDescriptor *fdesc = subFiles.item(idx);
  1897. bool anyparts = serializeFDesc(mb, fdesc, channel, isLocal);
  1898. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  1899. if (remoteFDesc)
  1900. {
  1901. mb.append(true);
  1902. anyparts |= serializeFDesc(mb, remoteFDesc, channel, isLocal);
  1903. }
  1904. else
  1905. mb.append(false);
  1906. mb.append(formatCrcs.item(idx));
  1907. IOutputMetaData *diskType = diskTypeInfo.item(idx);
  1908. if (anyparts && diskType)
  1909. {
  1910. if (idx && formatCrcs.item(idx)==formatCrcs.item(idx-1))
  1911. mb.append((byte) 3); // indicating same format as previous
  1912. else
  1913. {
  1914. mb.append((byte) (diskType->isGrouped() ? 2 : 1));
  1915. verifyex(dumpTypeInfo(mb, diskType->queryTypeInfo())); // Must be serializable, as we deserialized it...
  1916. }
  1917. }
  1918. else
  1919. mb.append((byte) 0);
  1920. }
  1921. if (properties)
  1922. {
  1923. mb.append(true);
  1924. properties->serialize(mb);
  1925. }
  1926. else
  1927. mb.append(false);
  1928. }
  1929. virtual ITranslatorSet *getTranslators(int projectedFormatCrc, IOutputMetaData *projected, int expectedFormatCrc, IOutputMetaData *expected, RecordTranslationMode mode, bool isIndex) const override
  1930. {
  1931. // NOTE - projected and expected and anything fetched from them such as type info may reside in dynamically loaded (and unloaded)
  1932. // query DLLs - this means it is not safe to include them in any sort of cache that might outlive the current query.
  1933. Owned<CTranslatorSet> result = new CTranslatorSet(expected->queryRecordAccessor(true), projectedFormatCrc);
  1934. Owned<const IDynamicTransform> translator; // Translates rows from actual to projected
  1935. Owned<const IKeyTranslator> keyedTranslator; // translate filter conditions from expected to actual
  1936. int prevFormatCrc = 0;
  1937. assertex(projected != nullptr);
  1938. ForEachItemIn(idx, subFiles)
  1939. {
  1940. IOutputMetaData *actual = expected;
  1941. if (projectedFormatCrc != 0) // projectedFormatCrc is currently 0 for csv/xml which should not create translators.
  1942. {
  1943. const char *subname = subNames.item(idx);
  1944. int thisFormatCrc = 0;
  1945. bool actualUnknown = true;
  1946. if (mode != RecordTranslationMode::AlwaysECL)
  1947. {
  1948. if (diskTypeInfo.item(idx))
  1949. actual = diskTypeInfo.item(idx);
  1950. else
  1951. actualUnknown = false;
  1952. thisFormatCrc = formatCrcs.item(idx);
  1953. }
  1954. assertex(actual);
  1955. if ((thisFormatCrc != prevFormatCrc) || (idx == 0)) // Check if same translation as last subfile
  1956. {
  1957. translator.clear();
  1958. keyedTranslator.clear();
  1959. //Check if the file requires translation, but translation is disabled
  1960. if (thisFormatCrc && expectedFormatCrc && (thisFormatCrc != expectedFormatCrc) && (mode == RecordTranslationMode::None))
  1961. throwTranslationError(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true), subname);
  1962. if (thisFormatCrc == expectedFormatCrc && projectedFormatCrc == expectedFormatCrc && (actualUnknown || alwaysTrustFormatCrcs))
  1963. {
  1964. if (traceLevel > 5)
  1965. DBGLOG("Assume no translation required, crc's match");
  1966. }
  1967. else
  1968. {
  1969. translator.setown(createRecordTranslator(projected->queryRecordAccessor(true), actual->queryRecordAccessor(true)));
  1970. if (traceLevel > 5)
  1971. translator->describe();
  1972. if (!translator || !translator->canTranslate())
  1973. throw MakeStringException(ROXIE_MISMATCH, "Untranslatable record layout mismatch detected for file %s", subname);
  1974. else if (translator->needsTranslate())
  1975. {
  1976. if (isIndex && translator->keyedTranslated())
  1977. throw MakeStringException(ROXIE_MISMATCH, "Record layout mismatch detected in keyed fields for file %s", subname);
  1978. keyedTranslator.setown(createKeyTranslator(actual->queryRecordAccessor(true), expected->queryRecordAccessor(true)));
  1979. }
  1980. else
  1981. translator.clear();
  1982. }
  1983. }
  1984. prevFormatCrc = thisFormatCrc;
  1985. }
  1986. result->addTranslator(LINK(translator), LINK(keyedTranslator), LINK(actual));
  1987. }
  1988. return result.getClear();
  1989. }
  1990. virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
  1991. {
  1992. CriticalBlock b(lock);
  1993. IFileIOArray *ret = ioArrayMap.get(channel);
  1994. if (!ret)
  1995. {
  1996. ret = createIFileIOArray(isOpt, channel);
  1997. ioArrayMap.set(ret, channel);
  1998. }
  1999. return LINK(ret);
  2000. }
  2001. IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
  2002. {
  2003. Owned<CFileIOArray> f = new CFileIOArray;
  2004. f->addFile(nullptr, 0, 0, nullptr, 0);
  2005. ForEachItemIn(idx, subFiles)
  2006. {
  2007. IFileDescriptor *fdesc = subFiles.item(idx);
  2008. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2009. const char *subname = subNames.item(idx);
  2010. int thisFormatCrc = formatCrcs.item(idx);
  2011. if (fdesc)
  2012. {
  2013. unsigned numParts = fdesc->numParts();
  2014. for (unsigned i = 1; i <= numParts; i++)
  2015. {
  2016. if (!channel || getBondedChannel(i)==channel)
  2017. {
  2018. try
  2019. {
  2020. IPartDescriptor *pdesc = fdesc->queryPart(i-1);
  2021. assertex(pdesc);
  2022. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
  2023. Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
  2024. IPropertyTree &partProps = pdesc->queryProperties();
  2025. f->addFile(file.getClear(), partProps.getPropInt64("@offset"), idx, subname, thisFormatCrc);
  2026. }
  2027. catch (IException *E)
  2028. {
  2029. StringBuffer err;
  2030. err.append("Could not load file ");
  2031. fdesc->getTraceName(err);
  2032. DBGLOG(E, err.str());
  2033. if (!isOpt)
  2034. throw;
  2035. E->Release();
  2036. f->addFile(nullptr, 0, idx, nullptr, 0);
  2037. }
  2038. }
  2039. else
  2040. f->addFile(nullptr, 0, idx, nullptr, 0);
  2041. }
  2042. }
  2043. }
  2044. return f.getClear();
  2045. }
  2046. virtual IKeyArray *getKeyArray(bool isOpt, unsigned channel) const override
  2047. {
  2048. unsigned maxParts = 0;
  2049. ForEachItemIn(subFile, subFiles)
  2050. {
  2051. IFileDescriptor *fdesc = subFiles.item(subFile);
  2052. if (fdesc)
  2053. {
  2054. unsigned numParts = fdesc->numParts();
  2055. if (numParts > 1)
  2056. numParts--; // Don't include TLK
  2057. if (numParts > maxParts)
  2058. maxParts = numParts;
  2059. }
  2060. }
  2061. CriticalBlock b(lock);
  2062. IKeyArray *ret = keyArrayMap.get(channel);
  2063. if (!ret)
  2064. {
  2065. ret = createKeyArray(isOpt, channel, maxParts);
  2066. keyArrayMap.set(ret, channel);
  2067. }
  2068. return LINK(ret);
  2069. }
  2070. IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
  2071. {
  2072. Owned<IKeyArray> ret = ::createKeyArray();
  2073. if (channel)
  2074. {
  2075. ret->addKey(NULL);
  2076. for (unsigned partNo = 1; partNo <= maxParts; partNo++)
  2077. {
  2078. if (channel == getBondedChannel(partNo))
  2079. {
  2080. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2081. ForEachItemIn(idx, subFiles)
  2082. {
  2083. IFileDescriptor *fdesc = subFiles.item(idx);
  2084. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2085. Owned <ILazyFileIO> part;
  2086. unsigned crc = 0;
  2087. if (fdesc) // NB there may be no parts for this channel
  2088. {
  2089. IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
  2090. if (pdesc)
  2091. {
  2092. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
  2093. part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
  2094. pdesc->getCrc(crc);
  2095. }
  2096. }
  2097. if (part)
  2098. {
  2099. if (lazyOpen)
  2100. {
  2101. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2102. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), false, false));
  2103. }
  2104. else
  2105. keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), false, false));
  2106. }
  2107. else
  2108. keyset->addIndex(NULL);
  2109. }
  2110. ret->addKey(keyset.getClear());
  2111. }
  2112. else
  2113. ret->addKey(NULL);
  2114. }
  2115. }
  2116. else
  2117. {
  2118. // Channel 0 means return the TLK
  2119. IArrayOf<IKeyIndexBase> subkeys;
  2120. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2121. ForEachItemIn(idx, subFiles)
  2122. {
  2123. IFileDescriptor *fdesc = subFiles.item(idx);
  2124. IFileDescriptor *remoteFDesc = remoteSubFiles.item(idx);
  2125. Owned<IKeyIndexBase> key;
  2126. if (fdesc)
  2127. {
  2128. unsigned numParts = fdesc->numParts();
  2129. assertex(numParts > 0);
  2130. IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
  2131. IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
  2132. Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
  2133. unsigned crc = 0;
  2134. pdesc->getCrc(crc);
  2135. StringBuffer pname;
  2136. pdesc->getPath(pname);
  2137. if (lazyOpen)
  2138. {
  2139. // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
  2140. key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), numParts>1, false));
  2141. }
  2142. else
  2143. key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), numParts>1, false));
  2144. keyset->addIndex(LINK(key->queryPart(0)));
  2145. }
  2146. else
  2147. keyset->addIndex(NULL);
  2148. }
  2149. if (keyset->numParts())
  2150. ret->addKey(keyset.getClear());
  2151. else if (!isOpt)
  2152. throw MakeStringException(ROXIE_FILE_ERROR, "Key %s has no key parts", lfn.get());
  2153. else if (traceLevel > 4)
  2154. DBGLOG(ROXIE_OPT_REPORTING, "Key %s has no key parts", lfn.get());
  2155. }
  2156. return ret.getClear();
  2157. }
  2158. virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IOutputMetaData *preloadLayout, bool preload) const
  2159. {
  2160. // MORE - I don't know that it makes sense to pass isOpt in to these calls
  2161. // Failures to resolve will not be cached, only successes.
  2162. // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
  2163. CriticalBlock b(lock);
  2164. IInMemoryIndexManager *ret = indexMap.get(channel);
  2165. if (!ret)
  2166. {
  2167. ret = createInMemoryIndexManager(preloadLayout->queryRecordAccessor(true), isOpt, lfn);
  2168. Owned<IFileIOArray> files = getIFileIOArray(isOpt, channel);
  2169. ret->load(files, preloadLayout, preload); // note - files (passed in) are also channel specific
  2170. indexMap.set(ret, channel);
  2171. }
  2172. return LINK(ret);
  2173. }
  2174. virtual const CDateTime &queryTimeStamp() const
  2175. {
  2176. return fileTimeStamp;
  2177. }
  2178. virtual unsigned queryCheckSum() const
  2179. {
  2180. return fileCheckSum;
  2181. }
  2182. virtual offset_t getFileSize() const
  2183. {
  2184. return fileSize;
  2185. }
  2186. virtual hash64_t addHash64(hash64_t hashValue) const
  2187. {
  2188. hashValue = fileTimeStamp.getHash(hashValue);
  2189. if (fileCheckSum)
  2190. hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
  2191. return hashValue;
  2192. }
  2193. virtual void addSubFile(const IResolvedFile *_sub)
  2194. {
  2195. const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
  2196. if (subFiles.length())
  2197. assertex(sub->fileType==fileType);
  2198. else
  2199. fileType = sub->fileType;
  2200. subRFiles.append((IResolvedFile &) *LINK(_sub));
  2201. ForEachItemIn(idx, sub->subFiles)
  2202. {
  2203. addFile(sub->subNames.item(idx), LINK(sub->subFiles.item(idx)), LINK(sub->remoteSubFiles.item(idx)));
  2204. }
  2205. }
  2206. virtual void addSubFile(IFileDescriptor *_sub, IFileDescriptor *_remoteSub)
  2207. {
  2208. addFile(lfn, _sub, _remoteSub);
  2209. }
  2210. virtual void addSubFile(const char *localFileName)
  2211. {
  2212. Owned<IFile> file = createIFile(localFileName);
  2213. assertex(file->exists());
  2214. offset_t size = file->size();
  2215. Owned<IFileDescriptor> fdesc = createFileDescriptor();
  2216. if (isIndexFile(file))
  2217. fdesc->queryProperties().setProp("@kind", "key");
  2218. Owned<IPropertyTree> pp = createPTree("Part", ipt_lowmem);
  2219. pp->setPropInt64("@size",size);
  2220. pp->setPropBool("@local", true);
  2221. fdesc->setPart(0, queryMyNode(), localFileName, pp);
  2222. addSubFile(fdesc.getClear(), NULL);
  2223. }
  2224. virtual void setCache(IResolvedFileCache *cache)
  2225. {
  2226. if (cached)
  2227. {
  2228. if (traceLevel > 9)
  2229. DBGLOG("setCache removing from prior cache %s", queryFileName());
  2230. if (cache==NULL)
  2231. cached->removeCache(this);
  2232. else
  2233. throwUnexpected();
  2234. }
  2235. cached = cache;
  2236. }
  2237. virtual bool isAlive() const
  2238. {
  2239. return CInterface::isAlive();
  2240. }
  2241. virtual const char *queryFileName() const
  2242. {
  2243. return lfn.get();
  2244. }
  2245. virtual const char *queryPhysicalName() const
  2246. {
  2247. return physicalName.get();
  2248. }
  2249. virtual const IPropertyTree *queryProperties() const
  2250. {
  2251. return properties;
  2252. }
  2253. virtual void remove()
  2254. {
  2255. subFiles.kill();
  2256. subDFiles.kill();
  2257. subRFiles.kill();
  2258. subNames.kill();
  2259. remoteSubFiles.kill();
  2260. properties.clear();
  2261. notifier.clear();
  2262. if (isSuper)
  2263. {
  2264. // Because we don't lock superfiles, we need to behave differently
  2265. UNIMPLEMENTED;
  2266. }
  2267. else if (dFile)
  2268. {
  2269. dFile->detach();
  2270. }
  2271. else if (!physicalName.isEmpty())
  2272. {
  2273. try
  2274. {
  2275. Owned<IFile> file = createIFile(physicalName.get());
  2276. file->remove();
  2277. }
  2278. catch (IException *e)
  2279. {
  2280. ERRLOG(-1, "Error removing file %s (%s)", lfn.get(), physicalName.get());
  2281. e->Release();
  2282. }
  2283. }
  2284. }
  2285. virtual bool exists() const
  2286. {
  2287. // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
  2288. // This will make more sense if/when we start to lock earlier.
  2289. if (dFile || isSuper)
  2290. return true; // MORE - may need some thought - especially the isSuper case
  2291. else if (!physicalName.isEmpty())
  2292. return checkFileExists(physicalName.get());
  2293. else
  2294. return false;
  2295. }
  2296. };
  2297. /*----------------------------------------------------------------------------------------------------------
  2298. MORE
  2299. - on remote() calls we can't pass the expected file date but we will pass it back with the file info.
  2300. ------------------------------------------------------------------------------------------------------------*/
  2301. class CSlaveDynamicFile : public CResolvedFile
  2302. {
  2303. public:
  2304. bool isOpt; // MORE - this is not very good. Needs some thought unless you cache opt / nonOpt separately which seems wasteful
  2305. bool isLocal;
  2306. unsigned channel;
  2307. unsigned serverIdx;
  2308. public:
  2309. CSlaveDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
  2310. : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
  2311. {
  2312. // call back to the server to get the info
  2313. IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
  2314. try
  2315. {
  2316. RoxiePacketHeader newHeader(*header, ROXIE_FILECALLBACK);
  2317. bool ok = false;
  2318. for (unsigned i = 0; i < callbackRetries; i++)
  2319. {
  2320. Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
  2321. unsigned len = strlen(lfn)+3; // 1 for isOpt, 1 for isLocal, 1 for null terminator
  2322. char *buf = (char *) output->getBuffer(len, true);
  2323. buf[0] = isOpt;
  2324. buf[1] = isLocal;
  2325. strcpy(buf+2, lfn.get());
  2326. output->putBuffer(buf, len, true);
  2327. output->flush(true);
  2328. output.clear();
  2329. if (callback->wait(callbackTimeout))
  2330. {
  2331. ok = true;
  2332. break;
  2333. }
  2334. else
  2335. {
  2336. DBGLOG("timed out waiting for server callback - retrying");
  2337. }
  2338. }
  2339. if (ok)
  2340. {
  2341. if (traceLevel > 6)
  2342. { StringBuffer s; DBGLOG("Processing information from server in response to %s", newHeader.toString(s).str()); }
  2343. MemoryBuffer &serverData = callback->queryData();
  2344. byte type;
  2345. serverData.read(type);
  2346. fileType = (RoxieFileType) type;
  2347. fileTimeStamp.deserialize(serverData);
  2348. serverData.read(fileCheckSum);
  2349. serverData.read(fileSize);
  2350. serverData.read(isSuper);
  2351. unsigned numSubFiles;
  2352. serverData.read(numSubFiles);
  2353. for (unsigned fileNo = 0; fileNo < numSubFiles; fileNo++)
  2354. {
  2355. StringBuffer subName;
  2356. serverData.read(subName);
  2357. subNames.append(subName.str());
  2358. deserializeFilePart(serverData, subFiles, fileNo, false);
  2359. bool remotePresent;
  2360. serverData.read(remotePresent);
  2361. if (remotePresent)
  2362. deserializeFilePart(serverData, remoteSubFiles, fileNo, true);
  2363. else
  2364. remoteSubFiles.append(NULL);
  2365. unsigned formatCrc;
  2366. serverData.read(formatCrc);
  2367. formatCrcs.append(formatCrc);
  2368. byte diskTypeInfoPresent;
  2369. serverData.read(diskTypeInfoPresent);
  2370. switch (diskTypeInfoPresent)
  2371. {
  2372. case 0:
  2373. diskTypeInfo.append(NULL);
  2374. break;
  2375. case 1:
  2376. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, false));
  2377. break;
  2378. case 2:
  2379. diskTypeInfo.append(createTypeInfoOutputMetaData(serverData, true));
  2380. break;
  2381. case 3:
  2382. assertex(fileNo > 0);
  2383. diskTypeInfo.append(LINK(diskTypeInfo.item(fileNo-1)));
  2384. break;
  2385. default:
  2386. throwUnexpected();
  2387. }
  2388. }
  2389. bool propertiesPresent;
  2390. serverData.read(propertiesPresent);
  2391. if (propertiesPresent)
  2392. properties.setown(createPTree(serverData, ipt_lowmem));
  2393. }
  2394. else
  2395. throw MakeStringException(ROXIE_CALLBACK_ERROR, "Failed to get response from server for dynamic file callback");
  2396. }
  2397. catch (...)
  2398. {
  2399. ROQ->removePendingCallback(callback);
  2400. throw;
  2401. }
  2402. ROQ->removePendingCallback(callback);
  2403. }
  2404. private:
  2405. void deserializeFilePart(MemoryBuffer &serverData, IPointerArrayOf<IFileDescriptor> &files, unsigned fileNo, bool remote)
  2406. {
  2407. IArrayOf<IPartDescriptor> parts;
  2408. deserializePartFileDescriptors(serverData, parts);
  2409. if (parts.length())
  2410. {
  2411. files.append(LINK(&parts.item(0).queryOwner()));
  2412. }
  2413. else
  2414. {
  2415. if (traceLevel > 6)
  2416. DBGLOG("No information for %s subFile %d of file %s", remote ? "remote" : "", fileNo, lfn.get());
  2417. files.append(NULL);
  2418. }
  2419. }
  2420. };
  2421. extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
  2422. {
  2423. return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
  2424. }
  2425. extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
  2426. {
  2427. const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
  2428. return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
  2429. }
  2430. class CSlaveDynamicFileCache : implements ISlaveDynamicFileCache, public CInterface
  2431. {
  2432. unsigned tableSize;
  2433. mutable CriticalSection crit;
  2434. CIArrayOf<CSlaveDynamicFile> files; // expect numbers to be small - probably not worth hashing
  2435. public:
  2436. IMPLEMENT_IINTERFACE;
  2437. CSlaveDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
  2438. virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal) override
  2439. {
  2440. if (logctx.queryTraceLevel() > 5)
  2441. {
  2442. StringBuffer s;
  2443. logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
  2444. }
  2445. // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feature this may be adequate.
  2446. CriticalBlock b(crit);
  2447. if (!cacheDate.isNull())
  2448. {
  2449. unsigned idx = 0;
  2450. while (files.isItem(idx))
  2451. {
  2452. CSlaveDynamicFile &f = files.item(idx);
  2453. if (f.channel==header->channel && f.serverIdx==header->serverIdx && stricmp(f.queryFileName(), lfn)==0)
  2454. {
  2455. if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
  2456. {
  2457. if (f.isKey())
  2458. clearKeyStoreCacheEntry(f.queryFileName());
  2459. files.remove(idx);
  2460. idx--;
  2461. }
  2462. else if ((!f.isLocal || isLocal) && f.isOpt==isOpt)
  2463. {
  2464. files.swap(idx, 0);
  2465. return LINK(&f);
  2466. }
  2467. }
  2468. idx++;
  2469. }
  2470. }
  2471. Owned<CSlaveDynamicFile> ret;
  2472. {
  2473. // Don't prevent access to the cache while waiting for server to reply. Can deadlock if you do, apart from being inefficient
  2474. CriticalUnblock b1(crit);
  2475. ret.setown(new CSlaveDynamicFile(logctx, lfn, header, isOpt, isLocal));
  2476. }
  2477. if (!ret->isSuperFile())
  2478. {
  2479. // Cache results for improved performance - we DON'T cache superfiles as they are liable to change during the course of a query.
  2480. // Note that even caching non-superfiles is also potentially going to give stale results, if the cache persists beyond the current
  2481. // query.
  2482. while (files.length() > tableSize)
  2483. files.remove(files.length()-1);
  2484. files.add(*ret.getLink(), 0);
  2485. }
  2486. return ret.getClear();
  2487. }
  2488. virtual void releaseAll() override
  2489. {
  2490. CriticalBlock b(crit);
  2491. files.kill();
  2492. }
  2493. };
// File-scope slave dynamic file cache, created on first use by
// querySlaveDynamicFileCache(); slaveDynamicFileCacheCrit guards its creation
// and its release.
static CriticalSection slaveDynamicFileCacheCrit;
static Owned<ISlaveDynamicFileCache> slaveDynamicFileCache;
  2496. extern ISlaveDynamicFileCache *querySlaveDynamicFileCache()
  2497. {
  2498. if (!slaveDynamicFileCache)
  2499. {
  2500. CriticalBlock b(slaveDynamicFileCacheCrit);
  2501. if (!slaveDynamicFileCache)
  2502. slaveDynamicFileCache.setown(new CSlaveDynamicFileCache(20));
  2503. }
  2504. return slaveDynamicFileCache;
  2505. }
  2506. extern void releaseSlaveDynamicFileCache()
  2507. {
  2508. CriticalBlock b(slaveDynamicFileCacheCrit);
  2509. if (slaveDynamicFileCache)
  2510. slaveDynamicFileCache->releaseAll();
  2511. }
  2512. // Initialization/termination
// Module initialisation: creates the CRoxieFileCache instance that
// queryFileCache() hands out.
MODULE_INIT(INIT_PRIORITY_STANDARD)
{
    fileCache = new CRoxieFileCache;
    return true;
}
// Module termination: calls join() on the file cache and then releases it.
MODULE_EXIT()
{
    fileCache->join();
    fileCache->Release();
}
  2523. extern IRoxieFileCache &queryFileCache()
  2524. {
  2525. return *fileCache;
  2526. }
  2527. class CRoxieWriteHandler : implements IRoxieWriteHandler, public CInterface
  2528. {
  2529. public:
  2530. IMPLEMENT_IINTERFACE;
  2531. CRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2532. : daliHelper(_daliHelper), dFile(_dFile)
  2533. {
  2534. ForEachItemIn(idx, _clusters)
  2535. {
  2536. addCluster(_clusters.item(idx));
  2537. }
  2538. if (dFile->queryDistributedFile())
  2539. {
  2540. isTemporary = (localCluster.get() == NULL); // if only writing to remote clusters, write to a temporary first, then copy
  2541. if (isTemporary)
  2542. {
  2543. UNIMPLEMENTED;
  2544. }
  2545. else
  2546. localFile.setown(dFile->getPartFile(0, 0));
  2547. }
  2548. else
  2549. {
  2550. isTemporary = false;
  2551. localFile.setown(dFile->getPartFile(0, 0));
  2552. }
  2553. if (!recursiveCreateDirectoryForFile(localFile->queryFilename()))
  2554. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot create directory for file %s", localFile->queryFilename());
  2555. }
  2556. virtual IFile *queryFile() const
  2557. {
  2558. return localFile;
  2559. }
  2560. void getClusters(StringArray &clusters) const
  2561. {
  2562. ForEachItemIn(idx, allClusters)
  2563. {
  2564. clusters.append(allClusters.item(idx));
  2565. }
  2566. }
  2567. virtual void finish(bool success, const IRoxiePublishCallback *activity)
  2568. {
  2569. if (success)
  2570. {
  2571. copyPhysical();
  2572. if (daliHelper && daliHelper->connected())
  2573. publish(activity);
  2574. }
  2575. if (isTemporary || !success)
  2576. {
  2577. localFile->remove();
  2578. }
  2579. }
  2580. private:
  2581. bool isTemporary;
  2582. Linked<IRoxieDaliHelper> daliHelper;
  2583. Owned<ILocalOrDistributedFile> dFile;
  2584. Owned<IFile> localFile;
  2585. Owned<IGroup> localCluster;
  2586. StringAttr localClusterName;
  2587. IArrayOf<IGroup> remoteNodes;
  2588. StringArray allClusters;
  2589. void copyPhysical() const
  2590. {
  2591. RemoteFilename rfn, rdn;
  2592. dFile->getPartFilename(rfn, 0, 0);
  2593. StringBuffer physicalName, physicalDir, physicalBase;
  2594. rfn.getLocalPath(physicalName);
  2595. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2596. rdn.setLocalPath(physicalDir.str());
  2597. if (localCluster && getNumNodes() > 1)
  2598. {
  2599. unsigned buddy = myNodeIndex+1;
  2600. if (buddy >= getNumNodes())
  2601. buddy = 0;
  2602. SocketEndpoint buddyNode(0, getNodeAddress(buddy));
  2603. rdn.setEp(buddyNode);
  2604. rfn.setEp(buddyNode);
  2605. Owned<IFile> targetdir = createIFile(rdn);
  2606. Owned<IFile> target = createIFile(rfn);
  2607. targetdir->createDirectory();
  2608. copyFile(target, localFile);
  2609. }
  2610. if (remoteNodes.length())
  2611. {
  2612. ForEachItemIn(idx, remoteNodes)
  2613. {
  2614. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2615. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2616. Owned<IFile> targetdir = createIFile(rdn);
  2617. Owned<IFile> target = createIFile(rfn);
  2618. targetdir->createDirectory();
  2619. copyFile(target, localFile);
  2620. }
  2621. }
  2622. }
  2623. void publish(const IRoxiePublishCallback *activity)
  2624. {
  2625. if (!dFile->isExternal())
  2626. {
  2627. Owned<IFileDescriptor> desc = createFileDescriptor();
  2628. desc->setNumParts(1);
  2629. RemoteFilename rfn;
  2630. dFile->getPartFilename(rfn, 0, 0);
  2631. StringBuffer physicalName, physicalDir, physicalBase;
  2632. rfn.getLocalPath(physicalName);
  2633. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2634. desc->setDefaultDir(physicalDir.str());
  2635. desc->setPartMask(physicalBase.str());
  2636. IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
  2637. IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
  2638. offset_t fileSize = localFile->size();
  2639. fileProps.setPropInt64("@size", fileSize);
  2640. partProps.setPropInt64("@size", fileSize);
  2641. CDateTime createTime, modifiedTime, accessedTime;
  2642. localFile->getTime(&createTime, &modifiedTime, &accessedTime);
  2643. // round file time down to nearest sec. Nanosec accurancy is not preserved elsewhere and can lead to mismatch later.
  2644. unsigned hour, min, sec, nanosec;
  2645. modifiedTime.getTime(hour, min, sec, nanosec);
  2646. modifiedTime.setTime(hour, min, sec, 0);
  2647. StringBuffer timestr;
  2648. modifiedTime.getString(timestr);
  2649. if(timestr.length())
  2650. partProps.setProp("@modified", timestr.str());
  2651. ClusterPartDiskMapSpec partmap;
  2652. if (localCluster)
  2653. {
  2654. desc->addCluster(localCluster, partmap);
  2655. desc->setClusterGroupName(0, localClusterName.get());
  2656. }
  2657. ForEachItemIn(idx, remoteNodes)
  2658. desc->addCluster(&remoteNodes.item(idx), partmap);
  2659. if (activity)
  2660. activity->setFileProperties(desc);
  2661. Owned<IDistributedFile> publishFile = queryDistributedFileDirectory().createNew(desc); // MORE - we'll create this earlier if we change the locking paradigm
  2662. publishFile->setAccessedTime(modifiedTime);
  2663. IUserDescriptor * userdesc = NULL;
  2664. if (activity)
  2665. userdesc = activity->queryUserDescriptor();
  2666. else
  2667. {
  2668. Owned<IRoxieDaliHelper> daliHelper = connectToDali(false);
  2669. if (daliHelper)
  2670. userdesc = daliHelper->queryUserDescriptor();//predeployed query mode
  2671. }
  2672. publishFile->attach(dFile->queryLogicalName(), userdesc);
  2673. // MORE should probably write to the roxielocalstate too in case Dali is down next time I look...
  2674. }
  2675. }
  2676. void addCluster(char const * cluster)
  2677. {
  2678. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  2679. if (!group)
  2680. throw MakeStringException(0, "Unknown cluster %s while writing file %s",
  2681. cluster, dFile->queryLogicalName());
  2682. if (group->isMember())
  2683. {
  2684. if (localCluster)
  2685. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2686. cluster, dFile->queryLogicalName());
  2687. SocketEndpointArray eps;
  2688. SocketEndpoint me(0, getNodeAddress(myNodeIndex));
  2689. eps.append(me);
  2690. if (getNumNodes() > 1)
  2691. {
  2692. unsigned buddy = myNodeIndex+1;
  2693. if (buddy >= getNumNodes())
  2694. buddy = 0;
  2695. SocketEndpoint buddyNode(0, getNodeAddress(buddy));
  2696. eps.append(buddyNode);
  2697. }
  2698. localCluster.setown(createIGroup(eps));
  2699. StringBuffer clusterName;
  2700. localClusterName.set(eps.getText(clusterName));
  2701. }
  2702. else
  2703. {
  2704. ForEachItemIn(idx, remoteNodes)
  2705. {
  2706. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  2707. if (group->isMember(other))
  2708. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2709. cluster, dFile->queryLogicalName());
  2710. }
  2711. remoteNodes.append(*group.getClear());
  2712. }
  2713. allClusters.append(cluster);
  2714. }
  2715. };
  2716. extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper, ILocalOrDistributedFile *_dFile, const StringArray &_clusters)
  2717. {
  2718. return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
  2719. }
  2720. //================================================================================================================
  2721. #ifdef _USE_CPPUNIT
  2722. #include "unittests.hpp"
  2723. class CcdFileTest : public CppUnit::TestFixture
  2724. {
  2725. CPPUNIT_TEST_SUITE(CcdFileTest);
  2726. CPPUNIT_TEST(testCopy);
  2727. CPPUNIT_TEST_SUITE_END();
  2728. protected:
  2729. class DummyPartDescriptor : public CInterfaceOf<IPartDescriptor>
  2730. {
  2731. virtual unsigned queryPartIndex() { UNIMPLEMENTED; }
  2732. virtual unsigned numCopies() { UNIMPLEMENTED; }
  2733. virtual INode *getNode(unsigned copy=0) { UNIMPLEMENTED; }
  2734. virtual INode *queryNode(unsigned copy=0) { UNIMPLEMENTED; }
  2735. virtual IPropertyTree &queryProperties() { UNIMPLEMENTED; }
  2736. virtual IPropertyTree *getProperties() { UNIMPLEMENTED; }
  2737. virtual RemoteFilename &getFilename(unsigned copy, RemoteFilename &rfn) { UNIMPLEMENTED; }
  2738. virtual StringBuffer &getTail(StringBuffer &name) { UNIMPLEMENTED; }
  2739. virtual StringBuffer &getDirectory(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  2740. virtual StringBuffer &getPath(StringBuffer &name,unsigned copy = 0) { UNIMPLEMENTED; }
  2741. virtual void serialize(MemoryBuffer &tgt) { UNIMPLEMENTED; }
  2742. virtual bool isMulti() { UNIMPLEMENTED; }
  2743. virtual RemoteMultiFilename &getMultiFilename(unsigned copy, RemoteMultiFilename &rfn) { UNIMPLEMENTED; }
  2744. virtual bool getCrc(unsigned &crc) { UNIMPLEMENTED; }
  2745. virtual IFileDescriptor &queryOwner() { UNIMPLEMENTED; }
  2746. virtual const char *queryOverrideName() { UNIMPLEMENTED; }
  2747. virtual unsigned copyClusterNum(unsigned copy,unsigned *replicate=NULL) { UNIMPLEMENTED; }
  2748. virtual IReplicatedFile *getReplicatedFile() { UNIMPLEMENTED; }
  2749. };
  2750. void testCopy()
  2751. {
  2752. remove("test.local");
  2753. remove("test.remote");
  2754. remove("test.buddy");
  2755. CRoxieFileCache cache(true);
  2756. StringArray remotes;
  2757. DummyPartDescriptor pdesc;
  2758. CDateTime dummy;
  2759. remotes.append("test.remote");
  2760. int f = open("test.remote", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  2761. CPPUNIT_ASSERT(f >= 0);
  2762. int val = 1;
  2763. int wrote = write(f, &val, sizeof(int));
  2764. CPPUNIT_ASSERT(wrote==sizeof(int));
  2765. close(f);
  2766. Owned<ILazyFileIO> io = cache.openFile("test.local", 0, "test.local", NULL, remotes, sizeof(int), dummy);
  2767. CPPUNIT_ASSERT(io != NULL);
  2768. // Reading it should read 1
  2769. val = 0;
  2770. ssize_t bytesRead = io->read(0, sizeof(int), &val);
  2771. CPPUNIT_ASSERT(bytesRead==4);
  2772. CPPUNIT_ASSERT(val==1);
  2773. // Now create the buddy
  2774. f = open("test.buddy", _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE);
  2775. val = 2;
  2776. ssize_t numwritten = write(f, &val, sizeof(int));
  2777. CPPUNIT_ASSERT(numwritten == sizeof(int));
  2778. close(f);
  2779. // Reading it should still read 1...
  2780. val = 0;
  2781. io->read(0, sizeof(int), &val);
  2782. CPPUNIT_ASSERT(val==1);
  2783. // Now copy it - should copy the buddy
  2784. cache.doCopy(io, false, false);
  2785. // Reading it should read 2...
  2786. val = 0;
  2787. io->read(0, sizeof(int), &val);
  2788. CPPUNIT_ASSERT(val==2);
  2789. // And the data in the file should be 2
  2790. f = open("test.local", _O_RDONLY);
  2791. val = 0;
  2792. ssize_t numread = read(f, &val, sizeof(int));
  2793. CPPUNIT_ASSERT(numread == sizeof(int));
  2794. close(f);
  2795. CPPUNIT_ASSERT(val==2);
  2796. io.clear();
  2797. remove("test.local");
  2798. remove("test.remote");
  2799. remove("test.buddy");
  2800. }
  2801. };
  // Register the CcdFileTest suite with CppUnit twice: once in the default (unnamed)
  // registry and once in the registry named "CcdFileTest" (presumably so the suite
  // can be selected by name - confirm against the unit test runner).
  2802. CPPUNIT_TEST_SUITE_REGISTRATION( CcdFileTest );
  2803. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( CcdFileTest, "CcdFileTest" );
  2804. #endif