jlzw.cpp 83 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // JLIB LZW compression class
  14. #include "platform.h"
  15. #include "jmisc.hpp"
  16. #include "jlib.hpp"
  17. #include <time.h>
  18. #include "jfile.hpp"
  19. #include "jencrypt.hpp"
  20. #include "jflz.hpp"
  21. #include "jlz4.hpp"
  22. #ifdef _WIN32
  23. #include <io.h>
  24. #endif
  25. #include "jlzw.ipp"
  26. #define COMMITTED ((size32_t)-1)
  27. #define BITS_LO 9
  28. #define BITS_HI 15
  29. #define MAX_CODE ((1<<BITS_HI)-1)
  30. #define BUMP_CODE 257
  31. #define FIRST_CODE 258
  32. #define SAFETY_MARGIN 16 // for 15 bits
  33. #define BITS_PER_UNIT 8
  34. #define BITS_ALWAYS 8
  35. #define ALWAYS_MASK ((1<<BITS_ALWAYS)-1)
  36. typedef unsigned long bucket_t;
  37. // typedef long long lbucket_t;
  38. typedef __int64 lbucket_t;
  39. //#define STATS
  40. //#define TEST
  41. #ifdef _DEBUG
  42. #define ASSERT(a) assertex(a)
  43. #else
  44. #define ASSERT(a)
  45. #endif
  46. LZWDictionary::LZWDictionary()
  47. {
  48. curbits = 0;
  49. }
  50. void LZWDictionary::initdict()
  51. {
  52. nextcode = FIRST_CODE;
  53. curbits = BITS_LO;
  54. nextbump = 1<<BITS_LO;
  55. }
  56. bool LZWDictionary::bumpbits()
  57. {
  58. if (curbits==BITS_HI)
  59. return false;
  60. curbits++;
  61. nextbump = 1<<curbits;
  62. return true;
  63. }
  64. #ifdef STATS
  65. static unsigned st_tottimems=0;
  66. static unsigned st_maxtime=0;
  67. static int st_maxtime_writes=0;
  68. static int st_totwrites=0;
  69. static int st_totblocks=0;
  70. static int st_totwritten=0; // in K
  71. static int st_totread=0; // in K
  72. static unsigned st_thistime=0;
  73. static int st_thiswrites=0;
  74. static unsigned st_totbitsize=0;
  75. static unsigned st_totbitsizeuc=0;
  76. #endif
  77. CLZWCompressor::CLZWCompressor(bool _supportbigendian)
  78. {
  79. outbuf = NULL;
  80. outlen = 0;
  81. maxlen = 0;
  82. bufalloc = 0;
  83. inuseflag=0xff;
  84. supportbigendian = _supportbigendian;
  85. outBufStart = 0;
  86. outBufMb = NULL;
  87. }
  88. CLZWCompressor::~CLZWCompressor()
  89. {
  90. if (bufalloc)
  91. free(outbuf);
  92. #ifdef STATS
  93. printf("HLZW STATS:\n");
  94. printf(" st_tottimems = %d\n",st_tottimems);
  95. printf(" st_maxtime = %d\n",st_maxtime);
  96. printf(" st_maxtime_writes = %d\n",st_maxtime_writes);
  97. printf(" st_totwrites = %d\n",st_totwrites);
  98. printf(" st_totblocks = %d\n",st_totblocks);
  99. printf(" st_totwritten = %dK\n",st_totwritten); // in K
  100. printf(" st_totread = %dK\n",st_totread); // in K
  101. printf(" st_totbitsize = %d\n",st_totbitsize);
  102. printf(" st_totbitsizeuc = %d\n",st_totbitsizeuc);
  103. #endif
  104. }
  105. void CLZWCompressor::initdict()
  106. {
  107. dict.initdict();
  108. // use inuseflag rather than clearing as this can take a large proportion of the time
  109. // (e.g. in hozed)
  110. if (inuseflag==0xff) {
  111. memset(dictinuse,0,sizeof(dictinuse));
  112. inuseflag=0;
  113. }
  114. inuseflag++;
  115. }
  116. struct ShiftInfo {
  117. int mask1;
  118. int shift2; // NB right shift, not left
  119. int mask2;
  120. int padding; // make it multiple of 4
  121. };
  122. ShiftInfo ShiftArray[BITS_HI-BITS_ALWAYS+1][BITS_PER_UNIT];
  123. static struct __initShiftArray {
  124. __initShiftArray()
  125. {
  126. for (unsigned numBits = BITS_LO; numBits <= BITS_HI; ++numBits) {
  127. unsigned copyBits = numBits-BITS_ALWAYS;
  128. unsigned mask = (1<<numBits)-1-ALWAYS_MASK;
  129. for (unsigned shift = 0; shift < BITS_PER_UNIT; shift++) {
  130. ShiftInfo & cur = ShiftArray[copyBits][shift];
  131. if (shift + copyBits <= BITS_PER_UNIT) {
  132. cur.mask1 = mask;
  133. cur.shift2 = 0;
  134. cur.mask2 = 0;
  135. }
  136. else {
  137. cur.shift2 = BITS_PER_UNIT + BITS_ALWAYS - shift;
  138. cur.mask1 = (1<<cur.shift2)-1-ALWAYS_MASK;
  139. cur.mask2 = mask - cur.mask1;
  140. }
  141. }
  142. }
  143. }
  144. } __do_initShiftArray;
  145. #define PUTCODE(code) \
  146. { \
  147. unsigned inbits=code; \
  148. int shift=curShift; \
  149. int copyBits = dict.curbits - BITS_PER_UNIT; \
  150. \
  151. *(outbytes++) = (unsigned char)(inbits&0xff); \
  152. ShiftInfo & cur = ShiftArray[copyBits][shift]; \
  153. outbitbuf |= (inbits & cur.mask1) >> (BITS_ALWAYS-shift); \
  154. shift += copyBits; \
  155. if (shift >= BITS_PER_UNIT) \
  156. { \
  157. shift -= BITS_PER_UNIT; \
  158. *(outbits++) = outbitbuf; \
  159. if (outbits==outnext) { \
  160. outbytes = outnext; \
  161. outbits = outbytes+BITS_ALWAYS; \
  162. outnext += dict.curbits; \
  163. outlen += dict.curbits; \
  164. ASSERT(shift==0); \
  165. } \
  166. outbitbuf = 0; \
  167. if (shift != 0) \
  168. outbitbuf = (inbits & cur.mask2) >> cur.shift2; \
  169. } \
  170. curShift = shift; \
  171. }
  172. #define GETCODE(ret) \
  173. { \
  174. int shift=curShift; \
  175. int copyBits = dict.curbits - BITS_PER_UNIT; \
  176. \
  177. ret = *(inbytes++); \
  178. ShiftInfo & cur = ShiftArray[copyBits][shift]; \
  179. ret |= (*inbits << (BITS_ALWAYS-shift)) & cur.mask1; \
  180. shift += copyBits; \
  181. if (shift >= BITS_PER_UNIT) \
  182. { \
  183. shift -= BITS_PER_UNIT; \
  184. inbits++; \
  185. if (inbits==innext) { \
  186. inbytes = innext; \
  187. inbits = inbytes+BITS_ALWAYS; \
  188. innext += dict.curbits; \
  189. ASSERT(shift==0); \
  190. } \
  191. if (shift != 0) \
  192. ret |= (*inbits << cur.shift2) & cur.mask2; \
  193. } \
  194. curShift = shift; \
  195. }
  196. void CLZWCompressor::initCommon()
  197. {
  198. ASSERT(dict.curbits==0); // check for open called twice with no close
  199. initdict();
  200. curcode = -1;
  201. inlen = 0;
  202. inlenblk = COMMITTED;
  203. memset(outbuf,0,sizeof(size32_t));
  204. outlen = sizeof(size32_t)+dict.curbits;
  205. outbytes = (unsigned char *)outbuf+sizeof(size32_t);
  206. outbits = outbytes+8;
  207. outnext = outbytes+dict.curbits;
  208. curShift=0; //outmask = 0x80;
  209. outbitbuf = 0;
  210. }
  211. void CLZWCompressor::flushbuf()
  212. {
  213. if (outbytes==outnext)
  214. return;
  215. *(outbits++) = outbitbuf;
  216. while (outbits!=outnext) {
  217. *(outbits++) = 0;
  218. }
  219. do {
  220. *(outbytes++) = 0;
  221. } while (outbytes+(dict.curbits-8)!=outnext);
  222. }
  223. void CLZWCompressor::ensure(size32_t sz)
  224. {
  225. dbgassertex(outBufMb);
  226. size32_t outBytesOffset = outbytes-(byte *)outbuf;
  227. size32_t outBitsOffset = outbits-(byte *)outbuf;
  228. size32_t outNextOffset = outnext-(byte *)outbuf;
  229. outbuf = (byte *)outBufMb->ensureCapacity(sz);
  230. maxlen = outBufMb->capacity()-SAFETY_MARGIN;
  231. outbytes = (byte *)outbuf+outBytesOffset;
  232. outbits = (byte *)outbuf+outBitsOffset;
  233. outnext = (byte *)outbuf+outNextOffset;
  234. }
  235. void CLZWCompressor::open(MemoryBuffer &mb, size32_t initialSize)
  236. {
  237. if (bufalloc)
  238. free(outbuf);
  239. bufalloc = 0;
  240. outBufMb = &mb;
  241. outBufStart = mb.length();
  242. outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
  243. maxlen = outBufMb->capacity()-SAFETY_MARGIN;
  244. initCommon();
  245. }
  246. void CLZWCompressor::open(void *buf,size32_t max)
  247. {
  248. #ifdef STATS
  249. st_thistime = msTick();
  250. st_thiswrites=0;
  251. #endif
  252. if (buf)
  253. {
  254. if (bufalloc)
  255. free(outbuf);
  256. bufalloc = 0;
  257. outbuf = buf;
  258. }
  259. else if (max>bufalloc)
  260. {
  261. if (bufalloc)
  262. free(outbuf);
  263. bufalloc = max;
  264. outbuf = malloc(bufalloc);
  265. }
  266. outBufMb = NULL;
  267. ASSERT(max>SAFETY_MARGIN+sizeof(size32_t)); // minimum required
  268. maxlen=max-SAFETY_MARGIN;
  269. initCommon();
  270. }
  271. #define HASHC(code,ch) (((0x01000193*(unsigned)code)^(unsigned char)ch)%LZW_HASH_TABLE_SIZE)
  272. #define BE_MEMCPY4(dst,src) { if (supportbigendian) _WINCPYREV4(dst,src); else memcpy(dst,src,4); }
  273. size32_t CLZWCompressor::write(const void *buf,size32_t buflen)
  274. {
  275. if (!buflen)
  276. return 0;
  277. if (!dict.curbits)
  278. return 0;
  279. unsigned char *in=(unsigned char *)buf;
  280. #ifdef STATS
  281. st_thiswrites++;
  282. #endif
  283. size32_t len=buflen;
  284. if (curcode==-1)
  285. {
  286. curcode = *(in++);
  287. len--;
  288. }
  289. while (len--)
  290. {
  291. int ch = *(in++);
  292. int index = HASHC(curcode,ch);
  293. for (;;)
  294. {
  295. if (dictinuse[index]!=inuseflag)
  296. {
  297. dictinuse[index] = inuseflag;
  298. dictcode[index] = dict.nextcode++;
  299. dict.dictparent[index] = curcode;
  300. dict.dictchar[index] = (unsigned char) ch;
  301. PUTCODE(curcode);
  302. if ((outlen>=maxlen))
  303. {
  304. if (outBufMb)
  305. ensure(outlen+0x10000);
  306. else
  307. {
  308. size32_t ret;
  309. if (inlenblk==COMMITTED)
  310. {
  311. ret = in-(unsigned char *)buf-1;
  312. inlen += in-(unsigned char *)buf-1;
  313. }
  314. else
  315. ret = 0;
  316. close();
  317. return ret;
  318. }
  319. }
  320. if (dict.nextcode == dict.nextbump)
  321. {
  322. PUTCODE(BUMP_CODE);
  323. flushbuf();
  324. bool eodict = !dict.bumpbits();
  325. if (eodict)
  326. initdict();
  327. outbytes = outnext;
  328. outbits = outbytes+8;
  329. outnext += dict.curbits;
  330. outlen += dict.curbits;
  331. curShift=0;//outmask = 0x80;
  332. outbitbuf = 0;
  333. }
  334. curcode = ch;
  335. break;
  336. }
  337. if (dict.dictparent[index] == curcode &&
  338. dict.dictchar[index] == (unsigned char)ch)
  339. {
  340. curcode = dictcode[index];
  341. break;
  342. }
  343. index--;
  344. if (index<0)
  345. index = LZW_HASH_TABLE_SIZE-1;
  346. }
  347. }
  348. inlen += buflen;
  349. return buflen;
  350. }
  351. void CLZWCompressor::startblock()
  352. {
  353. inlenblk = inlen;
  354. }
  355. void CLZWCompressor::commitblock()
  356. {
  357. inlenblk = COMMITTED;
  358. }
  359. void CLZWCompressor::close()
  360. {
  361. if (dict.curbits)
  362. {
  363. PUTCODE(curcode);
  364. flushbuf();
  365. dict.curbits = 0;
  366. if (inlenblk!=COMMITTED)
  367. inlen = inlenblk; // transaction failed
  368. inlenblk = COMMITTED;
  369. BE_MEMCPY4(outbuf,&inlen);
  370. #ifdef STATS
  371. unsigned t = (msTick()-st_thistime);
  372. if (t>st_maxtime) {
  373. st_maxtime = t;
  374. st_maxtime_writes = st_thiswrites;
  375. }
  376. st_tottimems += t;
  377. st_totwrites += st_thiswrites;
  378. st_totwritten += (outlen+511)/1024;
  379. st_totread += (inlen+511)/1024;
  380. st_totblocks++;
  381. #endif
  382. if (outBufMb)
  383. {
  384. outBufMb->setWritePos(outBufStart+outlen);
  385. outBufMb = NULL;
  386. }
  387. }
  388. }
  389. CLZWExpander::CLZWExpander(bool _supportbigendian)
  390. {
  391. outbuf = NULL;
  392. outlen = 0;
  393. outmax = 0;
  394. bufalloc = 0;
  395. supportbigendian = _supportbigendian;
  396. }
  397. CLZWExpander::~CLZWExpander()
  398. {
  399. if (bufalloc)
  400. free(outbuf);
  401. }
  402. size32_t CLZWExpander::init(const void *blk)
  403. {
  404. dict.initdict();
  405. BE_MEMCPY4(&outlen,blk);
  406. inbytes=(unsigned char *)blk+sizeof(size32_t);
  407. inbits=inbytes+8;
  408. innext=inbytes+dict.curbits;
  409. curShift=0;
  410. return outlen;
  411. }
  412. void CLZWExpander::expand(void *buf)
  413. {
  414. if (!outlen)
  415. return;
  416. if (buf) {
  417. if (bufalloc)
  418. free(outbuf);
  419. bufalloc = 0;
  420. outbuf = (unsigned char *)buf;
  421. }
  422. else if (outlen>bufalloc) {
  423. if (bufalloc)
  424. free(outbuf);
  425. bufalloc = outlen;
  426. outbuf = (unsigned char *)malloc(bufalloc);
  427. if (!outbuf)
  428. throw MakeStringException(MSGAUD_operator,0, "Out of memory in LZWExpander::expand, requesting %d bytes", bufalloc);
  429. }
  430. unsigned char *out=outbuf;
  431. unsigned char *outend = out+outlen;
  432. int oldcode ;
  433. GETCODE(oldcode);
  434. int ch=oldcode;
  435. *(out++)=(unsigned char)ch;
  436. while (out!=outend) {
  437. int newcode;
  438. GETCODE(newcode);
  439. unsigned char *sp = stack;
  440. if (newcode >= dict.nextcode) {
  441. *(sp++) = (unsigned char) ch;
  442. ch = oldcode;
  443. }
  444. else if (newcode == BUMP_CODE) {
  445. bool eodict = !dict.bumpbits();
  446. if (eodict)
  447. dict.initdict();
  448. inbytes = innext;
  449. inbits = inbytes+8;
  450. innext += dict.curbits;
  451. curShift=0;
  452. if (eodict) {
  453. GETCODE(oldcode);
  454. ch=oldcode;
  455. *(out++)=(unsigned char)ch;
  456. }
  457. continue;
  458. }
  459. else
  460. ch = newcode;
  461. while (ch > 255) {
  462. *(sp++) = dict.dictchar[ch];
  463. ch = dict.dictparent[ch];
  464. }
  465. #ifdef _DEBUG
  466. assertex(dict.nextcode <= MAX_CODE);
  467. #endif
  468. dict.dictparent[dict.nextcode] = oldcode;
  469. dict.dictchar[dict.nextcode++] = (unsigned char) ch;
  470. oldcode = newcode;
  471. *(out++) = ch;
  472. while ((sp!=stack)&&(out!=outend)) {
  473. *(out++)=(unsigned char)*(--sp);
  474. }
  475. }
  476. }
  477. // encoding
  478. // 0 = 0
  479. // 10 = 1
  480. // 1100 = 2
  481. // 1101 = 3
  482. // 1110bb = 4-7
  483. // 11110bbbb = 8-23
  484. // 111110bbbbbbbb = 24-279
  485. #define OUTBIT(b) { if (b) bb|=bm; if (bm==0x80) { outp[l++] = bb; bb=0; bm=1; } else bm<<=1; }
  486. size32_t bitcompress(unsigned *p,int n,void *outb)
  487. {
  488. int l=0;
  489. unsigned char *outp=(unsigned char *)outb;
  490. outp[1] = 0;
  491. unsigned char bm=1;
  492. unsigned char bb=0;
  493. while (n--) {
  494. unsigned d=*p;
  495. if (d==0) { // special 0
  496. OUTBIT(0);
  497. }
  498. else if (--d==0) { // special 1
  499. OUTBIT(1);
  500. OUTBIT(0);
  501. }
  502. else {
  503. d--;
  504. unsigned m;
  505. unsigned nb=0;
  506. while (1) {
  507. if (nb==5) {
  508. m = 0x80000000;
  509. nb++;
  510. break;
  511. }
  512. unsigned ntb = 1<<nb;
  513. m = 1<<ntb;
  514. nb++;
  515. if (d<m) {
  516. m>>=1;
  517. break;
  518. }
  519. d-=m;
  520. }
  521. OUTBIT(1);
  522. while (nb--)
  523. OUTBIT(1);
  524. OUTBIT(0);
  525. while (m) {
  526. OUTBIT(m&d);
  527. m>>=1;
  528. }
  529. }
  530. p++;
  531. }
  532. if (bm!=1) {
  533. outp[l++] = bb; // flush remaining bits
  534. }
  535. return l;
  536. }
  537. #define MAX_BUCKETS 1024
  538. ICompressor *createLZWCompressor(bool _supportbigendian)
  539. {
  540. return new CLZWCompressor(_supportbigendian);
  541. }
  542. IExpander *createLZWExpander(bool _supportbigendian)
  543. {
  544. return new CLZWExpander(_supportbigendian);
  545. }
  546. //===========================================================================
  547. /*
  548. RLE
  549. uses <d1-de> 1-15 repeats of prev char
  550. <d0> <rept-15> 15-222 repeats of prev char
  551. <d0> as escape (followed by d0-df)
  552. <d0> <ff> (at start) - plain row following
  553. prev char is initialy assumed 0
  554. */
  555. size32_t RLECompress(void *dst,const void *src,size32_t size) // maximum will write is 2+size
  556. {
  557. if (size==0)
  558. return 0;
  559. byte *out=(byte *)dst;
  560. byte *outmax = out+size;
  561. const byte *in=(const byte *)src;
  562. const byte *inmax = in+size;
  563. byte pc = 0;
  564. for (;;) {
  565. byte c = *(in++);
  566. if (c==pc) {
  567. byte cnt = 0;
  568. do {
  569. cnt++;
  570. if (in==inmax) {
  571. if (cnt<=15)
  572. *(out++) = 0xd0+cnt;
  573. else {
  574. *(out++) = 0xd0;
  575. if (out==outmax)
  576. goto Fail;
  577. *(out++) = cnt-15;
  578. }
  579. return (size32_t)(out-(byte *)dst);
  580. }
  581. c = *(in++);
  582. } while ((c==pc)&&(cnt!=222));
  583. if (cnt<=15)
  584. *(out++) = 0xd0+cnt;
  585. else {
  586. *(out++) = 0xd0;
  587. if (out==outmax)
  588. break; // fail
  589. *(out++) = cnt-15;
  590. }
  591. if (out==outmax)
  592. break;
  593. }
  594. if ((c<0xd0)||(c>=0xe0))
  595. *(out++) = c;
  596. else {
  597. *(out++) = 0xd0;
  598. if (out==outmax)
  599. break; // fail
  600. *(out++) = c;
  601. }
  602. if (in==inmax)
  603. return (size32_t)(out-(byte *)dst);
  604. if (out==outmax)
  605. break; // will need at least one more char
  606. pc = c;
  607. }
  608. Fail:
  609. out=(byte *)dst;
  610. *(out++) = 0xd0;
  611. *(out++) = 0xff;
  612. memcpy(out,src,size);
  613. return size+2;
  614. }
  615. size32_t RLEExpand(void *dst,const void *src,size32_t expsize)
  616. {
  617. if (expsize==0)
  618. return 0;
  619. byte *out=(byte *)dst;
  620. byte *outmax = out+expsize;
  621. const byte *in=(const byte *)src;
  622. byte c = *(in++);
  623. if ((c==0xd0)&&(*in==0xff)) {
  624. memcpy(dst,in+1,expsize);
  625. return expsize+2;
  626. }
  627. byte pc = 0;
  628. for (;;) {
  629. if ((c<0xd0)||(c>=0xe0))
  630. *(out++) = c;
  631. else {
  632. c -= 0xd0;
  633. if (c==0) {
  634. c = *(in++);
  635. if (c>=0xd0) {
  636. *(out++) = c;
  637. if (c>=0xe0)
  638. throw MakeStringException(-1,"Corrupt RLE format");
  639. goto Escape;
  640. }
  641. c+=15;
  642. }
  643. size32_t left = (size32_t)(outmax-out);
  644. size32_t cnt = c;
  645. c = pc;
  646. if (left<cnt)
  647. cnt = left;
  648. while (cnt--)
  649. *(out++) = c;
  650. }
  651. Escape:
  652. if (out==outmax)
  653. break;
  654. pc = c;
  655. c = *(in++);
  656. }
  657. return (size32_t)(in-(const byte *)src);
  658. }
  659. void appendToBuffer(MemoryBuffer & out, size32_t len, const void * src)
  660. {
  661. out.append(false);
  662. out.append(len);
  663. out.append(len, src);
  664. }
  665. void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
  666. {
  667. unsigned originalLength = out.length();
  668. out.append(true);
  669. out.append((size32_t)0);
  670. if (len >= 32)
  671. {
  672. size32_t newSize = len * 4 / 5; // Copy if compresses less than 80% ...
  673. Owned<ICompressor> compressor = createLZWCompressor();
  674. void *newData = out.reserve(newSize);
  675. compressor->open(newData, newSize);
  676. if (compressor->write(src, len)==len)
  677. {
  678. compressor->close();
  679. size32_t compressedLen = compressor->buflen();
  680. out.setWritePos(originalLength + sizeof(bool));
  681. out.append(compressedLen);
  682. out.setWritePos(originalLength + sizeof(bool) + sizeof(size32_t) + compressedLen);
  683. return;
  684. }
  685. }
  686. // all or don't compress
  687. out.setWritePos(originalLength);
  688. appendToBuffer(out, len, src);
  689. }
  690. void decompressToBuffer(MemoryBuffer & out, const void * src)
  691. {
  692. Owned<IExpander> expander = createLZWExpander();
  693. unsigned outSize = expander->init(src);
  694. void * buff = out.reserve(outSize);
  695. expander->expand(buff);
  696. }
  697. void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in)
  698. {
  699. bool compressed;
  700. size32_t srcLen;
  701. in.read(compressed).read(srcLen);
  702. if (compressed)
  703. decompressToBuffer(out, in.readDirect(srcLen));
  704. else
  705. out.append(srcLen, in.readDirect(srcLen));
  706. }
  707. void decompressToAttr(MemoryAttr & out, const void * src)
  708. {
  709. Owned<IExpander> expander = createLZWExpander();
  710. unsigned outSize = expander->init(src);
  711. void * buff = out.allocate(outSize);
  712. expander->expand(buff);
  713. }
  714. void decompressToBuffer(MemoryAttr & out, MemoryBuffer & in)
  715. {
  716. bool compressed;
  717. size32_t srcLen;
  718. in.read(compressed).read(srcLen);
  719. if (compressed)
  720. decompressToAttr(out, in.readDirect(srcLen));
  721. else
  722. out.set(srcLen, in.readDirect(srcLen));
  723. }
  724. /*
  725. Simple Diff compression format is
  726. <compressed-block> ::= <initial-row> { <compressed-row> }
  727. <compressed-row> ::= { <same-count> <diff-count> <diff-bytes> }
  728. <same-count> ::= { 255 } <byte> -- value is sum
  729. <diff-count> ::= <byte>
  730. <diff-bytes> ::= { <bytes> }
  731. // note if diff-count is > 255 it will be broken into 255 diff followed by 0 same
  732. // also need at least 2 bytes same before stops difference block
  733. thus AAAAAA...AAAAAA [ len 500 ]
  734. followed by ADADAD...ADADAD
  735. will be saved as 1,255,ADADA..ADADA,0,244,ADADA..ADADA -> 503 bytes
  736. and AAAAAA...AAAAAA [ len 500 ]
  737. followed by AADDAA...AADDAA
  738. will be saved as 2,2,DD,2,2,DD...2,2,DD,2 -> 499 bytes
  739. and AAAAAA...AAAAAA [ len 500 ]
  740. followed by AAAAAA...AAAAAA
  741. will be saved as 255,245 -> 2 bytes
  742. and AAAAAA...AAAAAA [ len 500 ]
  743. followed by ZZZZZZ...ZZZZZZ
  744. will be saves as 0,255,ZZ..ZZ,0,245,ZZ..ZZ -> 504 bytes
  745. // maximum size is of a row is bounded by: rowsize+((rowsize+254)/255)*2;
  746. */
  747. size32_t DiffCompress(const void *src,void *dst,void *buff,size32_t rs)
  748. {
  749. const unsigned char *s=(const unsigned char *)src;
  750. unsigned char *d=(unsigned char *)dst;
  751. unsigned char *b=(unsigned char *)buff;
  752. ASSERT(rs);
  753. size32_t cnt;
  754. cnt = 0;
  755. while (*s==*b) {
  756. Loop:
  757. cnt++;
  758. rs--;
  759. if (rs==0) break;
  760. s++;
  761. b++;
  762. }
  763. while (cnt>=255) {
  764. *d = 255;
  765. d++;
  766. cnt-=255;
  767. }
  768. *d = (unsigned char)cnt;
  769. d++;
  770. if (rs!=0) {
  771. unsigned char *dcnt=d;
  772. d++;
  773. cnt = 0;
  774. while(1) {
  775. cnt++;
  776. *d = *s;
  777. d++;
  778. *b = *s;
  779. rs--;
  780. if (rs==0) {
  781. *dcnt=(unsigned char)cnt;
  782. break;
  783. }
  784. s++;
  785. b++;
  786. if (*s==*b) {
  787. if ((rs>1)&&(s[1]==b[1])) { // slower but slightly better compression
  788. *dcnt=(unsigned char)cnt;
  789. cnt = 0;
  790. goto Loop;
  791. }
  792. }
  793. if (cnt==255) {
  794. *dcnt=(unsigned char)cnt;
  795. *d = 0;
  796. d++;
  797. dcnt = d++;
  798. cnt = 0;
  799. }
  800. }
  801. }
  802. return (size32_t)(d-(unsigned char *)dst);
  803. }
  804. size32_t DiffCompress2(const void *src,void *dst,const void *prev,size32_t rs)
  805. {
  806. // doesn't update prev
  807. const unsigned char *s=(const unsigned char *)src;
  808. unsigned char *d=(unsigned char *)dst;
  809. const unsigned char *b=(unsigned char *)prev;
  810. ASSERT(rs);
  811. size32_t cnt;
  812. cnt = 0;
  813. while (*s==*b) {
  814. Loop:
  815. cnt++;
  816. rs--;
  817. if (rs==0) break;
  818. s++;
  819. b++;
  820. }
  821. while (cnt>=255) {
  822. *d = 255;
  823. d++;
  824. cnt-=255;
  825. }
  826. *d = (unsigned char)cnt;
  827. d++;
  828. if (rs!=0) {
  829. unsigned char *dcnt=d;
  830. d++;
  831. cnt = 0;
  832. while(1) {
  833. cnt++;
  834. *d = *s;
  835. d++;
  836. rs--;
  837. if (rs==0) {
  838. *dcnt=(unsigned char)cnt;
  839. break;
  840. }
  841. s++;
  842. b++;
  843. if (*s==*b) {
  844. if ((rs>1)&&(s[1]==b[1])) { // slower but slightly better compression
  845. *dcnt=(unsigned char)cnt;
  846. cnt = 0;
  847. goto Loop;
  848. }
  849. }
  850. if (cnt==255) {
  851. *dcnt=(unsigned char)cnt;
  852. *d = 0;
  853. d++;
  854. dcnt = d++;
  855. cnt = 0;
  856. }
  857. }
  858. }
  859. return (size32_t)(d-(unsigned char *)dst);
  860. }
  861. size32_t DiffCompressFirst(const void *src,void *dst,void *buf,size32_t rs)
  862. {
  863. memcpy(buf,src,rs);
  864. const unsigned char *s=(const unsigned char *)src;
  865. unsigned char *d=(unsigned char *)dst;
  866. *d = 0;
  867. d++;
  868. while (rs) {
  869. unsigned cnt=(rs<=255)?rs:255;
  870. *d=(unsigned char)cnt;
  871. d++;
  872. memcpy(d,s,cnt);
  873. d += cnt;
  874. s += cnt;
  875. *d = 0;
  876. d++;
  877. rs -= cnt;
  878. }
  879. return (size32_t)(d-(unsigned char *)dst);
  880. }
  881. size32_t DiffCompressedSize(const void *src,size32_t rs)
  882. {
  883. const unsigned char *s=(const unsigned char *)src;
  884. unsigned n;
  885. while (rs) {
  886. // first comes compressed
  887. do {
  888. n = *s;
  889. s++;
  890. rs -= n;
  891. } while (n==255);
  892. if (rs==0)
  893. break;
  894. n = *s;
  895. s++;
  896. rs -= n;
  897. s += n;
  898. }
  899. return (size32_t)(s-(const unsigned char *)src);
  900. }
  901. size32_t DiffExpand(const void *src,void *dst,const void *prev,size32_t rs)
  902. {
  903. unsigned char *s=(unsigned char *)src;
  904. unsigned char *d=(unsigned char *)dst;
  905. const unsigned char *b=(const unsigned char *)prev;
  906. ASSERT(rs);
  907. while (rs) {
  908. size32_t cnt = 0;
  909. size32_t c;
  910. do {
  911. c=(size32_t)*s;
  912. s++;
  913. cnt += c;
  914. } while (c==255);
  915. rs -= cnt;
  916. while (cnt!=0) {
  917. *d = *b;
  918. d++;
  919. b++;
  920. cnt--;
  921. }
  922. if ((int)rs<=0) {
  923. if (rs == 0)
  924. break;
  925. throw MakeStringException(-1,"Corrupt compressed data(1)");
  926. }
  927. cnt=(size32_t)*s;
  928. s++;
  929. rs -= cnt;
  930. b += cnt;
  931. const unsigned char *e = s+cnt;
  932. while (s!=e) {
  933. *d = *s;
  934. s++;
  935. d++;
  936. }
  937. }
  938. return (size32_t)(s-(unsigned char *)src);
  939. }
  940. // helper class
  941. class CDiffExpand
  942. {
  943. byte *s;
  944. const byte *b;
  945. size32_t rs;
  946. enum {
  947. S_pre_repeat,
  948. S_repeat,
  949. S_diff
  950. } state;
  951. size32_t cnt;
  952. public:
  953. inline void init(const void *src,const void *prev,size32_t _rs)
  954. {
  955. s=(byte *)src;
  956. b=(const byte *)prev;
  957. state = S_pre_repeat;
  958. rs = _rs;
  959. cnt = 0;
  960. }
  961. inline void skip(size32_t sz)
  962. {
  963. if (!sz)
  964. return;
  965. while (sz) {
  966. switch (state) {
  967. case S_pre_repeat:
  968. if (!rs)
  969. return;
  970. cnt = 0;
  971. size32_t c;
  972. do {
  973. c=(size32_t)*s;
  974. s++;
  975. cnt += c;
  976. } while (c==255);
  977. rs -= cnt;
  978. state = S_repeat;
  979. // fall through
  980. case S_repeat:
  981. if (cnt) {
  982. if (sz<=cnt) {
  983. cnt -= sz;
  984. b += sz;
  985. return;
  986. }
  987. b += cnt;
  988. sz-=cnt;
  989. }
  990. if ((int)rs<=0) {
  991. if (rs == 0)
  992. return;
  993. throw MakeStringException(-1,"Corrupt compressed data(2)");
  994. }
  995. cnt=(size32_t)*s;
  996. s++;
  997. rs -= cnt;
  998. b += cnt;
  999. state = S_diff;
  1000. // fall through
  1001. case S_diff:
  1002. if (cnt) {
  1003. if (sz<=cnt) {
  1004. cnt -= sz;
  1005. s += sz;
  1006. return;
  1007. }
  1008. s += cnt;
  1009. sz -= cnt;
  1010. }
  1011. state = S_pre_repeat;
  1012. }
  1013. }
  1014. }
  1015. inline size32_t cpy(void *dst,size32_t sz)
  1016. {
  1017. if (!sz)
  1018. return 0;
  1019. byte *d=(byte *)dst;
  1020. for (;;) {
  1021. switch (state) {
  1022. case S_pre_repeat:
  1023. if (!rs)
  1024. return (size32_t)(d-(byte *)dst);
  1025. cnt = 0;
  1026. size32_t c;
  1027. do {
  1028. c=(size32_t)*s;
  1029. s++;
  1030. cnt += c;
  1031. } while (c==255);
  1032. rs -= cnt;
  1033. state = S_repeat;
  1034. // fall through
  1035. case S_repeat:
  1036. if (cnt) {
  1037. if (cnt>=sz) {
  1038. memcpy(d,b,sz);
  1039. b += sz;
  1040. cnt -= sz;
  1041. d += sz;
  1042. return (size32_t)(d-(byte *)dst);
  1043. }
  1044. memcpy(d,b,cnt);
  1045. b += cnt;
  1046. d += cnt;
  1047. sz -= cnt;
  1048. }
  1049. if ((int)rs<=0) {
  1050. if (rs == 0)
  1051. return (size32_t)(d-(byte *)dst);
  1052. throw MakeStringException(-1,"Corrupt compressed data(3)");
  1053. }
  1054. cnt=(size32_t)*s;
  1055. s++;
  1056. rs -= cnt;
  1057. b += cnt;
  1058. state = S_diff;
  1059. // fall through
  1060. case S_diff:
  1061. if (cnt) {
  1062. if (cnt>=sz) {
  1063. memcpy(d,s,sz);
  1064. s += sz;
  1065. cnt -= sz;
  1066. d += sz;
  1067. return (size32_t)(d-(byte *)dst);
  1068. }
  1069. memcpy(d,s,cnt);
  1070. s += cnt;
  1071. d += cnt;
  1072. sz -= cnt;
  1073. }
  1074. state = S_pre_repeat;
  1075. }
  1076. }
  1077. return 0; // never gets here
  1078. }
  1079. inline int cmp(const void *dst,size32_t sz)
  1080. {
  1081. int ret;
  1082. if (!sz)
  1083. return rs?-1:0;
  1084. const byte *d=(const byte *)dst;
  1085. for (;;) {
  1086. switch (state) {
  1087. case S_pre_repeat:
  1088. if (!rs)
  1089. return sz?1:0;
  1090. cnt = 0;
  1091. size32_t c;
  1092. do {
  1093. c=(size32_t)*s;
  1094. s++;
  1095. cnt += c;
  1096. } while (c==255);
  1097. rs -= cnt;
  1098. state = S_repeat;
  1099. // fall through
  1100. case S_repeat:
  1101. if (cnt) {
  1102. if (cnt>=sz) {
  1103. ret = memcmp(d,b,sz);
  1104. b += sz;
  1105. cnt -= sz;
  1106. return ret;
  1107. }
  1108. ret = memcmp(d,b,cnt);
  1109. b += cnt;
  1110. if (ret)
  1111. return ret;
  1112. d += cnt;
  1113. sz -= cnt;
  1114. }
  1115. if ((int)rs<=0) {
  1116. if (rs == 0)
  1117. return sz?1:0;
  1118. throw MakeStringException(-1,"Corrupt compressed data(4)");
  1119. }
  1120. cnt=(size32_t)*s;
  1121. s++;
  1122. rs -= cnt;
  1123. b += cnt;
  1124. state = S_diff;
  1125. // fall through
  1126. case S_diff:
  1127. if (cnt) {
  1128. if (cnt>=sz) {
  1129. ret = memcmp(d,s,sz);
  1130. s += sz;
  1131. cnt -= sz;
  1132. return ret;
  1133. }
  1134. ret = memcmp(d,s,cnt);
  1135. s += cnt;
  1136. if (ret)
  1137. return ret;
  1138. d += cnt;
  1139. sz -= cnt;
  1140. }
  1141. state = S_pre_repeat;
  1142. }
  1143. }
  1144. return 0; // never gets here
  1145. }
  1146. };
  1147. // =============================================================================
  1148. // RDIFF
  1149. // format ::= <expsize> <recsize> <plainrec> { <rowdif> }
  1150. class jlib_decl CRDiffCompressor : public ICompressor, public CInterface
  1151. {
  1152. size32_t inlen;
  1153. size32_t outlen;
  1154. size32_t bufalloc;
  1155. size32_t remaining;
  1156. void *outbuf;
  1157. unsigned char *out;
  1158. MemoryBuffer *outBufMb;
  1159. size32_t outBufStart;
  1160. size32_t recsize; // assumed fixed length rows
  1161. // assumes a transaction is a record
  1162. MemoryBuffer transbuf;
  1163. size32_t maxrecsize; // maximum size diff compress
  1164. unsigned char *prev;
  1165. void initCommon()
  1166. {
  1167. inlen = 0;
  1168. memset(outbuf, 0, sizeof(size32_t)*2);
  1169. outlen = sizeof(size32_t)*2;
  1170. out = (byte *)outbuf+outlen;
  1171. free(prev);
  1172. prev = NULL;
  1173. }
  1174. inline void ensure(size32_t sz)
  1175. {
  1176. if (NULL == outBufMb)
  1177. throw MakeStringException(-3,"CRDiffCompressor row doesn't fit in buffer!");
  1178. dbgassertex(remaining<sz);
  1179. verifyex(outBufMb->ensureCapacity(outBufMb->capacity()+(sz-remaining)));
  1180. outbuf = ((byte *)outBufMb->bufferBase())+outBufStart;
  1181. out = (byte *)outbuf+outlen;
  1182. remaining = outBufMb->capacity()-outlen;
  1183. }
  1184. public:
  1185. IMPLEMENT_IINTERFACE;
  1186. CRDiffCompressor()
  1187. {
  1188. outbuf = NULL;
  1189. outlen = 0;
  1190. maxrecsize = 0;
  1191. recsize = 0;
  1192. bufalloc = 0;
  1193. prev = NULL;
  1194. outBufMb = NULL;
  1195. }
  1196. ~CRDiffCompressor()
  1197. {
  1198. free(prev);
  1199. if (bufalloc)
  1200. free(outbuf);
  1201. }
  1202. void open(MemoryBuffer &mb, size32_t initialSize)
  1203. {
  1204. outBufMb = &mb;
  1205. outBufStart = mb.length();
  1206. outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
  1207. bufalloc = 0;
  1208. initCommon();
  1209. remaining = outBufMb->capacity()-outlen;
  1210. }
  1211. void open(void *buf,size32_t max)
  1212. {
  1213. if (buf)
  1214. {
  1215. if (bufalloc)
  1216. free(outbuf);
  1217. bufalloc = 0;
  1218. outbuf = buf;
  1219. }
  1220. else if (max>bufalloc)
  1221. {
  1222. if (bufalloc)
  1223. free(outbuf);
  1224. bufalloc = max;
  1225. outbuf = malloc(bufalloc);
  1226. }
  1227. outBufMb = NULL;
  1228. ASSERT(max>2+sizeof(size32_t)*2); // minimum required (actually will need enough for recsize so only a guess)
  1229. initCommon();
  1230. remaining = max-outlen;
  1231. }
  1232. void close()
  1233. {
  1234. transbuf.clear();
  1235. memcpy(outbuf,&inlen,sizeof(inlen)); // expanded size
  1236. memcpy((byte *)outbuf+sizeof(inlen),&recsize,sizeof(recsize));
  1237. if (outBufMb)
  1238. {
  1239. outBufMb->setWritePos(outBufStart+outlen);
  1240. outBufMb = NULL;
  1241. }
  1242. }
  1243. inline size32_t maxcompsize(size32_t s) { return s+((s+254)/255)*2; }
  1244. size32_t write(const void *buf,size32_t buflen)
  1245. {
  1246. // assumes a transaction is a row and at least one row fits in
  1247. if (prev)
  1248. {
  1249. if (transbuf.length()==0)
  1250. {
  1251. if (remaining<maxrecsize) // this is a bit odd because no incremental diffcomp
  1252. {
  1253. if (NULL == outBufMb)
  1254. return 0;
  1255. }
  1256. }
  1257. transbuf.append(buflen,buf);
  1258. }
  1259. else // first row
  1260. {
  1261. if (remaining<buflen)
  1262. ensure(buflen);
  1263. memcpy(out,buf,buflen);
  1264. out += buflen;
  1265. outlen += buflen;
  1266. }
  1267. // should inlen be updated here (probably not in transaction mode which is all this supports)
  1268. return buflen;
  1269. }
  1270. void startblock()
  1271. {
  1272. transbuf.clear();
  1273. }
  1274. void commitblock()
  1275. {
  1276. if (prev)
  1277. {
  1278. if (recsize!=transbuf.length())
  1279. throw MakeStringException(-1,"CRDiffCompressor used with variable sized row");
  1280. if (remaining<maxrecsize)
  1281. ensure(maxrecsize-remaining);
  1282. size32_t sz = DiffCompress(transbuf.toByteArray(),out,prev,recsize);
  1283. transbuf.clear();
  1284. out += sz;
  1285. outlen += sz;
  1286. remaining -= sz;
  1287. }
  1288. else
  1289. {
  1290. recsize = outlen-sizeof(size32_t)*2;
  1291. maxrecsize = maxcompsize(recsize);
  1292. prev = (byte *)malloc(recsize);
  1293. memcpy(prev,out-recsize,recsize);
  1294. remaining -= recsize;
  1295. }
  1296. inlen += recsize;
  1297. }
  1298. virtual void *bufptr() { return outbuf;}
  1299. virtual size32_t buflen() { return outlen;}
  1300. };
  1301. class jlib_decl CRDiffExpander : public IExpander, public CInterface
  1302. {
  1303. unsigned char *outbuf;
  1304. size32_t outlen;
  1305. size32_t bufalloc;
  1306. unsigned char *in;
  1307. size32_t recsize;
  1308. public:
  1309. IMPLEMENT_IINTERFACE;
  1310. CRDiffExpander()
  1311. {
  1312. outbuf = NULL;
  1313. outlen = 0;
  1314. bufalloc = 0;
  1315. recsize = 0;
  1316. }
  1317. ~CRDiffExpander()
  1318. {
  1319. if (bufalloc)
  1320. free(outbuf);
  1321. }
  1322. size32_t init(const void *blk) // returns size required
  1323. {
  1324. memcpy(&outlen,blk,sizeof(outlen));
  1325. memcpy(&recsize,(unsigned char *)blk+sizeof(outlen),sizeof(recsize));
  1326. in=(unsigned char *)blk+sizeof(outlen)+sizeof(recsize);
  1327. return outlen;
  1328. }
  1329. void expand(void *buf)
  1330. {
  1331. if (!outlen)
  1332. return;
  1333. if (buf) {
  1334. if (bufalloc)
  1335. free(outbuf);
  1336. bufalloc = 0;
  1337. outbuf = (unsigned char *)buf;
  1338. }
  1339. else if (outlen>bufalloc) {
  1340. if (bufalloc)
  1341. free(outbuf);
  1342. bufalloc = outlen;
  1343. outbuf = (unsigned char *)malloc(bufalloc);
  1344. }
  1345. if (outlen<recsize)
  1346. throw MakeStringException(-1,"CRDiffExpander: invalid buffer format");
  1347. unsigned char *out=outbuf;
  1348. memcpy(out,in,recsize);
  1349. const unsigned char *prev = out;
  1350. out += recsize;
  1351. in += recsize;
  1352. size_t remaining = outlen-recsize;
  1353. while (remaining) {
  1354. if (remaining<recsize)
  1355. throw MakeStringException(-2,"CRDiffExpander: invalid buffer format");
  1356. size32_t sz = DiffExpand(in,out,prev,recsize);
  1357. in += sz;
  1358. prev = out;
  1359. out += recsize;
  1360. remaining -= recsize;
  1361. }
  1362. }
  1363. virtual void *bufptr() { return outbuf;}
  1364. virtual size32_t buflen() { return outlen;}
  1365. };
  1366. ICompressor *createRDiffCompressor()
  1367. {
  1368. return new CRDiffCompressor;
  1369. }
  1370. IExpander *createRDiffExpander()
  1371. {
  1372. return new CRDiffExpander;
  1373. }
  1374. // =============================================================================
  1375. // RANDRDIFF
  1376. // format ::= <totsize> <0xffff> <recsize> <firstrlesize> <numrows> { <rowofs> } <difrecs> <firsrecrle>
  1377. // all 16bit except recs
  1378. struct RRDheader
  1379. {
  1380. unsigned short totsize;
  1381. unsigned short flag;
  1382. unsigned short recsize;
  1383. unsigned short firstrlesize;
  1384. unsigned short numrows;
  1385. unsigned short rowofs[0x3fff];
  1386. inline unsigned short hsize() { return (5+numrows)*sizeof(short); }
  1387. };
  1388. #define MIN_RRDHEADER_SIZE (5*sizeof(short))
  1389. class jlib_decl CRandRDiffCompressor : public ICompressor, public CInterface
  1390. {
  1391. size32_t inlen;
  1392. size32_t bufalloc;
  1393. size32_t max;
  1394. void *outbuf;
  1395. RRDheader *header;
  1396. // assumes a transaction is a record
  1397. MemoryBuffer rowbuf;
  1398. MemoryBuffer diffbuf;
  1399. MemoryBuffer firstrec;
  1400. MemoryAttr firstrle;
  1401. size32_t maxdiffsize;
  1402. size32_t recsize;
  1403. size32_t compsize;
  1404. size32_t outBufStart;
  1405. MemoryBuffer *outBufMb;
  1406. void initCommon()
  1407. {
  1408. header = (RRDheader *)outbuf;
  1409. inlen = 0;
  1410. memset(header,0,MIN_RRDHEADER_SIZE);
  1411. diffbuf.clear();
  1412. firstrec.clear();
  1413. firstrle.clear();
  1414. rowbuf.clear();
  1415. }
  1416. public:
  1417. IMPLEMENT_IINTERFACE;
  1418. CRandRDiffCompressor()
  1419. {
  1420. outbuf = NULL;
  1421. header = NULL;
  1422. bufalloc = 0;
  1423. max = 0;
  1424. maxdiffsize = 0;
  1425. recsize = 0;
  1426. outBufStart = 0;
  1427. outBufMb = NULL;
  1428. }
  1429. ~CRandRDiffCompressor()
  1430. {
  1431. if (bufalloc)
  1432. free(outbuf);
  1433. }
  1434. void open(MemoryBuffer &mb, size32_t initialSize)
  1435. {
  1436. outBufMb = &mb;
  1437. outBufStart = mb.length();
  1438. outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
  1439. bufalloc = 0;
  1440. initCommon();
  1441. }
  1442. void open(void *buf,size32_t _max)
  1443. {
  1444. max = _max;
  1445. if (buf) {
  1446. if (bufalloc) {
  1447. free(outbuf);
  1448. }
  1449. bufalloc = 0;
  1450. outbuf = buf;
  1451. }
  1452. else if (max>bufalloc) {
  1453. if (bufalloc)
  1454. free(outbuf);
  1455. bufalloc = max;
  1456. outbuf = malloc(bufalloc);
  1457. }
  1458. outBufMb = NULL;
  1459. ASSERT(max>MIN_RRDHEADER_SIZE+sizeof(unsigned short)+3); // hopefully a lot bigger!
  1460. initCommon();
  1461. }
  1462. void close()
  1463. {
  1464. header->rowofs[0] = (unsigned short)diffbuf.length();
  1465. ASSERT((size32_t)(header->totsize+header->firstrlesize)<=max);
  1466. unsigned short hofs = header->hsize();
  1467. ASSERT(header->totsize==hofs+diffbuf.length());
  1468. if (outBufMb)
  1469. {
  1470. outbuf = (byte *)outBufMb->ensureCapacity(header->totsize+header->firstrlesize);
  1471. outBufMb->setWritePos(outBufStart+header->totsize+header->firstrlesize);
  1472. outBufMb = NULL;
  1473. }
  1474. byte *out = (byte *)outbuf+hofs;
  1475. memcpy(out,diffbuf.toByteArray(),diffbuf.length());
  1476. out += diffbuf.length();
  1477. diffbuf.clear();
  1478. memcpy(out,firstrle.bufferBase(),header->firstrlesize);
  1479. header->totsize += header->firstrlesize;
  1480. firstrle.clear();
  1481. firstrec.clear();
  1482. header->flag = 0xffff;
  1483. // adjust offsets
  1484. unsigned i = header->numrows;
  1485. while (i--)
  1486. header->rowofs[i] += hofs;
  1487. }
  1488. inline size32_t maxcompsize(size32_t s) { return s+((s+254)/255)*2; }
  1489. size32_t write(const void *buf,size32_t buflen)
  1490. {
  1491. // assumes a transaction is a row and at least one row fits in
  1492. unsigned nr = header->numrows;
  1493. if (nr) {
  1494. rowbuf.append(buflen,buf);
  1495. if (rowbuf.length()==recsize) { // because no incremental diffcomp do here
  1496. size32_t sz = diffbuf.length();
  1497. compsize = DiffCompress2(rowbuf.toByteArray(),diffbuf.reserve(maxdiffsize),firstrec.toByteArray(),recsize);
  1498. if (header->totsize+sizeof(short)+compsize+header->firstrlesize>max) {
  1499. diffbuf.setLength(sz);
  1500. return 0;
  1501. }
  1502. header->rowofs[nr] = (unsigned short)sz; // will need to adjust later
  1503. diffbuf.setLength(sz+compsize);
  1504. }
  1505. }
  1506. else
  1507. firstrec.append(buflen,buf);
  1508. return buflen;
  1509. }
  1510. void startblock()
  1511. {
  1512. rowbuf.clear();
  1513. }
  1514. void commitblock()
  1515. {
  1516. unsigned nr = header->numrows;
  1517. if (nr) {
  1518. if (recsize!=rowbuf.length())
  1519. throw MakeStringException(-1,"CRandDiffCompressor used with variable sized row");
  1520. rowbuf.clear();
  1521. header->numrows++;
  1522. header->totsize += (unsigned short)compsize+sizeof(unsigned short);
  1523. }
  1524. else {
  1525. header->numrows = 1;
  1526. header->totsize = header->hsize(); // don't add in rle size yet
  1527. recsize = firstrec.length();
  1528. header->recsize = (unsigned short)recsize;
  1529. maxdiffsize = maxcompsize(recsize);
  1530. size32_t sz = RLECompress(firstrle.allocate(recsize+2),firstrec.toByteArray(),recsize);
  1531. header->firstrlesize = (unsigned short)sz;
  1532. }
  1533. inlen += recsize;
  1534. }
  1535. void *bufptr() { return outbuf;}
  1536. size32_t buflen() { return header->totsize;}
  1537. };
  1538. class jlib_decl CRandRDiffExpander : public IRandRowExpander, public CInterface
  1539. {
  1540. MemoryAttr buf;
  1541. const RRDheader *header;
  1542. size32_t recsize;
  1543. unsigned numrows;
  1544. byte *firstrow;
  1545. inline byte *rowptr(unsigned idx) const { return (byte *)header+header->rowofs[idx]; }
  1546. public:
  1547. IMPLEMENT_IINTERFACE;
  1548. CRandRDiffExpander()
  1549. {
  1550. recsize = 0;
  1551. numrows = 0;
  1552. header = NULL;
  1553. }
  1554. ~CRandRDiffExpander()
  1555. {
  1556. }
  1557. bool init(const void *blk,bool copy)
  1558. {
  1559. // if copy then use new block with first row at end
  1560. header=(const RRDheader *)blk;
  1561. if (header->flag!=0xffff) // flag
  1562. return false;
  1563. recsize = header->recsize;
  1564. numrows = header->numrows;
  1565. RRDheader *headercopy;
  1566. if (copy) {
  1567. size32_t sz = header->totsize-header->firstrlesize+recsize;
  1568. headercopy = (RRDheader *)buf.allocate(sz);
  1569. memcpy(headercopy,blk,header->totsize-header->firstrlesize);
  1570. firstrow = (byte *)headercopy+headercopy->rowofs[0];
  1571. headercopy->totsize = (unsigned short)sz;
  1572. }
  1573. else
  1574. firstrow = (byte *)buf.allocate(recsize);
  1575. RLEExpand(firstrow,(const byte *)header+header->rowofs[0],recsize);
  1576. if (copy)
  1577. header = headercopy;
  1578. return true;
  1579. }
  1580. bool expandRow(void *target,unsigned idx) const
  1581. {
  1582. if (idx>=numrows)
  1583. return false;
  1584. if (idx)
  1585. DiffExpand(rowptr(idx),target,firstrow,recsize);
  1586. else
  1587. memcpy(target, firstrow, recsize);
  1588. return true;
  1589. }
  1590. size32_t expandRow(void *target,unsigned idx,size32_t ofs,size32_t sz) const
  1591. {
  1592. if ((idx>=numrows)||(ofs>=recsize))
  1593. return 0;
  1594. if (sz>recsize-ofs)
  1595. sz = recsize-ofs;
  1596. if (idx==0)
  1597. memcpy(target,firstrow+ofs,sz);
  1598. else if ((ofs==0)&&(sz>=recsize))
  1599. DiffExpand(rowptr(idx),target,firstrow,recsize);
  1600. else {
  1601. CDiffExpand exp;
  1602. exp.init(rowptr(idx),firstrow,recsize);
  1603. exp.skip(ofs);
  1604. exp.cpy(target,sz);
  1605. }
  1606. return sz;
  1607. }
  1608. int cmpRow(const void *target,unsigned idx,size32_t ofs=0,size32_t sz=(size32_t)-1) const
  1609. {
  1610. if ((idx>=numrows)||(ofs>=recsize))
  1611. return -1;
  1612. if (sz>=recsize-ofs)
  1613. sz = recsize-ofs;
  1614. if (idx==0)
  1615. return memcmp(target,firstrow+ofs,sz);
  1616. CDiffExpand exp;
  1617. exp.init(rowptr(idx),firstrow,recsize);
  1618. exp.skip(ofs);
  1619. return exp.cmp(target,sz);
  1620. }
  1621. size32_t rowSize() const { return recsize; }
  1622. unsigned numRows() const { return numrows; }
  1623. const byte *firstRow() const { return firstrow; }
  1624. };
  1625. ICompressor *createRandRDiffCompressor()
  1626. {
  1627. return new CRandRDiffCompressor;
  1628. }
  1629. IRandRowExpander *createRandRDiffExpander()
  1630. {
  1631. return new CRandRDiffExpander;
  1632. }
  1633. // =============================================================================
  1634. // Compressed files
  1635. typedef enum { ICFcreate, ICFread, ICFappend } ICFmode;
  1636. static const __int64 COMPRESSEDFILEFLAG = I64C(0xc0528ce99f10da55);
  1637. #define COMPRESSEDFILEBLOCKSIZE (0x10000)
  1638. static const __int64 FASTCOMPRESSEDFILEFLAG = I64C(0xc1518de99f10da55);
  1639. static const __int64 LZ4COMPRESSEDFILEFLAG = I64C(0xc1200e0b71321c73);
  1640. #pragma pack(push,1)
  1641. struct CompressedFileTrailer
  1642. {
  1643. unsigned datacrc;
  1644. offset_t expandedSize;
  1645. offset_t indexPos; // end of blocks
  1646. size32_t blockSize;
  1647. size32_t recordSize; // 0 is lzw or fastlz or lz4
  1648. __int64 compressedType;
  1649. unsigned crc; // must be last
  1650. unsigned numBlocks() { return (unsigned)((indexPos+blockSize-1)/blockSize); }
  1651. unsigned method()
  1652. {
  1653. if (compressedType==FASTCOMPRESSEDFILEFLAG)
  1654. return COMPRESS_METHOD_FASTLZ;
  1655. if (compressedType==LZ4COMPRESSEDFILEFLAG)
  1656. return COMPRESS_METHOD_LZ4;
  1657. if (compressedType==COMPRESSEDFILEFLAG)
  1658. {
  1659. if (recordSize)
  1660. return COMPRESS_METHOD_ROWDIF;
  1661. else
  1662. return COMPRESS_METHOD_LZW;
  1663. }
  1664. return 0;
  1665. }
  1666. void setDetails(IPropertyTree &tree)
  1667. {
  1668. tree.setPropInt("@datacrc",datacrc);
  1669. tree.setPropInt64("@expandedSize",expandedSize);
  1670. tree.setPropInt64("@indexPos",indexPos);
  1671. tree.setPropInt("@blockSize",blockSize);
  1672. tree.setPropInt("@recordSize",recordSize); // 0 is lzw or fastlz or lz4
  1673. tree.setPropInt64("@compressedType",compressedType);
  1674. tree.setPropInt("@method",method());
  1675. tree.setPropInt("@crc",crc);
  1676. tree.setPropInt("@numblocks",(unsigned)((indexPos+blockSize-1)/blockSize));
  1677. }
  1678. };
  1679. // backward compatibility - will remove at some point
  1680. struct WinCompressedFileTrailer
  1681. {
  1682. unsigned datacrc;
  1683. unsigned filler1;
  1684. offset_t expandedSize;
  1685. offset_t indexPos; // end of blocks
  1686. size32_t blockSize;
  1687. size32_t recordSize; // 0 is lzw or fastlz or lz4
  1688. __int64 compressedType;
  1689. unsigned crc; // must be last
  1690. unsigned filler2;
  1691. void translate(CompressedFileTrailer &out)
  1692. {
  1693. if (compressedType==COMPRESSEDFILEFLAG) {
  1694. out.datacrc = datacrc;
  1695. out.expandedSize = expandedSize;
  1696. out.indexPos = indexPos;
  1697. out.blockSize = blockSize;
  1698. out.recordSize = recordSize;
  1699. out.compressedType = compressedType;
  1700. out.crc = crc;
  1701. }
  1702. else {
  1703. memcpy(&out,(byte *)this+sizeof(WinCompressedFileTrailer)-sizeof(CompressedFileTrailer),sizeof(CompressedFileTrailer));
  1704. }
  1705. }
  1706. };
  1707. #pragma pack(pop)
  1708. class CCompressedFile : implements ICompressedFileIO, public CInterface
  1709. {
  1710. Linked<IFileIO> fileio;
  1711. Linked<IMemoryMappedFile> mmfile;
  1712. CompressedFileTrailer trailer;
  1713. unsigned curblocknum;
  1714. offset_t curblockpos; // logical pos (reading only)
  1715. MemoryBuffer curblockbuf; // expanded buffer when reading
  1716. MemoryAttr compblk;
  1717. byte *compblkptr;
  1718. size32_t compblklen;
  1719. MemoryAttr compbuf;
  1720. MemoryBuffer indexbuf; // non-empty once index read
  1721. ICFmode mode;
  1722. CriticalSection crit;
  1723. MemoryBuffer overflow; // where partial row written
  1724. MemoryAttr prevrowbuf;
  1725. bool setcrc;
  1726. bool writeException;
  1727. Owned<ICompressor> compressor;
  1728. Owned<IExpander> expander;
  1729. unsigned compMethod;
  1730. unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); }
  1731. unsigned lookupIndex(offset_t pos,offset_t &curpos,size32_t &expsize)
  1732. {
  1733. // NB index starts at block 1 (and has size as last entry)
  1734. const offset_t *index;
  1735. unsigned a = 0;
  1736. unsigned b = indexNum();
  1737. index = (const offset_t *)indexbuf.toByteArray();
  1738. while (b>a) {
  1739. unsigned m = a+(b-a)/2;
  1740. __int64 dif = (__int64)pos-index[m];
  1741. if (dif==0) {
  1742. b = m+1;
  1743. a = b;
  1744. }
  1745. else if (dif>0)
  1746. a = m+1;
  1747. else
  1748. b = m;
  1749. }
  1750. curpos=b?index[b-1]:0;
  1751. expsize = (size32_t)(index[b]-curpos);
  1752. return b;
  1753. }
  1754. void getblock(offset_t pos)
  1755. {
  1756. curblockbuf.clear();
  1757. size32_t expsize;
  1758. curblocknum = lookupIndex(pos,curblockpos,expsize);
  1759. size32_t toread = trailer.blockSize;
  1760. offset_t p = (offset_t)curblocknum*toread;
  1761. assertex(p<=trailer.indexPos);
  1762. if (trailer.indexPos-p<(offset_t)toread)
  1763. toread = (size32_t)(trailer.indexPos-p);
  1764. if (!toread)
  1765. return;
  1766. if (fileio) {
  1767. MemoryAttr comp;
  1768. void *b=comp.allocate(toread);
  1769. size32_t r = fileio->read(p,toread,b);
  1770. assertex(r==toread);
  1771. expand(b,curblockbuf,expsize);
  1772. }
  1773. else { // memory mapped
  1774. assertex((memsize_t)p==p);
  1775. expand(mmfile->base()+(memsize_t)p,curblockbuf,expsize);
  1776. }
  1777. }
  1778. void checkedwrite(offset_t pos, size32_t len, const void * data)
  1779. {
  1780. size32_t ret = fileio->write(pos,len,data);
  1781. if (ret!=len)
  1782. throw MakeStringException(DISK_FULL_EXCEPTION_CODE,"CCompressedFile::checkedwrite");
  1783. if (setcrc)
  1784. trailer.crc = crc32((const char *)data,len,trailer.crc);
  1785. }
  1786. void flush()
  1787. {
  1788. try
  1789. {
  1790. curblocknum++;
  1791. indexbuf.append((unsigned __int64) trailer.expandedSize-overflow.length());
  1792. offset_t p = ((offset_t)curblocknum)*((offset_t)trailer.blockSize);
  1793. if (trailer.recordSize==0) {
  1794. compressor->close();
  1795. compblklen = compressor->buflen();
  1796. }
  1797. if (compblklen) {
  1798. if (p>trailer.indexPos) { // fill gap
  1799. MemoryAttr fill;
  1800. size32_t fl = (size32_t)(p-trailer.indexPos);
  1801. memset(fill.allocate(fl),0xff,fl);
  1802. checkedwrite(trailer.indexPos,fl,fill.get());
  1803. }
  1804. checkedwrite(p,compblklen,compblkptr);
  1805. p += compblklen;
  1806. compblklen = 0;
  1807. }
  1808. trailer.indexPos = p;
  1809. if (trailer.recordSize==0) {
  1810. compressor->open(compblkptr, trailer.blockSize);
  1811. }
  1812. }
  1813. catch (IException *e)
  1814. {
  1815. writeException = true;
  1816. EXCLOG(e, "CCompressedFile::flush");
  1817. throw;
  1818. }
  1819. }
  1820. virtual void expand(const void *compbuf,MemoryBuffer &expbuf,size32_t expsize)
  1821. {
  1822. size32_t rs = trailer.recordSize;
  1823. if (rs) { // diff expand
  1824. const byte *src = (const byte *)compbuf;
  1825. byte *dst = (byte *)expbuf.reserve(expsize);
  1826. if (expsize) {
  1827. assertex(expsize>=rs);
  1828. memcpy(dst,src,rs);
  1829. dst += rs;
  1830. src += rs;
  1831. expsize -= rs;
  1832. while (expsize) {
  1833. assertex(expsize>=rs);
  1834. src += DiffExpand(src, dst, dst-rs, rs);
  1835. expsize -= rs;
  1836. dst += rs;
  1837. }
  1838. }
  1839. }
  1840. else { // lzw or fastlz or lz4
  1841. assertex(expander.get());
  1842. size32_t exp = expander->init(compbuf);
  1843. if (exp!=expsize) {
  1844. throw MakeStringException(-1,"Compressed file format failure(%d,%d) - Encrypted?",exp,expsize);
  1845. }
  1846. expander->expand(expbuf.reserve(exp));
  1847. }
  1848. }
  1849. bool compressrow(const void *src,size32_t rs)
  1850. {
  1851. bool ret = true;
  1852. if (compblklen==0) {
  1853. memcpy(prevrowbuf.bufferBase(),src,rs);
  1854. memcpy(compblkptr,src,rs);
  1855. compblklen = rs;
  1856. }
  1857. else {
  1858. size32_t len = DiffCompress(src,compblkptr+compblklen,prevrowbuf.bufferBase(),rs);
  1859. if (compblklen+len>trailer.blockSize)
  1860. ret = false;
  1861. else
  1862. compblklen += len;
  1863. }
  1864. return ret;
  1865. }
  1866. size32_t compress(const void *expbuf,size32_t len) // iff return!=len then overflowed
  1867. {
  1868. const byte *src = (const byte *)expbuf;
  1869. size32_t rs = trailer.recordSize;
  1870. if (rs) { // diff compress
  1871. if (overflow.length()) {
  1872. assertex(overflow.length()<=rs);
  1873. size32_t left = rs-overflow.length();
  1874. if (left>len)
  1875. left = len;
  1876. overflow.append(left,expbuf);
  1877. len -= left;
  1878. if (overflow.length()==rs) {
  1879. if (!compressrow(overflow.toByteArray(),rs)) { // this is nasty case
  1880. overflow.setLength(rs-left);
  1881. return (size32_t)(src-(const byte *)expbuf);
  1882. }
  1883. overflow.clear();
  1884. }
  1885. src += left;
  1886. }
  1887. while (len>=rs) {
  1888. if (!compressrow(src,rs))
  1889. return (size32_t)(src-(const byte *)expbuf);
  1890. len -= rs;
  1891. src += rs;
  1892. }
  1893. if (len) {
  1894. overflow.append(len,src);
  1895. src += len;
  1896. }
  1897. }
  1898. else // lzw or fastlz or lz4
  1899. {
  1900. src += compressor->write(src, len);
  1901. }
  1902. return (size32_t)(src-(const byte *)expbuf);
  1903. }
  1904. public:
  1905. IMPLEMENT_IINTERFACE;
  1906. CCompressedFile(IFileIO *_fileio,IMemoryMappedFile *_mmfile,CompressedFileTrailer &_trailer,ICFmode _mode, bool _setcrc,ICompressor *_compressor,IExpander *_expander, unsigned _compMethod)
  1907. : fileio(_fileio), mmfile(_mmfile)
  1908. {
  1909. compressor.set(_compressor);
  1910. expander.set(_expander);
  1911. setcrc = _setcrc;
  1912. writeException = false;
  1913. memcpy(&trailer,&_trailer,sizeof(trailer));
  1914. mode = _mode;
  1915. curblockpos = 0;
  1916. curblocknum = (unsigned)-1; // relies on wrap
  1917. compMethod = _compMethod;
  1918. if (mode!=ICFread) {
  1919. if (!_fileio&&_mmfile)
  1920. throw MakeStringException(-1,"Compressed Write not supported on memory mapped files");
  1921. if (trailer.recordSize) {
  1922. if ((trailer.recordSize>trailer.blockSize/4) || // just too big
  1923. (trailer.recordSize<10)) // or too small
  1924. trailer.recordSize = 0;
  1925. else
  1926. prevrowbuf.allocate(trailer.recordSize);
  1927. }
  1928. compblkptr = (byte *)compblk.allocate(trailer.blockSize+trailer.recordSize*2+16); // over estimate!
  1929. compblklen = 0;
  1930. if (trailer.recordSize==0) {
  1931. if (!compressor)
  1932. {
  1933. if (compMethod == COMPRESS_METHOD_FASTLZ)
  1934. compressor.setown(createFastLZCompressor());
  1935. else if (compMethod == COMPRESS_METHOD_LZ4)
  1936. compressor.setown(createLZ4Compressor());
  1937. else // fallback
  1938. {
  1939. compMethod = COMPRESS_METHOD_LZW;
  1940. trailer.compressedType = COMPRESSEDFILEFLAG;
  1941. compressor.setown(createLZWCompressor(true));
  1942. }
  1943. }
  1944. compressor->open(compblkptr, trailer.blockSize);
  1945. }
  1946. }
  1947. if (mode!=ICFcreate) {
  1948. unsigned nb = trailer.numBlocks();
  1949. size32_t toread = sizeof(offset_t)*nb;
  1950. if (fileio) {
  1951. size32_t r = fileio->read(trailer.indexPos,toread,indexbuf.reserveTruncate(toread));
  1952. assertex(r==toread);
  1953. }
  1954. else {
  1955. assertex((memsize_t)trailer.indexPos==trailer.indexPos);
  1956. memcpy(indexbuf.reserveTruncate(toread),mmfile->base()+(memsize_t)trailer.indexPos,toread);
  1957. }
  1958. if (mode==ICFappend) {
  1959. curblocknum = nb-1;
  1960. if (setcrc) {
  1961. trailer.crc = trailer.datacrc;
  1962. trailer.datacrc = ~0U;
  1963. }
  1964. }
  1965. if (trailer.recordSize==0) {
  1966. if (!expander) {
  1967. if (compMethod == COMPRESS_METHOD_FASTLZ)
  1968. expander.setown(createFastLZExpander());
  1969. else if (compMethod == COMPRESS_METHOD_LZ4)
  1970. expander.setown(createLZ4Expander());
  1971. else // fallback
  1972. {
  1973. compMethod = COMPRESS_METHOD_LZW;
  1974. expander.setown(createLZWExpander(true));
  1975. }
  1976. }
  1977. }
  1978. }
  1979. }
  1980. virtual ~CCompressedFile()
  1981. {
  1982. if (!writeException)
  1983. {
  1984. try { close(); }
  1985. catch (IException *e)
  1986. {
  1987. EXCLOG(e, "~CCompressedFile");
  1988. e->Release();
  1989. }
  1990. }
  1991. }
  1992. virtual offset_t size()
  1993. {
  1994. CriticalBlock block(crit);
  1995. return trailer.expandedSize;
  1996. }
  1997. virtual size32_t read(offset_t pos, size32_t len, void * data)
  1998. {
  1999. CriticalBlock block(crit);
  2000. assertex(mode==ICFread);
  2001. size32_t ret=0;
  2002. while (pos<trailer.expandedSize) {
  2003. if ((offset_t)len>trailer.expandedSize-pos)
  2004. len = (size32_t)(trailer.expandedSize-pos);
  2005. if ((pos>=curblockpos)&&(pos<curblockpos+curblockbuf.length())) { // see if in current buffer
  2006. size32_t tocopy = (size32_t)(curblockpos+curblockbuf.length()-pos);
  2007. if (tocopy>len)
  2008. tocopy = len;
  2009. memcpy(data,curblockbuf.toByteArray()+(pos-curblockpos),tocopy);
  2010. ret += tocopy;
  2011. len -= tocopy;
  2012. data = (byte *)data+tocopy;
  2013. pos += tocopy;
  2014. }
  2015. if (len==0)
  2016. break;
  2017. getblock(pos);
  2018. }
  2019. return ret;
  2020. }
  2021. size32_t write(offset_t pos, size32_t len, const void * data)
  2022. {
  2023. CriticalBlock block(crit);
  2024. assertex(mode!=ICFread);
  2025. size32_t ret = 0;
  2026. for (;;) {
  2027. if (pos!=trailer.expandedSize)
  2028. throw MakeStringException(-1,"sequential writes only on compressed file");
  2029. size32_t done = compress(data,len);
  2030. trailer.expandedSize += done;
  2031. len -= done;
  2032. ret += done;
  2033. pos += done;
  2034. data = (const byte *)data+done;
  2035. if (len==0)
  2036. break;
  2037. flush();
  2038. }
  2039. return ret;
  2040. }
  2041. virtual unsigned __int64 getStatistic(StatisticKind kind)
  2042. {
  2043. return fileio->getStatistic(kind);
  2044. }
  2045. void setSize(offset_t size) { UNIMPLEMENTED; }
  2046. offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; }
  2047. void close()
  2048. {
  2049. CriticalBlock block(crit);
  2050. if (mode!=ICFread) {
  2051. if (overflow.length()) {
  2052. unsigned ol = overflow.length();
  2053. overflow.clear();
  2054. throw MakeStringException(-1,"Partial row written at end of file %d of %d",ol,trailer.recordSize);
  2055. }
  2056. flush();
  2057. trailer.datacrc = trailer.crc;
  2058. if (setcrc) {
  2059. indexbuf.append(sizeof(trailer)-sizeof(trailer.crc),&trailer);
  2060. trailer.crc = crc32((const char *)indexbuf.toByteArray(),
  2061. indexbuf.length(),trailer.crc);
  2062. indexbuf.append(trailer.crc);
  2063. }
  2064. else {
  2065. trailer.datacrc = 0;
  2066. trailer.crc = ~0U;
  2067. indexbuf.append(sizeof(trailer),&trailer);
  2068. }
  2069. checkedwrite(trailer.indexPos,indexbuf.length(),indexbuf.toByteArray());
  2070. indexbuf.clear();
  2071. }
  2072. mode = ICFread;
  2073. curblockpos = 0;
  2074. curblocknum = (unsigned)-1; // relies on wrap
  2075. }
  2076. unsigned dataCRC()
  2077. {
  2078. if (mode==ICFread)
  2079. return trailer.datacrc;
  2080. return trailer.crc;
  2081. }
  2082. size32_t recordSize()
  2083. {
  2084. return trailer.recordSize;
  2085. }
  2086. size32_t blockSize()
  2087. {
  2088. return trailer.blockSize;
  2089. }
  2090. void setBlockSize(size32_t size)
  2091. {
  2092. trailer.blockSize = size;
  2093. compressor->close();
  2094. compressor->open(compblkptr, size);
  2095. }
  2096. bool readMode()
  2097. {
  2098. return (mode==ICFread);
  2099. }
  2100. unsigned method()
  2101. {
  2102. return trailer.method();
  2103. }
  2104. };
  2105. static unsigned getCompressedMethod(__int64 compressedType)
  2106. {
  2107. if (compressedType == COMPRESSEDFILEFLAG)
  2108. return COMPRESS_METHOD_LZW;
  2109. else if (compressedType == FASTCOMPRESSEDFILEFLAG)
  2110. return COMPRESS_METHOD_FASTLZ;
  2111. else if (compressedType == LZ4COMPRESSEDFILEFLAG)
  2112. return COMPRESS_METHOD_LZ4;
  2113. return 0;
  2114. }
  2115. static bool isCompressedType(__int64 compressedType)
  2116. {
  2117. return 0 != getCompressedMethod(compressedType);
  2118. }
  2119. bool isCompressedFile(IFileIO *iFileIO, CompressedFileTrailer *trailer=nullptr)
  2120. {
  2121. if (iFileIO)
  2122. {
  2123. offset_t fsize = iFileIO->size();
  2124. if (fsize>=sizeof(WinCompressedFileTrailer)) // thats 8 bytes bigger but I think doesn't matter
  2125. {
  2126. WinCompressedFileTrailer wintrailer;
  2127. CompressedFileTrailer _trailer;
  2128. if (!trailer)
  2129. trailer = &_trailer;
  2130. if (iFileIO->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer))
  2131. {
  2132. wintrailer.translate(*trailer);
  2133. if (isCompressedType(trailer->compressedType))
  2134. return true;
  2135. }
  2136. }
  2137. }
  2138. return false;
  2139. }
  2140. bool isCompressedFile(const char *filename)
  2141. {
  2142. Owned<IFile> iFile = createIFile(filename);
  2143. return isCompressedFile(iFile);
  2144. }
  2145. bool isCompressedFile(IFile *iFile)
  2146. {
  2147. Owned<IFileIO> iFileIO = iFile->open(IFOread);
  2148. return isCompressedFile(iFileIO);
  2149. }
  2150. ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expander)
  2151. {
  2152. CompressedFileTrailer trailer;
  2153. if (isCompressedFile(fileio, &trailer))
  2154. {
  2155. if (expander&&(trailer.recordSize!=0))
  2156. throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
  2157. unsigned compMethod = getCompressedMethod(trailer.compressedType);
  2158. return new CCompressedFile(fileio,NULL,trailer,ICFread,false,NULL,expander,compMethod);
  2159. }
  2160. return nullptr;
  2161. }
  2162. ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander, bool memorymapped, IFEflags extraFlags)
  2163. {
  2164. if (file)
  2165. {
  2166. if (memorymapped)
  2167. {
  2168. Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
  2169. if (mmfile)
  2170. {
  2171. offset_t fsize = mmfile->fileSize();
  2172. if (fsize>=sizeof(WinCompressedFileTrailer)) // thats 8 bytes bigger but I think doesn't matter
  2173. {
  2174. WinCompressedFileTrailer wintrailer;
  2175. CompressedFileTrailer trailer;
  2176. memcpy(&wintrailer,mmfile->base()+fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer));
  2177. wintrailer.translate(trailer);
  2178. unsigned compMethod = getCompressedMethod(trailer.compressedType);
  2179. if (compMethod)
  2180. {
  2181. if (expander&&(trailer.recordSize!=0))
  2182. throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
  2183. return new CCompressedFile(NULL,mmfile,trailer,ICFread,false,NULL,expander,compMethod);
  2184. }
  2185. }
  2186. }
  2187. }
  2188. Owned<IFileIO> fileio = file->open(IFOread, extraFlags);
  2189. if (fileio)
  2190. return createCompressedFileReader(fileio,expander);
  2191. }
  2192. return NULL;
  2193. }
  2194. ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool _setcrc,ICompressor *compressor, unsigned _compMethod)
  2195. {
  2196. CompressedFileTrailer trailer;
  2197. offset_t fsize = fileio->size();
  2198. if (fsize)
  2199. {
  2200. for (;;)
  2201. {
  2202. if (fsize>=sizeof(WinCompressedFileTrailer)) // thats 8 bytes bigger but I think doesn't matter
  2203. {
  2204. WinCompressedFileTrailer wintrailer;
  2205. if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
  2206. wintrailer.translate(trailer);
  2207. unsigned compMethod = getCompressedMethod(trailer.compressedType);
  2208. if (compMethod)
  2209. {
  2210. // check trailer.compressedType against _compMethod
  2211. if (_compMethod != compMethod)
  2212. throw MakeStringException(-1,"Appending to file with different compression method");
  2213. if ((recordsize==trailer.recordSize)||!trailer.recordSize)
  2214. break;
  2215. throw MakeStringException(-1,"Appending to file with different record size (%d,%d)",recordsize,trailer.recordSize);
  2216. }
  2217. }
  2218. }
  2219. throw MakeStringException(-1,"Appending to file that is not compressed");
  2220. }
  2221. }
  2222. else
  2223. {
  2224. memset(&trailer,0,sizeof(trailer));
  2225. trailer.crc = ~0U;
  2226. if (_compMethod == COMPRESS_METHOD_FASTLZ)
  2227. {
  2228. trailer.compressedType = FASTCOMPRESSEDFILEFLAG;
  2229. trailer.blockSize = FASTCOMPRESSEDFILEBLOCKSIZE;
  2230. trailer.recordSize = 0;
  2231. }
  2232. else if (_compMethod == COMPRESS_METHOD_LZ4)
  2233. {
  2234. trailer.compressedType = LZ4COMPRESSEDFILEFLAG;
  2235. trailer.blockSize = LZ4COMPRESSEDFILEBLOCKSIZE;
  2236. trailer.recordSize = 0;
  2237. }
  2238. else // fallback
  2239. {
  2240. trailer.compressedType = COMPRESSEDFILEFLAG;
  2241. trailer.blockSize = COMPRESSEDFILEBLOCKSIZE;
  2242. trailer.recordSize = recordsize;
  2243. }
  2244. }
  2245. // MCK - may present compatibility issue if passing in compressor and wanting row comp
  2246. if (compressor)
  2247. trailer.recordSize = 0; // force not row compressed if compressor specified
  2248. CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,fsize?ICFappend:ICFcreate,_setcrc,compressor,NULL,_compMethod);
  2249. return cfile;
  2250. }
  2251. ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append,bool _setcrc,ICompressor *compressor, unsigned _compMethod, IFEflags extraFlags)
  2252. {
  2253. if (file) {
  2254. if (append&&!file->exists())
  2255. append = false;
  2256. Owned<IFileIO> fileio = file->open(append?IFOreadwrite:IFOcreate, extraFlags);
  2257. if (fileio)
  2258. return createCompressedFileWriter(fileio,recordsize,_setcrc,compressor,_compMethod);
  2259. }
  2260. return NULL;
  2261. }
  2262. //===================================================================================
  2263. #define AES_PADDING_SIZE 32
  2264. class CAESCompressor : implements ICompressor, public CInterface
  2265. {
  2266. Owned<ICompressor> comp; // base compressor
  2267. MemoryBuffer compattr; // compressed buffer
  2268. MemoryAttr outattr; // compressed and encrypted (if outblk NULL)
  2269. void *outbuf; // dest
  2270. size32_t outlen;
  2271. size32_t outmax;
  2272. MemoryAttr key;
  2273. MemoryBuffer *outBufMb;
  2274. public:
  2275. IMPLEMENT_IINTERFACE;
  2276. CAESCompressor(const void *_key, unsigned _keylen)
  2277. : key(_keylen,_key)
  2278. {
  2279. comp.setown(createLZWCompressor(true));
  2280. outlen = 0;
  2281. outmax = 0;
  2282. outBufMb = NULL;
  2283. }
  2284. void open(MemoryBuffer &mb, size32_t initialSize)
  2285. {
  2286. outlen = 0;
  2287. outmax = initialSize;
  2288. outbuf = NULL;
  2289. outBufMb = &mb;
  2290. comp->open(compattr, initialSize);
  2291. }
  2292. void open(void *blk,size32_t blksize)
  2293. {
  2294. outlen = 0;
  2295. outmax = blksize;
  2296. if (blk)
  2297. outbuf = blk;
  2298. else
  2299. outbuf = outattr.allocate(blksize);
  2300. outBufMb = NULL;
  2301. size32_t subsz = blksize-AES_PADDING_SIZE-sizeof(size32_t);
  2302. comp->open(compattr.reserveTruncate(subsz),subsz);
  2303. }
  2304. void close()
  2305. {
  2306. comp->close();
  2307. // now encrypt
  2308. MemoryBuffer buf;
  2309. aesEncrypt(key.get(), key.length(), comp->bufptr(), comp->buflen(), buf);
  2310. outlen = buf.length();
  2311. if (outBufMb)
  2312. {
  2313. outmax = sizeof(size32_t)+outlen;
  2314. outbuf = outBufMb->reserveTruncate(outmax);
  2315. outBufMb = NULL;
  2316. }
  2317. memcpy(outbuf,&outlen,sizeof(size32_t));
  2318. outlen += sizeof(size32_t);
  2319. assertex(outlen<=outmax);
  2320. memcpy((byte *)outbuf+sizeof(size32_t),buf.bufferBase(),buf.length());
  2321. outmax = 0;
  2322. }
  2323. size32_t write(const void *buf,size32_t len)
  2324. {
  2325. return comp->write(buf,len);
  2326. }
  2327. void * bufptr()
  2328. {
  2329. assertex(0 == outmax); // i.e. closed
  2330. return outbuf;
  2331. }
  2332. size32_t buflen()
  2333. {
  2334. assertex(0 == outmax); // i.e. closed
  2335. return outlen;
  2336. }
  2337. void startblock()
  2338. {
  2339. comp->startblock();
  2340. }
  2341. void commitblock()
  2342. {
  2343. comp->commitblock();
  2344. }
  2345. };
  2346. class CAESExpander : implements IExpander, public CInterface
  2347. {
  2348. Owned<IExpander> exp; // base expander
  2349. MemoryBuffer compbuf;
  2350. MemoryAttr key;
  2351. public:
  2352. IMPLEMENT_IINTERFACE;
  2353. CAESExpander(const void *_key, unsigned _keylen)
  2354. : key(_keylen,_key)
  2355. {
  2356. exp.setown(createLZWExpander(true));
  2357. }
  2358. size32_t init(const void *blk)
  2359. {
  2360. // first decrypt
  2361. const byte *p = (const byte *)blk;
  2362. size32_t l = *(const size32_t *)p;
  2363. aesDecrypt(key.get(),key.length(),p+sizeof(size32_t),l,compbuf);
  2364. return exp->init(compbuf.bufferBase());
  2365. }
  2366. void expand(void *target)
  2367. {
  2368. exp->expand(target);
  2369. }
  2370. virtual void * bufptr()
  2371. {
  2372. return exp->bufptr();
  2373. }
  2374. virtual size32_t buflen()
  2375. {
  2376. return exp->buflen();
  2377. }
  2378. };
  2379. ICompressor *createAESCompressor(const void *key, unsigned keylen)
  2380. {
  2381. return new CAESCompressor(key,keylen);
  2382. }
  2383. IExpander *createAESExpander(const void *key, unsigned keylen)
  2384. {
  2385. return new CAESExpander(key,keylen);
  2386. }
  2387. #define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (8 - (n))))
  2388. inline void padKey32(byte *keyout,size32_t len, const byte *key)
  2389. {
  2390. if (len==0)
  2391. memset(keyout,0xcc,32);
  2392. else if (len<=32) {
  2393. for (unsigned i=0;i<32;i++)
  2394. keyout[i] = (i<len)?key[i%len]:ROTATE_LEFT(key[i%len],i/len);
  2395. }
  2396. else {
  2397. memcpy(keyout,key,32);
  2398. // xor excess rotated
  2399. for (unsigned i=32;i<len;i++)
  2400. keyout[i%32] ^= ROTATE_LEFT(key[i],(i/8)%8);
  2401. }
  2402. }
  2403. ICompressor *createAESCompressor256(size32_t len, const void *key)
  2404. {
  2405. byte k[32];
  2406. padKey32(k,len,(const byte *)key);
  2407. return new CAESCompressor(k,32);
  2408. }
  2409. IExpander *createAESExpander256(size32_t len, const void *key)
  2410. {
  2411. byte k[32];
  2412. padKey32(k,len,(const byte *)key);
  2413. return new CAESExpander(k,32);
  2414. }
  2415. IPropertyTree *getBlockedFileDetails(IFile *file)
  2416. {
  2417. Owned<IPropertyTree> tree = createPTree("BlockedFile");
  2418. Owned<IFileIO> fileio = file?file->open(IFOread):NULL;
  2419. if (fileio) {
  2420. offset_t fsize = fileio->size();
  2421. tree->setPropInt64("@size",fsize);
  2422. if (fsize>=sizeof(WinCompressedFileTrailer)) { // thats 8 bytes bigger but I think doesn't matter
  2423. WinCompressedFileTrailer wintrailer;
  2424. CompressedFileTrailer trailer;
  2425. if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
  2426. wintrailer.translate(trailer);
  2427. if (isCompressedType(trailer.compressedType))
  2428. {
  2429. trailer.setDetails(*tree);
  2430. unsigned nb = trailer.numBlocks();
  2431. MemoryAttr indexbuf;
  2432. size32_t toread = sizeof(offset_t)*nb;
  2433. size32_t r = fileio->read(trailer.indexPos,toread,indexbuf.allocate(toread));
  2434. if (r&&(r==toread)) {
  2435. offset_t s = 0;
  2436. const offset_t *index = (const offset_t *)indexbuf.bufferBase();
  2437. for (unsigned i=0;i<nb;i++) {
  2438. IPropertyTree * t = tree->addPropTree("Block",createPTree("Block"));
  2439. t->addPropInt64("@start",s);
  2440. offset_t p = s;
  2441. s = index[i];
  2442. t->addPropInt64("@end",s);
  2443. t->addPropInt64("@length",s-p);
  2444. }
  2445. }
  2446. return tree.getClear();
  2447. }
  2448. }
  2449. }
  2450. }
  2451. return NULL;
  2452. }
  2453. class CCompressHandlerArray : public IArrayOf<ICompressHandler>
  2454. {
  2455. public:
  2456. ICompressHandler *lookup(const char *type) const
  2457. {
  2458. ForEachItemIn(h, *this)
  2459. {
  2460. ICompressHandler &handler = item(h);
  2461. if (0 == stricmp(type, handler.queryType()))
  2462. return &handler;
  2463. }
  2464. return NULL;
  2465. }
  2466. } compressors;
  2467. bool addCompressorHandler(ICompressHandler *handler)
  2468. {
  2469. if (compressors.lookup(handler->queryType()))
  2470. {
  2471. handler->Release();
  2472. return false; // already registered
  2473. }
  2474. compressors.append(* handler);
  2475. return true;
  2476. }
  2477. bool removeCompressorHandler(ICompressHandler *handler)
  2478. {
  2479. return compressors.zap(* handler);
  2480. }
  2481. Linked<ICompressHandler> defaultCompressor;
  2482. MODULE_INIT(INIT_PRIORITY_STANDARD)
  2483. {
  2484. class CCompressHandlerBase : implements ICompressHandler, public CInterface
  2485. {
  2486. StringAttr type;
  2487. public:
  2488. IMPLEMENT_IINTERFACE;
  2489. CCompressHandlerBase(const char *_type) : type(_type) { }
  2490. // ICompressHandler
  2491. virtual const char *queryType() const { return type; }
  2492. };
  2493. class CFLZCompressHandler : public CCompressHandlerBase
  2494. {
  2495. public:
  2496. CFLZCompressHandler() : CCompressHandlerBase("FLZ") { }
  2497. virtual ICompressor *getCompressor(const char *options) { return createFastLZCompressor(); }
  2498. virtual IExpander *getExpander(const char *options) { return createFastLZExpander(); }
  2499. };
  2500. class CLZ4CompressHandler : public CCompressHandlerBase
  2501. {
  2502. public:
  2503. CLZ4CompressHandler() : CCompressHandlerBase("LZ4") { }
  2504. virtual ICompressor *getCompressor(const char *options) { return createLZ4Compressor(); }
  2505. virtual IExpander *getExpander(const char *options) { return createLZ4Expander(); }
  2506. };
  2507. class CAESCompressHandler : public CCompressHandlerBase
  2508. {
  2509. public:
  2510. CAESCompressHandler() : CCompressHandlerBase("AES") { }
  2511. virtual ICompressor *getCompressor(const char *options)
  2512. {
  2513. assertex(options);
  2514. return createAESCompressor(options, strlen(options));
  2515. }
  2516. virtual IExpander *getExpander(const char *options)
  2517. {
  2518. assertex(options);
  2519. return createAESExpander(options, strlen(options));
  2520. }
  2521. };
  2522. class CDiffCompressHandler : public CCompressHandlerBase
  2523. {
  2524. public:
  2525. CDiffCompressHandler() : CCompressHandlerBase("DIFF") { }
  2526. virtual ICompressor *getCompressor(const char *options) { return createRDiffCompressor(); }
  2527. virtual IExpander *getExpander(const char *options) { return createRDiffExpander(); }
  2528. };
  2529. class CLZWCompressHandler : public CCompressHandlerBase
  2530. {
  2531. public:
  2532. CLZWCompressHandler() : CCompressHandlerBase("LZW") { }
  2533. virtual ICompressor *getCompressor(const char *options) { return createLZWCompressor(true); }
  2534. virtual IExpander *getExpander(const char *options) { return createLZWExpander(true); }
  2535. };
  2536. ICompressHandler *flzCompressor = new CFLZCompressHandler();
  2537. addCompressorHandler(flzCompressor);
  2538. addCompressorHandler(new CAESCompressHandler());
  2539. addCompressorHandler(new CDiffCompressHandler());
  2540. addCompressorHandler(new CLZWCompressHandler());
  2541. addCompressorHandler(new CLZ4CompressHandler());
  2542. defaultCompressor.set(flzCompressor);
  2543. return true;
  2544. }
  2545. ICompressHandler *queryCompressHandler(const char *type)
  2546. {
  2547. return compressors.lookup(type);
  2548. }
  2549. void setDefaultCompressor(const char *type)
  2550. {
  2551. ICompressHandler *_defaultCompressor = queryCompressHandler(type);
  2552. if (!_defaultCompressor)
  2553. throw MakeStringException(-1, "setDefaultCompressor: '%s' compressor not registered", type);
  2554. defaultCompressor.set(_defaultCompressor);
  2555. }
  2556. ICompressHandler *queryDefaultCompressHandler()
  2557. {
  2558. return defaultCompressor;
  2559. }
  2560. ICompressor *getCompressor(const char *type, const char *options)
  2561. {
  2562. ICompressHandler *handler = compressors.lookup(type);
  2563. if (handler)
  2564. return handler->getCompressor(options);
  2565. return NULL;
  2566. }
  2567. IExpander *getExpander(const char *type, const char *options)
  2568. {
  2569. ICompressHandler *handler = compressors.lookup(type);
  2570. if (handler)
  2571. return handler->getExpander(options);
  2572. return NULL;
  2573. }
  2574. //===================================================================================
  2575. //#define TEST_ROWDIFF
  2576. #ifdef TEST_ROWDIFF
  2577. #include "jfile.hpp"
  2578. jlib_decl void testDiffComp(unsigned amount)
  2579. {
  2580. size32_t sz = 11;
  2581. Owned<IWriteSeqVar> out = createRowCompWriteSeq("test.out", sz);
  2582. { MTIME_SECTION(defaultTimer, "Comp Write");
  2583. int cpies;
  2584. for (cpies=0; cpies<amount; cpies++)
  2585. {
  2586. out->putn("Kate cccc \0A Another \0A Brother ", 3);
  2587. out->putn( "Jake Smith", 1);
  2588. out->putn( "Jake Brown", 1);
  2589. out->putn( "J Smith ", 1);
  2590. out->putn( "K Smith ", 1);
  2591. out->putn( "Kate Smith", 1);
  2592. out->putn( "Kate Brown", 1);
  2593. out->putn("Kate aaaa \0Kate bbbb ", 2);
  2594. out->putn("Kate cccc \0A Another \0A Brother ", 3);
  2595. out->putn( "A Brolley ", 1);
  2596. }
  2597. }
  2598. out.clear();
  2599. MemoryBuffer buf;
  2600. char *s = (char *) buf.reserve(sz);
  2601. { MTIME_SECTION(defaultTimer, "Comp read");
  2602. Owned<IReadSeqVar> in = createRowCompReadSeq("test.out", 0, sz);
  2603. count_t a = 0;
  2604. for (;;)
  2605. {
  2606. size32_t tmpSz;
  2607. if (!in->get(sz, s, tmpSz))
  2608. break;
  2609. a++;
  2610. // DBGLOG("Entry: %s", s);
  2611. }
  2612. DBGLOG("read: %d", a);
  2613. }
  2614. { MTIME_SECTION(defaultTimer, "Comp read async std");
  2615. Owned<IFile> iFile = createIFile("test.out");
  2616. Owned<IFileAsyncIO> iFileIO = iFile->openAsync(IFOread);
  2617. Owned<IFileIOStream> iFileIOStream = createBufferedAsyncIOStream(iFileIO);
  2618. Owned<IReadSeqVar> in = createRowCompReadSeq(*iFileIOStream, 0, sz);
  2619. count_t a = 0;
  2620. for (;;)
  2621. {
  2622. size32_t tmpSz;
  2623. if (!in->get(sz, s, tmpSz))
  2624. break;
  2625. a++;
  2626. // DBGLOG("Entry: %s", s);
  2627. }
  2628. DBGLOG("async std read: %d", a);
  2629. }
  2630. { MTIME_SECTION(defaultTimer, "Comp read async");
  2631. Owned<IReadSeqVar> in = createRowCompReadSeq("test.out", 0, sz, -1, true);
  2632. count_t a = 0;
  2633. for (;;)
  2634. {
  2635. size32_t tmpSz;
  2636. if (!in->get(sz, s, tmpSz))
  2637. break;
  2638. a++;
  2639. // DBGLOG("Entry: %s", s);
  2640. }
  2641. DBGLOG("async read: %d", a);
  2642. }
  2643. }
  2644. #endif