jio.cpp 32 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include <algorithm>
  16. #include "jfile.hpp"
  17. #include "jthread.hpp"
  18. #include "jio.ipp"
  19. #include "jlzw.ipp"
  20. #include "jmisc.hpp"
  21. #include <time.h>
  22. #include <limits.h>
  23. #include "jexcept.hpp"
  24. #include "jqueue.tpp"
  25. #ifdef _WIN32
  26. #include <io.h>
  27. #endif
  28. #define DEFAULTBUFFERSIZE 0x10000 // 64K
  29. #define RANDOM_BUFFER_SIZE DEFAULTBUFFERSIZE
  30. #define MAX_RANDOM_CACHE_SIZE 0x10000
  31. #define RANDOM_CACHE_DEPTH 10
  32. #define threshold 1024
  33. #define timelimit 100
  34. #define MINCOMPRESSEDROWSIZE 16
  35. #define MAXCOMPRESSEDROWSIZE 0x4000
  36. static unsigned ioRetryCount=0;
  37. void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
  38. {
  39. ioRetryCount = _ioRetryCount;
  40. PROGLOG("setIORetryCount set to : %d", ioRetryCount);
  41. }
  42. extern jlib_decl offset_t checked_lseeki64( int handle, offset_t offset, int origin )
  43. {
  44. offset_t ret=_lseeki64(handle,offset,origin);
  45. if (ret==(offset_t)-1)
  46. throw MakeErrnoException("checked_lseeki64");
  47. return ret;
  48. }
  49. extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
  50. {
  51. if (0==len) return 0;
  52. unsigned attempts = 0;
  53. size32_t ret = 0;
  54. unsigned __int64 startCycles = get_cycles_now();
  55. loop
  56. {
  57. ssize_t readNow = _read(file, buffer, len);
  58. if (readNow == (ssize_t)-1)
  59. {
  60. switch (errno)
  61. {
  62. case EINTR:
  63. readNow = 0;
  64. break;
  65. default:
  66. if (attempts < ioRetryCount)
  67. {
  68. attempts++;
  69. StringBuffer callStr("read");
  70. callStr.append("[errno=").append(errno);
  71. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  72. callStr.append(", took=").append(elapsedMs);
  73. callStr.append(", attempt=").append(attempts).append("](handle=");
  74. callStr.append(file).append(", len=").append(len).append(")");
  75. PROGLOG("%s", callStr.str());
  76. readNow = 0;
  77. break;
  78. }
  79. throw MakeErrnoException(errno, "checked_read");
  80. }
  81. }
  82. else if (!readNow)
  83. break;
  84. ret += readNow;
  85. if (readNow == len)
  86. break;
  87. buffer = ((char *) buffer) + readNow;
  88. len -= readNow;
  89. }
  90. return ret;
  91. }
  92. #ifdef WIN32
  93. static bool atomicsupported = true;
  94. static CriticalSection atomicsection;
  95. #endif
  96. extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
  97. {
  98. if (0==len) return 0;
  99. #ifdef WIN32
  100. if (atomicsupported)
  101. {
  102. HANDLE hFile = (HANDLE)_get_osfhandle(file);
  103. DWORD rread;
  104. OVERLAPPED overlapped;
  105. memset(&overlapped, 0, sizeof(overlapped));
  106. overlapped.Offset = (DWORD) pos;
  107. overlapped.OffsetHigh = (DWORD)(pos>>32);
  108. if (ReadFile(hFile, buffer, len, &rread, &overlapped))
  109. return rread;
  110. int err = (int)GetLastError();
  111. if (err == ERROR_HANDLE_EOF)
  112. return 0;
  113. if (err == ERROR_INVALID_PARAMETER) // Win98 etc
  114. atomicsupported = false;
  115. else
  116. throw MakeOsException(GetLastError(), "checked_pread");
  117. }
  118. {
  119. CriticalBlock blk(atomicsection);
  120. checked_lseeki64(file, pos, FILE_BEGIN);
  121. return checked_read(file, buffer, len);
  122. }
  123. #else
  124. size32_t ret = 0;
  125. unsigned attempts = 0;
  126. unsigned __int64 startCycles = get_cycles_now();
  127. loop
  128. {
  129. ssize_t readNow = ::pread(file, buffer, len, pos);
  130. if (readNow == (ssize_t)-1)
  131. {
  132. switch (errno)
  133. {
  134. case EINTR:
  135. readNow = 0;
  136. break;
  137. default:
  138. if (attempts < ioRetryCount)
  139. {
  140. attempts++;
  141. StringBuffer callStr("pread");
  142. callStr.append("[errno=").append(errno);
  143. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  144. callStr.append(", took=").append(elapsedMs);
  145. callStr.append(", attempt=").append(attempts).append("](handle=");
  146. callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
  147. PROGLOG("%s", callStr.str());
  148. readNow = 0;
  149. break;
  150. }
  151. throw MakeErrnoException(errno,"checked_pread");
  152. }
  153. }
  154. else if (!readNow)
  155. break;
  156. ret += readNow;
  157. if (readNow == len)
  158. break;
  159. pos += readNow;
  160. buffer = ((char *) buffer) + readNow;
  161. len -= readNow;
  162. }
  163. return ret;
  164. #endif
  165. }
  166. extern jlib_decl size32_t checked_write( int handle, const void *buffer, size32_t count )
  167. {
  168. int ret=_write(handle,buffer,count);
  169. if ((size32_t)ret != count)
  170. {
  171. throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
  172. }
  173. return (size32_t)ret;
  174. }
  175. class CReadSeq : public CInterface, public IReadSeq
  176. {
  177. int fh;
  178. size32_t size;
  179. char *buffer;
  180. char *ptr;
  181. size32_t bufSize;
  182. size32_t bytesInBuffer;
  183. offset_t startpos;
  184. offset_t endpos;
  185. offset_t nextbufpos;
  186. bool compressed;
  187. void *prev;
  188. size32_t maxcompsize;
  189. bool first;
  190. inline unsigned remaining()
  191. {
  192. return (unsigned)(buffer+bytesInBuffer-ptr);
  193. }
  194. size32_t getBytes(void *dst, size32_t _size)
  195. {
  196. size32_t left = remaining();
  197. size32_t read = 0;
  198. while (_size>left) {
  199. if (left) {
  200. memcpy(dst, ptr, left);
  201. dst = (char *)dst + left;
  202. _size -= left;
  203. read += left;
  204. ptr+=left;
  205. }
  206. refill();
  207. left = bytesInBuffer;
  208. if (!left)
  209. return read;
  210. }
  211. memcpy(dst, ptr, _size);
  212. ptr += _size;
  213. read += _size;
  214. return read;
  215. }
  216. void refill()
  217. {
  218. size32_t left = remaining();
  219. memmove(buffer,ptr,left);
  220. size32_t rd=bufSize-left;
  221. if (endpos-nextbufpos<(offset_t)rd)
  222. rd = (size32_t)(endpos-nextbufpos);
  223. if (rd)
  224. rd = checked_pread(fh, buffer+left, rd, nextbufpos);
  225. nextbufpos += rd;
  226. bytesInBuffer = left+rd;
  227. ptr = buffer;
  228. }
  229. public:
  230. IMPLEMENT_IINTERFACE;
  231. CReadSeq(int _fh, offset_t _offset, unsigned maxrecs, size32_t _size, size32_t _bufsize, bool _compressed)
  232. {
  233. assertex(_size);
  234. fh = _fh;
  235. size = _size;
  236. bufSize = (_bufsize==(unsigned) -1)?DEFAULTBUFFERSIZE:_bufsize;
  237. bytesInBuffer = 0;
  238. startpos = _offset;
  239. nextbufpos = _offset;
  240. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  241. if (compressed) {
  242. maxcompsize = size+size/3+3; // migger than needed
  243. buffer = (char *) malloc(bufSize+size);
  244. prev = buffer+bufSize;
  245. }
  246. else
  247. buffer = (char *) malloc(bufSize);
  248. ptr = buffer;
  249. first = true;
  250. endpos = (maxrecs!=(unsigned)-1)?(_offset+(offset_t)maxrecs*(offset_t)_size):I64C(0x7ffffffffff);
  251. }
  252. ~CReadSeq()
  253. {
  254. free(buffer);
  255. }
  256. virtual bool get(void *dst)
  257. {
  258. if (!compressed)
  259. return getBytes(dst, size)==size;
  260. return (getn(dst,1)==1);
  261. }
  262. virtual unsigned getn(void *dst, unsigned n)
  263. {
  264. if (!compressed)
  265. return getBytes(dst, size*n)/size;
  266. byte *d = (byte *)dst;
  267. byte *e = d+(size*n);
  268. byte *p = (byte *)prev;
  269. unsigned ret = 0;
  270. while (d!=e) {
  271. if (first) {
  272. if (getBytes(d, size)!=size)
  273. break;
  274. first = false;
  275. }
  276. else {
  277. if (remaining()<maxcompsize)
  278. refill();
  279. if (remaining()==0)
  280. break;
  281. ptr += DiffExpand(ptr,d,p,size);
  282. }
  283. p = d;
  284. d += size;
  285. ret++;
  286. }
  287. if (ret) // we got at least 1 so copy to prev
  288. memcpy(prev,e-size,size);
  289. return ret;
  290. }
  291. virtual unsigned getRecordSize()
  292. {
  293. return size;
  294. }
  295. virtual void reset()
  296. {
  297. nextbufpos = startpos;
  298. ptr = buffer;
  299. bytesInBuffer = 0;
  300. first = true;
  301. }
  302. virtual void stop()
  303. {
  304. free(buffer); // no one should access after stop
  305. buffer = NULL;
  306. }
  307. };
  308. IReadSeq *createReadSeq(int fh, offset_t _offset, size32_t size, size32_t bufsize, unsigned maxrecs, bool compressed)
  309. {
  310. if (!bufsize) {
  311. IReadSeq *seq=new CUnbufferedReadWriteSeq(fh, _offset, size);
  312. seq->reset(); // not done itself
  313. return seq;
  314. }
  315. return new CReadSeq(fh, _offset, maxrecs, size, bufsize, compressed);
  316. }
  317. //================================================================================================
  318. class CWriteSeq : public CInterface, public IWriteSeq
  319. {
  320. private:
  321. int fh;
  322. size32_t size;
  323. char *buffer;
  324. char *ptr;
  325. size32_t bufSize;
  326. offset_t fpos;
  327. bool compressed;
  328. size32_t maxcompsize;
  329. void *prev;
  330. void *aux;
  331. bool first;
  332. inline size32_t remaining()
  333. {
  334. return (size32_t)(bufSize - (ptr-buffer));
  335. }
  336. void putBytes(const void *src, size32_t _size)
  337. {
  338. fpos += _size;
  339. size32_t left = remaining();
  340. if (_size>left)
  341. {
  342. if (ptr!=buffer) { // don't buffer if entire block
  343. memcpy(ptr, src, left);
  344. ptr += left;
  345. src = (char *)src + left;
  346. _size -= left;
  347. flush();
  348. left = bufSize;
  349. }
  350. while (_size>=bufSize) // write out directly
  351. {
  352. checked_write(fh, src, bufSize); // stick to writing bufSize blocks
  353. src = (char *)src + bufSize;
  354. _size -= bufSize;
  355. }
  356. }
  357. memcpy(ptr, src, _size);
  358. ptr += _size;
  359. }
  360. public:
  361. IMPLEMENT_IINTERFACE;
  362. CWriteSeq(int _fh, size32_t _size, size32_t _bufsize, bool _compressed)
  363. {
  364. assertex(_fh);
  365. assertex(_size);
  366. fh = _fh;
  367. size = _size;
  368. fpos = 0;
  369. if (_bufsize == (unsigned) -1)
  370. _bufsize = DEFAULTBUFFERSIZE;
  371. bufSize = _bufsize;
  372. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  373. if (compressed) {
  374. maxcompsize = size+size/3+3; // bigger than needed
  375. buffer = (char *) malloc(bufSize+size+maxcompsize);
  376. prev = buffer+bufSize;
  377. aux = (char *)prev+size;
  378. }
  379. else
  380. buffer = (char *) malloc(bufSize);
  381. ptr = buffer;
  382. first = true;
  383. }
  384. ~CWriteSeq()
  385. {
  386. free(buffer);
  387. }
  388. void put(const void *src)
  389. {
  390. if (compressed) {
  391. if (first) {
  392. first = false;
  393. memcpy(prev,src,size);
  394. }
  395. else if (remaining()>=maxcompsize) {
  396. size32_t sz = DiffCompress(src,ptr,prev,size);
  397. fpos += sz;
  398. ptr += sz;
  399. return;
  400. }
  401. else {
  402. putBytes(aux, DiffCompress(src,aux,prev,size));
  403. return;
  404. }
  405. }
  406. putBytes(src, size);
  407. }
  408. void putn(const void *src, unsigned numRecs)
  409. {
  410. if (compressed) {
  411. while (numRecs) {
  412. put(src);
  413. src = (byte *)src+size;
  414. numRecs--;
  415. }
  416. }
  417. else
  418. putBytes(src, size*numRecs);
  419. }
  420. void flush()
  421. {
  422. if (ptr != buffer)
  423. {
  424. checked_write(fh, buffer, (size32_t)(ptr-buffer));
  425. ptr = buffer;
  426. }
  427. }
  428. offset_t getPosition()
  429. {
  430. return fpos;
  431. }
  432. virtual size32_t getRecordSize()
  433. {
  434. return size;
  435. }
  436. };
  437. IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize, bool compressed)
  438. {
  439. // Async TBD
  440. if (!bufsize)
  441. return new CUnbufferedReadWriteSeq(fh, 0, size);
  442. else
  443. return new CWriteSeq(fh, size, bufsize,compressed);
  444. }
  445. IWriteSeq *createTeeWriteSeq(IWriteSeq *f1, IWriteSeq *f2)
  446. {
  447. return new CTeeWriteSeq(f1, f2);
  448. }
  449. //===========================================================================================
  450. CUnbufferedReadWriteSeq::CUnbufferedReadWriteSeq(int _fh, offset_t _offset, size32_t _size)
  451. {
  452. fh = _fh;
  453. size = _size;
  454. offset = _offset;
  455. fpos = _offset;
  456. }
  457. void CUnbufferedReadWriteSeq::put(const void *src)
  458. {
  459. checked_write(fh, src, size);
  460. fpos += size;
  461. }
  462. void CUnbufferedReadWriteSeq::putn(const void *src, unsigned n)
  463. {
  464. checked_write(fh, src, size*n);
  465. fpos += size*n;
  466. }
  467. void CUnbufferedReadWriteSeq::flush()
  468. {}
  469. offset_t CUnbufferedReadWriteSeq::getPosition()
  470. {
  471. return fpos;
  472. }
  473. bool CUnbufferedReadWriteSeq::get(void *dst)
  474. {
  475. size32_t toread = size;
  476. while (toread)
  477. {
  478. int read = checked_read(fh, dst, toread);
  479. if (!read)
  480. return false;
  481. toread -= read;
  482. dst = (char *) dst + read;
  483. }
  484. return true;
  485. }
  486. unsigned CUnbufferedReadWriteSeq::getn(void *dst, unsigned n)
  487. {
  488. size32_t toread = size*n;
  489. size32_t totread = 0;
  490. while (toread)
  491. {
  492. int read = checked_read(fh, dst, toread);
  493. if (!read)
  494. break;
  495. toread -= read;
  496. totread += read;
  497. dst = (char *) dst + read;
  498. }
  499. return totread/size;
  500. }
  501. void CUnbufferedReadWriteSeq::reset()
  502. {
  503. checked_lseeki64(fh, offset, SEEK_SET);
  504. fpos = offset;
  505. }
  506. //===========================================================================================
  507. //===========================================================================================
  508. CTeeWriteSeq::CTeeWriteSeq(IWriteSeq *_f1, IWriteSeq *_f2)
  509. {
  510. w1 = _f1;
  511. w1->Link();
  512. w2 = _f2;
  513. w2->Link();
  514. assertex(w1->getRecordSize()==w2->getRecordSize());
  515. }
  516. CTeeWriteSeq::~CTeeWriteSeq()
  517. {
  518. w1->Release();
  519. w2->Release();
  520. }
  521. void CTeeWriteSeq::put(const void *src)
  522. {
  523. w1->put(src);
  524. w2->put(src);
  525. }
  526. void CTeeWriteSeq::putn(const void *src, unsigned n)
  527. {
  528. w1->putn(src, n);
  529. w2->putn(src, n);
  530. }
  531. void CTeeWriteSeq::flush()
  532. {
  533. w1->flush();
  534. w2->flush();
  535. }
  536. size32_t CTeeWriteSeq::getRecordSize()
  537. {
  538. return w1->getRecordSize();
  539. }
  540. offset_t CTeeWriteSeq::getPosition()
  541. {
  542. return w1->getPosition();
  543. }
  544. //==================================================================================================
  545. class CFixedRecordSize: public CInterface, public IRecordSize
  546. {
  547. protected:
  548. size32_t recsize;
  549. public:
  550. IMPLEMENT_IINTERFACE;
  551. CFixedRecordSize(size32_t _recsize) { recsize=_recsize; }
  552. virtual size32_t getRecordSize(const void *)
  553. {
  554. return recsize;
  555. }
  556. size32_t getFixedSize() const
  557. {
  558. return recsize;
  559. }
  560. };
  561. IRecordSize *createFixedRecordSize(size32_t recsize)
  562. {
  563. return new CFixedRecordSize(recsize);
  564. }
  565. class CDeltaRecordSize: public CInterface, public IRecordSize
  566. {
  567. protected:
  568. Owned<IRecordSize> recordSize;
  569. int delta;
  570. public:
  571. CDeltaRecordSize(IRecordSize * _recordSize, int _delta) { recordSize.set(_recordSize); delta = _delta; }
  572. IMPLEMENT_IINTERFACE;
  573. virtual size32_t getRecordSize(const void * data)
  574. {
  575. return recordSize->getRecordSize(data) + delta;
  576. }
  577. size32_t getFixedSize() const
  578. {
  579. return recordSize->getFixedSize()?recordSize->getFixedSize()+delta:0;
  580. }
  581. };
  582. extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta)
  583. {
  584. if (delta == 0)
  585. return LINK(size);
  586. return new CDeltaRecordSize(size, delta);
  587. }
  588. //==================================================================================================
  589. // Elevator scanning
  590. #define MAX_PENDING 20000
  591. class ElevatorScanner;
  592. class PendingFetch : public CInterface, public IInterface
  593. {
  594. public:
  595. IMPLEMENT_IINTERFACE;
  596. static int compare(const void *a, const void *b);
  597. offset_t pos;
  598. IReceiver *receiver;
  599. void *target;
  600. IRecordFetchChannel *channel;
  601. };
  602. class ElevatorChannel : public CInterface, implements IRecordFetchChannel
  603. {
  604. private:
  605. bool cancelled;
  606. bool immediate;
  607. ElevatorScanner &scanner;
  608. public:
  609. IMPLEMENT_IINTERFACE;
  610. ElevatorChannel(ElevatorScanner &, bool);
  611. ~ElevatorChannel();
  612. //Interface IRecordFetchChannel
  613. virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver);
  614. virtual void flush();
  615. virtual void abort() { cancelled = true; }
  616. virtual bool isAborted() { return cancelled; }
  617. virtual bool isImmediate() { return immediate; }
  618. };
  619. class ElevatorScanner : public Thread, public IRecordFetcher
  620. {
  621. private:
  622. Monitor scanlist;
  623. Monitor isRoom;
  624. PendingFetch pending[MAX_PENDING];
  625. unsigned nextSlot;
  626. size32_t recordSize;
  627. int file;
  628. offset_t reads;
  629. unsigned scans;
  630. bool stopped;
  631. unsigned duetime;
  632. void scan();
  633. void doFetch(PendingFetch &);
  634. void stop();
  635. void resetTimer()
  636. {
  637. duetime = msTick()+timelimit;
  638. }
  639. public:
  640. IMPLEMENT_IINTERFACE;
  641. virtual void beforeDispose();
  642. ElevatorScanner(int file, size32_t recordSize);
  643. ~ElevatorScanner();
  644. //Interface IRecordFetcher
  645. virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); }
  646. //Interface Thread
  647. virtual int run();
  648. void flush(IRecordFetchChannel *);
  649. void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *);
  650. };
  651. int PendingFetch::compare(const void *a, const void *b)
  652. {
  653. offset_t aa = ((PendingFetch *) a)->pos;
  654. offset_t bb = ((PendingFetch *) b)->pos;
  655. if (aa > bb)
  656. return 1;
  657. else if (aa == bb)
  658. return 0;
  659. else
  660. return -1;
  661. }
  662. ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner)
  663. {
  664. scanner.Link();
  665. cancelled = false;
  666. immediate = _immediate;
  667. }
  668. ElevatorChannel::~ElevatorChannel()
  669. {
  670. flush();
  671. scanner.Release();
  672. }
  673. void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver)
  674. {
  675. scanner.fetch(fpos, buffer, receiver, this);
  676. }
  677. void ElevatorChannel::flush()
  678. {
  679. scanner.flush(this);
  680. }
  681. ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner")
  682. {
  683. file = _file;
  684. recordSize = _recordSize;
  685. nextSlot = 0;
  686. reads = 0;
  687. scans = 0;
  688. stopped = false;
  689. start();
  690. }
  691. ElevatorScanner::~ElevatorScanner()
  692. {
  693. PrintLog("Elevator scanner statistics: %"I64F"d reads (%"I64F"d bytes), %d scans", reads, reads*recordSize, scans);
  694. }
  695. void ElevatorScanner::beforeDispose()
  696. {
  697. stop();
  698. join();
  699. }
  700. void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel)
  701. {
  702. synchronized procedure(scanlist);
  703. if (channel->isImmediate())
  704. {
  705. // MORE - atomic seek/read would be preferable!
  706. checked_lseeki64(file, fpos, SEEK_SET);
  707. checked_read(file, buffer, recordSize);
  708. reads++;
  709. if (!receiver->takeRecord(fpos))
  710. channel->abort();
  711. return;
  712. }
  713. {
  714. synchronized block(isRoom);
  715. while (nextSlot >= MAX_PENDING)
  716. isRoom.wait();
  717. }
  718. if (!channel->isAborted())
  719. {
  720. pending[nextSlot].pos = fpos;
  721. pending[nextSlot].receiver = receiver;
  722. pending[nextSlot].target = buffer;
  723. pending[nextSlot].channel = channel;
  724. nextSlot++;
  725. resetTimer();
  726. scanlist.notify();
  727. }
  728. }
  729. void ElevatorScanner::doFetch(PendingFetch &next)
  730. {
  731. if (!next.channel->isAborted())
  732. {
  733. // MORE - atomic seek/read would be preferable!
  734. checked_lseeki64(file, next.pos, SEEK_SET);
  735. checked_read(file, next.target, recordSize);
  736. reads++;
  737. if (!next.receiver->takeRecord(next.pos))
  738. next.channel->abort();
  739. }
  740. }
  741. void ElevatorScanner::scan()
  742. {
  743. PrintLog("Starting elevator scan of %d items", nextSlot);
  744. scans++;
  745. qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare);
  746. for (unsigned i = 0; i < nextSlot; i++)
  747. {
  748. doFetch(pending[i]);
  749. }
  750. nextSlot = 0;
  751. {
  752. synchronized block(isRoom);
  753. isRoom.notify();
  754. }
  755. PrintLog("Finished elevator scan");
  756. }
  757. void ElevatorScanner::flush(IRecordFetchChannel *)
  758. {
  759. // MORE - I could just flush what was asked for, but I may as well flush the lot.
  760. synchronized procedure(scanlist);
  761. if (nextSlot)
  762. scan();
  763. }
  764. int ElevatorScanner::run()
  765. {
  766. scanlist.lock();
  767. for (;;)
  768. {
  769. while (nextSlot<MAX_PENDING)
  770. {
  771. if (nextSlot)
  772. {
  773. // if (!connections.length())
  774. // break;
  775. int timeleft = (int)(duetime-msTick());
  776. if (timeleft<=0)
  777. break;
  778. }
  779. if (stopped)
  780. {
  781. scanlist.unlock();
  782. return 0;
  783. }
  784. // MORE - need a timeout on the wait!
  785. scanlist.wait();
  786. }
  787. scan();
  788. }
  789. }
  790. void ElevatorScanner::stop()
  791. {
  792. synchronized procedure(scanlist);
  793. if (!stopped)
  794. {
  795. stopped = true;
  796. scanlist.notify();
  797. }
  798. }
  799. extern jlib_decl IRecordFetcher *createElevatorFetcher(int file, size32_t recSize)
  800. {
  801. return new ElevatorScanner(file, recSize);
  802. }
  803. //==================================================================================================
  804. // chained routines allowing multiple streams to be concatenated
  805. // all streams assumed to have same record size
  806. class CChainedWriteSeq : public CInterface, public IWriteSeq
  807. {
  808. protected:
  809. IWriteSeq *stream;
  810. IWriteSeqAllocator *allocator;
  811. unsigned num;
  812. size32_t recsize;
  813. offset_t pos;
  814. public:
  815. IMPLEMENT_IINTERFACE;
  816. CChainedWriteSeq(IWriteSeqAllocator *_allocator)
  817. {
  818. allocator = _allocator;
  819. allocator->Link();
  820. num = 0;
  821. recsize = 0;
  822. pos = 0;
  823. }
  824. virtual ~CChainedWriteSeq()
  825. {
  826. ::Release(stream);
  827. allocator->Release();
  828. }
  829. void flush() { if (stream) stream->flush(); }
  830. void put(const void *dst) { putn(dst,1); }
  831. void putn(const void *dst, unsigned numrecs)
  832. {
  833. if (numrecs==0) return;
  834. if (stream==NULL) return; // could raise exception instead
  835. byte *out=(byte *)dst;
  836. while (numrecs>num) {
  837. stream->putn(out,num);
  838. pos+=num;
  839. numrecs-=num;
  840. stream->flush();
  841. IWriteSeq *oldstream=stream;
  842. stream = allocator->next(num);
  843. oldstream->Release();
  844. if (!stream) {
  845. return; // could raise exception
  846. }
  847. }
  848. stream->putn(out,numrecs);
  849. pos+=numrecs;
  850. }
  851. virtual size32_t getRecordSize() { if ((recsize==0)&&stream) recsize = stream->getRecordSize(); return recsize; }
  852. virtual offset_t getPosition() { return pos; }
  853. };
  854. class CChainedReadSeq : public CInterface, public IReadSeq
  855. {
  856. protected:
  857. IReadSeq *stream;
  858. IReadSeqAllocator *allocator;
  859. unsigned num;
  860. size32_t recsize;
  861. public:
  862. IMPLEMENT_IINTERFACE;
  863. CChainedReadSeq(IReadSeqAllocator *_allocator)
  864. {
  865. allocator = _allocator;
  866. allocator->Link();
  867. stream = allocator->next();
  868. num = 0;
  869. recsize = 0;
  870. }
  871. virtual ~CChainedReadSeq()
  872. {
  873. ::Release(stream);
  874. allocator->Release();
  875. }
  876. virtual bool get(void *dst) { return (getn(dst,1)==1); }
  877. virtual unsigned getn(void *dst, unsigned n)
  878. {
  879. unsigned done=0;
  880. while (stream&&n) {
  881. unsigned r = stream->getn(dst,n);
  882. if (r==0) {
  883. IReadSeq *oldstream=stream;
  884. stream = allocator->next();
  885. oldstream->Release();
  886. }
  887. else {
  888. n-=r;
  889. done+=r;
  890. }
  891. }
  892. return done;
  893. }
  894. virtual size32_t getRecordSize() { return stream->getRecordSize(); }
  895. virtual void reset() { stream->reset(); }
  896. virtual void stop() { stream->stop(); }
  897. };
  898. unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
  899. {
  900. size32_t recsize=from->getRecordSize();
  901. assertex(recsize==to->getRecordSize());
  902. unsigned nbuf = bufsize/recsize;
  903. if (nbuf==0)
  904. nbuf = 1;
  905. MemoryAttr ma;
  906. byte *buf=(byte *)ma.allocate(nbuf*recsize);
  907. unsigned ret = 0;
  908. loop {
  909. unsigned n = from->getn(buf,nbuf);
  910. if (n==0)
  911. break;
  912. to->putn(buf,n);
  913. ret += n;
  914. }
  915. return ret;
  916. }
  917. /////////////////
  918. CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)
  919. {
  920. bufferSize = _bufferSize;
  921. numInBuffer = 0;
  922. curBufferOffset = 0;
  923. reading = true;
  924. minDirectSize = std::min(bufferSize/4,(size32_t)0x2000); // size where we don't bother copying into the buffer
  925. }
  926. size32_t CBufferedIOStreamBase::doread(size32_t len, void * data)
  927. {
  928. if (!reading)
  929. {
  930. doflush();
  931. reading = true;
  932. }
  933. size32_t sizeGot = readFromBuffer(len, data);
  934. len -= sizeGot;
  935. if (len!=0)
  936. {
  937. data = (char *)data + sizeGot;
  938. if (len >= minDirectSize)
  939. sizeGot += directRead(len, data); // if direct read specified don't loop
  940. else
  941. {
  942. do
  943. {
  944. if (!fillBuffer())
  945. break;
  946. size32_t numRead = readFromBuffer(len, data);
  947. sizeGot += numRead;
  948. len -= numRead;
  949. data = (char *)data + numRead;
  950. } while (len);
  951. }
  952. }
  953. return sizeGot;
  954. }
  955. size32_t CBufferedIOStreamBase::dowrite(size32_t len, const void * data)
  956. {
  957. if (reading)
  958. {
  959. curBufferOffset = 0;
  960. numInBuffer = 0;
  961. reading = false;
  962. }
  963. size32_t ret = len;
  964. while (len) { // tries to write in buffer size chunks, also flushes as soon as possible
  965. size32_t wr;
  966. if (numInBuffer != 0) {
  967. wr = std::min(len,bufferSize-curBufferOffset);
  968. writeToBuffer(wr, data);
  969. if (numInBuffer==bufferSize)
  970. doflush();
  971. len -= wr;
  972. if (len==0)
  973. break;
  974. data = (char *)data + wr;
  975. }
  976. if (len >= minDirectSize)
  977. return directWrite(len, data)+ret-len;
  978. wr = std::min(len,bufferSize);
  979. writeToBuffer(wr, data);
  980. if (numInBuffer==bufferSize)
  981. doflush();
  982. len -= wr;
  983. data = (char *)data + wr;
  984. }
  985. return ret; // there is a bit of an assumption here that flush always works
  986. }
  987. ////////////////////////////
  988. class CBufferedIIOStream : public CBufferedIOStreamBase, implements IIOStream
  989. {
  990. Linked<IIOStream> io;
  991. public:
  992. IMPLEMENT_IINTERFACE;
  993. CBufferedIIOStream(IIOStream *_io, unsigned bufSize) : CBufferedIOStreamBase(bufSize), io(_io)
  994. {
  995. buffer = new byte[bufSize];
  996. }
  997. ~CBufferedIIOStream()
  998. {
  999. try { flush(); }
  1000. catch (IException *)
  1001. {
  1002. delete [] buffer;
  1003. throw;
  1004. }
  1005. delete [] buffer;
  1006. }
  1007. virtual bool fillBuffer()
  1008. {
  1009. reading = true;
  1010. numInBuffer = io->read(bufferSize, buffer);
  1011. curBufferOffset = 0;
  1012. return numInBuffer!=0;
  1013. }
  1014. virtual size32_t directRead(size32_t len, void * data)
  1015. {
  1016. return io->read(len, data);
  1017. }
  1018. virtual size32_t directWrite(size32_t len, const void * data)
  1019. {
  1020. return io->write(len,data);
  1021. }
  1022. virtual void doflush()
  1023. {
  1024. if (!reading && numInBuffer)
  1025. {
  1026. //Copy numInBuffer out before flush so that destructor doesn't attempt to flush again.
  1027. size32_t numToWrite = numInBuffer;
  1028. numInBuffer = 0;
  1029. io->write(numToWrite, buffer);
  1030. curBufferOffset = 0;
  1031. }
  1032. }
  1033. // IIOStream impl.
  1034. virtual size32_t read(size32_t len, void * data) { return CBufferedIOStreamBase::doread(len, data); }
  1035. virtual size32_t write(size32_t len, const void * data) { return CBufferedIOStreamBase::dowrite(len, data); }
  1036. virtual void flush() { doflush(); }
  1037. };
  1038. IIOStream *createBufferedIOStream(IIOStream *io, unsigned bufSize)
  1039. {
  1040. if (bufSize == (unsigned)-1)
  1041. bufSize = DEFAULT_BUFFER_SIZE;
  1042. return new CBufferedIIOStream(io, bufSize);
  1043. }
  1044. IRowStream *createNullRowStream()
  1045. {
  1046. class cNullStream: public CInterface, implements IRowStream
  1047. {
  1048. const void *nextRow() { return NULL; }
  1049. void stop() {}
  1050. public:
  1051. IMPLEMENT_IINTERFACE;
  1052. cNullStream() {}
  1053. };
  1054. return new cNullStream;
  1055. }
  1056. unsigned copyRowStream(IRowStream *in, IRowWriter *out)
  1057. {
  1058. unsigned ret=0;
  1059. loop {
  1060. const void *row = in->nextRow();
  1061. if (!row)
  1062. break;
  1063. ret ++;
  1064. out->putRow(row);
  1065. }
  1066. return ret;
  1067. }
  1068. unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1069. {
  1070. unsigned ret=0;
  1071. loop {
  1072. const void *row = in->nextRow();
  1073. if (!row) {
  1074. row = in->nextRow();
  1075. if (!row)
  1076. break;
  1077. out->putRow(NULL);
  1078. }
  1079. ret ++;
  1080. out->putRow(row);
  1081. }
  1082. return ret;
  1083. }
  1084. unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1085. {
  1086. unsigned ret=0;
  1087. loop {
  1088. const void *row = in->ungroupedNextRow();
  1089. if (!row)
  1090. break;
  1091. ret ++;
  1092. out->putRow(row);
  1093. }
  1094. return ret;
  1095. }
  1096. class CConcatRowStream : public CInterface, public IRowStream
  1097. {
  1098. IArrayOf<IRowStream> oinstreams;
  1099. public:
  1100. unsigned num;
  1101. unsigned idx;
  1102. IRowStream **in;
  1103. bool grouped;
  1104. bool needeog;
  1105. public:
  1106. IMPLEMENT_IINTERFACE;
  1107. CConcatRowStream(unsigned _num, IRowStream **_in,bool _grouped)
  1108. {
  1109. // in streams assumed valid throughout (i.e. not linked)
  1110. num = _num;
  1111. idx = 0;
  1112. assertex(num);
  1113. oinstreams.ensure(num);
  1114. for (unsigned n = 0;n<num;n++)
  1115. oinstreams.append(*LINK(_in[n]));
  1116. in = oinstreams.getArray();
  1117. grouped = _grouped;
  1118. needeog = false;
  1119. }
  1120. const void *nextRow()
  1121. {
  1122. while (idx<num) {
  1123. const void *row;
  1124. if (grouped) {
  1125. row = in[idx]->nextRow();
  1126. if (row) {
  1127. needeog = true;
  1128. return row;
  1129. }
  1130. if (needeog) {
  1131. needeog = false;
  1132. return NULL;
  1133. }
  1134. }
  1135. else {
  1136. row = in[idx]->ungroupedNextRow();
  1137. if (row)
  1138. return row;
  1139. }
  1140. idx++;
  1141. }
  1142. return NULL;
  1143. }
  1144. virtual void stop()
  1145. {
  1146. while (idx<num)
  1147. in[idx++]->stop();
  1148. }
  1149. };
  1150. extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa)
  1151. {
  1152. return new CChainedWriteSeq(iwsa);
  1153. }
  1154. extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa)
  1155. {
  1156. return new CChainedReadSeq(irsa);
  1157. }
  1158. IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped)
  1159. {
  1160. return new CConcatRowStream(numstreams,streams,grouped);
  1161. }
  1162. #ifdef __x86_64__
  1163. void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); }
  1164. void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); }
  1165. void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
  1166. #endif