|
@@ -35,6 +35,7 @@
|
|
|
#include "ccdsnmp.hpp"
|
|
|
#include "rmtfile.hpp"
|
|
|
#include "ccdqueue.ipp"
|
|
|
+#include "ccdcache.hpp"
|
|
|
#if defined(__linux__) || defined(__APPLE__)
|
|
|
#include <sys/mman.h>
|
|
|
#endif
|
|
@@ -624,57 +625,6 @@ typedef StringArray *StringArrayPtr;
|
|
|
|
|
|
// A circular buffer recording recent disk read operations that can be used to "prewarm" the cache
|
|
|
|
|
|
-struct CacheInfoEntry
|
|
|
-{
|
|
|
- //For convenience the values for PageType match the NodeX enumeration (see noteWarm).
|
|
|
- //Ensure disk entries sort last so that index nodes take precedence when deduping offsets.
|
|
|
- enum PageType : unsigned
|
|
|
- {
|
|
|
- PageTypeBranch = 0,
|
|
|
- PageTypeLeaf = 1,
|
|
|
- PageTypeBlob = 2,
|
|
|
- PageTypeDisk = 3,
|
|
|
- };
|
|
|
-
|
|
|
- union
|
|
|
- {
|
|
|
- struct
|
|
|
- {
|
|
|
-#ifndef _WIN32
|
|
|
- unsigned type: 2; // disk or the kind of index node
|
|
|
- __uint64 page: 38; // Support file sizes up to 2^51 i.e. 2PB
|
|
|
- unsigned file: 24; // Up to 4 million files
|
|
|
-#else
|
|
|
-//Windows does not like packing bitfields with different base types - fails the statck assert
|
|
|
- __uint64 type: 2; // disk or the kind of index node
|
|
|
- __uint64 page: 38; // Support file sizes up to 2^51 i.e. 2PB
|
|
|
- __uint64 file: 24; // Up to 4 million files
|
|
|
-#endif
|
|
|
- } b;
|
|
|
- __uint64 u;
|
|
|
- };
|
|
|
-
|
|
|
-#ifndef _WIN32
|
|
|
- static_assert(sizeof(b) == sizeof(u), "Unexpected packing issue in CacheInfoEntry");
|
|
|
-#elif _MSC_VER >= 1900
|
|
|
- //Older versions of the windows compiler complain CacheInfoEntry::b is not a type name
|
|
|
- static_assert(sizeof(b) == sizeof(u), "Unexpected packing issue in CacheInfoEntry");
|
|
|
-#endif
|
|
|
-
|
|
|
- inline CacheInfoEntry() { u = 0; }
|
|
|
- inline CacheInfoEntry(unsigned _file, offset_t _pos, PageType pageType)
|
|
|
- {
|
|
|
- b.file = _file;
|
|
|
- b.page = _pos >> pageBits;
|
|
|
- b.type = pageType;
|
|
|
- }
|
|
|
- inline bool operator < ( const CacheInfoEntry &l) const { return u < l.u; }
|
|
|
- inline bool operator <= ( const CacheInfoEntry &l) const { return u <= l.u; }
|
|
|
- inline void operator++ () { b.page++; }
|
|
|
-
|
|
|
- static constexpr unsigned pageBits = 13; // 8k 'pages'
|
|
|
-};
|
|
|
-
|
|
|
class CacheReportingBuffer : public CInterfaceOf<ICacheInfoRecorder>
|
|
|
{
|
|
|
// A circular buffer recording recent file activity. Note that noteRead() and clear() may be called from multiple threads
|
|
@@ -817,6 +767,75 @@ private:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+class IndexCacheWarmer : implements ICacheWarmer
|
|
|
+{
|
|
|
+ IRoxieFileCache *cache = nullptr;
|
|
|
+ Owned<ILazyFileIO> localFile;
|
|
|
+ Owned<IKeyIndex> keyIndex;
|
|
|
+ bool keyFailed = false;
|
|
|
+ unsigned fileIdx = (unsigned) -1;
|
|
|
+ unsigned filesProcessed = 0;
|
|
|
+ unsigned pagesPreloaded = 0;
|
|
|
+public:
|
|
|
+ IndexCacheWarmer(IRoxieFileCache *_cache) : cache(_cache) {}
|
|
|
+
|
|
|
+ virtual void startFile(const char *filename) override
|
|
|
+ {
|
|
|
+ // "filename" is the filename that roxie would use if it copied the file locally. This may not
|
|
|
+ // match the name of the actual file - e.g. if the file is local but in a different location.
|
|
|
+ localFile.setown(cache->lookupLocalFile(filename));
|
|
|
+ if (localFile)
|
|
|
+ {
|
|
|
+ fileIdx = localFile->getFileIdx();
|
|
|
+ }
|
|
|
+ keyFailed = false;
|
|
|
+ filesProcessed++;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool warmBlock(const char *filename, NodeType nodeType, offset_t startOffset, offset_t endOffset) override
|
|
|
+ {
|
|
|
+ if (nodeType != NodeNone && !keyFailed && localFile && !keyIndex)
|
|
|
+ {
|
|
|
+ //Pass false for isTLK - it will be initialised from the index header
|
|
|
+ keyIndex.setown(createKeyIndex(filename, localFile->getCrc(), *localFile.get(), fileIdx, false));
|
|
|
+ if (!keyIndex)
|
|
|
+ keyFailed = true;
|
|
|
+ }
|
|
|
+ if (nodeType != NodeNone && keyIndex)
|
|
|
+ {
|
|
|
+ // Round startOffset up to nearest multiple of index node size
|
|
|
+ unsigned nodeSize = keyIndex->getNodeSize();
|
|
|
+ startOffset = ((startOffset+nodeSize-1)/nodeSize)*nodeSize;
|
|
|
+ do
|
|
|
+ {
|
|
|
+ if (traceLevel > 8)
|
|
|
+ DBGLOG("prewarming index page %u %s %" I64F "x-%" I64F "x", (int) nodeType, filename, startOffset, endOffset);
|
|
|
+ bool loaded = keyIndex->prewarmPage(startOffset, nodeType);
|
|
|
+ if (!loaded)
|
|
|
+ break;
|
|
|
+ pagesPreloaded++;
|
|
|
+ startOffset += nodeSize;
|
|
|
+ }
|
|
|
+ while (startOffset < endOffset);
|
|
|
+ }
|
|
|
+ else if (fileIdx != (unsigned) -1)
|
|
|
+ cache->noteRead(fileIdx, startOffset, (endOffset-1) - startOffset); // Ensure pages we prewarm are recorded in our cache tracker
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void endFile() override
|
|
|
+ {
|
|
|
+ localFile.clear();
|
|
|
+ keyIndex.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void report() override
|
|
|
+ {
|
|
|
+ if (traceLevel)
|
|
|
+ DBGLOG("Processed %u files and preloaded %u index nodes", filesProcessed, pagesPreloaded);
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
|
|
|
{
|
|
|
friend class CcdFileTest;
|
|
@@ -1772,7 +1791,7 @@ public:
|
|
|
return ret.getLink();
|
|
|
}
|
|
|
|
|
|
- ILazyFileIO *lookupLocalFile(const char *filename)
|
|
|
+ virtual ILazyFileIO *lookupLocalFile(const char *filename)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
@@ -1823,6 +1842,8 @@ public:
|
|
|
|
|
|
virtual void loadSavedOsCacheInfo() override
|
|
|
{
|
|
|
+ if (!topology->getPropBool("@warmOsCache", true))
|
|
|
+ return;
|
|
|
Owned<const ITopologyServer> topology = getTopology();
|
|
|
for (unsigned channel : topology->queryChannels())
|
|
|
doLoadSavedOsCacheInfo(channel);
|
|
@@ -1839,16 +1860,35 @@ public:
|
|
|
if (!dllserver_root)
|
|
|
return;
|
|
|
#endif
|
|
|
+ unsigned cacheWarmTraceLevel = topology->getPropInt("@cacheWarmTraceLevel", traceLevel);
|
|
|
VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), channel);
|
|
|
StringBuffer cacheInfo;
|
|
|
try
|
|
|
{
|
|
|
if (checkFileExists(cacheFileName))
|
|
|
{
|
|
|
+#ifndef _WIN32
|
|
|
+ StringBuffer output;
|
|
|
+ VStringBuffer command("ccdcache %s -t %u", cacheFileName.str(), cacheWarmTraceLevel);
|
|
|
+ unsigned retcode = runExternalCommand(nullptr, output, output, command, nullptr);
|
|
|
+ if (output.length())
|
|
|
+ {
|
|
|
+ StringArray outputLines;
|
|
|
+ outputLines.appendList(output, "\n");
|
|
|
+ ForEachItemIn(idx, outputLines)
|
|
|
+ {
|
|
|
+ const char *line = outputLines.item(idx);
|
|
|
+ if (line && *line)
|
|
|
+ DBGLOG("ccdcache: %s", line);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (retcode)
|
|
|
+ DBGLOG("ccdcache failed with exit code %u", retcode);
|
|
|
+#endif
|
|
|
cacheInfo.loadFile(cacheFileName, false);
|
|
|
- warmOsCache(cacheInfo);
|
|
|
if (traceLevel)
|
|
|
- DBGLOG("Loaded cache information from %s for channel %d", cacheFileName.str(), channel);
|
|
|
+ DBGLOG("Loading cache information from %s for channel %d", cacheFileName.str(), channel);
|
|
|
+ warmOsCache(cacheInfo);
|
|
|
}
|
|
|
}
|
|
|
catch(IException *E)
|
|
@@ -1863,163 +1903,10 @@ public:
|
|
|
{
|
|
|
if (!cacheInfo)
|
|
|
return;
|
|
|
-#ifndef _WIN32
|
|
|
- size_t os_page_size = getpagesize();
|
|
|
-#endif
|
|
|
- char t = 0;
|
|
|
- unsigned touched = 0;
|
|
|
- unsigned preloaded = 0;
|
|
|
- Owned<const ITopologyServer> topology = getTopology();
|
|
|
- while (*cacheInfo)
|
|
|
- {
|
|
|
- // We are parsing lines that look like:
|
|
|
- // <channel>|<filename>|<pagelist>
|
|
|
- //
|
|
|
- // Where pagelist is a space-separated list of page numbers or (inclusive) ranges.
|
|
|
- // A page number or range prefixed by a * means that the page(s) was found in the jhtree cache.
|
|
|
- //
|
|
|
- // For example,
|
|
|
- // 1|/var/lib/HPCCSystems/hpcc-data/unknown/regress/multi/dg_index_evens._1_of_3|*0 3-4
|
|
|
- // Pages are always recorded and specified as 8192 bytes (unless pagebits ever changes).
|
|
|
-
|
|
|
- unsigned fileChannel = strtoul(cacheInfo, (char **) &cacheInfo, 10);
|
|
|
- if (*cacheInfo != '|')
|
|
|
- break;
|
|
|
- if (!topology->implementsChannel(fileChannel))
|
|
|
- {
|
|
|
- const char *eol = strchr(cacheInfo, '\n');
|
|
|
- if (!eol)
|
|
|
- break;
|
|
|
- cacheInfo = eol+1;
|
|
|
- continue;
|
|
|
- }
|
|
|
- cacheInfo++;
|
|
|
- const char *endName = strchr(cacheInfo, '|');
|
|
|
- assert(endName);
|
|
|
- if (!endName)
|
|
|
- break;
|
|
|
- StringBuffer fileName(endName-cacheInfo, cacheInfo);
|
|
|
- Owned<IKeyIndex> keyIndex;
|
|
|
- bool keyFailed = false;
|
|
|
- unsigned fileIdx = (unsigned) -1;
|
|
|
-#ifndef _WIN32
|
|
|
- char *file_mmap = nullptr;
|
|
|
- int fd = open(fileName, 0);
|
|
|
- struct stat file_stat;
|
|
|
- if (fd != -1)
|
|
|
- {
|
|
|
- fstat(fd, &file_stat);
|
|
|
- file_mmap = (char *) mmap((void *)0, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
|
|
|
- if (file_mmap == MAP_FAILED)
|
|
|
- {
|
|
|
- DBGLOG("Failed to map file %s to pre-warm cache (error %d)", fileName.str(), errno);
|
|
|
- file_mmap = nullptr;
|
|
|
- }
|
|
|
- }
|
|
|
- else if (traceLevel)
|
|
|
- {
|
|
|
- DBGLOG("Failed to open file %s to pre-warm cache (error %d)", fileName.str(), errno);
|
|
|
- }
|
|
|
-#endif
|
|
|
- // "fileName" is the filename that roxie would use if it copied the file locally. This may not
|
|
|
- // match the name of the actual file - e.g. if the file is local but in a different location.
|
|
|
- Owned<ILazyFileIO> localFile = lookupLocalFile(fileName);
|
|
|
- if (localFile)
|
|
|
- {
|
|
|
- fileIdx = localFile->getFileIdx();
|
|
|
- }
|
|
|
- cacheInfo = endName+1; // Skip the |
|
|
|
- while (*cacheInfo==' ')
|
|
|
- cacheInfo++;
|
|
|
- for (;;)
|
|
|
- {
|
|
|
- bool inNodeCache = (*cacheInfo=='*');
|
|
|
- NodeType nodeType = NodeNone;
|
|
|
- if (inNodeCache)
|
|
|
- {
|
|
|
- cacheInfo++;
|
|
|
- switch (*cacheInfo)
|
|
|
- {
|
|
|
- case 'R': nodeType = NodeBranch; break;
|
|
|
- case 'L': nodeType = NodeLeaf; break;
|
|
|
- case 'B': nodeType = NodeBlob; break;
|
|
|
- default:
|
|
|
- throwUnexpectedX("Unknown node type");
|
|
|
- }
|
|
|
- cacheInfo++;
|
|
|
- }
|
|
|
- __uint64 startPage = readPage(cacheInfo);
|
|
|
- __uint64 endPage;
|
|
|
- if (*cacheInfo=='-')
|
|
|
- {
|
|
|
- cacheInfo++;
|
|
|
- endPage = readPage(cacheInfo);
|
|
|
- }
|
|
|
- else
|
|
|
- endPage = startPage;
|
|
|
- if (traceLevel > 8)
|
|
|
- DBGLOG("Touching %s %" I64F "x-%" I64F "x", fileName.str(), startPage, endPage);
|
|
|
- offset_t startOffset = startPage << CacheInfoEntry::pageBits;
|
|
|
- offset_t endOffset = (endPage+1) << CacheInfoEntry::pageBits;
|
|
|
- if (inNodeCache && !keyFailed && localFile && !keyIndex)
|
|
|
- {
|
|
|
- //Pass false for isTLK - it will be initialised from the index header
|
|
|
- keyIndex.setown(createKeyIndex(fileName, localFile->getCrc(), *localFile.get(), fileIdx, false));
|
|
|
- if (!keyIndex)
|
|
|
- keyFailed = true;
|
|
|
- }
|
|
|
- if (inNodeCache && keyIndex)
|
|
|
- {
|
|
|
- // Round startOffset up to nearest multiple of index node size
|
|
|
- unsigned nodeSize = keyIndex->getNodeSize();
|
|
|
- startOffset = ((startOffset+nodeSize-1)/nodeSize)*nodeSize;
|
|
|
- do
|
|
|
- {
|
|
|
- bool loaded = keyIndex->prewarmPage(startOffset, nodeType);
|
|
|
- if (!loaded)
|
|
|
- break;
|
|
|
- preloaded++;
|
|
|
- startOffset += nodeSize;
|
|
|
- }
|
|
|
- while (startOffset < endOffset);
|
|
|
- }
|
|
|
-#ifndef _WIN32
|
|
|
- else if (file_mmap)
|
|
|
- {
|
|
|
- if (fileIdx != (unsigned) -1)
|
|
|
- noteRead(fileIdx, startOffset, (endOffset-1) - startOffset); // Ensure pages we prewarm are recorded in our cache tracker
|
|
|
- do
|
|
|
- {
|
|
|
- if (startOffset >= (offset_t) file_stat.st_size)
|
|
|
- break; // Let's not core if the file has changed size since we recorded the info...
|
|
|
- t += file_mmap[startOffset]; // NOTE - t reported below so it cannot be optimized out
|
|
|
- touched++;
|
|
|
- startOffset += os_page_size;
|
|
|
- }
|
|
|
- while (startOffset < endOffset);
|
|
|
- }
|
|
|
-#endif
|
|
|
- if (*cacheInfo != ' ')
|
|
|
- break;
|
|
|
- cacheInfo++;
|
|
|
- }
|
|
|
-#ifndef _WIN32
|
|
|
- if (file_mmap)
|
|
|
- munmap(file_mmap, file_stat.st_size);
|
|
|
- if (fd != -1)
|
|
|
- close(fd);
|
|
|
-#endif
|
|
|
- if (*cacheInfo != '\n')
|
|
|
- break;
|
|
|
- cacheInfo++;
|
|
|
- }
|
|
|
- assert(!*cacheInfo);
|
|
|
- if (*cacheInfo)
|
|
|
- {
|
|
|
- DBGLOG("WARNING: Unrecognized cacheInfo format at %.20s", cacheInfo);
|
|
|
- }
|
|
|
- if (traceLevel)
|
|
|
- DBGLOG("Touched %d pages, preloaded %d index nodes, result %d", touched, preloaded, t); // We report t to make sure that compiler doesn't decide to optimize it away entirely
|
|
|
+ IndexCacheWarmer warmer(this);
|
|
|
+ if (!::warmOsCache(cacheInfo, &warmer))
|
|
|
+ DBGLOG("WARNING: Unrecognized cacheInfo format");
|
|
|
+ warmer.report();
|
|
|
}
|
|
|
|
|
|
virtual void clearOsCache() override
|
|
@@ -2305,7 +2192,7 @@ public:
|
|
|
{
|
|
|
if (part > numParts || part == 0)
|
|
|
{
|
|
|
- throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existant file part %d (valid are 1-%d)", part, numParts);
|
|
|
+ throw MakeStringException(ROXIE_FILE_ERROR, "Internal error - requesting base for non-existent file part %d (valid are 1-%d)", part, numParts);
|
|
|
}
|
|
|
return map[part-1].base;
|
|
|
}
|