Просмотр исходного кода

gh-254 Code assumes read() will read full requested amount

Various code in jlib was assuming that read() and pread() would never
return less than the requested amount except at end-of-file. The Posix
standard states that they may do so if a signal is received during the
read operation after some data has been retrieved. Testing on Lustre
suggests that this ican sometimes happen. We previously had code in to
handle cases where calls to pread() were interrupted before data had
been read, but they would not help all cases.

The calls to ::read / ::pread were not as well isolated as they should
have been (and there were also calls to _read to confuse the issue).

Refactor the code to common up calls to ::read as much as possible, and
add logic to retry interrupted reads.
read full requested amount.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 лет назад
Родитель
Сommit
295c8fff32
5 измененных файлов с 141 добавлено и 159 удалено
  1. 4 42
      system/jlib/jfile.cpp
  2. 0 1
      system/jlib/jfile.hpp
  3. 132 109
      system/jlib/jio.cpp
  4. 3 5
      system/jlib/jio.hpp
  5. 2 2
      system/jlib/jutil.cpp

+ 4 - 42
system/jlib/jfile.cpp

@@ -1627,8 +1627,8 @@ public:
     size32_t read(offset_t _pos, size32_t len, void * data)
     {
         checkPos("read",_pos);
-        size32_t ret;
 #ifdef _WIN32
+        // Can't use checked_read because don't have the c fileno for it
         DWORD numRead;
         if (ReadFile(file,data,len,&numRead,NULL) == 0) {
             DWORD err = GetLastError();
@@ -1636,12 +1636,9 @@ public:
                 return 0;
             throw MakeOsException(GetLastError(),"CSequentialFileIO::read"); 
         }
-
-        ret = (size32_t)numRead;
+        size32_t ret = (size32_t)numRead;
 #else
-        ret = ::read(file,data,len);
-        if (ret==(size32_t)-1)
-            throw MakeErrnoException(errno,"CSequentialFileIO::read");
+        size32_t ret = checked_read(file, data, len);
 #endif
         pos += ret;
         return ret;
@@ -1798,8 +1795,6 @@ void CFileIO::setSize(offset_t pos)
         throw MakeOsException(GetLastError(), "CFileIO::setSize");
 }
 
-void setIORetryCount(unsigned _ioRetryCount) { } // linux only
-
 #else
 
 //-- Unix implementation ----------------------------------------------------
@@ -1831,43 +1826,10 @@ offset_t CFileIO::size()
     return length;
 }
 
-static unsigned ioRetryCount=0;
-void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
-{
-    ioRetryCount = _ioRetryCount;
-}
-
 size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 {
     if (0==len) return 0;
-    size32_t ret;
-    if (!ioRetryCount)
-    {
-        ret = pread(file,data,len,pos);
-        if ((size32_t)-1 != ret)
-            return ret;
-    }
-    else
-    {
-        unsigned attempt=1;
-        do
-        {
-            unsigned __int64 startCycles = get_cycles_now();
-            ret = pread(file,data,len,pos);
-            if ((size32_t)-1 != ret)
-                return ret;
-            StringBuffer callStr("pread");
-            callStr.append("[errno=").append(errno);
-            unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
-            callStr.append(", took=").append(elapsedMs);
-            callStr.append(", attempt=").append(attempt).append("](handle=");
-            callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
-            PROGLOG("%s", callStr.str());
-            attempt++;
-        }
-        while (attempt<=ioRetryCount);
-    }
-    throw MakeErrnoException(errno,"CFileIO::read");
+    return checked_pread(file, data, len, pos);
 }
 
 void CFileIO::setPos(offset_t newPos)

+ 0 - 1
system/jlib/jfile.hpp

@@ -597,7 +597,6 @@ extern jlib_decl bool containsFileWildcard(const char * path);
 extern jlib_decl bool isDirectory(const char * path);
 
 extern jlib_decl IFileIOCache* createFileIOCache(unsigned max);
-extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails, linux only
 extern jlib_decl IFile * createSentinelTarget();
 extern jlib_decl void writeSentinelFile(IFile * file);
 extern jlib_decl void removeSentinelFile(IFile * file);

+ 132 - 109
system/jlib/jio.cpp

@@ -45,22 +45,10 @@
 #define MINCOMPRESSEDROWSIZE 16
 #define MAXCOMPRESSEDROWSIZE 0x4000
 
-
-static inline size32_t checked_write( int handle, const void *buffer, unsigned int count )
-{
-    int ret=_write(handle,buffer,count);
-    if ((size32_t)ret!=count) { 
-        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
-    }
-    return (size32_t)ret;
-}
-
-static inline size32_t checked_read( int handle, void *buffer, unsigned int count )
+static unsigned ioRetryCount=0;
+void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
 {
-    int ret=_read(handle,buffer,count);
-    if (ret==-1) 
-        throw MakeErrnoException("checked_read");
-    return (size32_t)ret;
+    ioRetryCount = _ioRetryCount;
 }
 
 static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin )
@@ -71,7 +59,135 @@ static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin
     return ret;
 }
 
+extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
+{
+    if (0==len) return 0;
+    unsigned attempts = 0;
+    size32_t ret = 0;
+    unsigned __int64 startCycles = get_cycles_now();
+    loop
+    {
+        size_t readNow = _read(file, buffer, len);
+        if (readNow == (size32_t)-1)
+        {
+            switch (errno)
+            {
+            case EINTR:
+                readNow = 0;
+                break;
+            default:
+                if (attempts < ioRetryCount)
+                {
+                    attempts++;
+                    StringBuffer callStr("read");
+                    callStr.append("[errno=").append(errno);
+                    unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
+                    callStr.append(", took=").append(elapsedMs);
+                    callStr.append(", attempt=").append(attempts).append("](handle=");
+                    callStr.append(file).append(", len=").append(len).append(")");
+                    PROGLOG("%s", callStr.str());
+                    readNow = 0;
+                    break;
+                }
+                throw MakeErrnoException(errno, "checked_read");
+            }
+        }
+        else if (!readNow)
+            break;
+        ret += readNow;
+        if (readNow == len)
+            break;
+        buffer = ((char *) buffer) + readNow;
+        len -= readNow;
+    }
+    return ret;
+}
+
+#ifdef WIN32
+static bool atomicsupported = true;
+static CriticalSection atomicsection;
+#endif
 
+extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
+{
+    if (0==len) return 0;
+#ifdef WIN32
+    if (atomicsupported)
+    {
+        HANDLE hFile = (HANDLE)_get_osfhandle(file);
+        DWORD rread;
+        OVERLAPPED overlapped;
+        memset(&overlapped, 0, sizeof(overlapped));
+        overlapped.Offset = (DWORD) pos;
+        overlapped.OffsetHigh = (DWORD)(pos>>32);
+        if (ReadFile(hFile, buffer, len, &rread, &overlapped))
+            return rread;
+        int err = (int)GetLastError();
+        if (err == ERROR_HANDLE_EOF)
+            return 0;
+        if (err == ERROR_INVALID_PARAMETER) // Win98 etc
+            atomicsupported = false;
+        else
+            throw MakeOsException(GetLastError(), "checked_pread");
+    }
+    {
+        CriticalBlock blk(atomicsection);
+        checked_lseeki64(file, pos, FILE_BEGIN);
+        return checked_read(file, buffer, len);
+    }
+#else
+    size32_t ret = 0;
+    unsigned attempts = 0;
+    unsigned __int64 startCycles = get_cycles_now();
+    loop
+    {
+        size_t readNow = ::pread(file, buffer, len, pos);
+        if (readNow == (size32_t)-1)
+        {
+            switch (errno)
+            {
+            case EINTR:
+                readNow = 0;
+                break;
+            default:
+                if (attempts < ioRetryCount)
+                {
+                    attempts++;
+                    StringBuffer callStr("pread");
+                    callStr.append("[errno=").append(errno);
+                    unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
+                    callStr.append(", took=").append(elapsedMs);
+                    callStr.append(", attempt=").append(attempts).append("](handle=");
+                    callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
+                    PROGLOG("%s", callStr.str());
+                    readNow = 0;
+                    break;
+                }
+                throw MakeErrnoException(errno,"checked_pread");
+            }
+        }
+        else if (!readNow)
+            break;
+        ret += readNow;
+        if (readNow == len)
+            break;
+        pos += readNow;
+        buffer = ((char *) buffer) + readNow;
+        len -= readNow;
+    }
+    return ret;
+#endif
+}
+
+static inline size32_t checked_write( int handle, const void *buffer, size32_t count )
+{
+    int ret=_write(handle,buffer,count);
+    if ((size32_t)ret != count)
+    {
+        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
+    }
+    return (size32_t)ret;
+}
 
 class CReadSeq : public CInterface, public IReadSeq
 {
@@ -125,7 +241,7 @@ class CReadSeq : public CInterface, public IReadSeq
         if (endpos-nextbufpos<(offset_t)rd)
             rd = (size32_t)(endpos-nextbufpos);
         if (rd) 
-            rd = atomicRead(fh, buffer+left, rd,nextbufpos);
+            rd = checked_pread(fh, buffer+left, rd, nextbufpos);
         nextbufpos += rd;
         bytesInBuffer = left+rd;
         ptr = buffer;
@@ -923,99 +1039,6 @@ unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
     return ret;
 }
 
-
-// atomic read write
-#ifdef _WIN32
-
-
-static bool atomicsupported=false;
-static CriticalSection atomicsection;
-
-size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset)
-{
-    if (atomicsupported) {
-        HANDLE hFile = (HANDLE)_get_osfhandle(fildes);
-        DWORD rread;
-        OVERLAPPED overlapped;
-        memset(&overlapped,0,sizeof(overlapped));
-        overlapped.Offset = (DWORD)offset;
-        overlapped.OffsetHigh = (DWORD)(offset>>32);
-        if (ReadFile(hFile,buf,nbyte,&rread,&overlapped)) {
-            return rread;
-        }
-        int err=(int)GetLastError();
-        if (err==ERROR_HANDLE_EOF)
-            return 0;
-        if (err==ERROR_INVALID_PARAMETER) { // Win98 etc
-            atomicsupported = false;
-        }
-        else {
-            IException *e = MakeErrnoException(5, "AtomicRead: ReadFile failed, handle = %d, GetLastError()==%x\n", fildes, err);
-            PrintExceptionLog(e, NULL);
-            throw e;
-        }
-    }
-    {
-        CriticalBlock blk(atomicsection);
-        checked_lseeki64(fildes,offset,FILE_BEGIN);
-        return checked_read(fildes,buf,nbyte);
-    }
-}
-
-size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset)
-{
-    if (atomicsupported) {
-        HANDLE hFile = (HANDLE)_get_osfhandle(fildes);
-        DWORD rwrit;
-        OVERLAPPED overlapped;
-        memset(&overlapped,0,sizeof(overlapped));
-        overlapped.Offset = (DWORD)offset;
-        overlapped.OffsetHigh = (DWORD)(offset>>32);
-        if (WriteFile(hFile,buf,nbyte,&rwrit,&overlapped)) {
-            return rwrit;
-        }
-        int err=(int)GetLastError();
-        if (err==ERROR_INVALID_PARAMETER) { // Win98 etc
-            atomicsupported = false;
-        }
-        else {
-            PrintLog("AtomicWrite: WriteFile failed, handle = %d, GetLastError()==%x\n", fildes, err);
-            errno = 5; // general i/o error
-            return -1;
-        }
-    }
-    {
-        CriticalBlock blk(atomicsection);
-        checked_lseeki64(fildes,offset,FILE_BEGIN);
-        return checked_write(fildes,buf,nbyte);
-    }
-}
-
-#else
-
-
-size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset)
-{
-    if (0==nbyte) return 0;
-    int ret = pread(fildes,buf,nbyte,offset);
-    if (ret==-1) 
-        throw MakeErrnoException("atomicRead");
-    return (size32_t)ret;
-}
-
-size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset)
-{
-    int ret = pwrite(fildes,buf,nbyte,offset);
-    if ((size32_t)ret!=nbyte) 
-        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "atomicWrite");
-    return (size32_t)ret;
-}
-
-#endif
-
-
-
-
 /////////////////
 
 CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)

+ 3 - 5
system/jlib/jio.hpp

@@ -135,11 +135,9 @@ extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delt
 
 extern jlib_decl unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize);
 
-//atomic read/write - used with _open 
-// avoids the need to seek separately
-
-extern jlib_decl size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset);
-extern jlib_decl size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset);
+extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails
+extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len);
+extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos);
 
 class CachedRecordSize
 {

+ 2 - 2
system/jlib/jutil.cpp

@@ -2192,10 +2192,10 @@ StringBuffer jlib_decl passwordInput(const char* prompt, StringBuffer& passwd)
     set_term.c_lflag &= ~(ECHO|ECHOE|ECHOK|ECHONL);
     tcsetattr(termfd, TCSAFLUSH, &set_term);
     char c = EOF;
-    int rd = _read(termfd,&c,1);
+    int rd = ::read(termfd,&c,1);
     while ((rd==1)&&(c!='\r')&&(c!='\n')&&(c!=EOF)) {
         passwd.append(c);
-        rd = _read(termfd,&c,1);
+        rd = ::read(termfd,&c,1);
     }
     int err = (rd<0)?errno:0;
     tcsetattr(termfd, TCSAFLUSH, &saved_term);