123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "jlib.hpp"
- #include "ccd.hpp"
- #include "ccdquery.hpp"
- #include "ccdstate.hpp"
- #include "ccdserver.hpp"
- #include "ccdcontext.hpp"
- #include "ccddebug.hpp"
- #include "ccdactivities.hpp"
- #include "ccdqueue.ipp"
- #include "ccdsnmp.hpp"
- #include "ccdfile.hpp"
- #include "ccdkey.hpp"
- #include "rtlkey.hpp"
- #include "eclrtl_imp.hpp"
- #include "rtlread_imp.hpp"
- #include "rtlcommon.hpp"
- #include "rtldynfield.hpp"
- #include "jhtree.hpp"
- #include "jlog.hpp"
- #include "jmisc.hpp"
- #include "udplib.hpp"
- #include "csvsplitter.hpp"
- #include "thorxmlread.hpp"
- #include "thorcommon.ipp"
- #include "thorstrand.hpp"
- #include "jstats.h"
- size32_t diskReadBufferSize = 0x10000;
- using roxiemem::OwnedRoxieRow;
- using roxiemem::OwnedConstRoxieRow;
- using roxiemem::OwnedRoxieString;
- using roxiemem::IRowManager;
- #define maxContinuationSize 48000 // note - must fit in the 2-byte length field... but also needs to be possible to send back from Roxie server->slave in one packet
- size32_t serializeRow(IOutputRowSerializer * serializer, IMessagePacker *output, const void *unserialized)
- {
- CSizingSerializer sizer;
- serializer->serialize(sizer, (const byte *) unserialized);
- unsigned serializedLength = sizer.size();
- void *udpBuffer = output->getBuffer(serializedLength, true);
- CRawRowSerializer memSerializer(serializedLength, (byte *) udpBuffer);
- serializer->serialize(memSerializer, (const byte *) unserialized);
- assertex(memSerializer.size() == serializedLength);
- output->putBuffer(udpBuffer, serializedLength, true);
- return serializedLength;
- }
- inline void appendBuffer(IMessagePacker * output, size32_t size, const void * data, bool isVariable)
- {
- void *recBuffer = output->getBuffer(size, isVariable);
- memcpy(recBuffer, data, size);
- output->putBuffer(recBuffer, size, isVariable);
- }
- extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val)
- {
- if (val)
- {
- StringBuffer xpath;
- xpath.appendf("att[@name='%s']", statName);
- IPropertyTree *att = node->queryPropTree(xpath.str());
- if (!att)
- {
- att = node->addPropTree("att");
- att->setProp("@name", statName);
- }
- att->setProp("@type", statType);
- att->setPropInt64("@value", val);
- }
- }
- extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val)
- {
- if (val)
- {
- reply.appendf(" <att name='%s' type='%s' value='%" I64F "d'/>\n", statName, statType, val);
- }
- }
- CActivityFactory::CActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
- : queryFactory(_queryFactory),
- helperFactory(_helperFactory),
- id(_id),
- subgraphId(_subgraphId),
- kind(_kind),
- mystats(allStatistics) // We COULD cut down this list but it would complicate the structure, and we do actually track more in the factory than in the activity
- {
- if (helperFactory)
- {
- Owned<IHThorArg> helper = helperFactory();
- meta.set(helper->queryOutputMeta());
- }
- const char *recordTranslationModeHintText = _graphNode.queryProp("hint[@name='layoutTranslation']/@value");
- if (recordTranslationModeHintText)
- recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText);
- }
- void CActivityFactory::addChildQuery(unsigned id, ActivityArray *childQuery)
- {
- childQueries.append(*childQuery);
- childQueryIndexes.append(id);
- }
- ActivityArray *CActivityFactory::queryChildQuery(unsigned idx, unsigned &id)
- {
- if (childQueries.isItem(idx))
- {
- id = childQueryIndexes.item(idx);
- return &childQueries.item(idx);
- }
- id = 0;
- return NULL;
- }
- class CSlaveActivityFactory : public CActivityFactory, implements ISlaveActivityFactory
- {
- public:
- IMPLEMENT_IINTERFACE
- CSlaveActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CActivityFactory(_graphNode.getPropInt("@id", 0), _subgraphId, _queryFactory, _helperFactory, getActivityKind(_graphNode), _graphNode)
- {
- }
- virtual IQueryFactory &queryQueryFactory() const
- {
- return CActivityFactory::queryQueryFactory();
- }
- void addChildQuery(unsigned id, ActivityArray *childQuery)
- {
- CActivityFactory::addChildQuery(id, childQuery);
- }
- StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("%p", this);
- }
- const char *queryQueryName() const
- {
- return queryFactory.queryQueryName();
- }
- virtual ActivityArray *queryChildQuery(unsigned idx, unsigned &id)
- {
- return CActivityFactory::queryChildQuery(idx, id);
- }
- virtual void mergeStats(const CRuntimeStatisticCollection &from) const
- {
- CActivityFactory::mergeStats(from);
- }
- virtual unsigned queryId() const
- {
- return CActivityFactory::queryId();
- }
- virtual ThorActivityKind getKind() const
- {
- return CActivityFactory::getKind();
- }
- virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
- {
- CActivityFactory::getEdgeProgressInfo(idx, edge);
- }
- virtual void getNodeProgressInfo(IPropertyTree &node) const
- {
- CActivityFactory::getNodeProgressInfo(node);
- }
- virtual void resetNodeProgressInfo()
- {
- CActivityFactory::resetNodeProgressInfo();
- }
- virtual void getActivityMetrics(StringBuffer &reply) const
- {
- CActivityFactory::getActivityMetrics(reply);
- }
- IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return queryFactory.createSlaveContext(logctx, packet, childQueries.length()!=0);
- }
- virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
- {
- if (datafile)
- addXrefFileInfo(reply, datafile);
- }
- void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
- {
- if (childQueries.length())
- {
- ForEachItemIn(idx, childQueries)
- {
- if (!_probeManager) // MORE - the probeAllRows is a hack!
- _probeManager = queryContext->queryProbeManager();
- IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx, 1); // MORE - the parent is wrong!
- childGraphs.append(*childGraph);
- queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
- childGraph->onCreate(colocalArg); //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
- }
- }
- }
- Owned<const IResolvedFile> datafile;
- protected:
- static IPropertyTree *queryStatsNode(IPropertyTree *parent, const char *xpath)
- {
- StringBuffer levelx;
- const char *sep = strchr(xpath, '/');
- if (sep)
- levelx.append(sep-xpath, xpath);
- else
- levelx.append(xpath);
- IPropertyTree *child = parent->queryPropTree(levelx);
- if (!child)
- {
- const char *id = strchr(levelx, '[');
- if (!id)
- {
- child = parent->addPropTree(levelx);
- }
- else
- {
- StringBuffer elem;
- elem.append(id-levelx, levelx);
- child = parent->addPropTree(elem);
- for (;;)
- {
- StringBuffer attr, val;
- id++;
- while (*id != '=')
- attr.append(*id++);
- id++;
- char qu = *id++;
- while (*id != qu)
- val.append(*id++);
- child->setProp(attr, val);
- id++;
- if (*id == ']')
- {
- if (id[1]!='[')
- break;
- id++;
- }
- else
- throwUnexpected();
- }
- }
- }
- if (sep)
- return queryStatsNode(child, sep+1);
- else
- return child;
- }
- };
- //================================================================================================
- class CRoxieSlaveActivity : implements CInterfaceOf<IRoxieSlaveActivity>, implements ICodeContext
- {
- protected:
- SlaveContextLogger &logctx;
- Linked<IRoxieQueryPacket> packet;
- mutable Owned<IRoxieSlaveContext> queryContext; // bit of a hack but easier than changing the ICodeContext callback interface to remove const
- const CSlaveActivityFactory *basefactory;
- IArrayOf<IActivityGraph> childGraphs;
- IHThorArg *basehelper;
- PartNoType lastPartNo;
- MemoryBuffer serializedCreate;
- MemoryBuffer resentInfo;
- CachedOutputMetaData meta;
- Owned<IOutputRowSerializer> serializer;
- Owned<IEngineRowAllocator> rowAllocator;
- #ifdef _DEBUG
- Owned<IProbeManager> probeManager;
- #endif
- bool aborted;
- bool resent;
- bool isOpt;
- bool variableFileName;
- RecordTranslationMode allowFieldTranslation;
- Owned<const IResolvedFile> varFileInfo;
- virtual void setPartNo(bool filechanged) = 0;
- inline void checkPartChanged(PartNoType &newPart)
- {
- if (newPart.partNo!=lastPartNo.partNo || newPart.fileNo!=lastPartNo.fileNo)
- {
- lastPartNo.partNo = newPart.partNo;
- bool filechanged = lastPartNo.fileNo != newPart.fileNo;
- lastPartNo.fileNo = newPart.fileNo;
- setPartNo(filechanged);
- }
- }
- virtual bool needsRowAllocator()
- {
- return meta.needsSerializeDisk() || meta.isVariableSize();
- }
- virtual void onCreate()
- {
- queryContext.setown(basefactory->createSlaveContext(logctx, packet));
- #ifdef _DEBUG
- // MORE - need to consider debugging....
- if (probeAllRows)
- probeManager.setown(createProbeManager());
- basefactory->createChildQueries(queryContext, childGraphs, basehelper, probeManager, queryContext, logctx);
- #else
- basefactory->createChildQueries(queryContext, childGraphs, basehelper, NULL, queryContext, logctx);
- #endif
- if (meta.needsSerializeDisk())
- serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
- if (needsRowAllocator())
- rowAllocator.setown(getRowAllocator(meta.queryOriginal(), basefactory->queryId()));
- unsigned parentExtractSize;
- serializedCreate.read(parentExtractSize);
- const byte * parentExtract = serializedCreate.readDirect(parentExtractSize);
- basehelper->onCreate(this, NULL, &serializedCreate);
- basehelper->onStart(parentExtract, &serializedCreate);
- deserializeExtra(serializedCreate);
- if (variableFileName) // note - in keyed join with dependent index case, kj itself won't have variableFileName but indexread might
- {
- CDateTime cacheDate(serializedCreate);
- unsigned checksum;
- serializedCreate.read(checksum);
- OwnedRoxieString fname(queryDynamicFileName());
- varFileInfo.setown(querySlaveDynamicFileCache()->lookupDynamicFile(logctx, fname, cacheDate, checksum, &packet->queryHeader(), isOpt, true));
- setVariableFileInfo();
- }
- }
- virtual void deserializeExtra(MemoryBuffer &out)
- {
- }
- CRoxieSlaveActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory)
- : logctx(_logctx), packet(_packet), basefactory(_factory)
- {
- allowFieldTranslation = _factory->getEnableFieldTranslation();
- resent = packet->getContinuationLength() != 0;
- serializedCreate.setBuffer(packet->getContextLength(), (void *) packet->queryContextData(), false);
- if (resent)
- resentInfo.setBuffer(packet->getContinuationLength(), (void *) packet->queryContinuationData(), false);
- basehelper = _hFactory();
- lastPartNo.partNo = 0xffff;
- lastPartNo.fileNo = 0xffff;
- aborted = false;
- isOpt = false;
- variableFileName = false;
- meta.set(basehelper->queryOutputMeta());
- }
- ~CRoxieSlaveActivity()
- {
- ::Release(basehelper);
- }
- virtual void beforeDispose() override
- {
- CRuntimeStatisticCollection merged(allStatistics);
- logctx.gatherStats(merged);
- if (defaultCollectFactoryStatistics)
- basefactory->mergeStats(merged);
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IRoxieSlaveActivity>)
- virtual const char *queryDynamicFileName() const = 0;
- virtual void setVariableFileInfo() = 0;
- virtual IIndexReadActivityInfo *queryIndexReadActivity() { throwUnexpected(); } // should only be called for index activity
- virtual unsigned queryId()
- {
- return basefactory->queryId();
- }
- virtual bool check()
- {
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- doCheck(output);
- output->flush();
- return true;
- }
- virtual void doCheck(IMessagePacker *output)
- {
- // MORE - unsophisticated default - if this approach seems fruitful then we can add something more thorough
- void *recBuffer = output->getBuffer(sizeof(bool), false);
- bool ret = false;
- memcpy(recBuffer, &ret, sizeof(bool));
- }
- virtual void abort()
- {
- if (logctx.queryTraceLevel() > 2)
- {
- StringBuffer s;
- logctx.CTXLOG("Aborting running activity: %s", packet->queryHeader().toString(s).str());
- }
- aborted = true;
- logctx.abort();
- if (queryContext)
- {
- Owned<IException> E = MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
- queryContext->notifyAbort(E);
- }
- }
- virtual IRoxieQueryPacket *queryPacket() const
- {
- return packet;
- }
- void limitExceeded(bool keyed = false)
- {
- RoxiePacketHeader &header = packet->queryHeader();
- StringBuffer s;
- logctx.CTXLOG("%sLIMIT EXCEEDED: %s", keyed ? "KEYED " : "", header.toString(s).str());
- header.activityId = keyed ? ROXIE_KEYEDLIMIT_EXCEEDED : ROXIE_LIMIT_EXCEEDED;
- Owned<IMessagePacker> output = ROQ->createOutputStream(header, false, logctx);
- output->flush();
- aborted = true;
- logctx.abort();
- }
- virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
- {
- assertex(colocal == basehelper);
- return queryContext->queryCodeContext()->resolveChildQuery(activityId, colocal);
- }
- size32_t serializeRow(IMessagePacker *output, const void *unserialized) const
- {
- return ::serializeRow(serializer, output, unserialized);
- }
- virtual const char *loadResource(unsigned id)
- {
- return queryContext->queryCodeContext()->loadResource(id);
- }
- // Sets should not happen - they can only happen in the main Roxie server context
- virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
- virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
- virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
- virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { throwUnexpected(); }
- virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
- virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
- virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
- virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
- virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { throwUnexpected(); }
- virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
- virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
- virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
- // Some gets are allowed though (e.g. for ONCE values)
- virtual bool getResultBool(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultBool(name, sequence);
- }
- virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultData(tlen, tgt, name, sequence);
- }
- virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
- }
- virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- queryContext->queryCodeContext()->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- queryContext->queryCodeContext()->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual __int64 getResultInt(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultInt(name, sequence);
- }
- virtual double getResultReal(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultReal(name, sequence);
- }
- virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultString(tlen, tgt, name, sequence);
- }
- virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultStringF(tlen, tgt, name, sequence);
- }
- virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultUnicode(tlen, tgt, name, sequence);
- }
- virtual char *getResultVarString(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultVarString(name, sequence);
- }
- virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultVarUnicode(name, sequence);
- }
- virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- return queryContext->queryCodeContext()->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
- }
- virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
- {
- return queryContext->queryCodeContext()->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
- }
- virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
- {
- return queryNullSectionTimer();
- }
- // Not yet thought about these....
- virtual char *getWuid() { throwUnexpected(); } // caller frees return string.
- virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); } // shouldn't really be here, but it broke thor.
- virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { throwUnexpected(); }
- virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); return 0; }
- virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
- virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
- virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
- virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
- virtual unsigned getNodes() { throwUnexpected(); }
- virtual unsigned getNodeNum() { throwUnexpected(); }
- virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); } // caller frees return string.
- virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
- virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
- virtual char *getJobName() { throwUnexpected(); } // caller frees return string.
- virtual char *getJobOwner() { throwUnexpected(); } // caller frees return string.
- virtual char *getClusterName() { throwUnexpected(); } // caller frees return str.
- virtual char *getGroupName() { throwUnexpected(); } // caller frees return string.
- virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
- virtual char *getDaliServers() { return queryContext->queryCodeContext()->getDaliServers(); }
- // The below are called on Roxie server and passed in context
- virtual unsigned getPriority() const { throwUnexpected(); }
- virtual char *getPlatform() { throwUnexpected(); }
- virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
- virtual char *getOS() { throwUnexpected(); }
- virtual unsigned logString(const char *text) const
- {
- if (text && *text)
- {
- logctx.CTXLOG("USER: %s", text);
- return strlen(text);
- }
- else
- return 0;
- }
- virtual const IContextLogger &queryContextLogger() const
- {
- return logctx;
- }
- virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
- {
- return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
- }
- virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
- {
- return queryContext->queryCodeContext()->getRowAllocatorEx(meta, activityId, heapFlags);
- }
- virtual const char *cloneVString(const char *str) const
- {
- return queryContext->queryCodeContext()->cloneVString(str);
- }
- virtual const char *cloneVString(size32_t len, const char *str) const
- {
- return queryContext->queryCodeContext()->cloneVString(len, str);
- }
- virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
- {
- convertRowToXML(lenResult, result, info, row, flags);
- }
- virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
- {
- convertRowToJSON(lenResult, result, info, row, flags);
- }
- const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
- {
- return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
- }
- const void * fromJson(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
- {
- return createRowFromJson(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
- }
- virtual IEngineContext *queryEngineContext() { return NULL; }
- virtual IWorkUnit *updateWorkUnit() const { throwUnexpected(); }
- virtual unsigned getGraphLoopCounter() const override { return queryContext->queryCodeContext()->getGraphLoopCounter(); }
- virtual IEclGraphResults * resolveLocalQuery(__int64 activityId) override { return NULL; } // Only want to do this on the server
- virtual IDebuggableContext *queryDebugContext() const override { return queryContext->queryCodeContext()->queryDebugContext(); }
- virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char * source) override { throwUnexpected(); }
- };
- //================================================================================================
- class OptimizedRowBuilder : public ARowBuilder, public CInterface
- {
- public:
- OptimizedRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output, IOutputRowSerializer * _serializer)
- : dynamicBuilder(_rowAllocator, false), meta(_meta), output(_output), serializer(_serializer)
- {
- useDynamic = serializer != NULL || meta.isVariableSize();
- }
- IMPLEMENT_IINTERFACE
- virtual IEngineRowAllocator *queryAllocator() const
- {
- return dynamicBuilder.queryAllocator();
- }
- virtual byte * createSelf()
- {
- if (useDynamic)
- {
- dynamicBuilder.ensureRow();
- self = dynamicBuilder.getSelf();
- }
- else
- self = static_cast<byte *>(output->getBuffer(meta.getFixedSize(), false));
- return self;
- }
- virtual byte * ensureCapacity(size32_t required, const char * fieldName)
- {
- if (useDynamic)
- {
- self = dynamicBuilder.ensureCapacity(required, fieldName);
- return static_cast<byte *>(self);
- }
- else
- {
- size32_t fixedLength = meta.getFixedSize();
- if (required <= fixedLength)
- return static_cast<byte *>(self);
- // This should never happen!
- rtlReportFieldOverflow(required, fixedLength, fieldName);
- return NULL;
- }
- }
- virtual void reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "OptimizedRowBuilder::row() is NULL");
- }
- inline void ensureRow()
- {
- if (!self)
- createSelf();
- }
- inline void clear()
- {
- if (useDynamic)
- dynamicBuilder.clear();
- self = NULL;
- }
- size_t writeToOutput(size32_t transformedSize, bool outputIfEmpty)
- {
- size32_t outputSize = transformedSize;
- if (transformedSize || outputIfEmpty)
- {
- if (useDynamic)
- {
- OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
- if (serializer)
- outputSize = serializeRow(serializer, output, result);
- else
- {
- self = static_cast<byte *>(output->getBuffer(transformedSize, true));
- memcpy(self, result, transformedSize);
- output->putBuffer(self, transformedSize, true);
- }
- }
- else
- {
- output->putBuffer(self, transformedSize, false);
- }
- }
- clear();
- return outputSize;
- }
- private:
- RtlDynamicRowBuilder dynamicBuilder;
- const CachedOutputMetaData & meta;
- IMessagePacker * output;
- IOutputRowSerializer * serializer;
- bool useDynamic;
- };
- class OptimizedKJRowBuilder : public ARowBuilder, public CInterface
- {
- // Rules are different enough that we can't easily derive from OptimizedRowBuilder
- public:
- IMPLEMENT_IINTERFACE;
- OptimizedKJRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output)
- : dynamicBuilder(_rowAllocator, false), meta(_meta), output(_output)
- {
- useDynamic = meta.isVariableSize();
- }
- virtual IEngineRowAllocator *queryAllocator() const
- {
- return dynamicBuilder.queryAllocator();
- }
- virtual byte * createSelf()
- {
- if (useDynamic)
- {
- dynamicBuilder.ensureRow();
- self = dynamicBuilder.getSelf();
- return self;
- }
- else
- {
- self = static_cast<byte *>(output->getBuffer(KEYEDJOIN_RECORD_SIZE(meta.getFixedSize()), true)) + KEYEDJOIN_RECORD_SIZE(0);
- return self ;
- }
- }
- virtual byte * ensureCapacity(size32_t required, const char * fieldName)
- {
- if (useDynamic)
- {
- self = dynamicBuilder.ensureCapacity(required, fieldName);
- return self;
- }
- else
- {
- size32_t fixedLength = meta.getFixedSize();
- if (required <= fixedLength)
- return self;
- rtlReportFieldOverflow(required, fixedLength, fieldName);
- return NULL;
- }
- }
- virtual void reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "OptimizedKJRowBuilder::row() is NULL");
- }
- inline void ensureRow()
- {
- if (!self)
- createSelf();
- }
- void writeToOutput(size32_t transformedSize, offset_t recptr, CJoinGroup *jg, unsigned short partNo)
- {
- KeyedJoinHeader *rec;
- if (useDynamic)
- {
- OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
- rec = (KeyedJoinHeader *) (output->getBuffer(KEYEDJOIN_RECORD_SIZE(transformedSize), true));
- memcpy(&rec->rhsdata, result, transformedSize);
- }
- else
- {
- rec = (KeyedJoinHeader *)(self - KEYEDJOIN_RECORD_SIZE(0));
- }
- rec->fpos = recptr;
- rec->thisGroup = jg;
- rec->partNo = partNo;
- output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(transformedSize), true);
- self = NULL;
- }
- private:
- RtlDynamicRowBuilder dynamicBuilder;
- const CachedOutputMetaData & meta;
- IMessagePacker * output;
- bool useDynamic;
- };
- //================================================================================================
- class CRoxieDiskReadBaseActivity : public CRoxieSlaveActivity, implements IIndexReadContext//, implements IDiskReadActivity
- {
- protected:
- IHThorDiskReadBaseArg *helper;
- unsigned processed;
- unsigned parallelPartNo;
- unsigned numParallel;
- bool isGrouped = false;
- bool forceUnkeyed;
- offset_t readPos;
- CachedOutputMetaData diskSize;
- ScoredRowFilter postFilter;
- Owned<IDirectReader> reader;
- Linked<IInMemoryIndexManager> manager;
- Linked<ITranslatorSet> translators;
- Owned<IInMemoryFileProcessor> processor;
- CriticalSection pcrit;
- public:
- CRoxieDiskReadBaseActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- parallelPartNo(_parallelPartNo),
- numParallel(_numParallel),
- forceUnkeyed(_forceUnkeyed),
- manager(_manager),
- translators(_translators)
- {
- helper = (IHThorDiskReadBaseArg *) basehelper;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
- isOpt = (helper->getFlags() & TDRoptional) != 0;
- diskSize.set(helper->queryProjectedDiskRecordSize()->querySerializedDiskMeta());
- isGrouped = diskSize.isGrouped();
- processed = 0;
- readPos = 0;
- if (resent)
- {
- }
- }
- virtual void onCreate()
- {
- CRoxieSlaveActivity::onCreate();
- helper->createSegmentMonitors(this);
- const IKeyTranslator *keyTranslator = translators->queryKeyTranslator(0); // any part would do - in-memory requires all actuals to have same layout
- if (keyTranslator)
- keyTranslator->translate(postFilter);
- if (resent)
- {
- bool usedKey;
- resentInfo.read(processed);
- resentInfo.read(usedKey);
- if (usedKey)
- reader.setown(manager->selectKey(resentInfo, postFilter, translators));
- else
- {
- resentInfo.read(readPos);
- reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
- }
- assertex(resentInfo.remaining() == 0);
- }
- else
- {
- if (!forceUnkeyed && !isGrouped)
- reader.setown(manager->selectKey(postFilter, translators, logctx));
- if (!reader)
- reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
- }
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned channel = packet->queryHeader().channel;
- unsigned projectedCrc = helper->getProjectedFormatCrc();
- unsigned expectedCrc = helper->getDiskFormatCrc();
- translators.setown(varFileInfo->getTranslators(projectedCrc, helper->queryProjectedDiskRecordSize(), expectedCrc, helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false));
- manager.setown(varFileInfo->getIndexManager(isOpt, channel, translators->queryActualLayout(0), false));
- }
- inline bool queryKeyed() const
- {
- return reader->isKeyed();
- }
- void setParallel(unsigned _partno, unsigned _numParallel)
- {
- assertex(!processor);
- parallelPartNo = _partno;
- numParallel = _numParallel;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
- }
- virtual void append(IKeySegmentMonitor *segment)
- {
- segment->Release();
- throwUnexpected();
- }
- virtual void append(FFoption option, const IFieldFilter * filter)
- {
- if (filter->isWild())
- filter->Release();
- else
- postFilter.addFilter(*filter);
- }
- virtual void abort()
- {
- CRoxieSlaveActivity::abort();
- CriticalBlock p(pcrit);
- if (processor)
- processor->abort();
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieDiskReadBaseActivity::process");
- diskReadStarted++;
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- doProcess(output);
- helper->setCallback(NULL);
- if (aborted)
- return NULL;
- else
- {
- diskReadCompleted++;
- return output.getClear();
- }
- }
- virtual void doCheck(IMessagePacker *output)
- {
- // for in-memory diskread activities, not a lot to check. If it got this far the answer is 'true'...
- void *recBuffer = output->getBuffer(sizeof(bool), false);
- bool ret = true;
- memcpy(recBuffer, &ret, sizeof(bool));
- }
- virtual void doProcess(IMessagePacker * output) = 0;
-
- virtual void setPartNo(bool filechanged)
- {
- throwUnexpected();
- }
- };
- class CRoxieDiskBaseActivityFactory : public CSlaveActivityFactory
- {
- protected:
- Owned<ITranslatorSet> translators;
- Owned<IInMemoryIndexManager> manager;
- public:
- CRoxieDiskBaseActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorDiskReadBaseArg> helper = (IHThorDiskReadBaseArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
- bool isCodeSigned = isActivityCodeSigned(_graphNode);
- if (!variableFileName)
- {
- bool isOpt = (helper->getFlags() & TDRoptional) != 0;
- OwnedRoxieString fileName(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- unsigned channel = queryFactory.queryChannel();
- unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
- unsigned expectedFormatCrc = helper->getDiskFormatCrc();
- translators.setown(datafile->getTranslators(projectedFormatCrc, helper->queryProjectedDiskRecordSize(), expectedFormatCrc, helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- manager.setown(datafile->getIndexManager(isOpt, channel, translators->queryActualLayout(0), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false)));
- Owned<IPropertyTreeIterator> memKeyInfo = queryFactory.queryPackage().getInMemoryIndexInfo(_graphNode);
- Owned<IPropertyTree> memKeyHint;
- if (!memKeyInfo && _graphNode.hasProp("hint[@name=\"key\"]"))
- {
- memKeyHint.setown(createPTreeFromXMLString(_graphNode.queryProp("hint[@name=\"key\"]/@value")));
- memKeyInfo.setown(memKeyHint->getElements("MemIndex"));
- }
- if (memKeyInfo)
- {
- // There's a small potential flaw that I can end up with indexes from a query that was unloaded active on some nodes but not others
- // which could mess up continuation. I think I don't care.
- ForEach(*memKeyInfo)
- {
- IPropertyTree &info = memKeyInfo->query();
- manager->setKeyInfo(info);
- }
- }
- }
- else
- manager.setown(createInMemoryIndexManager(helper->queryProjectedDiskRecordSize()->queryRecordAccessor(true), true, nullptr));
- }
- }
- ~CRoxieDiskBaseActivityFactory()
- {
- }
- };
- //================================================================================================
- class CRoxieDiskReadActivity;
- class CRoxieCsvReadActivity;
- class CRoxieXmlReadActivity;
- IInMemoryFileProcessor *createReadRecordProcessor(CRoxieDiskReadActivity &owner, bool isGrouped, IDirectReader *reader);
- IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize);
- IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *reader);
- class CRoxieDiskReadActivity : public CRoxieDiskReadBaseActivity
- {
- friend class ReadRecordProcessor;
- protected:
- IHThorDiskReadArg *helper;
- public:
- CRoxieDiskReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit); // because of race with abort.
- processor.setown(createReadRecordProcessor(*this, isGrouped, reader));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, const void *src) const
- {
- if (likely(helper->canMatch(src)))
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder, src);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- else
- return 0;
- }
- };
- class CRoxieCsvReadActivity : public CRoxieDiskReadBaseActivity
- {
- public:
- friend class CsvRecordProcessor;
- protected:
- IHThorCsvReadArg *helper;
- const IResolvedFile *datafile;
- size32_t maxRowSize;
- public:
- CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, const IResolvedFile *_datafile, size32_t _maxRowSize)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true), datafile(_datafile), maxRowSize(_maxRowSize)
- {
- onCreate();
- helper = (IHThorCsvReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("CsvRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(
- createCsvRecordProcessor(*this,
- manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators),
- packet->queryHeader().channel==1 && !resent,
- varFileInfo ? varFileInfo.get() : datafile, maxRowSize));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, unsigned *srcLen, const char **src) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder, srcLen, src);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieXmlReadActivity : public CRoxieDiskReadBaseActivity
- {
- public:
- friend class XmlRecordProcessor;
- protected:
- IHThorXmlReadArg *helper;
- public:
- CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true)
- {
- onCreate();
- helper = (IHThorXmlReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("XmlRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createXmlRecordProcessor(*this, manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators)));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, IXmlToRowTransformer *rowTransformer, IColumnProvider *lastMatch, IThorDiskCallback *callback) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = rowTransformer->transform(rowBuilder, lastMatch, callback);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieDiskReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- class CRoxieCsvReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- size32_t maxRowSize;
- public:
- CRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- maxRowSize = defaultMaxCsvRowSize * 1024 * 1024;
- IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
- if (workunit)
- maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, translators, datafile, maxRowSize);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- class CRoxieXmlReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- class RecordProcessor : implements IInMemoryFileProcessor, public CInterface
- {
- protected:
- bool aborted = false;
- IDirectReader *reader;
- public:
- IMPLEMENT_IINTERFACE
- RecordProcessor(IDirectReader *_reader) : reader(_reader) {}
- virtual void abort()
- {
- aborted = true;
- }
- };
- //================================================================================================
- // Disk read activity
- class ReadRecordProcessor : public RecordProcessor
- {
- public:
- ReadRecordProcessor(CRoxieDiskReadActivity &_owner, bool _isGrouped, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner), isGrouped(_isGrouped)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- helper->setCallback(reader);
- unsigned lastGroupProcessed = processed;
- bool someInGroup = false;
- while (!aborted)
- {
- // This loop is the inner loop for memory diskreads - so keep it efficient!
- const byte *nextRec = reader->nextRow();
- if (nextRec)
- {
- someInGroup = true;
- size32_t transformedSize = owner.doTransform(output, nextRec);
- reader->finishedRow();
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize && !isGrouped)
- break;
- }
- }
- else if (isGrouped)
- {
- if (processed>lastGroupProcessed)
- break; // We return grouped data one whole group at a time
- else if (!someInGroup) // eof
- return;
- }
- else
- return;
- }
- // Send continuation message indicating end of group or too much data
- if (!aborted)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- }
- }
- protected:
- CRoxieDiskReadActivity &owner;
- IHThorDiskReadArg *helper;
- bool isGrouped;
- };
- IInMemoryFileProcessor *createReadRecordProcessor(CRoxieDiskReadActivity &owner, bool isGrouped, IDirectReader *_reader)
- {
- return new ReadRecordProcessor(owner, isGrouped, _reader);
- }
- //================================================================================================
- // RecordProcessor used by CSV read activity. We don't try to index these ...
- class CsvRecordProcessor : public RecordProcessor
- {
- protected:
- CRoxieCsvReadActivity &owner;
- IHThorCsvReadArg *helper;
- bool skipHeader;
- const IResolvedFile *datafile;
- size32_t maxRowSize;
- public:
- CsvRecordProcessor(CRoxieCsvReadActivity &_owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *_datafile, size32_t _maxRowSize)
- : RecordProcessor(_reader), owner(_owner), datafile(_datafile), maxRowSize(_maxRowSize)
- {
- // NOTE - postfilter not used at present - but may be in future
- helper = _owner.helper;
- skipHeader = _skipHeader;
- helper->setCallback(reader);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- ICsvParameters * csvInfo = helper->queryCsvParameters();
- unsigned headerLines = skipHeader ? csvInfo->queryHeaderLen() : 0;
- const char *quotes = NULL;
- const char *separators = NULL;
- const char *terminators = NULL;
- const char *escapes = NULL;
- CSVSplitter csvSplitter;
- if (datafile)
- {
- const IPropertyTree *options = datafile->queryProperties();
- if (options)
- {
- quotes = options->queryProp("@csvQuote");
- separators = options->queryProp("@csvSeparate");
- terminators = options->queryProp("@csvTerminate");
- escapes = options->queryProp("@csvEscape");
- }
- }
- csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
- ISerialStream *stream = reader->queryDirectStreamReader();
- while (!aborted)
- {
- size32_t thisLineLength = csvSplitter.splitLine(stream, maxRowSize);
- if (0 == thisLineLength)
- break;
- if (headerLines)
- {
- headerLines--;
- stream->skip(thisLineLength);
- }
- else
- {
- unsigned transformedSize = owner.doTransform(output, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData());
- stream->skip(thisLineLength);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return;
- }
- }
- }
- }
- }
- };
- //================================================================================================
- // RecordProcessor used by XML read activity. We don't try to index these...
- class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect, implements IThorDiskCallback
- {
- public:
- IMPLEMENT_IINTERFACE;
- XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner), streamReader(_reader->queryDirectStreamReader()), fileposition(0)
- {
- helper = _owner.helper;
- helper->setCallback(this);
- }
- //interface IThorDiskCallback
- virtual unsigned __int64 getFilePosition(const void * row)
- {
- return fileposition;
- }
- virtual unsigned __int64 getLocalFilePosition(const void * row)
- {
- return streamReader->makeFilePositionLocal(fileposition);
- }
- virtual const char * queryLogicalFilename(const void * row)
- {
- return reader->queryLogicalFilename(row);
- }
- virtual const byte * lookupBlob(unsigned __int64 id) { throwUnexpected(); }
- virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
- {
- fileposition = startOffset;
- lastMatch.set(&entry);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- // Note: xml read does not support continuation record stuff as too hard to serialize state of xml parser
- Linked<IXmlToRowTransformer> rowTransformer = helper->queryTransformer();
- OwnedRoxieString xmlIterator(helper->getXmlIteratorPath());
- Owned<IXMLParse> xmlParser;
- if (owner.basefactory->getKind() == TAKjsonread)
- xmlParser.setown(createJSONParse(*streamReader, xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
- else
- xmlParser.setown(createXMLParse(*streamReader, xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
- while (!aborted)
- {
- //call to next() will callback on the IXmlSelect interface
- bool gotNext = false;
- gotNext = xmlParser->next();
- if(!gotNext)
- break;
- else if (lastMatch)
- {
- unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, this);
- lastMatch.clear();
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- }
- }
- }
- }
- protected:
- CRoxieXmlReadActivity &owner;
- IHThorXmlReadArg *helper;
- Owned<IColumnProvider> lastMatch;
- IDirectStreamReader *streamReader;
- unsigned __int64 fileposition;
- };
- IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
- {
- return new CsvRecordProcessor(owner, _reader, _skipHeader, datafile, maxRowSize);
- }
- IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *_reader)
- {
- return new XmlRecordProcessor(owner, _reader);
- }
- ISlaveActivityFactory *createRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieCsvReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieXmlReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskNormalizeActivity;
- IInMemoryFileProcessor *createNormalizeRecordProcessor(CRoxieDiskNormalizeActivity &owner, IDirectReader *_reader);
- class CRoxieDiskNormalizeActivity : public CRoxieDiskReadBaseActivity
- {
- friend class NormalizeRecordProcessor;
- protected:
- IHThorDiskNormalizeArg *helper;
- public:
- CRoxieDiskNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskNormalizeArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskNormalize %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createNormalizeRecordProcessor(*this, reader));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doNormalizeTransform(IMessagePacker * output) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieDiskNormalizeActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskNormalize "));
- }
- };
- ISlaveActivityFactory *createRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- // RecordProcessors used by Disk Normalize activity.
- class NormalizeRecordProcessor : public RecordProcessor
- {
- public:
- NormalizeRecordProcessor(CRoxieDiskNormalizeActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- helper->setCallback(reader);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- if (helper->first(nextRec))
- {
- do
- {
- size32_t transformedSize = owner.doNormalizeTransform(output);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- totalSizeSent += transformedSize;
- if (processed == stopAfter)
- return;
- }
- } while (helper->next());
- }
- reader->finishedRow();
- if (totalSizeSent > indexReadChunkSize)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return;
- }
- }
- }
- protected:
- CRoxieDiskNormalizeActivity &owner;
- IHThorDiskNormalizeArg *helper;
- };
- IInMemoryFileProcessor *createNormalizeRecordProcessor(CRoxieDiskNormalizeActivity &owner, IDirectReader *_reader)
- {
- return new NormalizeRecordProcessor(owner, _reader);
- }
- //================================================================================================
- class CRoxieDiskCountActivity;
- IInMemoryFileProcessor *createCountRecordProcessor(CRoxieDiskCountActivity &owner, IDirectReader *reader);
- class CRoxieDiskCountActivity : public CRoxieDiskReadBaseActivity
- {
- friend class CountRecordProcessor;
- protected:
- IHThorDiskCountArg *helper;
- public:
- CRoxieDiskCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskCountArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskCount %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createCountRecordProcessor(*this, reader));
- }
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, (unsigned __int64) -1, stopAfter);
- }
- };
- class CRoxieDiskCountActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessors used by Disk Count activity.
- class CountRecordProcessor : public RecordProcessor
- {
- public:
- CountRecordProcessor(CRoxieDiskCountActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned outputRecordSize = owner.meta.getFixedSize();
- void *recBuffer = output->getBuffer(outputRecordSize, false);
- helper->setCallback(reader);
- unsigned __int64 totalCount = 0;
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- totalCount += helper->numValid(nextRec);
- if (totalCount >= stopAfter)
- {
- totalCount = stopAfter;
- break;
- }
- reader->finishedRow();
- }
- if (!aborted)
- {
- assert(!owner.serializer); // A count can never need serializing, surely!
- if (outputRecordSize == 1)
- *(byte *)recBuffer = (byte)totalCount;
- else
- {
- assertex(outputRecordSize == sizeof(unsigned __int64));
- *(unsigned __int64 *)recBuffer = totalCount;
- }
- output->putBuffer(recBuffer, outputRecordSize, false);
- }
- }
- protected:
- CRoxieDiskCountActivity &owner;
- IHThorDiskCountArg *helper;
- };
- IInMemoryFileProcessor *createCountRecordProcessor(CRoxieDiskCountActivity &owner, IDirectReader *reader)
- {
- return new CountRecordProcessor(owner, reader);
- }
- ISlaveActivityFactory *createRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskAggregateActivity;
- IInMemoryFileProcessor *createAggregateRecordProcessor(CRoxieDiskAggregateActivity &owner, IDirectReader *_reader);
- class CRoxieDiskAggregateActivity : public CRoxieDiskReadBaseActivity
- {
- friend class AggregateRecordProcessor;
- protected:
- IHThorDiskAggregateArg *helper;
- public:
- CRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, _parallelPartNo, _numParallel, _forceUnkeyed)
- {
- onCreate();
- helper = (IHThorDiskAggregateArg *) basehelper;
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskAggregate %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createAggregateRecordProcessor(*this, reader));
- }
- processor->doQuery(output, 0, 0, 0);
- }
- };
- //================================================================================================
- class CParallelRoxieActivity : public CRoxieSlaveActivity
- {
- protected:
- IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> parts;
- unsigned numParallel;
- CriticalSection parCrit;
- Owned<IOutputRowDeserializer> deserializer;
- public:
- CParallelRoxieActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory, unsigned _numParallel)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _factory), numParallel(_numParallel)
- {
- assertex(numParallel > 1);
- }
- virtual void abort()
- {
- ForEachItemIn(idx, parts)
- {
- parts.item(idx).abort();
- }
- }
- virtual void setPartNo(bool filechanged) { throwUnexpected(); }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return parts.item(0).toString(ret);
- }
- virtual const char *queryDynamicFileName() const
- {
- throwUnexpected();
- }
- virtual void setVariableFileInfo()
- {
- throwUnexpected();
- }
- virtual void doProcess(IMessagePacker * output) = 0;
- virtual void processRow(CDummyMessagePacker &output) = 0;
- virtual IMessagePacker *process()
- {
- if (numParallel == 1)
- {
- return parts.item(0).process();
- }
- else
- {
- MTIME_SECTION(queryActiveTimer(), "CParallelRoxieActivity::process");
- diskReadStarted++;
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- class casyncfor: public CAsyncFor
- {
- IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> &parts;
- CParallelRoxieActivity &parent;
- public:
- casyncfor(IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> &_parts, CParallelRoxieActivity &_parent)
- : parts(_parts), parent(_parent)
- {
- }
- void Do(unsigned i)
- {
- try
- {
- CDummyMessagePacker d;
- parts.item(i).doProcess(&d);
- d.flush();
- parent.processRow(d);
- }
- catch (IException *)
- {
- // if one throws exception, may as well abort the rest
- parent.abort();
- throw;
- }
- }
- } afor(parts, *this);
- afor.For(numParallel, numParallel);
- //for (unsigned i = 0; i < numParallel; i++) afor.Do(i); // use this instead of line above to make them serial - handy for debugging!
- if (aborted)
- return NULL;
- else
- {
- doProcess(output);
- diskReadCompleted++;
- return output.getClear();
- }
- }
- }
- };
- class CParallelRoxieDiskAggregateActivity : public CParallelRoxieActivity
- {
- protected:
- IHThorDiskAggregateArg *helper;
- OwnedConstRoxieRow finalRow;
- public:
- CParallelRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
- CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel)
- {
- helper = (IHThorDiskAggregateArg *) basehelper;
- onCreate();
- if (meta.needsSerializeDisk())
- {
- // MORE - avoiding serializing to dummy would be more efficient...
- deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
- }
- CRoxieDiskAggregateActivity *part0 = new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
- parts.append(*part0);
- if (part0->queryKeyed())
- {
- numParallel = 1;
- part0->setParallel(0, 1);
- }
- else
- {
- for (unsigned i = 1; i < numParallel; i++)
- parts.append(*new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
- }
- }
- ~CParallelRoxieDiskAggregateActivity()
- {
- finalRow.clear();
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual void doProcess(IMessagePacker *output)
- {
- if (!aborted)
- {
- if (!finalRow)
- {
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size32_t size = helper->clearAggregate(rowBuilder);
- finalRow.setown(rowBuilder.finalizeRowClear(size));
- }
- //GH-RKC: This can probably be cleaner and more efficient using the OptimizedRowBuilder class introduced since I fixed this code
- if (serializer)
- serializeRow(output, finalRow);
- else
- {
- size32_t size = meta.getRecordSize(finalRow);
- appendBuffer(output, size, finalRow, meta.isVariableSize());
- }
- }
- finalRow.clear();
- helper->setCallback(NULL);
- }
- virtual void processRow(CDummyMessagePacker &d)
- {
- CriticalBlock c(parCrit);
- MemoryBuffer &m = d.data;
- RtlDynamicRowBuilder finalBuilder(rowAllocator, false);
- if (deserializer)
- {
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
- CThorStreamDeserializerSource rowSource(stream);
- while (m.remaining())
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
- dbgassertex(outsize==(size_t) *rowLen);
- if (!finalBuilder.exists())
- finalBuilder.swapWith(rowBuilder);
- else
- {
- const void * deserialized = rowBuilder.finalizeRowClear(outsize);
- helper->mergeAggregate(finalBuilder, deserialized);
- ReleaseRoxieRow(deserialized);
- }
- }
- }
- else
- {
- RecordLengthType len = meta.getFixedSize();
- while (m.remaining())
- {
- const void *row;
- if (!meta.isFixedSize())
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- len = *rowLen;
- }
- row = m.readDirect(len);
- if (!finalBuilder.exists())
- cloneRow(finalBuilder, row, meta);
- else
- helper->mergeAggregate(finalBuilder, row);
- }
- }
- if (finalBuilder.exists())
- {
- size32_t finalSize = meta.getRecordSize(finalBuilder.getSelf()); // MORE - can probably track it above...
- finalRow.setown(finalBuilder.finalizeRowClear(finalSize));
- }
- }
- };
- class CRoxieDiskAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- if (parallelAggregate > 1)
- return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
- else
- return new CRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessor used by Disk Aggregate activity.
- class AggregateRecordProcessor : public RecordProcessor
- {
- public:
- AggregateRecordProcessor(CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- helper->setCallback(reader);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
- helper->clearAggregate(rowBuilder);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- helper->processRow(rowBuilder, nextRec);
- reader->finishedRow();
- }
- if (!aborted)
- {
- if (helper->processedAnyRows())
- {
- size32_t finalSize = owner.meta.getRecordSize(rowBuilder.getSelf());
- rowBuilder.writeToOutput(finalSize, true);
- }
- }
- }
- protected:
- CRoxieDiskAggregateActivity &owner;
- IHThorDiskAggregateArg *helper;
- };
- IInMemoryFileProcessor *createAggregateRecordProcessor(CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
- {
- return new AggregateRecordProcessor(_owner, _reader);
- }
- ISlaveActivityFactory *createRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskGroupAggregateActivity : public CRoxieDiskReadBaseActivity
- {
- protected:
- IHThorDiskGroupAggregateArg *helper;
- RowAggregator results;
- void outputResults(IMessagePacker *output)
- {
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = results.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- }
- public:
- CRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned partNo, unsigned numParts, bool _forceUnkeyed)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, partNo, numParts, _forceUnkeyed),
- helper((IHThorDiskGroupAggregateArg *) basehelper),
- results(*helper, *helper)
- {
- onCreate();
- results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskGroupAggregate %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createGroupAggregateRecordProcessor(results, *helper, reader));
- }
- processor->doQuery(output, 0, 0, 0);
- if (!aborted)
- outputResults(output);
- results.reset();
- }
- };
- class CParallelRoxieDiskGroupAggregateActivity : public CParallelRoxieActivity
- {
- protected:
- IHThorDiskGroupAggregateArg *helper;
- RowAggregator resultAggregator;
- Owned<IRowManager> rowManager;
- public:
- CParallelRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
- CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel),
- helper((IHThorDiskGroupAggregateArg *) basehelper),
- resultAggregator(*helper, *helper)
- {
- onCreate();
- resultAggregator.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- if (meta.needsSerializeDisk())
- {
- // MORE - avoiding serializing to dummy would be more efficient...
- deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
- }
- CRoxieDiskGroupAggregateActivity *part0 = new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
- parts.append(*part0);
- if (part0->queryKeyed())
- {
- numParallel = 1;
- part0->setParallel(0, 1);
- }
- else
- {
- for (unsigned i = 1; i < numParallel; i++)
- parts.append(*new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
- }
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual void doProcess(IMessagePacker *output)
- {
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = resultAggregator.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- resultAggregator.reset();
- helper->setCallback(NULL);
- }
- void processRow(CDummyMessagePacker &d)
- {
- CriticalBlock b(parCrit); // MORE - use a spinlock
- MemoryBuffer &m = d.data;
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
- CThorStreamDeserializerSource rowSource(stream);
- while (m.remaining())
- {
- const void *row;
- if (meta.isFixedSize() && !deserializer)
- {
- row = m.readDirect(meta.getFixedSize());
- resultAggregator.mergeElement(row);
- }
- else
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- RecordLengthType len = *rowLen;
- if (deserializer)
- {
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
- OwnedConstRoxieRow deserialized = rowBuilder.finalizeRowClear(outsize);
- resultAggregator.mergeElement(deserialized);
- }
- else
- {
- row = m.readDirect(len);
- resultAggregator.mergeElement(row);
- }
- }
- }
- }
- };
- class CRoxieDiskGroupAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- if (parallelAggregate > 1)
- return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
- else
- return new CRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessors used by Disk Group Aggregate activity.
- class GroupAggregateRecordProcessor : public RecordProcessor, implements IHThorGroupAggregateCallback
- {
- public:
- IMPLEMENT_IINTERFACE;
- GroupAggregateRecordProcessor(RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper, IDirectReader *_reader)
- : RecordProcessor(_reader), results(_results), helper(_helper)
- {
- }
- virtual void processRow(const void * next)
- {
- results.addRow(next);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- helper.setCallback(reader);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- helper.processRow(nextRec, this);
- reader->finishedRow();
- }
- }
- protected:
- RowAggregator &results;
- IHThorDiskGroupAggregateArg &helper;
- };
- IInMemoryFileProcessor *createGroupAggregateRecordProcessor(RowAggregator &results, IHThorDiskGroupAggregateArg &helper, IDirectReader *reader)
- {
- return new GroupAggregateRecordProcessor(results, helper, reader);
- }
- ISlaveActivityFactory *createRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedActivityFactory : public CSlaveActivityFactory
- {
- friend class CRoxieKeyedActivity;
- protected:
- Owned<IKeyArray> keyArray;
- Owned<ITranslatorSet> translators;
- unsigned projectedCrc = 0;
- unsigned expectedCrc = 0;
- IOutputMetaData *projectedMeta = nullptr;
- IOutputMetaData *expectedMeta = nullptr;
- CRoxieKeyedActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- };
- class CRoxieIndexActivityFactory : public CRoxieKeyedActivityFactory
- {
- public:
- CRoxieIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- void init(IHThorIndexReadBaseArg * helper, IPropertyTree &graphNode)
- {
- projectedCrc = helper->getProjectedFormatCrc();
- expectedCrc = helper->getDiskFormatCrc();
- projectedMeta = helper->queryProjectedDiskRecordSize();
- expectedMeta = helper->queryDiskRecordSize();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isCodeSigned = isActivityCodeSigned(graphNode);
- bool isOpt = (helper->getFlags() & TIRoptional) != 0;
- OwnedRoxieString indexName(helper->getFileName());
- datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- translators.setown(datafile->getTranslators(projectedCrc, projectedMeta, expectedCrc, expectedMeta, queryFactory.queryOptions().enableFieldTranslation, true));
- keyArray.setown(datafile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- };
- class CRoxieKeyedActivity : public CRoxieSlaveActivity
- {
- // Common base class for all activities that deal with keys - keyed join or indexread and its allies
- protected:
- Owned<IKeyManager> tlk;
- Linked<ITranslatorSet> translators;
- Linked<IKeyArray> keyArray;
- const RtlRecord *keyRecInfo = nullptr;
- bool createSegmentMonitorsPending;
- virtual bool hasNewSegmentMonitors() = 0;
- virtual void createSegmentMonitors() = 0;
- virtual void setPartNo(bool filechanged)
- {
- if (!lastPartNo.partNo) // Check for ,LOCAL indexes
- {
- assertex(filechanged);
- Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
- for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
- if (kib)
- {
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (k)
- {
- assertex(!k->isTopLevelKey());
- allKeys->addIndex(LINK(k));
- }
- }
- }
- if (allKeys->numParts())
- {
- tlk.setown(createKeyMerger(*keyRecInfo, allKeys, 0, &logctx, hasNewSegmentMonitors(), !logctx.isBlind()));
- createSegmentMonitorsPending = true;
- }
- else
- tlk.clear();
- }
- else
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(lastPartNo.partNo);
- assertex(kib != NULL);
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (filechanged)
- {
- tlk.setown(createLocalKeyManager(*keyRecInfo, k, &logctx, hasNewSegmentMonitors(), !logctx.isBlind()));
- createSegmentMonitorsPending = true;
- }
- else
- tlk->setKey(k);
- }
- }
- virtual void setVariableFileInfo()
- {
- const CRoxieKeyedActivityFactory &aFactory = *static_cast<const CRoxieKeyedActivityFactory *>(basefactory);
- translators.setown(varFileInfo->getTranslators(aFactory.projectedCrc, aFactory.projectedMeta, aFactory.expectedCrc, aFactory.expectedMeta, allowFieldTranslation, true));
- keyArray.setown(varFileInfo->getKeyArray(isOpt, packet->queryHeader().channel));
- }
- void noteStats(unsigned accepted, unsigned rejected)
- {
- // Note that the key-level statistics (seeks, scans, skips etc) are handled inside jhtree
- logctx.noteStatistic(StNumIndexAccepted, accepted);
- logctx.noteStatistic(StNumIndexRejected, rejected);
- logctx.noteStatistic(StNumIndexRowsRead, accepted+rejected);
- }
- CRoxieKeyedActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedActivityFactory *_aFactory)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- translators(_aFactory->translators),
- keyArray(_aFactory->keyArray),
- createSegmentMonitorsPending(true)
- {
- }
- };
- class CRoxieIndexActivity : public CRoxieKeyedActivity
- {
- // Common base class for indexread, indexcount and related activities
- protected:
- PartNoType *inputData; // list of channels
- IHThorIndexReadBaseArg * indexHelper;
- unsigned inputCount;
- unsigned inputsDone;
- unsigned processed;
- unsigned keyprocessed;
- unsigned steppingOffset;
- unsigned steppingLength;
- unsigned short numSkipFields;
- unsigned numSeeks;
- bool seeksAreEof;
- bool lastRowCompleteMatch;
- CIndexTransformCallback callback;
- SmartStepExtra stepExtra; // just used for flags - a little unnecessary...
- const byte *steppingRow;
- virtual bool hasNewSegmentMonitors() { return indexHelper->hasNewSegmentMonitors(); }
- bool checkLimit(unsigned __int64 limit)
- {
- assertex(!resent);
- unsigned __int64 result = 0;
- unsigned inputsDone = 0;
- bool ret = true;
- unsigned saveStepping = steppingOffset; // Avoid using a stepping keymerger for the checkCount - they don't support it (and would be inefficient)
- steppingOffset = 0;
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- result += tlk->checkCount(limit-result);
- if (result > limit)
- {
- ret = false;
- break;
- }
- }
- inputsDone++;
- }
- if (saveStepping)
- {
- steppingOffset = saveStepping;
- lastPartNo.partNo = 0xffff;
- lastPartNo.fileNo = 0xffff;
- tlk.clear();
- }
- return ret;
- }
- public:
- CRoxieIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
- : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory),
- steppingOffset(_steppingOffset),
- stepExtra(SSEFreadAhead, NULL)
- {
- indexHelper = (IHThorIndexReadBaseArg *) basehelper;
- keyRecInfo = &indexHelper->queryDiskRecordSize()->queryRecordAccessor(true);
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
- isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
- inputData = NULL;
- inputCount = 0;
- inputsDone = 0;
- processed = 0;
- keyprocessed = 0;
- numSkipFields = 0;
- lastRowCompleteMatch = true; // default is we only return complete matches....
- seeksAreEof = false;
- steppingLength = 0;
- steppingRow = NULL;
- numSeeks = 0;
- if (packet->getSmartStepInfoLength())
- {
- const byte *smartStepInfoValue = packet->querySmartStepInfoData();
- numSkipFields = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- steppingLength = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- unsigned flags = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- seeksAreEof = * (bool *) smartStepInfoValue;
- smartStepInfoValue += sizeof(bool);
- numSeeks = * (unsigned *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned);
- assertex(numSeeks); // Given that we put the first seek in here to there should always be at least one!
- steppingRow = smartStepInfoValue; // the first of them...
- stepExtra.set(flags, NULL);
- if (logctx.queryTraceLevel() > 10)
- {
- logctx.CTXLOG("%d seek rows provided. mismatch(%d) readahead(%d) onlyfirst(%d)", numSeeks,
- (int)stepExtra.returnMismatches(), (int)stepExtra.readAheadManyResults(), (int)stepExtra.onlyReturnFirstSeekMatch());
- if (logctx.queryTraceLevel() > 15)
- {
- for (unsigned i = 0; i < numSeeks; i++)
- {
- StringBuffer b;
- for (unsigned j = 0; j < steppingLength; j++)
- b.appendf("%02x ", steppingRow[i*steppingLength + j]);
- logctx.CTXLOG("Seek row %d: %s", i+1, b.str());
- }
- }
- }
- }
- else
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("0 seek rows provided.");
- }
- }
- virtual void onCreate()
- {
- CRoxieKeyedActivity::onCreate();
- inputData = (PartNoType *) serializedCreate.readDirect(0);
- inputCount = (serializedCreate.length() - serializedCreate.getPos()) / sizeof(*inputData);
- indexHelper->setCallback(&callback);
- }
- virtual const char *queryDynamicFileName() const
- {
- return indexHelper->getFileName();
- }
- virtual void createSegmentMonitors()
- {
- if (createSegmentMonitorsPending)
- {
- createSegmentMonitorsPending = false;
- tlk->setLayoutTranslator(translators->queryTranslator(lastPartNo.fileNo));
- indexHelper->createSegmentMonitors(tlk);
- tlk->finishSegmentMonitors();
- }
- }
- bool sendContinuation(IMessagePacker * output)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(lastRowCompleteMatch);
- si.append(inputsDone);
- si.append(processed);
- si.append(keyprocessed);
- si.append(lastPartNo.partNo);
- si.append(lastPartNo.fileNo);
- tlk->serializeCursorPos(si);
- if (si.length() <= maxContinuationSize)
- {
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return true;
- }
- else
- return false;
- }
- void readContinuationInfo()
- {
- resentInfo.read(lastRowCompleteMatch);
- resentInfo.read(inputsDone);
- resentInfo.read(processed);
- resentInfo.read(keyprocessed);
- resentInfo.read(lastPartNo.partNo);
- resentInfo.read(lastPartNo.fileNo);
- setPartNo(true);
- tlk->deserializeCursorPos(resentInfo);
- assertex(resentInfo.remaining() == 0);
- }
- virtual void setPartNo(bool fileChanged)
- {
- // NOTE - may be used by both indexread and normalize...
- if (steppingOffset) // MORE - may be other cases too - eg want output sorted and there are multiple subfiles...
- {
- unsigned i = 0;
- Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
- while (!aborted && i < inputCount)
- {
- PartNoType &part = inputData[i];
- lastPartNo.partNo = part.partNo;
- lastPartNo.fileNo = part.fileNo; // This is a hack so that the translator can be retrieved. We don't support record translation properly when doing keymerging...)
- // MORE - this code looks like it could be commoned up with code in CRoxieKeyedActivity::setPartNo
- if (!lastPartNo.partNo)
- {
- for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
- if (kib)
- {
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (k)
- {
- assertex(!k->isTopLevelKey());
- allKeys->addIndex(LINK(k));
- }
- }
- }
- }
- else
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(part.partNo);
- assertex(kib != NULL);
- IKeyIndex *k = kib->queryPart(part.fileNo);
- allKeys->addIndex(LINK(k));
- }
- i++;
- }
- if (allKeys->numParts())
- tlk.setown(::createKeyMerger(*keyRecInfo, allKeys, steppingOffset, &logctx, hasNewSegmentMonitors(), !logctx.isBlind()));
- else
- tlk.clear();
- createSegmentMonitorsPending = true;
- }
- else
- CRoxieKeyedActivity::setPartNo(fileChanged);
- }
- };
- //================================================================================================
- class CRoxieIndexReadActivity : public CRoxieIndexActivity, implements IIndexReadActivityInfo
- {
- protected:
- IHThorIndexReadArg * readHelper;
- public:
- CRoxieIndexReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, _steppingOffset)
- {
- onCreate();
- readHelper = (IHThorIndexReadArg *) basehelper;
- if (resent)
- readContinuationInfo();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexRead %u", packet->queryHeader().activityId);
- }
- /* Notes on Global smart stepping implementation:
- When smart stepping, I get from the Roxie server:
- 1 or more seek positions
- some flags
-
- I can read from the index in an order that matches the seek positions order (because of index merger), and which match my hard filter (they may not match my postfilter)
- I can skip forwards in the index to the first record GE a skip position
- I am not going to try to implement a (possible future) flag to return mismatches after any but the last of the seek positions (yet)
- I want to return M matching (keyed matches, seek field matches, and postfilter matches) rows for all provided seek positions
- where M is 1 if SSEFonlyReturnFirstSeekMatch flag set, otherwise all...
- THEN once we are beyond the last provided seek:
- if returnMismatches flag set,
- current row = (next row matching keyed filter)
- if someNewAsYetUnnamedAndUnimplementedFlag flag set
- //if the post filter matches then there may be scope for returning all records which match the seek fields of that first match
- // But not very likely to help unless terms correlated, and may well hinder
- return current row and all following rows matching keyed filter, seek data from current row, and postfilter
- return next row matching keyed filter (even if doesn't match postfilter)
- else
- return next N rows matching keyed filter and postfilter (where N depends on readAheadManyResults flag - 1 if not set)
- */
- inline void advanceToNextSeek()
- {
- assertex(numSeeks != 0);
- if (--numSeeks)
- steppingRow += steppingLength;
- else
- steppingRow = NULL;
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexReadActivity ::process");
- unsigned __int64 keyedLimit = readHelper->getKeyedLimit();
- unsigned __int64 limit = readHelper->getRowLimit();
- if (!resent && (keyedLimit != (unsigned __int64) -1) && ((keyedLimit > preabortIndexReadsThreshold) || (indexHelper->getFlags() & TIRcountkeyedlimit) != 0)) // Don't recheck the limit every time!
- {
- if (!checkLimit(keyedLimit))
- {
- limitExceeded(true);
- return NULL;
- }
- }
- unsigned __int64 stopAfter = readHelper->getChooseNLimit();
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned totalSizeSent = 0;
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- unsigned keyprocessedBefore = keyprocessed;
- bool continuationFailed = false;
- const byte *rawSeek = NULL;
- if (steppingRow)
- rawSeek = steppingRow;
- bool continuationNeeded = false;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- while (!aborted && inputsDone < inputCount)
- {
- if (!resent || !steppingOffset) // Bit of a hack... In the resent case, we have already set up the tlk, and all keys are processed at once in the steppingOffset case (which makes checkPartChanged gives a false positive in this case)
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(resent);
- resent = false;
- {
- TransformCallbackAssociation associate(callback, tlk); // want to destroy this before we advance to next key...
- while (!aborted && (rawSeek ? tlk->lookupSkip(rawSeek, steppingOffset, steppingLength) : tlk->lookup(true)))
- {
- rawSeek = NULL; // only want to do the seek first time we look for a particular seek value
- keyprocessed++;
- if ((keyedLimit != (unsigned __int64) -1) && keyprocessed > keyedLimit)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- limitExceeded(true);
- break;
- }
- indexRecordsRead++;
- size32_t transformedSize;
- const byte * keyRow = tlk->queryKeyBuffer();
- int diff = 0;
- if (steppingRow)
- {
- diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
- assertex(diff >= 0);
- }
- while (diff > 0)
- {
- advanceToNextSeek();
- if (!steppingRow)
- break;
- diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
- if (diff < 0)
- {
- rawSeek = steppingRow;
- break;
- }
- }
- if (diff >= 0)
- {
- if (diff > 0 && seeksAreEof)
- {
- assertex(!steppingRow);
- break;
- }
- rowBuilder.ensureRow();
- transformedSize = 0;
- if (likely(readHelper->canMatch(keyRow)))
- transformedSize = readHelper->transform(rowBuilder, keyRow);
- callback.finishedRow();
- if (transformedSize)
- {
- if (logctx.queryTraceLevel() > 15)
- {
- StringBuffer b;
- for (unsigned j = 0; j < (steppingLength ? steppingLength : 6); j++)
- b.appendf("%02x ", keyRow[steppingOffset + j]);
- logctx.CTXLOG("Returning seek row %s", b.str());
- }
- // Did get a match
- processed++;
- if (limit && processed > limit)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- limitExceeded(false);
- break;
- }
- if (processed > stopAfter)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- return output.getClear();
- }
- rowBuilder.writeToOutput(transformedSize, true);
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize || (steppingOffset && !steppingRow && !stepExtra.readAheadManyResults()))
- continuationNeeded = true;
- lastRowCompleteMatch = true;
- if (steppingRow && stepExtra.onlyReturnFirstSeekMatch())
- advanceToNextSeek();
- }
- else
- {
- // Didn't get a match
- if (steppingOffset && !steppingRow && stepExtra.returnMismatches())
- {
- transformedSize = readHelper->unfilteredTransform(rowBuilder, keyRow);
- if (transformedSize) // will only be zero in odd situations where codegen can't work out how to transform (eg because of a skip)
- {
- callback.finishedRow();
- rowBuilder.writeToOutput(transformedSize, true);
- totalSizeSent += transformedSize;
- continuationNeeded = true;
- lastRowCompleteMatch = false;
- }
- }
- else
- {
- postFiltered++;
- skipped++;
- }
- }
- }
- if (continuationNeeded && !continuationFailed)
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("Indexread returning partial result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
- if (sendContinuation(output))
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- return output.getClear();
- }
- else
- {
- // This is actually pretty fatal for smart-stepping case
- if (logctx.queryTraceLevel())
- logctx.CTXLOG("Indexread unable to return partial result set");
- continuationFailed = true;
- }
- }
- rowBuilder.clear();
- }
- }
- }
- if (steppingOffset)
- inputsDone = inputCount;
- else
- inputsDone++;
- }
- if (tlk) // a very early abort can mean it is NULL.... MORE is this the right place to put it or should it be inside the loop??
- {
- if (logctx.queryTraceLevel() > 10 && !aborted)
- {
- logctx.CTXLOG("Indexread returning result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
- if (steppingOffset)
- logctx.CTXLOG("Indexread return: steppingOffset %d, steppingRow %p, stepExtra.returnMismatches() %d",steppingOffset, steppingRow, (int) stepExtra.returnMismatches());
- }
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- }
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- virtual IIndexReadActivityInfo *queryIndexReadActivity()
- {
- return this;
- }
- virtual IKeyArray *getKeySet() const
- {
- return keyArray.getLink();
- }
- virtual const IResolvedFile *getVarFileInfo() const
- {
- return varFileInfo.getLink();
- }
- virtual ITranslatorSet *getTranslators() const override
- {
- return translators.getLink();
- }
- virtual void mergeSegmentMonitors(IIndexReadContext *irc) const
- {
- indexHelper->createSegmentMonitors(irc); // NOTE: they will merge;
- }
- virtual IRoxieServerActivity *queryActivity() { throwUnexpected(); }
- virtual const RemoteActivityId &queryRemoteId() const { throwUnexpected(); }
- };
- //================================================================================================
- class CRoxieIndexReadActivityFactory : public CRoxieIndexActivityFactory
- {
- unsigned steppingOffset;
- public:
- CRoxieIndexReadActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexReadArg> helper = (IHThorIndexReadArg *) helperFactory();
- init(helper, graphNode);
- ISteppingMeta *rawMeta = helper->queryRawSteppingMeta();
- if (rawMeta)
- {
- // MORE - should check all keys in maxFields list can actually be keyed.
- const CFieldOffsetSize * fields = rawMeta->queryFields();
- steppingOffset = fields[0].offset;
- }
- else
- {
- steppingOffset = 0;
- }
- }
-
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexReadActivity(logctx, packet, helperFactory, this, steppingOffset);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXREAD "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- //MORE: Very similar to the indexRead code, but I'm not sure it's worth commoning up....
- class CRoxieIndexNormalizeActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorCompoundNormalizeExtra * normalizeHelper;
- public:
- CRoxieIndexNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0) //MORE - stepping?
- {
- onCreate();
- normalizeHelper = (IHThorIndexNormalizeArg *) basehelper;
- if (resent)
- readContinuationInfo();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexNormalize %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexNormalizeActivity ::process");
- unsigned __int64 keyedLimit = normalizeHelper->getKeyedLimit();
- unsigned __int64 rowLimit = normalizeHelper->getRowLimit();
- if (!resent && (keyedLimit != (unsigned __int64) -1) && (indexHelper->getFlags() & TIRcountkeyedlimit) != 0) // Don't recheck the limit every time!
- {
- if (!checkLimit(keyedLimit))
- {
- limitExceeded(true);
- return NULL;
- }
- }
- unsigned __int64 stopAfter = normalizeHelper->getChooseNLimit();
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned totalSizeSent = 0;
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- bool continuationFailed = false;
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(resent);
- resent = false;
- TransformCallbackAssociation associate(callback, tlk);
- while (!aborted && tlk->lookup(true))
- {
- keyprocessed++;
- if (keyedLimit && processed > keyedLimit)
- {
- noteStats(processed-processedBefore, skipped);
- limitExceeded(true);
- break;
- }
- indexRecordsRead++;
- if (normalizeHelper->first(tlk->queryKeyBuffer()))
- {
- do
- {
- rowBuilder.ensureRow();
- size32_t transformedSize = normalizeHelper->transform(rowBuilder);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- limitExceeded(false);
- break;
- }
- if (processed > stopAfter)
- {
- noteStats(processed-processedBefore, skipped);
- return output.getClear();
- }
- totalSizeSent += rowBuilder.writeToOutput(transformedSize, true);
- }
- } while (normalizeHelper->next());
- callback.finishedRow();
- if (totalSizeSent > indexReadChunkSize && !continuationFailed)
- {
- if (sendContinuation(output))
- {
- noteStats(processed-processedBefore, skipped);
- return output.getClear();
- }
- else
- continuationFailed = true;
- }
- }
- else
- {
- postFiltered++;
- skipped++;
- }
- }
- }
- inputsDone++;
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexNormalizeActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexNormalizeActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexNormalizeArg> helper = (IHThorIndexNormalizeArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexNormalizeActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("IndexNormalize "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexCountActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorIndexCountArg * countHelper;
- unsigned __int64 choosenLimit;
- unsigned __int64 rowLimit;
- unsigned __int64 keyedLimit;
- public:
- CRoxieIndexCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
- {
- onCreate();
- countHelper = (IHThorIndexCountArg *) basehelper;
- assertex(!resent);
- choosenLimit = countHelper->getChooseNLimit();
- rowLimit = countHelper->getRowLimit();
- keyedLimit = countHelper->getKeyedLimit();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexCount %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexCountActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- unsigned __int64 count = 0;
- if (!resent && (keyedLimit != (unsigned __int64) -1) && ((keyedLimit > preabortIndexReadsThreshold) || (indexHelper->getFlags() & TIRcountkeyedlimit) != 0)) // Don't recheck the limit every time!
- {
- if (!checkLimit(keyedLimit))
- {
- limitExceeded(true);
- return NULL;
- }
- }
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- while (!aborted && inputsDone < inputCount && count < choosenLimit)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- if (countHelper->hasFilter())
- {
- callback.setManager(tlk);
- while (!aborted && (count < choosenLimit) && tlk->lookup(true))
- {
- keyprocessed++;
- indexRecordsRead++;
- count += countHelper->numValid(tlk->queryKeyBuffer());
- if (count > rowLimit)
- limitExceeded(false);
- else if (keyprocessed > keyedLimit)
- limitExceeded(true);
- callback.finishedRow();
- }
- callback.setManager(NULL);
- }
- else
- {
- //MORE: GH->RKC There should be value in providing a choosenLimit to getCount()
- //MORE: note that tlk->checkCount() is NOT suitable as it makes assumptions about the segmonitors (only checks leading ones)
- count += tlk->getCount();
- if (count > rowLimit)
- limitExceeded(false);
- else if (count > keyedLimit)
- limitExceeded(true);
- }
- }
- inputsDone++;
- }
- if (!aborted && count)
- {
- if (count > choosenLimit)
- count = choosenLimit;
- processed++;
- assertex(!serializer);
- void *recBuffer = output->getBuffer(meta.getFixedSize(), false);
- if (meta.getFixedSize() == 1)
- *(byte *)recBuffer = (byte)count;
- else
- {
- assertex(meta.getFixedSize() == sizeof(unsigned __int64));
- *(unsigned __int64 *)recBuffer = count;
- }
- output->putBuffer(recBuffer, meta.getFixedSize(), false);
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexCountActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexCountActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexCountArg> helper = (IHThorIndexCountArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexCountActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXCOUNT "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexAggregateActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorCompoundAggregateExtra * aggregateHelper;
- public:
- CRoxieIndexAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
- {
- onCreate();
- aggregateHelper = (IHThorIndexAggregateArg *) basehelper;
- assertex(!resent);
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexAggregate %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexAggregateActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- rowBuilder.ensureRow();
- aggregateHelper->clearAggregate(rowBuilder);
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- callback.setManager(tlk);
- while (!aborted && tlk->lookup(true))
- {
- keyprocessed++;
- indexRecordsRead++;
- aggregateHelper->processRow(rowBuilder, tlk->queryKeyBuffer());
- callback.finishedRow();
- }
- callback.setManager(NULL);
- }
- inputsDone++;
- }
- if (!aborted && aggregateHelper->processedAnyRows())
- {
- processed++;
- size32_t transformedSize = meta.getRecordSize(rowBuilder.getSelf());
- rowBuilder.writeToOutput(transformedSize, true);
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexAggregateActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexAggregateArg> helper = (IHThorIndexAggregateArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexAggregateActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXAGGREGATE "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexGroupAggregateActivity : public CRoxieIndexActivity, implements IHThorGroupAggregateCallback
- {
- protected:
- IHThorCompoundGroupAggregateExtra * aggregateHelper;
- RowAggregator results;
- unsigned groupSegCount;
- ThorActivityKind kind;
- public:
- IMPLEMENT_IINTERFACE_USING(CRoxieIndexActivity)
- CRoxieIndexGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, ThorActivityKind _kind)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0),
- aggregateHelper((IHThorIndexGroupAggregateArg *) basehelper),
- results(*aggregateHelper, *aggregateHelper), kind(_kind)
- {
- onCreate();
- results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- assertex(!resent);
- groupSegCount = 0;
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexGroupAggregate %u", packet->queryHeader().activityId);
- }
- virtual void processRow(const void * next)
- {
- results.addRow(next);
- }
- virtual void createSegmentMonitors()
- {
- if (createSegmentMonitorsPending)
- {
- CRoxieIndexActivity::createSegmentMonitors();
- if ((kind==TAKindexgroupcount || kind==TAKindexgroupexists))
- groupSegCount = aggregateHelper->getGroupingMaxField();
- else
- groupSegCount = 0;
- }
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexGroupAggregateActivity ::process");
- Owned<IRowManager> rowManager = roxiemem::createRowManager(0, NULL, logctx, NULL, false); // MORE - should not really use default limits
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- try
- {
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- callback.setManager(tlk);
- while (!aborted && tlk->lookup(true))
- {
- if (groupSegCount && !translators->queryTranslator(lastPartNo.fileNo))
- {
- AggregateRowBuilder &rowBuilder = results.addRow(tlk->queryKeyBuffer());
- callback.finishedRow();
- if (kind == TAKindexgroupcount)
- {
- unsigned __int64 count = tlk->getCurrentRangeCount(groupSegCount);
- aggregateHelper->processCountGrouping(rowBuilder, count-1);
- }
- if (!tlk->nextRange(groupSegCount))
- break;
- }
- else
- {
- keyprocessed++;
- indexRecordsRead++;
- aggregateHelper->processRow(tlk->queryKeyBuffer(), this);
- callback.finishedRow();
- }
- }
- callback.setManager(NULL);
- }
- inputsDone++;
- }
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = results.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- }
- catch (...)
- {
- results.reset(); // kill entries before the rowManager dies.
- throw;
- }
- results.reset();
- noteStats(processed-processedBefore, 0);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexGroupAggregateActivityFactory : public CRoxieIndexActivityFactory
- {
- ThorActivityKind kind;
- public:
- CRoxieIndexGroupAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), kind(_kind)
- {
- Owned<IHThorIndexGroupAggregateArg> helper = (IHThorIndexGroupAggregateArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexGroupAggregateActivity(logctx, packet, helperFactory, this, kind);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXGROUPAGGREGATE "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
- {
- return new CRoxieIndexGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _kind);
- }
- //================================================================================================
- class CRoxieFetchActivityFactory : public CSlaveActivityFactory
- {
- public:
- Owned<ITranslatorSet> translators;
- Owned<IFileIOArray> fileArray;
- CRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorFetchBaseArg> helper = (IHThorFetchBaseArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isCodeSigned = isActivityCodeSigned(_graphNode);
- bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- OwnedRoxieString fname(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- unsigned expectedCrc = helper->getDiskFormatCrc();
- unsigned projectedCrc = helper->getProjectedFormatCrc();
- translators.setown(datafile->getTranslators(projectedCrc, helper->queryProjectedDiskRecordSize(), expectedCrc, helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("FETCH "));
- }
- };
- class CRoxieFetchActivityBase : public CRoxieSlaveActivity
- {
- protected:
- IHThorFetchBaseArg *helper;
- const CRoxieFetchActivityFactory *factory;
- Owned<IFileIO> rawFile;
- Owned<ISerialStream> rawStream;
- offset_t base;
- char *inputData;
- char *inputLimit;
- Linked<ITranslatorSet> translators;
- Linked<IFileIOArray> files;
- const IDynamicTransform *translator = nullptr;
- bool needsRHS;
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData) = 0;
- public:
- CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory), translators(_translators), files(_files)
- {
- helper = (IHThorFetchBaseArg *) basehelper;
- base = 0;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- onCreate();
- inputData = (char *) serializedCreate.readDirect(0);
- inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
- needsRHS = helper->transformNeedsRhs();
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned expectedCrc = helper->getDiskFormatCrc();
- unsigned projectedCrc = helper->getProjectedFormatCrc();
- translators.setown(varFileInfo->getTranslators(projectedCrc, helper->queryProjectedDiskRecordSize(), expectedCrc, helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false));
- files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("Fetch %u", packet->queryHeader().activityId);
- }
- virtual void setPartNo(bool filechanged);
- };
- IMessagePacker *CRoxieFetchActivityBase::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieFetchActivityBase::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned accepted = 0;
- unsigned rejected = 0;
- unsigned __int64 rowLimit = helper->getRowLimit();
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- while (!aborted && inputData < inputLimit)
- {
- checkPartChanged(*(PartNoType *) inputData);
- inputData += sizeof(PartNoType);
- offset_t rp = *(offset_t *)inputData;
- inputData += sizeof(offset_t);
- unsigned rhsSize;
- if (needsRHS)
- {
- rhsSize = *(unsigned *)inputData;
- inputData += sizeof(unsigned);
- }
- else
- rhsSize = 0;
- offset_t pos;
- if (isLocalFpos(rp))
- pos = getLocalFposOffset(rp);
- else
- pos = rp-base;
- unsigned thisSize = doFetch(rowBuilder, pos, rp, inputData);
- inputData += rhsSize;
- if (thisSize)
- {
- rowBuilder.writeToOutput(thisSize, true);
- accepted++;
- if (accepted > rowLimit)
- {
- logctx.noteStatistic(StNumDiskSeeks, accepted+rejected);
- logctx.noteStatistic(StNumDiskRowsRead, accepted+rejected);
- logctx.noteStatistic(StNumDiskAccepted, accepted);
- logctx.noteStatistic(StNumDiskRejected, rejected);
- limitExceeded();
- return NULL;
- }
- }
- else
- rejected++;
- }
- logctx.noteStatistic(StNumDiskSeeks, accepted+rejected);
- logctx.noteStatistic(StNumDiskRowsRead, accepted+rejected);
- logctx.noteStatistic(StNumDiskAccepted, accepted);
- logctx.noteStatistic(StNumDiskRejected, rejected);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- class CRoxieFetchActivity : public CRoxieFetchActivityBase
- {
- Owned<IEngineRowAllocator> diskAllocator;
- CThorContiguousRowBuffer prefetchSource;
- Owned<ISourceRowPrefetcher> rowPrefetcher;
- public:
- CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files)
- {
- // If we ever supported superfiles this would need to move to setPartNo, and pass proper subfile idx in
- rowPrefetcher.setown(translators->getPrefetcher(0));
- IOutputMetaData *diskMeta = helper->queryProjectedDiskRecordSize();
- diskAllocator.setown(getRowAllocator(diskMeta, basefactory->queryId()));
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- prefetchSource.reset(pos);
- rowPrefetcher->readAhead(prefetchSource);
- const byte *diskRow = prefetchSource.queryRow();
- if (translator)
- {
- MemoryBuffer buf;
- MemoryBufferBuilder aBuilder(buf, 0);
- FetchVirtualFieldCallback fieldCallback(rawpos);
- translator->translate(aBuilder, fieldCallback, diskRow);
- // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
- return h->transform(rowBuilder, buf.toByteArray(), inputData, rawpos);
- }
- else
- {
- // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
- return h->transform(rowBuilder, diskRow, inputData, rawpos);
- }
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- prefetchSource.setStream(rawStream);
- }
- };
- IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieFetchActivity(logctx, packet, helperFactory, this, translators, fileArray);
- }
- //------------------------------------------------------------------------------------
- class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
- {
- CSVSplitter csvSplitter;
- size32_t maxRowSize;
- CThorStreamDeserializerSource deserializeSource;
- public:
- CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files,
- unsigned _maxColumns, size32_t _maxRowSize)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files), maxRowSize(_maxRowSize)
- {
- const char * quotes = NULL;
- const char * separators = NULL;
- const char * terminators = NULL;
- const char * escapes = NULL;
- const IResolvedFile *fileInfo = varFileInfo ? varFileInfo : factory->datafile;
- if (fileInfo)
- {
- const IPropertyTree *options = fileInfo->queryProperties();
- if (options)
- {
- quotes = options->queryProp("@csvQuote");
- separators = options->queryProp("@csvSeparate");
- terminators = options->queryProp("@csvTerminate");
- escapes = options->queryProp("@csvEscape");
- }
- }
- IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
- ICsvParameters *csvInfo = h->queryCsvParameters();
- csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators, escapes);
- if (translator)
- UNIMPLEMENTED; // It's not obvious how we would support this.
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
- rawStream->reset(pos);
- size32_t rowSize = 4096; // MORE - make configurable
- for (;;)
- {
- size32_t avail;
- const void *peek = rawStream->peek(rowSize, avail);
- if (csvSplitter.splitLine(avail, (const byte *)peek) < rowSize || avail < rowSize)
- break;
- if (rowSize == maxRowSize)
- throw MakeStringException(0, "File contained a line of length greater than %d bytes.", maxRowSize);
- if (rowSize >= maxRowSize/2)
- rowSize = maxRowSize;
- else
- rowSize += rowSize;
- }
- return h->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), inputData, rawpos);
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- deserializeSource.setStream(rawStream);
- }
- };
- class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSelect
- {
- Owned<IXMLParse> parser;
- Owned<IColumnProvider> lastMatch;
- Owned<IFileIOStream> rawStreamX;
- unsigned streamBufferSize;
- CThorStreamDeserializerSource deserializeSource;
- public:
- IMPLEMENT_IINTERFACE_USING(CRoxieFetchActivityBase)
- CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files,
- unsigned _streamBufferSize)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files),
- streamBufferSize(_streamBufferSize)
- {
- if (translator)
- UNIMPLEMENTED;
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- rawStreamX->seek(pos, IFSbegin);
- try
- {
- while(!lastMatch)
- if(!parser->next())
- throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
- IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
- unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
- lastMatch.clear();
- parser->reset();
- return thisSize;
- }
- catch (IException *E)
- {
- ::Release(E);
- throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
- }
- }
- virtual void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
- {
- lastMatch.set(&entry);
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- deserializeSource.setStream(rawStream);
- rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
- parser.setown((factory->getKind()==TAKjsonfetch) ? createJSONParse(*rawStreamX, "/", *this) : createXMLParse(*rawStreamX, "/", *this));
- }
- };
- void CRoxieFetchActivityBase::setPartNo(bool filechanged)
- {
- rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
- translator = translators->queryTranslator(0); // MORE - superfiles
- assertex(rawFile != NULL);
- rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
- }
- class CRoxieCSVFetchActivityFactory : public CRoxieFetchActivityFactory
- {
- unsigned maxColumns;
- size32_t maxRowSize;
- public:
- CRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorCsvFetchArg> helper = (IHThorCsvFetchArg*) helperFactory();
- maxColumns = helper->getMaxColumns();
- ICsvParameters *csvInfo = helper->queryCsvParameters();
- assertex(!csvInfo->queryEBCDIC());
- maxRowSize = defaultMaxCsvRowSize * 1024 * 1024;
- IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
- if (workunit)
- maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, maxColumns, maxRowSize);
- }
- };
- class CRoxieXMLFetchActivityFactory : public CRoxieFetchActivityFactory
- {
- public:
- CRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, 4096);
- }
- };
- ISlaveActivityFactory *createRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieCSVFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieXMLFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedJoinIndexActivityFactory : public CRoxieKeyedActivityFactory
- {
- public:
- CRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename|JFindexfromactivity)) != 0);
- expectedCrc = helper->getIndexFormatCrc();
- projectedCrc = helper->getProjectedIndexFormatCrc();
- projectedMeta = helper->queryProjectedIndexRecordSize();
- expectedMeta = helper->queryIndexRecordSize();
- if (!variableFileName)
- {
- bool isCodeSigned = isActivityCodeSigned(_graphNode);
- bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
- OwnedRoxieString indexFileName(helper->getIndexFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- translators.setown(datafile->getTranslators(projectedCrc, projectedMeta, expectedCrc, expectedMeta, queryFactory.queryOptions().enableFieldTranslation, true));
- keyArray.setown(datafile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("KEYEDJOIN INDEX "));
- }
- };
- class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
- {
- IHThorKeyedJoinArg *helper;
- const CRoxieKeyedJoinIndexActivityFactory *factory;
- unsigned inputLength;
- char *inputData;
- Owned<IRoxieSlaveActivity> rootIndexActivity;
- IIndexReadActivityInfo *rootIndex;
- unsigned processed;
- unsigned candidateCount;
- unsigned keepCount;
- unsigned inputDone;
- public:
- CRoxieKeyedJoinIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
- : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory)
- {
- helper = (IHThorKeyedJoinArg *) basehelper;
- keyRecInfo = &helper->queryIndexRecordSize()->queryRecordAccessor(true);
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename|JFindexfromactivity)) != 0);
- inputDone = 0;
- processed = 0;
- candidateCount = 0;
- keepCount = 0;
- rootIndex = NULL;
- onCreate();
- inputData = (char *) serializedCreate.readDirect(0);
- inputLength = (serializedCreate.length() - serializedCreate.getPos());
- if (resent)
- {
- resentInfo.read(inputDone);
- inputData += inputDone;
- resentInfo.read(processed);
- resentInfo.read(candidateCount);
- resentInfo.read(keepCount);
- resentInfo.read(lastPartNo.partNo);
- resentInfo.read(lastPartNo.fileNo);
- setPartNo(true);
- tlk->deserializeCursorPos(resentInfo);
- assertex(resentInfo.remaining() == 0);
- }
- }
- virtual bool hasNewSegmentMonitors() { return helper->hasNewSegmentMonitors(); }
- ~CRoxieKeyedJoinIndexActivity()
- {
- }
- virtual void deserializeExtra(MemoryBuffer &buff)
- {
- if (helper->getJoinFlags() & JFindexfromactivity)
- {
- RemoteActivityId indexActivityId(buff);
- assertex(indexActivityId.activityId);
- unsigned indexCtxLen;
- buff.read(indexCtxLen);
- const void *indexCtx = buff.readDirect(indexCtxLen);
- //We create a packet for the index activity to use. Header, trace info and parentextract are clone of mine, context info is copied from buff
- MemoryBuffer indexPacketData;
- indexPacketData.append(sizeof(RoxiePacketHeader), &packet->queryHeader());
- indexPacketData.append(packet->getTraceLength(), packet->queryTraceInfo());
- const byte *parentExtract = (const byte *) packet->queryContextData();
- unsigned parentExtractLen = *(unsigned*) parentExtract;
- indexPacketData.append(parentExtractLen);
- indexPacketData.append(parentExtractLen, parentExtract + sizeof(unsigned));
- indexPacketData.append(indexCtxLen, indexCtx);
- RoxiePacketHeader *newHeader = (RoxiePacketHeader *) indexPacketData.toByteArray();
- newHeader->continueSequence = 0;
- newHeader->activityId = indexActivityId.activityId;
- newHeader->queryHash = indexActivityId.queryHash;
- Owned<IRoxieQueryPacket> indexPacket = createRoxiePacket(indexPacketData);
- Owned<ISlaveActivityFactory> indexActivityFactory = factory->queryQueryFactory().getSlaveActivityFactory(indexActivityId.activityId);
- assertex(indexActivityFactory != NULL);
- rootIndexActivity.setown(indexActivityFactory->createActivity(logctx, indexPacket));
- rootIndex = rootIndexActivity->queryIndexReadActivity();
-
- varFileInfo.setown(rootIndex->getVarFileInfo());
- translators.setown(rootIndex->getTranslators());
- keyArray.setown(rootIndex->getKeySet());
- }
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getIndexFileName();
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("KeyedJoinIndex %u", packet->queryHeader().activityId);
- }
- virtual void createSegmentMonitors()
- {
- // This is called to create the segmonitors that apply to ALL rows - not the per-row ones
- // At present there are none. However we should still set up the layout translation.
- if (createSegmentMonitorsPending)
- {
- createSegmentMonitorsPending = false;
- tlk->setLayoutTranslator(translators->queryTranslator(lastPartNo.fileNo));
- }
- }
- };
- IRoxieSlaveActivity *CRoxieKeyedJoinIndexActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieKeyedJoinIndexActivity(logctx, packet, helperFactory, this);
- }
- IMessagePacker *CRoxieKeyedJoinIndexActivity::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieKeyedJoinIndexActivity::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- CachedOutputMetaData joinFieldsMeta(helper->queryJoinFieldsRecordSize());
- Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
- OptimizedKJRowBuilder rowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned atmost = helper->getJoinLimit();
- if (!atmost) atmost = (unsigned) -1;
- unsigned abortLimit = helper->getMatchAbortLimit();
- if (!abortLimit) abortLimit = (unsigned) -1;
- if (abortLimit < atmost)
- atmost = abortLimit;
- unsigned keepLimit = helper->getKeepLimit();
- unsigned joinFlags = helper->getJoinFlags();
- if (joinFlags & (JFtransformMaySkip | JFfetchMayFilter))
- keepLimit = 0;
- if ((joinFlags & (JFexclude|JFleftouter)) == (JFexclude|JFleftouter) && (!(joinFlags & JFfetchMayFilter))) // For left-only joins, all we care about is existence of a match. Return as soon as we know that there is one
- keepLimit = 1;
- unsigned processedBefore = processed;
- unsigned rejected = 0;
- CachedOutputMetaData inputFields(helper->queryIndexReadInputRecordSize());
- unsigned inputSize = inputFields.getFixedSize();
- unsigned totalSizeSent = 0;
- // Now go fetch the records
- bool continuationFailed = false;
- while (!aborted && inputDone < inputLength)
- {
- checkPartChanged(*(PartNoType *) inputData);
- CJoinGroup *jg = *(CJoinGroup **) (inputData + sizeof(PartNoType)); // NOTE - this is a pointer in Roxie server's address space - don't go following it!
- char *inputRow = inputData+sizeof(PartNoType)+sizeof(const void *);
- if (inputFields.isVariableSize())
- {
- inputSize = *(unsigned *)inputRow;
- inputRow += sizeof(unsigned);
- }
- if (tlk)
- {
- createSegmentMonitors();
- helper->createSegmentMonitors(tlk, inputRow);
- if (rootIndex)
- rootIndex->mergeSegmentMonitors(tlk);
- tlk->finishSegmentMonitors();
- if (!resent && (atmost != (unsigned) -1) && ((atmost > preabortKeyedJoinsThreshold) || (joinFlags & JFcountmatchabortlimit) || (keepLimit != 0)))
- {
- unsigned __int64 precount = tlk->checkCount(atmost);
- if (precount > atmost)
- {
- candidateCount = atmost+1;
- if (logctx.queryTraceLevel() > 5)
- logctx.CTXLOG("Pre-aborting since candidate count is at least %" I64F "d", precount);
- }
- else
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("NOT Pre-aborting since candidate count is %" I64F "d", precount);
- tlk->reset(false);
- }
- }
- else
- tlk->reset(resent);
- resent = false;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- while (candidateCount <= atmost)
- {
- if (tlk->lookup(true))
- {
- candidateCount++;
- indexRecordsRead++;
- KLBlobProviderAdapter adapter(tlk);
- const byte *indexRow = tlk->queryKeyBuffer();
- size_t fposOffset = tlk->queryRowSize() - sizeof(offset_t);
- offset_t fpos = rtlReadBigUInt8(indexRow + fposOffset);
- if (helper->indexReadMatch(inputRow, indexRow, &adapter))
- {
- processed++;
- if (keepLimit)
- {
- keepCount++;
- if (keepCount > keepLimit)
- break;
- }
- if (processed > rowLimit)
- {
- if (logctx.queryTraceLevel() > 1)
- {
- StringBuffer s;
- logctx.CTXLOG("limit exceeded for %s", packet->queryHeader().toString(s).str());
- }
- noteStats(processed-processedBefore, rejected);
- limitExceeded();
- return NULL;
- }
- unsigned totalSize = 0;
- if (helper->diskAccessRequired())
- {
- const void *self = output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- KeyedJoinHeader *rec = (KeyedJoinHeader *) self;
- rec->fpos = fpos;
- rec->thisGroup = jg;
- rec->partNo = lastPartNo.partNo;
- output->putBuffer(self, KEYEDJOIN_RECORD_SIZE(0), true);
- }
- else
- {
- KLBlobProviderAdapter adapter(tlk);
- totalSize = helper->extractJoinFields(rowBuilder, indexRow, &adapter);
- rowBuilder.writeToOutput(totalSize, fpos, jg, lastPartNo.partNo);
- }
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(totalSize);
- if (totalSizeSent > indexReadChunkSize && !continuationFailed)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(inputDone);
- si.append(processed);
- si.append(candidateCount);
- si.append(keepCount);
- si.append(lastPartNo.partNo);
- si.append(lastPartNo.fileNo);
- tlk->serializeCursorPos(si);
- if (si.length() <= maxContinuationSize)
- {
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- noteStats(processed-processedBefore, rejected);
- return output.getClear();
- }
- else
- continuationFailed = true;
- }
- }
- else
- {
- rejected++;
- postFiltered++;
- }
- }
- else
- break;
- }
- tlk->releaseSegmentMonitors();
- }
- // output an end marker for the matches to this group
- KeyedJoinHeader *rec = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- rec->fpos = candidateCount;
- rec->thisGroup = jg;
- rec->partNo = (unsigned short) -1;
- output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(0), true);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(0); // note - don't interrupt here though - too complicated.
- candidateCount = 0;
- keepCount = 0;
- inputData = inputRow + inputSize;
- inputDone += sizeof(PartNoType) + sizeof(const void *);
- if (inputFields.isVariableSize())
- inputDone += sizeof(unsigned);
- inputDone += inputSize;
- }
- noteStats(processed-processedBefore, rejected);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- //================================================================================================
- ISlaveActivityFactory *createRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieKeyedJoinIndexActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedJoinFetchActivityFactory : public CSlaveActivityFactory
- {
- public:
- Owned<ITranslatorSet> translators;
- Owned<IFileIOArray> files;
- CRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
- assertex(helper->diskAccessRequired());
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isCodeSigned = isActivityCodeSigned(_graphNode);
- bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- OwnedRoxieString fileName(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- unsigned expectedCrc = helper->getDiskFormatCrc();
- unsigned projectedCrc = helper->getProjectedFormatCrc();
- translators.setown(datafile->getTranslators(projectedCrc, helper->queryProjectedDiskRecordSize(), expectedCrc, helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- files.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("KEYEDJOIN FETCH "));
- }
- };
- class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
- {
- IHThorKeyedJoinArg *helper;
- Owned<IFileIO> rawFile;
- offset_t base;
- const char *inputLimit;
- const char *inputData;
- Linked<ITranslatorSet> translators;
- Linked<IFileIOArray> files;
- Owned<ISerialStream> rawStream;
- CThorContiguousRowBuffer prefetchSource;
- Owned<ISourceRowPrefetcher> prefetcher;
- const IDynamicTransform *translator = nullptr;
- virtual void setPartNo(bool filechanged)
- {
- rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
- translator = translators->queryTranslator(0); // MORE - superfiles
- rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
- prefetcher.setown(translators->getPrefetcher(0));
- prefetchSource.setStream(rawStream);
- }
- public:
- CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory,
- IFileIOArray *_files, ITranslatorSet *_translators)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- translators(_translators),
- files(_files)
- {
- // MORE - no continuation row support?
- base = 0;
- helper = (IHThorKeyedJoinArg *) basehelper;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- onCreate();
- inputData = (const char *) serializedCreate.readDirect(0);
- inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
- }
- ~CRoxieKeyedJoinFetchActivity()
- {
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned expectedCrc = helper->getDiskFormatCrc();
- unsigned projectedCrc = helper->getProjectedFormatCrc();
- translators.setown(varFileInfo->getTranslators(projectedCrc, helper->queryProjectedDiskRecordSize(), expectedCrc, helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false));
- files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("KeyedJoinFetch %u", packet->queryHeader().activityId);
- }
- };
- IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieKeyedJoinFetchActivity::process");
- // MORE - where we are returning everything there is an optimization or two to be had
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned processed = 0;
- unsigned skipped = 0;
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned totalSizeSent = 0;
- CachedOutputMetaData joinFieldsMeta(helper->queryJoinFieldsRecordSize());
- Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
- OptimizedKJRowBuilder jfRowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
- CachedOutputMetaData inputFields(helper->queryFetchInputRecordSize());
- size32_t inputSize = inputFields.getFixedSize();
- while (!aborted && inputData < inputLimit)
- {
- checkPartChanged(*(PartNoType *) inputData);
- inputData += sizeof(PartNoType);
- offset_t rp;
- memcpy(&rp, inputData, sizeof(rp));
- offset_t pos;
- if (isLocalFpos(rp))
- pos = getLocalFposOffset(rp);
- else
- pos = rp-base;
- prefetchSource.reset(pos);
- prefetcher->readAhead(prefetchSource);
- const byte *rawRHS = prefetchSource.queryRow();
- const KeyedJoinHeader *headerPtr = (KeyedJoinHeader *) inputData;
- MemoryBuffer buf;
- if (translator)
- {
- MemoryBufferBuilder aBuilder(buf, 0);
- FetchVirtualFieldCallback fieldCallback(headerPtr->fpos);
- translator->translate(aBuilder, fieldCallback, rawRHS);
- rawRHS = (const byte *) buf.toByteArray();
- }
- inputData = &headerPtr->rhsdata[0];
- if (inputFields.isVariableSize())
- {
- memcpy(&inputSize, inputData, sizeof(inputSize));
- inputData += sizeof(inputSize);
- }
- if (helper->fetchMatch(inputData, rawRHS))
- {
- unsigned thisSize = helper->extractJoinFields(jfRowBuilder, rawRHS, (IBlobProvider*)NULL);
- jfRowBuilder.writeToOutput(thisSize, headerPtr->fpos, headerPtr->thisGroup, headerPtr->partNo);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(thisSize);
- processed++;
- if (processed > rowLimit)
- {
- logctx.noteStatistic(StNumDiskSeeks, processed+skipped);
- logctx.noteStatistic(StNumDiskRowsRead, processed+skipped);
- logctx.noteStatistic(StNumDiskAccepted, processed);
- logctx.noteStatistic(StNumDiskRejected, skipped);
- limitExceeded();
- return NULL;
- }
- }
- else
- {
- skipped++;
- KeyedJoinHeader *out = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- out->fpos = 0;
- out->thisGroup = headerPtr->thisGroup;
- out->partNo = (unsigned short) -1;
- output->putBuffer(out, KEYEDJOIN_RECORD_SIZE(0), true);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(0);
- }
- inputData += inputSize;
- }
- logctx.noteStatistic(StNumDiskSeeks, processed+skipped);
- logctx.noteStatistic(StNumDiskRowsRead, processed+skipped);
- logctx.noteStatistic(StNumDiskAccepted, processed);
- logctx.noteStatistic(StNumDiskRejected, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this, files, translators);
- }
- ISlaveActivityFactory *createRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieKeyedJoinFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieRemoteActivity : public CRoxieSlaveActivity
- {
- protected:
- IHThorRemoteArg * remoteHelper;
- unsigned processed;
- unsigned remoteId;
- public:
- CRoxieRemoteActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory, unsigned _remoteId)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- remoteId(_remoteId)
- {
- remoteHelper = (IHThorRemoteArg *) basehelper;
- processed = 0;
- onCreate();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("Remote %u", packet->queryHeader().activityId);
- }
- virtual const char *queryDynamicFileName() const
- {
- throwUnexpected();
- }
- virtual void setVariableFileInfo()
- {
- throwUnexpected();
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieRemoteActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned __int64 rowLimit = remoteHelper->getRowLimit();
- rtlRowBuilder remoteExtractBuilder;
- remoteHelper->createParentExtract(remoteExtractBuilder);
- Linked<IActivityGraph> remoteQuery = queryContext->queryChildGraph(remoteId);
- Linked<IRoxieServerChildGraph> remoteGraph = remoteQuery->queryLoopGraph();
- try
- {
- remoteGraph->beforeExecute();
- Owned<IFinalRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
- Owned<IStrandJunction> junction;
- IEngineRowStream *stream = connectSingleStream(queryContext, input, 0, junction, 0);
- while (!aborted)
- {
- const void * next = stream->ungroupedNextRow();
- if (!next)
- break;
- size32_t nextSize = meta.getRecordSize(next);
- //MORE - what about grouping?
- processed++;
- if (processed > rowLimit)
- {
- ReleaseRoxieRow(next);
- limitExceeded();
- break;
- }
- if (serializer)
- serializeRow(output, next);
- else
- {
- void * recBuffer = output->getBuffer(nextSize, meta.isVariableSize());
- memcpy(recBuffer, next, nextSize);
- output->putBuffer(recBuffer, nextSize, meta.isVariableSize());
- }
- ReleaseRoxieRow(next);
- }
- remoteGraph->afterExecute();
- }
- catch (IException *E)
- {
- remoteGraph->afterExecute();
- if (aborted)
- ::Release(E);
- else
- throw;
- }
- catch (...)
- {
- remoteGraph->afterExecute();
- throw;
- }
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- virtual void setPartNo(bool filechanged)
- {
- throwUnexpected();
- }
- };
- //================================================================================================
- class CRoxieRemoteActivityFactory : public CSlaveActivityFactory
- {
- unsigned remoteId;
- public:
- CRoxieRemoteActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
- : CSlaveActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), remoteId(_remoteId)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieRemoteActivity(logctx, packet, helperFactory, this, remoteId);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("Remote "));
- }
- };
- ISlaveActivityFactory *createRoxieRemoteActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
- {
- return new CRoxieRemoteActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _remoteId);
- }
- //================================================================================================
- class CRoxieDummyActivityFactory : public CSlaveActivityFactory // not a real activity - just used to properly link files
- {
- protected:
- Owned<const IResolvedFile> indexfile;
- Owned<IKeyArray> keyArray;
- Owned<IFileIOArray> fileArray;
- public:
- CRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, NULL)
- {
- if (_graphNode.getPropBool("att[@name='_isSpill']/@value", false) || _graphNode.getPropBool("att[@name='_isSpillGlobal']/@value", false))
- return; // ignore 'spills'
- try // operations does not want any missing file errors to be fatal, or throw traps - just log it
- {
- ThorActivityKind kind = getActivityKind(_graphNode);
- if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
- {
- bool isCodeSigned = isActivityCodeSigned(_graphNode);
- const char *fileName = queryNodeFileName(_graphNode, kind);
- const char *indexName = queryNodeIndexName(_graphNode, kind);
- if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
- {
- bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
- indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (indexfile)
- keyArray.setown(indexfile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
- {
- bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true, isCodeSigned));
- if (datafile)
- {
- fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- }
- catch(IException *E)
- {
- StringBuffer errors;
- E->errorMessage(errors);
- OERRLOG("%s File error = %s", (isLoadDataOnly) ? "LOADDATAONLY" : "SUSPENDED QUERY", errors.str());
- E->Release();
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- throwUnexpected(); // don't actually want to create an activity
- }
- };
- //================================================================================================
- ISlaveActivityFactory *createRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
- {
- // MORE - bool isLoadDataOnly may need to be an enum if more than just LOADDATAONLY and suspended queries use this
- return new CRoxieDummyActivityFactory(_graphNode, _subgraphId, _queryFactory, isLoadDataOnly);
- }
|