Browse Source

Merge pull request #257 from richardkchapman/ReadAll

gh-254 Code assumes read() will read full requested amount
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 14 years ago
parent
commit
ccd3fa4d7a
5 changed files with 141 additions and 159 deletions
  1. 4 42
      system/jlib/jfile.cpp
  2. 0 1
      system/jlib/jfile.hpp
  3. 132 109
      system/jlib/jio.cpp
  4. 3 5
      system/jlib/jio.hpp
  5. 2 2
      system/jlib/jutil.cpp

+ 4 - 42
system/jlib/jfile.cpp

@@ -1627,8 +1627,8 @@ public:
     size32_t read(offset_t _pos, size32_t len, void * data)
     size32_t read(offset_t _pos, size32_t len, void * data)
     {
     {
         checkPos("read",_pos);
         checkPos("read",_pos);
-        size32_t ret;
 #ifdef _WIN32
 #ifdef _WIN32
+        // Can't use checked_read because don't have the c fileno for it
         DWORD numRead;
         DWORD numRead;
         if (ReadFile(file,data,len,&numRead,NULL) == 0) {
         if (ReadFile(file,data,len,&numRead,NULL) == 0) {
             DWORD err = GetLastError();
             DWORD err = GetLastError();
@@ -1636,12 +1636,9 @@ public:
                 return 0;
                 return 0;
             throw MakeOsException(GetLastError(),"CSequentialFileIO::read"); 
             throw MakeOsException(GetLastError(),"CSequentialFileIO::read"); 
         }
         }
-
-        ret = (size32_t)numRead;
+        size32_t ret = (size32_t)numRead;
 #else
 #else
-        ret = ::read(file,data,len);
-        if (ret==(size32_t)-1)
-            throw MakeErrnoException(errno,"CSequentialFileIO::read");
+        size32_t ret = checked_read(file, data, len);
 #endif
 #endif
         pos += ret;
         pos += ret;
         return ret;
         return ret;
@@ -1798,8 +1795,6 @@ void CFileIO::setSize(offset_t pos)
         throw MakeOsException(GetLastError(), "CFileIO::setSize");
         throw MakeOsException(GetLastError(), "CFileIO::setSize");
 }
 }
 
 
-void setIORetryCount(unsigned _ioRetryCount) { } // linux only
-
 #else
 #else
 
 
 //-- Unix implementation ----------------------------------------------------
 //-- Unix implementation ----------------------------------------------------
@@ -1831,43 +1826,10 @@ offset_t CFileIO::size()
     return length;
     return length;
 }
 }
 
 
-static unsigned ioRetryCount=0;
-void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
-{
-    ioRetryCount = _ioRetryCount;
-}
-
 size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 {
 {
     if (0==len) return 0;
     if (0==len) return 0;
-    size32_t ret;
-    if (!ioRetryCount)
-    {
-        ret = pread(file,data,len,pos);
-        if ((size32_t)-1 != ret)
-            return ret;
-    }
-    else
-    {
-        unsigned attempt=1;
-        do
-        {
-            unsigned __int64 startCycles = get_cycles_now();
-            ret = pread(file,data,len,pos);
-            if ((size32_t)-1 != ret)
-                return ret;
-            StringBuffer callStr("pread");
-            callStr.append("[errno=").append(errno);
-            unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
-            callStr.append(", took=").append(elapsedMs);
-            callStr.append(", attempt=").append(attempt).append("](handle=");
-            callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
-            PROGLOG("%s", callStr.str());
-            attempt++;
-        }
-        while (attempt<=ioRetryCount);
-    }
-    throw MakeErrnoException(errno,"CFileIO::read");
+    return checked_pread(file, data, len, pos);
 }
 }
 
 
 void CFileIO::setPos(offset_t newPos)
 void CFileIO::setPos(offset_t newPos)

+ 0 - 1
system/jlib/jfile.hpp

@@ -597,7 +597,6 @@ extern jlib_decl bool containsFileWildcard(const char * path);
 extern jlib_decl bool isDirectory(const char * path);
 extern jlib_decl bool isDirectory(const char * path);
 
 
 extern jlib_decl IFileIOCache* createFileIOCache(unsigned max);
 extern jlib_decl IFileIOCache* createFileIOCache(unsigned max);
-extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails, linux only
 extern jlib_decl IFile * createSentinelTarget();
 extern jlib_decl IFile * createSentinelTarget();
 extern jlib_decl void writeSentinelFile(IFile * file);
 extern jlib_decl void writeSentinelFile(IFile * file);
 extern jlib_decl void removeSentinelFile(IFile * file);
 extern jlib_decl void removeSentinelFile(IFile * file);

+ 132 - 109
system/jlib/jio.cpp

@@ -45,22 +45,10 @@
 #define MINCOMPRESSEDROWSIZE 16
 #define MINCOMPRESSEDROWSIZE 16
 #define MAXCOMPRESSEDROWSIZE 0x4000
 #define MAXCOMPRESSEDROWSIZE 0x4000
 
 
-
-static inline size32_t checked_write( int handle, const void *buffer, unsigned int count )
-{
-    int ret=_write(handle,buffer,count);
-    if ((size32_t)ret!=count) { 
-        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
-    }
-    return (size32_t)ret;
-}
-
-static inline size32_t checked_read( int handle, void *buffer, unsigned int count )
+static unsigned ioRetryCount=0;
+void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
 {
 {
-    int ret=_read(handle,buffer,count);
-    if (ret==-1) 
-        throw MakeErrnoException("checked_read");
-    return (size32_t)ret;
+    ioRetryCount = _ioRetryCount;
 }
 }
 
 
 static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin )
 static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin )
@@ -71,7 +59,135 @@ static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin
     return ret;
     return ret;
 }
 }
 
 
+extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
+{
+    if (0==len) return 0;
+    unsigned attempts = 0;
+    size32_t ret = 0;
+    unsigned __int64 startCycles = get_cycles_now();
+    loop
+    {
+        size_t readNow = _read(file, buffer, len);
+        if (readNow == (size32_t)-1)
+        {
+            switch (errno)
+            {
+            case EINTR:
+                readNow = 0;
+                break;
+            default:
+                if (attempts < ioRetryCount)
+                {
+                    attempts++;
+                    StringBuffer callStr("read");
+                    callStr.append("[errno=").append(errno);
+                    unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
+                    callStr.append(", took=").append(elapsedMs);
+                    callStr.append(", attempt=").append(attempts).append("](handle=");
+                    callStr.append(file).append(", len=").append(len).append(")");
+                    PROGLOG("%s", callStr.str());
+                    readNow = 0;
+                    break;
+                }
+                throw MakeErrnoException(errno, "checked_read");
+            }
+        }
+        else if (!readNow)
+            break;
+        ret += readNow;
+        if (readNow == len)
+            break;
+        buffer = ((char *) buffer) + readNow;
+        len -= readNow;
+    }
+    return ret;
+}
+
+#ifdef WIN32
+static bool atomicsupported = true;
+static CriticalSection atomicsection;
+#endif
 
 
+extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
+{
+    if (0==len) return 0;
+#ifdef WIN32
+    if (atomicsupported)
+    {
+        HANDLE hFile = (HANDLE)_get_osfhandle(file);
+        DWORD rread;
+        OVERLAPPED overlapped;
+        memset(&overlapped, 0, sizeof(overlapped));
+        overlapped.Offset = (DWORD) pos;
+        overlapped.OffsetHigh = (DWORD)(pos>>32);
+        if (ReadFile(hFile, buffer, len, &rread, &overlapped))
+            return rread;
+        int err = (int)GetLastError();
+        if (err == ERROR_HANDLE_EOF)
+            return 0;
+        if (err == ERROR_INVALID_PARAMETER) // Win98 etc
+            atomicsupported = false;
+        else
+            throw MakeOsException(GetLastError(), "checked_pread");
+    }
+    {
+        CriticalBlock blk(atomicsection);
+        checked_lseeki64(file, pos, FILE_BEGIN);
+        return checked_read(file, buffer, len);
+    }
+#else
+    size32_t ret = 0;
+    unsigned attempts = 0;
+    unsigned __int64 startCycles = get_cycles_now();
+    loop
+    {
+        size_t readNow = ::pread(file, buffer, len, pos);
+        if (readNow == (size32_t)-1)
+        {
+            switch (errno)
+            {
+            case EINTR:
+                readNow = 0;
+                break;
+            default:
+                if (attempts < ioRetryCount)
+                {
+                    attempts++;
+                    StringBuffer callStr("pread");
+                    callStr.append("[errno=").append(errno);
+                    unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
+                    callStr.append(", took=").append(elapsedMs);
+                    callStr.append(", attempt=").append(attempts).append("](handle=");
+                    callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
+                    PROGLOG("%s", callStr.str());
+                    readNow = 0;
+                    break;
+                }
+                throw MakeErrnoException(errno,"checked_pread");
+            }
+        }
+        else if (!readNow)
+            break;
+        ret += readNow;
+        if (readNow == len)
+            break;
+        pos += readNow;
+        buffer = ((char *) buffer) + readNow;
+        len -= readNow;
+    }
+    return ret;
+#endif
+}
+
+static inline size32_t checked_write( int handle, const void *buffer, size32_t count )
+{
+    int ret=_write(handle,buffer,count);
+    if ((size32_t)ret != count)
+    {
+        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
+    }
+    return (size32_t)ret;
+}
 
 
 class CReadSeq : public CInterface, public IReadSeq
 class CReadSeq : public CInterface, public IReadSeq
 {
 {
@@ -125,7 +241,7 @@ class CReadSeq : public CInterface, public IReadSeq
         if (endpos-nextbufpos<(offset_t)rd)
         if (endpos-nextbufpos<(offset_t)rd)
             rd = (size32_t)(endpos-nextbufpos);
             rd = (size32_t)(endpos-nextbufpos);
         if (rd) 
         if (rd) 
-            rd = atomicRead(fh, buffer+left, rd,nextbufpos);
+            rd = checked_pread(fh, buffer+left, rd, nextbufpos);
         nextbufpos += rd;
         nextbufpos += rd;
         bytesInBuffer = left+rd;
         bytesInBuffer = left+rd;
         ptr = buffer;
         ptr = buffer;
@@ -923,99 +1039,6 @@ unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
     return ret;
     return ret;
 }
 }
 
 
-
-// atomic read write
-#ifdef _WIN32
-
-
-static bool atomicsupported=false;
-static CriticalSection atomicsection;
-
-size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset)
-{
-    if (atomicsupported) {
-        HANDLE hFile = (HANDLE)_get_osfhandle(fildes);
-        DWORD rread;
-        OVERLAPPED overlapped;
-        memset(&overlapped,0,sizeof(overlapped));
-        overlapped.Offset = (DWORD)offset;
-        overlapped.OffsetHigh = (DWORD)(offset>>32);
-        if (ReadFile(hFile,buf,nbyte,&rread,&overlapped)) {
-            return rread;
-        }
-        int err=(int)GetLastError();
-        if (err==ERROR_HANDLE_EOF)
-            return 0;
-        if (err==ERROR_INVALID_PARAMETER) { // Win98 etc
-            atomicsupported = false;
-        }
-        else {
-            IException *e = MakeErrnoException(5, "AtomicRead: ReadFile failed, handle = %d, GetLastError()==%x\n", fildes, err);
-            PrintExceptionLog(e, NULL);
-            throw e;
-        }
-    }
-    {
-        CriticalBlock blk(atomicsection);
-        checked_lseeki64(fildes,offset,FILE_BEGIN);
-        return checked_read(fildes,buf,nbyte);
-    }
-}
-
-size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset)
-{
-    if (atomicsupported) {
-        HANDLE hFile = (HANDLE)_get_osfhandle(fildes);
-        DWORD rwrit;
-        OVERLAPPED overlapped;
-        memset(&overlapped,0,sizeof(overlapped));
-        overlapped.Offset = (DWORD)offset;
-        overlapped.OffsetHigh = (DWORD)(offset>>32);
-        if (WriteFile(hFile,buf,nbyte,&rwrit,&overlapped)) {
-            return rwrit;
-        }
-        int err=(int)GetLastError();
-        if (err==ERROR_INVALID_PARAMETER) { // Win98 etc
-            atomicsupported = false;
-        }
-        else {
-            PrintLog("AtomicWrite: WriteFile failed, handle = %d, GetLastError()==%x\n", fildes, err);
-            errno = 5; // general i/o error
-            return -1;
-        }
-    }
-    {
-        CriticalBlock blk(atomicsection);
-        checked_lseeki64(fildes,offset,FILE_BEGIN);
-        return checked_write(fildes,buf,nbyte);
-    }
-}
-
-#else
-
-
-size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset)
-{
-    if (0==nbyte) return 0;
-    int ret = pread(fildes,buf,nbyte,offset);
-    if (ret==-1) 
-        throw MakeErrnoException("atomicRead");
-    return (size32_t)ret;
-}
-
-size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset)
-{
-    int ret = pwrite(fildes,buf,nbyte,offset);
-    if ((size32_t)ret!=nbyte) 
-        throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "atomicWrite");
-    return (size32_t)ret;
-}
-
-#endif
-
-
-
-
 /////////////////
 /////////////////
 
 
 CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)
 CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)

+ 3 - 5
system/jlib/jio.hpp

@@ -135,11 +135,9 @@ extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delt
 
 
 extern jlib_decl unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize);
 extern jlib_decl unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize);
 
 
-//atomic read/write - used with _open 
-// avoids the need to seek separately
-
-extern jlib_decl size32_t atomicRead(int fildes, void *buf, size32_t nbyte, offset_t offset);
-extern jlib_decl size32_t atomicWrite(int fildes, const void *buf, size32_t nbyte, offset_t offset);
+extern jlib_decl void setIORetryCount(unsigned _ioRetryCount); // default 0 == off, retries if read op. fails
+extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len);
+extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos);
 
 
 class CachedRecordSize
 class CachedRecordSize
 {
 {

+ 2 - 2
system/jlib/jutil.cpp

@@ -2192,10 +2192,10 @@ StringBuffer jlib_decl passwordInput(const char* prompt, StringBuffer& passwd)
     set_term.c_lflag &= ~(ECHO|ECHOE|ECHOK|ECHONL);
     set_term.c_lflag &= ~(ECHO|ECHOE|ECHOK|ECHONL);
     tcsetattr(termfd, TCSAFLUSH, &set_term);
     tcsetattr(termfd, TCSAFLUSH, &set_term);
     char c = EOF;
     char c = EOF;
-    int rd = _read(termfd,&c,1);
+    int rd = ::read(termfd,&c,1);
     while ((rd==1)&&(c!='\r')&&(c!='\n')&&(c!=EOF)) {
     while ((rd==1)&&(c!='\r')&&(c!='\n')&&(c!=EOF)) {
         passwd.append(c);
         passwd.append(c);
-        rd = _read(termfd,&c,1);
+        rd = ::read(termfd,&c,1);
     }
     }
     int err = (rd<0)?errno:0;
     int err = (rd<0)?errno:0;
     tcsetattr(termfd, TCSAFLUSH, &saved_term);
     tcsetattr(termfd, TCSAFLUSH, &saved_term);