keydiff.cpp 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlzw.hpp"
  15. #include "jlzma.hpp"
  16. #include "jexcept.hpp"
  17. #include "jhtree.hpp"
  18. #include "ctfile.hpp"
  19. #include "keybuild.hpp"
  20. #include "limits.h"
  21. #include "keydiff.hpp"
  // Not referenced in the portion of the file visible here; presumably a tuning ratio for the diff generator — TODO confirm against its use site.
  22. #define SMALL_ENOUGH_RATIO 20
  // Signature placed at the very start of a key patch file: KeyDiffHeader::write() emits its 7 bytes
  // and KeyDiffHeader::readVersionInfo() rejects files that do not begin with them.
  23. #define KEYDIFFSIG "KEYDIFF"
  // Bit OR'd into the per-block size word by CWritableKeyDiff::writeBuff() when the block payload was LZMA-compressed.
  24. #define LZMA_FLAG (0x80000000)
  25. class RowBuffer
  26. {
  27. public:
  28. RowBuffer() : rowsize(0), thisrowsize(0), buffsize(0), buffer(0), fpos(0) {}
  29. ~RowBuffer() { free(buffer); }
  30. void init(size32_t _rowsize, bool _isVar)
  31. {
  32. rowsize = _rowsize;
  33. isVar = _isVar;
  34. thisrowsize = rowsize;
  35. buffsize = rowsize + sizeof(offset_t);
  36. buffer = calloc(buffsize, 1);
  37. fpos = reinterpret_cast<offset_t *>(buffer);
  38. row = reinterpret_cast<char *>(buffer) + sizeof(offset_t);
  39. *fpos = 0;
  40. }
  41. void clear()
  42. {
  43. memset(row, 0, rowsize);
  44. *fpos = 0;
  45. }
  46. bool getCursorNext(IKeyCursor * keyCursor)
  47. {
  48. if(keyCursor->next(row))
  49. {
  50. if(isVar)
  51. thisrowsize = keyCursor->getSize();
  52. *fpos = keyCursor->getFPos();
  53. return true;
  54. }
  55. *fpos = 0;
  56. return false;
  57. }
  58. void putBuilder(IKeyBuilder * keyBuilder, unsigned __int64 reccount)
  59. {
  60. keyBuilder->processKeyData(row, *fpos, thisrowsize);
  61. }
  62. offset_t queryFPos() const { return *fpos; }
  63. void setFPos(offset_t fp) { *fpos = fp; }
  64. CNodeInfo * getNodeInfo(unsigned __int64 reccount)
  65. {
  66. return new CNodeInfo(*fpos, row, thisrowsize, reccount);
  67. }
  68. void tally(CRC32 & crc) { crc.tally(sizeof(offset_t) + thisrowsize, buffer); }
  69. int compareKeyed(RowBuffer const & other) const
  70. {
  71. size32_t minsize = thisrowsize;
  72. if(other.thisrowsize < minsize)
  73. minsize = other.thisrowsize;
  74. int cmp = memcmp(row, other.row, minsize);
  75. if(cmp != 0)
  76. return cmp;
  77. if(thisrowsize < other.thisrowsize)
  78. return -1;
  79. if(thisrowsize > other.thisrowsize)
  80. return +1;
  81. return 0;
  82. }
  83. void swap(RowBuffer & other)
  84. {
  85. void * tmpb = other.buffer;
  86. char * tmpr = other.row;
  87. offset_t * tmpf = other.fpos;
  88. other.buffer = buffer;
  89. other.row = row;
  90. other.fpos = fpos;
  91. buffer = tmpb;
  92. row = tmpr;
  93. fpos = tmpf;
  94. if(isVar)
  95. {
  96. size32_t tmps = other.thisrowsize;
  97. other.thisrowsize = thisrowsize;
  98. thisrowsize = tmps;
  99. }
  100. }
  101. size32_t diffCompress(RowBuffer const & prev, char * dst) const
  102. {
  103. if(isVar)
  104. {
  105. size32_t maxrowsize;
  106. if(thisrowsize > prev.thisrowsize)
  107. {
  108. maxrowsize = thisrowsize;
  109. memset(prev.row + prev.thisrowsize, 0, thisrowsize - prev.thisrowsize);
  110. }
  111. else
  112. {
  113. maxrowsize = prev.thisrowsize;
  114. if(prev.thisrowsize > thisrowsize)
  115. memset(row + thisrowsize, 0, prev.thisrowsize - thisrowsize);
  116. }
  117. *reinterpret_cast<size32_t *>(dst) = thisrowsize;
  118. return (sizeof(size32_t) + DiffCompress2(buffer, dst + sizeof(size32_t), prev.buffer, maxrowsize + sizeof(offset_t)));
  119. }
  120. else
  121. {
  122. return DiffCompress2(buffer, dst, prev.buffer, buffsize);
  123. }
  124. }
  125. size32_t diffExpand(byte const * src, RowBuffer const & prev)
  126. {
  127. if(isVar)
  128. {
  129. thisrowsize = *reinterpret_cast<size32_t const *>(src);
  130. unsigned maxrowsize;
  131. if(thisrowsize > prev.thisrowsize)
  132. {
  133. maxrowsize = thisrowsize;
  134. memset(prev.row + prev.thisrowsize, 0, thisrowsize - prev.thisrowsize);
  135. }
  136. else
  137. maxrowsize = prev.thisrowsize;
  138. return (sizeof(size32_t) + DiffExpand(src + sizeof(size32_t), buffer, prev.buffer, maxrowsize + sizeof(offset_t)));
  139. }
  140. else
  141. {
  142. return DiffExpand(src, buffer, prev.buffer, buffsize);
  143. }
  144. }
  145. void copyFrom(RowBuffer const & src)
  146. {
  147. memcpy(buffer, src.buffer, buffsize);
  148. if(isVar)
  149. thisrowsize = src.thisrowsize;
  150. }
  151. size32_t buffSize() { return buffsize; }
  152. size32_t rowSize() { return thisrowsize; }
  153. size32_t serializeRowSize() { return thisrowsize+sizeof(offset_t)+(isVar?sizeof(size32_t):0); }
  154. size32_t serialize(void *dst)
  155. {
  156. size32_t ret = thisrowsize+sizeof(offset_t);
  157. if (isVar) {
  158. *((size32_t *)dst) = ret;
  159. memcpy((byte *)dst+sizeof(size32_t),buffer,ret);
  160. ret += sizeof(size32_t);
  161. }
  162. else
  163. memcpy((byte *)dst,buffer,ret);
  164. return ret;
  165. }
  166. size32_t deserialize(const void *src)
  167. {
  168. size32_t ret = 0;
  169. if (isVar) {
  170. thisrowsize = *(const size32_t *)src;
  171. thisrowsize -= sizeof(offset_t);
  172. src = (const byte *)src + sizeof(size32_t);
  173. ret += sizeof(size32_t);
  174. }
  175. size32_t cp = thisrowsize+sizeof(offset_t);
  176. assertex(buffsize>=cp);
  177. memcpy(buffer,src,cp);
  178. return ret+cp;
  179. }
  180. private:
  181. size32_t rowsize;
  182. bool isVar;
  183. size32_t thisrowsize;
  184. size32_t buffsize;
  185. void * buffer;
  186. char * row;
  187. offset_t * fpos;
  188. };
  189. class CKeyReader: public CInterface
  190. {
  191. public:
  192. CKeyReader(char const * filename) : count(0)
  193. {
  194. keyFile.setown(createIFile(filename));
  195. keyFileIO.setown(keyFile->open(IFOread));
  196. if(!keyFileIO)
  197. throw MakeStringException(0, "Could not read index file %s", filename);
  198. keyIndex.setown(createKeyIndex(filename, 0, *keyFileIO, false, false)); // MORE - should we care about crc?
  199. unsigned flags = keyIndex->getFlags();
  200. variableWidth = ((flags & HTREE_VARSIZE) == HTREE_VARSIZE);
  201. if((flags & HTREE_QUICK_COMPRESSED_KEY) == HTREE_QUICK_COMPRESSED_KEY)
  202. quickCompressed = true;
  203. else if((flags & HTREE_COMPRESSED_KEY) == HTREE_COMPRESSED_KEY)
  204. quickCompressed = false;
  205. else
  206. throw MakeStringException(0, "Index file %s did not have compression flags set, unsupported", filename);
  207. unsigned optionalFlags = (HTREE_VARSIZE | HTREE_QUICK_COMPRESSED_KEY | HTREE_TOPLEVEL_KEY | HTREE_FULLSORT_KEY);
  208. unsigned requiredFlags = COL_PREFIX;
  209. #ifdef _DEBUG
  210. if((flags & ~optionalFlags) != requiredFlags)
  211. ERRLOG("Index file %s did not have expected index flags set (%x)", filename, (flags & ~optionalFlags) );
  212. #else
  213. if((flags & ~optionalFlags) != requiredFlags)
  214. throw MakeStringException(0, "Index file %s did not have expected index flags set (%x)", filename, (flags & ~optionalFlags) );
  215. #endif
  216. offset_t blobHead = keyIndex->queryBlobHead();
  217. if(blobHead == static_cast<offset_t>(-1))
  218. WARNLOG("Index part %s does not declare blob status: if it contains blobs, they will be lost", filename);
  219. else if(blobHead != 0)
  220. throw MakeStringException(0, "Index contains BLOBs, which are currently not supported by keydiff/patch");
  221. if(keyIndex->queryMetadataHead())
  222. throw MakeStringException(0, "Index contains metadata, which is not currently supported by keydiff/patch");
  223. keyCursor.setown(keyIndex->getCursor(NULL));
  224. if(keyIndex->hasPayload())
  225. keyedsize = keyIndex->keyedSize();
  226. else
  227. keyedsize = static_cast<unsigned>(-1);
  228. rowsize = keyIndex->keySize();
  229. eof = false;
  230. }
  231. bool get(RowBuffer & buffer)
  232. {
  233. if(eof)
  234. return false;
  235. if(buffer.getCursorNext(keyCursor))
  236. {
  237. buffer.tally(crc);
  238. count++;
  239. checkProgress();
  240. return true;
  241. }
  242. eof = true;
  243. checkProgress();
  244. return false;
  245. }
  246. void getToEnd(RowBuffer & buffer)
  247. {
  248. while(!eof)
  249. get(buffer);
  250. }
  251. void getRawToEnd()
  252. {
  253. char * buff = reinterpret_cast<char *>(malloc(rowsize));
  254. while(!eof)
  255. {
  256. if(keyCursor->next(buff))
  257. {
  258. offset_t fpos = keyCursor->getFPos();
  259. crc.tally(rowsize, buff);
  260. crc.tally(sizeof(fpos), &fpos);
  261. }
  262. else
  263. eof = true;
  264. }
  265. free(buff);
  266. }
  267. size32_t queryKeyedSize() const { return keyedsize; }
  268. size32_t queryRowSize() const { return rowsize; }
  269. unsigned queryCRC() { return crc.get(); }
  270. unsigned queryCount() const { return count; }
  271. bool isVariableWidth() const { return variableWidth; }
  272. bool isQuickCompressed() const { return quickCompressed; }
  273. unsigned getNodeSize() const { return keyIndex->getNodeSize(); }
  274. virtual void setProgressCallback(IKeyDiffProgressCallback * callback, offset_t freq)
  275. {
  276. progressCallback.setown(callback);
  277. progressFrequency = freq;
  278. progressCount = 0;
  279. }
  280. private:
  281. void checkProgress()
  282. {
  283. if(!progressCallback)
  284. return;
  285. offset_t latest = keyIndex->queryLatestGetNodeOffset();
  286. if((latest - progressCount) < progressFrequency)
  287. return;
  288. do
  289. {
  290. progressCount += progressFrequency;
  291. } while((latest - progressCount) >= progressFrequency);
  292. progressCallback->handle(latest);
  293. }
  294. private:
  295. Owned<IFile> keyFile;
  296. Owned<IFileIO> keyFileIO;
  297. Owned<IKeyIndex> keyIndex;
  298. Owned<IKeyCursor> keyCursor;
  299. CRC32 crc;
  300. size32_t keyedsize;
  301. size32_t rowsize;
  302. bool eof;
  303. unsigned count;
  304. bool variableWidth;
  305. bool quickCompressed;
  306. Owned<IKeyDiffProgressCallback> progressCallback;
  307. offset_t progressFrequency;
  308. offset_t progressCount;
  309. };
  310. class CKeyFileReader: public CInterface, extends IKeyFileRowReader
  311. {
  312. CKeyReader reader;
  313. Owned<IPropertyTree> header;
  314. RowBuffer buffer;
  315. public:
  316. IMPLEMENT_IINTERFACE;
  317. CKeyFileReader(const char *filename)
  318. : reader(filename)
  319. {
  320. size32_t rowsize = reader.queryRowSize();
  321. bool isvar = reader.isVariableWidth();
  322. buffer.init(rowsize,isvar);
  323. header.setown(createPTree("Index"));
  324. header->setPropInt("@rowSize",rowsize);
  325. header->setPropInt("@keyedSize",reader.queryKeyedSize());
  326. header->setPropBool("@variableWidth",isvar);
  327. header->setPropBool("@quickCompressed",reader.isQuickCompressed());
  328. #if 0
  329. PROGLOG("rowSize = %d",rowsize);
  330. PROGLOG("keyedSize = %d",reader.queryKeyedSize());
  331. PROGLOG("variableWidth = %s",isvar?"true":"false");
  332. PROGLOG("quickCompressed = %s",reader.isQuickCompressed()?"true":"false");
  333. #endif
  334. }
  335. const void *nextRow()
  336. {
  337. if (!reader.get(buffer))
  338. return NULL;
  339. void *ret = malloc(buffer.serializeRowSize());
  340. buffer.serialize(ret);
  341. return ret;
  342. }
  343. void stop()
  344. {
  345. }
  346. IPropertyTree *queryHeader()
  347. {
  348. return header;
  349. }
  350. };
  351. class CKeyWriter: public CInterface
  352. {
  353. public:
  354. CKeyWriter()
  355. {
  356. }
  357. void init (char const * filename, bool overwrite, size32_t _keyedsize, size32_t _rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize)
  358. {
  359. keyedsize = _keyedsize;
  360. rowsize = _rowsize;
  361. reccount = 0;
  362. keyFile.setown(createIFile(filename));
  363. if(!overwrite && (keyFile->isFile() != notFound))
  364. throw MakeStringException(0, "Found preexisting index file %s (overwrite not selected)", filename);
  365. keyFileIO.setown(keyFile->openShared(IFOcreate, IFSHfull)); // not sure if needs shared here
  366. if(!keyFileIO)
  367. throw MakeStringException(0, "Could not write index file %s", filename);
  368. keyStream.setown(createIOStream(keyFileIO));
  369. unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY;
  370. if(variableWidth)
  371. flags |= HTREE_VARSIZE;
  372. if(quickCompressed)
  373. flags |= HTREE_QUICK_COMPRESSED_KEY;
  374. keyBuilder.setown(createKeyBuilder(keyStream, flags, rowsize, nodeSize, keyedsize, 0)); // MORE - support for sequence other than 0...
  375. }
  376. ~CKeyWriter()
  377. {
  378. if (keyBuilder)
  379. keyBuilder->finish();
  380. }
  381. void put(RowBuffer & buffer)
  382. {
  383. buffer.tally(crc);
  384. buffer.putBuilder(keyBuilder, reccount++);
  385. }
  386. void putNode(CNodeInfo & info)
  387. {
  388. crc.tally(rowsize, info.value);
  389. crc.tally(sizeof(info.pos), &(info.pos));
  390. keyBuilder->processKeyData(reinterpret_cast<char *>(info.value), info.pos, info.size);
  391. }
  392. unsigned __int64 queryCount()
  393. {
  394. return reccount;
  395. }
  396. unsigned queryCRC()
  397. {
  398. return crc.get();
  399. }
  400. offset_t getPosition()
  401. {
  402. return keyStream->tell();
  403. }
  404. private:
  405. Owned<IFile> keyFile;
  406. Owned<IFileIO> keyFileIO;
  407. Owned<IFileIOStream> keyStream;
  408. Owned<IKeyBuilder> keyBuilder;
  409. CRC32 crc;
  410. size32_t keyedsize;
  411. size32_t rowsize;
  412. unsigned __int64 reccount;
  413. };
  414. class KeyDiffVersion
  415. {
  416. public:
  417. KeyDiffVersion(unsigned short _mjr, unsigned short _mnr) : mjr(_mjr), mnr(_mnr) {}
  418. KeyDiffVersion(KeyDiffVersion const & other) : mjr(other.mjr), mnr(other.mnr) {}
  419. unsigned short queryMajor() const { return mjr; }
  420. unsigned short queryMinor() const { return mnr; }
  421. void serialize(MemoryBuffer & buff) const { buff.append(mjr).append(mnr); }
  422. void deserialize(MemoryBuffer & buff) { buff.read(mjr).read(mnr); }
  423. bool operator<(KeyDiffVersion const & other) const { return ((mjr < other.mjr) || ((mjr == other.mjr) && (mnr < other.mnr))); }
  424. static size32_t querySerializedSize() { return 2*sizeof(unsigned short); }
  425. private:
  426. unsigned short mjr;
  427. unsigned short mnr;
  428. };
  429. class CKeyFileWriter: public CInterface, extends IKeyFileRowWriter
  430. {
  431. CKeyWriter writer;
  432. Owned<IPropertyTree> header;
  433. RowBuffer buffer;
  434. public:
  435. IMPLEMENT_IINTERFACE;
  436. CKeyFileWriter(const char *filename, IPropertyTree *_header, bool overwrite, unsigned nodeSize)
  437. : header(createPTreeFromIPT(_header))
  438. {
  439. writer.init(filename,overwrite,header->getPropInt("@keyedSize"), header->getPropInt("@rowSize"), header->getPropBool("@variableWidth"), header->getPropBool("@quickCompressed"), header->getPropInt("@nodeSize", NODESIZE));
  440. size32_t rowsize = header->getPropInt("@rowSize");
  441. bool isvar = header->getPropBool("@variableWidth");
  442. buffer.init(rowsize,isvar);
  443. }
  444. void flush()
  445. {
  446. // not needed?
  447. }
  448. virtual void putRow(const void *src)
  449. {
  450. buffer.deserialize(src);
  451. writer.put(buffer);
  452. free((void *)src);
  453. }
  454. offset_t getPosition()
  455. {
  456. return writer.getPosition();
  457. }
  458. };
  459. class KeyDiffHeader
  460. {
  461. public:
  462. KeyDiffHeader() : version(0, 0), minPatchVersion(0, 0), oldCRC(0), newCRC(0), patchCRC(0), tlkInfo(false), tlkCRC(0) {}
  463. KeyDiffHeader(KeyDiffVersion const & _version, KeyDiffVersion const & _minPatchVersion, char const * _oldIndex, char const * _newIndex, char const * _newTLK)
  464. : version(_version), minPatchVersion(_minPatchVersion), oldCRC(0), newCRC(0), patchCRC(0), oldIndex(_oldIndex), newIndex(_newIndex), tlkInfo(false), tlkCRC(0)
  465. {
  466. if(_newTLK != 0)
  467. {
  468. tlkInfo = true;
  469. newTLK.append(_newTLK);
  470. }
  471. crcStreamPos = namesStreamPos = endStreamPos = 0;
  472. }
  473. KeyDiffVersion const & queryVersion() const { return version; }
  474. KeyDiffVersion const & queryMinPatchVersion() const { return minPatchVersion; }
  475. unsigned queryOldCRC() const { return oldCRC; }
  476. unsigned queryNewCRC() const { return newCRC; }
  477. unsigned queryPatchCRC() const { return patchCRC; }
  478. unsigned queryTLKCRC() const { return tlkCRC; }
  479. char const * queryOldIndex() const { return oldIndex.str(); }
  480. char const * queryNewIndex() const { return newIndex.str(); }
  481. bool hasTLKInfo() const { return tlkInfo; }
  482. char const * queryNewTLK() const { return tlkInfo ? newTLK.str() : 0; }
  483. void write(IFileIOStream * _stream)
  484. {
  485. stream.set(_stream);
  486. MemoryBuffer buff;
  487. buff.append(7, KEYDIFFSIG);
  488. version.serialize(buff);
  489. minPatchVersion.serialize(buff);
  490. crcStreamPos = buff.length();
  491. crcHeadVer.tally((size32_t)crcStreamPos, buff.toByteArray());
  492. buff.append(oldCRC).append(newCRC).append(patchCRC).append(tlkCRC);
  493. namesStreamPos = buff.length();
  494. crcHeadCRCs.tally((size32_t)(namesStreamPos - crcStreamPos), buff.toByteArray() + crcStreamPos);
  495. size32_t oil = oldIndex.length();
  496. size32_t nil = newIndex.length();
  497. size32_t tlkl = tlkInfo ? newTLK.length() : 0;
  498. buff.append(oil).append(nil).append(tlkl).append(oil, oldIndex.str()).append(nil, newIndex.str());
  499. if(tlkl)
  500. buff.append(tlkl, newTLK.str());
  501. endStreamPos = buff.length();
  502. crcHeadNames.tally((size32_t)(endStreamPos - namesStreamPos), buff.toByteArray() + namesStreamPos);
  503. stream->write(buff.length(), buff.toByteArray());
  504. }
  505. void rewriteCRC(unsigned _oldCRC, unsigned _newCRC, unsigned _patchCRC)
  506. {
  507. oldCRC = _oldCRC;
  508. newCRC = _newCRC;
  509. patchCRC = _patchCRC;
  510. if(tlkInfo)
  511. readTLKCRC();
  512. MemoryBuffer buff;
  513. buff.append(oldCRC).append(newCRC).append(patchCRC).append(tlkCRC);
  514. stream->flush();
  515. stream->seek(crcStreamPos, IFSbegin);
  516. stream->write(buff.length(), buff.toByteArray());
  517. crcHeadCRCs.reset();
  518. assertex(buff.length() == (namesStreamPos - crcStreamPos));
  519. crcHeadCRCs.tally(buff.length(), buff.toByteArray());
  520. }
  521. void readVersionInfo(IFileIOStream * _stream, char const * patchName)
  522. {
  523. stream.set(_stream);
  524. MemoryBuffer buff;
  525. size32_t bufflen = 7 + 2*KeyDiffVersion::querySerializedSize();
  526. stream->read(bufflen, buff.reserve(bufflen));
  527. char signature[7];
  528. buff.read(7, signature);
  529. if(strncmp(signature, KEYDIFFSIG, 7) != 0)
  530. throw MakeStringException(0, "Bad format in file %s, did not appear to be key patch file", patchName);
  531. version.deserialize(buff);
  532. minPatchVersion.deserialize(buff);
  533. }
  534. void readFileInfo()
  535. {
  536. MemoryBuffer buff;
  537. size32_t bufflen = 4*sizeof(unsigned) + 3*sizeof(size32_t);
  538. stream->read(bufflen, buff.reserve(bufflen));
  539. buff.read(oldCRC).read(newCRC).read(patchCRC).read(tlkCRC);
  540. size32_t oil, nil, tlkl;
  541. buff.read(oil).read(nil).read(tlkl);
  542. stream->read(oil, oldIndex.reserve(oil));
  543. stream->read(nil, newIndex.reserve(nil));
  544. if(tlkl)
  545. {
  546. tlkInfo = true;
  547. stream->read(tlkl, newTLK.reserve(tlkl));
  548. }
  549. }
  550. unsigned mergeFileCRC(offset_t datasize, unsigned datacrc)
  551. {
  552. CRC32Merger merger;
  553. merger.addChildCRC(crcStreamPos, crcHeadVer.get(), true);
  554. merger.addChildCRC(namesStreamPos - crcStreamPos, crcHeadCRCs.get(), true);
  555. merger.addChildCRC(endStreamPos - namesStreamPos, crcHeadNames.get(), true);
  556. merger.addChildCRC(datasize, datacrc, true);
  557. return merger.get();
  558. }
  559. private:
  560. void readTLKCRC()
  561. {
  562. CKeyReader tlkReader(newTLK);
  563. tlkReader.getRawToEnd();
  564. tlkCRC = tlkReader.queryCRC();
  565. }
  566. private:
  567. Owned<IFileIOStream> stream;
  568. offset_t crcStreamPos;
  569. offset_t namesStreamPos;
  570. offset_t endStreamPos;
  571. KeyDiffVersion version;
  572. KeyDiffVersion minPatchVersion;
  573. unsigned oldCRC;
  574. unsigned newCRC;
  575. unsigned patchCRC;
  576. StringBuffer oldIndex;
  577. StringBuffer newIndex;
  578. bool tlkInfo;
  579. StringBuffer newTLK;
  580. unsigned tlkCRC;
  581. CRC32 crcHeadVer, crcHeadCRCs, crcHeadNames;
  582. };
  583. class CKeyDiff : public CInterface
  584. {
  585. public:
  586. typedef enum {
  587. CMD_END = 0,
  588. CMD_MATCH = 1, // new curr == old curr
  589. CMD_FPOS = 2, // new curr == old curr but fpos has changed, new fpos follows
  590. CMD_DIFF_OLD_CURR = 3, // diff between new curr and old curr follows
  591. CMD_DIFF_OLD_PREV = 4, // diff between new curr and old prev follows
  592. CMD_DIFF_NEW_PREV = 5, // diff between new curr and new prev follows
  593. CMD_SKIP = 6, // +N-1, skip N old records
  594. MAX_SKIP = 249
  595. } CommandCode;
  596. static KeyDiffVersion const version;
  597. static KeyDiffVersion const minDiffVersionForPatch;
  598. static KeyDiffVersion const minPatchVersionForDiff;
  599. CKeyDiff() {}
  600. CKeyDiff(char const * oldIndex, char const * newIndex, char const * newTLK)
  601. : header(version, minPatchVersionForDiff, oldIndex, newIndex, newTLK)
  602. {
  603. }
  604. protected:
  605. KeyDiffHeader header;
  606. Owned<IFile> file;
  607. Owned<IFileIO> fileIO;
  608. Owned<IFileIOStream> stream;
  609. static size32_t const streambuffsize;
  610. static size32_t const compressThreshold;
  611. };
  612. class KeyDiffStats
  613. {
  614. public:
  615. KeyDiffStats() : stats(new unsigned[CKeyDiff::CMD_SKIP-1]), diffSize(0)
  616. {
  617. unsigned i;
  618. for(i=1; i<CKeyDiff::CMD_SKIP; i++)
  619. stats[i-1] = 0;
  620. }
  621. ~KeyDiffStats() { delete [] stats; }
  622. void inc(CKeyDiff::CommandCode cmd) { assertex(cmd < CKeyDiff::CMD_SKIP); assertex(cmd > 0); stats[cmd-1]++; }
  623. void addDiffSize(size32_t sz) { diffSize += sz; }
  624. void log() const
  625. {
  626. LOG(MCstats, "Matching rows: %u", stats[CKeyDiff::CMD_MATCH-1]);
  627. LOG(MCstats, "Rows close to previous old row: %u", stats[CKeyDiff::CMD_DIFF_OLD_PREV-1]);
  628. LOG(MCstats, "Rows close to current old row: %u", stats[CKeyDiff::CMD_DIFF_OLD_CURR-1]);
  629. LOG(MCstats, "Rows close to previous new row: %u", stats[CKeyDiff::CMD_DIFF_NEW_PREV-1]);
  630. unsigned diffNum = stats[CKeyDiff::CMD_DIFF_OLD_PREV-1] + stats[CKeyDiff::CMD_DIFF_OLD_CURR-1] + stats[CKeyDiff::CMD_DIFF_NEW_PREV-1];
  631. if(diffNum > 0)
  632. LOG(MCstats, "Average diff size: %u", ((diffSize + diffNum/2) / diffNum));
  633. }
  634. private:
  635. unsigned * stats;
  636. size32_t diffSize;
  637. };
  638. class CWritableKeyDiff : public CKeyDiff
  639. {
  640. public:
  641. CWritableKeyDiff(char const * filename, bool overwrite, char const * oldIndex, char const * newIndex, char const * newTLK, unsigned _compmode)
  642. : CKeyDiff(oldIndex, newIndex, newTLK)
  643. {
  644. file.setown(createIFile(filename));
  645. if(!overwrite && (file->isFile() != notFound))
  646. throw MakeStringException(0, "Found preexisting key patch file %s (overwrite not selected)", filename);
  647. fileIO.setown(file->open(IFOcreate));
  648. if(!fileIO)
  649. throw MakeStringException(0, "Could not write key patch file %s", filename);
  650. stream.setown(createIOStream(fileIO));
  651. compmode = _compmode;
  652. datasize = 0;
  653. }
  654. void writeHeader()
  655. {
  656. header.write(stream);
  657. }
  658. void rewriteHeaderCRC(unsigned oldCRC, unsigned newCRC)
  659. {
  660. header.rewriteCRC(oldCRC, newCRC, crc.get());
  661. }
  662. void writeSkip(unsigned count)
  663. {
  664. outb(CMD_SKIP + count - 1);
  665. }
  666. void writeMatch()
  667. {
  668. stats.inc(CMD_MATCH);
  669. outb(CMD_MATCH);
  670. }
  671. void writeDiff(CommandCode code, char * ptr, size32_t sz)
  672. {
  673. stats.inc(code);
  674. stats.addDiffSize(sz);
  675. outb(code);
  676. out(ptr, sz);
  677. }
  678. void finish()
  679. {
  680. out(NULL, 0);
  681. outTerminate();
  682. }
  683. void logStats() const
  684. {
  685. stats.log();
  686. }
  687. unsigned queryCRC()
  688. {
  689. return crc.get();
  690. }
  691. unsigned queryFileCRC()
  692. {
  693. return header.mergeFileCRC(datasize, datacrc.get());
  694. }
  695. private:
  696. void out(char * ptr, size32_t sz)
  697. {
  698. if(!ptr || (outbuff.length()+sz > streambuffsize))
  699. writeBuff();
  700. outbuff.append(sz, ptr);
  701. }
  702. void outb(byte b)
  703. {
  704. if(outbuff.length() >= streambuffsize)
  705. writeBuff();
  706. outbuff.append(b);
  707. }
  708. void outTerminate()
  709. {
  710. size32_t zero = 0;
  711. stream->write(sizeof(zero), &zero);
  712. stream->write(sizeof(zero), &zero);
  713. datacrc.tally(sizeof(zero), &zero);
  714. datacrc.tally(sizeof(zero), &zero);
  715. datasize += 2*sizeof(zero);
  716. }
  717. void writeBuff()
  718. {
  719. size32_t outsize = outbuff.length();
  720. size32_t wrsize = outsize;
  721. void const * wrbuff = outbuff.toByteArray();
  722. crc.tally(wrsize, wrbuff);
  723. MemoryAttr ma;
  724. MemoryBuffer mb;
  725. size32_t wrflag = wrsize;
  726. if(compmode && (outsize > compressThreshold))
  727. {
  728. size32_t newsize = outsize*4/5; // only compress if get better than 80%
  729. if (compmode==COMPRESS_METHOD_LZW) {
  730. byte *compbuff = (byte *)ma.allocate(streambuffsize);
  731. Owned<ICompressor> compressor = createLZWCompressor();
  732. compressor->open(compbuff, newsize);
  733. if (compressor->write(wrbuff, outsize)==outsize) {
  734. compressor->close();
  735. wrsize = compressor->buflen();
  736. wrflag = wrsize;
  737. wrbuff = compbuff;
  738. }
  739. }
  740. else if (compmode==COMPRESS_METHOD_LZMA) {
  741. LZMACompressToBuffer(mb,outsize,wrbuff);
  742. if (mb.length()+16<outsize) {
  743. wrsize = mb.length();
  744. wrflag = wrsize|LZMA_FLAG;
  745. wrbuff = mb.bufferBase();
  746. }
  747. }
  748. else
  749. throw MakeStringException(-1,"Unknown compression mode (%d)",compmode);
  750. }
  751. stream->write(sizeof(outsize), &outsize);
  752. stream->write(sizeof(wrflag), &wrflag);
  753. stream->write(wrsize, wrbuff);
  754. datacrc.tally(sizeof(outsize), &outsize);
  755. datacrc.tally(sizeof(wrsize), &wrsize);
  756. datacrc.tally(wrsize, wrbuff);
  757. datasize += (sizeof(outsize) + sizeof(wrflag) + wrsize);
  758. outbuff.clear();
  759. }
  760. private:
  761. unsigned compmode; // 0, COMPRESS_METHOD_LZW or COMPRESS_METHOD_LZMA
  762. MemoryBuffer outbuff;
  763. CRC32 crc;
  764. CRC32 datacrc;
  765. offset_t datasize;
  766. KeyDiffStats stats;
  767. };
  768. class CReadableKeyDiff : public CKeyDiff
  769. {
  770. public:
  771. CReadableKeyDiff(char const * filename)
  772. : patch(filename), eof(false), insize(0), lastSkipCount(0)
  773. {
  774. file.setown(createIFile(filename));
  775. fileIO.setown(file->open(IFOread));
  776. if(!fileIO)
  777. throw MakeStringException(0, "Could not read key patch file %s", filename);
  778. stream.setown(createIOStream(fileIO));
  779. inbuff.reserve(streambuffsize + 2*sizeof(size32_t));
  780. inbuff.rewrite(0);
  781. }
  782. KeyDiffHeader const & queryHeader() const { return header; }
  783. bool compatibleVersions(StringBuffer & error) const
  784. {
  785. if(header.queryVersion() < minDiffVersionForPatch)
  786. {
  787. error.appendf("Patch was created with keydiff version %u.%u, this keypatch requires at least keydiff version %u.%u", header.queryVersion().queryMajor(), header.queryVersion().queryMinor(), minDiffVersionForPatch.queryMajor(), minDiffVersionForPatch.queryMinor());
  788. return false;
  789. }
  790. if(version < header.queryMinPatchVersion())
  791. {
  792. error.appendf("This is keypatch version %u.%u, this patch requires at least keypatch version %u.%u", version.queryMajor(), version.queryMinor(), header.queryMinPatchVersion().queryMajor(), header.queryMinPatchVersion().queryMinor());
  793. return false;
  794. }
  795. return true;
  796. }
  797. void readHeaderVersionInfo()
  798. {
  799. header.readVersionInfo(stream, patch.get());
  800. }
  801. void readHeaderFileInfo()
  802. {
  803. header.readFileInfo();
  804. size32_t insize, rdsize;
  805. stream->read(sizeof(insize), &insize);
  806. stream->read(sizeof(rdsize), &rdsize);
  807. inbuff.append(insize);
  808. inbuff.append(rdsize);
  809. inbuff.reset(0);
  810. }
  811. CommandCode readCmd()
  812. {
  813. byte cmd;
  814. if(!inb(cmd))
  815. return CMD_END;
  816. if(cmd >= CMD_SKIP)
  817. {
  818. lastSkipCount = cmd - CMD_SKIP + 1;
  819. return CMD_SKIP;
  820. }
  821. return static_cast<CommandCode>(cmd);
  822. }
  823. void readNewFPos(RowBuffer & buffer)
  824. {
  825. offset_t fpos;
  826. infpos(fpos);
  827. buffer.setFPos(fpos);
  828. }
  829. unsigned readSkipCount()
  830. {
  831. return lastSkipCount;
  832. }
  833. bool readDiffAndExpand(RowBuffer const & prev, RowBuffer & dest)
  834. {
  835. byte const * src = in();
  836. if(!src) return false;
  837. size32_t consumed = dest.diffExpand(src, prev);
  838. return skip(consumed);
  839. }
  840. unsigned queryCRC()
  841. {
  842. return crc.get();
  843. }
  844. private:
  845. byte const * in()
  846. {
  847. if(insize == 0)
  848. if(!readBuff())
  849. return NULL;
  850. return inbuff.readDirect(0);
  851. }
  852. bool skip(size32_t sz)
  853. {
  854. if(insize < sz)
  855. return false;
  856. insize -= sz;
  857. inbuff.skip(sz);
  858. return true;
  859. }
  860. bool inb(byte & b)
  861. {
  862. if(insize == 0)
  863. if(!readBuff())
  864. return false;
  865. if(insize == 0)
  866. return false;
  867. inbuff.read(b);
  868. insize--;
  869. return true;
  870. }
  871. bool infpos(offset_t & fp)
  872. {
  873. if(insize == 0)
  874. if(!readBuff())
  875. return false;
  876. if(insize < sizeof(offset_t))
  877. return false;
  878. inbuff.read(fp);
  879. insize -= sizeof(offset_t);
  880. return true;
  881. }
  882. bool readBuff()
  883. {
  884. if(eof)
  885. return false;
  886. size32_t rdsize;
  887. inbuff.read(insize);
  888. inbuff.read(rdsize);
  889. if(insize == 0)
  890. {
  891. eof = true;
  892. inbuff.clear();
  893. insize = 0;
  894. return false;
  895. }
  896. inbuff.rewrite(0);
  897. if(insize == rdsize)
  898. {
  899. stream->read(rdsize + 2*sizeof(size32_t), inbuff.reserve(rdsize + 2*sizeof(size32_t)));
  900. }
  901. else
  902. {
  903. bool fastlz = false;
  904. if (rdsize&LZMA_FLAG) {
  905. fastlz = true;
  906. rdsize &= ~LZMA_FLAG;
  907. }
  908. byte * buf;
  909. if (compma.length()<rdsize)
  910. buf = (byte *)compma.allocate(rdsize+4096);
  911. else
  912. buf = (byte *)compma.bufferBase();
  913. stream->read(rdsize, buf);
  914. if (fastlz)
  915. LZMADecompressToBuffer(inbuff,buf);
  916. else {
  917. Owned<IExpander> expander = createLZWExpander();
  918. size32_t expsize = expander->init(buf);
  919. if(expsize != insize)
  920. throw MakeStringException(0, "LZW compression/expansion error");
  921. expander->expand(inbuff.reserve(insize));
  922. }
  923. stream->read(2*sizeof(size32_t), inbuff.reserve(2*sizeof(size32_t)));
  924. }
  925. crc.tally(insize, inbuff.toByteArray());
  926. return true;
  927. }
  928. private:
  929. StringAttr patch;
  930. CRC32 crc;
  931. bool eof;
  932. MemoryBuffer inbuff;
  933. size32_t insize;
  934. unsigned lastSkipCount;
  935. MemoryAttr compma;
  936. };
  937. class CKeyDiffGenerator : public CInterface, public IKeyDiffGenerator
  938. {
  939. public:
  940. IMPLEMENT_IINTERFACE;
  941. CKeyDiffGenerator(char const * oldIndex, char const * newIndex, char const * patch, char const * newTLK, bool overwrite, unsigned compmode)
  942. : oldInput(oldIndex), newInput(newIndex), keydiff(patch, overwrite, oldIndex, newIndex, newTLK, compmode), keyedsize(oldInput.queryKeyedSize()), rowsize(oldInput.queryRowSize())
  943. {
  944. if((newInput.queryKeyedSize() != keyedsize) || (newInput.queryRowSize() != rowsize))
  945. throw MakeStringException(0, "Cannot generate diff for keys with different record sizes");
  946. if(newInput.isVariableWidth() != oldInput.isVariableWidth())
  947. throw MakeStringException(0, "Old and new keys are of different types (%s is variable width)", (oldInput.isVariableWidth() ? "old" : "new"));
  948. if(newInput.isQuickCompressed() != oldInput.isQuickCompressed())
  949. throw MakeStringException(0, "Old and new keys are of different types (%s is quick compressed)", (oldInput.isQuickCompressed() ? "old" : "new"));
  950. newcurr.init(rowsize, oldInput.isVariableWidth());
  951. newprev.init(rowsize, oldInput.isVariableWidth());
  952. oldcurr.init(rowsize, oldInput.isVariableWidth());
  953. oldprev.init(rowsize, oldInput.isVariableWidth());
  954. size32_t diffsize = (rowsize + sizeof(offset_t)) * 2; // *2 is excessive
  955. if(oldInput.isVariableWidth()) diffsize += sizeof(size32_t); // as have to store size
  956. diffnewprev = (char *)malloc(diffsize);
  957. diffoldcurr = (char *)malloc(diffsize);
  958. diffoldprev = (char *)malloc(diffsize);
  959. }
  960. ~CKeyDiffGenerator()
  961. {
  962. free(diffnewprev);
  963. free(diffoldcurr);
  964. free(diffoldprev);
  965. }
  966. virtual void run()
  967. {
  968. keydiff.writeHeader();
  969. writeBody();
  970. keydiff.finish();
  971. oldInput.getToEnd(oldcurr);
  972. keydiff.rewriteHeaderCRC(oldInput.queryCRC(), newInput.queryCRC());
  973. }
  974. virtual void logStats() const
  975. {
  976. LOG(MCstats, "Rows in old index: %u", oldInput.queryCount());
  977. LOG(MCstats, "Rows in new index: %u", newInput.queryCount());
  978. keydiff.logStats();
  979. }
  980. virtual unsigned queryPatchCRC()
  981. {
  982. return keydiff.queryCRC();
  983. }
  984. virtual unsigned queryPatchFileCRC()
  985. {
  986. return keydiff.queryFileCRC();
  987. }
  988. virtual void setProgressCallback(IKeyDiffProgressCallback * callback, offset_t freq)
  989. {
  990. oldInput.setProgressCallback(callback, freq);
  991. }
  992. private:
  993. bool readNew()
  994. {
  995. newcurr.swap(newprev);
  996. return newInput.get(newcurr);
  997. }
  998. bool readOld()
  999. {
  1000. oldcurr.swap(oldprev);
  1001. if(oldInput.get(oldcurr))
  1002. return true;
  1003. oldcurr.clear();
  1004. return false;
  1005. }
  1006. void writeBody()
  1007. {
  1008. if(!readNew())
  1009. return;
  1010. bool eosold = !readOld();
  1011. while(true)
  1012. {
  1013. int cmp = -1;
  1014. unsigned skipcount = 0;
  1015. size32_t doc = (size32_t)-1;
  1016. while (!eosold&&(skipcount < CKeyDiff::MAX_SKIP))
  1017. {
  1018. cmp = newcurr.compareKeyed(oldcurr);
  1019. if(cmp <= 0)
  1020. break;
  1021. #ifdef SMALL_ENOUGH_RATIO
  1022. size32_t ndoc = newcurr.diffCompress(oldcurr, diffoldcurr);
  1023. if (ndoc<newcurr.rowSize()/SMALL_ENOUGH_RATIO) {
  1024. doc = ndoc;
  1025. cmp = -1;
  1026. break;
  1027. }
  1028. #endif
  1029. skipcount++;
  1030. eosold = !readOld();
  1031. }
  1032. if(skipcount)
  1033. keydiff.writeSkip(skipcount);
  1034. if (eosold||((cmp==0) && (newcurr.queryFPos() != oldcurr.queryFPos())))
  1035. cmp = -1;
  1036. if(cmp==0)
  1037. {
  1038. keydiff.writeMatch();
  1039. if(!readNew())
  1040. break;
  1041. }
  1042. else if(cmp<0)
  1043. {
  1044. if (doc!=(size32_t)-1)
  1045. keydiff.writeDiff(CKeyDiff::CMD_DIFF_OLD_CURR, diffoldcurr, doc);
  1046. else {
  1047. size32_t dnp = newcurr.diffCompress(newprev, diffnewprev);
  1048. size32_t dop = newcurr.diffCompress(oldprev, diffoldprev);
  1049. doc = newcurr.diffCompress(oldcurr, diffoldcurr);
  1050. if(dnp<dop)
  1051. {
  1052. if(dnp<doc)
  1053. keydiff.writeDiff(CKeyDiff::CMD_DIFF_NEW_PREV, diffnewprev, dnp);
  1054. else
  1055. keydiff.writeDiff(CKeyDiff::CMD_DIFF_OLD_CURR, diffoldcurr, doc);
  1056. }
  1057. else if(dop<doc)
  1058. keydiff.writeDiff(CKeyDiff::CMD_DIFF_OLD_PREV, diffoldprev, dop);
  1059. else
  1060. keydiff.writeDiff(CKeyDiff::CMD_DIFF_OLD_CURR, diffoldcurr, doc);
  1061. }
  1062. if(!readNew())
  1063. break;
  1064. }
  1065. }
  1066. }
  1067. private:
  1068. CKeyReader oldInput;
  1069. CKeyReader newInput;
  1070. CWritableKeyDiff keydiff;
  1071. size32_t keyedsize;
  1072. size32_t rowsize;
  1073. RowBuffer newcurr;
  1074. RowBuffer newprev;
  1075. RowBuffer oldcurr;
  1076. RowBuffer oldprev;
  1077. char * diffnewprev;
  1078. char * diffoldcurr;
  1079. char * diffoldprev;
  1080. };
  1081. class CTLKGenerator : public Thread
  1082. {
  1083. public:
  1084. CTLKGenerator(INodeReceiver * receiver, unsigned numParts, KeyDiffHeader const & _header) : tlkReceiver(receiver), remaining(numParts), header(_header)
  1085. {
  1086. }
  1087. void open(char const * tlkName, bool overwrite, unsigned keyedsize, unsigned rowsize, bool variableWidth, bool quickCompressed, unsigned nodeSize)
  1088. {
  1089. filename.set(tlkName);
  1090. writer.setown(new CKeyWriter());
  1091. writer->init(tlkName, overwrite, keyedsize, rowsize, variableWidth, quickCompressed, nodeSize);
  1092. }
  1093. virtual int run()
  1094. {
  1095. Owned<CNodeInfo> ni;
  1096. while(remaining)
  1097. {
  1098. ni.setown(new CNodeInfo);
  1099. if(tlkReceiver->recv(*ni))
  1100. addNode(LINK(ni));
  1101. }
  1102. return 0;
  1103. }
  1104. bool addNode(CNodeInfo * info)
  1105. {
  1106. CriticalBlock block(crit);
  1107. TLKnodes.append(*info);
  1108. remaining--;
  1109. if(remaining==0)
  1110. finish();
  1111. return (remaining>0);
  1112. }
  1113. private:
  1114. void finish()
  1115. {
  1116. PROGLOG("Received all TLK data, generating TLK");
  1117. tlkReceiver->stop();
  1118. TLKnodes.sort(rowCompare);
  1119. if(TLKnodes.length())
  1120. {
  1121. CNodeInfo & lastNode = TLKnodes.item(TLKnodes.length()-1);
  1122. memset(lastNode.value, 0xff, lastNode.size);
  1123. }
  1124. offset_t fp = 1;
  1125. ForEachItemIn(idx, TLKnodes) {
  1126. CNodeInfo & info = TLKnodes.item(idx);
  1127. info.pos = fp++;
  1128. writer->putNode(info);
  1129. }
  1130. if(header.hasTLKInfo())
  1131. {
  1132. if(writer->queryCRC() != header.queryTLKCRC())
  1133. WARNLOG("CRC mismatch: on keydiff, new TLK %s had key CRC %08X, while on keypatch, new TLK %s had key CRC %08X", header.queryNewTLK(), header.queryTLKCRC(), filename.get(), writer->queryCRC());
  1134. }
  1135. else
  1136. WARNLOG("Patch did not include TLK info in header, TLK has been generated but its CRC has not been verified");
  1137. }
  1138. static int rowCompare(IInterface * const * ll, IInterface * const * rr)
  1139. {
  1140. CNodeInfo * l = static_cast<CNodeInfo *>(*ll);
  1141. CNodeInfo * r = static_cast<CNodeInfo *>(*rr);
  1142. return memcmp(l->value, r->value, l->size);
  1143. }
  1144. private:
  1145. Owned<INodeReceiver> tlkReceiver;
  1146. unsigned remaining;
  1147. KeyDiffHeader const & header;
  1148. StringAttr filename;
  1149. Owned<CKeyWriter> writer;
  1150. CriticalSection crit;
  1151. NodeInfoArray TLKnodes;
  1152. };
  1153. class CKeyDiffApplicator : public CInterface, public IKeyDiffApplicator
  1154. {
  1155. public:
  1156. IMPLEMENT_IINTERFACE;
  1157. CKeyDiffApplicator(char const * patch, char const * _oldIndex, char const * _newIndex, char const * _newTLK, bool _overwrite, bool _ignoreTLK)
  1158. : oldIndex(_oldIndex), newIndex(_newIndex), newTLK(_newTLK), overwrite(_overwrite), ignoreTLK(_ignoreTLK), keydiff(patch), keyedsize(0), rowsize(0)
  1159. {
  1160. }
  1161. ~CKeyDiffApplicator()
  1162. {
  1163. }
  1164. virtual void setTransmitTLK(INodeSender * sender)
  1165. {
  1166. tlkSender.setown(sender);
  1167. }
  1168. virtual void setReceiveTLK(INodeReceiver * receiver, unsigned numParts)
  1169. {
  1170. tlkGen.setown(new CTLKGenerator(receiver, numParts, keydiff.queryHeader()));
  1171. }
  1172. virtual void run()
  1173. {
  1174. init();
  1175. readOld(1);
  1176. bool more = true;
  1177. while(more)
  1178. {
  1179. CKeyDiff::CommandCode cmd = keydiff.readCmd();
  1180. switch(cmd)
  1181. {
  1182. case CKeyDiff::CMD_END:
  1183. more = false;
  1184. break;
  1185. case CKeyDiff::CMD_MATCH:
  1186. newcurr.copyFrom(oldcurr);
  1187. writeNew();
  1188. break;
  1189. case CKeyDiff::CMD_SKIP:
  1190. readOld(keydiff.readSkipCount());
  1191. break;
  1192. case CKeyDiff::CMD_DIFF_OLD_CURR:
  1193. doDiff(oldcurr);
  1194. break;
  1195. case CKeyDiff::CMD_DIFF_OLD_PREV:
  1196. doDiff(oldprev);
  1197. break;
  1198. case CKeyDiff::CMD_DIFF_NEW_PREV:
  1199. doDiff(newprev);
  1200. break;
  1201. case CKeyDiff::CMD_FPOS:
  1202. default:
  1203. UNIMPLEMENTED;
  1204. }
  1205. }
  1206. verifyCRCs();
  1207. if(newOutput->queryCount())
  1208. {
  1209. if(tlkGen)
  1210. {
  1211. bool wait = tlkGen->addNode(newprev.getNodeInfo(newOutput->queryCount()-1));
  1212. if(wait) PROGLOG("Waiting for remaining TLK data");
  1213. tlkGen->join();
  1214. }
  1215. else if(tlkSender)
  1216. {
  1217. Owned<CNodeInfo> ni(newprev.getNodeInfo(newOutput->queryCount()-1));
  1218. tlkSender->send(*ni);
  1219. }
  1220. }
  1221. }
  1222. virtual void getHeaderVersionInfo(unsigned short & versionMajor, unsigned short & versionMinor, unsigned short & minPatchVersionMajor, unsigned short & minPatchVersionMinor)
  1223. {
  1224. keydiff.readHeaderVersionInfo();
  1225. versionMajor = keydiff.queryHeader().queryVersion().queryMajor();
  1226. versionMinor = keydiff.queryHeader().queryVersion().queryMinor();
  1227. minPatchVersionMajor = keydiff.queryHeader().queryMinPatchVersion().queryMajor();
  1228. minPatchVersionMinor = keydiff.queryHeader().queryMinPatchVersion().queryMinor();
  1229. }
  1230. virtual void getHeaderFileInfo(StringAttr & oldindex, StringAttr & newindex, bool & tlkInfo, StringAttr & newTLK)
  1231. {
  1232. keydiff.readHeaderFileInfo();
  1233. oldindex.set(keydiff.queryHeader().queryOldIndex());
  1234. newindex.set(keydiff.queryHeader().queryNewIndex());
  1235. tlkInfo = keydiff.queryHeader().hasTLKInfo();
  1236. if(tlkInfo)
  1237. newTLK.set(keydiff.queryHeader().queryNewTLK());
  1238. }
  1239. virtual bool compatibleVersions(StringBuffer & error) const
  1240. {
  1241. return keydiff.compatibleVersions(error);
  1242. }
  1243. virtual void setProgressCallback(IKeyDiffProgressCallback * callback, offset_t freq)
  1244. {
  1245. progressCallback.setown(callback);
  1246. progressFrequency = freq;
  1247. }
  1248. private:
  1249. void init()
  1250. {
  1251. keydiff.readHeaderVersionInfo();
  1252. StringBuffer versionError;
  1253. if(!keydiff.compatibleVersions(versionError))
  1254. throw MakeStringExceptionDirect(0, versionError.str());
  1255. keydiff.readHeaderFileInfo();
  1256. if(!oldIndex.get())
  1257. oldIndex.set(keydiff.queryHeader().queryOldIndex());
  1258. if(!newIndex.get())
  1259. newIndex.set(keydiff.queryHeader().queryNewIndex());
  1260. if(tlkGen)
  1261. {
  1262. if(!newTLK.get())
  1263. {
  1264. if(keydiff.queryHeader().hasTLKInfo())
  1265. newTLK.set(keydiff.queryHeader().queryNewTLK());
  1266. else
  1267. throw MakeStringException(0, "Trying to generate TLK using filename from patch, but patch does not include TLK header information");
  1268. }
  1269. }
  1270. else if(keydiff.queryHeader().hasTLKInfo() && !ignoreTLK)
  1271. throw MakeStringException(0, "Patch includes TLK header information, but TLK generation not enabled --- aborting, invoke with warning suppressed to go ahead");
  1272. oldInput.setown(new CKeyReader(oldIndex));
  1273. keyedsize = oldInput->queryKeyedSize();
  1274. if(progressCallback)
  1275. oldInput->setProgressCallback(progressCallback.getLink(), progressFrequency);
  1276. rowsize = oldInput->queryRowSize();
  1277. newOutput.setown(new CKeyWriter());
  1278. newOutput->init(newIndex, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize());
  1279. if(tlkGen)
  1280. tlkGen->open(newTLK, overwrite, keyedsize, rowsize, oldInput->isVariableWidth(), oldInput->isQuickCompressed(), oldInput->getNodeSize());
  1281. newcurr.init(rowsize, oldInput->isVariableWidth());
  1282. newprev.init(rowsize, oldInput->isVariableWidth());
  1283. oldcurr.init(rowsize, oldInput->isVariableWidth());
  1284. oldprev.init(rowsize, oldInput->isVariableWidth());
  1285. if(tlkGen)
  1286. tlkGen->start();
  1287. }
  1288. bool readOld(unsigned count)
  1289. {
  1290. while(count--)
  1291. {
  1292. oldcurr.swap(oldprev);
  1293. if(!oldInput->get(oldcurr))
  1294. {
  1295. oldcurr.clear();
  1296. return false;
  1297. }
  1298. }
  1299. return true;
  1300. }
  1301. void doDiff(RowBuffer const & prev)
  1302. {
  1303. bool ok = keydiff.readDiffAndExpand(prev, newcurr);
  1304. if(!ok)
  1305. throw MakeStringException(0, "Error in patch file");
  1306. writeNew();
  1307. }
  1308. void writeNew()
  1309. {
  1310. newOutput->put(newcurr);
  1311. newcurr.swap(newprev);
  1312. }
  1313. void verifyCRCs()
  1314. {
  1315. oldInput->getToEnd(oldcurr);
  1316. if(oldInput->queryCRC() != keydiff.queryHeader().queryOldCRC())
  1317. WARNLOG("CRC mismatch: on keydiff, old index %s had key CRC %08X, while on keypatch, old index %s had key CRC %08X", keydiff.queryHeader().queryOldIndex(), keydiff.queryHeader().queryOldCRC(), oldIndex.get(), oldInput->queryCRC());
  1318. if(newOutput->queryCRC() != keydiff.queryHeader().queryNewCRC())
  1319. WARNLOG("CRC mismatch: on keydiff, new index %s had key CRC %08X, while on keypatch, new index %s generated with key CRC %08X", keydiff.queryHeader().queryNewIndex(), keydiff.queryHeader().queryNewCRC(), newIndex.get(), newOutput->queryCRC());
  1320. if(keydiff.queryCRC() != keydiff.queryHeader().queryPatchCRC())
  1321. WARNLOG("CRC mismatch: on keydiff, the patch was generated with block CRC %08X, while on keypatch, it was read with block CRC %08X: looks like there has been a file corruption", keydiff.queryHeader().queryPatchCRC(), keydiff.queryCRC());
  1322. }
  1323. private:
  1324. StringAttr oldIndex;
  1325. StringAttr newIndex;
  1326. StringAttr newTLK;
  1327. bool overwrite;
  1328. bool ignoreTLK;
  1329. CReadableKeyDiff keydiff;
  1330. Owned<CKeyReader> oldInput;
  1331. Owned<CKeyWriter> newOutput;
  1332. size32_t keyedsize;
  1333. size32_t rowsize;
  1334. RowBuffer newcurr;
  1335. RowBuffer newprev;
  1336. RowBuffer oldcurr;
  1337. RowBuffer oldprev;
  1338. Owned<CTLKGenerator> tlkGen;
  1339. Owned<INodeSender> tlkSender;
  1340. Owned<IKeyDiffProgressCallback> progressCallback;
  1341. offset_t progressFrequency;
  1342. };
  1343. IKeyDiffGenerator * createKeyDiffGenerator(char const * oldIndex, char const * newIndex, char const * patch, char const * newTLK, bool overwrite, unsigned compmode)
  1344. {
  1345. return new CKeyDiffGenerator(oldIndex, newIndex, patch, newTLK, overwrite, compmode);
  1346. }
  1347. IKeyDiffApplicator * createKeyDiffApplicator(char const * patch, bool overwrite, bool ignoreTLK)
  1348. {
  1349. return new CKeyDiffApplicator(patch, 0, 0, 0, overwrite, ignoreTLK);
  1350. }
  1351. IKeyDiffApplicator * createKeyDiffApplicator(char const * patch, char const * oldIndex, char const * newIndex, char const * newTLK, bool overwrite, bool ignoreTLK)
  1352. {
  1353. return new CKeyDiffApplicator(patch, oldIndex, newIndex, newTLK, overwrite, ignoreTLK);
  1354. }
  1355. StringBuffer & getKeyDiffVersion(StringBuffer & buff)
  1356. {
  1357. return buff.append(CKeyDiff::version.queryMajor()).append('.').append(CKeyDiff::version.queryMinor());
  1358. }
  1359. StringBuffer & getKeyDiffMinDiffVersionForPatch(StringBuffer & buff)
  1360. {
  1361. return buff.append(CKeyDiff::minDiffVersionForPatch.queryMajor()).append('.').append(CKeyDiff::minDiffVersionForPatch.queryMinor());
  1362. }
  1363. StringBuffer & getKeyDiffMinPatchVersionForDiff(StringBuffer & buff)
  1364. {
  1365. return buff.append(CKeyDiff::minPatchVersionForDiff.queryMajor()).append('.').append(CKeyDiff::minPatchVersionForDiff.queryMinor());
  1366. }
  1367. IKeyFileRowReader *createKeyFileReader(const char *filename)
  1368. {
  1369. return new CKeyFileReader(filename);
  1370. }
  1371. IKeyFileRowWriter *createKeyWriter(const char *filename,IPropertyTree *header, bool overwrite, unsigned nodeSize)
  1372. {
  1373. return new CKeyFileWriter(filename,header,overwrite, nodeSize);
  1374. }
  1375. /* To apply a patch, we require that
  1376. (a) the version of keydiff which generated the patch should be at least the minDiffVersionForPatch of the keypatch which applies it,
  1377. AND (b) the version of keypatch which applies the patch should be at least the minPatchVersionForDiff of the keydiff which generated it. */
  1378. KeyDiffVersion const CKeyDiff::version(1, 0);
  1379. KeyDiffVersion const CKeyDiff::minDiffVersionForPatch(0, 8);
  1380. KeyDiffVersion const CKeyDiff::minPatchVersionForDiff(1, 0); // version 1 for fastLZ
  1381. size32_t const CKeyDiff::streambuffsize = 0x20000;
  1382. size32_t const CKeyDiff::compressThreshold = 0x1000;