12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- //****************************************************************************
- // Name: jhtree.cpp
- //
- // Purpose:
- //
- // Description:
- //
- // Notes: Supports only static (non-changing) files
- //
- // Initially I was holding on to the root nodes, but came to find
- // that they could potentially fill the cache by themselves...
- //
- // Things to play with:
- // - try not unpacking the entire node when it is read in.
- // break it out as needed later.
- //
- // History: 31-Aug-99 crs original
- // 08-Jan-00 nh added LZW compression of nodes
- // 14-feb-00 nh added GetORDKey
- // 15-feb-00 nh fixed isolatenode and nextNode
- // 12-Apr-00 jcs moved over to jhtree.dll etc.
- //****************************************************************************
- #include "platform.h"
- #include <stdio.h>
- #include <fcntl.h>
- #include <stdlib.h>
- #include <limits.h>
- #ifdef __linux__
- #include <alloca.h>
- #endif
- #include "hlzw.h"
- #include "jmutex.hpp"
- #include "jhutil.hpp"
- #include "jmisc.hpp"
- #include "jstats.h"
- #include "ctfile.hpp"
- #include "jhtree.ipp"
- #include "keybuild.hpp"
- #include "eclhelper_dyn.hpp"
- #include "rtlrecord.hpp"
- #include "rtldynfield.hpp"
- static std::atomic<CKeyStore *> keyStore(nullptr);
- static unsigned defaultKeyIndexLimit = 200;
- static CNodeCache *nodeCache = NULL;
- static CriticalSection *initCrit = NULL;
- bool useMemoryMappedIndexes = false;
- bool logExcessiveSeeks = false;
- bool linuxYield = false;
- bool traceSmartStepping = false;
- bool flushJHtreeCacheOnOOM = true;
- MODULE_INIT(INIT_PRIORITY_JHTREE_JHTREE)
- {
- initCrit = new CriticalSection;
- return 1;
- }
- MODULE_EXIT()
- {
- delete initCrit;
- delete keyStore.load(std::memory_order_relaxed);
- ::Release((CInterface*)nodeCache);
- }
- //#define DUMP_NODES
- SegMonitorList::SegMonitorList(const RtlRecord &_recInfo, bool _needWild) : recInfo(_recInfo), needWild(_needWild)
- {
- keySegCount = recInfo.getNumKeyedFields();
- reset();
- }
- unsigned SegMonitorList::ordinality() const
- {
- return segMonitors.length();
- }
- IKeySegmentMonitor *SegMonitorList::item(unsigned idx) const
- {
- return &segMonitors.item(idx);
- }
- size32_t SegMonitorList::getSize() const
- {
- unsigned lim = segMonitors.length();
- if (lim)
- {
- IKeySegmentMonitor &lastItem = segMonitors.item(lim-1);
- return lastItem.getOffset() + lastItem.getSize();
- }
- else
- return 0;
- }
- void SegMonitorList::checkSize(size32_t keyedSize, char const * keyname)
- {
- size32_t segSize = getSize();
- if (segSize != keyedSize)
- {
- StringBuffer err;
- err.appendf("Key size mismatch on key %s - key size is %u, expected %u", keyname, keyedSize, getSize());
- IException *e = MakeStringExceptionDirect(1000, err.str());
- EXCLOG(e, err.str());
- throw e;
- }
- }
- void SegMonitorList::setLow(unsigned segno, void *keyBuffer) const
- {
- unsigned lim = segMonitors.length();
- while (segno < lim)
- segMonitors.item(segno++).setLow(keyBuffer);
- }
- unsigned SegMonitorList::setLowAfter(size32_t offset, void *keyBuffer) const
- {
- unsigned lim = segMonitors.length();
- unsigned segno = 0;
- unsigned skipped = 0;
- while (segno < lim)
- {
- IKeySegmentMonitor &seg = segMonitors.item(segno++);
- if (seg.getOffset() >= offset)
- seg.setLow(keyBuffer);
- else if (seg.getSize()+seg.getOffset() <= offset)
- skipped++;
- else
- {
- byte *temp = (byte *) alloca(seg.getSize() + seg.getOffset());
- seg.setLow(temp);
- memcpy((byte *)keyBuffer+offset, temp+offset, seg.getSize() - (offset - seg.getOffset()));
- }
- }
- return skipped;
- }
- void SegMonitorList::endRange(unsigned segno, void *keyBuffer) const
- {
- unsigned lim = segMonitors.length();
- if (segno < lim)
- segMonitors.item(segno++).endRange(keyBuffer);
- while (segno < lim)
- segMonitors.item(segno++).setHigh(keyBuffer);
- }
- bool SegMonitorList::incrementKey(unsigned segno, void *keyBuffer) const
- {
- // Increment the key buffer to next acceptable value
- for(;;)
- {
- if (segMonitors.item(segno).increment(keyBuffer))
- {
- setLow(segno+1, keyBuffer);
- return true;
- }
- if (!segno)
- return false;
- segno--;
- }
- }
- unsigned SegMonitorList::_lastRealSeg() const
- {
- unsigned seg = segMonitors.length();
- for (;;)
- {
- if (!seg)
- return 0;
- seg--;
- if (!segMonitors.item(seg).isWild())
- return seg;
- }
- }
- unsigned SegMonitorList::lastFullSeg() const
- {
- // This is used to determine what part of the segmonitor list to use for a pre-count to determine if atmost/limit have been hit
- // We include everything up to the last of i) the last keyed element or ii) the last keyed,opt element that has no wild between it and a keyed element
- // NOTE - can return (unsigned) -1 if there are no full segments
- unsigned len = segMonitors.length();
- unsigned seg = 0;
- unsigned ret = (unsigned) -1;
- bool wildSeen = false;
- while (seg < len)
- {
- if (segMonitors.item(seg).isWild())
- wildSeen = true;
- else
- {
- if (!wildSeen || !segMonitors.item(seg).isOptional())
- {
- ret = seg;
- wildSeen = false;
- }
- }
- seg++;
- }
- return ret;
- }
- void SegMonitorList::finish(unsigned keyedSize)
- {
- if (modified)
- {
- while (segMonitors.length() < keySegCount)
- {
- unsigned idx = segMonitors.length();
- size32_t offset = recInfo.getFixedOffset(idx);
- size32_t size = recInfo.getFixedOffset(idx+1) - offset;
- segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
- }
- size32_t segSize = getSize();
- assertex(segSize == keyedSize);
- recalculateCache();
- modified = false;
- }
- }
- void SegMonitorList::recalculateCache()
- {
- cachedLRS = _lastRealSeg();
- }
- void SegMonitorList::reset()
- {
- segMonitors.kill();
- modified = true;
- }
- void SegMonitorList::swapWith(SegMonitorList &other)
- {
- reset();
- other.segMonitors.swapWith(segMonitors);
- }
- void SegMonitorList::deserialize(MemoryBuffer &mb)
- {
- unsigned num;
- mb.read(num);
- while (num--)
- append(deserializeKeySegmentMonitor(mb));
- }
- void SegMonitorList::serialize(MemoryBuffer &mb) const
- {
- mb.append((unsigned) ordinality());
- ForEachItemIn(idx, segMonitors)
- segMonitors.item(idx).serialize(mb);
- }
- // interface IIndexReadContext
- void SegMonitorList::append(IKeySegmentMonitor *segment)
- {
- modified = true;
- unsigned fieldIdx = segment->getFieldIdx();
- unsigned offset = segment->getOffset();
- unsigned size = segment->getSize();
- while (segMonitors.length() < fieldIdx)
- {
- unsigned idx = segMonitors.length();
- size32_t offset = recInfo.getFixedOffset(idx);
- size32_t size = recInfo.getFixedOffset(idx+1) - offset;
- segMonitors.append(*createWildKeySegmentMonitor(idx, offset, size));
- }
- segMonitors.append(*segment);
- }
- void SegMonitorList::append(FFoption option, IFieldFilter * filter)
- {
- UNIMPLEMENTED;
- }
- bool SegMonitorList::matched(void *keyBuffer, unsigned &lastMatch) const
- {
- lastMatch = 0;
- for (; lastMatch < segMonitors.length(); lastMatch++)
- {
- if (!segMonitors.item(lastMatch).matchesBuffer(keyBuffer))
- return false;
- }
- return true;
- }
- ///
- class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
- {
- protected:
- IContextLogger *ctx;
- SegMonitorList segs;
- IKeyCursor *keyCursor;
- char *keyBuffer;
- unsigned keySize; // size of key record including payload
- unsigned keyedSize; // size of non-payload part of key
- unsigned numsegs;
- bool matched = false;
- bool eof = false;
- bool started = false;
- StringAttr keyName;
- unsigned seeks;
- unsigned scans;
- unsigned skips;
- unsigned nullSkips;
- unsigned wildseeks;
- Owned<const IDynamicTransform> layoutTrans;
- MemoryBuffer buf; // used when translating
- size32_t layoutSize = 0;
- inline void setLow(unsigned segNo)
- {
- segs.setLow(segNo, keyBuffer);
- }
- inline unsigned setLowAfter(size32_t offset)
- {
- return segs.setLowAfter(offset, keyBuffer);
- }
- inline bool incrementKey(unsigned segno) const
- {
- return segs.incrementKey(segno, keyBuffer);
- }
- inline void endRange(unsigned segno)
- {
- segs.endRange(segno, keyBuffer);
- }
- bool skipTo(const void *_seek, size32_t seekOffset, size32_t seeklen)
- {
- // Modify the current key contents buffer as follows
- // Take bytes up to seekoffset from current buffer (i.e. leave them alone)
- // Take up to seeklen bytes from seek comparing them as I go. If I see a lower one before I see a higher one, stop.
- // If I didn't see any higher ones, return (at which point the skipto was a no-op
- // If I saw higher ones, call setLowAfter for all remaining segmonitors
- // If the current contents of buffer could not match, call incremementKey at the appropriate monitor so that it can
- // Clear the matched flag
- const byte *seek = (const byte *) _seek;
- while (seeklen)
- {
- int c = *seek - (byte) (keyBuffer[seekOffset]);
- if (c < 0)
- return false;
- else if (c>0)
- {
- memcpy(keyBuffer+seekOffset, seek, seeklen);
- break;
- }
- seek++;
- seekOffset++;
- seeklen--;
- }
- #ifdef _DEBUG
- if (traceSmartStepping)
- {
- StringBuffer recstr;
- unsigned i;
- for (i = 0; i < keySize; i++)
- {
- unsigned char c = ((unsigned char *) keyBuffer)[i];
- recstr.appendf("%c", isprint(c) ? c : '.');
- }
- recstr.append (" ");
- for (i = 0; i < keySize; i++)
- {
- recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
- }
- DBGLOG("SKIP: IN skips=%02d nullskips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
- }
- #endif
- if (!seeklen) return false;
- unsigned j = setLowAfter(seekOffset + seeklen);
- bool canmatch = true;
- unsigned lastSeg = segs.lastRealSeg();
- for (; j <= lastSeg; j++)
- {
- canmatch = segs.segMonitors.item(j).matchesBuffer(keyBuffer);
- if (!canmatch)
- {
- eof = !incrementKey(j);
- break;
- }
- }
- matched = false;
- return true;
- }
- void noteSeeks(unsigned lseeks, unsigned lscans, unsigned lwildseeks)
- {
- seeks += lseeks;
- scans += lscans;
- wildseeks += lwildseeks;
- if (ctx)
- {
- if (lseeks) ctx->noteStatistic(StNumIndexSeeks, lseeks);
- if (lscans) ctx->noteStatistic(StNumIndexScans, lscans);
- if (lwildseeks) ctx->noteStatistic(StNumIndexWildSeeks, lwildseeks);
- }
- }
- void noteSkips(unsigned lskips, unsigned lnullSkips)
- {
- skips += lskips;
- nullSkips += lnullSkips;
- if (ctx)
- {
- if (lskips) ctx->noteStatistic(StNumIndexSkips, lskips);
- if (lnullSkips) ctx->noteStatistic(StNumIndexNullSkips, lnullSkips);
- }
- }
- void reportExcessiveSeeks(unsigned numSeeks, unsigned lastSeg)
- {
- StringBuffer recstr;
- unsigned i;
- for (i = 0; i < keySize; i++)
- {
- unsigned char c = ((unsigned char *) keyBuffer)[i];
- recstr.appendf("%c", isprint(c) ? c : '.');
- }
- recstr.append ("\n");
- for (i = 0; i < keySize; i++)
- {
- recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
- }
- recstr.append ("\nusing segmonitors:\n");
- for (i=0; i <= lastSeg; i++)
- {
- unsigned size = segs.segMonitors.item(i).getSize();
- while (size--)
- recstr.append( segs.segMonitors.item(i).isWild() ? '?' : '#');
- }
- if (ctx)
- ctx->CTXLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
- else
- DBGLOG("%d seeks to lookup record \n%s\n in key %s", numSeeks, recstr.str(), keyName.get());
- }
- public:
- IMPLEMENT_IINTERFACE;
- CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx) : segs(_recInfo, true)
- {
- ctx = _ctx;
- numsegs = 0;
- keyBuffer = NULL;
- keyCursor = NULL;
- keySize = 0;
- keyedSize = 0;
- setKey(_key);
- seeks = 0;
- scans = 0;
- skips = 0;
- nullSkips = 0;
- wildseeks = 0;
- }
- ~CKeyLevelManager()
- {
- free (keyBuffer);
- ::Release(keyCursor);
- }
- virtual unsigned querySeeks() const
- {
- return seeks;
- }
- virtual unsigned queryScans() const
- {
- return scans;
- }
- virtual unsigned querySkips() const
- {
- return skips;
- }
- virtual unsigned queryNullSkips() const
- {
- return nullSkips;
- }
- virtual void resetCounts()
- {
- scans = 0;
- seeks = 0;
- skips = 0;
- nullSkips = 0;
- wildseeks = 0;
- }
- void setKey(IKeyIndexBase * _key)
- {
- ::Release(keyCursor);
- keyCursor = NULL;
- if (_key)
- {
- assertex(_key->numParts()==1);
- IKeyIndex *ki = _key->queryPart(0);
- keyCursor = ki->getCursor(ctx);
- keyName.set(ki->queryFileName());
- if (!keyBuffer)
- {
- keySize = ki->keySize();
- keyedSize = ki->keyedSize();
- if (!keySize)
- {
- StringBuffer err;
- err.appendf("Key appears corrupt - key file (%s) indicates record size is zero", keyName.get());
- IException *e = MakeStringExceptionDirect(1000, err.str());
- EXCLOG(e, err.str());
- throw e;
- }
- keyBuffer = (char *) malloc(keySize);
- }
- else
- {
- assertex(keyedSize==ki->keyedSize());
- assertex(keySize==ki->keySize());
- }
- }
- }
- virtual void setChooseNLimit(unsigned __int64 _rowLimit) override
- {
- // TODO ?
- }
- virtual void reset(bool crappyHack)
- {
- if (keyCursor)
- {
- if (!started)
- {
- started = true;
- numsegs = segs.ordinality();
- segs.checkSize(keyedSize, keyName.get());
- }
- if (!crappyHack)
- {
- matched = false;
- eof = false;
- setLow(0);
- keyCursor->reset();
- }
- }
- }
- virtual void releaseSegmentMonitors()
- {
- segs.reset();
- started = false;
- }
- virtual void append(IKeySegmentMonitor *segment)
- {
- assertex(!started);
- segs.append(segment);
- }
- virtual void append(FFoption option, IFieldFilter * filter)
- {
- UNIMPLEMENTED;
- }
- virtual unsigned ordinality() const
- {
- return segs.ordinality();
- }
- virtual IKeySegmentMonitor *item(unsigned idx) const
- {
- return segs.item(idx);
- }
- inline const byte *queryKeyBuffer()
- {
- if(layoutTrans)
- {
- buf.setLength(0);
- MemoryBufferBuilder aBuilder(buf, 0);
- layoutSize = layoutTrans->translate(aBuilder, reinterpret_cast<byte const *>(keyBuffer));
- return reinterpret_cast<byte const *>(buf.toByteArray());
- }
- else
- return reinterpret_cast<byte const *>(keyBuffer);
- }
- inline size32_t queryRowSize()
- {
- if (layoutTrans)
- return layoutSize;
- else
- return keyCursor ? keyCursor->getSize() : 0;
- }
- inline unsigned __int64 querySequence()
- {
- return keyCursor ? keyCursor->getSequence() : 0;
- }
- inline unsigned queryRecordSize() { return keySize; }
- bool _lookup(bool exact, unsigned lastSeg)
- {
- bool ret = false;
- unsigned lwildseeks = 0;
- unsigned lseeks = 0;
- unsigned lscans = 0;
- while (!eof)
- {
- bool ok;
- if (matched)
- {
- ok = keyCursor->next(keyBuffer);
- lscans++;
- }
- else
- {
- ok = keyCursor->gtEqual(keyBuffer, keyBuffer, true);
- lseeks++;
- }
- if (ok)
- {
- unsigned i = 0;
- matched = true;
- if (segs.segMonitors.length())
- {
- for (; i <= lastSeg; i++)
- {
- matched = segs.segMonitors.item(i).matchesBuffer(keyBuffer);
- if (!matched)
- break;
- }
- }
- if (matched)
- {
- ret = true;
- break;
- }
- #ifdef __linux__
- if (linuxYield)
- sched_yield();
- #endif
- eof = !incrementKey(i);
- if (!exact)
- {
- ret = true;
- break;
- }
- lwildseeks++;
- }
- else
- eof = true;
- }
- if (logExcessiveSeeks && lwildseeks > 1000)
- reportExcessiveSeeks(lwildseeks, lastSeg);
- noteSeeks(lseeks, lscans, lwildseeks);
- return ret;
- }
- virtual bool lookup(bool exact)
- {
- if (keyCursor)
- return _lookup(exact, segs.lastRealSeg());
- else
- return false;
- }
- virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
- {
- if (keyCursor)
- {
- if (skipTo(seek, seekOffset, seeklen))
- noteSkips(1, 0);
- else
- noteSkips(0, 1);
- bool ret = _lookup(true, segs.lastRealSeg());
- #ifdef _DEBUG
- if (traceSmartStepping)
- {
- StringBuffer recstr;
- unsigned i;
- for (i = 0; i < keySize; i++)
- {
- unsigned char c = ((unsigned char *) keyBuffer)[i];
- recstr.appendf("%c", isprint(c) ? c : '.');
- }
- recstr.append (" ");
- for (i = 0; i < keySize; i++)
- {
- recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
- }
- DBGLOG("SKIP: Got skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
- }
- #endif
- return ret;
- }
- else
- return false;
- }
- unsigned __int64 getCount()
- {
- assertex(keyCursor);
- matched = false;
- eof = false;
- setLow(0);
- keyCursor->reset();
- unsigned __int64 result = 0;
- unsigned lseeks = 0;
- unsigned lastRealSeg = segs.lastRealSeg();
- for (;;)
- {
- if (_lookup(true, lastRealSeg))
- {
- unsigned __int64 locount = keyCursor->getSequence();
- endRange(lastRealSeg);
- keyCursor->ltEqual(keyBuffer, NULL, true);
- lseeks++;
- result += keyCursor->getSequence()-locount+1;
- if (!incrementKey(lastRealSeg))
- break;
- matched = false;
- }
- else
- break;
- }
- noteSeeks(lseeks, 0, 0);
- return result;
- }
- unsigned __int64 getCurrentRangeCount(unsigned groupSegCount)
- {
- unsigned __int64 result = 0;
- if (keyCursor)
- {
- unsigned __int64 locount = keyCursor->getSequence();
- endRange(groupSegCount);
- keyCursor->ltEqual(keyBuffer, NULL, true);
- result = keyCursor->getSequence()-locount+1;
- noteSeeks(1, 0, 0);
- }
- return result;
- }
- bool nextRange(unsigned groupSegCount)
- {
- if (!incrementKey(groupSegCount-1))
- return false;
- matched = false;
- return true;
- }
- unsigned __int64 checkCount(unsigned __int64 max)
- {
- assertex(keyCursor);
- matched = false;
- eof = false;
- setLow(0);
- keyCursor->reset();
- unsigned __int64 result = 0;
- unsigned lseeks = 0;
- unsigned lastFullSeg = segs.lastFullSeg();
- if (lastFullSeg == (unsigned) -1)
- {
- noteSeeks(1, 0, 0);
- if (keyCursor->last(NULL))
- return keyCursor->getSequence()+1;
- else
- return 0;
- }
- for (;;)
- {
- if (_lookup(true, lastFullSeg))
- {
- unsigned __int64 locount = keyCursor->getSequence();
- endRange(lastFullSeg);
- keyCursor->ltEqual(keyBuffer, NULL, true);
- lseeks++;
- result += keyCursor->getSequence()-locount+1;
- if (max && (result > max))
- break;
- if (!incrementKey(lastFullSeg))
- break;
- matched = false;
- }
- else
- break;
- }
- noteSeeks(lseeks, 0, 0);
- return result;
- }
- virtual void serializeCursorPos(MemoryBuffer &mb)
- {
- mb.append(eof);
- if (!eof)
- {
- keyCursor->serializeCursorPos(mb);
- mb.append(matched);
- }
- }
- virtual void deserializeCursorPos(MemoryBuffer &mb)
- {
- mb.read(eof);
- if (!eof)
- {
- assertex(keyBuffer);
- keyCursor->deserializeCursorPos(mb, keyBuffer);
- mb.read(matched);
- }
- }
- virtual const byte *loadBlob(unsigned __int64 blobid, size32_t &blobsize)
- {
- return keyCursor->loadBlob(blobid, blobsize);
- }
- virtual void releaseBlobs()
- {
- if (keyCursor)
- keyCursor->releaseBlobs();
- }
- virtual void setLayoutTranslator(const IDynamicTransform * trans) override
- {
- layoutTrans.set(trans);
- }
- virtual void setSegmentMonitors(SegMonitorList &segmentMonitors) override
- {
- segs.swapWith(segmentMonitors);
- }
- virtual void deserializeSegmentMonitors(MemoryBuffer &mb) override
- {
- segs.deserialize(mb);
- }
- virtual void finishSegmentMonitors()
- {
- segs.finish(keyedSize);
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////////////////////////////////////////////////
- // For some reason #pragma pack does not seem to work here. Force all elements to 8 bytes
- class CKeyIdAndPos
- {
- public:
- unsigned __int64 keyId;
- offset_t pos;
- CKeyIdAndPos(unsigned __int64 _keyId, offset_t _pos) { keyId = _keyId; pos = _pos; }
- bool operator==(const CKeyIdAndPos &other) { return keyId == other.keyId && pos == other.pos; }
- };
- class CNodeMapping : public HTMapping<CJHTreeNode, CKeyIdAndPos>
- {
- public:
- CNodeMapping(CKeyIdAndPos &fp, CJHTreeNode &et) : HTMapping<CJHTreeNode, CKeyIdAndPos>(et, fp) { }
- ~CNodeMapping() { this->et.Release(); }
- CJHTreeNode &query() { return queryElement(); }
- };
- typedef OwningSimpleHashTableOf<CNodeMapping, CKeyIdAndPos> CNodeTable;
- #define FIXED_NODE_OVERHEAD (sizeof(CJHTreeNode))
- class CNodeMRUCache : public CMRUCacheOf<CKeyIdAndPos, CJHTreeNode, CNodeMapping, CNodeTable>
- {
- size32_t sizeInMem, memLimit;
- public:
- CNodeMRUCache(size32_t _memLimit) : memLimit(0)
- {
- sizeInMem = 0;
- setMemLimit(_memLimit);
- }
- size32_t setMemLimit(size32_t _memLimit)
- {
- size32_t oldMemLimit = memLimit;
- memLimit = _memLimit;
- if (full())
- makeSpace();
- return oldMemLimit;
- }
- virtual void makeSpace()
- {
- // remove LRU until !full
- do
- {
- clear(1);
- }
- while (full());
- }
- virtual bool full()
- {
- if (((size32_t)-1) == memLimit) return false;
- return sizeInMem > memLimit;
- }
- virtual void elementAdded(CNodeMapping *mapping)
- {
- CJHTreeNode &node = mapping->queryElement();
- sizeInMem += (FIXED_NODE_OVERHEAD+node.getMemSize());
- }
- virtual void elementRemoved(CNodeMapping *mapping)
- {
- CJHTreeNode &node = mapping->queryElement();
- sizeInMem -= (FIXED_NODE_OVERHEAD+node.getMemSize());
- }
- };
- class CNodeCache : public CInterface
- {
- private:
- mutable CriticalSection lock;
- CNodeMRUCache nodeCache;
- CNodeMRUCache leafCache;
- CNodeMRUCache blobCache;
- CNodeMRUCache preloadCache;
- bool cacheNodes;
- bool cacheLeaves;
- bool cacheBlobs;
- bool preloadNodes;
- public:
- CNodeCache(size32_t maxNodeMem, size32_t maxLeaveMem, size32_t maxBlobMem)
- : nodeCache(maxNodeMem), leafCache(maxLeaveMem), blobCache(maxBlobMem), preloadCache((unsigned) -1)
- {
- cacheNodes = maxNodeMem != 0;
- cacheLeaves = maxLeaveMem != 0;;
- cacheBlobs = maxBlobMem != 0;
- preloadNodes = false;
- // note that each index caches the last blob it unpacked so that sequential blobfetches are still ok
- }
- CJHTreeNode *getNode(INodeLoader *key, int keyID, offset_t pos, IContextLogger *ctx, bool isTLK);
- void preload(CJHTreeNode *node, int keyID, offset_t pos, IContextLogger *ctx);
- bool isPreloaded(int keyID, offset_t pos);
- inline bool getNodeCachePreload()
- {
- return preloadNodes;
- }
- inline bool setNodeCachePreload(bool _preload)
- {
- bool oldPreloadNodes = preloadNodes;
- preloadNodes = _preload;
- return oldPreloadNodes;
- }
- inline size32_t setNodeCacheMem(size32_t newSize)
- {
- CriticalBlock block(lock);
- unsigned oldV = nodeCache.setMemLimit(newSize);
- cacheNodes = (newSize != 0);
- return oldV;
- }
- inline size32_t setLeafCacheMem(size32_t newSize)
- {
- CriticalBlock block(lock);
- unsigned oldV = leafCache.setMemLimit(newSize);
- cacheLeaves = (newSize != 0);
- return oldV;
- }
- inline size32_t setBlobCacheMem(size32_t newSize)
- {
- CriticalBlock block(lock);
- unsigned oldV = blobCache.setMemLimit(newSize);
- cacheBlobs = (newSize != 0);
- return oldV;
- }
- void clear()
- {
- CriticalBlock block(lock);
- nodeCache.kill();
- leafCache.kill();
- blobCache.kill();
- }
- };
- static inline CNodeCache *queryNodeCache()
- {
- if (nodeCache) return nodeCache; // avoid crit
- CriticalBlock b(*initCrit);
- if (!nodeCache) nodeCache = new CNodeCache(100*0x100000, 50*0x100000, 0);
- return nodeCache;
- }
- void clearNodeCache()
- {
- queryNodeCache()->clear();
- }
- inline CKeyStore *queryKeyStore()
- {
- CKeyStore * value = keyStore.load(std::memory_order_acquire);
- if (value) return value; // avoid crit
- CriticalBlock b(*initCrit);
- if (!keyStore.load(std::memory_order_acquire)) keyStore = new CKeyStore;
- return keyStore;
- }
- unsigned setKeyIndexCacheSize(unsigned limit)
- {
- return queryKeyStore()->setKeyCacheLimit(limit);
- }
- CKeyStore::CKeyStore() : keyIndexCache(defaultKeyIndexLimit)
- {
- nextId = 0;
- #if 0
- mm.setown(createSharedMemoryManager("RichardsSharedMemManager", 0x100000));
- try
- {
- if (mm)
- sharedCache.setown(mm->share());
- }
- catch (IException *E)
- {
- E->Release();
- }
- #endif
- }
- CKeyStore::~CKeyStore()
- {
- }
- unsigned CKeyStore::setKeyCacheLimit(unsigned limit)
- {
- return keyIndexCache.setCacheLimit(limit);
- }
- IKeyIndex *CKeyStore::doload(const char *fileName, unsigned crc, IReplicatedFile *part, IFileIO *iFileIO, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
- {
- // isTLK provided by caller since flags in key header unreliable. If either say it's a TLK, I believe it.
- {
- MTIME_SECTION(queryActiveTimer(), "CKeyStore_load");
- IKeyIndex *keyIndex;
- // MORE - holds onto the mutex way too long
- synchronized block(mutex);
- StringBuffer fname;
- fname.append(fileName).append('/').append(crc);
- keyIndex = keyIndexCache.query(fname);
- if (NULL == keyIndex)
- {
- if (iMappedFile)
- {
- assert(!iFileIO && !part);
- keyIndex = new CMemKeyIndex(getUniqId(), LINK(iMappedFile), fname, isTLK);
- }
- else if (iFileIO)
- {
- assert(!part);
- keyIndex = new CDiskKeyIndex(getUniqId(), LINK(iFileIO), fname, isTLK, allowPreload);
- }
- else
- {
- Owned<IFile> iFile;
- if (part)
- {
- iFile.setown(part->open());
- if (NULL == iFile.get())
- throw MakeStringException(0, "Failed to open index file %s", fileName);
- }
- else
- iFile.setown(createIFile(fileName));
- IFileIO *fio = iFile->open(IFOread);
- if (fio)
- keyIndex = new CDiskKeyIndex(getUniqId(), fio, fname, isTLK, allowPreload);
- else
- throw MakeStringException(0, "Failed to open index file %s", fileName);
- }
- keyIndexCache.add(fname, *LINK(keyIndex));
- }
- else
- {
- LINK(keyIndex);
- }
- assertex(NULL != keyIndex);
- return keyIndex;
- }
- }
- IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IFileIO *iFileIO, bool isTLK, bool allowPreload)
- {
- return doload(fileName, crc, NULL, iFileIO, NULL, isTLK, allowPreload);
- }
- IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IMemoryMappedFile *iMappedFile, bool isTLK, bool allowPreload)
- {
- return doload(fileName, crc, NULL, NULL, iMappedFile, isTLK, allowPreload);
- }
- // fileName+crc used only as key for cache
- IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, IReplicatedFile &part, bool isTLK, bool allowPreload)
- {
- return doload(fileName, crc, &part, NULL, NULL, isTLK, allowPreload);
- }
- IKeyIndex *CKeyStore::load(const char *fileName, unsigned crc, bool isTLK, bool allowPreload)
- {
- return doload(fileName, crc, NULL, NULL, NULL, isTLK, allowPreload);
- }
- StringBuffer &CKeyStore::getMetrics(StringBuffer &xml)
- {
- xml.append(" <IndexMetrics>\n");
- Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
- ForEach(*iter)
- {
- CKeyIndexMapping &mapping = iter->query();
- IKeyIndex &index = mapping.query();
- const char *name = mapping.queryFindString();
- xml.appendf(" <Index name=\"%s\" scans=\"%d\" seeks=\"%d\"/>\n", name, index.queryScans(), index.querySeeks());
- }
- xml.append(" </IndexMetrics>\n");
- return xml;
- }
- void CKeyStore::resetMetrics()
- {
- synchronized block(mutex);
- Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
- ForEach(*iter)
- {
- CKeyIndexMapping &mapping = iter->query();
- IKeyIndex &index = mapping.query();
- index.resetCounts();
- }
- }
- void CKeyStore::clearCache(bool killAll)
- {
- synchronized block(mutex);
- if (killAll)
- {
- clearNodeCache(); // no point in keeping old nodes cached if key store cache has been cleared
- keyIndexCache.kill();
- }
- else
- {
- StringArray goers;
- Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
- ForEach(*iter)
- {
- CKeyIndexMapping &mapping = iter->query();
- IKeyIndex &index = mapping.query();
- if (!index.IsShared())
- {
- const char *name = mapping.queryFindString();
- goers.append(name);
- }
- }
- ForEachItemIn(idx, goers)
- {
- keyIndexCache.remove(goers.item(idx));
- }
- }
- }
- void CKeyStore::clearCacheEntry(const char *keyName)
- {
- if (!keyName || !*keyName)
- return; // nothing to do
- synchronized block(mutex);
- Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
- StringArray goers;
- ForEach(*iter)
- {
- CKeyIndexMapping &mapping = iter->query();
- IKeyIndex &index = mapping.query();
- if (!index.IsShared())
- {
- const char *name = mapping.queryFindString();
- if (strstr(name, keyName) != 0) // keyName doesn't have drive or part number associated with it
- goers.append(name);
- }
- }
- ForEachItemIn(idx, goers)
- {
- keyIndexCache.remove(goers.item(idx));
- }
- }
- void CKeyStore::clearCacheEntry(const IFileIO *io)
- {
- synchronized block(mutex);
- Owned<CKeyIndexMRUCache::CMRUIterator> iter = keyIndexCache.getIterator();
- StringArray goers;
- ForEach(*iter)
- {
- CKeyIndexMapping &mapping = iter->query();
- IKeyIndex &index = mapping.query();
- if (!index.IsShared())
- {
- if (index.queryFileIO()==io)
- goers.append(mapping.queryFindString());
- }
- }
- ForEachItemIn(idx, goers)
- {
- keyIndexCache.remove(goers.item(idx));
- }
- }
- // CKeyIndex impl.
- CKeyIndex::CKeyIndex(int _iD, const char *_name) : name(_name)
- {
- iD = _iD;
- cache = queryNodeCache(); // use one node cache for all key indexes;
- cache->Link();
- keyHdr = NULL;
- rootNode = NULL;
- cachedBlobNodePos = 0;
- keySeeks.store(0);
- keyScans.store(0);
- latestGetNodeOffset = 0;
- }
- void CKeyIndex::cacheNodes(CNodeCache *cache, offset_t nodePos, bool isTLK)
- {
- bool first = true;
- while (nodePos)
- {
- Owned<CJHTreeNode> node = loadNode(nodePos);
- if (node->isLeaf())
- {
- if (!isTLK)
- return;
- }
- else if (first)
- {
- cacheNodes(cache, node->getFPosAt(0), isTLK);
- first = false;
- }
- cache->preload(node, iD, nodePos, NULL);
- nodePos = node->getRightSib();
- }
- }
- void CKeyIndex::init(KeyHdr &hdr, bool isTLK, bool allowPreload)
- {
- if (isTLK)
- hdr.ktype |= HTREE_TOPLEVEL_KEY; // thor does not set
- keyHdr = new CKeyHdr();
- try
- {
- keyHdr->load(hdr);
- }
- catch (IKeyException *ke)
- {
- if (!name.get()) throw;
- StringBuffer msg;
- IKeyException *ke2 = MakeKeyException(ke->errorCode(), "%s. In key '%s'.", ke->errorMessage(msg).str(), name.get());
- ke->Release();
- throw ke2;
- }
- offset_t rootPos = keyHdr->getRootFPos();
- Linked<CNodeCache> nodeCache = queryNodeCache();
- if (allowPreload)
- {
- if (nodeCache->getNodeCachePreload() && !nodeCache->isPreloaded(iD, rootPos))
- {
- cacheNodes(nodeCache, rootPos, isTLK);
- }
- }
- rootNode = nodeCache->getNode(this, iD, rootPos, NULL, isTLK);
- }
- CKeyIndex::~CKeyIndex()
- {
- ::Release(keyHdr);
- ::Release(cache);
- ::Release(rootNode);
- }
- CMemKeyIndex::CMemKeyIndex(int _iD, IMemoryMappedFile *_io, const char *_name, bool isTLK)
- : CKeyIndex(_iD, _name)
- {
- io.setown(_io);
- assertex(io->offset()==0); // mapped whole file
- assertex(io->length()==io->fileSize()); // mapped whole file
- KeyHdr hdr;
- if (io->length() < sizeof(hdr))
- throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
- memcpy(&hdr, io->base(), sizeof(hdr));
- init(hdr, isTLK, false);
- }
- CJHTreeNode *CMemKeyIndex::loadNode(offset_t pos)
- {
- nodesLoaded++;
- if (pos + keyHdr->getNodeSize() > io->fileSize())
- {
- IException *E = MakeStringException(errno, "Error reading node at position %" I64F "x past EOF", pos);
- StringBuffer m;
- m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
- EXCLOG(E, m.str());
- throw E;
- }
- char *nodeData = (char *) (io->base() + pos);
- MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
- return CKeyIndex::loadNode(nodeData, pos, false);
- }
- CDiskKeyIndex::CDiskKeyIndex(int _iD, IFileIO *_io, const char *_name, bool isTLK, bool allowPreload)
- : CKeyIndex(_iD, _name)
- {
- io.setown(_io);
- KeyHdr hdr;
- if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
- throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
- init(hdr, isTLK, allowPreload);
- }
- CJHTreeNode *CDiskKeyIndex::loadNode(offset_t pos)
- {
- nodesLoaded++;
- unsigned nodeSize = keyHdr->getNodeSize();
- MemoryAttr ma;
- char *nodeData = (char *) ma.allocate(nodeSize);
- MTIME_SECTION(queryActiveTimer(), "JHTREE read node");
- if (io->read(pos, nodeSize, nodeData) != nodeSize)
- {
- IException *E = MakeStringException(errno, "Error %d reading node at position %" I64F "x", errno, pos);
- StringBuffer m;
- m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
- EXCLOG(E, m.str());
- throw E;
- }
- return CKeyIndex::loadNode(nodeData, pos, true);
- }
- CJHTreeNode *CKeyIndex::loadNode(char *nodeData, offset_t pos, bool needsCopy)
- {
- try
- {
- Owned<CJHTreeNode> ret;
- char leafFlag = ((NodeHdr *) nodeData)->leafFlag;
- switch(leafFlag)
- {
- case 0:
- ret.setown(new CJHTreeNode());
- break;
- case 1:
- if (keyHdr->isVariable())
- ret.setown(new CJHVarTreeNode());
- else
- ret.setown(new CJHTreeNode());
- break;
- case 2:
- ret.setown(new CJHTreeBlobNode());
- break;
- case 3:
- ret.setown(new CJHTreeMetadataNode());
- break;
- default:
- throwUnexpected();
- }
- {
- MTIME_SECTION(queryActiveTimer(), "JHTREE load node");
- ret->load(keyHdr, nodeData, pos, true);
- }
- return ret.getClear();
- }
- catch (IException *E)
- {
- StringBuffer m;
- m.appendf("In key %s, position 0x%" I64F "x", name.get(), pos);
- EXCLOG(E, m.str());
- throw;
- }
- catch (...)
- {
- DBGLOG("Unknown exception in key %s, position 0x%" I64F "x", name.get(), pos);
- throw;
- }
- }
- bool CKeyIndex::isTopLevelKey()
- {
- return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0;
- }
- bool CKeyIndex::isFullySorted()
- {
- return (keyHdr->getKeyType() & HTREE_FULLSORT_KEY) != 0;
- }
- IKeyCursor *CKeyIndex::getCursor(IContextLogger *ctx)
- {
- return new CKeyCursor(*this, ctx); // MORE - pool them?
- }
- CJHTreeNode *CKeyIndex::getNode(offset_t offset, IContextLogger *ctx)
- {
- latestGetNodeOffset = offset;
- return cache->getNode(this, iD, offset, ctx, isTopLevelKey());
- }
- void dumpNode(FILE *out, CJHTreeNode *node, int length, unsigned rowCount, bool raw)
- {
- if (!raw)
- fprintf(out, "Node dump: fpos(%" I64F "d) leaf(%d)\n", node->getFpos(), node->isLeaf());
- if (rowCount==0 || rowCount > node->getNumKeys())
- rowCount = node->getNumKeys();
- for (unsigned int i=0; i<rowCount; i++)
- {
- char *dst = (char *) alloca(node->getKeyLen()+50);
- node->getValueAt(i, dst);
- if (raw)
- {
- fwrite(dst, 1, length, out);
- }
- else
- {
- offset_t pos = node->getFPosAt(i);
- StringBuffer s;
- appendURL(&s, dst, length, true);
- fprintf(out, "keyVal %d [%" I64F "d] = %s\n", i, pos, s.str());
- }
- }
- if (!raw)
- fprintf(out, "==========\n");
- }
- void CKeyIndex::dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw)
- {
- Owned<CJHTreeNode> node = loadNode(pos);
- ::dumpNode(out, node, keySize(), count, isRaw);
- }
- bool CKeyIndex::hasSpecialFileposition() const
- {
- return keyHdr->hasSpecialFileposition();
- }
- size32_t CKeyIndex::keySize()
- {
- size32_t fileposSize = keyHdr->hasSpecialFileposition() ? sizeof(offset_t) : 0;
- return keyHdr->getMaxKeyLength() + fileposSize;
- }
- size32_t CKeyIndex::keyedSize()
- {
- return keyHdr->getNodeKeyLength();
- }
- bool CKeyIndex::hasPayload()
- {
- return keyHdr->hasPayload();
- }
- CJHTreeBlobNode *CKeyIndex::getBlobNode(offset_t nodepos)
- {
- CriticalBlock b(blobCacheCrit);
- if (nodepos != cachedBlobNodePos)
- {
- cachedBlobNode.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeBlobNode)); // note - don't use the cache
- cachedBlobNodePos = nodepos;
- }
- return cachedBlobNode.getLink();
- }
- const byte *CKeyIndex::loadBlob(unsigned __int64 blobid, size32_t &blobSize)
- {
- offset_t nodepos = blobid & I64C(0xffffffffffff);
- size32_t offset = (size32_t) ((blobid & I64C(0xffff000000000000)) >> 44);
- Owned<CJHTreeBlobNode> blobNode = getBlobNode(nodepos);
- size32_t sizeRemaining = blobNode->getTotalBlobSize(offset);
- blobSize = sizeRemaining;
- byte *ret = (byte *) malloc(sizeRemaining);
- byte *finger = ret;
- for (;;)
- {
- size32_t gotHere = blobNode->getBlobData(offset, finger);
- assertex(gotHere <= sizeRemaining);
- sizeRemaining -= gotHere;
- finger += gotHere;
- if (!sizeRemaining)
- break;
- blobNode.setown(getBlobNode(blobNode->getRightSib()));
- offset = 0;
- }
- return ret;
- }
- offset_t CKeyIndex::queryMetadataHead()
- {
- offset_t ret = keyHdr->getHdrStruct()->metadataHead;
- if(ret == static_cast<offset_t>(-1)) ret = 0; // index created before introduction of metadata would have FFFF... in this space
- return ret;
- }
- IPropertyTree * CKeyIndex::getMetadata()
- {
- offset_t nodepos = queryMetadataHead();
- if(!nodepos)
- return NULL;
- Owned<CJHTreeMetadataNode> node;
- StringBuffer xml;
- while(nodepos)
- {
- node.setown(QUERYINTERFACE(loadNode(nodepos), CJHTreeMetadataNode));
- node->get(xml);
- nodepos = node->getRightSib();
- }
- IPropertyTree * ret;
- try
- {
- ret = createPTreeFromXMLString(xml.str());
- }
- catch(IPTreeReadException * e)
- {
- StringBuffer emsg;
- IException * wrapped = MakeStringException(e->errorAudience(), e->errorCode(), "Error retrieving XML metadata: %s", e->errorMessage(emsg).str());
- e->Release();
- throw wrapped;
- }
- return ret;
- }
- CKeyCursor::CKeyCursor(CKeyIndex &_key, IContextLogger *_ctx)
- : key(_key), ctx(_ctx)
- {
- key.Link();
- nodeKey = 0;
- }
- CKeyCursor::~CKeyCursor()
- {
- key.Release();
- releaseBlobs();
- }
- void CKeyCursor::reset()
- {
- node.clear();
- }
- CJHTreeNode *CKeyCursor::locateFirstNode()
- {
- CJHTreeNode * n = 0;
- CJHTreeNode * p = LINK(key.rootNode);
- while (p != 0)
- {
- n = p;
- p = key.getNode(n->prevNodeFpos(), ctx);
- if (p != 0)
- n->Release();
- }
- return n;
- }
- CJHTreeNode *CKeyCursor::locateLastNode()
- {
- CJHTreeNode * n = 0;
- CJHTreeNode * p = LINK(key.rootNode);
- while (p != 0)
- {
- n = p;
- p = key.getNode(n->nextNodeFpos(), ctx);
- if (p != 0)
- n->Release();
- }
- return n;
- }
- bool CKeyCursor::next(char *dst)
- {
- if (!node)
- return first(dst);
- else
- {
- key.keyScans++;
- if (!node->getValueAt( ++nodeKey, dst))
- {
- offset_t rsib = node->getRightSib();
- node.clear();
- if (rsib != 0)
- {
- node.setown(key.getNode(rsib, ctx));
- if (node != NULL)
- {
- nodeKey = 0;
- return node->getValueAt(0, dst);
- }
- }
- return false;
- }
- else
- return true;
- }
- }
- bool CKeyCursor::prev(char *dst)
- {
- if (!node)
- return last(dst); // Note - this used to say First - surely a typo
- else
- {
- key.keyScans++;
- if (!nodeKey)
- {
- offset_t lsib = node->getLeftSib();
- node.clear();
-
- if (lsib != 0)
- {
- node.setown(key.getNode(lsib, ctx));
- if (node)
- {
- nodeKey = node->getNumKeys()-1;
- return node->getValueAt(nodeKey, dst );
- }
- }
- return false;
- }
- else
- return node->getValueAt(--nodeKey, dst);
- }
- }
- size32_t CKeyCursor::getSize()
- {
- assertex(node);
- return node->getSizeAt(nodeKey);
- }
- offset_t CKeyCursor::getFPos()
- {
- assertex(node);
- return node->getFPosAt(nodeKey);
- }
- unsigned __int64 CKeyCursor::getSequence()
- {
- assertex(node);
- return node->getSequence(nodeKey);
- }
- bool CKeyCursor::first(char *dst)
- {
- key.keySeeks++;
- node.setown(locateFirstNode());
- nodeKey = 0;
- return node->getValueAt(nodeKey, dst);
- }
- bool CKeyCursor::last(char *dst)
- {
- key.keySeeks++;
- node.setown(locateLastNode());
- nodeKey = node->getNumKeys()-1;
- return node->getValueAt( nodeKey, dst );
- }
- bool CKeyCursor::gtEqual(const char *src, char *dst, bool seekForward)
- {
- key.keySeeks++;
- unsigned lwm = 0;
- if (seekForward && node)
- {
- // When seeking forward, there are two cases worth optimizing:
- // 1. the next record is actually the one we want
- // 2. The record we want is on the current page
- unsigned numKeys = node->getNumKeys();
- if (nodeKey < numKeys-1)
- {
- int rc = node->compareValueAt(src, ++nodeKey);
- if (rc <= 0)
- {
- node->getValueAt(nodeKey, dst);
- return true;
- }
- if (nodeKey < numKeys-1)
- {
- rc = node->compareValueAt(src, numKeys-1);
- if (rc <= 0)
- lwm = nodeKey+1;
- }
- }
- }
- if (!lwm)
- node.set(key.rootNode);
- for (;;)
- {
- unsigned int a = lwm;
- int b = node->getNumKeys();
- // first search for first GTE entry (result in b(<),a(>=))
- while ((int)a<b)
- {
- int i = a+(b-a)/2;
- int rc = node->compareValueAt(src, i);
- if (rc>0)
- a = i+1;
- else
- b = i;
- }
- if (node->isLeaf())
- {
- if (a<node->getNumKeys())
- nodeKey = a;
- else
- {
- offset_t nextPos = node->nextNodeFpos(); // This can happen at eof because of key peculiarity where level above reports ffff as last
- node.setown(key.getNode(nextPos, ctx));
- nodeKey = 0;
- }
- if (node)
- {
- node->getValueAt(nodeKey, dst);
- return true;
- }
- else
- return false;
- }
- else
- {
- if (a<node->getNumKeys())
- {
- offset_t npos = node->getFPosAt(a);
- node.setown(key.getNode(npos, ctx));
- }
- else
- return false;
- }
- }
- }
- bool CKeyCursor::ltEqual(const char *src, char *dst, bool seekForward)
- {
- key.keySeeks++;
- unsigned lwm = 0;
- if (seekForward && node)
- {
- // When seeking forward, there are two cases worth optimizing:
- // 1. next record is > src, so we return current
- // 2. The record we want is on the current page
- unsigned numKeys = node->getNumKeys();
- if (nodeKey < numKeys-1)
- {
- int rc = node->compareValueAt(src, ++nodeKey);
- if (rc < 0)
- {
- node->getValueAt(--nodeKey, dst);
- return true;
- }
- if (nodeKey < numKeys-1)
- {
- rc = node->compareValueAt(src, numKeys-1);
- if (rc < 0)
- lwm = nodeKey;
- }
- }
- }
- if (!lwm)
- node.set(key.rootNode);
- for (;;)
- {
- unsigned int a = lwm;
- int b = node->getNumKeys();
- // Locate first record greater than src
- while ((int)a<b)
- {
- int i = a+(b+1-a)/2;
- int rc = node->compareValueAt(src, i-1);
- if (rc>=0)
- a = i;
- else
- b = i-1;
- }
- if (node->isLeaf())
- {
- // record we want is the one before first record greater than src.
- if (a>0)
- nodeKey = a-1;
- else
- {
- offset_t prevPos = node->prevNodeFpos();
- node.setown(key.getNode(prevPos, ctx));
- if (node)
- nodeKey = node->getNumKeys()-1;
- }
- if (node)
- {
- node->getValueAt(nodeKey, dst);
- return true;
- }
- else
- return false;
- }
- else
- {
- // Node to look in is the first one one that ended greater than src.
- if (a==node->getNumKeys())
- a--; // value being looked for is off the end of the index.
- offset_t npos = node->getFPosAt(a);
- node.setown(key.getNode(npos, ctx));
- if (!node)
- throw MakeStringException(0, "Invalid key %s: child node pointer should never be NULL", key.name.get());
- }
- }
- }
- void CKeyCursor::serializeCursorPos(MemoryBuffer &mb)
- {
- if (node)
- {
- mb.append(node->getFpos());
- mb.append(nodeKey);
- }
- else
- {
- offset_t zero = 0;
- unsigned zero2 = 0;
- mb.append(zero);
- mb.append(zero2);
- }
- }
- void CKeyCursor::deserializeCursorPos(MemoryBuffer &mb, char *keyBuffer)
- {
- offset_t nodeAddress;
- mb.read(nodeAddress);
- mb.read(nodeKey);
- if (nodeAddress)
- {
- node.setown(key.getNode(nodeAddress, ctx));
- if (node && keyBuffer)
- node->getValueAt(nodeKey, keyBuffer);
- }
- else
- node.clear();
- }
- const byte *CKeyCursor::loadBlob(unsigned __int64 blobid, size32_t &blobsize)
- {
- const byte *ret = key.loadBlob(blobid, blobsize);
- activeBlobs.append(ret);
- return ret;
- }
- void CKeyCursor::releaseBlobs()
- {
- ForEachItemIn(idx, activeBlobs)
- {
- free((void *) activeBlobs.item(idx));
- }
- activeBlobs.kill();
- }
- class CLazyKeyIndex : implements IKeyIndex, public CInterface
- {
- StringAttr keyfile;
- unsigned crc;
- Linked<IDelayedFile> delayedFile;
- Owned<IFileIO> iFileIO;
- Owned<IKeyIndex> realKey;
- CriticalSection c;
- bool isTLK;
- bool preloadAllowed;
- inline IKeyIndex &checkOpen()
- {
- CriticalBlock b(c);
- if (!realKey)
- {
- Owned<IMemoryMappedFile> mapped = useMemoryMappedIndexes ? delayedFile->getMappedFile() : nullptr;
- if (mapped)
- realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
- else
- {
- iFileIO.setown(delayedFile->getFileIO());
- realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
- }
- if (!realKey)
- {
- DBGLOG("Lazy key file %s could not be opened", keyfile.get());
- throw MakeStringException(0, "Lazy key file %s could not be opened", keyfile.get());
- }
- }
- return *realKey;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
- : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
- {}
- virtual bool IsShared() const { return CInterface::IsShared(); }
- virtual IKeyCursor *getCursor(IContextLogger *ctx) { return checkOpen().getCursor(ctx); }
- virtual size32_t keySize() { return checkOpen().keySize(); }
- virtual size32_t keyedSize() { return checkOpen().keyedSize(); }
- virtual bool hasPayload() { return checkOpen().hasPayload(); }
- virtual bool isTopLevelKey() { return checkOpen().isTopLevelKey(); }
- virtual bool isFullySorted() { return checkOpen().isFullySorted(); }
- virtual unsigned getFlags() { return checkOpen().getFlags(); }
- virtual void dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw) { checkOpen().dumpNode(out, pos, count, isRaw); }
- virtual unsigned numParts() { return 1; }
- virtual IKeyIndex *queryPart(unsigned idx) { return idx ? NULL : this; }
- virtual unsigned queryScans() { return realKey ? realKey->queryScans() : 0; }
- virtual unsigned querySeeks() { return realKey ? realKey->querySeeks() : 0; }
- virtual const char *queryFileName() { return keyfile.get(); }
- virtual offset_t queryBlobHead() { return checkOpen().queryBlobHead(); }
- virtual void resetCounts() { if (realKey) realKey->resetCounts(); }
- virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
- virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
- virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
- virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
- virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
- virtual bool hasSpecialFileposition() const { return realKey ? realKey->hasSpecialFileposition() : false; }
- };
- extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
- {
- return queryKeyStore()->load(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
- }
- extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, bool isTLK, bool preloadAllowed)
- {
- return queryKeyStore()->load(keyfile, crc, isTLK, preloadAllowed);
- }
- extern jhtree_decl IKeyIndex *createKeyIndex(IReplicatedFile &part, unsigned crc, bool isTLK, bool preloadAllowed)
- {
- StringBuffer filePath;
- const RemoteFilename &rfn = part.queryCopies().item(0);
- rfn.getPath(filePath);
- return queryKeyStore()->load(filePath.str(), crc, part, isTLK, preloadAllowed);
- }
- extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IDelayedFile &iFileIO, bool isTLK, bool preloadAllowed)
- {
- return new CLazyKeyIndex(keyfile, crc, &iFileIO, isTLK, preloadAllowed);
- }
- extern jhtree_decl void clearKeyStoreCache(bool killAll)
- {
- queryKeyStore()->clearCache(killAll);
- }
- extern jhtree_decl void clearKeyStoreCacheEntry(const char *name)
- {
- queryKeyStore()->clearCacheEntry(name);
- }
- extern jhtree_decl void clearKeyStoreCacheEntry(const IFileIO *io)
- {
- queryKeyStore()->clearCacheEntry(io);
- }
- extern jhtree_decl StringBuffer &getIndexMetrics(StringBuffer &ret)
- {
- return queryKeyStore()->getMetrics(ret);
- }
- extern jhtree_decl void resetIndexMetrics()
- {
- queryKeyStore()->resetMetrics();
- }
- extern jhtree_decl bool isKeyFile(const char *filename)
- {
- OwnedIFile file = createIFile(filename);
- OwnedIFileIO io = file->open(IFOread);
- unsigned __int64 size = file->size();
- if (size)
- {
- KeyHdr hdr;
- if (io->read(0, sizeof(hdr), &hdr) == sizeof(hdr))
- {
- _WINREV(hdr.phyrec);
- _WINREV(hdr.root);
- _WINREV(hdr.nodeSize);
- if (size % hdr.nodeSize == 0 && hdr.phyrec == size-1 && hdr.root && hdr.root % hdr.nodeSize == 0)
- {
- NodeHdr root;
- if (io->read(hdr.root, sizeof(root), &root) == sizeof(root))
- {
- _WINREV(root.rightSib);
- _WINREV(root.leftSib);
- return root.leftSib==0 && root.rightSib==0;
- }
- }
- }
- }
- return false;
- }
- extern jhtree_decl bool setNodeCachePreload(bool preload)
- {
- return queryNodeCache()->setNodeCachePreload(preload);
- }
- extern jhtree_decl size32_t setNodeCacheMem(size32_t cacheSize)
- {
- return queryNodeCache()->setNodeCacheMem(cacheSize);
- }
- extern jhtree_decl size32_t setLeafCacheMem(size32_t cacheSize)
- {
- return queryNodeCache()->setLeafCacheMem(cacheSize);
- }
- extern jhtree_decl size32_t setBlobCacheMem(size32_t cacheSize)
- {
- return queryNodeCache()->setBlobCacheMem(cacheSize);
- }
- ///////////////////////////////////////////////////////////////////////////////
- // CNodeCache impl.
- ///////////////////////////////////////////////////////////////////////////////
- CJHTreeNode *CNodeCache::getNode(INodeLoader *keyIndex, int iD, offset_t pos, IContextLogger *ctx, bool isTLK)
- {
- // MORE - could probably be improved - I think having the cache template separate is not helping us here
- // Also one cache per key would surely be faster, and could still use a global total
- if (!pos)
- return NULL;
- {
- // It's a shame that we don't know the type before we read it. But probably not that big a deal
- CriticalBlock block(lock);
- CKeyIdAndPos key(iD, pos);
- if (preloadNodes)
- {
- CJHTreeNode *cacheNode = preloadCache.query(key);
- if (cacheNode)
- {
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumPreloadCacheHits, 1);
- preloadCacheHits++;
- return LINK(cacheNode);
- }
- }
- if (cacheNodes)
- {
- CJHTreeNode *cacheNode = nodeCache.query(key);
- if (cacheNode)
- {
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
- nodeCacheHits++;
- return LINK(cacheNode);
- }
- }
- if (cacheLeaves)
- {
- CJHTreeNode *cacheNode = leafCache.query(key);
- if (cacheNode)
- {
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
- leafCacheHits++;
- return LINK(cacheNode);
- }
- }
- if (cacheBlobs)
- {
- CJHTreeNode *cacheNode = blobCache.query(key);
- if (cacheNode)
- {
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
- blobCacheHits++;
- return LINK(cacheNode);
- }
- }
- CJHTreeNode *node;
- {
- CriticalUnblock block(lock);
- node = keyIndex->loadNode(pos); // NOTE - don't want cache locked while we load!
- }
- cacheAdds++;
- if (node->isBlob())
- {
- if (cacheBlobs)
- {
- CJHTreeNode *cacheNode = blobCache.query(key); // check if added to cache while we were reading
- if (cacheNode)
- {
- ::Release(node);
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumBlobCacheHits, 1);
- blobCacheHits++;
- return LINK(cacheNode);
- }
- if (ctx) ctx->noteStatistic(StNumBlobCacheAdds, 1);
- blobCacheAdds++;
- blobCache.add(key, *LINK(node));
- }
- }
- else if (node->isLeaf() && !isTLK) // leaves in TLK are cached as if they were nodes
- {
- if (cacheLeaves)
- {
- CJHTreeNode *cacheNode = leafCache.query(key); // check if added to cache while we were reading
- if (cacheNode)
- {
- ::Release(node);
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumLeafCacheHits, 1);
- leafCacheHits++;
- return LINK(cacheNode);
- }
- if (ctx) ctx->noteStatistic(StNumLeafCacheAdds, 1);
- leafCacheAdds++;
- leafCache.add(key, *LINK(node));
- }
- }
- else
- {
- if (cacheNodes)
- {
- CJHTreeNode *cacheNode = nodeCache.query(key); // check if added to cache while we were reading
- if (cacheNode)
- {
- ::Release(node);
- cacheHits++;
- if (ctx) ctx->noteStatistic(StNumNodeCacheHits, 1);
- nodeCacheHits++;
- return LINK(cacheNode);
- }
- if (ctx) ctx->noteStatistic(StNumNodeCacheAdds, 1);
- nodeCacheAdds++;
- nodeCache.add(key, *LINK(node));
- }
- }
- return node;
- }
- }
- // Adds 'node' to the preload cache under the key (iD, pos), unless that key is already
- // present. The cache takes its own link on the node. Both 'pos' and the preloadNodes
- // setting must be non-zero. When an entry is added the global and preload add counters
- // are bumped and, if a context logger was supplied, StNumPreloadCacheAdds is noted.
- void CNodeCache::preload(CJHTreeNode *node, int iD, offset_t pos, IContextLogger *ctx)
- {
-     assertex(pos);
-     assertex(preloadNodes);
-     CriticalBlock block(lock);
-     CKeyIdAndPos key(iD, pos);
-     if (preloadCache.query(key))
-         return; // already cached - nothing to add and nothing to count
-     cacheAdds++;
-     if (ctx)
-         ctx->noteStatistic(StNumPreloadCacheAdds, 1);
-     preloadCacheAdds++;
-     preloadCache.add(key, *LINK(node));
- }
- // Returns true when the preload cache currently holds an entry for (iD, pos).
- // The lookup is performed while holding the cache lock.
- bool CNodeCache::isPreloaded(int iD, offset_t pos)
- {
-     CriticalBlock block(lock);
-     CKeyIdAndPos key(iD, pos);
-     CJHTreeNode *cached = preloadCache.query(key);
-     return cached != NULL;
- }
- RelaxedAtomic<unsigned> cacheAdds; // file-scope counter: bumped by CNodeCache when a node is loaded or preloaded; zeroed by clearNodeStats()
- RelaxedAtomic<unsigned> cacheHits; // bumped on every hit in the node, leaf or blob cache
- RelaxedAtomic<unsigned> nodesLoaded; // NOTE(review): not updated in this part of the file - presumably counts node loads; confirm
- RelaxedAtomic<unsigned> blobCacheHits; // hits in the blob cache
- RelaxedAtomic<unsigned> blobCacheAdds; // entries added to the blob cache
- RelaxedAtomic<unsigned> leafCacheHits; // hits in the leaf cache
- RelaxedAtomic<unsigned> leafCacheAdds; // entries added to the leaf cache
- RelaxedAtomic<unsigned> nodeCacheHits; // hits in the (branch) node cache
- RelaxedAtomic<unsigned> nodeCacheAdds; // entries added to the (branch) node cache
- RelaxedAtomic<unsigned> preloadCacheHits; // NOTE(review): not updated in this part of the file - presumably preload cache hits; confirm
- RelaxedAtomic<unsigned> preloadCacheAdds; // entries added by CNodeCache::preload
- // Resets every file-scope node cache statistic counter to zero.
- void clearNodeStats()
- {
-     // Listed in the order in which the counters are cleared.
-     RelaxedAtomic<unsigned> *counters[] =
-     {
-         &cacheAdds,
-         &cacheHits,
-         &nodesLoaded,
-         &blobCacheHits,
-         &blobCacheAdds,
-         &leafCacheHits,
-         &leafCacheAdds,
-         &nodeCacheHits,
-         &nodeCacheAdds,
-         &preloadCacheHits,
-         &preloadCacheAdds
-     };
-     for (RelaxedAtomic<unsigned> *counter : counters)
-         counter->store(0);
- }
- //------------------------------------------------------------------------------------------------
- class CKeyMerger : public CKeyLevelManager
- {
- unsigned *mergeheap;
- unsigned numkeys;
- unsigned activekeys;
- unsigned compareSize = 0;
- IArrayOf<IKeyCursor> cursorArray;
- PointerArray bufferArray;
- PointerArray fixedArray;
- BoolArray matchedArray;
- UnsignedArray mergeHeapArray;
- UnsignedArray keyNoArray;
- IKeyCursor **cursors;
- char **buffers;
- void **fixeds;
- bool *matcheds;
- unsigned sortFieldOffset;
- unsigned sortFromSeg;
- bool resetPending;
- // #define BuffCompare(a, b) memcmp(buffers[mergeheap[a]]+sortFieldOffset, buffers[mergeheap[b]]+sortFieldOffset, keySize-sortFieldOffset) // NOTE - compare whole key not just keyed part.
- // Orders two merge heap slots by the key buffers they refer to.
- // The bytes from sortFieldOffset up to compareSize are compared first; only when those are
- // equal (and sortFieldOffset is non-zero) are the leading sortFieldOffset bytes compared.
- // Returns <0, 0 or >0 in the manner of memcmp.
- inline int BuffCompare(unsigned a, unsigned b)
- {
-     const char *lhs = buffers[mergeheap[a]];
-     const char *rhs = buffers[mergeheap[b]];
-     //Backwards compatibility - do not compare the fileposition field, even if it would be significant.
-     //(compareSize rather than keySize is used as the limit) NOTE - compare whole key not just keyed part.
-     int ret = memcmp(lhs+sortFieldOffset, rhs+sortFieldOffset, compareSize-sortFieldOffset);
-     if (ret != 0 || sortFieldOffset == 0)
-         return ret;
-     return memcmp(lhs, rhs, sortFieldOffset);
- }
- Linked<IKeyIndexBase> keyset;
- // Rewinds the i'th merged stream: clears its matched flag, re-seeds its buffer with the
- // low values of the segment monitors from sortFromSeg onwards, and resets its cursor
- // when there is one.
- void resetKey(unsigned i)
- {
-     matcheds[i] = false;
-     segs.setLow(sortFromSeg, buffers[i]);
-     if (cursors[i])
-         cursors[i]->reset();
- }
- // For each key that is present, we need to repeat it for each distinct value of leading
- // segmonitor fields that is present in the index. This routine prepares for that:
- //  - every segment monitor that precedes the sort field is wrapped in an overrideable monitor
- //  - sortFromSeg is set to the index of the monitor that starts exactly at sortFieldOffset
- // It asserts if no monitor starts at sortFieldOffset, and requires a reset to be pending
- // because the actual replication is done in reset.
- void replicateForTrailingSort()
- {
-     const unsigned notFound = (unsigned) -1;
-     sortFromSeg = notFound;
-     ForEachItemIn(segIdx, segs.segMonitors)
-     {
-         IKeySegmentMonitor &monitor = segs.segMonitors.item(segIdx);
-         unsigned monitorOffset = monitor.getOffset();
-         if (monitorOffset == sortFieldOffset)
-         {
-             sortFromSeg = segIdx;
-             break;
-         }
-         // Leading segment: swap in a wrapper whose value can be pinned later
-         IKeySegmentMonitor *wrapped = createOverrideableKeySegmentMonitor(LINK(&monitor));
-         segs.segMonitors.replace(*wrapped, segIdx);
-     }
-     if (sortFromSeg == notFound)
-         assertex(!"Attempting to sort from offset that is not on a segment boundary"); // MORE - can use the information that we have earlier to make sure not merged
-     assertex(resetPending == true); // we do the actual replication in reset
- }
- // Debug aid: logs one line per active merge heap slot, in heap order, giving the stream
- // number, the first keySize bytes of its buffer and its matched flag.
- void dumpMergeHeap()
- {
-     DBGLOG("---------");
-     unsigned heapIdx = 0;
-     while (heapIdx < activekeys)
-     {
-         int key = mergeheap[heapIdx];
-         DBGLOG("%d %.*s %d", key, keySize, buffers[key], matcheds[key]);
-         heapIdx++;
-     }
- }
- // Makes merged stream 'key' the current one: the inherited keyBuffer/keyCursor/matched
- // members are pointed at that stream's state, every leading segment monitor (those before
- // sortFromSeg) is overridden with the stream's fixed value, and the segment cache is
- // recalculated.
- inline void setSegOverrides(unsigned key)
- {
-     keyBuffer = buffers[key];
-     keyCursor = cursors[key];
-     matched = matcheds[key];
-     unsigned segno = 0;
-     while (segno < sortFromSeg)
-     {
-         IKeySegmentMonitor &monitor = segs.segMonitors.item(segno);
-         IOverrideableKeySegmentMonitor *overrideable = QUERYINTERFACE(&monitor, IOverrideableKeySegmentMonitor);
-         assertex(overrideable);
-         overrideable->setOverrideBuffer(fixeds[key]);
-         segno++;
-     }
-     segs.recalculateCache();
- }
- public:
- // Creates a merger over all the parts of _keyset (which may be NULL; setKey can be
- // called later). _sortFieldOffset is the byte offset within the key from which the
- // merged output is ordered. The base class is constructed without a key.
- CKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet *_keyset, unsigned _sortFieldOffset, IContextLogger *_ctx)
-     : CKeyLevelManager(_recInfo, NULL, _ctx),
-       sortFieldOffset(_sortFieldOffset)
- {
-     init();
-     setKey(_keyset);
- }
- // Creates a merger over a single key (_onekey). _sortFieldOffset is the byte offset
- // within the key from which the merged output is ordered. The base class is constructed
- // without a key; the key is attached via setKey.
- CKeyMerger(const RtlRecord &_recInfo, IKeyIndex *_onekey, unsigned _sortFieldOffset, IContextLogger *_ctx)
-     : CKeyLevelManager(_recInfo, NULL, _ctx),
-       sortFieldOffset(_sortFieldOffset)
- {
-     init();
-     setKey(_onekey);
- }
- // Releases all per-stream state, then clears the inherited buffer and cursor pointers
- // so that the parent class does not try to delete them.
- ~CKeyMerger()
- {
-     killBuffers();
-     keyBuffer = NULL; // so parent class doesn't delete it!
-     keyCursor = NULL; // ditto
- }
- // Frees all per-stream state built by resetSort/deserializeCursorPos: the key buffers,
- // the fixed leading values, the cursors and the bookkeeping arrays. The raw array
- // pointers are cleared afterwards because they pointed into the arrays just emptied.
- // The inherited keyCursor and keyBuffer are cleared too: within this class they only ever
- // alias entries owned by cursorArray/bufferArray, so leaving them set would leave
- // dangling pointers behind (the destructor already clears both for the same reason).
- void killBuffers()
- {
-     ForEachItemIn(idx, bufferArray)
-     {
-         free(bufferArray.item(idx));
-     }
-     ForEachItemIn(idx1, fixedArray)
-     {
-         free(fixedArray.item(idx1));
-     }
-     cursorArray.kill();
-     keyCursor = NULL; // cursorArray owns cursors
-     keyBuffer = NULL; // bufferArray owned the buffer, which has just been freed
-     matchedArray.kill();
-     mergeHeapArray.kill();
-     bufferArray.kill();
-     fixedArray.kill();
-     keyNoArray.kill();
-     cursors = NULL;
-     matcheds = NULL;
-     mergeheap = NULL;
-     buffers = NULL;
-     fixeds = NULL;
- }
- // Puts the merger into its initial state: no keys, nothing active, sorting from the
- // first segment, and a reset pending so that the first lookup builds the merge heap.
- void init()
- {
-     resetPending = true;
-     sortFromSeg = 0;
-     activekeys = 0;
-     numkeys = 0;
- }
- virtual bool lookupSkip(const void *seek, size32_t seekOffset, size32_t seeklen)
- {
- /* Rather like a lookup, except that no records below the value indicated by seek* should be returned.
-    seek/seeklen give the value to skip to and seekOffset the position in the key buffer it is compared at.
-    Returns true when a record is available in the current key buffer, false when every merged stream is exhausted.
-    On the first call after a reset the merge heap is built by resetSort (which itself honours seek);
-    on later calls the stream at the top of the heap is advanced and the heap repaired, repeating
-    until the top record is not below seek.
-  */
- if (resetPending)
- {
- resetSort(seek, seekOffset, seeklen);
- if (!activekeys)
- return false;
- #ifdef _DEBUG
- if (traceSmartStepping)
- DBGLOG("SKIP: init key = %d", mergeheap[0]);
- #endif
- return true;
- }
- else
- {
- if (!activekeys)
- {
- #ifdef _DEBUG
- if (traceSmartStepping)
- DBGLOG("SKIP: merge done");
- #endif
- return false;
- }
- unsigned key = mergeheap[0]; // stream whose overrides are currently installed
- #ifdef _DEBUG
- if (traceSmartStepping)
- DBGLOG("SKIP: merging key = %d", key);
- #endif
- unsigned compares = 0; // records rejected since the statistic was last flushed
- for (;;)
- {
- if (!CKeyLevelManager::lookupSkip(seek, seekOffset, seeklen) ) // advance the stream at the top of the heap
- {
- activekeys--; // that stream is exhausted - drop it from the heap
- if (!activekeys)
- {
- if (ctx)
- ctx->noteStatistic(StNumIndexMergeCompares, compares);
- return false;
- }
- eof = false;
- mergeheap[0] = mergeheap[activekeys]; // move the last heap entry to the root before sifting down
- }
- /* The key associated with mergeheap[0] will have changed
- This code restores the heap property
- */
- unsigned p = 0; /* parent */
- while (1)
- {
- unsigned c = p*2 + 1; /* child */
- if ( c >= activekeys )
- break;
- /* Select smaller child */
- if ( c+1 < activekeys && BuffCompare( c+1, c ) < 0 ) c += 1;
- /* If child is greater or equal than parent then we are done */
- if ( BuffCompare( c, p ) >= 0 )
- break;
- /* Swap parent and child */
- int r = mergeheap[c];
- mergeheap[c] = mergeheap[p];
- mergeheap[p] = r;
- /* child becomes parent */
- p = c;
- }
- if (key != mergeheap[0]) // a different stream is now at the top - switch buffer, cursor and overrides to it
- {
- key = mergeheap[0];
- setSegOverrides(key);
- }
- if (memcmp(seek, keyBuffer+seekOffset, seeklen) <= 0) // top record is at or beyond the seek value - return it
- {
- #ifdef _DEBUG
- if (traceSmartStepping)
- {
- DBGLOG("SKIP: merged key = %d", key);
- StringBuffer recstr;
- unsigned i;
- for (i = 0; i < keySize; i++)
- {
- unsigned char c = ((unsigned char *) keyBuffer)[i];
- recstr.appendf("%c", isprint(c) ? c : '.');
- }
- recstr.append (" ");
- for (i = 0; i < keySize; i++)
- {
- recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
- }
- DBGLOG("SKIP: Out skips=%02d nullSkips=%02d seeks=%02d scans=%02d : %s", skips, nullSkips, seeks, scans, recstr.str());
- }
- #endif
- if (ctx)
- ctx->noteStatistic(StNumIndexMergeCompares, compares);
- return true;
- }
- else
- {
- compares++; // record was below the seek value - go round again
- if (ctx && (compares == 100)) // flush the statistic in batches of 100
- {
- ctx->noteStatistic(StNumIndexMergeCompares, compares); // also checks for abort...
- compares = 0;
- }
- }
- }
- }
- }
- // Layout translation is rejected by the merger: passing a non-null translator throws,
- // passing NULL is accepted and does nothing.
- virtual void setLayoutTranslator(const IDynamicTransform * trans) override
- {
-     // It MIGHT be possible to support translation still if all keyCursors have the same translation
-     // would have to translate AFTER the merge, but that's ok
-     // HOWEVER the result won't be guaranteed to be in sorted order afterwards so is there any point?
-     if (!trans)
-         return;
-     throw MakeStringException(0, "Layout translation not supported when merging key parts, as it may change sort order");
- }
- // Attaches the set of key parts to merge. When the set has at least one part, the key
- // size, keyed size and comparison size are taken from part 0 (the comparison size
- // excludes a trailing offset_t when the key has a special fileposition) and numkeys
- // becomes the number of parts; otherwise numkeys is zero. Any existing per-stream
- // state is discarded. Throws if part 0 reports a key size of 0.
- virtual void setKey(IKeyIndexBase *_keyset)
- {
-     keyset.set(_keyset);
-     const bool hasParts = _keyset && _keyset->numParts();
-     if (!hasParts)
-         numkeys = 0;
-     else
-     {
-         IKeyIndex *firstPart = _keyset->queryPart(0);
-         keySize = firstPart->keySize();
-         if (!keySize)
-             throw MakeStringException(0, "Invalid key size 0 in key %s", firstPart->queryFileName());
-         keyedSize = firstPart->keyedSize();
-         numkeys = _keyset->numParts();
-         compareSize = keySize - (firstPart->hasSpecialFileposition() ? sizeof(offset_t) : 0);
-     }
-     killBuffers();
- }
- void resetSort(const void *seek, size32_t seekOffset, size32_t seeklen) /* Builds the merge heap from scratch.
-    For every key part a cursor is positioned on its first matching record (skipping forward to
-    seek when seek is non-NULL); when sortFromSeg is non-zero this is repeated for each distinct
-    value of the leading segments, giving one merged stream per (part, leading value).
-    On exit activekeys is the number of streams, the raw array pointers are refreshed, the
-    heap is ordered with BuffCompare, the smallest stream is made current and resetPending is cleared.
-    If nothing matched, eof is set and the current buffer/cursor are NULL.
-  */
- {
- activekeys = 0;
- keyBuffer = NULL;
- void *fixedValue = NULL; // NOTE(review): shadowed by the declaration further down; this outer variable is only ever assigned NULL
- unsigned segno;
- for (segno = 0; segno < sortFromSeg; segno++) // remove any pinned values from the leading segments
- {
- IOverrideableKeySegmentMonitor *sm = QUERYINTERFACE(&segs.segMonitors.item(segno), IOverrideableKeySegmentMonitor);
- assertex(sm);
- sm->setOverrideBuffer(NULL);
- }
- segs.recalculateCache();
- unsigned i;
- for (i = 0; i < numkeys; i++) // one pass per key part
- {
- assertex(keySize);
- if (!keyBuffer) keyBuffer = (char *) malloc(keySize); // scratch buffer; reused unless it was handed to bufferArray
- segs.setLow(0, keyBuffer);
- for (;;) // one iteration per distinct leading value found in this part
- {
- keyCursor = keyset->queryPart(i)->getCursor(ctx);
- matched = false;
- eof = false;
- keyCursor->reset();
- bool found;
- unsigned lskips = 0;
- unsigned lnullSkips = 0;
- for (;;) // look up, repeating while the record found is still below seek
- {
- if (seek)
- {
- if (skipTo(seek, seekOffset, seeklen))
- lskips++;
- else
- lnullSkips++;
- }
- found = _lookup(true, segs.lastRealSeg());
- if (!found || !seek || memcmp(keyBuffer + seekOffset, seek, seeklen) >= 0)
- break;
- }
- noteSkips(lskips, lnullSkips);
- if (found)
- {
- keyNoArray.append(i); // remember which part this stream reads from
- cursorArray.append(*keyCursor);
- bufferArray.append(keyBuffer); // the buffer now belongs to bufferArray
- matchedArray.append(matched);
- mergeHeapArray.append(activekeys++); // stream n initially occupies heap slot n
- if (!sortFromSeg)
- {
- keyBuffer = NULL; // forces a fresh scratch buffer for the next part
- fixedValue = NULL;
- break;
- }
- assertex(sortFieldOffset);
- void *fixedValue = malloc(sortFieldOffset); // copy of the leading bytes that identify this stream
- memcpy(fixedValue, keyBuffer, sortFieldOffset);
- fixedArray.append(fixedValue);
- #ifdef _DEBUG
- if (traceSmartStepping)
- {
- StringBuffer recstr;
- unsigned i; // NOTE(review): shadows the part index for the duration of this debug block
- for (i = 0; i < sortFieldOffset; i++)
- {
- unsigned char c = ((unsigned char *) keyBuffer)[i];
- recstr.appendf("%c", isprint(c) ? c : '.');
- }
- recstr.append (" ");
- for (i = 0; i < keySize; i++)
- {
- recstr.appendf("%02x ", ((unsigned char *) keyBuffer)[i]);
- }
- DBGLOG("Adding key cursor info for %s", recstr.str());
- }
- #endif
- // Now advance segments 0 through sortFromSeg-1 to next legal value...
- assertex(keySize);
- char *nextBuffer = (char *) malloc(keySize); // new scratch buffer seeded from the record just stored
- memcpy(nextBuffer, keyBuffer, keySize);
- keyBuffer = nextBuffer;
- #ifdef _DEBUG
- assertex(segs.segMonitors.item(sortFromSeg-1).matchesBuffer(keyBuffer));
- #endif
- if (!segs.incrementKey(sortFromSeg-1, keyBuffer))
- break; // no further leading value - move on to the next part
- }
- else
- {
- keyCursor->Release(); // nothing matched, so this cursor was never added to cursorArray
- break;
- }
- }
- }
- free (keyBuffer); // release the scratch buffer (NULL if the last one was handed over)
- keyBuffer = NULL;
- if (activekeys>0)
- {
- if (ctx)
- ctx->noteStatistic(StNumIndexMerges, activekeys);
- cursors = cursorArray.getArray();
- matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
- buffers = (char **) bufferArray.getArray();
- mergeheap = mergeHeapArray.getArray();
- fixeds = fixedArray.getArray();
- /* Permute mergeheap to establish the heap property
- For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
- The children of p must both be greater than or equal to p
- The parent of a child c is given by p = (c-1)/2
- */
- for (i=1; i<activekeys; i++)
- {
- int r = mergeheap[i];
- int c = i; /* child */
- while (c > 0)
- {
- int p = (c-1)/2; /* parent */
- if ( BuffCompare( c, p ) >= 0 )
- break;
- mergeheap[c] = mergeheap[p];
- mergeheap[p] = r;
- c = p;
- }
- }
- setSegOverrides(mergeheap[0]); // make the smallest stream the current one
- eof = false;
- }
- else
- {
- keyBuffer = NULL;
- keyCursor = NULL;
- matched = false;
- eof = true;
- }
- resetPending = false;
- }
- // Prepares the merger for a new scan.
- // The first call validates the segment monitors against keyedSize (when it is non-zero).
- // crappyHack == false: all per-stream state is discarded and the merge heap will be
- //   rebuilt by the next lookup/lookupSkip/getCount/checkCount.
- // crappyHack == true: the existing per-stream state (for example, as restored by
- //   deserializeCursorPos) is kept and the stream at the top of the heap is made current.
- //   If there are no active streams there is no heap entry to read, so the current
- //   stream is left alone and the next lookup simply reports end of data.
- virtual void reset(bool crappyHack)
- {
-     if (!started)
-     {
-         started = true;
-         if (keyedSize)
-             segs.checkSize(keyedSize, "[merger]"); //PG: not sure what keyname to use here
-     }
-     if (!crappyHack)
-     {
-         killBuffers();
-         resetPending = true;
-     }
-     else
-     {
-         // mergeheap[0] is only meaningful when at least one stream is active
-         if (activekeys)
-             setSegOverrides(mergeheap[0]);
-         resetPending = false;
-     }
- }
- // Moves to the next record in merged order. Only exact lookups are supported.
- // The first call after a reset builds the merge heap and returns its smallest record.
- // Later calls advance the stream at the top of the heap, drop it if it is exhausted,
- // repair the heap and make the new top stream current.
- // Returns false once there are no active streams left.
- virtual bool lookup(bool exact)
- {
-     assertex(exact);
-     if (resetPending)
-     {
-         resetSort(NULL, 0, 0);
-         return activekeys != 0;
-     }
-     if (!activekeys)
-         return false;
-     const unsigned previousTop = mergeheap[0];
-     if (!CKeyLevelManager::lookup(exact))
-     {
-         // The current stream has run out: remove it by moving the last entry to the root
-         if (--activekeys == 0)
-             return false; // MORE - does this lose a record?
-         eof = false;
-         mergeheap[0] = mergeheap[activekeys];
-     }
-     // The key associated with mergeheap[0] will have changed - sift it down to
-     // restore the heap property.
-     unsigned parent = 0;
-     for (;;)
-     {
-         unsigned child = parent*2 + 1;
-         if (child >= activekeys)
-             break;
-         // pick the smaller of the two children
-         const unsigned sibling = child + 1;
-         if (sibling < activekeys && BuffCompare(sibling, child) < 0)
-             child = sibling;
-         // done as soon as the child is not smaller than its parent
-         if (BuffCompare(child, parent) >= 0)
-             break;
-         const unsigned swapped = mergeheap[child];
-         mergeheap[child] = mergeheap[parent];
-         mergeheap[parent] = swapped;
-         parent = child;
-     }
-     // dumpMergeHeap();
-     if (mergeheap[0] != previousTop)
-         setSegOverrides(mergeheap[0]);
-     return true;
- }
- // Returns the sum of the base class getCount() over every active stream.
- // Must not be used on a stepping merger (sortFieldOffset != 0): we should have avoided
- // using a stepping merger for precheck of limits, both for efficiency and because this
- // code won't work as the sequence numbers are not in sequence.
- // Note that the current buffer and cursor are left pointing at the last stream counted.
- virtual unsigned __int64 getCount()
- {
-     assertex (!sortFieldOffset);
-     if (resetPending)
-         resetSort(NULL, 0, 0); // This is slightly suboptimal
-     unsigned __int64 total = 0;
-     unsigned heapIdx = 0;
-     while (heapIdx < activekeys)
-     {
-         const unsigned key = mergeheap[heapIdx++];
-         keyBuffer = buffers[key];
-         keyCursor = cursors[key];
-         total += CKeyLevelManager::getCount();
-     }
-     return total;
- }
- // Sums the base class checkCount() over the active streams, handing each stream the part
- // of 'max' not yet used up and stopping as soon as one stream reports more than that.
- // Must not be used on a stepping merger (sortFieldOffset != 0): we should have avoided
- // using a stepping merger for precheck of limits, both for efficiency and because this
- // code won't work as the sequence numbers are not in sequence.
- virtual unsigned __int64 checkCount(unsigned __int64 max)
- {
-     assertex (!sortFieldOffset);
-     if (resetPending)
-         resetSort(NULL, 0, 0); // this is a little suboptimal as we will not bail out early
-     unsigned __int64 total = 0;
-     for (unsigned heapIdx = 0; heapIdx < activekeys; heapIdx++)
-     {
-         const unsigned key = mergeheap[heapIdx];
-         keyBuffer = buffers[key];
-         keyCursor = cursors[key];
-         const unsigned __int64 streamCount = CKeyLevelManager::checkCount(max);
-         total += streamCount;
-         if (streamCount > max)
-             break; // limit exceeded - no need to look at the remaining streams
-         max -= streamCount;
-     }
-     return total;
- }
- // Writes the merge position to mb: the eof flag, the number of active streams and then,
- // for each stream in heap order, its key part number, its cursor position and its
- // matched flag.
- virtual void serializeCursorPos(MemoryBuffer &mb)
- {
-     // dumpMergeHeap();
-     mb.append(eof);
-     mb.append(activekeys);
-     unsigned heapIdx = 0;
-     while (heapIdx < activekeys)
-     {
-         const unsigned key = mergeheap[heapIdx++];
-         mb.append(keyNoArray.item(key));
-         cursors[key]->serializeCursorPos(mb);
-         mb.append(matcheds[key]);
-     }
- }
- // Rebuilds the merge state from data written by serializeCursorPos: reads the eof flag
- // and stream count, then for each stream creates a cursor on the recorded key part,
- // restores its position into a newly allocated key buffer, restores its matched flag and
- // copies the leading sortFieldOffset bytes as its fixed value. Streams are appended in
- // the order read, so the stored heap order is preserved. The raw array pointers are
- // refreshed at the end.
- virtual void deserializeCursorPos(MemoryBuffer &mb)
- {
-     mb.read(eof);
-     mb.read(activekeys);
-     unsigned stream = 0;
-     while (stream < activekeys)
-     {
-         unsigned partNo;
-         mb.read(partNo);
-         keyNoArray.append(partNo);
-         keyCursor = keyset->queryPart(partNo)->getCursor(ctx);
-         assertex(keySize);
-         keyBuffer = (char *) malloc(keySize);
-         cursorArray.append(*keyCursor);
-         keyCursor->deserializeCursorPos(mb, keyBuffer);
-         mb.read(matched);
-         matchedArray.append(matched);
-         bufferArray.append(keyBuffer);
-         void *leadingValue = malloc(sortFieldOffset);
-         memcpy(leadingValue, keyBuffer, sortFieldOffset); // If it's not at EOF then it must match
-         fixedArray.append(leadingValue);
-         mergeHeapArray.append(stream);
-         stream++;
-     }
-     cursors = cursorArray.getArray();
-     matcheds = (bool *) matchedArray.getArray(); // For some reason BoolArray is typedef'd to CharArray on linux...
-     buffers = (char **) bufferArray.getArray();
-     mergeheap = mergeHeapArray.getArray();
-     fixeds = fixedArray.getArray();
- }
- // Completes segment monitor setup. After the base class has done its part, a merger
- // that sorts from a non-zero offset also checks the monitors against keyedSize (when
- // that is non-zero) and prepares the leading segments for replication.
- virtual void finishSegmentMonitors()
- {
-     CKeyLevelManager::finishSegmentMonitors();
-     if (!sortFieldOffset)
-         return;
-     if (keyedSize)
-         segs.checkSize(keyedSize, "[merger]"); // Ensures trailing KSM is setup
-     replicateForTrailingSort();
- }
- };
- // Factory: returns a new key manager that merges all the parts of _keys, ordering the
- // output from byte offset _sortFieldOffset. The caller owns the returned object.
- extern jhtree_decl IKeyManager *createKeyMerger(const RtlRecord &_recInfo, IKeyIndexSet * _keys, unsigned _sortFieldOffset, IContextLogger *_ctx)
- {
-     IKeyManager *merger = new CKeyMerger(_recInfo, _keys, _sortFieldOffset, _ctx);
-     return merger;
- }
- // Factory: returns a new merging key manager over the single key _onekey, ordering the
- // output from byte offset _sortFieldOffset. The caller owns the returned object.
- extern jhtree_decl IKeyManager *createSingleKeyMerger(const RtlRecord &_recInfo, IKeyIndex * _onekey, unsigned _sortFieldOffset, IContextLogger *_ctx)
- {
-     IKeyManager *merger = new CKeyMerger(_recInfo, _onekey, _sortFieldOffset, _ctx);
-     return merger;
- }
- // Simple IKeyIndexSet implementation: an ordered collection of key parts together with
- // a record count and total size that are stored and returned as given.
- class CKeyIndexSet : implements IKeyIndexSet, public CInterface
- {
-     IPointerArrayOf<IKeyIndex> indexes;
-     offset_t recordCount = 0;
-     offset_t totalSize = 0;
-     StringAttr origFileName;
- public:
-     IMPLEMENT_IINTERFACE;
-
-     virtual bool IsShared() const
-     {
-         return CInterface::IsShared();
-     }
-     // Appends a part; parts are numbered in the order they are added.
-     void addIndex(IKeyIndex *i)
-     {
-         indexes.append(i);
-     }
-     virtual unsigned numParts()
-     {
-         return indexes.length();
-     }
-     virtual IKeyIndex *queryPart(unsigned partNo)
-     {
-         return indexes.item(partNo);
-     }
-     virtual void setRecordCount(offset_t count)
-     {
-         recordCount = count;
-     }
-     virtual void setTotalSize(offset_t size)
-     {
-         totalSize = size;
-     }
-     virtual offset_t getRecordCount()
-     {
-         return recordCount;
-     }
-     virtual offset_t getTotalSize()
-     {
-         return totalSize;
-     }
- };
- // Factory: returns a new, empty key index set. The caller owns the returned object.
- extern jhtree_decl IKeyIndexSet *createKeyIndexSet()
- {
-     IKeyIndexSet *emptySet = new CKeyIndexSet;
-     return emptySet;
- }
- // Factory: returns a new (non-merging) key manager over the single key _key.
- // The caller owns the returned object.
- extern jhtree_decl IKeyManager *createLocalKeyManager(const RtlRecord &_recInfo, IKeyIndex *_key, IContextLogger *_ctx)
- {
-     IKeyManager *manager = new CKeyLevelManager(_recInfo, _key, _ctx);
-     return manager;
- }
- // Simple IKeyArray implementation: an ordered list of keys addressed by part number.
- class CKeyArray : implements IKeyArray, public CInterface
- {
- public:
-     IMPLEMENT_IINTERFACE;
-     virtual bool IsShared() const
-     {
-         return CInterface::IsShared();
-     }
-     IPointerArrayOf<IKeyIndexBase> keys;
-     // Returns the key stored at partNo, or NULL when partNo is not a valid position.
-     virtual IKeyIndexBase *queryKeyPart(unsigned partNo)
-     {
-         if (keys.isItem(partNo))
-         {
-             IKeyIndexBase *key = keys.item(partNo);
-             return key;
-         }
-         return NULL;
-     }
-     virtual unsigned length()
-     {
-         return keys.length();
-     }
-     // Appends a key; keys are numbered in the order they are added.
-     void addKey(IKeyIndexBase *f)
-     {
-         keys.append(f);
-     }
- };
- // Factory: returns a new, empty key array. The caller owns the returned object.
- extern jhtree_decl IKeyArray *createKeyArray()
- {
-     IKeyArray *emptyArray = new CKeyArray;
-     return emptyArray;
- }
- #ifdef _USE_CPPUNIT
- #include "unittests.hpp"
- class IKeyManagerTest : public CppUnit::TestFixture
- {
- CPPUNIT_TEST_SUITE( IKeyManagerTest );
- CPPUNIT_TEST(testStepping);
- CPPUNIT_TEST(testKeys);
- CPPUNIT_TEST_SUITE_END();
- void testStepping() // Exercises the key merger sorted on a trailing field: plain merge, resume from a serialized position, lookupSkip, and a second filter set
- {
- buildTestKeys(false); // fixed-size keys: keyfile1 omits every multiple of 4, keyfile2 holds only the multiples of 4
- {
- // We are going to treat as a 7-byte field then a 3-byte field, and request the datasorted by the 3-byte...
- Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
- Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
- Owned<IKeyIndexSet> keyset = createKeyIndexSet();
- keyset->addIndex(index1.getClear());
- keyset->addIndex(index2.getClear());
- const char *json = "{ \"ty1\": { \"fieldType\": 4, \"length\": 7 }, "
- " \"ty2\": { \"fieldType\": 4, \"length\": 3 }, "
- " \"fieldType\": 13, \"length\": 10, "
- " \"fields\": [ "
- " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
- " { \"name\": \"f2\", \"type\": \"ty2\", \"flags\": 4 } ] "
- "}";
- Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
- Owned <IKeyManager> tlk1 = createKeyMerger(meta->queryRecordAccessor(true), keyset, 7, NULL); // sort from offset 7, i.e. by the 3-byte field
- Owned<IStringSet> sset1 = createStringSet(7);
- sset1->addRange("0000003", "0000003");
- sset1->addRange("0000005", "0000006");
- tlk1->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
- Owned<IStringSet> sset2 = createStringSet(3);
- sset2->addRange("010", "010");
- sset2->addRange("030", "033");
- Owned<IStringSet> sset3 = createStringSet(3); // alternative trailing filter, used by tlk4 below
- sset3->addRange("999", "XXX");
- sset3->addRange("000", "002");
- tlk1->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
- tlk1->finishSegmentMonitors();
- tlk1->reset();
- offset_t fpos; // NOTE(review): not used anywhere in this test
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003010", 10)==0); // results come back ordered by the trailing 3 bytes, then by the leading 7
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005010", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006010", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003030", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005030", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006030", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003031", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005031", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006031", 10)==0);
- MemoryBuffer mb;
- tlk1->serializeCursorPos(mb); // capture the position after 9 records; tlk2 resumes from here
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003032", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005032", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006032", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000003033", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000005033", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000006033", 10)==0);
- ASSERT(!tlk1->lookup(true)); // exhausted, and stays exhausted on a repeated call
- ASSERT(!tlk1->lookup(true));
- Owned <IKeyManager> tlk2 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL); // second merger: restore the saved position and continue
- tlk2->setKey(keyset);
- tlk2->deserializeCursorPos(mb);
- tlk2->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
- tlk2->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
- tlk2->finishSegmentMonitors();
- tlk2->reset(true); // true: keep the deserialized state rather than starting again
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003032", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005032", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006032", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000003033", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000005033", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000006033", 10)==0);
- ASSERT(!tlk2->lookup(true));
- ASSERT(!tlk2->lookup(true));
- Owned <IKeyManager> tlk3 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL); // third merger: mixes lookup with lookupSkip on the trailing field
- tlk3->setKey(keyset);
- tlk3->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
- tlk3->append(createKeySegmentMonitor(false, sset2.getLink(), 1, 7, 3));
- tlk3->finishSegmentMonitors();
- tlk3->reset(false);
- ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003010", 10)==0);
- ASSERT(tlk3->lookupSkip("031", 7, 3)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000003031", 10)==0); // skips everything whose trailing field is below "031"
- ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000005031", 10)==0);
- ASSERT(tlk3->lookup(true)); ASSERT(memcmp(tlk3->queryKeyBuffer(), "0000006031", 10)==0);
- ASSERT(!tlk3->lookupSkip("081", 7, 3)); // nothing at or beyond "081" passes the filter
- ASSERT(!tlk3->lookup(true));
- Owned <IKeyManager> tlk4 = createKeyMerger(meta->queryRecordAccessor(true), NULL, 7, NULL); // fourth merger: same leading filter, trailing filter sset3
- tlk4->setKey(keyset);
- tlk4->append(createKeySegmentMonitor(false, sset1.getLink(), 0, 0, 7));
- tlk4->append(createKeySegmentMonitor(false, sset3.getLink(), 1, 7, 3));
- tlk4->finishSegmentMonitors();
- tlk4->reset(false);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003000", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005000", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006000", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003001", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005001", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006001", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003002", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005002", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006002", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000003999", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000005999", 10)==0);
- ASSERT(tlk4->lookup(true)); ASSERT(memcmp(tlk4->queryKeyBuffer(), "0000006999", 10)==0);
- ASSERT(!tlk4->lookup(true));
- ASSERT(!tlk4->lookup(true));
- }
- clearKeyStoreCache(true);
- removeTestKeys(); // delete the key files created at the start of the test
- }
- // Writes the two key files used by the tests. keyfile1 leaves out every record whose
- // sequence number is a multiple of 4; keyfile2 contains only those records.
- // 'variable' selects variable-size records (with blobs) instead of fixed-size ones.
- void buildTestKeys(bool variable)
- {
-     const bool onlyMultiplesOfFour = true;
-     buildTestKey("keyfile1.$$$", !onlyMultiplesOfFour, variable);
-     buildTestKey("keyfile2.$$$", onlyMultiplesOfFour, variable);
- }
- // Builds one test key file named 'filename'.
- // The key value is a 10-digit decimal counter running from 0 to 9999.
- //   skip == false: records whose counter is a multiple of 4 are left out
- //   skip == true : only records whose counter is a multiple of 4 are written
- // Counters 48 and 49 are written twice when they are written at all.
- // When 'variable' is set, records are variable-size and every tenth counter has a blob of
- // (counter + 100000) pseudo-random bytes created, whose id is placed after the key.
- void buildTestKey(const char *filename, bool skip, bool variable)
- {
-     OwnedIFile file = createIFile(filename);
-     OwnedIFileIO io = file->openShared(IFOcreate, IFSHfull);
-     Owned<IFileIOStream> out = createIOStream(io);
-     unsigned maxRecSize = variable ? 18 : 10;
-     unsigned keyedSize = 10;
-     Owned<IKeyBuilder> builder = createKeyBuilder(out, COL_PREFIX | HTREE_FULLSORT_KEY | HTREE_COMPRESSED_KEY | (variable ? HTREE_VARSIZE : 0), maxRecSize, NODESIZE, keyedSize, 0);
-     char keybuf[18];
-     memset(keybuf, '0', 18);
-     unsigned count = 0;
-     while (count < 10000)
-     {
-         unsigned datasize = 10;
-         const bool wantsBlob = variable && (count % 10)==0;
-         if (wantsBlob)
-         {
-             const unsigned blobLen = count+100000;
-             char *blob = new char[blobLen];
-             byte seed = count;
-             for (unsigned i = 0; i < blobLen; i++)
-             {
-                 blob[i] = seed;
-                 seed = seed * 13 + i;
-             }
-             offset_t blobid = builder->createBlob(blobLen, blob);
-             memcpy(keybuf + 10, &blobid, sizeof(blobid));
-             delete [] blob;
-             datasize += sizeof(blobid);
-         }
-         const bool skipme = (count % 4 == 0) != skip;
-         if (!skipme)
-         {
-             builder->processKeyData(keybuf, count*10, datasize);
-             if (count==48 || count==49)
-                 builder->processKeyData(keybuf, count*10, datasize); // deliberate duplicate
-         }
-         // Increment the decimal counter held in keybuf[0..9], carrying leftwards over any 9s
-         unsigned digit = 9;
-         while (keybuf[digit]=='9')
-             keybuf[digit--]='0';
-         keybuf[digit]++;
-         count++;
-     }
-     builder->finish(nullptr, nullptr);
-     out->flush();
- }
- void removeTestKeys() // Deletes the two key files written by buildTestKeys, asserting that each remove() call reports success
- {
- ASSERT(remove("keyfile1.$$$")==0);
- ASSERT(remove("keyfile2.$$$")==0);
- }
- void checkBlob(IKeyManager *key, unsigned size) // Verifies the blob referenced by key's current record: non-zero id, 'size' bytes long, contents as generated by buildTestKey
- {
- unsigned __int64 blobid;
- memcpy(&blobid, key->queryKeyBuffer()+10, sizeof(blobid)); // the blob id sits immediately after the 10-byte key
- ASSERT(blobid != 0);
- size32_t blobsize;
- const byte *blob = key->loadBlob(blobid, blobsize);
- ASSERT(blob != NULL);
- ASSERT(blobsize == size);
- byte seed = size-100000; // buildTestKey seeds with the record counter and makes the blob counter+100000 bytes long
- for (unsigned i = 0; i < size; i++)
- {
- ASSERT(blob[i] == seed);
- seed = seed * 13 + i; // same recurrence as used when the blob was generated
- }
- key->releaseBlobs();
- }
- protected:
- void testKeys(bool variable)
- {
- const char *json = variable ?
- "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
- " \"ty2\": { \"fieldType\": 15, \"length\": 8 }, "
- " \"fieldType\": 13, \"length\": 10, "
- " \"fields\": [ "
- " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
- " { \"name\": \"f3\", \"type\": \"ty2\", \"flags\": 65551 } " // 0x01000f i.e. payload and blob
- " ]"
- "}"
- :
- "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, "
- " \"fieldType\": 13, \"length\": 10, "
- " \"fields\": [ "
- " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, "
- " ] "
- "}";
- Owned<IOutputMetaData> meta = createTypeInfoOutputMetaData(json, nullptr);
- const RtlRecord &recInfo = meta->queryRecordAccessor(true);
- buildTestKeys(variable);
- {
- Owned <IKeyIndex> index1 = createKeyIndex("keyfile1.$$$", 0, false, false);
- Owned <IKeyManager> tlk1 = createLocalKeyManager(recInfo, index1, NULL);
- Owned<IStringSet> sset1 = createStringSet(10);
- sset1->addRange("0000000001", "0000000100");
- tlk1->append(createKeySegmentMonitor(false, sset1.getClear(), 0, 0, 10));
- tlk1->finishSegmentMonitors();
- tlk1->reset();
- Owned <IKeyManager> tlk1a = createLocalKeyManager(recInfo, index1, NULL);
- Owned<IStringSet> sset1a = createStringSet(8);
- sset1a->addRange("00000000", "00000001");
- tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 0, 0, 8));
- tlk1a->append(createKeySegmentMonitor(false, NULL, 1, 8, 1));
- sset1a.setown(createStringSet(1));
- sset1a->addRange("0", "1");
- tlk1a->append(createKeySegmentMonitor(false, sset1a.getClear(), 2, 9, 1));
- tlk1a->finishSegmentMonitors();
- tlk1a->reset();
- Owned<IStringSet> ssetx = createStringSet(10);
- ssetx->addRange("0000000001", "0000000002");
- ASSERT(ssetx->numValues() == 2);
- ssetx->addRange("00000000AK", "00000000AL");
- ASSERT(ssetx->numValues() == 4);
- ssetx->addRange("0000000100", "0010000000");
- ASSERT(ssetx->numValues() == (unsigned) -1);
- ssetx->addRange("0000000001", "0010000000");
- ASSERT(ssetx->numValues() == (unsigned) -1);
- Owned <IKeyIndex> index2 = createKeyIndex("keyfile2.$$$", 0, false, false);
- Owned <IKeyManager> tlk2 = createLocalKeyManager(recInfo, index2, NULL);
- Owned<IStringSet> sset2 = createStringSet(10);
- sset2->addRange("0000000001", "0000000100");
- ASSERT(sset2->numValues() == 65536);
- tlk2->append(createKeySegmentMonitor(false, sset2.getClear(), 0, 0, 10));
- tlk2->finishSegmentMonitors();
- tlk2->reset();
- Owned <IKeyManager> tlk3;
- if (!variable)
- {
- Owned<IKeyIndexSet> both = createKeyIndexSet();
- both->addIndex(index1.getLink());
- both->addIndex(index2.getLink());
- Owned<IStringSet> sset3 = createStringSet(10);
- tlk3.setown(createKeyMerger(recInfo, NULL, 0, NULL));
- tlk3->setKey(both);
- sset3->addRange("0000000001", "0000000100");
- tlk3->append(createKeySegmentMonitor(false, sset3.getClear(), 0, 0, 10));
- tlk3->finishSegmentMonitors();
- tlk3->reset();
- }
- Owned <IKeyManager> tlk2a = createLocalKeyManager(recInfo, index2, NULL);
- Owned<IStringSet> sset2a = createStringSet(10);
- sset2a->addRange("0000000048", "0000000048");
- ASSERT(sset2a->numValues() == 1);
- tlk2a->append(createKeySegmentMonitor(false, sset2a.getClear(), 0, 0, 10));
- tlk2a->finishSegmentMonitors();
- tlk2a->reset();
- Owned <IKeyManager> tlk2b = createLocalKeyManager(recInfo, index2, NULL);
- Owned<IStringSet> sset2b = createStringSet(10);
- sset2b->addRange("0000000047", "0000000049");
- ASSERT(sset2b->numValues() == 3);
- tlk2b->append(createKeySegmentMonitor(false, sset2b.getClear(), 0, 0, 10));
- tlk2b->finishSegmentMonitors();
- tlk2b->reset();
- Owned <IKeyManager> tlk2c = createLocalKeyManager(recInfo, index2, NULL);
- Owned<IStringSet> sset2c = createStringSet(10);
- sset2c->addRange("0000000047", "0000000047");
- tlk2c->append(createKeySegmentMonitor(false, sset2c.getClear(), 0, 0, 10));
- tlk2c->finishSegmentMonitors();
- tlk2c->reset();
- ASSERT(tlk1->getCount() == 76);
- ASSERT(tlk1->getCount() == 76);
- ASSERT(tlk1a->getCount() == 30);
- ASSERT(tlk2->getCount() == 26);
- ASSERT(tlk2a->getCount() == 2);
- ASSERT(tlk2b->getCount() == 2);
- ASSERT(tlk2c->getCount() == 0);
- if (tlk3)
- ASSERT(tlk3->getCount() == 102);
- // MORE - PUT SOME TESTS IN FOR WILD SEEK STUFF
- unsigned pass;
- char buf[11];
- unsigned i;
- for (pass = 0; pass < 2; pass++)
- {
- offset_t fpos;
- tlk1->reset();
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000001", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000002", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000003", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000005", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000006", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000007", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000009", 10)==0);
- ASSERT(tlk1->lookup(true)); ASSERT(memcmp(tlk1->queryKeyBuffer(), "0000000010", 10)==0);
- if (variable)
- checkBlob(tlk1, 10+100000);
- tlk1a->reset();
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000001", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000010", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000011", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000021", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000030", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000031", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000041", 10)==0);
- ASSERT(tlk1a->lookup(true)); ASSERT(memcmp(tlk1a->queryKeyBuffer(), "0000000050", 10)==0);
- tlk2->reset();
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000004", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000008", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000012", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000016", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000020", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000024", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000028", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000032", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000036", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000040", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000044", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000048", 10)==0);
- ASSERT(tlk2->lookup(true)); ASSERT(memcmp(tlk2->queryKeyBuffer(), "0000000052", 10)==0);
- if (tlk3)
- {
- tlk3->reset();
- for (i = 1; i <= 100; i++)
- {
- ASSERT(tlk3->lookup(true));
- sprintf(buf, "%010d", i);
- ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
- if (i==48 || i==49)
- {
- ASSERT(tlk3->lookup(true));
- ASSERT(memcmp(tlk3->queryKeyBuffer(), buf, 10)==0);
- }
- }
- ASSERT(!tlk3->lookup(true));
- ASSERT(!tlk3->lookup(true));
- }
- }
- tlk1->releaseSegmentMonitors();
- tlk2->releaseSegmentMonitors();
- if (tlk3)
- tlk3->releaseSegmentMonitors();
- }
- clearKeyStoreCache(true);
- removeTestKeys();
- }
- // Test entry point: first checks that CKeyIdAndPos is exactly as large as an
- // unsigned __int64 plus an offset_t, then runs the flag-taking testKeys
- // overload twice - once with false and once with true, in that order.
- void testKeys()
- {
- ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t));
- const bool flagValues[] = { false, true };
- for (unsigned idx = 0; idx < sizeof(flagValues)/sizeof(flagValues[0]); idx++)
- testKeys(flagValues[idx]);
- }
- };
- CPPUNIT_TEST_SUITE_REGISTRATION( IKeyManagerTest ); // register the IKeyManagerTest suite with CppUnit (unnamed registration macro)
- CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( IKeyManagerTest, "IKeyManagerTest" ); // additionally register it under the name "IKeyManagerTest"
- #endif
|