浏览代码

HPCC-9017 Try not to use kernel page cache for roxie copies, update3

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 11 年之前
父节点
当前提交
780b5f4dbf

+ 5 - 5
common/remote/hooks/git/gitfile.cpp

@@ -184,7 +184,7 @@ public:
             return notFound;
         return foundYes;
     }
-    virtual IFileIO * open(IFOmode mode)
+    virtual IFileIO * open(IFOmode mode, IFEflags extraFlags=IFEnone)
     {
         assertex(mode==IFOread && isExisting);
         return new GitRepositoryFileIO(gitDirectory, revision, relFileName);
@@ -193,7 +193,7 @@ public:
     {
         UNIMPLEMENTED;
     }
-    virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode)
+    virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags=IFEnone)
     {
         assertex(mode==IFOread && isExisting);
         return new GitRepositoryFileIO(gitDirectory, revision, relFileName);
@@ -247,10 +247,10 @@ public:
                                   unsigned checkinterval=60*1000,
                                   unsigned timeout=(unsigned)-1,
                                   Semaphore *abortsem=NULL)  { UNIMPLEMENTED; }
-    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL) { UNIMPLEMENTED; }
-    virtual void copyTo(IFile *dest, size32_t buffersize=0x100000, ICopyFileProgress *progress=NULL, bool usetmp=false) { UNIMPLEMENTED; }
+    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
+    virtual void copyTo(IFile *dest, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL, bool usetmp=false, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
     virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs=0, memsize_t len=(memsize_t)-1, bool write=false)  { UNIMPLEMENTED; }
-    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false) { UNIMPLEMENTED; }
+    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false,CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
 
 
 protected:

+ 5 - 5
common/remote/hooks/libarchive/archive.cpp

@@ -336,7 +336,7 @@ public:
             return notFound;
         return foundYes;
     }
-    virtual IFileIO * open(IFOmode mode)
+    virtual IFileIO * open(IFOmode mode, IFEflags extraFlags=IFEnone)
     {
         assertex(mode==IFOread && entry != NULL);
         return new ArchiveFileIO(fullName.str());
@@ -345,7 +345,7 @@ public:
     {
         UNIMPLEMENTED;
     }
-    virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode)
+    virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags=IFEnone)
     {
         assertex(mode==IFOread && entry != NULL);
         return new ArchiveFileIO(fullName.str());
@@ -401,10 +401,10 @@ public:
                                   unsigned checkinterval=60*1000,
                                   unsigned timeout=(unsigned)-1,
                                   Semaphore *abortsem=NULL)  { UNIMPLEMENTED; }
-    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL) { UNIMPLEMENTED; }
-    virtual void copyTo(IFile *dest, size32_t buffersize=0x100000, ICopyFileProgress *progress=NULL, bool usetmp=false) { UNIMPLEMENTED; }
+    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
+    virtual void copyTo(IFile *dest, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL, bool usetmp=false, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
     virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs=0, memsize_t len=(memsize_t)-1, bool write=false)  { UNIMPLEMENTED; }
-    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false) { UNIMPLEMENTED; }
+    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false,CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
 
 
 protected:

+ 20 - 18
common/remote/sockfile.cpp

@@ -797,7 +797,7 @@ static Semaphore                 treeCopySem;
 
 #define DEBUGSAMEIP false
 
-static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler)
+static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const char *net, const char *mask, IpAddress &ip, bool usetmp, CThrottler *throttler, CFflags copyFlags=CFnone)
 {
     unsigned start = msTick();
     Owned<IFile> dstfile = createIFile(dstfn);
@@ -873,7 +873,7 @@ static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const cha
                                 PROGLOG("TREECOPY(started) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
                             {
                                 CriticalUnblock unblock(treeCopyCrit); // note we have tc linked
-                                rmtfile->copyTo(dstfile,0x100000,NULL,usetmp);
+                                rmtfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
                             }
                             if (TF_TRACE_TREE_COPY)
                                 PROGLOG("TREECOPY(done) %s to %s",rmtfile->queryFilename(),dstfile->queryFilename());
@@ -928,7 +928,7 @@ static void treeCopyFile(RemoteFilename &srcfn, RemoteFilename &dstfn, const cha
         PROGLOG("TREECOPY(started,fallback) %s to %s",srcfile->queryFilename(),dstfile->queryFilename());
     try {
         GetHostIp(ip);
-        srcfile->copyTo(dstfile,0x100000,NULL,usetmp);
+        srcfile->copyTo(dstfile,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
     }
     catch (IException *e) {
         EXCLOG(e,"TREECOPY(done,fallback)");
@@ -1702,8 +1702,8 @@ public:
         return (fileBool)ret;
     }
 
-    IFileIO * open(IFOmode mode);
-    IFileIO * openShared(IFOmode mode,IFSHmode shmode);
+    IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone);
+    IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
     IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported
 
     const char * queryFilename()
@@ -2028,7 +2028,7 @@ public:
         return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception
     }
 
-    void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress)
+    void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags=CFnone)
     {
         StringBuffer uuid;
         genUUID(uuid,true);
@@ -2036,14 +2036,14 @@ public:
         while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout));
     }
 
-    void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp);
+    void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
 
     virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write)
     {
         return NULL;
     }
 
-    void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp)
+    void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp,CFflags copyFlags=CFnone)
     {
         resfrom.ipset(NULL);
         MemoryBuffer sendBuffer;
@@ -2078,7 +2078,7 @@ public:
             resfrom.ipset(ep);
             StringBuffer tmp;
             WARNLOG("dafilesrv on %s does not support treeCopyTo - falling back to copyTo",resfrom.getIpText(tmp).str());
-            copyTo(dest,0x100000,NULL,usetmp);
+            copyTo(dest,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
             status = 0;
         }
         else if (status==0)
@@ -2207,6 +2207,7 @@ protected:
     RemoteFileIOHandle  handle;
     IFOmode mode;
     compatIFSHmode compatmode;
+    IFEflags extraFlags;
     bool disconnectonexit;
 public:
     IMPLEMENT_IINTERFACE
@@ -2252,7 +2253,7 @@ public:
         }
     }
 
-    bool open(IFOmode _mode,compatIFSHmode _compatmode) 
+    bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
     {
         MemoryBuffer sendBuffer;
         initSendBuffer(sendBuffer);
@@ -2276,6 +2277,7 @@ public:
             mode = _mode;
         }
         compatmode = _compatmode;
+        extraFlags = _extraFlags;
         return true;
     }
 
@@ -2283,7 +2285,7 @@ public:
     {
         StringBuffer s;
         PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str());
-        if (open(mode,compatmode)) {
+        if (open(mode,compatmode,extraFlags)) {
             return true;
         }
         return false;
@@ -2446,7 +2448,7 @@ void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set)
 
 
 
-IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode)
+IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags)
 {
     assertex(((unsigned)shmode&0xffffffc7)==0);
     compatIFSHmode compatmode;
@@ -2462,19 +2464,19 @@ IFileIO * CRemoteFile::openShared(IFOmode mode,IFSHmode shmode)
     else
         compatmode = compatIFSHread;
     Owned<CRemoteFileIO> res = new CRemoteFileIO(this);
-    if (res->open(mode,compatmode))
+    if (res->open(mode,compatmode,extraFlags))
         return res.getClear();
     return NULL;
 }
 
-IFileIO * CRemoteFile::open(IFOmode mode)
+IFileIO * CRemoteFile::open(IFOmode mode,IFEflags extraFlags)
 {
-    return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)));
+    return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)),extraFlags);
 }
 
 //---------------------------------------------------------------------------
 
-void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp)
+void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags)
 {
     CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile);
     if (dstfile&&!dstfile->queryEp().isLocal()) {
@@ -2490,7 +2492,7 @@ void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *pr
         destf.setown(createIFile(dest));
         try {
             // following may fail if new dafilesrv not deployed on src
-            copySection(dest,(offset_t)-1,0,(offset_t)-1,progress);
+            copySection(dest,(offset_t)-1,0,(offset_t)-1,progress,copyFlags);
             if (usetmp) {
                 StringAttr tail(pathTail(dstfile->queryLocalName()));
                 dstfile->remove();
@@ -2542,7 +2544,7 @@ void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *pr
             return got;
         }
     } intercept;
-    doCopyFile(dest,this,buffersize,progress,&intercept,usetmp);
+    doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
 }
 
 

+ 1 - 1
dali/datest/datest.cpp

@@ -2484,7 +2484,7 @@ NULL
 #else
     out = fileno(stdout);
 #endif
-    Owned<IFileIO> stdOutFileIO = createIFileIO(out);
+    Owned<IFileIO> stdOutFileIO = createIFileIO(out,IFOwrite);
     if (testParams.ordinality())
     {
         newFileName = testParams.item(0);

+ 4 - 2
dali/ft/fttransform.cpp

@@ -92,7 +92,8 @@ size32_t CTransformer::read(size32_t maxLength, void * buffer)
 bool CTransformer::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey)
 {
     CTransformerBase::setPartition(remoteInputName, _startOffset, _length);
-    input.setown(inputFile->open(IFOread));
+    // if we want spray to not fill page cache use IFEnocache
+    input.setown(inputFile->open(IFOread,IFEnone));
     if (compressedInput) {                          
         Owned<IExpander> expander;
         if (decryptKey&&*decryptKey) {
@@ -823,7 +824,8 @@ processedProgress:
                 throw MakeOsException(GetLastError(), "Failed to create directory for file: %s", localFilename.str());
 
             OwnedIFile outFile = createIFile(localFilename.str());
-            OwnedIFileIO outio = outFile->openShared(IFOcreate,IFSHnone);
+            // if we want spray to not fill page cache use IFEnocache
+            OwnedIFileIO outio = outFile->openShared(IFOcreate,IFSHnone,IFEnone);
             if (!outio)
                 throwError1(DFTERR_CouldNotCreateOutput, localFilename.str());
             if (compressOutput) {

+ 2 - 0
initfiles/etc/DIR_NAME/environment.conf.in

@@ -19,3 +19,5 @@ blockname=${DIR_NAME}
 interface=*
 # enable epoll method for notification events (true/false)
 use_epoll=true
+# allow kernel pagecache flushing where enabled (true/false)
+allow_pgcache_flush=true

+ 9 - 9
roxie/ccd/ccdfile.cpp

@@ -706,7 +706,7 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
         }
     }
 
-    bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg)
+    bool doCopyFile(ILazyFileIO *f, const char *tempFile, const char *targetFilename, const char *destPath, const char *msg, CFflags copyFlags=CFnone)
     {
         bool fileCopied = false;
         IFile *sourceFile;
@@ -755,16 +755,16 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
                         str.appendf("doCopyFile %s", sourceFile->queryFilename());
                         TimeSection timing(str.str());
                         if (useTreeCopy)
-                            sourceFile->treeCopyTo(destFile, subnet, fromip, true);
+                            sourceFile->treeCopyTo(destFile, subnet, fromip, true, copyFlags);
                         else
-                            sourceFile->copyTo(destFile);
+                            sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
                     }
                     else
                     {
                         if (useTreeCopy)
-                            sourceFile->treeCopyTo(destFile, subnet, fromip, true);
+                            sourceFile->treeCopyTo(destFile, subnet, fromip, true, copyFlags);
                         else
-                            sourceFile->copyTo(destFile);
+                            sourceFile->copyTo(destFile,DEFAULT_COPY_BLKSIZE,NULL,false,copyFlags);
                     }
                 }
                 f->setCopying(false);
@@ -822,7 +822,7 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
         return fileCopied;
     }
 
-    bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage)
+    bool doCopy(ILazyFileIO *f, bool background, bool displayFirstFileMessage, CFflags copyFlags=CFnone)
     {
         if (!f->isRemote())
             f->copyComplete();
@@ -846,7 +846,7 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
             
             tempFile.append(".$$$");
             const char *msg = background ? "Background copy" : "Copy";
-            return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg);
+            return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg, copyFlags);
         }
         return false;  // if we get here there was no file copied
     }
@@ -951,7 +951,7 @@ public:
                 {
                     try
                     {
-                        fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false);
+                        fileCopied = doCopy(next, true, (fileCopiedCount==0) ? true : false, CFflush_rdwr);
                         CriticalBlock b(crit);
                         if (fileCopied)
                             fileCopiedCount++;
@@ -1150,7 +1150,7 @@ public:
                         if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
                         {
                             ret->checkOpen();
-                            doCopy(ret, false, false);
+                            doCopy(ret, false, false, CFflush_rdwr);
                             return ret.getLink();
                         }
 

+ 10 - 1
system/jlib/jatomic.hpp

@@ -51,6 +51,7 @@ typedef volatile long atomic_t;
 #define atomic_set(v,i)                 ((*v) = (i))
 #define atomic_xchg(i, v)               InterlockedExchange(v, i)
 #define atomic_add(v,i)                 InterlockedExchangeAdd(v,i)
+#define atomic_add_and_read(v,i)        InterlockedAdd(v,i)
 #define atomic_add_exchange(v, i)       InterlockedExchangeAdd(v,i)
 #define atomic_xchg_ptr(p, v)           InterlockedExchangePointer(v,p)
 #if defined (_MSC_VER) && (_MSC_VER <= 1200)
@@ -111,10 +112,16 @@ static __inline__ int atomic_xchg(int i, atomic_t *v)
 
 static __inline__ void atomic_add(atomic_t *v,int i)
 {
-    // (*v)+=i;
+    // (*v) += i;
     __sync_add_and_fetch(&v->counter,i);
 }
 
+static __inline__ int atomic_add_and_read(atomic_t *v,int i)
+{
+    // (*v) += i; return *v;
+    return __sync_add_and_fetch(&v->counter,i);
+}
+
 static __inline__ int atomic_add_exchange(atomic_t *v,int i)
 {
     // int ret = *v; (*v) += i; return ret;
@@ -149,6 +156,7 @@ int jlib_decl poor_atomic_dec_and_read(atomic_t * v);
 bool jlib_decl poor_atomic_inc_and_test(atomic_t * v);
 int jlib_decl poor_atomic_xchg(int i, atomic_t * v);
 void jlib_decl poor_atomic_add(atomic_t * v, int i);
+int jlib_decl poor_atomic_add_and_read(atomic_t * v, int i);
 int jlib_decl poor_atomic_add_exchange(atomic_t * v, int i);
 bool jlib_decl poor_atomic_cas(atomic_t * v, int newvalue, int expectedvalue);
 void jlib_decl *poor_atomic_xchg_ptr(void *p, void **v);
@@ -165,6 +173,7 @@ void jlib_decl poor_compiler_memory_barrier();
 #define atomic_set(v,i)                 ((*v) = (i))
 #define atomic_xchg(i, v)               poor_atomic_xchg(i, v)
 #define atomic_add(v,i)                 poor_atomic_add(v, i)
+#define atomic_add_and_read(v,i)        poor_atomic_add_and_read(v, i)
 #define atomic_add_exchange(v, i)       poor_atomic_add_exchange(v, i)
 #define atomic_cas(v,newvalue,expectedvalue)    poor_atomic_cas(v,newvalue,expectedvalue)
 #define atomic_xchg_ptr(p, v)               poor_atomic_xchg_ptr(p, v)

+ 135 - 42
system/jlib/jfile.cpp

@@ -56,6 +56,8 @@
 #include "jregexp.hpp"
 #include "portlist.h"
 
+#include "build-config.h"
+#include "jprop.hpp"
 
 // #define REMOTE_DISCONNECT_ON_DESTRUCTOR  // enable to disconnect on IFile destructor
                                             // this should not be enabled in WindowRemoteDirectory used
@@ -80,8 +82,11 @@
 #define NULLFILE -1
 #endif
 
+// #define CFILEIOTRACE 1
+
 static IFile *createIFileByHook(const RemoteFilename & filename);
 static IFile *createContainedIFileByHook(const char *filename);
+static inline bool isPCFlushAllowed();
 
 static char ShareChar='$';
 
@@ -646,15 +651,19 @@ HANDLE CFile::openHandle(IFOmode mode, IFSHmode sharemode, bool async, int stdh)
         handle = NULLFILE;
         throw MakeErrnoException(EISDIR, "CFile::open %s", filename.get());
     }
-    
+
+#ifdef CFILEIOTRACE
+    DBGLOG("CFile::openHandle(%s,%d) returns %d", filename.get(), mode, handle);
+#endif
+
 #endif
     return handle;
 }
 
-IFileIO * CFile::open(IFOmode mode)
+IFileIO * CFile::open(IFOmode mode,IFEflags extraFlags)
 {
     // we may want mode dependant defaults later 
-    return openShared(mode,(IFSHmode)(flags&(IFSHfull|IFSHread)));
+    return openShared(mode,(IFSHmode)(flags&(IFSHfull|IFSHread)),extraFlags);
 }
 
 
@@ -822,28 +831,33 @@ bool CFile::fastCopyFile(CFile &target, size32_t buffersize, ICopyFileProgress *
 }
 
 
-void CFile::copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress)
+void CFile::copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags)
 {
     // check to see if src and target are remote
 
-
     Owned<IFile> target = createIFile(dest);
-    const size32_t buffersize = 0x100000;
+    const size32_t buffersize = DEFAULT_COPY_BLKSIZE;
     IFOmode omode = IFOwrite;
     if (toOfs==(offset_t)-1) {
         if (fromOfs==0) {
-            copyFile(target,this,buffersize,progress);
+            copyFile(target,this,buffersize,progress,copyFlags);
             return;
         }
         omode = IFOcreate;
         toOfs = 0;
     }
-    OwnedIFileIO targetIO = target->open(IFOwrite);
+    IFEflags tgtFlags = IFEnone;
+    if (copyFlags & CFflush_write)
+        tgtFlags = IFEnocache;
+    OwnedIFileIO targetIO = target->open(IFOwrite, tgtFlags);
     if (!targetIO)
         throw MakeStringException(-1, "copyFile: target path '%s' could not be created", target->queryFilename());
     MemoryAttr mb;
     void * buffer = mb.allocate(buffersize);
-    OwnedIFileIO sourceIO = open(IFOread);
+    IFEflags srcFlags = IFEnone;
+    if (copyFlags & CFflush_read)
+        srcFlags = IFEnocache;
+    OwnedIFileIO sourceIO = open(IFOread, srcFlags);
     if (!sourceIO)
         throw MakeStringException(-1, "copySection: source '%s' not found", queryFilename());
     
@@ -880,9 +894,9 @@ void CFile::copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fro
     }
 }
 
-void CFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress,bool usetmp)
+void CFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress,bool usetmp,CFflags copyFlags)
 {
-    doCopyFile(dest,this,buffersize,progress,NULL,usetmp);
+    doCopyFile(dest,this,buffersize,progress,NULL,usetmp,copyFlags);
 }
 
 
@@ -1087,7 +1101,8 @@ public:
 
     void unlock()
     {
-        if (handle!=NULLFILE) {
+        if (handle!=NULLFILE)
+        {
 #ifdef _WIN32
             OVERLAPPED overlapped;
             memset(&overlapped,0,sizeof(overlapped));
@@ -1332,15 +1347,15 @@ public:
             ok = ifile->isFile();
         return ok;
     }
-    virtual IFileIO * open(IFOmode mode)
+    virtual IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone)
     {
         connect();
-        return ifile->open(mode);
+        return ifile->open(mode,extraFlags);
     }
-    virtual IFileIO * openShared(IFOmode mode,IFSHmode shared)
+    virtual IFileIO * openShared(IFOmode mode,IFSHmode shared,IFEflags extraFlags=IFEnone)
     {
         connect();
-        return ifile->openShared(mode,shared);
+        return ifile->openShared(mode,shared,extraFlags);
     }
     virtual IFileAsyncIO * openAsync(IFOmode mode)
     {
@@ -1449,9 +1464,9 @@ public:
 #endif
     }
 
-    void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp)
+    void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp,CFflags copyFlags)
     {
-        doCopyFile(dest,this,buffersize,progress,NULL,usetmp);
+        doCopyFile(dest,this,buffersize,progress,NULL,usetmp,copyFlags);
     }
 
 
@@ -1504,10 +1519,10 @@ public:
         ifile->setShareMode(shmode);
     }
 
-    void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress)
+    void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags)
     {
         connect();
-        ifile->copySection(dest,toOfs,fromOfs,size,progress);
+        ifile->copySection(dest,toOfs,fromOfs,size,progress,copyFlags);
     }
 
 
@@ -1517,11 +1532,11 @@ public:
         return NULL;
     }
 
-    void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom, bool usetmp)
+    void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom, bool usetmp, CFflags copyFlags)
     {
         // no special action for windows
         GetHostIp(resfrom);
-        copyTo(dest,0x100000,NULL,usetmp);
+        copyTo(dest,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
     }
 
 
@@ -1635,8 +1650,8 @@ class jlib_decl CSequentialFileIO : public CFileIO
     }
 
 public:
-    CSequentialFileIO(HANDLE h,IFSHmode _sharemode)
-        : CFileIO(h,_sharemode)
+    CSequentialFileIO(HANDLE h,IFOmode _openmode,IFSHmode _sharemode,IFEflags _extraFlags)
+        : CFileIO(h,_openmode,_sharemode,_extraFlags)
     {
         pos = 0;
     }
@@ -1695,7 +1710,7 @@ public:
 
 };
 
-IFileIO * CFile::openShared(IFOmode mode,IFSHmode share)
+IFileIO * CFile::openShared(IFOmode mode,IFSHmode share,IFEflags extraFlags)
 {
     int stdh = stdIoHandle(filename);
     HANDLE handle = openHandle(mode,share,false, stdh);
@@ -1705,8 +1720,8 @@ IFileIO * CFile::openShared(IFOmode mode,IFSHmode share)
     set_inherit(handle, false);
 #endif
     if (stdh>=0)
-        return new CSequentialFileIO(handle,share);
-    return new CFileIO(handle,share);
+        return new CSequentialFileIO(handle,mode,share,extraFlags);
+    return new CFileIO(handle,mode,share,extraFlags);
 }
 
 
@@ -1714,9 +1729,9 @@ IFileIO * CFile::openShared(IFOmode mode,IFSHmode share)
 //---------------------------------------------------------------------------
 
 
-extern jlib_decl IFileIO *createIFileIO(HANDLE handle)
+extern jlib_decl IFileIO *createIFileIO(HANDLE handle,IFOmode openmode,IFEflags extraFlags)
 {
-    return new CFileIO(handle,IFSHfull);
+    return new CFileIO(handle,openmode,IFSHfull,extraFlags);
 }
 
 offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
@@ -1749,12 +1764,20 @@ offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
 #ifdef _WIN32
 
 //-- Windows implementation -------------------------------------------------
-CFileIO::CFileIO(HANDLE handle, IFSHmode _sharemode)
+
+CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags _extraFlags)
 {
     assertex(handle != NULLFILE);
     throwOnError = false;
     file = handle;
     sharemode = _sharemode;
+    openmode = _openmode;
+    extraFlags = _extraFlags; // page cache flush option silently ignored on Windows for now
+    if (extraFlags & IFEnocache)
+        if (!isPCFlushAllowed())
+            extraFlags = static_cast<IFEflags>(extraFlags & ~IFEnocache);
+    atomic_set(&bytesRead, 0);
+    atomic_set(&bytesWritten, 0);
 }
 
 CFileIO::~CFileIO()
@@ -1846,12 +1869,23 @@ void CFileIO::setSize(offset_t pos)
 //-- Unix implementation ----------------------------------------------------
 
 // More errorno checking TBD
-CFileIO::CFileIO(HANDLE handle, IFSHmode _sharemode)
+CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags _extraFlags)
 {
     assertex(handle != NULLFILE);
     throwOnError = false;
     file = handle;
     sharemode = _sharemode;
+    openmode = _openmode;
+    extraFlags = _extraFlags;
+    if (extraFlags & IFEnocache)
+        if (!isPCFlushAllowed())
+            extraFlags = static_cast<IFEflags>(extraFlags & ~IFEnocache);
+    atomic_set(&bytesRead, 0);
+    atomic_set(&bytesWritten, 0);
+
+#ifdef CFILEIOTRACE
+    DBGLOG("CFileIO::CfileIO(%d,%d,%d,%d)", handle, _openmode, _sharemode, _extraFlags);
+#endif
 }
 
 CFileIO::~CFileIO()
@@ -1869,7 +1903,17 @@ CFileIO::~CFileIO()
 
 void CFileIO::close()
 {
-    if (file != NULLFILE) {
+    if (file != NULLFILE)
+    {
+#ifdef CFILEIOTRACE
+        DBGLOG("CFileIO::close(%d), extraFlags = %d", file, extraFlags);
+#endif
+        if (extraFlags & IFEnocache)
+        {
+            if (openmode != IFOread)
+                fdatasync(file);
+            posix_fadvise(file, 0, 0, POSIX_FADV_DONTNEED);
+        }
         if (::close(file) < 0)
             throw MakeErrnoException(errno,"CFileIO::close");
         file=NULLFILE;
@@ -1885,6 +1929,8 @@ void CFileIO::flush()
     if (fdatasync(file) != 0)
 #endif
         throw MakeOsException(DISK_FULL_EXCEPTION_CODE,"CFileIO::flush");
+    if (extraFlags & IFEnocache)
+        posix_fadvise(file, 0, 0, POSIX_FADV_DONTNEED);
 }
 
 
@@ -1900,7 +1946,16 @@ offset_t CFileIO::size()
 size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 {
     if (0==len) return 0;
-    return checked_pread(file, data, len, pos);
+    size32_t ret = checked_pread(file, data, len, pos);
+    if ( (extraFlags & IFEnocache) && (ret > 0) )
+    {
+        if (atomic_add_and_read(&bytesRead, ret) >= PGCFLUSH_BLKSIZE)
+        {
+            atomic_set(&bytesRead, 0);
+            posix_fadvise(file, 0, 0, POSIX_FADV_DONTNEED);
+        }
+    }
+    return ret;
 }
 
 void CFileIO::setPos(offset_t newPos)
@@ -1917,6 +1972,17 @@ size32_t CFileIO::write(offset_t pos, size32_t len, const void * data)
     }
     if (ret<len)
         throw MakeOsException(DISK_FULL_EXCEPTION_CODE,"CFileIO::write");
+    if ( (extraFlags & IFEnocache) && (ret > 0) )
+    {
+        if (atomic_add_and_read(&bytesWritten, ret) >= PGCFLUSH_BLKSIZE)
+        {
+            atomic_set(&bytesWritten, 0);
+            // non-blocking request to commit dirty pages [or block with fdatasync()]
+            sync_file_range(file, 0, 0, SYNC_FILE_RANGE_WRITE);
+            // flush previously committed dirty pages
+            posix_fadvise(file, 0, 0, POSIX_FADV_DONTNEED);
+        }
+    }
     return ret;
 }
 
@@ -2234,7 +2300,8 @@ CFileAsyncIO::CFileAsyncIO(HANDLE handle, IFSHmode _sharemode)
 
 void CFileAsyncIO::close()
 {
-    if (file != NULLFILE) {
+    if (file != NULLFILE)
+    {
         aio_cancel(file,NULL);
         if (_lclose(file) < 0)
             throw MakeErrnoException(errno, "CFileAsyncIO::close");
@@ -2638,6 +2705,26 @@ public:
 
 //-- Helper routines --------------------------------------------------------
 
+enum GblFlushEnum { FLUSH_INIT, FLUSH_DISALLOWED, FLUSH_ALLOWED };
+static GblFlushEnum gbl_flush_allowed = FLUSH_INIT;
+static CriticalSection flushsect;
+
+static inline bool isPCFlushAllowed()
+{
+    CriticalBlock block(flushsect);
+    if (gbl_flush_allowed == FLUSH_INIT)
+    {
+        Owned<IProperties> conf = createProperties(CONFIG_DIR PATHSEPSTR "environment.conf", true);
+        if (conf->getPropBool("allow_pgcache_flush", true))
+            gbl_flush_allowed = FLUSH_ALLOWED;
+        else
+            gbl_flush_allowed = FLUSH_DISALLOWED;
+    }
+    if (gbl_flush_allowed == FLUSH_ALLOWED)
+        return true;
+    return false;
+}
+
 static inline size32_t doread(IFileIOStream * stream,void *dst, size32_t size)
 {
     size32_t toread=size;
@@ -2721,7 +2808,7 @@ size32_t read(IFileIO * in, offset_t pos, size32_t len, MemoryBuffer & buffer)
     return lenRead;
 }
 
-void copyFile(const char *target, const char *source, size32_t buffersize, ICopyFileProgress *progress)
+void copyFile(const char *target, const char *source, size32_t buffersize, ICopyFileProgress *progress, CFflags copyFlags)
 {
     OwnedIFile src = createIFile(source);
     if (!src)
@@ -2729,18 +2816,18 @@ void copyFile(const char *target, const char *source, size32_t buffersize, ICopy
     OwnedIFile tgt = createIFile(target);
     if (!tgt)
         throw MakeStringException(-1, "copyFile: target path '%s' could not be created", target);
-    copyFile(tgt, src, buffersize,progress);
+    copyFile(tgt, src, buffersize, progress, copyFlags);
 }
 
-void copyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress)
+void copyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress, CFflags copyFlags)
 {
-    source->copyTo(target,buffersize,progress);
+    source->copyTo(target,buffersize,progress,copyFlags);
 }
 
-void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress, ICopyFileIntercept *copyintercept, bool usetmp)
+void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress, ICopyFileIntercept *copyintercept, bool usetmp, CFflags copyFlags)
 {
     if (!buffersize)
-        buffersize = 0x100000;
+        buffersize = DEFAULT_COPY_BLKSIZE;
 #ifdef _WIN32
     if (!usetmp) { 
         CFile *src = QUERYINTERFACE(source,CFile);
@@ -2770,7 +2857,10 @@ void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFilePr
         }
     }
 #endif
-    OwnedIFileIO sourceIO = source->open(IFOread);
+    IFEflags srcFlags = IFEnone;
+    if (copyFlags & CFflush_read)
+        srcFlags = IFEnocache;
+    OwnedIFileIO sourceIO = source->open(IFOread, srcFlags);
     if (!sourceIO)
         throw MakeStringException(-1, "copyFile: source '%s' not found", source->queryFilename());
 
@@ -2794,7 +2884,10 @@ void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFilePr
     }
     else
         dest = target;
-    targetIO.setown(dest->open(IFOcreate));
+    IFEflags tgtFlags = IFEnone;
+    if (copyFlags & CFflush_write)
+        tgtFlags = IFEnocache;
+    targetIO.setown(dest->open(IFOcreate, tgtFlags));
     if (!targetIO)
         throw MakeStringException(-1, "copyFile: target path '%s' could not be created", dest->queryFilename());
     MemoryAttr mb;

+ 14 - 10
system/jlib/jfile.hpp

@@ -40,7 +40,7 @@ enum IFOmode { IFOcreate, IFOread, IFOwrite, IFOreadwrite, IFOcreaterw };    //
 enum IFSHmode { IFSHnone, IFSHread=0x8, IFSHfull=0x10};   // sharing modes
 enum IFSmode { IFScurrent = FILE_CURRENT, IFSend = FILE_END, IFSbegin = FILE_BEGIN };    // seek mode
 enum CFPmode { CFPcontinue, CFPcancel, CFPstop };    // modes for ICopyFileProgress::onProgress return
-
+enum IFEflags { IFEnone=0x0, IFEnocache=0x1 };
 class CDateTime;
 
 interface IDirectoryIterator : extends IIteratorOf<IFile> 
@@ -52,6 +52,10 @@ interface IDirectoryIterator : extends IIteratorOf<IFile>
 
 };
 
+#define PGCFLUSH_BLKSIZE      0x200000
+#define DEFAULT_COPY_BLKSIZE  0x100000
+enum CFflags { CFnone=0x0, CFflush_read=0x1, CFflush_write=0x2, CFflush_rdwr=0x3 };
+
 
 #define IDDIunchanged   1
 #define IDDImodified    2
@@ -83,9 +87,9 @@ interface IFile :extends IInterface
     virtual fileBool isDirectory() = 0;
     virtual fileBool isFile() = 0;
     virtual fileBool isReadOnly() = 0;
-    virtual IFileIO * open(IFOmode mode) = 0;
+    virtual IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone) = 0;
     virtual IFileAsyncIO * openAsync(IFOmode mode) = 0;
-    virtual IFileIO * openShared(IFOmode mode,IFSHmode shmode) = 0;
+    virtual IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone) = 0;
     virtual const char * queryFilename() = 0;
     virtual bool remove() = 0;
     virtual void rename(const char *newTail) = 0;       // tail only preferred but can have full path if exactly matches existing dir
@@ -111,14 +115,14 @@ interface IFile :extends IInterface
                                   Semaphore *abortsem=NULL)=0; // returns NULL if timed out or abortsem signalled
     virtual bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime) = 0; // return false if doesn't exist
                                                                             // size is undefined if directory
-    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL) =0;
+    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL, CFflags copyFlags=CFnone) = 0;
     // if toOfs is (offset_t)-1 then copies entire file 
 
-    virtual void copyTo(IFile *dest, size32_t buffersize=0x100000, ICopyFileProgress *progress=NULL, bool usetmp=false)=0;
+    virtual void copyTo(IFile *dest, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL, bool usetmp=false, CFflags copyFlags=CFnone)=0;
 
     virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs=0, memsize_t len=(memsize_t)-1, bool write=false)=0;
 
-    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false) = 0;
+    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp=false,CFflags copyFlags=CFnone) = 0;
 
 
 };
@@ -239,8 +243,8 @@ extern jlib_decl void setPasswordProvider(IPasswordProvider * provider);
 
 
 extern jlib_decl size32_t read(IFileIO * in, offset_t pos, size32_t len, MemoryBuffer & buffer);
-extern jlib_decl void copyFile(const char *target, const char *source, size32_t buffersize=0x100000, ICopyFileProgress *progress=NULL);
-extern jlib_decl void copyFile(IFile * target, IFile * source,size32_t buffersize=0x100000, ICopyFileProgress *progress=NULL);
+extern jlib_decl void copyFile(const char *target, const char *source, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL,CFflags copyFlags=CFnone);
+extern jlib_decl void copyFile(IFile * target, IFile * source,size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL,CFflags copyFlags=CFnone);
 extern jlib_decl bool recursiveCreateDirectory(const char * path);              // only works locally, use IFile::createDirectory() for remote
 extern jlib_decl bool recursiveCreateDirectoryForFile(const char *filename);    // only works locally, use IFile::createDirectory() for remote
 
@@ -258,7 +262,7 @@ extern jlib_decl void createHardLink(const char* fileName, const char* existingF
 
 extern jlib_decl IFile * createIFile(const char * filename);
 extern jlib_decl IFile * createIFile(MemoryBuffer & buffer);
-extern jlib_decl IFileIO * createIFileIO(HANDLE handle);
+extern jlib_decl IFileIO * createIFileIO(HANDLE handle,IFOmode=IFOreadwrite,IFEflags extraFlags=IFEnone);
 extern jlib_decl IDirectoryIterator * createDirectoryIterator(const char * path = NULL, const char * wildcard = NULL);
 extern jlib_decl IDirectoryIterator * createNullDirectoryIterator();
 extern jlib_decl IFileIO * createIORange(IFileIO * file, offset_t header, offset_t length);     // restricts input/output to a section of a file.
@@ -605,7 +609,7 @@ interface ICopyFileIntercept
 {
     virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz)=0;
 };
-extern jlib_decl void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress, ICopyFileIntercept *copyintercept, bool usetmp);
+extern jlib_decl void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFileProgress *progress, ICopyFileIntercept *copyintercept, bool usetmp, CFflags copyFlags=CFnone);
 extern jlib_decl void makeTempCopyName(StringBuffer &tmpname,const char *destname);
 extern jlib_decl size32_t SendFile(ISocket *target, IFileIO *fileio,offset_t start,size32_t len);
 extern jlib_decl void asyncClose(IFileIO *io);

+ 11 - 7
system/jlib/jfile.ipp

@@ -42,9 +42,9 @@ public:
     virtual fileBool isDirectory();
     virtual fileBool isFile();
     virtual fileBool isReadOnly();
-    virtual IFileIO * open(IFOmode mode);
+    virtual IFileIO * open(IFOmode mode, IFEflags extraFlags=IFEnone);
     virtual IFileAsyncIO * openAsync(IFOmode mode);
-    virtual IFileIO * openShared(IFOmode mode,IFSHmode shmode);
+    virtual IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
     virtual const char * queryFilename();
     virtual bool remove();
     virtual void rename(const char *newTail);
@@ -73,18 +73,18 @@ public:
     virtual void setShareMode(IFSHmode shmode);
     virtual bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime);
 
-    virtual void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress=NULL);
+    virtual void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress=NULL, CFflags copyFlags=CFnone);
     // if toOfs is (offset_t)-1 then copies entire file 
 
-    virtual void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp);
+    virtual void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
 
     virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs=0, memsize_t len=(memsize_t)-1, bool write=false);
     
-    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp)
+    virtual void treeCopyTo(IFile *dest,IpSubNet &subnet,IpAddress &resfrom,bool usetmp,CFflags copyFlags=CFnone)
     {
         // not really for local but simulate
         GetHostIp(resfrom);
-        copyTo(dest,0x100000,NULL,usetmp);
+        copyTo(dest,DEFAULT_COPY_BLKSIZE,NULL,usetmp,copyFlags);
     }
     
 protected:
@@ -96,7 +96,7 @@ protected:
 class jlib_decl CFileIO : public CInterface, implements IFileIO
 {
 public:
-    CFileIO(HANDLE,IFSHmode _sharemode);
+    CFileIO(HANDLE,IFOmode _openmode,IFSHmode _sharemode,IFEflags _extraFlags);
     ~CFileIO();
     IMPLEMENT_IINTERFACE
 
@@ -119,6 +119,10 @@ protected:
     HANDLE              file;
     bool                throwOnError;
     IFSHmode            sharemode;
+    IFOmode             openmode;
+    IFEflags            extraFlags;
+    atomic_t            bytesRead;
+    atomic_t            bytesWritten;
 private:
     void setPos(offset_t pos);
 

+ 1 - 0
system/jlib/jlog.cpp

@@ -2472,6 +2472,7 @@ int CSysLogEventLogger::writeDataLog(size32_t datasize, byte const * data)
     }
 #ifdef __linux__
     fdatasync(dataLogFile);
+    posix_fadvise(dataLogFile, 0, 0, POSIX_FADV_DONTNEED);
 #endif
     return fpos;
 }