Kaynağa Gözat

Implement IFileIO::flush()

Add a flush function to file file io, so that problems writing to the
disk can be detected before the file is closed in the destructor -
by which point it is too late to throw an exception.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 13 yıl önce
ebeveyn
işleme
be28914660

+ 4 - 0
common/remote/sockfile.cpp

@@ -2290,6 +2290,10 @@ public:
         return got;
     }
 
+    virtual void flush()
+    {
+    }
+
     const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
     {
         unsigned tries=0;

+ 1 - 0
dali/datest/datest.cpp

@@ -2348,6 +2348,7 @@ NULL
         }
         virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { UNIMPLEMENTED; return 0; }
         virtual void setSize(offset_t size) { UNIMPLEMENTED; }
+        virtual void flush() { }
     };
 
     const char *newFileName = "xpathTests.out";

+ 8 - 0
roxie/ccd/ccdfile.cpp

@@ -67,6 +67,7 @@ public:
     IMPLEMENT_IINTERFACE;
     virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
     virtual offset_t size() { THROWNOTOPEN; }
+    virtual void flush() { THROWNOTOPEN; }
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
     virtual void setSize(offset_t size) { UNIMPLEMENTED; }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
@@ -336,6 +337,13 @@ public:
         }
     }
 
+    virtual void flush()
+    {
+        CriticalBlock b(crit);
+        if (current.get() != &failure)
+            current->flush();
+    }
+
     virtual offset_t size() 
     { 
         CriticalBlock b(crit);

+ 40 - 3
system/jlib/jfile.cpp

@@ -1545,7 +1545,7 @@ IFileIO *_createIFileIO(const void *buffer, unsigned sz, bool readOnly)
             memcpy((byte *)buffer+pos, data, len);
             return len;
         }
-
+        virtual void flush() {}
         virtual void setSize(offset_t size)
         {
             if (size > mb.length())
@@ -1697,7 +1697,6 @@ extern jlib_decl IFileIO *createIFileIO(HANDLE handle)
     return new CFileIO(handle,IFSHfull);
 }
 
-
 offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
 {
     if (!file)
@@ -1742,6 +1741,12 @@ CFileIO::~CFileIO()
     file = NULLFILE;
 }
 
+void CFileIO::flush()
+{
+    if (!FlushFileBuffers(file))
+        throw MakeOsException(GetLastError(),"CFileIO::flush");
+}
+
 offset_t CFileIO::size()
 {
     LARGE_INTEGER pos;
@@ -1813,10 +1818,17 @@ CFileIO::~CFileIO()
     if (file != NULLFILE) {
         close(file);
         file=NULLFILE;
-
     }
 }
 
+void CFileIO::flush()
+{
+    CriticalBlock procedure(cs);
+    if (fdatasync(file) != 0)
+        throw MakeOsException(DISK_FULL_EXCEPTION_CODE,"CFileIO::flush");
+}
+
+
 offset_t CFileIO::size()
 {
     CriticalBlock procedure(cs);
@@ -1894,6 +1906,25 @@ size32_t CFileRangeIO::write(offset_t pos, size32_t len, const void * data)
 
 //--------------------------------------------------------------------------
 
+void CFileAsyncIO::flush()
+{
+    //This could wait until all pending results are done.
+    loop
+    {
+        Owned<IFileAsyncResult> next;
+        {
+            CriticalBlock block(cs);
+            if (results.ordinality())
+                next.set(&results.tos());
+        }
+        if (!next)
+            return;
+
+        size32_t value;
+        next->getResult(value, true);
+    }
+}
+
 offset_t CFileAsyncIO::appendFile(IFile *file,offset_t pos,offset_t len)
 {
     // will implemented if needed
@@ -6101,6 +6132,12 @@ public:
         Owned<IFileIO> io = open();
         return io->write(pos,len,data);
     }
+    virtual void flush()
+    {
+        CriticalBlock block(sect);
+        if (cachedio)
+            cachedio->flush();
+    }
     offset_t appendFile(IFile *file,offset_t pos,offset_t len)
     {
         CriticalBlock block(sect);

+ 1 - 0
system/jlib/jfile.hpp

@@ -164,6 +164,7 @@ interface IFileIO : public IInterface
     virtual size32_t write(offset_t pos, size32_t len, const void * data) = 0;
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) =0;
     virtual void setSize(offset_t size) = 0;
+    virtual void flush() = 0;
 };
 
 interface IFileIOCache : extends IInterface

+ 3 - 1
system/jlib/jfile.ipp

@@ -106,6 +106,7 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual void setSize(offset_t size);
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
+    virtual void flush();
 
     bool create(const char * filename, bool replace);
     bool open(const char * filename);
@@ -134,7 +135,7 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual void setSize(offset_t size) { UNIMPLEMENTED; }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
-
+    virtual void flush() { io->flush(); }
 
 protected:
     Linked<IFileIO>     io;
@@ -155,6 +156,7 @@ public:
     virtual offset_t size();
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
+    virtual void flush();
 
     virtual void setSize(offset_t size);
     virtual IFileAsyncResult *readAsync(offset_t pos, size32_t len, void * data);

+ 1 - 0
thorlcr/activities/thactivityutil.cpp

@@ -824,6 +824,7 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
     virtual void setSize(offset_t size) { primaryio->setSize(size); }
+    virtual void flush() { primaryio->flush(); }
 };
 
 IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)

+ 6 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1199,6 +1199,12 @@ public:
         checkOpen();
         return iFileIO->write(pos, len, data);
     }
+    virtual void flush()
+    {
+        CriticalBlock b(crit);
+        if (iFileIO)
+            iFileIO->flush();
+    }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
     {
         CriticalBlock b(crit);