/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
#include "platform.h"
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include "jaio.hpp"
- #ifdef _WIN32
- #define ASYNC_READ_TIMEOUT 5000 // milliseconds
- #define ASYNC_MAX_TIMEOUTS 10 // avoid 'deadlock' if something goes very wrong
- #endif
- #if defined(__linux__)
// Completion record for a single asynchronous read request.
struct aio_result_t
{
    int aio_return;      // value reported for the read; -1 marks a failure
    int aio_errno;       // error code stored alongside aio_return
    aio_result_t *next;  // link used while the record sits on the completed list
    aiocb *cb;           // control block, for use by posix style aio
};

// Placeholder kept in aio_result_t::aio_return while a read is still outstanding.
#define AIO_INPROGRESS (-2)
- class AsyncRequest
- {
- public:
- aio_result_t result; // must be first
- int ready;
- char * buffer;
- AsyncRequest()
- {
- result.cb = (aiocb *) calloc(1, sizeof(aiocb));
- result.cb->aio_reqprio = 0; // JCSMORE-Solaris man pages say that this should be set to 0 for portability...?
- result.cb->aio_sigevent.sigev_signo = SIGUSR1;
- result.cb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
- }
- ~AsyncRequest() { free(result.cb); }
- };
- AsyncBlockReader::AsyncBlockReader()
- {
- cur = new AsyncRequest;
- next = new AsyncRequest;
- eof = 1;
- }
- AsyncBlockReader::~AsyncBlockReader()
- {
- finish();
- delete cur;
- delete next;
- }
- __declspec (thread) aio_result_t *readyTail = NULL;
- __declspec (thread) aio_result_t *ready = NULL;
- void aioSigHandler(int sig, siginfo_t *info, void *pContext)
- {
- aio_result_t *resultp = (aio_result_t *) info->si_value.sival_ptr;
- if (aio_error(resultp->cb) == EINPROGRESS)
- resultp->aio_return = -1;
- else
- {
- resultp->aio_return = aio_return(resultp->cb);
-
- if (NULL == ready)
- ready = readyTail = resultp;
- else
- {
- readyTail->next = resultp;
- readyTail = resultp;
- }
- }
- resultp->aio_errno = errno;
- }
- int aioread64(int fildes, char *bufp, int bufs, offset_t offset, int whence, aio_result_t *resultp)
- {
- assertex(whence == FILE_BEGIN); // JCSMORE
- struct aiocb *cb = resultp->cb;
- resultp->cb->aio_fildes = fildes;
- resultp->cb->aio_buf = bufp;
- resultp->cb->aio_nbytes = bufs;
- resultp->cb->aio_offset = offset;
- cb->aio_sigevent.sigev_value.sival_ptr = (void *) resultp;
- if (-1 == aio_read(cb))
- resultp->aio_return = -1;
- resultp->aio_errno = errno;
- return 0;
- };
- aio_result_t *aiowait(void *)
- {
- aio_result_t * r;
- while (NULL == ready) // JCSMORE - isn't there a race condition here? As with aiowait()?
- pause();
- assertex(errno == EINTR);
- r = ready;
- ready = ready->next;
- if (NULL == ready)
- readyTail = NULL;
- return r;
- }
- void AsyncBlockReader::waitblk()
- {
- assertex(next);
- AsyncRequest *req;
- while(!next->ready) {
- req = (AsyncRequest *)aiowait(NULL);
- req->ready = 1;
- }
- req = next;
- next = cur;
- cur = req;
- }
- void AsyncBlockReader::enqueue(AsyncRequest *req)
- {
- if (offset==insize) {
- req->result.aio_return = 0;
- req->result.aio_errno = 0;
- req->ready = 1;
- return;
- }
- assertex(req!=NULL);
- assertex(req->ready);
- req->result.aio_return = AIO_INPROGRESS;
- req->result.aio_errno = 0;
- req->ready = 0;
- int rd = blksize;
- if (insize-offset<blksize)
- rd = (int)(insize-offset);
- if (aioread64(infile, req->buffer, rd, offset, FILE_BEGIN, &req->result)==-1)
- {
- throw makeOSException("async failed ");
- }
- offset+=rd;
- }
- void AsyncBlockReader::init(int file,offset_t st,size32_t blocksize,void *buf1,void *buf2)
- {
- struct sigaction action;
- sigemptyset(&action.sa_mask);
- action.sa_sigaction = aioSigHandler;
- action.sa_flags = SA_SIGINFO;
- sigaction(SIGUSR1, &action, NULL);
- finish();
- infile = file;
- struct _stat sb;
- if(_fstat(infile, &sb)==-1)
- assertex(!"Illegal input file");
- insize = sb.st_size;
- blksize = blocksize;
- eof = 0;
- cur->buffer = (char *)buf1;
- cur->ready = 1;
- next->buffer = (char *)buf2;
- next->ready = 1;
- start = st;
- offset = start;
- pos = start;
- enqueue(next);
- }
- void *AsyncBlockReader::readnext(size32_t &ret)
- {
- if(eof) {
- ret = 0;
- return 0;
- }
- waitblk();
- if(cur->result.aio_return == -1) {
- errno = cur->result.aio_errno;
- eof = 1;
- }
- else if(offset==insize) {
- eof = 1;
- }
- else
- enqueue(next);
- ret = cur->result.aio_return;
- pos += ret;
- return cur->buffer;
- }
- void AsyncBlockReader::finish()
- {
- if (!eof) {
- waitblk();
- }
- }
- void AsyncBlockReader::getinfo(offset_t &of,offset_t &p,offset_t &sz)
- {
- of = start;
- p = pos;
- sz = insize;
- }
- #endif
- #ifdef _WIN32
// Holds the buffer used by one of the reader's two request slots.
// The buffer is supplied by the caller and is not owned by this object.
class CW32AsyncRequest
{
public:
    char * buffer;

    // Starts with no buffer attached.
    CW32AsyncRequest()
        : buffer(NULL)
    {
    }

    // Attaches the buffer that reads for this slot are written to.
    void setBuffer(void * src)
    {
        buffer = static_cast <char *> (src);
    }
};
- CW32AsyncBlockReader::CW32AsyncBlockReader()
- {
- currentRequest = new CW32AsyncRequest();
- nextRequest = new CW32AsyncRequest();
- eof = true;
- pending = false;
- overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- }
- CW32AsyncBlockReader::~CW32AsyncBlockReader()
- {
- finish();
- CloseHandle(overlapped.hEvent);
- delete nextRequest;
- delete currentRequest;
- }
- void CW32AsyncBlockReader::finish()
- {
- // CancelIo(hfile); // not supported in Win95
- if(!eof && pending)
- {
- waitblk();
- eof = true;
- }
- }
- void CW32AsyncBlockReader::init(HANDLE file, offset_t _start, size32_t _blockSize, void * buffer1, void * buffer2)
- {
- hfile = file;
- start = _start;
- blockSize = _blockSize;
- DWORD sizeHi, sizeLo;
- sizeLo = GetFileSize(hfile, &sizeHi);
- insize.set(sizeLo, sizeHi);
- currentRequest->setBuffer(buffer1);
- nextRequest->setBuffer(buffer2);
- reset();
- }
- void CW32AsyncBlockReader::reset()
- {
- finish();
- eof = false;
- offset = start;
- offset.get(overlapped.Offset, overlapped.OffsetHigh);
- enqueue();
- }
- void CW32AsyncBlockReader::enqueue() // reads next buffer
- {
- if (offset < insize)
- {
- DWORD bytesRead;
- if(!ReadFile(hfile, nextRequest->buffer, blockSize, &bytesRead, &overlapped))
- {
- switch(GetLastError())
- {
- case ERROR_IO_PENDING:
- pending = true;
- break;
- default:
- pending = false;
- eof = true; // effectively
- }
- }
- else // cached operations will usually complete immediately, but "pending" is still true
- {
- pending = true;
- }
- }
- else
- {
- pending = false;
- eof = true;
- }
- }
- DWORD CW32AsyncBlockReader::waitblk()
- {
- assertex(pending == true);
- DWORD bytesRead;
- BOOL _WaitForSingleObject;
- int attempts = ASYNC_MAX_TIMEOUTS;
- do
- {
- switch(WaitForSingleObject(overlapped.hEvent, ASYNC_READ_TIMEOUT))
- {
- case WAIT_OBJECT_0:
- pending = false; // pending operation complete
- if(GetOverlappedResult(hfile, &overlapped, &bytesRead, FALSE))
- {
- offset += bytesRead;
- offset.get(overlapped.Offset, overlapped.OffsetHigh);
- CW32AsyncRequest * r = nextRequest;
- nextRequest = currentRequest;
- currentRequest = r;
- }
- else
- {
- assertex(false);
- bytesRead = 0;
- eof = true;
- }
- _WaitForSingleObject = FALSE;
- break;
- case WAIT_TIMEOUT:
- --attempts;
- if(!attempts) // ran out of attempts
- {
- pending = false;
- _WaitForSingleObject = FALSE;
- bytesRead = 0;
- eof = true;
- }
- else
- {
- _WaitForSingleObject = TRUE;
- }
- break;
- default:
- assertex(false); // overlapped structure probably corrupt
- pending = false;
- eof = true;
- bytesRead = 0;
- _WaitForSingleObject = FALSE;
- }
- }
- while (_WaitForSingleObject);
- return bytesRead;
- }
- void * CW32AsyncBlockReader::readnext(size32_t &readLength)
- {
- if (eof)
- {
- readLength = 0;
- return NULL;
- }
- readLength = waitblk();
- if (!eof)
- enqueue();
- return currentRequest->buffer;
- }
- void CW32AsyncBlockReader::getinfo(offset_t &of, offset_t &p, offset_t &sz)
- {
- of = start;
- p = offset.get();
- sz = insize.get();
- }
- #endif
|