jhtree.cpp 104 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. //****************************************************************************
  14. // Name: jhtree.cpp
  15. //
  16. // Purpose:
  17. //
  18. // Description:
  19. //
  20. // Notes: Supports only static (non-changing) files
  21. //
  22. // Initially I was holding on to the root nodes, but came to find
  23. // that they could potentially fill the cache by themselves...
  24. //
  25. // Things to play with:
  26. // - try not unpacking the entire node when it is read in.
  27. // break it out as needed later.
  28. //
  29. // History: 31-Aug-99 crs original
  30. // 08-Jan-00 nh added LZW compression of nodes
  31. // 14-feb-00 nh added GetORDKey
  32. // 15-feb-00 nh fixed isolatenode and nextNode
  33. // 12-Apr-00 jcs moved over to jhtree.dll etc.
  34. //****************************************************************************
  35. #include "platform.h"
  36. #include <stdio.h>
  37. #include <fcntl.h>
  38. #include <stdlib.h>
  39. #include <limits.h>
  40. #ifdef __linux__
  41. #include <alloca.h>
  42. #endif
  43. #include "hlzw.h"
  44. #include "jmutex.hpp"
  45. #include "jhutil.hpp"
  46. #include "jmisc.hpp"
  47. #include "jstats.h"
  48. #include "ctfile.hpp"
  49. #include "jhtree.ipp"
  50. #include "keybuild.hpp"
  51. #include "eclhelper_dyn.hpp"
  52. #include "rtlrecord.hpp"
  53. #include "rtldynfield.hpp"
  54. static std::atomic<CKeyStore *> keyStore(nullptr);
  55. static unsigned defaultKeyIndexLimit = 200;
  56. static CNodeCache *nodeCache = NULL;
  57. static CriticalSection *initCrit = NULL;
  58. bool useMemoryMappedIndexes = false;
  59. bool logExcessiveSeeks = false;
  60. bool linuxYield = false;
  61. bool traceSmartStepping = false;
  62. bool flushJHtreeCacheOnOOM = true;
  63. MODULE_INIT(INIT_PRIORITY_JHTREE_JHTREE)
  64. {
  65. initCrit = new CriticalSection;
  66. return 1;
  67. }
  68. MODULE_EXIT()
  69. {
  70. delete initCrit;
  71. delete keyStore.load(std::memory_order_relaxed);
  72. ::Release((CInterface*)nodeCache);
  73. }
  74. //#define DUMP_NODES
  75. SegMonitorList::SegMonitorList(const RtlRecord &_recInfo, bool _needWild) : recInfo(_recInfo), needWild(_needWild)
  76. {
  77. keySegCount = recInfo.getNumKeyedFields();
  78. reset();
  79. }
  80. unsigned SegMonitorList::ordinality() const
  81. {
  82. return segMonitors.length();
  83. }
  84. IKeySegmentMonitor *SegMonitorList::item(unsigned idx) const
  85. {
  86. return &segMonitors.item(idx);
  87. }
  88. size32_t SegMonitorList::getSize() const
  89. {
  90. unsigned lim = segMonitors.length();
  91. if (lim)
  92. {
  93. IKeySegmentMonitor &lastItem = segMonitors.item(lim-1);
  94. return lastItem.getOffset() + lastItem.getSize();
  95. }
  96. else
  97. return 0;
  98. }
  99. void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname)
  100. {
  101. size32_t segSize = getSize();
  102. if (segSize != keyedSize)
  103. {
  104. StringBuffer err;
  105. err.appendf("Key size mismatch on key %s - key size is %u, expected %u", keyname, keyedSize, getSize());
  106. IException *e = MakeStringExceptionDirect(1000, err.str());
  107. EXCLOG(e, err.str());
  108. throw e;
  109. }
  110. }
  111. void SegMonitorList::setLow(unsigned segno, void *keyBuffer) const
  112. {
  113. unsigned lim = segMonitors.length();
  114. while (segno < lim)
  115. segMonitors.item(segno++).setLow(keyBuffer);
  116. }
  117. unsigned SegMonitorList::setLowAfter(size32_t offset, void *keyBuffer) const
  118. {
  119. unsigned lim = segMonitors.length();
  120. unsigned segno = 0;
  121. unsigned skipped = 0;
  122. while (segno < lim)
  123. {
  124. IKeySegmentMonitor &seg = segMonitors.item(segno++);
  125. if (seg.getOffset() >= offset)
  126. seg.setLow(keyBuffer);
  127. else if (seg.getSize()+seg.getOffset() <= offset)
  128. skipped++;
  129. else
  130. {
  131. byte *temp = (byte *) alloca(seg.getSize() + seg.getOffset());
  132. seg.setLow(temp);
  133. memcpy((byte *)keyBuffer+offset, temp+offset, seg.getSize() - (offset - seg.getOffset()));
  134. }
  135. }
  136. return skipped;
  137. }
  138. void SegMonitorList::endRange(unsigned segno, void *keyBuffer) const
  139. {
  140. unsigned lim = segMonitors.length();
  141. if (segno < lim)
  142. segMonitors.item(segno++).endRange(keyBuffer);
  143. while (segno < lim)
  144. segMonitors.item(segno++).setHigh(keyBuffer);
  145. }
  146. bool SegMonitorList::incrementKey(unsigned segno, void *keyBuffer) const
  147. {
  148. // Increment the key buffer to next acceptable value
  149. for(;;)
  150. {
  151. if (segMonitors.item(segno).increment(keyBuffer))
  152. {
  153. setLow(segno+1, keyBuffer);
  154. return true;
  155. }
  156. if (!segno)
  157. return false;
  158. segno--;
  159. }
  160. }
  161. unsigned SegMonitorList::_lastRealSeg() const
  162. {
  163. unsigned seg = segMonitors.length();
  164. for (;;)
  165. {
  166. if (!seg)
  167. return 0;
  168. seg--;
  169. if (!segMonitors.item(seg).isWild())
  170. return seg;
  171. }
  172. }
  173. unsigned SegMonitorList::lastFullSeg() const
  174. {
  175. // This is used to determine what part of the segmonitor list to use for a pre-count to determine if atmost/limit have been hit
  176. // We include everything up to the last of i) the last keyed element or ii) the last keyed,opt element that has no wild between it and a keyed element
  177. // NOTE - can return (unsigned) -1 if there are no full segments
  178. unsigned len = segMonitors.length();
  179. unsigned seg = 0;
  180. unsigned ret = (unsigned) -1;
  181. bool wildSeen = false;
  182. while (seg < len)
  183. {
  184. if (segMonitors.item(seg).isWild())
  185. wildSeen = true;
  186. else
  187. {
  188. if (!wildSeen || !segMonitors.item(seg).isOptional())
  189. {
  190. ret = seg;
  191. wildSeen = false;
  192. }
  193. }
  194. seg++;
  195. }
  196. return ret;
  197. }
  198. void SegMonitorList::finish(unsigned keyedSize)
  199. {
  200. if (modified)
  201. {
  202. while (segMonitors.length() < keySegCount)
  203. {
  204. unsigned idx = segMonitors.length();
  205. size32_t offset = recInfo.getFixedOffset(idx);
  206. size32_t size = recInfo.getFixedOffset(idx+1) - offset;
  207. segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
  208. }
  209. size32_t segSize = getSize();
  210. assertex(segSize == keyedSize);
  211. recalculateCache();
  212. modified = false;
  213. }
  214. }
  215. void SegMonitorList::recalculateCache()
  216. {
  217. cachedLRS = _lastRealSeg();
  218. }
  219. void SegMonitorList::reset()
  220. {
  221. segMonitors.kill();
  222. modified = true;
  223. }
  224. void SegMonitorList::swapWith(SegMonitorList &other)
  225. {
  226. reset();
  227. other.segMonitors.swapWith(segMonitors);
  228. }
  229. void SegMonitorList::deserialize(MemoryBuffer &mb)
  230. {
  231. unsigned num;
  232. mb.read(num);
  233. while (num--)
  234. append(deserializeKeySegmentMonitor(mb));
  235. }
  236. void SegMonitorList::serialize(MemoryBuffer &mb) const
  237. {
  238. mb.append((unsigned) ordinality());
  239. ForEachItemIn(idx, segMonitors)
  240. segMonitors.item(idx).serialize(mb);
  241. }
  242. // interface IIndexReadContext
  243. void SegMonitorList::append(IKeySegmentMonitor *segment)
  244. {
  245. modified = true;
  246. unsigned fieldIdx = segment->getFieldIdx();
  247. unsigned offset = segment->getOffset();
  248. unsigned size = segment->getSize();
  249. while (segMonitors.length() < fieldIdx)
  250. {
  251. unsigned idx = segMonitors.length();
  252. size32_t offset = recInfo.getFixedOffset(idx);
  253. size32_t size = recInfo.getFixedOffset(idx+1) - offset;
  254. segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
  255. }
  256. segMonitors.append(*segment);
  257. }
  258. void SegMonitorList::append(FFoption option, IFieldFilter * filter)
  259. {
  260. UNIMPLEMENTED;
  261. }
  262. bool SegMonitorList::matched(void *keyBuffer, unsigned &lastMatch) const
  263. {
  264. lastMatch = 0;
  265. for (; lastMatch < segMonitors.length(); lastMatch++)
  266. {
  267. if (!segMonitors.item(lastMatch).matchesBuffer(keyBuffer))
  268. return false;
  269. }
  270. return true;
  271. }
  272. ///
  273. class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
  274. {
  275. protected:
  276. IContextLogger *ctx;
  277. SegMonitorList segs;
  278. IKeyCursor *keyCursor;
  279. char *keyBuffer;
  280. unsigned keySize; // size of key record including payload
  281. unsigned keyedSize; // size of non-payload part of key
  282. unsigned numsegs;
  283. bool matched = false;
  284. bool eof = false;
  285. bool started = false;
  286. StringAttr keyName;
  287. unsigned seeks;
  288. unsigned scans;
  289. unsigned skips;
  290. unsigned nullSkips;
  291. unsigned wildseeks;
  292. Owned<const IDynamicTransform> layoutTrans;
  293. MemoryBuffer buf; // used when translating
  294. size32_t layoutSize = 0;
  295. inline void setLow(unsigned segNo)
  296. {
  297. segs.setLow(segNo, keyBuffer);
  298. }
  299. inline unsigned setLowAfter(size32_t offset)
  300. {
  301. return segs.setLowAfter(offset, keyBuffer);
  302. }
  303. inline bool incrementKey(unsigned segno) const
  304. {
  305. return segs.incrementKey(segno, keyBuffer);
  306. }
  307. inline void endRange(unsigned segno)
  308. {
  309. segs.endRange(segno, keyBuffer);
  310. }
  311. bool skipTo(const void *_seek, size32_t seekOffset, size32_t seeklen)
  312. {
  313. // Modify the current key contents buffer as follows
  314. // Take bytes up to seekoffset from current buffer (i.e. leave them alone)
  315. // Take up to seeklen bytes from seek comparing them as I go. If I see a lower one before I see a higher one, stop.
  316. // If I didn't see any higher ones, return (at which point the skipto was a no-op
  317. // If I saw higher ones, call setLowAfter for all remaining segmonitors
  318. // If the current contents of buffer could not match, call incremementKey at the appropriate monitor so that it can
  319. // Clear the matched flag
  320. const byte *seek = (const byte *) _seek;
  321. while (seeklen)
  322. {
  323. int c = *seek - (byte) (keyBuffer[seekOffset]);
  324. if (c < 0)
  325. return false;
  326. else if (c>0)
  327. {
  328. memcpy(keyBuffer+seekOffset, seek, seeklen);
  329. break;
  330. }
  331. seek++;
  332. seekOffset++;
  333. seeklen--;
  334. }
  335. #ifdef _DEBUG
  336. if (traceSmartStepping)
  337. {
  338. StringBuffer recstr;
  339. unsigned i;
  340. for (i = 0; i < keySize; i++)
  341. {
  342. unsigned char c = ((unsigned char *) keyBuffer)[i];
  343. recstr.appendf("%c", isprint(c) ? c : '.');
  344. }
  345. recstr.append (" ");
  346. for (i = 0; i < keySize; i++)
  347. {
  348. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  349. }
  350. DBGLOG("SKIP: IN skips=%02d nullskips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  351. }
  352. #endif
  353. if (!seeklen) return false;
  354. unsigned j = setLowAfter(seekOffset + seeklen);
  355. bool canmatch = true;
  356. unsigned lastSeg = segs.lastRealSeg();
  357. for (; j <= lastSeg; j++)
  358. {
  359. canmatch = segs.segMonitors.item(j).matchesBuffer(keyBuffer);
  360. if (!canmatch)
  361. {
  362. eof = !incrementKey(j);
  363. break;
  364. }
  365. }
  366. matched = false;
  367. return true;
  368. }
  369. void noteSeeks(unsigned lseeks, unsigned lscans, unsigned lwildseeks)
  370. {
  371. seeks += lseeks;
  372. scans += lscans;
  373. wildseeks += lwildseeks;
  374. if (ctx)
  375. {
  376. if (lseeks) ctx->noteStatistic(StNumIndexSeeks, lseeks);
  377. if (lscans) ctx->noteStatistic(StNumIndexScans, lscans);
  378. if (lwildseeks) ctx->noteStatistic(StNumIndexWildSeeks, lwildseeks);
  379. }
  380. }
  381. void noteSkips(unsigned lskips, unsigned lnullSkips)
  382. {
  383. skips += lskips;
  384. nullSkips += lnullSkips;
  385. if (ctx)
  386. {
  387. if (lskips) ctx->noteStatistic(StNumIndexSkips, lskips);
  388. if (lnullSkips) ctx->noteStatistic(StNumIndexNullSkips, lnullSkips);
  389. }
  390. }
  391. void reportExcessiveSeeks(unsigned numSeeks, unsigned lastSeg)
  392. {
  393. StringBuffer recstr;
  394. unsigned i;
  395. for (i = 0; i < keySize; i++)
  396. {
  397. unsigned char c = ((unsigned char *) keyBuffer)[i];
  398. recstr.appendf("%c", isprint(c) ? c : '.');
  399. }
  400. recstr.append ("\n");
  401. for (i = 0; i < keySize; i++)
  402. {
  403. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  404. }
  405. recstr.append ("\nusing segmonitors:\n");
  406. for (i=0; i <= lastSeg; i++)
  407. {
  408. unsigned size = segs.segMonitors.item(i).getSize();
  409. while (size--)
  410. recstr.append( segs.segMonitors.item(i).isWild() ? '?' : '#');
  411. }
  412. if (ctx)
  413. ctx->CTXLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
  414. else
  415. DBGLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
  416. }
  417. public:
  418. IMPLEMENT_IINTERFACE;
  419. CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx) : segs(_recInfo, true)
  420. {
  421. ctx = _ctx;
  422. numsegs = 0;
  423. keyBuffer = NULL;
  424. keyCursor = NULL;
  425. keySize = 0;
  426. keyedSize = 0;
  427. setKey(_key);
  428. seeks = 0;
  429. scans = 0;
  430. skips = 0;
  431. nullSkips = 0;
  432. wildseeks = 0;
  433. }
  434. ~CKeyLevelManager()
  435. {
  436. free (keyBuffer);
  437. ::Release(keyCursor);
  438. }
  439. virtual unsigned querySeeks() const
  440. {
  441. return seeks;
  442. }
  443. virtual unsigned queryScans() const
  444. {
  445. return scans;
  446. }
  447. virtual unsigned querySkips() const
  448. {
  449. return skips;
  450. }
  451. virtual unsigned queryNullSkips() const
  452. {
  453. return nullSkips;
  454. }
  455. virtual void resetCounts()
  456. {
  457. scans = 0;
  458. seeks = 0;
  459. skips = 0;
  460. nullSkips = 0;
  461. wildseeks = 0;
  462. }
  463. void setKey(IKeyIndexBase * _key)
  464. {
  465. ::Release(keyCursor);
  466. keyCursor = NULL;
  467. if (_key)
  468. {
  469. assertex(_key->numParts()==1);
  470. IKeyIndex *ki = _key->queryPart(0);
  471. keyCursor = ki->getCursor(ctx);
  472. keyName.set(ki->queryFileName());
  473. if (!keyBuffer)
  474. {
  475. keySize = ki->keySize();
  476. keyedSize = ki->keyedSize();
  477. if (!keySize)
  478. {
  479. StringBuffer err;
  480. err.appendf("Key appears corrupt - key file (%s) indicates record size is zero", keyName.get());
  481. IException *e = MakeStringExceptionDirect(1000, err.str());
  482. EXCLOG(e, err.str());
  483. throw e;
  484. }
  485. keyBuffer = (char *) malloc(keySize);
  486. }
  487. else
  488. {
  489. assertex(keyedSize==ki->keyedSize());
  490. assertex(keySize==ki->keySize());
  491. }
  492. }
  493. }
  494. virtual void setChooseNLimit(unsigned __int64 _rowLimit) override
  495. {
  496. // TODO ?
  497. }
  498. virtual void reset(bool crappyHack)
  499. {
  500. if (keyCursor)
  501. {
  502. if (!started)
  503. {
  504. started = true;
  505. numsegs = segs.ordinality();
  506. segs.checkSize(keyedSize, keyName.get());
  507. }
  508. if (!crappyHack)
  509. {
  510. matched = false;
  511. eof = false;
  512. setLow(0);
  513. keyCursor->reset();
  514. }
  515. }
  516. }
  517. virtual void releaseSegmentMonitors()
  518. {
  519. segs.reset();
  520. started = false;
  521. }
  522. virtual void append(IKeySegmentMonitor *segment)
  523. {
  524. assertex(!started);
  525. segs.append(segment);
  526. }
  527. virtual void append(FFoption option, IFieldFilter * filter)
  528. {
  529. UNIMPLEMENTED;
  530. }
  531. virtual unsigned ordinality() const
  532. {
  533. return segs.ordinality();
  534. }
  535. virtual IKeySegmentMonitor *item(unsigned idx) const
  536. {
  537. return segs.item(idx);
  538. }
  539. inline const byte *queryKeyBuffer()
  540. {
  541. if(layoutTrans)
  542. {
  543. buf.setLength(0);
  544. MemoryBufferBuilder aBuilder(buf, 0);
  545. layoutSize = layoutTrans->translate(aBuilder, reinterpret_cast<byte const *>(keyBuffer));
  546. return reinterpret_cast<byte const *>(buf.toByteArray());
  547. }
  548. else
  549. return reinterpret_cast<byte const *>(keyBuffer);
  550. }
  551. inline size32_t queryRowSize()
  552. {
  553. if (layoutTrans)
  554. return layoutSize;
  555. else
  556. return keyCursor ? keyCursor->getSize() : 0;
  557. }
  558. inline unsigned __int64 querySequence()
  559. {
  560. return keyCursor ? keyCursor->getSequence() : 0;
  561. }
  562. inline unsigned queryRecordSize() { return keySize; }
  563. bool _lookup(bool exact, unsigned lastSeg)
  564. {
  565. bool ret = false;
  566. unsigned lwildseeks = 0;
  567. unsigned lseeks = 0;
  568. unsigned lscans = 0;
  569. while (!eof)
  570. {
  571. bool ok;
  572. if (matched)
  573. {
  574. ok = keyCursor->next(keyBuffer);
  575. lscans++;
  576. }
  577. else
  578. {
  579. ok = keyCursor->gtEqual(keyBuffer, keyBuffer, true);
  580. lseeks++;
  581. }
  582. if (ok)
  583. {
  584. unsigned i = 0;
  585. matched = true;
  586. if (segs.segMonitors.length())
  587. {
  588. for (; i <= lastSeg; i++)
  589. {
  590. matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
  591. if (!matched)
  592. break;
  593. }
  594. }
  595. if (matched)
  596. {
  597. ret = true;
  598. break;
  599. }
  600. #ifdef __linux__
  601. if (linuxYield)
  602. sched_yield();
  603. #endif
  604. eof = !incrementKey(i);
  605. if (!exact)
  606. {
  607. ret = true;
  608. break;
  609. }
  610. lwildseeks++;
  611. }
  612. else
  613. eof = true;
  614. }
  615. if (logExcessiveSeeks && lwildseeks > 1000)
  616. reportExcessiveSeeks(lwildseeks, lastSeg);
  617. noteSeeks(lseeks, lscans, lwildseeks);
  618. return ret;
  619. }
  620. virtual bool lookup(bool exact)
  621. {
  622. if (keyCursor)
  623. return _lookup(exact, segs.lastRealSeg());
  624. else
  625. return false;
  626. }
  627. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  628. {
  629. if (keyCursor)
  630. {
  631. if (skipTo(seek, seekOffset, seeklen))
  632. noteSkips(1, 0);
  633. else
  634. noteSkips(0, 1);
  635. bool ret = _lookup(true, segs.lastRealSeg());
  636. #ifdef _DEBUG
  637. if (traceSmartStepping)
  638. {
  639. StringBuffer recstr;
  640. unsigned i;
  641. for (i = 0; i < keySize; i++)
  642. {
  643. unsigned char c = ((unsigned char *) keyBuffer)[i];
  644. recstr.appendf("%c", isprint(c) ? c : '.');
  645. }
  646. recstr.append (" ");
  647. for (i = 0; i < keySize; i++)
  648. {
  649. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  650. }
  651. DBGLOG("SKIP: Got skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  652. }
  653. #endif
  654. return ret;
  655. }
  656. else
  657. return false;
  658. }
  659. unsigned __int64 getCount()
  660. {
  661. assertex(keyCursor);
  662. matched = false;
  663. eof = false;
  664. setLow(0);
  665. keyCursor->reset();
  666. unsigned __int64 result = 0;
  667. unsigned lseeks = 0;
  668. unsigned lastRealSeg = segs.lastRealSeg();
  669. for (;;)
  670. {
  671. if (_lookup(true, lastRealSeg))
  672. {
  673. unsigned __int64 locount = keyCursor->getSequence();
  674. endRange(lastRealSeg);
  675. keyCursor->ltEqual(keyBuffer, NULL, true);
  676. lseeks++;
  677. result += keyCursor->getSequence()-locount+1;
  678. if (!incrementKey(lastRealSeg))
  679. break;
  680. matched = false;
  681. }
  682. else
  683. break;
  684. }
  685. noteSeeks(lseeks, 0, 0);
  686. return result;
  687. }
  688. unsigned __int64 getCurrentRangeCount(unsigned groupSegCount)
  689. {
  690. unsigned __int64 result = 0;
  691. if (keyCursor)
  692. {
  693. unsigned __int64 locount = keyCursor->getSequence();
  694. endRange(groupSegCount);
  695. keyCursor->ltEqual(keyBuffer, NULL, true);
  696. result = keyCursor->getSequence()-locount+1;
  697. noteSeeks(1, 0, 0);
  698. }
  699. return result;
  700. }
  701. bool nextRange(unsigned groupSegCount)
  702. {
  703. if (!incrementKey(groupSegCount-1))
  704. return false;
  705. matched = false;
  706. return true;
  707. }
  708. unsigned __int64 checkCount(unsigned __int64 max)
  709. {
  710. assertex(keyCursor);
  711. matched = false;
  712. eof = false;
  713. setLow(0);
  714. keyCursor->reset();
  715. unsigned __int64 result = 0;
  716. unsigned lseeks = 0;
  717. unsigned lastFullSeg = segs.lastFullSeg();
  718. if (lastFullSeg == (unsigned) -1)
  719. {
  720. noteSeeks(1, 0, 0);
  721. if (keyCursor->last(NULL))
  722. return keyCursor->getSequence()+1;
  723. else
  724. return 0;
  725. }
  726. for (;;)
  727. {
  728. if (_lookup(true, lastFullSeg))
  729. {
  730. unsigned __int64 locount = keyCursor->getSequence();
  731. endRange(lastFullSeg);
  732. keyCursor->ltEqual(keyBuffer, NULL, true);
  733. lseeks++;
  734. result += keyCursor->getSequence()-locount+1;
  735. if (max && (result > max))
  736. break;
  737. if (!incrementKey(lastFullSeg))
  738. break;
  739. matched = false;
  740. }
  741. else
  742. break;
  743. }
  744. noteSeeks(lseeks, 0, 0);
  745. return result;
  746. }
  747. virtual void serializeCursorPos(MemoryBuffer &mb)
  748. {
  749. mb.append(eof);
  750. if (!eof)
  751. {
  752. keyCursor->serializeCursorPos(mb);
  753. mb.append(matched);
  754. }
  755. }
  756. virtual void deserializeCursorPos(MemoryBuffer &mb)
  757. {
  758. mb.read(eof);
  759. if (!eof)
  760. {
  761. assertex(keyBuffer);
  762. keyCursor->deserializeCursorPos(mb, keyBuffer);
  763. mb.read(matched);
  764. }
  765. }
  766. virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  767. {
  768. return keyCursor->loadBlob(blobid, blobsize);
  769. }
  770. virtual void releaseBlobs()
  771. {
  772. if (keyCursor)
  773. keyCursor->releaseBlobs();
  774. }
  775. virtual void setLayoutTranslator(const IDynamicTransform * trans) override
  776. {
  777. layoutTrans.set(trans);
  778. }
  779. virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
  780. {
  781. segs.swapWith(segmentMonitors);
  782. }
  783. virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
  784. {
  785. segs.deserialize(mb);
  786. }
  787. virtual void finishSegmentMonitors()
  788. {
  789. segs.finish(keyedSize);
  790. }
  791. };
  792. ///////////////////////////////////////////////////////////////////////////////
  793. ///////////////////////////////////////////////////////////////////////////////
  794. // For some reason #pragma pack does not seem to work here. Force all elements to 8 bytes
  795. class CKeyIdAndPos
  796. {
  797. public:
  798. unsigned __int64 keyId;
  799. offset_t pos;
  800. CKeyIdAndPos(unsigned __int64 _keyId, offset_t _pos) { keyId = _keyId; pos = _pos; }
  801. bool operator==(const CKeyIdAndPos &other) { return keyId == other.keyId && pos == other.pos; }
  802. };
  803. class CNodeMapping : public HTMapping<CJHTreeNode, CKeyIdAndPos>
  804. {
  805. public:
  806. CNodeMapping(CKeyIdAndPos &fp, CJHTreeNode &et) : HTMapping<CJHTreeNode, CKeyIdAndPos>(et, fp) { }
  807. ~CNodeMapping() { this->et.Release(); }
  808. CJHTreeNode &query() { return queryElement(); }
  809. };
  810. typedef OwningSimpleHashTableOf<CNodeMapping, CKeyIdAndPos> CNodeTable;
  811. #define FIXED_NODE_OVERHEAD (sizeof(CJHTreeNode))
  812. class CNodeMRUCache : public CMRUCacheOf<CKeyIdAndPos, CJHTreeNode, CNodeMapping, CNodeTable>
  813. {
  814. size32_t sizeInMem, memLimit;
  815. public:
  816. CNodeMRUCache(size32_t _memLimit) : memLimit(0)
  817. {
  818. sizeInMem = 0;
  819. setMemLimit(_memLimit);
  820. }
  821. size32_t setMemLimit(size32_t _memLimit)
  822. {
  823. size32_t oldMemLimit = memLimit;
  824. memLimit = _memLimit;
  825. if (full())
  826. makeSpace();
  827. return oldMemLimit;
  828. }
  829. virtual void makeSpace()
  830. {
  831. // remove LRU until !full
  832. do
  833. {
  834. clear(1);
  835. }
  836. while (full());
  837. }
  838. virtual bool full()
  839. {
  840. if (((size32_t)-1) == memLimit) return false;
  841. return sizeInMem > memLimit;
  842. }
  843. virtual void elementAdded(CNodeMapping *mapping)
  844. {
  845. CJHTreeNode &node = mapping->queryElement();
  846. sizeInMem += (FIXED_NODE_OVERHEAD+node.getMemSize());
  847. }
  848. virtual void elementRemoved(CNodeMapping *mapping)
  849. {
  850. CJHTreeNode &node = mapping->queryElement();
  851. sizeInMem -= (FIXED_NODE_OVERHEAD+node.getMemSize());
  852. }
  853. };
  854. class CNodeCache : public CInterface
  855. {
  856. private:
  857. mutable CriticalSection lock;
  858. CNodeMRUCache nodeCache;
  859. CNodeMRUCache leafCache;
  860. CNodeMRUCache blobCache;
  861. CNodeMRUCache preloadCache;
  862. bool cacheNodes;
  863. bool cacheLeaves;
  864. bool cacheBlobs;
  865. bool preloadNodes;
  866. public:
  867. CNodeCache(size32_t maxNodeMem, size32_t maxLeaveMem, size32_t maxBlobMem)
  868. : nodeCache(maxNodeMem), leafCache(maxLeaveMem), blobCache(maxBlobMem), preloadCache((unsigned) -1)
  869. {
  870. cacheNodes = maxNodeMem != 0;
  871. cacheLeaves = maxLeaveMem != 0;;
  872. cacheBlobs = maxBlobMem != 0;
  873. preloadNodes = false;
  874. // note that each index caches the last blob it unpacked so that sequential blobfetches are still ok
  875. }
  876. CJHTreeNode *getNode(INodeLoader *key, int keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
  877. void preload(CJHTreeNode *node, int keyID, offset_t pos, IContextLogger *ctx);
  878. bool isPreloaded(int keyID, offset_t pos);
  879. inline bool getNodeCachePreload()
  880. {
  881. return preloadNodes;
  882. }
  883. inline bool setNodeCachePreload(bool _preload)
  884. {
  885. bool oldPreloadNodes = preloadNodes;
  886. preloadNodes = _preload;
  887. return oldPreloadNodes;
  888. }
  889. inline size32_t setNodeCacheMem(size32_t newSize)
  890. {
  891. CriticalBlock block(lock);
  892. unsigned oldV = nodeCache.setMemLimit(newSize);
  893. cacheNodes = (newSize != 0);
  894. return oldV;
  895. }
  896. inline size32_t setLeafCacheMem(size32_t newSize)
  897. {
  898. CriticalBlock block(lock);
  899. unsigned oldV = leafCache.setMemLimit(newSize);
  900. cacheLeaves = (newSize != 0);
  901. return oldV;
  902. }
  903. inline size32_t setBlobCacheMem(size32_t newSize)
  904. {
  905. CriticalBlock block(lock);
  906. unsigned oldV = blobCache.setMemLimit(newSize);
  907. cacheBlobs = (newSize != 0);
  908. return oldV;
  909. }
  910. void clear()
  911. {
  912. CriticalBlock block(lock);
  913. nodeCache.kill();
  914. leafCache.kill();
  915. blobCache.kill();
  916. }
  917. };
  918. static inline CNodeCache *queryNodeCache()
  919. {
  920. if (nodeCache) return nodeCache; // avoid crit
  921. CriticalBlock b(*initCrit);
  922. if (!nodeCache) nodeCache = new CNodeCache(100*0x100000, 50*0x100000, 0);
  923. return nodeCache;
  924. }
  925. void clearNodeCache()
  926. {
  927. queryNodeCache()->clear();
  928. }
  929. inline CKeyStore *queryKeyStore()
  930. {
  931. CKeyStore * value = keyStore.load(std::memory_order_acquire);
  932. if (value) return value; // avoid crit
  933. CriticalBlock b(*initCrit);
  934. if (!keyStore.load(std::memory_order_acquire)) keyStore = new CKeyStore;
  935. return keyStore;
  936. }
  937. unsigned setKeyIndexCacheSize(unsigned limit)
  938. {
  939. return queryKeyStore()->setKeyCacheLimit(limit);
  940. }
  941. CKeyStore::CKeyStore() : keyIndexCache(defaultKeyIndexLimit)
  942. {
  943. nextId = 0;
  944. #if 0
  945. mm.setown(createSharedMemoryManager("RichardsSharedMemManager", 0x100000));
  946. try
  947. {
  948. if (mm)
  949. sharedCache.setown(mm->share());
  950. }
  951. catch (IException *E)
  952. {
  953. E->Release();
  954. }
  955. #endif
  956. }
  957. CKeyStore::~CKeyStore()
  958. {
  959. }
  960. unsigned CKeyStore::setKeyCacheLimit(unsigned limit)
  961. {
  962. return keyIndexCache.setCacheLimit(limit);
  963. }
  964. IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  965. {
  966. // isTLK provided by caller since flags in key header unreliable. If either say it's a TLK, I believe it.
  967. {
  968. MTIME_SECTION(queryActiveTimer(), "CKeyStore_load");
  969. IKeyIndex *keyIndex;
  970. // MORE - holds onto the mutex way too long
  971. synchronized block(mutex);
  972. StringBuffer fname;
  973. fname.append(fileName).append('/').append(crc);
  974. keyIndex = keyIndexCache.query(fname);
  975. if (NULL == keyIndex)
  976. {
  977. if (iMappedFile)
  978. {
  979. assert(!iFileIO && !part);
  980. keyIndex = new CMemKeyIndex(getUniqId(), LINK(iMappedFile), fname, isTLK);
  981. }
  982. else if (iFileIO)
  983. {
  984. assert(!part);
  985. keyIndex = new CDiskKeyIndex(getUniqId(), LINK(iFileIO), fname, isTLK, allowPreload);
  986. }
  987. else
  988. {
  989. Owned<IFile> iFile;
  990. if (part)
  991. {
  992. iFile.setown(part->open());
  993. if (NULL == iFile.get())
  994. throw MakeStringException(0, "Failed to open index file %s", fileName);
  995. }
  996. else
  997. iFile.setown(createIFile(fileName));
  998. IFileIO *fio = iFile->open(IFOread);
  999. if (fio)
  1000. keyIndex = new CDiskKeyIndex(getUniqId(), fio, fname, isTLK, allowPreload);
  1001. else
  1002. throw MakeStringException(0, "Failed to open index file %s", fileName);
  1003. }
  1004. keyIndexCache.add(fname, *LINK(keyIndex));
  1005. }
  1006. else
  1007. {
  1008. LINK(keyIndex);
  1009. }
  1010. assertex(NULL != keyIndex);
  1011. return keyIndex;
  1012. }
  1013. }
  1014. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload)
  1015. {
  1016. return doload(fileName, crc, NULL, iFileIO, NULL, isTLK, allowPreload);
  1017. }
  1018. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
  1019. {
  1020. return doload(fileName, crc, NULL, NULL, iMappedFile, isTLK, allowPreload);
  1021. }
  1022. // fileName+crc used only as key for cache
  1023. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload)
  1024. {
  1025. return doload(fileName, crc, &part, NULL, NULL, isTLK, allowPreload);
  1026. }
  1027. IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload)
  1028. {
  1029. return doload(fileName, crc, NULL, NULL, NULL, isTLK, allowPreload);
  1030. }
  1031. StringBuffer &CKeyStore::getMetrics(StringBuffer &xml)
  1032. {
  1033. xml.append(" <IndexMetrics>\n");
  1034. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1035. ForEach(*iter)
  1036. {
  1037. CKeyIndexMapping &mapping = iter->query();
  1038. IKeyIndex &index = mapping.query();
  1039. const char *name = mapping.queryFindString();
  1040. xml.appendf(" <Index name=\"%s\" scans=\"%d\" seeks=\"%d\"/>\n", name, index.queryScans(), index.querySeeks());
  1041. }
  1042. xml.append(" </IndexMetrics>\n");
  1043. return xml;
  1044. }
  1045. void CKeyStore::resetMetrics()
  1046. {
  1047. synchronized block(mutex);
  1048. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1049. ForEach(*iter)
  1050. {
  1051. CKeyIndexMapping &mapping = iter->query();
  1052. IKeyIndex &index = mapping.query();
  1053. index.resetCounts();
  1054. }
  1055. }
  1056. void CKeyStore::clearCache(bool killAll)
  1057. {
  1058. synchronized block(mutex);
  1059. if (killAll)
  1060. {
  1061. clearNodeCache(); // no point in keeping old nodes cached if key store cache has been cleared
  1062. keyIndexCache.kill();
  1063. }
  1064. else
  1065. {
  1066. StringArray goers;
  1067. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1068. ForEach(*iter)
  1069. {
  1070. CKeyIndexMapping &mapping = iter->query();
  1071. IKeyIndex &index = mapping.query();
  1072. if (!index.IsShared())
  1073. {
  1074. const char *name = mapping.queryFindString();
  1075. goers.append(name);
  1076. }
  1077. }
  1078. ForEachItemIn(idx, goers)
  1079. {
  1080. keyIndexCache.remove(goers.item(idx));
  1081. }
  1082. }
  1083. }
  1084. void CKeyStore::clearCacheEntry(const char *keyName)
  1085. {
  1086. if (!keyName || !*keyName)
  1087. return; // nothing to do
  1088. synchronized block(mutex);
  1089. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1090. StringArray goers;
  1091. ForEach(*iter)
  1092. {
  1093. CKeyIndexMapping &mapping = iter->query();
  1094. IKeyIndex &index = mapping.query();
  1095. if (!index.IsShared())
  1096. {
  1097. const char *name = mapping.queryFindString();
  1098. if (strstr(name, keyName) != 0) // keyName doesn't have drive or part number associated with it
  1099. goers.append(name);
  1100. }
  1101. }
  1102. ForEachItemIn(idx, goers)
  1103. {
  1104. keyIndexCache.remove(goers.item(idx));
  1105. }
  1106. }
  1107. void CKeyStore::clearCacheEntry(const IFileIO *io)
  1108. {
  1109. synchronized block(mutex);
  1110. Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
  1111. StringArray goers;
  1112. ForEach(*iter)
  1113. {
  1114. CKeyIndexMapping &mapping = iter->query();
  1115. IKeyIndex &index = mapping.query();
  1116. if (!index.IsShared())
  1117. {
  1118. if (index.queryFileIO()==io)
  1119. goers.append(mapping.queryFindString());
  1120. }
  1121. }
  1122. ForEachItemIn(idx, goers)
  1123. {
  1124. keyIndexCache.remove(goers.item(idx));
  1125. }
  1126. }
  1127. // CKeyIndex impl.
  1128. CKeyIndex::CKeyIndex(int _iD, const char *_name) : name(_name)
  1129. {
  1130. iD = _iD;
  1131. cache = queryNodeCache(); // use one node cache for all key indexes;
  1132. cache->Link();
  1133. keyHdr = NULL;
  1134. rootNode = NULL;
  1135. cachedBlobNodePos = 0;
  1136. keySeeks.store(0);
  1137. keyScans.store(0);
  1138. latestGetNodeOffset = 0;
  1139. }
// Preload the node at nodePos and all of its right siblings into 'cache' (the parameter
// shadows the member of the same name).
// For the first non-leaf node on a level, the function first recurses into that node's first
// child, so lower levels are walked before the rest of the current level.
// Leaf levels are preloaded only when isTLK is set; for other keys the walk returns as soon as
// it reaches a leaf, so only non-leaf levels end up preloaded.
// A nodePos of 0 terminates the sibling walk.
void CKeyIndex::cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK)
{
    bool first = true;
    while (nodePos)
    {
        // Read the node directly (loadNode), not via the node cache.
        Owned<CJHTreeNode> node = loadNode(nodePos);
        if (node->isLeaf())
        {
            if (!isTLK)
                return;
        }
        else if (first)
        {
            // Descend into the level below, once per level.
            cacheNodes(cache, node->getFPosAt(0), isTLK);
            first = false;
        }
        cache->preload(node, iD, nodePos, NULL);
        nodePos = node->getRightSib();
    }
}
  1160. void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
  1161. {
  1162. if (isTLK)
  1163. hdr.ktype |= HTREE_TOPLEVEL_KEY; // thor does not set
  1164. keyHdr = new CKeyHdr();
  1165. try
  1166. {
  1167. keyHdr->load(hdr);
  1168. }
  1169. catch (IKeyException *ke)
  1170. {
  1171. if (!name.get()) throw;
  1172. StringBuffer msg;
  1173. IKeyException *ke2 = MakeKeyException(ke->errorCode(), "%s. In key '%s'.", ke->errorMessage(msg).str(), name.get());
  1174. ke->Release();
  1175. throw ke2;
  1176. }
  1177. offset_t rootPos = keyHdr->getRootFPos();
  1178. Linked<CNodeCache> nodeCache = queryNodeCache();
  1179. if (allowPreload)
  1180. {
  1181. if (nodeCache->getNodeCachePreload() && !nodeCache->isPreloaded(iD, rootPos))
  1182. {
  1183. cacheNodes(nodeCache, rootPos, isTLK);
  1184. }
  1185. }
  1186. rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
  1187. }
// Drop the references taken in the constructor (shared node cache) and in init()
// (header and root node). ::Release tolerates NULL, e.g. when init() was never reached.
CKeyIndex::~CKeyIndex()
{
    ::Release(keyHdr);
    ::Release(cache);
    ::Release(rootNode);
}
  1194. CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
  1195. : CKeyIndex(_iD, _name)
  1196. {
  1197. io.setown(_io);
  1198. assertex(io->offset()==0); // mapped whole file
  1199. assertex(io->length()==io->fileSize()); // mapped whole file
  1200. KeyHdr hdr;
  1201. if (io->length() < sizeof(hdr))
  1202. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  1203. memcpy(&hdr, io->base(), sizeof(hdr));
  1204. init(hdr, isTLK, false);
  1205. }
  1206. CJHTreeNode *CMemKeyIndex::loadNode(offset_t pos)
  1207. {
  1208. nodesLoaded++;
  1209. if (pos + keyHdr->getNodeSize() > io->fileSize())
  1210. {
  1211. IException *E = MakeStringException(errno, "Error reading node at position %" I64F "x past EOF", pos);
  1212. StringBuffer m;
  1213. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  1214. EXCLOG(E, m.str());
  1215. throw E;
  1216. }
  1217. char *nodeData = (char *) (io->base() + pos);
  1218. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  1219. return CKeyIndex::loadNode(nodeData, pos, false);
  1220. }
  1221. CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
  1222. : CKeyIndex(_iD, _name)
  1223. {
  1224. io.setown(_io);
  1225. KeyHdr hdr;
  1226. if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
  1227. throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
  1228. init(hdr, isTLK, allowPreload);
  1229. }
  1230. CJHTreeNode *CDiskKeyIndex::loadNode(offset_t pos)
  1231. {
  1232. nodesLoaded++;
  1233. unsigned nodeSize = keyHdr->getNodeSize();
  1234. MemoryAttr ma;
  1235. char *nodeData = (char *) ma.allocate(nodeSize);
  1236. MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
  1237. if (io->read(pos, nodeSize, nodeData) != nodeSize)
  1238. {
  1239. IException *E = MakeStringException(errno, "Error %d reading node at position %" I64F "x", errno, pos);
  1240. StringBuffer m;
  1241. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  1242. EXCLOG(E, m.str());
  1243. throw E;
  1244. }
  1245. return CKeyIndex::loadNode(nodeData, pos, true);
  1246. }
  1247. CJHTreeNode *CKeyIndex::loadNode(char *nodeData, offset_t pos, bool needsCopy)
  1248. {
  1249. try
  1250. {
  1251. Owned<CJHTreeNode> ret;
  1252. char leafFlag = ((NodeHdr *) nodeData)->leafFlag;
  1253. switch(leafFlag)
  1254. {
  1255. case 0:
  1256. ret.setown(new CJHTreeNode());
  1257. break;
  1258. case 1:
  1259. if (keyHdr->isVariable())
  1260. ret.setown(new CJHVarTreeNode());
  1261. else
  1262. ret.setown(new CJHTreeNode());
  1263. break;
  1264. case 2:
  1265. ret.setown(new CJHTreeBlobNode());
  1266. break;
  1267. case 3:
  1268. ret.setown(new CJHTreeMetadataNode());
  1269. break;
  1270. default:
  1271. throwUnexpected();
  1272. }
  1273. {
  1274. MTIME_SECTION(queryActiveTimer(), "JHTREE load node");
  1275. ret->load(keyHdr, nodeData, pos, true);
  1276. }
  1277. return ret.getClear();
  1278. }
  1279. catch (IException *E)
  1280. {
  1281. StringBuffer m;
  1282. m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
  1283. EXCLOG(E, m.str());
  1284. throw;
  1285. }
  1286. catch (...)
  1287. {
  1288. DBGLOG("Unknown exception in key %s, position 0x%" I64F "x", name.get(), pos);
  1289. throw;
  1290. }
  1291. }
  1292. bool CKeyIndex::isTopLevelKey()
  1293. {
  1294. return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0;
  1295. }
  1296. bool CKeyIndex::isFullySorted()
  1297. {
  1298. return (keyHdr->getKeyType() & HTREE_FULLSORT_KEY) != 0;
  1299. }
  1300. IKeyCursor *CKeyIndex::getCursor(IContextLogger *ctx)
  1301. {
  1302. return new CKeyCursor(*this, ctx); // MORE - pool them?
  1303. }
  1304. CJHTreeNode *CKeyIndex::getNode(offset_t offset, IContextLogger *ctx)
  1305. {
  1306. latestGetNodeOffset = offset;
  1307. return cache->getNode(this, iD, offset, ctx, isTopLevelKey());
  1308. }
  1309. void dumpNode(FILE *out, CJHTreeNode *node, int length, unsigned rowCount, bool raw)
  1310. {
  1311. if (!raw)
  1312. fprintf(out, "Node dump: fpos(%" I64F "d) leaf(%d)\n", node->getFpos(), node->isLeaf());
  1313. if (rowCount==0 || rowCount > node->getNumKeys())
  1314. rowCount = node->getNumKeys();
  1315. for (unsigned int i=0; i<rowCount; i++)
  1316. {
  1317. char *dst = (char *) alloca(node->getKeyLen()+50);
  1318. node->getValueAt(i, dst);
  1319. if (raw)
  1320. {
  1321. fwrite(dst, 1, length, out);
  1322. }
  1323. else
  1324. {
  1325. offset_t pos = node->getFPosAt(i);
  1326. StringBuffer s;
  1327. appendURL(&s, dst, length, true);
  1328. fprintf(out, "keyVal %d [%" I64F "d] = %s\n", i, pos, s.str());
  1329. }
  1330. }
  1331. if (!raw)
  1332. fprintf(out, "==========\n");
  1333. }
  1334. void CKeyIndex::dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw)
  1335. {
  1336. Owned<CJHTreeNode> node = loadNode(pos);
  1337. ::dumpNode(out, node, keySize(), count, isRaw);
  1338. }
  1339. bool CKeyIndex::hasSpecialFileposition() const
  1340. {
  1341. return keyHdr->hasSpecialFileposition();
  1342. }
  1343. size32_t CKeyIndex::keySize()
  1344. {
  1345. size32_t fileposSize = keyHdr->hasSpecialFileposition() ? sizeof(offset_t) : 0;
  1346. return keyHdr->getMaxKeyLength() + fileposSize;
  1347. }
  1348. size32_t CKeyIndex::keyedSize()
  1349. {
  1350. return keyHdr->getNodeKeyLength();
  1351. }
  1352. bool CKeyIndex::hasPayload()
  1353. {
  1354. return keyHdr->hasPayload();
  1355. }
  1356. CJHTreeBlobNode *CKeyIndex::getBlobNode(offset_t nodepos)
  1357. {
  1358. CriticalBlock b(blobCacheCrit);
  1359. if (nodepos != cachedBlobNodePos)
  1360. {
  1361. cachedBlobNode.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeBlobNode)); // note - don't use the cache
  1362. cachedBlobNodePos = nodepos;
  1363. }
  1364. return cachedBlobNode.getLink();
  1365. }
  1366. const byte *CKeyIndex::loadBlob(unsigned __int64 blobid, size32_t &blobSize)
  1367. {
  1368. offset_t nodepos = blobid & I64C(0xffffffffffff);
  1369. size32_t offset = (size32_t) ((blobid & I64C(0xffff000000000000)) >> 44);
  1370. Owned<CJHTreeBlobNode> blobNode = getBlobNode(nodepos);
  1371. size32_t sizeRemaining = blobNode->getTotalBlobSize(offset);
  1372. blobSize = sizeRemaining;
  1373. byte *ret = (byte *) malloc(sizeRemaining);
  1374. byte *finger = ret;
  1375. for (;;)
  1376. {
  1377. size32_t gotHere = blobNode->getBlobData(offset, finger);
  1378. assertex(gotHere <= sizeRemaining);
  1379. sizeRemaining -= gotHere;
  1380. finger += gotHere;
  1381. if (!sizeRemaining)
  1382. break;
  1383. blobNode.setown(getBlobNode(blobNode->getRightSib()));
  1384. offset = 0;
  1385. }
  1386. return ret;
  1387. }
  1388. offset_t CKeyIndex::queryMetadataHead()
  1389. {
  1390. offset_t ret = keyHdr->getHdrStruct()->metadataHead;
  1391. if(ret == static_cast<offset_t>(-1)) ret = 0; // index created before introduction of metadata would have FFFF... in this space
  1392. return ret;
  1393. }
  1394. IPropertyTree * CKeyIndex::getMetadata()
  1395. {
  1396. offset_t nodepos = queryMetadataHead();
  1397. if(!nodepos)
  1398. return NULL;
  1399. Owned<CJHTreeMetadataNode> node;
  1400. StringBuffer xml;
  1401. while(nodepos)
  1402. {
  1403. node.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeMetadataNode));
  1404. node->get(xml);
  1405. nodepos = node->getRightSib();
  1406. }
  1407. IPropertyTree * ret;
  1408. try
  1409. {
  1410. ret = createPTreeFromXMLString(xml.str());
  1411. }
  1412. catch(IPTreeReadException * e)
  1413. {
  1414. StringBuffer emsg;
  1415. IException * wrapped = MakeStringException(e->errorAudience(), e->errorCode(), "Error retrieving XML metadata: %s", e->errorMessage(emsg).str());
  1416. e->Release();
  1417. throw wrapped;
  1418. }
  1419. return ret;
  1420. }
  1421. CKeyCursor::CKeyCursor(CKeyIndex &_key, IContextLogger *_ctx)
  1422. : key(_key), ctx(_ctx)
  1423. {
  1424. key.Link();
  1425. nodeKey = 0;
  1426. }
// Drop the index reference taken in the constructor and free any blob buffers still
// handed out by loadBlob().
CKeyCursor::~CKeyCursor()
{
    key.Release();
    releaseBlobs();
}
  1432. void CKeyCursor::reset()
  1433. {
  1434. node.clear();
  1435. }
  1436. CJHTreeNode *CKeyCursor::locateFirstNode()
  1437. {
  1438. CJHTreeNode * n = 0;
  1439. CJHTreeNode * p = LINK(key.rootNode);
  1440. while (p != 0)
  1441. {
  1442. n = p;
  1443. p = key.getNode(n->prevNodeFpos(), ctx);
  1444. if (p != 0)
  1445. n->Release();
  1446. }
  1447. return n;
  1448. }
  1449. CJHTreeNode *CKeyCursor::locateLastNode()
  1450. {
  1451. CJHTreeNode * n = 0;
  1452. CJHTreeNode * p = LINK(key.rootNode);
  1453. while (p != 0)
  1454. {
  1455. n = p;
  1456. p = key.getNode(n->nextNodeFpos(), ctx);
  1457. if (p != 0)
  1458. n->Release();
  1459. }
  1460. return n;
  1461. }
  1462. bool CKeyCursor::next(char *dst)
  1463. {
  1464. if (!node)
  1465. return first(dst);
  1466. else
  1467. {
  1468. key.keyScans++;
  1469. if (!node->getValueAt( ++nodeKey, dst))
  1470. {
  1471. offset_t rsib = node->getRightSib();
  1472. node.clear();
  1473. if (rsib != 0)
  1474. {
  1475. node.setown(key.getNode(rsib, ctx));
  1476. if (node != NULL)
  1477. {
  1478. nodeKey = 0;
  1479. return node->getValueAt(0, dst);
  1480. }
  1481. }
  1482. return false;
  1483. }
  1484. else
  1485. return true;
  1486. }
  1487. }
  1488. bool CKeyCursor::prev(char *dst)
  1489. {
  1490. if (!node)
  1491. return last(dst); // Note - this used to say First - surely a typo
  1492. else
  1493. {
  1494. key.keyScans++;
  1495. if (!nodeKey)
  1496. {
  1497. offset_t lsib = node->getLeftSib();
  1498. node.clear();
  1499. if (lsib != 0)
  1500. {
  1501. node.setown(key.getNode(lsib, ctx));
  1502. if (node)
  1503. {
  1504. nodeKey = node->getNumKeys()-1;
  1505. return node->getValueAt(nodeKey, dst );
  1506. }
  1507. }
  1508. return false;
  1509. }
  1510. else
  1511. return node->getValueAt(--nodeKey, dst);
  1512. }
  1513. }
  1514. size32_t CKeyCursor::getSize()
  1515. {
  1516. assertex(node);
  1517. return node->getSizeAt(nodeKey);
  1518. }
  1519. offset_t CKeyCursor::getFPos()
  1520. {
  1521. assertex(node);
  1522. return node->getFPosAt(nodeKey);
  1523. }
  1524. unsigned __int64 CKeyCursor::getSequence()
  1525. {
  1526. assertex(node);
  1527. return node->getSequence(nodeKey);
  1528. }
  1529. bool CKeyCursor::first(char *dst)
  1530. {
  1531. key.keySeeks++;
  1532. node.setown(locateFirstNode());
  1533. nodeKey = 0;
  1534. return node->getValueAt(nodeKey, dst);
  1535. }
  1536. bool CKeyCursor::last(char *dst)
  1537. {
  1538. key.keySeeks++;
  1539. node.setown(locateLastNode());
  1540. nodeKey = node->getNumKeys()-1;
  1541. return node->getValueAt( nodeKey, dst );
  1542. }
// Position the cursor on the first entry whose key is >= src and copy that entry to dst.
// Returns false when no such entry is found.
// With seekForward set and a current node, the current node is tried first (next entry, then a
// search of the rest of this node) before falling back to a full descent from the root.
// compareValueAt(src, i) > 0 is treated as "src sorts after entry i".
bool CKeyCursor::gtEqual(const char *src, char *dst, bool seekForward)
{
    key.keySeeks++;
    // Lowest index in the current node still worth searching; 0 means "restart from the root".
    unsigned lwm = 0;
    if (seekForward && node)
    {
        // When seeking forward, there are two cases worth optimizing:
        // 1. the next record is actually the one we want
        // 2. The record we want is on the current page
        // NOTE(review): numKeys-1 wraps if a node ever has no keys - presumably never the case.
        unsigned numKeys = node->getNumKeys();
        if (nodeKey < numKeys-1)
        {
            int rc = node->compareValueAt(src, ++nodeKey);
            if (rc <= 0)
            {
                // Case 1: the next entry is already >= src.
                node->getValueAt(nodeKey, dst);
                return true;
            }
            if (nodeKey < numKeys-1)
            {
                // Case 2: if the node's last entry is >= src, the answer lies after nodeKey
                // within this node, so the search below can start at nodeKey+1.
                rc = node->compareValueAt(src, numKeys-1);
                if (rc <= 0)
                    lwm = nodeKey+1;
            }
        }
    }
    if (!lwm)
        node.set(key.rootNode);
    for (;;)
    {
        unsigned int a = lwm;
        int b = node->getNumKeys();
        // first search for first GTE entry (result in b(<),a(>=))
        while ((int)a<b)
        {
            int i = a+(b-a)/2;
            int rc = node->compareValueAt(src, i);
            if (rc>0)
                a = i+1;
            else
                b = i;
        }
        if (node->isLeaf())
        {
            if (a<node->getNumKeys())
                nodeKey = a;
            else
            {
                // Nothing on this leaf is >= src: the answer (if any) is the first entry of the next leaf.
                offset_t nextPos = node->nextNodeFpos(); // This can happen at eof because of key peculiarity where level above reports ffff as last
                node.setown(key.getNode(nextPos, ctx));
                nodeKey = 0;
            }
            if (node)
            {
                node->getValueAt(nodeKey, dst);
                return true;
            }
            else
                return false;
        }
        else
        {
            // Branch node: descend into child a, or fail when src is beyond every entry.
            if (a<node->getNumKeys())
            {
                offset_t npos = node->getFPosAt(a);
                node.setown(key.getNode(npos, ctx));
            }
            else
                return false;
        }
    }
}
// Position the cursor on the last entry whose key is <= src and copy that entry to dst
// (dst may be NULL - callers in this file pass NULL when only the position is wanted).
// Returns false when no such entry is found.
// With seekForward set and a current node, the current node is tried first before falling
// back to a full descent from the root.
bool CKeyCursor::ltEqual(const char *src, char *dst, bool seekForward)
{
    key.keySeeks++;
    // Lower bound for the search within the current node; 0 means "restart from the root".
    unsigned lwm = 0;
    if (seekForward && node)
    {
        // When seeking forward, there are two cases worth optimizing:
        // 1. next record is > src, so we return current
        // 2. The record we want is on the current page
        // NOTE(review): numKeys-1 wraps if a node ever has no keys - presumably never the case.
        unsigned numKeys = node->getNumKeys();
        if (nodeKey < numKeys-1)
        {
            int rc = node->compareValueAt(src, ++nodeKey);
            if (rc < 0)
            {
                // Case 1: step back to the current entry. This assumes the current entry is <= src.
                node->getValueAt(--nodeKey, dst);
                return true;
            }
            if (nodeKey < numKeys-1)
            {
                // Case 2: the node's last entry is > src, so the answer is within this node at
                // or after nodeKey.
                rc = node->compareValueAt(src, numKeys-1);
                if (rc < 0)
                    lwm = nodeKey;
            }
        }
    }
    if (!lwm)
        node.set(key.rootNode);
    for (;;)
    {
        unsigned int a = lwm;
        int b = node->getNumKeys();
        // Locate first record greater than src
        // (on exit, a is the number of leading entries that are <= src)
        while ((int)a<b)
        {
            int i = a+(b+1-a)/2;
            int rc = node->compareValueAt(src, i-1);
            if (rc>=0)
                a = i;
            else
                b = i-1;
        }
        if (node->isLeaf())
        {
            // record we want is the one before first record greater than src.
            if (a>0)
                nodeKey = a-1;
            else
            {
                // Every entry on this leaf is > src: use the last entry of the previous leaf.
                offset_t prevPos = node->prevNodeFpos();
                node.setown(key.getNode(prevPos, ctx));
                if (node)
                    nodeKey = node->getNumKeys()-1;
            }
            if (node)
            {
                node->getValueAt(nodeKey, dst);
                return true;
            }
            else
                return false;
        }
        else
        {
            // Node to look in is the first one one that ended greater than src.
            if (a==node->getNumKeys())
                a--; // value being looked for is off the end of the index.
            offset_t npos = node->getFPosAt(a);
            node.setown(key.getNode(npos, ctx));
            if (!node)
                throw MakeStringException(0, "Invalid key %s: child node pointer should never be NULL", key.name.get());
        }
    }
}
  1689. void CKeyCursor::serializeCursorPos(MemoryBuffer &mb)
  1690. {
  1691. if (node)
  1692. {
  1693. mb.append(node->getFpos());
  1694. mb.append(nodeKey);
  1695. }
  1696. else
  1697. {
  1698. offset_t zero = 0;
  1699. unsigned zero2 = 0;
  1700. mb.append(zero);
  1701. mb.append(zero2);
  1702. }
  1703. }
  1704. void CKeyCursor::deserializeCursorPos(MemoryBuffer &mb, char *keyBuffer)
  1705. {
  1706. offset_t nodeAddress;
  1707. mb.read(nodeAddress);
  1708. mb.read(nodeKey);
  1709. if (nodeAddress)
  1710. {
  1711. node.setown(key.getNode(nodeAddress, ctx));
  1712. if (node && keyBuffer)
  1713. node->getValueAt(nodeKey, keyBuffer);
  1714. }
  1715. else
  1716. node.clear();
  1717. }
  1718. const byte *CKeyCursor::loadBlob(unsigned __int64 blobid, size32_t &blobsize)
  1719. {
  1720. const byte *ret = key.loadBlob(blobid, blobsize);
  1721. activeBlobs.append(ret);
  1722. return ret;
  1723. }
  1724. void CKeyCursor::releaseBlobs()
  1725. {
  1726. ForEachItemIn(idx, activeBlobs)
  1727. {
  1728. free((void *) activeBlobs.item(idx));
  1729. }
  1730. activeBlobs.kill();
  1731. }
  1732. class CLazyKeyIndex : implements IKeyIndex, public CInterface
  1733. {
  1734. StringAttr keyfile;
  1735. unsigned crc;
  1736. Linked<IDelayedFile> delayedFile;
  1737. Owned<IFileIO> iFileIO;
  1738. Owned<IKeyIndex> realKey;
  1739. CriticalSection c;
  1740. bool isTLK;
  1741. bool preloadAllowed;
  1742. inline IKeyIndex &checkOpen()
  1743. {
  1744. CriticalBlock b(c);
  1745. if (!realKey)
  1746. {
  1747. Owned<IMemoryMappedFile> mapped = useMemoryMappedIndexes ? delayedFile->getMappedFile() : nullptr;
  1748. if (mapped)
  1749. realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
  1750. else
  1751. {
  1752. iFileIO.setown(delayedFile->getFileIO());
  1753. realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
  1754. }
  1755. if (!realKey)
  1756. {
  1757. DBGLOG("Lazy key file %s could not be opened", keyfile.get());
  1758. throw MakeStringException(0, "Lazy key file %s could not be opened", keyfile.get());
  1759. }
  1760. }
  1761. return *realKey;
  1762. }
  1763. public:
  1764. IMPLEMENT_IINTERFACE;
  1765. CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
  1766. : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
  1767. {}
  1768. virtual bool IsShared() const { return CInterface::IsShared(); }
  1769. virtual IKeyCursor *getCursor(IContextLogger *ctx) { return checkOpen().getCursor(ctx); }
  1770. virtual size32_t keySize() { return checkOpen().keySize(); }
  1771. virtual size32_t keyedSize() { return checkOpen().keyedSize(); }
  1772. virtual bool hasPayload() { return checkOpen().hasPayload(); }
  1773. virtual bool isTopLevelKey() { return checkOpen().isTopLevelKey(); }
  1774. virtual bool isFullySorted() { return checkOpen().isFullySorted(); }
  1775. virtual unsigned getFlags() { return checkOpen().getFlags(); }
  1776. virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw) { checkOpen().dumpNode(out, pos, count, isRaw); }
  1777. virtual unsigned numParts() { return 1; }
  1778. virtual IKeyIndex *queryPart(unsigned idx) { return idx ? NULL : this; }
  1779. virtual unsigned queryScans() { return realKey ? realKey->queryScans() : 0; }
  1780. virtual unsigned querySeeks() { return realKey ? realKey->querySeeks() : 0; }
  1781. virtual const char *queryFileName() { return keyfile.get(); }
  1782. virtual offset_t queryBlobHead() { return checkOpen().queryBlobHead(); }
  1783. virtual void resetCounts() { if (realKey) realKey->resetCounts(); }
  1784. virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
  1785. virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
  1786. virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
  1787. virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
  1788. virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
  1789. virtual bool hasSpecialFileposition() const { return realKey ? realKey->hasSpecialFileposition() : false; }
  1790. };
  1791. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
  1792. {
  1793. return queryKeyStore()->load(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  1794. }
  1795. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, bool isTLK, bool preloadAllowed)
  1796. {
  1797. return queryKeyStore()->load(keyfile, crc, isTLK, preloadAllowed);
  1798. }
  1799. extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed)
  1800. {
  1801. StringBuffer filePath;
  1802. const RemoteFilename &rfn = part.queryCopies().item(0);
  1803. rfn.getPath(filePath);
  1804. return queryKeyStore()->load(filePath.str(), crc, part, isTLK, preloadAllowed);
  1805. }
  1806. extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, bool isTLK, bool preloadAllowed)
  1807. {
  1808. return new CLazyKeyIndex(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
  1809. }
  1810. extern jhtree_decl void clearKeyStoreCache(bool killAll)
  1811. {
  1812. queryKeyStore()->clearCache(killAll);
  1813. }
  1814. extern jhtree_decl void clearKeyStoreCacheEntry(const char *name)
  1815. {
  1816. queryKeyStore()->clearCacheEntry(name);
  1817. }
  1818. extern jhtree_decl void clearKeyStoreCacheEntry(const IFileIO *io)
  1819. {
  1820. queryKeyStore()->clearCacheEntry(io);
  1821. }
  1822. extern jhtree_decl StringBuffer &getIndexMetrics(StringBuffer &ret)
  1823. {
  1824. return queryKeyStore()->getMetrics(ret);
  1825. }
  1826. extern jhtree_decl void resetIndexMetrics()
  1827. {
  1828. queryKeyStore()->resetMetrics();
  1829. }
  1830. extern jhtree_decl bool isKeyFile(const char *filename)
  1831. {
  1832. OwnedIFile file = createIFile(filename);
  1833. OwnedIFileIO io = file->open(IFOread);
  1834. unsigned __int64 size = file->size();
  1835. if (size)
  1836. {
  1837. KeyHdr hdr;
  1838. if (io->read(0, sizeof(hdr), &hdr) == sizeof(hdr))
  1839. {
  1840. _WINREV(hdr.phyrec);
  1841. _WINREV(hdr.root);
  1842. _WINREV(hdr.nodeSize);
  1843. if (size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.root && hdr.root % hdr.nodeSize == 0)
  1844. {
  1845. NodeHdr root;
  1846. if (io->read(hdr.root, sizeof(root), &root) == sizeof(root))
  1847. {
  1848. _WINREV(root.rightSib);
  1849. _WINREV(root.leftSib);
  1850. return root.leftSib==0 && root.rightSib==0;
  1851. }
  1852. }
  1853. }
  1854. }
  1855. return false;
  1856. }
  1857. extern jhtree_decl bool setNodeCachePreload(bool preload)
  1858. {
  1859. return queryNodeCache()->setNodeCachePreload(preload);
  1860. }
  1861. extern jhtree_decl size32_t setNodeCacheMem(size32_t cacheSize)
  1862. {
  1863. return queryNodeCache()->setNodeCacheMem(cacheSize);
  1864. }
  1865. extern jhtree_decl size32_t setLeafCacheMem(size32_t cacheSize)
  1866. {
  1867. return queryNodeCache()->setLeafCacheMem(cacheSize);
  1868. }
  1869. extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize)
  1870. {
  1871. return queryNodeCache()->setBlobCacheMem(cacheSize);
  1872. }
  1873. ///////////////////////////////////////////////////////////////////////////////
  1874. // CNodeCache impl.
  1875. ///////////////////////////////////////////////////////////////////////////////
// Return the node at file position `pos` of key `iD`, using the node caches where possible.
// - pos == 0 returns NULL.
// - The preload, node, leaf and blob caches (each only if enabled) are searched in that
//   order; a hit is returned LINKed and counted in cacheHits, the per-cache hit counter
//   and the matching ctx statistic.
// - On a miss the node is loaded via keyIndex->loadNode() with the cache lock released.
//   It is then added to the blob cache (blobs), the leaf cache (leaves of non-TLK keys) or
//   the node cache (everything else, including TLK leaves), if that cache is enabled.
// Every cache-hit path returns LINK(cacheNode); a freshly loaded node is returned directly,
// with the cache holding its own LINK. NOTE(review): presumably the caller owns the returned
// reference in all cases - confirm against loadNode's contract.
CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IContextLogger *ctx, bool isTLK)
{
    // MORE - could probably be improved - I think having the cache template separate is not helping us here
    // Also one cache per key would surely be faster, and could still use a global total
    if (!pos)
        return NULL;
    {
        // It's a shame that we don't know the type before we read it. But probably not that big a deal
        CriticalBlock block(lock);
        CKeyIdAndPos key(iD, pos);
        if (preloadNodes)
        {
            CJHTreeNode *cacheNode = preloadCache.query(key);
            if (cacheNode)
            {
                cacheHits++;
                if (ctx) ctx->noteStatistic(StNumPreloadCacheHits, 1);
                preloadCacheHits++;
                return LINK(cacheNode);
            }
        }
        if (cacheNodes)
        {
            CJHTreeNode *cacheNode = nodeCache.query(key);
            if (cacheNode)
            {
                cacheHits++;
                if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
                nodeCacheHits++;
                return LINK(cacheNode);
            }
        }
        if (cacheLeaves)
        {
            CJHTreeNode *cacheNode = leafCache.query(key);
            if (cacheNode)
            {
                cacheHits++;
                if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
                leafCacheHits++;
                return LINK(cacheNode);
            }
        }
        if (cacheBlobs)
        {
            CJHTreeNode *cacheNode = blobCache.query(key);
            if (cacheNode)
            {
                cacheHits++;
                if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
                blobCacheHits++;
                return LINK(cacheNode);
            }
        }
        // Cache miss. The lock is dropped for the load, so another thread may load and cache
        // the same node meanwhile; each branch below re-checks its cache and, if the node is
        // already there, releases our copy and returns the cached one.
        CJHTreeNode *node;
        {
            CriticalUnblock block(lock);
            node = keyIndex->loadNode(pos); // NOTE - don't want cache locked while we load!
        }
        // NOTE(review): node is dereferenced without a null check; this assumes loadNode never
        // returns NULL (e.g. it throws on failure) - confirm.
        cacheAdds++;
        if (node->isBlob())
        {
            if (cacheBlobs)
            {
                CJHTreeNode *cacheNode = blobCache.query(key); // check if added to cache while we were reading
                if (cacheNode)
                {
                    ::Release(node);
                    cacheHits++;
                    if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
                    blobCacheHits++;
                    return LINK(cacheNode);
                }
                if (ctx) ctx->noteStatistic(StNumBlobCacheAdds, 1);
                blobCacheAdds++;
                blobCache.add(key, *LINK(node));
            }
        }
        else if (node->isLeaf() && !isTLK) // leaves in TLK are cached as if they were nodes
        {
            if (cacheLeaves)
            {
                CJHTreeNode *cacheNode = leafCache.query(key); // check if added to cache while we were reading
                if (cacheNode)
                {
                    ::Release(node);
                    cacheHits++;
                    if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
                    leafCacheHits++;
                    return LINK(cacheNode);
                }
                if (ctx) ctx->noteStatistic(StNumLeafCacheAdds, 1);
                leafCacheAdds++;
                leafCache.add(key, *LINK(node));
            }
        }
        else
        {
            if (cacheNodes)
            {
                CJHTreeNode *cacheNode = nodeCache.query(key); // check if added to cache while we were reading
                if (cacheNode)
                {
                    ::Release(node);
                    cacheHits++;
                    if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
                    nodeCacheHits++;
                    return LINK(cacheNode);
                }
                if (ctx) ctx->noteStatistic(StNumNodeCacheAdds, 1);
                nodeCacheAdds++;
                nodeCache.add(key, *LINK(node));
            }
        }
        // The cache (if any) holds its own LINK; this returns the reference obtained from loadNode.
        return node;
    }
}
  1993. void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger *ctx)
  1994. {
  1995. assertex(pos);
  1996. assertex(preloadNodes);
  1997. CriticalBlock block(lock);
  1998. CKeyIdAndPos key(iD, pos);
  1999. CJHTreeNode *cacheNode = preloadCache.query(key);
  2000. if (!cacheNode)
  2001. {
  2002. cacheAdds++;
  2003. if (ctx) ctx->noteStatistic(StNumPreloadCacheAdds, 1);
  2004. preloadCacheAdds++;
  2005. preloadCache.add(key, *LINK(node));
  2006. }
  2007. }
  2008. bool CNodeCache::isPreloaded(int iD, offset_t pos)
  2009. {
  2010. CriticalBlock block(lock);
  2011. CKeyIdAndPos key(iD, pos);
  2012. return NULL != preloadCache.query(key);
  2013. }
  2014. RelaxedAtomic<unsigned> cacheAdds;
  2015. RelaxedAtomic<unsigned> cacheHits;
  2016. RelaxedAtomic<unsigned> nodesLoaded;
  2017. RelaxedAtomic<unsigned> blobCacheHits;
  2018. RelaxedAtomic<unsigned> blobCacheAdds;
  2019. RelaxedAtomic<unsigned> leafCacheHits;
  2020. RelaxedAtomic<unsigned> leafCacheAdds;
  2021. RelaxedAtomic<unsigned> nodeCacheHits;
  2022. RelaxedAtomic<unsigned> nodeCacheAdds;
  2023. RelaxedAtomic<unsigned> preloadCacheHits;
  2024. RelaxedAtomic<unsigned> preloadCacheAdds;
  2025. void clearNodeStats()
  2026. {
  2027. cacheAdds.store(0);
  2028. cacheHits.store(0);
  2029. nodesLoaded.store(0);
  2030. blobCacheHits.store(0);
  2031. blobCacheAdds.store(0);
  2032. leafCacheHits.store(0);
  2033. leafCacheAdds.store(0);
  2034. nodeCacheHits.store(0);
  2035. nodeCacheAdds.store(0);
  2036. preloadCacheHits.store(0);
  2037. preloadCacheAdds.store(0);
  2038. }
  2039. //------------------------------------------------------------------------------------------------
  2040. class CKeyMerger : public CKeyLevelManager
  2041. {
  2042. unsigned *mergeheap;
  2043. unsigned numkeys;
  2044. unsigned activekeys;
  2045. unsigned compareSize = 0;
  2046. IArrayOf<IKeyCursor> cursorArray;
  2047. PointerArray bufferArray;
  2048. PointerArray fixedArray;
  2049. BoolArray matchedArray;
  2050. UnsignedArray mergeHeapArray;
  2051. UnsignedArray keyNoArray;
  2052. IKeyCursor **cursors;
  2053. char **buffers;
  2054. void **fixeds;
  2055. bool *matcheds;
  2056. unsigned sortFieldOffset;
  2057. unsigned sortFromSeg;
  2058. bool resetPending;
  2059. // #define BuffCompare(a, b) memcmp(buffers[mergeheap[a]]+sortFieldOffset, buffers[mergeheap[b]]+sortFieldOffset, keySize-sortFieldOffset) // NOTE - compare whole key not just keyed part.
  2060. inline int BuffCompare(unsigned a, unsigned b)
  2061. {
  2062. const char *c1 = buffers[mergeheap[a]];
  2063. const char *c2 = buffers[mergeheap[b]];
  2064. //Backwards compatibility - do not compare the fileposition field, even if it would be significant.
  2065. //int ret = memcmp(c1+sortFieldOffset, c2+sortFieldOffset, keySize-sortFieldOffset); // NOTE - compare whole key not just keyed part.
  2066. int ret = memcmp(c1+sortFieldOffset, c2+sortFieldOffset, compareSize-sortFieldOffset); // NOTE - compare whole key not just keyed part.
  2067. if (!ret && sortFieldOffset)
  2068. ret = memcmp(c1, c2, sortFieldOffset);
  2069. return ret;
  2070. }
  2071. Linked<IKeyIndexBase> keyset;
  2072. void resetKey(unsigned i)
  2073. {
  2074. matcheds[i] = false;
  2075. segs.setLow(sortFromSeg, buffers[i]);
  2076. IKeyCursor *cursor = cursors[i];
  2077. if (cursor)
  2078. cursor->reset();
  2079. }
  2080. void replicateForTrailingSort()
  2081. {
  2082. // For each key that is present, we need to repeat it for each distinct value of leading segmonitor fields that is present in the index.
  2083. // We also need to make sure that sortFromSeg is properly set
  2084. sortFromSeg = (unsigned) -1;
  2085. ForEachItemIn(idx, segs.segMonitors)
  2086. {
  2087. IKeySegmentMonitor &seg = segs.segMonitors.item(idx);
  2088. unsigned offset = seg.getOffset();
  2089. if (offset == sortFieldOffset)
  2090. {
  2091. sortFromSeg = idx;
  2092. break;
  2093. }
  2094. IKeySegmentMonitor *override = createOverrideableKeySegmentMonitor(LINK(&seg));
  2095. segs.segMonitors.replace(*override, idx);
  2096. }
  2097. if (sortFromSeg == -1)
  2098. assertex(!"Attempting to sort from offset that is not on a segment boundary"); // MORE - can use the information that we have earlier to make sure not merged
  2099. assertex(resetPending == true); // we do the actual replication in reset
  2100. }
  2101. void dumpMergeHeap()
  2102. {
  2103. DBGLOG("---------");
  2104. for (unsigned i = 0; i < activekeys; i++)
  2105. {
  2106. int key = mergeheap[i];
  2107. DBGLOG("%d %.*s %d", key, keySize, buffers[key], matcheds[key]);
  2108. }
  2109. }
  2110. inline void setSegOverrides(unsigned key)
  2111. {
  2112. keyBuffer = buffers[key];
  2113. keyCursor = cursors[key];
  2114. matched = matcheds[key];
  2115. for (unsigned segno = 0; segno < sortFromSeg; segno++)
  2116. {
  2117. IOverrideableKeySegmentMonitor *sm = QUERYINTERFACE(&segs.segMonitors.item(segno), IOverrideableKeySegmentMonitor);
  2118. assertex(sm);
  2119. sm->setOverrideBuffer(fixeds[key]);
  2120. }
  2121. segs.recalculateCache();
  2122. }
  2123. public:
  2124. CKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet *_keyset, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(_recInfo, NULL, _ctx), sortFieldOffset(_sortFieldOffset)
  2125. {
  2126. init();
  2127. setKey(_keyset);
  2128. }
  2129. CKeyMerger(const RtlRecord &_recInfo, IKeyIndex *_onekey, unsigned _sortFieldOffset, IContextLogger *_ctx) : CKeyLevelManager(_recInfo, NULL, _ctx), sortFieldOffset(_sortFieldOffset)
  2130. {
  2131. init();
  2132. setKey(_onekey);
  2133. }
  2134. ~CKeyMerger()
  2135. {
  2136. killBuffers();
  2137. keyCursor = NULL; // so parent class doesn't delete it!
  2138. keyBuffer = NULL; // ditto
  2139. }
  2140. void killBuffers()
  2141. {
  2142. ForEachItemIn(idx, bufferArray)
  2143. {
  2144. free(bufferArray.item(idx));
  2145. }
  2146. ForEachItemIn(idx1, fixedArray)
  2147. {
  2148. free(fixedArray.item(idx1));
  2149. }
  2150. cursorArray.kill();
  2151. keyCursor = NULL; // cursorArray owns cursors
  2152. matchedArray.kill();
  2153. mergeHeapArray.kill();
  2154. bufferArray.kill();
  2155. fixedArray.kill();
  2156. keyNoArray.kill();
  2157. cursors = NULL;
  2158. matcheds = NULL;
  2159. mergeheap = NULL;
  2160. buffers = NULL;
  2161. fixeds = NULL;
  2162. }
  2163. void init()
  2164. {
  2165. numkeys = 0;
  2166. activekeys = 0;
  2167. resetPending = true;
  2168. sortFromSeg = 0;
  2169. }
  2170. virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
  2171. {
  2172. // Rather like a lookup, except that no records below the value indicated by seek* should be returned.
  2173. if (resetPending)
  2174. {
  2175. resetSort(seek, seekOffset, seeklen);
  2176. if (!activekeys)
  2177. return false;
  2178. #ifdef _DEBUG
  2179. if (traceSmartStepping)
  2180. DBGLOG("SKIP: init key = %d", mergeheap[0]);
  2181. #endif
  2182. return true;
  2183. }
  2184. else
  2185. {
  2186. if (!activekeys)
  2187. {
  2188. #ifdef _DEBUG
  2189. if (traceSmartStepping)
  2190. DBGLOG("SKIP: merge done");
  2191. #endif
  2192. return false;
  2193. }
  2194. unsigned key = mergeheap[0];
  2195. #ifdef _DEBUG
  2196. if (traceSmartStepping)
  2197. DBGLOG("SKIP: merging key = %d", key);
  2198. #endif
  2199. unsigned compares = 0;
  2200. for (;;)
  2201. {
  2202. if (!CKeyLevelManager::lookupSkip(seek, seekOffset, seeklen) )
  2203. {
  2204. activekeys--;
  2205. if (!activekeys)
  2206. {
  2207. if (ctx)
  2208. ctx->noteStatistic(StNumIndexMergeCompares, compares);
  2209. return false;
  2210. }
  2211. eof = false;
  2212. mergeheap[0] = mergeheap[activekeys];
  2213. }
  2214. /* The key associated with mergeheap[0] will have changed
  2215. This code restores the heap property
  2216. */
  2217. unsigned p = 0; /* parent */
  2218. while (1)
  2219. {
  2220. unsigned c = p*2 + 1; /* child */
  2221. if ( c >= activekeys )
  2222. break;
  2223. /* Select smaller child */
  2224. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
  2225. /* If child is greater or equal than parent then we are done */
  2226. if ( BuffCompare( c, p ) >= 0 )
  2227. break;
  2228. /* Swap parent and child */
  2229. int r = mergeheap[c];
  2230. mergeheap[c] = mergeheap[p];
  2231. mergeheap[p] = r;
  2232. /* child becomes parent */
  2233. p = c;
  2234. }
  2235. if (key != mergeheap[0])
  2236. {
  2237. key = mergeheap[0];
  2238. setSegOverrides(key);
  2239. }
  2240. if (memcmp(seek, keyBuffer+seekOffset, seeklen) <= 0)
  2241. {
  2242. #ifdef _DEBUG
  2243. if (traceSmartStepping)
  2244. {
  2245. DBGLOG("SKIP: merged key = %d", key);
  2246. StringBuffer recstr;
  2247. unsigned i;
  2248. for (i = 0; i < keySize; i++)
  2249. {
  2250. unsigned char c = ((unsigned char *) keyBuffer)[i];
  2251. recstr.appendf("%c", isprint(c) ? c : '.');
  2252. }
  2253. recstr.append (" ");
  2254. for (i = 0; i < keySize; i++)
  2255. {
  2256. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  2257. }
  2258. DBGLOG("SKIP: Out skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
  2259. }
  2260. #endif
  2261. if (ctx)
  2262. ctx->noteStatistic(StNumIndexMergeCompares, compares);
  2263. return true;
  2264. }
  2265. else
  2266. {
  2267. compares++;
  2268. if (ctx && (compares == 100))
  2269. {
  2270. ctx->noteStatistic(StNumIndexMergeCompares, compares); // also checks for abort...
  2271. compares = 0;
  2272. }
  2273. }
  2274. }
  2275. }
  2276. }
  2277. virtual void setLayoutTranslator(const IDynamicTransform * trans) override
  2278. {
  2279. if (trans)
  2280. throw MakeStringException(0, "Layout translation not supported when merging key parts, as it may change sort order");
  2281. // It MIGHT be possible to support translation still if all keyCursors have the same translation
  2282. // would have to translate AFTER the merge, but that's ok
  2283. // HOWEVER the result won't be guaranteed to be in sorted order afterwards so is there any point?
  2284. }
  2285. virtual void setKey(IKeyIndexBase *_keyset)
  2286. {
  2287. keyset.set(_keyset);
  2288. if (_keyset && _keyset->numParts())
  2289. {
  2290. IKeyIndex *ki = _keyset->queryPart(0);
  2291. keySize = ki->keySize();
  2292. if (!keySize)
  2293. throw MakeStringException(0, "Invalid key size 0 in key %s", ki->queryFileName());
  2294. keyedSize = ki->keyedSize();
  2295. numkeys = _keyset->numParts();
  2296. compareSize = keySize - (ki->hasSpecialFileposition() ? sizeof(offset_t) : 0);
  2297. }
  2298. else
  2299. numkeys = 0;
  2300. killBuffers();
  2301. }
  2302. void resetSort(const void *seek, size32_t seekOffset, size32_t seeklen)
  2303. {
  2304. activekeys = 0;
  2305. keyBuffer = NULL;
  2306. void *fixedValue = NULL;
  2307. unsigned segno;
  2308. for (segno = 0; segno < sortFromSeg; segno++)
  2309. {
  2310. IOverrideableKeySegmentMonitor *sm = QUERYINTERFACE(&segs.segMonitors.item(segno), IOverrideableKeySegmentMonitor);
  2311. assertex(sm);
  2312. sm->setOverrideBuffer(NULL);
  2313. }
  2314. segs.recalculateCache();
  2315. unsigned i;
  2316. for (i = 0; i < numkeys; i++)
  2317. {
  2318. assertex(keySize);
  2319. if (!keyBuffer) keyBuffer = (char *) malloc(keySize);
  2320. segs.setLow(0, keyBuffer);
  2321. for (;;)
  2322. {
  2323. keyCursor = keyset->queryPart(i)->getCursor(ctx);
  2324. matched = false;
  2325. eof = false;
  2326. keyCursor->reset();
  2327. bool found;
  2328. unsigned lskips = 0;
  2329. unsigned lnullSkips = 0;
  2330. for (;;)
  2331. {
  2332. if (seek)
  2333. {
  2334. if (skipTo(seek, seekOffset, seeklen))
  2335. lskips++;
  2336. else
  2337. lnullSkips++;
  2338. }
  2339. found = _lookup(true, segs.lastRealSeg());
  2340. if (!found || !seek || memcmp(keyBuffer + seekOffset, seek, seeklen) >= 0)
  2341. break;
  2342. }
  2343. noteSkips(lskips, lnullSkips);
  2344. if (found)
  2345. {
  2346. keyNoArray.append(i);
  2347. cursorArray.append(*keyCursor);
  2348. bufferArray.append(keyBuffer);
  2349. matchedArray.append(matched);
  2350. mergeHeapArray.append(activekeys++);
  2351. if (!sortFromSeg)
  2352. {
  2353. keyBuffer = NULL;
  2354. fixedValue = NULL;
  2355. break;
  2356. }
  2357. assertex(sortFieldOffset);
  2358. void *fixedValue = malloc(sortFieldOffset);
  2359. memcpy(fixedValue, keyBuffer, sortFieldOffset);
  2360. fixedArray.append(fixedValue);
  2361. #ifdef _DEBUG
  2362. if (traceSmartStepping)
  2363. {
  2364. StringBuffer recstr;
  2365. unsigned i;
  2366. for (i = 0; i < sortFieldOffset; i++)
  2367. {
  2368. unsigned char c = ((unsigned char *) keyBuffer)[i];
  2369. recstr.appendf("%c", isprint(c) ? c : '.');
  2370. }
  2371. recstr.append (" ");
  2372. for (i = 0; i < keySize; i++)
  2373. {
  2374. recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
  2375. }
  2376. DBGLOG("Adding key cursor info for %s", recstr.str());
  2377. }
  2378. #endif
  2379. // Now advance segments 0 through sortFromSeg-1 to next legal value...
  2380. assertex(keySize);
  2381. char *nextBuffer = (char *) malloc(keySize);
  2382. memcpy(nextBuffer, keyBuffer, keySize);
  2383. keyBuffer = nextBuffer;
  2384. #ifdef _DEBUG
  2385. assertex(segs.segMonitors.item(sortFromSeg-1).matchesBuffer(keyBuffer));
  2386. #endif
  2387. if (!segs.incrementKey(sortFromSeg-1, keyBuffer))
  2388. break;
  2389. }
  2390. else
  2391. {
  2392. keyCursor->Release();
  2393. break;
  2394. }
  2395. }
  2396. }
  2397. free (keyBuffer);
  2398. keyBuffer = NULL;
  2399. if (activekeys>0)
  2400. {
  2401. if (ctx)
  2402. ctx->noteStatistic(StNumIndexMerges, activekeys);
  2403. cursors = cursorArray.getArray();
  2404. matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
  2405. buffers = (char **) bufferArray.getArray();
  2406. mergeheap = mergeHeapArray.getArray();
  2407. fixeds = fixedArray.getArray();
  2408. /* Permute mergeheap to establish the heap property
  2409. For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
  2410. The children of p must both be greater than or equal to p
  2411. The parent of a child c is given by p = (c-1)/2
  2412. */
  2413. for (i=1; i<activekeys; i++)
  2414. {
  2415. int r = mergeheap[i];
  2416. int c = i; /* child */
  2417. while (c > 0)
  2418. {
  2419. int p = (c-1)/2; /* parent */
  2420. if ( BuffCompare( c, p ) >= 0 )
  2421. break;
  2422. mergeheap[c] = mergeheap[p];
  2423. mergeheap[p] = r;
  2424. c = p;
  2425. }
  2426. }
  2427. setSegOverrides(mergeheap[0]);
  2428. eof = false;
  2429. }
  2430. else
  2431. {
  2432. keyBuffer = NULL;
  2433. keyCursor = NULL;
  2434. matched = false;
  2435. eof = true;
  2436. }
  2437. resetPending = false;
  2438. }
  2439. virtual void reset(bool crappyHack)
  2440. {
  2441. if (!started)
  2442. {
  2443. started = true;
  2444. if (keyedSize)
  2445. segs.checkSize(keyedSize, "[merger]"); //PG: not sure what keyname to use here
  2446. }
  2447. if (!crappyHack)
  2448. {
  2449. killBuffers();
  2450. resetPending = true;
  2451. }
  2452. else
  2453. {
  2454. setSegOverrides(mergeheap[0]);
  2455. resetPending = false;
  2456. }
  2457. }
  2458. virtual bool lookup(bool exact)
  2459. {
  2460. assertex(exact);
  2461. if (resetPending)
  2462. {
  2463. resetSort(NULL, 0, 0);
  2464. if (!activekeys)
  2465. return false;
  2466. }
  2467. else
  2468. {
  2469. if (!activekeys)
  2470. return false;
  2471. unsigned key = mergeheap[0];
  2472. if (!CKeyLevelManager::lookup(exact))
  2473. {
  2474. activekeys--;
  2475. if (!activekeys)
  2476. return false; // MORE - does this lose a record?
  2477. eof = false;
  2478. mergeheap[0] = mergeheap[activekeys];
  2479. }
  2480. /* The key associated with mergeheap[0] will have changed
  2481. This code restores the heap property
  2482. */
  2483. unsigned p = 0; /* parent */
  2484. while (1)
  2485. {
  2486. unsigned c = p*2 + 1; /* child */
  2487. if ( c >= activekeys )
  2488. break;
  2489. /* Select smaller child */
  2490. if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
  2491. /* If child is greater or equal than parent then we are done */
  2492. if ( BuffCompare( c, p ) >= 0 )
  2493. break;
  2494. /* Swap parent and child */
  2495. int r = mergeheap[c];
  2496. mergeheap[c] = mergeheap[p];
  2497. mergeheap[p] = r;
  2498. /* child becomes parent */
  2499. p = c;
  2500. }
  2501. // dumpMergeHeap();
  2502. if (mergeheap[0] != key)
  2503. setSegOverrides(mergeheap[0]);
  2504. }
  2505. return true;
  2506. }
  2507. virtual unsigned __int64 getCount()
  2508. {
  2509. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
  2510. // as the sequence numbers are not in sequence
  2511. unsigned __int64 ret = 0;
  2512. if (resetPending)
  2513. resetSort(NULL, 0, 0); // This is slightly suboptimal
  2514. for (unsigned i = 0; i < activekeys; i++)
  2515. {
  2516. unsigned key = mergeheap[i];
  2517. keyBuffer = buffers[key];
  2518. keyCursor = cursors[key];
  2519. ret += CKeyLevelManager::getCount();
  2520. }
  2521. return ret;
  2522. }
  2523. virtual unsigned __int64 checkCount(unsigned __int64 max)
  2524. {
  2525. assertex (!sortFieldOffset); // we should have avoided using a stepping merger for precheck of limits, both for efficiency and because this code won't work
  2526. // as the sequence numbers are not in sequence
  2527. unsigned __int64 ret = 0;
  2528. if (resetPending)
  2529. resetSort(NULL, 0, 0); // this is a little suboptimal as we will not bail out early
  2530. for (unsigned i = 0; i < activekeys; i++)
  2531. {
  2532. unsigned key = mergeheap[i];
  2533. keyBuffer = buffers[key];
  2534. keyCursor = cursors[key];
  2535. unsigned __int64 thisKeyCount = CKeyLevelManager::checkCount(max);
  2536. ret += thisKeyCount;
  2537. if (thisKeyCount > max)
  2538. return ret;
  2539. max -= thisKeyCount;
  2540. }
  2541. return ret;
  2542. }
  2543. virtual void serializeCursorPos(MemoryBuffer &mb)
  2544. {
  2545. // dumpMergeHeap();
  2546. mb.append(eof);
  2547. mb.append(activekeys);
  2548. for (unsigned i = 0; i < activekeys; i++)
  2549. {
  2550. unsigned key = mergeheap[i];
  2551. mb.append(keyNoArray.item(key));
  2552. cursors[key]->serializeCursorPos(mb);
  2553. mb.append(matcheds[key]);
  2554. }
  2555. }
  2556. virtual void deserializeCursorPos(MemoryBuffer &mb)
  2557. {
  2558. mb.read(eof);
  2559. mb.read(activekeys);
  2560. for (unsigned i = 0; i < activekeys; i++)
  2561. {
  2562. unsigned keyno;
  2563. mb.read(keyno);
  2564. keyNoArray.append(keyno);
  2565. keyCursor = keyset->queryPart(keyno)->getCursor(ctx);
  2566. assertex(keySize);
  2567. keyBuffer = (char *) malloc(keySize);
  2568. cursorArray.append(*keyCursor);
  2569. keyCursor->deserializeCursorPos(mb, keyBuffer);
  2570. mb.read(matched);
  2571. matchedArray.append(matched);
  2572. bufferArray.append(keyBuffer);
  2573. void *fixedValue = (char *) malloc(sortFieldOffset);
  2574. memcpy(fixedValue, keyBuffer, sortFieldOffset); // If it's not at EOF then it must match
  2575. fixedArray.append(fixedValue);
  2576. mergeHeapArray.append(i);
  2577. }
  2578. cursors = cursorArray.getArray();
  2579. matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
  2580. buffers = (char **) bufferArray.getArray();
  2581. mergeheap = mergeHeapArray.getArray();
  2582. fixeds = fixedArray.getArray();
  2583. }
  2584. virtual void finishSegmentMonitors()
  2585. {
  2586. CKeyLevelManager::finishSegmentMonitors();
  2587. if (sortFieldOffset)
  2588. {
  2589. if (keyedSize)
  2590. segs.checkSize(keyedSize, "[merger]"); // Ensures trailing KSM is setup
  2591. replicateForTrailingSort();
  2592. }
  2593. }
  2594. };
  2595. extern jhtree_decl IKeyManager *createKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet * _keys, unsigned _sortFieldOffset, IContextLogger *_ctx)
  2596. {
  2597. return new CKeyMerger(_recInfo, _keys, _sortFieldOffset, _ctx);
  2598. }
  2599. extern jhtree_decl IKeyManager *createSingleKeyMerger(const RtlRecord &_recInfo, IKeyIndex * _onekey, unsigned _sortFieldOffset, IContextLogger *_ctx)
  2600. {
  2601. return new CKeyMerger(_recInfo, _onekey, _sortFieldOffset, _ctx);
  2602. }
  2603. class CKeyIndexSet : implements IKeyIndexSet, public CInterface
  2604. {
  2605. IPointerArrayOf<IKeyIndex> indexes;
  2606. offset_t recordCount = 0;
  2607. offset_t totalSize = 0;
  2608. StringAttr origFileName;
  2609. public:
  2610. IMPLEMENT_IINTERFACE;
  2611. virtual bool IsShared() const { return CInterface::IsShared(); }
  2612. void addIndex(IKeyIndex *i) { indexes.append(i); }
  2613. virtual unsigned numParts() { return indexes.length(); }
  2614. virtual IKeyIndex *queryPart(unsigned partNo) { return indexes.item(partNo); }
  2615. virtual void setRecordCount(offset_t count) { recordCount = count; }
  2616. virtual void setTotalSize(offset_t size) { totalSize = size; }
  2617. virtual offset_t getRecordCount() { return recordCount; }
  2618. virtual offset_t getTotalSize() { return totalSize; }
  2619. };
  2620. extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
  2621. {
  2622. return new CKeyIndexSet;
  2623. }
  2624. extern jhtree_decl IKeyManager *createLocalKeyManager(const RtlRecord &_recInfo, IKeyIndex *_key, IContextLogger *_ctx)
  2625. {
  2626. return new CKeyLevelManager(_recInfo, _key, _ctx);
  2627. }
  2628. class CKeyArray : implements IKeyArray, public CInterface
  2629. {
  2630. public:
  2631. IMPLEMENT_IINTERFACE;
  2632. virtual bool IsShared() const { return CInterface::IsShared(); }
  2633. IPointerArrayOf<IKeyIndexBase> keys;
  2634. virtual IKeyIndexBase *queryKeyPart(unsigned partNo)
  2635. {
  2636. if (!keys.isItem(partNo))
  2637. {
  2638. return NULL;
  2639. }
  2640. IKeyIndexBase *key = keys.item(partNo);
  2641. return key;
  2642. }
  2643. virtual unsigned length() { return keys.length(); }
  2644. void addKey(IKeyIndexBase *f) { keys.append(f); }
  2645. };
  2646. extern jhtree_decl IKeyArray *createKeyArray()
  2647. {
  2648. return new CKeyArray;
  2649. }
  2650. #ifdef _USE_CPPUNIT
  2651. #include "unittests.hpp"
  2652. class IKeyManagerTest : public CppUnit::TestFixture
  2653. {
  2654. CPPUNIT_TEST_SUITE( IKeyManagerTest );
  2655. CPPUNIT_TEST(testStepping);
  2656. CPPUNIT_TEST(testKeys);
  2657. CPPUNIT_TEST_SUITE_END();
  2658. void testStepping()
  2659. {
  2660. buildTestKeys(false);
  2661. {
  2662. // We are going to treat as a 7-byte field then a 3-byte field, and request the datasorted by the 3-byte...
  2663. Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
  2664. Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
  2665. Owned<IKeyIndexSet> keyset = createKeyIndexSet();
  2666. keyset->addIndex(index1.getClear());
  2667. keyset->addIndex(index2.getClear());
  2668. const char *json = "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
  2669. " \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
  2670. " \"fieldType\": 13, \"length\": 10, "
  2671. " \"fields\": [ "
  2672. " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
  2673. " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 } ] "
  2674. "}";
  2675. Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
  2676. Owned <IKeyManager> tlk1 = createKeyMerger(meta->queryRecordAccessor(true), keyset, 7, NULL);
  2677. Owned<IStringSet> sset1 = createStringSet(7);
  2678. sset1->addRange("0000003", "0000003");
  2679. sset1->addRange("0000005", "0000006");
  2680. tlk1->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2681. Owned<IStringSet> sset2 = createStringSet(3);
  2682. sset2->addRange("010", "010");
  2683. sset2->addRange("030", "033");
  2684. Owned<IStringSet> sset3 = createStringSet(3);
  2685. sset3->addRange("999", "XXX");
  2686. sset3->addRange("000", "002");
  2687. tlk1->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2688. tlk1->finishSegmentMonitors();
  2689. tlk1->reset();
  2690. offset_t fpos;
  2691. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003010", 10)==0);
  2692. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005010", 10)==0);
  2693. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006010", 10)==0);
  2694. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003030", 10)==0);
  2695. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005030", 10)==0);
  2696. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006030", 10)==0);
  2697. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003031", 10)==0);
  2698. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005031", 10)==0);
  2699. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006031", 10)==0);
  2700. MemoryBuffer mb;
  2701. tlk1->serializeCursorPos(mb);
  2702. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003032", 10)==0);
  2703. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005032", 10)==0);
  2704. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006032", 10)==0);
  2705. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003033", 10)==0);
  2706. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005033", 10)==0);
  2707. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006033", 10)==0);
  2708. ASSERT(!tlk1->lookup(true));
  2709. ASSERT(!tlk1->lookup(true));
  2710. Owned <IKeyManager> tlk2 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
  2711. tlk2->setKey(keyset);
  2712. tlk2->deserializeCursorPos(mb);
  2713. tlk2->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2714. tlk2->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2715. tlk2->finishSegmentMonitors();
  2716. tlk2->reset(true);
  2717. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003032", 10)==0);
  2718. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005032", 10)==0);
  2719. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006032", 10)==0);
  2720. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003033", 10)==0);
  2721. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005033", 10)==0);
  2722. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006033", 10)==0);
  2723. ASSERT(!tlk2->lookup(true));
  2724. ASSERT(!tlk2->lookup(true));
  2725. Owned <IKeyManager> tlk3 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
  2726. tlk3->setKey(keyset);
  2727. tlk3->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2728. tlk3->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
  2729. tlk3->finishSegmentMonitors();
  2730. tlk3->reset(false);
  2731. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003010", 10)==0);
  2732. ASSERT(tlk3->lookupSkip("031", 7, 3)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003031", 10)==0);
  2733. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000005031", 10)==0);
  2734. ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000006031", 10)==0);
  2735. ASSERT(!tlk3->lookupSkip("081", 7, 3));
  2736. ASSERT(!tlk3->lookup(true));
  2737. Owned <IKeyManager> tlk4 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL);
  2738. tlk4->setKey(keyset);
  2739. tlk4->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
  2740. tlk4->append(createKeySegmentMonitor(false, sset3.getLink(), 1, 7, 3));
  2741. tlk4->finishSegmentMonitors();
  2742. tlk4->reset(false);
  2743. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003000", 10)==0);
  2744. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005000", 10)==0);
  2745. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006000", 10)==0);
  2746. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003001", 10)==0);
  2747. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005001", 10)==0);
  2748. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006001", 10)==0);
  2749. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003002", 10)==0);
  2750. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005002", 10)==0);
  2751. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006002", 10)==0);
  2752. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003999", 10)==0);
  2753. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005999", 10)==0);
  2754. ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006999", 10)==0);
  2755. ASSERT(!tlk4->lookup(true));
  2756. ASSERT(!tlk4->lookup(true));
  2757. }
  2758. clearKeyStoreCache(true);
  2759. removeTestKeys();
  2760. }
  2761. void buildTestKeys(bool variable)
  2762. {
  2763. buildTestKey("keyfile1.$$$", false, variable);
  2764. buildTestKey("keyfile2.$$$", true, variable);
  2765. }
  2766. void buildTestKey(const char *filename, bool skip, bool variable)
  2767. {
  2768. OwnedIFile file = createIFile(filename);
  2769. OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
  2770. Owned<IFileIOStream> out = createIOStream(io);
  2771. unsigned maxRecSize = variable ? 18 : 10;
  2772. unsigned keyedSize = 10;
  2773. Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY | (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0);
  2774. char keybuf[18];
  2775. memset(keybuf, '0', 18);
  2776. for (unsigned count = 0; count < 10000; count++)
  2777. {
  2778. unsigned datasize = 10;
  2779. if (variable && (count % 10)==0)
  2780. {
  2781. char *blob = new char[count+100000];
  2782. byte seed = count;
  2783. for (unsigned i = 0; i < count+100000; i++)
  2784. {
  2785. blob[i] = seed;
  2786. seed = seed * 13 + i;
  2787. }
  2788. offset_t blobid = builder->createBlob(count+100000, blob);
  2789. memcpy(keybuf + 10, &blobid, sizeof(blobid));
  2790. delete [] blob;
  2791. datasize += sizeof(blobid);
  2792. }
  2793. bool skipme = (count % 4 == 0) != skip;
  2794. if (!skipme)
  2795. {
  2796. builder->processKeyData(keybuf, count*10, datasize);
  2797. if (count==48 || count==49)
  2798. builder->processKeyData(keybuf, count*10, datasize);
  2799. }
  2800. unsigned idx = 9;
  2801. for (;;)
  2802. {
  2803. if (keybuf[idx]=='9')
  2804. keybuf[idx--]='0';
  2805. else
  2806. {
  2807. keybuf[idx]++;
  2808. break;
  2809. }
  2810. }
  2811. }
  2812. builder->finish(nullptr, nullptr);
  2813. out->flush();
  2814. }
  2815. void removeTestKeys()
  2816. {
  2817. ASSERT(remove("keyfile1.$$$")==0);
  2818. ASSERT(remove("keyfile2.$$$")==0);
  2819. }
  2820. void checkBlob(IKeyManager *key, unsigned size)
  2821. {
  2822. unsigned __int64 blobid;
  2823. memcpy(&blobid, key->queryKeyBuffer()+10, sizeof(blobid));
  2824. ASSERT(blobid != 0);
  2825. size32_t blobsize;
  2826. const byte *blob = key->loadBlob(blobid, blobsize);
  2827. ASSERT(blob != NULL);
  2828. ASSERT(blobsize == size);
  2829. byte seed = size-100000;
  2830. for (unsigned i = 0; i < size; i++)
  2831. {
  2832. ASSERT(blob[i] == seed);
  2833. seed = seed * 13 + i;
  2834. }
  2835. key->releaseBlobs();
  2836. }
  2837. protected:
  2838. void testKeys(bool variable)
  2839. {
  2840. const char *json = variable ?
  2841. "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
  2842. " \"ty2\": { \"fieldType\": 15, \"length\": 8 }, "
  2843. " \"fieldType\": 13, \"length\": 10, "
  2844. " \"fields\": [ "
  2845. " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
  2846. " { \"name\": \"f3\", \"type\": \"ty2\", \"flags\": 65551 } " // 0x01000f i.e. payload and blob
  2847. " ]"
  2848. "}"
  2849. :
  2850. "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
  2851. " \"fieldType\": 13, \"length\": 10, "
  2852. " \"fields\": [ "
  2853. " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
  2854. " ] "
  2855. "}";
  2856. Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
  2857. const RtlRecord &recInfo = meta->queryRecordAccessor(true);
  2858. buildTestKeys(variable);
  2859. {
  2860. Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
  2861. Owned <IKeyManager> tlk1 = createLocalKeyManager(recInfo, index1, NULL);
  2862. Owned<IStringSet> sset1 = createStringSet(10);
  2863. sset1->addRange("0000000001", "0000000100");
  2864. tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 0, 10));
  2865. tlk1->finishSegmentMonitors();
  2866. tlk1->reset();
  2867. Owned <IKeyManager> tlk1a = createLocalKeyManager(recInfo, index1, NULL);
  2868. Owned<IStringSet> sset1a = createStringSet(8);
  2869. sset1a->addRange("00000000", "00000001");
  2870. tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 0, 8));
  2871. tlk1a->append(createKeySegmentMonitor(false, NULL, 1, 8, 1));
  2872. sset1a.setown(createStringSet(1));
  2873. sset1a->addRange("0", "1");
  2874. tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 2, 9, 1));
  2875. tlk1a->finishSegmentMonitors();
  2876. tlk1a->reset();
  2877. Owned<IStringSet> ssetx = createStringSet(10);
  2878. ssetx->addRange("0000000001", "0000000002");
  2879. ASSERT(ssetx->numValues() == 2);
  2880. ssetx->addRange("00000000AK", "00000000AL");
  2881. ASSERT(ssetx->numValues() == 4);
  2882. ssetx->addRange("0000000100", "0010000000");
  2883. ASSERT(ssetx->numValues() == (unsigned) -1);
  2884. ssetx->addRange("0000000001", "0010000000");
  2885. ASSERT(ssetx->numValues() == (unsigned) -1);
  2886. Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
  2887. Owned <IKeyManager> tlk2 = createLocalKeyManager(recInfo, index2, NULL);
  2888. Owned<IStringSet> sset2 = createStringSet(10);
  2889. sset2->addRange("0000000001", "0000000100");
  2890. ASSERT(sset2->numValues() == 65536);
  2891. tlk2->append(createKeySegmentMonitor(false, sset2.getClear(), 0, 0, 10));
  2892. tlk2->finishSegmentMonitors();
  2893. tlk2->reset();
  2894. Owned <IKeyManager> tlk3;
  2895. if (!variable)
  2896. {
  2897. Owned<IKeyIndexSet> both = createKeyIndexSet();
  2898. both->addIndex(index1.getLink());
  2899. both->addIndex(index2.getLink());
  2900. Owned<IStringSet> sset3 = createStringSet(10);
  2901. tlk3.setown(createKeyMerger(recInfo, NULL, 0, NULL));
  2902. tlk3->setKey(both);
  2903. sset3->addRange("0000000001", "0000000100");
  2904. tlk3->append(createKeySegmentMonitor(false, sset3.getClear(), 0, 0, 10));
  2905. tlk3->finishSegmentMonitors();
  2906. tlk3->reset();
  2907. }
  2908. Owned <IKeyManager> tlk2a = createLocalKeyManager(recInfo, index2, NULL);
  2909. Owned<IStringSet> sset2a = createStringSet(10);
  2910. sset2a->addRange("0000000048", "0000000048");
  2911. ASSERT(sset2a->numValues() == 1);
  2912. tlk2a->append(createKeySegmentMonitor(false, sset2a.getClear(), 0, 0, 10));
  2913. tlk2a->finishSegmentMonitors();
  2914. tlk2a->reset();
  2915. Owned <IKeyManager> tlk2b = createLocalKeyManager(recInfo, index2, NULL);
  2916. Owned<IStringSet> sset2b = createStringSet(10);
  2917. sset2b->addRange("0000000047", "0000000049");
  2918. ASSERT(sset2b->numValues() == 3);
  2919. tlk2b->append(createKeySegmentMonitor(false, sset2b.getClear(), 0, 0, 10));
  2920. tlk2b->finishSegmentMonitors();
  2921. tlk2b->reset();
  2922. Owned <IKeyManager> tlk2c = createLocalKeyManager(recInfo, index2, NULL);
  2923. Owned<IStringSet> sset2c = createStringSet(10);
  2924. sset2c->addRange("0000000047", "0000000047");
  2925. tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 0, 10));
  2926. tlk2c->finishSegmentMonitors();
  2927. tlk2c->reset();
  2928. ASSERT(tlk1->getCount() == 76);
  2929. ASSERT(tlk1->getCount() == 76);
  2930. ASSERT(tlk1a->getCount() == 30);
  2931. ASSERT(tlk2->getCount() == 26);
  2932. ASSERT(tlk2a->getCount() == 2);
  2933. ASSERT(tlk2b->getCount() == 2);
  2934. ASSERT(tlk2c->getCount() == 0);
  2935. if (tlk3)
  2936. ASSERT(tlk3->getCount() == 102);
  2937. // MORE - PUT SOME TESTS IN FOR WILD SEEK STUFF
  2938. unsigned pass;
  2939. char buf[11];
  2940. unsigned i;
  2941. for (pass = 0; pass < 2; pass++)
  2942. {
  2943. offset_t fpos;
  2944. tlk1->reset();
  2945. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000001", 10)==0);
  2946. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000002", 10)==0);
  2947. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000003", 10)==0);
  2948. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000005", 10)==0);
  2949. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000006", 10)==0);
  2950. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000007", 10)==0);
  2951. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000009", 10)==0);
  2952. ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000010", 10)==0);
  2953. if (variable)
  2954. checkBlob(tlk1, 10+100000);
  2955. tlk1a->reset();
  2956. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000001", 10)==0);
  2957. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000010", 10)==0);
  2958. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000011", 10)==0);
  2959. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000021", 10)==0);
  2960. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000030", 10)==0);
  2961. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000031", 10)==0);
  2962. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000041", 10)==0);
  2963. ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000050", 10)==0);
  2964. tlk2->reset();
  2965. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000004", 10)==0);
  2966. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000008", 10)==0);
  2967. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000012", 10)==0);
  2968. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000016", 10)==0);
  2969. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000020", 10)==0);
  2970. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000024", 10)==0);
  2971. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000028", 10)==0);
  2972. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000032", 10)==0);
  2973. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000036", 10)==0);
  2974. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000040", 10)==0);
  2975. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000044", 10)==0);
  2976. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
  2977. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
  2978. ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000052", 10)==0);
  2979. if (tlk3)
  2980. {
  2981. tlk3->reset();
  2982. for (i = 1; i <= 100; i++)
  2983. {
  2984. ASSERT(tlk3->lookup(true));
  2985. sprintf(buf, "%010d", i);
  2986. ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
  2987. if (i==48 || i==49)
  2988. {
  2989. ASSERT(tlk3->lookup(true));
  2990. ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
  2991. }
  2992. }
  2993. ASSERT(!tlk3->lookup(true));
  2994. ASSERT(!tlk3->lookup(true));
  2995. }
  2996. }
  2997. tlk1->releaseSegmentMonitors();
  2998. tlk2->releaseSegmentMonitors();
  2999. if (tlk3)
  3000. tlk3->releaseSegmentMonitors();
  3001. }
  3002. clearKeyStoreCache(true);
  3003. removeTestKeys();
  3004. }
  3005. void testKeys()
  3006. {
  3007. ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
  3008. testKeys(false);
  3009. testKeys(true);
  3010. }
  3011. };
  3012. CPPUNIT_TEST_SUITE_REGISTRATION( IKeyManagerTest );
  3013. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( IKeyManagerTest, "IKeyManagerTest" );
  3014. #endif