jio.cpp 32 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include <algorithm>
  16. #include "jfile.hpp"
  17. #include "jthread.hpp"
  18. #include "jio.ipp"
  19. #include "jlzw.ipp"
  20. #include "jmisc.hpp"
  21. #include <time.h>
  22. #include <limits.h>
  23. #include "jexcept.hpp"
  24. #include "jqueue.tpp"
  25. #ifdef _WIN32
  26. #include <io.h>
  27. #endif
  28. #define DEFAULTBUFFERSIZE 0x10000 // 64K
  29. #define RANDOM_BUFFER_SIZE DEFAULTBUFFERSIZE
  30. #define MAX_RANDOM_CACHE_SIZE 0x10000
  31. #define RANDOM_CACHE_DEPTH 10
  32. #define threshold 1024
  33. #define timelimit 100
  34. #define MINCOMPRESSEDROWSIZE 16
  35. #define MAXCOMPRESSEDROWSIZE 0x4000
  /// Number of times a failing read may be retried before the error is raised.
  static unsigned ioRetryCount = 0;

  /**
   * Sets the retry budget consulted by the checked read helpers in this file.
   * Non atomic: expected to be called just once at process start up.
   */
  void setIORetryCount(unsigned _ioRetryCount)
  {
      ::ioRetryCount = _ioRetryCount;
  }
  41. static inline offset_t checked_lseeki64( int handle, offset_t offset, int origin )
  42. {
  43. offset_t ret=_lseeki64(handle,offset,origin);
  44. if (ret==(offset_t)-1)
  45. throw MakeErrnoException("checked_lseeki64");
  46. return ret;
  47. }
  48. extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
  49. {
  50. if (0==len) return 0;
  51. unsigned attempts = 0;
  52. size32_t ret = 0;
  53. unsigned __int64 startCycles = get_cycles_now();
  54. loop
  55. {
  56. size_t readNow = _read(file, buffer, len);
  57. if (readNow == (size32_t)-1)
  58. {
  59. switch (errno)
  60. {
  61. case EINTR:
  62. readNow = 0;
  63. break;
  64. default:
  65. if (attempts < ioRetryCount)
  66. {
  67. attempts++;
  68. StringBuffer callStr("read");
  69. callStr.append("[errno=").append(errno);
  70. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  71. callStr.append(", took=").append(elapsedMs);
  72. callStr.append(", attempt=").append(attempts).append("](handle=");
  73. callStr.append(file).append(", len=").append(len).append(")");
  74. PROGLOG("%s", callStr.str());
  75. readNow = 0;
  76. break;
  77. }
  78. throw MakeErrnoException(errno, "checked_read");
  79. }
  80. }
  81. else if (!readNow)
  82. break;
  83. ret += readNow;
  84. if (readNow == len)
  85. break;
  86. buffer = ((char *) buffer) + readNow;
  87. len -= readNow;
  88. }
  89. return ret;
  90. }
  91. #ifdef WIN32
  92. static bool atomicsupported = true;
  93. static CriticalSection atomicsection;
  94. #endif
  95. extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
  96. {
  97. if (0==len) return 0;
  98. #ifdef WIN32
  99. if (atomicsupported)
  100. {
  101. HANDLE hFile = (HANDLE)_get_osfhandle(file);
  102. DWORD rread;
  103. OVERLAPPED overlapped;
  104. memset(&overlapped, 0, sizeof(overlapped));
  105. overlapped.Offset = (DWORD) pos;
  106. overlapped.OffsetHigh = (DWORD)(pos>>32);
  107. if (ReadFile(hFile, buffer, len, &rread, &overlapped))
  108. return rread;
  109. int err = (int)GetLastError();
  110. if (err == ERROR_HANDLE_EOF)
  111. return 0;
  112. if (err == ERROR_INVALID_PARAMETER) // Win98 etc
  113. atomicsupported = false;
  114. else
  115. throw MakeOsException(GetLastError(), "checked_pread");
  116. }
  117. {
  118. CriticalBlock blk(atomicsection);
  119. checked_lseeki64(file, pos, FILE_BEGIN);
  120. return checked_read(file, buffer, len);
  121. }
  122. #else
  123. size32_t ret = 0;
  124. unsigned attempts = 0;
  125. unsigned __int64 startCycles = get_cycles_now();
  126. loop
  127. {
  128. size_t readNow = ::pread(file, buffer, len, pos);
  129. if (readNow == (size32_t)-1)
  130. {
  131. switch (errno)
  132. {
  133. case EINTR:
  134. readNow = 0;
  135. break;
  136. default:
  137. if (attempts < ioRetryCount)
  138. {
  139. attempts++;
  140. StringBuffer callStr("pread");
  141. callStr.append("[errno=").append(errno);
  142. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  143. callStr.append(", took=").append(elapsedMs);
  144. callStr.append(", attempt=").append(attempts).append("](handle=");
  145. callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
  146. PROGLOG("%s", callStr.str());
  147. readNow = 0;
  148. break;
  149. }
  150. throw MakeErrnoException(errno,"checked_pread");
  151. }
  152. }
  153. else if (!readNow)
  154. break;
  155. ret += readNow;
  156. if (readNow == len)
  157. break;
  158. pos += readNow;
  159. buffer = ((char *) buffer) + readNow;
  160. len -= readNow;
  161. }
  162. return ret;
  163. #endif
  164. }
  165. static inline size32_t checked_write( int handle, const void *buffer, size32_t count )
  166. {
  167. int ret=_write(handle,buffer,count);
  168. if ((size32_t)ret != count)
  169. {
  170. throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
  171. }
  172. return (size32_t)ret;
  173. }
  174. class CReadSeq : public CInterface, public IReadSeq
  175. {
  176. int fh;
  177. size32_t size;
  178. char *buffer;
  179. char *ptr;
  180. size32_t bufSize;
  181. size32_t bytesInBuffer;
  182. offset_t startpos;
  183. offset_t endpos;
  184. offset_t nextbufpos;
  185. bool compressed;
  186. void *prev;
  187. size32_t maxcompsize;
  188. bool first;
  189. inline unsigned remaining()
  190. {
  191. return (unsigned)(buffer+bytesInBuffer-ptr);
  192. }
  193. size32_t getBytes(void *dst, size32_t _size)
  194. {
  195. size32_t left = remaining();
  196. size32_t read = 0;
  197. while (_size>left) {
  198. if (left) {
  199. memcpy(dst, ptr, left);
  200. dst = (char *)dst + left;
  201. _size -= left;
  202. read += left;
  203. ptr+=left;
  204. }
  205. refill();
  206. left = bytesInBuffer;
  207. if (!left)
  208. return read;
  209. }
  210. memcpy(dst, ptr, _size);
  211. ptr += _size;
  212. read += _size;
  213. return read;
  214. }
  215. void refill()
  216. {
  217. size32_t left = remaining();
  218. memmove(buffer,ptr,left);
  219. size32_t rd=bufSize-left;
  220. if (endpos-nextbufpos<(offset_t)rd)
  221. rd = (size32_t)(endpos-nextbufpos);
  222. if (rd)
  223. rd = checked_pread(fh, buffer+left, rd, nextbufpos);
  224. nextbufpos += rd;
  225. bytesInBuffer = left+rd;
  226. ptr = buffer;
  227. }
  228. public:
  229. IMPLEMENT_IINTERFACE;
  230. CReadSeq(int _fh, offset_t _offset, unsigned maxrecs, size32_t _size, size32_t _bufsize, bool _compressed)
  231. {
  232. assertex(_size);
  233. fh = _fh;
  234. size = _size;
  235. bufSize = (_bufsize==(unsigned) -1)?DEFAULTBUFFERSIZE:_bufsize;
  236. bytesInBuffer = 0;
  237. startpos = _offset;
  238. nextbufpos = _offset;
  239. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  240. if (compressed) {
  241. maxcompsize = size+size/3+3; // migger than needed
  242. buffer = (char *) malloc(bufSize+size);
  243. prev = buffer+bufSize;
  244. }
  245. else
  246. buffer = (char *) malloc(bufSize);
  247. ptr = buffer;
  248. first = true;
  249. endpos = (maxrecs!=(unsigned)-1)?(_offset+(offset_t)maxrecs*(offset_t)_size):I64C(0x7ffffffffff);
  250. }
  251. ~CReadSeq()
  252. {
  253. free(buffer);
  254. }
  255. virtual bool get(void *dst)
  256. {
  257. if (!compressed)
  258. return getBytes(dst, size)==size;
  259. return (getn(dst,1)==1);
  260. }
  261. virtual unsigned getn(void *dst, unsigned n)
  262. {
  263. if (!compressed)
  264. return getBytes(dst, size*n)/size;
  265. byte *d = (byte *)dst;
  266. byte *e = d+(size*n);
  267. byte *p = (byte *)prev;
  268. unsigned ret = 0;
  269. while (d!=e) {
  270. if (first) {
  271. if (getBytes(d, size)!=size)
  272. break;
  273. first = false;
  274. }
  275. else {
  276. if (remaining()<maxcompsize)
  277. refill();
  278. if (remaining()==0)
  279. break;
  280. ptr += DiffExpand(ptr,d,p,size);
  281. }
  282. p = d;
  283. d += size;
  284. ret++;
  285. }
  286. if (ret) // we got at least 1 so copy to prev
  287. memcpy(prev,e-size,size);
  288. return ret;
  289. }
  290. virtual unsigned getRecordSize()
  291. {
  292. return size;
  293. }
  294. virtual void reset()
  295. {
  296. nextbufpos = startpos;
  297. ptr = buffer;
  298. bytesInBuffer = 0;
  299. first = true;
  300. }
  301. virtual void stop()
  302. {
  303. free(buffer); // no one should access after stop
  304. buffer = NULL;
  305. }
  306. };
  307. IReadSeq *createReadSeq(int fh, offset_t _offset, size32_t size, size32_t bufsize, unsigned maxrecs, bool compressed)
  308. {
  309. if (!bufsize) {
  310. IReadSeq *seq=new CUnbufferedReadWriteSeq(fh, _offset, size);
  311. seq->reset(); // not done itself
  312. return seq;
  313. }
  314. return new CReadSeq(fh, _offset, maxrecs, size, bufsize, compressed);
  315. }
  316. //================================================================================================
  317. class CWriteSeq : public CInterface, public IWriteSeq
  318. {
  319. private:
  320. int fh;
  321. size32_t size;
  322. char *buffer;
  323. char *ptr;
  324. size32_t bufSize;
  325. offset_t fpos;
  326. bool compressed;
  327. size32_t maxcompsize;
  328. void *prev;
  329. void *aux;
  330. bool first;
  331. inline size32_t remaining()
  332. {
  333. return (size32_t)(bufSize - (ptr-buffer));
  334. }
  335. void putBytes(const void *src, size32_t _size)
  336. {
  337. fpos += _size;
  338. size32_t left = remaining();
  339. if (_size>left)
  340. {
  341. if (ptr!=buffer) { // don't buffer if entire block
  342. memcpy(ptr, src, left);
  343. ptr += left;
  344. src = (char *)src + left;
  345. _size -= left;
  346. flush();
  347. left = bufSize;
  348. }
  349. while (_size>=bufSize) // write out directly
  350. {
  351. checked_write(fh, src, bufSize); // stick to writing bufSize blocks
  352. src = (char *)src + bufSize;
  353. _size -= bufSize;
  354. }
  355. }
  356. memcpy(ptr, src, _size);
  357. ptr += _size;
  358. }
  359. public:
  360. IMPLEMENT_IINTERFACE;
  361. CWriteSeq(int _fh, size32_t _size, size32_t _bufsize, bool _compressed)
  362. {
  363. assertex(_fh);
  364. assertex(_size);
  365. fh = _fh;
  366. size = _size;
  367. fpos = 0;
  368. if (_bufsize == (unsigned) -1)
  369. _bufsize = DEFAULTBUFFERSIZE;
  370. bufSize = _bufsize;
  371. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  372. if (compressed) {
  373. maxcompsize = size+size/3+3; // bigger than needed
  374. buffer = (char *) malloc(bufSize+size+maxcompsize);
  375. prev = buffer+bufSize;
  376. aux = (char *)prev+size;
  377. }
  378. else
  379. buffer = (char *) malloc(bufSize);
  380. ptr = buffer;
  381. first = true;
  382. }
  383. ~CWriteSeq()
  384. {
  385. free(buffer);
  386. }
  387. void put(const void *src)
  388. {
  389. if (compressed) {
  390. if (first) {
  391. first = false;
  392. memcpy(prev,src,size);
  393. }
  394. else if (remaining()>=maxcompsize) {
  395. size32_t sz = DiffCompress(src,ptr,prev,size);
  396. fpos += sz;
  397. ptr += sz;
  398. return;
  399. }
  400. else {
  401. putBytes(aux, DiffCompress(src,aux,prev,size));
  402. return;
  403. }
  404. }
  405. putBytes(src, size);
  406. }
  407. void putn(const void *src, unsigned numRecs)
  408. {
  409. if (compressed) {
  410. while (numRecs) {
  411. put(src);
  412. src = (byte *)src+size;
  413. numRecs--;
  414. }
  415. }
  416. else
  417. putBytes(src, size*numRecs);
  418. }
  419. void flush()
  420. {
  421. if (ptr != buffer)
  422. {
  423. checked_write(fh, buffer, (size32_t)(ptr-buffer));
  424. ptr = buffer;
  425. }
  426. }
  427. offset_t getPosition()
  428. {
  429. return fpos;
  430. }
  431. virtual size32_t getRecordSize()
  432. {
  433. return size;
  434. }
  435. };
  436. IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize, bool compressed)
  437. {
  438. // Async TBD
  439. if (!bufsize)
  440. return new CUnbufferedReadWriteSeq(fh, 0, size);
  441. else
  442. return new CWriteSeq(fh, size, bufsize,compressed);
  443. }
  444. IWriteSeq *createTeeWriteSeq(IWriteSeq *f1, IWriteSeq *f2)
  445. {
  446. return new CTeeWriteSeq(f1, f2);
  447. }
  448. //===========================================================================================
  449. CUnbufferedReadWriteSeq::CUnbufferedReadWriteSeq(int _fh, offset_t _offset, size32_t _size)
  450. {
  451. fh = _fh;
  452. size = _size;
  453. offset = _offset;
  454. fpos = _offset;
  455. }
  456. void CUnbufferedReadWriteSeq::put(const void *src)
  457. {
  458. checked_write(fh, src, size);
  459. fpos += size;
  460. }
  461. void CUnbufferedReadWriteSeq::putn(const void *src, unsigned n)
  462. {
  463. checked_write(fh, src, size*n);
  464. fpos += size*n;
  465. }
  466. void CUnbufferedReadWriteSeq::flush()
  467. {}
  468. offset_t CUnbufferedReadWriteSeq::getPosition()
  469. {
  470. return fpos;
  471. }
  472. bool CUnbufferedReadWriteSeq::get(void *dst)
  473. {
  474. size32_t toread = size;
  475. while (toread)
  476. {
  477. int read = checked_read(fh, dst, toread);
  478. if (!read)
  479. return false;
  480. toread -= read;
  481. dst = (char *) dst + read;
  482. }
  483. return true;
  484. }
  485. unsigned CUnbufferedReadWriteSeq::getn(void *dst, unsigned n)
  486. {
  487. size32_t toread = size*n;
  488. size32_t totread = 0;
  489. while (toread)
  490. {
  491. int read = checked_read(fh, dst, toread);
  492. if (!read)
  493. break;
  494. toread -= read;
  495. totread += read;
  496. dst = (char *) dst + read;
  497. }
  498. return totread/size;
  499. }
  500. void CUnbufferedReadWriteSeq::reset()
  501. {
  502. checked_lseeki64(fh, offset, SEEK_SET);
  503. fpos = offset;
  504. }
  505. //===========================================================================================
  506. //===========================================================================================
  507. CTeeWriteSeq::CTeeWriteSeq(IWriteSeq *_f1, IWriteSeq *_f2)
  508. {
  509. w1 = _f1;
  510. w1->Link();
  511. w2 = _f2;
  512. w2->Link();
  513. assertex(w1->getRecordSize()==w2->getRecordSize());
  514. }
  515. CTeeWriteSeq::~CTeeWriteSeq()
  516. {
  517. w1->Release();
  518. w2->Release();
  519. }
  520. void CTeeWriteSeq::put(const void *src)
  521. {
  522. w1->put(src);
  523. w2->put(src);
  524. }
  525. void CTeeWriteSeq::putn(const void *src, unsigned n)
  526. {
  527. w1->putn(src, n);
  528. w2->putn(src, n);
  529. }
  530. void CTeeWriteSeq::flush()
  531. {
  532. w1->flush();
  533. w2->flush();
  534. }
  535. size32_t CTeeWriteSeq::getRecordSize()
  536. {
  537. return w1->getRecordSize();
  538. }
  539. offset_t CTeeWriteSeq::getPosition()
  540. {
  541. return w1->getPosition();
  542. }
  543. //==================================================================================================
  544. class CFixedRecordSize: public CInterface, public IRecordSize
  545. {
  546. protected:
  547. size32_t recsize;
  548. public:
  549. IMPLEMENT_IINTERFACE;
  550. CFixedRecordSize(size32_t _recsize) { recsize=_recsize; }
  551. virtual size32_t getRecordSize(const void *)
  552. {
  553. return recsize;
  554. }
  555. size32_t getFixedSize() const
  556. {
  557. return recsize;
  558. }
  559. };
  560. IRecordSize *createFixedRecordSize(size32_t recsize)
  561. {
  562. return new CFixedRecordSize(recsize);
  563. }
  564. class CDeltaRecordSize: public CInterface, public IRecordSize
  565. {
  566. protected:
  567. Owned<IRecordSize> recordSize;
  568. int delta;
  569. public:
  570. CDeltaRecordSize(IRecordSize * _recordSize, int _delta) { recordSize.set(_recordSize); delta = _delta; }
  571. IMPLEMENT_IINTERFACE;
  572. virtual size32_t getRecordSize(const void * data)
  573. {
  574. return recordSize->getRecordSize(data) + delta;
  575. }
  576. size32_t getFixedSize() const
  577. {
  578. return recordSize->getFixedSize()?recordSize->getFixedSize()+delta:0;
  579. }
  580. };
  581. extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta)
  582. {
  583. if (delta == 0)
  584. return LINK(size);
  585. return new CDeltaRecordSize(size, delta);
  586. }
  587. //==================================================================================================
  588. // Elevator scanning
  589. #define MAX_PENDING 20000
  590. class ElevatorScanner;
  591. class PendingFetch : public CInterface, public IInterface
  592. {
  593. public:
  594. IMPLEMENT_IINTERFACE;
  595. static int compare(const void *a, const void *b);
  596. offset_t pos;
  597. IReceiver *receiver;
  598. void *target;
  599. IRecordFetchChannel *channel;
  600. };
  601. class ElevatorChannel : public CInterface, implements IRecordFetchChannel
  602. {
  603. private:
  604. bool cancelled;
  605. bool immediate;
  606. ElevatorScanner &scanner;
  607. public:
  608. IMPLEMENT_IINTERFACE;
  609. ElevatorChannel(ElevatorScanner &, bool);
  610. ~ElevatorChannel();
  611. //Interface IRecordFetchChannel
  612. virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver);
  613. virtual void flush();
  614. virtual void abort() { cancelled = true; }
  615. virtual bool isAborted() { return cancelled; }
  616. virtual bool isImmediate() { return immediate; }
  617. };
  618. class ElevatorScanner : public Thread, public IRecordFetcher
  619. {
  620. private:
  621. Monitor scanlist;
  622. Monitor isRoom;
  623. PendingFetch pending[MAX_PENDING];
  624. unsigned nextSlot;
  625. size32_t recordSize;
  626. int file;
  627. offset_t reads;
  628. unsigned scans;
  629. bool stopped;
  630. unsigned duetime;
  631. void scan();
  632. void doFetch(PendingFetch &);
  633. void stop();
  634. void resetTimer()
  635. {
  636. duetime = msTick()+timelimit;
  637. }
  638. public:
  639. IMPLEMENT_IINTERFACE;
  640. virtual void beforeDispose();
  641. ElevatorScanner(int file, size32_t recordSize);
  642. ~ElevatorScanner();
  643. //Interface IRecordFetcher
  644. virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); }
  645. //Interface Thread
  646. virtual int run();
  647. void flush(IRecordFetchChannel *);
  648. void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *);
  649. };
  650. int PendingFetch::compare(const void *a, const void *b)
  651. {
  652. offset_t aa = ((PendingFetch *) a)->pos;
  653. offset_t bb = ((PendingFetch *) b)->pos;
  654. if (aa > bb)
  655. return 1;
  656. else if (aa == bb)
  657. return 0;
  658. else
  659. return -1;
  660. }
  661. ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner)
  662. {
  663. scanner.Link();
  664. cancelled = false;
  665. immediate = _immediate;
  666. }
  667. ElevatorChannel::~ElevatorChannel()
  668. {
  669. flush();
  670. scanner.Release();
  671. }
  672. void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver)
  673. {
  674. scanner.fetch(fpos, buffer, receiver, this);
  675. }
  676. void ElevatorChannel::flush()
  677. {
  678. scanner.flush(this);
  679. }
  680. ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner")
  681. {
  682. file = _file;
  683. recordSize = _recordSize;
  684. nextSlot = 0;
  685. reads = 0;
  686. scans = 0;
  687. stopped = false;
  688. start();
  689. }
  690. ElevatorScanner::~ElevatorScanner()
  691. {
  692. PrintLog("Elevator scanner statistics: %"I64F"d reads (%"I64F"d bytes), %d scans", reads, reads*recordSize, scans);
  693. }
  694. void ElevatorScanner::beforeDispose()
  695. {
  696. stop();
  697. join();
  698. }
  699. void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel)
  700. {
  701. synchronized procedure(scanlist);
  702. if (channel->isImmediate())
  703. {
  704. // MORE - atomic seek/read would be preferable!
  705. checked_lseeki64(file, fpos, SEEK_SET);
  706. checked_read(file, buffer, recordSize);
  707. reads++;
  708. if (!receiver->takeRecord(fpos))
  709. channel->abort();
  710. return;
  711. }
  712. {
  713. synchronized block(isRoom);
  714. while (nextSlot >= MAX_PENDING)
  715. isRoom.wait();
  716. }
  717. if (!channel->isAborted())
  718. {
  719. pending[nextSlot].pos = fpos;
  720. pending[nextSlot].receiver = receiver;
  721. pending[nextSlot].target = buffer;
  722. pending[nextSlot].channel = channel;
  723. nextSlot++;
  724. resetTimer();
  725. scanlist.notify();
  726. }
  727. }
  728. void ElevatorScanner::doFetch(PendingFetch &next)
  729. {
  730. if (!next.channel->isAborted())
  731. {
  732. // MORE - atomic seek/read would be preferable!
  733. checked_lseeki64(file, next.pos, SEEK_SET);
  734. checked_read(file, next.target, recordSize);
  735. reads++;
  736. if (!next.receiver->takeRecord(next.pos))
  737. next.channel->abort();
  738. }
  739. }
  740. void ElevatorScanner::scan()
  741. {
  742. PrintLog("Starting elevator scan of %d items", nextSlot);
  743. scans++;
  744. qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare);
  745. for (unsigned i = 0; i < nextSlot; i++)
  746. {
  747. doFetch(pending[i]);
  748. }
  749. nextSlot = 0;
  750. {
  751. synchronized block(isRoom);
  752. isRoom.notify();
  753. }
  754. PrintLog("Finished elevator scan");
  755. }
  756. void ElevatorScanner::flush(IRecordFetchChannel *)
  757. {
  758. // MORE - I could just flush what was asked for, but I may as well flush the lot.
  759. synchronized procedure(scanlist);
  760. if (nextSlot)
  761. scan();
  762. }
  763. int ElevatorScanner::run()
  764. {
  765. scanlist.lock();
  766. for (;;)
  767. {
  768. while (nextSlot<MAX_PENDING)
  769. {
  770. if (nextSlot)
  771. {
  772. // if (!connections.length())
  773. // break;
  774. int timeleft = (int)(duetime-msTick());
  775. if (timeleft<=0)
  776. break;
  777. }
  778. if (stopped)
  779. {
  780. scanlist.unlock();
  781. return 0;
  782. }
  783. // MORE - need a timeout on the wait!
  784. scanlist.wait();
  785. }
  786. scan();
  787. }
  788. }
  789. void ElevatorScanner::stop()
  790. {
  791. synchronized procedure(scanlist);
  792. if (!stopped)
  793. {
  794. stopped = true;
  795. scanlist.notify();
  796. }
  797. }
  798. extern jlib_decl IRecordFetcher *createElevatorFetcher(int file, size32_t recSize)
  799. {
  800. return new ElevatorScanner(file, recSize);
  801. }
  802. //==================================================================================================
  803. // chained routines allowing multiple streams to be concatenated
  804. // all streams assumed to have same record size
  805. class CChainedWriteSeq : public CInterface, public IWriteSeq
  806. {
  807. protected:
  808. IWriteSeq *stream;
  809. IWriteSeqAllocator *allocator;
  810. unsigned num;
  811. size32_t recsize;
  812. offset_t pos;
  813. public:
  814. IMPLEMENT_IINTERFACE;
  815. CChainedWriteSeq(IWriteSeqAllocator *_allocator)
  816. {
  817. allocator = _allocator;
  818. allocator->Link();
  819. num = 0;
  820. recsize = 0;
  821. pos = 0;
  822. }
  823. virtual ~CChainedWriteSeq()
  824. {
  825. ::Release(stream);
  826. allocator->Release();
  827. }
  828. void flush() { if (stream) stream->flush(); }
  829. void put(const void *dst) { putn(dst,1); }
  830. void putn(const void *dst, unsigned numrecs)
  831. {
  832. if (numrecs==0) return;
  833. if (stream==NULL) return; // could raise exception instead
  834. byte *out=(byte *)dst;
  835. while (numrecs>num) {
  836. stream->putn(out,num);
  837. pos+=num;
  838. numrecs-=num;
  839. stream->flush();
  840. IWriteSeq *oldstream=stream;
  841. stream = allocator->next(num);
  842. oldstream->Release();
  843. if (!stream) {
  844. return; // could raise exception
  845. }
  846. }
  847. stream->putn(out,numrecs);
  848. pos+=numrecs;
  849. }
  850. virtual size32_t getRecordSize() { if ((recsize==0)&&stream) recsize = stream->getRecordSize(); return recsize; }
  851. virtual offset_t getPosition() { return pos; }
  852. };
  853. class CChainedReadSeq : public CInterface, public IReadSeq
  854. {
  855. protected:
  856. IReadSeq *stream;
  857. IReadSeqAllocator *allocator;
  858. unsigned num;
  859. size32_t recsize;
  860. public:
  861. IMPLEMENT_IINTERFACE;
  862. CChainedReadSeq(IReadSeqAllocator *_allocator)
  863. {
  864. allocator = _allocator;
  865. allocator->Link();
  866. stream = allocator->next();
  867. num = 0;
  868. recsize = 0;
  869. }
  870. virtual ~CChainedReadSeq()
  871. {
  872. ::Release(stream);
  873. allocator->Release();
  874. }
  875. virtual bool get(void *dst) { return (getn(dst,1)==1); }
  876. virtual unsigned getn(void *dst, unsigned n)
  877. {
  878. unsigned done=0;
  879. while (stream&&n) {
  880. unsigned r = stream->getn(dst,n);
  881. if (r==0) {
  882. IReadSeq *oldstream=stream;
  883. stream = allocator->next();
  884. oldstream->Release();
  885. }
  886. else {
  887. n-=r;
  888. done+=r;
  889. }
  890. }
  891. return done;
  892. }
  893. virtual size32_t getRecordSize() { return stream->getRecordSize(); }
  894. virtual void reset() { stream->reset(); }
  895. virtual void stop() { stream->stop(); }
  896. };
  897. unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
  898. {
  899. size32_t recsize=from->getRecordSize();
  900. assertex(recsize==to->getRecordSize());
  901. unsigned nbuf = bufsize/recsize;
  902. if (nbuf==0)
  903. nbuf = 1;
  904. MemoryAttr ma;
  905. byte *buf=(byte *)ma.allocate(nbuf*recsize);
  906. unsigned ret = 0;
  907. loop {
  908. unsigned n = from->getn(buf,nbuf);
  909. if (n==0)
  910. break;
  911. to->putn(buf,n);
  912. ret += n;
  913. }
  914. return ret;
  915. }
  916. /////////////////
  917. CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)
  918. {
  919. bufferSize = _bufferSize;
  920. numInBuffer = 0;
  921. curBufferOffset = 0;
  922. reading = true;
  923. minDirectSize = std::min(bufferSize/4,(size32_t)0x2000); // size where we don't bother copying into the buffer
  924. }
  925. size32_t CBufferedIOStreamBase::doread(size32_t len, void * data)
  926. {
  927. if (!reading)
  928. {
  929. doflush();
  930. reading = true;
  931. }
  932. size32_t sizeGot = readFromBuffer(len, data);
  933. len -= sizeGot;
  934. if (len!=0)
  935. {
  936. data = (char *)data + sizeGot;
  937. if (len >= minDirectSize)
  938. sizeGot += directRead(len, data); // if direct read specified don't loop
  939. else
  940. {
  941. do
  942. {
  943. if (!fillBuffer())
  944. break;
  945. size32_t numRead = readFromBuffer(len, data);
  946. sizeGot += numRead;
  947. len -= numRead;
  948. data = (char *)data + numRead;
  949. } while (len);
  950. }
  951. }
  952. return sizeGot;
  953. }
  954. size32_t CBufferedIOStreamBase::dowrite(size32_t len, const void * data)
  955. {
  956. if (reading)
  957. {
  958. curBufferOffset = 0;
  959. numInBuffer = 0;
  960. reading = false;
  961. }
  962. size32_t ret = len;
  963. while (len) { // tries to write in buffer size chunks, also flushes as soon as possible
  964. size32_t wr;
  965. if (numInBuffer != 0) {
  966. wr = std::min(len,bufferSize-curBufferOffset);
  967. writeToBuffer(wr, data);
  968. if (numInBuffer==bufferSize)
  969. doflush();
  970. len -= wr;
  971. if (len==0)
  972. break;
  973. data = (char *)data + wr;
  974. }
  975. if (len >= minDirectSize)
  976. return directWrite(len, data)+ret-len;
  977. wr = std::min(len,bufferSize);
  978. writeToBuffer(wr, data);
  979. if (numInBuffer==bufferSize)
  980. doflush();
  981. len -= wr;
  982. data = (char *)data + wr;
  983. }
  984. return ret; // there is a bit of an assumption here that flush always works
  985. }
  986. ////////////////////////////
  987. class CBufferedIIOStream : public CBufferedIOStreamBase, implements IIOStream
  988. {
  989. Linked<IIOStream> io;
  990. public:
  991. IMPLEMENT_IINTERFACE;
  992. CBufferedIIOStream(IIOStream *_io, unsigned bufSize) : CBufferedIOStreamBase(bufSize), io(_io)
  993. {
  994. buffer = new byte[bufSize];
  995. }
  996. ~CBufferedIIOStream()
  997. {
  998. try { flush(); }
  999. catch (IException *)
  1000. {
  1001. delete [] buffer;
  1002. throw;
  1003. }
  1004. delete [] buffer;
  1005. }
  1006. virtual bool fillBuffer()
  1007. {
  1008. reading = true;
  1009. numInBuffer = io->read(bufferSize, buffer);
  1010. curBufferOffset = 0;
  1011. return numInBuffer!=0;
  1012. }
  1013. virtual size32_t directRead(size32_t len, void * data)
  1014. {
  1015. return io->read(len, data);
  1016. }
  1017. virtual size32_t directWrite(size32_t len, const void * data)
  1018. {
  1019. return io->write(len,data);
  1020. }
  1021. virtual void doflush()
  1022. {
  1023. if (!reading && numInBuffer)
  1024. {
  1025. //Copy numInBuffer out before flush so that destructor doesn't attempt to flush again.
  1026. size32_t numToWrite = numInBuffer;
  1027. numInBuffer = 0;
  1028. io->write(numToWrite, buffer);
  1029. curBufferOffset = 0;
  1030. }
  1031. }
  1032. // IIOStream impl.
  1033. virtual size32_t read(size32_t len, void * data) { return CBufferedIOStreamBase::doread(len, data); }
  1034. virtual size32_t write(size32_t len, const void * data) { return CBufferedIOStreamBase::dowrite(len, data); }
  1035. virtual void flush() { doflush(); }
  1036. };
  1037. IIOStream *createBufferedIOStream(IIOStream *io, unsigned bufSize)
  1038. {
  1039. if (bufSize == (unsigned)-1)
  1040. bufSize = DEFAULT_BUFFER_SIZE;
  1041. return new CBufferedIIOStream(io, bufSize);
  1042. }
  1043. IRowStream *createNullRowStream()
  1044. {
  1045. class cNullStream: public CInterface, implements IRowStream
  1046. {
  1047. const void *nextRow() { return NULL; }
  1048. void stop() {}
  1049. public:
  1050. IMPLEMENT_IINTERFACE;
  1051. cNullStream() {}
  1052. };
  1053. return new cNullStream;
  1054. }
  1055. unsigned copyRowStream(IRowStream *in, IRowWriter *out)
  1056. {
  1057. unsigned ret=0;
  1058. loop {
  1059. const void *row = in->nextRow();
  1060. if (!row)
  1061. break;
  1062. ret ++;
  1063. out->putRow(row);
  1064. }
  1065. return ret;
  1066. }
  1067. unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1068. {
  1069. unsigned ret=0;
  1070. loop {
  1071. const void *row = in->nextRow();
  1072. if (!row) {
  1073. row = in->nextRow();
  1074. if (!row)
  1075. break;
  1076. out->putRow(NULL);
  1077. }
  1078. ret ++;
  1079. out->putRow(row);
  1080. }
  1081. return ret;
  1082. }
  1083. unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1084. {
  1085. unsigned ret=0;
  1086. loop {
  1087. const void *row = in->ungroupedNextRow();
  1088. if (!row)
  1089. break;
  1090. ret ++;
  1091. out->putRow(row);
  1092. }
  1093. return ret;
  1094. }
  1095. class CConcatRowStream : public CInterface, public IRowStream
  1096. {
  1097. IArrayOf<IRowStream> oinstreams;
  1098. public:
  1099. unsigned num;
  1100. unsigned idx;
  1101. IRowStream **in;
  1102. bool grouped;
  1103. bool needeog;
  1104. public:
  1105. IMPLEMENT_IINTERFACE;
  1106. CConcatRowStream(unsigned _num, IRowStream **_in,bool _grouped)
  1107. {
  1108. // in streams assumed valid throughout (i.e. not linked)
  1109. num = _num;
  1110. idx = 0;
  1111. assertex(num);
  1112. oinstreams.ensure(num);
  1113. for (unsigned n = 0;n<num;n++)
  1114. oinstreams.append(*LINK(_in[n]));
  1115. in = oinstreams.getArray();
  1116. grouped = _grouped;
  1117. needeog = false;
  1118. }
  1119. const void *nextRow()
  1120. {
  1121. while (idx<num) {
  1122. const void *row;
  1123. if (grouped) {
  1124. row = in[idx]->nextRow();
  1125. if (row) {
  1126. needeog = true;
  1127. return row;
  1128. }
  1129. if (needeog) {
  1130. needeog = false;
  1131. return NULL;
  1132. }
  1133. }
  1134. else {
  1135. row = in[idx]->ungroupedNextRow();
  1136. if (row)
  1137. return row;
  1138. }
  1139. idx++;
  1140. }
  1141. return NULL;
  1142. }
  1143. virtual void stop()
  1144. {
  1145. while (idx<num)
  1146. in[idx++]->stop();
  1147. }
  1148. };
  1149. extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa)
  1150. {
  1151. return new CChainedWriteSeq(iwsa);
  1152. }
  1153. extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa)
  1154. {
  1155. return new CChainedReadSeq(irsa);
  1156. }
  1157. IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped)
  1158. {
  1159. return new CConcatRowStream(numstreams,streams,grouped);
  1160. }
  1161. #ifdef __x86_64__
  1162. void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); }
  1163. void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); }
  1164. void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
  1165. #endif