jio.cpp 34 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <algorithm>
  15. #include "jfile.hpp"
  16. #include "jthread.hpp"
  17. #include "jio.ipp"
  18. #include "jlzw.ipp"
  19. #include "jmisc.hpp"
  20. #include <time.h>
  21. #include <limits.h>
  22. #include "jexcept.hpp"
  23. #include "jqueue.tpp"
  24. #ifdef _WIN32
  25. #include <io.h>
  26. #endif
  27. #define DEFAULTBUFFERSIZE 0x10000 // 64K
  28. #define RANDOM_BUFFER_SIZE DEFAULTBUFFERSIZE
  29. #define MAX_RANDOM_CACHE_SIZE 0x10000
  30. #define RANDOM_CACHE_DEPTH 10
  31. #define threshold 1024
  32. #define timelimit 100
  33. #define MINCOMPRESSEDROWSIZE 16
  34. #define MAXCOMPRESSEDROWSIZE 0x4000
  35. static unsigned ioRetryCount=0;
  36. void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up.
  37. {
  38. ioRetryCount = _ioRetryCount;
  39. PROGLOG("setIORetryCount set to : %d", ioRetryCount);
  40. }
  41. extern jlib_decl offset_t checked_lseeki64( int handle, offset_t offset, int origin )
  42. {
  43. offset_t ret=_lseeki64(handle,offset,origin);
  44. if (ret==(offset_t)-1)
  45. throw makeErrnoException("checked_lseeki64");
  46. return ret;
  47. }
  48. extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
  49. {
  50. if (0==len) return 0;
  51. unsigned attempts = 0;
  52. size32_t ret = 0;
  53. unsigned __int64 startCycles = get_cycles_now();
  54. for (;;)
  55. {
  56. ssize_t readNow = _read(file, buffer, len);
  57. if (readNow == (ssize_t)-1)
  58. {
  59. switch (errno)
  60. {
  61. case EINTR:
  62. readNow = 0;
  63. break;
  64. default:
  65. if (attempts < ioRetryCount)
  66. {
  67. attempts++;
  68. StringBuffer callStr("read");
  69. callStr.append("[errno=").append(errno);
  70. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  71. callStr.append(", took=").append(elapsedMs);
  72. callStr.append(", attempt=").append(attempts).append("](handle=");
  73. callStr.append(file).append(", len=").append(len).append(")");
  74. PROGLOG("%s", callStr.str());
  75. readNow = 0;
  76. break;
  77. }
  78. throw makeErrnoException(errno, "checked_read");
  79. }
  80. }
  81. else if (!readNow)
  82. break;
  83. ret += readNow;
  84. if (readNow == (ssize_t)len)
  85. break;
  86. buffer = ((char *) buffer) + readNow;
  87. len -= readNow;
  88. }
  89. return ret;
  90. }
  91. #ifdef WIN32
  92. static bool atomicsupported = true;
  93. static CriticalSection atomicsection;
  94. #endif
  95. extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
  96. {
  97. if (0==len) return 0;
  98. #ifdef WIN32
  99. if (atomicsupported)
  100. {
  101. HANDLE hFile = (HANDLE)_get_osfhandle(file);
  102. DWORD rread;
  103. OVERLAPPED overlapped;
  104. memset(&overlapped, 0, sizeof(overlapped));
  105. overlapped.Offset = (DWORD) pos;
  106. overlapped.OffsetHigh = (DWORD)(pos>>32);
  107. if (ReadFile(hFile, buffer, len, &rread, &overlapped))
  108. return rread;
  109. int err = (int)GetLastError();
  110. if (err == ERROR_HANDLE_EOF)
  111. return 0;
  112. if (err == ERROR_INVALID_PARAMETER) // Win98 etc
  113. atomicsupported = false;
  114. else
  115. throw makeOsException(GetLastError(), "checked_pread");
  116. }
  117. {
  118. CriticalBlock blk(atomicsection);
  119. checked_lseeki64(file, pos, FILE_BEGIN);
  120. return checked_read(file, buffer, len);
  121. }
  122. #else
  123. size32_t ret = 0;
  124. unsigned attempts = 0;
  125. unsigned __int64 startCycles = get_cycles_now();
  126. for (;;)
  127. {
  128. ssize_t readNow = ::pread(file, buffer, len, pos);
  129. if (readNow == (ssize_t)-1)
  130. {
  131. switch (errno)
  132. {
  133. case EINTR:
  134. readNow = 0;
  135. break;
  136. default:
  137. if (attempts < ioRetryCount)
  138. {
  139. attempts++;
  140. StringBuffer callStr("pread");
  141. callStr.append("[errno=").append(errno);
  142. unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000;
  143. callStr.append(", took=").append(elapsedMs);
  144. callStr.append(", attempt=").append(attempts).append("](handle=");
  145. callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
  146. PROGLOG("%s", callStr.str());
  147. readNow = 0;
  148. break;
  149. }
  150. throw makeErrnoException(errno, "checked_pread");
  151. }
  152. }
  153. else if (!readNow)
  154. break;
  155. ret += readNow;
  156. if (readNow == (ssize_t)len)
  157. break;
  158. pos += readNow;
  159. buffer = ((char *) buffer) + readNow;
  160. len -= readNow;
  161. }
  162. return ret;
  163. #endif
  164. }
  165. extern jlib_decl size32_t checked_write( int handle, const void *buffer, size32_t count )
  166. {
  167. int ret=_write(handle,buffer,count);
  168. if ((size32_t)ret != count)
  169. {
  170. throw makeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
  171. }
  172. return (size32_t)ret;
  173. }
  174. class CReadSeq : public IReadSeq, public CInterface
  175. {
  176. int fh;
  177. size32_t size;
  178. char *buffer;
  179. char *ptr;
  180. size32_t bufSize;
  181. size32_t bytesInBuffer;
  182. offset_t startpos;
  183. offset_t endpos;
  184. offset_t nextbufpos;
  185. bool compressed;
  186. void *prev;
  187. size32_t maxcompsize;
  188. bool first;
  189. inline unsigned remaining()
  190. {
  191. return (unsigned)(buffer+bytesInBuffer-ptr);
  192. }
  193. size32_t getBytes(void *dst, size32_t _size)
  194. {
  195. size32_t left = remaining();
  196. size32_t read = 0;
  197. while (_size>left) {
  198. if (left) {
  199. memcpy(dst, ptr, left);
  200. dst = (char *)dst + left;
  201. _size -= left;
  202. read += left;
  203. ptr+=left;
  204. }
  205. refill();
  206. left = bytesInBuffer;
  207. if (!left)
  208. return read;
  209. }
  210. memcpy(dst, ptr, _size);
  211. ptr += _size;
  212. read += _size;
  213. return read;
  214. }
  215. void refill()
  216. {
  217. size32_t left = remaining();
  218. memmove(buffer,ptr,left);
  219. size32_t rd=bufSize-left;
  220. if (endpos-nextbufpos<(offset_t)rd)
  221. rd = (size32_t)(endpos-nextbufpos);
  222. if (rd)
  223. rd = checked_pread(fh, buffer+left, rd, nextbufpos);
  224. nextbufpos += rd;
  225. bytesInBuffer = left+rd;
  226. ptr = buffer;
  227. }
  228. public:
  229. IMPLEMENT_IINTERFACE;
  230. CReadSeq(int _fh, offset_t _offset, unsigned maxrecs, size32_t _size, size32_t _bufsize, bool _compressed)
  231. {
  232. assertex(_size);
  233. fh = _fh;
  234. size = _size;
  235. bufSize = (_bufsize==(unsigned) -1)?DEFAULTBUFFERSIZE:_bufsize;
  236. bytesInBuffer = 0;
  237. startpos = _offset;
  238. nextbufpos = _offset;
  239. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  240. if (compressed) {
  241. maxcompsize = size+size/3+3; // migger than needed
  242. buffer = (char *) malloc(bufSize+size);
  243. prev = buffer+bufSize;
  244. }
  245. else
  246. buffer = (char *) malloc(bufSize);
  247. ptr = buffer;
  248. first = true;
  249. endpos = (maxrecs!=(unsigned)-1)?(_offset+(offset_t)maxrecs*(offset_t)_size):I64C(0x7ffffffffff);
  250. }
  251. ~CReadSeq()
  252. {
  253. free(buffer);
  254. }
  255. virtual bool get(void *dst)
  256. {
  257. if (!compressed)
  258. return getBytes(dst, size)==size;
  259. return (getn(dst,1)==1);
  260. }
  261. virtual unsigned getn(void *dst, unsigned n)
  262. {
  263. if (!compressed)
  264. return getBytes(dst, size*n)/size;
  265. byte *d = (byte *)dst;
  266. byte *e = d+(size*n);
  267. byte *p = (byte *)prev;
  268. unsigned ret = 0;
  269. while (d!=e) {
  270. if (first) {
  271. if (getBytes(d, size)!=size)
  272. break;
  273. first = false;
  274. }
  275. else {
  276. if (remaining()<maxcompsize)
  277. refill();
  278. if (remaining()==0)
  279. break;
  280. ptr += DiffExpand(ptr,d,p,size);
  281. }
  282. p = d;
  283. d += size;
  284. ret++;
  285. }
  286. if (ret) // we got at least 1 so copy to prev
  287. memcpy(prev,e-size,size);
  288. return ret;
  289. }
  290. virtual unsigned getRecordSize()
  291. {
  292. return size;
  293. }
  294. virtual void reset()
  295. {
  296. nextbufpos = startpos;
  297. ptr = buffer;
  298. bytesInBuffer = 0;
  299. first = true;
  300. }
  301. virtual void stop()
  302. {
  303. free(buffer); // no one should access after stop
  304. buffer = NULL;
  305. }
  306. };
  307. IReadSeq *createReadSeq(int fh, offset_t _offset, size32_t size, size32_t bufsize, unsigned maxrecs, bool compressed)
  308. {
  309. if (!bufsize) {
  310. IReadSeq *seq=new CUnbufferedReadWriteSeq(fh, _offset, size);
  311. seq->reset(); // not done itself
  312. return seq;
  313. }
  314. return new CReadSeq(fh, _offset, maxrecs, size, bufsize, compressed);
  315. }
  316. //================================================================================================
  317. class CWriteSeq : public IWriteSeq, public CInterface
  318. {
  319. private:
  320. int fh;
  321. size32_t size;
  322. char *buffer;
  323. char *ptr;
  324. size32_t bufSize;
  325. offset_t fpos;
  326. bool compressed;
  327. size32_t maxcompsize;
  328. void *prev;
  329. void *aux;
  330. bool first;
  331. inline size32_t remaining()
  332. {
  333. return (size32_t)(bufSize - (ptr-buffer));
  334. }
  335. void putBytes(const void *src, size32_t _size)
  336. {
  337. fpos += _size;
  338. size32_t left = remaining();
  339. if (_size>left)
  340. {
  341. if (ptr!=buffer) { // don't buffer if entire block
  342. memcpy(ptr, src, left);
  343. ptr += left;
  344. src = (char *)src + left;
  345. _size -= left;
  346. flush();
  347. left = bufSize;
  348. }
  349. while (_size>=bufSize) // write out directly
  350. {
  351. checked_write(fh, src, bufSize); // stick to writing bufSize blocks
  352. src = (char *)src + bufSize;
  353. _size -= bufSize;
  354. }
  355. }
  356. memcpy(ptr, src, _size);
  357. ptr += _size;
  358. }
  359. public:
  360. IMPLEMENT_IINTERFACE;
  361. CWriteSeq(int _fh, size32_t _size, size32_t _bufsize, bool _compressed)
  362. {
  363. assertex(_fh);
  364. assertex(_size);
  365. fh = _fh;
  366. size = _size;
  367. fpos = 0;
  368. if (_bufsize == (unsigned) -1)
  369. _bufsize = DEFAULTBUFFERSIZE;
  370. bufSize = _bufsize;
  371. compressed = ((size<bufSize/2)&&(size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
  372. if (compressed) {
  373. maxcompsize = size+size/3+3; // bigger than needed
  374. buffer = (char *) malloc(bufSize+size+maxcompsize);
  375. prev = buffer+bufSize;
  376. aux = (char *)prev+size;
  377. }
  378. else
  379. buffer = (char *) malloc(bufSize);
  380. ptr = buffer;
  381. first = true;
  382. }
  383. ~CWriteSeq()
  384. {
  385. free(buffer);
  386. }
  387. void put(const void *src)
  388. {
  389. if (compressed) {
  390. if (first) {
  391. first = false;
  392. memcpy(prev,src,size);
  393. }
  394. else if (remaining()>=maxcompsize) {
  395. size32_t sz = DiffCompress(src,ptr,prev,size);
  396. fpos += sz;
  397. ptr += sz;
  398. return;
  399. }
  400. else {
  401. putBytes(aux, DiffCompress(src,aux,prev,size));
  402. return;
  403. }
  404. }
  405. putBytes(src, size);
  406. }
  407. void putn(const void *src, unsigned numRecs)
  408. {
  409. if (compressed) {
  410. while (numRecs) {
  411. put(src);
  412. src = (byte *)src+size;
  413. numRecs--;
  414. }
  415. }
  416. else
  417. putBytes(src, size*numRecs);
  418. }
  419. void flush()
  420. {
  421. if (ptr != buffer)
  422. {
  423. checked_write(fh, buffer, (size32_t)(ptr-buffer));
  424. ptr = buffer;
  425. }
  426. }
  427. offset_t getPosition()
  428. {
  429. return fpos;
  430. }
  431. virtual size32_t getRecordSize()
  432. {
  433. return size;
  434. }
  435. };
  436. IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize, bool compressed)
  437. {
  438. // Async TBD
  439. if (!bufsize)
  440. return new CUnbufferedReadWriteSeq(fh, 0, size);
  441. else
  442. return new CWriteSeq(fh, size, bufsize,compressed);
  443. }
  444. IWriteSeq *createTeeWriteSeq(IWriteSeq *f1, IWriteSeq *f2)
  445. {
  446. return new CTeeWriteSeq(f1, f2);
  447. }
  448. //===========================================================================================
  449. CUnbufferedReadWriteSeq::CUnbufferedReadWriteSeq(int _fh, offset_t _offset, size32_t _size)
  450. {
  451. fh = _fh;
  452. size = _size;
  453. offset = _offset;
  454. fpos = _offset;
  455. }
  456. void CUnbufferedReadWriteSeq::put(const void *src)
  457. {
  458. checked_write(fh, src, size);
  459. fpos += size;
  460. }
  461. void CUnbufferedReadWriteSeq::putn(const void *src, unsigned n)
  462. {
  463. checked_write(fh, src, size*n);
  464. fpos += size*n;
  465. }
  466. void CUnbufferedReadWriteSeq::flush()
  467. {}
  468. offset_t CUnbufferedReadWriteSeq::getPosition()
  469. {
  470. return fpos;
  471. }
  472. bool CUnbufferedReadWriteSeq::get(void *dst)
  473. {
  474. size32_t toread = size;
  475. while (toread)
  476. {
  477. int read = checked_read(fh, dst, toread);
  478. if (!read)
  479. return false;
  480. toread -= read;
  481. dst = (char *) dst + read;
  482. }
  483. return true;
  484. }
  485. unsigned CUnbufferedReadWriteSeq::getn(void *dst, unsigned n)
  486. {
  487. size32_t toread = size*n;
  488. size32_t totread = 0;
  489. while (toread)
  490. {
  491. int read = checked_read(fh, dst, toread);
  492. if (!read)
  493. break;
  494. toread -= read;
  495. totread += read;
  496. dst = (char *) dst + read;
  497. }
  498. return totread/size;
  499. }
  500. void CUnbufferedReadWriteSeq::reset()
  501. {
  502. checked_lseeki64(fh, offset, SEEK_SET);
  503. fpos = offset;
  504. }
  505. //===========================================================================================
  506. //===========================================================================================
  507. CTeeWriteSeq::CTeeWriteSeq(IWriteSeq *_f1, IWriteSeq *_f2)
  508. {
  509. w1 = _f1;
  510. w1->Link();
  511. w2 = _f2;
  512. w2->Link();
  513. assertex(w1->getRecordSize()==w2->getRecordSize());
  514. }
  515. CTeeWriteSeq::~CTeeWriteSeq()
  516. {
  517. w1->Release();
  518. w2->Release();
  519. }
  520. void CTeeWriteSeq::put(const void *src)
  521. {
  522. w1->put(src);
  523. w2->put(src);
  524. }
  525. void CTeeWriteSeq::putn(const void *src, unsigned n)
  526. {
  527. w1->putn(src, n);
  528. w2->putn(src, n);
  529. }
  530. void CTeeWriteSeq::flush()
  531. {
  532. w1->flush();
  533. w2->flush();
  534. }
  535. size32_t CTeeWriteSeq::getRecordSize()
  536. {
  537. return w1->getRecordSize();
  538. }
  539. offset_t CTeeWriteSeq::getPosition()
  540. {
  541. return w1->getPosition();
  542. }
  543. //==================================================================================================
  544. class CFixedRecordSize: public IRecordSize, public CInterface
  545. {
  546. protected:
  547. size32_t recsize;
  548. public:
  549. IMPLEMENT_IINTERFACE;
  550. CFixedRecordSize(size32_t _recsize) { recsize=_recsize; }
  551. virtual size32_t getRecordSize(const void *)
  552. {
  553. return recsize;
  554. }
  555. virtual size32_t getFixedSize() const
  556. {
  557. return recsize;
  558. }
  559. virtual size32_t getMinRecordSize() const
  560. {
  561. return recsize;
  562. }
  563. };
  564. IRecordSize *createFixedRecordSize(size32_t recsize)
  565. {
  566. return new CFixedRecordSize(recsize);
  567. }
  568. class CDeltaRecordSize: public IRecordSize, public CInterface
  569. {
  570. protected:
  571. Owned<IRecordSize> recordSize;
  572. int delta;
  573. public:
  574. CDeltaRecordSize(IRecordSize * _recordSize, int _delta) { recordSize.set(_recordSize); delta = _delta; }
  575. IMPLEMENT_IINTERFACE;
  576. virtual size32_t getRecordSize(const void * data)
  577. {
  578. return recordSize->getRecordSize(data) + delta;
  579. }
  580. virtual size32_t getFixedSize() const
  581. {
  582. return recordSize->getFixedSize()?recordSize->getFixedSize()+delta:0;
  583. }
  584. virtual size32_t getMinRecordSize() const
  585. {
  586. return recordSize->getMinRecordSize() + delta;
  587. }
  588. };
  589. extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta)
  590. {
  591. if (delta == 0)
  592. return LINK(size);
  593. return new CDeltaRecordSize(size, delta);
  594. }
  595. //==================================================================================================
  596. // Elevator scanning
  597. #define MAX_PENDING 20000
  598. class ElevatorScanner;
  599. class PendingFetch : public IInterface, public CInterface
  600. {
  601. public:
  602. IMPLEMENT_IINTERFACE;
  603. static int compare(const void *a, const void *b);
  604. offset_t pos;
  605. IReceiver *receiver;
  606. void *target;
  607. IRecordFetchChannel *channel;
  608. };
  609. class ElevatorChannel : implements IRecordFetchChannel, public CInterface
  610. {
  611. private:
  612. bool cancelled;
  613. bool immediate;
  614. ElevatorScanner &scanner;
  615. public:
  616. IMPLEMENT_IINTERFACE;
  617. ElevatorChannel(ElevatorScanner &, bool);
  618. ~ElevatorChannel();
  619. //Interface IRecordFetchChannel
  620. virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver);
  621. virtual void flush();
  622. virtual void abort() { cancelled = true; }
  623. virtual bool isAborted() { return cancelled; }
  624. virtual bool isImmediate() { return immediate; }
  625. };
  626. class ElevatorScanner : public Thread, public IRecordFetcher
  627. {
  628. private:
  629. Monitor scanlist;
  630. Monitor isRoom;
  631. PendingFetch pending[MAX_PENDING];
  632. unsigned nextSlot;
  633. size32_t recordSize;
  634. int file;
  635. offset_t reads;
  636. unsigned scans;
  637. bool stopped;
  638. unsigned duetime;
  639. void scan();
  640. void doFetch(PendingFetch &);
  641. void stop();
  642. void resetTimer()
  643. {
  644. duetime = msTick()+timelimit;
  645. }
  646. public:
  647. IMPLEMENT_IINTERFACE;
  648. virtual void beforeDispose();
  649. ElevatorScanner(int file, size32_t recordSize);
  650. ~ElevatorScanner();
  651. //Interface IRecordFetcher
  652. virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); }
  653. //Interface Thread
  654. virtual int run();
  655. void flush(IRecordFetchChannel *);
  656. void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *);
  657. };
  658. int PendingFetch::compare(const void *a, const void *b)
  659. {
  660. offset_t aa = ((PendingFetch *) a)->pos;
  661. offset_t bb = ((PendingFetch *) b)->pos;
  662. if (aa > bb)
  663. return 1;
  664. else if (aa == bb)
  665. return 0;
  666. else
  667. return -1;
  668. }
  669. ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner)
  670. {
  671. scanner.Link();
  672. cancelled = false;
  673. immediate = _immediate;
  674. }
  675. ElevatorChannel::~ElevatorChannel()
  676. {
  677. flush();
  678. scanner.Release();
  679. }
  680. void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver)
  681. {
  682. scanner.fetch(fpos, buffer, receiver, this);
  683. }
  684. void ElevatorChannel::flush()
  685. {
  686. scanner.flush(this);
  687. }
  688. ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner")
  689. {
  690. file = _file;
  691. recordSize = _recordSize;
  692. nextSlot = 0;
  693. reads = 0;
  694. scans = 0;
  695. stopped = false;
  696. start();
  697. }
  698. ElevatorScanner::~ElevatorScanner()
  699. {
  700. PrintLog("Elevator scanner statistics: %" I64F "d reads (%" I64F "d bytes), %d scans", reads, reads*recordSize, scans);
  701. }
  702. void ElevatorScanner::beforeDispose()
  703. {
  704. stop();
  705. join();
  706. }
  707. void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel)
  708. {
  709. synchronized procedure(scanlist);
  710. if (channel->isImmediate())
  711. {
  712. // MORE - atomic seek/read would be preferable!
  713. checked_lseeki64(file, fpos, SEEK_SET);
  714. checked_read(file, buffer, recordSize);
  715. reads++;
  716. if (!receiver->takeRecord(fpos))
  717. channel->abort();
  718. return;
  719. }
  720. {
  721. synchronized block(isRoom);
  722. while (nextSlot >= MAX_PENDING)
  723. isRoom.wait();
  724. }
  725. if (!channel->isAborted())
  726. {
  727. pending[nextSlot].pos = fpos;
  728. pending[nextSlot].receiver = receiver;
  729. pending[nextSlot].target = buffer;
  730. pending[nextSlot].channel = channel;
  731. nextSlot++;
  732. resetTimer();
  733. scanlist.notify();
  734. }
  735. }
  736. void ElevatorScanner::doFetch(PendingFetch &next)
  737. {
  738. if (!next.channel->isAborted())
  739. {
  740. // MORE - atomic seek/read would be preferable!
  741. checked_lseeki64(file, next.pos, SEEK_SET);
  742. checked_read(file, next.target, recordSize);
  743. reads++;
  744. if (!next.receiver->takeRecord(next.pos))
  745. next.channel->abort();
  746. }
  747. }
  748. void ElevatorScanner::scan()
  749. {
  750. PrintLog("Starting elevator scan of %d items", nextSlot);
  751. scans++;
  752. qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare);
  753. for (unsigned i = 0; i < nextSlot; i++)
  754. {
  755. doFetch(pending[i]);
  756. }
  757. nextSlot = 0;
  758. {
  759. synchronized block(isRoom);
  760. isRoom.notify();
  761. }
  762. PrintLog("Finished elevator scan");
  763. }
  764. void ElevatorScanner::flush(IRecordFetchChannel *)
  765. {
  766. // MORE - I could just flush what was asked for, but I may as well flush the lot.
  767. synchronized procedure(scanlist);
  768. if (nextSlot)
  769. scan();
  770. }
  771. int ElevatorScanner::run()
  772. {
  773. scanlist.lock();
  774. for (;;)
  775. {
  776. while (nextSlot<MAX_PENDING)
  777. {
  778. if (nextSlot)
  779. {
  780. // if (!connections.length())
  781. // break;
  782. int timeleft = (int)(duetime-msTick());
  783. if (timeleft<=0)
  784. break;
  785. }
  786. if (stopped)
  787. {
  788. scanlist.unlock();
  789. return 0;
  790. }
  791. // MORE - need a timeout on the wait!
  792. scanlist.wait();
  793. }
  794. scan();
  795. }
  796. }
  797. void ElevatorScanner::stop()
  798. {
  799. synchronized procedure(scanlist);
  800. if (!stopped)
  801. {
  802. stopped = true;
  803. scanlist.notify();
  804. }
  805. }
  806. extern jlib_decl IRecordFetcher *createElevatorFetcher(int file, size32_t recSize)
  807. {
  808. return new ElevatorScanner(file, recSize);
  809. }
  810. //==================================================================================================
  811. // chained routines allowing multiple streams to be concatenated
  812. // all streams assumed to have same record size
  813. class CChainedWriteSeq : public IWriteSeq, public CInterface
  814. {
  815. protected:
  816. IWriteSeq *stream;
  817. IWriteSeqAllocator *allocator;
  818. unsigned num;
  819. size32_t recsize;
  820. offset_t pos;
  821. public:
  822. IMPLEMENT_IINTERFACE;
  823. CChainedWriteSeq(IWriteSeqAllocator *_allocator)
  824. {
  825. allocator = _allocator;
  826. allocator->Link();
  827. num = 0;
  828. recsize = 0;
  829. pos = 0;
  830. }
  831. virtual ~CChainedWriteSeq()
  832. {
  833. ::Release(stream);
  834. allocator->Release();
  835. }
  836. void flush() { if (stream) stream->flush(); }
  837. void put(const void *dst) { putn(dst,1); }
  838. void putn(const void *dst, unsigned numrecs)
  839. {
  840. if (numrecs==0) return;
  841. if (stream==NULL) return; // could raise exception instead
  842. byte *out=(byte *)dst;
  843. while (numrecs>num) {
  844. stream->putn(out,num);
  845. pos+=num;
  846. numrecs-=num;
  847. stream->flush();
  848. IWriteSeq *oldstream=stream;
  849. stream = allocator->next(num);
  850. oldstream->Release();
  851. if (!stream) {
  852. return; // could raise exception
  853. }
  854. }
  855. stream->putn(out,numrecs);
  856. pos+=numrecs;
  857. }
  858. virtual size32_t getRecordSize() { if ((recsize==0)&&stream) recsize = stream->getRecordSize(); return recsize; }
  859. virtual offset_t getPosition() { return pos; }
  860. };
  861. class CChainedReadSeq : public IReadSeq, public CInterface
  862. {
  863. protected:
  864. IReadSeq *stream;
  865. IReadSeqAllocator *allocator;
  866. unsigned num;
  867. size32_t recsize;
  868. public:
  869. IMPLEMENT_IINTERFACE;
  870. CChainedReadSeq(IReadSeqAllocator *_allocator)
  871. {
  872. allocator = _allocator;
  873. allocator->Link();
  874. stream = allocator->next();
  875. num = 0;
  876. recsize = 0;
  877. }
  878. virtual ~CChainedReadSeq()
  879. {
  880. ::Release(stream);
  881. allocator->Release();
  882. }
  883. virtual bool get(void *dst) { return (getn(dst,1)==1); }
  884. virtual unsigned getn(void *dst, unsigned n)
  885. {
  886. unsigned done=0;
  887. while (stream&&n) {
  888. unsigned r = stream->getn(dst,n);
  889. if (r==0) {
  890. IReadSeq *oldstream=stream;
  891. stream = allocator->next();
  892. oldstream->Release();
  893. }
  894. else {
  895. n-=r;
  896. done+=r;
  897. }
  898. }
  899. return done;
  900. }
  901. virtual size32_t getRecordSize() { return stream->getRecordSize(); }
  902. virtual void reset() { stream->reset(); }
  903. virtual void stop() { stream->stop(); }
  904. };
  905. unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
  906. {
  907. size32_t recsize=from->getRecordSize();
  908. assertex(recsize==to->getRecordSize());
  909. unsigned nbuf = bufsize/recsize;
  910. if (nbuf==0)
  911. nbuf = 1;
  912. MemoryAttr ma;
  913. byte *buf=(byte *)ma.allocate(nbuf*recsize);
  914. unsigned ret = 0;
  915. for (;;) {
  916. unsigned n = from->getn(buf,nbuf);
  917. if (n==0)
  918. break;
  919. to->putn(buf,n);
  920. ret += n;
  921. }
  922. return ret;
  923. }
  924. /////////////////
  925. CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)
  926. {
  927. bufferSize = _bufferSize;
  928. numInBuffer = 0;
  929. curBufferOffset = 0;
  930. reading = true;
  931. minDirectSize = std::min(bufferSize/4,(size32_t)0x2000); // size where we don't bother copying into the buffer
  932. }
  933. size32_t CBufferedIOStreamBase::doread(size32_t len, void * data)
  934. {
  935. if (!reading)
  936. {
  937. doflush();
  938. reading = true;
  939. }
  940. size32_t sizeGot = readFromBuffer(len, data);
  941. len -= sizeGot;
  942. if (len!=0)
  943. {
  944. data = (char *)data + sizeGot;
  945. if (len >= minDirectSize)
  946. sizeGot += directRead(len, data); // if direct read specified don't loop
  947. else
  948. {
  949. do
  950. {
  951. if (!fillBuffer())
  952. break;
  953. size32_t numRead = readFromBuffer(len, data);
  954. sizeGot += numRead;
  955. len -= numRead;
  956. data = (char *)data + numRead;
  957. } while (len);
  958. }
  959. }
  960. return sizeGot;
  961. }
  962. size32_t CBufferedIOStreamBase::dowrite(size32_t len, const void * data)
  963. {
  964. if (reading)
  965. {
  966. curBufferOffset = 0;
  967. numInBuffer = 0;
  968. reading = false;
  969. }
  970. size32_t ret = len;
  971. while (len) { // tries to write in buffer size chunks, also flushes as soon as possible
  972. size32_t wr;
  973. if (numInBuffer != 0) {
  974. wr = std::min(len,bufferSize-curBufferOffset);
  975. writeToBuffer(wr, data);
  976. if (numInBuffer==bufferSize)
  977. doflush();
  978. len -= wr;
  979. if (len==0)
  980. break;
  981. data = (char *)data + wr;
  982. }
  983. if (len >= minDirectSize)
  984. return directWrite(len, data)+ret-len;
  985. wr = std::min(len,bufferSize);
  986. writeToBuffer(wr, data);
  987. if (numInBuffer==bufferSize)
  988. doflush();
  989. len -= wr;
  990. data = (char *)data + wr;
  991. }
  992. return ret; // there is a bit of an assumption here that flush always works
  993. }
  994. ////////////////////////////
  995. class CBufferedIIOStream : public CBufferedIOStreamBase, implements IIOStream
  996. {
  997. Linked<IIOStream> io;
  998. public:
  999. IMPLEMENT_IINTERFACE;
  1000. CBufferedIIOStream(IIOStream *_io, unsigned bufSize) : CBufferedIOStreamBase(bufSize), io(_io)
  1001. {
  1002. buffer = new byte[bufSize];
  1003. }
  1004. ~CBufferedIIOStream()
  1005. {
  1006. delete [] buffer;
  1007. }
  1008. virtual void beforeDispose()
  1009. {
  1010. try
  1011. {
  1012. // NOTE - flush may throw an exception and thus cannot be done in the destructor.
  1013. flush();
  1014. }
  1015. catch (IException *E)
  1016. {
  1017. EXCLOG(E, "ERROR - Exception in CBufferedIIOStream::flush ignored");
  1018. E->Release();
  1019. assert(!"ERROR - Exception in CBufferedIIOStream::flush ignored");
  1020. }
  1021. catch (...)
  1022. {
  1023. DBGLOG("ERROR - Unknown exception in CBufferedIIOStream::flush ignored");
  1024. assert(!"ERROR - Unknown exception in CBufferedIIOStream::flush ignored");
  1025. }
  1026. }
  1027. virtual bool fillBuffer()
  1028. {
  1029. reading = true;
  1030. numInBuffer = io->read(bufferSize, buffer);
  1031. curBufferOffset = 0;
  1032. return numInBuffer!=0;
  1033. }
  1034. virtual size32_t directRead(size32_t len, void * data)
  1035. {
  1036. return io->read(len, data);
  1037. }
  1038. virtual size32_t directWrite(size32_t len, const void * data)
  1039. {
  1040. return io->write(len,data);
  1041. }
  1042. virtual void doflush()
  1043. {
  1044. if (!reading && numInBuffer)
  1045. {
  1046. //Copy numInBuffer out before flush so that destructor doesn't attempt to flush again.
  1047. size32_t numToWrite = numInBuffer;
  1048. numInBuffer = 0;
  1049. io->write(numToWrite, buffer);
  1050. curBufferOffset = 0;
  1051. }
  1052. }
  1053. // IIOStream impl.
  1054. virtual size32_t read(size32_t len, void * data) { return CBufferedIOStreamBase::doread(len, data); }
  1055. virtual size32_t write(size32_t len, const void * data) { return CBufferedIOStreamBase::dowrite(len, data); }
  1056. virtual void flush() { doflush(); }
  1057. };
  1058. IIOStream *createBufferedIOStream(IIOStream *io, unsigned bufSize)
  1059. {
  1060. if (bufSize == (unsigned)-1)
  1061. bufSize = DEFAULT_BUFFER_SIZE;
  1062. return new CBufferedIIOStream(io, bufSize);
  1063. }
  1064. IRowStream *createNullRowStream()
  1065. {
  1066. class cNullStream: implements IRowStream, public CInterface
  1067. {
  1068. const void *nextRow() { return NULL; }
  1069. void stop() {}
  1070. public:
  1071. IMPLEMENT_IINTERFACE;
  1072. cNullStream() {}
  1073. };
  1074. return new cNullStream;
  1075. }
  1076. unsigned copyRowStream(IRowStream *in, IRowWriter *out)
  1077. {
  1078. unsigned ret=0;
  1079. for (;;) {
  1080. const void *row = in->nextRow();
  1081. if (!row)
  1082. break;
  1083. ret ++;
  1084. out->putRow(row);
  1085. }
  1086. return ret;
  1087. }
  1088. unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1089. {
  1090. unsigned ret=0;
  1091. for (;;) {
  1092. const void *row = in->nextRow();
  1093. if (!row) {
  1094. row = in->nextRow();
  1095. if (!row)
  1096. break;
  1097. out->putRow(NULL);
  1098. }
  1099. ret ++;
  1100. out->putRow(row);
  1101. }
  1102. return ret;
  1103. }
  1104. unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out)
  1105. {
  1106. unsigned ret=0;
  1107. for (;;) {
  1108. const void *row = in->ungroupedNextRow();
  1109. if (!row)
  1110. break;
  1111. ret ++;
  1112. out->putRow(row);
  1113. }
  1114. return ret;
  1115. }
  1116. class CConcatRowStream : public IRowStream, public CInterface
  1117. {
  1118. IArrayOf<IRowStream> oinstreams;
  1119. public:
  1120. unsigned num;
  1121. unsigned idx;
  1122. IRowStream **in;
  1123. bool grouped;
  1124. bool needeog;
  1125. public:
  1126. IMPLEMENT_IINTERFACE;
  1127. CConcatRowStream(unsigned _num, IRowStream **_in,bool _grouped)
  1128. {
  1129. // in streams assumed valid throughout (i.e. not linked)
  1130. num = _num;
  1131. idx = 0;
  1132. assertex(num);
  1133. oinstreams.ensure(num);
  1134. for (unsigned n = 0;n<num;n++)
  1135. oinstreams.append(*LINK(_in[n]));
  1136. in = oinstreams.getArray();
  1137. grouped = _grouped;
  1138. needeog = false;
  1139. }
  1140. const void *nextRow()
  1141. {
  1142. while (idx<num) {
  1143. const void *row;
  1144. if (grouped) {
  1145. row = in[idx]->nextRow();
  1146. if (row) {
  1147. needeog = true;
  1148. return row;
  1149. }
  1150. if (needeog) {
  1151. needeog = false;
  1152. return NULL;
  1153. }
  1154. }
  1155. else {
  1156. row = in[idx]->ungroupedNextRow();
  1157. if (row)
  1158. return row;
  1159. }
  1160. idx++;
  1161. }
  1162. return NULL;
  1163. }
  1164. virtual void stop()
  1165. {
  1166. while (idx<num)
  1167. in[idx++]->stop();
  1168. }
  1169. };
  1170. extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa)
  1171. {
  1172. return new CChainedWriteSeq(iwsa);
  1173. }
  1174. extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa)
  1175. {
  1176. return new CChainedReadSeq(irsa);
  1177. }
  1178. IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped)
  1179. {
  1180. switch(numstreams)
  1181. {
  1182. case 0:
  1183. return createNullRowStream();
  1184. case 1:
  1185. return LINK(streams[0]);
  1186. default:
  1187. return new CConcatRowStream(numstreams,streams,grouped);
  1188. }
  1189. }
  1190. #ifdef __x86_64__
  1191. void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); }
  1192. void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); }
  1193. void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
  1194. #endif
  1195. template<typename T> size32_t readSimpleStream(T &target, ISimpleReadStream &stream, size32_t readChunkSize)
  1196. {
  1197. size32_t totalSz = 0;
  1198. while (true)
  1199. {
  1200. size32_t sizeRead = stream.read(readChunkSize, target.reserve(readChunkSize));
  1201. if (sizeRead < readChunkSize)
  1202. {
  1203. target.setLength(target.length() - (readChunkSize - sizeRead));
  1204. if (!sizeRead)
  1205. break;
  1206. }
  1207. totalSz += sizeRead;
  1208. }
  1209. return totalSz;
  1210. }
  1211. template size32_t readSimpleStream<StringBuffer>(StringBuffer &, ISimpleReadStream &, size32_t);
  1212. template size32_t readSimpleStream<MemoryBuffer>(MemoryBuffer &, ISimpleReadStream &, size32_t);