keybuild.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "keybuild.hpp"
  14. #include "jmisc.hpp"
  15. struct CRC32HTE
  16. {
  17. CRC32 crc;
  18. offset_t startBlockPos, endBlockPos;
  19. offset_t size;
  20. CRC32HTE() : startBlockPos(0), endBlockPos(0), size(0) { }
  21. const void *queryStartParam() const
  22. {
  23. return (const void *) &startBlockPos;
  24. }
  25. const void *queryEndParam() const
  26. {
  27. return (const void *) &endBlockPos;
  28. }
  29. };
  30. class CRC32HT : public SuperHashTableOf<CRC32HTE, offset_t>
  31. {
  32. public:
  33. CRC32HT(void) : SuperHashTableOf<CRC32HTE, offset_t>() { }
  34. CRC32HT(unsigned initsize) : SuperHashTableOf<CRC32HTE, offset_t>(initsize) { }
  35. ~CRC32HT() { _releaseAll(); }
  36. CRC32HTE *find(offset_t & fp) const { return SuperHashTableOf<CRC32HTE, offset_t>::find(&fp); }
  37. virtual void onAdd(void *et) { }
  38. virtual void onRemove(void *et) { }
  39. virtual unsigned getHashFromFindParam(const void *fp) const
  40. {
  41. return hashc((const unsigned char *) fp, sizeof(offset_t), 0);
  42. }
  43. };
  44. class CRC32StartHT : public CRC32HT
  45. {
  46. public:
  47. virtual unsigned getHashFromElement(const void *et) const
  48. {
  49. return hashc((const unsigned char *) ((const CRC32HTE *) et)->queryStartParam(), sizeof(offset_t), 0);
  50. }
  51. virtual const void *getFindParam(const void *et) const { return ((const CRC32HTE *)et)->queryStartParam(); }
  52. virtual bool matchesFindParam(const void *et, const void *fp, unsigned) const { return *(offset_t *)((const CRC32HTE *)et)->queryStartParam() == *(offset_t *)fp; }
  53. };
  54. class CRC32EndHT : public CRC32HT
  55. {
  56. public:
  57. virtual unsigned getHashFromElement(const void *et) const
  58. {
  59. return hashc((const unsigned char *) ((const CRC32HTE *) et)->queryEndParam(), sizeof(offset_t), 0);
  60. }
  61. virtual const void *getFindParam(const void *et) const { return ((const CRC32HTE *)et)->queryEndParam(); }
  62. virtual bool matchesFindParam(const void *et, const void *fp, unsigned) const { return *(offset_t *)((const CRC32HTE *)et)->queryEndParam() == *(offset_t *)fp; }
  63. };
  64. class CKeyBuilderBase : public CInterface
  65. {
  66. protected:
  67. unsigned keyValueSize;
  68. count_t records;
  69. unsigned levels;
  70. offset_t nextPos;
  71. Owned<CKeyHdr> keyHdr;
  72. CWriteNode *prevLeafNode;
  73. NodeInfoArray leafInfo;
  74. Linked<IFileIOStream> out;
  75. unsigned keyedSize;
  76. unsigned __int64 sequence;
  77. CRC32StartHT crcStartPosTable;
  78. CRC32EndHT crcEndPosTable;
  79. bool doCrc;
  80. public:
  81. CKeyBuilderBase(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned _keyedSize, unsigned __int64 _startSequence) : out(_out)
  82. {
  83. doCrc = false;
  84. sequence = _startSequence;
  85. keyHdr.setown(new CKeyHdr());
  86. keyValueSize = rawSize;
  87. keyedSize = _keyedSize != (unsigned) -1 ? _keyedSize : rawSize;
  88. levels = 0;
  89. records = 0;
  90. nextPos = nodeSize; // leaving room for header
  91. prevLeafNode = NULL;
  92. assertex(nodeSize >= CKeyHdr::getSize());
  93. assertex(nodeSize <= 0xffff); // stored in a short in the header - we should fix that if/when we restructure header
  94. KeyHdr *hdr = keyHdr->getHdrStruct();
  95. hdr->nodeSize = nodeSize;
  96. hdr->extsiz = 4096;
  97. hdr->length = keyValueSize;
  98. hdr->ktype = flags;
  99. hdr->timeid = 0;
  100. hdr->clstyp = 1; // IDX_CLOSE
  101. hdr->maxkbn = nodeSize-sizeof(NodeHdr);
  102. hdr->maxkbl = hdr->maxkbn;
  103. hdr->flpntr = sizeof(offset_t);
  104. hdr->verson = 130; // version from ctree.
  105. hdr->keypad = ' ';
  106. hdr->flflvr = 1;
  107. hdr->flalgn = 8;
  108. hdr->maxmrk = hdr->nodeSize/4; // always this in ctree.
  109. hdr->namlen = 255;
  110. hdr->defrel = 8;
  111. hdr->hdrseq = 0;
  112. hdr->fposOffset = 0;
  113. hdr->fileSize = 0;
  114. hdr->nodeKeyLength = _keyedSize;
  115. hdr->version = KEYBUILD_VERSION;
  116. hdr->blobHead = 0;
  117. hdr->metadataHead = 0;
  118. keyHdr->write(out); // Reserve space for the header - we'll seek back and write it properly later
  119. }
  120. CKeyBuilderBase(CKeyHdr * chdr)
  121. {
  122. levels = 0;
  123. records = 0;
  124. prevLeafNode = NULL;
  125. keyHdr.set(chdr);
  126. KeyHdr *hdr = keyHdr->getHdrStruct();
  127. records = hdr->nument;
  128. nextPos = hdr->nodeSize; // leaving room for header
  129. keyValueSize = keyHdr->getMaxKeyLength();
  130. keyedSize = keyHdr->getNodeKeyLength();
  131. }
  132. ~CKeyBuilderBase()
  133. {
  134. for (;;)
  135. {
  136. CRC32HTE *et = (CRC32HTE *)crcEndPosTable.next(NULL);
  137. if (!et) break;
  138. crcEndPosTable.removeExact(et);
  139. delete et;
  140. }
  141. }
  142. void buildLevel(NodeInfoArray &thisLevel, NodeInfoArray &parents)
  143. {
  144. unsigned int leaf = 0;
  145. CWriteNode *node = NULL;
  146. node = new CWriteNode(nextPos, keyHdr, levels==0);
  147. nextPos += keyHdr->getNodeSize();
  148. while (leaf<thisLevel.ordinality())
  149. {
  150. CNodeInfo &info = thisLevel.item(leaf);
  151. if (!node->add(info.pos, info.value, info.size, info.sequence))
  152. {
  153. flushNode(node, parents);
  154. node->Release();
  155. node = new CWriteNode(nextPos, keyHdr, levels==0);
  156. nextPos += keyHdr->getNodeSize();
  157. verifyex(node->add(info.pos, info.value, info.size, info.sequence));
  158. }
  159. leaf++;
  160. }
  161. flushNode(node, parents);
  162. flushNode(NULL, parents);
  163. node->Release();
  164. }
  165. protected:
  166. offset_t endLevel(bool close)
  167. {
  168. return 0;
  169. }
  170. offset_t nextLevel()
  171. {
  172. offset_t ret = endLevel(false);
  173. levels++;
  174. return 0;
  175. }
  176. void writeFileHeader(bool fixHdr, CRC32 *crc)
  177. {
  178. if (out)
  179. {
  180. out->flush();
  181. out->seek(0, IFSbegin);
  182. keyHdr->write(out, crc);
  183. }
  184. }
  185. void writeNode(CWriteNodeBase *node)
  186. {
  187. unsigned nodeSize = keyHdr->getNodeSize();
  188. if (doCrc)
  189. {
  190. offset_t nodePos = node->getFpos();
  191. CRC32HTE *rollingCrcEntry1 = crcEndPosTable.find(nodePos); // is start of this block end of another?
  192. nodePos += nodeSize; // update to endpos
  193. if (rollingCrcEntry1)
  194. {
  195. crcEndPosTable.removeExact(rollingCrcEntry1); // end pos will change
  196. node->write(out, &rollingCrcEntry1->crc);
  197. rollingCrcEntry1->size += nodeSize;
  198. CRC32HTE *rollingCrcEntry2 = crcStartPosTable.find(nodePos); // is end of this block, start of another?
  199. if (rollingCrcEntry2)
  200. {
  201. crcStartPosTable.removeExact(rollingCrcEntry2); // remove completely, will join to rollingCrcEntry1
  202. crcEndPosTable.removeExact(rollingCrcEntry2);
  203. CRC32Merger crcMerger;
  204. crcMerger.addChildCRC(rollingCrcEntry1->size, rollingCrcEntry1->crc.get(), true);
  205. crcMerger.addChildCRC(rollingCrcEntry2->size, rollingCrcEntry2->crc.get(), true);
  206. rollingCrcEntry1->crc.reset(~crcMerger.get());
  207. rollingCrcEntry1->size += rollingCrcEntry2->size;
  208. rollingCrcEntry1->endBlockPos = rollingCrcEntry2->endBlockPos;
  209. delete rollingCrcEntry2;
  210. }
  211. else
  212. rollingCrcEntry1->endBlockPos = nodePos;
  213. crcEndPosTable.replace(*rollingCrcEntry1);
  214. }
  215. else
  216. {
  217. rollingCrcEntry1 = crcStartPosTable.find(nodePos); // is end of this node, start of another?
  218. if (rollingCrcEntry1)
  219. {
  220. crcStartPosTable.removeExact(rollingCrcEntry1); // start pos will change
  221. CRC32 crcFirst;
  222. node->write(out, &crcFirst);
  223. CRC32Merger crcMerger;
  224. crcMerger.addChildCRC(nodeSize, crcFirst.get(), true);
  225. crcMerger.addChildCRC(rollingCrcEntry1->size, rollingCrcEntry1->crc.get(), true);
  226. rollingCrcEntry1->crc.reset(~crcMerger.get());
  227. rollingCrcEntry1->startBlockPos = node->getFpos();
  228. rollingCrcEntry1->size += nodeSize;
  229. crcStartPosTable.replace(*rollingCrcEntry1);
  230. }
  231. else
  232. {
  233. rollingCrcEntry1 = new CRC32HTE;
  234. node->write(out, &rollingCrcEntry1->crc);
  235. rollingCrcEntry1->startBlockPos = node->getFpos();
  236. rollingCrcEntry1->endBlockPos = node->getFpos()+nodeSize;
  237. rollingCrcEntry1->size = nodeSize;
  238. crcStartPosTable.replace(*rollingCrcEntry1);
  239. crcEndPosTable.replace(*rollingCrcEntry1);
  240. }
  241. }
  242. }
  243. else
  244. node->write(out);
  245. }
  246. void flushNode(CWriteNode *node, NodeInfoArray &nodeInfo)
  247. {
  248. // Messy code, but I don't have the energy to recode right now.
  249. if (prevLeafNode != NULL)
  250. {
  251. unsigned __int64 lastSequence = prevLeafNode->getLastSequence();
  252. if (node)
  253. {
  254. prevLeafNode->setRightSib(node->getFpos());
  255. node->setLeftSib(prevLeafNode->getFpos());
  256. nodeInfo.append(* new CNodeInfo(prevLeafNode->getFpos(), prevLeafNode->getLastKeyValue(), keyedSize, lastSequence));
  257. }
  258. else
  259. nodeInfo.append(* new CNodeInfo(prevLeafNode->getFpos(), NULL, keyedSize, lastSequence));
  260. writeNode(prevLeafNode);
  261. prevLeafNode->Release();
  262. prevLeafNode = NULL;
  263. }
  264. if (NULL != node)
  265. {
  266. prevLeafNode = node;
  267. prevLeafNode->Link();
  268. }
  269. }
  270. void buildTree(NodeInfoArray &children)
  271. {
  272. if (children.ordinality() != 1 || levels==0)
  273. {
  274. // Note that we always create at least 2 levels as various places assume it
  275. // Also when building keys for Moxie (bias != 0), need parent level due to assumptions in DKC...
  276. offset_t offset = nextLevel();
  277. if (offset)
  278. {
  279. ForEachItemIn(idx, children)
  280. {
  281. CNodeInfo &info = children.item(idx);
  282. info.pos += offset;
  283. }
  284. }
  285. NodeInfoArray parentInfo;
  286. buildLevel(children, parentInfo);
  287. buildTree(parentInfo);
  288. }
  289. else
  290. {
  291. KeyHdr *hdr = keyHdr->getHdrStruct();
  292. hdr->nument = records;
  293. hdr->root = nextPos - hdr->nodeSize;
  294. hdr->phyrec = hdr->numrec = nextPos-1;
  295. hdr->maxmrk = hdr->nodeSize/4; // always this in ctree.
  296. hdr->namlen = 255;
  297. hdr->defrel = 8;
  298. hdr->hdrseq = levels;
  299. }
  300. }
  301. };
  302. class CKeyBuilder : public CKeyBuilderBase, implements IKeyBuilder
  303. {
  304. private:
  305. CWriteNode *activeNode;
  306. CBlobWriteNode *activeBlobNode;
  307. public:
  308. IMPLEMENT_IINTERFACE;
  309. CKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyedSize, unsigned __int64 startSequence)
  310. : CKeyBuilderBase(_out, flags, rawSize, nodeSize, keyedSize, startSequence)
  311. {
  312. doCrc = true;
  313. activeNode = NULL;
  314. activeBlobNode = NULL;
  315. }
  316. public:
  317. void finish(unsigned *fileCrc)
  318. {
  319. finish(NULL, fileCrc);
  320. }
  321. void finish(IPropertyTree * metadata, unsigned * fileCrc)
  322. {
  323. if (NULL != activeNode)
  324. {
  325. flushNode(activeNode, leafInfo);
  326. activeNode->Release();
  327. }
  328. if (NULL != activeBlobNode)
  329. {
  330. writeNode(activeBlobNode);
  331. activeBlobNode->Release();
  332. }
  333. flushNode(NULL, leafInfo);
  334. buildTree(leafInfo);
  335. if(metadata)
  336. {
  337. assertex(strcmp(metadata->queryName(), "metadata") == 0);
  338. StringBuffer metaXML;
  339. toXML(metadata, metaXML);
  340. writeMetadata(metaXML.str(), metaXML.length());
  341. }
  342. CRC32 headerCrc;
  343. writeFileHeader(false, &headerCrc);
  344. if (fileCrc)
  345. {
  346. if (doCrc)
  347. {
  348. assertex(crcEndPosTable.count() <= 1);
  349. CRC32Merger crcMerger;
  350. crcMerger.addChildCRC(keyHdr->getNodeSize(), headerCrc.get(), true);
  351. CRC32HTE *rollingCrcEntry = (CRC32HTE *)crcEndPosTable.next(NULL);
  352. if (rollingCrcEntry)
  353. crcMerger.addChildCRC(rollingCrcEntry->size, rollingCrcEntry->crc.get(), true);
  354. *fileCrc = crcMerger.get();
  355. }
  356. else
  357. *fileCrc = 0;
  358. }
  359. }
  360. void addLeafInfo(CNodeInfo *info)
  361. {
  362. leafInfo.append(* info);
  363. }
  364. void processKeyData(const char *keyData, offset_t pos, size32_t recsize)
  365. {
  366. records++;
  367. if (NULL == activeNode)
  368. {
  369. activeNode = new CWriteNode(nextPos, keyHdr, true);
  370. nextPos += keyHdr->getNodeSize();
  371. }
  372. if (!activeNode->add(pos, keyData, recsize, sequence))
  373. {
  374. assertex(NULL != activeNode->getLastKeyValue()); // empty and doesn't fit!
  375. flushNode(activeNode, leafInfo);
  376. activeNode->Release();
  377. activeNode = new CWriteNode(nextPos, keyHdr, true);
  378. nextPos += keyHdr->getNodeSize();
  379. if (!activeNode->add(pos, keyData, recsize, sequence))
  380. throw MakeStringException(0, "Key row too large to fit within a key node (uncompressed size=%d, variable=%s, pos=%" I64F "d)", recsize, keyHdr->isVariable()?"true":"false", pos);
  381. }
  382. sequence++;
  383. }
  384. void newBlobNode()
  385. {
  386. if (keyHdr->getHdrStruct()->blobHead == 0)
  387. keyHdr->getHdrStruct()->blobHead = nextPos;
  388. CBlobWriteNode *prevBlobNode = activeBlobNode;
  389. activeBlobNode = new CBlobWriteNode(nextPos, keyHdr);
  390. nextPos += keyHdr->getNodeSize();
  391. if (prevBlobNode)
  392. {
  393. activeBlobNode->setLeftSib(prevBlobNode->getFpos());
  394. prevBlobNode->setRightSib(activeBlobNode->getFpos());
  395. writeNode(prevBlobNode);
  396. delete(prevBlobNode);
  397. }
  398. }
  399. virtual unsigned __int64 createBlob(size32_t size, const char * ptr)
  400. {
  401. if (!size)
  402. return 0;
  403. if (NULL == activeBlobNode)
  404. newBlobNode();
  405. unsigned __int64 head = activeBlobNode->add(ptr, size);
  406. if (!head)
  407. {
  408. newBlobNode();
  409. head = activeBlobNode->add(ptr, size);
  410. assertex(head);
  411. }
  412. while (size)
  413. {
  414. newBlobNode();
  415. unsigned __int64 chunkhead = activeBlobNode->add(ptr, size);
  416. assertex(chunkhead);
  417. }
  418. return head;
  419. }
  420. protected:
  421. void writeMetadata(char const * data, size32_t size)
  422. {
  423. assertex(keyHdr->getHdrStruct()->metadataHead == 0);
  424. assertex(size);
  425. keyHdr->getHdrStruct()->metadataHead = nextPos;
  426. Owned<CMetadataWriteNode> prevNode;
  427. while(size)
  428. {
  429. Owned<CMetadataWriteNode> node(new CMetadataWriteNode(nextPos, keyHdr));
  430. nextPos += keyHdr->getNodeSize();
  431. size32_t written = node->set(data, size);
  432. assertex(written);
  433. if(prevNode)
  434. {
  435. node->setLeftSib(prevNode->getFpos());
  436. prevNode->setRightSib(node->getFpos());
  437. writeNode(prevNode);
  438. }
  439. prevNode.setown(node.getLink());
  440. }
  441. writeNode(prevNode);
  442. }
  443. };
  444. extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence)
  445. {
  446. return new CKeyBuilder(_out, flags, rawSize, nodeSize, keyFieldSize, startSequence);
  447. }
  448. class PartNodeInfo : public CInterface
  449. {
  450. public:
  451. PartNodeInfo(unsigned _part, NodeInfoArray & _nodes)
  452. {
  453. part = _part;
  454. ForEachItemIn(idx, _nodes)
  455. nodes.append(OLINK(_nodes.item(idx)));
  456. }
  457. public:
  458. unsigned part;
  459. NodeInfoArray nodes;
  460. };
  461. int compareParts(CInterface * const * _left, CInterface * const * _right)
  462. {
  463. PartNodeInfo * left = (PartNodeInfo *)*_left;
  464. PartNodeInfo * right = (PartNodeInfo *)*_right;
  465. return (int)(left->part - right->part);
  466. }
  467. class CKeyDesprayer : public CKeyBuilderBase, public IKeyDesprayer
  468. {
  469. public:
  470. CKeyDesprayer(CKeyHdr * _hdr, IFileIOStream * _out) : CKeyBuilderBase(_hdr)
  471. {
  472. out.set(_out);
  473. nextPos = out->tell();
  474. }
  475. IMPLEMENT_IINTERFACE
  476. virtual void addPart(unsigned idx, offset_t numRecords, NodeInfoArray & nodes)
  477. {
  478. records += numRecords;
  479. parts.append(* new PartNodeInfo(idx, nodes));
  480. }
  481. virtual void finish()
  482. {
  483. levels = 1; // already processed one level of index....
  484. parts.sort(compareParts);
  485. ForEachItemIn(idx, parts)
  486. {
  487. NodeInfoArray & nodes = parts.item(idx).nodes;
  488. ForEachItemIn(idx2, nodes)
  489. leafInfo.append(OLINK(nodes.item(idx2)));
  490. }
  491. buildTree(leafInfo);
  492. writeFileHeader(true, NULL);
  493. }
  494. protected:
  495. CIArrayOf<PartNodeInfo> parts;
  496. };
  497. extern jhtree_decl IKeyDesprayer * createKeyDesprayer(IFile * in, IFileIOStream * out)
  498. {
  499. Owned<IFileIO> io = in->open(IFOread);
  500. MemoryAttr buffer(sizeof(KeyHdr));
  501. io->read(0, sizeof(KeyHdr), (void *)buffer.get());
  502. Owned<CKeyHdr> hdr = new CKeyHdr;
  503. hdr->load(*(KeyHdr *)buffer.get());
  504. hdr->getHdrStruct()->nument = 0;
  505. return new CKeyDesprayer(hdr, out);
  506. }