jaio.cpp 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <assert.h>
  15. #include <stdio.h>
  16. #include "jaio.hpp"
  17. #ifdef _WIN32
  // Length of a single wait on the overlapped-read event, in milliseconds.
  #define ASYNC_READ_TIMEOUT 5000
  // Number of consecutive timed-out waits tolerated before the read is abandoned;
  // avoids 'deadlock' if something goes very wrong.
  #define ASYNC_MAX_TIMEOUTS 10
  20. #endif
  21. #if defined(__linux__)
  // Completion record for one asynchronous read (named after the Solaris aio type).
  struct aio_result_t
  {
      int aio_return;       // value from aio_return(), -1 on failure, or AIO_INPROGRESS while outstanding
      int aio_errno;        // error code recorded for the request
      aio_result_t *next;   // link in the list of completed requests; NULL when last or not queued
      aiocb *cb;            // for use by posix style aio.
  };

  // Marker stored in aio_result_t::aio_return while a read is still outstanding.
  #define AIO_INPROGRESS (-2)

  // One read slot: the POSIX control block, its completion record and the
  // destination buffer. The buffer is supplied by the caller and is not owned here.
  class AsyncRequest
  {
  public:
      aio_result_t result; // must be first - an aio_result_t* is cast back to AsyncRequest*
      int ready;           // non-zero when no read is outstanding on this slot
      char * buffer;       // destination buffer for the read

      // Creates an idle slot with every field in a defined state, so the
      // completion list never follows an indeterminate 'next' pointer.
      AsyncRequest()
      {
          result.aio_return = 0;
          result.aio_errno = 0;
          result.next = NULL;
          ready = 1;
          buffer = NULL;
          // Value-initialisation zero-fills the control block (as calloc did);
          // an allocation failure throws instead of yielding a NULL to dereference.
          result.cb = new aiocb();
          result.cb->aio_reqprio = 0; // JCSMORE-Solaris man pages say that this should be set to 0 for portability...?
          result.cb->aio_sigevent.sigev_signo = SIGUSR1;
          result.cb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
      }

      ~AsyncRequest() { delete result.cb; }

  private:
      // Owns result.cb: copying would free it twice, so copying is disabled.
      AsyncRequest(const AsyncRequest &);
      AsyncRequest &operator=(const AsyncRequest &);
  };
  45. AsyncBlockReader::AsyncBlockReader()
  46. {
  47. cur = new AsyncRequest;
  48. next = new AsyncRequest;
  49. eof = 1;
  50. }
  51. AsyncBlockReader::~AsyncBlockReader()
  52. {
  53. finish();
  54. delete cur;
  55. delete next;
  56. }
  57. __declspec (thread) aio_result_t *readyTail = NULL;
  58. __declspec (thread) aio_result_t *ready = NULL;
  59. void aioSigHandler(int sig, siginfo_t *info, void *pContext)
  60. {
  61. aio_result_t *resultp = (aio_result_t *) info->si_value.sival_ptr;
  62. if (aio_error(resultp->cb) == EINPROGRESS)
  63. resultp->aio_return = -1;
  64. else
  65. {
  66. resultp->aio_return = aio_return(resultp->cb);
  67. if (NULL == ready)
  68. ready = readyTail = resultp;
  69. else
  70. {
  71. readyTail->next = resultp;
  72. readyTail = resultp;
  73. }
  74. }
  75. resultp->aio_errno = errno;
  76. }
  77. int aioread64(int fildes, char *bufp, int bufs, offset_t offset, int whence, aio_result_t *resultp)
  78. {
  79. assertex(whence == FILE_BEGIN); // JCSMORE
  80. struct aiocb *cb = resultp->cb;
  81. resultp->cb->aio_fildes = fildes;
  82. resultp->cb->aio_buf = bufp;
  83. resultp->cb->aio_nbytes = bufs;
  84. resultp->cb->aio_offset = offset;
  85. cb->aio_sigevent.sigev_value.sival_ptr = (void *) resultp;
  86. if (-1 == aio_read(cb))
  87. resultp->aio_return = -1;
  88. resultp->aio_errno = errno;
  89. return 0;
  90. };
  91. aio_result_t *aiowait(void *)
  92. {
  93. aio_result_t * r;
  94. while (NULL == ready) // JCSMORE - isn't there a race condition here? As with aiowait()?
  95. pause();
  96. assertex(errno == EINTR);
  97. r = ready;
  98. ready = ready->next;
  99. if (NULL == ready)
  100. readyTail = NULL;
  101. return r;
  102. }
  103. void AsyncBlockReader::waitblk()
  104. {
  105. assertex(next);
  106. AsyncRequest *req;
  107. while(!next->ready) {
  108. req = (AsyncRequest *)aiowait(NULL);
  109. req->ready = 1;
  110. }
  111. req = next;
  112. next = cur;
  113. cur = req;
  114. }
  115. void AsyncBlockReader::enqueue(AsyncRequest *req)
  116. {
  117. if (offset==insize) {
  118. req->result.aio_return = 0;
  119. req->result.aio_errno = 0;
  120. req->ready = 1;
  121. return;
  122. }
  123. assertex(req!=NULL);
  124. assertex(req->ready);
  125. req->result.aio_return = AIO_INPROGRESS;
  126. req->result.aio_errno = 0;
  127. req->ready = 0;
  128. int rd = blksize;
  129. if (insize-offset<blksize)
  130. rd = (int)(insize-offset);
  131. if (aioread64(infile, req->buffer, rd, offset, FILE_BEGIN, &req->result)==-1)
  132. {
  133. throw makeOSException("async failed ");
  134. }
  135. offset+=rd;
  136. }
  137. void AsyncBlockReader::init(int file,offset_t st,size32_t blocksize,void *buf1,void *buf2)
  138. {
  139. struct sigaction action;
  140. sigemptyset(&action.sa_mask);
  141. action.sa_sigaction = aioSigHandler;
  142. action.sa_flags = SA_SIGINFO;
  143. sigaction(SIGUSR1, &action, NULL);
  144. finish();
  145. infile = file;
  146. struct _stat sb;
  147. if(_fstat(infile, &sb)==-1)
  148. assertex(!"Illegal input file");
  149. insize = sb.st_size;
  150. blksize = blocksize;
  151. eof = 0;
  152. cur->buffer = (char *)buf1;
  153. cur->ready = 1;
  154. next->buffer = (char *)buf2;
  155. next->ready = 1;
  156. start = st;
  157. offset = start;
  158. pos = start;
  159. enqueue(next);
  160. }
  161. void *AsyncBlockReader::readnext(size32_t &ret)
  162. {
  163. if(eof) {
  164. ret = 0;
  165. return 0;
  166. }
  167. waitblk();
  168. if(cur->result.aio_return == -1) {
  169. errno = cur->result.aio_errno;
  170. eof = 1;
  171. }
  172. else if(offset==insize) {
  173. eof = 1;
  174. }
  175. else
  176. enqueue(next);
  177. ret = cur->result.aio_return;
  178. pos += ret;
  179. return cur->buffer;
  180. }
  181. void AsyncBlockReader::finish()
  182. {
  183. if (!eof) {
  184. waitblk();
  185. }
  186. }
  187. void AsyncBlockReader::getinfo(offset_t &of,offset_t &p,offset_t &sz)
  188. {
  189. of = start;
  190. p = pos;
  191. sz = insize;
  192. }
  193. #endif
  194. #ifdef _WIN32
  // One read slot for the Windows reader: just the destination buffer, which
  // is supplied by the caller and not owned by this object.
  class CW32AsyncRequest
  {
  public:
      char * buffer;

      // Starts without a buffer attached.
      CW32AsyncRequest() : buffer(NULL)
      {
      }

      // Attaches the memory that reads into this slot will fill.
      void setBuffer(void * src)
      {
          char * bytes = static_cast <char *> (src);
          buffer = bytes;
      }
  };
  202. CW32AsyncBlockReader::CW32AsyncBlockReader()
  203. {
  204. currentRequest = new CW32AsyncRequest();
  205. nextRequest = new CW32AsyncRequest();
  206. eof = true;
  207. pending = false;
  208. overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  209. }
  210. CW32AsyncBlockReader::~CW32AsyncBlockReader()
  211. {
  212. finish();
  213. CloseHandle(overlapped.hEvent);
  214. delete nextRequest;
  215. delete currentRequest;
  216. }
  217. void CW32AsyncBlockReader::finish()
  218. {
  219. // CancelIo(hfile); // not supported in Win95
  220. if(!eof && pending)
  221. {
  222. waitblk();
  223. eof = true;
  224. }
  225. }
  226. void CW32AsyncBlockReader::init(HANDLE file, offset_t _start, size32_t _blockSize, void * buffer1, void * buffer2)
  227. {
  228. hfile = file;
  229. start = _start;
  230. blockSize = _blockSize;
  231. DWORD sizeHi, sizeLo;
  232. sizeLo = GetFileSize(hfile, &sizeHi);
  233. insize.set(sizeLo, sizeHi);
  234. currentRequest->setBuffer(buffer1);
  235. nextRequest->setBuffer(buffer2);
  236. reset();
  237. }
  238. void CW32AsyncBlockReader::reset()
  239. {
  240. finish();
  241. eof = false;
  242. offset = start;
  243. offset.get(overlapped.Offset, overlapped.OffsetHigh);
  244. enqueue();
  245. }
  246. void CW32AsyncBlockReader::enqueue() // reads next buffer
  247. {
  248. if (offset < insize)
  249. {
  250. DWORD bytesRead;
  251. if(!ReadFile(hfile, nextRequest->buffer, blockSize, &bytesRead, &overlapped))
  252. {
  253. switch(GetLastError())
  254. {
  255. case ERROR_IO_PENDING:
  256. pending = true;
  257. break;
  258. default:
  259. pending = false;
  260. eof = true; // effectively
  261. }
  262. }
  263. else // cached operations will usually complete immediately, but "pending" is still true
  264. {
  265. pending = true;
  266. }
  267. }
  268. else
  269. {
  270. pending = false;
  271. eof = true;
  272. }
  273. }
  274. DWORD CW32AsyncBlockReader::waitblk()
  275. {
  276. assertex(pending == true);
  277. DWORD bytesRead;
  278. BOOL _WaitForSingleObject;
  279. int attempts = ASYNC_MAX_TIMEOUTS;
  280. do
  281. {
  282. switch(WaitForSingleObject(overlapped.hEvent, ASYNC_READ_TIMEOUT))
  283. {
  284. case WAIT_OBJECT_0:
  285. pending = false; // pending operation complete
  286. if(GetOverlappedResult(hfile, &overlapped, &bytesRead, FALSE))
  287. {
  288. offset += bytesRead;
  289. offset.get(overlapped.Offset, overlapped.OffsetHigh);
  290. CW32AsyncRequest * r = nextRequest;
  291. nextRequest = currentRequest;
  292. currentRequest = r;
  293. }
  294. else
  295. {
  296. assertex(false);
  297. bytesRead = 0;
  298. eof = true;
  299. }
  300. _WaitForSingleObject = FALSE;
  301. break;
  302. case WAIT_TIMEOUT:
  303. --attempts;
  304. if(!attempts) // ran out of attempts
  305. {
  306. pending = false;
  307. _WaitForSingleObject = FALSE;
  308. bytesRead = 0;
  309. eof = true;
  310. }
  311. else
  312. {
  313. _WaitForSingleObject = TRUE;
  314. }
  315. break;
  316. default:
  317. assertex(false); // overlapped structure probably corrupt
  318. pending = false;
  319. eof = true;
  320. bytesRead = 0;
  321. _WaitForSingleObject = FALSE;
  322. }
  323. }
  324. while (_WaitForSingleObject);
  325. return bytesRead;
  326. }
  327. void * CW32AsyncBlockReader::readnext(size32_t &readLength)
  328. {
  329. if (eof)
  330. {
  331. readLength = 0;
  332. return NULL;
  333. }
  334. readLength = waitblk();
  335. if (!eof)
  336. enqueue();
  337. return currentRequest->buffer;
  338. }
  339. void CW32AsyncBlockReader::getinfo(offset_t &of, offset_t &p, offset_t &sz)
  340. {
  341. of = start;
  342. p = offset.get();
  343. sz = insize.get();
  344. }
  345. #endif