Browse Source

Merge pull request #1217 from ghalliday/flush

Implement IFileIO::flush()

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
02fe30a3a9

+ 23 - 15
common/remote/sockfile.cpp

@@ -2195,24 +2195,11 @@ public:
         disconnectonexit = false;
     }
 
-
     ~CRemoteFileIO()
     {
         if (handle) {
             try {
-                MemoryBuffer sendBuffer;
-                initSendBuffer(sendBuffer);
-                sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
-                parent->sendRemoteCommand(sendBuffer,false);
-            }
-            catch (IDAFS_Exception *e) {
-                if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle)) { // ignore already disconnected
-                    StringBuffer s;
-                    e->errorMessage(s);
-                    WARNLOG("CRemoteFileIO close file: %s",s.str());
-                }
-                e->Release();
-                
+                close();
             }
             catch (IException *e) {
                 StringBuffer s;
@@ -2223,7 +2210,24 @@ public:
         }
         if (disconnectonexit)
             parent->disconnect();
-        handle = 0;
+    }
+
+    void close()
+    {
+        if (handle) {
+            try {
+                MemoryBuffer sendBuffer;
+                initSendBuffer(sendBuffer);
+                sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
+                parent->sendRemoteCommand(sendBuffer,false);
+            }
+            catch (IDAFS_Exception *e) {
+                if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
+                    throw;
+                e->Release();
+            }
+            handle = 0;
+        }
     }
 
     bool open(IFOmode _mode,compatIFSHmode _compatmode) 
@@ -2290,6 +2294,10 @@ public:
         return got;
     }
 
+    virtual void flush()
+    {
+    }
+
     const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
     {
         unsigned tries=0;

+ 2 - 0
dali/datest/datest.cpp

@@ -2348,6 +2348,8 @@ NULL
         }
         virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { UNIMPLEMENTED; return 0; }
         virtual void setSize(offset_t size) { UNIMPLEMENTED; }
+        virtual void flush() { }
+        virtual void close() { }
     };
 
     const char *newFileName = "xpathTests.out";

+ 9 - 0
roxie/ccd/ccdfile.cpp

@@ -67,9 +67,11 @@ public:
     IMPLEMENT_IINTERFACE;
     virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
     virtual offset_t size() { THROWNOTOPEN; }
+    virtual void flush() { THROWNOTOPEN; }
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
     virtual void setSize(offset_t size) { UNIMPLEMENTED; }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
+    virtual void close() { }
 } failure;
 
 class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelayedFile
@@ -336,6 +338,13 @@ public:
         }
     }
 
+    virtual void flush()
+    {
+        CriticalBlock b(crit);
+        if (current.get() != &failure)
+            current->flush();
+    }
+
     virtual offset_t size() 
     { 
         CriticalBlock b(crit);

+ 99 - 14
system/jlib/jfile.cpp

@@ -1545,7 +1545,8 @@ IFileIO *_createIFileIO(const void *buffer, unsigned sz, bool readOnly)
             memcpy((byte *)buffer+pos, data, len);
             return len;
         }
-
+        virtual void flush() {}
+        virtual void close() {}
         virtual void setSize(offset_t size)
         {
             if (size > mb.length())
@@ -1697,7 +1698,6 @@ extern jlib_decl IFileIO *createIFileIO(HANDLE handle)
     return new CFileIO(handle,IFSHfull);
 }
 
-
 offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
 {
     if (!file)
@@ -1738,10 +1738,35 @@ CFileIO::CFileIO(HANDLE handle, IFSHmode _sharemode)
 
 CFileIO::~CFileIO()
 {
-    if (file != NULLFILE) CloseHandle(file);
+    try
+    {
+        //note this will not call the virtual close() if anyone ever derived from this class.
+        //the clean fix is to move this code to beforeDispose()
+        close();
+    }
+    catch (IException * e)
+    {
+        EXCLOG(e, "CFileIO::~CFileIO");
+        e->Release();
+    }
+}
+
+void CFileIO::close()
+{
+    if (file != NULLFILE)
+    {
+        if (!CloseHandle(file))
+            throw MakeOsException(GetLastError(),"CFileIO::close");
+    }
     file = NULLFILE;
 }
 
+void CFileIO::flush()
+{
+    if (!FlushFileBuffers(file))
+        throw MakeOsException(GetLastError(),"CFileIO::flush");
+}
+
 offset_t CFileIO::size()
 {
     LARGE_INTEGER pos;
@@ -1810,13 +1835,34 @@ CFileIO::CFileIO(HANDLE handle, IFSHmode _sharemode)
 
 CFileIO::~CFileIO()
 {
+    try
+    {
+        close();
+    }
+    catch (IException * e)
+    {
+        EXCLOG(e, "CFileIO::~CFileIO");
+        e->Release();
+    }
+}
+
+void CFileIO::close()
+{
     if (file != NULLFILE) {
-        close(file);
+        if (::close(file) < 0)
+            throw MakeErrnoException(errno,"CFileIO::close");
         file=NULLFILE;
-
     }
 }
 
+void CFileIO::flush()
+{
+    CriticalBlock procedure(cs);
+    if (fdatasync(file) != 0)
+        throw MakeOsException(DISK_FULL_EXCEPTION_CODE,"CFileIO::flush");
+}
+
+
 offset_t CFileIO::size()
 {
     CriticalBlock procedure(cs);
@@ -1894,6 +1940,29 @@ size32_t CFileRangeIO::write(offset_t pos, size32_t len, const void * data)
 
 //--------------------------------------------------------------------------
 
+CFileAsyncIO::~CFileAsyncIO()
+{
+    try
+    {
+        close();
+    }
+    catch (IException * e)
+    {
+        EXCLOG(e, "CFileAsyncIO::~CFileAsyncIO");
+        e->Release();
+    }
+}
+
+void CFileAsyncIO::flush()
+{
+    // wait for all outstanding results
+    CriticalBlock block(cs);
+    ForEachItemInRev(i,results) {
+        size32_t dummy;
+        results.item(i).getResult(dummy,true);
+    }
+}
+
 offset_t CFileAsyncIO::appendFile(IFile *file,offset_t pos,offset_t len)
 {
     // will implemented if needed
@@ -1968,15 +2037,15 @@ CFileAsyncIO::CFileAsyncIO(HANDLE handle, IFSHmode _sharemode)
     sharemode = _sharemode;
 }
 
-CFileAsyncIO::~CFileAsyncIO()
+void CFileAsyncIO::close()
 {
-    CriticalBlock block(cs);
-    ForEachItemInRev(i,results) {
-        size32_t dummy;
-        results.item(i).getResult(dummy,true);
-    }
+    flush();
     // wait for all outstanding results
-    if (file != NULLFILE) CloseHandle(file);
+    if (file != NULLFILE)
+    {
+        if (!CloseHandle(file))
+            throw MakeOsException(GetLastError(),"CFileAsyncIO::close");
+    }
     file = NULLFILE;
 }
 
@@ -2138,11 +2207,12 @@ CFileAsyncIO::CFileAsyncIO(HANDLE handle, IFSHmode _sharemode)
 }
 
 
-CFileAsyncIO::~CFileAsyncIO()
+void CFileAsyncIO::close()
 {
     if (file != NULLFILE) {
         aio_cancel(file,NULL);
-        _lclose(file);
+        if (_lclose(file) < 0)
+            throw MakeErrnoException(errno, "CFileAsyncIO::close");
     }
     file=NULLFILE;
 }
@@ -6101,6 +6171,21 @@ public:
         Owned<IFileIO> io = open();
         return io->write(pos,len,data);
     }
+    virtual void flush()
+    {
+        CriticalBlock block(sect);
+        if (cachedio)
+            cachedio->flush();
+    }
+    virtual void close()
+    {
+        CriticalBlock block(sect);
+        if (cachedio)
+        {
+            cachedio->close();
+            cachedio.clear();
+        }
+    }
     offset_t appendFile(IFile *file,offset_t pos,offset_t len)
     {
         CriticalBlock block(sect);

+ 2 - 0
system/jlib/jfile.hpp

@@ -164,6 +164,8 @@ interface IFileIO : public IInterface
     virtual size32_t write(offset_t pos, size32_t len, const void * data) = 0;
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) =0;
     virtual void setSize(offset_t size) = 0;
+    virtual void flush() = 0;
+    virtual void close() = 0;       // no other access is allowed after this call
 };
 
 interface IFileIOCache : extends IInterface

+ 6 - 1
system/jlib/jfile.ipp

@@ -106,6 +106,8 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual void setSize(offset_t size);
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
+    virtual void flush();
+    virtual void close();
 
     bool create(const char * filename, bool replace);
     bool open(const char * filename);
@@ -134,7 +136,8 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual void setSize(offset_t size) { UNIMPLEMENTED; }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
-
+    virtual void flush() { io->flush(); }
+    virtual void close() { io->close(); }
 
 protected:
     Linked<IFileIO>     io;
@@ -155,6 +158,8 @@ public:
     virtual offset_t size();
     virtual size32_t write(offset_t pos, size32_t len, const void * data);
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
+    virtual void flush();
+    virtual void close();
 
     virtual void setSize(offset_t size);
     virtual IFileAsyncResult *readAsync(offset_t pos, size32_t len, void * data);

+ 2 - 0
thorlcr/activities/thactivityutil.cpp

@@ -824,6 +824,8 @@ public:
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
     virtual void setSize(offset_t size) { primaryio->setSize(size); }
+    virtual void flush() { primaryio->flush(); }
+    virtual void close() { primaryio->close(); }
 };
 
 IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)

+ 13 - 6
thorlcr/graph/thgraphslave.cpp

@@ -1172,12 +1172,6 @@ public:
         iFileIO.clear();
     }
 
-    void close()
-    {
-        CriticalBlock b(crit);
-        iFileIO.clear();
-    }
-
     const char *queryFindString() const { return filename.get(); } // for string HT
 
 // IFileIO impl.
@@ -1199,6 +1193,19 @@ public:
         checkOpen();
         return iFileIO->write(pos, len, data);
     }
+    virtual void flush()
+    {
+        CriticalBlock b(crit);
+        if (iFileIO)
+            iFileIO->flush();
+    }
+    virtual void close()
+    {
+        CriticalBlock b(crit);
+        if (iFileIO)
+            iFileIO->close();
+        iFileIO.clear();
+    }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
     {
         CriticalBlock b(crit);