jhtree.cpp 107 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. //****************************************************************************
  14. // Name: jhtree.cpp
  15. //
  16. // Purpose:
  17. //
  18. // Description:
  19. //
  20. // Notes: Supports only static (non-changing) files
  21. //
  22. // Initially I was holding on to the root nodes, but came to find
  23. // that they could potentially fill the cache by themselves...
  24. //
  25. // Things to play with:
  26. // - try not unpacking the entire node when it is read in.
  27. // break it out as needed later.
  28. //
  29. // History: 31-Aug-99 crs original
  30. // 08-Jan-00 nh added LZW compression of nodes
  31. // 14-feb-00 nh added GetORDKey
  32. // 15-feb-00 nh fixed isolatenode and nextNode
  33. // 12-Apr-00 jcs moved over to jhtree.dll etc.
  34. //****************************************************************************
  35. #include "platform.h"
  36. #include <stdio.h>
  37. #include <fcntl.h>
  38. #include <stdlib.h>
  39. #include <limits.h>
  40. #ifdef __linux__
  41. #include <alloca.h>
  42. #endif
  43. #include "hlzw.h"
  44. #include "jmutex.hpp"
  45. #include "jhutil.hpp"
  46. #include "jmisc.hpp"
  47. #include "jstats.h"
  48. #include "ctfile.hpp"
  49. #include "jhtree.ipp"
  50. #include "keybuild.hpp"
  51. #include "layouttrans.hpp"
  52. static CKeyStore *keyStore = NULL;
  53. static unsigned defaultKeyIndexLimit = 200;
  54. static CNodeCache *nodeCache = NULL;
  55. static CriticalSection *initCrit = NULL;
  56. bool useMemoryMappedIndexes = false;
  57. bool logExcessiveSeeks = false;
  58. bool linuxYield = false;
  59. bool traceSmartStepping = false;
  60. bool traceJHtreeAllocations = false;
  61. bool flushJHtreeCacheOnOOM = true;
  62. MODULE_INIT(INIT_PRIORITY_JHTREE_JHTREE)
  63. {
  64. initCrit = new CriticalSection;
  65. return 1;
  66. }
  67. MODULE_EXIT()
  68. {
  69. delete initCrit;
  70. delete keyStore;
  71. ::Release((CInterface*)nodeCache);
  72. }
  73. //#define DUMP_NODES
  74. unsigned SegMonitorList::ordinality() const
  75. {
  76. return segMonitors.length();
  77. }
  78. IKeySegmentMonitor *SegMonitorList::item(unsigned idx) const
  79. {
  80. return &segMonitors.item(idx);
  81. }
  82. size32_t SegMonitorList::getSize() const
  83. {
  84. unsigned lim = segMonitors.length();
  85. if (lim)
  86. {
  87. IKeySegmentMonitor &lastItem = segMonitors.item(lim-1);
  88. return lastItem.getOffset() + lastItem.getSize();
  89. }
  90. else
  91. return 0;
  92. }
  93. void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname)
  94. {
  95. if (getSize() != keyedSize)
  96. {
  97. StringBuffer err;
  98. err.appendf("Key size mismatch on key %s - key size is %u, expected %u", keyname, keyedSize, getSize());
  99. IException *e = MakeStringExceptionDirect(1000, err.str());
  100. EXCLOG(e, err.str());
  101. throw e;
  102. }
  103. }
  104. void SegMonitorList::setLow(unsigned segno, void *keyBuffer) const
  105. {
  106. unsigned lim = segMonitors.length();
  107. while (segno < lim)
  108. segMonitors.item(segno++).setLow(keyBuffer);
  109. }
  110. unsigned SegMonitorList::setLowAfter(size32_t offset, void *keyBuffer) const
  111. {
  112. unsigned lim = segMonitors.length();
  113. unsigned segno = 0;
  114. unsigned skipped = 0;
  115. while (segno < lim)
  116. {
  117. IKeySegmentMonitor &seg = segMonitors.item(segno++);
  118. if (seg.getOffset() >= offset)
  119. seg.setLow(keyBuffer);
  120. else if (seg.getSize()+seg.getOffset() <= offset)
  121. skipped++;
  122. else
  123. {
  124. byte *temp = (byte *) alloca(seg.getSize() + seg.getOffset());
  125. seg.setLow(temp);
  126. memcpy((byte *)keyBuffer+offset, temp+offset, seg.getSize() - (offset - seg.getOffset()));
  127. }
  128. }
  129. return skipped;
  130. }
  131. void SegMonitorList::endRange(unsigned segno, void *keyBuffer) const
  132. {
  133. unsigned lim = segMonitors.length();
  134. if (segno < lim)
  135. segMonitors.item(segno++).endRange(keyBuffer);
  136. while (segno < lim)
  137. segMonitors.item(segno++).setHigh(keyBuffer);
  138. }
  139. bool SegMonitorList::incrementKey(unsigned segno, void *keyBuffer) const
  140. {
  141. // Increment the key buffer to next acceptable value
  142. for(;;)
  143. {
  144. if (segMonitors.item(segno).increment(keyBuffer))
  145. {
  146. setLow(segno+1, keyBuffer);
  147. return true;
  148. }
  149. if (!segno)
  150. return false;
  151. segno--;
  152. }
  153. }
  154. unsigned SegMonitorList::_lastRealSeg() const
  155. {
  156. unsigned seg = segMonitors.length();
  157. loop
  158. {
  159. if (!seg)
  160. return 0;
  161. seg--;
  162. if (!segMonitors.item(seg).isWild())
  163. return seg;
  164. }
  165. }
  166. unsigned SegMonitorList::lastFullSeg() const
  167. {
  168. // This is used to determine what part of the segmonitor list to use for a pre-count to determine if atmost/limit have been hit
  169. // We include everything up to the last of i) the last keyed element or ii) the last keyed,opt element that has no wild between it and a keyed element
  170. // NOTE - can return (unsigned) -1 if there are no full segments
  171. unsigned len = segMonitors.length();
  172. unsigned seg = 0;
  173. unsigned ret = (unsigned) -1;
  174. bool wildSeen = false;
  175. while (seg < len)
  176. {
  177. if (segMonitors.item(seg).isWild())
  178. wildSeen = true;
  179. else
  180. {
  181. if (!wildSeen || !segMonitors.item(seg).isOptional())
  182. {
  183. ret = seg;
  184. wildSeen = false;
  185. }
  186. }
  187. seg++;
  188. }
  189. return ret;
  190. }
  191. void SegMonitorList::finish()
  192. {
  193. if (modified)
  194. {
  195. unsigned len = segMonitors.length();
  196. unsigned curOffset = 0;
  197. unsigned curSize = 0;
  198. if (len)
  199. {
  200. IKeySegmentMonitor *current = &segMonitors.item(0);
  201. curOffset = current->getOffset();
  202. curSize = current->getSize();
  203. for (unsigned seg = 1; seg < len; seg++)
  204. {
  205. IKeySegmentMonitor *next = &segMonitors.item(seg);
  206. unsigned nextOffset = next->getOffset();
  207. unsigned nextSize = next->getSize();
  208. assertex(curOffset + curSize == nextOffset) ;
  209. IKeySegmentMonitor *merged = NULL;
  210. if (nextOffset != mergeBarrier)
  211. merged = current->merge(next);
  212. if (merged)
  213. {
  214. current = merged;
  215. segMonitors.replace(*current, seg-1);
  216. segMonitors.remove(seg);
  217. curSize += nextSize;
  218. seg--;
  219. len--;
  220. }
  221. else
  222. {
  223. current = next;
  224. curOffset = nextOffset;
  225. curSize = nextSize;
  226. }
  227. }
  228. }
  229. recalculateCache();
  230. modified = false;
  231. }
  232. }
  233. void SegMonitorList::recalculateCache()
  234. {
  235. cachedLRS = _lastRealSeg();
  236. }
  237. // interface IIndexReadContext
  238. void SegMonitorList::append(IKeySegmentMonitor *segment)
  239. {
  240. modified = true;
  241. unsigned offset = segment->getOffset();
  242. unsigned size = segment->getSize();
  243. if (mergeBarrier)
  244. {
  245. // check if we need to split
  246. if (offset < mergeBarrier && offset+size > mergeBarrier)
  247. {
  248. IKeySegmentMonitor *split = segment->split(mergeBarrier - offset);
  249. assertex(split);
  250. append(split); // note - recursive call
  251. append(segment);
  252. return;
  253. }
  254. }
  255. if (segMonitors.length())
  256. {
  257. unsigned lpos = segMonitors.length()-1;
  258. IKeySegmentMonitor &last = segMonitors.item(lpos);
  259. if (last.getOffset() + last.getSize() == offset)
  260. {
  261. segMonitors.append(*segment);
  262. }
  263. else
  264. {
  265. // we need to combine new segmonitor with existing segmonitors
  266. assertex (offset + size <= last.getOffset() + last.getSize()); // no holes should ever be created...
  267. if (!segment->isWild()) // X and wild is always X
  268. {
  269. // Find segmonitor that overlaps the new one, splitting if necessary
  270. unsigned idx = 0;
  271. while (segMonitors.isItem(idx))
  272. {
  273. IKeySegmentMonitor &existing = segMonitors.item(idx);
  274. unsigned existingOffset = existing.getOffset();
  275. unsigned existingSize = existing.getSize();
  276. if (existingOffset <= offset && existingOffset+existingSize > offset)
  277. {
  278. // There is some overlap ...
  279. if (existingOffset < offset)
  280. {
  281. // split existing at start
  282. IKeySegmentMonitor *splitExisting = existing.split(offset-existingOffset);
  283. segMonitors.add(*splitExisting, idx);
  284. // now loop around to handle second half of existing
  285. }
  286. else if (size > existingSize)
  287. {
  288. // split new, merge first half into existing, and loop around to merge second half
  289. Owned<IKeySegmentMonitor> splitNew = segment->split(existingSize);
  290. IKeySegmentMonitor *combined = existing.combine(splitNew);
  291. segMonitors.replace(*combined, idx);
  292. }
  293. else if (size < existingSize)
  294. {
  295. // split existing, then merge new into first half
  296. Owned<IKeySegmentMonitor> split = existing.split(size); // leaves existing in the array as the tail
  297. IKeySegmentMonitor *combined = segment->combine(split);
  298. segMonitors.add(*combined, idx);
  299. break;
  300. }
  301. else // perfect overlap
  302. {
  303. IKeySegmentMonitor *combined = segment->combine(&existing);
  304. segMonitors.add(*combined, idx);
  305. break;
  306. }
  307. }
  308. idx++;
  309. }
  310. }
  311. segment->Release();
  312. }
  313. }
  314. else
  315. segMonitors.append(*segment);
  316. }
  317. bool SegMonitorList::matched(void *keyBuffer, unsigned &lastMatch) const
  318. {
  319. lastMatch = 0;
  320. for (; lastMatch < segMonitors.length(); lastMatch++)
  321. {
  322. if (!segMonitors.item(lastMatch).matches(keyBuffer))
  323. return false;
  324. }
  325. return true;
  326. }
  327. ///
  328. class jhtree_decl CKeyLevelManager : public CInterface, implements IKeyManager
  329. {
  330. protected:
  331. IContextLogger *ctx;
  332. SegMonitorList segs;
  333. IKeyCursor *keyCursor;
  334. char *keyBuffer;
  335. offset_t lookupFpos;
  336. unsigned keySize; // size of key record including payload
  337. unsigned keyedSize; // size of non-payload part of key
  338. unsigned eclKeySize; // size of output record according to ecl
  339. unsigned numsegs;
  340. bool matched;
  341. bool eof;
  342. bool started;
  343. StringAttr keyName;
  344. unsigned seeks;
  345. unsigned scans;
  346. unsigned skips;
  347. unsigned nullSkips;
  348. unsigned wildseeks;
  349. Owned<IRecordLayoutTranslator> layoutTrans;
  350. bool transformSegs;
  351. IIndexReadContext * activitySegs;
  352. CMemoryBlock layoutTransBuff;
  353. Owned<IRecordLayoutTranslator::SegmentMonitorContext> layoutTransSegCtx;
  354. Owned<IRecordLayoutTranslator::RowTransformContext> layoutTransRowCtx;
  355. inline void setLow(unsigned segNo)
  356. {
  357. segs.setLow(segNo, keyBuffer);
  358. }
  359. inline unsigned setLowAfter(size32_t offset)
  360. {
  361. return segs.setLowAfter(offset, keyBuffer);
  362. }
  363. inline bool incrementKey(unsigned segno) const
  364. {
  365. return segs.incrementKey(segno, keyBuffer);
  366. }
  367. inline void endRange(unsigned segno)
  368. {
  369. segs.endRange(segno, keyBuffer);
  370. }
  371. byte const * doRowLayoutTransform(offset_t & fpos)
  372. {
  373. layoutTrans->transformRow(layoutTransRowCtx, reinterpret_cast<byte const *>(keyBuffer), keySize, layoutTransBuff, fpos);
  374. return layoutTransBuff.get();
  375. }
  376. bool skipTo(const void *_seek, size32_t seekOffset, size32_t seeklen)
  377. {
  378. // Modify the current key contents buffer as follows
  379. // Take bytes up to seekoffset from current buffer (i.e. leave them alone)
  380. // Take up to seeklen bytes from seek comparing them as I go. If I see a lower one before I see a higher one, stop.
  381. // If I didn't see any higher ones, return (at which point the skipto was a no-op
  382. // If I saw higher ones, call setLowAfter for all remaining segmonitors
  383. // If the current contents of buffer could not match, call incremementKey at the appropriate monitor so that it can
  384. // Clear the matched flag
  385. const byte *seek = (const byte *) _seek;
  386. while (seeklen)
  387. {
  388. int c = *seek - (byte) (keyBuffer[seekOffset]);
  389. if (c < 0)
  390. return false;
  391. else if (c>0)
  392. {
  393. memcpy(keyBuffer+seekOffset, seek, seeklen);
  394. break;
  395. }
  396. seek++;
  397. seekOffset++;
  398. seeklen--;
  399. }
  400. #ifdef _DEBUG
  401. if (traceSmartStepping)
  402. {
  403. StringBuffer recstr;
  404. unsigned i;
  405. for (i = 0; i < keySize; i++)
  406. {
  407. unsigned char c = ((unsigned char *) keyBuffer)[i];
  408. recstr.appendf("%c", isprint(c) ? c : '.');
  409. }
  410. recstr.append (" ");
  411. for (i = 0; i < keySize; i++)
  412. {
  413. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  414. }
  415. DBGLOG("SKIP: IN skips=%02d nullskips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  416. }
  417. #endif
  418. if (!seeklen) return false;
  419. unsigned j = setLowAfter(seekOffset + seeklen);
  420. bool canmatch = true;
  421. unsigned lastSeg = segs.lastRealSeg();
  422. for (; j <= lastSeg; j++)
  423. {
  424. canmatch = segs.segMonitors.item(j).matches(keyBuffer);
  425. if (!canmatch)
  426. {
  427. eof = !incrementKey(j);
  428. break;
  429. }
  430. }
  431. matched = false;
  432. return true;
  433. }
  434. void noteSeeks(unsigned lseeks, unsigned lscans, unsigned lwildseeks)
  435. {
  436. seeks += lseeks;
  437. scans += lscans;
  438. wildseeks += lwildseeks;
  439. if (ctx)
  440. {
  441. if (lseeks) ctx->noteStatistic(STATS_INDEX_SEEKS, lseeks, 1);
  442. if (lscans) ctx->noteStatistic(STATS_INDEX_SCANS, lscans, 1);
  443. if (lwildseeks) ctx->noteStatistic(STATS_INDEX_WILDSEEKS, lwildseeks, 1);
  444. }
  445. }
  446. void noteSkips(unsigned lskips, unsigned lnullSkips)
  447. {
  448. skips += lskips;
  449. nullSkips += lnullSkips;
  450. if (ctx)
  451. {
  452. if (lskips) ctx->noteStatistic(STATS_INDEX_SKIPS, lskips, 1);
  453. if (lnullSkips) ctx->noteStatistic(STATS_INDEX_NULLSKIPS, lnullSkips, 1);
  454. }
  455. }
  456. void reportExcessiveSeeks(unsigned numSeeks, unsigned lastSeg)
  457. {
  458. StringBuffer recstr;
  459. unsigned i;
  460. for (i = 0; i < keySize; i++)
  461. {
  462. unsigned char c = ((unsigned char *) keyBuffer)[i];
  463. recstr.appendf("%c", isprint(c) ? c : '.');
  464. }
  465. recstr.append ("\n");
  466. for (i = 0; i < keySize; i++)
  467. {
  468. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  469. }
  470. recstr.append ("\nusing segmonitors:\n");
  471. for (i=0; i <= lastSeg; i++)
  472. {
  473. unsigned size = segs.segMonitors.item(i).getSize();
  474. while (size--)
  475. recstr.append( segs.segMonitors.item(i).isWild() ? '?' : '#');
  476. }
  477. if (ctx)
  478. ctx->CTXLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
  479. else
  480. DBGLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
  481. }
  482. public:
  483. IMPLEMENT_IINTERFACE;
  484. CKeyLevelManager(IKeyIndex * _key, unsigned _eclKeySize, IContextLogger *_ctx)
  485. {
  486. ctx = _ctx;
  487. numsegs = 0;
  488. keyBuffer = NULL;
  489. keyCursor = NULL;
  490. eclKeySize = _eclKeySize;
  491. started = false;
  492. setKey(_key);
  493. seeks = 0;
  494. scans = 0;
  495. skips = 0;
  496. nullSkips = 0;
  497. wildseeks = 0;
  498. transformSegs = false;
  499. activitySegs = &segs;
  500. }
  501. ~CKeyLevelManager()
  502. {
  503. free (keyBuffer);
  504. ::Release(keyCursor);
  505. }
  506. virtual unsigned querySeeks() const
  507. {
  508. return seeks;
  509. }
  510. virtual unsigned queryScans() const
  511. {
  512. return scans;
  513. }
  514. virtual unsigned querySkips() const
  515. {
  516. return skips;
  517. }
  518. virtual unsigned queryNullSkips() const
  519. {
  520. return nullSkips;
  521. }
  522. virtual void resetCounts()
  523. {
  524. scans = 0;
  525. seeks = 0;
  526. skips = 0;
  527. nullSkips = 0;
  528. wildseeks = 0;
  529. }
  530. void setKey(IKeyIndexBase * _key)
  531. {
  532. ::Release(keyCursor);
  533. keyCursor = NULL;
  534. if (_key)
  535. {
  536. assertex(_key->numParts()==1);
  537. IKeyIndex *ki = _key->queryPart(0);
  538. keyCursor = ki->getCursor(ctx);
  539. keyName.set(ki->queryFileName());
  540. if (!keyBuffer)
  541. {
  542. keySize = ki->keySize();
  543. keyedSize = ki->keyedSize();
  544. assertex(keySize);
  545. keyBuffer = (char *) malloc(keySize);
  546. }
  547. else
  548. {
  549. assertex(keyedSize==ki->keyedSize());
  550. assertex(keySize==ki->keySize());
  551. }
  552. if (eclKeySize && (0 == (ki->getFlags() & HTREE_VARSIZE)) && (eclKeySize != keySize))
  553. {
  554. StringBuffer err;
  555. err.appendf("Key size mismatch - key file (%s) indicates record size should be %d, but ECL declaration was %d", keyName.get(), keySize, eclKeySize);
  556. IException *e = MakeStringExceptionDirect(1000, err.str());
  557. EXCLOG(e, err.str());
  558. throw e;
  559. }
  560. }
  561. }
  562. virtual void reset(bool crappyHack)
  563. {
  564. if (keyCursor)
  565. {
  566. if (!started)
  567. {
  568. started = true;
  569. segs.checkSize(keyedSize, keyName.get());
  570. numsegs = segs.segMonitors.length();
  571. }
  572. if (!crappyHack)
  573. {
  574. matched = false;
  575. eof = false;
  576. setLow(0);
  577. keyCursor->reset();
  578. }
  579. }
  580. }
  581. virtual void releaseSegmentMonitors()
  582. {
  583. segs.segMonitors.kill();
  584. started = false;
  585. if(layoutTransSegCtx)
  586. layoutTransSegCtx->reset();
  587. }
  588. virtual void append(IKeySegmentMonitor *segment)
  589. {
  590. assertex(!started);
  591. activitySegs->append(segment);
  592. }
  593. virtual unsigned ordinality() const
  594. {
  595. return activitySegs->ordinality();
  596. }
  597. virtual IKeySegmentMonitor *item(unsigned idx) const
  598. {
  599. return activitySegs->item(idx);
  600. }
  601. inline const byte *queryKeyBuffer(offset_t & fpos)
  602. {
  603. fpos = lookupFpos;
  604. if(layoutTrans)
  605. return doRowLayoutTransform(fpos);
  606. else
  607. return reinterpret_cast<byte const *>(keyBuffer);
  608. }
  609. inline offset_t queryFpos()
  610. {
  611. return lookupFpos;
  612. }
  613. inline unsigned queryRecordSize() { return keySize; }
  614. bool _lookup(bool exact, unsigned lastSeg)
  615. {
  616. bool ret = false;
  617. unsigned lwildseeks = 0;
  618. unsigned lseeks = 0;
  619. unsigned lscans = 0;
  620. while (!eof)
  621. {
  622. bool ok;
  623. if (matched)
  624. {
  625. ok = keyCursor->next(keyBuffer);
  626. lscans++;
  627. }
  628. else
  629. {
  630. ok = keyCursor->gtEqual(keyBuffer, keyBuffer, true);
  631. lseeks++;
  632. }
  633. if (ok)
  634. {
  635. lookupFpos = keyCursor->getFPos();
  636. unsigned i = 0;
  637. matched = true;
  638. for (; i <= lastSeg; i++)
  639. {
  640. matched = segs.segMonitors.item(i).matches(keyBuffer);
  641. if (!matched)
  642. break;
  643. }
  644. if (matched)
  645. {
  646. ret = true;
  647. break;
  648. }
  649. #ifdef __linux__
  650. if (linuxYield)
  651. sched_yield();
  652. #endif
  653. eof = !incrementKey(i);
  654. if (!exact)
  655. {
  656. ret = true;
  657. break;
  658. }
  659. lwildseeks++;
  660. }
  661. else
  662. eof = true;
  663. }
  664. if (logExcessiveSeeks && lwildseeks > 1000)
  665. reportExcessiveSeeks(lwildseeks, lastSeg);
  666. noteSeeks(lseeks, lscans, lwildseeks);
  667. return ret;
  668. }
  669. virtual bool lookup(bool exact)
  670. {
  671. if (keyCursor)
  672. return _lookup(exact, segs.lastRealSeg());
  673. else
  674. return false;
  675. }
  676. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  677. {
  678. if (keyCursor)
  679. {
  680. if (skipTo(seek, seekOffset, seeklen))
  681. noteSkips(1, 0);
  682. else
  683. noteSkips(0, 1);
  684. bool ret = _lookup(true, segs.lastRealSeg());
  685. #ifdef _DEBUG
  686. if (traceSmartStepping)
  687. {
  688. StringBuffer recstr;
  689. unsigned i;
  690. for (i = 0; i < keySize; i++)
  691. {
  692. unsigned char c = ((unsigned char *) keyBuffer)[i];
  693. recstr.appendf("%c", isprint(c) ? c : '.');
  694. }
  695. recstr.append (" ");
  696. for (i = 0; i < keySize; i++)
  697. {
  698. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  699. }
  700. DBGLOG("SKIP: Got skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  701. }
  702. #endif
  703. return ret;
  704. }
  705. else
  706. return false;
  707. }
  708. unsigned __int64 getCount()
  709. {
  710. matched = false;
  711. eof = false;
  712. setLow(0);
  713. keyCursor->reset();
  714. unsigned __int64 result = 0;
  715. unsigned lseeks = 0;
  716. if (keyCursor)
  717. {
  718. unsigned lastRealSeg = segs.lastRealSeg();
  719. loop
  720. {
  721. if (_lookup(true, lastRealSeg))
  722. {
  723. unsigned __int64 locount = keyCursor->getSequence();
  724. endRange(lastRealSeg);
  725. keyCursor->ltEqual(keyBuffer, NULL, true);
  726. lseeks++;
  727. result += keyCursor->getSequence()-locount+1;
  728. if (!incrementKey(lastRealSeg))
  729. break;
  730. matched = false;
  731. }
  732. else
  733. break;
  734. }
  735. }
  736. noteSeeks(lseeks, 0, 0);
  737. return result;
  738. }
  739. unsigned __int64 getCurrentRangeCount(unsigned groupSegCount)
  740. {
  741. unsigned __int64 result = 0;
  742. if (keyCursor)
  743. {
  744. unsigned __int64 locount = keyCursor->getSequence();
  745. endRange(groupSegCount);
  746. keyCursor->ltEqual(keyBuffer, NULL, true);
  747. result = keyCursor->getSequence()-locount+1;
  748. noteSeeks(1, 0, 0);
  749. }
  750. return result;
  751. }
  752. bool nextRange(unsigned groupSegCount)
  753. {
  754. if (!incrementKey(groupSegCount-1))
  755. return false;
  756. matched = false;
  757. return true;
  758. }
  759. unsigned __int64 checkCount(unsigned __int64 max)
  760. {
  761. matched = false;
  762. eof = false;
  763. setLow(0);
  764. keyCursor->reset();
  765. unsigned __int64 result = 0;
  766. unsigned lseeks = 0;
  767. if (keyCursor)
  768. {
  769. unsigned lastFullSeg = segs.lastFullSeg();
  770. if (lastFullSeg == (unsigned) -1)
  771. {
  772. noteSeeks(1, 0, 0);
  773. if (keyCursor->last(NULL))
  774. return keyCursor->getSequence()+1;
  775. else
  776. return 0;
  777. }
  778. loop
  779. {
  780. if (_lookup(true, lastFullSeg))
  781. {
  782. unsigned __int64 locount = keyCursor->getSequence();
  783. endRange(lastFullSeg);
  784. keyCursor->ltEqual(keyBuffer, NULL, true);
  785. lseeks++;
  786. result += keyCursor->getSequence()-locount+1;
  787. if (max && (result > max))
  788. break;
  789. if (!incrementKey(lastFullSeg))
  790. break;
  791. matched = false;
  792. }
  793. else
  794. break;
  795. }
  796. }
  797. noteSeeks(lseeks, 0, 0);
  798. return result;
  799. }
  800. virtual void serializeCursorPos(MemoryBuffer &mb)
  801. {
  802. mb.append(eof);
  803. if (!eof)
  804. {
  805. keyCursor->serializeCursorPos(mb);
  806. mb.append(matched);
  807. }
  808. }
  809. virtual void deserializeCursorPos(MemoryBuffer &mb)
  810. {
  811. mb.read(eof);
  812. if (!eof)
  813. {
  814. assertex(keyBuffer);
  815. keyCursor->deserializeCursorPos(mb, keyBuffer);
  816. mb.read(matched);
  817. }
  818. }
  819. virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  820. {
  821. return keyCursor->loadBlob(blobid, blobsize);
  822. }
  823. virtual void releaseBlobs()
  824. {
  825. if (keyCursor)
  826. keyCursor->releaseBlobs();
  827. }
  828. virtual void setLayoutTranslator(IRecordLayoutTranslator * trans)
  829. {
  830. layoutTrans.set(trans);
  831. if(layoutTrans && layoutTrans->queryKeysTransformed())
  832. {
  833. transformSegs = true;
  834. layoutTransSegCtx.setown(layoutTrans->getSegmentMonitorContext());
  835. activitySegs = layoutTransSegCtx;
  836. }
  837. else
  838. {
  839. transformSegs = false;
  840. layoutTransSegCtx.clear();
  841. activitySegs = &segs;
  842. }
  843. if(layoutTrans)
  844. {
  845. layoutTransRowCtx.setown(layoutTrans->getRowTransformContext());
  846. }
  847. else
  848. {
  849. layoutTransRowCtx.clear();
  850. }
  851. }
  852. virtual void setMergeBarrier(unsigned offset)
  853. {
  854. activitySegs->setMergeBarrier(offset);
  855. }
  856. virtual void finishSegmentMonitors()
  857. {
  858. if(transformSegs)
  859. {
  860. layoutTrans->createDiskSegmentMonitors(*layoutTransSegCtx, segs);
  861. if(!layoutTrans->querySuccess())
  862. {
  863. StringBuffer err;
  864. err.append("Could not translate index read filters during layout translation of index ").append(keyName.get()).append(": ");
  865. layoutTrans->queryFailure().getDetail(err);
  866. throw MakeStringExceptionDirect(0, err.str());
  867. }
  868. }
  869. segs.finish();
  870. }
  871. };
  872. ///////////////////////////////////////////////////////////////////////////////
  873. ///////////////////////////////////////////////////////////////////////////////
  874. // For some reason #pragma pack does not seem to work here. Force all elements to 8 bytes
  875. class CKeyIdAndPos
  876. {
  877. public:
  878. unsigned __int64 keyId;
  879. offset_t pos;
  880. CKeyIdAndPos(unsigned __int64 _keyId, offset_t _pos) { keyId = _keyId; pos = _pos; }
  881. bool operator==(const CKeyIdAndPos &other) { return keyId == other.keyId && pos == other.pos; }
  882. };
  883. class CNodeMapping : public HTMapping<CJHTreeNode, CKeyIdAndPos>
  884. {
  885. public:
  886. CNodeMapping(CKeyIdAndPos &fp, CJHTreeNode &et) : HTMapping<CJHTreeNode, CKeyIdAndPos>(et, fp) { }
  887. ~CNodeMapping() { this->et.Release(); }
  888. CJHTreeNode &query() { return queryElement(); }
  889. };
  890. typedef OwningSimpleHashTableOf<CNodeMapping, CKeyIdAndPos> CNodeTable;
  891. #define FIXED_NODE_OVERHEAD (sizeof(CJHTreeNode))
  892. class CNodeMRUCache : public CMRUCacheOf<CKeyIdAndPos, CJHTreeNode, CNodeMapping, CNodeTable>
  893. {
  894. size32_t sizeInMem, memLimit;
  895. public:
  896. CNodeMRUCache(size32_t _memLimit)
  897. {
  898. sizeInMem = 0;
  899. setMemLimit(_memLimit);
  900. }
  901. size32_t setMemLimit(size32_t _memLimit)
  902. {
  903. size32_t oldMemLimit = memLimit;
  904. memLimit = _memLimit;
  905. if (full())
  906. makeSpace();
  907. return oldMemLimit;
  908. }
  909. virtual void makeSpace()
  910. {
  911. // remove LRU until !full
  912. do
  913. {
  914. clear(1);
  915. }
  916. while (full());
  917. }
  918. virtual bool full()
  919. {
  920. if (((size32_t)-1) == memLimit) return false;
  921. return sizeInMem > memLimit;
  922. }
  923. virtual void elementAdded(CNodeMapping *mapping)
  924. {
  925. CJHTreeNode &node = mapping->queryElement();
  926. sizeInMem += (FIXED_NODE_OVERHEAD+node.getMemSize());
  927. }
  928. virtual void elementRemoved(CNodeMapping *mapping)
  929. {
  930. CJHTreeNode &node = mapping->queryElement();
  931. sizeInMem -= (FIXED_NODE_OVERHEAD+node.getMemSize());
  932. }
  933. };
  934. class CNodeCache : public CInterface
  935. {
  936. private:
  937. mutable CriticalSection lock;
  938. CNodeMRUCache nodeCache;
  939. CNodeMRUCache leafCache;
  940. CNodeMRUCache blobCache;
  941. CNodeMRUCache preloadCache;
  942. bool cacheNodes;
  943. bool cacheLeaves;
  944. bool cacheBlobs;
  945. bool preloadNodes;
  946. public:
  947. CNodeCache(size32_t maxNodeMem, size32_t maxLeaveMem, size32_t maxBlobMem)
  948. : nodeCache(maxNodeMem), leafCache(maxLeaveMem), blobCache(maxBlobMem), preloadCache((unsigned) -1)
  949. {
  950. cacheNodes = maxNodeMem != 0;
  951. cacheLeaves = maxLeaveMem != 0;;
  952. cacheBlobs = maxBlobMem != 0;
  953. preloadNodes = false;
  954. // note that each index caches the last blob it unpacked so that sequential blobfetches are still ok
  955. }
  956. CJHTreeNode *getNode(INodeLoader *key, int keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
  957. void preload(CJHTreeNode *node, int keyID, offset_t pos, IContextLogger *ctx);
  958. bool isPreloaded(int keyID, offset_t pos);
  959. inline bool getNodeCachePreload()
  960. {
  961. return preloadNodes;
  962. }
  963. inline bool setNodeCachePreload(bool _preload)
  964. {
  965. bool oldPreloadNodes = preloadNodes;
  966. preloadNodes = _preload;
  967. return oldPreloadNodes;
  968. }
  969. inline size32_t setNodeCacheMem(size32_t newSize)
  970. {
  971. CriticalBlock block(lock);
  972. unsigned oldV = nodeCache.setMemLimit(newSize);
  973. cacheNodes = (newSize != 0);
  974. return oldV;
  975. }
  976. inline size32_t setLeafCacheMem(size32_t newSize)
  977. {
  978. CriticalBlock block(lock);
  979. unsigned oldV = leafCache.setMemLimit(newSize);
  980. cacheLeaves = (newSize != 0);
  981. return oldV;
  982. }
  983. inline size32_t setBlobCacheMem(size32_t newSize)
  984. {
  985. CriticalBlock block(lock);
  986. unsigned oldV = blobCache.setMemLimit(newSize);
  987. cacheBlobs = (newSize != 0);
  988. return oldV;
  989. }
  990. void clear()
  991. {
  992. CriticalBlock block(lock);
  993. nodeCache.kill();
  994. leafCache.kill();
  995. blobCache.kill();
  996. }
  997. };
  998. static inline CNodeCache *queryNodeCache()
  999. {
  1000. if (nodeCache) return nodeCache; // avoid crit
  1001. CriticalBlock b(*initCrit);
  1002. if (!nodeCache) nodeCache = new CNodeCache(100*0x100000, 50*0x100000, 0);
  1003. return nodeCache;
  1004. }
  1005. void clearNodeCache()
  1006. {
  1007. queryNodeCache()->clear();
  1008. }
  1009. inline CKeyStore *queryKeyStore()
  1010. {
  1011. if (keyStore) return keyStore; // avoid crit
  1012. CriticalBlock b(*initCrit);
  1013. if (!keyStore) keyStore = new CKeyStore;
  1014. return keyStore;
  1015. }
  1016. unsigned setKeyIndexCacheSize(unsigned limit)
  1017. {
  1018. return queryKeyStore()->setKeyCacheLimit(limit);
  1019. }
  1020. CKeyStore::CKeyStore() : keyIndexCache(defaultKeyIndexLimit)
  1021. {
  1022. nextId = 0;
  1023. #if 0
  1024. mm.setown(createSharedMemoryManager("RichardsSharedMemManager", 0x100000));
  1025. try
  1026. {
  1027. if (mm)
  1028. sharedCache.setown(mm->share());
  1029. }
  1030. catch (IException *E)
  1031. {
  1032. E->Release();
  1033. }
  1034. #endif
  1035. }
  1036. CKeyStore::~CKeyStore()
  1037. {
  1038. }
  1039. unsigned CKeyStore::setKeyCacheLimit(unsigned limit)
  1040. {
  1041. return keyIndexCache.setCacheLimit(limit);
  1042. }
  1043. IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  1044. {
  1045. // isTLK provided by caller since flags in key header unreliable. If either say it's a TLK, I believe it.
  1046. {
  1047. MTIME_SECTION(queryActiveTimer(), "CKeyStore_load");
  1048. IKeyIndex *keyIndex;
  1049. // MORE - holds onto the mutex way too long
  1050. synchronized block(mutex);
  1051. StringBuffer fname;
  1052. fname.append(fileName).append('/').append(crc);
  1053. keyIndex = keyIndexCache.query(fname);
  1054. if (NULL == keyIndex)
  1055. {
  1056. if (iMappedFile)
  1057. {
  1058. assert(!iFileIO && !part);
  1059. keyIndex = new CMemKeyIndex(getUniqId(), LINK(iMappedFile), fname, isTLK);
  1060. }
  1061. else if (iFileIO)
  1062. {
  1063. assert(!part);
  1064. keyIndex = new CDiskKeyIndex(getUniqId(), LINK(iFileIO), fname, isTLK, allowPreload);
  1065. }
  1066. else
  1067. {
  1068. Owned<IFile> iFile;
  1069. if (part)
  1070. {
  1071. iFile.setown(part->open());
  1072. if (NULL == iFile.get())
  1073. throw MakeStringException(0, "Failed to open index file %s", fileName);
  1074. }
  1075. else
  1076. iFile.setown(createIFile(fileName));
  1077. IFileIO *fio = iFile->open(IFOread);
  1078. if (fio)
  1079. keyIndex = new CDiskKeyIndex(getUniqId(), fio, fname, isTLK, allowPreload);
  1080. else
  1081. throw MakeStringException(0, "Failed to open index file %s", fileName);
  1082. }
  1083. keyIndexCache.add(fname, *LINK(keyIndex));
  1084. }
  1085. else
  1086. {
  1087. LINK(keyIndex);
  1088. }
  1089. assertex(NULL != keyIndex);
  1090. return keyIndex;
  1091. }
  1092. }
  1093. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload)
  1094. {
  1095. return doload(fileName, crc, NULL, iFileIO, NULL, isTLK, allowPreload);
  1096. }
  1097. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  1098. {
  1099. return doload(fileName, crc, NULL, NULL, iMappedFile, isTLK, allowPreload);
  1100. }
  1101. // fileName+crc used only as key for cache
  1102. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload)
  1103. {
  1104. return doload(fileName, crc, &part, NULL, NULL, isTLK, allowPreload);
  1105. }
  1106. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload)
  1107. {
  1108. return doload(fileName, crc, NULL, NULL, NULL, isTLK, allowPreload);
  1109. }
  1110. StringBuffer &CKeyStore::getMetrics(StringBuffer &xml)
  1111. {
  1112. xml.append(" <IndexMetrics>\n");
  1113. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1114. ForEach(*iter)
  1115. {
  1116. CKeyIndexMapping &mapping = iter->query();
  1117. IKeyIndex &index = mapping.query();
  1118. const char *name = mapping.queryFindString();
  1119. xml.appendf(" <Index name=\"%s\" scans=\"%d\" seeks=\"%d\"/>\n", name, index.queryScans(), index.querySeeks());
  1120. }
  1121. xml.append(" </IndexMetrics>\n");
  1122. return xml;
  1123. }
  1124. void CKeyStore::resetMetrics()
  1125. {
  1126. synchronized block(mutex);
  1127. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1128. ForEach(*iter)
  1129. {
  1130. CKeyIndexMapping &mapping = iter->query();
  1131. IKeyIndex &index = mapping.query();
  1132. index.resetCounts();
  1133. }
  1134. }
  1135. void CKeyStore::clearCache(bool killAll)
  1136. {
  1137. synchronized block(mutex);
  1138. if (killAll)
  1139. keyIndexCache.kill();
  1140. else
  1141. {
  1142. StringArray goers;
  1143. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1144. ForEach(*iter)
  1145. {
  1146. CKeyIndexMapping &mapping = iter->query();
  1147. IKeyIndex &index = mapping.query();
  1148. if (!index.IsShared())
  1149. {
  1150. const char *name = mapping.queryFindString();
  1151. goers.append(name);
  1152. }
  1153. }
  1154. ForEachItemIn(idx, goers)
  1155. {
  1156. keyIndexCache.remove(goers.item(idx));
  1157. }
  1158. }
  1159. }
  1160. void CKeyStore::clearCacheEntry(const char *keyName)
  1161. {
  1162. if (!keyName || !*keyName)
  1163. return; // nothing to do
  1164. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1165. StringArray goers;
  1166. ForEach(*iter)
  1167. {
  1168. CKeyIndexMapping &mapping = iter->query();
  1169. IKeyIndex &index = mapping.query();
  1170. if (!index.IsShared())
  1171. {
  1172. const char *name = mapping.queryFindString();
  1173. if (strstr(name, keyName) != 0) // keyName doesn't have drive or part number associated with it
  1174. goers.append(name);
  1175. }
  1176. }
  1177. ForEachItemIn(idx, goers)
  1178. {
  1179. keyIndexCache.remove(goers.item(idx));
  1180. }
  1181. }
  1182. // CKeyIndex impl.
  1183. CKeyIndex::CKeyIndex(int _iD, const char *_name) : name(_name)
  1184. {
  1185. iD = _iD;
  1186. cache = queryNodeCache(); // use one node cache for all key indexes;
  1187. cache->Link();
  1188. keyHdr = NULL;
  1189. rootNode = NULL;
  1190. cachedBlobNodePos = 0;
  1191. atomic_set(&keySeeks, 0);
  1192. atomic_set(&keyScans, 0);
  1193. latestGetNodeOffset = 0;
  1194. }
  1195. void CKeyIndex::cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK)
  1196. {
  1197. bool first = true;
  1198. while (nodePos)
  1199. {
  1200. Owned<CJHTreeNode> node = loadNode(nodePos);
  1201. if (node->isLeaf())
  1202. {
  1203. if (!isTLK)
  1204. return;
  1205. }
  1206. else if (first)
  1207. {
  1208. cacheNodes(cache, node->getFPosAt(0), isTLK);
  1209. first = false;
  1210. }
  1211. cache->preload(node, iD, nodePos, NULL);
  1212. nodePos = node->getRightSib();
  1213. }
  1214. }
  1215. void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
  1216. {
  1217. if (isTLK)
  1218. hdr.ktype |= HTREE_TOPLEVEL_KEY; // thor does not set
  1219. keyHdr = new CKeyHdr();
  1220. try
  1221. {
  1222. keyHdr->load(hdr);
  1223. }
  1224. catch (IKeyException *ke)
  1225. {
  1226. if (!name.get()) throw;
  1227. StringBuffer msg;
  1228. IKeyException *ke2 = MakeKeyException(ke->errorCode(), "%s. In key '%s'.", ke->errorMessage(msg).str(), name.get());
  1229. ke->Release();
  1230. throw ke2;
  1231. }
  1232. offset_t rootPos = keyHdr->getRootFPos();
  1233. Linked<CNodeCache> nodeCache = queryNodeCache();
  1234. if (allowPreload)
  1235. {
  1236. if (nodeCache->getNodeCachePreload() && !nodeCache->isPreloaded(iD, rootPos))
  1237. {
  1238. cacheNodes(nodeCache, rootPos, isTLK);
  1239. }
  1240. }
  1241. rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
  1242. }
  1243. CKeyIndex::~CKeyIndex()
  1244. {
  1245. ::Release(keyHdr);
  1246. ::Release(cache);
  1247. ::Release(rootNode);
  1248. }
  1249. CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
  1250. : CKeyIndex(_iD, _name)
  1251. {
  1252. io.setown(_io);
  1253. assertex(io->offset()==0); // mapped whole file
  1254. assertex(io->length()==io->fileSize()); // mapped whole file
  1255. KeyHdr hdr;
  1256. if (io->length() < sizeof(hdr))
  1257. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  1258. memcpy(&hdr, io->base(), sizeof(hdr));
  1259. init(hdr, isTLK, false);
  1260. }
  1261. CJHTreeNode *CMemKeyIndex::loadNode(offset_t pos)
  1262. {
  1263. atomic_inc(&nodesLoaded);
  1264. if (pos + keyHdr->getNodeSize() > io->fileSize())
  1265. {
  1266. IException *E = MakeStringException(errno, "Error reading node at position %"I64F"x past EOF", pos);
  1267. StringBuffer m;
  1268. m.appendf("In key %s, position 0x%"I64F"x", name.get(), pos);
  1269. EXCLOG(E, m.str());
  1270. throw E;
  1271. }
  1272. char *nodeData = (char *) (io->base() + pos);
  1273. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  1274. return CKeyIndex::loadNode(nodeData, pos, false);
  1275. }
  1276. CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
  1277. : CKeyIndex(_iD, _name)
  1278. {
  1279. io.setown(_io);
  1280. KeyHdr hdr;
  1281. if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
  1282. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  1283. init(hdr, isTLK, allowPreload);
  1284. }
  1285. CJHTreeNode *CDiskKeyIndex::loadNode(offset_t pos)
  1286. {
  1287. atomic_inc(&nodesLoaded);
  1288. unsigned nodeSize = keyHdr->getNodeSize();
  1289. MemoryAttr ma;
  1290. char *nodeData = (char *) ma.allocate(nodeSize);
  1291. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  1292. if (io->read(pos, nodeSize, nodeData) != nodeSize)
  1293. {
  1294. IException *E = MakeStringException(errno, "Error %d reading node at position %"I64F"x", errno, pos);
  1295. StringBuffer m;
  1296. m.appendf("In key %s, position 0x%"I64F"x", name.get(), pos);
  1297. EXCLOG(E, m.str());
  1298. throw E;
  1299. }
  1300. return CKeyIndex::loadNode(nodeData, pos, true);
  1301. }
  1302. CJHTreeNode *CKeyIndex::loadNode(char *nodeData, offset_t pos, bool needsCopy)
  1303. {
  1304. try
  1305. {
  1306. Owned<CJHTreeNode> ret;
  1307. char leafFlag = ((NodeHdr *) nodeData)->leafFlag;
  1308. switch(leafFlag)
  1309. {
  1310. case 0:
  1311. ret.setown(new CJHTreeNode());
  1312. break;
  1313. case 1:
  1314. if (keyHdr->isVariable())
  1315. ret.setown(new CJHVarTreeNode());
  1316. else
  1317. ret.setown(new CJHTreeNode());
  1318. break;
  1319. case 2:
  1320. ret.setown(new CJHTreeBlobNode());
  1321. break;
  1322. case 3:
  1323. ret.setown(new CJHTreeMetadataNode());
  1324. break;
  1325. default:
  1326. throwUnexpected();
  1327. }
  1328. {
  1329. MTIME_SECTION(queryActiveTimer(), "JHTREE load node");
  1330. ret->load(keyHdr, nodeData, pos, true);
  1331. }
  1332. return ret.getClear();
  1333. }
  1334. catch (IException *E)
  1335. {
  1336. StringBuffer m;
  1337. m.appendf("In key %s, position 0x%"I64F"x", name.get(), pos);
  1338. EXCLOG(E, m.str());
  1339. throw;
  1340. }
  1341. catch (...)
  1342. {
  1343. DBGLOG("Unknown exception in key %s, position 0x%"I64F"x", name.get(), pos);
  1344. throw;
  1345. }
  1346. }
  1347. bool CKeyIndex::isTopLevelKey()
  1348. {
  1349. return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0;
  1350. }
  1351. bool CKeyIndex::isFullySorted()
  1352. {
  1353. return (keyHdr->getKeyType() & HTREE_FULLSORT_KEY) != 0;
  1354. }
  1355. IKeyCursor *CKeyIndex::getCursor(IContextLogger *ctx)
  1356. {
  1357. return new CKeyCursor(*this, ctx); // MORE - pool them?
  1358. }
  1359. CJHTreeNode *CKeyIndex::getNode(offset_t offset, IContextLogger *ctx)
  1360. {
  1361. latestGetNodeOffset = offset;
  1362. return cache->getNode(this, iD, offset, ctx, isTopLevelKey());
  1363. }
  1364. void dumpNode(FILE *out, CJHTreeNode *node, int length, unsigned rowCount, bool raw)
  1365. {
  1366. if (!raw)
  1367. fprintf(out, "Node dump: fpos(%"I64F"d) leaf(%d)\n", node->getFpos(), node->isLeaf());
  1368. if (rowCount==0 || rowCount > node->getNumKeys())
  1369. rowCount = node->getNumKeys();
  1370. for (unsigned int i=0; i<rowCount; i++)
  1371. {
  1372. char *dst = (char *) alloca(node->getKeyLen()+50);
  1373. node->getValueAt(i, dst);
  1374. if (raw)
  1375. {
  1376. fwrite(dst, 1, length, out);
  1377. }
  1378. else
  1379. {
  1380. offset_t pos = node->getFPosAt(i);
  1381. StringBuffer s;
  1382. appendURL(&s, dst, length, true);
  1383. fprintf(out, "keyVal %d [%"I64F"d] = %s\n", i, pos, s.toCharArray());
  1384. }
  1385. }
  1386. if (!raw)
  1387. fprintf(out, "==========\n");
  1388. }
  1389. void CKeyIndex::dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw)
  1390. {
  1391. Owned<CJHTreeNode> node = loadNode(pos);
  1392. ::dumpNode(out, node, keySize(), count, isRaw);
  1393. }
  1394. size32_t CKeyIndex::keySize()
  1395. {
  1396. return keyHdr->getMaxKeyLength();
  1397. }
  1398. size32_t CKeyIndex::keyedSize()
  1399. {
  1400. return keyHdr->getNodeKeyLength();
  1401. }
  1402. bool CKeyIndex::hasPayload()
  1403. {
  1404. return keyHdr->hasPayload();
  1405. }
  1406. CJHTreeBlobNode *CKeyIndex::getBlobNode(offset_t nodepos)
  1407. {
  1408. CriticalBlock b(blobCacheCrit);
  1409. if (nodepos != cachedBlobNodePos)
  1410. {
  1411. cachedBlobNode.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeBlobNode)); // note - don't use the cache
  1412. cachedBlobNodePos = nodepos;
  1413. }
  1414. return cachedBlobNode.getLink();
  1415. }
  1416. const byte *CKeyIndex::loadBlob(unsigned __int64 blobid, size32_t &blobSize)
  1417. {
  1418. offset_t nodepos = blobid & I64C(0xffffffffffff);
  1419. size32_t offset = (size32_t) ((blobid & I64C(0xffff000000000000)) >> 44);
  1420. Owned<CJHTreeBlobNode> blobNode = getBlobNode(nodepos);
  1421. size32_t sizeRemaining = blobNode->getTotalBlobSize(offset);
  1422. blobSize = sizeRemaining;
  1423. byte *ret = (byte *) malloc(sizeRemaining);
  1424. byte *finger = ret;
  1425. loop
  1426. {
  1427. size32_t gotHere = blobNode->getBlobData(offset, finger);
  1428. assertex(gotHere <= sizeRemaining);
  1429. sizeRemaining -= gotHere;
  1430. finger += gotHere;
  1431. if (!sizeRemaining)
  1432. break;
  1433. blobNode.setown(getBlobNode(blobNode->getRightSib()));
  1434. offset = 0;
  1435. }
  1436. return ret;
  1437. }
  1438. offset_t CKeyIndex::queryMetadataHead()
  1439. {
  1440. offset_t ret = keyHdr->getHdrStruct()->metadataHead;
  1441. if(ret == static_cast<offset_t>(-1)) ret = 0; // index created before introduction of metadata would have FFFF... in this space
  1442. return ret;
  1443. }
  1444. IPropertyTree * CKeyIndex::getMetadata()
  1445. {
  1446. offset_t nodepos = queryMetadataHead();
  1447. if(!nodepos)
  1448. return NULL;
  1449. Owned<CJHTreeMetadataNode> node;
  1450. StringBuffer xml;
  1451. while(nodepos)
  1452. {
  1453. node.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeMetadataNode));
  1454. node->get(xml);
  1455. nodepos = node->getRightSib();
  1456. }
  1457. IPropertyTree * ret;
  1458. try
  1459. {
  1460. ret = createPTreeFromXMLString(xml.str());
  1461. }
  1462. catch(IPTreeReadException * e)
  1463. {
  1464. StringBuffer emsg;
  1465. IException * wrapped = MakeStringException(e->errorAudience(), e->errorCode(), "Error retrieving XML metadata: %s", e->errorMessage(emsg).str());
  1466. e->Release();
  1467. throw wrapped;
  1468. }
  1469. return ret;
  1470. }
  1471. CKeyCursor::CKeyCursor(CKeyIndex &_key, IContextLogger *_ctx)
  1472. : key(_key), ctx(_ctx)
  1473. {
  1474. key.Link();
  1475. nodeKey = 0;
  1476. }
  1477. CKeyCursor::~CKeyCursor()
  1478. {
  1479. key.Release();
  1480. releaseBlobs();
  1481. }
  1482. void CKeyCursor::reset()
  1483. {
  1484. node.clear();
  1485. }
  1486. CJHTreeNode *CKeyCursor::locateFirstNode()
  1487. {
  1488. CJHTreeNode * n = 0;
  1489. CJHTreeNode * p = LINK(key.rootNode);
  1490. while (p != 0)
  1491. {
  1492. n = p;
  1493. p = key.getNode(n->prevNodeFpos(), ctx);
  1494. if (p != 0)
  1495. n->Release();
  1496. }
  1497. return n;
  1498. }
  1499. CJHTreeNode *CKeyCursor::locateLastNode()
  1500. {
  1501. CJHTreeNode * n = 0;
  1502. CJHTreeNode * p = LINK(key.rootNode);
  1503. while (p != 0)
  1504. {
  1505. n = p;
  1506. p = key.getNode(n->nextNodeFpos(), ctx);
  1507. if (p != 0)
  1508. n->Release();
  1509. }
  1510. return n;
  1511. }
  1512. bool CKeyCursor::next(char *dst)
  1513. {
  1514. if (!node)
  1515. return first(dst);
  1516. else
  1517. {
  1518. atomic_inc(&key.keyScans);
  1519. if (!node->getValueAt( ++nodeKey, dst))
  1520. {
  1521. offset_t rsib = node->getRightSib();
  1522. node.clear();
  1523. if (rsib != 0)
  1524. {
  1525. node.setown(key.getNode(rsib, ctx));
  1526. if (node != NULL)
  1527. {
  1528. nodeKey = 0;
  1529. return node->getValueAt(0, dst);
  1530. }
  1531. }
  1532. return false;
  1533. }
  1534. else
  1535. return true;
  1536. }
  1537. }
  1538. bool CKeyCursor::prev(char *dst)
  1539. {
  1540. if (!node)
  1541. return last(dst); // Note - this used to say First - surely a typo
  1542. else
  1543. {
  1544. atomic_inc(&key.keyScans);
  1545. if (!nodeKey)
  1546. {
  1547. offset_t lsib = node->getLeftSib();
  1548. node.clear();
  1549. if (lsib != 0)
  1550. {
  1551. node.setown(key.getNode(lsib, ctx));
  1552. if (node)
  1553. {
  1554. nodeKey = node->getNumKeys()-1;
  1555. return node->getValueAt(nodeKey, dst );
  1556. }
  1557. }
  1558. return false;
  1559. }
  1560. else
  1561. return node->getValueAt(--nodeKey, dst);
  1562. }
  1563. }
  1564. size32_t CKeyCursor::getSize()
  1565. {
  1566. assertex(node);
  1567. return node->getSizeAt(nodeKey);
  1568. }
  1569. offset_t CKeyCursor::getFPos()
  1570. {
  1571. assertex(node);
  1572. return node->getFPosAt(nodeKey);
  1573. }
  1574. unsigned __int64 CKeyCursor::getSequence()
  1575. {
  1576. assertex(node);
  1577. return node->getSequence(nodeKey);
  1578. }
  1579. bool CKeyCursor::first(char *dst)
  1580. {
  1581. atomic_inc(&key.keySeeks);
  1582. node.setown(locateFirstNode());
  1583. nodeKey = 0;
  1584. return node->getValueAt(nodeKey, dst);
  1585. }
  1586. bool CKeyCursor::last(char *dst)
  1587. {
  1588. atomic_inc(&key.keySeeks);
  1589. node.setown(locateLastNode());
  1590. nodeKey = node->getNumKeys()-1;
  1591. return node->getValueAt( nodeKey, dst );
  1592. }
  1593. bool CKeyCursor::gtEqual(const char *src, char *dst, bool seekForward)
  1594. {
  1595. atomic_inc(&key.keySeeks);
  1596. unsigned lwm = 0;
  1597. if (seekForward && node)
  1598. {
  1599. // When seeking forward, there are two cases worth optimizing:
  1600. // 1. the next record is actually the one we want
  1601. // 2. The record we want is on the current page
  1602. unsigned numKeys = node->getNumKeys();
  1603. if (nodeKey < numKeys-1)
  1604. {
  1605. int rc = node->compareValueAt(src, ++nodeKey);
  1606. if (rc <= 0)
  1607. {
  1608. node->getValueAt(nodeKey, dst);
  1609. return true;
  1610. }
  1611. if (nodeKey < numKeys-1)
  1612. {
  1613. rc = node->compareValueAt(src, numKeys-1);
  1614. if (rc <= 0)
  1615. lwm = nodeKey+1;
  1616. }
  1617. }
  1618. }
  1619. if (!lwm)
  1620. node.set(key.rootNode);
  1621. loop
  1622. {
  1623. unsigned int a = lwm;
  1624. int b = node->getNumKeys();
  1625. // first search for first GTE entry (result in b(<),a(>=))
  1626. while ((int)a<b)
  1627. {
  1628. int i = (a+b)/2;
  1629. int rc = node->compareValueAt(src, i);
  1630. if (rc>0)
  1631. a = i+1;
  1632. else
  1633. b = i;
  1634. }
  1635. if (node->isLeaf())
  1636. {
  1637. if (a<node->getNumKeys())
  1638. nodeKey = a;
  1639. else
  1640. {
  1641. offset_t nextPos = node->nextNodeFpos(); // This can happen at eof because of key peculiarity where level above reports ffff as last
  1642. node.setown(key.getNode(nextPos, ctx));
  1643. nodeKey = 0;
  1644. }
  1645. if (node)
  1646. {
  1647. node->getValueAt(nodeKey, dst);
  1648. return true;
  1649. }
  1650. else
  1651. return false;
  1652. }
  1653. else
  1654. {
  1655. if (a<node->getNumKeys())
  1656. {
  1657. offset_t npos = node->getFPosAt(a);
  1658. node.setown(key.getNode(npos, ctx));
  1659. }
  1660. else
  1661. return false;
  1662. }
  1663. }
  1664. }
  1665. bool CKeyCursor::ltEqual(const char *src, char *dst, bool seekForward)
  1666. {
  1667. atomic_inc(&key.keySeeks);
  1668. unsigned lwm = 0;
  1669. if (seekForward && node)
  1670. {
  1671. // When seeking forward, there are two cases worth optimizing:
  1672. // 1. next record is > src, so we return current
  1673. // 2. The record we want is on the current page
  1674. unsigned numKeys = node->getNumKeys();
  1675. if (nodeKey < numKeys-1)
  1676. {
  1677. int rc = node->compareValueAt(src, ++nodeKey);
  1678. if (rc < 0)
  1679. {
  1680. node->getValueAt(--nodeKey, dst);
  1681. return true;
  1682. }
  1683. if (nodeKey < numKeys-1)
  1684. {
  1685. rc = node->compareValueAt(src, numKeys-1);
  1686. if (rc < 0)
  1687. lwm = nodeKey;
  1688. }
  1689. }
  1690. }
  1691. if (!lwm)
  1692. node.set(key.rootNode);
  1693. loop
  1694. {
  1695. unsigned int a = lwm;
  1696. int b = node->getNumKeys();
  1697. // Locate first record greater than src
  1698. while ((int)a<b)
  1699. {
  1700. int i = (a+b+1)/2;
  1701. int rc = node->compareValueAt(src, i-1);
  1702. if (rc>=0)
  1703. a = i;
  1704. else
  1705. b = i-1;
  1706. }
  1707. if (node->isLeaf())
  1708. {
  1709. // record we want is the one before first record greater than src.
  1710. if (a>0)
  1711. nodeKey = a-1;
  1712. else
  1713. {
  1714. offset_t prevPos = node->prevNodeFpos();
  1715. node.setown(key.getNode(prevPos, ctx));
  1716. if (node)
  1717. nodeKey = node->getNumKeys()-1;
  1718. }
  1719. if (node)
  1720. {
  1721. node->getValueAt(nodeKey, dst);
  1722. return true;
  1723. }
  1724. else
  1725. return false;
  1726. }
  1727. else
  1728. {
  1729. // Node to look in is the first one one that ended greater than src.
  1730. if (a==node->getNumKeys())
  1731. a--; // value being looked for is off the end of the index.
  1732. offset_t npos = node->getFPosAt(a);
  1733. node.setown(key.getNode(npos, ctx));
  1734. if (!node)
  1735. throw MakeStringException(0, "Invalid key %s: child node pointer should never be NULL", key.name.get());
  1736. }
  1737. }
  1738. }
  1739. void CKeyCursor::serializeCursorPos(MemoryBuffer &mb)
  1740. {
  1741. if (node)
  1742. {
  1743. mb.append(node->getFpos());
  1744. mb.append(nodeKey);
  1745. }
  1746. else
  1747. {
  1748. offset_t zero = 0;
  1749. unsigned zero2 = 0;
  1750. mb.append(zero);
  1751. mb.append(zero2);
  1752. }
  1753. }
  1754. void CKeyCursor::deserializeCursorPos(MemoryBuffer &mb, char *keyBuffer)
  1755. {
  1756. offset_t nodeAddress;
  1757. mb.read(nodeAddress);
  1758. mb.read(nodeKey);
  1759. if (nodeAddress)
  1760. {
  1761. node.setown(key.getNode(nodeAddress, ctx));
  1762. if (node && keyBuffer)
  1763. node->getValueAt(nodeKey, keyBuffer);
  1764. }
  1765. else
  1766. node.clear();
  1767. }
  1768. const byte *CKeyCursor::loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  1769. {
  1770. const byte *ret = key.loadBlob(blobid, blobsize);
  1771. activeBlobs.append(ret);
  1772. return ret;
  1773. }
  1774. void CKeyCursor::releaseBlobs()
  1775. {
  1776. ForEachItemIn(idx, activeBlobs)
  1777. {
  1778. free((void *) activeBlobs.item(idx));
  1779. }
  1780. activeBlobs.kill();
  1781. }
  1782. class CLazyKeyIndex : public CInterface, implements IKeyIndex
  1783. {
  1784. StringAttr keyfile;
  1785. unsigned crc;
  1786. Linked<IDelayedFile> iFileIO;
  1787. Owned<IKeyIndex> realKey;
  1788. CriticalSection c;
  1789. bool isTLK;
  1790. bool preloadAllowed;
  1791. inline IKeyIndex &checkOpen()
  1792. {
  1793. CriticalBlock b(c);
  1794. if (!realKey)
  1795. {
  1796. IMemoryMappedFile *mapped = useMemoryMappedIndexes ? iFileIO->queryMappedFile() : 0;
  1797. if (mapped)
  1798. realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
  1799. else
  1800. realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO->queryFileIO(), isTLK, preloadAllowed));
  1801. if (!realKey)
  1802. {
  1803. DBGLOG("Lazy key file %s could not be opened", keyfile.get());
  1804. throw MakeStringException(0, "Lazy key file %s could not be opened", keyfile.get());
  1805. }
  1806. }
  1807. return *realKey;
  1808. }
  1809. public:
  1810. IMPLEMENT_IINTERFACE;
  1811. CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_iFileIO, bool _isTLK, bool _preloadAllowed)
  1812. : keyfile(_keyfile), crc(_crc), iFileIO(_iFileIO), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
  1813. {}
  1814. virtual bool IsShared() const { return CInterface::IsShared(); }
  1815. virtual IKeyCursor *getCursor(IContextLogger *ctx) { return checkOpen().getCursor(ctx); }
  1816. virtual size32_t keySize() { return checkOpen().keySize(); }
  1817. virtual size32_t keyedSize() { return checkOpen().keyedSize(); }
  1818. virtual bool hasPayload() { return checkOpen().hasPayload(); }
  1819. virtual bool isTopLevelKey() { return checkOpen().isTopLevelKey(); }
  1820. virtual bool isFullySorted() { return checkOpen().isFullySorted(); }
  1821. virtual unsigned getFlags() { return checkOpen().getFlags(); }
  1822. virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw) { checkOpen().dumpNode(out, pos, count, isRaw); }
  1823. virtual unsigned numParts() { return 1; }
  1824. virtual IKeyIndex *queryPart(unsigned idx) { return idx ? NULL : this; }
  1825. virtual unsigned queryScans() { return realKey ? realKey->queryScans() : 0; }
  1826. virtual unsigned querySeeks() { return realKey ? realKey->querySeeks() : 0; }
  1827. virtual const char *queryFileName() { return keyfile.get(); }
  1828. virtual offset_t queryBlobHead() { return checkOpen().queryBlobHead(); }
  1829. virtual void resetCounts() { if (realKey) realKey->resetCounts(); }
  1830. virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
  1831. virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
  1832. virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
  1833. virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
  1834. };
  1835. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
  1836. {
  1837. return queryKeyStore()->load(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  1838. }
  1839. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, bool isTLK, bool preloadAllowed)
  1840. {
  1841. return queryKeyStore()->load(keyfile, crc, isTLK, preloadAllowed);
  1842. }
  1843. extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed)
  1844. {
  1845. StringBuffer filePath;
  1846. const RemoteFilename &rfn = part.queryCopies().item(0);
  1847. rfn.getPath(filePath);
  1848. return queryKeyStore()->load(filePath.str(), crc, part, isTLK, preloadAllowed);
  1849. }
  1850. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, bool isTLK, bool preloadAllowed)
  1851. {
  1852. return new CLazyKeyIndex(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  1853. }
  1854. extern jhtree_decl void clearKeyStoreCache(bool killAll)
  1855. {
  1856. queryKeyStore()->clearCache(killAll);
  1857. }
  1858. extern jhtree_decl void clearKeyStoreCacheEntry(const char *name)
  1859. {
  1860. queryKeyStore()->clearCacheEntry(name);
  1861. }
  1862. extern jhtree_decl StringBuffer &getIndexMetrics(StringBuffer &ret)
  1863. {
  1864. return queryKeyStore()->getMetrics(ret);
  1865. }
  1866. extern jhtree_decl void resetIndexMetrics()
  1867. {
  1868. queryKeyStore()->resetMetrics();
  1869. }
  1870. extern jhtree_decl bool isKeyFile(const char *filename)
  1871. {
  1872. OwnedIFile file = createIFile(filename);
  1873. OwnedIFileIO io = file->open(IFOread);
  1874. unsigned __int64 size = file->size();
  1875. if (size)
  1876. {
  1877. KeyHdr hdr;
  1878. if (io->read(0, sizeof(hdr), &hdr) == sizeof(hdr))
  1879. {
  1880. _WINREV(hdr.phyrec);
  1881. _WINREV(hdr.root);
  1882. _WINREV(hdr.nodeSize);
  1883. if (size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.root && hdr.root % hdr.nodeSize == 0)
  1884. {
  1885. NodeHdr root;
  1886. if (io->read(hdr.root, sizeof(root), &root) == sizeof(root))
  1887. {
  1888. _WINREV(root.rightSib);
  1889. _WINREV(root.leftSib);
  1890. return root.leftSib==0 && root.rightSib==0;
  1891. }
  1892. }
  1893. }
  1894. }
  1895. return false;
  1896. }
  1897. extern jhtree_decl bool setNodeCachePreload(bool preload)
  1898. {
  1899. return queryNodeCache()->setNodeCachePreload(preload);
  1900. }
  1901. extern jhtree_decl size32_t setNodeCacheMem(size32_t cacheSize)
  1902. {
  1903. return queryNodeCache()->setNodeCacheMem(cacheSize);
  1904. }
  1905. extern jhtree_decl size32_t setLeafCacheMem(size32_t cacheSize)
  1906. {
  1907. return queryNodeCache()->setLeafCacheMem(cacheSize);
  1908. }
  1909. extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize)
  1910. {
  1911. return queryNodeCache()->setBlobCacheMem(cacheSize);
  1912. }
  1913. ///////////////////////////////////////////////////////////////////////////////
  1914. // CNodeCache impl.
  1915. ///////////////////////////////////////////////////////////////////////////////
  1916. CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IContextLogger *ctx, bool isTLK)
  1917. {
  1918. // MORE - could probably be improved - I think having the cache template separate is not helping us here
  1919. // Also one cache per key would surely be faster, and could still use a global total
  1920. if (!pos)
  1921. return NULL;
  1922. {
  1923. // It's a shame that we don't know the type before we read it. But probably not that big a deal
  1924. CriticalBlock block(lock);
  1925. CKeyIdAndPos key(iD, pos);
  1926. if (preloadNodes)
  1927. {
  1928. CJHTreeNode *cacheNode = preloadCache.query(key);
  1929. if (cacheNode)
  1930. {
  1931. atomic_inc(&cacheHits);
  1932. if (ctx) ctx->noteStatistic(STATS_PRELOADCACHEHIT, 1, 1);
  1933. atomic_inc(&preloadCacheHits);
  1934. return LINK(cacheNode);
  1935. }
  1936. }
  1937. if (cacheNodes)
  1938. {
  1939. CJHTreeNode *cacheNode = nodeCache.query(key);
  1940. if (cacheNode)
  1941. {
  1942. atomic_inc(&cacheHits);
  1943. if (ctx) ctx->noteStatistic(STATS_NODECACHEHIT, 1, 1);
  1944. atomic_inc(&nodeCacheHits);
  1945. return LINK(cacheNode);
  1946. }
  1947. }
  1948. if (cacheLeaves)
  1949. {
  1950. CJHTreeNode *cacheNode = leafCache.query(key);
  1951. if (cacheNode)
  1952. {
  1953. atomic_inc(&cacheHits);
  1954. if (ctx) ctx->noteStatistic(STATS_LEAFCACHEHIT, 1, 1);
  1955. atomic_inc(&leafCacheHits);
  1956. return LINK(cacheNode);
  1957. }
  1958. }
  1959. if (cacheBlobs)
  1960. {
  1961. CJHTreeNode *cacheNode = blobCache.query(key);
  1962. if (cacheNode)
  1963. {
  1964. atomic_inc(&cacheHits);
  1965. if (ctx) ctx->noteStatistic(STATS_BLOBCACHEHIT, 1, 1);
  1966. atomic_inc(&blobCacheHits);
  1967. return LINK(cacheNode);
  1968. }
  1969. }
  1970. CJHTreeNode *node;
  1971. {
  1972. CriticalUnblock block(lock);
  1973. node = keyIndex->loadNode(pos); // NOTE - don't want cache locked while we load!
  1974. }
  1975. atomic_inc(&cacheAdds);
  1976. if (node->isBlob())
  1977. {
  1978. if (cacheBlobs)
  1979. {
  1980. CJHTreeNode *cacheNode = blobCache.query(key); // check if added to cache while we were reading
  1981. if (cacheNode)
  1982. {
  1983. ::Release(node);
  1984. atomic_inc(&cacheHits);
  1985. if (ctx) ctx->noteStatistic(STATS_BLOBCACHEHIT, 1, 1);
  1986. atomic_inc(&blobCacheHits);
  1987. return LINK(cacheNode);
  1988. }
  1989. if (ctx) ctx->noteStatistic(STATS_BLOBCACHEADD, 1, 1);
  1990. atomic_inc(&blobCacheAdds);
  1991. blobCache.add(key, *LINK(node));
  1992. }
  1993. }
  1994. else if (node->isLeaf() && !isTLK) // leaves in TLK are cached as if they were nodes
  1995. {
  1996. if (cacheLeaves)
  1997. {
  1998. CJHTreeNode *cacheNode = leafCache.query(key); // check if added to cache while we were reading
  1999. if (cacheNode)
  2000. {
  2001. ::Release(node);
  2002. atomic_inc(&cacheHits);
  2003. if (ctx) ctx->noteStatistic(STATS_LEAFCACHEHIT, 1, 1);
  2004. atomic_inc(&leafCacheHits);
  2005. return LINK(cacheNode);
  2006. }
  2007. if (ctx) ctx->noteStatistic(STATS_LEAFCACHEADD, 1, 1);
  2008. atomic_inc(&leafCacheAdds);
  2009. leafCache.add(key, *LINK(node));
  2010. }
  2011. }
  2012. else
  2013. {
  2014. if (cacheNodes)
  2015. {
  2016. CJHTreeNode *cacheNode = nodeCache.query(key); // check if added to cache while we were reading
  2017. if (cacheNode)
  2018. {
  2019. ::Release(node);
  2020. atomic_inc(&cacheHits);
  2021. if (ctx) ctx->noteStatistic(STATS_NODECACHEHIT, 1, 1);
  2022. atomic_inc(&nodeCacheHits);
  2023. return LINK(cacheNode);
  2024. }
  2025. if (ctx) ctx->noteStatistic(STATS_NODECACHEADD, 1, 1);
  2026. atomic_inc(&nodeCacheAdds);
  2027. nodeCache.add(key, *LINK(node));
  2028. }
  2029. }
  2030. return node;
  2031. }
  2032. }
  2033. void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger *ctx)
  2034. {
  2035. assertex(pos);
  2036. assertex(preloadNodes);
  2037. CriticalBlock block(lock);
  2038. CKeyIdAndPos key(iD, pos);
  2039. CJHTreeNode *cacheNode = preloadCache.query(key);
  2040. if (!cacheNode)
  2041. {
  2042. atomic_inc(&cacheAdds);
  2043. if (ctx) ctx->noteStatistic(STATS_PRELOADCACHEADD, 1, 1);
  2044. atomic_inc(&preloadCacheAdds);
  2045. preloadCache.add(key, *LINK(node));
  2046. }
  2047. }
  2048. bool CNodeCache::isPreloaded(int iD, offset_t pos)
  2049. {
  2050. CriticalBlock block(lock);
  2051. CKeyIdAndPos key(iD, pos);
  2052. return NULL != preloadCache.query(key);
  2053. }
  2054. atomic_t cacheAdds;
  2055. atomic_t cacheHits;
  2056. atomic_t nodesLoaded;
  2057. atomic_t blobCacheHits;
  2058. atomic_t blobCacheAdds;
  2059. atomic_t leafCacheHits;
  2060. atomic_t leafCacheAdds;
  2061. atomic_t nodeCacheHits;
  2062. atomic_t nodeCacheAdds;
  2063. atomic_t preloadCacheHits;
  2064. atomic_t preloadCacheAdds;
  2065. void clearNodeStats()
  2066. {
  2067. atomic_set(&cacheAdds, 0);
  2068. atomic_set(&cacheHits, 0);
  2069. atomic_set(&nodesLoaded, 0);
  2070. atomic_set(&blobCacheHits, 0);
  2071. atomic_set(&blobCacheAdds, 0);
  2072. atomic_set(&leafCacheHits, 0);
  2073. atomic_set(&leafCacheAdds, 0);
  2074. atomic_set(&nodeCacheHits, 0);
  2075. atomic_set(&nodeCacheAdds, 0);
  2076. atomic_set(&preloadCacheHits, 0);
  2077. atomic_set(&preloadCacheAdds, 0);
  2078. }
  2079. //------------------------------------------------------------------------------------------------
  2080. class CKeyMerger : public CKeyLevelManager
  2081. {
  2082. unsigned *mergeheap;
  2083. unsigned numkeys;
  2084. unsigned activekeys;
  2085. IArrayOf<IKeyCursor> cursorArray;
  2086. PointerArray bufferArray;
  2087. PointerArray fixedArray;
  2088. BoolArray matchedArray;
  2089. UInt64Array fposArray;
  2090. UnsignedArray mergeHeapArray;
  2091. UnsignedArray keyNoArray;
  2092. offset_t *fposes;
  2093. IKeyCursor **cursors;
  2094. char **buffers;
  2095. void **fixeds;
  2096. bool *matcheds;
  2097. unsigned sortFieldOffset;
  2098. unsigned sortFromSeg;
  2099. bool resetPending;
  2100. // #define BuffCompare(a, b) memcmp(buffers[mergeheap[a]]+sortFieldOffset, buffers[mergeheap[b]]+sortFieldOffset, keySize-sortFieldOffset) // NOTE - compare whole key not just keyed part.
  2101. inline int BuffCompare(unsigned a, unsigned b)
  2102. {
  2103. const char *c1 = buffers[mergeheap[a]];
  2104. const char *c2 = buffers[mergeheap[b]];
  2105. int ret = memcmp(c1+sortFieldOffset, c2+sortFieldOffset, keySize-sortFieldOffset); // NOTE - compare whole key not just keyed part.
  2106. if (!ret && sortFieldOffset)
  2107. ret = memcmp(c1, c2, sortFieldOffset);
  2108. return ret;
  2109. }
  2110. Linked<IKeyIndexBase> keyset;
  2111. void resetKey(unsigned i)
  2112. {
  2113. matcheds[i] = false;
  2114. segs.setLow(sortFromSeg, buffers[i]);
  2115. IKeyCursor *cursor = cursors[i];
  2116. if (cursor)
  2117. cursor->reset();
  2118. }
  2119. void replicateForTrailingSort()
  2120. {
  2121. // For each key that is present, we need to repeat it for each distinct value of leading segmonitor fields that is present in the index.
  2122. // We also need to make sure that sortFromSeg is properly set
  2123. sortFromSeg = (unsigned) -1;
  2124. ForEachItemIn(idx, segs.segMonitors)
  2125. {
  2126. IKeySegmentMonitor &seg = segs.segMonitors.item(idx);
  2127. unsigned offset = seg.getOffset();
  2128. if (offset == sortFieldOffset)
  2129. {
  2130. sortFromSeg = idx;
  2131. break;
  2132. }
  2133. IKeySegmentMonitor *override = createOverrideableKeySegmentMonitor(LINK(&seg));
  2134. segs.segMonitors.replace(*override, idx);
  2135. }
  2136. if (sortFromSeg == -1)
  2137. assertex(!"Attempting to sort from offset that is not on a segment boundary"); // MORE - can use the information that we have earlier to make sure not merged
  2138. assertex(resetPending == true); // we do the actual replication in reset
  2139. }
  2140. void dumpMergeHeap()
  2141. {
  2142. DBGLOG("---------");
  2143. for (unsigned i = 0; i < activekeys; i++)
  2144. {
  2145. int key = mergeheap[i];
  2146. DBGLOG("%d %.*s %d", key, keySize, buffers[key], matcheds[key]);
  2147. }
  2148. }
  2149. inline void setSegOverrides(unsigned key)
  2150. {
  2151. keyBuffer = buffers[key];
  2152. keyCursor = cursors[key];
  2153. matched = matcheds[key];
  2154. lookupFpos = fposes[key];
  2155. for (unsigned segno = 0; segno < sortFromSeg; segno++)
  2156. {
  2157. IOverrideableKeySegmentMonitor *sm = QUERYINTERFACE(&segs.segMonitors.item(segno), IOverrideableKeySegmentMonitor);
  2158. sm->setOverrideBuffer(fixeds[key]);
  2159. }
  2160. segs.recalculateCache();
  2161. }
  2162. public:
  2163. IMPLEMENT_IINTERFACE;
  2164. CKeyMerger(IKeyIndexSet *_keyset, size32_t _eclKeySize, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(NULL, _eclKeySize, _ctx), sortFieldOffset(_sortFieldOffset)
  2165. {
  2166. segs.setMergeBarrier(sortFieldOffset);
  2167. init();
  2168. setKey(_keyset);
  2169. }
  2170. CKeyMerger(IKeyIndex *_onekey, size32_t _eclKeySize, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(NULL, _eclKeySize, _ctx), sortFieldOffset(_sortFieldOffset)
  2171. {
  2172. segs.setMergeBarrier(sortFieldOffset);
  2173. init();
  2174. setKey(_onekey);
  2175. }
  2176. ~CKeyMerger()
  2177. {
  2178. killBuffers();
  2179. keyCursor = NULL; // so parent class doesn't delete it!
  2180. keyBuffer = NULL; // ditto
  2181. }
  2182. void killBuffers()
  2183. {
  2184. ForEachItemIn(idx, bufferArray)
  2185. {
  2186. free(bufferArray.item(idx));
  2187. }
  2188. ForEachItemIn(idx1, fixedArray)
  2189. {
  2190. free(fixedArray.item(idx1));
  2191. }
  2192. cursorArray.kill();
  2193. keyCursor = NULL; // cursorArray owns cursors
  2194. matchedArray.kill();
  2195. fposArray.kill();
  2196. mergeHeapArray.kill();
  2197. bufferArray.kill();
  2198. fixedArray.kill();
  2199. keyNoArray.kill();
  2200. cursors = NULL;
  2201. matcheds = NULL;
  2202. fposes = NULL;
  2203. mergeheap = NULL;
  2204. buffers = NULL;
  2205. fixeds = NULL;
  2206. }
  2207. void init()
  2208. {
  2209. numkeys = 0;
  2210. activekeys = 0;
  2211. resetPending = true;
  2212. sortFromSeg = 0;
  2213. }
  2214. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  2215. {
  2216. // Rather like a lookup, except that no records below the value indicated by seek* should be returned.
  2217. if (resetPending)
  2218. {
  2219. resetSort(seek, seekOffset, seeklen);
  2220. if (!activekeys)
  2221. return false;
  2222. #ifdef _DEBUG
  2223. if (traceSmartStepping)
  2224. DBGLOG("SKIP: init key = %d", mergeheap[0]);
  2225. #endif
  2226. return true;
  2227. }
  2228. else
  2229. {
  2230. if (!activekeys)
  2231. {
  2232. #ifdef _DEBUG
  2233. if (traceSmartStepping)
  2234. DBGLOG("SKIP: merge done");
  2235. #endif
  2236. return false;
  2237. }
  2238. unsigned key = mergeheap[0];
  2239. #ifdef _DEBUG
  2240. if (traceSmartStepping)
  2241. DBGLOG("SKIP: merging key = %d", key);
  2242. #endif
  2243. unsigned compares = 0;
  2244. loop
  2245. {
  2246. if (CKeyLevelManager::lookupSkip(seek, seekOffset, seeklen) )
  2247. {
  2248. fposes[key] = lookupFpos;
  2249. }
  2250. else
  2251. {
  2252. activekeys--;
  2253. if (!activekeys)
  2254. {
  2255. if (ctx)
  2256. ctx->noteStatistic(STATS_INDEX_MERGECOMPARES, compares, 1);
  2257. return false;
  2258. }
  2259. eof = false;
  2260. mergeheap[0] = mergeheap[activekeys];
  2261. }
  2262. /* The key associated with mergeheap[0] will have changed
  2263. This code restores the heap property
  2264. */
  2265. unsigned p = 0; /* parent */
  2266. while (1)
  2267. {
  2268. unsigned c = p*2 + 1; /* child */
  2269. if ( c >= activekeys )
  2270. break;
  2271. /* Select smaller child */
  2272. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
  2273. /* If child is greater or equal than parent then we are done */
  2274. if ( BuffCompare( c, p ) >= 0 )
  2275. break;
  2276. /* Swap parent and child */
  2277. int r = mergeheap[c];
  2278. mergeheap[c] = mergeheap[p];
  2279. mergeheap[p] = r;
  2280. /* child becomes parent */
  2281. p = c;
  2282. }
  2283. if (key != mergeheap[0])
  2284. {
  2285. key = mergeheap[0];
  2286. setSegOverrides(key);
  2287. }
  2288. if (memcmp(seek, keyBuffer+seekOffset, seeklen) <= 0)
  2289. {
  2290. #ifdef _DEBUG
  2291. if (traceSmartStepping)
  2292. {
  2293. DBGLOG("SKIP: merged key = %d", key);
  2294. StringBuffer recstr;
  2295. unsigned i;
  2296. for (i = 0; i < keySize; i++)
  2297. {
  2298. unsigned char c = ((unsigned char *) keyBuffer)[i];
  2299. recstr.appendf("%c", isprint(c) ? c : '.');
  2300. }
  2301. recstr.append (" ");
  2302. for (i = 0; i < keySize; i++)
  2303. {
  2304. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  2305. }
  2306. DBGLOG("SKIP: Out skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  2307. }
  2308. #endif
  2309. if (ctx)
  2310. ctx->noteStatistic(STATS_INDEX_MERGECOMPARES, compares, 1);
  2311. return true;
  2312. }
  2313. else
  2314. {
  2315. compares++;
  2316. if (ctx && (compares == 100))
  2317. {
  2318. ctx->noteStatistic(STATS_INDEX_MERGECOMPARES, compares, 1); // also checks for abort...
  2319. compares = 0;
  2320. }
  2321. }
  2322. }
  2323. }
  2324. }
  2325. virtual void setLayoutTranslator(IRecordLayoutTranslator * trans)
  2326. {
  2327. if (trans)
  2328. throw MakeStringException(0, "Layout translation not supported when merging key parts, as it may change sort order");
  2329. // it MIGHT be possible to support translation still if all keyCursors have the same translation
  2330. // would have to translate AFTER the merge, but that's ok
  2331. // HOWEVER the result won't be guaranteed to be in sorted order afterwards so is there any point?
  2332. }
  2333. virtual void setKey(IKeyIndexBase *_keyset)
  2334. {
  2335. keyset.set(_keyset);
  2336. if (_keyset && _keyset->numParts())
  2337. {
  2338. IKeyIndex *ki = _keyset->queryPart(0);
  2339. keySize = ki->keySize();
  2340. if (!keySize)
  2341. throw MakeStringException(0, "Invalid key size 0 in key %s", ki->queryFileName());
  2342. keyedSize = ki->keyedSize();
  2343. numkeys = _keyset->numParts();
  2344. }
  2345. else
  2346. numkeys = 0;
  2347. killBuffers();
  2348. }
  2349. void resetSort(const void *seek, size32_t seekOffset, size32_t seeklen)
  2350. {
  2351. activekeys = 0;
  2352. keyBuffer = NULL;
  2353. void *fixedValue = NULL;
  2354. unsigned segno;
  2355. for (segno = 0; segno < sortFromSeg; segno++)
  2356. {
  2357. IOverrideableKeySegmentMonitor *sm = QUERYINTERFACE(&segs.segMonitors.item(segno), IOverrideableKeySegmentMonitor);
  2358. sm->setOverrideBuffer(NULL);
  2359. }
  2360. segs.recalculateCache();
  2361. unsigned i;
  2362. for (i = 0; i < numkeys; i++)
  2363. {
  2364. assertex(keySize);
  2365. if (!keyBuffer) keyBuffer = (char *) malloc(keySize);
  2366. segs.setLow(0, keyBuffer);
  2367. loop
  2368. {
  2369. keyCursor = keyset->queryPart(i)->getCursor(ctx);
  2370. matched = false;
  2371. eof = false;
  2372. keyCursor->reset();
  2373. bool found;
  2374. unsigned lskips = 0;
  2375. unsigned lnullSkips = 0;
  2376. loop
  2377. {
  2378. if (seek)
  2379. {
  2380. if (skipTo(seek, seekOffset, seeklen))
  2381. lskips++;
  2382. else
  2383. lnullSkips++;
  2384. }
  2385. found = _lookup(true, segs.lastRealSeg());
  2386. if (!found || !seek || memcmp(keyBuffer + seekOffset, seek, seeklen) >= 0)
  2387. break;
  2388. }
  2389. noteSkips(lskips, lnullSkips);
  2390. if (found)
  2391. {
  2392. keyNoArray.append(i);
  2393. cursorArray.append(*keyCursor);
  2394. bufferArray.append(keyBuffer);
  2395. matchedArray.append(matched);
  2396. fposArray.append(lookupFpos);
  2397. mergeHeapArray.append(activekeys++);
  2398. if (!sortFromSeg)
  2399. {
  2400. keyBuffer = NULL;
  2401. fixedValue = NULL;
  2402. break;
  2403. }
  2404. assertex(sortFieldOffset);
  2405. void *fixedValue = malloc(sortFieldOffset);
  2406. memcpy(fixedValue, keyBuffer, sortFieldOffset);
  2407. fixedArray.append(fixedValue);
  2408. #ifdef _DEBUG
  2409. if (traceSmartStepping)
  2410. {
  2411. StringBuffer recstr;
  2412. unsigned i;
  2413. for (i = 0; i < sortFieldOffset; i++)
  2414. {
  2415. unsigned char c = ((unsigned char *) keyBuffer)[i];
  2416. recstr.appendf("%c", isprint(c) ? c : '.');
  2417. }
  2418. recstr.append (" ");
  2419. for (i = 0; i < keySize; i++)
  2420. {
  2421. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  2422. }
  2423. DBGLOG("Adding key cursor info for %s", recstr.str());
  2424. }
  2425. #endif
  2426. // Now advance segments 0 through sortFromSeg-1 to next legal value...
  2427. assertex(keySize);
  2428. char *nextBuffer = (char *) malloc(keySize);
  2429. memcpy(nextBuffer, keyBuffer, keySize);
  2430. keyBuffer = nextBuffer;
  2431. #ifdef _DEBUG
  2432. assertex(segs.segMonitors.item(sortFromSeg-1).matches(keyBuffer));
  2433. #endif
  2434. if (!segs.incrementKey(sortFromSeg-1, keyBuffer))
  2435. break;
  2436. }
  2437. else
  2438. {
  2439. keyCursor->Release();
  2440. break;
  2441. }
  2442. }
  2443. }
  2444. free (keyBuffer);
  2445. keyBuffer = NULL;
  2446. if (activekeys>0)
  2447. {
  2448. if (ctx)
  2449. ctx->noteStatistic(STATS_INDEX_MERGES, activekeys, 1);
  2450. cursors = cursorArray.getArray();
  2451. fposes = fposArray.getArray();
  2452. matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
  2453. buffers = (char **) bufferArray.getArray();
  2454. mergeheap = mergeHeapArray.getArray();
  2455. fixeds = fixedArray.getArray();
  2456. /* Permute mergeheap to establish the heap property
  2457. For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
  2458. The children of p must both be greater than or equal to p
  2459. The parent of a child c is given by p = (c-1)/2
  2460. */
  2461. for (i=1; i<activekeys; i++)
  2462. {
  2463. int r = mergeheap[i];
  2464. int c = i; /* child */
  2465. while (c > 0)
  2466. {
  2467. int p = (c-1)/2; /* parent */
  2468. if ( BuffCompare( c, p ) >= 0 )
  2469. break;
  2470. mergeheap[c] = mergeheap[p];
  2471. mergeheap[p] = r;
  2472. c = p;
  2473. }
  2474. }
  2475. setSegOverrides(mergeheap[0]);
  2476. eof = false;
  2477. }
  2478. else
  2479. {
  2480. keyBuffer = NULL;
  2481. keyCursor = NULL;
  2482. matched = false;
  2483. eof = true;
  2484. }
  2485. resetPending = false;
  2486. }
  2487. virtual void reset(bool crappyHack)
  2488. {
  2489. if (!started)
  2490. {
  2491. started = true;
  2492. segs.checkSize(keyedSize, "[merger]"); //PG: not sure what keyname to use here
  2493. }
  2494. if (!crappyHack)
  2495. {
  2496. killBuffers();
  2497. resetPending = true;
  2498. }
  2499. else
  2500. {
  2501. setSegOverrides(mergeheap[0]);
  2502. resetPending = false;
  2503. }
  2504. }
  2505. virtual bool lookup(bool exact)
  2506. {
  2507. assertex(exact);
  2508. if (resetPending)
  2509. {
  2510. resetSort(NULL, 0, 0);
  2511. if (!activekeys)
  2512. return false;
  2513. }
  2514. else
  2515. {
  2516. if (!activekeys)
  2517. return false;
  2518. unsigned key = mergeheap[0];
  2519. if (CKeyLevelManager::lookup(exact))
  2520. {
  2521. fposes[key] = lookupFpos;
  2522. }
  2523. else
  2524. {
  2525. activekeys--;
  2526. if (!activekeys)
  2527. return false; // MORE - does this lose a record?
  2528. eof = false;
  2529. mergeheap[0] = mergeheap[activekeys];
  2530. }
  2531. /* The key associated with mergeheap[0] will have changed
  2532. This code restores the heap property
  2533. */
  2534. unsigned p = 0; /* parent */
  2535. while (1)
  2536. {
  2537. unsigned c = p*2 + 1; /* child */
  2538. if ( c >= activekeys )
  2539. break;
  2540. /* Select smaller child */
  2541. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
  2542. /* If child is greater or equal than parent then we are done */
  2543. if ( BuffCompare( c, p ) >= 0 )
  2544. break;
  2545. /* Swap parent and child */
  2546. int r = mergeheap[c];
  2547. mergeheap[c] = mergeheap[p];
  2548. mergeheap[p] = r;
  2549. /* child becomes parent */
  2550. p = c;
  2551. }
  2552. // dumpMergeHeap();
  2553. if (mergeheap[0] != key)
  2554. setSegOverrides(mergeheap[0]);
  2555. }
  2556. return true;
  2557. }
  2558. virtual unsigned __int64 getCount()
  2559. {
  2560. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
  2561. // as the sequence numbers are not in sequence
  2562. unsigned __int64 ret = 0;
  2563. if (resetPending)
  2564. resetSort(NULL, 0, 0); // This is slightly suboptimal
  2565. for (unsigned i = 0; i < activekeys; i++)
  2566. {
  2567. unsigned key = mergeheap[i];
  2568. keyBuffer = buffers[key];
  2569. keyCursor = cursors[key];
  2570. ret += CKeyLevelManager::getCount();
  2571. }
  2572. return ret;
  2573. }
  2574. virtual unsigned __int64 checkCount(unsigned __int64 max)
  2575. {
  2576. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
  2577. // as the sequence numbers are not in sequence
  2578. unsigned __int64 ret = 0;
  2579. if (resetPending)
  2580. resetSort(NULL, 0, 0); // this is a little suboptimal as we will not bail out early
  2581. for (unsigned i = 0; i < activekeys; i++)
  2582. {
  2583. unsigned key = mergeheap[i];
  2584. keyBuffer = buffers[key];
  2585. keyCursor = cursors[key];
  2586. unsigned __int64 thisKeyCount = CKeyLevelManager::checkCount(max);
  2587. ret += thisKeyCount;
  2588. if (thisKeyCount > max)
  2589. return ret;
  2590. max -= thisKeyCount;
  2591. }
  2592. return ret;
  2593. }
  2594. virtual void serializeCursorPos(MemoryBuffer &mb)
  2595. {
  2596. // dumpMergeHeap();
  2597. mb.append(eof);
  2598. mb.append(activekeys);
  2599. for (unsigned i = 0; i < activekeys; i++)
  2600. {
  2601. unsigned key = mergeheap[i];
  2602. mb.append(keyNoArray.item(key));
  2603. cursors[key]->serializeCursorPos(mb);
  2604. mb.append(matcheds[key]);
  2605. mb.append(fposes[key]);
  2606. }
  2607. }
  2608. virtual void deserializeCursorPos(MemoryBuffer &mb)
  2609. {
  2610. mb.read(eof);
  2611. mb.read(activekeys);
  2612. for (unsigned i = 0; i < activekeys; i++)
  2613. {
  2614. unsigned keyno;
  2615. mb.read(keyno);
  2616. keyNoArray.append(keyno);
  2617. keyCursor = keyset->queryPart(keyno)->getCursor(ctx);
  2618. assertex(keySize);
  2619. keyBuffer = (char *) malloc(keySize);
  2620. cursorArray.append(*keyCursor);
  2621. keyCursor->deserializeCursorPos(mb, keyBuffer);
  2622. mb.read(matched);
  2623. matchedArray.append(matched);
  2624. offset_t fpos;
  2625. mb.read(fpos);
  2626. fposArray.append(fpos);
  2627. bufferArray.append(keyBuffer);
  2628. void *fixedValue = (char *) malloc(sortFieldOffset);
  2629. memcpy(fixedValue, keyBuffer, sortFieldOffset); // If it's not at EOF then it must match
  2630. fixedArray.append(fixedValue);
  2631. mergeHeapArray.append(i);
  2632. }
  2633. cursors = cursorArray.getArray();
  2634. fposes = fposArray.getArray();
  2635. matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
  2636. buffers = (char **) bufferArray.getArray();
  2637. mergeheap = mergeHeapArray.getArray();
  2638. fixeds = fixedArray.getArray();
  2639. }
  2640. virtual void finishSegmentMonitors()
  2641. {
  2642. CKeyLevelManager::finishSegmentMonitors();
  2643. if (sortFieldOffset)
  2644. replicateForTrailingSort();
  2645. }
  2646. };
  2647. extern jhtree_decl IKeyManager *createKeyMerger(IKeyIndexSet * _keys, unsigned _rawSize, unsigned _sortFieldOffset, IContextLogger *_ctx)
  2648. {
  2649. return new CKeyMerger(_keys, _rawSize, _sortFieldOffset, _ctx);
  2650. }
  2651. extern jhtree_decl IKeyManager *createSingleKeyMerger(IKeyIndex * _onekey, unsigned _rawSize, unsigned _sortFieldOffset, IContextLogger *_ctx)
  2652. {
  2653. return new CKeyMerger(_onekey, _rawSize, _sortFieldOffset, _ctx);
  2654. }
  2655. class CKeyIndexSet : public CInterface, implements IKeyIndexSet
  2656. {
  2657. IArrayOf<IKeyIndex> indexes;
  2658. offset_t recordCount;
  2659. offset_t totalSize;
  2660. StringAttr origFileName;
  2661. public:
  2662. IMPLEMENT_IINTERFACE;
  2663. virtual bool IsShared() const { return CInterface::IsShared(); }
  2664. void addIndex(IKeyIndex *i) { indexes.append(*i); }
  2665. virtual unsigned numParts() { return indexes.length(); }
  2666. virtual IKeyIndex *queryPart(unsigned partNo) { return &indexes.item(partNo); }
  2667. virtual void setRecordCount(offset_t count) { recordCount = count; }
  2668. virtual void setTotalSize(offset_t size) { totalSize = size; }
  2669. virtual offset_t getRecordCount() { return recordCount; }
  2670. virtual offset_t getTotalSize() { return totalSize; }
  2671. };
  2672. extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
  2673. {
  2674. return new CKeyIndexSet;
  2675. }
  2676. extern jhtree_decl IKeyManager *createKeyManager(IKeyIndex *key, unsigned _rawSize, IContextLogger *_ctx)
  2677. {
  2678. return new CKeyLevelManager(key, _rawSize, _ctx);
  2679. }
  2680. class CKeyArray : public CInterface, implements IKeyArray
  2681. {
  2682. public:
  2683. IMPLEMENT_IINTERFACE;
  2684. virtual bool IsShared() const { return CInterface::IsShared(); }
  2685. IPointerArrayOf<IKeyIndexBase> keys;
  2686. virtual IKeyIndexBase *queryKeyPart(unsigned partNo)
  2687. {
  2688. if (!keys.isItem(partNo))
  2689. {
  2690. return NULL;
  2691. DBGLOG("getKeyPart requested invalid part %d", partNo);
  2692. throw MakeStringException(0, "queryKeyPart requested invalid part %d", partNo);
  2693. }
  2694. IKeyIndexBase *key = keys.item(partNo);
  2695. return key;
  2696. }
  2697. virtual unsigned length() { return keys.length(); }
  2698. void addKey(IKeyIndexBase *f) { keys.append(f); }
  2699. };
  2700. extern jhtree_decl IKeyArray *createKeyArray()
  2701. {
  2702. return new CKeyArray;
  2703. }
  2704. #ifdef _USE_CPPUNIT
  2705. #include "unittests.hpp"
  2706. class IKeyManagerTest : public CppUnit::TestFixture
  2707. {
  2708. CPPUNIT_TEST_SUITE( IKeyManagerTest );
  2709. CPPUNIT_TEST(testStepping);
  2710. CPPUNIT_TEST(testKeys);
  2711. CPPUNIT_TEST_SUITE_END();
  2712. void testStepping()
  2713. {
  2714. buildTestKeys(false, false, false, false);
  2715. {
  2716. // We are going to treat as a 7-byte field then a 3-byte field, and request the datasorted by the 3-byte...
  2717. unsigned maxSize = 10; // (variable && blobby) ? 18 : 10;
  2718. Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
  2719. Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
  2720. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2721. keyset->addIndex(index1.getClear());
  2722. keyset->addIndex(index2.getClear());
  2723. Owned <IKeyManager> tlk1 = createKeyMerger(keyset, maxSize, 7, NULL);
  2724. Owned<IStringSet> sset1 = createStringSet(7);
  2725. sset1->addRange("0000003", "0000003");
  2726. sset1->addRange("0000005", "0000006");
  2727. tlk1->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 7));
  2728. Owned<IStringSet> sset2 = createStringSet(3);
  2729. sset2->addRange("010", "010");
  2730. sset2->addRange("030", "033");
  2731. Owned<IStringSet> sset3 = createStringSet(3);
  2732. sset3->addRange("999", "XXX");
  2733. sset3->addRange("000", "002");
  2734. tlk1->append(createKeySegmentMonitor(false, sset2.getLink(), 7, 3));
  2735. tlk1->finishSegmentMonitors();
  2736. tlk1->reset();
  2737. offset_t fpos;
  2738. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000003010", 10)==0);
  2739. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000005010", 10)==0);
  2740. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000006010", 10)==0);
  2741. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000003030", 10)==0);
  2742. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000005030", 10)==0);
  2743. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000006030", 10)==0);
  2744. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000003031", 10)==0);
  2745. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000005031", 10)==0);
  2746. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000006031", 10)==0);
  2747. MemoryBuffer mb;
  2748. tlk1->serializeCursorPos(mb);
  2749. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000003032", 10)==0);
  2750. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000005032", 10)==0);
  2751. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000006032", 10)==0);
  2752. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000003033", 10)==0);
  2753. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000005033", 10)==0);
  2754. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000006033", 10)==0);
  2755. ASSERT(!tlk1->lookup(true));
  2756. ASSERT(!tlk1->lookup(true));
  2757. Owned <IKeyManager> tlk2 = createKeyMerger(NULL, maxSize, 7, NULL);
  2758. tlk2->setKey(keyset);
  2759. tlk2->deserializeCursorPos(mb);
  2760. tlk2->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 7));
  2761. tlk2->append(createKeySegmentMonitor(false, sset2.getLink(), 7, 3));
  2762. tlk2->finishSegmentMonitors();
  2763. tlk2->reset(true);
  2764. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000003032", 10)==0);
  2765. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000005032", 10)==0);
  2766. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000006032", 10)==0);
  2767. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000003033", 10)==0);
  2768. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000005033", 10)==0);
  2769. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000006033", 10)==0);
  2770. ASSERT(!tlk2->lookup(true));
  2771. ASSERT(!tlk2->lookup(true));
  2772. Owned <IKeyManager> tlk3 = createKeyMerger(NULL, maxSize, 7, NULL);
  2773. tlk3->setKey(keyset);
  2774. tlk3->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 7));
  2775. tlk3->append(createKeySegmentMonitor(false, sset2.getLink(), 7, 3));
  2776. tlk3->finishSegmentMonitors();
  2777. tlk3->reset(false);
  2778. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), "0000003010", 10)==0);
  2779. ASSERT(tlk3->lookupSkip("031", 7, 3)); ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), "0000003031", 10)==0);
  2780. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), "0000005031", 10)==0);
  2781. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), "0000006031", 10)==0);
  2782. ASSERT(!tlk3->lookupSkip("081", 7, 3));
  2783. ASSERT(!tlk3->lookup(true));
  2784. Owned <IKeyManager> tlk4 = createKeyMerger(NULL, maxSize, 7, NULL);
  2785. tlk4->setKey(keyset);
  2786. tlk4->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 7));
  2787. tlk4->append(createKeySegmentMonitor(false, sset3.getLink(), 7, 3));
  2788. tlk4->finishSegmentMonitors();
  2789. tlk4->reset(false);
  2790. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000003000", 10)==0);
  2791. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000005000", 10)==0);
  2792. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000006000", 10)==0);
  2793. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000003001", 10)==0);
  2794. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000005001", 10)==0);
  2795. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000006001", 10)==0);
  2796. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000003002", 10)==0);
  2797. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000005002", 10)==0);
  2798. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000006002", 10)==0);
  2799. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000003999", 10)==0);
  2800. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000005999", 10)==0);
  2801. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(fpos), "0000006999", 10)==0);
  2802. ASSERT(!tlk4->lookup(true));
  2803. ASSERT(!tlk4->lookup(true));
  2804. }
  2805. clearKeyStoreCache(true);
  2806. removeTestKeys();
  2807. }
  2808. void buildTestKeys(bool shortForm, bool indar, bool variable, bool blobby)
  2809. {
  2810. buildTestKey("keyfile1.$$$", false, shortForm, indar, variable, blobby);
  2811. buildTestKey("keyfile2.$$$", true, shortForm, indar, variable, blobby);
  2812. }
  2813. void buildTestKey(const char *filename, bool skip, bool shortForm, bool indar, bool variable, bool blobby)
  2814. {
  2815. OwnedIFile file = createIFile(filename);
  2816. OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
  2817. Owned<IFileIOStream> out = createIOStream(io);
  2818. unsigned maxRecSize = (variable && blobby) ? 18 : 10;
  2819. unsigned keyedSize = (shortForm || (variable && blobby)) ? 10 : (unsigned) -1;
  2820. Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY | (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0);
  2821. char keybuf[18];
  2822. memset(keybuf, '0', 18);
  2823. for (unsigned count = 0; count < 10000; count++)
  2824. {
  2825. unsigned datasize = 10;
  2826. if (blobby && variable && (count % 10)==0)
  2827. {
  2828. char *blob = new char[count+100000];
  2829. byte seed = count;
  2830. for (unsigned i = 0; i < count+100000; i++)
  2831. {
  2832. blob[i] = seed;
  2833. seed = seed * 13 + i;
  2834. }
  2835. offset_t blobid = builder->createBlob(count+100000, blob);
  2836. memcpy(keybuf + 10, &blobid, sizeof(blobid));
  2837. delete [] blob;
  2838. datasize += sizeof(blobid);
  2839. }
  2840. bool skipme = (count % 4 == 0) != skip;
  2841. if (!skipme)
  2842. {
  2843. builder->processKeyData(keybuf, count*10, datasize);
  2844. if (count==48 || count==49)
  2845. builder->processKeyData(keybuf, count*10, datasize);
  2846. }
  2847. unsigned idx = 9;
  2848. loop
  2849. {
  2850. if (keybuf[idx]=='9')
  2851. keybuf[idx--]='0';
  2852. else
  2853. {
  2854. keybuf[idx]++;
  2855. break;
  2856. }
  2857. }
  2858. }
  2859. builder->finish();
  2860. out->flush();
  2861. }
  2862. void removeTestKeys()
  2863. {
  2864. ASSERT(remove("keyfile1.$$$")==0);
  2865. ASSERT(remove("keyfile2.$$$")==0);
  2866. }
  2867. void checkBlob(IKeyManager *key, unsigned size)
  2868. {
  2869. unsigned __int64 blobid;
  2870. offset_t fpos;
  2871. memcpy(&blobid, key->queryKeyBuffer(fpos)+10, sizeof(blobid));
  2872. ASSERT(blobid != 0);
  2873. size32_t blobsize;
  2874. const byte *blob = key->loadBlob(blobid, blobsize);
  2875. ASSERT(blob != NULL);
  2876. ASSERT(blobsize == size);
  2877. byte seed = size-100000;
  2878. for (unsigned i = 0; i < size; i++)
  2879. {
  2880. ASSERT(blob[i] == seed);
  2881. seed = seed * 13 + i;
  2882. }
  2883. key->releaseBlobs();
  2884. }
  2885. protected:
  2886. void testKeys(bool shortForm, bool indar, bool variable, bool blobby)
  2887. {
  2888. // If it is not a supported combination, just return
  2889. if (indar)
  2890. {
  2891. if (shortForm || variable || blobby)
  2892. return;
  2893. }
  2894. if (blobby)
  2895. {
  2896. if (!shortForm || !variable)
  2897. return;
  2898. }
  2899. buildTestKeys(shortForm, indar, variable, blobby);
  2900. {
  2901. unsigned maxSize = (variable && blobby) ? 18 : 10;
  2902. Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
  2903. Owned <IKeyManager> tlk1 = createKeyManager(index1, maxSize, NULL);
  2904. Owned<IStringSet> sset1 = createStringSet(10);
  2905. sset1->addRange("0000000001", "0000000100");
  2906. tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 10));
  2907. tlk1->finishSegmentMonitors();
  2908. tlk1->reset();
  2909. Owned <IKeyManager> tlk1a = createKeyManager(index1, maxSize, NULL);
  2910. Owned<IStringSet> sset1a = createStringSet(8);
  2911. sset1a->addRange("00000000", "00000001");
  2912. tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 8));
  2913. tlk1a->append(createKeySegmentMonitor(false, NULL, 8, 1));
  2914. sset1a.setown(createStringSet(1));
  2915. sset1a->addRange("0", "1");
  2916. tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 9, 1));
  2917. tlk1a->finishSegmentMonitors();
  2918. tlk1a->reset();
  2919. /* loop
  2920. {
  2921. offset_t fpos;
  2922. if (!tlk1a->lookup(true))
  2923. break;
  2924. DBGLOG("%.10s", tlk1a->queryKeyBuffer(fpos));
  2925. }
  2926. tlk1a->reset();
  2927. */
  2928. Owned<IStringSet> ssetx = createStringSet(10);
  2929. ssetx->addRange("0000000001", "0000000002");
  2930. ASSERT(ssetx->numValues() == 2);
  2931. ssetx->addRange("00000000AK", "00000000AL");
  2932. ASSERT(ssetx->numValues() == 4);
  2933. ssetx->addRange("0000000100", "0010000000");
  2934. ASSERT(ssetx->numValues() == (unsigned) -1);
  2935. ssetx->addRange("0000000001", "0010000000");
  2936. ASSERT(ssetx->numValues() == (unsigned) -1);
  2937. Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
  2938. Owned <IKeyManager> tlk2 = createKeyManager(index2, maxSize, NULL);
  2939. Owned<IStringSet> sset2 = createStringSet(10);
  2940. sset2->addRange("0000000001", "0000000100");
  2941. ASSERT(sset2->numValues() == 65536);
  2942. tlk2->append(createKeySegmentMonitor(false, sset2.getClear(), 0, 10));
  2943. tlk2->finishSegmentMonitors();
  2944. tlk2->reset();
  2945. Owned <IKeyManager> tlk3;
  2946. if (!variable)
  2947. {
  2948. Owned<IKeyIndexSet> both = createKeyIndexSet();
  2949. both->addIndex(index1.getLink());
  2950. both->addIndex(index2.getLink());
  2951. Owned<IStringSet> sset3 = createStringSet(10);
  2952. tlk3.setown(createKeyMerger(NULL, maxSize, 0, NULL));
  2953. tlk3->setKey(both);
  2954. sset3->addRange("0000000001", "0000000100");
  2955. tlk3->append(createKeySegmentMonitor(false, sset3.getClear(), 0, 10));
  2956. tlk3->finishSegmentMonitors();
  2957. tlk3->reset();
  2958. }
  2959. Owned <IKeyManager> tlk2a = createKeyManager(index2, maxSize, NULL);
  2960. Owned<IStringSet> sset2a = createStringSet(10);
  2961. sset2a->addRange("0000000048", "0000000048");
  2962. ASSERT(sset2a->numValues() == 1);
  2963. tlk2a->append(createKeySegmentMonitor(false, sset2a.getClear(), 0, 10));
  2964. tlk2a->finishSegmentMonitors();
  2965. tlk2a->reset();
  2966. Owned <IKeyManager> tlk2b = createKeyManager(index2, maxSize, NULL);
  2967. Owned<IStringSet> sset2b = createStringSet(10);
  2968. sset2b->addRange("0000000047", "0000000049");
  2969. ASSERT(sset2b->numValues() == 3);
  2970. tlk2b->append(createKeySegmentMonitor(false, sset2b.getClear(), 0, 10));
  2971. tlk2b->finishSegmentMonitors();
  2972. tlk2b->reset();
  2973. Owned <IKeyManager> tlk2c = createKeyManager(index2, maxSize, NULL);
  2974. Owned<IStringSet> sset2c = createStringSet(10);
  2975. sset2c->addRange("0000000047", "0000000047");
  2976. tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 10));
  2977. tlk2c->finishSegmentMonitors();
  2978. tlk2c->reset();
  2979. ASSERT(tlk1->getCount() == 76);
  2980. ASSERT(tlk1->getCount() == 76);
  2981. ASSERT(tlk1a->getCount() == 30);
  2982. ASSERT(tlk2->getCount() == 26);
  2983. ASSERT(tlk2a->getCount() == 2);
  2984. ASSERT(tlk2b->getCount() == 2);
  2985. ASSERT(tlk2c->getCount() == 0);
  2986. if (tlk3)
  2987. ASSERT(tlk3->getCount() == 102);
  2988. // MORE - PUT SOME TESTS IN FOR WILD SEEK STUFF
  2989. unsigned pass;
  2990. char buf[11];
  2991. unsigned i;
  2992. for (pass = 0; pass < 2; pass++)
  2993. {
  2994. offset_t fpos;
  2995. tlk1->reset();
  2996. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000001", 10)==0);
  2997. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000002", 10)==0);
  2998. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000003", 10)==0);
  2999. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000005", 10)==0);
  3000. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000006", 10)==0);
  3001. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000007", 10)==0);
  3002. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000009", 10)==0);
  3003. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(fpos), "0000000010", 10)==0);
  3004. if (blobby)
  3005. checkBlob(tlk1, 10+100000);
  3006. tlk1a->reset();
  3007. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000001", 10)==0);
  3008. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000010", 10)==0);
  3009. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000011", 10)==0);
  3010. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000021", 10)==0);
  3011. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000030", 10)==0);
  3012. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000031", 10)==0);
  3013. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000041", 10)==0);
  3014. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(fpos), "0000000050", 10)==0);
  3015. tlk2->reset();
  3016. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000004", 10)==0);
  3017. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000008", 10)==0);
  3018. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000012", 10)==0);
  3019. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000016", 10)==0);
  3020. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000020", 10)==0);
  3021. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000024", 10)==0);
  3022. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000028", 10)==0);
  3023. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000032", 10)==0);
  3024. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000036", 10)==0);
  3025. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000040", 10)==0);
  3026. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000044", 10)==0);
  3027. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000048", 10)==0);
  3028. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000048", 10)==0);
  3029. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(fpos), "0000000052", 10)==0);
  3030. if (tlk3)
  3031. {
  3032. tlk3->reset();
  3033. for (i = 1; i <= 100; i++)
  3034. {
  3035. ASSERT(tlk3->lookup(true));
  3036. sprintf(buf, "%010d", i);
  3037. ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), buf, 10)==0);
  3038. if (i==48 || i==49)
  3039. {
  3040. ASSERT(tlk3->lookup(true));
  3041. ASSERT(memcmp(tlk3->queryKeyBuffer(fpos), buf, 10)==0);
  3042. }
  3043. }
  3044. ASSERT(!tlk3->lookup(true));
  3045. ASSERT(!tlk3->lookup(true));
  3046. }
  3047. }
  3048. tlk1->releaseSegmentMonitors();
  3049. tlk2->releaseSegmentMonitors();
  3050. if (tlk3)
  3051. tlk3->releaseSegmentMonitors();
  3052. }
  3053. clearKeyStoreCache(true);
  3054. removeTestKeys();
  3055. }
  3056. void testKeys()
  3057. {
  3058. ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
  3059. for (unsigned i = 0; i < 16; i++)
  3060. testKeys((i & 0x8)!=0,(i & 0x4)!=0,(i & 0x2)!=0,(i & 0x1)!=0);
  3061. }
  3062. };
  3063. CPPUNIT_TEST_SUITE_REGISTRATION( IKeyManagerTest );
  3064. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( IKeyManagerTest, "IKeyManagerTest" );
  3065. #endif