Ver código fonte

Merge pull request #14113 from richardkchapman/roxie-cache-warm

HPCC-23425 Roxie cache prewarming

Reviewed-by: Jake Smith
Reviewed-by: Mark Kelly
Reviewed-by: Gavin Halliday
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 4 anos atrás
pai
commit
c3e7161b6e

+ 2 - 0
roxie/ccd/ccd.hpp

@@ -354,6 +354,8 @@ extern bool defaultDisableLocalOptimizations;
 extern unsigned agentQueryReleaseDelaySeconds;
 extern unsigned coresPerQuery;
 
+extern unsigned cacheReportPeriodSeconds;
+
 extern StringBuffer logDirectory;
 extern StringBuffer pluginDirectory;
 extern StringBuffer pluginsList;

+ 628 - 42
roxie/ccd/ccdfile.cpp

@@ -27,6 +27,7 @@
 
 #include "keydiff.hpp"
 
+#include "udptopo.hpp"
 #include "ccd.hpp"
 #include "ccdfile.hpp"
 #include "ccdquery.hpp"
@@ -34,7 +35,7 @@
 #include "ccdsnmp.hpp"
 #include "rmtfile.hpp"
 #include "ccdqueue.ipp"
-#ifdef __linux__
+#if defined(__linux__) || defined(__APPLE__)
 #include <sys/mman.h>
 #endif
 #if defined (__linux__)
@@ -86,17 +87,19 @@ class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public
 protected:
     IArrayOf<IFile> sources;
     Owned<IFile> logical;
-    unsigned currentIdx;
     Owned<IFileIO> current;
     Owned<IMemoryMappedFile> mmapped;
     mutable CriticalSection crit;
-    bool remote;
     offset_t fileSize;
-    CDateTime fileDate;
+    unsigned currentIdx;
     unsigned lastAccess;
-    bool copying;
-    bool isCompressed;
-    const IRoxieFileCache *cached;
+    CDateTime fileDate;
+    bool copying = false;
+    bool isCompressed = false;
+    bool remote = false;
+    IRoxieFileCache *cached = nullptr;
+    unsigned fileIdx = 0;
+    unsigned crc = 0;
     CRuntimeStatisticCollection fileStats;
 
 #ifdef FAIL_20_READ
@@ -106,19 +109,16 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed)
-        : logical(_logical), fileSize(size), isCompressed(_isCompressed), fileStats(diskLocalStatistics)
+    CRoxieLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, bool _isCompressed, unsigned _crc)
+        : logical(_logical), fileSize(size), isCompressed(_isCompressed), crc(_crc), fileStats(diskLocalStatistics)
     {
         fileDate.set(_date);
         currentIdx = 0;
         current.set(&failure);
-        remote = false;
 #ifdef FAIL_20_READ
         readCount = 0;
 #endif
         lastAccess = msTick();
-        copying = false;
-        cached = NULL;
     }
     
     ~CRoxieLazyFileIO()
@@ -132,10 +132,21 @@ public:
             cached->removeCache(this);
     }
 
-    void setCache(const IRoxieFileCache *cache)
+    virtual unsigned getFileIdx() const override
+    {
+        return fileIdx;
+    }
+
+    virtual unsigned getCrc() const override
+    {
+        return crc;
+    }
+
+    void setCache(IRoxieFileCache *cache, unsigned _fileIdx)
     {
         assertex(!cached);
         cached = cache;
+        fileIdx = _fileIdx;
     }
 
     void removeCache(const IRoxieFileCache *cache)
@@ -331,6 +342,8 @@ public:
             {
                 size32_t ret = active->read(pos, len, data);
                 lastAccess = msTick();
+                if (cached && !remote)
+                    cached->noteRead(fileIdx, pos, ret);
                 return ret;
 
             }
@@ -587,24 +600,200 @@ static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
 
 typedef StringArray *StringArrayPtr;
 
+// A circular buffer recording recent disk read operations that can be used to "prewarm" the cache
+
+struct CacheInfoEntry
+{
+    union
+    {
+        struct
+        {
+            bool diskCache : 1;   // false means it's in the jhtree cache, true means it's only in OS disk cache
+            __uint64 page: 39;
+            unsigned file: 24;
+        } b;
+        __uint64 u;
+    };
+
+    inline CacheInfoEntry() { u = 0; }
+    inline CacheInfoEntry(unsigned _file, offset_t _pos, bool _diskCache)
+    {
+        b.file = _file;
+        b.page = _pos >> pageBits;
+        b.diskCache = _diskCache;
+    }
+    inline bool operator < ( const CacheInfoEntry &l) const { return u < l.u; }
+    inline bool operator <= ( const CacheInfoEntry &l) const { return u <= l.u; }
+    inline void operator++ () { b.page++; }
+
+    static constexpr unsigned pageBits = 13;  // 8k 'pages'
+};
+
+class CacheReportingBuffer : public CInterfaceOf<ICacheInfoRecorder>
+{
+    // A circular buffer recording recent file activity. Note that noteRead() and clear() may be called from multiple threads
+    // (other functions are assumed single-threaded) and that locking is kept to a minimum, even if it means information may be slightly inaccurate.
+    CacheInfoEntry *recentReads = nullptr;
+    std::atomic<unsigned> recentReadHead = {0};
+    unsigned recentReadSize;
+
+public:
+
+    CacheReportingBuffer(offset_t trackSize)
+    {
+        recentReadSize = trackSize >> CacheInfoEntry::pageBits;
+        if (traceLevel)
+            DBGLOG("Creating CacheReportingBuffer with %d elements", recentReadSize);
+        assertex(recentReadSize);
+        recentReads = new CacheInfoEntry[recentReadSize];
+        recentReadHead = 0;
+    }
+    CacheReportingBuffer(const CacheReportingBuffer &from)
+    {
+        // NOTE - from may be updated concurrently - we do not want to lock it
+        // There are therefore races in here, but they do not matter (may result in very recent data being regarded as very old or vice versa).
+        recentReadSize = from.recentReadSize;
+        recentReadHead = from.recentReadHead.load(std::memory_order_relaxed);
+        recentReads = new CacheInfoEntry[recentReadSize];
+        memcpy(recentReads, from.recentReads, recentReadSize * sizeof(CacheInfoEntry));
+    }
+
+    ~CacheReportingBuffer()
+    {
+        delete [] recentReads;
+    }
+
+    void clear()
+    {
+        recentReadHead = 0;
+    }
+
+    void noteRead(unsigned fileIdx, offset_t pos, unsigned len, bool diskCache)
+    {
+        if (recentReads && len)
+        {
+            CacheInfoEntry start(fileIdx, pos, diskCache);
+            CacheInfoEntry end(fileIdx, pos+len-1, diskCache);
+            for(;start <= end; ++start)
+            {
+                recentReads[recentReadHead++ % recentReadSize] = start;
+            }
+        }
+    }
+
+    void sortAndDedup()
+    {
+        // NOTE: single-threaded
+        unsigned sortSize;
+        if (recentReadHead > recentReadSize)
+            sortSize = recentReadSize;
+        else
+            sortSize = recentReadHead;
+        std::sort(recentReads, recentReads + sortSize);
+        CacheInfoEntry lastPos(-1,-1,false);
+        unsigned dest = 0;
+        for (unsigned idx = 0; idx < sortSize; idx++)
+        {
+            CacheInfoEntry pos = recentReads[idx];
+            if (pos.b.file != lastPos.b.file || pos.b.page != lastPos.b.page)   // Ignore inNodeCache bit when deduping
+            {
+                recentReads[dest++] = pos;
+                lastPos = pos;
+            }
+        }
+        recentReadHead = dest;
+    }
+
+    void report(StringBuffer &ret, unsigned channel, const StringArray &cacheIndexes, const UnsignedShortArray &cacheIndexChannels)
+    {
+        // NOTE: single-threaded
+        assertex(recentReadHead <= recentReadSize);  // Should have sorted and deduped before calling this
+        unsigned lastFileIdx = (unsigned) -1;
+        offset_t lastPage = (offset_t) -1;
+        offset_t startRange = 0;
+        bool lastDiskCache = false;
+        bool includeFile = false;
+        for (unsigned idx = 0; idx < recentReadHead; idx++)
+        {
+            CacheInfoEntry pos = recentReads[idx];
+            if (pos.b.file != lastFileIdx)
+            {
+                if (includeFile)
+                    appendRange(ret, startRange, lastPage, lastDiskCache).newline();
+                lastFileIdx = pos.b.file;
+                if (channel==(unsigned) -1 || cacheIndexChannels.item(lastFileIdx)==channel)
+                {
+                    ret.appendf("%u|%s|", cacheIndexChannels.item(lastFileIdx), cacheIndexes.item(lastFileIdx));
+                    includeFile = true;
+                }
+                else
+                    includeFile = false;
+                startRange = pos.b.page;
+            }
+            else if ((pos.b.page == lastPage || pos.b.page == lastPage+1) && pos.b.diskCache == lastDiskCache)
+            {
+                // Still in current range
+            }
+            else
+            {
+                if (includeFile)
+                    appendRange(ret, startRange, lastPage, lastDiskCache);
+                startRange = pos.b.page;
+            }
+            lastPage = pos.b.page;
+            lastDiskCache = pos.b.diskCache;
+        }
+        if (includeFile)
+            appendRange(ret, startRange, lastPage, lastDiskCache).newline();
+    }
+
+    virtual void noteWarm(unsigned fileIdx, offset_t pos, unsigned len) override
+    {
+        noteRead(fileIdx, pos, len, false);
+    }
+
+private:
+    static StringBuffer &appendRange(StringBuffer &ret, offset_t start, offset_t end, bool diskCache)
+    {
+        if (!diskCache)
+            ret.append('*');
+        if (start==end)
+            ret.appendf("%" I64F "x", start);
+        else
+            ret.appendf("%" I64F "x-%" I64F "x", start, end);
+        return ret;
+    }
+};
+
 class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress, public CInterface
 {
     friend class CcdFileTest;
     mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
     InterruptableSemaphore toCopy;
     InterruptableSemaphore toClose;
+#ifdef _CONTAINERIZED
+    InterruptableSemaphore cidtSleep;
+#endif
     mutable CopyMapStringToMyClass<ILazyFileIO> files;
     mutable CriticalSection crit;
     CriticalSection cpcrit;
     bool started;
     bool aborting;
-    bool closing;
-    bool testMode;
+    std::atomic<bool> closing;
     bool closePending[2];
     StringAttrMapping fileErrorList;
+#ifdef _CONTAINERIZED
+    Semaphore cidtStarted;
+#endif
     Semaphore bctStarted;
     Semaphore hctStarted;
 
+    // Read-tracking code for pre-warming OS caches
+
+    StringArray cacheIndexes;
+    UnsignedShortArray cacheIndexChannels;
+    CacheReportingBuffer *activeCacheReportingBuffer = nullptr;
+
     RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true)
     {
         // Ensure that SockFile does not keep these sockets open (or we will run out)
@@ -633,14 +822,98 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
             return FileNotFound;
     }
 
-    ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
+#ifdef _CONTAINERIZED
+    int runCacheInfoDump()
+    {
+        cidtStarted.signal();
+        if (traceLevel)
+            DBGLOG("Cache info dump thread %p starting", this);
+        try
+        {
+            for (;;)
+            {
+                cidtSleep.wait(cacheReportPeriodSeconds * 1000);
+                if (closing)
+                    break;
+                if (traceLevel)
+                    DBGLOG("Cache info dump");
+                // Note - cache info is stored in the DLLSERVER persistent area - which we should perhaps consider renaming
+                const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
+                assertex(dllserver_root != nullptr);
+
+                Owned<const ITopologyServer> topology = getTopology();
+                Owned<CacheReportingBuffer> tempCacheReportingBuffer = new CacheReportingBuffer(*activeCacheReportingBuffer);
+                getNodeCacheInfo(*tempCacheReportingBuffer);
+
+                tempCacheReportingBuffer->sortAndDedup();
+                StringBuffer ret;
+                tempCacheReportingBuffer->report(ret, 0, cacheIndexes, cacheIndexChannels);
+                if (ret.length())
+                {
+                    // NOTE - this location is shared with other nodes - who may also be writing
+                    VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), 0);
+                    atomicWriteFile(cacheFileName, ret);
+                    if (traceLevel > 8)
+                        DBGLOG("Channel 0 cache info:\n%s", ret.str());
+                }
+                for (unsigned channel : topology->queryChannels())
+                {
+                    tempCacheReportingBuffer->report(ret.clear(), channel, cacheIndexes, cacheIndexChannels);
+                    if (ret.length())
+                    {
+                        VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), channel);
+                        atomicWriteFile(cacheFileName, ret);
+                        if (traceLevel > 8)
+                            DBGLOG("Channel %u cache info:\n%s", channel, ret.str());
+                    }
+                }
+                // We could at this point put deduped back into active
+            }
+        }
+        catch (IException *E)
+        {
+            // Any exceptions terminate the thread - probably a better option than flooding the log
+            if (!aborting)
+                EXCLOG(MCoperatorError, E, "Cache info dumper: ");
+            E->Release();
+        }
+        catch (...)
+        {
+            IERRLOG("Unknown exception in cache info dump thread");
+        }
+        if (traceLevel)
+            DBGLOG("Cache info dump thread %p exiting", this);
+        return 0;
+    }
+#endif
+
+    unsigned trackCache(const char *filename, unsigned channel)
+    {
+        // NOTE - called from openFile, with crit already held
+        if (!activeCacheReportingBuffer)
+            return (unsigned) -1;
+        cacheIndexes.append(filename);
+        cacheIndexChannels.append(channel);
+        return cacheIndexes.length()-1;
+    }
+
+    virtual void noteRead(unsigned fileIdx, offset_t pos, unsigned len) override
+    {
+        if (activeCacheReportingBuffer)
+            activeCacheReportingBuffer->noteRead(fileIdx, pos, len, true);
+    }
+
+    ILazyFileIO *openFile(const char *lfn, unsigned partNo, unsigned channel, const char *localLocation,
                            IPartDescriptor *pdesc,
                            const StringArray &remoteLocationInfo,
                            offset_t size, const CDateTime &modified)
     {
         Owned<IFile> local = createIFile(localLocation);
-        bool isCompressed = testMode ? false : pdesc->queryOwner().isCompressed();
-        Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed);
+        bool isCompressed = selfTestMode ? false : pdesc->queryOwner().isCompressed();
+        unsigned crc = 0;
+        if (!selfTestMode)
+            pdesc->getCrc(crc);
+        Owned<CRoxieLazyFileIO> ret = new CRoxieLazyFileIO(local.getLink(), size, modified, isCompressed, crc);
         RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, isCompressed);
         if (fileStatus == FileIsValid)
         {
@@ -655,7 +928,7 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
 
             // put the peerRoxieLocations next in the list
             StringArray localLocations;
-            if (testMode)
+            if (selfTestMode)
                 localLocations.append("test.buddy");
             else
                 appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true);  // Adds all locations on the same cluster
@@ -690,9 +963,10 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
                     EXCLOG(MCoperatorError, E, "While creating remote file reference");
                     E->Release();
                 }
+                ret->setRemote(true);
             }
 
-            if (!addedOne && (copyResources || useRemoteResources || testMode))  // If no peer locations available, go to remote
+            if (!addedOne && (copyResources || useRemoteResources || selfTestMode))  // If no peer locations available, go to remote
             {
                 ForEachItemIn(idx, remoteLocationInfo)
                 {
@@ -707,6 +981,10 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
                         {
                             if (miscDebugTraceLevel > 5)
                                 DBGLOG("adding remote location %s", remoteName);
+                            RemoteFilename rfn;
+                            rfn.setRemotePath(remoteName);
+                            if (!rfn.isLocal())    // MORE - may still want to copy files even if they are on a posix-accessible path, for local caching? Probably really want to know if hooked or not...
+                                ret->setRemote(true);
                             ret->addSource(remote.getClear());
                             addedOne = true;
                         }
@@ -743,9 +1021,8 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
                     throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
                 }
             }
-            ret->setRemote(true);
         }
-        ret->setCache(this);
+        ret->setCache(this, trackCache(local->queryFilename(), channel));
         files.setValue(local->queryFilename(), (ILazyFileIO *)ret);
         return ret.getClear();
     }
@@ -908,13 +1185,44 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
 public:
     IMPLEMENT_IINTERFACE;
 
-    CRoxieFileCache(bool _testMode = false) : testMode(_testMode), bct(*this), hct(*this)
+    CRoxieFileCache() :
+#ifdef _CONTAINERIZED
+                        cidt(*this),
+#endif
+                        bct(*this), hct(*this)
     {
         aborting = false;
         closing = false;
         closePending[false] = false;
         closePending[true] = false;
         started = false;
+        if (!selfTestMode && !allFilesDynamic)
+        {
+            offset_t cacheTrackSize = queryComponentConfig().getPropInt64("@cacheTrackSize", (offset_t) -1);
+            if (cacheTrackSize == (offset_t) -1)
+            {
+                const char *memLimit = queryComponentConfig().queryProp("resources/limits/@memory");
+                if (!memLimit)
+                    memLimit = queryComponentConfig().queryProp("resources/requests/@memory");
+                if (memLimit)
+                {
+                    try
+                    {
+                        cacheTrackSize = friendlyStringToSize(memLimit);
+                    }
+                    catch (IException *E)
+                    {
+                        EXCLOG(E);
+                        E->Release();
+                        cacheTrackSize = 0;
+                    }
+                }
+                else
+                    cacheTrackSize = 0x10000 * (1<<CacheInfoEntry::pageBits);
+            }
+            if (cacheTrackSize)
+                activeCacheReportingBuffer = new CacheReportingBuffer(cacheTrackSize);
+        }
     }
 
     ~CRoxieFileCache()
@@ -927,6 +1235,7 @@ public:
             ILazyFileIO *f = files.mapToValue(&h.query());
             f->removeCache(this);
         }
+        delete activeCacheReportingBuffer;
     }
 
     virtual void start()
@@ -937,15 +1246,40 @@ public:
             hct.start();
             bctStarted.wait();
             hctStarted.wait();
-            started = true;
         }
+        started = true;
+    }
+
+    virtual void startCacheReporter() override
+    {
+#ifdef _CONTAINERIZED
+        if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
+        {
+            cidt.start();
+            cidtStarted.wait();
+        }
+#endif
     }
 
+#ifdef _CONTAINERIZED
+    class CacheInfoDumpThread : public Thread
+    {
+        CRoxieFileCache &owner;
+    public:
+        CacheInfoDumpThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-CacheInfoDumpThread"), owner(_owner) {}
+
+        virtual int run()
+        {
+            return owner.runCacheInfoDump();
+        }
+    } cidt;
+#endif
+
     class BackgroundCopyThread : public Thread
     {
         CRoxieFileCache &owner;
     public:
-        BackgroundCopyThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCacheBackgroundCopyThread"), owner(_owner) {}
+        BackgroundCopyThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-BackgroundCopyThread"), owner(_owner) {}
 
         virtual int run()
         {
@@ -957,7 +1291,7 @@ public:
     {
         CRoxieFileCache &owner;
     public:
-        HandleCloserThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCacheHandleCloserThread"), owner(_owner) {}
+        HandleCloserThread(CRoxieFileCache &_owner) : Thread("CRoxieFileCache-HandleCloserThread"), owner(_owner) {}
         virtual int run()
         {
             return owner.runHandleCloser();
@@ -1091,6 +1425,13 @@ public:
             toClose.interrupt();
             bct.join(timeout);
             hct.join(timeout);
+#ifdef _CONTAINERIZED
+            if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
+            {
+                cidtSleep.interrupt();
+                cidt.join(timeout);
+            }
+#endif
         }
     }
 
@@ -1103,6 +1444,13 @@ public:
             toClose.signal();
             bct.join();
             hct.join();
+#ifdef _CONTAINERIZED
+            if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
+            {
+                cidtSleep.signal();
+                cidt.join();
+            }
+#endif
         }
     }
 
@@ -1131,9 +1479,10 @@ public:
     }
 
     virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
-                                     IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
+                                     IPartDescriptor *pdesc, unsigned numParts, unsigned channel,
                                      const StringArray &deployedLocationInfo, bool startFileCopy)
     {
+        unsigned replicationLevel = getReplicationLevel(channel);
         IPropertyTree &partProps = pdesc->queryProperties();
         offset_t dfsSize = partProps.getPropInt64("@size", -1);
         bool local = partProps.getPropBool("@local");
@@ -1204,7 +1553,7 @@ public:
                     return f.getClear();
             }
 
-            ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
+            ret.setown(openFile(lfn, partNo, channel, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
 
             if (startFileCopy)
             {
@@ -1258,6 +1607,22 @@ public:
         return ret.getLink();
     }
 
+    ILazyFileIO *lookupLocalFile(const char *filename)
+    {
+        try
+        {
+            CriticalBlock b(crit);
+            ILazyFileIO * match = files.getValue(filename);
+            if (match && match->isAliveAndLink())
+                return match;
+        }
+        catch(IException *e)
+        {
+            e->Release();
+        }
+        return nullptr;
+    }
+
     virtual void closeExpired(bool remote)
     {
         // This schedules a close at the next available opportunity
@@ -1270,6 +1635,225 @@ public:
         }
     }
 
+    static unsigned __int64 readPage(const char * &_t)
+    {
+        const char *t = _t;
+        unsigned __int64 v = 0;
+        for (;;)
+        {
+            char c = *t;
+            if ((c >= '0') && (c <= '9'))
+                v = v * 16 + (c-'0');
+            else if ((c >= 'a') && (c <= 'f'))
+                v = v * 16 + (c-'a'+10);
+            else if ((c >= 'A') && (c <= 'F'))
+                v = v * 16 + (c-'A'+10);
+            else
+                break;
+            t++;
+        }
+        _t = t;
+        return v;
+    }
+
+    virtual void loadSavedOsCacheInfo() override
+    {
+        Owned<const ITopologyServer> topology = getTopology();
+        for (unsigned channel : topology->queryChannels())
+            doLoadSavedOsCacheInfo(channel);
+        doLoadSavedOsCacheInfo(0);  // MORE - maybe only if I am also a server?
+    }
+
+    void doLoadSavedOsCacheInfo(unsigned channel)
+    {
+        const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");
+        assertex(dllserver_root != nullptr);
+        VStringBuffer cacheFileName("%s/%s/cacheInfo.%d", dllserver_root, roxieName.str(), channel);
+        StringBuffer cacheInfo;
+        try
+        {
+            if (checkFileExists(cacheFileName))
+            {
+                cacheInfo.loadFile(cacheFileName, false);
+                warmOsCache(cacheInfo);
+                if (traceLevel)
+                    DBGLOG("Loaded cache information from %s for channel %d", cacheFileName.str(), channel);
+            }
+        }
+        catch(IException *E)
+        {
+            EXCLOG(E);
+            E->Release();
+        }
+    }
+
+
+    virtual void warmOsCache(const char *cacheInfo) override
+    {
+        if (!cacheInfo)
+            return;
+#ifndef _WIN32
+        size_t os_page_size = getpagesize();
+#endif
+        char t = 0;
+        unsigned touched = 0;
+        unsigned preloaded = 0;
+        Owned<const ITopologyServer> topology = getTopology();
+        while (*cacheInfo)
+        {
+            // We are parsing lines that look like:
+            // <channel>|<filename>|<pagelist>
+            //
+            // Where pagelist is a space-separated list of page numers or (inclusive) ranges.
+            // A page number or range prefixed by a * means that the page(s) was found in the jhtree cache.
+            //
+            // For example,
+            // 1|/var/lib/HPCCSystems/hpcc-data/unknown/regress/multi/dg_index_evens._1_of_3|*0 3-4
+            // Pages are always recorded and specified as 8192 bytes (unless pagebits ever changes).
+
+            unsigned fileChannel = strtoul(cacheInfo, (char **) &cacheInfo, 10);
+            if (*cacheInfo != '|')
+                break;
+            if (!topology->implementsChannel(fileChannel))
+            {
+                const char *eol = strchr(cacheInfo, '\n');
+                if (!eol)
+                    break;
+                cacheInfo = eol+1;
+                continue;
+            }
+            cacheInfo++;
+            const char *endName = strchr(cacheInfo, '|');
+            assert(endName);
+            if (!endName)
+                break;
+            StringBuffer fileName(endName-cacheInfo, cacheInfo);
+            Owned<IKeyIndex> keyIndex;
+            bool keyFailed = false;
+            unsigned fileIdx = (unsigned) -1;
+#ifndef _WIN32
+            char *file_mmap = nullptr;
+            int fd = open(fileName, 0);
+            struct stat file_stat;
+            if (fd != -1)
+            {
+                fstat(fd, &file_stat);
+                file_mmap = (char *) mmap((void *)0, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
+                if (file_mmap == MAP_FAILED)
+                {
+                    DBGLOG("Failed to map file %s to pre-warm cache (error %d)", fileName.str(), errno);
+                    file_mmap = nullptr;
+                }
+            }
+            else if (traceLevel)
+            {
+                DBGLOG("Failed to open file %s to pre-warm cache (error %d)", fileName.str(), errno);
+            }
+#endif
+            Owned<ILazyFileIO> localFile = lookupLocalFile(fileName);
+            if (localFile)
+            {
+                fileIdx = localFile->getFileIdx();
+            }
+            cacheInfo = endName+1;  // Skip the |
+            while (*cacheInfo==' ')
+                cacheInfo++;
+            for (;;)
+            {
+                bool inNodeCache = (*cacheInfo=='*');
+                if (inNodeCache)
+                    cacheInfo++;
+                __uint64 startPage = readPage(cacheInfo);
+                __uint64 endPage;
+                if (*cacheInfo=='-')
+                {
+                    cacheInfo++;
+                    endPage = readPage(cacheInfo);
+                }
+                else
+                    endPage = startPage;
+                if (traceLevel > 8)
+                    DBGLOG("Touching %s %" I64F "x-%" I64F "x", fileName.str(), startPage, endPage);
+                offset_t startOffset = startPage << CacheInfoEntry::pageBits;
+                offset_t endOffset = (endPage+1) << CacheInfoEntry::pageBits;
+                if (inNodeCache && !keyFailed && localFile && !keyIndex)
+                {
+                    keyIndex.setown(createKeyIndex(fileName, localFile->getCrc(), *localFile.get(), fileIdx, false, false));  // MORE - we don't know if it's a TLK, but hopefully it doesn't matter
+                    if (!keyIndex)
+                        keyFailed = true;
+                }
+                if (inNodeCache && keyIndex)
+                {
+                    // Round startOffset up to nearest multiple of index node size
+                    unsigned nodeSize = keyIndex->getNodeSize();
+                    startOffset = ((startOffset+nodeSize-1)/nodeSize)*nodeSize;
+                    do
+                    {
+                        bool loaded = keyIndex->prewarmPage(startOffset);
+                        if (!loaded)
+                            break;
+                        preloaded++;
+                        startOffset += nodeSize;
+                    }
+                    while (startOffset < endOffset);
+                }
+#ifndef _WIN32
+                else if (file_mmap)
+                {
+                    if (fileIdx != (unsigned) -1)
+                        noteRead(fileIdx, startOffset, (endOffset-1) - startOffset);  // Ensure pages we prewarm are recorded in our cache tracker
+                    do
+                    {
+                        if (startOffset >= (offset_t) file_stat.st_size)
+                            break;    // Let's not core if the file has changed size since we recorded the info...
+                        t += file_mmap[startOffset];  // NOTE - t reported below so it cannot be optimized out
+                        touched++;
+                        startOffset += os_page_size;
+                    }
+                    while (startOffset < endOffset);
+                }
+#endif
+                if (*cacheInfo != ' ')
+                    break;
+                cacheInfo++;
+            }
+#ifndef _WIN32
+            if (file_mmap)
+                munmap(file_mmap, file_stat.st_size);
+            if (fd != -1)
+                close(fd);
+#endif
+            if (*cacheInfo != '\n')
+                break;
+            cacheInfo++;
+        }
+        assert(!*cacheInfo);
+        if (*cacheInfo)
+        {
+            DBGLOG("WARNING: Unrecognized cacheInfo format at %.20s", cacheInfo);
+        }
+        if (traceLevel)
+            DBGLOG("Touched %d pages, preloaded %d index nodes, result %d", touched, preloaded, t);  // We report t to make sure that compiler doesn't decide to optimize it away entirely
+    }
+
+    virtual void clearOsCache() override
+    {
+        if (activeCacheReportingBuffer)
+            activeCacheReportingBuffer->clear();
+    }
+
+    virtual void reportOsCache(StringBuffer &ret, unsigned channel) const override
+    {
+        if (activeCacheReportingBuffer)
+        {
+            Owned<CacheReportingBuffer> temp = new CacheReportingBuffer(*activeCacheReportingBuffer);
+            getNodeCacheInfo(*temp);
+            temp->sortAndDedup();
+            temp->report(ret, channel, cacheIndexes, cacheIndexChannels);
+            // We could at this point put deduped back into active
+        }
+    }
+
     void doCloseExpired(bool remote)
     {
         {
@@ -1403,7 +1987,7 @@ ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDes
     if (remotePDesc)
         appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false);    // Then any remote on remote dali
 
-    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, getReplicationLevel(channel), remoteLocations, startCopy);
+    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, channel, remoteLocations, startCopy);
 }
 
 //====================================================================================================
@@ -1809,8 +2393,6 @@ template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
     // For example, this one may want to be a hash table, and there may be many more entries
 };
 
-CRoxieFileCache * fileCache;
-
 class CResolvedFile : implements IResolvedFileCreator, implements ISafeSDSSubscription, public CInterface
 {
 protected:
@@ -2326,10 +2908,10 @@ public:
                             if (lazyOpen)
                             {
                                 // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
-                                keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), false, false));
+                                keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *QUERYINTERFACE(part.get(), IDelayedFile), part->getFileIdx(), false, false));
                             }
                             else
-                                keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), false, false));
+                                keyset->addIndex(createKeyIndex(part->queryFilename(), crc, *part.get(), part->getFileIdx(), false, false));
                         }
                         else
                             keyset->addIndex(NULL);
@@ -2343,7 +2925,6 @@ public:
         else
         {
             // Channel 0 means return the TLK
-            IArrayOf<IKeyIndexBase> subkeys;
             Owned<IKeyIndexSet> keyset = createKeyIndexSet();
             ForEachItemIn(idx, subFiles)
             {
@@ -2364,10 +2945,10 @@ public:
                     if (lazyOpen)
                     {
                         // We pass the IDelayedFile interface to createKeyIndex, so that it does not open the file immediately
-                        key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), numParts>1, false));
+                        key.setown(createKeyIndex(pname.str(), crc, *QUERYINTERFACE(keyFile.get(), IDelayedFile), keyFile->getFileIdx(), numParts>1, false));
                     }
                     else
-                        key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), numParts>1, false));
+                        key.setown(createKeyIndex(pname.str(), crc, *keyFile.get(), keyFile->getFileIdx(), numParts>1, false));
                     keyset->addIndex(LINK(key->queryPart(0)));
                 }
                 else
@@ -2766,23 +3347,27 @@ extern void releaseAgentDynamicFileCache()
         agentDynamicFileCache->releaseAll();
 }
 
+static Singleton<CRoxieFileCache> fileCache;
 
 // Initialization/termination
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
-    fileCache = new CRoxieFileCache;
     return true;
 }
 
 MODULE_EXIT()
 { 
-    fileCache->join();
-    fileCache->Release();
+    auto cache = fileCache.queryExisting();
+    if (cache)
+    {
+        cache->join();
+        cache->Release();
+    }
 }
 
 extern IRoxieFileCache &queryFileCache()
 {
-    return *fileCache;
+    return *fileCache.query([] { return new CRoxieFileCache; });
 }
 
 class CRoxieWriteHandler : implements IRoxieWriteHandler, public CInterface
@@ -3020,10 +3605,10 @@ protected:
 
     void testCopy()
     {
+        selfTestMode = true;
         remove("test.local");
         remove("test.remote");
         remove("test.buddy");
-        CRoxieFileCache cache(true);
         StringArray remotes;
         DummyPartDescriptor pdesc;
         CDateTime dummy;
@@ -3035,8 +3620,9 @@ protected:
         int wrote = write(f, &val, sizeof(int));
         CPPUNIT_ASSERT(wrote==sizeof(int));
         close(f);
+        CRoxieFileCache &cache = static_cast<CRoxieFileCache &>(queryFileCache());
 
-        Owned<ILazyFileIO> io = cache.openFile("test.local", 0, "test.local", NULL, remotes, sizeof(int), dummy);
+        Owned<ILazyFileIO> io = cache.openFile("test.local", 0, 0, "test.local", NULL, remotes, sizeof(int), dummy);
         CPPUNIT_ASSERT(io != NULL);
 
         // Reading it should read 1

+ 10 - 2
roxie/ccd/ccdfile.hpp

@@ -50,14 +50,16 @@ interface ILazyFileIO : extends IFileIO
     virtual bool isCopying() const = 0;
     virtual IMemoryMappedFile *getMappedFile() = 0;
 
-    virtual void setCache(const IRoxieFileCache *) = 0;
+    virtual void setCache(IRoxieFileCache *, unsigned fileIdx) = 0;
     virtual void removeCache(const IRoxieFileCache *) = 0;
+    virtual unsigned getFileIdx() const = 0;
+    virtual unsigned getCrc() const = 0;
 };
 
 interface IRoxieFileCache : extends IInterface
 {
     virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType, IPartDescriptor *pdesc, unsigned numParts,
-                                      unsigned replicationLevel, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
+                                      unsigned channel, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
     virtual RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true) = 0;
     virtual int numFilesToCopy() = 0;
     virtual void closeExpired(bool remote) = 0;
@@ -65,6 +67,12 @@ interface IRoxieFileCache : extends IInterface
     virtual void flushUnusedDirectories(const char *origBaseDir, const char *directory, StringBuffer &info) = 0;
     virtual void start() = 0;
     virtual void removeCache(ILazyFileIO *file) const = 0;
+    virtual void reportOsCache(StringBuffer &ret, unsigned channel) const = 0;
+    virtual void clearOsCache() = 0;
+    virtual void warmOsCache(const char *cacheInfo) = 0;
+    virtual void loadSavedOsCacheInfo() = 0;
+    virtual void noteRead(unsigned fileIdx, offset_t pos, unsigned len) = 0;
+    virtual void startCacheReporter() = 0;
 };
 
 interface IDiffFileInfoCache : extends IInterface

+ 10 - 1
roxie/ccd/ccdmain.cpp

@@ -177,6 +177,8 @@ bool adhocRoxie = false;
 unsigned __int64 minFreeDiskSpace = 1024 * 0x100000;  // default to 1 GB
 unsigned socketCheckInterval = 5000;
 
+unsigned cacheReportPeriodSeconds = 5*60;
+
 StringBuffer logDirectory;
 StringBuffer pluginDirectory;
 StringBuffer queryDirectory;
@@ -713,6 +715,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         if (standAloneDll || wuid)
         {
             oneShotRoxie = true;
+            allFilesDynamic = true;
             if (topology->getPropBool("@server", false))
             {
 #ifdef _CONTAINERIZED
@@ -1047,6 +1050,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             parallelLoadQueries = 1;
 
         enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
+        cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60);
 
         // NB: these directories will have been setup by topology earlier
         const char *primaryDirectory = queryBaseDirectory(grp_unknown, 0);
@@ -1184,7 +1188,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             loadPlugins();
         unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
 #ifdef _CONTAINERIZED
-        initializeTopology(topoValues, myRoles, traceLevel);
+        initializeTopology(topoValues, myRoles);
 #endif
         createDelayedReleaser();
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
@@ -1350,6 +1354,11 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
                     time(&startupTime);
                     roxieServer->start();
                 }
+#ifdef _CONTAINERIZED
+                queryFileCache().loadSavedOsCacheInfo();
+                queryFileCache().startCacheReporter();
+                publishTopology(traceLevel);
+#endif
                 writeSentinelFile(sentinelFile);
                 DBGLOG("Startup completed - LPT=%u APT=%u", queryNumLocalTrees(), queryNumAtomTrees());
                 DBGLOG("Waiting for queries");

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -303,6 +303,7 @@ public:
         case TAKdiskcount:
         case TAKdiskgroupaggregate:
         case TAKdiskexists:
+        case TAKkeyedjoin:  // The fetch part...
             return FileFormatMode::flat;
         default:
 #ifdef _DEBUG

+ 26 - 1
roxie/ccd/ccdstate.cpp

@@ -2131,7 +2131,23 @@ private:
             break;
 
         case 'C':
-            if (stricmp(queryName, "control:checkCompleted")==0)
+            if (stricmp(queryName, "control:cacheInfo")==0)
+            {
+                bool clear = control->getPropBool("@clear", false);
+                unsigned channel = control->getPropInt("@channel", -1);
+                if (clear)
+                    queryFileCache().clearOsCache();
+                else
+                {
+                    reply.append(" <CacheInfo");
+                    if (channel != (unsigned) -1)
+                        reply.appendf(" channel='%u'", channel);
+                    reply.append(">\n");
+                    queryFileCache().reportOsCache(reply, channel);
+                    reply.appendf(" </CacheInfo>\n");
+                }
+            }
+            else if (stricmp(queryName, "control:checkCompleted")==0)
             {
                 checkCompleted = control->getPropBool("@val", true);
                 topology->setPropBool("@checkCompleted", checkCompleted );
@@ -2671,6 +2687,15 @@ private:
                 topology->setPropInt64("@affinity", affinity);
                 updateAffinity(affinity);
             }
+            else if (stricmp(queryName, "control:setCacheInfo")==0)
+            {
+                Owned<IPTreeIterator> infos = control->getElements(".//CacheInfo");
+                ForEach(*infos)
+                {
+                    IPropertyTree &info = infos->query();
+                    queryFileCache().warmOsCache(info.queryProp(""));
+                }
+            }
             else if (stricmp(queryName, "control:setCopyResources")==0)
             {
                 copyResources = control->getPropBool("@val", true);

+ 1 - 1
roxie/roxie/roxie.cpp

@@ -47,7 +47,7 @@ static void roxie_server_usage()
 static constexpr const char * defaultYaml = R"!!(
 version: "1.0"
 roxie:
-  allFilesDynamic: true
+  allFilesDynamic: false
   localSlave: true
   numChannels: 1
   queueNames: roxie.roxie

+ 17 - 2
roxie/udplib/udptopo.cpp

@@ -96,6 +96,7 @@ public:
     virtual const SocketEndpointArray &queryServers(unsigned port) const override;
     virtual const ChannelInfo &queryChannelInfo(unsigned channel) const override;
     virtual const std::vector<unsigned> &queryChannels() const override;
+    virtual bool implementsChannel(unsigned channel) const override;
 
 private:
     std::map<unsigned, SocketEndpointArray> agents;  // indexed by channel
@@ -200,6 +201,15 @@ const std::vector<unsigned> &CTopologyServer::queryChannels() const
     return channels;
 }
 
+bool CTopologyServer::implementsChannel(unsigned channel) const
+{
+    if (channel)
+    {
+        return std::find(channels.begin(), channels.end(), channel) != channels.end();
+    }
+    else
+        return true;   // Kinda-sorta - perhaps not true if separated servers from agents, but even then child queries may access channel 0
+}
 const SocketEndpointArray CTopologyServer::nullArray;
 
 // Class TopologyManager (there is a single instance) handles interaction with topology servers
@@ -214,6 +224,7 @@ public:
     const ITopologyServer &getCurrent();
 
     bool update();
+    unsigned numServers() const { return topoServers.length(); }
 private:
     Owned<const ITopologyServer> currentTopology;
     SpinLock lock;
@@ -356,11 +367,15 @@ static std::thread topoThread;
 static Semaphore abortTopo;
 const unsigned topoUpdateInterval = 5000;
 
-extern UDPLIB_API void initializeTopology(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel)
+extern UDPLIB_API void initializeTopology(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles)
 {
     topologyManager.setServers(topoValues);
     topologyManager.setRoles(myRoles);
-    if (topoValues.length())
+}
+
+extern UDPLIB_API void publishTopology(unsigned traceLevel)
+{
+    if (topologyManager.numServers())
     {
         topoThread = std::thread([traceLevel]()
         {

+ 3 - 1
roxie/udplib/udptopo.hpp

@@ -102,6 +102,7 @@ interface ITopologyServer : public IInterface
     virtual const SocketEndpointArray &queryServers(unsigned port) const = 0;
     virtual const ChannelInfo &queryChannelInfo(unsigned channel) const = 0;
     virtual const std::vector<unsigned> &queryChannels() const = 0;
+    virtual bool implementsChannel(unsigned channel) const = 0;
 };
 
 extern UDPLIB_API unsigned getNumAgents(unsigned channel);
@@ -115,7 +116,8 @@ struct RoxieEndpointInfo
     unsigned replicationLevel;
 };
 
-extern UDPLIB_API void initializeTopology(const StringArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);
+extern UDPLIB_API void initializeTopology(const StringArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles);
+extern UDPLIB_API void publishTopology(unsigned traceLevel);
 #ifndef _CONTAINERIZED
 extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo> &allRoles, unsigned traceLevel);
 #endif

+ 5 - 0
system/jhtree/ctfile.cpp

@@ -544,6 +544,11 @@ char *CJHTreeNode::expandKeys(void *src,size32_t &retsize)
     return outkeys;
 }
 
+size32_t CJHTreeNode::getNodeSize() const
+{
+    return keyHdr->getNodeSize();
+}
+
 void CJHTreeNode::unpack(const void *node, bool needCopy)
 {
     memcpy(&hdr, node, sizeof(hdr));

+ 1 - 0
system/jhtree/ctfile.hpp

@@ -238,6 +238,7 @@ public:
     inline offset_t getRightSib() const { return hdr.rightSib; }
     inline offset_t getLeftSib() const { return hdr.leftSib; }
     unsigned __int64 getSequence(unsigned int num) const;
+    size32_t getNodeSize() const;
 };
 
 class CJHVarTreeNode : public CJHTreeNode 

+ 71 - 41
system/jhtree/jhtree.cpp

@@ -632,6 +632,16 @@ public:
         CJHTreeNode &node = mapping->queryElement();
         sizeInMem -= (FIXED_NODE_OVERHEAD+node.getMemSize());
     }
+    void reportEntries(ICacheInfoRecorder &cacheInfo)
+    {
+        Owned<CNodeMRUCache::CMRUIterator> iter = getIterator();
+        ForEach(*iter)
+        {
+            CNodeMapping &mapping = iter->query();
+            const CKeyIdAndPos &key = mapping.queryFindValue();
+            cacheInfo.noteWarm(key.keyId, key.pos, mapping.queryElement().getNodeSize());
+        }
+    }
 };
 
 class CNodeCache : public CInterface
@@ -656,10 +666,11 @@ public:
         preloadNodes = false;
         // note that each index caches the last blob it unpacked so that sequential blobfetches are still ok
     }
-    CJHTreeNode *getNode(INodeLoader *key, int keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
-    void preload(CJHTreeNode *node, int keyID, offset_t pos, IContextLogger *ctx);
+    CJHTreeNode *getNode(INodeLoader *key, unsigned keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
+    void getCacheInfo(ICacheInfoRecorder &cacheInfo);
+    void preload(CJHTreeNode *node, unsigned keyID, offset_t pos, IContextLogger *ctx);
 
-    bool isPreloaded(int keyID, offset_t pos);
+    bool isPreloaded(unsigned keyID, offset_t pos);
 
     inline bool getNodeCachePreload() 
     {
@@ -733,7 +744,6 @@ unsigned setKeyIndexCacheSize(unsigned limit)
 
 CKeyStore::CKeyStore() : keyIndexCache(defaultKeyIndexLimit)
 {
-    nextId = 0;
 #if 0
     mm.setown(createSharedMemoryManager("RichardsSharedMemManager", 0x100000));
     try
@@ -757,7 +767,7 @@ unsigned CKeyStore::setKeyCacheLimit(unsigned limit)
     return keyIndexCache.setCacheLimit(limit);
 }
 
-IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
+IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, unsigned fileIdx, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
 {
     // isTLK provided by caller since flags in key header unreliable. If either say it's a TLK, I believe it.
     {
@@ -774,15 +784,16 @@ IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile
             if (iMappedFile)
             {
                 assert(!iFileIO && !part);
-                keyIndex = new CMemKeyIndex(getUniqId(), LINK(iMappedFile), fname, isTLK);
+                keyIndex = new CMemKeyIndex(getUniqId(fileIdx), LINK(iMappedFile), fname, isTLK);
             }
             else if (iFileIO)
             {
                 assert(!part);
-                keyIndex = new CDiskKeyIndex(getUniqId(), LINK(iFileIO), fname, isTLK, allowPreload);
+                keyIndex = new CDiskKeyIndex(getUniqId(fileIdx), LINK(iFileIO), fname, isTLK, allowPreload);
             }
             else
             {
+                assert(fileIdx==(unsigned) -1);
                 Owned<IFile> iFile;
                 if (part)
                 {
@@ -794,7 +805,7 @@ IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile
                     iFile.setown(createIFile(fileName));
                 IFileIO *fio = iFile->open(IFOread);
                 if (fio)
-                    keyIndex = new CDiskKeyIndex(getUniqId(), fio, fname, isTLK, allowPreload);
+                    keyIndex = new CDiskKeyIndex(getUniqId(fileIdx), fio, fname, isTLK, allowPreload);
                 else
                     throw MakeStringException(0, "Failed to open index file %s", fileName);
             }
@@ -809,31 +820,26 @@ IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile
     }
 }
 
-IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload)
+IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, unsigned fileIdx, bool isTLK, bool allowPreload)
 {
-    return doload(fileName, crc, NULL, iFileIO, NULL, isTLK, allowPreload);
+    return doload(fileName, crc, NULL, iFileIO, fileIdx, NULL, isTLK, allowPreload);
 }
 
 IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
 {
-    return doload(fileName, crc, NULL, NULL, iMappedFile, isTLK, allowPreload);
-}
-
-// fileName+crc used only as key for cache
-IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload)
-{
-    return doload(fileName, crc, &part, NULL, NULL, isTLK, allowPreload);
+    return doload(fileName, crc, NULL, NULL, (unsigned) -1, iMappedFile, isTLK, allowPreload);
 }
 
 IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload)
 {
-    return doload(fileName, crc, NULL, NULL, NULL, isTLK, allowPreload);
+    return doload(fileName, crc, NULL, NULL, (unsigned) -1, NULL, isTLK, allowPreload);
 }
 
 StringBuffer &CKeyStore::getMetrics(StringBuffer &xml)
 {
     xml.append(" <IndexMetrics>\n");
 
+    synchronized block(mutex);
     Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
     ForEach(*iter)
     {           
@@ -940,7 +946,7 @@ void CKeyStore::clearCacheEntry(const IFileIO *io)
 
 // CKeyIndex impl.
 
-CKeyIndex::CKeyIndex(int _iD, const char *_name) : name(_name)
+CKeyIndex::CKeyIndex(unsigned _iD, const char *_name) : name(_name)
 {
     iD = _iD;
     cache = queryNodeCache(); // use one node cache for all key indexes;
@@ -977,7 +983,7 @@ void CKeyIndex::cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK)
 void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
 {
     if (isTLK)
-        hdr.ktype |= HTREE_TOPLEVEL_KEY; // thor does not set
+        hdr.ktype |= HTREE_TOPLEVEL_KEY; // Once upon a time, thor did not set
     keyHdr = new CKeyHdr();
     try
     {
@@ -1023,7 +1029,7 @@ CKeyIndex::~CKeyIndex()
     ::Release(rootNode);
 }
 
-CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
+CMemKeyIndex::CMemKeyIndex(unsigned _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
     : CKeyIndex(_iD, _name)
 {
     io.setown(_io);
@@ -1057,7 +1063,7 @@ CJHTreeNode *CMemKeyIndex::loadNode(offset_t pos)
     return CKeyIndex::loadNode(nodeData, pos, false);
 }
 
-CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
+CDiskKeyIndex::CDiskKeyIndex(unsigned _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
     : CKeyIndex(_iD, _name)
 {
     io.setown(_io);
@@ -1170,7 +1176,8 @@ IKeyCursor *CKeyIndex::getCursor(const IIndexFilterList *filter, bool logExcessi
 CJHTreeNode *CKeyIndex::getNode(offset_t offset, IContextLogger *ctx) 
 { 
     latestGetNodeOffset = offset;
-    return cache->getNode(this, iD, offset, ctx, isTopLevelKey()); 
+    CJHTreeNode *node = cache->getNode(this, iD, offset, ctx, isTopLevelKey());
+    return node;
 }
 
 void dumpNode(FILE *out, CJHTreeNode *node, int length, unsigned rowCount, bool raw)
@@ -1345,6 +1352,20 @@ IPropertyTree * CKeyIndex::getMetadata()
     return ret;
 }
 
+bool CKeyIndex::prewarmPage(offset_t offset)
+{
+    try
+    {
+        Owned<CJHTreeNode> page = loadNode(offset);
+        return page != nullptr;
+    }
+    catch(IException *E)
+    {
+        ::Release(E);
+    }
+    return false;
+}
+
 CJHTreeNode *CKeyIndex::locateFirstNode(KeyStatsCollector &stats)
 {
     keySeeks++;
@@ -2174,6 +2195,7 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 {
     StringAttr keyfile;
     unsigned crc; 
+    unsigned fileIdx;
     Linked<IDelayedFile> delayedFile;
     mutable Owned<IFileIO> iFileIO;
     mutable Owned<IKeyIndex> realKey;
@@ -2192,7 +2214,7 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
             else
             {
                 iFileIO.setown(delayedFile->getFileIO());
-                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
+                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, fileIdx, isTLK, preloadAllowed));
             }
             if (!realKey)
             {
@@ -2205,8 +2227,8 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 
 public:
     IMPLEMENT_IINTERFACE;
-    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
-        : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
+    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, unsigned _fileIdx, bool _isTLK, bool _preloadAllowed)
+        : keyfile(_keyfile), crc(_crc), fileIdx(_fileIdx), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
     {}
 
     virtual bool IsShared() const { return CInterface::IsShared(); }
@@ -2235,11 +2257,13 @@ public:
     virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
     virtual bool hasSpecialFileposition() const { return checkOpen().hasSpecialFileposition(); }
     virtual bool needsRowBuffer() const { return checkOpen().needsRowBuffer(); }
+    virtual bool prewarmPage(offset_t offset) { return checkOpen().prewarmPage(offset); }
+
 };
 
-extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
+extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, unsigned fileIdx, bool isTLK, bool preloadAllowed)
 {
-    return queryKeyStore()->load(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
+    return queryKeyStore()->load(keyfile, crc, &iFileIO, fileIdx, isTLK, preloadAllowed);
 }
 
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, bool isTLK, bool preloadAllowed)
@@ -2247,17 +2271,9 @@ extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc,
     return queryKeyStore()->load(keyfile, crc, isTLK, preloadAllowed);
 }
 
-extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed)
-{
-    StringBuffer filePath;
-    const RemoteFilename &rfn = part.queryCopies().item(0);
-    rfn.getPath(filePath);
-    return queryKeyStore()->load(filePath.str(), crc, part, isTLK, preloadAllowed);
-}
-
-extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, bool isTLK, bool preloadAllowed)
+extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, unsigned fileIdx, bool isTLK, bool preloadAllowed)
 {
-    return new CLazyKeyIndex(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
+    return new CLazyKeyIndex(keyfile, crc, &iFileIO, fileIdx, isTLK, preloadAllowed);
 }
 
 extern jhtree_decl void clearKeyStoreCache(bool killAll)
@@ -2305,12 +2321,26 @@ extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize)
     return queryNodeCache()->setBlobCacheMem(cacheSize);
 }
 
+extern jhtree_decl void getNodeCacheInfo(ICacheInfoRecorder &cacheInfo)
+{
+    // MORE - consider reporting root nodes of open IKeyIndexes too?
+    queryNodeCache()->getCacheInfo(cacheInfo);
+}
 
 ///////////////////////////////////////////////////////////////////////////////
 // CNodeCache impl.
 ///////////////////////////////////////////////////////////////////////////////
 
-CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IContextLogger *ctx, bool isTLK)
+void CNodeCache::getCacheInfo(ICacheInfoRecorder &cacheInfo)
+{
+    CriticalBlock block(lock);
+    preloadCache.reportEntries(cacheInfo); // Debatable whether we should include this
+    blobCache.reportEntries(cacheInfo);
+    nodeCache.reportEntries(cacheInfo);
+    leafCache.reportEntries(cacheInfo);
+}
+
+CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, unsigned iD, offset_t pos, IContextLogger *ctx, bool isTLK)
 {
     // MORE - could probably be improved - I think having the cache template separate is not helping us here
     // Also one cache per key would surely be faster, and could still use a global total
@@ -2428,7 +2458,7 @@ CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IC
     }
 }
 
-void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger *ctx)
+void CNodeCache::preload(CJHTreeNode *node, unsigned iD, offset_t pos, IContextLogger *ctx)
 {
     assertex(pos);
     assertex(preloadNodes);
@@ -2444,7 +2474,7 @@ void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger
     }
 }
 
-bool CNodeCache::isPreloaded(int iD, offset_t pos)
+bool CNodeCache::isPreloaded(unsigned iD, offset_t pos)
 {
     CriticalBlock block(lock);
     CKeyIdAndPos key(iD, pos);

+ 9 - 4
system/jhtree/jhtree.hpp

@@ -113,6 +113,7 @@ interface jhtree_decl IKeyIndex : public IKeyIndexBase
     virtual const IFileIO *queryFileIO() const = 0;
     virtual bool hasSpecialFileposition() const = 0;
     virtual bool needsRowBuffer() const = 0;
+    virtual bool prewarmPage(offset_t offset) = 0;
 };
 
 interface IKeyArray : extends IInterface
@@ -132,7 +133,11 @@ interface jhtree_decl IKeyIndexSet : public IKeyIndexBase
     virtual offset_t getTotalSize() = 0;
 };
 
-interface IReplicatedFile;
+interface ICacheInfoRecorder
+{
+    virtual void noteWarm(unsigned fileIdx, offset_t page, size32_t len) = 0;
+};
+
 
 extern jhtree_decl void clearKeyStoreCache(bool killAll);
 extern jhtree_decl void clearKeyStoreCacheEntry(const char *name);
@@ -145,11 +150,11 @@ extern jhtree_decl size32_t setNodeCacheMem(size32_t cacheSize);
 extern jhtree_decl size32_t setLeafCacheMem(size32_t cacheSize);
 extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize);
 
+extern jhtree_decl void getNodeCacheInfo(ICacheInfoRecorder &cacheInfo);
 
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *filename, unsigned crc, bool isTLK, bool preloadAllowed);
-extern jhtree_decl IKeyIndex *createKeyIndex(const char *filename, unsigned crc, IFileIO &ifile, bool isTLK, bool preloadAllowed);
-extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed);
-extern jhtree_decl IKeyIndex *createKeyIndex(const char *filename, unsigned crc, IDelayedFile &ifile, bool isTLK, bool preloadAllowed);
+extern jhtree_decl IKeyIndex *createKeyIndex(const char *filename, unsigned crc, IFileIO &ifile, unsigned fileIdx, bool isTLK, bool preloadAllowed);
+extern jhtree_decl IKeyIndex *createKeyIndex(const char *filename, unsigned crc, IDelayedFile &ifile, unsigned fileIdx, bool isTLK, bool preloadAllowed);
 
 extern jhtree_decl bool isIndexFile(const char *fileName);
 extern jhtree_decl bool isIndexFile(IFile *file);

+ 14 - 10
system/jhtree/jhtree.ipp

@@ -36,18 +36,21 @@ class CKeyStore
 {
 private:
     Mutex mutex;
-    Mutex idmutex;
     CKeyIndexMRUCache keyIndexCache;
-    int nextId;
-    int getUniqId() { synchronized procedure(idmutex); return ++nextId; }
-    IKeyIndex *doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload);
+    std::atomic<unsigned> nextId { 0x80000000 };
+    unsigned getUniqId(unsigned useId)
+    {
+        if (useId != (unsigned) -1)
+            return useId;
+        return ++nextId;
+    }
+    IKeyIndex *doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, unsigned fileIdx, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload);
 public:
     CKeyStore();
     ~CKeyStore();
     IKeyIndex *load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload);
-    IKeyIndex *load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload);
+    IKeyIndex *load(const char *fileName, unsigned crc, IFileIO *iFileIO, unsigned fileIdx, bool isTLK, bool allowPreload);
     IKeyIndex *load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload);
-    IKeyIndex *load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload);
     void clearCache(bool killAll);
     void clearCacheEntry(const char *name);
     void clearCacheEntry(const IFileIO *io);
@@ -76,7 +79,7 @@ private:
     CKeyIndex(CKeyIndex &);
 
 protected:
-    int iD;
+    unsigned iD;
     StringAttr name;
     CriticalSection blobCacheCrit;
     Owned<CJHTreeBlobNode> cachedBlobNode;
@@ -95,7 +98,7 @@ protected:
     CJHTreeBlobNode *getBlobNode(offset_t nodepos);
 
 
-    CKeyIndex(int _iD, const char *_name);
+    CKeyIndex(unsigned _iD, const char *_name);
     ~CKeyIndex();
     void init(KeyHdr &hdr, bool isTLK, bool allowPreload);
     void cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK);
@@ -135,6 +138,7 @@ public:
     virtual unsigned getNodeSize() { return keyHdr->getNodeSize(); }
     virtual bool hasSpecialFileposition() const;
     virtual bool needsRowBuffer() const;
+    virtual bool prewarmPage(offset_t page);
  
  // INodeLoader impl.
     virtual CJHTreeNode *loadNode(offset_t offset) = 0;
@@ -147,7 +151,7 @@ class jhtree_decl CMemKeyIndex : public CKeyIndex
 private:
     Linked<IMemoryMappedFile> io;
 public:
-    CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool _isTLK);
+    CMemKeyIndex(unsigned _iD, IMemoryMappedFile *_io, const char *_name, bool _isTLK);
 
     virtual const char *queryFileName() { return name.get(); }
     virtual const IFileIO *queryFileIO() const override { return nullptr; }
@@ -162,7 +166,7 @@ private:
     void cacheNodes(CNodeCache *cache, offset_t firstnode, bool isTLK);
     
 public:
-    CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool _isTLK, bool _allowPreload);
+    CDiskKeyIndex(unsigned _iD, IFileIO *_io, const char *_name, bool _isTLK, bool _allowPreload);
 
     virtual const char *queryFileName() { return name.get(); }
     virtual const IFileIO *queryFileIO() const override { return io; }

+ 1 - 1
system/jhtree/keydiff.cpp

@@ -221,7 +221,7 @@ public:
         keyFileIO.setown(keyFile->open(IFOread));
         if(!keyFileIO)
             throw MakeStringException(0, "Could not read index file %s", filename);
-        keyIndex.setown(createKeyIndex(filename, 0, *keyFileIO, false, false)); // MORE - should we care about crc?
+        keyIndex.setown(createKeyIndex(filename, 0, *keyFileIO, (unsigned) -1, false, false)); // MORE - should we care about crc?
         unsigned flags = keyIndex->getFlags();
         variableWidth = ((flags & HTREE_VARSIZE) == HTREE_VARSIZE);
         if((flags & HTREE_QUICK_COMPRESSED_KEY) == HTREE_QUICK_COMPRESSED_KEY)

+ 1 - 1
system/jlib/jsuperhash.hpp

@@ -433,7 +433,7 @@ public:
     HTMapping(ET &_et, const FP &_fp) : et(_et), fp(_fp) { }
     const void *queryFindParam() const { return &fp; }
     ET &queryElement() const { return et; }
-    FP &queryFindValue() const { return fp; }
+    const FP &queryFindValue() const { return fp; }
 };
 
 // template mapping object for base type to IInterface object

+ 77 - 0
system/jlib/jutil.cpp

@@ -3030,6 +3030,83 @@ int getEnum(const char *v, const EnumMapping *map, int defval)
 
 //---------------------------------------------------------------------------------------------------------------------
 
+extern jlib_decl offset_t friendlyStringToSize(const char *in)
+{
+    char *tail;
+    offset_t result = strtoull(in, &tail, 10);
+    offset_t scale = 1;
+    if (*tail)
+    {
+        if (tail[1] == '\0')
+        {
+            switch (*tail)
+            {
+            case 'K': scale = 1000; break;
+            case 'M': scale = 1000000; break;
+            case 'G': scale = 1000000000; break;
+            case 'T': scale = 1000000000000; break;
+            case 'P': scale = 1000000000000000; break;
+            case 'E': scale = 1000000000000000000; break;
+            default:
+                throw makeStringExceptionV(0, "Invalid size suffix %s", tail);
+            }
+        }
+        else if (streq(tail+1, "i"))
+        {
+            switch (*tail)
+            {
+            case 'K': scale = 1llu<<10; break;
+            case 'M': scale = 1llu<<20; break;
+            case 'G': scale = 1llu<<30; break;
+            case 'T': scale = 1llu<<40; break;
+            case 'P': scale = 1llu<<50; break;
+            case 'E': scale = 1llu<<60; break;
+            default:
+                throw makeStringExceptionV(0, "Invalid size suffix %s", tail);
+            }
+        }
+        else
+            throw makeStringExceptionV(0, "Invalid size suffix %s", tail);
+    }
+    return result * scale;
+}
+
+void jlib_decl atomicWriteFile(const char *fileName, const char *output)
+{
+    recursiveCreateDirectoryForFile(fileName);
+#ifdef _WIN32
+    StringBuffer newFileName;
+    makeTempCopyName(newFileName, fileName);
+    Owned<IFile> newFile = createIFile(newFileName);
+    Owned<IFile> file = createIFile(fileName);
+    {
+        OwnedIFileIO ifileio = newFile->open(IFOcreate);
+        if (!ifileio)
+            throw MakeStringException(0, "atomicWriteFile: could not create output file %s", newFileName.str());
+        ifileio->write(0, strlen(output), output);
+    }
+#else
+    VStringBuffer newFileName("%s.XXXXXX", fileName);
+    int fh = mkstemp(const_cast<char *>(newFileName.str()));
+    if (fh==-1)
+        throw MakeStringException(0, "atomicWriteFile: could not create output file for %s", fileName);
+    Owned<IFile> newFile = createIFile(newFileName);
+    Owned<IFile> file = createIFile(fileName);
+    {
+        OwnedIFileIO ifileio = createIFileIO(fh, IFOwrite);
+        if (!ifileio)
+            throw MakeStringException(0, "atomicWriteFile: could not create output file %s", newFileName.str());
+        ifileio->write(0, strlen(output), output);
+    }
+#endif
+    if (file->exists())
+        file->remove();
+    newFile->rename(fileName);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+
 //#define TESTURL
 #ifdef TESTURL
 

+ 6 - 0
system/jlib/jutil.hpp

@@ -66,6 +66,12 @@ int jlib_decl numtostr(char *dst, unsigned int value);
 int jlib_decl numtostr(char *dst, unsigned long value);
 int jlib_decl numtostr(char *dst, unsigned __int64 _value);
 
+// Translate "human readable" size strings like 4G to numbers
+extern jlib_decl offset_t friendlyStringToSize(const char *in);
+
+// Write a string as file contents, atomically
+extern void jlib_decl atomicWriteFile(const char *fileName, const char *output);
+
 #ifndef _WIN32
 /**
  * Return full path name of a currently loaded dll that matches the supplied tail

+ 39 - 0
testing/unittests/jlibtests.cpp

@@ -30,6 +30,7 @@
 #include "jlzw.hpp"
 #include "jqueue.hpp"
 #include "jregexp.hpp"
+#include "jutil.hpp"
 
 #include "unittests.hpp"
 
@@ -2377,5 +2378,43 @@ public:
 CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionTestsStress );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionTestsStress, "JlibCompressionTestsStress" );
 
+class JlibFriendlySizeTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE(JlibFriendlySizeTest);
+        CPPUNIT_TEST(test);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    void test()
+    {
+        CPPUNIT_ASSERT(friendlyStringToSize("0") == 0);
+        CPPUNIT_ASSERT(friendlyStringToSize("2000") == 2000);
+        CPPUNIT_ASSERT(friendlyStringToSize("1K") == 1000);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Ki") == 2048);
+        CPPUNIT_ASSERT(friendlyStringToSize("1M") == 1000000);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Mi") == 2048*1024);
+        CPPUNIT_ASSERT(friendlyStringToSize("1G") == 1000000000);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Gi") == 2048llu*1024*1024);
+        CPPUNIT_ASSERT(friendlyStringToSize("1T") == 1000000000000ll);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Ti") == 2048llu*1024*1024*1024);
+        CPPUNIT_ASSERT(friendlyStringToSize("1P") == 1000000000000000ll);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Pi") == 2048llu*1024*1024*1024*1024);
+        CPPUNIT_ASSERT(friendlyStringToSize("1E") == 1000000000000000000ll);
+        CPPUNIT_ASSERT(friendlyStringToSize("2Ei") == 2048llu*1024*1024*1024*1024*1024);
+        try
+        {
+            friendlyStringToSize("1Kb");
+            CPPUNIT_ASSERT(false);
+        }
+        catch (IException *E)
+        {
+            E->Release();
+        }
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( JlibFriendlySizeTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibFriendlySizeTest, "JlibFriendlySizeTest" );
+
 
 #endif // _USE_CPPUNIT

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2505,7 +2505,7 @@ public:
         // NB: this TLK is an in-memory TLK serialized from the master - the name is for tracing by the key code only
         VStringBuffer name("index");
         name.append(queryId()).append("_tlk");
-        lookup = new CKeyLookup(*this, helper, createKeyIndex(name.str(), 0, *iFileIO, true, false)); // MORE - crc is not 0...
+        lookup = new CKeyLookup(*this, helper, createKeyIndex(name.str(), 0, *iFileIO, (unsigned) -1, true, false)); // MORE - crc is not 0...
         ihash = lookup;
     }
 };

+ 1 - 1
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -306,7 +306,7 @@ public:
                 StringBuffer path;
                 rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
 
-                Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, false, false);
+                Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false, false);
                 Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, nullptr, helper->hasNewSegmentMonitors(), false);
                 if (localMerge)
                 {

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -1637,7 +1637,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
             Owned<IFileIO> lazyFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
             Owned<IDelayedFile> delayedFile = createDelayedFile(lazyFileIO);
-            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
+            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, false);
             keyIndexes.append(*keyIndex.getClear());
         }
     }
@@ -1960,7 +1960,7 @@ public:
                     Owned<IFileIO> iFileIO = createIFileI(lenArray.item(p), tlkMb.toByteArray()+posArray.item(p));
                     StringBuffer name("TLK");
                     name.append('_').append(container.queryId()).append('_');
-                    tlkKeySet->addIndex(createKeyIndex(name.append(p).str(), 0, *iFileIO, true, false)); // MORE - not the right crc
+                    tlkKeySet->addIndex(createKeyIndex(name.append(p).str(), 0, *iFileIO, (unsigned) -1, true, false)); // MORE - not the right crc
                 }
             }
             if (needsDiskRead)

+ 3 - 3
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -2334,7 +2334,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
         {
             Owned<IFileIO> lazyFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
             Owned<IDelayedFile> delayedFile = createDelayedFile(lazyFileIO);
-            return createKeyIndex(filename, crc, *delayedFile, false, false);
+            return createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, false);
         }
         else
         {
@@ -2343,7 +2343,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
              * The underlying IFileIO can later be closed by fhe file caching mechanism.
              */
             Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
-            return createKeyIndex(filename, crc, *lazyIFileIO, false, false);
+            return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, false);
         }
     }
     IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy)
@@ -2973,7 +2973,7 @@ public:
                     Owned<IFileIO> iFileIO = createIFileI(lenArray.item(p), tlkMb.toByteArray()+posArray.item(p));
                     StringBuffer name("TLK");
                     name.append('_').append(container.queryId()).append('_');
-                    Owned<IKeyIndex> tlkKeyIndex = createKeyIndex(name.append(p).str(), 0, *iFileIO, true, false); // MORE - not the right crc
+                    Owned<IKeyIndex> tlkKeyIndex = createKeyIndex(name.append(p).str(), 0, *iFileIO, (unsigned) -1, true, false); // MORE - not the right crc
                     tlkKeyIndexes.append(*tlkKeyIndex.getClear());
                 }
             }