  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. //****************************************************************************
  14. // Name: jhtree.cpp
  15. //
  16. // Purpose:
  17. //
  18. // Description:
  19. //
  20. // Notes: Supports only static (non-changing) files
  21. //
  22. // Initially I was holding on to the root nodes, but came to find
  23. // that they could potentially fill the cache by themselves...
  24. //
  25. // Things to play with:
  26. // - try not unpacking the entire node when it is read in.
  27. // break it out as needed later.
  28. //
  29. // History: 31-Aug-99 crs original
  30. // 08-Jan-00 nh added LZW compression of nodes
  31. // 14-Feb-00 nh added GetORDKey
  32. // 15-Feb-00 nh fixed isolatenode and nextNode
  33. // 12-Apr-00 jcs moved over to jhtree.dll etc.
  34. //****************************************************************************
  35. #include "platform.h"
  36. #include <stdio.h>
  37. #include <fcntl.h>
  38. #include <stdlib.h>
  39. #include <limits.h>
  40. #ifdef __linux__
  41. #include <alloca.h>
  42. #endif
  43. #include "hlzw.h"
  44. #include "jmutex.hpp"
  45. #include "jhutil.hpp"
  46. #include "jmisc.hpp"
  47. #include "jstats.h"
  48. #include "ctfile.hpp"
  49. #include "jhtree.ipp"
  50. #include "keybuild.hpp"
  51. #include "bloom.hpp"
  52. #include "eclhelper_dyn.hpp"
  53. #include "rtlrecord.hpp"
  54. #include "rtldynfield.hpp"
  55. static std::atomic<CKeyStore *> keyStore(nullptr);
  56. static unsigned defaultKeyIndexLimit = 200;
  57. static CNodeCache *nodeCache = NULL;
  58. static CriticalSection *initCrit = NULL;
  59. bool useMemoryMappedIndexes = false;
  60. bool linuxYield = false;
  61. bool traceSmartStepping = false;
  62. bool flushJHtreeCacheOnOOM = true;
  63. MODULE_INIT(INIT_PRIORITY_JHTREE_JHTREE)
  64. {
  65. initCrit = new CriticalSection;
  66. return 1;
  67. }
  68. MODULE_EXIT()
  69. {
  70. delete initCrit;
  71. delete keyStore.load(std::memory_order_relaxed);
  72. ::Release((CInterface*)nodeCache);
  73. }
  74. //#define DUMP_NODES
  75. SegMonitorList::SegMonitorList(const RtlRecord &_recInfo) : recInfo(_recInfo)
  76. {
  77. keySegCount = recInfo.getNumKeyedFields();
  78. reset();
  79. }
  80. SegMonitorList::SegMonitorList(const SegMonitorList &from, const char *fixedVals, unsigned sortFieldOffset)
  81. : recInfo(from.recInfo), keySegCount(from.keySegCount)
  82. {
  83. ForEachItemIn(idx, from.segMonitors)
  84. {
  85. IKeySegmentMonitor &seg = from.segMonitors.item(idx);
  86. unsigned offset = seg.getOffset();
  87. if (offset < sortFieldOffset)
  88. segMonitors.append(*createSingleKeySegmentMonitor(false, seg.queryFieldIndex(), offset, seg.getSize(), fixedVals+offset));
  89. else
  90. segMonitors.append(OLINK(seg));
  91. }
  92. recalculateCache();
  93. modified = false;
  94. }
  95. void SegMonitorList::describe(StringBuffer &out) const
  96. {
  97. for (unsigned idx=0; idx <= lastRealSeg() && idx < segMonitors.length(); idx++)
  98. {
  99. auto &filter = segMonitors.item(idx);
  100. if (idx)
  101. out.append(',');
  102. out.appendf("%s=", recInfo.queryName(idx));
  103. filter.describe(out, *recInfo.queryType(idx));
  104. }
  105. }
  106. bool SegMonitorList::matchesBuffer(const void *buffer, unsigned lastSeg, unsigned &matchSeg) const
  107. {
  108. if (segMonitors.length())
  109. {
  110. for (; matchSeg <= lastSeg; matchSeg++)
  111. {
  112. if (!segMonitors.item(matchSeg).matchesBuffer(buffer))
  113. return false;
  114. }
  115. }
  116. return true;
  117. }
  118. bool SegMonitorList::canMatch() const
  119. {
  120. ForEachItemIn(idx, segMonitors)
  121. {
  122. if (segMonitors.item(idx).isEmpty())
  123. return false;
  124. }
  125. return true;
  126. }
  127. IIndexFilter *SegMonitorList::item(unsigned idx) const
  128. {
  129. return &segMonitors.item(idx);
  130. }
  131. size32_t SegMonitorList::getSize() const
  132. {
  133. unsigned lim = segMonitors.length();
  134. if (lim)
  135. {
  136. IKeySegmentMonitor &lastItem = segMonitors.item(lim-1);
  137. return lastItem.getOffset() + lastItem.getSize();
  138. }
  139. else
  140. return 0;
  141. }
  142. void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname) const
  143. {
  144. size32_t segSize = getSize();
  145. if (segSize != keyedSize)
  146. {
  147. StringBuffer err;
  148. err.appendf("Key size mismatch on key %s - key size is %u, expected %u", keyname, keyedSize, getSize());
  149. IException *e = MakeStringExceptionDirect(1000, err.str());
  150. EXCLOG(e, err.str());
  151. throw e;
  152. }
  153. }
  154. void SegMonitorList::setLow(unsigned segno, void *keyBuffer) const
  155. {
  156. unsigned lim = segMonitors.length();
  157. while (segno < lim)
  158. segMonitors.item(segno++).setLow(keyBuffer);
  159. }
  160. unsigned SegMonitorList::setLowAfter(size32_t offset, void *keyBuffer) const
  161. {
  162. unsigned lim = segMonitors.length();
  163. unsigned segno = 0;
  164. unsigned skipped = 0;
  165. while (segno < lim)
  166. {
  167. IKeySegmentMonitor &seg = segMonitors.item(segno++);
  168. if (seg.getOffset() >= offset)
  169. seg.setLow(keyBuffer);
  170. else if (seg.getSize()+seg.getOffset() <= offset)
  171. skipped++;
  172. else
  173. {
  174. byte *temp = (byte *) alloca(seg.getSize() + seg.getOffset());
  175. seg.setLow(temp);
  176. memcpy((byte *)keyBuffer+offset, temp+offset, seg.getSize() - (offset - seg.getOffset()));
  177. }
  178. }
  179. return skipped;
  180. }
  181. void SegMonitorList::endRange(unsigned segno, void *keyBuffer) const
  182. {
  183. unsigned lim = segMonitors.length();
  184. if (segno < lim)
  185. segMonitors.item(segno++).endRange(keyBuffer);
  186. while (segno < lim)
  187. segMonitors.item(segno++).setHigh(keyBuffer);
  188. }
  189. bool SegMonitorList::incrementKey(unsigned segno, void *keyBuffer) const
  190. {
  191. // Increment the key buffer to next acceptable value
  192. for(;;)
  193. {
  194. if (segMonitors.item(segno).increment(keyBuffer))
  195. {
  196. setLow(segno+1, keyBuffer);
  197. return true;
  198. }
  199. if (!segno)
  200. return false;
  201. segno--;
  202. }
  203. }
  204. unsigned SegMonitorList::_lastRealSeg() const
  205. {
  206. unsigned seg = segMonitors.length();
  207. for (;;)
  208. {
  209. if (!seg)
  210. return 0;
  211. seg--;
  212. if (!segMonitors.item(seg).isWild()) // MORE - why not just remove them? Stepping/overrides?
  213. return seg;
  214. }
  215. }
  216. unsigned SegMonitorList::lastFullSeg() const
  217. {
  218. // This is used to determine what part of the segmonitor list to use for a pre-count to determine if atmost/limit have been hit
  219. // We include everything up to the last of i) the last keyed element or ii) the last keyed,opt element that has no wild between it and a keyed element
  220. // NOTE - can return (unsigned) -1 if there are no full segments
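  // Illustrative examples (derived from the loop below): segments [keyed, wild, keyed] -> returns 2;
  // [keyed, wild, keyed-opt] -> returns 0, because the trailing opt segment follows a wild and so is not counted;
  // [keyed, keyed-opt, wild] -> returns 1, since the opt segment has no wild before it.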
  221. unsigned len = segMonitors.length();
  222. unsigned seg = 0;
  223. unsigned ret = (unsigned) -1;
  224. bool wildSeen = false;
  225. while (seg < len)
  226. {
  227. if (segMonitors.item(seg).isWild())
  228. wildSeen = true;
  229. else
  230. {
  231. if (!wildSeen || !segMonitors.item(seg).isOptional())
  232. {
  233. ret = seg;
  234. wildSeen = false;
  235. }
  236. }
  237. seg++;
  238. }
  239. return ret;
  240. }
  241. void SegMonitorList::finish(unsigned keyedSize)
  242. {
  243. if (modified)
  244. {
  245. while (segMonitors.length() < keySegCount)
  246. {
  247. unsigned idx = segMonitors.length();
  248. size32_t offset = recInfo.getFixedOffset(idx);
  249. if (offset == keyedSize)
  250. {
  251. DBGLOG("SegMonitor record does not match key"); // Can happen when reading older indexes that don't save key information in metadata properly
  252. keySegCount = segMonitors.length();
  253. break;
  254. }
  255. size32_t size = recInfo.getFixedOffset(idx+1) - offset;
  256. segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
  257. }
  258. size32_t segSize = getSize();
  259. assertex(segSize == keyedSize);
  260. recalculateCache();
  261. modified = false;
  262. }
  263. }
  264. void SegMonitorList::recalculateCache()
  265. {
  266. cachedLRS = _lastRealSeg();
  267. }
  268. void SegMonitorList::reset()
  269. {
  270. segMonitors.kill();
  271. modified = true;
  272. }
  273. // interface IIndexReadContext
  274. void SegMonitorList::append(IKeySegmentMonitor *segment)
  275. {
  276. modified = true;
  277. unsigned fieldIdx = segment->queryFieldIndex();
  278. unsigned offset = segment->getOffset();
  279. unsigned size = segment->getSize();
  280. while (segMonitors.length() < fieldIdx)
  281. {
  282. unsigned idx = segMonitors.length();
  283. size32_t offset = recInfo.getFixedOffset(idx);
  284. size32_t size = recInfo.getFixedOffset(idx+1) - offset;
  285. segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
  286. }
  287. segMonitors.append(*segment);
  288. }
  289. void SegMonitorList::append(FFoption option, const IFieldFilter * filter)
  290. {
  291. throwUnexpected();
  292. }
  293. ///
  294. static UnexpectedVirtualFieldCallback unexpectedFieldCallback;
  295. class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
  296. {
  297. protected:
  298. KeyStatsCollector stats;
  299. Owned <IIndexFilterList> filter;
  300. IKeyCursor *keyCursor;
  301. ConstPointerArray activeBlobs;
  302. __uint64 partitionFieldMask = 0;
  303. unsigned indexParts = 0;
  304. unsigned keyedSize; // size of non-payload part of key
  305. bool started = false;
  306. bool newFilters = false;
  307. bool logExcessiveSeeks = false;
  308. Owned<const IDynamicTransform> layoutTrans;
  309. MemoryBuffer buf; // used when translating
  310. size32_t layoutSize = 0;
  311. public:
  312. IMPLEMENT_IINTERFACE;
  313. CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
  314. : stats(_ctx), newFilters(_newFilters), logExcessiveSeeks(_logExcessiveSeeks)
  315. {
  316. if (newFilters)
  317. filter.setown(new IndexRowFilter(_recInfo));
  318. else
  319. filter.setown(new SegMonitorList(_recInfo));
  320. keyCursor = NULL;
  321. keyedSize = 0;
  322. setKey(_key);
  323. }
  324. ~CKeyLevelManager()
  325. {
  326. ::Release(keyCursor);
  327. releaseBlobs();
  328. }
  329. virtual unsigned querySeeks() const
  330. {
  331. return stats.seeks;
  332. }
  333. virtual unsigned queryScans() const
  334. {
  335. return stats.scans;
  336. }
  337. virtual unsigned querySkips() const
  338. {
  339. return stats.skips;
  340. }
  341. virtual void resetCounts()
  342. {
  343. stats.reset();
  344. }
  345. void setKey(IKeyIndexBase * _key)
  346. {
  347. ::Release(keyCursor);
  348. keyCursor = NULL;
  349. if (_key)
  350. {
  351. assertex(_key->numParts()==1);
  352. IKeyIndex *ki = _key->queryPart(0);
  353. keyCursor = ki->getCursor(filter, logExcessiveSeeks);
  354. if (keyedSize)
  355. assertex(keyedSize == ki->keyedSize());
  356. else
  357. keyedSize = ki->keyedSize();
  358. partitionFieldMask = ki->getPartitionFieldMask();
  359. indexParts = ki->numPartitions();
  360. }
  361. }
  362. virtual unsigned getPartition() override
  363. {
  364. if (partitionFieldMask)
  365. {
  366. hash64_t hash = HASH64_INIT;
  367. if (getBloomHash(partitionFieldMask, *filter, hash))
  368. return (((unsigned) hash) % indexParts) + 1; // NOTE - the Hash distribute function that distributes the index when building will truncate to 32-bits before taking modulus - so we must too!
  369. }
  370. return 0;
  371. }
  372. virtual void setChooseNLimit(unsigned __int64 _rowLimit) override
  373. {
  374. // TODO ?
  375. }
  376. virtual void reset(bool crappyHack)
  377. {
  378. if (keyCursor)
  379. {
  380. if (!started)
  381. {
  382. started = true;
  383. filter->checkSize(keyedSize, keyCursor->queryName());
  384. }
  385. if (!crappyHack)
  386. {
  387. keyCursor->reset();
  388. }
  389. }
  390. }
  391. virtual void releaseSegmentMonitors()
  392. {
  393. filter->reset();
  394. started = false;
  395. }
  396. virtual void append(IKeySegmentMonitor *segment)
  397. {
  398. assertex(!newFilters && !started);
  399. filter->append(segment);
  400. }
  401. virtual void append(FFoption option, const IFieldFilter * fieldFilter)
  402. {
  403. assertex(newFilters && !started);
  404. filter->append(option, fieldFilter);
  405. }
  406. inline const byte *queryKeyBuffer()
  407. {
  408. if(layoutTrans)
  409. {
  410. buf.setLength(0);
  411. MemoryBufferBuilder aBuilder(buf, 0);
  412. layoutSize = layoutTrans->translate(aBuilder, unexpectedFieldCallback, reinterpret_cast<byte const *>(keyCursor->queryKeyBuffer()));
  413. return aBuilder.getSelf();
  414. }
  415. else
  416. return reinterpret_cast<byte const *>(keyCursor->queryKeyBuffer());
  417. }
  418. inline size32_t queryRowSize()
  419. {
  420. if (layoutTrans)
  421. return layoutSize;
  422. else
  423. return keyCursor ? keyCursor->getSize() : 0;
  424. }
  425. inline unsigned __int64 querySequence()
  426. {
  427. return keyCursor ? keyCursor->getSequence() : 0;
  428. }
  429. virtual bool lookup(bool exact)
  430. {
  431. if (keyCursor)
  432. return keyCursor->lookup(exact, stats);
  433. else
  434. return false;
  435. }
  436. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  437. {
  438. return keyCursor ? keyCursor->lookupSkip(seek, seekOffset, seeklen, stats) : false;
  439. }
  440. unsigned __int64 getCount()
  441. {
  442. assertex(keyCursor);
  443. return keyCursor->getCount(stats);
  444. }
  445. unsigned __int64 getCurrentRangeCount(unsigned groupSegCount)
  446. {
  447. assertex(keyCursor);
  448. return keyCursor->getCurrentRangeCount(groupSegCount, stats);
  449. }
  450. bool nextRange(unsigned groupSegCount)
  451. {
  452. assertex(keyCursor);
  453. return keyCursor->nextRange(groupSegCount);
  454. }
  455. unsigned __int64 checkCount(unsigned __int64 max)
  456. {
  457. assertex(keyCursor);
  458. return keyCursor->checkCount(max, stats);
  459. }
  460. virtual void serializeCursorPos(MemoryBuffer &mb)
  461. {
  462. keyCursor->serializeCursorPos(mb);
  463. }
  464. virtual void deserializeCursorPos(MemoryBuffer &mb)
  465. {
  466. keyCursor->deserializeCursorPos(mb, stats);
  467. }
  468. virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  469. {
  470. const byte *ret = keyCursor->loadBlob(blobid, blobsize);
  471. activeBlobs.append(ret);
  472. return ret;
  473. }
  474. virtual void releaseBlobs()
  475. {
  476. ForEachItemIn(idx, activeBlobs)
  477. {
  478. free((void *) activeBlobs.item(idx));
  479. }
  480. activeBlobs.kill();
  481. }
  482. virtual void setLayoutTranslator(const IDynamicTransform * trans) override
  483. {
  484. layoutTrans.set(trans);
  485. }
  486. virtual void finishSegmentMonitors()
  487. {
  488. filter->finish(keyedSize);
  489. }
  490. virtual void describeFilter(StringBuffer &out) const override
  491. {
  492. filter->describe(out);
  493. }
  494. };
  495. ///////////////////////////////////////////////////////////////////////////////
  496. ///////////////////////////////////////////////////////////////////////////////
  497. // For some reason #pragma pack does not seem to work here. Force all elements to 8 bytes
  498. class CKeyIdAndPos
  499. {
  500. public:
  501. unsigned __int64 keyId;
  502. offset_t pos;
  503. CKeyIdAndPos(unsigned __int64 _keyId, offset_t _pos) { keyId = _keyId; pos = _pos; }
  504. bool operator==(const CKeyIdAndPos &other) { return keyId == other.keyId && pos == other.pos; }
  505. };
  506. class CNodeMapping : public HTMapping<CJHTreeNode, CKeyIdAndPos>
  507. {
  508. public:
  509. CNodeMapping(CKeyIdAndPos &fp, CJHTreeNode &et) : HTMapping<CJHTreeNode, CKeyIdAndPos>(et, fp) { }
  510. ~CNodeMapping() { this->et.Release(); }
  511. CJHTreeNode &query() { return queryElement(); }
  512. };
  513. typedef OwningSimpleHashTableOf<CNodeMapping, CKeyIdAndPos> CNodeTable;
  514. #define FIXED_NODE_OVERHEAD (sizeof(CJHTreeNode))
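  // Fixed per-node overhead added to each cached node's dynamic size (getMemSize()) when accounting for cache memory below.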
  515. class CNodeMRUCache : public CMRUCacheOf<CKeyIdAndPos, CJHTreeNode, CNodeMapping, CNodeTable>
  516. {
  517. size32_t sizeInMem, memLimit;
  518. public:
  519. CNodeMRUCache(size32_t _memLimit) : memLimit(0)
  520. {
  521. sizeInMem = 0;
  522. setMemLimit(_memLimit);
  523. }
  524. size32_t setMemLimit(size32_t _memLimit)
  525. {
  526. size32_t oldMemLimit = memLimit;
  527. memLimit = _memLimit;
  528. if (full())
  529. makeSpace();
  530. return oldMemLimit;
  531. }
  532. virtual void makeSpace()
  533. {
  534. // remove LRU until !full
  535. do
  536. {
  537. clear(1);
  538. }
  539. while (full());
  540. }
  541. virtual bool full()
  542. {
  543. if (((size32_t)-1) == memLimit) return false;
  544. return sizeInMem > memLimit;
  545. }
  546. virtual void elementAdded(CNodeMapping *mapping)
  547. {
  548. CJHTreeNode &node = mapping->queryElement();
  549. sizeInMem += (FIXED_NODE_OVERHEAD+node.getMemSize());
  550. }
  551. virtual void elementRemoved(CNodeMapping *mapping)
  552. {
  553. CJHTreeNode &node = mapping->queryElement();
  554. sizeInMem -= (FIXED_NODE_OVERHEAD+node.getMemSize());
  555. }
  556. };
  557. class CNodeCache : public CInterface
  558. {
  559. private:
  560. mutable CriticalSection lock;
  561. CNodeMRUCache nodeCache;
  562. CNodeMRUCache leafCache;
  563. CNodeMRUCache blobCache;
  564. CNodeMRUCache preloadCache;
  565. bool cacheNodes;
  566. bool cacheLeaves;
  567. bool cacheBlobs;
  568. bool preloadNodes;
  569. public:
  570. CNodeCache(size32_t maxNodeMem, size32_t maxLeaveMem, size32_t maxBlobMem)
  571. : nodeCache(maxNodeMem), leafCache(maxLeaveMem), blobCache(maxBlobMem), preloadCache((unsigned) -1)
  572. {
  573. cacheNodes = maxNodeMem != 0;
  574. cacheLeaves = maxLeaveMem != 0;
  575. cacheBlobs = maxBlobMem != 0;
  576. preloadNodes = false;
  577. // note that each index caches the last blob it unpacked so that sequential blob fetches are still ok
  578. }
  579. CJHTreeNode *getNode(INodeLoader *key, int keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
  580. void preload(CJHTreeNode *node, int keyID, offset_t pos, IContextLogger *ctx);
  581. bool isPreloaded(int keyID, offset_t pos);
  582. inline bool getNodeCachePreload()
  583. {
  584. return preloadNodes;
  585. }
  586. inline bool setNodeCachePreload(bool _preload)
  587. {
  588. bool oldPreloadNodes = preloadNodes;
  589. preloadNodes = _preload;
  590. return oldPreloadNodes;
  591. }
  592. inline size32_t setNodeCacheMem(size32_t newSize)
  593. {
  594. CriticalBlock block(lock);
  595. unsigned oldV = nodeCache.setMemLimit(newSize);
  596. cacheNodes = (newSize != 0);
  597. return oldV;
  598. }
  599. inline size32_t setLeafCacheMem(size32_t newSize)
  600. {
  601. CriticalBlock block(lock);
  602. unsigned oldV = leafCache.setMemLimit(newSize);
  603. cacheLeaves = (newSize != 0);
  604. return oldV;
  605. }
  606. inline size32_t setBlobCacheMem(size32_t newSize)
  607. {
  608. CriticalBlock block(lock);
  609. unsigned oldV = blobCache.setMemLimit(newSize);
  610. cacheBlobs = (newSize != 0);
  611. return oldV;
  612. }
  613. void clear()
  614. {
  615. CriticalBlock block(lock);
  616. nodeCache.kill();
  617. leafCache.kill();
  618. blobCache.kill();
  619. }
  620. };
  621. static inline CNodeCache *queryNodeCache()
  622. {
  623. if (nodeCache) return nodeCache; // avoid crit
  624. CriticalBlock b(*initCrit);
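  // Re-check under the lock, then create with the default limits: 100MB for branch nodes, 50MB for leaves, blob cache disabled (0).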
  625. if (!nodeCache) nodeCache = new CNodeCache(100*0x100000, 50*0x100000, 0);
  626. return nodeCache;
  627. }
  628. void clearNodeCache()
  629. {
  630. queryNodeCache()->clear();
  631. }
  632. inline CKeyStore *queryKeyStore()
  633. {
  634. CKeyStore * value = keyStore.load(std::memory_order_acquire);
  635. if (value) return value; // avoid crit
  636. CriticalBlock b(*initCrit);
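  // Re-check under the lock: another thread may have created the store while we waited (double-checked initialisation).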
  637. if (!keyStore.load(std::memory_order_acquire)) keyStore = new CKeyStore;
  638. return keyStore;
  639. }
  640. unsigned setKeyIndexCacheSize(unsigned limit)
  641. {
  642. return queryKeyStore()->setKeyCacheLimit(limit);
  643. }
  644. CKeyStore::CKeyStore() : keyIndexCache(defaultKeyIndexLimit)
  645. {
  646. nextId = 0;
  647. #if 0
  648. mm.setown(createSharedMemoryManager("RichardsSharedMemManager", 0x100000));
  649. try
  650. {
  651. if (mm)
  652. sharedCache.setown(mm->share());
  653. }
  654. catch (IException *E)
  655. {
  656. E->Release();
  657. }
  658. #endif
  659. }
  660. CKeyStore::~CKeyStore()
  661. {
  662. }
  663. unsigned CKeyStore::setKeyCacheLimit(unsigned limit)
  664. {
  665. return keyIndexCache.setCacheLimit(limit);
  666. }
  667. IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  668. {
  669. // isTLK is provided by the caller since the flags in the key header are unreliable. If either says it's a TLK, I believe it.
  670. {
  671. MTIME_SECTION(queryActiveTimer(), "CKeyStore_load");
  672. IKeyIndex *keyIndex;
  673. // MORE - holds onto the mutex way too long
  674. synchronized block(mutex);
  675. StringBuffer fname;
  676. fname.append(fileName).append('/').append(crc);
  677. keyIndex = keyIndexCache.query(fname);
  678. if (NULL == keyIndex)
  679. {
  680. if (iMappedFile)
  681. {
  682. assert(!iFileIO && !part);
  683. keyIndex = new CMemKeyIndex(getUniqId(), LINK(iMappedFile), fname, isTLK);
  684. }
  685. else if (iFileIO)
  686. {
  687. assert(!part);
  688. keyIndex = new CDiskKeyIndex(getUniqId(), LINK(iFileIO), fname, isTLK, allowPreload);
  689. }
  690. else
  691. {
  692. Owned<IFile> iFile;
  693. if (part)
  694. {
  695. iFile.setown(part->open());
  696. if (NULL == iFile.get())
  697. throw MakeStringException(0, "Failed to open index file %s", fileName);
  698. }
  699. else
  700. iFile.setown(createIFile(fileName));
  701. IFileIO *fio = iFile->open(IFOread);
  702. if (fio)
  703. keyIndex = new CDiskKeyIndex(getUniqId(), fio, fname, isTLK, allowPreload);
  704. else
  705. throw MakeStringException(0, "Failed to open index file %s", fileName);
  706. }
  707. keyIndexCache.add(fname, *LINK(keyIndex));
  708. }
  709. else
  710. {
  711. LINK(keyIndex);
  712. }
  713. assertex(NULL != keyIndex);
  714. return keyIndex;
  715. }
  716. }
  717. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload)
  718. {
  719. return doload(fileName, crc, NULL, iFileIO, NULL, isTLK, allowPreload);
  720. }
  721. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  722. {
  723. return doload(fileName, crc, NULL, NULL, iMappedFile, isTLK, allowPreload);
  724. }
  725. // fileName+crc used only as key for cache
  726. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload)
  727. {
  728. return doload(fileName, crc, &part, NULL, NULL, isTLK, allowPreload);
  729. }
  730. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload)
  731. {
  732. return doload(fileName, crc, NULL, NULL, NULL, isTLK, allowPreload);
  733. }
  734. StringBuffer &CKeyStore::getMetrics(StringBuffer &xml)
  735. {
  736. xml.append(" <IndexMetrics>\n");
  737. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  738. ForEach(*iter)
  739. {
  740. CKeyIndexMapping &mapping = iter->query();
  741. IKeyIndex &index = mapping.query();
  742. const char *name = mapping.queryFindString();
  743. xml.appendf(" <Index name=\"%s\" scans=\"%d\" seeks=\"%d\"/>\n", name, index.queryScans(), index.querySeeks());
  744. }
  745. xml.append(" </IndexMetrics>\n");
  746. return xml;
  747. }
  748. void CKeyStore::resetMetrics()
  749. {
  750. synchronized block(mutex);
  751. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  752. ForEach(*iter)
  753. {
  754. CKeyIndexMapping &mapping = iter->query();
  755. IKeyIndex &index = mapping.query();
  756. index.resetCounts();
  757. }
  758. }
  759. void CKeyStore::clearCache(bool killAll)
  760. {
  761. synchronized block(mutex);
  762. if (killAll)
  763. {
  764. clearNodeCache(); // no point in keeping old nodes cached if key store cache has been cleared
  765. keyIndexCache.kill();
  766. }
  767. else
  768. {
  769. StringArray goers;
  770. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  771. ForEach(*iter)
  772. {
  773. CKeyIndexMapping &mapping = iter->query();
  774. IKeyIndex &index = mapping.query();
  775. if (!index.IsShared())
  776. {
  777. const char *name = mapping.queryFindString();
  778. goers.append(name);
  779. }
  780. }
  781. ForEachItemIn(idx, goers)
  782. {
  783. keyIndexCache.remove(goers.item(idx));
  784. }
  785. }
  786. }
  787. void CKeyStore::clearCacheEntry(const char *keyName)
  788. {
  789. if (!keyName || !*keyName)
  790. return; // nothing to do
  791. synchronized block(mutex);
  792. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  793. StringArray goers;
  794. ForEach(*iter)
  795. {
  796. CKeyIndexMapping &mapping = iter->query();
  797. IKeyIndex &index = mapping.query();
  798. if (!index.IsShared())
  799. {
  800. const char *name = mapping.queryFindString();
  801. if (strstr(name, keyName) != 0) // keyName doesn't have drive or part number associated with it
  802. goers.append(name);
  803. }
  804. }
  805. ForEachItemIn(idx, goers)
  806. {
  807. keyIndexCache.remove(goers.item(idx));
  808. }
  809. }
  810. void CKeyStore::clearCacheEntry(const IFileIO *io)
  811. {
  812. synchronized block(mutex);
  813. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  814. StringArray goers;
  815. ForEach(*iter)
  816. {
  817. CKeyIndexMapping &mapping = iter->query();
  818. IKeyIndex &index = mapping.query();
  819. if (!index.IsShared())
  820. {
  821. if (index.queryFileIO()==io)
  822. goers.append(mapping.queryFindString());
  823. }
  824. }
  825. ForEachItemIn(idx, goers)
  826. {
  827. keyIndexCache.remove(goers.item(idx));
  828. }
  829. }
  830. // CKeyIndex impl.
  831. CKeyIndex::CKeyIndex(int _iD, const char *_name) : name(_name)
  832. {
  833. iD = _iD;
  834. cache = queryNodeCache(); // use one node cache for all key indexes;
  835. cache->Link();
  836. keyHdr = NULL;
  837. rootNode = NULL;
  838. cachedBlobNodePos = 0;
  839. keySeeks.store(0);
  840. keyScans.store(0);
  841. latestGetNodeOffset = 0;
  842. }
  843. void CKeyIndex::cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK)
  844. {
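  // Preload a whole level of the tree: walk the right-sibling links from nodePos, preloading each node,
  // and recurse into the first child so that lower levels are cached first. The leaf level is only
  // preloaded for top-level keys (TLKs); for other keys we stop when a leaf is reached.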
  845. bool first = true;
  846. while (nodePos)
  847. {
  848. Owned<CJHTreeNode> node = loadNode(nodePos);
  849. if (node->isLeaf())
  850. {
  851. if (!isTLK)
  852. return;
  853. }
  854. else if (first)
  855. {
  856. cacheNodes(cache, node->getFPosAt(0), isTLK);
  857. first = false;
  858. }
  859. cache->preload(node, iD, nodePos, NULL);
  860. nodePos = node->getRightSib();
  861. }
  862. }
  863. void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
  864. {
  865. if (isTLK)
  866. hdr.ktype |= HTREE_TOPLEVEL_KEY; // thor does not set
  867. keyHdr = new CKeyHdr();
  868. try
  869. {
  870. keyHdr->load(hdr);
  871. offset_t rootPos = keyHdr->getRootFPos();
  872. Linked<CNodeCache> nodeCache = queryNodeCache();
  873. if (allowPreload)
  874. {
  875. if (nodeCache->getNodeCachePreload() && !nodeCache->isPreloaded(iD, rootPos))
  876. {
  877. cacheNodes(nodeCache, rootPos, isTLK);
  878. }
  879. }
  880. rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
  881. loadBloomFilters();
  882. }
  883. catch (IKeyException *ke)
  884. {
  885. if (!name.get()) throw;
  886. StringBuffer msg;
  887. IKeyException *ke2 = MakeKeyException(ke->errorCode(), "%s. In key '%s' (corrupt index?)", ke->errorMessage(msg).str(), name.get());
  888. ke->Release();
  889. throw ke2;
  890. }
  891. }
  892. CKeyIndex::~CKeyIndex()
  893. {
  894. ::Release(keyHdr);
  895. ::Release(cache);
  896. ::Release(rootNode);
  897. }
  898. CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
  899. : CKeyIndex(_iD, _name)
  900. {
  901. io.setown(_io);
  902. assertex(io->offset()==0); // mapped whole file
  903. assertex(io->length()==io->fileSize()); // mapped whole file
  904. KeyHdr hdr;
  905. if (io->length() < sizeof(hdr))
  906. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  907. memcpy(&hdr, io->base(), sizeof(hdr));
  908. if (hdr.ktype & USE_TRAILING_HEADER)
  909. {
  910. _WINREV(hdr.nodeSize);
  911. memcpy(&hdr, (io->base()+io->length()) - hdr.nodeSize, sizeof(hdr));
  912. }
  913. init(hdr, isTLK, false);
  914. }
  915. CJHTreeNode *CMemKeyIndex::loadNode(offset_t pos)
  916. {
  917. nodesLoaded++;
  918. if (pos + keyHdr->getNodeSize() > io->fileSize())
  919. {
  920. IException *E = MakeStringException(errno, "Error reading node at position %" I64F "x past EOF", pos);
  921. StringBuffer m;
  922. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  923. EXCLOG(E, m.str());
  924. throw E;
  925. }
  926. char *nodeData = (char *) (io->base() + pos);
  927. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  928. return CKeyIndex::loadNode(nodeData, pos, false);
  929. }
  930. CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
  931. : CKeyIndex(_iD, _name)
  932. {
  933. io.setown(_io);
  934. KeyHdr hdr;
  935. if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
  936. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  937. if (hdr.ktype & USE_TRAILING_HEADER)
  938. {
  939. _WINREV(hdr.nodeSize);
  940. if (!io->read(io->size() - hdr.nodeSize, sizeof(hdr), &hdr))
  941. throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", _name);
  942. }
  943. init(hdr, isTLK, allowPreload);
  944. }
  945. CJHTreeNode *CDiskKeyIndex::loadNode(offset_t pos)
  946. {
  947. nodesLoaded++;
  948. unsigned nodeSize = keyHdr->getNodeSize();
  949. MemoryAttr ma;
  950. char *nodeData = (char *) ma.allocate(nodeSize);
  951. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  952. if (io->read(pos, nodeSize, nodeData) != nodeSize)
  953. {
  954. IException *E = MakeStringException(errno, "Error %d reading node at position %" I64F "x", errno, pos);
  955. StringBuffer m;
  956. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  957. EXCLOG(E, m.str());
  958. throw E;
  959. }
  960. return CKeyIndex::loadNode(nodeData, pos, true);
  961. }
  962. CJHTreeNode *CKeyIndex::loadNode(char *nodeData, offset_t pos, bool needsCopy)
  963. {
  964. try
  965. {
  966. Owned<CJHTreeNode> ret;
  967. char leafFlag = ((NodeHdr *) nodeData)->leafFlag;
  968. switch(leafFlag)
  969. {
  970. case 0:
  971. ret.setown(new CJHTreeNode());
  972. break;
  973. case 1:
  974. if (keyHdr->isVariable())
  975. ret.setown(new CJHVarTreeNode());
  976. else if (keyHdr->isRowCompressed())
  977. ret.setown(new CJHRowCompressedNode());
  978. else
  979. ret.setown(new CJHTreeNode());
  980. break;
  981. case 2:
  982. ret.setown(new CJHTreeBlobNode());
  983. break;
  984. case 3:
  985. ret.setown(new CJHTreeMetadataNode());
  986. break;
  987. case 4:
  988. ret.setown(new CJHTreeBloomTableNode());
  989. break;
  990. default:
  991. throwUnexpected();
  992. }
  993. {
  994. MTIME_SECTION(queryActiveTimer(), "JHTREE load node");
  995. ret->load(keyHdr, nodeData, pos, needsCopy);
  996. }
  997. return ret.getClear();
  998. }
  999. catch (IException *E)
  1000. {
  1001. StringBuffer m;
  1002. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  1003. EXCLOG(E, m.str());
  1004. throw;
  1005. }
  1006. catch (...)
  1007. {
  1008. DBGLOG("Unknown exception in key %s, position 0x%" I64F "x", name.get(), pos);
  1009. throw;
  1010. }
  1011. }
  1012. bool CKeyIndex::isTopLevelKey()
  1013. {
  1014. return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0;
  1015. }
  1016. bool CKeyIndex::isFullySorted()
  1017. {
  1018. return (keyHdr->getKeyType() & HTREE_FULLSORT_KEY) != 0;
  1019. }
  1020. __uint64 CKeyIndex::getPartitionFieldMask()
  1021. {
  1022. return keyHdr->getPartitionFieldMask();
  1023. }
  1024. unsigned CKeyIndex::numPartitions()
  1025. {
  1026. return keyHdr->numPartitions();
  1027. }
  1028. IKeyCursor *CKeyIndex::getCursor(const IIndexFilterList *filter, bool logExcessiveSeeks)
  1029. {
  1030. return new CKeyCursor(*this, filter, logExcessiveSeeks);
  1031. }
  1032. CJHTreeNode *CKeyIndex::getNode(offset_t offset, IContextLogger *ctx)
  1033. {
  1034. latestGetNodeOffset = offset;
  1035. return cache->getNode(this, iD, offset, ctx, isTopLevelKey());
  1036. }
  1037. void dumpNode(FILE *out, CJHTreeNode *node, int length, unsigned rowCount, bool raw)
  1038. {
  1039. if (!raw)
  1040. fprintf(out, "Node dump: fpos(%" I64F "d) leaf(%d)\n", node->getFpos(), node->isLeaf());
  1041. if (rowCount==0 || rowCount > node->getNumKeys())
  1042. rowCount = node->getNumKeys();
  1043. for (unsigned int i=0; i<rowCount; i++)
  1044. {
  1045. char *dst = (char *) alloca(node->getKeyLen()+50);
  1046. node->getValueAt(i, dst);
  1047. if (raw)
  1048. {
  1049. fwrite(dst, 1, length, out);
  1050. }
  1051. else
  1052. {
  1053. offset_t pos = node->getFPosAt(i);
  1054. StringBuffer s;
  1055. appendURL(&s, dst, length, true);
  1056. fprintf(out, "keyVal %d [%" I64F "d] = %s\n", i, pos, s.str());
  1057. }
  1058. }
  1059. if (!raw)
  1060. fprintf(out, "==========\n");
  1061. }
  1062. void CKeyIndex::dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw)
  1063. {
  1064. Owned<CJHTreeNode> node = loadNode(pos);
  1065. ::dumpNode(out, node, keySize(), count, isRaw);
  1066. }
  1067. bool CKeyIndex::hasSpecialFileposition() const
  1068. {
  1069. return keyHdr->hasSpecialFileposition();
  1070. }
  1071. bool CKeyIndex::needsRowBuffer() const
  1072. {
  1073. return keyHdr->hasSpecialFileposition() || keyHdr->isRowCompressed();
  1074. }
  1075. size32_t CKeyIndex::keySize()
  1076. {
  1077. size32_t fileposSize = keyHdr->hasSpecialFileposition() ? sizeof(offset_t) : 0;
  1078. return keyHdr->getMaxKeyLength() + fileposSize;
  1079. }
  1080. size32_t CKeyIndex::keyedSize()
  1081. {
  1082. return keyHdr->getNodeKeyLength();
  1083. }
  1084. bool CKeyIndex::hasPayload()
  1085. {
  1086. return keyHdr->hasPayload();
  1087. }
  1088. CJHTreeBlobNode *CKeyIndex::getBlobNode(offset_t nodepos)
  1089. {
  1090. CriticalBlock b(blobCacheCrit);
  1091. if (nodepos != cachedBlobNodePos)
  1092. {
  1093. cachedBlobNode.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeBlobNode)); // note - don't use the cache
  1094. cachedBlobNodePos = nodepos;
  1095. }
  1096. return cachedBlobNode.getLink();
  1097. }
  1098. const byte *CKeyIndex::loadBlob(unsigned __int64 blobid, size32_t &blobSize)
  1099. {
  1100. offset_t nodepos = blobid & I64C(0xffffffffffff);
  1101. size32_t offset = (size32_t) ((blobid & I64C(0xffff000000000000)) >> 44);
  1102. Owned<CJHTreeBlobNode> blobNode = getBlobNode(nodepos);
  1103. size32_t sizeRemaining = blobNode->getTotalBlobSize(offset);
  1104. blobSize = sizeRemaining;
  1105. byte *ret = (byte *) malloc(sizeRemaining);
  1106. byte *finger = ret;
  1107. for (;;)
  1108. {
  1109. size32_t gotHere = blobNode->getBlobData(offset, finger);
  1110. assertex(gotHere <= sizeRemaining);
  1111. sizeRemaining -= gotHere;
  1112. finger += gotHere;
  1113. if (!sizeRemaining)
  1114. break;
  1115. blobNode.setown(getBlobNode(blobNode->getRightSib()));
  1116. offset = 0;
  1117. }
  1118. return ret;
  1119. }
  1120. offset_t CKeyIndex::queryMetadataHead()
  1121. {
  1122. offset_t ret = keyHdr->getHdrStruct()->metadataHead;
  1123. if(ret == static_cast<offset_t>(-1)) ret = 0; // an index created before the introduction of metadata would have FFFF... in this space
  1124. return ret;
  1125. }
  1126. void CKeyIndex::loadBloomFilters()
  1127. {
  1128. offset_t bloomAddr = keyHdr->getHdrStruct()->bloomHead;
  1129. if (!bloomAddr || bloomAddr == static_cast<offset_t>(-1))
  1130. return; // indexes created before introduction of bloomfilter would have FFFF... in this space
  1131. while (bloomAddr)
  1132. {
  1133. Owned<CJHTreeNode> node = loadNode(bloomAddr);
  1134. assertex(node->isBloom());
  1135. CJHTreeBloomTableNode &bloomNode = *static_cast<CJHTreeBloomTableNode *>(node.get());
  1136. bloomAddr = bloomNode.get8();
  1137. unsigned numHashes = bloomNode.get4();
  1138. __uint64 fields = bloomNode.get8();
  1139. unsigned bloomTableSize = bloomNode.get4();
  1140. MemoryBuffer bloomTable;
  1141. bloomTable.ensureCapacity(bloomTableSize);
  1142. for (;;)
  1143. {
  1144. static_cast<CJHTreeBloomTableNode *>(node.get())->get(bloomTable);
  1145. offset_t next = node->getRightSib();
  1146. if (!next)
  1147. break;
  1148. node.setown(loadNode(next));
  1149. assertex(node->isBloom());
  1150. }
  1151. assertex(bloomTable.length()==bloomTableSize);
  1152. //DBGLOG("Creating bloomfilter(%d, %d) for fields %" I64F "x",numHashes, bloomTableSize, fields);
  1153. bloomFilters.append(*new IndexBloomFilter(numHashes, bloomTableSize, (byte *) bloomTable.detach(), fields));
  1154. }
  1155. bloomFilters.sort(IndexBloomFilter::compare);
  1156. }
  1157. bool CKeyIndex::bloomFilterReject(const IIndexFilterList &segs) const
  1158. {
  1159. ForEachItemIn(idx, bloomFilters)
  1160. {
  1161. IndexBloomFilter &filter = bloomFilters.item(idx);
  1162. if (filter.reject(segs))
  1163. return true;
  1164. }
  1165. return false;
  1166. }
  1167. IPropertyTree * CKeyIndex::getMetadata()
  1168. {
  1169. offset_t nodepos = queryMetadataHead();
  1170. if(!nodepos)
  1171. return NULL;
  1172. Owned<CJHTreeMetadataNode> node;
  1173. StringBuffer xml;
  1174. while(nodepos)
  1175. {
  1176. node.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeMetadataNode));
  1177. node->get(xml);
  1178. nodepos = node->getRightSib();
  1179. }
  1180. IPropertyTree * ret;
  1181. try
  1182. {
  1183. ret = createPTreeFromXMLString(xml.str());
  1184. }
  1185. catch(IPTreeReadException * e)
  1186. {
  1187. StringBuffer emsg;
  1188. IException * wrapped = MakeStringException(e->errorAudience(), e->errorCode(), "Error retrieving XML metadata: %s", e->errorMessage(emsg).str());
  1189. e->Release();
  1190. throw wrapped;
  1191. }
  1192. return ret;
  1193. }
  1194. CJHTreeNode *CKeyIndex::locateFirstNode(KeyStatsCollector &stats)
  1195. {
  1196. keySeeks++;
  1197. stats.seeks++;
  1198. CJHTreeNode * n = 0;
  1199. CJHTreeNode * p = LINK(rootNode);
  1200. while (p != 0)
  1201. {
  1202. n = p;
  1203. p = getNode(n->prevNodeFpos(), stats.ctx);
  1204. if (p != 0)
  1205. n->Release();
  1206. }
  1207. return n;
  1208. }
  1209. CJHTreeNode *CKeyIndex::locateLastNode(KeyStatsCollector &stats)
  1210. {
  1211. keySeeks++;
  1212. stats.seeks++;
  1213. CJHTreeNode * n = 0;
  1214. CJHTreeNode * p = LINK(rootNode);
  1215. while (p != 0)
  1216. {
  1217. n = p;
  1218. p = getNode(n->nextNodeFpos(), stats.ctx);
  1219. if (p != 0)
  1220. n->Release();
  1221. }
  1222. return n;
  1223. }
  1224. void KeyStatsCollector::noteSeeks(unsigned lseeks, unsigned lscans, unsigned lwildseeks)
  1225. {
  1226. seeks += lseeks;
  1227. scans += lscans;
  1228. wildseeks += lwildseeks;
  1229. if (ctx)
  1230. {
  1231. if (lseeks) ctx->noteStatistic(StNumIndexSeeks, lseeks);
  1232. if (lscans) ctx->noteStatistic(StNumIndexScans, lscans);
  1233. if (lwildseeks) ctx->noteStatistic(StNumIndexWildSeeks, lwildseeks);
  1234. }
  1235. }
  1236. void KeyStatsCollector::noteSkips(unsigned lskips, unsigned lnullSkips)
  1237. {
  1238. skips += lskips;
  1239. if (ctx)
  1240. {
  1241. if (lskips) ctx->noteStatistic(StNumIndexSkips, lskips);
  1242. if (lnullSkips) ctx->noteStatistic(StNumIndexNullSkips, lnullSkips);
  1243. }
  1244. }
  1245. void KeyStatsCollector::reset()
  1246. {
  1247. seeks = 0;
  1248. scans = 0;
  1249. wildseeks = 0;
  1250. skips = 0;
  1251. nullskips = 0;
  1252. }
  1253. CKeyCursor::CKeyCursor(CKeyIndex &_key, const IIndexFilterList *_filter, bool _logExcessiveSeeks)
  1254. : key(OLINK(_key)), filter(_filter), logExcessiveSeeks(_logExcessiveSeeks)
  1255. {
  1256. nodeKey = 0;
  1257. keyBuffer = (char *) malloc(key.keySize()); // MORE - keyedSize would do eventually
  1258. }
  1259. CKeyCursor::CKeyCursor(const CKeyCursor &from)
  1260. : key(OLINK(from.key)), filter(from.filter)
  1261. {
  1262. nodeKey = from.nodeKey;
  1263. node.set(from.node);
  1264. unsigned keySize = key.keySize();
  1265. keyBuffer = (char *) malloc(keySize); // MORE - keyedSize would do eventually. And we may not even need all of that in the derived case
  1266. memcpy(keyBuffer, from.keyBuffer, keySize);
  1267. eof = from.eof;
  1268. matched = from.matched;
  1269. }
  1270. CKeyCursor::~CKeyCursor()
  1271. {
  1272. key.Release();
  1273. free(keyBuffer);
  1274. }
  1275. void CKeyCursor::reset()
  1276. {
  1277. node.clear();
  1278. matched = false;
  1279. eof = key.bloomFilterReject(*filter) || !filter->canMatch();
  1280. if (!eof)
  1281. setLow(0);
  1282. }
  1283. bool CKeyCursor::next(char *dst, KeyStatsCollector &stats)
  1284. {
  1285. if (!node)
  1286. {
  1287. node.setown(key.locateFirstNode(stats));
  1288. nodeKey = 0;
  1289. return node && node->getValueAt(nodeKey, dst);
  1290. }
  1291. else
  1292. {
  1293. key.keyScans++;
  1294. if (!node->getValueAt( ++nodeKey, dst))
  1295. {
  1296. offset_t rsib = node->getRightSib();
  1297. node.clear();
  1298. if (rsib != 0)
  1299. {
  1300. node.setown(key.getNode(rsib, stats.ctx));
  1301. if (node != NULL)
  1302. {
  1303. nodeKey = 0;
  1304. return node->getValueAt(0, dst);
  1305. }
  1306. }
  1307. return false;
  1308. }
  1309. else
  1310. return true;
  1311. }
  1312. }
  1313. const char *CKeyCursor::queryName() const
  1314. {
  1315. return key.queryFileName();
  1316. }
  1317. size32_t CKeyCursor::getKeyedSize() const
  1318. {
  1319. return key.keyedSize();
  1320. }
  1321. const byte *CKeyCursor::queryKeyBuffer() const
  1322. {
  1323. return (const byte *) keyBuffer;
  1324. }
  1325. size32_t CKeyCursor::getSize()
  1326. {
  1327. assertex(node);
  1328. return node->getSizeAt(nodeKey);
  1329. }
  1330. offset_t CKeyCursor::getFPos()
  1331. {
  1332. assertex(node);
  1333. return node->getFPosAt(nodeKey);
  1334. }
  1335. unsigned __int64 CKeyCursor::getSequence()
  1336. {
  1337. assertex(node);
  1338. return node->getSequence(nodeKey);
  1339. }
  1340. bool CKeyCursor::last(char *dst, KeyStatsCollector &stats)
  1341. {
  1342. node.setown(key.locateLastNode(stats));
  1343. nodeKey = node->getNumKeys()-1;
  1344. return node->getValueAt( nodeKey, dst );
  1345. }
  1346. bool CKeyCursor::gtEqual(const char *src, char *dst, KeyStatsCollector &stats)
  1347. {
  1348. key.keySeeks++;
  1349. unsigned lwm = 0;
  1350. if (node)
  1351. {
  1352. // When seeking forward, there are two cases worth optimizing:
  1353. // 1. the next record is actually the one we want
  1354. // 2. The record we want is on the current page
  1355. unsigned numKeys = node->getNumKeys();
  1356. if (nodeKey < numKeys-1)
  1357. {
  1358. int rc = node->compareValueAt(src, ++nodeKey);
  1359. if (rc <= 0)
  1360. {
  1361. node->getValueAt(nodeKey, dst);
  1362. return true;
  1363. }
  1364. if (nodeKey < numKeys-1)
  1365. {
  1366. rc = node->compareValueAt(src, numKeys-1);
  1367. if (rc <= 0)
  1368. lwm = nodeKey+1;
  1369. }
  1370. }
  1371. }
  1372. if (!lwm)
  1373. node.set(key.rootNode);
  1374. for (;;)
  1375. {
  1376. unsigned int a = lwm;
  1377. int b = node->getNumKeys();
  1378. // first search for first GTE entry (result in b(<),a(>=))
  1379. while ((int)a<b)
  1380. {
  1381. int i = a+(b-a)/2;
  1382. int rc = node->compareValueAt(src, i);
  1383. if (rc>0)
  1384. a = i+1;
  1385. else
  1386. b = i;
  1387. }
  1388. if (node->isLeaf())
  1389. {
  1390. if (a<node->getNumKeys())
  1391. nodeKey = a;
  1392. else
  1393. {
  1394. offset_t nextPos = node->nextNodeFpos(); // This can happen at eof because of key peculiarity where level above reports ffff as last
  1395. node.setown(key.getNode(nextPos, stats.ctx));
  1396. nodeKey = 0;
  1397. }
  1398. if (node)
  1399. {
  1400. node->getValueAt(nodeKey, dst);
  1401. return true;
  1402. }
  1403. else
  1404. return false;
  1405. }
  1406. else
  1407. {
  1408. if (a<node->getNumKeys())
  1409. {
  1410. offset_t npos = node->getFPosAt(a);
  1411. node.setown(key.getNode(npos, stats.ctx));
  1412. }
  1413. else
  1414. return false;
  1415. }
  1416. }
  1417. }
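// Position the cursor on the last entry <= src. Uses the same forward-seek optimisation as gtEqual,
// then binary-chops each level for the first entry greater than src and steps back one position
// (following the left sibling where necessary).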
  1418. bool CKeyCursor::ltEqual(const char *src, KeyStatsCollector &stats)
  1419. {
  1420. key.keySeeks++;
  1421. matched = false;
  1422. unsigned lwm = 0;
  1423. if (node)
  1424. {
  1425. // When seeking forward, there are two cases worth optimizing:
  1426. // 1. next record is > src, so we return current
  1427. // 2. The record we want is on the current page
  1428. unsigned numKeys = node->getNumKeys();
  1429. if (nodeKey < numKeys-1)
  1430. {
  1431. int rc = node->compareValueAt(src, ++nodeKey);
  1432. if (rc < 0)
  1433. {
  1434. --nodeKey;
  1435. return true;
  1436. }
  1437. if (nodeKey < numKeys-1)
  1438. {
  1439. rc = node->compareValueAt(src, numKeys-1);
  1440. if (rc < 0)
  1441. lwm = nodeKey;
  1442. }
  1443. }
  1444. }
  1445. if (!lwm)
  1446. node.set(key.rootNode);
  1447. for (;;)
  1448. {
  1449. unsigned int a = lwm;
  1450. int b = node->getNumKeys();
  1451. // Locate first record greater than src
  1452. while ((int)a<b)
  1453. {
  1454. int i = a+(b+1-a)/2;
  1455. int rc = node->compareValueAt(src, i-1);
  1456. if (rc>=0)
  1457. a = i;
  1458. else
  1459. b = i-1;
  1460. }
  1461. if (node->isLeaf())
  1462. {
  1463. // record we want is the one before first record greater than src.
  1464. if (a>0)
  1465. nodeKey = a-1;
  1466. else
  1467. {
  1468. offset_t prevPos = node->prevNodeFpos();
  1469. node.setown(key.getNode(prevPos, stats.ctx));
  1470. if (node)
  1471. nodeKey = node->getNumKeys()-1;
  1472. }
  1473. if (node)
  1474. {
  1475. return true;
  1476. }
  1477. else
  1478. return false;
  1479. }
  1480. else
  1481. {
1482. // The node to look in is the first one that ended greater than src.
  1483. if (a==node->getNumKeys())
  1484. a--; // value being looked for is off the end of the index.
  1485. offset_t npos = node->getFPosAt(a);
  1486. node.setown(key.getNode(npos, stats.ctx));
  1487. if (!node)
  1488. throw MakeStringException(0, "Invalid key %s: child node pointer should never be NULL", key.name.get());
  1489. }
  1490. }
  1491. }
  1492. void CKeyCursor::serializeCursorPos(MemoryBuffer &mb)
  1493. {
  1494. mb.append(eof);
  1495. if (!eof)
  1496. {
  1497. mb.append(matched);
  1498. if (node)
  1499. {
  1500. mb.append(node->getFpos());
  1501. mb.append(nodeKey);
  1502. }
  1503. else
  1504. {
  1505. offset_t zero = 0;
  1506. unsigned zero2 = 0;
  1507. mb.append(zero);
  1508. mb.append(zero2);
  1509. }
  1510. }
  1511. }
  1512. void CKeyCursor::deserializeCursorPos(MemoryBuffer &mb, KeyStatsCollector &stats)
  1513. {
  1514. mb.read(eof);
  1515. node.clear();
  1516. if (!eof)
  1517. {
  1518. mb.read(matched);
  1519. offset_t nodeAddress;
  1520. mb.read(nodeAddress);
  1521. mb.read(nodeKey);
  1522. if (nodeAddress)
  1523. {
  1524. node.setown(key.getNode(nodeAddress, stats.ctx));
  1525. if (node && keyBuffer)
  1526. node->getValueAt(nodeKey, keyBuffer);
  1527. }
  1528. }
  1529. }
  1530. const byte *CKeyCursor::loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  1531. {
  1532. return key.loadBlob(blobid, blobsize);
  1533. }
  1534. bool CKeyCursor::lookup(bool exact, KeyStatsCollector &stats)
  1535. {
  1536. return _lookup(exact, filter->lastRealSeg(), stats);
  1537. }
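// The core filtered lookup: scan forwards while the previous row matched, otherwise seek (gtEqual)
// to the next candidate. Rows that fail the filter cause incrementKey to jump ahead to the next
// possible match (a "wild seek"); the loop ends on a match, at end of key, or after one step when
// exact is false.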
  1538. bool CKeyCursor::_lookup(bool exact, unsigned lastSeg, KeyStatsCollector &stats)
  1539. {
  1540. bool ret = false;
  1541. unsigned lwildseeks = 0;
  1542. unsigned lseeks = 0;
  1543. unsigned lscans = 0;
  1544. while (!eof)
  1545. {
  1546. if (matched)
  1547. {
  1548. if (!next(keyBuffer, stats))
  1549. eof = true;
  1550. lscans++;
  1551. }
  1552. else
  1553. {
  1554. if (!gtEqual(keyBuffer, keyBuffer, stats))
  1555. eof = true;
  1556. lseeks++;
  1557. }
  1558. if (!eof)
  1559. {
  1560. unsigned i = 0;
  1561. matched = filter->matchesBuffer(keyBuffer, lastSeg, i);
  1562. if (matched)
  1563. {
  1564. ret = true;
  1565. break;
  1566. }
  1567. #ifdef __linux__
  1568. if (linuxYield)
  1569. sched_yield();
  1570. #endif
  1571. eof = !filter->incrementKey(i, keyBuffer);
  1572. if (!exact)
  1573. {
  1574. ret = true;
  1575. break;
  1576. }
  1577. lwildseeks++;
  1578. }
  1579. else
  1580. eof = true;
  1581. }
  1582. if (logExcessiveSeeks && lwildseeks > 1000 && ret)
  1583. reportExcessiveSeeks(lwildseeks, lastSeg, getSize(), stats);
  1584. stats.noteSeeks(lseeks, lscans, lwildseeks);
  1585. return ret;
  1586. }
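// skipTo() followed by an exact lookup(); used by smart-stepping readers. Skip statistics are
// recorded and, in debug builds with traceSmartStepping set, the resulting row is logged.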
  1587. bool CKeyCursor::lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen, KeyStatsCollector &stats)
  1588. {
  1589. if (skipTo(seek, seekOffset, seeklen))
  1590. stats.noteSkips(1, 0);
  1591. else
  1592. stats.noteSkips(0, 1);
  1593. bool ret = lookup(true, stats);
  1594. #ifdef _DEBUG
  1595. if (traceSmartStepping)
  1596. {
  1597. StringBuffer recstr;
  1598. unsigned i;
  1599. for (i = 0; i < key.keySize(); i++)
  1600. {
  1601. unsigned char c = ((unsigned char *) keyBuffer)[i];
  1602. recstr.appendf("%c", isprint(c) ? c : '.');
  1603. }
  1604. recstr.append (" ");
  1605. for (i = 0; i < key.keySize(); i++)
  1606. {
  1607. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  1608. }
  1609. DBGLOG("SKIP: Got skips=%02d seeks=%02d scans=%02d : %s", stats.skips, stats.seeks, stats.scans, recstr.str());
  1610. }
  1611. #endif
  1612. return ret;
  1613. }
  1614. unsigned __int64 CKeyCursor::getCount(KeyStatsCollector &stats)
  1615. {
  1616. reset();
  1617. unsigned __int64 result = 0;
  1618. unsigned lseeks = 0;
  1619. unsigned lastRealSeg = filter->lastRealSeg();
  1620. for (;;)
  1621. {
  1622. if (_lookup(true, lastRealSeg, stats))
  1623. {
  1624. unsigned __int64 locount = getSequence();
  1625. endRange(lastRealSeg);
  1626. ltEqual(keyBuffer, stats);
  1627. lseeks++;
  1628. result += getSequence()-locount+1;
  1629. if (!incrementKey(lastRealSeg))
  1630. break;
  1631. }
  1632. else
  1633. break;
  1634. }
  1635. stats.noteSeeks(lseeks, 0, 0);
  1636. return result;
  1637. }
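// Like getCount(), but counts ranges up to lastFullSeg() and stops as soon as the running total
// exceeds max. A filter with no full segments is answered directly from the sequence number of the
// last entry in the key.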
  1638. unsigned __int64 CKeyCursor::checkCount(unsigned __int64 max, KeyStatsCollector &stats)
  1639. {
  1640. reset();
  1641. unsigned __int64 result = 0;
  1642. unsigned lseeks = 0;
  1643. unsigned lastFullSeg = filter->lastFullSeg();
  1644. if (lastFullSeg == (unsigned) -1)
  1645. {
  1646. stats.noteSeeks(1, 0, 0);
  1647. if (last(nullptr, stats))
  1648. return getSequence()+1;
  1649. else
  1650. return 0;
  1651. }
  1652. for (;;)
  1653. {
  1654. if (_lookup(true, lastFullSeg, stats))
  1655. {
  1656. unsigned __int64 locount = getSequence();
  1657. endRange(lastFullSeg);
  1658. ltEqual(keyBuffer, stats);
  1659. lseeks++;
  1660. result += getSequence()-locount+1;
  1661. if (max && (result > max))
  1662. break;
  1663. if (!incrementKey(lastFullSeg))
  1664. break;
  1665. }
  1666. else
  1667. break;
  1668. }
  1669. stats.noteSeeks(lseeks, 0, 0);
  1670. return result;
  1671. }
  1672. unsigned __int64 CKeyCursor::getCurrentRangeCount(unsigned groupSegCount, KeyStatsCollector &stats)
  1673. {
  1674. unsigned __int64 locount = getSequence();
  1675. endRange(groupSegCount);
  1676. ltEqual(keyBuffer, stats);
  1677. stats.noteSeeks(1, 0, 0);
  1678. return getSequence()-locount+1;
  1679. }
  1680. bool CKeyCursor::nextRange(unsigned groupSegCount)
  1681. {
  1682. matched = false;
  1683. if (!incrementKey(groupSegCount-1))
  1684. return false;
  1685. return true;
  1686. }
  1687. void CKeyCursor::reportExcessiveSeeks(unsigned numSeeks, unsigned lastSeg, size32_t recSize, KeyStatsCollector &stats)
  1688. {
  1689. StringBuffer recstr;
  1690. unsigned i;
  1691. bool printHex = false;
  1692. for (i = 0; i < recSize; i++)
  1693. {
  1694. unsigned char c = ((unsigned char *) keyBuffer)[i];
  1695. if (isprint(c))
  1696. recstr.append(c);
  1697. else
  1698. {
  1699. recstr.append('.');
  1700. printHex = true;
  1701. }
  1702. }
  1703. if (printHex)
  1704. {
  1705. recstr.append ("\n");
  1706. for (i = 0; i < recSize; i++)
  1707. {
  1708. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  1709. }
  1710. }
  1711. recstr.append ("\nusing filter:\n");
  1712. filter->describe(recstr);
  1713. if (stats.ctx)
  1714. stats.ctx->CTXLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), key.queryFileName());
  1715. else
  1716. DBGLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), key.queryFileName());
  1717. }
  1718. bool CKeyCursor::skipTo(const void *_seek, size32_t seekOffset, size32_t seeklen)
  1719. {
  1720. // Modify the current key contents buffer as follows
1721. // Take bytes up to seekOffset from the current buffer (i.e. leave them alone)
1722. // Take up to seeklen bytes from seek comparing them as I go. If I see a lower one before I see a higher one, stop.
1723. // If I didn't see any higher ones, return (at which point the skipTo was a no-op)
1724. // If I saw higher ones, call setLowAfter for all remaining segmonitors
1725. // If the current contents of the buffer could not match, call incrementKey at the appropriate monitor so that it can match
  1726. // Clear the matched flag
  1727. const byte *seek = (const byte *) _seek;
  1728. while (seeklen)
  1729. {
  1730. int c = *seek - (byte) (keyBuffer[seekOffset]);
  1731. if (c < 0)
  1732. return false;
  1733. else if (c>0)
  1734. {
  1735. memcpy(keyBuffer+seekOffset, seek, seeklen);
  1736. break;
  1737. }
  1738. seek++;
  1739. seekOffset++;
  1740. seeklen--;
  1741. }
  1742. if (!seeklen) return false;
  1743. unsigned j = setLowAfter(seekOffset + seeklen);
  1744. bool canmatch = filter->matchesBuffer(keyBuffer, filter->lastRealSeg(), j);
  1745. if (!canmatch)
  1746. eof = !incrementKey(j);
  1747. matched = false;
  1748. return true;
  1749. }
  1750. IKeyCursor * CKeyCursor::fixSortSegs(unsigned sortFieldOffset)
  1751. {
  1752. return new CPartialKeyCursor(*this, sortFieldOffset);
  1753. }
  1754. CPartialKeyCursor::CPartialKeyCursor(const CKeyCursor &from, unsigned sortFieldOffset)
  1755. : CKeyCursor(from)
  1756. {
  1757. filter = filter->fixSortSegs(keyBuffer, sortFieldOffset);
  1758. }
  1759. CPartialKeyCursor::~CPartialKeyCursor()
  1760. {
  1761. ::Release(filter);
  1762. }
  1763. //-------------------------------------------------------
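// IndexRowFilter - an IIndexFilterList implemented on top of RowFilter/IFieldFilter (the "new style"
// filters). Tracks the last real (non-wild) segment, the last full segment and the total keyed size
// as filters are appended.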
  1764. IndexRowFilter::IndexRowFilter(const RtlRecord &_recInfo) : recInfo(_recInfo)
  1765. {
  1766. keySegCount = recInfo.getNumKeyedFields();
  1767. lastReal = 0;
  1768. lastFull = 0;
  1769. keyedSize = 0;
  1770. }
  1771. IndexRowFilter::IndexRowFilter(const IndexRowFilter &from, const char *fixedVals, unsigned sortFieldOffset)
  1772. : recInfo(from.recInfo), keySegCount(from.keySegCount)
  1773. {
  1774. lastReal = 0;
  1775. lastFull = 0;
  1776. keyedSize = 0;
  1777. ForEachItemIn(idx, from.filters)
  1778. {
  1779. auto &filter = from.filters.item(idx);
  1780. unsigned field = filter.queryFieldIndex();
  1781. unsigned offset = recInfo.getFixedOffset(field);
  1782. if (offset < sortFieldOffset)
  1783. append(FFkeyed, createFieldFilter(field, *recInfo.queryType(field), fixedVals+offset));
  1784. else
  1785. append(FFkeyed, LINK(&filter)); // MORE - FFopt vs FFkeyed is dodgy
  1786. }
  1787. }
  1788. void IndexRowFilter::append(IKeySegmentMonitor *segment)
  1789. {
  1790. throwUnexpected();
  1791. }
  1792. const IIndexFilter *IndexRowFilter::item(unsigned idx) const
  1793. {
  1794. return &queryFilter(idx);
  1795. }
  1796. void IndexRowFilter::append(FFoption option, const IFieldFilter * filter)
  1797. {
  1798. assertex(filter->queryType().isFixedSize());
  1799. unsigned idx = filter->queryFieldIndex();
  1800. while (idx > numFilterFields())
  1801. {
  1802. append(FFkeyed, createWildFieldFilter(numFilterFields(), *recInfo.queryType(numFilterFields())));
  1803. }
  1804. assertex(idx == numFilterFields());
  1805. if (!filter->isWild())
  1806. {
  1807. lastReal = idx;
  1808. if (option != FFopt || lastFull == idx-1)
  1809. lastFull = idx;
  1810. }
  1811. keyedSize += filter->queryType().getMinSize();
  1812. addFilter(*filter);
  1813. }
  1814. void IndexRowFilter::setLow(unsigned field, void *keyBuffer) const
  1815. {
  1816. unsigned lim = numFilterFields();
  1817. while (field < lim)
  1818. {
  1819. unsigned offset = recInfo.getFixedOffset(field);
  1820. const IFieldFilter &filter = queryFilter(field);
  1821. filter.setLow(keyBuffer, offset);
  1822. field++;
  1823. }
  1824. }
  1825. unsigned IndexRowFilter::setLowAfter(size32_t offset, void *keyBuffer) const
  1826. {
  1827. unsigned lim = filters.length();
  1828. unsigned field = 0;
  1829. unsigned skipped = 0;
  1830. unsigned fieldOffset = recInfo.getFixedOffset(field);
  1831. while (field < lim)
  1832. {
  1833. unsigned nextOffset = recInfo.getFixedOffset(field+1);
  1834. if (fieldOffset >= offset)
  1835. filters.item(field).setLow(keyBuffer, fieldOffset);
  1836. else if (nextOffset <= offset)
  1837. skipped++;
  1838. else
  1839. {
  1840. byte *temp = (byte *) alloca(nextOffset - fieldOffset);
  1841. filters.item(field).setLow(temp, 0);
1842. memcpy((byte *)keyBuffer+offset, temp+(offset-fieldOffset), nextOffset - offset); // copy only the tail of the field's low value, from the seek offset onwards
  1843. }
  1844. field++;
  1845. fieldOffset = nextOffset;
  1846. }
  1847. return skipped;
  1848. }
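// Advance the key buffer to the next value accepted by segment segno; if that segment overflows,
// carry into the preceding segment. All segments after the one successfully incremented are reset
// to their low values.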
  1849. bool IndexRowFilter::incrementKey(unsigned segno, void *keyBuffer) const
  1850. {
  1851. // Increment the key buffer to next acceptable value
  1852. for(;;)
  1853. {
  1854. if (queryFilter(segno).incrementKey(keyBuffer, recInfo.getFixedOffset(segno)))
  1855. {
  1856. setLow(segno+1, keyBuffer);
  1857. return true;
  1858. }
  1859. if (!segno)
  1860. return false;
  1861. segno--;
  1862. }
  1863. }
  1864. void IndexRowFilter::endRange(unsigned field, void *keyBuffer) const
  1865. {
  1866. unsigned lim = numFilterFields();
  1867. if (field < lim)
  1868. {
  1869. queryFilter(field).endRange(keyBuffer, recInfo.getFixedOffset(field));
  1870. field++;
  1871. }
  1872. while (field < lim)
  1873. {
  1874. queryFilter(field).setHigh(keyBuffer, recInfo.getFixedOffset(field));
  1875. field++;
  1876. }
  1877. }
  1878. unsigned IndexRowFilter::lastRealSeg() const
  1879. {
  1880. return lastReal;
  1881. }
  1882. unsigned IndexRowFilter::lastFullSeg() const
  1883. {
  1884. return lastFull;
  1885. }
  1886. unsigned IndexRowFilter::numFilterFields() const
  1887. {
  1888. return RowFilter::numFilterFields();
  1889. }
  1890. IIndexFilterList *IndexRowFilter::fixSortSegs(const char *fixedVals, unsigned sortFieldOffset) const
  1891. {
  1892. return new IndexRowFilter(*this, fixedVals, sortFieldOffset);
  1893. }
  1894. void IndexRowFilter::reset()
  1895. {
  1896. RowFilter::clear();
  1897. lastReal = 0;
  1898. lastFull = 0;
  1899. keyedSize = 0;
  1900. }
  1901. void IndexRowFilter::checkSize(size32_t _keyedSize, char const * keyname) const
  1902. {
  1903. if (_keyedSize != keyedSize)
  1904. {
  1905. StringBuffer err;
1906. err.appendf("Key size mismatch on key %s - key size is %u, expected %u", keyname, _keyedSize, keyedSize);
throw MakeStringException(0, "%s", err.str());
1907. }
  1908. }
  1909. void IndexRowFilter::recalculateCache()
  1910. {
  1911. // Nothing to do. This probably should be moved to be local to SegMonitorList
  1912. }
  1913. void IndexRowFilter::finish(size32_t _keyedSize)
  1914. {
  1915. while (numFilterFields() < keySegCount)
  1916. {
  1917. unsigned idx = numFilterFields();
  1918. append(FFkeyed, createWildFieldFilter(idx, *recInfo.queryType(idx)));
  1919. }
  1920. assertex(numFilterFields() == keySegCount);
  1921. }
  1922. void IndexRowFilter::describe(StringBuffer &out) const
  1923. {
  1924. for (unsigned idx=0; idx <= lastRealSeg() && idx < numFilterFields(); idx++)
  1925. {
  1926. auto &filter = queryFilter(idx);
  1927. if (idx)
  1928. out.append(',');
  1929. out.appendf("%s=", recInfo.queryName(idx));
  1930. filter.describe(out);
  1931. }
  1932. }
  1933. bool IndexRowFilter::matchesBuffer(const void *buffer, unsigned lastSeg, unsigned &matchSeg) const
  1934. {
  1935. if (numFilterFields())
  1936. {
  1937. RtlFixedRow rowInfo(recInfo, buffer, numFilterFields());
  1938. for (; matchSeg <= lastSeg; matchSeg++)
  1939. {
  1940. if (!queryFilter(matchSeg).matches(rowInfo))
  1941. return false;
  1942. }
  1943. }
  1944. return true;
  1945. }
  1946. bool IndexRowFilter::canMatch() const
  1947. {
  1948. ForEachItemIn(idx, filters)
  1949. {
  1950. if (filters.item(idx).isEmpty())
  1951. return false;
  1952. }
  1953. return true;
  1954. }
  1955. //-------------------------------------------------------
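// CLazyKeyIndex defers opening the underlying index until it is first needed: every IKeyIndex
// method calls checkOpen(), which loads the real key (memory-mapped if allowed) under a critical
// section on first use.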
  1956. class CLazyKeyIndex : implements IKeyIndex, public CInterface
  1957. {
  1958. StringAttr keyfile;
  1959. unsigned crc;
  1960. Linked<IDelayedFile> delayedFile;
  1961. mutable Owned<IFileIO> iFileIO;
  1962. mutable Owned<IKeyIndex> realKey;
  1963. mutable CriticalSection c;
  1964. bool isTLK;
  1965. bool preloadAllowed;
  1966. inline IKeyIndex &checkOpen() const
  1967. {
  1968. CriticalBlock b(c);
  1969. if (!realKey)
  1970. {
  1971. Owned<IMemoryMappedFile> mapped = useMemoryMappedIndexes ? delayedFile->getMappedFile() : nullptr;
  1972. if (mapped)
  1973. realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
  1974. else
  1975. {
  1976. iFileIO.setown(delayedFile->getFileIO());
  1977. realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
  1978. }
  1979. if (!realKey)
  1980. {
  1981. DBGLOG("Lazy key file %s could not be opened", keyfile.get());
  1982. throw MakeStringException(0, "Lazy key file %s could not be opened", keyfile.get());
  1983. }
  1984. }
  1985. return *realKey;
  1986. }
  1987. public:
  1988. IMPLEMENT_IINTERFACE;
  1989. CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
  1990. : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
  1991. {}
  1992. virtual bool IsShared() const { return CInterface::IsShared(); }
  1993. virtual IKeyCursor *getCursor(const IIndexFilterList *filter, bool logExcessiveSeeks) override { return checkOpen().getCursor(filter, logExcessiveSeeks); }
  1994. virtual size32_t keySize() { return checkOpen().keySize(); }
  1995. virtual size32_t keyedSize() { return checkOpen().keyedSize(); }
  1996. virtual bool hasPayload() { return checkOpen().hasPayload(); }
  1997. virtual bool isTopLevelKey() override { return checkOpen().isTopLevelKey(); }
  1998. virtual bool isFullySorted() override { return checkOpen().isFullySorted(); }
  1999. virtual __uint64 getPartitionFieldMask() { return checkOpen().getPartitionFieldMask(); }
  2000. virtual unsigned numPartitions() { return checkOpen().numPartitions(); }
  2001. virtual unsigned getFlags() { return checkOpen().getFlags(); }
  2002. virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw) { checkOpen().dumpNode(out, pos, count, isRaw); }
  2003. virtual unsigned numParts() { return 1; }
  2004. virtual IKeyIndex *queryPart(unsigned idx) { return idx ? NULL : this; }
  2005. virtual unsigned queryScans() { return realKey ? realKey->queryScans() : 0; }
  2006. virtual unsigned querySeeks() { return realKey ? realKey->querySeeks() : 0; }
  2007. virtual const char *queryFileName() { return keyfile.get(); }
  2008. virtual offset_t queryBlobHead() { return checkOpen().queryBlobHead(); }
  2009. virtual void resetCounts() { if (realKey) realKey->resetCounts(); }
  2010. virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
  2011. virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
  2012. virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
  2013. virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
  2014. virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
  2015. virtual bool hasSpecialFileposition() const { return checkOpen().hasSpecialFileposition(); }
  2016. virtual bool needsRowBuffer() const { return checkOpen().needsRowBuffer(); }
  2017. };
  2018. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
  2019. {
  2020. return queryKeyStore()->load(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  2021. }
  2022. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, bool isTLK, bool preloadAllowed)
  2023. {
  2024. return queryKeyStore()->load(keyfile, crc, isTLK, preloadAllowed);
  2025. }
  2026. extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed)
  2027. {
  2028. StringBuffer filePath;
  2029. const RemoteFilename &rfn = part.queryCopies().item(0);
  2030. rfn.getPath(filePath);
  2031. return queryKeyStore()->load(filePath.str(), crc, part, isTLK, preloadAllowed);
  2032. }
  2033. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, bool isTLK, bool preloadAllowed)
  2034. {
  2035. return new CLazyKeyIndex(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  2036. }
  2037. extern jhtree_decl void clearKeyStoreCache(bool killAll)
  2038. {
  2039. queryKeyStore()->clearCache(killAll);
  2040. }
  2041. extern jhtree_decl void clearKeyStoreCacheEntry(const char *name)
  2042. {
  2043. queryKeyStore()->clearCacheEntry(name);
  2044. }
  2045. extern jhtree_decl void clearKeyStoreCacheEntry(const IFileIO *io)
  2046. {
  2047. queryKeyStore()->clearCacheEntry(io);
  2048. }
  2049. extern jhtree_decl StringBuffer &getIndexMetrics(StringBuffer &ret)
  2050. {
  2051. return queryKeyStore()->getMetrics(ret);
  2052. }
  2053. extern jhtree_decl void resetIndexMetrics()
  2054. {
  2055. queryKeyStore()->resetMetrics();
  2056. }
  2057. extern jhtree_decl bool setNodeCachePreload(bool preload)
  2058. {
  2059. return queryNodeCache()->setNodeCachePreload(preload);
  2060. }
  2061. extern jhtree_decl size32_t setNodeCacheMem(size32_t cacheSize)
  2062. {
  2063. return queryNodeCache()->setNodeCacheMem(cacheSize);
  2064. }
  2065. extern jhtree_decl size32_t setLeafCacheMem(size32_t cacheSize)
  2066. {
  2067. return queryNodeCache()->setLeafCacheMem(cacheSize);
  2068. }
  2069. extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize)
  2070. {
  2071. return queryNodeCache()->setBlobCacheMem(cacheSize);
  2072. }
  2073. ///////////////////////////////////////////////////////////////////////////////
  2074. // CNodeCache impl.
  2075. ///////////////////////////////////////////////////////////////////////////////
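// Fetch a node via the cache. The preload, branch-node, leaf and blob caches are consulted in turn
// under the cache lock; on a miss the node is loaded with the lock released and then added to the
// appropriate cache, re-checking first in case another thread added it while we were reading.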
  2076. CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IContextLogger *ctx, bool isTLK)
  2077. {
  2078. // MORE - could probably be improved - I think having the cache template separate is not helping us here
  2079. // Also one cache per key would surely be faster, and could still use a global total
  2080. if (!pos)
  2081. return NULL;
  2082. {
  2083. // It's a shame that we don't know the type before we read it. But probably not that big a deal
  2084. CriticalBlock block(lock);
  2085. CKeyIdAndPos key(iD, pos);
  2086. if (preloadNodes)
  2087. {
  2088. CJHTreeNode *cacheNode = preloadCache.query(key);
  2089. if (cacheNode)
  2090. {
  2091. cacheHits++;
  2092. if (ctx) ctx->noteStatistic(StNumPreloadCacheHits, 1);
  2093. preloadCacheHits++;
  2094. return LINK(cacheNode);
  2095. }
  2096. }
  2097. if (cacheNodes)
  2098. {
  2099. CJHTreeNode *cacheNode = nodeCache.query(key);
  2100. if (cacheNode)
  2101. {
  2102. cacheHits++;
  2103. if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
  2104. nodeCacheHits++;
  2105. return LINK(cacheNode);
  2106. }
  2107. }
  2108. if (cacheLeaves)
  2109. {
  2110. CJHTreeNode *cacheNode = leafCache.query(key);
  2111. if (cacheNode)
  2112. {
  2113. cacheHits++;
  2114. if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
  2115. leafCacheHits++;
  2116. return LINK(cacheNode);
  2117. }
  2118. }
  2119. if (cacheBlobs)
  2120. {
  2121. CJHTreeNode *cacheNode = blobCache.query(key);
  2122. if (cacheNode)
  2123. {
  2124. cacheHits++;
  2125. if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
  2126. blobCacheHits++;
  2127. return LINK(cacheNode);
  2128. }
  2129. }
  2130. CJHTreeNode *node;
  2131. {
  2132. CriticalUnblock block(lock);
  2133. node = keyIndex->loadNode(pos); // NOTE - don't want cache locked while we load!
  2134. }
  2135. cacheAdds++;
  2136. if (node->isBlob())
  2137. {
  2138. if (cacheBlobs)
  2139. {
  2140. CJHTreeNode *cacheNode = blobCache.query(key); // check if added to cache while we were reading
  2141. if (cacheNode)
  2142. {
  2143. ::Release(node);
  2144. cacheHits++;
  2145. if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
  2146. blobCacheHits++;
  2147. return LINK(cacheNode);
  2148. }
  2149. if (ctx) ctx->noteStatistic(StNumBlobCacheAdds, 1);
  2150. blobCacheAdds++;
  2151. blobCache.add(key, *LINK(node));
  2152. }
  2153. }
  2154. else if (node->isLeaf() && !isTLK) // leaves in TLK are cached as if they were nodes
  2155. {
  2156. if (cacheLeaves)
  2157. {
  2158. CJHTreeNode *cacheNode = leafCache.query(key); // check if added to cache while we were reading
  2159. if (cacheNode)
  2160. {
  2161. ::Release(node);
  2162. cacheHits++;
  2163. if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
  2164. leafCacheHits++;
  2165. return LINK(cacheNode);
  2166. }
  2167. if (ctx) ctx->noteStatistic(StNumLeafCacheAdds, 1);
  2168. leafCacheAdds++;
  2169. leafCache.add(key, *LINK(node));
  2170. }
  2171. }
  2172. else
  2173. {
  2174. if (cacheNodes)
  2175. {
  2176. CJHTreeNode *cacheNode = nodeCache.query(key); // check if added to cache while we were reading
  2177. if (cacheNode)
  2178. {
  2179. ::Release(node);
  2180. cacheHits++;
  2181. if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
  2182. nodeCacheHits++;
  2183. return LINK(cacheNode);
  2184. }
  2185. if (ctx) ctx->noteStatistic(StNumNodeCacheAdds, 1);
  2186. nodeCacheAdds++;
  2187. nodeCache.add(key, *LINK(node));
  2188. }
  2189. }
  2190. return node;
  2191. }
  2192. }
  2193. void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger *ctx)
  2194. {
  2195. assertex(pos);
  2196. assertex(preloadNodes);
  2197. CriticalBlock block(lock);
  2198. CKeyIdAndPos key(iD, pos);
  2199. CJHTreeNode *cacheNode = preloadCache.query(key);
  2200. if (!cacheNode)
  2201. {
  2202. cacheAdds++;
  2203. if (ctx) ctx->noteStatistic(StNumPreloadCacheAdds, 1);
  2204. preloadCacheAdds++;
  2205. preloadCache.add(key, *LINK(node));
  2206. }
  2207. }
  2208. bool CNodeCache::isPreloaded(int iD, offset_t pos)
  2209. {
  2210. CriticalBlock block(lock);
  2211. CKeyIdAndPos key(iD, pos);
  2212. return NULL != preloadCache.query(key);
  2213. }
  2214. RelaxedAtomic<unsigned> cacheAdds;
  2215. RelaxedAtomic<unsigned> cacheHits;
  2216. RelaxedAtomic<unsigned> nodesLoaded;
  2217. RelaxedAtomic<unsigned> blobCacheHits;
  2218. RelaxedAtomic<unsigned> blobCacheAdds;
  2219. RelaxedAtomic<unsigned> leafCacheHits;
  2220. RelaxedAtomic<unsigned> leafCacheAdds;
  2221. RelaxedAtomic<unsigned> nodeCacheHits;
  2222. RelaxedAtomic<unsigned> nodeCacheAdds;
  2223. RelaxedAtomic<unsigned> preloadCacheHits;
  2224. RelaxedAtomic<unsigned> preloadCacheAdds;
  2225. void clearNodeStats()
  2226. {
  2227. cacheAdds.store(0);
  2228. cacheHits.store(0);
  2229. nodesLoaded.store(0);
  2230. blobCacheHits.store(0);
  2231. blobCacheAdds.store(0);
  2232. leafCacheHits.store(0);
  2233. leafCacheAdds.store(0);
  2234. nodeCacheHits.store(0);
  2235. nodeCacheAdds.store(0);
  2236. preloadCacheHits.store(0);
  2237. preloadCacheAdds.store(0);
  2238. }
  2239. //------------------------------------------------------------------------------------------------
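// CKeyMerger merges the outputs of multiple key parts into a single sorted stream. A small binary
// heap (mergeheap holds indexes into cursors) keeps the cursor with the lowest current row at
// position 0; BuffCompare orders rows by the keyed fields from sortFieldOffset onwards, then the
// leading fields, then the input number.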
  2240. class CKeyMerger : public CKeyLevelManager
  2241. {
  2242. unsigned *mergeheap;
  2243. unsigned numkeys;
  2244. unsigned activekeys;
  2245. IArrayOf<IKeyCursor> cursorArray;
  2246. UnsignedArray mergeHeapArray;
  2247. UnsignedArray keyNoArray;
  2248. IKeyCursor **cursors;
  2249. unsigned sortFieldOffset;
  2250. unsigned sortFromSeg;
  2251. bool resetPending;
  2252. inline int BuffCompare(unsigned a, unsigned b)
  2253. {
  2254. const byte *c1 = cursors[mergeheap[a]]->queryKeyBuffer();
  2255. const byte *c2 = cursors[mergeheap[b]]->queryKeyBuffer();
  2256. //Only compare the keyed portion, and if equal tie-break on lower input numbers having priority
  2257. //In the future this should use the comparison functions from the type info
  2258. int ret = memcmp(c1+sortFieldOffset, c2+sortFieldOffset, keyedSize-sortFieldOffset);
  2259. if (!ret)
  2260. {
  2261. if (sortFieldOffset)
  2262. ret = memcmp(c1, c2, sortFieldOffset);
  2263. //If they are equal, earlier inputs have priority
  2264. if (!ret)
  2265. ret = a - b;
  2266. }
  2267. return ret;
  2268. }
  2269. Linked<IKeyIndexBase> keyset;
  2270. void calculateSortSeg()
  2271. {
  2272. // Make sure that sortFromSeg is properly set
  2273. sortFromSeg = (unsigned) -1;
  2274. unsigned numFilters = filter->numFilterFields();
  2275. for (unsigned idx = 0; idx < numFilters; idx++)
  2276. {
  2277. unsigned offset = filter->getFieldOffset(idx);
  2278. if (offset == sortFieldOffset)
  2279. {
  2280. sortFromSeg = idx;
  2281. break;
  2282. }
  2283. }
  2284. if (sortFromSeg == -1)
  2285. assertex(!"Attempting to sort from offset that is not on a segment boundary");
  2286. assertex(resetPending == true);
  2287. }
  2288. public:
  2289. CKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet *_keyset, unsigned _sortFieldOffset, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
  2290. : CKeyLevelManager(_recInfo, NULL, _ctx, _newFilters, _logExcessiveSeeks), sortFieldOffset(_sortFieldOffset)
  2291. {
  2292. init();
  2293. setKey(_keyset);
  2294. }
  2295. CKeyMerger(const RtlRecord &_recInfo, IKeyIndex *_onekey, unsigned _sortFieldOffset, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
  2296. : CKeyLevelManager(_recInfo, NULL, _ctx, _newFilters, _logExcessiveSeeks), sortFieldOffset(_sortFieldOffset)
  2297. {
  2298. init();
  2299. setKey(_onekey);
  2300. }
  2301. ~CKeyMerger()
  2302. {
  2303. killBuffers();
  2304. }
  2305. void killBuffers()
  2306. {
  2307. cursorArray.kill();
  2308. keyCursor = NULL; // cursorArray owns cursors
  2309. mergeHeapArray.kill();
  2310. keyNoArray.kill();
  2311. cursors = NULL;
  2312. mergeheap = NULL;
  2313. }
  2314. void init()
  2315. {
  2316. numkeys = 0;
  2317. activekeys = 0;
  2318. resetPending = true;
  2319. sortFromSeg = 0;
  2320. }
  2321. virtual unsigned getPartition() override
  2322. {
  2323. return 0; // If all keys share partition info (is that required?) then we can do better
  2324. }
  2325. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  2326. {
  2327. // Rather like a lookup, except that no records below the value indicated by seek* should be returned.
  2328. if (resetPending)
  2329. {
  2330. resetSort(seek, seekOffset, seeklen);
  2331. if (!activekeys)
  2332. return false;
  2333. #ifdef _DEBUG
  2334. if (traceSmartStepping)
  2335. DBGLOG("SKIP: init key = %d", mergeheap[0]);
  2336. #endif
  2337. return true;
  2338. }
  2339. else
  2340. {
  2341. if (!activekeys)
  2342. {
  2343. #ifdef _DEBUG
  2344. if (traceSmartStepping)
  2345. DBGLOG("SKIP: merge done");
  2346. #endif
  2347. return false;
  2348. }
  2349. unsigned key = mergeheap[0];
  2350. #ifdef _DEBUG
  2351. if (traceSmartStepping)
  2352. DBGLOG("SKIP: merging key = %d", key);
  2353. #endif
  2354. unsigned compares = 0;
  2355. for (;;)
  2356. {
  2357. if (!CKeyLevelManager::lookupSkip(seek, seekOffset, seeklen) )
  2358. {
  2359. activekeys--;
  2360. if (!activekeys)
  2361. {
  2362. if (stats.ctx)
  2363. stats.ctx->noteStatistic(StNumIndexMergeCompares, compares);
  2364. return false;
  2365. }
  2366. mergeheap[0] = mergeheap[activekeys];
  2367. }
  2368. /* The key associated with mergeheap[0] will have changed
  2369. This code restores the heap property
  2370. */
  2371. unsigned p = 0; /* parent */
  2372. while (1)
  2373. {
  2374. unsigned c = p*2 + 1; /* child */
  2375. if ( c >= activekeys )
  2376. break;
  2377. /* Select smaller child */
  2378. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
2379. /* If the child is greater than or equal to the parent then we are done */
  2380. if ( BuffCompare( c, p ) >= 0 )
  2381. break;
  2382. /* Swap parent and child */
  2383. int r = mergeheap[c];
  2384. mergeheap[c] = mergeheap[p];
  2385. mergeheap[p] = r;
  2386. /* child becomes parent */
  2387. p = c;
  2388. }
  2389. if (key != mergeheap[0])
  2390. {
  2391. key = mergeheap[0];
  2392. keyCursor = cursors[key];
  2393. }
  2394. const byte *keyBuffer = keyCursor->queryKeyBuffer();
  2395. if (memcmp(seek, keyBuffer+seekOffset, seeklen) <= 0)
  2396. {
  2397. #ifdef _DEBUG
  2398. if (traceSmartStepping)
  2399. {
  2400. unsigned keySize = keyCursor->getSize(); // MORE - is this the current row size?
  2401. DBGLOG("SKIP: merged key = %d", key);
  2402. StringBuffer recstr;
  2403. unsigned i;
  2404. for (i = 0; i < keySize; i++)
  2405. {
  2406. unsigned char c = ((unsigned char *) keyBuffer)[i];
  2407. recstr.appendf("%c", isprint(c) ? c : '.');
  2408. }
  2409. recstr.append (" ");
  2410. for (i = 0; i < keySize; i++)
  2411. {
  2412. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  2413. }
  2414. DBGLOG("SKIP: Out skips=%02d seeks=%02d scans=%02d : %s", stats.skips, stats.seeks, stats.scans, recstr.str());
  2415. }
  2416. #endif
  2417. if (stats.ctx)
  2418. stats.ctx->noteStatistic(StNumIndexMergeCompares, compares);
  2419. return true;
  2420. }
  2421. else
  2422. {
  2423. compares++;
  2424. if (stats.ctx && (compares == 100))
  2425. {
  2426. stats.ctx->noteStatistic(StNumIndexMergeCompares, compares); // also checks for abort...
  2427. compares = 0;
  2428. }
  2429. }
  2430. }
  2431. }
  2432. }
  2433. virtual void setLayoutTranslator(const IDynamicTransform * trans) override
  2434. {
  2435. if (trans && trans->keyedTranslated())
  2436. throw MakeStringException(0, "Layout translation not supported when merging key parts, as it may change sort order");
  2437. // It MIGHT be possible to support translation still if all keyCursors have the same translation
  2438. // would have to translate AFTER the merge, but that's ok
  2439. // HOWEVER the result won't be guaranteed to be in sorted order afterwards so is there any point?
  2440. CKeyLevelManager::setLayoutTranslator(trans);
  2441. }
  2442. virtual void setKey(IKeyIndexBase *_keyset)
  2443. {
  2444. keyset.set(_keyset);
  2445. if (_keyset && _keyset->numParts())
  2446. {
  2447. IKeyIndex *ki = _keyset->queryPart(0);
  2448. keyedSize = ki->keyedSize();
  2449. numkeys = _keyset->numParts();
  2450. if (sortFieldOffset > keyedSize)
  2451. throw MakeStringException(0, "Index sort order can only include keyed fields");
  2452. }
  2453. else
  2454. numkeys = 0;
  2455. killBuffers();
  2456. }
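// (Re)initialise the merge: create a cursor for each key part, position it at its first match
// (at or beyond the seek value if one is supplied), then permute mergeheap so that it satisfies
// the heap property.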
  2457. void resetSort(const void *seek, size32_t seekOffset, size32_t seeklen)
  2458. {
  2459. activekeys = 0;
  2460. filter->recalculateCache();
  2461. unsigned i;
  2462. for (i = 0; i < numkeys; i++)
  2463. {
  2464. keyCursor = keyset->queryPart(i)->getCursor(filter, logExcessiveSeeks);
  2465. keyCursor->reset();
  2466. for (;;)
  2467. {
  2468. bool found;
  2469. unsigned lskips = 0;
  2470. unsigned lnullSkips = 0;
  2471. for (;;)
  2472. {
  2473. if (seek)
  2474. {
  2475. if (keyCursor->skipTo(seek, seekOffset, seeklen))
  2476. lskips++;
  2477. else
  2478. lnullSkips++;
  2479. }
  2480. found = keyCursor->lookup(true, stats);
  2481. if (!found || !seek || memcmp(keyCursor->queryKeyBuffer() + seekOffset, seek, seeklen) >= 0)
  2482. break;
  2483. }
  2484. stats.noteSkips(lskips, lnullSkips);
  2485. if (found)
  2486. {
  2487. IKeyCursor *mergeCursor = LINK(keyCursor);
  2488. if (sortFromSeg)
  2489. mergeCursor = keyCursor->fixSortSegs(sortFieldOffset);
  2490. keyNoArray.append(i);
  2491. cursorArray.append(*mergeCursor);
  2492. mergeHeapArray.append(activekeys++);
  2493. if (!sortFromSeg || !keyCursor->nextRange(sortFromSeg))
  2494. break;
  2495. }
  2496. else
  2497. {
  2498. keyCursor->Release();
  2499. break;
  2500. }
  2501. }
  2502. }
  2503. if (activekeys>0)
  2504. {
  2505. if (stats.ctx)
  2506. stats.ctx->noteStatistic(StNumIndexMerges, activekeys);
  2507. cursors = cursorArray.getArray();
  2508. mergeheap = mergeHeapArray.getArray();
  2509. /* Permute mergeheap to establish the heap property
  2510. For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
  2511. The children of p must both be greater than or equal to p
  2512. The parent of a child c is given by p = (c-1)/2
  2513. */
  2514. for (i=1; i<activekeys; i++)
  2515. {
  2516. int r = mergeheap[i];
  2517. int c = i; /* child */
  2518. while (c > 0)
  2519. {
  2520. int p = (c-1)/2; /* parent */
  2521. if ( BuffCompare( c, p ) >= 0 )
  2522. break;
  2523. mergeheap[c] = mergeheap[p];
  2524. mergeheap[p] = r;
  2525. c = p;
  2526. }
  2527. }
  2528. keyCursor = cursors[mergeheap[0]];
  2529. }
  2530. else
  2531. {
  2532. keyCursor = NULL;
  2533. }
  2534. resetPending = false;
  2535. }
  2536. virtual void reset(bool crappyHack)
  2537. {
  2538. if (!started)
  2539. {
  2540. started = true;
  2541. filter->checkSize(keyedSize, "[merger]"); //PG: not sure what keyname to use here
  2542. }
  2543. if (!crappyHack)
  2544. {
  2545. killBuffers();
  2546. resetPending = true;
  2547. }
  2548. else
  2549. {
  2550. if (sortFieldOffset)
  2551. {
  2552. ForEachItemIn(idx, cursorArray)
  2553. {
  2554. cursorArray.replace(*cursorArray.item(idx).fixSortSegs(sortFieldOffset), idx);
  2555. }
  2556. }
  2557. keyCursor = cursors[mergeheap[0]];
  2558. resetPending = false;
  2559. }
  2560. }
  2561. virtual bool lookup(bool exact)
  2562. {
  2563. assertex(exact);
  2564. if (resetPending)
  2565. {
  2566. resetSort(NULL, 0, 0);
  2567. if (!activekeys)
  2568. return false;
  2569. }
  2570. else
  2571. {
  2572. if (!activekeys)
  2573. return false;
  2574. unsigned key = mergeheap[0];
  2575. if (!keyCursor->lookup(exact, stats))
  2576. {
  2577. activekeys--;
  2578. if (!activekeys)
  2579. return false; // MORE - does this lose a record?
  2580. mergeheap[0] = mergeheap[activekeys];
  2581. }
  2582. /* The key associated with mergeheap[0] will have changed
  2583. This code restores the heap property
  2584. */
  2585. unsigned p = 0; /* parent */
  2586. while (1)
  2587. {
  2588. unsigned c = p*2 + 1; /* child */
  2589. if ( c >= activekeys )
  2590. break;
  2591. /* Select smaller child */
  2592. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
2593. /* If the child is greater than or equal to the parent then we are done */
  2594. if ( BuffCompare( c, p ) >= 0 )
  2595. break;
  2596. /* Swap parent and child */
  2597. int r = mergeheap[c];
  2598. mergeheap[c] = mergeheap[p];
  2599. mergeheap[p] = r;
  2600. /* child becomes parent */
  2601. p = c;
  2602. }
  2603. // dumpMergeHeap();
  2604. if (mergeheap[0] != key)
  2605. keyCursor = cursors[mergeheap[0]];
  2606. }
  2607. return true;
  2608. }
  2609. virtual unsigned __int64 getCount()
  2610. {
  2611. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
2612. // as the sequence numbers are not in sequence
  2613. unsigned __int64 ret = 0;
  2614. if (resetPending)
  2615. resetSort(NULL, 0, 0); // This is slightly suboptimal
  2616. for (unsigned i = 0; i < activekeys; i++)
  2617. {
  2618. unsigned key = mergeheap[i];
  2619. keyCursor = cursors[key];
  2620. ret += CKeyLevelManager::getCount();
  2621. }
  2622. return ret;
  2623. }
  2624. virtual unsigned __int64 checkCount(unsigned __int64 max)
  2625. {
  2626. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
2627. // as the sequence numbers are not in sequence
  2628. unsigned __int64 ret = 0;
  2629. if (resetPending)
  2630. resetSort(NULL, 0, 0); // this is a little suboptimal as we will not bail out early
  2631. for (unsigned i = 0; i < activekeys; i++)
  2632. {
  2633. unsigned key = mergeheap[i];
  2634. keyCursor = cursors[key];
  2635. unsigned __int64 thisKeyCount = CKeyLevelManager::checkCount(max);
  2636. ret += thisKeyCount;
  2637. if (thisKeyCount > max)
  2638. return ret;
  2639. max -= thisKeyCount;
  2640. }
  2641. return ret;
  2642. }
  2643. virtual void serializeCursorPos(MemoryBuffer &mb)
  2644. {
  2645. // dumpMergeHeap();
  2646. mb.append(activekeys);
  2647. for (unsigned i = 0; i < activekeys; i++)
  2648. {
  2649. unsigned key = mergeheap[i];
  2650. mb.append(keyNoArray.item(key));
  2651. cursors[key]->serializeCursorPos(mb);
  2652. }
  2653. }
  2654. virtual void deserializeCursorPos(MemoryBuffer &mb)
  2655. {
  2656. mb.read(activekeys);
  2657. for (unsigned i = 0; i < activekeys; i++)
  2658. {
  2659. unsigned keyno;
  2660. mb.read(keyno);
  2661. keyNoArray.append(keyno);
  2662. keyCursor = keyset->queryPart(keyno)->getCursor(filter, logExcessiveSeeks);
  2663. keyCursor->deserializeCursorPos(mb, stats);
  2664. cursorArray.append(*keyCursor);
  2665. mergeHeapArray.append(i);
  2666. }
  2667. cursors = cursorArray.getArray();
  2668. mergeheap = mergeHeapArray.getArray();
  2669. }
  2670. virtual void finishSegmentMonitors()
  2671. {
  2672. CKeyLevelManager::finishSegmentMonitors();
  2673. if (sortFieldOffset)
  2674. {
  2675. filter->checkSize(keyedSize, "[merger]"); // Ensures trailing KSM is setup
  2676. calculateSortSeg();
  2677. }
  2678. }
  2679. };
  2680. extern jhtree_decl IKeyManager *createKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet * _keys, unsigned _sortFieldOffset, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
  2681. {
  2682. return new CKeyMerger(_recInfo, _keys, _sortFieldOffset, _ctx, _newFilters, _logExcessiveSeeks);
  2683. }
  2684. extern jhtree_decl IKeyManager *createSingleKeyMerger(const RtlRecord &_recInfo, IKeyIndex * _onekey, unsigned _sortFieldOffset, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
  2685. {
  2686. return new CKeyMerger(_recInfo, _onekey, _sortFieldOffset, _ctx, _newFilters, _logExcessiveSeeks);
  2687. }
  2688. class CKeyIndexSet : implements IKeyIndexSet, public CInterface
  2689. {
  2690. IPointerArrayOf<IKeyIndex> indexes;
  2691. offset_t recordCount = 0;
  2692. offset_t totalSize = 0;
  2693. StringAttr origFileName;
  2694. public:
  2695. IMPLEMENT_IINTERFACE;
  2696. virtual bool IsShared() const { return CInterface::IsShared(); }
  2697. void addIndex(IKeyIndex *i) { indexes.append(i); }
  2698. virtual unsigned numParts() { return indexes.length(); }
  2699. virtual IKeyIndex *queryPart(unsigned partNo) { return indexes.item(partNo); }
  2700. virtual void setRecordCount(offset_t count) { recordCount = count; }
  2701. virtual void setTotalSize(offset_t size) { totalSize = size; }
  2702. virtual offset_t getRecordCount() { return recordCount; }
  2703. virtual offset_t getTotalSize() { return totalSize; }
  2704. };
  2705. extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
  2706. {
  2707. return new CKeyIndexSet;
  2708. }
  2709. extern jhtree_decl IKeyManager *createLocalKeyManager(const RtlRecord &_recInfo, IKeyIndex *_key, IContextLogger *_ctx, bool newFilters, bool _logExcessiveSeeks)
  2710. {
  2711. return new CKeyLevelManager(_recInfo, _key, _ctx, newFilters, _logExcessiveSeeks);
  2712. }
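/* Illustrative use of the factories above (a sketch only - "myindex.idx", recInfo, valueSet and
 * process() are placeholders; see IKeyManagerTest below for complete, working examples):
 *
 *   Owned<IKeyIndex> index = createKeyIndex("myindex.idx", 0, false, false);
 *   Owned<IKeyManager> manager = createLocalKeyManager(recInfo, index, nullptr, false, false);
 *   manager->append(createKeySegmentMonitor(false, valueSet.getClear(), 0, 0, 10)); // filter the first keyed field
 *   manager->finishSegmentMonitors();
 *   manager->reset();
 *   while (manager->lookup(true))
 *       process(manager->queryKeyBuffer());
 */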
  2713. class CKeyArray : implements IKeyArray, public CInterface
  2714. {
  2715. public:
  2716. IMPLEMENT_IINTERFACE;
  2717. virtual bool IsShared() const { return CInterface::IsShared(); }
  2718. IPointerArrayOf<IKeyIndexBase> keys;
  2719. virtual IKeyIndexBase *queryKeyPart(unsigned partNo)
  2720. {
  2721. if (!keys.isItem(partNo))
  2722. {
  2723. return NULL;
  2724. }
  2725. IKeyIndexBase *key = keys.item(partNo);
  2726. return key;
  2727. }
  2728. virtual unsigned length() { return keys.length(); }
  2729. void addKey(IKeyIndexBase *f) { keys.append(f); }
  2730. };
  2731. extern jhtree_decl IKeyArray *createKeyArray()
  2732. {
  2733. return new CKeyArray;
  2734. }
  2735. extern jhtree_decl IIndexLookup *createIndexLookup(IKeyManager *keyManager)
  2736. {
  2737. class CIndexLookup : public CSimpleInterfaceOf<IIndexLookup>
  2738. {
  2739. Linked<IKeyManager> keyManager;
  2740. public:
  2741. CIndexLookup(IKeyManager *_keyManager) : keyManager(_keyManager)
  2742. {
  2743. }
  2744. virtual void ensureAvailable() override { }
  2745. virtual unsigned __int64 getCount() override
  2746. {
  2747. return keyManager->getCount();
  2748. }
  2749. virtual unsigned __int64 checkCount(unsigned __int64 limit) override
  2750. {
  2751. return keyManager->checkCount(limit);
  2752. }
  2753. virtual const void *nextKey() override
  2754. {
  2755. if (keyManager->lookup(true))
  2756. return keyManager->queryKeyBuffer();
  2757. else
  2758. return nullptr;
  2759. }
  2760. virtual unsigned querySeeks() const override { return keyManager->querySeeks(); }
  2761. virtual unsigned queryScans() const override { return keyManager->queryScans(); }
  2762. virtual unsigned querySkips() const override { return keyManager->querySkips(); }
  2763. };
  2764. return new CIndexLookup(keyManager);
  2765. }
  2766. #ifdef _USE_CPPUNIT
  2767. #include "unittests.hpp"
  2768. class IKeyManagerTest : public CppUnit::TestFixture
  2769. {
  2770. CPPUNIT_TEST_SUITE( IKeyManagerTest );
  2771. CPPUNIT_TEST(testStepping);
  2772. CPPUNIT_TEST(testKeys);
  2773. CPPUNIT_TEST_SUITE_END();
  2774. void testStepping()
  2775. {
  2776. buildTestKeys(false, true, false);
  2777. {
2778. // We are going to treat this as a 7-byte field then a 3-byte field, and request the data sorted by the 3-byte field...
  2779. Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
  2780. Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
  2781. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2782. keyset->addIndex(index1.getClear());
  2783. keyset->addIndex(index2.getClear());
  2784. const char *json = "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
  2785. " \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
  2786. " \"fieldType\": 13, \"length\": 10, "
  2787. " \"fields\": [ "
  2788. " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
  2789. " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 } ] "
  2790. "}";
  2791. Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, false);
  2792. Owned <IKeyManager> tlk1 = createKeyMerger(meta->queryRecordAccessor(true), keyset, 7, NULL, false, false);
  2793. Owned<IStringSet> sset1 = createStringSet(7);
  2794. sset1->addRange("0000003", "0000003");
  2795. sset1->addRange("0000005", "0000006");
  2796. tlk1->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2797. Owned<IStringSet> sset2 = createStringSet(3);
  2798. sset2->addRange("010", "010");
  2799. sset2->addRange("030", "033");
  2800. Owned<IStringSet> sset3 = createStringSet(3);
  2801. sset3->addRange("999", "XXX");
  2802. sset3->addRange("000", "002");
  2803. tlk1->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2804. tlk1->finishSegmentMonitors();
  2805. tlk1->reset();
  2806. offset_t fpos;
  2807. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003010", 10)==0);
  2808. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005010", 10)==0);
  2809. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006010", 10)==0);
  2810. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003030", 10)==0);
  2811. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005030", 10)==0);
  2812. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006030", 10)==0);
  2813. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003031", 10)==0);
  2814. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005031", 10)==0);
  2815. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006031", 10)==0);
  2816. MemoryBuffer mb;
  2817. tlk1->serializeCursorPos(mb);
  2818. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003032", 10)==0);
  2819. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005032", 10)==0);
  2820. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006032", 10)==0);
  2821. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003033", 10)==0);
  2822. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005033", 10)==0);
  2823. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006033", 10)==0);
  2824. ASSERT(!tlk1->lookup(true));
  2825. ASSERT(!tlk1->lookup(true));
  2826. Owned <IKeyManager> tlk2 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL, false, false);
  2827. tlk2->setKey(keyset);
  2828. tlk2->deserializeCursorPos(mb);
  2829. tlk2->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2830. tlk2->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2831. tlk2->finishSegmentMonitors();
  2832. tlk2->reset(true);
  2833. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003032", 10)==0);
  2834. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005032", 10)==0);
  2835. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006032", 10)==0);
  2836. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003033", 10)==0);
  2837. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005033", 10)==0);
  2838. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006033", 10)==0);
  2839. ASSERT(!tlk2->lookup(true));
  2840. ASSERT(!tlk2->lookup(true));
  2841. Owned <IKeyManager> tlk3 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL, false, false);
  2842. tlk3->setKey(keyset);
  2843. tlk3->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2844. tlk3->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2845. tlk3->finishSegmentMonitors();
  2846. tlk3->reset(false);
  2847. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003010", 10)==0);
  2848. ASSERT(tlk3->lookupSkip("031", 7, 3)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003031", 10)==0);
  2849. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000005031", 10)==0);
  2850. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000006031", 10)==0);
  2851. ASSERT(!tlk3->lookupSkip("081", 7, 3));
  2852. ASSERT(!tlk3->lookup(true));
  2853. Owned <IKeyManager> tlk4 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL, false, false);
  2854. tlk4->setKey(keyset);
  2855. tlk4->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2856. tlk4->append(createKeySegmentMonitor(false, sset3.getLink(), 1, 7, 3));
  2857. tlk4->finishSegmentMonitors();
  2858. tlk4->reset(false);
  2859. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003000", 10)==0);
  2860. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005000", 10)==0);
  2861. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006000", 10)==0);
  2862. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003001", 10)==0);
  2863. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005001", 10)==0);
  2864. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006001", 10)==0);
  2865. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003002", 10)==0);
  2866. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005002", 10)==0);
  2867. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006002", 10)==0);
  2868. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003999", 10)==0);
  2869. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005999", 10)==0);
  2870. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006999", 10)==0);
  2871. ASSERT(!tlk4->lookup(true));
  2872. ASSERT(!tlk4->lookup(true));
  2873. }
  2874. clearKeyStoreCache(true);
  2875. removeTestKeys();
  2876. }
  2877. void buildTestKeys(bool variable, bool useTrailingHeader, bool noSeek)
  2878. {
  2879. buildTestKey("keyfile1.$$$", false, variable, useTrailingHeader, noSeek);
  2880. buildTestKey("keyfile2.$$$", true, variable, useTrailingHeader, noSeek);
  2881. }
  2882. void buildTestKey(const char *filename, bool skip, bool variable, bool useTrailingHeader, bool noSeek)
  2883. {
  2884. OwnedIFile file = createIFile(filename);
  2885. OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
  2886. Owned<IFileIOStream> out = createIOStream(io);
  2887. unsigned maxRecSize = variable ? 18 : 10;
  2888. unsigned keyedSize = 10;
  2889. Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY |
  2890. (variable ? HTREE_VARSIZE : 0) |
  2891. (useTrailingHeader ? USE_TRAILING_HEADER : 0) |
  2892. (noSeek ? TRAILING_HEADER_ONLY : 0),
  2893. maxRecSize, NODESIZE, keyedSize, 0, nullptr, true, false);
  2894. char keybuf[18];
  2895. memset(keybuf, '0', 18);
  2896. for (unsigned count = 0; count < 10000; count++)
  2897. {
  2898. unsigned datasize = 10;
  2899. if (variable && (count % 10)==0)
  2900. {
  2901. char *blob = new char[count+100000];
  2902. byte seed = count;
  2903. for (unsigned i = 0; i < count+100000; i++)
  2904. {
  2905. blob[i] = seed;
  2906. seed = seed * 13 + i;
  2907. }
  2908. offset_t blobid = builder->createBlob(count+100000, blob);
  2909. memcpy(keybuf + 10, &blobid, sizeof(blobid));
  2910. delete [] blob;
  2911. datasize += sizeof(blobid);
  2912. }
  2913. bool skipme = (count % 4 == 0) != skip;
  2914. if (!skipme)
  2915. {
  2916. builder->processKeyData(keybuf, count*10, datasize);
  2917. if (count==48 || count==49)
  2918. builder->processKeyData(keybuf, count*10, datasize);
  2919. }
            unsigned idx = 9;
            for (;;)
            {
                if (keybuf[idx]=='9')
                    keybuf[idx--]='0';
                else
                {
                    keybuf[idx]++;
                    break;
                }
            }
        }
        builder->finish(nullptr, nullptr);
        out->flush();
    }
    void removeTestKeys()
    {
        ASSERT(remove("keyfile1.$$$")==0);
        ASSERT(remove("keyfile2.$$$")==0);
    }
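    // Load the blob whose id is stored immediately after the 10-byte keyed field and check that its
    // size and byte pattern match what buildTestKey generated.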
    void checkBlob(IKeyManager *key, unsigned size)
    {
        unsigned __int64 blobid;
        memcpy(&blobid, key->queryKeyBuffer()+10, sizeof(blobid));
        ASSERT(blobid != 0);
        size32_t blobsize;
        const byte *blob = key->loadBlob(blobid, blobsize);
        ASSERT(blob != NULL);
        ASSERT(blobsize == size);
        byte seed = size-100000;
        for (unsigned i = 0; i < size; i++)
        {
            ASSERT(blob[i] == seed);
            seed = seed * 13 + i;
        }
        key->releaseBlobs();
    }
protected:
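    // Core key test: build record metadata from inline type-info JSON, build both index files, then
    // verify segment-monitor filtering, counts, duplicate handling and (fixed-size case only) a
    // merged scan over both indexes.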
    void testKeys(bool variable, bool useTrailingHeader, bool noSeek)
    {
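        // Record layout: a 10-byte keyed string field f1, plus (variable case only) a payload blob
        // field f3.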
        const char *json = variable ?
            "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
            " \"ty2\": { \"fieldType\": 15, \"length\": 8 }, "
            " \"fieldType\": 13, \"length\": 10, "
            " \"fields\": [ "
            " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
            " { \"name\": \"f3\", \"type\": \"ty2\", \"flags\": 65551 } " // 0x01000f i.e. payload and blob
            " ]"
            "}"
            :
            "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
            " \"fieldType\": 13, \"length\": 10, "
            " \"fields\": [ "
            " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
            " ] "
            "}";
        Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, false);
        const RtlRecord &recInfo = meta->queryRecordAccessor(true);
        buildTestKeys(variable, useTrailingHeader, noSeek);
        {
            Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
            Owned <IKeyManager> tlk1 = createLocalKeyManager(recInfo, index1, NULL, false, false);
            Owned<IStringSet> sset1 = createStringSet(10);
            sset1->addRange("0000000001", "0000000100");
            tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 0, 10));
            tlk1->finishSegmentMonitors();
            tlk1->reset();
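            // tlk1a: the same 10-byte key filtered as three segments - an 8-byte range, a wild
            // middle byte, and a final 1-byte range.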
            Owned <IKeyManager> tlk1a = createLocalKeyManager(recInfo, index1, NULL, false, false);
            Owned<IStringSet> sset1a = createStringSet(8);
            sset1a->addRange("00000000", "00000001");
            tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 0, 8));
            tlk1a->append(createKeySegmentMonitor(false, NULL, 1, 8, 1));
            sset1a.setown(createStringSet(1));
            sset1a->addRange("0", "1");
            tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 2, 9, 1));
            tlk1a->finishSegmentMonitors();
            tlk1a->reset();
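            // Sanity checks on IStringSet::numValues(): small ranges count exactly, while ranges too
            // large to enumerate report (unsigned) -1.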
            Owned<IStringSet> ssetx = createStringSet(10);
            ssetx->addRange("0000000001", "0000000002");
            ASSERT(ssetx->numValues() == 2);
            ssetx->addRange("00000000AK", "00000000AL");
            ASSERT(ssetx->numValues() == 4);
            ssetx->addRange("0000000100", "0010000000");
            ASSERT(ssetx->numValues() == (unsigned) -1);
            ssetx->addRange("0000000001", "0010000000");
            ASSERT(ssetx->numValues() == (unsigned) -1);
            Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
            Owned <IKeyManager> tlk2 = createLocalKeyManager(recInfo, index2, NULL, false, false);
            Owned<IStringSet> sset2 = createStringSet(10);
            sset2->addRange("0000000001", "0000000100");
            ASSERT(sset2->numValues() == 65536);
            tlk2->append(createKeySegmentMonitor(false, sset2.getClear(), 0, 0, 10));
            tlk2->finishSegmentMonitors();
            tlk2->reset();
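            // tlk3 (fixed-size case only): a merger over both index files, which together contain
            // every key from 1 to 100, with duplicates at 48 and 49.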
            Owned <IKeyManager> tlk3;
            if (!variable)
            {
                Owned<IKeyIndexSet> both = createKeyIndexSet();
                both->addIndex(index1.getLink());
                both->addIndex(index2.getLink());
                Owned<IStringSet> sset3 = createStringSet(10);
                tlk3.setown(createKeyMerger(recInfo, NULL, 0, NULL, false, false));
                tlk3->setKey(both);
                sset3->addRange("0000000001", "0000000100");
                tlk3->append(createKeySegmentMonitor(false, sset3.getClear(), 0, 0, 10));
                tlk3->finishSegmentMonitors();
                tlk3->reset();
            }
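            // tlk2a/tlk2b/tlk2c probe keyfile2 around the duplicated entry 48: 47 and 49 are absent
            // from this file, so the expected counts are 2, 2 and 0.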
            Owned <IKeyManager> tlk2a = createLocalKeyManager(recInfo, index2, NULL, false, false);
            Owned<IStringSet> sset2a = createStringSet(10);
            sset2a->addRange("0000000048", "0000000048");
            ASSERT(sset2a->numValues() == 1);
            tlk2a->append(createKeySegmentMonitor(false, sset2a.getClear(), 0, 0, 10));
            tlk2a->finishSegmentMonitors();
            tlk2a->reset();
            Owned <IKeyManager> tlk2b = createLocalKeyManager(recInfo, index2, NULL, false, false);
            Owned<IStringSet> sset2b = createStringSet(10);
            sset2b->addRange("0000000047", "0000000049");
            ASSERT(sset2b->numValues() == 3);
            tlk2b->append(createKeySegmentMonitor(false, sset2b.getClear(), 0, 0, 10));
            tlk2b->finishSegmentMonitors();
            tlk2b->reset();
            Owned <IKeyManager> tlk2c = createLocalKeyManager(recInfo, index2, NULL, false, false);
            Owned<IStringSet> sset2c = createStringSet(10);
            sset2c->addRange("0000000047", "0000000047");
            tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 0, 10));
            tlk2c->finishSegmentMonitors();
            tlk2c->reset();
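            // tlk1's count is asserted twice: the second call must return the same value as the first.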
            ASSERT(tlk1->getCount() == 76);
            ASSERT(tlk1->getCount() == 76);
            ASSERT(tlk1a->getCount() == 30);
            ASSERT(tlk2->getCount() == 26);
            ASSERT(tlk2a->getCount() == 2);
            ASSERT(tlk2b->getCount() == 2);
            ASSERT(tlk2c->getCount() == 0);
            if (tlk3)
                ASSERT(tlk3->getCount() == 102);
            // MORE - PUT SOME TESTS IN FOR WILD SEEK STUFF
            unsigned pass;
            char buf[11];
            unsigned i;
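            // Two identical passes: each pass calls reset() on the managers first, so the second pass
            // repeats every lookup from the start (exercising reset and any node caching).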
            for (pass = 0; pass < 2; pass++)
            {
                offset_t fpos;
                tlk1->reset();
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000001", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000002", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000003", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000005", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000006", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000007", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000009", 10)==0);
                ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000010", 10)==0);
                if (variable)
                    checkBlob(tlk1, 10+100000);
                tlk1a->reset();
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000001", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000010", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000011", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000021", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000030", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000031", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000041", 10)==0);
                ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000050", 10)==0);
                tlk2->reset();
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000004", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000008", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000012", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000016", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000020", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000024", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000028", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000032", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000036", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000040", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000044", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
                ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000052", 10)==0);
                if (tlk3)
                {
                    tlk3->reset();
                    for (i = 1; i <= 100; i++)
                    {
                        ASSERT(tlk3->lookup(true));
                        sprintf(buf, "%010d", i);
                        ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
                        if (i==48 || i==49)
                        {
                            ASSERT(tlk3->lookup(true));
                            ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
                        }
                    }
                    ASSERT(!tlk3->lookup(true));
                    ASSERT(!tlk3->lookup(true));
                }
            }
            tlk1->releaseSegmentMonitors();
            tlk2->releaseSegmentMonitors();
            if (tlk3)
                tlk3->releaseSegmentMonitors();
        }
        clearKeyStoreCache(true);
        removeTestKeys();
    }
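    // Run the key tests for every combination of variable-size records, trailing header and
    // no-seek (trailing-header-only) index formats.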
    void testKeys()
    {
        ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
        for (bool var : { false, true })
            for (bool trail : { false, true })
                for (bool noseek : { false, true })
                    testKeys(var, trail, noseek);
    }
};
CPPUNIT_TEST_SUITE_REGISTRATION( IKeyManagerTest );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( IKeyManagerTest, "IKeyManagerTest" );
#endif