12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676467746784679468046814682468346844685468646874688468946904691469246934694469546964697 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "jlib.hpp"
- #include "ccd.hpp"
- #include "ccdquery.hpp"
- #include "ccdstate.hpp"
- #include "ccdserver.hpp"
- #include "ccdcontext.hpp"
- #include "ccddebug.hpp"
- #include "ccdactivities.hpp"
- #include "ccdqueue.ipp"
- #include "ccdsnmp.hpp"
- #include "ccdfile.hpp"
- #include "ccdkey.hpp"
- #include "rtlkey.hpp"
- #include "eclrtl_imp.hpp"
- #include "rtlread_imp.hpp"
- #include "rtlcommon.hpp"
- #include "rtldynfield.hpp"
- #include "jhtree.hpp"
- #include "jlog.hpp"
- #include "jmisc.hpp"
- #include "udplib.hpp"
- #include "csvsplitter.hpp"
- #include "thorxmlread.hpp"
- #include "thorcommon.ipp"
- #include "thorstrand.hpp"
- #include "jstats.h"
- size32_t diskReadBufferSize = 0x10000;
- using roxiemem::OwnedRoxieRow;
- using roxiemem::OwnedConstRoxieRow;
- using roxiemem::OwnedRoxieString;
- using roxiemem::IRowManager;
- #define maxContinuationSize 48000 // note - must fit in the 2-byte length field... but also needs to be possible to send back from Roxie server->slave in one packet
- size32_t serializeRow(IOutputRowSerializer * serializer, IMessagePacker *output, const void *unserialized)
- {
- CSizingSerializer sizer;
- serializer->serialize(sizer, (const byte *) unserialized);
- unsigned serializedLength = sizer.size();
- void *udpBuffer = output->getBuffer(serializedLength, true);
- CRawRowSerializer memSerializer(serializedLength, (byte *) udpBuffer);
- serializer->serialize(memSerializer, (const byte *) unserialized);
- assertex(memSerializer.size() == serializedLength);
- output->putBuffer(udpBuffer, serializedLength, true);
- return serializedLength;
- }
- inline void appendBuffer(IMessagePacker * output, size32_t size, const void * data, bool isVariable)
- {
- void *recBuffer = output->getBuffer(size, isVariable);
- memcpy(recBuffer, data, size);
- output->putBuffer(recBuffer, size, isVariable);
- }
- extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val)
- {
- if (val)
- {
- StringBuffer xpath;
- xpath.appendf("att[@name='%s']", statName);
- IPropertyTree *att = node->queryPropTree(xpath.str());
- if (!att)
- {
- att = node->addPropTree("att");
- att->setProp("@name", statName);
- }
- att->setProp("@type", statType);
- att->setPropInt64("@value", val);
- }
- }
- extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val)
- {
- if (val)
- {
- reply.appendf(" <att name='%s' type='%s' value='%" I64F "d'/>\n", statName, statType, val);
- }
- }
- CActivityFactory::CActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
- : id(_id),
- subgraphId(_subgraphId),
- queryFactory(_queryFactory),
- helperFactory(_helperFactory),
- kind(_kind),
- mystats(allStatistics) // We COULD cut down this list but it would complicate the structure, and we do actually track more in the factory than in the activity
- {
- if (helperFactory)
- {
- Owned<IHThorArg> helper = helperFactory();
- meta.set(helper->queryOutputMeta());
- }
- }
- void CActivityFactory::addChildQuery(unsigned id, ActivityArray *childQuery)
- {
- childQueries.append(*childQuery);
- childQueryIndexes.append(id);
- }
- ActivityArray *CActivityFactory::queryChildQuery(unsigned idx, unsigned &id)
- {
- if (childQueries.isItem(idx))
- {
- id = childQueryIndexes.item(idx);
- return &childQueries.item(idx);
- }
- id = 0;
- return NULL;
- }
- class CSlaveActivityFactory : public CActivityFactory, implements ISlaveActivityFactory
- {
- public:
- IMPLEMENT_IINTERFACE
- CSlaveActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CActivityFactory(_graphNode.getPropInt("@id", 0), _subgraphId, _queryFactory, _helperFactory, getActivityKind(_graphNode))
- {
- }
- virtual IQueryFactory &queryQueryFactory() const
- {
- return CActivityFactory::queryQueryFactory();
- }
- void addChildQuery(unsigned id, ActivityArray *childQuery)
- {
- CActivityFactory::addChildQuery(id, childQuery);
- }
- StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("%p", this);
- }
- const char *queryQueryName() const
- {
- return queryFactory.queryQueryName();
- }
- virtual ActivityArray *queryChildQuery(unsigned idx, unsigned &id)
- {
- return CActivityFactory::queryChildQuery(idx, id);
- }
- virtual void mergeStats(const CRuntimeStatisticCollection &from) const
- {
- CActivityFactory::mergeStats(from);
- }
- virtual unsigned queryId() const
- {
- return CActivityFactory::queryId();
- }
- virtual ThorActivityKind getKind() const
- {
- return CActivityFactory::getKind();
- }
- virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
- {
- CActivityFactory::getEdgeProgressInfo(idx, edge);
- }
- virtual void getNodeProgressInfo(IPropertyTree &node) const
- {
- CActivityFactory::getNodeProgressInfo(node);
- }
- virtual void resetNodeProgressInfo()
- {
- CActivityFactory::resetNodeProgressInfo();
- }
- virtual void getActivityMetrics(StringBuffer &reply) const
- {
- CActivityFactory::getActivityMetrics(reply);
- }
- IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return queryFactory.createSlaveContext(logctx, packet, childQueries.length()!=0);
- }
- virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
- {
- if (datafile)
- addXrefFileInfo(reply, datafile);
- }
- void createChildQueries(IRoxieSlaveContext *ctx, IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
- {
- if (childQueries.length())
- {
- ForEachItemIn(idx, childQueries)
- {
- if (!_probeManager) // MORE - the probeAllRows is a hack!
- _probeManager = queryContext->queryProbeManager();
- IActivityGraph *childGraph = createActivityGraph(ctx, NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx, 1); // MORE - the parent is wrong!
- childGraphs.append(*childGraph);
- queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
- childGraph->onCreate(colocalArg); //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
- }
- }
- }
- Owned<const IResolvedFile> datafile;
- protected:
- static IPropertyTree *queryStatsNode(IPropertyTree *parent, const char *xpath)
- {
- StringBuffer levelx;
- const char *sep = strchr(xpath, '/');
- if (sep)
- levelx.append(sep-xpath, xpath);
- else
- levelx.append(xpath);
- IPropertyTree *child = parent->queryPropTree(levelx);
- if (!child)
- {
- const char *id = strchr(levelx, '[');
- if (!id)
- {
- child = parent->addPropTree(levelx);
- }
- else
- {
- StringBuffer elem;
- elem.append(id-levelx, levelx);
- child = parent->addPropTree(elem);
- for (;;)
- {
- StringBuffer attr, val;
- id++;
- while (*id != '=')
- attr.append(*id++);
- id++;
- char qu = *id++;
- while (*id != qu)
- val.append(*id++);
- child->setProp(attr, val);
- id++;
- if (*id == ']')
- {
- if (id[1]!='[')
- break;
- id++;
- }
- else
- throwUnexpected();
- }
- }
- }
- if (sep)
- return queryStatsNode(child, sep+1);
- else
- return child;
- }
- };
- //================================================================================================
- class CRoxieSlaveActivity : implements CInterfaceOf<IRoxieSlaveActivity>, implements ICodeContext
- {
- protected:
- SlaveContextLogger &logctx;
- Linked<IRoxieQueryPacket> packet;
- mutable Owned<IRoxieSlaveContext> queryContext; // bit of a hack but easier than changing the ICodeContext callback interface to remove const
- const CSlaveActivityFactory *basefactory;
- IArrayOf<IActivityGraph> childGraphs;
- IHThorArg *basehelper;
- PartNoType lastPartNo;
- MemoryBuffer serializedCreate;
- MemoryBuffer resentInfo;
- CachedOutputMetaData meta;
- Owned<IOutputRowSerializer> serializer;
- Owned<IEngineRowAllocator> rowAllocator;
- #ifdef _DEBUG
- Owned<IProbeManager> probeManager;
- #endif
- bool aborted;
- bool resent;
- bool isOpt;
- bool variableFileName;
- RecordTranslationMode allowFieldTranslation;
- Owned<const IResolvedFile> varFileInfo;
- virtual void setPartNo(bool filechanged) = 0;
- inline void checkPartChanged(PartNoType &newPart)
- {
- if (newPart.partNo!=lastPartNo.partNo || newPart.fileNo!=lastPartNo.fileNo)
- {
- lastPartNo.partNo = newPart.partNo;
- bool filechanged = lastPartNo.fileNo != newPart.fileNo;
- lastPartNo.fileNo = newPart.fileNo;
- setPartNo(filechanged);
- }
- }
- virtual bool needsRowAllocator()
- {
- return meta.needsSerializeDisk() || meta.isVariableSize();
- }
- virtual void onCreate()
- {
- queryContext.setown(basefactory->createSlaveContext(logctx, packet));
- #ifdef _DEBUG
- // MORE - need to consider debugging....
- if (probeAllRows)
- probeManager.setown(createProbeManager());
- basefactory->createChildQueries(queryContext, childGraphs, basehelper, probeManager, queryContext, logctx);
- #else
- basefactory->createChildQueries(queryContext, childGraphs, basehelper, NULL, queryContext, logctx);
- #endif
- if (meta.needsSerializeDisk())
- serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
- if (needsRowAllocator())
- rowAllocator.setown(getRowAllocator(meta.queryOriginal(), basefactory->queryId()));
- unsigned parentExtractSize;
- serializedCreate.read(parentExtractSize);
- const byte * parentExtract = serializedCreate.readDirect(parentExtractSize);
- basehelper->onCreate(this, NULL, &serializedCreate);
- basehelper->onStart(parentExtract, &serializedCreate);
- deserializeExtra(serializedCreate);
- if (variableFileName) // note - in keyed join with dependent index case, kj itself won't have variableFileName but indexread might
- {
- CDateTime cacheDate(serializedCreate);
- unsigned checksum;
- serializedCreate.read(checksum);
- OwnedRoxieString fname(queryDynamicFileName());
- varFileInfo.setown(querySlaveDynamicFileCache()->lookupDynamicFile(logctx, fname, cacheDate, checksum, &packet->queryHeader(), isOpt, true));
- setVariableFileInfo();
- }
- }
- virtual void deserializeExtra(MemoryBuffer &out)
- {
- }
- CRoxieSlaveActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory)
- : logctx(_logctx), packet(_packet), basefactory(_factory)
- {
- allowFieldTranslation = _factory->getEnableFieldTranslation();
- resent = packet->getContinuationLength() != 0;
- serializedCreate.setBuffer(packet->getContextLength(), (void *) packet->queryContextData(), false);
- if (resent)
- resentInfo.setBuffer(packet->getContinuationLength(), (void *) packet->queryContinuationData(), false);
- basehelper = _hFactory();
- lastPartNo.partNo = 0xffff;
- lastPartNo.fileNo = 0xffff;
- aborted = false;
- isOpt = false;
- variableFileName = false;
- meta.set(basehelper->queryOutputMeta());
- }
- ~CRoxieSlaveActivity()
- {
- ::Release(basehelper);
- }
- virtual void beforeDispose() override
- {
- CRuntimeStatisticCollection merged(allStatistics);
- logctx.gatherStats(merged);
- basefactory->mergeStats(merged);
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CInterfaceOf<IRoxieSlaveActivity>)
- virtual const char *queryDynamicFileName() const = 0;
- virtual void setVariableFileInfo() = 0;
- virtual IIndexReadActivityInfo *queryIndexReadActivity() { throwUnexpected(); } // should only be called for index activity
- virtual unsigned queryId()
- {
- return basefactory->queryId();
- }
- virtual bool check()
- {
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- doCheck(output);
- output->flush(true);
- return true;
- }
- virtual void doCheck(IMessagePacker *output)
- {
- // MORE - unsophisticated default - if this approach seems fruitful then we can add something more thorough
- void *recBuffer = output->getBuffer(sizeof(bool), false);
- bool ret = false;
- memcpy(recBuffer, &ret, sizeof(bool));
- }
- virtual void abort()
- {
- if (logctx.queryTraceLevel() > 2)
- {
- StringBuffer s;
- logctx.CTXLOG("Aborting running activity: %s", packet->queryHeader().toString(s).str());
- }
- aborted = true;
- logctx.abort();
- if (queryContext)
- {
- Owned<IException> E = MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
- queryContext->notifyAbort(E);
- }
- }
- virtual IRoxieQueryPacket *queryPacket() const
- {
- return packet;
- }
- void limitExceeded(bool keyed = false)
- {
- RoxiePacketHeader &header = packet->queryHeader();
- StringBuffer s;
- logctx.CTXLOG("%sLIMIT EXCEEDED: %s", keyed ? "KEYED " : "", header.toString(s).str());
- header.activityId = keyed ? ROXIE_KEYEDLIMIT_EXCEEDED : ROXIE_LIMIT_EXCEEDED;
- Owned<IMessagePacker> output = ROQ->createOutputStream(header, false, logctx);
- output->flush(true);
- aborted = true;
- logctx.abort();
- }
- virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
- {
- assertex(colocal == basehelper);
- return queryContext->queryCodeContext()->resolveChildQuery(activityId, colocal);
- }
- size32_t serializeRow(IMessagePacker *output, const void *unserialized) const
- {
- return ::serializeRow(serializer, output, unserialized);
- }
- virtual const char *loadResource(unsigned id)
- {
- return queryContext->queryCodeContext()->loadResource(id);
- }
- // Sets should not happen - they can only happen in the main Roxie server context
- virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
- virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
- virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
- virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { throwUnexpected(); }
- virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
- virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
- virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
- virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
- virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { throwUnexpected(); }
- virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
- virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
- virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
- // Some gets are allowed though (e.g. for ONCE values)
- virtual bool getResultBool(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultBool(name, sequence);
- }
- virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultData(tlen, tgt, name, sequence);
- }
- virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
- }
- virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- queryContext->queryCodeContext()->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- queryContext->queryCodeContext()->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual __int64 getResultInt(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultInt(name, sequence);
- }
- virtual double getResultReal(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultReal(name, sequence);
- }
- virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultString(tlen, tgt, name, sequence);
- }
- virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultStringF(tlen, tgt, name, sequence);
- }
- virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
- {
- queryContext->queryCodeContext()->getResultUnicode(tlen, tgt, name, sequence);
- }
- virtual char *getResultVarString(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultVarString(name, sequence);
- }
- virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
- {
- return queryContext->queryCodeContext()->getResultVarUnicode(name, sequence);
- }
- virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- return queryContext->queryCodeContext()->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
- }
- virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
- {
- return queryContext->queryCodeContext()->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
- }
- virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
- {
- return queryNullSectionTimer();
- }
- // Not yet thought about these....
- virtual char *getWuid() { throwUnexpected(); } // caller frees return string.
- virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); } // shouldn't really be here, but it broke thor.
- virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { throwUnexpected(); }
- virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); return 0; }
- virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
- virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
- virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
- virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
- virtual unsigned getNodes() { throwUnexpected(); }
- virtual unsigned getNodeNum() { throwUnexpected(); }
- virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); } // caller frees return string.
- virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
- virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
- virtual char *getJobName() { throwUnexpected(); } // caller frees return string.
- virtual char *getJobOwner() { throwUnexpected(); } // caller frees return string.
- virtual char *getClusterName() { throwUnexpected(); } // caller frees return str.
- virtual char *getGroupName() { throwUnexpected(); } // caller frees return string.
- virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
- virtual char *getDaliServers() { return queryContext->queryCodeContext()->getDaliServers(); }
- // The below are called on Roxie server and passed in context
- virtual unsigned getPriority() const { throwUnexpected(); }
- virtual char *getPlatform() { throwUnexpected(); }
- virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
- virtual char *getOS() { throwUnexpected(); }
- virtual unsigned logString(const char *text) const
- {
- if (text && *text)
- {
- logctx.CTXLOG("USER: %s", text);
- return strlen(text);
- }
- else
- return 0;
- }
- virtual const IContextLogger &queryContextLogger() const
- {
- return logctx;
- }
- virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
- {
- return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
- }
- virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
- {
- return queryContext->queryCodeContext()->getRowAllocatorEx(meta, activityId, heapFlags);
- }
- virtual const char *cloneVString(const char *str) const
- {
- return queryContext->queryCodeContext()->cloneVString(str);
- }
- virtual const char *cloneVString(size32_t len, const char *str) const
- {
- return queryContext->queryCodeContext()->cloneVString(len, str);
- }
- virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
- {
- convertRowToXML(lenResult, result, info, row, flags);
- }
- virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
- {
- convertRowToJSON(lenResult, result, info, row, flags);
- }
- const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
- {
- return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
- }
- const void * fromJson(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
- {
- return createRowFromJson(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
- }
- virtual IEngineContext *queryEngineContext() { return NULL; }
- virtual IWorkUnit *updateWorkUnit() const { throwUnexpected(); }
- virtual unsigned getGraphLoopCounter() const override { return queryContext->queryCodeContext()->getGraphLoopCounter(); }
- virtual IEclGraphResults * resolveLocalQuery(__int64 activityId) override { return NULL; } // Only want to do this on the server
- virtual IDebuggableContext *queryDebugContext() const override { return queryContext->queryCodeContext()->queryDebugContext(); }
- };
- //================================================================================================
- class OptimizedRowBuilder : public ARowBuilder, public CInterface
- {
- public:
- OptimizedRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output, IOutputRowSerializer * _serializer)
- : dynamicBuilder(_rowAllocator, false), meta(_meta), serializer(_serializer), output(_output)
- {
- useDynamic = serializer != NULL || meta.isVariableSize();
- }
- IMPLEMENT_IINTERFACE
- virtual IEngineRowAllocator *queryAllocator() const
- {
- return dynamicBuilder.queryAllocator();
- }
- virtual byte * createSelf()
- {
- if (useDynamic)
- {
- dynamicBuilder.ensureRow();
- self = dynamicBuilder.getSelf();
- }
- else
- self = static_cast<byte *>(output->getBuffer(meta.getFixedSize(), false));
- return self;
- }
- virtual byte * ensureCapacity(size32_t required, const char * fieldName)
- {
- if (useDynamic)
- {
- self = dynamicBuilder.ensureCapacity(required, fieldName);
- return static_cast<byte *>(self);
- }
- else
- {
- size32_t fixedLength = meta.getFixedSize();
- if (required <= fixedLength)
- return static_cast<byte *>(self);
- // This should never happen!
- rtlReportFieldOverflow(required, fixedLength, fieldName);
- return NULL;
- }
- }
- virtual void reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "OptimizedRowBuilder::row() is NULL");
- }
- inline void ensureRow()
- {
- if (!self)
- createSelf();
- }
- inline void clear()
- {
- if (useDynamic)
- dynamicBuilder.clear();
- self = NULL;
- }
- size_t writeToOutput(size32_t transformedSize, bool outputIfEmpty)
- {
- size32_t outputSize = transformedSize;
- if (transformedSize || outputIfEmpty)
- {
- if (useDynamic)
- {
- OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
- if (serializer)
- outputSize = serializeRow(serializer, output, result);
- else
- {
- self = static_cast<byte *>(output->getBuffer(transformedSize, true));
- memcpy(self, result, transformedSize);
- output->putBuffer(self, transformedSize, true);
- }
- }
- else
- {
- output->putBuffer(self, transformedSize, false);
- }
- }
- clear();
- return outputSize;
- }
- private:
- RtlDynamicRowBuilder dynamicBuilder;
- const CachedOutputMetaData & meta;
- IMessagePacker * output;
- IOutputRowSerializer * serializer;
- bool useDynamic;
- };
- class OptimizedKJRowBuilder : public ARowBuilder, public CInterface
- {
- // Rules are different enough that we can't easily derive from OptimizedRowBuilder
- public:
- IMPLEMENT_IINTERFACE;
- OptimizedKJRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output)
- : dynamicBuilder(_rowAllocator, false), meta(_meta), output(_output)
- {
- useDynamic = meta.isVariableSize();
- }
- virtual IEngineRowAllocator *queryAllocator() const
- {
- return dynamicBuilder.queryAllocator();
- }
- virtual byte * createSelf()
- {
- if (useDynamic)
- {
- dynamicBuilder.ensureRow();
- self = dynamicBuilder.getSelf();
- return self;
- }
- else
- {
- self = static_cast<byte *>(output->getBuffer(KEYEDJOIN_RECORD_SIZE(meta.getFixedSize()), true)) + KEYEDJOIN_RECORD_SIZE(0);
- return self ;
- }
- }
- virtual byte * ensureCapacity(size32_t required, const char * fieldName)
- {
- if (useDynamic)
- {
- self = dynamicBuilder.ensureCapacity(required, fieldName);
- return self;
- }
- else
- {
- size32_t fixedLength = meta.getFixedSize();
- if (required <= fixedLength)
- return self;
- rtlReportFieldOverflow(required, fixedLength, fieldName);
- return NULL;
- }
- }
- virtual void reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "OptimizedKJRowBuilder::row() is NULL");
- }
- inline void ensureRow()
- {
- if (!self)
- createSelf();
- }
- void writeToOutput(size32_t transformedSize, offset_t recptr, CJoinGroup *jg, unsigned short partNo)
- {
- KeyedJoinHeader *rec;
- if (useDynamic)
- {
- OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
- rec = (KeyedJoinHeader *) (output->getBuffer(KEYEDJOIN_RECORD_SIZE(transformedSize), true));
- memcpy(&rec->rhsdata, result, transformedSize);
- }
- else
- {
- rec = (KeyedJoinHeader *)(self - KEYEDJOIN_RECORD_SIZE(0));
- }
- rec->fpos = recptr;
- rec->thisGroup = jg;
- rec->partNo = partNo;
- output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(transformedSize), true);
- self = NULL;
- }
- private:
- RtlDynamicRowBuilder dynamicBuilder;
- const CachedOutputMetaData & meta;
- IMessagePacker * output;
- bool useDynamic;
- };
- //================================================================================================
- class CRoxieDiskReadBaseActivity : public CRoxieSlaveActivity, implements IIndexReadContext//, implements IDiskReadActivity
- {
- protected:
- IHThorDiskReadBaseArg *helper;
- unsigned processed;
- unsigned parallelPartNo;
- unsigned numParallel;
- bool isGrouped = false;
- bool forceUnkeyed;
- offset_t readPos;
- CachedOutputMetaData diskSize;
- ScoredRowFilter postFilter;
- Owned<IDirectReader> reader;
- Linked<IInMemoryIndexManager> manager;
- Linked<ITranslatorSet> translators;
- Owned<IInMemoryFileProcessor> processor;
- CriticalSection pcrit;
- public:
- CRoxieDiskReadBaseActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- manager(_manager),
- translators(_translators),
- parallelPartNo(_parallelPartNo),
- numParallel(_numParallel),
- forceUnkeyed(_forceUnkeyed)
- {
- helper = (IHThorDiskReadBaseArg *) basehelper;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
- isOpt = (helper->getFlags() & TDRoptional) != 0;
- diskSize.set(helper->queryProjectedDiskRecordSize()->querySerializedDiskMeta());
- isGrouped = diskSize.isGrouped();
- processed = 0;
- readPos = 0;
- if (resent)
- {
- }
- }
- virtual void onCreate()
- {
- CRoxieSlaveActivity::onCreate();
- helper->createSegmentMonitors(this);
- const IKeyTranslator *keyTranslator = translators->queryKeyTranslator(0); // any part would do - in-memory requires all actuals to have same layout
- if (keyTranslator)
- keyTranslator->translate(postFilter);
- if (resent)
- {
- bool usedKey;
- resentInfo.read(processed);
- resentInfo.read(usedKey);
- if (usedKey)
- reader.setown(manager->selectKey(resentInfo, postFilter, translators));
- else
- {
- resentInfo.read(readPos);
- reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
- }
- assertex(resentInfo.remaining() == 0);
- }
- else
- {
- if (!forceUnkeyed && !isGrouped)
- reader.setown(manager->selectKey(postFilter, translators, logctx));
- if (!reader)
- reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
- }
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned channel = packet->queryHeader().channel;
- unsigned formatCrc = basefactory->getFormatCrc(helper->getFormatCrc());
- translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false)); // MORE - FormatCRC may be wrong here. Needs to be crc of projected not expected
- manager.setown(varFileInfo->getIndexManager(isOpt, channel, translators->queryActualLayout(0), false));
- }
- inline bool queryKeyed() const
- {
- return reader->isKeyed();
- }
- void setParallel(unsigned _partno, unsigned _numParallel)
- {
- assertex(!processor);
- parallelPartNo = _partno;
- numParallel = _numParallel;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
- }
- virtual void append(IKeySegmentMonitor *segment)
- {
- segment->Release();
- throwUnexpected();
- }
- virtual void append(FFoption option, const IFieldFilter * filter)
- {
- if (filter->isWild())
- filter->Release();
- else
- postFilter.addFilter(*filter);
- }
- virtual unsigned ordinality() const
- {
- throwUnexpected();
- }
- virtual IKeySegmentMonitor *item(unsigned idx) const
- {
- throwUnexpected();
- }
- virtual void abort()
- {
- CRoxieSlaveActivity::abort();
- CriticalBlock p(pcrit);
- if (processor)
- processor->abort();
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieDiskReadBaseActivity::process");
- diskReadStarted++;
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- doProcess(output);
- helper->setCallback(NULL);
- if (aborted)
- return NULL;
- else
- {
- diskReadCompleted++;
- return output.getClear();
- }
- }
- virtual void doCheck(IMessagePacker *output)
- {
- // for in-memory diskread activities, not a lot to check. If it got this far the answer is 'true'...
- void *recBuffer = output->getBuffer(sizeof(bool), false);
- bool ret = true;
- memcpy(recBuffer, &ret, sizeof(bool));
- }
- virtual void doProcess(IMessagePacker * output) = 0;
-
- virtual void setPartNo(bool filechanged)
- {
- throwUnexpected();
- }
- };
- class CRoxieDiskBaseActivityFactory : public CSlaveActivityFactory
- {
- protected:
- Owned<ITranslatorSet> translators;
- Owned<IInMemoryIndexManager> manager;
- public:
- CRoxieDiskBaseActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorDiskReadBaseArg> helper = (IHThorDiskReadBaseArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isOpt = (helper->getFlags() & TDRoptional) != 0;
- OwnedRoxieString fileName(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- unsigned channel = queryFactory.queryChannel();
- unsigned formatCrc = getFormatCrc(helper->getFormatCrc());
- translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- manager.setown(datafile->getIndexManager(isOpt, channel, translators->queryActualLayout(0), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false)));
- Owned<IPropertyTreeIterator> memKeyInfo = queryFactory.queryPackage().getInMemoryIndexInfo(_graphNode);
- Owned<IPropertyTree> memKeyHint;
- if (!memKeyInfo && _graphNode.hasProp("hint[@name=\"key\"]"))
- {
- memKeyHint.setown(createPTreeFromXMLString(_graphNode.queryProp("hint[@name=\"key\"]/@value")));
- memKeyInfo.setown(memKeyHint->getElements("MemIndex"));
- }
- if (memKeyInfo)
- {
- // There's a small potential flaw that I can end up with indexes from a query that was unloaded active on some nodes but not others
- // which could mess up continuation. I think I don't care.
- ForEach(*memKeyInfo)
- {
- IPropertyTree &info = memKeyInfo->query();
- manager->setKeyInfo(info);
- }
- }
- }
- else
- manager.setown(createInMemoryIndexManager(helper->queryProjectedDiskRecordSize()->queryRecordAccessor(true), true, nullptr));
- }
- }
- ~CRoxieDiskBaseActivityFactory()
- {
- }
- };
- //================================================================================================
- class CRoxieDiskReadActivity;
- class CRoxieCsvReadActivity;
- class CRoxieXmlReadActivity;
- IInMemoryFileProcessor *createReadRecordProcessor(CRoxieDiskReadActivity &owner, bool isGrouped, IDirectReader *reader);
- IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize);
- IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *reader);
- class CRoxieDiskReadActivity : public CRoxieDiskReadBaseActivity
- {
- friend class ReadRecordProcessor;
- protected:
- IHThorDiskReadArg *helper;
- public:
- CRoxieDiskReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit); // because of race with abort.
- processor.setown(createReadRecordProcessor(*this, isGrouped, reader));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, const void *src) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder, src);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieCsvReadActivity : public CRoxieDiskReadBaseActivity
- {
- public:
- friend class CsvRecordProcessor;
- protected:
- IHThorCsvReadArg *helper;
- const IResolvedFile *datafile;
- size32_t maxRowSize;
- public:
- CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, const IResolvedFile *_datafile, size32_t _maxRowSize)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true), datafile(_datafile), maxRowSize(_maxRowSize)
- {
- onCreate();
- helper = (IHThorCsvReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("CsvRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(
- createCsvRecordProcessor(*this,
- manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators),
- packet->queryHeader().channel==1 && !resent,
- varFileInfo ? varFileInfo.get() : datafile, maxRowSize));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, unsigned *srcLen, const char **src) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder, srcLen, src);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieXmlReadActivity : public CRoxieDiskReadBaseActivity
- {
- public:
- friend class XmlRecordProcessor;
- protected:
- IHThorXmlReadArg *helper;
- public:
- CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, true)
- {
- onCreate();
- helper = (IHThorXmlReadArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("XmlRead %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createXmlRecordProcessor(*this, manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators)));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doTransform(IMessagePacker * output, IXmlToRowTransformer *rowTransformer, IColumnProvider *lastMatch, IThorDiskCallback *callback) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = rowTransformer->transform(rowBuilder, lastMatch, callback);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieDiskReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- class CRoxieCsvReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- size32_t maxRowSize;
- public:
- CRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- maxRowSize = defaultMaxCsvRowSize * 1024 * 1024;
- IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
- if (workunit)
- maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, translators, datafile, maxRowSize);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- class CRoxieXmlReadActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- class RecordProcessor : implements IInMemoryFileProcessor, public CInterface
- {
- protected:
- bool aborted = false;
- IDirectReader *reader;
- public:
- IMPLEMENT_IINTERFACE
- RecordProcessor(IDirectReader *_reader) : reader(_reader) {}
- virtual void abort()
- {
- aborted = true;
- }
- };
- //================================================================================================
- // Disk read activity
- class ReadRecordProcessor : public RecordProcessor
- {
- public:
- ReadRecordProcessor(CRoxieDiskReadActivity &_owner, bool _isGrouped, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner), isGrouped(_isGrouped)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- helper->setCallback(reader);
- unsigned lastGroupProcessed = processed;
- bool someInGroup = false;
- while (!aborted)
- {
- // This loop is the inner loop for memory diskreads - so keep it efficient!
- const byte *nextRec = reader->nextRow();
- if (nextRec)
- {
- someInGroup = true;
- size32_t transformedSize = owner.doTransform(output, nextRec);
- reader->finishedRow();
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize && !isGrouped)
- break;
- }
- }
- else if (isGrouped)
- {
- if (processed>lastGroupProcessed)
- break; // We return grouped data one whole group at a time
- else if (!someInGroup) // eof
- return;
- }
- else
- return;
- }
- // Send continuation message indicating end of group or too much data
- if (!aborted)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- }
- }
- protected:
- CRoxieDiskReadActivity &owner;
- IHThorDiskReadArg *helper;
- bool isGrouped;
- };
- IInMemoryFileProcessor *createReadRecordProcessor(CRoxieDiskReadActivity &owner, bool isGrouped, IDirectReader *_reader)
- {
- return new ReadRecordProcessor(owner, isGrouped, _reader);
- }
- //================================================================================================
- // RecordProcessor used by CSV read activity. We don't try to index these ...
- class CsvRecordProcessor : public RecordProcessor
- {
- protected:
- CRoxieCsvReadActivity &owner;
- IHThorCsvReadArg *helper;
- bool skipHeader;
- const IResolvedFile *datafile;
- size32_t maxRowSize;
- public:
- CsvRecordProcessor(CRoxieCsvReadActivity &_owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *_datafile, size32_t _maxRowSize)
- : RecordProcessor(_reader), owner(_owner), datafile(_datafile), maxRowSize(_maxRowSize)
- {
- // NOTE - postfilter not used at present - but may be in future
- helper = _owner.helper;
- skipHeader = _skipHeader;
- helper->setCallback(reader);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- ICsvParameters * csvInfo = helper->queryCsvParameters();
- unsigned headerLines = skipHeader ? csvInfo->queryHeaderLen() : 0;
- const char *quotes = NULL;
- const char *separators = NULL;
- const char *terminators = NULL;
- const char *escapes = NULL;
- CSVSplitter csvSplitter;
- if (datafile)
- {
- const IPropertyTree *options = datafile->queryProperties();
- if (options)
- {
- quotes = options->queryProp("@csvQuote");
- separators = options->queryProp("@csvSeparate");
- terminators = options->queryProp("@csvTerminate");
- escapes = options->queryProp("@csvEscape");
- }
- }
- csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
- ISerialStream *stream = reader->queryDirectStreamReader();
- while (!aborted)
- {
- size32_t thisLineLength = csvSplitter.splitLine(stream, maxRowSize);
- if (0 == thisLineLength)
- break;
- if (headerLines)
- {
- headerLines--;
- stream->skip(thisLineLength);
- }
- else
- {
- unsigned transformedSize = owner.doTransform(output, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData());
- stream->skip(thisLineLength);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return;
- }
- }
- }
- }
- }
- };
- //================================================================================================
- // RecordProcessor used by XML read activity. We don't try to index these...
- class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect, implements IThorDiskCallback
- {
- public:
- IMPLEMENT_IINTERFACE;
- XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner), streamReader(_reader->queryDirectStreamReader()), fileposition(0)
- {
- helper = _owner.helper;
- helper->setCallback(this);
- }
- //interface IThorDiskCallback
- virtual unsigned __int64 getFilePosition(const void * row)
- {
- return fileposition;
- }
- virtual unsigned __int64 getLocalFilePosition(const void * row)
- {
- return streamReader->makeFilePositionLocal(fileposition);
- }
- virtual const char * queryLogicalFilename(const void * row)
- {
- return reader->queryLogicalFilename(row);
- }
- virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
- {
- fileposition = startOffset;
- lastMatch.set(&entry);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- // Note: xml read does not support continuation record stuff as too hard to serialize state of xml parser
- Linked<IXmlToRowTransformer> rowTransformer = helper->queryTransformer();
- OwnedRoxieString xmlIterator(helper->getXmlIteratorPath());
- Owned<IXMLParse> xmlParser;
- if (owner.basefactory->getKind() == TAKjsonread)
- xmlParser.setown(createJSONParse(*streamReader, xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
- else
- xmlParser.setown(createXMLParse(*streamReader, xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
- while (!aborted)
- {
- //call to next() will callback on the IXmlSelect interface
- bool gotNext = false;
- gotNext = xmlParser->next();
- if(!gotNext)
- break;
- else if (lastMatch)
- {
- unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, this);
- lastMatch.clear();
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- if (processed == stopAfter)
- return;
- }
- }
- }
- }
- protected:
- CRoxieXmlReadActivity &owner;
- IHThorXmlReadArg *helper;
- Owned<IColumnProvider> lastMatch;
- IDirectStreamReader *streamReader;
- unsigned __int64 fileposition;
- };
- IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
- {
- return new CsvRecordProcessor(owner, _reader, _skipHeader, datafile, maxRowSize);
- }
- IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *_reader)
- {
- return new XmlRecordProcessor(owner, _reader);
- }
- ISlaveActivityFactory *createRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieCsvReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieXmlReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskNormalizeActivity;
- IInMemoryFileProcessor *createNormalizeRecordProcessor(CRoxieDiskNormalizeActivity &owner, IDirectReader *_reader);
- class CRoxieDiskNormalizeActivity : public CRoxieDiskReadBaseActivity
- {
- friend class NormalizeRecordProcessor;
- protected:
- IHThorDiskNormalizeArg *helper;
- public:
- CRoxieDiskNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskNormalizeArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskNormalize %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createNormalizeRecordProcessor(*this, reader));
- }
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, rowLimit, stopAfter);
- }
- size32_t doNormalizeTransform(IMessagePacker * output) const
- {
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned transformedSize = helper->transform(rowBuilder);
- return rowBuilder.writeToOutput(transformedSize, false);
- }
- };
- class CRoxieDiskNormalizeActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskNormalize "));
- }
- };
- ISlaveActivityFactory *createRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- // RecordProcessors used by Disk Normalize activity.
- class NormalizeRecordProcessor : public RecordProcessor
- {
- public:
- NormalizeRecordProcessor(CRoxieDiskNormalizeActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned totalSizeSent = 0;
- helper->setCallback(reader);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- if (helper->first(nextRec))
- {
- do
- {
- size32_t transformedSize = owner.doNormalizeTransform(output);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- owner.limitExceeded();
- return;
- }
- totalSizeSent += transformedSize;
- if (processed == stopAfter)
- return;
- }
- } while (helper->next());
- }
- reader->finishedRow();
- if (totalSizeSent > indexReadChunkSize)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(processed);
- si.append(reader->isKeyed());
- reader->serializeCursorPos(si);
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return;
- }
- }
- }
- protected:
- CRoxieDiskNormalizeActivity &owner;
- IHThorDiskNormalizeArg *helper;
- };
- IInMemoryFileProcessor *createNormalizeRecordProcessor(CRoxieDiskNormalizeActivity &owner, IDirectReader *_reader)
- {
- return new NormalizeRecordProcessor(owner, _reader);
- }
- //================================================================================================
- class CRoxieDiskCountActivity;
- IInMemoryFileProcessor *createCountRecordProcessor(CRoxieDiskCountActivity &owner, IDirectReader *reader);
- class CRoxieDiskCountActivity : public CRoxieDiskReadBaseActivity
- {
- friend class CountRecordProcessor;
- protected:
- IHThorDiskCountArg *helper;
- public:
- CRoxieDiskCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, 1, false)
- {
- onCreate();
- helper = (IHThorDiskCountArg *) basehelper;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskCount %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createCountRecordProcessor(*this, reader));
- }
- unsigned __int64 stopAfter = helper->getChooseNLimit();
- processor->doQuery(output, processed, (unsigned __int64) -1, stopAfter);
- }
- };
- class CRoxieDiskCountActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager, translators);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessors used by Disk Count activity.
- class CountRecordProcessor : public RecordProcessor
- {
- public:
- CountRecordProcessor(CRoxieDiskCountActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- unsigned outputRecordSize = owner.meta.getFixedSize();
- void *recBuffer = output->getBuffer(outputRecordSize, false);
- helper->setCallback(reader);
- unsigned __int64 totalCount = 0;
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- totalCount += helper->numValid(nextRec);
- if (totalCount >= stopAfter)
- {
- totalCount = stopAfter;
- break;
- }
- reader->finishedRow();
- }
- if (!aborted)
- {
- assert(!owner.serializer); // A count can never need serializing, surely!
- if (outputRecordSize == 1)
- *(byte *)recBuffer = (byte)totalCount;
- else
- {
- assertex(outputRecordSize == sizeof(unsigned __int64));
- *(unsigned __int64 *)recBuffer = totalCount;
- }
- output->putBuffer(recBuffer, outputRecordSize, false);
- }
- }
- protected:
- CRoxieDiskCountActivity &owner;
- IHThorDiskCountArg *helper;
- };
- IInMemoryFileProcessor *createCountRecordProcessor(CRoxieDiskCountActivity &owner, IDirectReader *reader)
- {
- return new CountRecordProcessor(owner, reader);
- }
- ISlaveActivityFactory *createRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskAggregateActivity;
- IInMemoryFileProcessor *createAggregateRecordProcessor(CRoxieDiskAggregateActivity &owner, IDirectReader *_reader);
- class CRoxieDiskAggregateActivity : public CRoxieDiskReadBaseActivity
- {
- friend class AggregateRecordProcessor;
- protected:
- IHThorDiskAggregateArg *helper;
- public:
- CRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, _parallelPartNo, _numParallel, _forceUnkeyed)
- {
- onCreate();
- helper = (IHThorDiskAggregateArg *) basehelper;
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskAggregate %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createAggregateRecordProcessor(*this, reader));
- }
- processor->doQuery(output, 0, 0, 0);
- }
- };
- //================================================================================================
- class CParallelRoxieActivity : public CRoxieSlaveActivity
- {
- protected:
- IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> parts;
- unsigned numParallel;
- CriticalSection parCrit;
- Owned<IOutputRowDeserializer> deserializer;
- public:
- CParallelRoxieActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory, unsigned _numParallel)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _factory), numParallel(_numParallel)
- {
- assertex(numParallel > 1);
- }
- virtual void abort()
- {
- ForEachItemIn(idx, parts)
- {
- parts.item(idx).abort();
- }
- }
- virtual void setPartNo(bool filechanged) { throwUnexpected(); }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return parts.item(0).toString(ret);
- }
- virtual const char *queryDynamicFileName() const
- {
- throwUnexpected();
- }
- virtual void setVariableFileInfo()
- {
- throwUnexpected();
- }
- virtual void doProcess(IMessagePacker * output) = 0;
- virtual void processRow(CDummyMessagePacker &output) = 0;
- virtual IMessagePacker *process()
- {
- if (numParallel == 1)
- {
- return parts.item(0).process();
- }
- else
- {
- MTIME_SECTION(queryActiveTimer(), "CParallelRoxieActivity::process");
- diskReadStarted++;
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- class casyncfor: public CAsyncFor
- {
- IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> &parts;
- CParallelRoxieActivity &parent;
- public:
- casyncfor(IBasedArrayOf<CRoxieDiskReadBaseActivity, IRoxieSlaveActivity> &_parts, CParallelRoxieActivity &_parent)
- : parts(_parts), parent(_parent)
- {
- }
- void Do(unsigned i)
- {
- try
- {
- CDummyMessagePacker d;
- parts.item(i).doProcess(&d);
- d.flush(true);
- parent.processRow(d);
- }
- catch (IException *)
- {
- // if one throws exception, may as well abort the rest
- parent.abort();
- throw;
- }
- }
- } afor(parts, *this);
- afor.For(numParallel, numParallel);
- //for (unsigned i = 0; i < numParallel; i++) afor.Do(i); // use this instead of line above to make them serial - handy for debugging!
- if (aborted)
- return NULL;
- else
- {
- doProcess(output);
- diskReadCompleted++;
- return output.getClear();
- }
- }
- }
- };
- class CParallelRoxieDiskAggregateActivity : public CParallelRoxieActivity
- {
- protected:
- IHThorDiskAggregateArg *helper;
- OwnedConstRoxieRow finalRow;
- public:
- CParallelRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
- CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel)
- {
- helper = (IHThorDiskAggregateArg *) basehelper;
- onCreate();
- if (meta.needsSerializeDisk())
- {
- // MORE - avoiding serializing to dummy would be more efficient...
- deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
- }
- CRoxieDiskAggregateActivity *part0 = new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
- parts.append(*part0);
- if (part0->queryKeyed())
- {
- numParallel = 1;
- part0->setParallel(0, 1);
- }
- else
- {
- for (unsigned i = 1; i < numParallel; i++)
- parts.append(*new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
- }
- }
- ~CParallelRoxieDiskAggregateActivity()
- {
- finalRow.clear();
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual void doProcess(IMessagePacker *output)
- {
- if (!aborted)
- {
- if (!finalRow)
- {
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size32_t size = helper->clearAggregate(rowBuilder);
- finalRow.setown(rowBuilder.finalizeRowClear(size));
- }
- //GH-RKC: This can probably be cleaner and more efficient using the OptimizedRowBuilder class introduced since I fixed this code
- if (serializer)
- serializeRow(output, finalRow);
- else
- {
- size32_t size = meta.getRecordSize(finalRow);
- appendBuffer(output, size, finalRow, meta.isVariableSize());
- }
- }
- finalRow.clear();
- helper->setCallback(NULL);
- }
- virtual void processRow(CDummyMessagePacker &d)
- {
- CriticalBlock c(parCrit);
- MemoryBuffer &m = d.data;
- RtlDynamicRowBuilder finalBuilder(rowAllocator, false);
- if (deserializer)
- {
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
- CThorStreamDeserializerSource rowSource(stream);
- while (m.remaining())
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- RecordLengthType len = *rowLen;
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
- if (!finalBuilder.exists())
- finalBuilder.swapWith(rowBuilder);
- else
- {
- const void * deserialized = rowBuilder.finalizeRowClear(outsize);
- helper->mergeAggregate(finalBuilder, deserialized);
- ReleaseRoxieRow(deserialized);
- }
- }
- }
- else
- {
- RecordLengthType len = meta.getFixedSize();
- while (m.remaining())
- {
- const void *row;
- if (!meta.isFixedSize())
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- len = *rowLen;
- }
- row = m.readDirect(len);
- if (!finalBuilder.exists())
- cloneRow(finalBuilder, row, meta);
- else
- helper->mergeAggregate(finalBuilder, row);
- }
- }
- if (finalBuilder.exists())
- {
- size32_t finalSize = meta.getRecordSize(finalBuilder.getSelf()); // MORE - can probably track it above...
- finalRow.setown(finalBuilder.finalizeRowClear(finalSize));
- }
- }
- };
- class CRoxieDiskAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- if (parallelAggregate > 1)
- return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
- else
- return new CRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessor used by Disk Aggregate activity.
- class AggregateRecordProcessor : public RecordProcessor
- {
- public:
- AggregateRecordProcessor(CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
- : RecordProcessor(_reader), owner(_owner)
- {
- helper = _owner.helper;
- helper->setCallback(reader);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
- helper->clearAggregate(rowBuilder);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- helper->processRow(rowBuilder, nextRec);
- reader->finishedRow();
- }
- if (!aborted)
- {
- if (helper->processedAnyRows())
- {
- size32_t finalSize = owner.meta.getRecordSize(rowBuilder.getSelf());
- rowBuilder.writeToOutput(finalSize, true);
- }
- }
- }
- protected:
- CRoxieDiskAggregateActivity &owner;
- IHThorDiskAggregateArg *helper;
- };
- IInMemoryFileProcessor *createAggregateRecordProcessor(CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
- {
- return new AggregateRecordProcessor(_owner, _reader);
- }
- ISlaveActivityFactory *createRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieDiskGroupAggregateActivity : public CRoxieDiskReadBaseActivity
- {
- protected:
- IHThorDiskGroupAggregateArg *helper;
- RowAggregator results;
- void outputResults(IMessagePacker *output)
- {
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = results.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- }
- public:
- CRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager,
- ITranslatorSet *_translators,
- unsigned partNo, unsigned numParts, bool _forceUnkeyed)
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, partNo, numParts, _forceUnkeyed),
- helper((IHThorDiskGroupAggregateArg *) basehelper),
- results(*helper, *helper)
- {
- onCreate();
- results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("DiskGroupAggregate %u", packet->queryHeader().activityId);
- }
- virtual void doProcess(IMessagePacker * output)
- {
- {
- CriticalBlock p(pcrit);
- processor.setown(createGroupAggregateRecordProcessor(results, *helper, reader));
- }
- processor->doQuery(output, 0, 0, 0);
- if (!aborted)
- outputResults(output);
- results.reset();
- }
- };
- class CParallelRoxieDiskGroupAggregateActivity : public CParallelRoxieActivity
- {
- protected:
- IHThorDiskGroupAggregateArg *helper;
- RowAggregator resultAggregator;
- Owned<IRowManager> rowManager;
- public:
- CParallelRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
- IInMemoryIndexManager *_manager, ITranslatorSet *_translators, unsigned _numParallel) :
- CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel),
- helper((IHThorDiskGroupAggregateArg *) basehelper),
- resultAggregator(*helper, *helper)
- {
- onCreate();
- resultAggregator.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- if (meta.needsSerializeDisk())
- {
- // MORE - avoiding serializing to dummy would be more efficient...
- deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
- }
- CRoxieDiskGroupAggregateActivity *part0 = new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, 0, numParallel, false);
- parts.append(*part0);
- if (part0->queryKeyed())
- {
- numParallel = 1;
- part0->setParallel(0, 1);
- }
- else
- {
- for (unsigned i = 1; i < numParallel; i++)
- parts.append(*new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _translators, i, numParallel, true));
- }
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual void doProcess(IMessagePacker *output)
- {
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = resultAggregator.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- resultAggregator.reset();
- helper->setCallback(NULL);
- }
- void processRow(CDummyMessagePacker &d)
- {
- CriticalBlock b(parCrit); // MORE - use a spinlock
- MemoryBuffer &m = d.data;
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
- CThorStreamDeserializerSource rowSource(stream);
- while (m.remaining())
- {
- const void *row;
- if (meta.isFixedSize() && !deserializer)
- {
- row = m.readDirect(meta.getFixedSize());
- resultAggregator.mergeElement(row);
- }
- else
- {
- RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
- if (!*rowLen)
- break;
- RecordLengthType len = *rowLen;
- if (deserializer)
- {
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
- OwnedConstRoxieRow deserialized = rowBuilder.finalizeRowClear(outsize);
- resultAggregator.mergeElement(deserialized);
- }
- else
- {
- row = m.readDirect(len);
- resultAggregator.mergeElement(row);
- }
- }
- }
- }
- };
- class CRoxieDiskGroupAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
- {
- public:
- CRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- if (parallelAggregate > 1)
- return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, parallelAggregate);
- else
- return new CRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, translators, 0, 1, false);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("DiskRead "));
- }
- };
- //================================================================================================
- // RecordProcessors used by Disk Group Aggregate activity.
- class GroupAggregateRecordProcessor : public RecordProcessor, implements IHThorGroupAggregateCallback
- {
- public:
- IMPLEMENT_IINTERFACE;
- GroupAggregateRecordProcessor(RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper, IDirectReader *_reader)
- : RecordProcessor(_reader), results(_results), helper(_helper)
- {
- }
- virtual void processRow(const void * next)
- {
- results.addRow(next);
- }
- virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
- {
- helper.setCallback(reader);
- while (!aborted)
- {
- const byte *nextRec = reader->nextRow();
- if (!nextRec)
- break;
- helper.processRow(nextRec, this);
- reader->finishedRow();
- }
- }
- protected:
- RowAggregator &results;
- IHThorDiskGroupAggregateArg &helper;
- };
- IInMemoryFileProcessor *createGroupAggregateRecordProcessor(RowAggregator &results, IHThorDiskGroupAggregateArg &helper, IDirectReader *reader)
- {
- return new GroupAggregateRecordProcessor(results, helper, reader);
- }
- ISlaveActivityFactory *createRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieDiskGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedActivityFactory : public CSlaveActivityFactory
- {
- friend class CRoxieKeyedActivity;
- protected:
- Owned<IKeyArray> keyArray;
- Owned<ITranslatorSet> translators;
- unsigned formatCrc = 0;
- IOutputMetaData *projectedMeta = nullptr;
- IOutputMetaData *expectedMeta = nullptr;
- CRoxieKeyedActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- };
- class CRoxieIndexActivityFactory : public CRoxieKeyedActivityFactory
- {
- public:
- CRoxieIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- void init(IHThorIndexReadBaseArg * helper, IPropertyTree &graphNode)
- {
- formatCrc = getFormatCrc(helper->getFormatCrc());
- projectedMeta = helper->queryProjectedDiskRecordSize();
- expectedMeta = helper->queryDiskRecordSize();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isOpt = (helper->getFlags() & TIRoptional) != 0;
- OwnedRoxieString indexName(helper->getFileName());
- datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- translators.setown(datafile->getTranslators(formatCrc, projectedMeta, expectedMeta, queryFactory.queryOptions().enableFieldTranslation, true));
- keyArray.setown(datafile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- };
- class CRoxieKeyedActivity : public CRoxieSlaveActivity
- {
- // Common base class for all activities that deal with keys - keyed join or indexread and its allies
- protected:
- Owned<IKeyManager> tlk;
- Linked<ITranslatorSet> translators;
- Linked<IKeyArray> keyArray;
- const RtlRecord *keyRecInfo = nullptr;
- bool createSegmentMonitorsPending;
- virtual void createSegmentMonitors() = 0;
- virtual void setPartNo(bool filechanged)
- {
- if (!lastPartNo.partNo) // Check for ,LOCAL indexes
- {
- assertex(filechanged);
- Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
- for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
- if (kib)
- {
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (k)
- {
- assertex(!k->isTopLevelKey());
- allKeys->addIndex(LINK(k));
- }
- }
- }
- if (allKeys->numParts())
- {
- tlk.setown(createKeyMerger(*keyRecInfo, allKeys, 0, &logctx));
- createSegmentMonitorsPending = true;
- }
- else
- tlk.clear();
- }
- else
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(lastPartNo.partNo);
- assertex(kib != NULL);
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (filechanged)
- {
- tlk.setown(createLocalKeyManager(*keyRecInfo, k, &logctx));
- createSegmentMonitorsPending = true;
- }
- else
- tlk->setKey(k);
- }
- }
- virtual void setVariableFileInfo()
- {
- const CRoxieKeyedActivityFactory &aFactory = *static_cast<const CRoxieKeyedActivityFactory *>(basefactory);
- translators.setown(varFileInfo->getTranslators(aFactory.formatCrc, aFactory.projectedMeta, aFactory.expectedMeta, allowFieldTranslation, true));
- keyArray.setown(varFileInfo->getKeyArray(isOpt, packet->queryHeader().channel));
- }
- void noteStats(unsigned accepted, unsigned rejected)
- {
- // Note that the key-level statistics (seeks, scans, skips etc) are handled inside jhtree
- logctx.noteStatistic(StNumIndexAccepted, accepted);
- logctx.noteStatistic(StNumIndexRejected, rejected);
- logctx.noteStatistic(StNumIndexRowsRead, accepted+rejected);
- }
- CRoxieKeyedActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedActivityFactory *_aFactory)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- keyArray(_aFactory->keyArray),
- translators(_aFactory->translators),
- createSegmentMonitorsPending(true)
- {
- }
- };
- class CRoxieIndexActivity : public CRoxieKeyedActivity
- {
- // Common base class for indexread, indexcount and related activities
- protected:
- PartNoType *inputData; // list of channels
- IHThorIndexReadBaseArg * indexHelper;
- unsigned inputCount;
- unsigned inputsDone;
- unsigned processed;
- unsigned keyprocessed;
- unsigned steppingOffset;
- unsigned steppingLength;
- unsigned short numSkipFields;
- unsigned numSeeks;
- bool seeksAreEof;
- bool lastRowCompleteMatch;
- CIndexTransformCallback callback;
- SmartStepExtra stepExtra; // just used for flags - a little unnecessary...
- const byte *steppingRow;
- bool checkLimit(unsigned __int64 limit)
- {
- assertex(!resent);
- unsigned __int64 result = 0;
- unsigned inputsDone = 0;
- bool ret = true;
- unsigned saveStepping = steppingOffset; // Avoid using a stepping keymerger for the checkCount - they don't support it (and would be inefficient)
- steppingOffset = 0;
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- result += tlk->checkCount(limit-result);
- if (result > limit)
- {
- ret = false;
- break;
- }
- }
- inputsDone++;
- }
- if (saveStepping)
- {
- steppingOffset = saveStepping;
- lastPartNo.partNo = 0xffff;
- lastPartNo.fileNo = 0xffff;
- tlk.clear();
- }
- return ret;
- }
- public:
- CRoxieIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
- : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory),
- steppingOffset(_steppingOffset),
- stepExtra(SSEFreadAhead, NULL)
- {
- indexHelper = (IHThorIndexReadBaseArg *) basehelper;
- keyRecInfo = &indexHelper->queryDiskRecordSize()->queryRecordAccessor(true);
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
- isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
- inputData = NULL;
- inputCount = 0;
- inputsDone = 0;
- processed = 0;
- keyprocessed = 0;
- numSkipFields = 0;
- lastRowCompleteMatch = true; // default is we only return complete matches....
- seeksAreEof = false;
- steppingLength = 0;
- steppingRow = NULL;
- numSeeks = 0;
- if (packet->getSmartStepInfoLength())
- {
- const byte *smartStepInfoValue = packet->querySmartStepInfoData();
- numSkipFields = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- steppingLength = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- unsigned flags = * (unsigned short *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned short);
- seeksAreEof = * (bool *) smartStepInfoValue;
- smartStepInfoValue += sizeof(bool);
- numSeeks = * (unsigned *) smartStepInfoValue;
- smartStepInfoValue += sizeof(unsigned);
- assertex(numSeeks); // Given that we put the first seek in here to there should always be at least one!
- steppingRow = smartStepInfoValue; // the first of them...
- stepExtra.set(flags, NULL);
- if (logctx.queryTraceLevel() > 10)
- {
- logctx.CTXLOG("%d seek rows provided. mismatch(%d) readahead(%d) onlyfirst(%d)", numSeeks,
- (int)stepExtra.returnMismatches(), (int)stepExtra.readAheadManyResults(), (int)stepExtra.onlyReturnFirstSeekMatch());
- if (logctx.queryTraceLevel() > 15)
- {
- for (unsigned i = 0; i < numSeeks; i++)
- {
- StringBuffer b;
- for (unsigned j = 0; j < steppingLength; j++)
- b.appendf("%02x ", steppingRow[i*steppingLength + j]);
- logctx.CTXLOG("Seek row %d: %s", i+1, b.str());
- }
- }
- }
- }
- else
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("0 seek rows provided.");
- }
- }
- virtual void onCreate()
- {
- CRoxieKeyedActivity::onCreate();
- inputData = (PartNoType *) serializedCreate.readDirect(0);
- inputCount = (serializedCreate.length() - serializedCreate.getPos()) / sizeof(*inputData);
- indexHelper->setCallback(&callback);
- }
- virtual const char *queryDynamicFileName() const
- {
- return indexHelper->getFileName();
- }
- virtual void createSegmentMonitors()
- {
- if (createSegmentMonitorsPending)
- {
- createSegmentMonitorsPending = false;
- tlk->setLayoutTranslator(translators->queryTranslator(lastPartNo.fileNo));
- indexHelper->createSegmentMonitors(tlk);
- tlk->finishSegmentMonitors();
- }
- }
- bool sendContinuation(IMessagePacker * output)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(lastRowCompleteMatch);
- si.append(inputsDone);
- si.append(processed);
- si.append(keyprocessed);
- si.append(lastPartNo.partNo);
- si.append(lastPartNo.fileNo);
- tlk->serializeCursorPos(si);
- if (si.length() <= maxContinuationSize)
- {
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- return true;
- }
- else
- return false;
- }
- void readContinuationInfo()
- {
- resentInfo.read(lastRowCompleteMatch);
- resentInfo.read(inputsDone);
- resentInfo.read(processed);
- resentInfo.read(keyprocessed);
- resentInfo.read(lastPartNo.partNo);
- resentInfo.read(lastPartNo.fileNo);
- setPartNo(true);
- tlk->deserializeCursorPos(resentInfo);
- assertex(resentInfo.remaining() == 0);
- }
- virtual void setPartNo(bool fileChanged)
- {
- // NOTE - may be used by both indexread and normalize...
- if (steppingOffset) // MORE - may be other cases too - eg want output sorted and there are multiple subfiles...
- {
- unsigned i = 0;
- Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
- while (!aborted && i < inputCount)
- {
- PartNoType &part = inputData[i];
- lastPartNo.partNo = part.partNo;
- lastPartNo.fileNo = part.fileNo; // This is a hack so that the translator can be retrieved. We don't support record translation properly when doing keymerging...)
- // MORE - this code looks like it could be commoned up with code in CRoxieKeyedActivity::setPartNo
- if (!lastPartNo.partNo)
- {
- for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
- if (kib)
- {
- IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
- if (k)
- {
- assertex(!k->isTopLevelKey());
- allKeys->addIndex(LINK(k));
- }
- }
- }
- }
- else
- {
- IKeyIndexBase *kib = keyArray->queryKeyPart(part.partNo);
- assertex(kib != NULL);
- IKeyIndex *k = kib->queryPart(part.fileNo);
- allKeys->addIndex(LINK(k));
- }
- i++;
- }
- if (allKeys->numParts())
- tlk.setown(::createKeyMerger(*keyRecInfo, allKeys, steppingOffset, &logctx));
- else
- tlk.clear();
- createSegmentMonitorsPending = true;
- }
- else
- CRoxieKeyedActivity::setPartNo(fileChanged);
- }
- };
- //================================================================================================
- class CRoxieIndexReadActivity : public CRoxieIndexActivity, implements IIndexReadActivityInfo
- {
- protected:
- IHThorCompoundReadExtra * readHelper;
- public:
- CRoxieIndexReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, _steppingOffset)
- {
- onCreate();
- readHelper = (IHThorIndexReadArg *) basehelper;
- if (resent)
- readContinuationInfo();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexRead %u", packet->queryHeader().activityId);
- }
- /* Notes on Global smart stepping implementation:
- When smart stepping, I get from the Roxie server:
- 1 or more seek positions
- some flags
-
- I can read from the index in an order that matches the seek positions order (because of index merger), and which match my hard filter (they may not match my postfilter)
- I can skip forwards in the index to the first record GE a skip position
- I am not going to try to implement a (possible future) flag to return mismatches after any but the last of the seek positions (yet)
- I want to return M matching (keyed matches, seek field matches, and postfilter matches) rows for all provided seek positions
- where M is 1 if SSEFonlyReturnFirstSeekMatch flag set, otherwise all...
- THEN once we are beyond the last provided seek:
- if returnMismatches flag set,
- current row = (next row matching keyed filter)
- if someNewAsYetUnnamedAndUnimplementedFlag flag set
- //if the post filter matches then there may be scope for returning all records which match the seek fields of that first match
- // But not very likely to help unless terms correlated, and may well hinder
- return current row and all following rows matching keyed filter, seek data from current row, and postfilter
- return next row matching keyed filter (even if doesn't match postfilter)
- else
- return next N rows matching keyed filter and postfilter (where N depends on readAheadManyResults flag - 1 if not set)
- */
- inline void advanceToNextSeek()
- {
- assertex(numSeeks != 0);
- if (--numSeeks)
- steppingRow += steppingLength;
- else
- steppingRow = NULL;
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexReadActivity ::process");
- unsigned __int64 keyedLimit = readHelper->getKeyedLimit();
- unsigned __int64 limit = readHelper->getRowLimit();
- if (!resent && (keyedLimit != (unsigned __int64) -1) && ((keyedLimit > preabortIndexReadsThreshold) || (indexHelper->getFlags() & TIRcountkeyedlimit) != 0)) // Don't recheck the limit every time!
- {
- if (!checkLimit(keyedLimit))
- {
- limitExceeded(true);
- return NULL;
- }
- }
- unsigned __int64 stopAfter = readHelper->getChooseNLimit();
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned totalSizeSent = 0;
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- unsigned keyprocessedBefore = keyprocessed;
- bool continuationFailed = false;
- const byte *rawSeek = NULL;
- if (steppingRow)
- rawSeek = steppingRow;
- bool continuationNeeded = false;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- while (!aborted && inputsDone < inputCount)
- {
- if (!resent || !steppingOffset) // Bit of a hack... In the resent case, we have already set up the tlk, and all keys are processed at once in the steppingOffset case (which makes checkPartChanged gives a false positive in this case)
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(resent);
- resent = false;
- {
- TransformCallbackAssociation associate(callback, tlk); // want to destroy this before we advance to next key...
- while (!aborted && (rawSeek ? tlk->lookupSkip(rawSeek, steppingOffset, steppingLength) : tlk->lookup(true)))
- {
- rawSeek = NULL; // only want to do the seek first time we look for a particular seek value
- keyprocessed++;
- if ((keyedLimit != (unsigned __int64) -1) && keyprocessed > keyedLimit)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- limitExceeded(true);
- break;
- }
- indexRecordsRead++;
- size32_t transformedSize;
- const byte * keyRow = tlk->queryKeyBuffer();
- int diff = 0;
- if (steppingRow)
- {
- diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
- assertex(diff >= 0);
- }
- while (diff > 0)
- {
- advanceToNextSeek();
- if (!steppingRow)
- break;
- diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
- if (diff < 0)
- {
- rawSeek = steppingRow;
- break;
- }
- }
- if (diff >= 0)
- {
- if (diff > 0 && seeksAreEof)
- {
- assertex(!steppingRow);
- break;
- }
- rowBuilder.ensureRow();
- transformedSize = readHelper->transform(rowBuilder, keyRow);
- callback.finishedRow();
- if (transformedSize)
- {
- if (logctx.queryTraceLevel() > 15)
- {
- StringBuffer b;
- for (unsigned j = 0; j < (steppingLength ? steppingLength : 6); j++)
- b.appendf("%02x ", keyRow[steppingOffset + j]);
- logctx.CTXLOG("Returning seek row %s", b.str());
- }
- // Did get a match
- processed++;
- if (limit && processed > limit)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- limitExceeded(false);
- break;
- }
- if (processed > stopAfter)
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- return output.getClear();
- }
- rowBuilder.writeToOutput(transformedSize, true);
- totalSizeSent += transformedSize;
- if (totalSizeSent > indexReadChunkSize || (steppingOffset && !steppingRow && !stepExtra.readAheadManyResults()))
- continuationNeeded = true;
- lastRowCompleteMatch = true;
- if (steppingRow && stepExtra.onlyReturnFirstSeekMatch())
- advanceToNextSeek();
- }
- else
- {
- // Didn't get a match
- if (steppingOffset && !steppingRow && stepExtra.returnMismatches())
- {
- transformedSize = readHelper->unfilteredTransform(rowBuilder, keyRow);
- if (transformedSize) // will only be zero in odd situations where codegen can't work out how to transform (eg because of a skip)
- {
- callback.finishedRow();
- rowBuilder.writeToOutput(transformedSize, true);
- totalSizeSent += transformedSize;
- continuationNeeded = true;
- lastRowCompleteMatch = false;
- }
- }
- else
- {
- postFiltered++;
- skipped++;
- }
- }
- }
- if (continuationNeeded && !continuationFailed)
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("Indexread returning partial result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
- if (sendContinuation(output))
- {
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- return output.getClear();
- }
- else
- {
- // This is actually pretty fatal for smart-stepping case
- if (logctx.queryTraceLevel())
- logctx.CTXLOG("Indexread unable to return partial result set");
- continuationFailed = true;
- }
- }
- rowBuilder.clear();
- }
- }
- }
- if (steppingOffset)
- inputsDone = inputCount;
- else
- inputsDone++;
- }
- if (tlk) // a very early abort can mean it is NULL.... MORE is this the right place to put it or should it be inside the loop??
- {
- if (logctx.queryTraceLevel() > 10 && !aborted)
- {
- logctx.CTXLOG("Indexread returning result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
- if (steppingOffset)
- logctx.CTXLOG("Indexread return: steppingOffset %d, steppingRow %p, stepExtra.returnMismatches() %d",steppingOffset, steppingRow, (int) stepExtra.returnMismatches());
- }
- noteStats(keyprocessed-keyprocessedBefore, skipped);
- }
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- virtual IIndexReadActivityInfo *queryIndexReadActivity()
- {
- return this;
- }
- virtual IKeyArray *getKeySet() const
- {
- return keyArray.getLink();
- }
- virtual const IResolvedFile *getVarFileInfo() const
- {
- return varFileInfo.getLink();
- }
- virtual ITranslatorSet *getTranslators() const override
- {
- return translators.getLink();
- }
- virtual void mergeSegmentMonitors(IIndexReadContext *irc) const
- {
- indexHelper->createSegmentMonitors(irc); // NOTE: they will merge;
- }
- virtual IRoxieServerActivity *queryActivity() { throwUnexpected(); }
- virtual const RemoteActivityId &queryRemoteId() const { throwUnexpected(); }
- };
- //================================================================================================
- class CRoxieIndexReadActivityFactory : public CRoxieIndexActivityFactory
- {
- unsigned steppingOffset;
- public:
- CRoxieIndexReadActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexReadArg> helper = (IHThorIndexReadArg *) helperFactory();
- init(helper, graphNode);
- ISteppingMeta *rawMeta = helper->queryRawSteppingMeta();
- if (rawMeta)
- {
- // MORE - should check all keys in maxFields list can actually be keyed.
- const CFieldOffsetSize * fields = rawMeta->queryFields();
- steppingOffset = fields[0].offset;
- }
- else
- {
- steppingOffset = 0;
- }
- }
-
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexReadActivity(logctx, packet, helperFactory, this, steppingOffset);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXREAD "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- //MORE: Very similar to the indexRead code, but I'm not sure it's worth commoning up....
- class CRoxieIndexNormalizeActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorCompoundNormalizeExtra * normalizeHelper;
- public:
- CRoxieIndexNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0) //MORE - stepping?
- {
- onCreate();
- normalizeHelper = (IHThorIndexNormalizeArg *) basehelper;
- if (resent)
- readContinuationInfo();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexNormalize %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexNormalizeActivity ::process");
- unsigned __int64 keyedLimit = normalizeHelper->getKeyedLimit();
- unsigned __int64 rowLimit = normalizeHelper->getRowLimit();
- if (!resent && (keyedLimit != (unsigned __int64) -1) && (indexHelper->getFlags() & TIRcountkeyedlimit) != 0) // Don't recheck the limit every time!
- {
- if (!checkLimit(keyedLimit))
- {
- limitExceeded(true);
- return NULL;
- }
- }
- unsigned __int64 stopAfter = normalizeHelper->getChooseNLimit();
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- unsigned totalSizeSent = 0;
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- bool continuationFailed = false;
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(resent);
- resent = false;
- TransformCallbackAssociation associate(callback, tlk);
- while (!aborted && tlk->lookup(true))
- {
- keyprocessed++;
- if (keyedLimit && processed > keyedLimit)
- {
- noteStats(processed-processedBefore, skipped);
- limitExceeded(true);
- break;
- }
- indexRecordsRead++;
- if (normalizeHelper->first(tlk->queryKeyBuffer()))
- {
- do
- {
- rowBuilder.ensureRow();
- size32_t transformedSize = normalizeHelper->transform(rowBuilder);
- if (transformedSize)
- {
- processed++;
- if (processed > rowLimit)
- {
- limitExceeded(false);
- break;
- }
- if (processed > stopAfter)
- {
- noteStats(processed-processedBefore, skipped);
- return output.getClear();
- }
- totalSizeSent += rowBuilder.writeToOutput(transformedSize, true);
- }
- } while (normalizeHelper->next());
- callback.finishedRow();
- if (totalSizeSent > indexReadChunkSize && !continuationFailed)
- {
- if (sendContinuation(output))
- {
- noteStats(processed-processedBefore, skipped);
- return output.getClear();
- }
- else
- continuationFailed = true;
- }
- }
- else
- {
- postFiltered++;
- skipped++;
- }
- }
- }
- inputsDone++;
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexNormalizeActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexNormalizeActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexNormalizeArg> helper = (IHThorIndexNormalizeArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexNormalizeActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("IndexNormalize "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexCountActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorIndexCountArg * countHelper;
- unsigned __int64 choosenLimit;
- unsigned __int64 rowLimit;
- unsigned __int64 keyedLimit;
- public:
- CRoxieIndexCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
- {
- onCreate();
- countHelper = (IHThorIndexCountArg *) basehelper;
- assertex(!resent);
- choosenLimit = countHelper->getChooseNLimit();
- rowLimit = countHelper->getRowLimit();
- keyedLimit = countHelper->getKeyedLimit();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexCount %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexCountActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- unsigned __int64 count = 0;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- while (!aborted && inputsDone < inputCount && count < choosenLimit)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- if (countHelper->hasFilter())
- {
- callback.setManager(tlk);
- while (!aborted && (count < choosenLimit) && tlk->lookup(true))
- {
- keyprocessed++;
- indexRecordsRead++;
- count += countHelper->numValid(tlk->queryKeyBuffer());
- if (count > rowLimit)
- limitExceeded(false);
- else if (count > keyedLimit)
- limitExceeded(true);
- callback.finishedRow();
- }
- callback.setManager(NULL);
- }
- else
- {
- //MORE: GH->RKC There should be value in providing a choosenLimit to getCount()
- //MORE: note that tlk->checkCount() is NOT suitable as it makes assumptions about the segmonitors (only checks leading ones)
- count += tlk->getCount();
- if (count > rowLimit)
- limitExceeded(false);
- else if (count > keyedLimit)
- limitExceeded(true); // MORE - is this right?
- }
- }
- inputsDone++;
- }
- if (!aborted && count)
- {
- if (count > choosenLimit)
- count = choosenLimit;
- processed++;
- assertex(!serializer);
- void *recBuffer = output->getBuffer(meta.getFixedSize(), false);
- if (meta.getFixedSize() == 1)
- *(byte *)recBuffer = (byte)count;
- else
- {
- assertex(meta.getFixedSize() == sizeof(unsigned __int64));
- *(unsigned __int64 *)recBuffer = count;
- }
- output->putBuffer(recBuffer, meta.getFixedSize(), false);
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexCountActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexCountActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexCountArg> helper = (IHThorIndexCountArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexCountActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXCOUNT "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexAggregateActivity : public CRoxieIndexActivity
- {
- protected:
- IHThorCompoundAggregateExtra * aggregateHelper;
- public:
- CRoxieIndexAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
- {
- onCreate();
- aggregateHelper = (IHThorIndexAggregateArg *) basehelper;
- assertex(!resent);
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexAggregate %u", packet->queryHeader().activityId);
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexAggregateActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- rowBuilder.ensureRow();
- aggregateHelper->clearAggregate(rowBuilder);
- unsigned skipped = 0;
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- callback.setManager(tlk);
- while (!aborted && tlk->lookup(true))
- {
- keyprocessed++;
- indexRecordsRead++;
- aggregateHelper->processRow(rowBuilder, tlk->queryKeyBuffer());
- callback.finishedRow();
- }
- callback.setManager(NULL);
- }
- inputsDone++;
- }
- if (!aborted && aggregateHelper->processedAnyRows())
- {
- processed++;
- size32_t transformedSize = meta.getRecordSize(rowBuilder.getSelf());
- rowBuilder.writeToOutput(transformedSize, true);
- }
- noteStats(processed-processedBefore, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexAggregateActivityFactory : public CRoxieIndexActivityFactory
- {
- public:
- CRoxieIndexAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorIndexAggregateArg> helper = (IHThorIndexAggregateArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexAggregateActivity(logctx, packet, helperFactory, this);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXAGGREGATE "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieIndexAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieIndexGroupAggregateActivity : public CRoxieIndexActivity, implements IHThorGroupAggregateCallback
- {
- protected:
- IHThorCompoundGroupAggregateExtra * aggregateHelper;
- RowAggregator results;
- unsigned groupSegCount;
- ThorActivityKind kind;
- public:
- IMPLEMENT_IINTERFACE_USING(CRoxieIndexActivity)
- CRoxieIndexGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, ThorActivityKind _kind)
- : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0),
- aggregateHelper((IHThorIndexGroupAggregateArg *) basehelper),
- results(*aggregateHelper, *aggregateHelper), kind(_kind)
- {
- onCreate();
- results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
- assertex(!resent);
- groupSegCount = 0;
- }
- virtual bool needsRowAllocator()
- {
- return true;
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("IndexGroupAggregate %u", packet->queryHeader().activityId);
- }
- virtual void processRow(const void * next)
- {
- results.addRow(next);
- }
- virtual void createSegmentMonitors()
- {
- if (createSegmentMonitorsPending)
- {
- CRoxieIndexActivity::createSegmentMonitors();
- if ((kind==TAKindexgroupcount || kind==TAKindexgroupexists))
- groupSegCount = aggregateHelper->getGroupingMaxField();
- else
- groupSegCount = 0;
- }
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieIndexGroupAggregateActivity ::process");
- Owned<IRowManager> rowManager = roxiemem::createRowManager(0, NULL, logctx, NULL, true); // MORE - should not really use default limits
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned processedBefore = processed;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- try
- {
- while (!aborted && inputsDone < inputCount)
- {
- checkPartChanged(inputData[inputsDone]);
- if (tlk)
- {
- createSegmentMonitors();
- tlk->reset(false);
- callback.setManager(tlk);
- while (!aborted && tlk->lookup(true))
- {
- if (groupSegCount && !translators->queryTranslator(lastPartNo.fileNo))
- {
- AggregateRowBuilder &rowBuilder = results.addRow(tlk->queryKeyBuffer());
- callback.finishedRow();
- if (kind == TAKindexgroupcount)
- {
- unsigned __int64 count = tlk->getCurrentRangeCount(groupSegCount);
- aggregateHelper->processCountGrouping(rowBuilder, count-1);
- }
- if (!tlk->nextRange(groupSegCount))
- break;
- }
- else
- {
- keyprocessed++;
- indexRecordsRead++;
- aggregateHelper->processRow(tlk->queryKeyBuffer(), this);
- callback.finishedRow();
- }
- }
- callback.setManager(NULL);
- }
- inputsDone++;
- }
- if (!aborted)
- {
- for (;;)
- {
- Owned<AggregateRowBuilder> next = results.nextResult();
- if (!next)
- break;
- unsigned rowSize = next->querySize();
- OwnedConstRoxieRow row(next->finalizeRowClear());
- if (serializer)
- {
- serializeRow(output, row);
- }
- else
- {
- void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
- memcpy(recBuffer, row, rowSize);
- output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
- }
- }
- }
- }
- catch (...)
- {
- results.reset(); // kill entries before the rowManager dies.
- throw;
- }
- results.reset();
- noteStats(processed-processedBefore, 0);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- };
- //================================================================================================
- class CRoxieIndexGroupAggregateActivityFactory : public CRoxieIndexActivityFactory
- {
- ThorActivityKind kind;
- public:
- CRoxieIndexGroupAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
- : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), kind(_kind)
- {
- Owned<IHThorIndexGroupAggregateArg> helper = (IHThorIndexGroupAggregateArg *) helperFactory();
- init(helper, graphNode);
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieIndexGroupAggregateActivity(logctx, packet, helperFactory, this, kind);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("INDEXGROUPAGGREGATE "));
- }
- };
- ISlaveActivityFactory *createRoxieIndexGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
- {
- return new CRoxieIndexGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _kind);
- }
- //================================================================================================
- class CRoxieFetchActivityFactory : public CSlaveActivityFactory
- {
- public:
- Owned<ITranslatorSet> translators;
- Owned<IFileIOArray> fileArray;
- CRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorFetchBaseArg> helper = (IHThorFetchBaseArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- OwnedRoxieString fname(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- unsigned formatCrc = getFormatCrc(helper->getDiskFormatCrc());
- translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("FETCH "));
- }
- };
- class CRoxieFetchActivityBase : public CRoxieSlaveActivity
- {
- protected:
- IHThorFetchBaseArg *helper;
- const CRoxieFetchActivityFactory *factory;
- Owned<IFileIO> rawFile;
- Owned<ISerialStream> rawStream;
- offset_t base;
- char *inputData;
- char *inputLimit;
- Linked<ITranslatorSet> translators;
- Linked<IFileIOArray> files;
- const IDynamicTransform *translator = nullptr;
- bool needsRHS;
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData) = 0;
- public:
- CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory), translators(_translators), files(_files)
- {
- helper = (IHThorFetchBaseArg *) basehelper;
- base = 0;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- onCreate();
- inputData = (char *) serializedCreate.readDirect(0);
- inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
- needsRHS = helper->transformNeedsRhs();
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned formatCrc = basefactory->getFormatCrc(helper->getDiskFormatCrc());
- translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false));
- files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("Fetch %u", packet->queryHeader().activityId);
- }
- virtual void setPartNo(bool filechanged);
- };
- IMessagePacker *CRoxieFetchActivityBase::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieFetchActivityBase::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned accepted = 0;
- unsigned rejected = 0;
- unsigned __int64 rowLimit = helper->getRowLimit();
- OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
- while (!aborted && inputData < inputLimit)
- {
- checkPartChanged(*(PartNoType *) inputData);
- inputData += sizeof(PartNoType);
- offset_t rp = *(offset_t *)inputData;
- inputData += sizeof(offset_t);
- unsigned rhsSize;
- if (needsRHS)
- {
- rhsSize = *(unsigned *)inputData;
- inputData += sizeof(unsigned);
- }
- else
- rhsSize = 0;
- offset_t pos;
- if (isLocalFpos(rp))
- pos = getLocalFposOffset(rp);
- else
- pos = rp-base;
- unsigned thisSize = doFetch(rowBuilder, pos, rp, inputData);
- inputData += rhsSize;
- if (thisSize)
- {
- rowBuilder.writeToOutput(thisSize, true);
- accepted++;
- if (accepted > rowLimit)
- {
- logctx.noteStatistic(StNumDiskSeeks, accepted+rejected);
- logctx.noteStatistic(StNumDiskRowsRead, accepted+rejected);
- logctx.noteStatistic(StNumDiskAccepted, accepted);
- logctx.noteStatistic(StNumDiskRejected, rejected);
- limitExceeded();
- return NULL;
- }
- }
- else
- rejected++;
- }
- logctx.noteStatistic(StNumDiskSeeks, accepted+rejected);
- logctx.noteStatistic(StNumDiskRowsRead, accepted+rejected);
- logctx.noteStatistic(StNumDiskAccepted, accepted);
- logctx.noteStatistic(StNumDiskRejected, rejected);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- class CRoxieFetchActivity : public CRoxieFetchActivityBase
- {
- Owned<IEngineRowAllocator> diskAllocator;
- CThorContiguousRowBuffer prefetchSource;
- Owned<ISourceRowPrefetcher> rowPrefetcher;
- public:
- CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files)
- {
- // If we ever supported superfiles this would need to move to setPartNo, and pass proper subfile idx in
- rowPrefetcher.setown(translators->getPrefetcher(0));
- IOutputMetaData *diskMeta = helper->queryProjectedDiskRecordSize();
- diskAllocator.setown(getRowAllocator(diskMeta, basefactory->queryId()));
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- prefetchSource.reset(pos);
- rowPrefetcher->readAhead(prefetchSource);
- const byte *diskRow = prefetchSource.queryRow();
- if (translator)
- {
- MemoryBuffer buf;
- MemoryBufferBuilder aBuilder(buf, 0);
- translator->translate(aBuilder, diskRow);
- // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
- return h->transform(rowBuilder, buf.toByteArray(), inputData, rawpos);
- }
- else
- {
- // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
- return h->transform(rowBuilder, diskRow, inputData, rawpos);
- }
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- prefetchSource.setStream(rawStream);
- }
- };
- IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieFetchActivity(logctx, packet, helperFactory, this, translators, fileArray);
- }
- //------------------------------------------------------------------------------------
- class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
- {
- CSVSplitter csvSplitter;
- size32_t maxRowSize;
- CThorStreamDeserializerSource deserializeSource;
- public:
- CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files,
- unsigned _maxColumns, size32_t _maxRowSize)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files), maxRowSize(_maxRowSize)
- {
- const char * quotes = NULL;
- const char * separators = NULL;
- const char * terminators = NULL;
- const char * escapes = NULL;
- const IResolvedFile *fileInfo = varFileInfo ? varFileInfo : factory->datafile;
- if (fileInfo)
- {
- const IPropertyTree *options = fileInfo->queryProperties();
- if (options)
- {
- quotes = options->queryProp("@csvQuote");
- separators = options->queryProp("@csvSeparate");
- terminators = options->queryProp("@csvTerminate");
- escapes = options->queryProp("@csvEscape");
- }
- }
- IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
- ICsvParameters *csvInfo = h->queryCsvParameters();
- csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators, escapes);
- if (translator)
- UNIMPLEMENTED; // It's not obvious how we would support this.
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
- rawStream->reset(pos);
- size32_t rowSize = 4096; // MORE - make configurable
- for (;;)
- {
- size32_t avail;
- const void *peek = rawStream->peek(rowSize, avail);
- if (csvSplitter.splitLine(avail, (const byte *)peek) < rowSize || avail < rowSize)
- break;
- if (rowSize == maxRowSize)
- throw MakeStringException(0, "File contained a line of length greater than %d bytes.", maxRowSize);
- if (rowSize >= maxRowSize/2)
- rowSize = maxRowSize;
- else
- rowSize += rowSize;
- }
- return h->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), inputData, rawpos);
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- deserializeSource.setStream(rawStream);
- }
- };
- class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSelect
- {
- Owned<IXMLParse> parser;
- Owned<IColumnProvider> lastMatch;
- Owned<IFileIOStream> rawStreamX;
- unsigned streamBufferSize;
- CThorStreamDeserializerSource deserializeSource;
- public:
- IMPLEMENT_IINTERFACE_USING(CRoxieFetchActivityBase)
- CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
- const CRoxieFetchActivityFactory *_aFactory,
- ITranslatorSet *_translators,
- IFileIOArray *_files,
- unsigned _streamBufferSize)
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory, _translators, _files),
- streamBufferSize(_streamBufferSize)
- {
- if (translator)
- UNIMPLEMENTED;
- }
- virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
- {
- rawStreamX->seek(pos, IFSbegin);
- try
- {
- while(!lastMatch)
- if(!parser->next())
- throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
- IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
- unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
- lastMatch.clear();
- parser->reset();
- return thisSize;
- }
- catch (IException *E)
- {
- ::Release(E);
- throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
- }
- }
- virtual void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
- {
- lastMatch.set(&entry);
- }
- virtual void setPartNo(bool filechanged)
- {
- CRoxieFetchActivityBase::setPartNo(filechanged);
- deserializeSource.setStream(rawStream);
- rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
- parser.setown((factory->getKind()==TAKjsonfetch) ? createJSONParse(*rawStreamX, "/", *this) : createXMLParse(*rawStreamX, "/", *this));
- }
- };
- void CRoxieFetchActivityBase::setPartNo(bool filechanged)
- {
- rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
- translator = translators->queryTranslator(0); // MORE - superfiles
- assertex(rawFile != NULL);
- rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
- }
- class CRoxieCSVFetchActivityFactory : public CRoxieFetchActivityFactory
- {
- unsigned maxColumns;
- size32_t maxRowSize;
- public:
- CRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorCsvFetchArg> helper = (IHThorCsvFetchArg*) helperFactory();
- maxColumns = helper->getMaxColumns();
- ICsvParameters *csvInfo = helper->queryCsvParameters();
- assertex(!csvInfo->queryEBCDIC());
- maxRowSize = defaultMaxCsvRowSize * 1024 * 1024;
- IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
- if (workunit)
- maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, maxColumns, maxRowSize);
- }
- };
- class CRoxieXMLFetchActivityFactory : public CRoxieFetchActivityFactory
- {
- public:
- CRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, translators, fileArray, 4096);
- }
- };
- ISlaveActivityFactory *createRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieCSVFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- ISlaveActivityFactory *createRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieXMLFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedJoinIndexActivityFactory : public CRoxieKeyedActivityFactory
- {
- public:
- CRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename|JFindexfromactivity)) != 0);
- formatCrc = getFormatCrc(helper->getIndexFormatCrc());
- projectedMeta = helper->queryProjectedIndexRecordSize();
- expectedMeta = helper->queryIndexRecordSize();
- if (!variableFileName)
- {
- bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
- OwnedRoxieString indexFileName(helper->getIndexFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- translators.setown(datafile->getTranslators(formatCrc, projectedMeta, expectedMeta, queryFactory.queryOptions().enableFieldTranslation, true));
- keyArray.setown(datafile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("KEYEDJOIN INDEX "));
- }
- };
- class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
- {
- IHThorKeyedJoinArg *helper;
- const CRoxieKeyedJoinIndexActivityFactory *factory;
- unsigned inputLength;
- char *inputData;
- Owned<IRoxieSlaveActivity> rootIndexActivity;
- IIndexReadActivityInfo *rootIndex;
- unsigned processed;
- unsigned candidateCount;
- unsigned keepCount;
- unsigned inputDone;
- public:
- CRoxieKeyedJoinIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
- : factory(_aFactory), CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory)
- {
- helper = (IHThorKeyedJoinArg *) basehelper;
- keyRecInfo = &helper->queryIndexRecordSize()->queryRecordAccessor(true);
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename|JFindexfromactivity)) != 0);
- inputDone = 0;
- processed = 0;
- candidateCount = 0;
- keepCount = 0;
- rootIndex = NULL;
- onCreate();
- inputData = (char *) serializedCreate.readDirect(0);
- inputLength = (serializedCreate.length() - serializedCreate.getPos());
- if (resent)
- {
- resentInfo.read(inputDone);
- inputData += inputDone;
- resentInfo.read(processed);
- resentInfo.read(candidateCount);
- resentInfo.read(keepCount);
- resentInfo.read(lastPartNo.partNo);
- resentInfo.read(lastPartNo.fileNo);
- setPartNo(true);
- tlk->deserializeCursorPos(resentInfo);
- assertex(resentInfo.remaining() == 0);
- }
- }
- ~CRoxieKeyedJoinIndexActivity()
- {
- }
- virtual void deserializeExtra(MemoryBuffer &buff)
- {
- if (helper->getJoinFlags() & JFindexfromactivity)
- {
- RemoteActivityId indexActivityId(buff);
- assertex(indexActivityId.activityId);
- unsigned indexCtxLen;
- buff.read(indexCtxLen);
- const void *indexCtx = buff.readDirect(indexCtxLen);
- //We create a packet for the index activity to use. Header, trace info and parentextract are clone of mine, context info is copied from buff
- MemoryBuffer indexPacketData;
- indexPacketData.append(sizeof(RoxiePacketHeader), &packet->queryHeader());
- indexPacketData.append(packet->getTraceLength(), packet->queryTraceInfo());
- const byte *parentExtract = (const byte *) packet->queryContextData();
- unsigned parentExtractLen = *(unsigned*) parentExtract;
- indexPacketData.append(parentExtractLen);
- indexPacketData.append(parentExtractLen, parentExtract + sizeof(unsigned));
- indexPacketData.append(indexCtxLen, indexCtx);
- RoxiePacketHeader *newHeader = (RoxiePacketHeader *) indexPacketData.toByteArray();
- newHeader->continueSequence = 0;
- newHeader->activityId = indexActivityId.activityId;
- newHeader->queryHash = indexActivityId.queryHash;
- Owned<IRoxieQueryPacket> indexPacket = createRoxiePacket(indexPacketData);
- Owned<ISlaveActivityFactory> indexActivityFactory = factory->queryQueryFactory().getSlaveActivityFactory(indexActivityId.activityId);
- assertex(indexActivityFactory != NULL);
- rootIndexActivity.setown(indexActivityFactory->createActivity(logctx, indexPacket));
- rootIndex = rootIndexActivity->queryIndexReadActivity();
-
- varFileInfo.setown(rootIndex->getVarFileInfo());
- translators.setown(rootIndex->getTranslators());
- keyArray.setown(rootIndex->getKeySet());
- }
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getIndexFileName();
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("KeyedJoinIndex %u", packet->queryHeader().activityId);
- }
- virtual void createSegmentMonitors()
- {
- // This is called to create the segmonitors that apply to ALL rows - not the per-row ones
- // At present there are none. However we should still set up the layout translation.
- if (createSegmentMonitorsPending)
- {
- createSegmentMonitorsPending = false;
- tlk->setLayoutTranslator(translators->queryTranslator(lastPartNo.fileNo));
- }
- }
- };
- IRoxieSlaveActivity *CRoxieKeyedJoinIndexActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieKeyedJoinIndexActivity(logctx, packet, helperFactory, this);
- }
- IMessagePacker *CRoxieKeyedJoinIndexActivity::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieKeyedJoinIndexActivity::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- CachedOutputMetaData joinFieldsMeta(helper->queryJoinFieldsRecordSize());
- Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
- OptimizedKJRowBuilder rowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned atmost = helper->getJoinLimit();
- if (!atmost) atmost = (unsigned) -1;
- unsigned abortLimit = helper->getMatchAbortLimit();
- if (!abortLimit) abortLimit = (unsigned) -1;
- if (abortLimit < atmost)
- atmost = abortLimit;
- unsigned keepLimit = helper->getKeepLimit();
- unsigned joinFlags = helper->getJoinFlags();
- if (joinFlags & (JFtransformMaySkip | JFfetchMayFilter))
- keepLimit = 0;
- if ((joinFlags & (JFexclude|JFleftouter)) == (JFexclude|JFleftouter) && (!(joinFlags & JFfetchMayFilter))) // For left-only joins, all we care about is existence of a match. Return as soon as we know that there is one
- keepLimit = 1;
- unsigned processedBefore = processed;
- unsigned rejected = 0;
- CachedOutputMetaData inputFields(helper->queryIndexReadInputRecordSize());
- unsigned inputSize = inputFields.getFixedSize();
- unsigned totalSizeSent = 0;
- // Now go fetch the records
- bool continuationFailed = false;
- while (!aborted && inputDone < inputLength)
- {
- checkPartChanged(*(PartNoType *) inputData);
- CJoinGroup *jg = *(CJoinGroup **) (inputData + sizeof(PartNoType)); // NOTE - this is a pointer in Roxie server's address space - don't go following it!
- char *inputRow = inputData+sizeof(PartNoType)+sizeof(const void *);
- if (inputFields.isVariableSize())
- {
- inputSize = *(unsigned *)inputRow;
- inputRow += sizeof(unsigned);
- }
- if (tlk)
- {
- createSegmentMonitors();
- helper->createSegmentMonitors(tlk, inputRow);
- if (rootIndex)
- rootIndex->mergeSegmentMonitors(tlk);
- tlk->finishSegmentMonitors();
- if (logctx.queryTraceLevel() >= 20)
- {
- StringBuffer out;
- printKeyedValues(out, tlk, helper->queryIndexRecordSize());
- logctx.CTXLOG("Using filter %s", out.str());
- }
- if (!resent && (atmost != (unsigned) -1) && ((atmost > preabortKeyedJoinsThreshold) || (joinFlags & JFcountmatchabortlimit) || (keepLimit != 0)))
- {
- unsigned __int64 precount = tlk->checkCount(atmost);
- if (precount > atmost)
- {
- candidateCount = atmost+1;
- if (logctx.queryTraceLevel() > 5)
- logctx.CTXLOG("Pre-aborting since candidate count is at least %" I64F "d", precount);
- }
- else
- {
- if (logctx.queryTraceLevel() > 10)
- logctx.CTXLOG("NOT Pre-aborting since candidate count is %" I64F "d", precount);
- tlk->reset(false);
- }
- }
- else
- tlk->reset(resent);
- resent = false;
- ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
- ScopedAtomic<unsigned> postFiltered(::postFiltered);
- while (candidateCount <= atmost)
- {
- if (tlk->lookup(true))
- {
- candidateCount++;
- indexRecordsRead++;
- KLBlobProviderAdapter adapter(tlk);
- const byte *indexRow = tlk->queryKeyBuffer();
- size_t fposOffset = tlk->queryRowSize() - sizeof(offset_t);
- offset_t fpos = rtlReadBigUInt8(indexRow + fposOffset);
- if (helper->indexReadMatch(inputRow, indexRow, &adapter))
- {
- processed++;
- if (keepLimit)
- {
- keepCount++;
- if (keepCount > keepLimit)
- break;
- }
- if (processed > rowLimit)
- {
- if (logctx.queryTraceLevel() > 1)
- {
- StringBuffer s;
- logctx.CTXLOG("limit exceeded for %s", packet->queryHeader().toString(s).str());
- }
- noteStats(processed-processedBefore, rejected);
- limitExceeded();
- return NULL;
- }
- unsigned totalSize = 0;
- if (helper->diskAccessRequired())
- {
- const void *self = output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- KeyedJoinHeader *rec = (KeyedJoinHeader *) self;
- rec->fpos = fpos;
- rec->thisGroup = jg;
- rec->partNo = lastPartNo.partNo;
- output->putBuffer(self, KEYEDJOIN_RECORD_SIZE(0), true);
- }
- else
- {
- KLBlobProviderAdapter adapter(tlk);
- totalSize = helper->extractJoinFields(rowBuilder, indexRow, &adapter);
- rowBuilder.writeToOutput(totalSize, fpos, jg, lastPartNo.partNo);
- }
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(totalSize);
- if (totalSizeSent > indexReadChunkSize && !continuationFailed)
- {
- MemoryBuffer si;
- unsigned short siLen = 0;
- si.append(siLen);
- si.append(inputDone);
- si.append(processed);
- si.append(candidateCount);
- si.append(keepCount);
- si.append(lastPartNo.partNo);
- si.append(lastPartNo.fileNo);
- tlk->serializeCursorPos(si);
- if (si.length() <= maxContinuationSize)
- {
- siLen = si.length() - sizeof(siLen);
- si.writeDirect(0, sizeof(siLen), &siLen);
- output->sendMetaInfo(si.toByteArray(), si.length());
- noteStats(processed-processedBefore, rejected);
- return output.getClear();
- }
- else
- continuationFailed = true;
- }
- }
- else
- {
- rejected++;
- postFiltered++;
- }
- }
- else
- break;
- }
- tlk->releaseSegmentMonitors();
- }
- // output an end marker for the matches to this group
- KeyedJoinHeader *rec = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- rec->fpos = candidateCount;
- rec->thisGroup = jg;
- rec->partNo = (unsigned short) -1;
- output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(0), true);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(0); // note - don't interrupt here though - too complicated.
- candidateCount = 0;
- keepCount = 0;
- inputData = inputRow + inputSize;
- inputDone += sizeof(PartNoType) + sizeof(const void *);
- if (inputFields.isVariableSize())
- inputDone += sizeof(unsigned);
- inputDone += inputSize;
- }
- noteStats(processed-processedBefore, rejected);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- //================================================================================================
- ISlaveActivityFactory *createRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieKeyedJoinIndexActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieKeyedJoinFetchActivityFactory : public CSlaveActivityFactory
- {
- public:
- Owned<ITranslatorSet> translators;
- Owned<IFileIOArray> files;
- CRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
- {
- Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
- assertex(helper->diskAccessRequired());
- bool variableFileName = allFilesDynamic || queryFactory.isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- if (!variableFileName)
- {
- bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
- OwnedRoxieString fileName(helper->getFileName());
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- unsigned formatCrc = getFormatCrc(helper->getDiskFormatCrc());
- translators.setown(datafile->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), getEnableFieldTranslation(), false));
- files.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("KEYEDJOIN FETCH "));
- }
- };
- class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
- {
- IHThorKeyedJoinArg *helper;
- Owned<IFileIO> rawFile;
- offset_t base;
- const char *inputLimit;
- const char *inputData;
- Linked<ITranslatorSet> translators;
- Linked<IFileIOArray> files;
- Owned<ISerialStream> rawStream;
- CThorContiguousRowBuffer prefetchSource;
- Owned<ISourceRowPrefetcher> prefetcher;
- const IDynamicTransform *translator = nullptr;
- virtual void setPartNo(bool filechanged)
- {
- rawFile.setown(files->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
- translator = translators->queryTranslator(0); // MORE - superfiles
- rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
- prefetcher.setown(translators->getPrefetcher(0));
- prefetchSource.setStream(rawStream);
- }
- public:
- CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory,
- IFileIOArray *_files, ITranslatorSet *_translators)
- : files(_files),
- translators(_translators),
- CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory)
- {
- // MORE - no continuation row support?
- base = 0;
- helper = (IHThorKeyedJoinArg *) basehelper;
- variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
- onCreate();
- inputData = (const char *) serializedCreate.readDirect(0);
- inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
- }
- ~CRoxieKeyedJoinFetchActivity()
- {
- }
- virtual const char *queryDynamicFileName() const
- {
- return helper->getFileName();
- }
- virtual void setVariableFileInfo()
- {
- unsigned formatCrc = basefactory->getFormatCrc(helper->getDiskFormatCrc());
- translators.setown(varFileInfo->getTranslators(formatCrc, helper->queryProjectedDiskRecordSize(), helper->queryDiskRecordSize(), basefactory->getEnableFieldTranslation(), false));
- files.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
- }
- virtual IMessagePacker *process();
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("KeyedJoinFetch %u", packet->queryHeader().activityId);
- }
- };
- IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieKeyedJoinFetchActivity::process");
- // MORE - where we are returning everything there is an optimization or two to be had
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned processed = 0;
- unsigned skipped = 0;
- unsigned __int64 rowLimit = helper->getRowLimit();
- unsigned totalSizeSent = 0;
- CachedOutputMetaData joinFieldsMeta(helper->queryJoinFieldsRecordSize());
- Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
- OptimizedKJRowBuilder jfRowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
- CachedOutputMetaData inputFields(helper->queryFetchInputRecordSize());
- size32_t inputSize = inputFields.getFixedSize();
- while (!aborted && inputData < inputLimit)
- {
- checkPartChanged(*(PartNoType *) inputData);
- inputData += sizeof(PartNoType);
- offset_t rp;
- memcpy(&rp, inputData, sizeof(rp));
- offset_t pos;
- if (isLocalFpos(rp))
- pos = getLocalFposOffset(rp);
- else
- pos = rp-base;
- prefetchSource.reset(pos);
- prefetcher->readAhead(prefetchSource);
- const byte *rawRHS = prefetchSource.queryRow();
- MemoryBuffer buf;
- if (translator)
- {
- MemoryBufferBuilder aBuilder(buf, 0);
- translator->translate(aBuilder, rawRHS);
- rawRHS = (const byte *) buf.toByteArray();
- }
- const KeyedJoinHeader *headerPtr = (KeyedJoinHeader *) inputData;
- inputData = &headerPtr->rhsdata[0];
- if (inputFields.isVariableSize())
- {
- memcpy(&inputSize, inputData, sizeof(inputSize));
- inputData += sizeof(inputSize);
- }
- if (helper->fetchMatch(inputData, rawRHS))
- {
- unsigned thisSize = helper->extractJoinFields(jfRowBuilder, rawRHS, (IBlobProvider*)NULL);
- jfRowBuilder.writeToOutput(thisSize, headerPtr->fpos, headerPtr->thisGroup, headerPtr->partNo);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(thisSize);
- processed++;
- if (processed > rowLimit)
- {
- logctx.noteStatistic(StNumDiskSeeks, processed+skipped);
- logctx.noteStatistic(StNumDiskRowsRead, processed+skipped);
- logctx.noteStatistic(StNumDiskAccepted, processed);
- logctx.noteStatistic(StNumDiskRejected, skipped);
- limitExceeded();
- return NULL;
- }
- }
- else
- {
- skipped++;
- KeyedJoinHeader *out = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
- out->fpos = 0;
- out->thisGroup = headerPtr->thisGroup;
- out->partNo = (unsigned short) -1;
- output->putBuffer(out, KEYEDJOIN_RECORD_SIZE(0), true);
- totalSizeSent += KEYEDJOIN_RECORD_SIZE(0);
- }
- inputData += inputSize;
- }
- logctx.noteStatistic(StNumDiskSeeks, processed+skipped);
- logctx.noteStatistic(StNumDiskRowsRead, processed+skipped);
- logctx.noteStatistic(StNumDiskAccepted, processed);
- logctx.noteStatistic(StNumDiskRejected, skipped);
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this, files, translators);
- }
- ISlaveActivityFactory *createRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
- {
- return new CRoxieKeyedJoinFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
- }
- //================================================================================================
- class CRoxieRemoteActivity : public CRoxieSlaveActivity
- {
- protected:
- IHThorRemoteArg * remoteHelper;
- unsigned processed;
- unsigned remoteId;
- public:
- CRoxieRemoteActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory, unsigned _remoteId)
- : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
- remoteId(_remoteId)
- {
- remoteHelper = (IHThorRemoteArg *) basehelper;
- processed = 0;
- onCreate();
- }
- virtual StringBuffer &toString(StringBuffer &ret) const
- {
- return ret.appendf("Remote %u", packet->queryHeader().activityId);
- }
- virtual const char *queryDynamicFileName() const
- {
- throwUnexpected();
- }
- virtual void setVariableFileInfo()
- {
- throwUnexpected();
- }
- virtual IMessagePacker *process()
- {
- MTIME_SECTION(queryActiveTimer(), "CRoxieRemoteActivity ::process");
- Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
- unsigned __int64 rowLimit = remoteHelper->getRowLimit();
- rtlRowBuilder remoteExtractBuilder;
- remoteHelper->createParentExtract(remoteExtractBuilder);
- Linked<IActivityGraph> remoteQuery = queryContext->queryChildGraph(remoteId);
- Linked<IRoxieServerChildGraph> remoteGraph = remoteQuery->queryLoopGraph();
- try
- {
- remoteGraph->beforeExecute();
- Owned<IFinalRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
- Owned<IStrandJunction> junction;
- IEngineRowStream *stream = connectSingleStream(queryContext, input, 0, junction, 0);
- while (!aborted)
- {
- const void * next = stream->ungroupedNextRow();
- if (!next)
- break;
- size32_t nextSize = meta.getRecordSize(next);
- //MORE - what about grouping?
- processed++;
- if (processed > rowLimit)
- {
- ReleaseRoxieRow(next);
- limitExceeded();
- break;
- }
- if (serializer)
- serializeRow(output, next);
- else
- {
- void * recBuffer = output->getBuffer(nextSize, meta.isVariableSize());
- memcpy(recBuffer, next, nextSize);
- output->putBuffer(recBuffer, nextSize, meta.isVariableSize());
- }
- ReleaseRoxieRow(next);
- }
- remoteGraph->afterExecute();
- }
- catch (IException *E)
- {
- remoteGraph->afterExecute();
- if (aborted)
- ::Release(E);
- else
- throw;
- }
- catch (...)
- {
- remoteGraph->afterExecute();
- throw;
- }
- if (aborted)
- return NULL;
- else
- return output.getClear();
- }
- virtual void setPartNo(bool filechanged)
- {
- throwUnexpected();
- }
- };
- //================================================================================================
- class CRoxieRemoteActivityFactory : public CSlaveActivityFactory
- {
- unsigned remoteId;
- public:
- CRoxieRemoteActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
- : CSlaveActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), remoteId(_remoteId)
- {
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- return new CRoxieRemoteActivity(logctx, packet, helperFactory, this, remoteId);
- }
- virtual StringBuffer &toString(StringBuffer &s) const
- {
- return CSlaveActivityFactory::toString(s.append("Remote "));
- }
- };
- ISlaveActivityFactory *createRoxieRemoteActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
- {
- return new CRoxieRemoteActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _remoteId);
- }
- //================================================================================================
- class CRoxieDummyActivityFactory : public CSlaveActivityFactory // not a real activity - just used to properly link files
- {
- protected:
- Owned<const IResolvedFile> indexfile;
- Owned<IKeyArray> keyArray;
- Owned<IFileIOArray> fileArray;
- public:
- CRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
- : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, NULL)
- {
- if (_graphNode.getPropBool("att[@name='_isSpill']/@value", false) || _graphNode.getPropBool("att[@name='_isSpillGlobal']/@value", false))
- return; // ignore 'spills'
- try // operations does not want any missing file errors to be fatal, or throw traps - just log it
- {
- ThorActivityKind kind = getActivityKind(_graphNode);
- if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
- {
- const char *fileName = queryNodeFileName(_graphNode, kind);
- const char *indexName = queryNodeIndexName(_graphNode, kind);
- if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
- {
- bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
- indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (indexfile)
- keyArray.setown(indexfile->getKeyArray(isOpt, queryFactory.queryChannel()));
- }
- if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
- {
- bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
- datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
- if (datafile)
- {
- fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
- }
- }
- }
- }
- catch(IException *E)
- {
- StringBuffer errors;
- E->errorMessage(errors);
- DBGLOG("%s File error = %s", (isLoadDataOnly) ? "LOADDATAONLY" : "SUSPENDED QUERY", errors.str());
- E->Release();
- }
- }
- virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
- {
- throwUnexpected(); // don't actually want to create an activity
- }
- };
- //================================================================================================
- ISlaveActivityFactory *createRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
- {
- // MORE - bool isLoadDataOnly may need to be an enum if more than just LOADDATAONLY and suspended queries use this
- return new CRoxieDummyActivityFactory(_graphNode, _subgraphId, _queryFactory, isLoadDataOnly);
- }
|